什么是死信队列?死信队列是如何导致的?
死信交换机(Dead Letter Exchange,DLX)
- 定义:死信交换机是一种特殊的交换机,专门用于**接收从其他队列中因特定原因变成死信的消息**。它的本质还是交换机,遵循RabbitMQ中交换机的基本工作原理,如根据路由规则将消息发送到绑定的队列。
- 作用:为死信提供一个集中处理的入口点。通过将死信发送到死信交换机,再由其路由到相应的死信队列,可以方便地对这些异常消息进行统一管理和处理,确保数据不丢失。
死信队列(Dead Letter Queue,DLQ)
- 定义:死信队列用于存储那些无法在正常流程中被消费的消息,即死信。这些消息进入死信队列后,可以后续进行分析、重试或其他特殊处理。
- 产生死信的原因:
- 消息被拒绝且不重新入队:消费者调用
basic.reject
或basic.nack
方法拒绝消息,并将requeue
参数设置为false
,表明该消息不再重新放回原队列等待消费,从而成为死信。 - 消息过期:可以为消息或队列设置生存时间(TTL,Time-To-Live)。当消息在队列中的存活时间超过设定的TTL值时,消息就会过期成为死信。消息的TTL既可以在发送消息时针对单条消息设置,也可以在声明队列时对队列中的所有消息统一设置。
- 队列达到最大长度:当为队列设置了最大长度(
Max-Length
),并且队列中的消息数量达到这个上限时,新进入的消息会被丢弃成为死信。
- 消息被拒绝且不重新入队:消费者调用
代码举例
下面将用代码举例,由于消息过期而进入死信队列
初始化RabbitMQ的连接配置、队列和交换机的声明
/*** RabbitMQ配置类* 负责管理RabbitMQ的连接配置、队列和交换机的声明*/
@Slf4j
public class RabbitMQConfig {// 普通队列和死信队列的配置常量public static final String NORMAL_QUEUE = "normal.queue"; // 普通队列名称public static final String DLX_QUEUE = "dlx.queue"; // 死信队列名称public static final String NORMAL_EXCHANGE = "normal.exchange"; // 普通交换机名称public static final String DLX_EXCHANGE = "dlx.exchange"; // 死信交换机名称public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; // 普通路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key"; // 死信路由键/*** 创建RabbitMQ连接** @return Connection RabbitMQ连接对象* @throws Exception*/public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxx"); // 设置RabbitMQ服务器地址factory.setPort(5672); // 设置RabbitMQ服务器端口factory.setUsername("xxxx"); // 设置用户名factory.setPassword("xxxx"); // 设置密码return factory.newConnection(); // 创建并返回新的连接}/*** 初始化RabbitMQ的队列和交换机* 包括:* 1. 删除已存在的队列和交换机* 2. 声明死信交换机和队列* 3. 声明普通交换机和队列* 4. 设置队列的死信参数* 5. 绑定队列和交换机** @throws Exception*/public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 删除已存在的队列和交换机try {channel.queueDelete(NORMAL_QUEUE);channel.queueDelete(DLX_QUEUE);channel.exchangeDelete(NORMAL_EXCHANGE);channel.exchangeDelete(DLX_EXCHANGE);} catch (Exception e) {// 忽略删除不存在的队列或交换机时的错误log.warn("删除队列或交换机时出错(可能是首次创建): {}", e.getMessage());}// 声明死信交换机,类型为direct,持久化channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);// 声明死信队列,持久化channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 将死信队列绑定到死信交换机,使用死信路由键channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);// 声明普通交换机,类型为direct,持久化channel.exchangeDeclare(NORMAL_EXCHANGE, "direct", true);// 设置普通队列的死信参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 设置死信交换机args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键// 声明普通队列,并应用死信参数channel.queueDeclare(NORMAL_QUEUE, true, false, false, args);// 将普通队列绑定到普通交换机,使用普通路由键channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);}}
}
消息生产者
/*** 消息生产者类* 负责向RabbitMQ发送消息*/
@Slf4j
public class MessageProducer {/*** 发送消息到普通队列* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 将消息发布到普通交换机* 3. 使用try-with-resources自动关闭连接和通道* * @param message 要发送的消息内容* @throws Exception */public void sendMessage(String message) throws Exception {// 使用try-with-resources自动管理连接和通道的关闭try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 打印发送的消息内容log.info("发送消息: {}", message);// 发布消息到普通交换机// 参数说明:// 1. 交换机名称// 2. 路由键// 3. 消息属性(这里为null表示使用默认属性)// 4. 消息内容(转换为字节数组)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,null,message.getBytes());}}/*** 发送带TTL的消息到普通队列* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置消息的TTL属性* 3. 将消息发布到普通交换机* 4. 使用try-with-resources自动关闭连接和通道* * @param message 要发送的消息内容* @param ttl 消息的过期时间(毫秒)* @throws Exception 如果发送过程中出现错误则抛出异常*/public void sendMessageWithTTL(String message, int ttl) throws Exception {// 使用try-with-resources自动管理连接和通道的关闭try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 设置消息属性,包括TTLAMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(ttl)).build();// 打印发送的消息内容log.info("发送消息: {}, TTL: {}ms", message, ttl);// 发布消息到普通交换机// 参数说明:// 1. 交换机名称// 2. 路由键// 3. 消息属性(包含TTL)// 4. 消息内容(转换为字节数组)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,properties,message.getBytes());}}
}
消息消费者
/*** 消息消费者类* 负责从普通队列和死信队列中消费消息*/
@Slf4j
public class MessageConsumer {/*** 消费普通队列中的消息* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置预取计数为1,确保公平分发* 3. 创建消费者回调处理消息* 4. 确认消息处理完成** @throws Exception 异常*/public void consumeNormalQueue() throws Exception {// 创建RabbitMQ连接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1,确保公平分发,避免某个消费者处理过多消息channel.basicQos(1);// 创建普通队列消费者回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取消息内容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到普通队列消息: {}", message);// 模拟消息处理耗时try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费普通队列// 参数说明:// 1. 队列名称// 2. 是否自动确认消息(false表示手动确认)// 3. 消息处理回调// 4. 消费者取消回调(这里为空实现)channel.basicConsume(RabbitMQConfig.NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}/*** 消费死信队列中的消息* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置预取计数为1,确保公平分发* 3. 创建消费者回调处理消息* 4. 确认消息处理完成** @throws Exception 异常*/public void consumeDlxQueue() throws Exception {// 创建RabbitMQ连接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1,确保公平分发channel.basicQos(1);// 创建死信队列消费者回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取消息内容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到死信队列消息: {}", message);// 确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费死信队列// 参数说明:// 1. 队列名称// 2. 是否自动确认消息(false表示手动确认)// 3. 消息处理回调// 4. 消费者取消回调(这里为空实现)channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});}
}
测试
@Slf4j
public class DLXTest {private static final int THREAD_COUNT = 1; // 并发线程数private static final int MESSAGE_COUNT = 2; // 每个线程发送的消息数/*** 主方法,执行死信队列测试流程* 测试流程:* 1. 初始化RabbitMQ的队列和交换机* 2. 创建生产者和消费者实例* 3. 启动普通队列和死信队列的消费者线程* 4. 使用线程池发送测试消息* 5. 等待消息处理完成** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 初始化RabbitMQ的队列和交换机RabbitMQConfig.init();// 创建生产者和消费者实例MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 启动普通队列消费者线程new Thread(() -> {try {consumer.consumeNormalQueue();} catch (Exception e) {log.error("普通队列消费者异常", e);}}).start();// 启动死信队列消费者线程new Thread(() -> {try {consumer.consumeDlxQueue();} catch (Exception e) {log.error("死信队列消费者异常", e);}}).start();// 创建线程池和计数器ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);CountDownLatch latch = new CountDownLatch(THREAD_COUNT);// 提交任务到线程池for (int i = 0; i < THREAD_COUNT; i++) {final int threadId = i;executorService.submit(() -> {try {// 每个线程发送MESSAGE_COUNT条消息for (int j = 0; j < MESSAGE_COUNT; j++) {// 随机生成消息TTL(1-30秒)int ttl = (int) (Math.random() * 30000) + 1000;String message = String.format("消息-线程%d-第%d条 (消息TTL: %dms)", threadId + 1, j + 1, ttl);producer.sendMessageWithTTL(message, ttl);// 随机延迟0-100ms,模拟真实场景Thread.sleep((long) (Math.random() * 100));}} catch (Exception e) {log.error("发送消息异常", e);} finally {latch.countDown();}});}// 等待所有消息发送完成latch.await();log.info("所有消息已发送完成");// 关闭线程池executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);// 保持程序运行,等待消息处理完成Thread.sleep(60000);log.info("测试完成");}
}
从结果可以看出,第一条消息 ttl 为 28301ms,被普通消费者进行消费,而产生的第二条消息得到 ttl 为 4332ms,由于第一条消息在消费时耗时较久,在此期间 第二条消息已经过期,不得不进入死信队列,由死信消费者进行处理,从前面的日志时间也可以看出,刚好间隔 4s 左右。