当前位置: 首页 > news >正文

【RabbitMQ】发布确认机制的具体实现

文章目录

    • 模式介绍
    • 建立连接
    • 单独确认
      • 代码实现逻辑
      • 运行结果
    • 批量确认
      • 代码实现逻辑
      • 运行结果
    • 异步确认
      • 实现逻辑介绍
      • 代码实现逻辑
      • 运行结果
    • 三种策略对比以及完整代码

模式介绍

作为消息中间件,都会面临消息丢失的问题,消息丢失大概分为三种情况:

  1. 生产者问题:因为应用程序故障,网络抖动等各种原因,生产者没有成功向 broker 发送消息
  2. 消息中间件自身问题:生产者成功发送给了 Broker,但是 Broker 没有把消息保存好,导致消息丢失
  3. 消费者问题:Broker 发送消息到消费者,消费者在消费消息时,因为没有处理好,导致 broker 将消费失败的消息从列表中删除了

image.png

  • RabbitMQ 也对上述问题给出了相应的解决方案。
    • 问题二可以通过持久化机制
    • 问题三可以采用消息应答机制
    • 问题一可以采用发布确认机制

发布确认属于 RabbitMQ 的七大工作模式之一

生产者将信道设置成 confirm(确认)模式

  • 一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始)
    • 同一个 channel 下,序号不可能重复
  • 一旦消息被投递到所有匹配的对类之后,brocker 就会发送一个确认(ACK)给生产者(包含消息的唯一 ID
    • 这就使得生产者知道消息已经正确到达目的的队列了
  • 如果消息和对类是可持久化的,那么确认消息会在将消息写入磁盘之后发出
    • broker 回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号
    • 此外 broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理
      image.png

发送确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息

  1. 当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息
  2. 如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack) 命令,生产者同样可以在回调方法中处理该 nack 命令

使用发布确认机制,必须要信道设置成 confirm (确认) 模式

  • 发布确认是 AMQP 0.9.1 协议的扩展,默认情况下它不会被启用
  • 生产者通过 channel.confirmSelect() 将信道设置为 confirm 模式
Channel channel = connection.createChannel();
channel.confirmSelect();

发布确认有三种策略,接下来我们来介绍这三种策略

ProducerBrockerConsumer 都有可能丢失消息

  • 发布确认是来解决生产者 Producer 消息丢失的问题
  • 生产者可以在发送消息的同时,等待返回确认消息

建立连接

因为每一个策略都需要重复建立链接这一步骤,所以我们将其提出来,单独作为一个方法,需要的时候直接调用即可

  • 之后就不用重复写这一部分代码了
  • 在类里面,main 方法外面,使用一个静态方法
public class PublisherConfirms {  static Connection createConnection() throws Exception {  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  return connectionFactory.newConnection();  }  public static void main(String[] args) {  }  }

单独确认

Publishing Messages Individually

代码实现逻辑

    /**  * 单独确认  */  private static void publishingMessagesIndividually() throws Exception {  // 1. 创建连接  // 我们将连接的建立写在 try 里面,这样就不用再去关闭了  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirms 模式  channel.confirmSelect();  // 4. 声明队列(交换机就使用内置的,就不再声明了)  // 队列对象、是否持久化、是否独占、是否自动删除、传递参数  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null);  // 5. 发送消息,并等待确认  // 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量  long start = System.currentTimeMillis();  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  // 信道的发送  // 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());  // 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回)  // 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个  // 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception  channel.waitForConfirmsOrDie(5000);  }  long end = System.currentTimeMillis();  // 这里注意是 printf            System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);  }  }

运行结果

单独确认==>消息条数: 200, 耗时: 5793 ms 
  • 可以发现,耗时较长

观察上面代码,会发现这种策略是每发送一条消息后就调用 channel.waitForConfirmsOrDie() 方法, 之后等待服务器的确认

  • 这其实是一种串行同步等待的方式
  • 尤其对于持久化的消息来说,需要等待消息确认存储在硬盘之后才会返回 (调用 Linux 内核中的 fsync 方法)

但是消息确认机制是支持异步的,可以一边发送消息,一边等待消息确认。由此进行了改进,我们看另外两种策略

  • Publishing Messages in Batches(批量确认):每发送一批消息之后,调用 channel.waitForConfirms 方法,等待服务器的确认返回
  • Handling Publisher Confirms Asynchronously(异步确认):提供一个回调方法,服务端确认了一条或者多条消息后,客户端会对这个方法进行处理

批量确认

Publishing Messages in Batches

代码实现逻辑

/**  * 批量确认  */  
private static void publishingMessagesInBatches() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()){  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);  // 5. 发送消息,并进行确认  // 设置批量处理的大小和计数器  long start = System.currentTimeMillis();  int batchSize = 100;  int outstandingMessageCount = 0;  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());  outstandingMessageCount++;  // 当计数器的大小达到了设置的批量处理大小,就进行确认  if(outstandingMessageCount == batchSize) {  channel.waitForConfirmsOrDie(5000);  // 消息确认后,计数器要清零  outstandingMessageCount = 0;  }  // 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送  if (outstandingMessageCount > 0) {  channel.waitForConfirmsOrDie(5000);  }  }  long end = System.currentTimeMillis();  System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);  }  
}

运行结果

批量确认==>消息条数: 200, 耗时: 128 ms 

异步确认

Handling Publisher Confirms Asynchronously

实现逻辑介绍

异步 confirm 方法的编程实现最为复杂

  • Channel 接口提供了一个方法 addConfirmListener()
  • 这个方法可以添加 ConfirmListener 回调接口

ConfirmListener 接口中包含两个方法:

  • handleAck(long deliveryTag, boolean multiple),处理 RabbitMQ 发送给生产者的 ack
    • deliveryTag 表示发送消息的序号
    • multiple 表示是否批量确认
  • handleNack(long deliveryTag, boolean multiple),处理 RabbitMQ 发送给生产者的 nack
    image.png|413

我们需要为每一个 Channel 维护一个已发送消息的序号集合

  • 当收到 RabbitMQconfirm 回调时,从集合中删除对应的消息
  • Channel 开启 confirm 模式后,channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号
  • 我们可以使用 SortedSet 的有序性来维护这个已发消息的集合
    1. 当收到 ack 时,从序列中删除该消息的序号。如果为批量确认消息,表示小于当前序号 deliveryTag 的消息都收到了,则清楚对应集合
    2. 当收到 nack 时,处理逻辑类似,不过需要结合具体业务情况,进行消息重发等操作

代码实现逻辑

/**  * 异步确认  */  
private static void handlingPublisherConfirmsAsynchronously() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);  // 5. 监听 confirm        long start = System.currentTimeMillis();  // 创建一个集合,用来存放未确认的消息(的id)  SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());  channel.addConfirmListener(new ConfirmListener() {  @Override  public void handleAck(long deliveryTag, boolean multiple) throws IOException {  // 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉  if(multiple) {  // headSet(n)方法返回当前集合中小于 n 的集合  // 先获取到这部分 id,然后一起 clear 清除掉即可  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  // 单独确认,只需要移除当前这个 id 即可  confirmSeqNo.remove(deliveryTag);  }  }  @Override  public void handleNack(long deliveryTag, boolean multiple) throws IOException {  // 和 ack 处理模式基本是相似的,只是多了一步重发处理  if(multiple) {  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  confirmSeqNo.remove(deliveryTag);  }  // 业务需要根据实际场景进行处理,比如重发,此处代码省略  }  });  // 6. 发送消息  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());  confirmSeqNo.add(seqNo); // 将消息的序号加入集合中  }  // 确认消息已处理完  while (!confirmSeqNo.isEmpty()) {  // 没有处理完,就休眠一段时间后再确认一下,看是否处理完  Thread.sleep(10);  }  long end = System.currentTimeMillis();  System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);  }  
}

运行结果

单独确认==>消息条数: 200, 耗时: 93 ms 

三种策略对比以及完整代码

package rabbitmq.publisher.confirms;  import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConfirmListener;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.Collections;  
import java.util.SortedSet;  
import java.util.TreeSet;  public class PublisherConfirms {  private static final Integer MESSAGE_COUNT = 200;  static Connection createConnection() throws Exception {  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  return connectionFactory.newConnection();  }  public static void main(String[] args) throws Exception {  // Strategy #1: Publishing Messages Individually  // 单独确认  publishingMessagesIndividually();  // Strategy #2: Publishing Messages in Batches  // 批量确认  publishingMessagesInBatches();  // Strategy #3: Handling Publisher Confirms Asynchronously  // 异步确认  handlingPublisherConfirmsAsynchronously();  }  /**  * 单独确认  */  private static void publishingMessagesIndividually() throws Exception {  // 1. 创建连接  // 我们将连接的建立写在 try 里面,这样就不用再去关闭了  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirms 模式  channel.confirmSelect();  // 4. 声明队列(交换机就使用内置的,就不再声明了)  // 队列对象、是否持久化、是否独占、是否自动删除、传递参数  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null);  // 5. 发送消息,并等待确认  // 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量  long start = System.currentTimeMillis();  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  // 信道的发送  // 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());  // 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回)  // 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个  // 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception                channel.waitForConfirmsOrDie(5000);  }  long end = System.currentTimeMillis();  // 这里注意是 printf            System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);  }  }  /**  * 批量确认  */  private static void publishingMessagesInBatches() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()){  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);  // 5. 发送消息,并进行确认  // 设置批量处理的大小和计数器  long start = System.currentTimeMillis();  int batchSize = 100;  int outstandingMessageCount = 0;  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());  outstandingMessageCount++;  // 当计数器的大小达到了设置的批量处理大小,就进行确认  if(outstandingMessageCount == batchSize) {  channel.waitForConfirmsOrDie(5000);  // 消息确认后,计数器要清零  outstandingMessageCount = 0;  }  // 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送  if (outstandingMessageCount > 0) {  channel.waitForConfirmsOrDie(5000);  }  }  long end = System.currentTimeMillis();  System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);  }  }  /**  * 异步确认  */  private static void handlingPublisherConfirmsAsynchronously() throws Exception {  // 1. 建立连接  try(Connection connection = createConnection()) {  // 2. 开启信道  Channel channel = connection.createChannel();  // 3. 设置信道为 confirm 模式  channel.confirmSelect();  // 4. 声明队列  channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);  // 5. 监听 confirm            long start = System.currentTimeMillis();  // 创建一个集合,用来存放未确认的消息(的id)  SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());  channel.addConfirmListener(new ConfirmListener() {  @Override  public void handleAck(long deliveryTag, boolean multiple) throws IOException {  // 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉  if(multiple) {  // headSet(n)方法返回当前集合中小于 n 的集合  // 先获取到这部分 id,然后一起 clear 清除掉即可  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  // 单独确认,只需要移除当前这个 id 即可  confirmSeqNo.remove(deliveryTag);  }  }  @Override  public void handleNack(long deliveryTag, boolean multiple) throws IOException {  // 和 ack 处理模式基本是相似的,只是多了一步重发处理  if(multiple) {  confirmSeqNo.headSet(deliveryTag + 1).clear();  }else {  confirmSeqNo.remove(deliveryTag);  }  // 业务需要根据实际场景进行处理,比如重发,此处代码省略  }  });  // 6. 发送消息  for (int i = 0; i < MESSAGE_COUNT; i++) {  String msg = "hello publisher confirms" + i;  long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号  channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());  confirmSeqNo.add(seqNo); // 将消息的序号加入集合中  }  // 确认消息已处理完  while (!confirmSeqNo.isEmpty()) {  // 没有处理完,就休眠一段时间后再确认一下,看是否处理完  Thread.sleep(10);  }  long end = System.currentTimeMillis();  System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);  }  }  }
  • 消息条数越多,异步确认的优势越明显

相关文章:

  • MUSE Pi Pro 更换kernel内核及module模块
  • AI智能体的现状和应用前景
  • jQuery知识框架
  • 2020年下半年试题三:论云原生架构及其应用
  • IDEA 新建 SpringBoot 项目时,没有高版本 SpringBoot 可选
  • Kafka 消费者组进度监控方法解析
  • 【SSL证书系列】https双向认证中客户端认证的原理
  • LeetCode 每日一题 3341. 到达最后一个房间的最少时间 I + II
  • Vue ElementUI原生upload修改字体大小和区域宽度
  • SCDN能够运用在物联网加速当中吗?
  • 精益数据分析(58/126):移情阶段的深度实践与客户访谈方法论
  • Kite AI 自动机器人部署教程
  • 【达梦数据库】超出全局hash join空间问题处理
  • 【江苏省】《信息技术应用创新软件适配改造成本评估规范》(DB32/T 4935-2024)-标准解读系列
  • WinFrom 使用 LiveCharts 实现动态折线图
  • 关于 js:9. Node.js 后端相关
  • 自营交易考试中,怎么用“黄昏之星”形态做出漂亮反转单?
  • 集成 ONLYOFFICE 与 AI 插件,为您的服务带来智能文档编辑器
  • Java的多线程笔记
  • 数据获取_Python
  • 中国乒协坚决抵制恶意造谣,刘国梁21日将前往多哈参加国际乒联会议
  • 西班牙政府排除因国家电网遭攻击导致大停电的可能
  • 佩斯科夫:俄方代表团15日将在伊斯坦布尔等候乌克兰代表团
  • 人民日报仲音:大力纠治违规吃喝顽瘴痼疾
  • 夜读|尊重生命的棱角
  • 京东美团饿了么等外卖平台被约谈