RabbitMQ核心机制——延迟队列
一、 什么是延迟队列?
消息发送之后,不想让消费者马上收到消息,而是等待特定时间后消费者才能拿到这条消息进行消费。
二、 如何实现延迟队列
RabbitMQ并没有直接支持延迟队列这一功能,如果需要实现延迟队列,有两种方法可以实现:
1> TTL + 死信队列:给普通队列或消息设置TTL,但没有消费者监听普通队列,消息过期后通过死信交换机路由到死信队列,死信队列的消费者获取消息,就达到了延迟的效果,如下图:
2> 插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 通过这个链接下载好插件,安装即可
下载好插件之后(注意插件的版本要与RabbitMQ版本对应),通过下列命令安装插件:
#进入下列目录,这是附加目录,如果没有就自己创建一个
cd /usr/lib/rabbitmq/plugins#查看插件列表
rabbitmq-plugins list#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重启服务
service rabbitmq-server restart
如果在管理界面的交换机——>新建交换机看到下图这个交换机,就代表安装好了:
准备工作完成,接下来看如何通过这两种方式实现延迟队列。
三、 基于 TTL + 死信队列 实现
准备工作:
(1)声明队列、交换机、及绑定关系
@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dl");}
(2)生产者代码
@RequestMapping("/delay")public String delay(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test...");System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}
(3)消费者代码
@Component
public class DelayListener {@RabbitListener(queues = Constants.DL_QUEUE)public void messageHandler(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);//业务处理}
}
3.1 设置队列TTL + 死信队列
上面的代码就是设置 队列的TTL + 死信队列,这里直接测试:
结果预测:由于上面给队列设置的TTL为10s,因此发送消息10s后消息就因该被消费
可以看到,确实达到了延迟效果,消息发送后10消费者才接收到消息
3.2 设置消息TTL + 死性队列(不推荐)
前面学习死信队列时,我们知道,如果队列前面的消息比后面的消息过期时间长,那么后面的消息必须等待前面的消息被判定为过期才能继续判定后面的消息是否过期,如果使用 设置消息的TTL + 死信队列 来实现延迟队列是否会出现问题?不妨一试
一、修改normal队列声明,修改生产者代码
@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}
@RequestMapping("/delay")public String delay(){MessagePostProcessor messagePostProcessor1 = message -> {message.getMessageProperties().setExpiration("10000");return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setExpiration("30000");return message;};rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","delay test 10s...",messagePostProcessor1);System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}
二、运行程序,测试
预期结果:10s后收到第一条消息,再过20s手收到另一条消息
可以看到,两条消息都在30s后才被消费者接收,显然不符合期望
可以看到,通过设置 消息的TTL + 死信队列 来实现延迟效果是可能会出现问题的,在实际应用中,推荐使用 队列TTL + 死信队列 或 插件 来实现延迟队列,而不是 消息TTL + 死信队列 来实现。
四、 通过插件实现
通过插件实现延迟队列非常简单,只需要在声明交换机时通过delayed方法指定这是一个延迟交换机即可。
一、声明队列、交换机及绑定关系
@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();//通过delayed方法声明这是一个延迟交换机}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("delay");}
二、消费者代码
@Component public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void messageHandler(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[delay.queue] %tc 接收到消息: %s, deliveryTag: %d \n",new Date(),new String(message.getBody(),"UTF-8"),deliveryTag);//业务处理} }
三、生产者代码
@RequestMapping("/delay")public String delay(){MessagePostProcessor messagePostProcessor1 = message -> {message.getMessageProperties().setDelayLong(10000l);return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setDelayLong(30000l);return message;};rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","delay test 10s...",messagePostProcessor1);System.out.printf("%tc 消息发送成功 \n",new Date());return "消息发送成功";}
四、运行程序,测试
预期结果,10s后收到第一条消息,再过20s收到第二条消息
符合预期结果