当前位置: 首页 > news >正文

RabbitMQ 发送方确认机制详解

一、发送方确认的核心原理与价值

1.1 为什么需要发送方确认?

消息持久化仅能解决“服务器接收消息后宕机”的问题,但无法解决“消息在传输过程中丢失”或“消息到达服务器前服务器重启”的场景——例如:

  • 生产者发送消息时,网络中断导致消息未到达 RabbitMQ;
  • 生产者向不存在的交换机发送消息,消息被直接丢弃;
  • RabbitMQ 重启期间,生产者发送的消息因服务器未就绪而丢失。

发送方确认机制通过“RabbitMQ 显式向生产者反馈消息接收状态”,解决上述问题:

  1. 生产者开启确认模式后,每条消息会被分配唯一的 deliveryTag(与消费者确认机制的 deliveryTag 类似,按 Channel 独立计数);
  2. RabbitMQ 处理消息后(如成功路由到交换机、写入磁盘等),会向生产者发送 ACK(确认)NACK(否定确认) 信号;
  3. 生产者通过回调函数接收信号,根据结果处理重试、日志记录等逻辑。

1.2 两种核心实现模式

RabbitMQ 发送方确认机制包含两种关键模式,覆盖不同业务场景:

模式核心逻辑优点缺点
Confirm 确认模式无论消息是否成功到达交换机,RabbitMQ 都会发送 ACK/NACK 信号,告知生产者结果覆盖“消息是否到达交换机”场景无法感知“消息到达交换机后路由失败”
Return 退回模式消息到达交换机后,若无法路由到任何队列,RabbitMQ 将消息退回给生产者补充“路由失败”场景的确认仅在消息到达交换机后生效

两种模式需配合使用,才能完整覆盖“消息从生产者到队列”的全链路确认。

二、模式一:Confirm 确认模式(消息到达交换机确认)

Confirm 模式是发送方确认的基础,核心解决“消息是否成功到达交换机”的问题。

2.1 核心流程

  1. 生产者配置 publisher-confirm-type,开启 Confirm 模式;
  2. 生产者通过 RabbitTemplate 设置 ConfirmCallback 回调函数;
  3. 生产者发送消息,RabbitMQ 处理后向生产者发送 ACK/NACK;
  4. 回调函数接收信号,根据 ack 布尔值判断结果(true 为成功,false 为失败)。

2.2 Spring Boot 配置与代码实现

2.2.1 配置文件(关键配置)

文档中指定的配置需开启 Confirm 模式,并设置确认类型为 correlated(关联模式,支持通过 CorrelationData 匹配消息与确认结果):

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:5672/bite  listener:simple:acknowledge-mode: manual  # 配合消费者手动确认(可选,非必需)publisher-confirm-type: correlated  # 开启Confirm模式,关联消息与确认结果
  • publisher-confirm-type 需设为 correlated(而非 simplenone),才能通过 CorrelationData 跟踪每条消息的确认状态。
2.2.2 声明交换机与队列(基础准备)

需先声明持久化的交换机和队列,确保消息到达后有存储载体:

// 常量类:管理交换机、队列名称
class Constant {public static final String CONFIRM_EXCHANGE = "confirm_exchange";  // Confirm模式交换机public static final String CONFIRM_QUEUE = "confirm_queue";        // 绑定队列
}// 配置类:声明交换机、队列及绑定关系
@Configuration
public class RabbitConfirmConfig {// 1. 声明Confirm模式交换机(topic类型,持久化)@Bean("confirmExchange")public Exchange confirmExchange() {return ExchangeBuilder.topicExchange(Constant.CONFIRM_EXCHANGE).durable(true)  // 持久化,避免服务器重启后交换机丢失.build();}// 2. 声明绑定队列(持久化)@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();}// 3. 绑定交换机与队列(路由键=confirm.key)@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange,@Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("confirm.key").noargs();}
}
2.2.3 配置 ConfirmCallback 回调

通过 RabbitTemplate 设置 ConfirmCallback,实现消息确认后的逻辑处理(如日志记录、失败重试):

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {// 配置带有ConfirmCallback的RabbitTemplate@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 设置Confirm回调:接收RabbitMQ的ACK/NACK信号rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 发送消息时携带的关联数据(含消息唯一ID)* @param ack RabbitMQ是否成功接收消息(true=成功,false=失败)* @param cause 失败原因(ack=false时非null)*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String msgId = correlationData != null ? correlationData.getId() : "未知ID";if (ack) {System.out.printf("Confirm成功:消息ID=%s,已到达交换机%n", msgId);// 成功逻辑:如更新消息状态为“已发送”} else {System.err.printf("Confirm失败:消息ID=%s,原因=%s%n", msgId, cause);// 失败逻辑:如重试发送、记录错误日志}}});return rabbitTemplate;}
}
2.2.4 生产者发送消息(携带关联数据)

发送消息时需创建 CorrelationData 并设置唯一 ID(如 UUID),用于在回调中匹配消息:

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;@RestController
@RequestMapping("/producer")
public class ConfirmProducerController {// 注入带有ConfirmCallback的RabbitTemplate@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;// 发送Confirm模式消息@RequestMapping("/confirm/send")public String sendConfirmMsg() {// 1. 构建消息内容与关联数据(消息唯一ID)String msg = "Confirm模式测试消息";String msgId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(msgId);// 2. 发送消息(指定交换机、路由键、关联数据)confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm.key",  // 与绑定关系的路由键一致msg,correlationData);// 3. 测试失败场景:向不存在的交换机发送消息// CorrelationData failCorrelationData = new CorrelationData(UUID.randomUUID().toString());// confirmRabbitTemplate.convertAndSend(//         "non_existent_exchange",  // 不存在的交换机//         "confirm.key",//         "失败测试消息",//         failCorrelationData// );return "消息发送中,消息ID=" + msgId;}
}

2.3 运行结果验证(文档中测试步骤)

  1. 成功场景:向存在的 confirm_exchange 发送消息,回调打印 Confirm成功:消息ID=xxx,已到达交换机,RabbitMQ 管理界面中 confirm_queueReady 消息数增加 1;
  2. 失败场景:向不存在的 non_existent_exchange 发送消息,回调打印 Confirm失败:消息ID=xxx,原因=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non_existent_exchange' in vhost 'bite', ...),明确提示“交换机不存在”。

三、模式二:Return 退回模式(消息路由失败确认)

Confirm 模式仅能确认“消息到达交换机”,但无法确认“消息到达交换机后是否成功路由到队列”。若消息无法路由到任何队列(如路由键不匹配、队列不存在),RabbitMQ 会默认丢弃消息,此时需通过 Return 模式 将消息退回给生产者,避免“无声丢失”。

3.1 核心流程

  1. 生产者开启 mandatory 属性(强制要求 RabbitMQ 退回无法路由的消息);
  2. 生产者通过 RabbitTemplate 设置 ReturnsCallback 回调函数;
  3. 消息到达交换机后,若无法路由,RabbitMQ 将消息退回并携带退回原因;
  4. 生产者通过回调接收退回消息,处理重试或日志记录。

3.2 Spring Boot 配置与代码实现

3.2.1 配置关键:开启 mandatory 属性

需在 RabbitTemplate 中设置 mandatory=true,否则 RabbitMQ 会直接丢弃无法路由的消息,不触发 Return 回调:

@Configuration
public class RabbitTemplateConfig {@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 1. 开启mandatory:强制退回无法路由的消息rabbitTemplate.setMandatory(true);// 2. 配置ConfirmCallback(同前文)rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** @param correlationData 发送消息时携带的关联数据(含消息唯一ID)* @param ack RabbitMQ是否成功接收消息(true=成功,false=失败)* @param cause 失败原因(ack=false时非null)*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String msgId = correlationData != null ? correlationData.getId() : "未知ID";if (ack) {System.out.printf("Confirm成功:消息ID=%s,已到达交换机%n", msgId);// 成功逻辑:如更新消息状态为“已发送”} else {System.err.printf("Confirm失败:消息ID=%s,原因=%s%n", msgId, cause);// 失败逻辑:如重试发送、记录错误日志}}});// 3. 配置ReturnsCallback:接收路由失败的退回消息rabbitTemplate.setReturnsCallback(returnedMessage -> {// ReturnedMessage包含退回消息的完整信息String msg = new String(returnedMessage.getMessage().getBody());System.err.printf("Return退回:消息=%s,交换机=%s,路由键=%s,退回码=%d,原因=%s%n",msg,returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());// 退回逻辑:如重新发送到备用交换机、记录路由失败日志});return rabbitTemplate;}
}
3.2.2 测试路由失败场景

向存在的交换机发送“路由键不匹配”的消息,触发 Return 回调:

@RestController
@RequestMapping("/producer")
public class ConfirmProducerController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;// 测试Return退回模式(路由键不匹配)@RequestMapping("/confirm/return")public String sendReturnMsg() {String msg = "Return模式测试消息(路由失败)";String msgId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(msgId);// 发送消息:路由键=confirm.wrong(与绑定的confirm.key不匹配)confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"confirm.wrong",  // 错误的路由键msg,correlationData);return "Return测试消息发送中,消息ID=" + msgId;}
}

3.3 运行结果验证

  • Confirm 回调:打印 Confirm成功:消息ID=xxx,已到达交换机(消息成功到达交换机,Confirm 确认成功);
  • Return 回调:打印 Return退回:消息=Return模式测试消息(路由失败),交换机=confirm_exchange,路由键=confirm.wrong,退回码=312,原因=NO_ROUTE(路由失败,消息被退回,NO_ROUTE 表示“无匹配路由”)。

四、发送方确认的注意事项

  1. Confirm 与 Return 的分工

    • Confirm 确认“消息是否到达交换机”,无论后续路由是否成功;
    • Return 仅在“消息到达交换机但路由失败”时触发,补充 Confirm 无法覆盖的场景;
    • 两者需配合使用,才能完整确认“消息从生产者到队列”的全链路。
  2. CorrelationData 的必要性

    • 必须为每条消息设置唯一的 CorrelationData(如 UUID),否则无法在回调中匹配“发送的消息”与“确认结果”;
    • 若未设置 CorrelationData,回调中的 correlationData 参数为 null,无法追踪消息状态。
  3. 失败重试的幂等性

    • 发送方确认失败后(如网络故障、路由失败),重试发送时需确保消息幂等(如通过消息 ID 避免重复处理);
    • 建议:重试次数不宜过多(如 5 次),超过次数后记录错误日志并人工介入,避免无限重试导致系统压力。
  4. 性能影响

    • 发送方确认会增加网络交互(生产者 ↔ RabbitMQ),轻微降低吞吐量;
    • 对于核心业务(如订单、支付),可靠性优先于吞吐量,必须开启发送方确认;非核心业务(如日志)可权衡关闭,以提高性能。

五、总结

机制核心目标关键配置典型场景
Confirm 确认模式确认消息是否到达交换机publisher-confirm-type=correlated + ConfirmCallback网络不稳定、交换机可能不存在的场景
Return 退回模式确认消息到达交换机后是否路由成功mandatory=true + ReturnsCallback路由键可能错误、队列可能不存在的场景

在 Spring Boot 项目中,需通过 RabbitTemplate 配置两种回调,并结合 CorrelationData 跟踪消息,最终实现“消息发送可确认、失败可感知、异常可处理”的生产端可靠性保障。

http://www.dtcms.com/a/524102.html

相关文章:

  • Keepalived 多节点负载均衡配置
  • Windows下载安装配置rabbitmq
  • 了解前端连接 RabbitMQ 的方式
  • 【ROS2】ROS2+Qt6在编译时报错:target_link_libraries
  • 从0到1理解智能体模式
  • 怎么做家具定制网站qq自动发货平台网站怎么做
  • 微网站开发合同网站建设项目付款方式
  • HarmonyOS ArkUI框架中AceContainer类的成员变量定义
  • 数据结构——希尔排序
  • 分组卷积(Grouped Convolution)原理与应用详解
  • 【信道利用率】为什么卫星链路用 SW 协议效率低?ARQ 信道利用率公式 + 计算题全解!
  • 三极管MOS管
  • PHP拆分重组pdf,php拆分pdf指定页数,并合并成新pdf
  • 详解C语言数组
  • 鹤山做网站公司建设网站宣传
  • 微信网站开发视频教程开发一个小软件多少钱
  • 释放内存与加速推理:PyTorch的torch.no_grad()与torch.inference_mode()
  • 论文笔记(九十六)VGGT: Visual Geometry Grounded Transformer
  • 城市基础设施安全运行监管平台
  • 网络 UDP 和 TCP / IP详细介绍
  • 数据结构(8)
  • [cpprestsdk] ~异步流处理(eg`basic_istream`、`basic_ostream`、`streambuf`) 底层
  • Linux 查找符合条件的文档
  • ​九小场所 / 乡镇监督防火 ——1 个平台管水源 / 隐患,整改率提 80%
  • 郑州做网站找绝唯科技地方类门户网站
  • 哪里可以做免费的物流网站国外室内设计案例网站
  • 【Linux系统】从零掌握make与Makefile:高效自动化构建项目的工具
  • ML:Supervised/Unsupervised
  • 开发网站多少钱北京 工业网站建设公司排名
  • 【后端开发面试题】