当前位置: 首页 > news >正文

RabbitMQ--延时队列总结

一、延迟队列概念

        延迟队列(Delay Queue)是一种特殊类型的队列,队列中的元素需要在指定的时间点被取出和处理。简单来说,延时队列就是存放需要在某个特定时间被处理的消息。它的核心特性在于“延迟”——消息在队列中停留一段时间,直到满足设定的延迟时间才会被处理。

关键特性:
  • 延时队列中的消息会在指定时间点才被消费。

  • 适用于时间敏感的任务调度,如订单过期、任务超时等。

二、延迟队列的使用场景

延迟队列适用于以下场景:

  1. 订单支付超时自动取消

    • 例如,订单生成后 10 分钟未支付,自动取消订单。

  2. 店铺商品上传提醒

    • 新店铺如果 10 天内没有上传商品,系统自动发送提醒消息。

  3. 用户未登录短信提醒

    • 用户注册后,若 3 天内没有登录,发送短信提醒用户登录。

  4. 退款超时提醒

    • 用户发起退款请求后,如果 3 天内未处理,自动通知运营人员。

  5. 会议提醒

    • 预定会议后,提前 10 分钟通知与会人员。

这些场景的特点是:在某个事件发生后,或者在某个时间点之前,需要完成某项任务。比如,在订单生成事件发生 10 分钟后,检查订单支付状态,未支付则关闭订单。

为什么不使用定时任务?
  • 对于小规模的数据量,可以使用定时任务每秒轮询一次进行处理。

  • 但是当数据量非常庞大(如百万级别的订单检查)时,轮询的方式会给数据库和系统带来巨大压力,无法满足高效处理的需求。

  • 延时队列通过精准的延迟时间控制和异步处理,能够高效地解决这个问题。

三、 如何在 RabbitMQ 中实现延时队列?

我们有两种常用的方式来实现延时队列:

  1. 通过 TTL(消息过期时间或队列过期时间)和死信队列实现:我们可以给队列里的消息设置一个有效期(TTL),一旦消息过期,它就会被路由到一个死信队列,再由死信队列进行消费。

  2. 使用 x-delayed-message 插件:这个插件是官方提供的,它允许我们给消息指定一个延迟时间,在这个时间到期之前,消息不会被消费者消费。

四、通过 TTL(消息过期时间或队列过期时间)和死信队列实现样例

在讨论这个问题前先来了解几个知识点

知识点①:RabbitMQ 中的 TTL(Time-to-Live)

        TTL 是 RabbitMQ 中用来控制消息或队列存活时间的属性。TTL 的单位是毫秒,表示一条消息或队列中的消息在指定时间内没有被消费时,消息会过期,成为死信。

(1) TTL 的两种设置方式
  • 消息 TTL:可以在发送每条消息时指定 TTL。

    例如,发送消息时设置 TTL 为 10 秒:

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息延迟时间为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
    
  • 队列 TTL:在创建队列时设置该队列内所有消息的 TTL。队列的 TTL 会影响队列中所有消息的过期时间。

    例如,在队列声明时设置 x-message-ttl

    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 10000);  // 设置消息的 TTL 为 10 秒
    channel.queueDeclare("queue", true, false, false, args);
    
(2) TTL 的行为
  • 队列 TTL:如果设置了队列 TTL,则队列内所有消息的 TTL 会被统一管理。如果消息超时,它会被丢弃或者路由到死信队列。

  • 消息 TTL:如果设置了消息 TTL,那么每条消息的 TTL 都会单独管理。如果消息未能在 TTL 时间内消费,则会成为“死信”。

知识点②: 死信队列(Dead Letter Queue)

 这里死信队列以及TTL的讲解笔者可以去查看这篇博客:Rabbitmq中的死信队列-CSDN博客

当消息过期或被拒绝时,消息会被发送到死信队列。死信队列用于接收那些已经过期的消息或被拒绝的消息,这样消费者可以集中处理这些需要处理的消息。

(1) 如何利用死信队列实现延时队列?
  1. 设置队列的 TTL,使消息在到期后成为死信。

  2. 配置死信队列,使过期的消息进入死信队列。

  3. 消费者从死信队列消费,定期消费这些过期的消息。

方式一:RabbitMQ 延时队列的实现方式(给消息设置TTL和死信队列)

(1)配置文件类代码

@Component
public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String QUEUE_C = "QC";//声明队列 C 死信交换机@Bean("queueC")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuecBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

(2)消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}

(3) 发送请求

  • http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
  • http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000       

发送一个 HTTP 请求,参数中包括消息内容和 TTL(过期时间)。

        http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000 表示发送消息"你好 2"并设置该消息的 TTL 为 2000 毫秒(即 2 秒)。2 秒内没有被消费者消费,该消息就会被 RabbitMQ 丢弃。

(4) 给消息设置TTL和死信队列的问题

        你当前的设计是为每条消息单独设置 TTL(通过         correlationData.getMessageProperties().setExpiration(ttlTime)),而不是为队列本身设置 TTL。这样做的目的是希望每条消息有不同的过期时间,从而实现不同的延时处理。

设计中可能存在的问题

        消息 TTL 是通过设置每条消息的 expiration 属性来控制每条消息的过期时间。每条消息可以有不同的 TTL,这样可以灵活地指定不同的消息延迟时间。    

    问题出在 RabbitMQ 的消息消费机制 上:RabbitMQ 是按照队列中的消息顺序来消费消息的,且它只会检查队列里的消息是否过期,而不是单独检查每条消息的 TTL。

  • 消费顺序问题

    假设队列中有两条消息:

    1. 第一条消息的 TTL 设置为 20 秒。

    2. 第二条消息的 TTL 设置为 2 秒。

                在这种情况下,RabbitMQ 会 按顺序检查队列中的消息,也就是说,它首先会检查第一条消息(TTL 20 秒),即使第二条消息的 TTL 很短(只有 2 秒)。如果第一条消息还没有过期,RabbitMQ 会先检查它,然后再检查第二条消息。结果就是,第二条消息可能会被延迟,即使它的 TTL 已经过期。

              也就是说即使第二条消息的 TTL 设置为 2 秒,然后此时第二条消息已经过期,它也会等待第一条消息被消费(进入死信队列后)后才会检查。这意味着 第二条消息在第一条消息未过期的情况下不会立刻进入死信队列,而是会等到第一条消息被消费,才会去检查是否过期。所以会被延迟

        

方式二:RabbitMQ 延时队列的实现方式(给队列设置TTL和死信队列)

RabbitMQ 的延时队列可以通过 TTL 配合死信队列实现,具体步骤如下:

(1) 设置队列的 TTL

在创建队列时,我们设置队列的 x-message-ttl 属性,控制消息的生存时间。例如:

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000);  // 设置队列消息的 TTL 为 10 秒
args.put("x-dead-letter-exchange", "dlx_exchange");  // 设置死信交换机
args.put("x-dead-letter-routing-key", "dlx_routing_key");  // 设置死信路由键
channel.queueDeclare("ttl_queue", true, false, false, args);
(2) 配置死信队列

设置死信交换机和死信路由键,当消息 TTL 到期后,它会进入死信队列。

channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "dlx_routing_key");
(3) 发送消息时设置 TTL

发送消息时,可以给消息设置 expiration 属性来控制消息的延迟时间。例如,10 秒后该消息将变为死信:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息的 TTL 为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
(4) 消费死信队列

消费者从死信队列中获取消息进行处理:

channel.basicConsume("dlx_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received expired message: " + message);
}, consumerTag -> {});
(5)总结
  • 延迟队列 通过让消息在指定时间后再被消费,解决了定时任务和轮询检查的性能问题。

  • TTL死信队列 是实现 RabbitMQ 延时队列的关键技术,通过控制消息的存活时间和让过期消息进入死信队列,消费者可以按需处理这些消息。

  • 适用场景包括:订单超时、任务调度、消息提醒等。

  • 延时队列的核心需求是让消息在指定时间后被处理,而 RabbitMQ 中的 TTL(过期时间)正好能实现这一点。当消息的 TTL 到期后,它会变成死信并被投递到死信队列。这样,消费者只需要持续从死信队列消费消息即可,因为队列中的消息都是等待被及时处理的。这种方式实现了高效的延时处理,同时避免了轮询和重复检查。

(6) 给队列设置TTL和死信队列的问题
        如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10秒1个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,需要要增加无数个队列才能满足需求

五、使用 x-delayed-message 插件实现延时队列实现样例

5.1 安装插件

要启用延时队列,首先要安装 rabbitmq-delayed-message-exchange 插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
5.2 创建延时交换机

创建一个交换机时,指定它是一个 x-delayed-message 类型的交换机。通过这个交换机来处理延时消息:

Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");  // 设定延时交换机的类型,通常是 direct 类型channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);
5.3 发送延时消息

发送消息时,我们需要指定延迟的时间。这个时间通过设置消息的 expiration 属性来实现,单位是毫秒:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息延迟时间为 10 秒.build();channel.basicPublish("delayed_exchange", "routing_key", properties, "Hello, delayed message".getBytes());
5.4 消费消息

消费者与普通消息的消费方式一样,消息会在延迟时间到期后被消费:

channel.basicConsume("delayed_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received message: " + message);
}, consumerTag -> {});

文章转载自:

http://uyTxAGXL.wphfL.cn
http://dHYWpIip.wphfL.cn
http://0SLAuuvb.wphfL.cn
http://aNSForAm.wphfL.cn
http://r2BdaiPs.wphfL.cn
http://96iLryEI.wphfL.cn
http://1fC18yW2.wphfL.cn
http://sFva0lf3.wphfL.cn
http://2MK5wEWK.wphfL.cn
http://WSXNQnhP.wphfL.cn
http://JYz4zQpo.wphfL.cn
http://23fguMEP.wphfL.cn
http://lW8pgGdG.wphfL.cn
http://UlMt9hF6.wphfL.cn
http://CviCYJ5w.wphfL.cn
http://DSSZEyFE.wphfL.cn
http://GCnAzyOL.wphfL.cn
http://H81BtOXN.wphfL.cn
http://mT4qvcbP.wphfL.cn
http://xaz2ru01.wphfL.cn
http://ea7UJhKo.wphfL.cn
http://DijWJDK2.wphfL.cn
http://tAJFhFDP.wphfL.cn
http://sJRdRAib.wphfL.cn
http://Yt2fjmm8.wphfL.cn
http://rFl9yt6s.wphfL.cn
http://p6qEV0VG.wphfL.cn
http://9kYaNglp.wphfL.cn
http://kqTvYA40.wphfL.cn
http://8odoiSMJ.wphfL.cn
http://www.dtcms.com/a/371415.html

相关文章:

  • 开放式LLM的崛起:未来已至
  • 驱动物流创新与协同,助力物流行业可持续发展的智慧物流开源了
  • GitHub 热榜项目 - 日榜(2025-09-07)
  • 定制开发开源AI智能名片S2B2C商城小程序的优势与局限性分析
  • 开源 C++ QT Widget 开发(十三)IPC通讯--本地套接字 (Local Socket)
  • win10(三)视频剪裁
  • ELK 平台入门与架构设计
  • 【音视频】Http-FLV 介绍
  • 从抽象到实现:Elasticsearch数据类型及其底层Lucene数据结构的深度解析
  • 互联网接入网中PPPoE和PPP协议
  • AI工具深度测评与选型指南 - 音视频生成与处理类
  • iceoryx高性能进程间通信中间件,在Windows环境的编译教程
  • Linux驱动开发(1)环境与代码框架
  • 造粒机cad+设计说明书
  • 游戏中的设计模式——第三篇 简单工厂模式
  • TCP, 三次握手, 四次挥手, 滑动窗口, 快速重传, 拥塞控制, 半连接队列, RST, SYN, ACK
  • Ansible 角色(Roles)
  • 深入理解 X25519 与 Ed25519:密钥交换与签名验签全流程解析
  • 【Python】数据可视化之热力图
  • 分布式专题——2 深入理解Redis线程模型
  • 【xss漏洞waf绕过】
  • Next.js中静态资源处理:图片、字体和其他文件
  • PyCharm 从入门到高效:安装教程 + 快捷键速查表
  • Linux 之从硬件硬盘到文件系统的全面过渡
  • STM32的USART的数据寄存器只有一个吗?
  • 【RabbitMQ】---RabbitMQ 工作流程和 web 界面介绍
  • HakcMyVM-Literal
  • C++判断字符串是否是回文(palindrome)
  • 测试:BUG篇
  • web后端知识(php和python)——第一阶段