当前位置: 首页 > news >正文

【RabbitMQ】高级特性—TTL、延迟队列详解

文章目录

  • TTL
    • 设置消息的 TTL
      • 1. 配置交换机&队列
      • 2. 发送消息
      • 3. 运行程序
    • 设置队列的 TTL
      • 1. 配置队列和绑定关系
      • 2. 发送消息
      • 3. 运行程序
    • 两者区别
  • 延迟队列
    • 概念
    • 应用场景
      • TTL + 死信队列实现
        • 代码实现
        • 存在问题
    • 延迟队列插件
      • 安装延迟队列插件
        • 1 . 下载并上传插件
    • 常见面试题
      • 介绍一下 RabbitMQ 的延迟队列

TTL

TTLTime to Live,过期时间),即过期时间。RabbitMQ 可以对消息和队列设置 TTL

当消息到达存活时间之后,还没有被消费,就会被自动清除

  • 我们在网上购物,经常会遇到的一个场景,当下单超过 20 分钟还未付款,订单就会自动取消
  • 还有类似的,申请退款之后,超过 7 天未被处理,则自动退款 image.png

设置消息的 TTL

目前有两种方法可以设置消息的 TTL

  1. 设置队列的 TTL,队列中所有的消息都有相同的过期时间
  2. 对消息本身进行单独设置,每条消息的 TTL 可以不同
    如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准

  • 针对每条消息设置 TTL 的方法是在发送消息的方法中加入 expiration 的属性参数,单位为毫秒

1. 配置交换机&队列

// 1. 交换机  
@Bean("ttlExchange")  
public FanoutExchange ttlExchange() {  return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();  
}  // 2. 队列  
@Bean("ttlQueue")  
public Queue ttlQueue() {  return QueueBuilder.durable(Constant.TTL_QUEUE).build();  
}  // 3. 队列和交换机绑定 Binding@Bean("ttlBinding")  
public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue") Queue queue) {  return BindingBuilder.bind(queue).to(exchange);  
}

2. 发送消息

@RequestMapping("/ttl")  
public String ttl() {  String ttlTime = "10000";  // 10s  rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...",  messagePostProcess -> {  messagePostProcess.getMessageProperties().setExpiration(ttlTime);  return messagePostProcess;  });  return "发送成功";  
}

3. 运行程序

调用接口,发送消息,观察结果: http://127.0.0.1:8080/producer/ttl

  1. 发送消息后,可以看到 Read 消息为 1
    image.png

  2. 10 秒钟之后,刷新页面,发现消息已被删除
    image.png

如果不设置 TTL,则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃

设置队列的 TTL

设置队列 TTL 的方法是在创建队列时,加入 x-message-ttl 参数实现的,单位是毫秒

1. 配置队列和绑定关系

// 设置ttl  
@Bean("ttlQueue2")  
public Queue ttlQueue2() {  // 设置 20 秒过期  return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20*1000).build();  
}  // 设置队列和交换机绑定  
@Bean("ttlQueue2")  
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(exchange);  
}

设置过期时间,也可以采用下面的方式

@Bean("ttlQueue2")  
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue2") Queue queue) {  Map<String, Object> arguments = new HashMap<>();  arguments.put("x-message-ttl", 20000); // 20 秒过期  return BindingBuilder.bind(queue).to(exchange);  
}

2. 发送消息

@RequestMapping("/ttl")  public String ttl() {    // 发送不带 ttl 的消息  rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...");  return "发送成功";  }

3. 运行程序

运行程序,观察结果。

  • 发现新增了一个队列,队列 Features 有一个 TTL 标识 image.png

调用接口,发送消息: http://127.0.0.1:8080/producer/ttl

  1. 发送消息之后,可以看到,Ready 消息为 1 image.png
    采用发布订阅模式,所有与该交换机绑定的队列(ttl_queuettl_queueu2)都会收到消息

  2. 20 秒钟之后,刷新页面,发现消息已经被删除
    image.png

由于 ttl_queue 对类,为设置过期时间,所以 ttl_queue 的消息未被删除

两者区别

  • 设置队列 TTL 属性的方法,一旦消息过期,就会从队列中删除
  • 设置消息 TTL 的方法,即使消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前进行判断的

为什么这两种发发处理方式不一样?

  • 因为设置过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可
  • 而设置消息 TTL 的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,所以不如等到此消息被消费时再判定是否过期,如果过期再进行删除即可

延迟队列

概念

延迟队列(Delayed Queue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

应用场景

延迟队列的使用场景有很多,比如:

  1. 智能家居:用户希望通过手机远程遥控家里的智能设备,在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备
  2. 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议
  3. 用户注册成功后,7 天后发送短信,提高用户活跃度等

RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 TTL +死信队列的方式,组合模拟出延迟队列的功能


假设一个应用中需要将每条消息都设置为 10 秒的延迟,生产者通过 normal_exchange 这个交换机将发送的消息存储在 normal_queue 这个队列中。

  • 消费者订阅的并非是 normal_queue 这个队列,而是 dlx_queue 这个队列
  • 当消息从 normal_queue 这个队列中过期之后,就被存入 dlx_queue 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息image.png

TTL + 死信队列实现

代码实现

先看 TTL + 死信队列实现延迟队列。(继续沿用前面死信队列的代码

声明队列

@Bean("normalQueue")  public Queue normalQueue() {  Map<String, Object> arguments = new HashMap<>();  // 1. 绑定死信队列  arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);  // 设置发送给死信队列的 RoutingKey        arguments.put("x-dead-letter-routing-key", "dlx");  return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();  }

生产者
发送两条消息,一条消息 10s 后过期,第二条 20s 后过期

@RequestMapping("/delay")  
public String delay() {  // 发送带 ttl 的消息  rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",  "ttl test 10s..." + new Date(), messagePostProcessor -> {  // 设置 10s 过期  messagePostProcessor.getMessageProperties().setExpiration("10000");  return messagePostProcessor;  });  rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",  "ttl test 20s..." + new Date(), messagePostProcessor -> {  // 设置 20s 过期  messagePostProcessor.getMessageProperties().setExpiration("20000");  return messagePostProcessor;  });  return "发送成功!";  
}

消费者

@RabbitListener(queues = Constant.DLX_QUEUE)  
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {  long deliveryTag = message.getMessageProperties().getDeliveryTag();  System.out.printf("%tc 死信队列接收到消息:%s, deliveryTag: %d%n", new Date(),  new String(message.getBody(), "UTF-8"), deliveryTag);  
}

运行程序

调用接口,发送数据: image.png

可以看到,两条消息按照过期时间一次进入了死信队列

延迟队列,就是希望等待特定的时间之后,消费者才能拿到这个消息。TTL 刚好可以让消息延迟一段时间成为死信,成为死信的消息会被投递到死信队列里面,这样消费者一直消费死信队列里的消息就可以了

存在问题

接下来把生产消息的顺序修改一下, 先发送 20s 过期数据,再发送 10s 过期数据

@RequestMapping("/delay")  
public String delay() {  // 发送带 ttl 的消息  rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",  "ttl test 20s..." + new Date(), messagePostProcessor -> {  // 设置 20s 过期  messagePostProcessor.getMessageProperties().setExpiration("20000");  return messagePostProcessor;  });  rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",  "ttl test 10s..." + new Date(), messagePostProcessor -> {  // 设置 10s 过期  messagePostProcessor.getMessageProperties().setExpiration("10000");  return messagePostProcessor;  });  return "发送成功!";  
}

通过控制台观察死信队列消费情况:
image.png

  • 这时会发现:10s 过期的消息,也是在 20s 后才进入到死信队列

消息过期之后,不一定会被马上丢弃。因为 RabbitMQ 只会检查队首消息是否过期。若过期,则丢到死信队列,此时就会造成一个问题

  • 如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行

所以在考虑使用 TTL + 死信队列实现延迟任务队列的时候,需要确认业务上每个人物的延迟时间是一致的,如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延时的消息建立单独的消息队列

延迟队列插件

RabbitMQ 官方也提供了一个延迟的插件来实现延迟功能

  • 参考: https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq

接下来看具体操作:

安装延迟队列插件

1 . 下载并上传插件

插件下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

image.png|438

根据自己的 RabbitMQ 版本,选择相应版本的延迟插件,下载后上传到服务器

插件上传目录参考:

常见面试题

延迟队列作为 RabbitMQ 的高级特性,也是面试的一大重点

介绍一下 RabbitMQ 的延迟队列

延时队列是一个特殊的对类,消息发送之后,并不会立即给消费者,而是等待特定的时间,才发送给消费者。

延迟队列的应用场景有很多,比如:

  1. 订单在十分钟内未支付自动取消
  2. 用户注册成功后,3 天后发问卷调查
  3. 用户发起退款,24 小时后商家未处理,则默认同意,自动退款

RabbitMQ 本身并没有直接实现延迟队列,通常有两种方式:

  1. TTL + 死信队列组合的方式
  2. 使用官方提供的延迟插件实现延迟功能

二者对比

  1. 基于死信队列实现的延迟队列

    • 优点:
      1. 灵活,不需要额外的插件支持
    • 缺点:
      1. 存在消息顺序问题
      2. 需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性
  2. 基于插件实现的延迟队列

    • 优点:
      1. 用过插件可以直接创建延迟队列,简化延迟消息的实现
      2. 避免了 DLX 的时序问题
    • 缺点:
      1. 需要依赖特定的插件,有运维工作
      2. 只适用特定版本
http://www.dtcms.com/a/323238.html

相关文章:

  • Java 中的编译与反编译:全面解析与实践指南
  • drippingblues靶机
  • 四边形(梯形、平行四边形、矩形、菱形和正方形)
  • [贪心]田忌赛马
  • Aurora接口FPGA设计
  • QT Creator 5.14.2安装
  • 卷板矫平机:给一张钢板做“拉伸放松操”
  • 北大回应录取通知书被指存在语句问题
  • Claude Code 与 Cursor 技术对比:架构差异与适用场景分析
  • 四、RuoYi-Cloud-Plus 部署时nacos配置服务启动
  • NVIDIA Jetson实战笔记
  • 相册管理系统介绍
  • <PLC><汇川><字符转换>在汇川PLC中,如何进行字符串的转换与比较?
  • 实数与复数及欧拉公式关系
  • WeTok Powerful Discrete Tokenization for High-Fidelity Visual Reconstruction
  • DAY 37 作业(补)
  • vue3上传的文件在线查看
  • Mistral Small 3.1 架构深度解析:高效小型模型的巅峰之作
  • 华数杯C题:可调控生物节律的LED光源研究——数学建模与Python实战
  • 应用层Http协议(1)
  • 大玄古籍制作软件【详细教程20:txt文档config自动化配置】,排版软件,自动排版,排版设计,个人出书,一键排版
  • MATLAB中文乱码的解决方法
  • 吴恩达机器学习笔记(4)—多变量线性回归:梯度下降(附代码)
  • STM32学习笔记6-TIM-2输出比较功能
  • Python(13) -- 面向对象
  • 智慧能源设备巡检缺陷漏检率↓76%:陌讯多模态融合算法实战解析
  • 设备点检系统二维码的应用
  • ISO5001能源管理体系认证的流程
  • 频域中的正弦波
  • Datawhale+AI夏令营_让AI读懂财报PDF task2深入赛题笔记