当前位置: 首页 > news >正文

RabbitMQ 重试机制 和 TTL

目录

1. 重试机制

1.1 简介

1.2 配置文件 

1.3 消费者确认机制为 auto 时

1.4 消费者确认机制为 manual 时

2. TTL

2.1 设置消息的过期时间

2.2 设置队列的过期时间

2.3 给过期队列中消息设置过期时间


1. 重试机制

1.1 简介

在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足... 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送. 但如果是程序逻辑引起的错误, 那么多次重试也是没有用的, 可以设置重试次数.

1.2 配置文件 

spring:application:name: rabbitmq-extensions-demorabbitmq:addresses: amqp://study:study@192.168.100.10:5672/extensionlistener:simple:
#        acknowledge-mode: noneacknowledge-mode: auto
#        acknowledge-mode: manualretry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长为5秒max-attempts: 5 # 最大重试次数
    //重试机制@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("retry").noargs();}

1.3 消费者确认机制为 auto 时

如果程序逻辑错误, 那么就会不断重试, 造成消息积压. 因此我们就需要设置重试次数, 当多次重试还是失败, 消息就会被自动确认, 自然消息就会丢失 (这里可以弄个死信队列, 来存储这些丢失的消息)

生产者 : 

    @RequestMapping("/retry")public String retry() {System.out.println("retry...");rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息发送成功";}

消费者 : 

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n",new String(message.getBody(), StandardCharsets.UTF_8), deliveryTag);int num = 3/0;System.out.println("业务处理完成");}
}

由于消费者代码逻辑错误, 消息重发机制触发了 4 次, 一共发了 5 次, 日志中, 每次的 deliverTag 都一样, 表示发送的是同一个消息.  

查看队列中消息也已经被删除.

1.4 消费者确认机制为 manual 时

更改消费者 : 

    @RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s, deliveryTag: %s \n",new String(message.getBody(), StandardCharsets.UTF_8), deliveryTag);try {int num = 3/0;System.out.println("业务处理完成");channel.basicAck(deliveryTag, false);}catch (Exception e){// 不成功消息重新入队列channel.basicNack(deliveryTag, false, true);}}

这里我们发现, 虽然我们设置了重试机制, 但是 deliveryTag 还是在不断更新... 队列中的消息也不会消失. 原因是手动模式下, 消费者需要显示的对消息进行确认, 如果消费者在消息处理过程中遇到异常, 可以选择确认, 不确认消息,也可以选择重新入队. 所以重试的控制权不在应用程序本身, 而在于代码逻辑本身.

  • 消费者确认机制为 AUTO 时, 如果程序逻辑异常, 多次重试还是失败. 那么消息就会自动确认, 进而消息就会丢失.
  • 消费者确认机制为 MANAUL 时, 如果程序逻辑异常, 多次重试依然处理失败, 无法被确认, 消息就会积压.
  • 消费者确认机制为 NONE 时, 不管发生什么情况, 当消息从 Broker 内部发出时, 就会自动确认, 因此它不存在任何内容

2. TTL

TTL -> time to live, 指的是过期时间.

  • 给消息设置过期时间 : 当消息到达过期时间, 还没有被消费, 就会被自动清除.
  • 给队列设置过期时间 :  就相当于给队列的所有消息设置了一个过期时间, 这消息的过期时间与队列的过期时间是相同的. 从消息入队列开始算起, 经过这个时间, 就会被删除

2.1 设置消息的过期时间

    @Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
    @RequestMapping("/ttl")public String ttl() {System.out.println("ttl...");MessagePostProcessor messageInfo = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30sreturn message;}};rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 30s", messageInfo);rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为10sreturn message;});return "消息发送成功";}

不需要消费者, 不然看不到消息在队列中的自动删除.

2.2 设置队列的过期时间

    //设置ttl@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //设置队列的ttl为20s}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue, @Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}
    @RequestMapping("/ttl2")public String ttl2() {System.out.println("ttl2...");//发送普通消息rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...");return "消息发送成功";}

我们发现了消息经过 20 秒后被删除了, 但是队列没有被删除, 说明过期队列是给消息上过期时间的. 这个队列也有 TTL 的标志了.

2.3 给过期队列中消息设置过期时间

如果两种方法同时使用, 那么就以过期时间较小的值为准.

设置队列的过期时间, 一旦消息过期, 就会从队列中删除.

设置消息的过期时间, 即使消息过期, 如果消息不在队首, 还得等到消息到达队首之后才会进行判定是否过期. 如果过期, 那就删除, 反之就投递到相应的消费者中.

为什么这两种方法处理的方式不一样?

因为设置队列的过期时间, 那么队列中过期的消息一定在队首, RabbitMQ 只需要定期从队首扫描消息是否有过期的消息即可, 而设置消息的过期时间, 每条消息的过期时间都不一致, 如果要删除队列的所有过期消息那么就要扫描整个队列, 所以不如等到消息要进行投递时再判断消息是否过期, 这样可以减少一定的资源消耗.


文章转载自:

http://aIdcUD5H.nmbbt.cn
http://yHLLXO5t.nmbbt.cn
http://WtJLoKlb.nmbbt.cn
http://5UhsYG8L.nmbbt.cn
http://4Bxqglmn.nmbbt.cn
http://J242vCcQ.nmbbt.cn
http://IHKsOYdU.nmbbt.cn
http://vy5SMCz2.nmbbt.cn
http://eAMYqQDv.nmbbt.cn
http://TnIPj7zF.nmbbt.cn
http://TLJDbQoe.nmbbt.cn
http://VwO9kUbh.nmbbt.cn
http://2LhooIqC.nmbbt.cn
http://QYVqssY9.nmbbt.cn
http://3buqZXGM.nmbbt.cn
http://3uIsAkuJ.nmbbt.cn
http://QFflch53.nmbbt.cn
http://I6baHoXT.nmbbt.cn
http://TRpvg2eW.nmbbt.cn
http://aM3S3blm.nmbbt.cn
http://LzNaxWLP.nmbbt.cn
http://IkaQLo11.nmbbt.cn
http://yOpK9BfD.nmbbt.cn
http://3auBpKAh.nmbbt.cn
http://W2E91IeG.nmbbt.cn
http://p3ai1rEo.nmbbt.cn
http://X8b7oYTv.nmbbt.cn
http://TwFYbs9o.nmbbt.cn
http://lGK0lETQ.nmbbt.cn
http://aSVfpWro.nmbbt.cn
http://www.dtcms.com/a/372015.html

相关文章:

  • 人工智能竞赛提高mAP的方法
  • 深度学习——残差神经网络案例
  • LeetCode 刷题【68. 文本左右对齐】
  • Day23_【机器学习—集成学习(5)—Boosting—XGBoost算法】
  • 基于飞算JavaAI的在线图书借阅平台设计与实现(深度实践版)
  • fps:AI系统
  • 强化学习入门:从零开始实现Dueling DQN
  • 做事总是三分钟热度怎么办
  • 图像形态学
  • C++运算符重载——函数调用运算符 ()
  • 分布式系统——分布式数据库的高扩展性保证
  • C++ 并发编程:异步任务
  • 四、神经网络的学习(中)
  • OPENPPP2 —— IP标准校验和算法深度剖析:从原理到SSE2优化实现
  • 梅花易数:从入门到精通
  • 计算机⽹络及TCP⽹络应⽤程序开发
  • 单点登录1(SSO知识点)
  • 嵌入式学习---(ARM)
  • 嵌入式学习day44-硬件—ARM体系架构
  • 《数据结构全解析:栈(数组实现)》
  • Linux系统资源监控脚本
  • PHP中各种超全局变量使用的过程
  • C++-类型转换
  • [GDOUCTF 2023]doublegame
  • 系统资源监控与邮件告警
  • 1706.03762v7_analysis
  • 云平台面试内容(三)
  • 机器学习之集成学习
  • 旋转位置编码(RoPE)--结合公式与示例
  • Python-基础 (六)