当前位置: 首页 > news >正文

【RabbitMQ】高级特性—发送方确认详解

文章目录

  • 发送方确认
    • confirm 确认模式
      • 1. 配置 RabbitMQ
      • 2. 设置确认回调逻辑并发送消息
      • 3. 测试
      • 完整代码
    • return 退回模式
      • 1. 配置 RabbitMQ
      • 2. 设置返回回调逻辑并发送消息
      • 3. 测试
    • 常见面试题
      • 如何保证 RabbitMQ 消息的可靠传输

发送方确认

在使用 RabbitMQ 的时候,可以通过消息持久化来解决因为服务器的一次崩溃而导致的消息丢失

但是还有一个问题,当消息的生产者将消息发送初期之后,消息到底有没有正确地到达服务器呢?

  • 如果在消息到达服务器之前已经丢失(比如 RabbitMQ 重启,那么 RabbitMQ 重启期间生产者消息投递失败),持久化操作也解决不了这个问题,因为消息根被没有到达服务器,何谈持久化?

RabbitMQ 为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认(publisher confirm)机制实现

事务机制比较消耗性能,在实际工作中使用也不多,我们主要极少 confirm 机制来实现发送方的确认。

RabbitMQ 为我们提供了两个方式来控制消息的可靠性投递

  1. confirm 确认模式
  2. return 返回模式

confirm 确认模式

Producer 在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达 Exchange,这个监听都会被执行

  • 如果 Exchange 成功收到,ACKAcknowledge character,确认字符)为 true
  • 如果没有收到消息,ACK 就为 false

步骤如下:

  1. 配置 RabbitMQ
  2. 设置确认回调逻辑并发送消息
  3. 测试

接下来看实现步骤:

1. 配置 RabbitMQ

配置确认机制
spring:  rabbitmq:  addresses: amqp://guest:guest@127.0.0.1:5672/coding  listener:  simple:  acknowledge-mode: manual  # 消息接收确认publisher-confirm-type: correlated  # 消息发送确认

2. 设置确认回调逻辑并发送消息

无论消息确认成功还是失败,都会调用 ConfirmCallbackconfirm 方法。

  • 如果消息成功发送到 Brokeracktrue
  • 如果消息发送失败,ackfalse,并且 cause 提供失败的原因
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  @Override  public void confirm(CorrelationData correlationData, boolean ack, String cause) {  System.out.println("");  if (ack) {  System.out.printf("消息接收成功,id: %s \n", correlationData.getId());  } else {  System.out.printf("消息接收失败,id: %s \n", correlationData.getId());  }  }  });  return rabbitTemplate;  
}
  • ConfirmCallback() 里面的 confirm(@Nullable CorrelationData, boolean ack, @Nullables String cause)
    • 确认回调
    • correlationDate:发送消息时的附加信息,通常用于在确认回调中识别特定的消息
    • ack:交换机是否收到消息,收到为 true,未收到为 false
    • cause:当消息确认失败时,这个字符串参数将提供失败的原因,这个原因可以用于调试和错误处理。成功时,causenull
@Resource(name = "confirmRabbitTemplate")  
private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")  
public String confirm() {  CorrelationData correlationData = new CorrelationData("1");  confirmRabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "confirm", "confirm test...", correlationData);  return "确认成功";  
}

RabbitTemplate.ConfirmCallbackConfirmListener 区别
RabbitMQ 中,ConfirmListenerConfirmCallback 都是用来处理消息确认的机制,但他们属于不同的客户端库,并且使用的场景和方式有所不同

  1. ConfirmListenerRabbitMQ Java Client 库中的接口。这个库是 RabbitMQ 官方提供的一个直接于 RabbitMQ 服务器交互的客户端库。ConfirmListener 接口提供了两个方法:handleAckhandleNack,用于处理消息确认好否定确认的事件
  2. ConfirmCallbackSpring AMQP 框架中的一个接口。专门为 Spring 环境设计,用于简化与 RabbitMQ 交互的过程,它只包含一个 confirm 方法,用于处理消息确认的回调
    SpringBoot 应用中,通常会使用 ConfirmCallback,因为它与 Spring 框架的其他部分更加整合,可以利用 Spring 的配置和依赖注入功能。而在 RabbitMQ Java Client 库时,则可能会直接实现 ConfirmListener 接口,更直接的与 RabbitMQChannel 交互

3. 测试

运行程序,调用接口: http://127.0.0.1:8080/producer/confirm

观察控制台,消息确认成功:image.png|254

接下来把交换机名称改一下,重新运行,会触发另一个结果

// 发送失败
confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData);

运行结果:image.png

  • 原因中,明确显示 “no exchange 'confirm_exchange1' in vhost 'bite' “,也就是说,bite 这个虚拟机,没有名字为 confirm_exchange1 的交换机

完整代码

public class Constant {  public static final String ACK_EXCHANGE_NAME = "ack_exchange";  public static final String ACK_QUEUE = "ack_queue";  
}
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Bean("rabbitTemplate")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.printf("");if (ack){System.out.printf("消息接收成功, id:%s \n", correlationData.getId());} else {System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause);}}});return rabbitTemplate;}
}
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqConfig {//publish-confirm模式//1. 交换机@Bean("confirmExchange")public Exchange confirmExchange() {return ExchangeBuilder.topicExchange(Constant.CONFIRM_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
package com.bite.rabbitmq.controller;import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/product")
public class ProductController {@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedException {CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,"confirm", "confirm test...", correlationData1);//发送失败// confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData1);Thread.sleep(2000);return "确认成功";}
}

return 退回模式

消息到达 Exchange 之后,会根据路由规则匹配,把消息放入 Queue 中,ExchangeQueue 的过程:

  • 如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者
  • 消息返回发送者时,我们可以设置一个返回回调方法,对消息进行处理

步骤如下:

  1. 配置 RabbitMQ
  2. 设置返回回调逻辑并发送信息
  3. 测试

接下来看实现步骤

1. 配置 RabbitMQ

# 配置确认机制
spring:  rabbitmq:  addresses: amqp://guest:guest@127.0.0.1:5672/coding  listener:  simple:  acknowledge-mode: manual  # 消息接收确认publisher-confirm-type: correlated  # 消息发送确认

2. 设置返回回调逻辑并发送消息

  • 消息无法被路由到人恶化队列,它将返回给发送者,这时 setReturnCallback 设置的回调将被触发
/**  * retrun 返回模式  */  
@Primary  
@Bean("confirmRabbitTemplate2")  
public RabbitTemplate confirmRabbitTemplate2(CachingConnectionFactory connectionFactory) {  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  rabbitTemplate.setMandatory(true);  rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {  @Override  public void returnedMessage(ReturnedMessage returnedMessage) {  System.out.printf("消息被退回:%s", returnedMessage);  }  });  return rabbitTemplate;  
}
@Resource(name = "confirmRabbitTemplate2")  
private RabbitTemplate confirmRabbitTemplate2;@RequestMapping("/msgReturn")  
public String msgReturn() {  CorrelationData correlationData = new CorrelationData("2");  confirmRabbitTemplate2.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm1", "message return test...", correlationData);  return "消息发送成功";  
}
  • 使用 RabbitTemplatesetMandatory 方法设置消息的 mandatory 属性为 true(默认为 false
  • 这个属性的作用是告诉 RabbitMQ,如果一条消息无法被任何队列消费,RabbitMQ 应该将消息返回给发送者,此时 ReturnCallback 就会被触发

回调函数中,有一个参数:ReturnedMessage,包含以下属性

public class ReturnedMessage {  // 返回的消息对象,包含了消息体和消息属性private final Message message;  // 由 Broker 提供的回复码,表示消息无法路由的原因,通常是一个数字代码,每个数字代表不同的含义private final int replyCode;// 一个文本字符串,提供了无法路由消息的额外信息或错误描述  private final String replyText;  // 消息被发送到的交换机名称private final String exchange;// 消息的路由键,即发送消息时指定的键  private final String routingKey;
}

3. 测试

运行程序,调用接口: http://127.0.0.1:8080/producer/msgReturn

观察控制台,消息被退回:image.png

常见面试题

#高频面试

如何保证 RabbitMQ 消息的可靠传输

先放一张 RabbitMQ 消息传递图image.png

从这个图中,可以看出,消息可能丢失的场景以及解决方案:

  1. 生产者将消息发送到 RabbitMQ 失败

    1. 可能原因:网络问题等
    2. 解决办法:参考——发送方确认,confirm 模式
  2. 消息在交换机中无法路由到指定队列:

    1. 可能原因:代码或配置层面错误,导致消息路由失败
    2. 解决办法:参考——发送方确认,return 模式
  3. 消息队列自身数据丢失

    1. 可能原因:消息到达 RabbitMQ 之后,RabbitMQ Server 宕机导致消息丢失
    2. 解决办法:参考——持久性。开启 RabbitMQ 持久化,就是消息写入之后会持久化到磁盘,如果 RabbitMQ 挂了,回复之后会自动读取之前存储的数据(极端情况下,RabbitMQ 还未持久化就挂了,可能导致少量的数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)
  4. 消息者异常,导致消息丢失

    1. 可能原因:消息到达消费者,还没来得及消费,消费者宕机、消费者逻辑有问题
    2. 解决办法:参考——消息确认RabbitMQ 提供了消费者应答机制来使 RabbitMQ 能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动答应的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制,当消息消费异常时,通过消息重试确保消息的可靠性
http://www.dtcms.com/a/318302.html

相关文章:

  • 【科研绘图系列】R语言绘制瀑布图
  • 院校机试刷题第二十一天|回顾代码随想录第十六天、
  • google官方性能文档:Android 动态性能框架优化散热和 CPU 性能-Thermal API部分
  • 短剧小程序系统开发:技术驱动下的内容创新之路
  • 2025年08月 GitHub 热门项目推荐
  • 1深度学习Pytorch-pytorch、tensor的创建、属性、设备和类型转换、数据转换、常见操作(获取元素、元素运算、形状改变、相乘、广播)
  • 【31】C++实战篇——C++ 从数组里找出相邻两个波谷之间的主波峰的y值和其对应下标i,考虑到波形的上升和下降情况
  • 【AI总结】python连接MySQL(5)- 高级数据库配置与连接器设计
  • go语言变量2
  • 开疆智能ModbusTCP转Profinet网关连接安川YRC1000机器人配置案例
  • 嵌入式处理器指令系统:精简指令集RISC与复杂指令集CISC的简介,及区别
  • Cervantes:面向渗透测试人员和红队的开源协作平台
  • 勇芳字体查看器 v1.0 免费版
  • 当前就业形势下,软件测试工程师职业发展与自我提升的必要性
  • Kubesphere搜索镜像问题
  • 深度解析|资源位管理工具如何重构媒体商业化效率?
  • 飞书对接E签宝完整方案
  • AI浪潮下,FPGA如何实现自我重塑与行业变革
  • 动态代理常用的两种方式?
  • 开发教育全链路管理系统 + 微信小程序,为各类教育主体注入数字化动力!
  • LeetCode 面试经典 150_数组/字符串_O(1)时间插入、删除和获取随机元素(12_380_C++_中等)(哈希表)
  • Conda虚拟环境安装包
  • 信号处理:信号产生
  • 2025年WiFi技术白皮书:全球物联网无线通信的关键创新
  • Codeforces Round 987 (Div. 2)
  • [特殊字符]海尔考察游学 | 解码人才培养秘籍
  • 长时间面对电脑屏幕需要使用防晒霜吗?
  • js中的 slice、splice、split、substring、substr
  • 面试题:使用class类来写工个五子棋
  • spring-dubbo