RabbitMQ全方位解析
1.MQ的概念是什么?
MQ(message queue),从字面意思上看就个 FIFO 先入先出的队列,只不过队列中存放的内容是 message 而已,它是一种具有接收数据、存储数据、发送数据等功能的技术服务。
这样就将发送发和接收方解耦了,发送发只需要发消息,而接受方法只需要取消息即可。
2.MQ的作用是什么?为什么要使用MQ
2.1高并发的流量削峰
举一个例子,某一个订单系统每秒最多只能处理10000个请求,那么在某一秒,如果说请求是40万,那么很显然此时超过了10000的部分就直接无法无法请求了,只能限制订单超过一万后不允许用户下单,为了解决这样的问题,使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
2.2应用解耦
如图,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。这样就实现了系统之间的解耦,不会出现一个系统出问题,其他系统直接崩溃的局面了。
2.3异步处理
使用消息队列,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。
2.4分布式事务
在分布式系统当中,一次请求可能会调用不同服务去操作不同的数据库,而和单体项不同的是,单体项目可以一次性对支付,修改等等操作一个方法完成,但是分布式是在不同的微服务当中的,而 MQ 能够很好的帮我们解决分布式事务的问题,有一个比较容易理解的方案,就是二次提交。基于MQ的特点,MQ作为二次提交的中间节点,负责存储请求数据,在失败的情况可以进行多次尝试,或者基于MQ中的队列数据进行回滚操作,是一个既能保证性能,又能保证业务一致性的方案,如下图所示
2.5数据分发
MQ 具有发布订阅机制,不仅仅是简单的上游和下游一对一的关系,还有支持一对多或者广播的模式,并且都可以根据规则选择分发的对象。这样一份上游数据,众多下游系统中,可以根据规则选择是否接收这些数据,能达到很高的拓展性。
3.常用的MQ及其优缺点对比
消息队列 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
RabbitMQ | - 支持多种消息协议(AMQP、MQTT、STOMP 等),兼容性强 - 提供丰富的路由策略(direct、topic、fanout 等),灵活度高 - 自带 Web 管理界面,便于监控和调试 - 支持消息确认(ack)、持久化、死信队列等高级特性 - 社区活跃,文档丰富 | - 吞吐量相对较低(万级 / 秒),不适合超大规模数据传输 - 底层基于 Erlang 语言,二次开发和问题排查门槛较高 - 集群扩展复杂度较高,需要依赖插件 | - 中小型系统的异步通信(如订单通知、日志收集) - 需要复杂路由逻辑的场景(如多系统消息分发) - 对消息可靠性要求高的场景(如金融交易通知) |
Kafka | - 吞吐量极高(十万级 / 秒),适合大数据场景 - 基于磁盘存储,消息持久化性能优异,支持海量数据存储 - 分布式架构设计,集群扩展简单,容错性强 - 支持消息回溯(通过 offset),适合日志采集等场景 - 与大数据生态(Spark、Flink)集成紧密 | - 消息可靠性默认配置较低(需手动优化) - 不支持复杂路由,仅支持简单的主题(Topic)订阅 - 消息延迟较高(毫秒级),不适合实时性要求极高的场景 - 运维复杂度较高,需要手动调优参数 | - 日志采集与传输(如 ELK 生态) - 大数据实时处理(如实时计算、数据同步) - 高并发场景下的削峰填谷(如秒杀系统的流量缓冲) |
RocketMQ | - 吞吐量高(十万级 / 秒),性能接近 Kafka - 支持事务消息、定时消息、重试机制等高级特性,可靠性强 - 基于 Java 开发,易于二次开发和集成(适合 Java 技术栈) - 集群部署简单,支持动态扩展 - 提供完善的监控和运维工具 | - 生态相对较窄,对非 Java 语言支持较弱 - 社区活跃度不及 RabbitMQ 和 Kafka - 部分高级特性(如事务消息)使用复杂度较高 | - 金融级业务(如支付、对账,依赖事务消息) - 分布式系统的异步通信(如微服务间调用) - 需要高吞吐量且兼顾可靠性的场景(如电商订单处理) |
ActiveMQ | - 支持多种协议(JMS、AMQP、STOMP 等),兼容性好 - 部署简单,适合快速上手 - 文档丰富,对新手友好 | - 吞吐量低(千级 / 秒),不适合高并发场景 - 社区活跃度下降,更新较慢 - 大规模集群下稳定性较差 | - 中小型系统的简单异步通信(如内部通知) - 对性能要求不高,需要快速集成的场景 - 传统企业级应用(如 ERP 系统消息传递) |
记住一句话:kafka的作用主要是记录大量的日志,RocketMQ是阿里研发的作用于高吞吐和可靠性高,RabbitMQ适用于可靠性但是吞吐不怎么高。
4.消息队列协议
消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire
、AMQP
、MQTT
、Kafka
、OpenMessage协议。
为什么不使用HTTP协议呢,因为HTTP协议请求报头文和响应报文头很复杂,包括了各种安全处理比如说cookie,而对应消息而言不需要那么复杂的,只需要他去传递信息即可。而且HTTP大部分都是短链接,一个请求到响应中断了,就不会进行持久化导致请求的丢失。
4.1常用消息中间件协议
4.1.1AMQP
协议(Advanced Message Queuing Protocol—高级消息队列协议)
特性:分布式事务支持、消息的持久化支持、高性能和高可靠的消息处理优势
AMQP典型的实现者是RabbitMQ
、ACTIVEMQ
等
4.1.2MQTT
协议(Message Queueing Telemetry Transport—消息队列遥测传输协议)
它是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
特点:轻量、结构简单、传输快、不支持事务、没有持久化设计
应用场景:适用于计算能力有限、低带宽、网络不稳定的场景
支持者:RabbitMQ
、ACTIVEMQ
(默认情况下关闭,需要打开)
4.1.3OpenMessage
协议
是近几年由阿里、雅虎和滴滴出行、 Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:结构简单、解析速度快、支持事务和持久化设计
4.1.4.Kafka
协议
基于TCP/IP的二进制协议。消息内部是通过长度来分割,由些基本数据类型组成。
特点:结构简单、解析速度快、无事务支持、有持久化设计
5.消息队列持久化
持久化简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。.
常见支持的持久化方式对比:
除了ActiveMQ支持将数据持久化到数据库以外,其他的消息队列都不能将其存入数据库,只能存入到文件当中。
6.分发策略
这里就是RabbitMQ的核心分发策略,就是消息从生产者到消息队列再到消费者这整个过程当中消息的流向问题。后续会详细介绍这里了解一下即可。
分发策略(基于交换机类型) | 简单解释 | 核心特点 | 适用场景 |
---|---|---|---|
Direct(直接交换机) | 消息的 Routing Key 与队列绑定的 Routing Key 完全匹配时,消息才会被分发到该队列 | 精确匹配,一对一或多对一 | 单目标消息传递(如订单状态通知到特定处理队列) |
Topic(主题交换机) | 支持通配符匹配:* 匹配一个单词,# 匹配多个单词(含 0 个),按 Routing Key 模糊匹配分发 | 灵活的模糊匹配,支持多规则路由 | 多场景分类消息(如order.* 匹配所有订单相关消息) |
Fanout(扇形交换机) | 忽略 Routing Key,将消息广播到所有绑定的队列 | 无差别广播,一对多 | 消息需多消费者同时接收(如日志同步到多个存储系统) |
Headers(头交换机) | 不依赖 Routing Key,根据消息头(headers)中的键值对匹配分发,支持多种匹配规则(全匹配、任意匹配等) | 基于消息属性过滤,匹配维度更丰富 | 需复杂属性筛选的场景(如按消息优先级、来源系统过滤) |
7.RabbitMQ教程
7.1RabbitMQ的架构组成
接下来介绍mq当中核心内容,这里对于你使用mq有着至关重要的作用,请详细阅读:
Broker:这个就是RabbitMQ服务,消息在这里被存储和转发,连接消费者和生产者。
Virtual host:出于多用户的设计,每个用户在自己的host下,相对于其他用户自己的交换机和Queue是独立的不可见的。
Connection
:连接,生产者/消费者与 Broker 之间的 TCP 网络连接。
Channel:类似于线程池里面的线程,而Connection就是那个线程池,因为建立/销毁Connection的话开销比较大,所以说真正的作用于通信的是Channel,而这些Channel是彼此独立的,互相不影响。
Message
:消息,服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Exchange
:交换机,是 message 到达 broker 的第一站,用于根据分发规则、匹配查询表中的 routing key,分发消息到 queue 中去,不具备消息存储的功能。常用的类型有:direct、topic、fanout。
Bindings
:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。.
Routing key
:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
到这里就可以看出来,消息分配闭环了整个流程是这样的:
生产者通过Connection连接,开辟一个channel发送到指定的交换机当中,而这个交换机是提前配置好的:通过binding将交换机和队列连接起来,而这个bindings过程当中极其重要的一点就是会指定交换机和队列连接的一个
Routing key,该Routing key会被记录到绑定到的交换机上面从而下次消息来的时候就回去匹配这个Routing key看是否需要发到该队列当中。消费者通过Connection连接对应的队列去监听信息即可。
如下图消息流转的整个过程,而上述总结语言,已经将在"Broker"当中发生的事情描述清楚了。
7.2RabbitMQ消息模式
RabbitMQ提供6种模式,分别是 Hello World、Work Queues、Publish/Subscribe、Routing、Topics、RPC。本文详细讲述了前5种,并给出代码实现和思路。其中 Publish/Subscribe、Routing、Topics 三种模式可以统一归为 Exchange 模式,只是创建时交换机的类型不一样,分别是 fanout、direct、topic 三种交换机类型。
注意:简单模式和工作模式虽然途中没有画出交换机,但是都会有一个默认的交换机,类型为direct
7.2.1简单模式
一个生产者一个消费者一个队列,采用默认的交换机,可以理解为一个生产者P发送到消息队列Q,被一个消费者C接收。(采用默认的交换机direct)
7.2.2工作模式
在简单模式的基础上,消费者可以多个,即一个消息可以同时被多个消费者进行消费。
7.2.3发布/订阅模式
一个生产者、一个 fanout 类型的交换机、多个队列、多个消费者。一个生产者发送的消息会被多个消费者获取。其中 fanout 类型就是发布订阅模式,只有订阅该生产者的消费者会收到消息。只要和改交换机绑定了消息就会发送到对应的队列当中。
7.2.4路由模式
一个生产者,一个direct类型的交换机,多个队列,而交换机和队列之间通过routing-key的进行精确绑定(完全匹配),然后消息会携带routing-key,就会去转发到对应的队列当中了。
7.2.5主题模式(Topic)
一个生产者,一个direct类型的交换机,多个队列,而交换机和队列之间通过routing-key的进行精确绑定(可以用模糊匹配),然后消息会携带routing-key,就会去转发到对应的队列当中了。
*
(星号):严格匹配一个且仅一个单词(单词以.
为分隔符)。
#
(井号):匹配零个或多个单词(包括空值)
7.2.6参数模式
作用:可以携带参数,根据参数进行过滤
这里就在mq的图形化页面上演示一下topic模式,其他模式都是相同的道理
1.创建一个交换机类型为topic的交换机
2.创建两个queue和交换机进行绑定
3.在交换机页面创建消息并发送消息
接收结果:
7.3Spring Boot 整合RabbitMQ
现在以简单模式作为例子来梳理整合的过程。
1.创建maven工程并引入rabbitmq的依赖
我使用的jdk版本是1.8,spring boot的版本是2.6.13,核心的依赖就是下面这个:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.qingjin</groupId><artifactId>rabbit-mq</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbit-mq</name><description>rabbit-mq</description><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.qingjin.rabbitmq.RabbitMqApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>
2.编写mq的配置
3.编写配置类
该配置类的作用就是定义交换机和队列,以及二者之间绑定的关系包括routingkey的绑定。
4.编写消费者和生产者
生产者:
package com.qingjin.rabbitmq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("simple.queue", message);System.out.println("生产者:发送消息:" + message);}}
消费者:
package com.qingjin.rabbitmq;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class consumer {@RabbitListener(queues = "simple.queue") //通过这个注解实现监听某特定队列public void receive(String message) {System.out.println("接收到的消息:" + message);}
}
4.编写测试类进行测试
package com.qingjin.rabbitmq;import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class rabbitTest {@Resourceprivate producer producer;@Testpublic void testSendMessage() {producer.sendMessage("hello world");}
}
运行测试类后的结果就是:
此时再运行项目启动消费者,消费者就会立即去消费信息
此时mq的控制台的simple.queue的消息数就清空了
7.4RabbitMQ的回答机制
默认情况下,RabbitMQ 一旦向消费者发送了一条消息后,便立即将该消息标记为删除。由于消费者处理一个消息可能需要一段时间,假如在处理消息中途消费者挂掉了,我们会丢失其正在处理的消息以及后续发送给该消费这的消息。
为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答意思就是:消费者在接收消息并且处理完该消息之后,才告知 RabbitMQ 可以把该消息删除了。
RabbitMQ 中消息应答方式有两种:自动应答(默认)、手动应答
自动应答机制:
该机制是默认使用的机制,也就是发送了消息就代表了消费者消费成功,自动应答机制的弊端很明显了,如果说消息传递出去了但是消费者在消费过程当中挂掉了,或者是消息都还未到达消费者,connection挂掉了,那么消息不就丢失了吗,更有甚者就是如果说某一时刻消息数量特别多,消费者这边来不及处理,消息就会堆积占内存从而被操作系统杀死。
手动应答机制:
手动应答机制核心在于一个ack,采取手动应答的时候会存在一个ack的过程,如果说消费者由于某些原因失去了连接(比如说服务挂了,连接关闭了),导致没有接收到ack,此时就会将消息重新加入队列给另外一个消费者进行消费,如下: