RabbitMQ--延时队列总结
一、延迟队列概念
延迟队列(Delay Queue)是一种特殊类型的队列,队列中的元素需要在指定的时间点被取出和处理。简单来说,延时队列就是存放需要在某个特定时间被处理的消息。它的核心特性在于“延迟”——消息在队列中停留一段时间,直到满足设定的延迟时间才会被处理。
关键特性:
延时队列中的消息会在指定时间点才被消费。
适用于时间敏感的任务调度,如订单过期、任务超时等。
二、延迟队列的使用场景
延迟队列适用于以下场景:
订单支付超时自动取消:
例如,订单生成后 10 分钟未支付,自动取消订单。
店铺商品上传提醒:
新店铺如果 10 天内没有上传商品,系统自动发送提醒消息。
用户未登录短信提醒:
用户注册后,若 3 天内没有登录,发送短信提醒用户登录。
退款超时提醒:
用户发起退款请求后,如果 3 天内未处理,自动通知运营人员。
会议提醒:
预定会议后,提前 10 分钟通知与会人员。
这些场景的特点是:在某个事件发生后,或者在某个时间点之前,需要完成某项任务。比如,在订单生成事件发生 10 分钟后,检查订单支付状态,未支付则关闭订单。
为什么不使用定时任务?
对于小规模的数据量,可以使用定时任务每秒轮询一次进行处理。
但是当数据量非常庞大(如百万级别的订单检查)时,轮询的方式会给数据库和系统带来巨大压力,无法满足高效处理的需求。
延时队列通过精准的延迟时间控制和异步处理,能够高效地解决这个问题。
三、 如何在 RabbitMQ 中实现延时队列?
我们有两种常用的方式来实现延时队列:
通过 TTL(消息过期时间或队列过期时间)和死信队列实现:我们可以给队列里的消息设置一个有效期(TTL),一旦消息过期,它就会被路由到一个死信队列,再由死信队列进行消费。
使用
x-delayed-message
插件:这个插件是官方提供的,它允许我们给消息指定一个延迟时间,在这个时间到期之前,消息不会被消费者消费。四、通过 TTL(消息过期时间或队列过期时间)和死信队列实现样例
在讨论这个问题前先来了解几个知识点
知识点①:RabbitMQ 中的 TTL(Time-to-Live)
TTL 是 RabbitMQ 中用来控制消息或队列存活时间的属性。TTL 的单位是毫秒,表示一条消息或队列中的消息在指定时间内没有被消费时,消息会过期,成为死信。
(1) TTL 的两种设置方式
消息 TTL:可以在发送每条消息时指定 TTL。
例如,发送消息时设置 TTL 为 10 秒:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000") // 设置消息延迟时间为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
队列 TTL:在创建队列时设置该队列内所有消息的 TTL。队列的 TTL 会影响队列中所有消息的过期时间。
例如,在队列声明时设置
x-message-ttl
:Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 10000); // 设置消息的 TTL 为 10 秒 channel.queueDeclare("queue", true, false, false, args);
(2) TTL 的行为
队列 TTL:如果设置了队列 TTL,则队列内所有消息的 TTL 会被统一管理。如果消息超时,它会被丢弃或者路由到死信队列。
消息 TTL:如果设置了消息 TTL,那么每条消息的 TTL 都会单独管理。如果消息未能在 TTL 时间内消费,则会成为“死信”。
知识点②: 死信队列(Dead Letter Queue)
这里死信队列以及TTL的讲解笔者可以去查看这篇博客:Rabbitmq中的死信队列-CSDN博客
当消息过期或被拒绝时,消息会被发送到死信队列。死信队列用于接收那些已经过期的消息或被拒绝的消息,这样消费者可以集中处理这些需要处理的消息。
(1) 如何利用死信队列实现延时队列?
设置队列的 TTL,使消息在到期后成为死信。
配置死信队列,使过期的消息进入死信队列。
消费者从死信队列消费,定期消费这些过期的消息。
方式一:RabbitMQ 延时队列的实现方式(给消息设置TTL和死信队列)
(1)配置文件类代码
@Component public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String QUEUE_C = "QC";//声明队列 C 死信交换机@Bean("queueC")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuecBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");} }
(2)消息生产者代码
@GetMapping("sendExpirationMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message); }
(3) 发送请求
- http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
- http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
发送一个 HTTP 请求,参数中包括消息内容和 TTL(过期时间)。
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000 表示发送消息"你好 2"并设置该消息的 TTL 为 2000 毫秒(即 2 秒)。2 秒内没有被消费者消费,该消息就会被 RabbitMQ 丢弃。
(4) 给消息设置TTL和死信队列的问题
你当前的设计是为每条消息单独设置 TTL(通过
correlationData.getMessageProperties().setExpiration(ttlTime)
),而不是为队列本身设置 TTL。这样做的目的是希望每条消息有不同的过期时间,从而实现不同的延时处理。设计中可能存在的问题
消息 TTL 是通过设置每条消息的
expiration
属性来控制每条消息的过期时间。每条消息可以有不同的 TTL,这样可以灵活地指定不同的消息延迟时间。问题出在 RabbitMQ 的消息消费机制 上:RabbitMQ 是按照队列中的消息顺序来消费消息的,且它只会检查队列里的消息是否过期,而不是单独检查每条消息的 TTL。
消费顺序问题:
假设队列中有两条消息:
第一条消息的 TTL 设置为 20 秒。
第二条消息的 TTL 设置为 2 秒。
在这种情况下,RabbitMQ 会 按顺序检查队列中的消息,也就是说,它首先会检查第一条消息(TTL 20 秒),即使第二条消息的 TTL 很短(只有 2 秒)。如果第一条消息还没有过期,RabbitMQ 会先检查它,然后再检查第二条消息。结果就是,第二条消息可能会被延迟,即使它的 TTL 已经过期。
也就是说即使第二条消息的 TTL 设置为 2 秒,然后此时第二条消息已经过期,它也会等待第一条消息被消费(进入死信队列后)后才会检查。这意味着 第二条消息在第一条消息未过期的情况下不会立刻进入死信队列,而是会等到第一条消息被消费,才会去检查是否过期。所以会被延迟
方式二:RabbitMQ 延时队列的实现方式(给队列设置TTL和死信队列)
RabbitMQ 的延时队列可以通过 TTL 配合死信队列实现,具体步骤如下:
(1) 设置队列的 TTL
在创建队列时,我们设置队列的
x-message-ttl
属性,控制消息的生存时间。例如:Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 10000); // 设置队列消息的 TTL 为 10 秒 args.put("x-dead-letter-exchange", "dlx_exchange"); // 设置死信交换机 args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 设置死信路由键 channel.queueDeclare("ttl_queue", true, false, false, args);
(2) 配置死信队列
设置死信交换机和死信路由键,当消息 TTL 到期后,它会进入死信队列。
channel.queueDeclare("dlx_queue", true, false, false, null); channel.queueBind("dlx_queue", "dlx_exchange", "dlx_routing_key");
(3) 发送消息时设置 TTL
发送消息时,可以给消息设置
expiration
属性来控制消息的延迟时间。例如,10 秒后该消息将变为死信:AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000") // 设置消息的 TTL 为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
(4) 消费死信队列
消费者从死信队列中获取消息进行处理:
channel.basicConsume("dlx_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received expired message: " + message); }, consumerTag -> {});
(5)总结
延迟队列 通过让消息在指定时间后再被消费,解决了定时任务和轮询检查的性能问题。
TTL 和 死信队列 是实现 RabbitMQ 延时队列的关键技术,通过控制消息的存活时间和让过期消息进入死信队列,消费者可以按需处理这些消息。
适用场景包括:订单超时、任务调度、消息提醒等。
延时队列的核心需求是让消息在指定时间后被处理,而 RabbitMQ 中的 TTL(过期时间)正好能实现这一点。当消息的 TTL 到期后,它会变成死信并被投递到死信队列。这样,消费者只需要持续从死信队列消费消息即可,因为队列中的消息都是等待被及时处理的。这种方式实现了高效的延时处理,同时避免了轮询和重复检查。
(6) 给队列设置TTL和死信队列的问题
如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10秒1个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,需要要增加无数个队列才能满足需求五、使用
x-delayed-message
插件实现延时队列实现样例
5.1 安装插件
要启用延时队列,首先要安装
rabbitmq-delayed-message-exchange
插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
5.2 创建延时交换机
创建一个交换机时,指定它是一个
x-delayed-message
类型的交换机。通过这个交换机来处理延时消息:Channel channel = connection.createChannel(); Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); // 设定延时交换机的类型,通常是 direct 类型channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);
5.3 发送延时消息
发送消息时,我们需要指定延迟的时间。这个时间通过设置消息的
expiration
属性来实现,单位是毫秒:AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000") // 设置消息延迟时间为 10 秒.build();channel.basicPublish("delayed_exchange", "routing_key", properties, "Hello, delayed message".getBytes());
5.4 消费消息
消费者与普通消息的消费方式一样,消息会在延迟时间到期后被消费:
channel.basicConsume("delayed_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received message: " + message); }, consumerTag -> {});