当前位置: 首页 > news >正文

什么是延迟队列?RabbitMQ 如何实现延迟队列?

画板

什么是延迟队列

  • 定义:延迟队列是一种特殊的队列,队列中的元素(消息)并不会立即被消费者获取并处理,而是在经过一段指定的延迟时间后,才会被消费者消费。它主要用于需要在特定时间点或经过一定时间间隔后执行的任务场景。

RabbitMQ实现延迟队列的方法

利用消息的TTL(Time-To-Live)和死信队列(DLQ)组合

  • 原理:为队列或消息设置TTL,当消息在队列中存活时间超过TTL值,就会变成死信。将该队列与死信交换机绑定,死信交换机再将死信路由到另一个队列(即延迟队列),消费者从这个延迟队列中获取消息进行处理。
  • 代码
1. 定义 RabbitMQ 相关配置,声明普通队列并配置TTL和死信交换机
public class RabbitMQConfig {// 延迟队列public static final String DELAY_QUEUE = "delay.queue";// 延迟交换机public static final String DELAY_EXCHANGE = "delay.exchange";// 延迟路由键public static final String DELAY_ROUTING_KEY = "delay.routing.key";// 死信队列public static final String DLX_QUEUE = "dlx.queue";// 死信交换机public static final String DLX_EXCHANGE = "dlx.exchange";// 死信路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key";// 创建连接public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxxx");    // 设置RabbitMQ服务器地址factory.setPort(5672);           // 设置RabbitMQ服务器端口factory.setUsername("admin");    // 设置用户名factory.setPassword("admin");    // 设置密码return factory.newConnection();}// 初始化队列和交换机public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 声明死信交换机channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);// 声明死信队列channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 绑定死信队列和交换机channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);// 声明延迟交换机channel.exchangeDeclare(DELAY_EXCHANGE, "direct", true);// 声明延迟队列,并设置死信参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE);args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);channel.queueDeclare(DELAY_QUEUE, true, false, false, args);// 绑定延迟队列和交换机channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);}}
} 
2. 生产者和消费者
// 消息发送者
@Slf4j
public class MessageProducer {public void sendMessage(String message, int delayTime) throws Exception {try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {log.info("发送延迟消息: {}, 延迟时间: {}ms", message, delayTime);// 设置消息的过期时间AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(delayTime)).build();channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE,RabbitMQConfig.DELAY_ROUTING_KEY,properties,message.getBytes());}}
} // 消息 消费者
@Slf4j
public class MessageConsumer {public void consumeDelayQueue() throws Exception {Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1channel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到延迟消息: {}", message);// 确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费死信队列(实际处理延迟消息的队列)channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});}
} 
3. 测试
@Slf4j
public class DelayQueueTest {public static void main(String[] args) throws Exception {// 初始化队列和交换机RabbitMQConfig.init();// 创建生产者和消费者MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 启动消费者线程new Thread(() -> {try {consumer.consumeDelayQueue();} catch (Exception e) {e.printStackTrace();}}).start();// 发送不同延迟时间的消息producer.sendMessage("延迟5秒的消息", 5000);producer.sendMessage("延迟10秒的消息", 10000);producer.sendMessage("延迟15秒的消息", 15000);log.info("所有消息已发送,等待延迟处理...");// 保持程序运行Thread.sleep(20000);}
} 

从时间便可以看出消息是延迟消费了。

使用插件(rabbitmq-delay-message-exchange插件)

  • 原理:该插件提供了一种更直接的方式来实现延迟队列。它在RabbitMQ中添加了一个自定义的交换机类型(如x-delay-message),生产者可以直接在发送消息时指定延迟时间,消息会在指定延迟时间后被路由到绑定的队列。

  • 实现步骤
    • 插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
    • 安装插件:选择适配的版本,在将rabbitmq-delay-message-exchange插件下载并安装到RabbitMQ服务器上,然后启用插件。

消息流转过程如下:
在这里插入图片描述

  • 代码
RabbitMQ 相关配置,定义延迟队列
public class RabbitMQConfig {// 延迟队列public static final String DELAY_QUEUE = "plugin.delay.queue";// 延迟交换机public static final String DELAY_EXCHANGE = "plugin.delay.exchange";// 延迟路由键public static final String DELAY_ROUTING_KEY = "plugin.delay.routing.key";// 创建连接public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxxxxx");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");return factory.newConnection();}// 初始化队列和交换机public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 声明延迟交换机(使用x-delayed-message类型)Map<String, Object> args = new java.util.HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare(DELAY_EXCHANGE, "x-delayed-message", true, false, args);// 声明延迟队列channel.queueDeclare(DELAY_QUEUE, true, false, false, null);// 绑定延迟队列和交换机channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);}}
} 
生产者和消费者
// 生产者
@Slf4j
public class MessageProducer {public void sendMessage(String message, int delayTime) throws Exception {try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {log.info("发送延迟消息: {}, 延迟时间: {}ms", message, delayTime);// 设置消息的延迟时间AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(new java.util.HashMap<String, Object>() {{put("x-delay", delayTime);}}).build();channel.basicPublish(RabbitMQConfig.DELAY_EXCHANGE,RabbitMQConfig.DELAY_ROUTING_KEY,properties,message.getBytes());}}
} // 消费者
@Slf4j
public class MessageConsumer {public void consumeDelayQueue() throws Exception {Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1channel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到延迟消息: {}", message);// 确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费延迟队列channel.basicConsume(RabbitMQConfig.DELAY_QUEUE, false, deliverCallback, consumerTag -> {});}
} 
测试
@Slf4j
public class DelayQueuePluginTest {public static void main(String[] args) throws Exception {// 初始化队列和交换机RabbitMQConfig.init();// 创建生产者和消费者MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 启动消费者线程new Thread(() -> {try {consumer.consumeDelayQueue();} catch (Exception e) {log.error("消费者异常", e);}}).start();// 发送不同延迟时间的消息producer.sendMessage("延迟5秒的消息", 5000);producer.sendMessage("延迟10秒的消息", 10000);producer.sendMessage("延迟15秒的消息", 15000);log.info("所有消息已发送,等待延迟处理...");// 保持程序运行Thread.sleep(20000);}
} 

同样可以达到延迟消费的效果。

从管理台可以看到已创建 type=x-delayed-message 的交换机

使用场景

延迟队列的使用场景有很多,主要能够有效解决需要在特定时间或经过一定延迟后执行任务的需求。下面列举一些使用场景:

  • 订单超时处理:用户下单后,订单进入延迟队列,设置一定的延迟时间(如30分钟)。若在该时间内用户未完成支付,消息从延迟队列中被消费,系统自动取消订单并释放库存。这保证了库存的合理利用,避免资源浪费。防止用户长时间占用库存而不付款,影响其他用户购买。
  • 退款处理:当用户发起退款申请后,退款请求进入延迟队列。经过一定时间(如72小时),系统检查该订单是否有新的状态变化(如商家拒绝退款、用户撤销申请等)。若无变化,则自动执行退款操作,提高退款处理效率,减少人工干预。
  • 还款提醒:在还款日前几天,将还款提醒消息放入延迟队列,根据不同用户设置不同的延迟时间。到了设定时间,系统从队列中取出消息,向用户发送还款提醒,帮助用户避免逾期还款,减少逾期风险。
  • 转账确认超时处理:在进行跨行转账等操作时,转账请求进入延迟队列。若在规定时间(如24小时)内未收到对方银行的确认信息,消息被消费,系统自动回滚转账操作,并通知用户转账失败,保障资金安全和交易的准确性。
  • 包裹逾期未取提醒:当包裹到达配送点一定时间(如3天)后仍未被取走,将提醒消息放入延迟队列。延迟时间到达后,系统自动发送提醒通知给收件人,提醒其尽快取件,提高包裹周转效率,减少配送点的存储压力。
  • 配送延迟预警:根据物流运输的预计时间,将预警消息放入延迟队列。如果在预计到达时间前,包裹状态仍未更新为已送达,消息被消费,系统向相关人员(如配送员、客服、收件人)发送配送延迟预警,便于及时沟通和处理。

相关文章:

  • Lost connect to debugger on ‘iphone‘
  • [ctfshow web入门] web58
  • 【算法-链表】链表操作技巧:常见算法
  • 《数据结构初阶》【链式二叉树】
  • 从父类到子类:C++ 继承的奇妙旅程(1)
  • 什么是HTML、CSS 和 JavaScript?
  • 如何阅读、学习 Git 核心源代码 ?
  • 使用C# ASP.NET创建一个可以由服务端推送信息至客户端的WEB应用(2)
  • 缓存套餐-03.功能测试
  • 缓存(1):三级缓存
  • 如何利用 Elastic Load Balancing 提升应用性能与可用性?
  • java CyclicBarrier
  • 模拟堆(算法题)
  • Linux电源管理(7)_Wakeup events framework
  • 【神经网络与深度学习】VAE 在解码前进行重参数化
  • 前端线上错误日志收集与定位指南
  • MySQL概念简介
  • C++ stl中的vector的相关用法 迭代器失效问题
  • 第4章 递推法
  • 1688拍立淘搜索相似商品API接口概述,json数据示例参考
  • 九家企业与上海静安集中签约,投资额超10亿元
  • 85后清华博士黄佐财任湖北咸宁市咸安区委副书记、代区长
  • 中美“第二阶段”贸易协定是否会在会谈中提出?商务部回应
  • 阿森纳被打得毫无脾气,回天无力的阿尔特塔只剩嘴硬
  • 美联储主席:不打算先发制人地降息,将继续观望
  • 习近平向“和平薪火 时代新章——纪念中国人民抗日战争和苏联伟大卫国战争胜利80周年中俄人文交流活动”致贺信