当前位置: 首页 > news >正文

延迟 队列

概念

延迟队列顾名思义就是消息不立即发送给消费者消费,而是延迟一段时间再交给消费者。

RabbitMQ本身没有直接支持延迟队列的的功能,但是可以通过前面所介绍的TTL+死信队列的方式组合
模拟出延迟队列的功能.

RabbitMQ 有些版本还支持延迟队列的插件安装,我们也可以通过安装这个插件实现延迟队列的功能。

TTL + 死信队列

实现思路:

假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过normal_exchange这个交换器将发送的消息存储在normal_queue这个队列中.消费者订阅的并非是normal_queue这个队列,而是dlx_queue这个队列.当消息从normal_queue这个队列中过期之后被存入dlx_queue这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。

在这里插入图片描述

代码演示:

常量设置:

    //死信队列public static final String DL_QUEUE = "DL_QUEUE";public static final String DL_EXCHANGE = "DL_EXCHANGE";public static final String DL_KEY = "DL_KEY";//普通队列public static final String NORMAL_QUEUE = "NORMAL_QUEUE";public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static final String NORMAL_KEY = "NORMAL_KEY";

声明队列、交换机、绑定关系:

    //普通队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(MQConstants.NORMAL_QUEUE).deadLetterExchange(MQConstants.DL_EXCHANGE).deadLetterRoutingKey(MQConstants.DL_KEY).build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(MQConstants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.NORMAL_KEY).noargs();}//死信队列@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(MQConstants.DL_QUEUE).build();}@Bean("dlExchange")public Exchange dlExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DL_KEY).noargs();}

生产者:将消息过期时间设置为 10 s

    @RequestMapping("/dl")public String dl() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "dl" + i, message -> {message.getMessageProperties().setExpiration("10000");return message;});}return "消息发送成功";}

消费者需要消费的队列是死信队列:

@Component
@RabbitListener(queues = MQConstants.DL_QUEUE)
public class DLListener {@RabbitHandlerpublic void handle(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {channel.basicAck(deliveryTag, false);System.out.println("消息成功消费:" + messageContent);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}

存在的问题

当我们先发送一条延迟时间长的消息,然后再发送一条延迟时间短的消息,我们会发现,短的消息并没有被即使消费,而是等到长的消息时间一到,才被消费了

    @RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "30s ",message -> {message.getMessageProperties().setExpiration("30000");return message;});rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "10s ",message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";}

在这里插入图片描述


原因如下:
消息过期之后,不一定会被马上丢弃因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列,此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个
消息并不会优先得到执行。

所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

延迟队列的插件

安装

官方文档:Scheduling Messages with RabbitMQ

下载链接:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载的插件需要存放到哪个目录:https://www.rabbitmq.com/docs/installing-plugins

根据你不同的环境去选择不同的目录:
在这里插入图片描述

Linux命令:

#查看插件列表
rabbitmq-plugins list#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重启服务
service rabbitmq-server restart

我们去到 rabbitmq 管理界面查看 exchange 有没有延迟类型 “x-delayed-messge” ,如果存在这一类型说明我们的插件安装成功了

在这里插入图片描述

代码演示

常量类:

    //延迟队列public static final String DELAY_QUEUE = "DELAY_QUEUE";public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";public static final String DELAY_KEY = "DELAY_KEY";

声明:

    //延迟队列@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(MQConstants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DELAY_KEY).noargs();}

生产者:这里我们发送三条不同过期时间的消息来进行演示:
通过setDelayLong() 方法设置延迟时间

    @RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "30s ",message -> {message.getMessageProperties().setDelayLong(30000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "10s ",message -> {message.getMessageProperties().setDelayLong(10000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "40s ", message -> {message.getMessageProperties().setDelayLong(40000L);return message;});return "消息发送成功";}

这里我们将确认模式设置为自动模式,不进行手动确认,便于我们书写代码:

@Component
@RabbitListener(queues = MQConstants.DELAY_QUEUE)
public class DelayListener {@RabbitHandlerpublic void handle(String message) {System.out.printf("%tc 接收到的消息为:%s\n", new Date(), message);}
}

最终效果:
在这里插入图片描述

总结

1.基于死信实现的延迟队列
a优点:1)灵活不需要额外的插件支持
b.缺点: 1) 存在消息顺序问题 2)需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性
2.基于插件实现的延迟队列

a.优点:1)通过插件可以直接创建延迟队列,简化延迟消息的实现. 2)避免了DLX的时序问题
b.缺点:1)需要依赖特定的插件,有运维工作2)只适用特定版本


文章转载自:

http://kkYrSXr9.rxfgh.cn
http://9ng0sjwR.rxfgh.cn
http://ULEO7cQL.rxfgh.cn
http://oA1zzaFB.rxfgh.cn
http://N1iFIAXg.rxfgh.cn
http://YBK1rpd3.rxfgh.cn
http://asSLieFn.rxfgh.cn
http://3S5GrQ8h.rxfgh.cn
http://Rgtc8Bk4.rxfgh.cn
http://M46sB6gJ.rxfgh.cn
http://yDMKLYi2.rxfgh.cn
http://ay2plwXV.rxfgh.cn
http://dHiIxD9x.rxfgh.cn
http://YZdpGYeB.rxfgh.cn
http://kMr5CMkX.rxfgh.cn
http://djzyMu0F.rxfgh.cn
http://Z9akoLGL.rxfgh.cn
http://TgIGe0zz.rxfgh.cn
http://8SIw679S.rxfgh.cn
http://QImWQAgj.rxfgh.cn
http://lZDvzeXV.rxfgh.cn
http://knOmCHK7.rxfgh.cn
http://MzefvPzw.rxfgh.cn
http://pqcOWyHe.rxfgh.cn
http://aunBPawU.rxfgh.cn
http://0LqDfLgX.rxfgh.cn
http://oQAnCqLO.rxfgh.cn
http://g4aqVjEO.rxfgh.cn
http://gHGAu5DJ.rxfgh.cn
http://wwJWINYF.rxfgh.cn
http://www.dtcms.com/a/371329.html

相关文章:

  • 宋红康 JVM 笔记 Day14|垃圾回收概述
  • 【ICCV2025】计算机视觉|即插即用|ESC:颠覆Transformer!超强平替,ESC模块性能炸裂!
  • 手机能看、投屏 / 车机不能看与反向链接验证类似吗?
  • Xilinx ZYNQ 开发环境中搭建 Qt 环
  • leetcode909.蛇梯棋
  • JAVA NIO学习笔记基础强化学习总结
  • 基于51单片机手机无线蓝牙APP控制风扇调速设计
  • 力扣hot100:相交链表与反转链表详细思路讲解(160,206)
  • 如何在 DevOps 管道中实现 AI?
  • 【Java基础07】面向对象进阶
  • 动态维护有效区间:滑动窗口
  • 桌面时间 Catime
  • 解锁服务器网络配置新姿势:Wisdom SSH 助力之旅
  • 设计模式:状态模式(State Pattern)
  • 【ARM基础知道】
  • SpringCloud Alibaba微服务--Gateway使用
  • 基于脚手架微服务的视频点播系统-播放控制部分
  • 【C++详解】C++ 智能指针:使用场景、实现原理与内存泄漏防治
  • 【iOS】push,pop和present,dismiss
  • HiCMAE 论文复现:基于 RAVDESS 数据集的音视频情感识别
  • axios的两种异步方式对比
  • uniapp结合uview制作美食页面
  • Spark mapreduce 的一个用法
  • [iOS] push 和 present Controller 的区别
  • 五.贪心算法
  • vue中axios与fetch比较
  • 【iOS】block复习
  • 打造第二大脑读书笔记目录
  • 【Docker】Docker基础
  • 一、CMake基础