当前位置: 首页 > news >正文

RabbitMQ 高级特性之消息确认

1. 简介

RabbitMQ 的消息发送流程:

  • producer 将消息发送给 broker,consumer 从 broker 中获取消息并消费

那么在这里就涉及到了两种消息发送,即 producer 与 broker 之间和 consumer 与 broker 之间。

“消息确认” 讨论的是 consumer 与 broker 之间的消息发送。

2. 为什么会有这个特性

当 broker 给 consumer 发送消息时,可能会出现下面两种情况:

  • 消息未成功到达 consumer;
  • 消息成功到达 consumer,但是 consumer 没有成功消费这条消息,如:在处理消息时发生异常等情况。

这时,就需要有一种解决方案,保证 broker 与 consumer 之间消息传输的可靠性,于是就有了消息确认这一特性。

3. 使用 RabbitMQ Java 时如何进行消息确认(不是重点)

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}

这是一段路由模式的代码,在这段代码中,有下面一条语句:

channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

在这个方法中,有三个参数:

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  • queue:consumer 通过哪个队列获取 broker 发送的消息;
  • autoAck:是否自动确认;
  • callback:consumer 消费消息的逻辑。

其中,autoAck 就是消息确认的体现:

  • autoAck 为 true:RabbitMQ 会将发送给 consumer 的消息视为已被成功接收和消费(consumer 可能并没有成功接收到或成功消费,但是 RabbitMQ 不管了),就会被将这条消息删除;
  • autoAck 为 false:当 RabbitMQ 发送消息后,并不会马上就将消息删除,而是会等 consumer 调用 Basic.Ack,收到 ack 后,才会将消息删除。

将 autoAck 设置为 false 后,若 broker 长时间没有收到 consumer 发送的 ack 且 consumer 已经断开连接,就会将这条消息重新入队列,继续发送给 consumer 进行消费,此时,队列中的消息就分为了两种:

  • 还未被发送的消息;
  • 已经发送了的消息,但是没有收到 ack 而重新入队列等待被消费。

4. 在 spring 中使用 RabbitMQ 时如何进行消息确认

4.1 basicAck

在 spring 下的 Channel 类中提供了下面几种方法:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

在这个方法中,有三个参数:

  • deliveryTag:是 broker 给 consumer 发送消息的唯一标识,在一个 channel 中 deliveryTag 是唯一的;
  • mulitple: 是否批量确认

使用这个方法后,就会告知 broker 这条消息已经成功被消费,可以将其删除。

4.2 basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

 在这个方法中,多了一个参数:

  • requeue:是否重新入队列。

使用这个方法,就相当于给 broker 发送 nack,即这条消息没有被正确消费。

若 requeue 为 true,就会将这条消息重新入队列,继续给 consumer 消费;

若 requeue 为 false,broker 就会这条消息删除。

4.3 basicReject

void basicReject(long deliveryTag, boolean requeue) throws IOException;

这个方法与 basicNack 大致相同,此处省略。

4.4 配置

在 spring 中,提供了三种配置用于消息确认:

  • none:当消息发送给 consumer,不管 consumer 是否成功消费了消息,broker 都会当作这条消息被成功消费了,然后删除这条消息;
  • auto:在 consumer 处理消息时没有抛出异常时,就会确认消息,反之就不会确认,并且将消息重新放入队列中,进行下一次的消费;
  • manual:手动确认,我们需要在代码中指定这条消息是消费成功还是消费失败,分别使用 basicAck 和 basicNack。
spring:rabbitmq:listener:simple:acknowledge-mode: none

5. 代码测试

@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {String messageInfo = "consumer ack mode test...";rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);return "消息发送成功";}
}

这段代码代表的是一个 producer,下面接收到的消息都是通过这段代码发送的。

5.1 none

① 无异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代码运行结果如下:

我们可以通过访问 RabbitMQ 客户端来观察这条消息是否成功被消费:

 

可以看到,Messages 这一列中,Ready 和 Unacked 都为 0,表示消息被成功消费。 

 ② 有异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代码运行结果如下:

由于我们使用了除零操作,于是抛出了异常,我们可以通过访问 RabbitMQ 来观察这条消息是否被删除:

和上面一样,在 broker 中这条消息已经被删除,这与 none 配置性质一致。

5.2 auto 

① 无异常时的消费者代码:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

代码运行结果如下:

在 RabbitMQ 客户端中显示,这条消息已经被成功消费:

 

② 有异常时的消费者代码:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

 代码运行结果如下:

在运行结果中,一直会有报错产生,并且都是两个两个为一组,并且在报错信息中可以看到,producer 发送的消息一直在被消费,这是因为存在异常,就会导致这条消息一直在队列中,通过观察 RabbitMQ 客户端可以看出,这条消息依然保存在队列中:

5.3  manual

① 无异常的消费者代码如下:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在这段代码中,我们使用了 basicAck 和 basicNack 来进行消息确认,当消息处理成功后,就会执行 basicAck,告诉 broker 这条消息已经被成功消费,可以将其删除;当消息执行发生异常后,就会执行 basicNack,并且根据 requeue 参数决定如何处理这条消息。

代码运行结果如下:

RabbitMQ 客户端显示这条消息被成功消费:

 

 ② 有异常的消费者代码如下:

当 requeue 为 true:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);int n = 1 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在此处的 basicNack,将 requeue 设置为了 true,当消息处理失败后,就会将消息重新入队列,重新被消费:

我们可以看到,这条消息一直在被消费,并且 delivertTag 在递增。

并且从 RabbitMQ 客户端中可以看到,这条消息依然存在,等待被成功消费:

 当 requeue 为 false:

当处理消息发生异常后,就会将消息从队列中删除。

代码运行结果如下:

虽然异常依然存在,但是消息却没有重复发送,并且 RabbitMQ 中也将这条消息删除:

 

 

http://www.dtcms.com/a/264915.html

相关文章:

  • 【Java面试】讲讲Redis的Cluster的分片机制
  • 前端面试专栏-主流框架:16. vue工程化配置(Vite、Webpack)
  • Django 安装使用教程
  • Linux基本命令篇 —— which命令
  • 无人机AI制导模块运行方式概述
  • 免费版安全性缩水?ToDesk、TeamViewer、向日葵、网易UU远程访问隐私防护测评
  • 【C#引用DLL详解】
  • 使用 JavaScript、Mastra 和 Elasticsearch 构建一个具备代理能力的 RAG 助手
  • docker离线/在线环境下安装elasticsearch
  • SpringCloud系列(47)--SpringCloud Bus实现动态刷新定点通知
  • springboot切面编程
  • 大数据Hadoop之——Hbase下载安装部署
  • CSS外边距合并(塌陷)全解析:原理、场景与解决方案
  • OD 算法题 B卷【求最小步数】
  • 计算机视觉的新浪潮:扩散模型(Diffusion Models)技术剖析与应用前景
  • 360安全卫士占用5037端口(ADB端口)解决方案
  • 【小技巧】Python+PyCharm IDE 配置解释器出错,环境配置不完整或不兼容。(小智AI、MCP、聚合数据、实时新闻查询、NBA赛事查询)
  • 智慧赋能高压并网:分布式光伏监控系统在5.88MW物流园项目的实践解析
  • 深入解析 OPC UA:工业自动化与物联网的关键技术
  • css实现优惠券效果 全
  • DAY 45 通道注意力(SE注意力)
  • langchain从入门到精通(三十四)——RAG优化策略(十)父文档检索器实现拆分和存储平衡
  • JavaFX:属性Property简介
  • 集合-二叉搜索树
  • 【在 C# 中通过 P/Invoke 调用 C++ DLL 时的数据类型转换】
  • 第二章-AIGC入门-文本生成:开启内容创作新纪元(4/36)
  • 字典课后练习讲解|5类数据容器的总结对比
  • 存储过程封装:复杂业务逻辑的性能优化
  • AntV L7 之LarkMap 地图
  • A模块 系统与网络安全 第三门课 网络通信原理-4