当前位置: 首页 > news >正文

RabbitMQ第二章(RocketMQ的五大工作模式)

文章目录

  • 一、simple模式(即最简单的收发模式)
  • 二、work工作模式(资源的竞争)
  • 三、Exchange发布订阅(交换机)
    • 3.1 Fanout交换机
    • 3.2 Direct交换机
    • 3.3 Topic交换机

一、simple模式(即最简单的收发模式)

1.消息产生消息,将消息放入队列
2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删
除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的
ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)
在这里插入图片描述
实现步骤如下:
(1),引入相关的依赖

<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>

(2),新建一个队列:simple.queue
在这里插入图片描述
(3),yml添加配置

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

(4),然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}

(5),消息接收

@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

二、work工作模式(资源的竞争)

1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1
C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息
被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
在这里插入图片描述

(1),我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

(2),在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

(3),要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

三、Exchange发布订阅(交换机)

在这里插入图片描述
1、每个消费者监听自己的队列;
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队
列都将接收到消息。

3.1 Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
在这里插入图片描述

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

(1),声明队列和交换机
在这里插入图片描述
(2),消息发送
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = “hmall.fanout”;
// 消息
String message = “hello, everyone!”;
rabbitTemplate.convertAndSend(exchangeName, “”, message);
}
(3),消息接收
@RabbitListener(queues = “fanout.queue1”)
public void listenFanoutQueue1(String msg) {
System.out.println(“消费者1接收到Fanout消息:【” + msg + “】”);
}

@RabbitListener(queues = “fanout.queue2”)
public void listenFanoutQueue2(String msg) {
System.out.println(“消费者2接收到Fanout消息:【” + msg + “】”);
}

3.2 Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
[图片]

(1),声明队列和交换机

  1. 声明一个名为hmall.direct的交换机
  2. 声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
  3. 声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
  4. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  5. 在publisher中编写测试方法,向hmall.direct发送消息
    (2),消息接收
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

(3),消息发送

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.3 Topic交换机

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
在这里插入图片描述
(1),声明队列和交换机

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news
      (2),消息发送
/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "hmall.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

(3),消息接收

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
http://www.dtcms.com/a/270450.html

相关文章:

  • 【Linux服务器】-安装ftp与sftp服务
  • 数据结构:数组:合并数组(Merging Arrays)
  • 20 道 Node.js 高频面试题
  • Codeforces Round 868 (Div. 2) D. Unique Palindromes(1900,构造)
  • 深入企业内部的MCP知识(四):FastMCP装饰器与类方法:正确结合面向对象与MCP组件的实践指南
  • 4.权重衰减(weight decay)
  • MySQL-索引
  • SQL135 每个6/7级用户活跃情况
  • ${project.basedir}延申出来的Maven内置的一些常用属性
  • Python入门Day5
  • 嵌入式面试八股文100题(二)
  • 分库分表之实战-sharding-JDBC水平分库+水平分表配置实战
  • 【深度学习入门 鱼书学习笔记(1)感知机】
  • 7月8日学习笔记——统计决策方法
  • 基于springboot的物流配货系统
  • Nuxt.js 静态生成中的跨域问题解决方案
  • C++学习笔记之数组、指针和字符串
  • 【PyTorch】PyTorch中torch.nn模块的激活函数
  • 项目Win系统下可正常获取Header字段,但是到了linux、docker部署后无法获取
  • python基础day08
  • linux wsl2 docker 镜像复用快速方法
  • 【读代码】GLM-4.1V-Thinking:开源多模态推理模型的创新实践
  • 基于模板设计模式开发优惠券推送功能以及对过期优惠卷进行定时清理
  • C++ 遍历可变参数的几种方法
  • 数据库表设计:图片存储与自定义数据类型的实战指南
  • C语言宏替换比较练习
  • 暑假算法日记第四天
  • 5.6.2、ZeroMQ源码分析
  • 利用AI Agent实现精准的数据分析
  • ARM环境openEuler2203sp4上部署19c单机问题-持续更新