当前位置: 首页 > news >正文

RabbitMQ高级特性——消息确认、持久性、发送方确认、重试

目录

一、消息确认

1.1消息确认机制

1.2手动确认方法

1. basicAck - 确认单条消息处理成功

2. basicNack - 否定确认(消息处理失败)

3.basicReject - 否定确认(拒绝单条消息)

不同确认方式的对比

1.3代码示例

1.3.1主要流程:

二、持久性

2.1交换机持久化

2.2队列持久化

2.3消息持久化

三、发送方确认

3.1Confirm确认模式

3.2return退回模式

基本概念

常见面试题

四、重试机制

4.1步骤如下:

4.2重试注意事项


一、消息确认

1.1消息确认机制

生产者发送消息之后,到达消费端之后,可能出现以下情况:

  • 消息处理成功
  • 消息处理异常

RabbitMQ向消费者发送消息之后,就会把该消息删除掉,那么第二种情况(消息处理异常),就会造成消息丢失

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制

消息确认机制分为以下两种:

  • 自动确认:当autoAcktrue时,RabbitMQ会自动认为已成功发送消息,并从内存/磁盘中删除该消息,而不管消费者是否真正的消费到了这些消息,自动确认模式适合对于消息可靠性要求不高的场景
  • 手动确认:当autoAckfalse时,RabbitMQ会等待消费者显式调用basic.Ack命令,收到确认消息后才从内存/磁盘中移出消息,这种模式适合对消息可靠性要求比较高的场景

当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:

  1. 是等待投递给消费者的消息
  2. 是已经投递给消费者,但是还没有收到消费者确认信号的消息

如果RabbitMQ一直没有收到消费者的确认应答信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者,也有可能还是原来的消费者

1.2手动确认方法

当在 RabbitMQ 中将 autoAck 参数设置为 false 时,消费者需要手动发送确(acknowledgement)来告知 RabbitMQ 消息已成功处理。以下是主要的手动确认方法(三种):

1. basicAck - 确认单条消息处理成功

channel.basicAck(long deliveryTag, boolean multiple);
  • 参数

    • deliveryTag: 消息的唯一标识ID(64位长整型)

    • multiple: 是否批量确认

      • false: 只确认当前消息

      • true: 确认所有比当前deliveryTag小的未确认消息

2. basicNack - 否定确认(消息处理失败)

Channel.basicReject(long deliveryTag,boolean multiple, boolean requeue)
  • 参数

    • deliveryTag: 消息的唯一标识ID

    • multiple: 是否批量否定确认

    • requeue: 是否重新入队

      • true: 消息重新放回队列(可能被其他消费者或自己再次获取)

      • false: 消息会被丢弃或进入死信队列(如果配置了DLX)

3.basicReject - 否定确认(拒绝单条消息)

Channel.basicReject(long deliveryTag, boolean requeue)
requeue: 表⽰拒绝后, 这条消息如何处理. 如果requeue 参数设置为true, 则RabbitMQ会重新将这条 消息存⼊队列,以便可以发送给下⼀个订阅的消费者. 如果requeue参数设置为false, 则RabbitMQ会把 消息从队列中移除, ⽽不会把它发送给新的消费者

不同确认方式的对比

方法批量处理重新入队选项备注
basicAck支持-确认成功处理
basicNack支持支持更灵活的拒绝方式
basicReject不支持支持只能拒绝单条消息

1.3代码示例

基于SpringBoot来演⽰消息的确认机制, 使⽤⽅式和使⽤RabbitMQ Java Client 库有⼀定差异

Spring—AMQP对消息确认机制提供了三种策略:

public enum AcknowledgeMode{NONE,MANUAL,AUTO;
}
1.AcknowledgeMode.NONE
  • 这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认 消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.
2.AcknowledgeMode.AUTO(默认)
  • 这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确认消息.
3.AcknowledgeMode.MANUAL
  • ⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这 种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被 重新处理.

1.3.1主要流程:

  1.  配置确认机制(⾃动确认/⼿动机制)
  2. ⽣产者发送消息
  3. 消费端逻辑
  4.  测试

1.配置确认机制

配置确认机制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none

2.发送消息

3.编写消费端逻辑

4.运行程序(测试)

二、持久性

  •  持久化是RabbitMQ可靠性保证机制之一,前面了解一下消费端处理消息时,消息如何不丢失,但是该如何保证生产者发送的消息不丢失呢
  • RabbitMQ的持久化分为个部分:交换机的持久化队列的持久化消息的持久化

2.1交换机持久化

  • 交换机的持久化是通过在声明交换机时将durable参数设置为true交换机默认就是持久化的),相当于将交换机的属性在服务器内部保存,后续服务器发生意外或者关闭后,重启RabbitMQ时不在需要重新创建交换机啦,交换机会自动建立,相当于一直存在
ExchangeBuilder. topicExchange(ExCHANGE_NAME) .durable(true) .build()

2.2队列持久化

  • 队列的持久化是通过在声明队列时将durable参数设置为true(队列默认就是持久化的
  • 队列的持久化能保证队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不回丢失,要确保消息不会丢失,需要将消息设置为持久化
QueueBuilder.durable(Constant.ACK_QUEUE).build();

也可以将队列设置为非持久化

QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();

2.3消息持久化

消息要实现持久化,需要把消息投递模式(MessageProperties)中的(deliveryMode)设置为2,

public enum MessageDeliveryMode {NON_PERSISTENT,//非持久化PERSISTENT;//持久化
}

注意⚠️

消息是存储在队列中的,所以消息的持久化,需要队列持久化+消息持久化

  • 如果只设置了队列持久化,MQ重启后,消息会丢失
  • 如果只设置消息持久化,MQ重启后,队列会丢失,消息也随之消失
  • 如果将所有消息都设置为持久化,会严重影响RabbitM Q的性能,导致写入磁盘的速度比写入内存的速度慢得不只一点点,需要根据实际结果来选择是否将消息持久化

三、发送方确认

发送方确认(Publisher Confirms)是 RabbitMQ 提供的一种可靠消息投递机制,用于确保消息已成功到达服务器。这是比事务更轻量级的解决方案。

在使用RabbitMQ时候,可以通过消息持久化 来解决因为服务器的异常崩溃而导致的消息丢失,但是还有一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果在消息到达服务器以前已经丢失(比如MQ重启),持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认(publisher confirm)机制实现

主要了解confirm机制来实现发送方的确认

Rabbitm MQ提供了两个方式来控制消息的可靠性投递

  1. confirm确认模式
  2. return退回模式

3.1Confirm确认模式

生产者(producer)在发送消息的时候,对发送端设置一个ConfirmCallback的监听,无论消息是否有到达Exchange,这个监听都会被执行,如果Exchange成功收到,ACK为true,如果没有收到消息,ACK就为false

步骤如下

  1. 配置RabbitMQ
  2. 设置确认回调逻辑并发送消息

1.配置RabbitMQ

spring:rabbitmq:addresses: amqp: //admin:admin@8.140.60.17:15672/listener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确

2.设置确认回调逻辑并发送消息


# 生产者代码
@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息发送成功";}
}@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});//消息被退回时, 回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}

3.2return退回模式

Return 退回模式是 RabbitMQ 提供的另一种可靠性机制,用于处理消息从 Exchange 路由到 Queue 失败的情况。当消息无法被正确路由时,RabbitMQ 会将消息退回给生产者。

基本概念

  1. 与 Confirm 模式的区别

    • Confirm 模式:确认消息是否到达 Exchange

    • Return 模式:处理消息从 Exchange 路由到 Queue 失败的情况

  2. 触发条件

    • Exchange 不存在

    • Exchange 与 Queue 之间没有绑定匹配的路由键

    • mandatory 参数设置为 true

步骤如下

  1. 配置RabbitMQ
  2. 设置返回回调逻辑并发送消息

1.配置RabbitMQ

spring:rabbitmq:addresses: amqp: //admin:admin@8.140.60.17:15672/listener:simple:acknowledge-mode: manual #消息接收确认publisher-confirm-type: correlated #消息发送确

2.设置返回回调逻辑并发送消息(结合confirm)


# 生产者代码
@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,         "confirm111", "returns test...", correlationData);return "消息发送成功";}
}@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});//消息被退回时, 回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}

常见面试题

如何保证RabbitMQ消息的可靠传输?


这是⼀张RabbitMQ消息传递图
问题环节问题描述可能原因解决方案
生产者到Broker消息未能到达RabbitMQ服务器网络中断、Broker宕机、生产者崩溃1. 启用Confirm确认模式
2. 实现发送重试机制
3. 持久化未确认消息
Exchange到Queue消息到达Exchange但无法路由到任何Queue路由键错误、绑定关系缺失、目标队列不存在1. 启用Return退回模式(mandatory=true)
2. 配置死信队列
3. 加强绑定关系验证
Broker存储Broker宕机导致消息丢失未持久化消息、磁盘故障、集群节点失效1. 全面持久化(交换机/队列/消息)
2. 配置镜像队列
3. 定期备份元数据
消费者处理消息被获取但未成功处理消费者崩溃、业务逻辑异常、自动确认模式下提前确认1. 使用手动ACK模式
2. 实现消费重试机制
3. 保证消费逻辑幂等性

四、重试机制

在消息传递过程中,可能会遇到各种问题,如网络故障,服务不可用,资源不足等,这些问题可能导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败后重新发送.
但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数


4.1步骤如下

  1. 配置RabbitMQ
  2. 配置交换机与队列
  3. 编写生产者
  4. 编写消费者
  5. 测试结果

1.配置RabbitMQ

spring:rabbitmq:addresses: amqp://admin:admin@8.140.60.17:15672/listener:simple:acknowledge-mode: auto #消息接收确认 retry:enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时⻓为5秒 max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次) 

2.配置交换机与队列

    /** 重试机制*/@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue,@Qualifier("retryExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("retry");}

3.编写生产者

    /** 重试机制*/@RequestMapping("/retry")public String retry(){rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");return "消息发送成功";}

4.编写消费者

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("业务处理完成");} 
}

5.测试结果:

在重试设置的次数之后,还未成功发送消息就会抛出异常,可以手动处理异常

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3/0;System.out.println("业务处理完成");channel.basicAck(deliveryTag, false);}catch (Exception e){channel.basicNack(deliveryTag, false, true);}}
}

测试结果:

发现手动处理完异常就不会在重试

4.2重试注意事项

1. 自动确认模式 : 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失 

2. 手动确认模式:程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是 unacked的状态, 导致消息积压

http://www.dtcms.com/a/330810.html

相关文章:

  • 解锁Prompt秘籍:框架、技巧与指标全解析
  • 基于Django的福建省旅游数据分析与可视化系统【城市可换】
  • 《量子雷达》第4章 量子雷达的检测与估计 预习2025.8.14
  • 【51单片机学习】定时器、串口、LED点阵屏、DS1302实时时钟、蜂鸣器
  • 量子人工智能
  • Python训练营打卡Day32-神经网络的训练
  • Swift 数据类型全景解析(基础到高阶)
  • 按位运算的枚举在 Swift 里如何实现?
  • 《吃透 C++ 类和对象(中):拷贝构造函数与赋值运算符重载深度解析》
  • 【数据分享】2014-2023年长江流域 (0.05度)5.5km分辨率的每小时日光诱导叶绿素荧光SIF数据
  • Pytest自动化测试框架总结
  • iOS性能监控新方法多版本对比与趋势分析实战指南
  • C++进阶:特殊类
  • 手写MyBatis第16弹:泛型魔法应用:MyBatis如何破解List的运行时类型
  • 笔试——Day38
  • 根据图片远程地址复制图片内容,可以在富文本、word等文本里粘贴
  • word——删除最后一页空白页
  • Exif.js获取手机拍摄照片的经纬度
  • 【网络】TCP/UDP总结复盘
  • Unity人形角色IK优化指南
  • AI搜索优化专家孟庆涛:以技术温度重构“人机信息对话”新范式
  • 手机实时提取SIM卡打电话的信令声音-当前现状与思考
  • CICD-DevOps进阶-2
  • 提升工作效率的利器:GitHub Actions Checkout V5
  • 多种适用于 MCU 固件的 OTA 升级方案
  • Qt基本控件
  • 飞算JavaAI金融风控场景实践:从实时监测到智能决策的全链路安全防护
  • 西门子TIA-FOR循环多路PID控制器(PID_Compact)
  • VirtualBox虚拟机Ubuntu18.04安装hdl_localization保姆级教程
  • 【自动化运维神器Ansible】template模块深度解析:动态配置文件生成的艺术