RabbitMQ 发送方确认的两大工具 (With Spring Boot)
核心概念解析
发布者确认机制的核心思想是:将消息投递的可靠性从“尽力而为”提升为“契约保证”。生产者不再是“发后不理”,而是与 Broker 建立一个双向的沟通渠道。
在 Spring AMQP 的封装下,这个机制主要由两个回调接口实现:
1. ConfirmCallback: 确认消息是否到达 Exchange
这是最核心的确认机制。它关注的是消息从生产者到 Broker 内的交换机 (Exchange) 这一段路程是否成功。
- 触发时机:无论消息是否成功到达 Exchange,Broker 都会异步地调用生产者的这个回调函数。
- 如何工作:
- 如果 Broker 成功接收消息并将其放入 Exchange,回调中的
ack
参数将为true
。 - 如果 Broker 因故(如内部错误、交换机不存在等)未能接收消息,
ack
参数将为false
,同时cause
参数会提供失败的原因描述。
- 如果 Broker 成功接收消息并将其放入 Exchange,回调中的
- 作用:它回答了问题:“Broker 收到我的消息了吗?”
2. ReturnCallback: 确认消息是否路由到 Queue
这是一个补充机制,处理的是一个更细分的场景。它在 ConfirmCallback
返回成功 (ack=true
) 的前提下才可能被触发。
- 触发时机:当消息已成功到达 Exchange,但 Exchange 无法根据路由键 (Routing Key) 将消息路由到任何一个绑定的队列时,Broker 会将这条“无法投递”的消息退回给生产者,并调用此回调。
- 如何工作:
- 如果消息被正常路由到一个或多个队列,
ReturnCallback
不会被触发。 - 如果消息无法路由(例如,路由键写错,或者没有队列绑定这个路由键),回调函数将被调用,你可以从
ReturnedMessage
参数中获取到被退回的消息内容、路由信息和退回原因。
- 如果消息被正常路由到一个或多个队列,
- 作用:它回答了问题:“我发给 Exchange 的消息,有队列接收它吗?”
- 回调参数:回调函数中有⼀个参数:
ReturnedMessage
包含以下属性
public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性private final Message message;//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.private final int replyCode;//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.private final String replyText;//消息被发送到的交换机名称private final String exchange;//消息的路由键,即发送消息时指定的键private final String routingKey;
}
工作流程图
下图清晰地展示了这两种回调机制在消息发送过程中的作用点:
demo演练
接下来,我们通过一个 Spring Boot 项目来演示如何实现发布者确认。
项目结构
一个简单的 Spring Boot 项目结构如下:
此处的
com.example.ackdemo
要改成读者自己的项目路径,包括后面相关代码的路径引入也需要进行对应修改
ack-demo
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ └── publishackdemo
│ │ │ │ ├── RabbitMQConfig.java
│ │ │ │ └── RabbitTemplateConfig.java
│ │ │ │ └── MessageController.java
│ │ │ └── AckDemoApplication.java
│ │ └── resources
│ │ └── application.yml
└── pom.xml
配置发布者确认模式
在 src/main/resources/application.yml
文件中,进行如下配置:
spring:application:name: ackDemorabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated # 开启ConfirmCallback,correlated表示回调时会携带CorrelationDatapublisher-returns: true # 开启ReturnCallback
声明 Exchange 和 Queue
在 config/RabbitMQConfig.java
中声明我们需要的交换机和队列。
注意此处引入的包为
org.springframework.amqp.core
!
package com.example.publishackdemo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration("RabbitMQConfigWithProductACK")
public class RabbitMQConfig {// 此处的常量提取到一个单独的静态类中更好,此处为了方便演式不单独提取public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key.confirm";@Beanpublic TopicExchange confirmExchange() {return ExchangeBuilder.topicExchange(CONFIRM_EXCHANGE_NAME).durable(true).build();}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding confirmBinding(Queue confirmQueue, TopicExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);}
}
配置 RabbitTemplate 回调
这是核心步骤。我们创建一个配置类,专门用于定制 RabbitTemplate
,并为其设置回调。
package com.example.publishackdemo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Slf4j
@Configuration("RabbitMQConfigWithPublisherAck")
public class RabbitTemplateConfig { @Bean("rabbitTemplate") public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean("confirmRabbitTemplate") // 给这个新的Bean起一个唯一的名字 public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 关键:只为这个新的实例设置回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String id = (correlationData != null) ? correlationData.getId() : ""; if (ack) { log.info("ConfirmCallback: 消息发送成功!ID: {}", id); } else { log.error("ConfirmCallback: 消息发送失败!ID: {}, 原因: {}", id, cause); } }); // 同样可以设置 ReturnsCallback//rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(returnedMessage -> { log.warn("ReturnsCallback: 消息被退回! Message: {}, ReplyCode: {}, ReplyText: {}, Exchange: {}, RoutingKey: {}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); // 此处可以记录无法路由的消息,用于后续分析或处理 }); return rabbitTemplate; }
}
此处配置两个
RabbitTemplate
的原因?
setConfirmCallback
只能被设置一次,如果直接在Controller
层里面调用的时候声明,那么每次请求都会调用一次设置的代码,会导致第二次之后的请求都会报错,所以需要提取到Config
里面- 在设置
RabbitTemplate
的setReturnsCallback
或者setConfirmCallback
设置之后会全局生效,如果并不需要进行发送确认的生产者也使用了这个Template
那么会导致性能下降等问题,所以创建两个不同的Bean
就是为了在不同情况选择不同的Bean
对象
生产者代码 (Publisher)
修改 MessageController
,增加几个测试接口来模拟不同场景。
package com.example.ackdemo.controller;import com.example.ackdemo.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@Slf4j
@RestController
public class MessageController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate rabbitTemplate;// 1. 测试正常发送@GetMapping("/send/ok")public String sendOkMessage() {String id = UUID.randomUUID().toString();String message = "A correct message.";log.info("Sending message with ID: {}", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (OK). Check logs for callback.";}// 2. 测试发送到不存在的 Exchange@GetMapping("/send/bad-exchange")public String sendToBadExchange() {String id = UUID.randomUUID().toString();String message = "Message to a non-existent exchange.";log.info("Sending message with ID: {} to a bad exchange", id);rabbitTemplate.convertAndSend("non-existent-exchange", RabbitMQConfig.CONFIRM_ROUTING_KEY, message, new CorrelationData(id));return "Message sent (Bad Exchange). Check logs for callback.";}// 3. 测试发送到正确的 Exchange,但错误的 Routing Key@GetMapping("/send/bad-routing")public String sendWithBadRoutingKey() {String id = UUID.randomUUID().toString();String message = "Message with a bad routing key.";log.info("Sending message with ID: {} with a bad routing key", id);rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, "wrong.key.123", message, new CorrelationData(id));return "Message sent (Bad Routing). Check logs for callback.";}
}
运行与验证
- 启动应用。
- 验证成功场景:
- 访问
http://localhost:8080/send/ok
。 - 日志打印:你会看到两条日志,说明回调已经正确设置。
- 访问
2025-07-11T19:59:56.682+08:00 INFO 11868 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController : Sending message with ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
2025-07-11T19:59:56.980+08:00 INFO 11868 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig : ConfirmCallback: 消息发送成功!ID: 679ddb2f-0bfe-4ec6-b77e-4dd44c1612e6
- 观察 RabbitMQ 管理界面:
confirm.queue
中会有一条消息。
- 验证交换机失败场景:
- 访问
http://localhost:8080/send/bad-exchange
。 - 日志打印:只会触发
ConfirmCallback
的失败回调。
- 访问
reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
2025-07-11T20:02:41.784+08:00 ERROR 40760 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig : ConfirmCallback: 消息发送失败!ID: bf93981e-f0a7-485d-bddf-cd5aec3e299f, 原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'demo', class-id=60, method-id=40)
- 验证路由失败场景:
- 访问
http://localhost:8080/send/bad-routing
。 - 日志打印:你会看到先触发成功的
ConfirmCallback
(因为消息确实到达了 Exchange) - 但是并没有触发
ReturnsCallback
,为什么?- 因为需要在其方法前添加一行
rabbitTemplate.setMandatory(true);
来开启此功能
- 因为需要在其方法前添加一行
- 访问
2025-07-11T20:13:00.788+08:00 INFO 10852 --- [mqDemo] [nio-8080-exec-1] c.d.m.p.PublisherController : Sending message with ID: 24753c11-7341-4836-8c74-79a0deae8f3b with a bad routing key
2025-07-11T20:13:01.023+08:00 WARN 10852 --- [mqDemo] [nectionFactory2] c.d.m.p.RabbitTemplateConfig : ReturnsCallback: 消息被退回! Message: Message with a bad routing key., ReplyCode: 312, ReplyText: NO_ROUTE, Exchange: confirm.exchange, RoutingKey: wrong.key.123
2025-07-11T20:13:01.023+08:00 INFO 10852 --- [mqDemo] [nectionFactory3] c.d.m.p.RabbitTemplateConfig : ConfirmCallback: 消息发送成功!ID: 24753c11-7341-4836-8c74-79a0deae8f3b
生产环境注意事项
-
为失败做好准备:收到
nack
或return
回调后,必须有相应的补偿机制。常见的策略包括:- 有限重试:对于网络抖动等临时性故障,可以进行几次延时重试。
- 记录日志与告警:对于持续失败或逻辑错误(如错误的Exchange/RoutingKey),应详细记录日志,并触发告警通知开发人员介入。
- 消息入库:将发送失败的消息存入数据库或本地文件,通过定时任务进行重发,这是最可靠的补偿方式。
-
善用
CorrelationData
:在异步高并发场景下,CorrelationData
是你识别哪条消息得到确认的唯一凭证。它的 ID 应该具有业务唯一性(如订单ID、业务流水号),以便于追踪和排错。 -
性能权衡:开启发布者确认会增加网络开销和 Broker 的 CPU 负担,从而降低消息发送的吞吐量。对于可以容忍少量丢失的非核心业务(如打点日志),可以关闭此功能以追求性能。
-
全局回调 vs. 单次发送回调:我们演示的是全局配置
RabbitTemplate
的回调。RabbitTemplate
也支持为单次send
操作指定一个临时的CorrelationData
,它内部可以包含更丰富的回调逻辑,适用于需要对特定消息进行特殊处理的场景。 -
构建完整的可靠性链路:切记,发布者确认只是可靠性拼图的一部分。一个完整的可靠性方案必须是:发布者确认 + 持久化(交换机、队列、消息)+ 消费者确认。三者结合,才能最大限度地保证消息在整个生命周期内的安全。
总结
RabbitMQ 的发布者确认机制,通过 ConfirmCallback
和 ReturnCallback
两个强大的工具,为我们弥补了消息从生产者到 Broker 这一段路程中的可靠性盲区。