延迟 队列
概念
延迟队列顾名思义就是消息不立即发送给消费者消费,而是延迟一段时间再交给消费者。
RabbitMQ本身没有直接支持延迟队列的的功能,但是可以通过前面所介绍的TTL+死信队列的方式组合
模拟出延迟队列的功能.
RabbitMQ 有些版本还支持延迟队列的插件安装,我们也可以通过安装这个插件实现延迟队列的功能。
TTL + 死信队列
实现思路:
假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过normal_exchange这个交换器将发送的消息存储在normal_queue这个队列中.消费者订阅的并非是normal_queue这个队列,而是dlx_queue这个队列.当消息从normal_queue这个队列中过期之后被存入dlx_queue这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。
代码演示:
常量设置:
//死信队列public static final String DL_QUEUE = "DL_QUEUE";public static final String DL_EXCHANGE = "DL_EXCHANGE";public static final String DL_KEY = "DL_KEY";//普通队列public static final String NORMAL_QUEUE = "NORMAL_QUEUE";public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static final String NORMAL_KEY = "NORMAL_KEY";
声明队列、交换机、绑定关系:
//普通队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(MQConstants.NORMAL_QUEUE).deadLetterExchange(MQConstants.DL_EXCHANGE).deadLetterRoutingKey(MQConstants.DL_KEY).build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(MQConstants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.NORMAL_KEY).noargs();}//死信队列@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(MQConstants.DL_QUEUE).build();}@Bean("dlExchange")public Exchange dlExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DL_KEY).noargs();}
生产者:将消息过期时间设置为 10 s
@RequestMapping("/dl")public String dl() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "dl" + i, message -> {message.getMessageProperties().setExpiration("10000");return message;});}return "消息发送成功";}
消费者需要消费的队列是死信队列:
@Component
@RabbitListener(queues = MQConstants.DL_QUEUE)
public class DLListener {@RabbitHandlerpublic void handle(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {channel.basicAck(deliveryTag, false);System.out.println("消息成功消费:" + messageContent);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}
存在的问题
当我们先发送一条延迟时间长的消息,然后再发送一条延迟时间短的消息,我们会发现,短的消息并没有被即使消费,而是等到长的消息时间一到,才被消费了
@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "30s ",message -> {message.getMessageProperties().setExpiration("30000");return message;});rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "10s ",message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";}
原因如下:
消息过期之后,不一定会被马上丢弃,因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列,此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个
消息并不会优先得到执行。
所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
延迟队列的插件
安装
官方文档:Scheduling Messages with RabbitMQ
下载链接:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载的插件需要存放到哪个目录:https://www.rabbitmq.com/docs/installing-plugins
根据你不同的环境去选择不同的目录:
Linux命令:
#查看插件列表
rabbitmq-plugins list#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重启服务
service rabbitmq-server restart
我们去到 rabbitmq 管理界面查看 exchange 有没有延迟类型 “x-delayed-messge” ,如果存在这一类型说明我们的插件安装成功了
代码演示
常量类:
//延迟队列public static final String DELAY_QUEUE = "DELAY_QUEUE";public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";public static final String DELAY_KEY = "DELAY_KEY";
声明:
//延迟队列@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(MQConstants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DELAY_KEY).noargs();}
生产者:这里我们发送三条不同过期时间的消息来进行演示:
通过setDelayLong() 方法设置延迟时间
@RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "30s ",message -> {message.getMessageProperties().setDelayLong(30000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "10s ",message -> {message.getMessageProperties().setDelayLong(10000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "40s ", message -> {message.getMessageProperties().setDelayLong(40000L);return message;});return "消息发送成功";}
这里我们将确认模式设置为自动模式,不进行手动确认,便于我们书写代码:
@Component
@RabbitListener(queues = MQConstants.DELAY_QUEUE)
public class DelayListener {@RabbitHandlerpublic void handle(String message) {System.out.printf("%tc 接收到的消息为:%s\n", new Date(), message);}
}
最终效果:
总结
1.基于死信实现的延迟队列
a优点:1)灵活不需要额外的插件支持
b.缺点: 1) 存在消息顺序问题 2)需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性
2.基于插件实现的延迟队列
a.优点:1)通过插件可以直接创建延迟队列,简化延迟消息的实现. 2)避免了DLX的时序问题
b.缺点:1)需要依赖特定的插件,有运维工作2)只适用特定版本