【RabbitMQ】高级特性—TTL、延迟队列详解
文章目录
- TTL
- 设置消息的 TTL
- 1. 配置交换机&队列
- 2. 发送消息
- 3. 运行程序
- 设置队列的 TTL
- 1. 配置队列和绑定关系
- 2. 发送消息
- 3. 运行程序
- 两者区别
- 延迟队列
- 概念
- 应用场景
- TTL + 死信队列实现
- 代码实现
- 存在问题
- 延迟队列插件
- 安装延迟队列插件
- 1 . 下载并上传插件
- 常见面试题
- 介绍一下 RabbitMQ 的延迟队列
TTL
TTL
(Time to Live
,过期时间),即过期时间。RabbitMQ
可以对消息和队列设置 TTL
当消息到达存活时间之后,还没有被消费,就会被自动清除
- 我们在网上购物,经常会遇到的一个场景,当下单超过 20 分钟还未付款,订单就会自动取消
- 还有类似的,申请退款之后,超过 7 天未被处理,则自动退款
设置消息的 TTL
目前有两种方法可以设置消息的 TTL
- 设置队列的
TTL
,队列中所有的消息都有相同的过期时间 - 对消息本身进行单独设置,每条消息的
TTL
可以不同
如果两种方法一起使用,则消息的TTL
以两者之间较小的那个数值为准
- 针对每条消息设置
TTL
的方法是在发送消息的方法中加入expiration
的属性参数,单位为毫秒
1. 配置交换机&队列
// 1. 交换机
@Bean("ttlExchange")
public FanoutExchange ttlExchange() { return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();
} // 2. 队列
@Bean("ttlQueue")
public Queue ttlQueue() { return QueueBuilder.durable(Constant.TTL_QUEUE).build();
} // 3. 队列和交换机绑定 Binding@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange);
}
2. 发送消息
@RequestMapping("/ttl")
public String ttl() { String ttlTime = "10000"; // 10s rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcess -> { messagePostProcess.getMessageProperties().setExpiration(ttlTime); return messagePostProcess; }); return "发送成功";
}
3. 运行程序
调用接口,发送消息,观察结果: http://127.0.0.1:8080/producer/ttl
-
发送消息后,可以看到
Read
消息为 1
-
10 秒钟之后,刷新页面,发现消息已被删除
如果不设置 TTL
,则表示此消息不会过期;如果将 TTL
设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃
设置队列的 TTL
设置队列 TTL
的方法是在创建队列时,加入 x-message-ttl
参数实现的,单位是毫秒
1. 配置队列和绑定关系
// 设置ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() { // 设置 20 秒过期 return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20*1000).build();
} // 设置队列和交换机绑定
@Bean("ttlQueue2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue2") Queue queue) { return BindingBuilder.bind(queue).to(exchange);
}
设置过期时间,也可以采用下面的方式
@Bean("ttlQueue2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue2") Queue queue) { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 20000); // 20 秒过期 return BindingBuilder.bind(queue).to(exchange);
}
2. 发送消息
@RequestMapping("/ttl") public String ttl() { // 发送不带 ttl 的消息 rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test..."); return "发送成功"; }
3. 运行程序
运行程序,观察结果。
- 发现新增了一个队列,队列
Features
有一个TTL
标识
调用接口,发送消息: http://127.0.0.1:8080/producer/ttl
-
发送消息之后,可以看到,
Ready
消息为 1
采用发布订阅模式,所有与该交换机绑定的队列(ttl_queue
和ttl_queueu2
)都会收到消息 -
20 秒钟之后,刷新页面,发现消息已经被删除
由于 ttl_queue
对类,为设置过期时间,所以 ttl_queue
的消息未被删除
两者区别
- 设置队列
TTL
属性的方法,一旦消息过期,就会从队列中删除 - 设置消息
TTL
的方法,即使消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前进行判断的
为什么这两种发发处理方式不一样?
- 因为设置过期时间,队列中已过期的消息肯定在队列头部,
RabbitMQ
只要定期从队头开始扫描是否有过期的消息即可 - 而设置消息
TTL
的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,所以不如等到此消息被消费时再判定是否过期,如果过期再进行删除即可
延迟队列
概念
延迟队列(Delayed Queue
),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
应用场景
延迟队列的使用场景有很多,比如:
- 智能家居:用户希望通过手机远程遥控家里的智能设备,在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备
- 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议
- 用户注册成功后,7 天后发送短信,提高用户活跃度等
- …
RabbitMQ
本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 TTL
+死信队列的方式,组合模拟出延迟队列的功能
假设一个应用中需要将每条消息都设置为 10 秒的延迟,生产者通过 normal_exchange
这个交换机将发送的消息存储在 normal_queue
这个队列中。
- 消费者订阅的并非是
normal_queue
这个队列,而是dlx_queue
这个队列 - 当消息从
normal_queue
这个队列中过期之后,就被存入dlx_queue
这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息
TTL + 死信队列实现
代码实现
先看 TTL
+ 死信队列实现延迟队列。(继续沿用前面死信队列的代码)
声明队列
@Bean("normalQueue") public Queue normalQueue() { Map<String, Object> arguments = new HashMap<>(); // 1. 绑定死信队列 arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME); // 设置发送给死信队列的 RoutingKey arguments.put("x-dead-letter-routing-key", "dlx"); return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build(); }
生产者
发送两条消息,一条消息 10s
后过期,第二条 20s
后过期
@RequestMapping("/delay")
public String delay() { // 发送带 ttl 的消息 rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..." + new Date(), messagePostProcessor -> { // 设置 10s 过期 messagePostProcessor.getMessageProperties().setExpiration("10000"); return messagePostProcessor; }); rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..." + new Date(), messagePostProcessor -> { // 设置 20s 过期 messagePostProcessor.getMessageProperties().setExpiration("20000"); return messagePostProcessor; }); return "发送成功!";
}
消费者
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.printf("%tc 死信队列接收到消息:%s, deliveryTag: %d%n", new Date(), new String(message.getBody(), "UTF-8"), deliveryTag);
}
运行程序
调用接口,发送数据:
可以看到,两条消息按照过期时间一次进入了死信队列
延迟队列,就是希望等待特定的时间之后,消费者才能拿到这个消息。TTL
刚好可以让消息延迟一段时间成为死信,成为死信的消息会被投递到死信队列里面,这样消费者一直消费死信队列里的消息就可以了
存在问题
接下来把生产消息的顺序修改一下, 先发送 20s
过期数据,再发送 10s
过期数据
@RequestMapping("/delay")
public String delay() { // 发送带 ttl 的消息 rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..." + new Date(), messagePostProcessor -> { // 设置 20s 过期 messagePostProcessor.getMessageProperties().setExpiration("20000"); return messagePostProcessor; }); rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..." + new Date(), messagePostProcessor -> { // 设置 10s 过期 messagePostProcessor.getMessageProperties().setExpiration("10000"); return messagePostProcessor; }); return "发送成功!";
}
通过控制台观察死信队列消费情况:
- 这时会发现:
10s
过期的消息,也是在20s
后才进入到死信队列
消息过期之后,不一定会被马上丢弃。因为 RabbitMQ
只会检查队首消息是否过期。若过期,则丢到死信队列,此时就会造成一个问题
- 如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行
所以在考虑使用 TTL
+ 死信队列实现延迟任务队列的时候,需要确认业务上每个人物的延迟时间是一致的,如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延时的消息建立单独的消息队列
延迟队列插件
RabbitMQ
官方也提供了一个延迟的插件来实现延迟功能
- 参考: https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
接下来看具体操作:
安装延迟队列插件
1 . 下载并上传插件
插件下载地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
根据自己的 RabbitMQ
版本,选择相应版本的延迟插件,下载后上传到服务器
插件上传目录参考:
常见面试题
延迟队列作为 RabbitMQ
的高级特性,也是面试的一大重点
介绍一下 RabbitMQ 的延迟队列
延时队列是一个特殊的对类,消息发送之后,并不会立即给消费者,而是等待特定的时间,才发送给消费者。
延迟队列的应用场景有很多,比如:
- 订单在十分钟内未支付自动取消
- 用户注册成功后,3 天后发问卷调查
- 用户发起退款,24 小时后商家未处理,则默认同意,自动退款
- …
但 RabbitMQ
本身并没有直接实现延迟队列,通常有两种方式:
TTL
+ 死信队列组合的方式- 使用官方提供的延迟插件实现延迟功能
二者对比:
-
基于死信队列实现的延迟队列
- 优点:
- 灵活,不需要额外的插件支持
- 缺点:
- 存在消息顺序问题
- 需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性
- 优点:
-
基于插件实现的延迟队列
- 优点:
- 用过插件可以直接创建延迟队列,简化延迟消息的实现
- 避免了
DLX
的时序问题
- 缺点:
- 需要依赖特定的插件,有运维工作
- 只适用特定版本
- 优点: