【RabbitMQ】发布确认机制的具体实现
文章目录
- 模式介绍
- 建立连接
- 单独确认
- 代码实现逻辑
- 运行结果
- 批量确认
- 代码实现逻辑
- 运行结果
- 异步确认
- 实现逻辑介绍
- 代码实现逻辑
- 运行结果
- 三种策略对比以及完整代码
模式介绍
作为消息中间件,都会面临消息丢失的问题,消息丢失大概分为三种情况:
- 生产者问题:因为应用程序故障,网络抖动等各种原因,生产者没有成功向
broker
发送消息 - 消息中间件自身问题:生产者成功发送给了
Broker
,但是Broker
没有把消息保存好,导致消息丢失 - 消费者问题:
Broker
发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker
将消费失败的消息从列表中删除了
RabbitMQ
也对上述问题给出了相应的解决方案。- 问题二可以通过持久化机制
- 问题三可以采用消息应答机制
- 问题一可以采用发布确认机制
发布确认属于 RabbitMQ
的七大工作模式之一
生产者将信道设置成 confirm
(确认)模式
- 一旦信道进入
confirm
模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始)- 同一个
channel
下,序号不可能重复
- 同一个
- 一旦消息被投递到所有匹配的对类之后,
brocker
就会发送一个确认(ACK
)给生产者(包含消息的唯一ID
)- 这就使得生产者知道消息已经正确到达目的的队列了
- 如果消息和对类是可持久化的,那么确认消息会在将消息写入磁盘之后发出
broker
回传给生产者的确认消息中deliveryTag
包含了确认消息的序号- 此外
broker
也可以设置channel.basicAck
方法中的multiple
参数,表示到这个序号之前的所有消息都已经得到了处理
发送确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息
- 当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息
- 如果
RabbitMQ
因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)
命令,生产者同样可以在回调方法中处理该nack
命令
使用发布确认机制,必须要信道设置成 confirm
(确认) 模式
- 发布确认是
AMQP 0.9.1
协议的扩展,默认情况下它不会被启用 - 生产者通过
channel.confirmSelect()
将信道设置为confirm
模式
Channel channel = connection.createChannel();
channel.confirmSelect();
发布确认有三种策略,接下来我们来介绍这三种策略
Producer
、Brocker
、Consumer
都有可能丢失消息
- 发布确认是来解决生产者
Producer
消息丢失的问题- 生产者可以在发送消息的同时,等待返回确认消息
建立连接
因为每一个策略都需要重复建立链接这一步骤,所以我们将其提出来,单独作为一个方法,需要的时候直接调用即可
- 之后就不用重复写这一部分代码了
- 在类里面,
main
方法外面,使用一个静态方法
public class PublisherConfirms { static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); return connectionFactory.newConnection(); } public static void main(String[] args) { } }
单独确认
Publishing Messages Individually
代码实现逻辑
/** * 单独确认 */ private static void publishingMessagesIndividually() throws Exception { // 1. 创建连接 // 我们将连接的建立写在 try 里面,这样就不用再去关闭了 try(Connection connection = createConnection()) { // 2. 开启信道 Channel channel = connection.createChannel(); // 3. 设置信道为 confirms 模式 channel.confirmSelect(); // 4. 声明队列(交换机就使用内置的,就不再声明了) // 队列对象、是否持久化、是否独占、是否自动删除、传递参数 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null); // 5. 发送消息,并等待确认 // 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; // 信道的发送 // 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、 channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes()); // 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回) // 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个 // 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); // 这里注意是 printf System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start); } }
运行结果
单独确认==>消息条数: 200, 耗时: 5793 ms
- 可以发现,耗时较长
观察上面代码,会发现这种策略是每发送一条消息后就调用 channel.waitForConfirmsOrDie()
方法, 之后等待服务器的确认
- 这其实是一种串行同步等待的方式
- 尤其对于持久化的消息来说,需要等待消息确认存储在硬盘之后才会返回 (调用
Linux
内核中的fsync
方法)
但是消息确认机制是支持异步的,可以一边发送消息,一边等待消息确认。由此进行了改进,我们看另外两种策略
Publishing Messages in Batches(批量确认)
:每发送一批消息之后,调用channel.waitForConfirms
方法,等待服务器的确认返回Handling Publisher Confirms Asynchronously(异步确认)
:提供一个回调方法,服务端确认了一条或者多条消息后,客户端会对这个方法进行处理
批量确认
Publishing Messages in Batches
代码实现逻辑
/** * 批量确认 */
private static void publishingMessagesInBatches() throws Exception { // 1. 建立连接 try(Connection connection = createConnection()){ // 2. 开启信道 Channel channel = connection.createChannel(); // 3. 设置信道为 confirm 模式 channel.confirmSelect(); // 4. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null); // 5. 发送消息,并进行确认 // 设置批量处理的大小和计数器 long start = System.currentTimeMillis(); int batchSize = 100; int outstandingMessageCount = 0; for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes()); outstandingMessageCount++; // 当计数器的大小达到了设置的批量处理大小,就进行确认 if(outstandingMessageCount == batchSize) { channel.waitForConfirmsOrDie(5000); // 消息确认后,计数器要清零 outstandingMessageCount = 0; } // 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送 if (outstandingMessageCount > 0) { channel.waitForConfirmsOrDie(5000); } } long end = System.currentTimeMillis(); System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start); }
}
运行结果
批量确认==>消息条数: 200, 耗时: 128 ms
异步确认
Handling Publisher Confirms Asynchronously
实现逻辑介绍
异步 confirm
方法的编程实现最为复杂
Channel
接口提供了一个方法addConfirmListener()
- 这个方法可以添加
ConfirmListener
回调接口
ConfirmListener
接口中包含两个方法:
handleAck(long deliveryTag, boolean multiple)
,处理RabbitMQ
发送给生产者的ack
deliveryTag
表示发送消息的序号multiple
表示是否批量确认
handleNack(long deliveryTag, boolean multiple)
,处理RabbitMQ
发送给生产者的nack
我们需要为每一个 Channel
维护一个已发送消息的序号集合
- 当收到
RabbitMQ
的confirm
回调时,从集合中删除对应的消息 - 当
Channel
开启confirm
模式后,channel
上发送消息都会附带一个从1
开始递增的deliveryTag
序号 - 我们可以使用
SortedSet
的有序性来维护这个已发消息的集合- 当收到
ack
时,从序列中删除该消息的序号。如果为批量确认消息,表示小于当前序号deliveryTag
的消息都收到了,则清楚对应集合 - 当收到
nack
时,处理逻辑类似,不过需要结合具体业务情况,进行消息重发等操作
- 当收到
代码实现逻辑
/** * 异步确认 */
private static void handlingPublisherConfirmsAsynchronously() throws Exception { // 1. 建立连接 try(Connection connection = createConnection()) { // 2. 开启信道 Channel channel = connection.createChannel(); // 3. 设置信道为 confirm 模式 channel.confirmSelect(); // 4. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null); // 5. 监听 confirm long start = System.currentTimeMillis(); // 创建一个集合,用来存放未确认的消息(的id) SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉 if(multiple) { // headSet(n)方法返回当前集合中小于 n 的集合 // 先获取到这部分 id,然后一起 clear 清除掉即可 confirmSeqNo.headSet(deliveryTag + 1).clear(); }else { // 单独确认,只需要移除当前这个 id 即可 confirmSeqNo.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 和 ack 处理模式基本是相似的,只是多了一步重发处理 if(multiple) { confirmSeqNo.headSet(deliveryTag + 1).clear(); }else { confirmSeqNo.remove(deliveryTag); } // 业务需要根据实际场景进行处理,比如重发,此处代码省略 } }); // 6. 发送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号 channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes()); confirmSeqNo.add(seqNo); // 将消息的序号加入集合中 } // 确认消息已处理完 while (!confirmSeqNo.isEmpty()) { // 没有处理完,就休眠一段时间后再确认一下,看是否处理完 Thread.sleep(10); } long end = System.currentTimeMillis(); System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start); }
}
运行结果
单独确认==>消息条数: 200, 耗时: 93 ms
三种策略对比以及完整代码
package rabbitmq.publisher.confirms; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 200; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { // Strategy #1: Publishing Messages Individually // 单独确认 publishingMessagesIndividually(); // Strategy #2: Publishing Messages in Batches // 批量确认 publishingMessagesInBatches(); // Strategy #3: Handling Publisher Confirms Asynchronously // 异步确认 handlingPublisherConfirmsAsynchronously(); } /** * 单独确认 */ private static void publishingMessagesIndividually() throws Exception { // 1. 创建连接 // 我们将连接的建立写在 try 里面,这样就不用再去关闭了 try(Connection connection = createConnection()) { // 2. 开启信道 Channel channel = connection.createChannel(); // 3. 设置信道为 confirms 模式 channel.confirmSelect(); // 4. 声明队列(交换机就使用内置的,就不再声明了) // 队列对象、是否持久化、是否独占、是否自动删除、传递参数 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null); // 5. 发送消息,并等待确认 // 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; // 信道的发送 // 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、 channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes()); // 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回) // 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个 // 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); // 这里注意是 printf System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start); } } /** * 批量确认 */ private static void publishingMessagesInBatches() throws Exception { // 1. 建立连接 try(Connection connection = createConnection()){ // 2. 开启信道 Channel channel = connection.createChannel(); // 3. 设置信道为 confirm 模式 channel.confirmSelect(); // 4. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null); // 5. 发送消息,并进行确认 // 设置批量处理的大小和计数器 long start = System.currentTimeMillis(); int batchSize = 100; int outstandingMessageCount = 0; for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes()); outstandingMessageCount++; // 当计数器的大小达到了设置的批量处理大小,就进行确认 if(outstandingMessageCount == batchSize) { channel.waitForConfirmsOrDie(5000); // 消息确认后,计数器要清零 outstandingMessageCount = 0; } // 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送 if (outstandingMessageCount > 0) { channel.waitForConfirmsOrDie(5000); } } long end = System.currentTimeMillis(); System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start); } } /** * 异步确认 */ private static void handlingPublisherConfirmsAsynchronously() throws Exception { // 1. 建立连接 try(Connection connection = createConnection()) { // 2. 开启信道 Channel channel = connection.createChannel(); // 3. 设置信道为 confirm 模式 channel.confirmSelect(); // 4. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null); // 5. 监听 confirm long start = System.currentTimeMillis(); // 创建一个集合,用来存放未确认的消息(的id) SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉 if(multiple) { // headSet(n)方法返回当前集合中小于 n 的集合 // 先获取到这部分 id,然后一起 clear 清除掉即可 confirmSeqNo.headSet(deliveryTag + 1).clear(); }else { // 单独确认,只需要移除当前这个 id 即可 confirmSeqNo.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 和 ack 处理模式基本是相似的,只是多了一步重发处理 if(multiple) { confirmSeqNo.headSet(deliveryTag + 1).clear(); }else { confirmSeqNo.remove(deliveryTag); } // 业务需要根据实际场景进行处理,比如重发,此处代码省略 } }); // 6. 发送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms" + i; long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号 channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes()); confirmSeqNo.add(seqNo); // 将消息的序号加入集合中 } // 确认消息已处理完 while (!confirmSeqNo.isEmpty()) { // 没有处理完,就休眠一段时间后再确认一下,看是否处理完 Thread.sleep(10); } long end = System.currentTimeMillis(); System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start); } } }
- 消息条数越多,异步确认的优势越明显