当前位置: 首页 > news >正文

RabbitMQ ,消息进入死信交换机

在 RabbitMQ 中,消息会进入死信交换机(Dead Letter Exchange,简称 DLX)通常是以下几种情况之一发生时:

  1. 消息被拒绝(nack)且没有重试

    • 如果消费者拒绝了一个消息(使用 basic.rejectbasic.nack),并且该消息没有被重新排队(即 requeue=false),则该消息会进入死信队列。
  2. 消息超时

    • 如果设置了 消息过期时间x-message-ttl),当消息在队列中停留时间超过设置的 TTL 后,消息会自动被删除,并进入死信交换机。
  3. 队列满

    • 如果队列的大小超过了其最大长度限制(x-max-length),新消息将被丢弃,或者进入死信队列(取决于队列的配置)。
  4. 队列被删除

    • 如果队列在消息仍然存在时被删除,那么这些未被消费的消息也会进入死信交换机。
  5. 队列绑定到死信交换机

    • 你可以为队列设置一个死信交换机(x-dead-letter-exchange),当队列中的消息因为某些原因无法消费或被丢弃时,它们会被路由到该死信交换机中。

通过配置死信交换机,RabbitMQ 可以帮助你捕捉到这些消息并进行后续处理,比如重试或记录日志。

在 RabbitMQ 中配置死信交换机(DLX)包括以下几个步骤:

1. 创建死信交换机(DLX)

死信交换机通常是一个普通的交换机(direct, fanout, topic等),没有特殊的配置要求。可以选择一个交换机来作为消息的死信目标。

const amqp = require('amqplib');async function createDLX() {//连接RabbitMQ服务器const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();// 创建一个普通交换机// 创建一个名为normal_exchange的普通交换机,类型为direct(直连交换机)。直连交换机是一种最简单的交换机类型,消息会根据路由键直接发送到与之绑定的队列const exchangeName = 'normal_exchange';// durable: true表示这个交换机是持久的,即使RabbitMQ重启,交换机也会存在。await channel.assertExchange(exchangeName, 'direct', { durable: true });// 创建死信交换机const dlxExchangeName = 'dlx_exchange';await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });// 创建死信队列(DLQ)const dlqName = 'dlq_queue';await channel.assertQueue(dlqName, { durable: true });// 将死信队列绑定到死信交换机await channel.bindQueue(dlqName, dlxExchangeName, 'dlx_routing_key');// 设置队列的死信交换机(DLX)/*arguments部分非常重要,x-dead-letter-exchange和x-dead-letter-routing-key设置了这个队列的死信交换机和死信路由键。
x-dead-letter-exchange:表示如果队列中的消息无法正常消费(例如过期,队列满了,或消费者拒绝),这些消息会被发送到dlx_exchange死信交换机。
x-dead-letter-routing-key:死信交换机的路由键,用来将消息路由到dlq_queue*/const queueName = 'normal_queue';await channel.assertQueue(queueName, {durable: true,arguments: {'x-dead-letter-exchange': dlxExchangeName, // 设置死信交换机'x-dead-letter-routing-key': 'dlx_routing_key' // 死信路由键}});// 将普通队列绑定到普通交换机await channel.bindQueue(queueName, exchangeName, 'normal_routing_key');// 关闭连接console.log('Created DLX and DLQ successfully');await channel.close();await connection.close();
}createDLX().catch(console.error);

2. 创建死信队列(DLQ)

创建一个死信队列,来接收死信交换机发送过来的消息。

3. 配置原始队列(生产者队列)

在生产者队列的声明时,你需要设置一些额外的队列参数,以便让消息能够进入死信队列。主要是配置 x-dead-letter-exchange 和(可选的)x-dead-letter-routing-key

示例:

rabbitmqctl add_queue myqueue
rabbitmqctl add_queue my_dead_letter_queue

然后配置生产者队列,使其消息能够进入死信交换机。

4. 设置生产者队列参数

在声明生产者队列时,设置 x-dead-letter-exchange(死信交换机)和(可选的)x-dead-letter-routing-key。你还可以设置其他参数,比如 x-message-ttl(消息过期时间)或 x-max-length(队列最大长度),这些都会影响消息何时被投递到死信交换机。

例如,如果你使用的是 direct 类型的交换机,配置方法如下:

channel.assertQueue('myqueue', {arguments: {'x-dead-letter-exchange': 'dlx_exchange',  // 死信交换机'x-dead-letter-routing-key': 'dlx_routing_key'  // 可选的死信路由键}
});

5. 创建死信交换机绑定死信队列

创建并绑定死信交换机到死信队列,以便死信消息能够被路由到正确的队列。

channel.assertExchange('dlx_exchange', 'direct');
channel.assertQueue('my_dead_letter_queue');
channel.bindQueue('my_dead_letter_queue', 'dlx_exchange', 'dlx_routing_key');

6. 处理死信消息

你可以在消费者端对死信队列进行监听、处理,比如进行重试,或者记录日志等。

const amqp = require('amqplib');async function consumeDLQ() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const dlqName = 'dlq_queue'; // 死信队列const normalExchangeName = 'normal_exchange'; // 普通交换机const normalQueueName = 'normal_queue'; // 普通队列// 监听死信队列 channel.assertQueue(dlqName)确保死信队列存在,并开始监听await channel.assertQueue(dlqName, { durable: true });console.log(`Waiting for messages in ${dlqName}.`);// 消费死信队列中的消息channel.consume(dlqName, async (msg) => {if (msg !== null) {console.log(`Received dead letter message: ${msg.content.toString()}`);// 假设我们想重试死信消息,将它发送回普通交换机和队列const messageContent = msg.content.toString();// 重试:将死信消息重新发送到普通交换机try {await channel.publish(normalExchangeName, 'normal_routing_key', Buffer.from(messageContent));console.log(`Re-published message: ${messageContent}`);// 如果成功消费,确认消息已处理channel.ack(msg);} catch (error) {// 处理重试失败的情况,可能会记录日志或报警console.error(`Failed to retry message: ${error.message}`);// 你可以选择不确认(不ack),并处理失败的消息channel.nack(msg);}}});
}
consumeDLQ().catch(console.error);

这样配置后,任何因消息超时、拒绝、队列溢出等原因未被消费的消息都会被路由到配置好的死信交换机和死信队列中。

总结

  • 死信交换机的配置关键在于使用 x-dead-letter-exchange 和(可选的)x-dead-letter-routing-key
  • 配置适当的消息过期时间、队列大小限制等,可以影响消息何时被送到死信队列。
  • 死信队列和交换机要确保有合适的绑定配置。
http://www.dtcms.com/a/337770.html

相关文章:

  • QT 字节大小端转序方法
  • Qt5基础控件详细讲解
  • VSCode REST Client 使用总结
  • 【力扣-轮转数组 Java / Python】
  • leetcode415. 字符串相加
  • 【论文阅读】-《HopSkipJumpAttack: A Query-Efficient Decision-Based Attack》
  • Jenkins全链路教程——Jenkins调用Maven构建项目
  • 北京朝阳公园——夏日清凉来袭
  • 第7节 神经网络
  • 登上Nature!清华大学光学神经网络研究突破
  • FastAPI + React:现代 Web 前后端分离开发的全栈实践指南
  • 【原理】Unity GC 对比 C# GC
  • 电竞酒店和高校宿舍对AI云电竞游戏盒子的需求有什么不同?
  • 静态资源保存插件横评:Save All Resources 与 ResourcesSaverExt 哪个更适合你?
  • 无人机基础知识
  • 测绘级组合导航如何重新定义大型无人机的高精度导航标准?
  • 用本地代理 + ZIP 打包 + Excel 命名,优雅批量下载跨域 PDF
  • PDF转图片需要用到什么技术?苹果手机怎样将PDF转为jpg?
  • HTML/CSS 实战知识点总结:从基础到常用效果全解析
  • 2025 世界机器人大会启示录:机构学 × AI × 视频链路的融合之路
  • 【低空安全】低空安全简介
  • 27.Linux 使用yum安装lamp,部署wordpress
  • Kafka 零拷贝(Zero-Copy)技术详解
  • 【学习嵌入式-day-27-进程间通信】
  • 开放最短路径优先协议
  • Read View 在 MVCC 里如何工作的?
  • DSP音频算法工程师技能2
  • IDE开发系列(2)扩展的IDE框架设计
  • GNhao/GN号,海外SIM号怎么注册详细步骤!
  • 纯前端表格控件SpreadJS v18.0 Update1正式发布——集成AI智能化插件