【RabbitMQ】高级特性—死信队列详解
文章目录
- 死信的概念
- 代码示例
- 1. 声明队列和交换机
- 2. 正常队列绑定死信交换机
- 3. 制造死信产生的条件
- 4. 发送消息
- 5. 测试死信
- 达到过期时间
- 超出长度
- 消息拒收
- 常见面试题
- 1. 死信队列的概念
- 2. 死信的来源
- 3. 死信队列的应用场景
死信的概念
死信(dead message
)简单理解就是因为种种原因,无法被消费的信息,就是死信
有死信,自然就有死信队列。当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器就是 DLX
(Dead Letter Exchange
),绑定 DLX
的队列,就称为死信对类(Dead Letter Queue
,简称 DLQ
)
消息变成死信一般是由于以下几种情况:
- 消息被拒绝(
Basic.Reject/Basic.Nack
),并设置requeue
参数为false
- 消息过期
- 队列达到最大长度
代码示例
1. 声明队列和交换机
包含两部分:
- 声明正常的队列和正常的交换机
- 声明死信队列和死信交换机
死信交换机和死信队列和普通的交换机,队列没有区别
// 1. 正常部分
// 正常交换机
@Bean("normalExchange")
public Exchange normalExchange() { return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();
} // 正常队列
@Bean("normalQueue")
public Queue normalQueue() { return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
} // 正常队列和交换机绑定
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
} // 2. 死信部分
// 死信交换机
@Bean("dlxExchange")
public Exchange dlxExchange() { return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();
} // 死信队列
@Bean("dlxQueue")
public Queue ndlxQueue() { return QueueBuilder.durable(Constant.DLX_QUEUE).build();
} // 死信队列和交换机绑定
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
2. 正常队列绑定死信交换机
当这个队列中存在死信时,RabbitMQ
会自动地把这个消息发布到设置的 DLX
上,进而被路由到另一个队列,即死信队列
- 可以监听这个死信队列中的消息以进行相应的处理
@Bean("normalQueue")
public Queue normalQueue() { Map<String, Object> arguments = new HashMap<>(); // 绑定死信队列 arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME); // 设置发送给死信队列的 RoutingKey arguments.put("x-dead-letter-routing-key", "dlx"); return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
简写为:
@Bean("normalQueue") public Queue normalQueue() { return QueueBuilder.durable(Constant.NORMAL_QUEUE) .deadLetterExchange(Constant.DLX_EXCHANGE_NAME) .deadLetterRoutingKey("dlx").build(); }
3. 制造死信产生的条件
@Bean("normalQueue")
public Queue normalQueue() { Map<String, Object> arguments = new HashMap<>(); // 1. 绑定死信队列 arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME); // 设置发送给死信队列的 RoutingKey arguments.put("x-dead-letter-routing-key", "dlx"); // 2. 制造死信产生的条件 arguments.put("x-message-ttl", 10000); // 10 秒过期 arguments.put("x-max-length", 10); // 队列长度 return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
简写为:
return QueueBuilder.durable(Constant.NORMAL_QUEUE) .deadLetterExchange(Constant.DLX_EXCHANGE_NAME) .deadLetterRoutingKey("dlx") .ttl(10*1000) .maxLength(10L) .build();
4. 发送消息
@RequestMapping("/dlx")
public void dlx() { // 测试过期时间,当时间到达 TTL,消息自动进入到死信队列 rabbitTemplate.convertAndSend(Constant.DLX_EXCHANGE_NAME, "normal", "dlx test..."); // 测试队列长度 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend(Constant.DLX_EXCHANGE_NAME, "normal", "dlx test..."); } // 测试消息拒收 rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
5. 测试死信
程序启动之后,观察队列
队列 Features
说明:
D
:durable
的缩写,设置持久化TTL
:Time to Live
,队列设置了TTL
Lim
:队列设置了长度(x-max-length
)DLX
:队列设置了死信交换机(x-dead-letter-exchange
)DLK
:队列设置了死信RoutingKey
(x-dead-letter-routing-key
)
达到过期时间
测试过期时间,到达过期时间之后,进入死信队列
调用接口,发送消息:
发送之后:
10 秒后,消息进入到死信队列
生产者首先发送一条消息,然后经过见换气(normal_exchange
)顺利地存储到队列(normal_queue
)中
- 由于队列
normal_queue
设置了过期时间为 10 秒,在这10s
内没有消费者消费这条消息,那么判定这条消息过期 - 由于设置了
DLX
,过期之后,消息会被丢给交换器(dlx_exchange
)中,这时根据RoutingKey
匹配,找到匹配的队列(dlx_queue
),最后消息被存储在queue.dlx
这个死信队列中
超出长度
测试达到队列长度,消息进入死信队列
队列长度设置为 10,我们发送 20 条数据,会有 10 条数据直接进入到死信队列
发送 20 条消息
// 测试队列长度
for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
运行后:
过期之后,正常队列的 10 条消息也会进入到死信队列
消息拒收
测试消息拒收
写消费者代码,并强制异常,测试拒绝签收
@Component
public class DlxQueueListener { // 指定监听队列的名称 @RabbitListener(queues = Constant.NORMAL_QUEUE) public void ListenerQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); // 模拟处理失败 int num = 3 / 0; System.out.println("处理完成"); // 3. 手动签收 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 4. 异常了就拒绝签收 Thread.sleep(1000); // 第三个参数 requeue,是否重新发送。true,会重新发送;false,不会重新发送 channel.basicNack(deliveryTag, true, false); } } // 指定监听队列的名称 @RabbitListener(queues = Constant.DLX_QUEUE) public void ListenerDLXQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); }
}
发送消息
常见面试题
#高频面试
死信队列作为 RabbitMQ
的高级特征,也是面试的一大重点
1. 死信队列的概念
死信(Dead Letter
)是消息队列中的一种特殊消息,它指的是那些无法正常消费或处理的消息。
在消息队列系统中,如 RabbitMQ
,死信队列用于存储这些死信信息
2. 死信的来源
- 消息过期:消息在队列中存活的时间超过了设定的
TTL
- 消息被拒绝:消费者在处理消息时,可能因为消息内容错误,处理逻辑异常等原因,拒绝处理该信息。如果拒绝时指定不重新入队(
requeue=false
),消息也会成为死信 - 队列满了:当队列达到最大长度,无法再容纳新的消息时,新来的消息会被处理为死信
3. 死信队列的应用场景
对于 RabbitMQ
来说,死信队列是一个非常有用的特性。
它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统
场景的应用还有:
- 消息重试:将死信消息重新发送到原队列或另一个队列进行重试处理
- 消息丢失:直接丢这些无法处理的消息,以避免他们占用系统资源
- 日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位