RabbitMQ高级特性——消息确认、持久性、发送方确认、重试
目录
一、消息确认
1.1消息确认机制
1.2手动确认方法
1. basicAck - 确认单条消息处理成功
2. basicNack - 否定确认(消息处理失败)
3.basicReject - 否定确认(拒绝单条消息)
不同确认方式的对比
1.3代码示例
1.3.1主要流程:
二、持久性
2.1交换机持久化
2.2队列持久化
2.3消息持久化
三、发送方确认
3.1Confirm确认模式
3.2return退回模式
基本概念
常见面试题
四、重试机制
4.1步骤如下:
4.2重试注意事项
一、消息确认
1.1消息确认机制
生产者发送消息之后,到达消费端之后,可能出现以下情况:
- 消息处理成功
- 消息处理异常
RabbitMQ向消费者发送消息之后,就会把该消息删除掉,那么第二种情况(消息处理异常),就会造成消息丢失
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制
消息确认机制分为以下两种:
- 自动确认:当autoAck为true时,RabbitMQ会自动认为已成功发送消息,并从内存/磁盘中删除该消息,而不管消费者是否真正的消费到了这些消息,自动确认模式适合对于消息可靠性要求不高的场景
- 手动确认:当autoAck为false时,RabbitMQ会等待消费者显式调用basic.Ack命令,收到确认消息后才从内存/磁盘中移出消息,这种模式适合对消息可靠性要求比较高的场景
当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:
- 是等待投递给消费者的消息
- 是已经投递给消费者,但是还没有收到消费者确认信号的消息
如果RabbitMQ一直没有收到消费者的确认应答信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,也有可能还是原来的消费者
1.2手动确认方法
当在 RabbitMQ 中将 autoAck
参数设置为 false
时,消费者需要手动发送确(acknowledgement)来告知 RabbitMQ 消息已成功处理。以下是主要的手动确认方法(三种):
1. basicAck
- 确认单条消息处理成功
channel.basicAck(long deliveryTag, boolean multiple);
-
参数:
-
deliveryTag
: 消息的唯一标识ID(64位长整型) -
multiple
: 是否批量确认-
false
: 只确认当前消息 -
true
: 确认所有比当前deliveryTag小的未确认消息
-
-
2. basicNack
- 否定确认(消息处理失败)
Channel.basicReject(long deliveryTag,boolean multiple, boolean requeue)
-
参数:
-
deliveryTag
: 消息的唯一标识ID -
multiple
: 是否批量否定确认 -
requeue
: 是否重新入队-
true
: 消息重新放回队列(可能被其他消费者或自己再次获取) -
false
: 消息会被丢弃或进入死信队列(如果配置了DLX)
-
-
3.basicReject
- 否定确认(拒绝单条消息)
Channel.basicReject(long deliveryTag, boolean requeue)
requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条 消息存⼊队列,以便可以发送给下⼀个订阅的消费者. 如果requeue参数设置为false, 则RabbitMQ会把 消息从队列中移除, ⽽不会把它发送给新的消费者
不同确认方式的对比
方法 | 批量处理 | 重新入队选项 | 备注 |
---|---|---|---|
basicAck | 支持 | - | 确认成功处理 |
basicNack | 支持 | 支持 | 更灵活的拒绝方式 |
basicReject | 不支持 | 支持 | 只能拒绝单条消息 |
1.3代码示例
Spring—AMQP对消息确认机制提供了三种策略:
public enum AcknowledgeMode{NONE,MANUAL,AUTO;
}
1.AcknowledgeMode.NONE
- 这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认 消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.
2.AcknowledgeMode.AUTO(默认)
- 这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息.
3.AcknowledgeMode.MANUAL
- ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这 种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被 重新处理.
1.3.1主要流程:
- 配置确认机制(⾃动确认/⼿动机制)
- ⽣产者发送消息
- 消费端逻辑
- 测试
1.配置确认机制
配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
2.发送消息
3.编写消费端逻辑
4.运行程序(测试)
二、持久性
- 持久化是RabbitMQ可靠性保证机制之一,前面了解一下消费端处理消息时,消息如何不丢失,但是该如何保证生产者发送的消息不丢失呢
- RabbitMQ的持久化分为三个部分:交换机的持久化、队列的持久化和消息的持久化
2.1交换机持久化
- 交换机的持久化是通过在声明交换机时将durable参数设置为true(交换机默认就是持久化的),相当于将交换机的属性在服务器内部保存,后续服务器发生意外或者关闭后,重启RabbitMQ时不在需要重新创建交换机啦,交换机会自动建立,相当于一直存在
ExchangeBuilder. topicExchange(ExCHANGE_NAME) .durable(true) .build()
2.2队列持久化
- 队列的持久化是通过在声明队列时将durable参数设置为true(队列默认就是持久化的)
- 队列的持久化能保证队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不回丢失,要确保消息不会丢失,需要将消息设置为持久化
QueueBuilder.durable(Constant.ACK_QUEUE).build();
也可以将队列设置为非持久化
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3消息持久化
消息要实现持久化,需要把消息投递模式(MessageProperties)中的(deliveryMode)设置为2,
public enum MessageDeliveryMode {NON_PERSISTENT,//非持久化PERSISTENT;//持久化
}
注意⚠️:
消息是存储在队列中的,所以消息的持久化,需要队列持久化+消息持久化
- 如果只设置了队列持久化,MQ重启后,消息会丢失
- 如果只设置消息持久化,MQ重启后,队列会丢失,消息也随之消失
- 如果将所有消息都设置为持久化,会严重影响RabbitM Q的性能,导致写入磁盘的速度比写入内存的速度慢得不只一点点,需要根据实际结果来选择是否将消息持久化
三、发送方确认
发送方确认(Publisher Confirms)是 RabbitMQ 提供的一种可靠消息投递机制,用于确保消息已成功到达服务器。这是比事务更轻量级的解决方案。
在使用RabbitMQ时候,可以通过消息持久化 来解决因为服务器的异常崩溃而导致的消息丢失,但是还有一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果在消息到达服务器以前已经丢失(比如MQ重启),持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决方案:
- 通过事务机制实现
- 通过发送方确认(publisher confirm)机制实现
主要了解confirm机制来实现发送方的确认
Rabbitm MQ提供了两个方式来控制消息的可靠性投递
- confirm确认模式
- return退回模式
3.1Confirm确认模式
生产者(producer)在发送消息的时候,对发送端设置一个ConfirmCallback的监听,无论消息是否有到达Exchange,这个监听都会被执行,如果Exchange成功收到,ACK为true,如果没有收到消息,ACK就为false
步骤如下:
- 配置RabbitMQ
- 设置确认回调逻辑并发送消息
1.配置RabbitMQ
spring:rabbitmq:addresses: amqp: //admin:admin@8.140.60.17:15672/listener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确
2.设置确认回调逻辑并发送消息
# 生产者代码
@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息发送成功";}
}@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});//消息被退回时, 回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}
3.2return退回模式
Return 退回模式是 RabbitMQ 提供的另一种可靠性机制,用于处理消息从 Exchange 路由到 Queue 失败的情况。当消息无法被正确路由时,RabbitMQ 会将消息退回给生产者。
基本概念
-
与 Confirm 模式的区别:
-
Confirm 模式:确认消息是否到达 Exchange
-
Return 模式:处理消息从 Exchange 路由到 Queue 失败的情况
-
-
触发条件:
-
Exchange 不存在
-
Exchange 与 Queue 之间没有绑定匹配的路由键
-
mandatory 参数设置为 true
-
步骤如下:
- 配置RabbitMQ
- 设置返回回调逻辑并发送消息
1.配置RabbitMQ
spring:rabbitmq:addresses: amqp: //admin:admin@8.140.60.17:15672/listener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确
2.设置返回回调逻辑并发送消息(结合confirm)
# 生产者代码
@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "returns test...", correlationData);return "消息发送成功";}
}@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});//消息被退回时, 回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}
常见面试题
如何保证RabbitMQ消息的可靠传输?

问题环节 | 问题描述 | 可能原因 | 解决方案 |
---|---|---|---|
生产者到Broker | 消息未能到达RabbitMQ服务器 | 网络中断、Broker宕机、生产者崩溃 | 1. 启用Confirm确认模式 2. 实现发送重试机制 3. 持久化未确认消息 |
Exchange到Queue | 消息到达Exchange但无法路由到任何Queue | 路由键错误、绑定关系缺失、目标队列不存在 | 1. 启用Return退回模式(mandatory=true) 2. 配置死信队列 3. 加强绑定关系验证 |
Broker存储 | Broker宕机导致消息丢失 | 未持久化消息、磁盘故障、集群节点失效 | 1. 全面持久化(交换机/队列/消息) 2. 配置镜像队列 3. 定期备份元数据 |
消费者处理 | 消息被获取但未成功处理 | 消费者崩溃、业务逻辑异常、自动确认模式下提前确认 | 1. 使用手动ACK模式 2. 实现消费重试机制 3. 保证消费逻辑幂等性 |
四、重试机制
在消息传递过程中,可能会遇到各种问题,如网络故障,服务不可用,资源不足等,这些问题可能导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数
4.1步骤如下:
- 配置RabbitMQ
- 配置交换机与队列
- 编写生产者
- 编写消费者
- 测试结果
1.配置RabbitMQ
spring:rabbitmq:addresses: amqp://admin:admin@8.140.60.17:15672/listener:simple:acknowledge-mode: auto #消息接收确认 retry:enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时⻓为5秒 max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)
2.配置交换机与队列
/** 重试机制*/@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue,@Qualifier("retryExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("retry");}
3.编写生产者
/** 重试机制*/@RequestMapping("/retry")public String retry(){rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");return "消息发送成功";}
4.编写消费者
@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("业务处理完成");}
}
5.测试结果:
在重试设置的次数之后,还未成功发送消息就会抛出异常,可以手动处理异常
@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3/0;System.out.println("业务处理完成");channel.basicAck(deliveryTag, false);}catch (Exception e){channel.basicNack(deliveryTag, false, true);}}
}
测试结果:
发现手动处理完异常就不会在重试
4.2重试注意事项
1. 自动确认模式 : 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失 了
2. 手动确认模式:程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是 unacked的状态, 导致消息积压