MQ 消息持久化方案
MQ 消息持久化方案
1. RabbitMQ 发送与消费消息的模型

2. 消息丢失的几种情况?
- 生产者发送消息未到达交换机
- 消息到达交换机,没有正确路由到队列
- MQ 宕机,队列中的消息不见了
- 消费者收到消息,还没消费,消费者宕机
3. 如何保证消息不丢失?
3.1 生产者确认机制
-
publisher-confirm
:-
消息成功投递到交换机,返回 ack
-
消息未成功投递到交换机,返回 nack
记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送
-
-
publisher-return
:-
未正确到达队列,返回 ack 及失败原因
记录消息以及交换机等相关信息到数据库,后期可以编写任务去补偿发送
-
图示

实现
根据实际情况修改!!!
-
配置文件
spring: rabbitmq: host: 192.168.200.130 # 虚拟机 IP port: 5672 # 端口 virtual-host: / # MQ 的虚拟主机 username: username password: password publisher-confirm-type: correlated publisher-returns: true # 开启 publisher-returns template: mandatory: true
参数说明:
publish-confirm-type
:开启publisher-confirm
none
:关闭 confirm 机制simple
:同步阻塞等待 MQ 的回执(回调方法)correlated
:MQ 异步回调返回回执
template.mandatory
:定义消息路由失败时的策略。- true:调用 ReturnCallback
- false:则直接丢弃消息
-
定义 ConfirmCallback
ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同。
public void testSendMessage2SimpleQueue() throws InterruptedException { // 1 消息体 String message = "hello, spring amqp!"; // 2 全局唯一的消息 ID,需要封装到 CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3 添加 callback correlationData.getFuture().addCallback( result -> { if(result.isAck()) { log.debug("消息发送成功, ID:{}", correlationData.getId()); } else { log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息发送异常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage()) ); // 4 发送消息 rabbitTemplate.convertAndSend("", "simple.queue", message, correlationData); // 休眠一会儿,等待 ack 回执 Thread.sleep(2000); }
-
定义 Return 回调
每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目加载时配置。
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取 RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 设置 ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 投递失败,记录日志 log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有业务需要,可以重发消息 }); } }
3.2 持久化机制
-
交换机持久化:
默认就是持久化,durable 默认就是 true
-
队列持久化
默认就是持久化,durable 默认就是true
-
消息持久化
默认就是持久化。在发送消息时,使用 Message 对象,并设置 delivery-mode 为持久化
3.3 消费者 ack 机制

ack 取值情况:
-
none:只要消息到达消费者,消费者直接返回 ack 给 MQ
MQ 收到 ack,会把队列中的消息删除,消息可能会丢失
-
消费者配置
spring: rabbitmq: listener: simple: acknowledge-mode: none # 关闭 ack
-
-
manual:手动 ack
- 消费成功,调用 API 给 MQ 返回 ack
- 消费失败,调用 API 给 MQ 返回 nack,并且让消息重回队列
消费者配置
spring: rabbitmq: listener: simple: acknowledge-mode: manual # 手动 ack
测试代码:
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) { try { // 从 redis 获取一个 retry_count >= 3 直接记录日志,不重回队列,中断操作 return log.warn("消费者接收到 simple.queue 的消息:{}", msg); int i = 1 / 0; log.info("消息成功消费了 ---> SUCCESS"); // 手动 ack // 可以使用 org.springframework.amqp.core.Messagee 拿到 deLiveryTag channel.basicAck(deliveryTag, false); } catch (Exception e) { e.printStackTrace(); try { // 返回 nack,并且让消息重回队列 channel.basicNack(deliveryTag, false, true); Thread.sleep(1000); log.error("消息消费失败,重回队列-->"); // 向 redis 中设置值 // redisTemplate.opsForValue().incr(retry_count) } catch (Exception ex) { ex.printStackTrace(); } } }
-
auto:自动 ack。消费消息不出异常,返回 ack 给 MQ。消费消息出异常了,返回 nack,把消息重回队列
-
本地重试
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初始的失败等待时长为1秒 multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true 无状态;false 有状态。如果业务中包含事务,这里改为 false
达到重试次数后,还是失败,则返回 ack,不 requeue。MQ 会删除队列消息
-
失败策略
RejectAndDontRequeueRecoverer
:重试耗尽后,直接 reject,丢弃消息。默认方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回 nack,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
-
使用 RepublishMessageRecoverer
需求:把消息投递到失败的交换机,路由队列。记录日志,将来人工干预
实现
- 定义错误交换机、队列、绑定关系。定义 RepublishMessageRecoverer
- 监听错误队列
-
4. 总结
-
创建交换机、队列、消息进行持久化
-
交换机、队列默认就是持久化的
-
消息持久化
-
-
生产者开启确认机制
- 开启消息发送失败的重试策略
- 设置重试次数和重试间隔比例
- 耗尽重试次数后,依旧失败,记录失败消息到数据库失败消息表,用于后期执行补偿错误。如使用定时任务去扫描这个表,重新发送消息
- 开启 confirm 机制:保证消息正确到达交换机
- 返回 ack,正确到达
- 返回 nack,没有到达交换机,写入数据库,后期重试
- 开启 return 机制
- 保证消息正确到达队列
- 没有到达队列,会调用ReturnCallback,写入数据库,后期重试
- 开启消息发送失败的重试策略
-
消费者确认机制
-
开机自动确认机制
-
开启重试策略
重试次数耗尽后,定义RepublishMessageRecoverer策略来让消息路由到错误队列,落库
-
