【rabbitmq 高级特性】RabbitMQ 延迟队列全面解析
目录
1. 延迟队列的概念与原理
1.1 什么是延迟队列
1.2 延迟队列的实现原理
2. TTL+死信队列实现延迟队列
2.1 实现原理
2.2 代码实现
2.2.1 常量定义
2.2.2 配置类
2.2.3 生产者
2.2.4 消费者
2.3 TTL+死信队列的局限性
3. 使用延迟插件实现延迟队列
3.1 安装延迟插件
3.2 代码实现
3.2.1 常量定义
3.2.2 配置类
3.2.3 生产者
3.2.4 消费者
4. 两种实现方式的对比
5. 应用场景
5.1 订单超时取消
5.2 定时提醒
5.3 重试机制
6. 注意事项
7. 管理界面查看
1. 延迟队列的概念与原理
1.1 什么是延迟队列
延迟队列(Delayed Queue)是指消息被发送后,不会立即被消费者获取,而是等待特定时间后,消费者才能获取到这些消息进行消费。
1.2 延迟队列的实现原理
RabbitMQ本身没有直接支持延迟队列的功能,但可以通过两种方式实现:
-
TTL + 死信队列组合:通过设置消息或队列的TTL,配合死信交换机实现
-
官方延迟插件:使用RabbitMQ官方提供的
rabbitmq_delayed_message_exchange
插件
2. TTL+死信队列实现延迟队列
2.1 实现原理
通过设置消息或队列的TTL,使消息在指定时间后过期,然后通过死信机制将过期消息路由到死信队列,消费者监听死信队列即可实现延迟消费。
2.2 代码实现
2.2.1 常量定义
public class Constant {// 死信交换机public static final String DLX_EXCHANGE_NAME = "dlx_exchange";// 死信队列public static final String DLX_QUEUE = "dlx_queue";// 正常交换机public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";// 正常队列public static final String NORMAL_QUEUE = "normal_queue";
}
2.2.2 配置类
@Configuration
public class DLXConfig {// 死信交换机@Bean("dlxExchange")public Exchange dlxExchange() {return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();}// 死信队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DLX_QUEUE).build();}// 死信绑定@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}// 正常交换机@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();}// 正常队列(绑定死信交换机)@Bean("normalQueue")public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();// 绑定死信交换机arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);// 设置死信路由键arguments.put("x-dead-letter-routing-key", "dlx");return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();}// 正常绑定@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange,@Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}
}
2.2.3 生产者
@RestController
@RequestMapping("/producer")
public class DelayProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/delay")public String delay() {// 发送带TTL的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 10s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000"); // 10s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 20s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000"); // 20s过期return messagePostProcessor;});return "发送成功!";}
}
2.2.4 消费者
@Component
public class DelayConsumer {// 监听死信队列(即延迟队列)@RabbitListener(queues = Constant.DLX_QUEUE)public void listenerDLXQueue(Message message, Channel channel) throws Exception {System.out.printf("%tc 接收到延迟消息: %s%n", new Date(), new String(message.getBody(), "UTF-8"));channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}
}
2.3 TTL+死信队列的局限性
这种方法存在一个问题:RabbitMQ只会检查队列头部的消息是否过期,如果头部消息的TTL很长,即使后面的消息TTL较短,也会被阻塞,直到头部消息过期。这会导致消息的实际延迟时间可能比预期的要长。
3. 使用延迟插件实现延迟队列
3.1 安装延迟插件
-
下载插件:从RabbitMQ官方插件页面下载对应版本的插件
-
安装插件:
# 将插件文件放到RabbitMQ插件目录 rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 重启RabbitMQ服务 service rabbitmq-server restart
3.2 代码实现
3.2.1 常量定义
public class Constant {// 延迟交换机public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";// 延迟队列public static final String DELAYED_QUEUE = "delayed_queue";
}
3.2.2 配置类
@Configuration
public class DelayedConfig {// 延迟交换机(注意类型是x-delayed-message)@Bean("delayedExchange")public CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(Constant.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 延迟队列@Bean("delayedQueue")public Queue delayedQueue() {return QueueBuilder.durable(Constant.DELAYED_QUEUE).build();}// 延迟绑定@Bean("delayedBinding")public Binding delayedBinding(@Qualifier("delayedExchange") CustomExchange exchange,@Qualifier("delayedQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delayed").noargs();}
}
3.2.3 生产者
@RestController
@RequestMapping("/producer")
public class DelayedProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/delay2")public String delay2() {// 发送延迟消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed","delayed test 20s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(20000); // 20s延迟return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed","delayed test 10s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(10000); // 10s延迟return messagePostProcessor;});return "发送成功!";}
}
3.2.4 消费者
@Component
public class DelayedConsumer {// 监听延迟队列@RabbitListener(queues = Constant.DELAYED_QUEUE)public void listenerDelayedQueue(Message message, Channel channel) throws Exception {System.out.printf("%tc 接收到延迟消息: %s%n", new Date(), new String(message.getBody(), "UTF-8"));channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}
}
4. 两种实现方式的对比
特性 | TTL+死信队列 | 延迟插件 |
---|---|---|
消息顺序 | 可能乱序(头部阻塞问题) | 保证按延迟时间顺序 |
安装要求 | 无需额外安装 | 需要安装插件 |
灵活性 | 较低 | 较高 |
性能 | 较好 | 较好 |
适用场景 | 简单延迟需求 | 复杂延迟需求 |
5. 应用场景
5.1 订单超时取消
// 订单创建30分钟后未支付自动取消
public void createOrder(Order order) {// 创建订单逻辑...// 发送延迟消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "order.cancel",order.getId(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟return messagePostProcessor;});
}
5.2 定时提醒
// 会议开始前15分钟提醒参会人
public void scheduleMeeting(Meeting meeting) {// 安排会议逻辑...// 计算延迟时间long delay = meeting.getStartTime().getTime() - System.currentTimeMillis() - 15 * 60 * 1000;// 发送延迟消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "meeting.remind",meeting, messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay((int) delay);return messagePostProcessor;});
}
5.3 重试机制
// 失败操作延迟重试
public void handleFailedOperation(Operation operation) {// 计算重试延迟(指数退避)long delay = calculateRetryDelay(operation.getAttemptCount());// 发送延迟重试消息rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "operation.retry",operation, messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelay((int) delay);return messagePostProcessor;});
}
6. 注意事项
-
延迟精度:延迟队列不保证精确的时间延迟,可能会有几毫秒的误差
-
内存使用:大量延迟消息可能会占用较多内存,需要合理设置延迟时间
-
集群环境:在RabbitMQ集群中,延迟插件需要在所有节点上安装和启用
-
消息持久化:重要延迟消息应该设置为持久化,防止RabbitMQ重启导致消息丢失
-
监控告警:需要监控延迟队列的积压情况,设置合理的告警机制
7. 管理界面查看
使用延迟插件后,在RabbitMQ管理界面可以看到特殊的交换机类型:
-
Type:
x-delayed-message
-
Features:
D
(持久化)、DLX
(延迟交换机)
通过延迟队列,RabbitMQ能够处理各种需要定时或延迟执行的任务,大大增强了其应用场景和灵活性。根据具体需求选择合适的实现方式,可以构建出高效可靠的延迟任务处理系统。