【从零开始学习RabbitMQ】
这个课程是基于黑马商城的业务场景讲的,不用担心要不要补上黑马商城,技术是一定的,我们本来就是学习 MQ 的使用。
初识 MQ
同步调用
- 问题 1:拓展性差
因为代码的耦合度高,每次有新的需求都要大动干戈,修改原代码。这是不符合开闭原则的(对拓展开放,对修改关闭)。
简单理解就是尽量通过更新代码来拓展功能,而不是修改已有代码。
- 问题 2:性能下降
因为这些业务是同步执行,在上一个业务没完成前是不会进行执行下一个业务的,这样极大的浪费 cpu 性能,因为一直在阻塞。这里我想到了 Redis 中的 IO 多路复用就是为解决这样的问题。(面试重点)
- 问题 3:级联失败
一个业务失败导致其他业务也失败。
异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者: 投递消息的人,就是原来的调用方
- 消息代理: 管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者: 接收和处理消息的人,就是原来的服务提供方
就想外卖员、外卖柜与消费者的关系,外卖员送到了就放外卖柜里,然后就去忙别的了,消费者什么时候拿是他的事,这样就非常高效了。
这里让我想到了一句话,没有什么是加一层中间件解决不了的,如果有,那就再加一层。看来也不无道理。
异步调用优势:
- 解除耦合,拓展性强
- 无需等待,性能好
- 故障隔离
- 缓存消息,流量削峰填谷
异步调用的问题是什么?
- 不能立即得到调用结果,时效性差
- 不确定下游业务执行是否成功
- 业务安全依赖于Broker(代理)的可靠性
MQ 技术选型
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
常见消息队列的比较
MQ 入门
RabbitMQ 的整体架构以及核心概念:
- virtual-host:虚拟主机,数据隔离
- publisher: 消息发送者
- consumer: 消息的消费者
- queue: 队列,存储消息
- exchange: 交换机,负责路由消息
对应多个 VirtualHost,起到隔离的作用。
数据隔离
由于虚拟主机之间数据是隔离的,也就是即使是超级管理员也无法操作不属于自己的 virtual-host 里的功能。
可以创建自己专属的虚拟主机,比如
可以查看各个虚拟主机里的交换机,于是 hmall 用户也有了自己的数据隔离区。
Java 客户端-SpringAMQP
快速入门
需求如下:
- 利用控制台创建队列simple.queue
- 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
- 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
SpringAMQP如何收发消息?
① 引入spring-boot-starter-amqp依赖
② 配置rabbitmq服务端信息
③ 利用RabbitTemplate发送消息
④ 利用@RabbitListener注解声明要监听的队列,监听消息
编写测试类:
生产者:
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessage2Queue(){String queueName = "simple.queue";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}
}
消费者:
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessage2Queue(){String queueName = "simple.queue";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}
}
work 模型
- 在RabbitMQ的控制台创建一个队列, 名为work.queue
- 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
- 在consumer服务中定义两个消息监听者,都监听work.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理5条消息
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid TestSendMessage2Queue(){String queueName = "simple.queue";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}@Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";for(int i = 0; i < 50; i++){String msg = "hello, work, message_" + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}
}
@Slf4j
@Component
public class MqListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者收到了simple.queue的消息:【" + msg + "】");}@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg){System.out.println("消费者1收到了work.queue的消息:【" + msg + "】" );}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg){System.err.println("消费者2收到了work.queue的消息:【" + msg + "】" );}
}
消费者消息推送限制
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置prefetch值为1,确保同一时刻最多投递给消费者1条消息:
Work模型的使用:
- 多个消费者绑定到一个队列, 可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量, 处理完一条再处理下一条, 实现能者多劳
Fanout 交换机
真正生产环境都会经过exchange来发送消息, 而不是直接发送到队列, 交换机的类型有以下三种:
- Fanout: 广播
- Direct: 定向
- Topic: 话题
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue, 所以也叫广播模式
利用SpringAMQP演示FanoutExchange的使用
- 在RabbitMQ控制台中, 声明队列fanout.queue1和fanout.queue2
- 在RabbitMQ控制台中, 声明交换机hmall.fanout, 将两个队列与其绑定
- 在consumer服务中,编写两个消费者方法, 分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法, 向hmall.fanout发送消息
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- FanoutExchange的会将消息路由到每个绑定的队列