当前位置: 首页 > news >正文

【MQ篇】RabbitMQ之死信交换机!

在这里插入图片描述

目录

    • 引言:消息不死,只是变成死信?
    • 初识死信交换机:死信从哪来?DLX 干啥的?
      • 什么是死信?
      • 什么是死信交换机 (DLX)?
      • 死信的旅程:如何从队列到达 DLX 并被路由?🗺️
      • 死信交换机的使用场景总结 📜
    • TTL:让消息“过期”变死信 🕰️
      • 延迟队列:DLX + TTL 的“神仙组合” ✨📦⏳
      • DLX + TTL 实现延迟队列的代码配置(简要回顾)
      • RabbitMQ 官方 Delay Exchange 插件:更原生的延迟方案!
    • 总结:死信、TTL 与延迟队列 📜

🌟我的其他文章也讲解的比较有趣😁,如果喜欢博主的讲解方式,可以多多支持一下,感谢🤗!

🌟了解 MQ 请看 : 【MQ篇】初识MQ!

其他优质专栏: 【🎇SpringBoot】【🎉多线程】【🎨Redis】【✨设计模式专栏(已完结)】…等

如果喜欢作者的讲解方式,可以点赞收藏加关注,你的支持就是我的动力
✨更多文章请看个人主页: 码熔burning

引言:消息不死,只是变成死信?

朋友们!👋 咱们之前聊了 RabbitMQ 消息的可靠传输,确保消息能从生产者安全到达队列,不丢不失。但是,消息进了队列,就万事大吉了吗?图样图森破!🙅‍♀️ 消息在队列里可能会遇到各种“意外”,导致它无法被正常消费。比如:

  • 消费者处理不了,直接跟你“撂挑子”拒绝了!
  • 消息在队列里待太久,过期了!
  • 队列消息爆满了,新来的消息没地儿去,老消息就被挤掉了!

这些“命运多舛”的消息,RabbitMQ 给它们起了一个特别的名字——死信(Dead Letter)!👻✉️

那么问题来了,这些“死掉”的消息,RabbitMQ 会怎么处理呢?直接丢进回收站吗?当然不是(除非你没配置好)!对于重要的消息,每一条都不能轻易放弃!这时候,就要请出咱们今天的主角——死信交换机(Dead Letter Exchange - DLX) 登场了!它就像 RabbitMQ 里的“问题包裹回收中心”或者“死信中转站”,专门负责接收和处理这些来自“五湖四海”的死信!📦➡️💀➡️🔄

初识死信交换机:死信从哪来?DLX 干啥的?

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为 false。这是最常见的主动制造死信的方式!“我不要这烫手山芋,你也别再发给我了,按死信处理吧!” 👋❌
  • 消息是一个过期消息,超时无人消费。消息或者它所在的队列设置了存活时间(TTL),时间到了还没被消费,就“自然死亡”了。🕰️👻
  • 要投递的队列消息满了,无法投递。队列像个仓库,容量有限,满了再来货,最老的可能被挤压“致死”。📏💀

我将用图片来展示了消息被消费者拒绝 (requeue=false) 后变成死信的过程:

在这里插入图片描述

什么是死信交换机 (DLX)?

DLX 并不是 RabbitMQ 里一种全新的交换机类型。它就是一个普通的交换机(可以是 Direct、Topic 或 Fanout),只不过它被某个普通队列指定为了接收死信的“专属通道”

如果一个包含死信的队列(比如 simple.queue)配置了 dead-letter-exchange 属性,指定了一个交换机(比如 dl.direct),那么队列中的死信就不会被丢弃,而是投递到这个指定的交换机中。这个被指定的交换机,就是死信交换机 (DLX)。

在这里插入图片描述

如果这个死信交换机也绑定了其他的队列,那么这些死信最终会进入这些队列,等待后续处理:

在这里插入图片描述

DLX 的核心作用: 就是作为一个“中转站”,收集各种原因产生的死信,并根据路由规则把它们发往最终的处理目的地(比如专门存放死信的队列、用于延迟重试的队列等)。

死信的旅程:如何从队列到达 DLX 并被路由?🗺️

队列将死信投递给死信交换机时,必须知道两个信息:死信交换机名称 和 死信交换机与死信队列绑定的 RoutingKey。这正是死信路由的关键所在!

  1. 指定 DLX 名称:普通队列声明时,通过 arguments 参数设置 x-dead-letter-exchange 来指定死信要去的 DLX 名称。

    // QueueBuilder.durable("simple.queue") // 指定队列名称
    //   .deadLetterExchange("dl.direct") // ⭐ 这里指定死信交换机名称 ⭐
    //   .build();
    

    这就是告诉 RabbitMQ,“我的死信都送去 dl.direct !”

  2. 死信的路由键: 当消息变成死信投递到 DLX 时,它需要一个路由键才能被 DLX 正确路由。这个路由键默认是原消息发送时的路由键。例如,原消息是发往 order.exchange,路由键是 "create",那么它变成死信后,默认会带着 "create" 这个路由键发往 DLX。

    • 可选: 你也可以在普通队列声明时,通过 arguments 参数设置 x-dead-letter-routing-key指定死信的路由键,这样所有从这个队列出来的死信都会使用这个指定的路由键,覆盖掉原路由键。
  3. DLX 的绑定与路由: DLX 收到死信后,就像处理普通消息一样,根据自身的交换机类型和死信的路由键,查找匹配的绑定,将死信路由到与之绑定的队列(这就是“死信队列”, dl.queue 就是一个例子)。

    在这里插入图片描述

    (死信携带路由键到达 DLX,DLX 按绑定规则路由到死信队列)

完整的 Spring Boot 代码示例,演示了如何配置一个普通队列 (simple.queue),让它的死信进入指定的 DLX (dl.direct),然后这个 DLX 又绑定了一个专门接收死信的队列 (dl.queue):

// producer服务CommonConfig中定义死信交换机、死信队列的代码
package cn.itcast.mq.config; // 假设你的包名import org.springframework.amqp.core.Binding; //
import org.springframework.amqp.core.BindingBuilder; //
import org.springframework.amqp.core.DirectExchange; //
import org.springframework.amqp.core.Queue; //
import org.springframework.amqp.core.QueueBuilder; //
import org.springframework.context.annotation.Bean; //
import org.springframework.context.annotation.Configuration; //@Configuration //
public class CommonConfig { //// ⭐ 定义业务队列,并配置其死信发往 DLX ⭐@Beanpublic Queue simpleQueue(){System.out.println("🛠️ 定义业务队列,指定死信交换机"); // 添加日志return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化 ✅.deadLetterExchange("dl.direct") // ⭐ 配置死信交换机为 dl.direct ⭐.build();}// ⭐ 声明死信交换机 dl.direct ⭐// 这个交换机用来接收来自 simple.queue 的死信@Beanpublic DirectExchange dlExchange(){System.out.println("🛠️ 声明死信交换机 dl.direct"); // 添加日志return new DirectExchange("dl.direct", true, false); // 持久化 ✅}// ⭐ 声明存储死信的队列 dl.queue ⭐// 这个队列绑定到 dl.direct 接收死信@Beanpublic Queue dlQueue(){System.out.println("🛠️ 声明存储死信的队列 dl.queue"); // 添加日志return new Queue("dl.queue", true); // 持久化 ✅}// ⭐ 将死信队列 dl.queue 与 死信交换机 dl.direct 绑定 ⭐// 绑定键要和从 simple.queue 出来的死信路由键匹配 (simple.queue 没有指定 deadLetterRoutingKey, 默认使用原路由键)// 假设原消息发送到 simple.queue 时使用的路由键是 "dl"@Beanpublic Binding dlBinding(){System.out.println("🛠️ 将死信队列 dl.queue 与 死信交换机 dl.direct 绑定,路由键为 'dl'"); // 添加日志return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl"); // ⭐ 绑定键 "dl" ⭐}
}

注意:如果你之前已经用 @Bean@RabbitListener 声明过 simple.queue 队列且没有指定 deadLetterExchange 属性,现在又用同样的名字多加了这个属性来声明,启动时会因为队列属性冲突而报错。你需要先删除 RabbitMQ 中的旧队列再启动应用。

死信交换机的使用场景总结 📜

  • 如果队列绑定了死信交换机,死信会投递到死信交换机。这是机制本身。
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。这是最直接的用途,创建一个专门的 DLQ,把所有失败消息都导进去,方便运维人员查看和处理。

除了提到的收集失败消息用于人工处理,死信交换机还有一个更重要、更常见的应用场景,那就是实现延迟队列和延迟重试!📦⏳ 这就引出了后面详细介绍的 TTL。

TTL:让消息“过期”变死信 🕰️

TTL (Time To Live) 就像消息的“生命倒计时”。一个队列中的消息如果超时未被消费,就会变为死信。超时的方式有两种:

  1. 消息所在的队列设置了超时时间: 在队列声明时配置 x-message-ttl 属性。进入这个队列的消息,如果超过队列设定的 TTL 时间还没被消费,就会死掉。

    // RabbitConfig.java
    @Bean
    public Queue ttlQueue(){System.out.println("🛠️ 声明一个带 TTL 的队列"); // 添加日志return QueueBuilder.durable("ttl.queue") // 持久化.ttl(10000) // ⭐ 设置队列消息的 TTL,10 秒 (10000 毫秒) ⭐.deadLetterExchange("dl.ttl.direct") // ⭐ 这个队列的死信发往 dl.ttl.direct ⭐.build();
    }
    // 这个队列的死信交换机是 dl.ttl.direct,你需要定义它并绑定接收死信的队列
    // @Bean public DirectExchange dlTtlExchange() { ... }
    // @Bean public Queue dlTtlQueue() { ... }
    // @Bean public Binding dlTtlBinding(...) { ... }
    
  2. 消息本身设置了超时时间: 在发送消息时给消息设置 expiration 属性。消息进入队列后,如果它本身的过期时间先于队列的 TTL 到期,或者队列没有设置 TTL,消息也会死掉。发送时设置消息 TTL 的代码示例:

    // 发送消息时设置 TTL 的代码
    @Test // 这是一个测试方法
    public void testTTLMsg() {System.out.println("📨 正在发送一条带 TTL 的消息"); // 添加日志// 创建消息Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration("5000") // ⭐ 设置消息本身的 TTL,5 秒 (5000 毫秒) ⭐.build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息到 ttl.direct 交换机,路由键 "ttl"rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);log.debug("发送消息成功");
    }
    

    注意: 当队列和消息都设置了 TTL 时,两者之间 TTL 值小的那个会生效,先到期的那个会让消息变成死信。

延迟队列:DLX + TTL 的“神仙组合” ✨📦⏳

利用 TTL 让消息过期变成死信,再利用死信交换机把死信路由到其他地方,就实现了消息发出后不是立即被消费,而是延迟一段时间后才被处理的效果!这种模式就称为延迟队列 (Delay Queue) 模式。

延迟队列的经典场景:延迟发送短信、用户下单 15 分钟未支付自动取消订单、预约会议 20 分钟后通知参会人员等。

DLX + TTL 实现延迟队列的原理:

  1. 生产者发送一个需要延迟的消息到业务交换机,消息路由到业务队列
  2. 业务队列被配置了死信交换机 x-dead-letter-exchange 指向 DLX
  3. 消息进入业务队列后,因为我们最终想要它延迟消费,所以它不能被立即消费。它需要变成死信!这里最常用的是利用 TTL
  4. 给业务队列或消息设置 TTL。消息在业务队列里等待 TTL 时间。⏰
  5. TTL 到期,消息变成死信。
  6. 因为业务队列配置了 DLX,死信被发送到 DLX。💀➡️
  7. DLX 绑定了一个新的队列,这个队列是真正的延迟队列。这个延迟队列没有消费者监听!它的唯一作用就是 “中转”和“等待”
  8. DLX 把死信路由到这个延迟队列
  9. 消息在这个延迟队列里等待。关键点来了! 这个延迟队列也要配置 x-message-ttl,并且这个 TTL 值就是你想要的延迟时间!⏳
  10. 消息在延迟队列里等待 TTL 时间到期后,再次变成死信。
  11. 这个延迟队列也要配置 x-dead-letter-exchange,并且,它把死信发回原来的业务交换机!🤯
  12. 业务交换机收到消息,再次把它路由回原来的业务队列
  13. 消费者监听的是原来的业务队列,于是它就收到了这条“延迟”后回来的消息!🎉

通过这个流程,消息就像在业务队列和“中转+等待”队列之间绕了个圈,成功实现了延迟消费。

DLX + TTL 实现延迟队列的代码配置(简要回顾)

你需要定义:

  • 业务交换机和业务队列: 业务队列配置 x-dead-letter-exchange 指向你的 DLX。
  • 死信交换机 (DLX): 一个普通交换机。
  • 延迟队列: 配置 x-message-ttl (延迟时间) 和 x-dead-letter-exchange 指向业务交换机。
  • 绑定: 将延迟队列绑定到 DLX,绑定键匹配业务队列死信的路由键。

这样,发送到业务队列的消息,如果设置了小于业务队列 TTL 的 TTL(或者业务队列没有 TTL),就会在业务队列里变成死信 -> 进入 DLX -> 进入延迟队列 -> 在延迟队列里等待 TTL -> 变成死信 -> 发回业务交换机 -> 回到业务队列被消费。

RabbitMQ 官方 Delay Exchange 插件:更原生的延迟方案!

RabbitMQ 官方提供的 Delay Exchange 插件!👏 如果你的 RabbitMQ 版本支持,使用这个插件实现延迟功能会更简单粗暴,不需要 DLX + TTL 这种“曲线救国”的方式。

  • 原理: 声明一个类型为 x-delayed-message 并设置 delayed = true 的交换机。当你发送消息到这个交换机时,消息会先被插件接收并持久化,然后读取消息头的 x-delay 属性作为延迟时间。时间到期后,插件会模拟一次消息投递,把消息发送到该交换机绑定的队列。

  • 使用方式:

    1. 安装 Delay Exchange 插件(自行查找安装教程)。
    2. 声明一个交换机,类型可以是任意类型 (如 Direct),但必须添加 delayed = true 属性。基于注解 @Exchange(name = "delay.direct", delayed = "true") 或者基于 Bean 配置都可以。
    3. 发送消息时,在消息头里添加 x-delay 属性,值就是你想要的延迟时间(毫秒)。发送消息的代码示例:
    // 发送延迟消息的示例
    @Test // 测试方法
    public void testDelayedMsg() {System.out.println("📨 正在发送一条使用 Delay Exchange 插件的延迟消息"); // 添加日志// 创建消息Message message = MessageBuilder.withBody("hello, delay message".getBytes(StandardCharsets.UTF_8)).setHeader("x-delay",10000) // ⭐ 在消息头设置 x-delay,指定延迟时间 10 秒 ⭐.build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息到延迟交换机 delay.direct,路由键 "delay"rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);log.debug("发送消息成功");
    }
    

    使用 Delay Exchange 插件,流程更直观:消息 -> 延迟交换机 (等待) -> 延迟时间到 -> 发往绑定队列 -> 被消费者消费。省去了 DLX 和额外队列的周转。

总结:死信、TTL 与延迟队列 📜

我们可以更全面地理解死信、TTL 和延迟队列:

什么样的消息会成为死信?

  • 消息被消费者 reject 或者 nackrequeue=false
  • 消息超时未消费(消息或队列 TTL 到期)。
  • 队列满了。

死信交换机的使用场景是什么?

  • 收集所有消费者处理失败的消息(死信),交由人工处理,提高可靠性。
  • 与 TTL 结合,实现消息的延迟队列功能。

消息超时的两种方式是?

  • 给队列设置 x-message-ttl 属性。
  • 给消息设置 expiration 属性。

如何实现发送一个消息 N 秒后消费者才收到消息? (使用 DLX + TTL 方案)

  1. 给消息的目标队列指定死信交换机 (x-dead-letter-exchange)。
  2. 声明一个延迟队列,设置 x-message-ttl 为 N 秒,并将其 x-dead-letter-exchange 指向原业务交换机
  3. 将这个延迟队列绑定到业务队列指定的死信交换机(绑定键匹配死信路由键)。
  4. 发送消息到业务队列,确保消息进入业务队列后不被立即消费(比如没有消费者或消费者收到后 NACK(false, false))。消息会在业务队列里因无人消费或被 NACK 变成死信,进入 DLX -> 进入延迟队列等待 N 秒 -> 过期变死信 -> 回到业务交换机 -> 回到业务队列被消费。

或者,更简单的方案是使用 RabbitMQ Delay Exchange 插件

  1. 安装插件。
  2. 声明一个 delayed = true 的交换机。
  3. 将消费者监听的队列绑定到这个延迟交换机。
  4. 发送消息到这个延迟交换机,并在消息头设置 x-delay 为 N 毫秒。

死信交换机是 RabbitMQ 处理异常消息、实现延迟重试和构建复杂工作流的核心组件。理解了它的工作原理和配置方式,你就掌握了 RabbitMQ 消息高级玩法的敲门砖!🔑🚪

希望这篇超详细的死信交换机“攻略”能帮助你彻底吃透它!😊🚀

了解RabbitMQ消息不丢的“三板斧”请看:
【MQ篇】RabbitMQ的生产者消息确认实战!
【MQ篇】RabbitMQ之消息持久化!
【MQ篇】RabbitMQ的消费者确认机制实战!
了解RabbitMQ消息失败重试请看:
【MQ篇】RabbitMQ之消费失败重试!

相关文章:

  • OpenCV 图形API(65)图像结构分析和形状描述符------拟合二维点集的直线函数 fitLine2D()
  • FlinkUpsertKafka深度解析
  • 基础的贝叶斯神经网络(BNN)回归
  • 零基础小白如何上岸数模国奖
  • 大学之大:伦敦政治经济学院2025.4.27
  • 【音视频】FFmpeg过滤器框架分析
  • 人工智能—— K-means 聚类算法
  • Spring Cloud Alibaba 整合 Sentinel:实现微服务高可用防护
  • Awesome-Embodied-AI: 具身机器人的资源库
  • [论文梳理] 足式机器人规划控制流程 - 接触碰撞的控制 - 模型误差 - 自动驾驶车的安全合规(4个课堂讨论问题)
  • 【读写视频】MATLAB详细代码
  • 简述删除一个Pod流程?
  • 【计算机组成原理实验】实验一 运算部件实验_加法器及计算机性能指标
  • Redis超详细入门教程(基础篇)
  • 【每日随笔】文化属性 ② ( 高维度信息处理 | 强者思维形成 | 认知重构 | 资源捕获 | 进化路径 )
  • Spark SQL核心概念与编程实战:从DataFrame到DataSet的结构化数据处理
  • Spark-Streaming核心编程(四)总结
  • 关于堆栈指针的那些事 | bootloader 如何跳转app
  • 如何解决无训练数据问题:一种更为智能化的解决方案
  • k8s学习记录(五):Pod亲和性详解
  • 辽宁召开假期安全生产工作调度会:绝不允许层层失守,绝不允许“带病运行”
  • 中国驻美国使领馆提醒在美中国公民注意交通安全
  • 习近平给谢依特小学戍边支教西部计划志愿者服务队队员回信
  • 一代名伶程砚秋经典影像:一箱旧影,芳华满堂
  • 国际观察|韩国在政局多重不确定性中迎接总统选举
  • 五大白酒去年净利超1500亿元:贵州茅台862亿领跑,洋河营收净利齐降