【星海出品】RabbitMQ 死信
【开篇一段小独白】
今天想专门开一期讲一下死信,因为我记得在我刚毕业的时候,面试会有一些大厂面试,暴露了最近几年没有大厂面试了【🤣】
所以这里问一下
Q:大厂是不是特别钟爱面试刚毕业的大学生。
A:评论区告诉我
注: 任何真理都是有 时间、空间 和 对象限制的。
当前时间:2025年9月26日
死信
死信 其实是一项比较高级的功能,一般小微企业和初级工程师很难遇到,比如小微企业业务单一,很难出现无法消费的消息,又或者初级工程师不会用到 高级功能 既短期无法消费重新入队,等待其他进程消费后再消费的功能。
【死信】顾名思义就是没办法被消费的消息。
一个需要注意的指标:TTL(Time To Live)
指消息在队列中可以存活的时间,如果消息在队列中存活的时间超过了TTL,那么消息就会被标记为死信,然后进入死信队列
【死信的存在】
来源于异步任务中,需要大量的颗粒度代码入队,但是代码存在顺序性,也就是收到这个任务的key和关键字,判断其前置条件,如果没有完成则使其重新入队,因为 队列的特性 是 先入先出 的处理模型,所以可以有效的调整代码运行顺序。
但是如果超过了TTL 的时间,则会将这个消息放入死信队列
我们可以设置一个时间,比如我们想让这个消息延迟10分钟再发送到死信队列中,那么我们就可以将这个消息发送到延迟队列中,然后定时去消费延迟队列中的消息,然后进行相应的处理
有些人初级工程师可能会说我这边服务器内存足够直接调用 restful API 不就好了,为什么需要前置队列。
A:
1.首先有前置队列可以实现耗时任务的解耦,不会占用大量的时间去等待,当然接收消息后也可以去发送后台进程去完成,但是如果不解耦那么就需要对整个代码进行调整,增大了开发周期,尤其是代码越来越大的时候,有时候可能你写的函数会被别人复用,那么你要怎么改怎么调整,为了快捷可能就是复制粘贴一块代码改个名字继续修改,好了,你的代码成功变成了屎山代码。
2.分布式的合作,任务颗粒度的入队处理。
3.解决端口及单点压力处理复杂的网页业务场景。比如: netstat -an 可以查看 大量TIME_WAIT连接:127.0.0.1本地环回接口存在大量TIME_WAIT状态(如127.0.0.1:56778 -> 127.0.0.1:8000),表明短连接未复用,可能触发TCP端口耗尽(Linux默认本地端口范围32768-60999,约2.8万可用)。
CLOSE_WAIT堆积:存在多条CLOSE_WAIT记录(如127.0.0.1:51920 -> 127.0.0.1:8000),说明服务端已关闭连接但客户端未发送FIN,可能由客户端超时或网络中断导致。
本地127.0.0.1:8000与多个客户端端口建立ESTABLISHED连接,暗示服务端内部处理队列可能积压(如线程池饱和、数据库查询慢)。
【DLX(死信交换机)】是RabbitMQ中用于处理消息过期或被拒绝后转发的机制,结合消息TTL(生存时间)和死信队列可实现延迟消费功能。
实现:
死信队列(DLX):当消息满足特定条件(如被拒绝、TTL 过期、队列满)时,RabbitMQ 将其重新路由到一个指定的 Dead Letter Exchange(死信交换机),再由该交换机根据绑定规则投递到对应的“死信队列”。
比如:
被消费者拒绝 调用 basic.reject 或 basic.nack 且 requeue=false
TTL(生存时间)过期 消息或队列设置了 x-message-ttl,超时未被消费
队列达到最大长度 队列设置了 x-max-length,新消息无法入队
创建死信交换机和队列
设置死信路由键(如deadExchange),并指定x-dead-letter-exchange参数为该死信交换机。 import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()# 1. 创建死信交换机(Topic 类型)
channel.exchange_declare(exchange='dlx_topic_exchange', exchange_type='topic')# 2. 创建死信队列
channel.queue_declare(queue='dead_letter_queue')# 3. 绑定死信队列到 Topic 交换机(使用通配符)
channel.queue_bind(exchange='dlx_topic_exchange',queue='dead_letter_queue',routing_key='dead_letter.*' # 匹配所有死信消息
)# 4. 创建主队列,指定死信交换机
channel.queue_declare(queue='main_queue', arguments={'x-dead-letter-exchange': 'dlx_topic_exchange', # 指向 Topic 交换机'x-dead-letter-routing-key': 'dead_letter.orders' # 设置死信路由键
})connection.close()# 监控死信队列消息