当前位置: 首页 > news >正文

【RabbitMQ】高级特性—死信队列详解

文章目录

    • 死信的概念
    • 代码示例
      • 1. 声明队列和交换机
      • 2. 正常队列绑定死信交换机
      • 3. 制造死信产生的条件
      • 4. 发送消息
      • 5. 测试死信
        • 达到过期时间
        • 超出长度
        • 消息拒收
    • 常见面试题
      • 1. 死信队列的概念
      • 2. 死信的来源
      • 3. 死信队列的应用场景

死信的概念

死信(dead message)简单理解就是因为种种原因,无法被消费的信息,就是死信

有死信,自然就有死信队列。当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器就是 DLXDead Letter Exchange),绑定 DLX 的队列,就称为死信对类(Dead Letter Queue,简称 DLQimage.png

消息变成死信一般是由于以下几种情况:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),并设置 requeue 参数为 false
  2. 消息过期
  3. 队列达到最大长度

代码示例

1. 声明队列和交换机

包含两部分:

  • 声明正常的队列和正常的交换机
  • 声明死信队列和死信交换机

死信交换机和死信队列和普通的交换机,队列没有区别

// 1. 正常部分  
// 正常交换机  
@Bean("normalExchange")  
public Exchange normalExchange() {  return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();  
}  // 正常队列  
@Bean("normalQueue")  
public Queue normalQueue() {  return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();  
}  // 正常队列和交换机绑定  
@Bean("normalBinding")  
public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {  return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();  
}  // 2. 死信部分  
// 死信交换机  
@Bean("dlxExchange")  
public Exchange dlxExchange() {  return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();  
}  // 死信队列  
@Bean("dlxQueue")  
public Queue ndlxQueue() {  return QueueBuilder.durable(Constant.DLX_QUEUE).build();  
}  // 死信队列和交换机绑定  
@Bean("dlxBinding")  
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) {  return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();  
}

2. 正常队列绑定死信交换机

当这个队列中存在死信时,RabbitMQ 会自动地把这个消息发布到设置的 DLX 上,进而被路由到另一个队列,即死信队列

  • 可以监听这个死信队列中的消息以进行相应的处理
@Bean("normalQueue")  
public Queue normalQueue() {  Map<String, Object> arguments = new HashMap<>();  // 绑定死信队列  arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);  // 设置发送给死信队列的 RoutingKey    arguments.put("x-dead-letter-routing-key", "dlx");  return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();  
}

简写为:

 @Bean("normalQueue")  public Queue normalQueue() {   return QueueBuilder.durable(Constant.NORMAL_QUEUE)  .deadLetterExchange(Constant.DLX_EXCHANGE_NAME)  .deadLetterRoutingKey("dlx").build();  }

3. 制造死信产生的条件

@Bean("normalQueue")  
public Queue normalQueue() {  Map<String, Object> arguments = new HashMap<>();  // 1. 绑定死信队列  arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);  // 设置发送给死信队列的 RoutingKey    arguments.put("x-dead-letter-routing-key", "dlx");  // 2. 制造死信产生的条件  arguments.put("x-message-ttl", 10000); // 10 秒过期  arguments.put("x-max-length", 10);  // 队列长度  return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();  
}

简写为

return QueueBuilder.durable(Constant.NORMAL_QUEUE)  .deadLetterExchange(Constant.DLX_EXCHANGE_NAME)  .deadLetterRoutingKey("dlx")  .ttl(10*1000)  .maxLength(10L)  .build();

4. 发送消息

@RequestMapping("/dlx")  
public void dlx() {  // 测试过期时间,当时间到达 TTL,消息自动进入到死信队列  rabbitTemplate.convertAndSend(Constant.DLX_EXCHANGE_NAME, "normal", "dlx test...");  // 测试队列长度  for (int i = 0; i < 20; i++) {  rabbitTemplate.convertAndSend(Constant.DLX_EXCHANGE_NAME, "normal", "dlx test...");  }  // 测试消息拒收  rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");  
}

5. 测试死信

程序启动之后,观察队列
image.png

队列 Features 说明:

  • Ddurable 的缩写,设置持久化
  • TTLTime to Live,队列设置了 TTL
  • Lim:队列设置了长度(x-max-length
  • DLX:队列设置了死信交换机(x-dead-letter-exchange
  • DLK:队列设置了死信 RoutingKeyx-dead-letter-routing-key
达到过期时间

测试过期时间,到达过期时间之后,进入死信队列

调用接口,发送消息:

发送之后:image.png

10 秒后,消息进入到死信队列 image.png

生产者首先发送一条消息,然后经过见换气(normal_exchange)顺利地存储到队列(normal_queue)中

  • 由于队列 normal_queue 设置了过期时间为 10 秒,在这 10s 内没有消费者消费这条消息,那么判定这条消息过期
  • 由于设置了 DLX,过期之后,消息会被丢给交换器(dlx_exchange)中,这时根据 RoutingKey 匹配,找到匹配的队列(dlx_queue),最后消息被存储在 queue.dlx 这个死信队列中

超出长度

测试达到队列长度,消息进入死信队列

队列长度设置为 10,我们发送 20 条数据,会有 10 条数据直接进入到死信队列

发送 20 条消息

// 测试队列长度  
for (int i = 0; i < 20; i++) {  rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");  
}

运行后:image.png

过期之后,正常队列的 10 条消息也会进入到死信队列 image.png

消息拒收

测试消息拒收

写消费者代码,并强制异常,测试拒绝签收

@Component  
public class DlxQueueListener {  // 指定监听队列的名称  @RabbitListener(queues = Constant.NORMAL_QUEUE)  public void ListenerQueue(Message message, Channel channel) throws Exception {  long deliveryTag = message.getMessageProperties().getDeliveryTag();  try {  System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  // 模拟处理失败  int num = 3 / 0;  System.out.println("处理完成");  // 3. 手动签收  channel.basicAck(deliveryTag, true);  } catch (Exception e) {  // 4. 异常了就拒绝签收  Thread.sleep(1000);  // 第三个参数 requeue,是否重新发送。true,会重新发送;false,不会重新发送  channel.basicNack(deliveryTag, true, false);  }  }  // 指定监听队列的名称  @RabbitListener(queues = Constant.DLX_QUEUE)  public void ListenerDLXQueue(Message message, Channel channel) throws Exception {  long deliveryTag = message.getMessageProperties().getDeliveryTag();  System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  }  
}

发送消息 image.png|439

常见面试题

#高频面试
死信队列作为 RabbitMQ 的高级特征,也是面试的一大重点

1. 死信队列的概念

死信(Dead Letter)是消息队列中的一种特殊消息,它指的是那些无法正常消费或处理的消息。

在消息队列系统中,如 RabbitMQ,死信队列用于存储这些死信信息

2. 死信的来源

  1. 消息过期:消息在队列中存活的时间超过了设定的 TTL
  2. 消息被拒绝:消费者在处理消息时,可能因为消息内容错误,处理逻辑异常等原因,拒绝处理该信息。如果拒绝时指定不重新入队(requeue=false),消息也会成为死信
  3. 队列满了:当队列达到最大长度,无法再容纳新的消息时,新来的消息会被处理为死信

3. 死信队列的应用场景

对于 RabbitMQ 来说,死信队列是一个非常有用的特性。

它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统

场景的应用还有:

  • 消息重试:将死信消息重新发送到原队列或另一个队列进行重试处理
  • 消息丢失:直接丢这些无法处理的消息,以避免他们占用系统资源
  • 日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位
http://www.dtcms.com/a/318566.html

相关文章:

  • 560. 和为 K 的子数组 - 前缀和思想
  • MATLAB下载教程MATLAB R2025a 保姆级安装步骤(附安装包)
  • 数据结构——双向链表及makefile
  • c++ 中 原子锁、互斥锁、自旋锁的区别和详细用法
  • 大模型 + 垂直场景:搜索 / 推荐 / 营销 / 客服领域开发
  • 【Redis】Linux部署redis 7集群部署三主三从、ACL新建用户及密码(图文教程)
  • ​ubuntu22.04系统入门 (四)linux入门命令 权限管理、ACL权限、管道与重定向
  • 集合数据类型Map和Set
  • pcl手动直通滤波
  • LeetCode每日一题,8-6
  • 基于Simulink/MWORKS的文字与开关量混合传输系统设计
  • 流式输出 vs 非流式输出
  • SpringBoot设置跨域的几种方式
  • 互斥锁与条件变量
  • 每日五个pyecharts可视化图表-bars(5)
  • Java语言基础深度面试题
  • List、ArrayList 与顺序表
  • 智能学号抽取系统 V5.7.4 更新报告:修复关键同步漏洞,体验更臻完美
  • Spring Boot 项目代码笔记
  • 三、Istio流量治理(二)
  • 文件权限合规扫描针对香港服务器安全基线的实施流程
  • 《零基础入门AI:深度学习入门(从PyTorch安装到自动微分)》
  • Anthropic于本周一推出了其旗舰模型的升级版Claude Opus 4.1
  • 《第十三篇》深入解析 `kb_api.py`:知识库的创建、删除与查询接口
  • 基于Vue 3 的智能支付二维码弹窗组件设计与实现
  • Effective C++ 条款26: 尽可能延后变量定义式的出现时间
  • 007 前端( JavaScript HTML DOM+Echarts)
  • 【保留小数精度不舍入】2022-10-8
  • MaxKB 使用 MCP 连接 Oracle (免安装 cx_Oracle 和 Oracle Instant Client)
  • 智慧水务管理系统