当前位置: 首页 > news >正文

RabbitMQ--消息丢失问题及解决

一、什么是消息的丢失?

消息丢失是指在消息从生产者到消费者的过程中,消息因为某种原因未能被正确接收、持久化、消费或处理,从而导致业务数据丢失。

常见消息丢失场景:

场景描述
生产者发送失败由于网络问题、Broker 宕机,消息未送达 RabbitMQ
消息未持久化队列/交换机/消息未持久化,Broker 重启后丢失
消费者未确认消费者处理失败未 ack,RabbitMQ 误认为已消费
消息被拒绝且不重回队列basicReject/requeue=false
网络异常丢失在传输过程中消息被中断或异常

二、RabbitMQ 如何解决消息丢失?

RabbitMQ 提供如下机制来保障消息可靠性:

机制作用对应代码
消息持久化(Durability)将交换机、队列、消息写入磁盘✅ Demo1
生产者确认(Publisher Confirms)确保消息成功送达交换机/队列✅ Demo2(同步/异步 + 批量)
消费者确认(Consumer Ack)确保消费成功后再从队列删除✅ Demo3
死信队列(DLX)失败或拒绝的消息进入备用队列,避免丢失✅ Demo4


三、解决方案与代码示例(原生 Java)


✅ Demo1:持久化机制(队列 + 消息)

目的:解决 RabbitMQ 宕机/重启后消息丢失问题。

✅ Producer(生产者)
Channel channel = connection.createChannel();
// 声明持久化队列(durable = true)
channel.queueDeclare("persistent_queue", true, false, false, null);// 设置消息持久化属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2 = 持久化.build();String msg = "Persistent Hello";
channel.basicPublish("", "persistent_queue", props, msg.getBytes());
✅ Consumer(消费者)
channel.basicConsume("persistent_queue", true, (tag, delivery) -> {System.out.println("接收到消息:" + new String(delivery.getBody()));
}, tag -> {});

✅ Demo2:生产者确认机制(Publisher Confirms)

用于确保消息是否真正送达 Broker


🌟 说明:两种确认方式
类型特点方法
同步确认发送后阻塞等待结果channel.waitForConfirms()
异步确认通过监听回调addConfirmListener()

🌟 说明:单条和批量

    long timeout:这两方法都会阻塞线程,可以设置超时时间避免一直阻塞等待

方法确认失败时的行为是否抛出异常返回值适合场景
waitForConfirms(long timeout)不会抛异常,只返回 false❌ 不抛异常(除非网络/线程异常)true 表示全部确认成功,false 表示有未确认消息想要根据返回值判断并自己处理未确认
waitForConfirmsOrDie()一旦有未确认消息,直接抛异常,终止发送流程✅ 抛出 IOException无返回值(void想快速失败,进入兜底处理流程

✅ Producer - 同步确认(非批量)
channel.confirmSelect(); // 开启确认模式
String msg = "Confirm message";channel.basicPublish("", "confirm_queue", null, msg.getBytes());
//waitForConfirms方法会主动阻塞等待确认结果。
if (channel.waitForConfirms()) {System.out.println("消息成功发送并被确认");
} else {System.out.println("发送失败!");
}
✅ Producer - 批量确认(批量 + 同步)
channel.confirmSelect();
for (int i = 0; i < 10; i++) {String msg = "batch msg " + i;channel.basicPublish("", "confirm_queue", null, msg.getBytes());
}// 批量确认所有消息,主动阻塞等待确认结果。如果有未处理的会抛出异常
try {// 可设置超时时间(毫秒),避免无限阻塞(不加参数是一直阻塞)channel.waitForConfirmsOrDie(5000);// 5秒超时System.out.println("✅ 所有消息均已被RabbitMQ确认接收");} catch (IOException e) {System.err.println("❌ 消息确认失败:有消息未被接收");e.printStackTrace();// 可添加重发逻辑} 
✅ Producer - 异步确认(推荐 + 高性能)
channel.confirmSelect(); // 开启异步确认channel.addConfirmListener(new ConfirmListener() {
//multiple 的值是由 RabbitMQ 自动传递的,开发者 开发者无需手动设置。
//消息少、发送间隔长 → 大概率 multiple=false(单条确认)。
//消息多、发送间隔短 → 大概率 multiple=true(批量确认)。@Overridepublic void handleAck(long deliveryTag, boolean multiple) {if (multiple) {// multiple=true:确认的是 <= deliveryTag 的所有消息(批量确认)System.out.println("✅ 批量确认成功");} else {// multiple=false:仅确认当前deliveryTag对应的消息(单条确认)System.out.println("✅ 单条确认成功");}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) {if (multiple) {// multiple=true:确认的是 <= deliveryTag 的所有消息(批量确认)System.out.println("❌ 批量确认失败");} else {// multiple=false:仅确认当前deliveryTag对应的消息(单条确认)System.out.println("❌ 单条确认失败");}}
});// 发送消息
channel.basicPublish("", "confirm_queue", null, "Hello Async".getBytes());
✅ Consumer
channel.queueDeclare("confirm_queue", false, false, false, null);
channel.basicConsume("confirm_queue", true, (tag, msg) -> {System.out.println("消费:" + new String(msg.getBody()));
}, tag -> {});

✅ Demo3:消费者手动确认(Manual ACK)

用于保证消费成功后再从队列移除,否则可拒绝并重试。

✅ Producer
channel.queueDeclare("ack_queue", false, false, false, null);
channel.basicPublish("", "ack_queue", null, "Manual ACK!".getBytes());
✅ Consumer(手动 ACK)
channel.basicConsume("ack_queue", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {try {System.out.println("处理:" + new String(body));// 成功后确认channel.basicAck(envelope.getDeliveryTag(), false);} catch (Exception e) {// 失败后拒绝并重新入队channel.basicNack(envelope.getDeliveryTag(), false, true);}}
});

✅ Demo4:死信队列(DLX)

用于处理消息拒绝/过期/队列满等异常情况,防止丢失。

✅ 声明死信交换机和死信队列
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");channel.queueDeclare("normal_queue", true, false, false, args);
channel.exchangeDeclare("dlx-exchange", "fanout", true);
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx-exchange", "");
✅ 消费者拒绝消息时进入死信队列
channel.basicConsume("normal_queue", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);if (msg.contains("error")) {// 拒绝并不重新入队channel.basicReject(envelope.getDeliveryTag(), false);} else {channel.basicAck(envelope.getDeliveryTag(), false);}}
});

四、总结:机制对比一览表

        生产者不推荐使用事务机制,建议使用Confirm

机制防丢原理配置或代码关键点是否可靠性能影响
队列/消息持久化重启后消息不丢durable=true, deliveryMode=2✅ 高中等
生产者 Confirm保障消息送达confirmSelect, ConfirmListener✅ 高略高
消费者手动 ACK处理失败可拒绝重试autoAck=false + basicAck✅ 高中等
死信队列 DLX异常消息转储x-dead-letter-exchange✅ 高略高
生产者 事务机制严格事务txSelect/txCommit✅ 非常高❌ 性能差

五、🧠 笔记补充:生产者事务机制


📌 什么是事务机制?

         RabbitMQ 提供了事务支持,允许你通过 channel.txSelect() 开启事务、channel.txCommit() 提交事务,或者 channel.txRollback() 回滚事务。

作用类似数据库事务:保证消息被 Broker 成功接收或回滚


✅ 场景适用:

        在没有启用 Confirm 机制 的老代码中,有时使用事务来保证消息“确实投递到 Broker”。
不过由于性能开销极大(每条消息都要同步确认),不推荐用于高并发场景


🚫 存在的问题:

  • 每次发送消息都要等待 RabbitMQ 的 ACK,性能极差

  • 与 Confirm 模式不能共存(只能用其一)

  • 几乎已被生产者 Confirm 替代


✅ 原生 Java 示例代码:事务机制

🔧 生产者(使用事务)
public class TxProducer {public static void main(String[] args) throws Exception {// 1. 建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 2. 开启事务channel.txSelect();try {String exchange = "tx_exchange";String routingKey = "tx_key";String message = "事务消息";// 声明交换机和队列channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true);channel.queueDeclare("tx_queue", true, false, false, null);channel.queueBind("tx_queue", exchange, routingKey);// 3. 发送消息channel.basicPublish(exchange, routingKey, null, message.getBytes());// 4. 提交事务channel.txCommit();System.out.println("消息发送成功,事务提交!");} catch (Exception e) {e.printStackTrace();// 5. 回滚事务channel.txRollback();System.out.println("消息发送失败,事务回滚!");}}}
}

📥 消费者(常规处理即可)
public class TxConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare("tx_queue", true, false, false, null);System.out.println("等待接收消息...");DeliverCallback callback = (consumerTag, message) -> {System.out.println("收到消息:" + new String(message.getBody()));// 正常业务处理channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};channel.basicConsume("tx_queue", false, callback, consumerTag -> {});}}
}

🆚 事务 vs Confirm 模式对比表

特性事务模式(tx)Confirm 模式
可靠性✅ 高(消息必达)✅ 高
性能❌ 非常低✅ 高
编程复杂度❌ 中等✅ 可批量/异步处理
使用方式txSelect()/txCommit()confirmSelect()/回调处理
推荐程度🚫 不推荐(适合低吞吐测试或遗留系统)✅ 推荐
http://www.dtcms.com/a/299747.html

相关文章:

  • 是德科技 | AI上车后,这条“高速公路”如何畅通?
  • 如何高效合并音视频文件(时间短消耗资源少)(二)
  • 计网-TCP可靠传输
  • 周末小游戏推荐,离线也能畅玩
  • 阿里云对象存储OSS(Object Storage Service)全面解析
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘ipython’问题
  • 【计算机组成原理】第一章:计算机系统概述
  • GO 从入门到精通
  • STL——vector
  • GO 从入门到精通2
  • MyBatis-Plus 通用 Service
  • J2EE模式---表现层集成模式
  • MyBatis Plus 对数据表常用注解
  • 进阶数据结构:用红黑树实现封装map和set
  • Sql server查询汇总补缺月份
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 热词评论查询功能实现
  • Android开发中内存泄漏问题治理方案
  • 四通OKI5560SC针式打印机如何复位清零和恢复出厂设置??
  • 昇思学习营-昇思+香橙派+deepseek介绍课程内容及心得
  • Chukonu 阅读笔记
  • Rerank 模型的其中两种路径:BERT 相似度与 CoT 推理
  • 如何应对心事干扰学习工作?
  • 高可用集群KEEPALIVED的详细部署
  • 【CTF-Web】dirsearch寻找download.php进行?path=flag.txt任意文件下载
  • 深入解析命名管道:原理、实现与进程间通信应用
  • 机器学习对中特估股票关键特征选取的应用与研究
  • 【橘子分布式】gRPC(番外篇-监听流)
  • Thinkph6中常用的验证方式实例
  • 【时时三省】(C语言基础)用指向函数的指针作函数参数
  • 网络:应用层