当前位置: 首页 > news >正文

RabbitMQ学习(第三天)

文章目录

  • 延迟消息概述
  • 1、死信交换机
  • 2、延迟消息插件

今天主要学习Rabbit延迟消息相关的知识。

延迟消息概述

生产者发送一个消息之后,消费者不会立即收到消息,而是在执行时间才会收到消息。

延迟消息主要为了实现延迟任务,应用场景一般是秒杀情境下,有一个倒计时判断用户是否已经付款,配合定时任务来检查订单状态。

1、死信交换机

首先来看看死信的定义:

当一个队列中的消息满足下列情况之一,就会成为死信 (dead letter):

  • 消费者使用basic.reject或basic.nack声明消息失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息 (达到了队列或消息本身设定的过期时间),超过无人消费
  • 容量超的队列被积满了,最早的消息可能会成为死信

死信交换机,简单来说就是存放死信的交换机,通过dead-letter-exchange参数指定。

而我们通过这个方式,可以模拟出来延迟消息的效果:

在这里插入图片描述
图中,我们首先设定了消息过期时间为30s,如果过期了,就将消息放到dix.direct交换机,即死信交换机,随后将消息投给Consumer,这样就实现了延迟消息的效果,30s后才让消费者收到消息。

在这里插入图片描述
控制台点击这个即可设置死信队列,参数后面跟的是死信交换机的名称,比如dlx.direct

我们就照着图中实例来定义交换机和队列,演示一下.

消息接收者:

    @RabbitListener(queues = "dlx.queue")public void DLXQueueConsumer(String msg) {log.info("dlx.queue消费了消息[ " + msg + " ]");throw new RuntimeException("故意的");}

注意不要有监听simple.queue的监听器,不然的话会导致消息被消费,从而无法进行测试。

测试代码:

    @Testpublic void testTTLMessage() {Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setExpiration("10000").build();rabbitTemplate.convertAndSend("simple.direct","hi", message);log.info("消息发送成功");}

我们这里测试消息就设置成10秒后过期,具体时间可以自己调整。
发送方运行结果:

05-11 11:55:34:549  INFO 27744 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.138.133:5672]
05-11 11:55:34:593  INFO 27744 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#24361cfc:0/SimpleConnection@55e42449 [delegate=amqp://root@192.168.138.133:5672/, localPort= 61846]
05-11 11:55:34:608  INFO 27744 --- [           main] com.rabbitmq.publisher.publisher         : 消息发送成功

消费方运行结果:

05-11 11:55:44:619  INFO 22540 --- [ntContainer#0-1] com.rabbitmq.consumer.listener.Consumer  : dlx.queue消费了消息[ hello ]

从运行结果的时间来看,可以发现间隔正好差了10秒,实现了延迟消息。

2、延迟消息插件

RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息的交换机,当消息投递到该交换机后可以选择延迟一定时间,过期后再次投递到队列。

下面是插件安装流程:

在linux环境输入:

docker inspect mq

下图中的就是挂载目录:
在这里插入图片描述

插件下载流程:

①、首先输入网址:
https://www.rabbitmq.com/community-plugins.html

②、界面中点击对应插件的Release:
在这里插入图片描述

③、点击下面下载即可,选择想要的版本,这里下的是3.8.17
在这里插入图片描述

④、将插件放到指定插件目录下

cd进入该目录:

cd /var/lib/docker/volumes/mq-plugins/_data

将下载好的插件放入该目录

⑤、接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

⑥、重启镜像

docker restart mq

即可。

java代码中编写延迟消息接收案例:

消费者方代码:

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "delay.queue", durable = "true"),exchange = @Exchange(value = "delay.direct", delayed = "true"),key = "hello"))public void listenDelayQueue(String msg) {log.info("接收到delay.queue的消息: [ " + msg + " ]");}

发送者方代码:

@Testpublic void testDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "hello", "message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000);return message;}});log.info("消息发送成功");}

运行结果:

发送方:
05-11 13:21:42:518  INFO 32628 --- [           main] com.rabbitmq.publisher.publisher         : 消息发送成功接收方:
05-11 13:21:52:531  INFO 23616 --- [ntContainer#0-1] com.rabbitmq.consumer.listener.Consumer  : 接收到delay.queue的消息: [ "message" ]

可以看到发收消息间隔正好10秒。

相关文章:

  • redis数据结构-08(SINTER、SUNION、SDIFF、SISMEMBER)
  • Python-UV多环境管理
  • 作业...
  • CentOS7离线安装Mysql8
  • 菜鸟之路day31一一MySQL之多表设计
  • idea中的vcs不见了,如何解决
  • 部署Superset BI(五)连接oracle数据库失败
  • LlamaIndex 第七篇 结构化数据提取
  • 进程(沉淀中)
  • ElasticSearch进阶
  • 大数据模型的构建与优化
  • 找银子 题解(c++)
  • C++ 核心基础:数字、数组、字符串、指针与引用详解
  • springboot3+vue3融合项目实战-大事件文章管理系统-更新用户密码
  • TypeScript 装饰器详解
  • Kotlin Multiplatform--03:项目实战
  • 六大设计原则
  • 2025低空经济发展趋势
  • css背景相关
  • PyGame游戏开发(含源码+演示视频+开结题报告+设计文档)
  • 牛市早报|中方调整对美加征关税措施,五部门约谈外卖平台企业
  • 人民日报评外卖平台被约谈:合法规范经营,公平有序竞争
  • 王毅谈中拉论坛第四届部长级会议重要共识
  • 广东早熟荔枝“抢滩”上海,向长三角消费者喊话:包甜,管够
  • 学者的“好运气”:读本尼迪克特·安德森《椰壳碗外的人生》
  • 熊出没!我驻日本札幌总领馆提示中国公民注意人身安全