当前位置: 首页 > news >正文

RabbitMQ核心机制——延迟队列

一、 什么是延迟队列?

    消息发送之后,不想让消费者马上收到消息,而是等待特定时间后消费者才能拿到这条消息进行消费。


二、 如何实现延迟队列

    RabbitMQ并没有直接支持延迟队列这一功能,如果需要实现延迟队列,有两种方法可以实现:

1> TTL + 死信队列:给普通队列或消息设置TTL,但没有消费者监听普通队列,消息过期后通过死信交换机路由到死信队列,死信队列的消费者获取消息,就达到了延迟的效果,如下图:

2> 插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 通过这个链接下载好插件,安装即可

下载好插件之后(注意插件的版本要与RabbitMQ版本对应),通过下列命令安装插件:

#进入下列目录,这是附加目录,如果没有就自己创建一个
cd /usr/lib/rabbitmq/plugins#查看插件列表
rabbitmq-plugins list#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重启服务
service rabbitmq-server restart

如果在管理界面的交换机——>新建交换机看到下图这个交换机,就代表安装好了:

准备工作完成,接下来看如何通过这两种方式实现延迟队列。


三、 基于 TTL + 死信队列 实现

 准备工作:

(1)声明队列、交换机、及绑定关系

    @Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dl");}

(2)生产者代码

    @RequestMapping("/delay")public String delay(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test...");System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}

(3)消费者代码

@Component
public class DelayListener {@RabbitListener(queues = Constants.DL_QUEUE)public void messageHandler(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);//业务处理}
}

 

3.1 设置队列TTL + 死信队列

上面的代码就是设置 队列的TTL + 死信队列,这里直接测试:

结果预测:由于上面给队列设置的TTL为10s,因此发送消息10s后消息就因该被消费

可以看到,确实达到了延迟效果,消息发送后10消费者才接收到消息


3.2 设置消息TTL + 死性队列(不推荐)

     前面学习死信队列时,我们知道,如果队列前面的消息比后面的消息过期时间长,那么后面的消息必须等待前面的消息被判定为过期才能继续判定后面的消息是否过期,如果使用 设置消息的TTL + 死信队列 来实现延迟队列是否会出现问题?不妨一试

一、修改normal队列声明,修改生产者代码

    @Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}
    @RequestMapping("/delay")public String delay(){MessagePostProcessor messagePostProcessor1 = message -> {message.getMessageProperties().setExpiration("10000");return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setExpiration("30000");return message;};rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 10s...",messagePostProcessor1);System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}

二、运行程序,测试

预期结果:10s后收到第一条消息,再过20s手收到另一条消息

可以看到,两条消息都在30s后才被消费者接收,显然不符合期望

     可以看到,通过设置 消息的TTL + 死信队列 来实现延迟效果是可能会出现问题的,在实际应用中,推荐使用 队列TTL + 死信队列 或 插件 来实现延迟队列,而不是 消息TTL + 死信队列 来实现。


四、 通过插件实现

    通过插件实现延迟队列非常简单,只需要在声明交换机时通过delayed方法指定这是一个延迟交换机即可。

一、声明队列、交换机及绑定关系

    @Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();//通过delayed方法声明这是一个延迟交换机}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("delay");}

二、消费者代码

@Component
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void messageHandler(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);//业务处理}
}

三、生产者代码

    @RequestMapping("/delay")public String delay(){MessagePostProcessor messagePostProcessor1 = message -> {message.getMessageProperties().setDelayLong(10000l);return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setDelayLong(30000l);return message;};rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 10s...",messagePostProcessor1);System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}

四、运行程序,测试

预期结果,10s后收到第一条消息,再过20s收到第二条消息

符合预期结果

相关文章:

  • win11 禁用/恢复 内置笔记本键盘(保证管用)
  • 【公式】MathType公式右编号对齐
  • MySQL连接错误解决方案:Can‘t connect to MySQL server on ‘localhost‘ (10038)
  • leetcode2081. k 镜像数字的和-hard
  • 华为OD机试真题——仿LISP运算(2025B卷:200分)Java/python/JavaScript/C/C++/GO最佳实现
  • 【短距离通信】【WiFi】WiFi7起源和应用场景介绍
  • MySQL 定时逻辑备份
  • CI/CD (持续集成/持续部署) GitHub Actions 自动构建
  • GitLab-CI将项目Wiki自动部署到文档中心
  • 卷积神经网络(CNN)深度讲解
  • 【HarmonyOS5】DevEco Studio 预览器与模拟工具详解
  • 基于文本挖掘与情感分析的B站《唐探1900》弹幕研究
  • 使用Cursor生成需求文档+UI设计图
  • 【微服务】SpringBoot 对接飞书审批流程使用详解
  • Python GDAL 库离线安装
  • NTFS0x90属性和0xa0属性和0xb0属性的一一对应关系是index_entry中的index_node中VCN和runlist和bitmap
  • Mybatis框架的构建(IDEA)
  • 【C++】21. 红黑树的实现
  • JWT与布隆过滤器结合使用指南
  • C++编程单例模式详细解释---模拟一个网络配置管理器,负责管理和分发网络连接参数
  • 做网站建设与推广企业/天津seo排名收费
  • 网站首页制作怎么做的/广告联盟app下载赚钱
  • 网站建设备案费用/seo教程百度网盘
  • 看板娘 wordpress/枫树seo网
  • 甘肃病毒感染最新消息/百度seo排名工具
  • 手机网站底部广告代码/引流平台有哪些