当前位置: 首页 > news >正文

RabbitMQ 消息可靠投递

大家好,今天我们来聊聊 RabbitMQ 中一个至关重要的话题——如何确保消息从生产者可靠地发送,到消费者被成功处理的整个过程万无一失。这是一个在面试中几乎必问,同时也是生产环境中必须解决的核心问题。

我将把这个问题拆解为两个部分:生产端的可靠发送消费端的可靠处理,并为你提供完整的实战指南。

一、面试时如何回答?(黄金结构)

当面试官问你“如何保证 RabbitMQ 的消息可靠投递”时,你可以采用以下结构清晰的回答,这能充分展示你的专业性:

“面试官您好,要保证 RabbitMQ 消息的可靠投递,需要从两个关键环节入手,形成一个闭环:

  1. 确保消息成功发送到 Broker
    • 首选方案是使用“发布者确认机制(Publisher Confirm)”。我们可以在客户端开启确认模式,然后通过同步等待(waitForConfirms)或异步监听(addConfirmListener)的方式,接收 Broker 发来的确认通知,从而确切知道消息是否已被成功接收。
    • 备选方案是使用“事务机制(Transactions)”。它类似于数据库的事务,通过 txSelecttxCommittxRollback 来确保一批消息要么全部成功,要么全部失败。但由于其性能开销较大,通常作为兜底方案。
  1. 确保消息被消费者成功处理
    • 核心机制是“消费者确认机制(Ack)”。我们将消费者的自动确认(autoAck)关闭,改为手动确认。当消费者成功处理完消息后,手动调用 basicAck 方法向 Broker 发送一个确认信号。Broker 只有在收到这个 Ack 信号后,才会认为消息已被成功消费并将其从队列中删除。否则,Broker 会认为消息处理失败,将其重新投递给其他消费者。

通过以上两端的机制组合,就可以实现 RabbitMQ 消息的端到端可靠投递。”


二、生产端实战:如何确保消息成功发送?

生产端的目标是:在消息成功到达 RabbitMQ Broker 并被妥善处理(如写入磁盘)之前,生产者能够感知到任何可能的失败。

方案一:发布者确认机制(Publisher Confirm)- 首选方案

这是最常用且性能最高的方式。它的核心思想是:生产者发送消息后,RabbitMQ 会在消息被处理后,给生产者一个“确认”或“否认”的回执。

两种实现方式:

1. 同步确认 (waitForConfirms)

这种方式简单直接,但会阻塞当前线程,直到收到确认。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PublisherConfirmSyncExample {private final static String QUEUE_NAME = "confirm_test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 1. 开启发布者确认模式channel.confirmSelect();String message = "这是一条需要确认的消息";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] 已发送消息: '" + message + "'");// 2. 同步等待确认// 这个方法会阻塞,直到Broker确认所有已发送但未确认的消息if (channel.waitForConfirms()) {System.out.println(" [√] Broker已确认消息接收成功!");// 在这里可以安全地更新本地数据库状态,如“订单已发送”} else {System.err.println(" [×] 消息发送失败,可能已丢失!");// 在这里执行失败逻辑,如记录日志、发起重试等}}}
}

2. 异步确认 (addConfirmListener)

这种方式不会阻塞线程,性能更好。通过注册一个监听器来处理确认和否认的回调。

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;public class PublisherConfirmAsyncExample {private final static String QUEUE_NAME = "async_confirm_test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.confirmSelect(); // 开启确认模式// 使用一个有序Map来存储未确认的消息,key为deliveryTag,value为消息内容ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();// 1. 添加确认监听器ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {if (multiple) {// 如果multiple为true,表示到sequenceNumber为止的所有消息都已确认ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);confirmed.clear();} else {// 如果为false,只清除当前sequenceNumber对应的消息outstandingConfirms.remove(sequenceNumber);}System.out.println(" [√] 消息已确认, sequenceNumber: " + sequenceNumber);};// 确认回调channel.addConfirmListener(cleanOutstandingConfirms,// 否认回调(sequenceNumber, multiple) -> {String body = outstandingConfirms.get(sequenceNumber);System.err.println(" [×] 消息被否认, sequenceNumber: " + sequenceNumber + ", message: " + body);// 处理消息丢失的逻辑,如重试cleanOutstandingConfirms.handle(sequenceNumber, multiple);});// 2. 发送消息for (int i = 0; i < 10; i++) {String message = "Async Confirm Message " + i;// 记录发送的消息outstandingConfirms.put(channel.getNextPublishSeqNo(), message);channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] 已发送消息: '" + message + "'");}// 等待所有确认完成(仅为演示)Thread.sleep(5000);}}
}
方案二:事务机制(Transactions)- 备选方案

事务机制通过将消息的发送包裹在一个事务中来保证原子性。

缺点: 性能极差。因为每个事务都需要客户端和 Broker 之间进行多次网络交互(txSelect, txCommit/txRollback),会严重降低吞吐量。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TransactionExample {private final static String QUEUE_NAME = "transaction_test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);try {// 1. 开启事务channel.txSelect();String message = "这是一条事务消息";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] 已在事务中发送消息: '" + message + "'");// 模拟业务异常// int i = 1 / 0; // 2. 提交事务channel.txCommit();System.out.println(" [√] 事务提交成功!");} catch (Exception e) {// 3. 发生异常,回滚事务channel.txRollback();System.err.println(" [×] 发生异常,事务已回滚!");e.printStackTrace();}}}
}

三、消费端实战:如何确保消息被成功处理?

消费端的目标是:只有当消息被消费者的业务逻辑成功处理后,才通知 Broker 可以删除该消息。

核心机制:手动 Ack

工作流程:

  1. 关闭自动确认:在 basicConsume 方法中,将 autoAck 参数设置为 false
  2. 处理业务逻辑:在消息处理的回调函数中,执行你的业务代码。
  3. 手动发送 Ack:业务逻辑成功执行后,调用 channel.basicAck() 方法。
import com.rabbitmq.client.*;import java.io.IOException;public class ConsumerAckExample {private final static String QUEUE_NAME = "ack_test_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [*] 等待接收消息。。。");// 1. 将 autoAck 设置为 falseboolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");long deliveryTag = envelope.getDeliveryTag();try {System.out.println(" [x] 收到消息: '" + message + "'");// 2. 模拟业务处理processMessage(message);// 3. 业务成功,手动确认// deliveryTag: 消息的唯一标识// false: 表示只确认当前这一条消息channel.basicAck(deliveryTag, false);System.out.println(" [√] 消息处理成功,已发送Ack, deliveryTag: " + deliveryTag);} catch (Exception e) {System.err.println(" [×] 消息处理失败!");e.printStackTrace();// 如果处理失败,可以选择拒绝消息并重新入队// 第三个参数 requeue: true 表示重新入队,false 表示丢弃(或进入死信队列)channel.basicNack(deliveryTag, false, true);}}});}private static void processMessage(String message) throws InterruptedException {// 模拟耗时操作Thread.sleep(1000);// 如果这里发生异常,Ack将不会被发送// if (message.contains("error")) throw new RuntimeException("Processing failed!");}
}

如果不发送 Ack 会怎样?
如果消费者在处理消息期间宕机,或者代码中没有调用 basicAck,RabbitMQ 会认为这条消息没有被成功消费。当消费者断开连接后,RabbitMQ 会将这条消息重新投递给队列中的其他消费者。


四、总结与对比

机制

解决问题

优点

缺点

推荐场景

发布者确认

确保消息成功发送到 Broker

性能高,非阻塞(异步模式),是 RabbitMQ 官方推荐的标准做法。

需要额外的代码来处理确认逻辑。

绝大多数场景下的首选

事务

确保消息发送的原子性

语义清晰,易于理解。

性能极差,严重影响吞吐量。

对性能要求不高,但对一致性要求极高的罕见场景。

消费者手动 Ack

确保消息被成功处理

可靠性高,是保证消费端不丢消息的唯一标准做法。

需要开发者手动管理 Ack 的发送时机,逻辑上要确保不遗漏。

所有需要确保消息被可靠处理的场景,应始终开启

http://www.dtcms.com/a/481739.html

相关文章:

  • RabbitMQ全面详解:从核心概念到企业级应用
  • 北京市建设工程第四检测所网站小程序定制开发团队
  • 安徽网站优化flash如何做网页
  • AI文档处理:AI在处理扫描版PDF时准确率低,如何提升?
  • TDengine 数学函数 EXP 用户手册
  • C语言自定义变量类型结构体理论:从初见到精通​​​​​​​(下)
  • 医疗网络功能虚拟化与深度强化学习的动态流量调度优化研究(下)
  • SpringMVC练习:加法计算器与登录
  • 小模型的应用
  • 深度学习进阶(一)——从 LeNet 到 Transformer:卷积的荣光与注意力的崛起
  • QPSK信号载波同步技术---极性Costas 法载波同步
  • 盘多多网盘搜索苏州seo排名公司
  • 国外有趣的网站wordpress小视频主题
  • RTC、UDP、TCP和HTTP以及直播等区别
  • Java面试场景:从Spring Web到Kafka的音视频应用挑战
  • 基于EDBO-ELM(改进蜣螂算法优化极限学习机)数据回归预测
  • gaussdb数据库的集中式和分布式
  • Ubuntu中使用Hadoop的HDFS和MapReduce
  • F024 RNN+Vue+Flask电影推荐可视化系统 python flask mysql 深度学习 echarts
  • Building-GAN模型结构详解
  • web开发,学院培养计划系统,基于Python,FlaskWeb,Mysql数据库
  • 三维旋转矩阵的左乘与右乘
  • c 网站开发数据库连接网站扫码充值怎么做的
  • 第三方媒体流压力测试:k6插件xk6-webrtc的使用来测试媒体流的性能
  • 综合门户媒体发稿哪家靠谱
  • iis网站属性没有asp.net微信订阅号做微网站
  • 【Nest】权限管理——RBAC/CASL
  • 使用LSTM进行人类活动识别
  • 列表标签之有序标签(本文为个人学习笔记,内容整理自哔哩哔哩UP主【非学者勿扰】的公开课程。 > 所有知识点归属原作者,仅作非商业用途分享)
  • AI时代BaaS | 开源的后端即服务(BaaS)平台Supaba