RabbitMQ学习(第三天)
文章目录
- 延迟消息概述
- 1、死信交换机
- 2、延迟消息插件
今天主要学习Rabbit延迟消息相关的知识。
延迟消息概述
生产者发送一个消息之后,消费者不会立即收到消息,而是在执行时间才会收到消息。
延迟消息主要为了实现延迟任务,应用场景一般是秒杀情境下,有一个倒计时判断用户是否已经付款,配合定时任务来检查订单状态。
1、死信交换机
首先来看看死信的定义:
当一个队列中的消息满足下列情况之一,就会成为死信 (dead letter):
- 消费者使用basic.reject或basic.nack声明消息失败,并且消息的requeue参数设置为false
- 消息是一个过期消息 (达到了队列或消息本身设定的过期时间),超过无人消费
- 容量超的队列被积满了,最早的消息可能会成为死信
死信交换机,简单来说就是存放死信的交换机,通过dead-letter-exchange参数指定。
而我们通过这个方式,可以模拟出来延迟消息的效果:
图中,我们首先设定了消息过期时间为30s,如果过期了,就将消息放到dix.direct交换机,即死信交换机,随后将消息投给Consumer,这样就实现了延迟消息的效果,30s后才让消费者收到消息。
控制台点击这个即可设置死信队列,参数后面跟的是死信交换机的名称,比如dlx.direct
我们就照着图中实例来定义交换机和队列,演示一下.
消息接收者:
@RabbitListener(queues = "dlx.queue")public void DLXQueueConsumer(String msg) {log.info("dlx.queue消费了消息[ " + msg + " ]");throw new RuntimeException("故意的");}
注意不要有监听simple.queue的监听器,不然的话会导致消息被消费,从而无法进行测试。
测试代码:
@Testpublic void testTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration("10000").build();rabbitTemplate.convertAndSend("simple.direct","hi", message);log.info("消息发送成功");}
我们这里测试消息就设置成10秒后过期,具体时间可以自己调整。
发送方运行结果:
05-11 11:55:34:549 INFO 27744 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.138.133:5672]
05-11 11:55:34:593 INFO 27744 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#24361cfc:0/SimpleConnection@55e42449 [delegate=amqp://root@192.168.138.133:5672/, localPort= 61846]
05-11 11:55:34:608 INFO 27744 --- [ main] com.rabbitmq.publisher.publisher : 消息发送成功
消费方运行结果:
05-11 11:55:44:619 INFO 22540 --- [ntContainer#0-1] com.rabbitmq.consumer.listener.Consumer : dlx.queue消费了消息[ hello ]
从运行结果的时间来看,可以发现间隔正好差了10秒,实现了延迟消息。
2、延迟消息插件
RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息的交换机,当消息投递到该交换机后可以选择延迟一定时间,过期后再次投递到队列。
下面是插件安装流程:
在linux环境输入:
docker inspect mq
下图中的就是挂载目录:
插件下载流程:
①、首先输入网址:
https://www.rabbitmq.com/community-plugins.html
②、界面中点击对应插件的Release:
③、点击下面下载即可,选择想要的版本,这里下的是3.8.17
④、将插件放到指定插件目录下
cd进入该目录:
cd /var/lib/docker/volumes/mq-plugins/_data
将下载好的插件放入该目录
⑤、接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
⑥、重启镜像
docker restart mq
即可。
java代码中编写延迟消息接收案例:
消费者方代码:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "delay.queue", durable = "true"),exchange = @Exchange(value = "delay.direct", delayed = "true"),key = "hello"))public void listenDelayQueue(String msg) {log.info("接收到delay.queue的消息: [ " + msg + " ]");}
发送者方代码:
@Testpublic void testDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "hello", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000);return message;}});log.info("消息发送成功");}
运行结果:
发送方:
05-11 13:21:42:518 INFO 32628 --- [ main] com.rabbitmq.publisher.publisher : 消息发送成功接收方:
05-11 13:21:52:531 INFO 23616 --- [ntContainer#0-1] com.rabbitmq.consumer.listener.Consumer : 接收到delay.queue的消息: [ "message" ]
可以看到发收消息间隔正好10秒。