当前位置: 首页 > news >正文

【从零开始学习RabbitMQ】

这个课程是基于黑马商城的业务场景讲的,不用担心要不要补上黑马商城,技术是一定的,我们本来就是学习 MQ 的使用。

初识 MQ

同步调用

  • 问题 1:拓展性差

因为代码的耦合度高,每次有新的需求都要大动干戈,修改原代码。这是不符合开闭原则的(对拓展开放,对修改关闭)。

简单理解就是尽量通过更新代码来拓展功能,而不是修改已有代码。

  • 问题 2:性能下降

因为这些业务是同步执行,在上一个业务没完成前是不会进行执行下一个业务的,这样极大的浪费 cpu 性能,因为一直在阻塞。这里我想到了 Redis 中的 IO 多路复用就是为解决这样的问题。(面试重点)

  • 问题 3:级联失败

一个业务失败导致其他业务也失败。

异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者: 投递消息的人,就是原来的调用方
  • 消息代理: 管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者: 接收和处理消息的人,就是原来的服务提供方

就想外卖员、外卖柜与消费者的关系,外卖员送到了就放外卖柜里,然后就去忙别的了,消费者什么时候拿是他的事,这样就非常高效了。

这里让我想到了一句话,没有什么是加一层中间件解决不了的,如果有,那就再加一层。看来也不无道理。

异步调用优势:

  • 解除耦合,拓展性强
  • 无需等待,性能好
  • 故障隔离
  • 缓存消息,流量削峰填谷

异步调用的问题是什么?

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于Broker(代理)的可靠性

MQ 技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

常见消息队列的比较

MQ 入门

RabbitMQ 的整体架构以及核心概念:

  • virtual-host:虚拟主机,数据隔离
  • publisher: 消息发送者
  • consumer: 消息的消费者
  • queue: 队列,存储消息
  • exchange: 交换机,负责路由消息

对应多个 VirtualHost,起到隔离的作用。

数据隔离

由于虚拟主机之间数据是隔离的,也就是即使是超级管理员也无法操作不属于自己的 virtual-host 里的功能。

可以创建自己专属的虚拟主机,比如

可以查看各个虚拟主机里的交换机,于是 hmall 用户也有了自己的数据隔离区。

Java 客户端-SpringAMQP

快速入门

需求如下:

  • 利用控制台创建队列simple.queue
  • 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

SpringAMQP如何收发消息?

① 引入spring-boot-starter-amqp依赖

② 配置rabbitmq服务端信息

③ 利用RabbitTemplate发送消息

④ 利用@RabbitListener注解声明要监听的队列,监听消息

编写测试类:

生产者:

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessage2Queue(){String queueName = "simple.queue";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}
}

消费者:

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessage2Queue(){String queueName = "simple.queue";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}
}

work 模型

  1. 在RabbitMQ的控制台创建一个队列, 名为work.queue
  2. 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
  4. 消费者1每秒处理50条消息,消费者2每秒处理5条消息
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessage2Queue(){String queueName = "simple.queue";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}@Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";for(int i = 0; i < 50; i++){String msg = "hello, work, message_" + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}
}
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者收到了simple.queue的消息:【" + msg + "】");}@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg){System.out.println("消费者1收到了work.queue的消息:【" + msg + "】" );}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg){System.err.println("消费者2收到了work.queue的消息:【" + msg + "】" );}
}
消费者消息推送限制

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置prefetch值为1,确保同一时刻最多投递给消费者1条消息:

Work模型的使用:

  • 多个消费者绑定到一个队列, 可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量, 处理完一条再处理下一条, 实现能者多劳

Fanout 交换机

真正生产环境都会经过exchange来发送消息, 而不是直接发送到队列, 交换机的类型有以下三种:

  • Fanout: 广播
  • Direct: 定向
  • Topic: 话题

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue, 所以也叫广播模式

利用SpringAMQP演示FanoutExchange的使用
  1. 在RabbitMQ控制台中, 声明队列fanout.queue1和fanout.queue2

  1. 在RabbitMQ控制台中, 声明交换机hmall.fanout, 将两个队列与其绑定
  2. 在consumer服务中,编写两个消费者方法, 分别监听fanout.queue1和fanout.queue2
  3. 在publisher中编写测试方法, 向hmall.fanout发送消息
交换机的作用是什么?
  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • FanoutExchange的会将消息路由到每个绑定的队列
http://www.dtcms.com/a/418835.html

相关文章:

  • Kafka08-优化-尚硅谷
  • 小杰深度学习(two)——全连接与链式求导
  • vue警告:Extraneous non-props attributes (class) were passed to component
  • 记录第一次搭建ELK+filebeat环境
  • 【复习】计网每日一题--多播
  • 狮山网站开发wordpress轩小程序
  • Ubuntu22.04——配置固定IP
  • 记Bugku CTF平台解题过程
  • OceanBase主备库日志传输服务
  • React-props的children属性
  • 济宁做网站的公司邯郸公司网站建设
  • 特别分享:关于Pipeline
  • 速通ACM省铜第十七天 赋源码(Racing)
  • ARM(IMX6ULL)——通信(IIC/I2C)
  • 零基础学AI大模型之LangChain-PromptTemplate
  • FFT去除规律条纹
  • JAVA中的权限修饰符
  • 前端面试十四之webpack和vite有什么区别
  • 小米路由器 做网站银川森林半岛
  • Kafka04-知识速记
  • 【Linux】高级I/O
  • 开源的容器化平台:Docker高级应用与实战案例
  • 3.7 广域网 (答案见原书 P116)
  • 临淄网站制作首选公司seo排名需要多少钱
  • k8s-部署单master节点
  • Python 2025:量子计算编程的新前沿
  • 二级学院网站建设自评报告互联网营销公司有哪些
  • 做网站滨州现在写博客还是做网站
  • 基于 Service Worker 的图书馆资源缓存技术研究
  • php网站后台验证码不显示哈尔滨教育云平台网站建设