01-RabbitMQ消息队列
文章目录
- 一、消息中间件
- 1.1 简介
- 1.2 消息队列常见使用场景
- 1.2.1 异步处理
- 1.2.2 应用解耦
- 1.2.3 流量削峰、削峰填谷
- 三、消息中间件的两种工作模式
- 3.1 `P2P`模式 (点对点模式)
- 3.1.1 核心特点
- 3.1.2 适用场景
- 3.2 `Pub/Sub`模式(发布/订阅)
- 3.2.1 核心特点
- 3.2.2 适用场景
- 四、常用中间件介绍与对比
- 4.1 `ActiveMQ`
- 4.2 `Kafka`
- 4.3 `RabbitMQ`
- 4.4 `RocketMQ`
- 五、`RabbitMQ`解决方案
- 5.1 `RabbitMQ`简介
- 5.2 `RabbitMQ`架构方案
- 5.3 `RabbitMQ`基本概念
- 5.4 `RabbitMQ`集群中节点划分
- 5.4.1 `Disk Node`(磁盘节点)
- 5.4.2 `RAM Node`(内存节点)
- 5.5 普通集群准备环境
- 5.5.1 三台主机基础环境配置
- 5.5.2 三个节点安装 `RabbitMQ`
- 5.5.3 `rabbitmq-1` 节点创建用户
- 5.5.4 三台主机开启用户远程登录
- 5.5.5 端口说明
- 5.6 部署集群前初始化操作
- 5.6.1 三台主机创建数据存放目录和日志目录
- 5.6.2 `rabbitmq-1`拷贝`erlang.cookie`至其他两台主机
- 5.6.3 另外两台`RabbitMQ`主机作为内存节点加到`rabbitmq-1`节点集群
- 5.6.4 查看集群状态
- 5.6.5 登录`RabbitMQ`管理控制台,创建新的队列
- 5.7 `RabbitMQ`普通集群和镜像集群区别
- 5.8 `RabbitMQ`镜像集群配置
一、消息中间件
1.1 简介
消息队列(Message Queue,简称MQ)指保存消息的一个容器,其实本质就是一个保存数据的队列。
消息中间件是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的构建。
消息中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性的系统架构。使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。
1.2 消息队列常见使用场景
1.2.1 异步处理
异步处理,将一些非核心的业务流程以异步并行的方式执行,从而减少请求响应时间,提高系统吞吐量。

以下单为例,用户下单后需要生成订单、赠送活动积分、赠送红包、发送下单成功通知等一系列业务处理。假设三个业务节点每个使用100毫秒钟,不考虑网络等其他开销,则串行方式的时间是400毫秒,并行的时间只需要200毫秒。这样就大大提高了系统的吞吐量。
1.2.2 应用解耦
应用解耦,顾名思义就是解除应用系统之间的耦合依赖。通过消息队列,使得每个应用系统不必受其他系统影响,可以更独立自主。

以电商系统为例,用户下单后,订单系统需要通知积分系统。一般的做法是:订单系统直接调用积分系统的接口。这就使得应用系统间的耦合特别紧密。如果积分系统无法访问,则积分处理失败,从而导致订单失败。加入消息队列之后,用户下单后,订单系统完成下单业务后,将消息写入消息队列,返回用户订单下单成功。积分系统通过订阅下单消息的方式获取下单通知消息,从而进行积分操作。实现订单系统与库存系统的应用解耦。如果,在下单时积分系统系统异常,也不影响用户正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作。
1.2.3 流量削峰、削峰填谷
流量削峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。

以秒杀活动为例,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列,秒杀业务处理系统根据消息队列中的请求信息,再做后续处理。服务器接收到用户的请求后,首先写入消息队列,秒杀业务处理系统根据消息队列中的请求信息,做后续业务处理。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
三、消息中间件的两种工作模式
3.1 P2P模式 (点对点模式)
3.1.1 核心特点
- 单消费者消费:每个消息仅被一个消费者处理,消息消费后从队列移除;
- 负载均衡:多个消费者共享同一队列,通过轮询或竞争机制分配任务,提升系统并行处理能力;
- 异步解耦:生产者与消费者无需同时在线,系统间通过队列解耦。
3.1.2 适用场景
- 任务分发:如邮件批量发送、订单处理,通过多消费者并行处理加速任务执行;
- 高可靠性场景:金融交易扣款、库存扣减,需确保每个消息仅处理一次。
假设有一个邮件发送系统,当用户发起邮件群发请求时,邮件服务需要将大量邮件发送给不同的收件人。为了提高处理速度,邮件发送系统可以将每个邮件的发送任务放入消息队列中,多个消费者(邮件发送模块)并行地从队列中取出任务并处理,从而加快整个邮件群发的速度。
3.2 Pub/Sub模式(发布/订阅)
3.2.1 核心特点
- 多消费者订阅:同一消息可被多个订阅者接收,实现消息广播;
- 主题(
Topic)驱动:消息通过主题分类,消费者按需订阅特定主题。
3.2.2 适用场景
- 广播:当需要向多个消费者同时发送消息时,发布-订阅模型非常有用。例如,实时消息推送、新闻订阅等。
假设有一个电商系统,当新商品上线时,系统需要将商品信息推送给所有消费者。使用发布-订阅模型,系统将商品信息发布到消息队列中的“商品发布”主题,所有订阅了该主题的消费者(比如推荐引擎、库存系统、客户通知服务等)都会收到商品信息,进行相应的处理。
四、常用中间件介绍与对比
4.1 ActiveMQ

官网:https://activemq.apache.org/
- 多协议与多语言支持:适应不同系统的通信需求;
- 高可用性与扩展性:支持主从集群和网络代理模式,通过故障转移实现高可用性;
- 轻量级与易部署:无需复杂依赖,单机部署快速;支持嵌入到Java应用中,适合轻量级架构。
4.2 Kafka

官网:https://kafka.apache.org/
- 高吞吐与低延迟:采用顺序磁盘写入、零拷贝技术和批量消息处理机制,单机每秒可处理百万级消息;
- 持久化与容灾能力:通过多副本和分区设计,数据持久化到磁盘并自动同步备份,即使节点故障也能快速恢复;
- 分布式扩展性:支持动态添加
Broker节点和分区扩容,提升集群处理能力。
4.3 RabbitMQ

官网:https://www.rabbitmq.com/
- 高可靠性:消息、队列和交换器均可持久化到磁盘;
- 确认机制:生产者确认和消费者手动确认确保消息不丢失;
- 死信队列:处理因超时、拒绝或队列满而无法消费的消息;
- 灵活路由:通过交换器类型和路由键实现复杂分发策略,如按地域、业务类型分发消息;
- 优先级队列:支持为消息设置优先级(如会员用户优先处理);
4.4 RocketMQ

官网:https://rocketmq.apache.org/
- 高吞吐与低延迟:通过顺序磁盘写入、零拷贝技术和批量消息处理,单机每秒可处理数十万条消息,延迟稳定在毫秒级;
- 消息可靠性:消息持久化存储至磁盘,支持主从同步复制,确保数据零丢失;
- 顺序消息与延迟消息:队列内消息严格按写入顺序消费,适用于支付状态变更等场景;
五、RabbitMQ解决方案
5.1 RabbitMQ简介
一种开源的消息队列中间件,基于 AMQP(高级消息队列协议)构建,主要用于异步通信、系统解耦、削峰填谷等高并发场景。
它像一个“快递中转站”——你把包裹(消息)扔进去,不需要等收件人(消费者)签收就能继续干别的。等对方有空了,再慢慢取件处理,完全解耦!在高并发、高可用的现代系统架构中,RabbitMQ 就像城市交通的“红绿灯+缓冲带”,能有效地管控流量、解耦系统、提升稳定性。
5.2 RabbitMQ架构方案
- 单机模式。
- 普通模式(默认的集群模式)。
- 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于
RabbitMQ的HA方案,在对业务可靠性要求较高的场合中比较适合)。要实现镜像模式,需要先搭建出普通集群模式,在这个模式的基础上再配置镜像模式以实现高可用。
5.3 RabbitMQ基本概念
一个Rabbitmq集群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制。
Broker:RabbitMQ服务器本身就是一个消息代理(Broker)。负责接收、存储和转发消息到合适的目的地。ConnectionFactory(连接管理器):应用程序与RabbitMQ之间建立连接的管理器,程序代码中使用;如主机名、端口、用户名、密码等。Exchange(交换器):交换器(Exchange)是消息路由的核心组件。生产者发送的消息不会直接传递到队列,而是先发送到交换器,然后由交换器根据特定的规则将消息路由到一个或多个队列中。Routing Key: 当生产者发送消息到交换机(Exchange)时,会指定一个路由键。交换机会根据这个键和绑定规则将消息分发到相应的队列。Queue(队列):存储消息的地方,消费者从队列中获取并处理消息。队列可以配置为持久化,以确保消息在服务器重启后仍然存在。Bindding(绑定):将交换机和队列连接起来,定义了交换机如何根据路由键将消息路由到队列。vhost(虚拟主机):用于多租户和权限分离的机制。一个Broker可以有多个Vhost,每个Vhost可以有独立的交换机、队列、绑定和权限配置。这样可以隔离不同的应用或租户的数据和配置。producer(生产者):生产者是创建并发送消息到RabbitMQ的应用程序或服务。生产者将消息发送到交换机,并指定一个路由键。consumer(消费者):消费者是从RabbitMQ队列中接收并处理消息的应用程序或服务。消费者可以订阅一个或多个队列,并根据需要处理消息。channel(信道):消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。消息推送使用的通道。

- 生产者发送消息:生产者创建消息,并将其发送到
RabbitMQ的某个交换机中。消息可以携带路由键,交换机会根据这个键和绑定规则将消息路由到相应的队列。 - 交换机路由消息:交换机接收消息后,根据类型和绑定规则将消息路由到一个或多个队列中。如果没有找到匹配的队列,交换机会丢弃消息,或者根据配置返回给生产者。
- 消息进入队列:一旦消息被路由到队列中,消息就会暂时存储在队列中,等待消费者处理。队列可以配置为持久化,以确保即使
RabbitMQ服务崩溃,消息也不会丢失。 - 消费者接收消息:消费者从队列中取走消息并进行处理。消息的分发可以通过轮询分配给多个消费者,或者消费者可以从多个队列中获取消息进行处理。
- 消息确认:消费者处理完消息后,会向
RabbitMQ发送一个确认消息(ACK),告知消息已被成功处理。如果消费者在处理过程中失败,RabbitMQ可以重新将消息投递给其他消费者或重新进入队列,这取决于队列的配置。
5.4 RabbitMQ集群中节点划分
5.4.1 Disk Node(磁盘节点)
消息和元数据(如队列定义、交换器定义、绑定关系等)会被持久化到磁盘。如果集群中的所有磁盘节点都宕机,整个集群将不可用。RabbitMQ 集群中需要至少一个磁盘节点。
5.4.2 RAM Node(内存节点)
消息和元数据只存储在内存中,速度更快,但数据在节点宕机后会丢失。RAM 节点主要用来提升性能,通常用作从节点或缓存节点。
注意事项:
RAM节点和磁盘节点可以混合部署,但至少需要一个磁盘节点作为元数据的持久化存储。
5.5 普通集群准备环境
Rabbitmq官方最新rpm包下载地址:https://www.rabbitmq.com/install-rpm.html#downloads
# rabbitmq 和 erlang兼容版本
https://www.rabbitmq.com/which-erlang.html
# erlang 版本选择
https://packagecloud.io/rabbitmq/erlang
# rabbitmq 版本选择
https://www.rabbitmq.com/news.html
5.5.1 三台主机基础环境配置
# Step 1:三台主机修改主机名(强制)
$ hostnamectl set-hostname rabbitmq-1
$ hostnamectl set-hostname rabbitmq-2
$ hostnamectl set-hostname rabbitmq-3# Step 2:三台主机本地解析(强制)
$ vim /etc/hosts
192.168.50.138 rabbitmq-1
192.168.50.139 rabbitmq-2
192.168.50.140 rabbitmq-3# Step 3:三台主机永久关闭防火墙
$ systemctl disable --now firewalld# Step 4:三台主机关闭SELinux
$ setenforce 0# Step 5:三台主机删除默认安装源
$ rm -rf /etc/yum.repos.d/*# Step 6:三台主机安装阿里云安装源
$ curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo# Step 7:三台主机安装阿里云epel源
$ curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
5.5.2 三个节点安装 RabbitMQ
# Step 1:三台主机安装依赖
$ yum install -y *epel* gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel# Step 2:三台主机安装erlang环境
$ wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-20.3-1.el7.centos.x86_64.rpm/download.rpm
$ yum install erlang-20.3-1.el7.centos.x86_64.rpm -y# Step 3:三台主机测试erlang
$ erl
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]Eshell V9.3 (abort with ^G)
1>
# 两次Crtl+c退出# Step 4:三台主机安装rabbitmq
$ wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.5/rabbitmq-server-3.7.5-1.el7.noarch.rpm
$ yum install rabbitmq-server-3.7.5-1.el7.noarch.rpm -y# Step 5:三台主机启动rabbitma服务
$ systemctl daemon-reload
$ systemctl enable --now rabbitmq-server
$ systemctl status rabbitmq-server# Step 6:三台主机开启rabbitmq的web访问界面(默认RabbitMQ web界面关闭)
$ rabbitmq-plugins enable rabbitmq_management

# Step 7:三台主机查看rabbitmq web端口是否放开
$ ss -tunlp | grep -w 15672

5.5.3 rabbitmq-1 节点创建用户
# Step 1:rabbitmq-1添加用户和密码
$ rabbitmqctl add_user soho soso
Adding user "soho" ...# soho:用户名# soso:密码# Step 2:rabbitmq-1设置为管理员
$ rabbitmqctl set_user_tags soho administrator
Setting tags for user "soho" to [administrator] ...# Step 3:rabbitmq-1查看所有用户
$ rabbitmqctl list_users
Listing users ...
soho [administrator]
guest [administrator]# Step 4:rabbitmq-1新建用户设置权限
$ rabbitmqctl set_permissions -p "/" soho ".*" ".*" ".*"
Setting permissions for user "soho" in vhost "/" ...
此处设置权限时注意
'.*'之间需要有空格 三个'.*'分别代表了加载conf权限,read权限与write权限 例如:当没有给soho设置这三个权限前是没有权限查询队列,在ui界面也看不见
5.5.4 三台主机开启用户远程登录
# Step 1:三台主机切换目录
$ cd /etc/rabbitmq/ # Step 2:三台主机拷贝默认配置文件至工作目录
$ cp /usr/share/doc/rabbitmq-server-3.7.5/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config# Step 3:三台主机修改配置文件
$ vim rabbitmq.config
# 修改如下:

# Step 3:三台机器都操作重启服务服务:
$ systemctl restart rabbitmq-server
5.5.5 端口说明
4369–erlang端口5672– 程序连接端口15672– 管理界面ui端口25672– 集群主机间内部通信端口
访问:192.168.50.138:15672


RabbitMQ默认管理员用户:guest 密码:guest
新添加的用户为:soho 密码:soso
5.6 部署集群前初始化操作
5.6.1 三台主机创建数据存放目录和日志目录
# Step 1:三台主机创建RabbitMQ数据存放处
$ mkdir -p /data/rabbitmq/data# Step 2:三台主机创建RabbitMQ日志文件存放处
$ mkdir -p /data/rabbitmq/logs# Step 3:三台主机修改权限和属主、属组
$ chmod 777 -R /data/rabbitmq
$ chown rabbitmq.rabbitmq /data/ -R# Step 4:三台主机创建RabbitMQ环境变量配置文件:
$ vim /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_MNESIA_BASE=/data/rabbitmq/data
RABBITMQ_LOG_BASE=/data/rabbitmq/logs# Step 5:三台主机重启服务
$ systemctl restart rabbitmq-server
5.6.2 rabbitmq-1拷贝erlang.cookie至其他两台主机
Rabbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群。Erlang的集群中各节点是经由各个cookie来实现的;cookie值存放在/var/lib/rabbitmq/.erlang.cookie中,文件是400的权限。所以必须保证各节点cookie一致,不然节点之间就无法通信;
官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。使用二进制式安装部署的rabbitmq,文件会在{home}目录下,是$home/.erlang.cookie。使用rpm等安装包方式进行安装,文件会在/var/lib/rabbitmq目录下。
# Step 1:查看Cookie值
$ cat /var/lib/rabbitmq/.erlang.cookie
HOUCUGJDZYTFZDSWXTHJ# Step 2:rabbitmq-1节点.erlang.cookie的值复制到其他两个节点中。
$ NODES='rabbitmq-2 rabbitmq-3'
$ for i in $NODES;do scp /var/lib/rabbitmq/.erlang.cookie $i:/var/lib/rabbitmq/;done
.erlang.cookie文件是Erlang运行时系统用于节点之间的认证文件。搭建RabbitMQ集群时,确保集群中各个节点可以相互通信和信任。每个节点在启动时都会检查.erlang.cookie文件的内容,只有拥有相同cookie值的节点才能成功地建立连接并进行通信。
5.6.3 另外两台RabbitMQ主机作为内存节点加到rabbitmq-1节点集群
# Step 1:其他两个节点停止节点,切记不是停止服务
$ rabbitmqctl stop_app # Step 2:若数据需要重置,执行以下命名。没有则不用
$ rabbitmqctl reset
注意查看回显,如果不是以上。就是错误;如果报错,重启rabbitmq服务
# Step 3: rabbitmq-2节点以内存节点加入rabbitmq-1集群
$ rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
Clustering node 'rabbit@rabbitmq-2' with 'rabbit@rabbitmq-1' ...# Step 4:rabbitmq-2节点启动节点
$ rabbitmqctl start_app
Starting node rabbit@rabbitmq-2 ...completed with 3 plugins.# Step 5:rabbitmq-3节点以内存节点加入rabbitmq-1集群
$ rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
Clustering node 'rabbit@rabbitmq-3' with 'rabbit@rabbitmq-1' ...# Step 6:rabbitmq-3节点启动节点
$ rabbitmqctl start_app
Starting node 'rabbit@rabbitmq-3' ...completed with 3 plugins.
注意事项:
默认
RabbitMQ启动后是磁盘节点,在执行rabbitmqctl join_cluster --ram rabbit@rabbitmq-1命令下,rabbitmq-2和rabbitmq-3为内存节点,rabbitmq-1为磁盘节点;
如果要使rabbitmq-2和rabbitmq-3都是磁盘节点,去掉--ram参数即可。
如果想要更改节点类型,可以使用命令rabbitmqctl change_cluster_node_type disc(ram),前提是必须停掉RabbitMQ基础应用Erlang。
5.6.4 查看集群状态
# Step 1:在RabbitMQ集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功。在mq-1磁盘节点上面查看
$ rabbitmqctl cluster_status

每台机器显示出三台节点,表示已经添加成功!
5.6.5 登录RabbitMQ管理控制台,创建新的队列
打开浏览器输入[http://192.168.50.138:15672],
输入默认的Username:guest;输入默认的Password:guest

根据界面提示创建一条队列


5.7 RabbitMQ普通集群和镜像集群区别
该集群模式没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
5.8 RabbitMQ镜像集群配置
创建镜像集群:在任意一台机器操作
$ rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'# 命令回显
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
rabbitmq set_policy:设置策略,"^"匹配所有的队列,ha-all策略名称为ha-all,'{"ha-mode":"all"}'策略模式为all即复制到所有节点,包含新增节点。
再次查看队列已经同步到其他两台节点:
"^"匹配所有的队列,ha-all策略名称为ha-all,'{"ha-mode":"all"}'策略模式为all即复制到所有节点,包含新增节点。
设置策略介绍:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称,可以定义
Pattern: queue的匹配模式(正则表达式),也就是说会匹配一组。 ^test.*
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-modeha-mode:指明镜像队列的模式,有效值为 all/exactly/nodesall:表示在集群中所有的节点上进行镜像exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定ha-params:ha-mode模式需要用到的参数ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
案例:
例如,对队列名称以hello开头的所有队列进行镜像,并在集群的两个节点上完成镜像,policy的设置命令为:
rabbitmqctl set_policy hello-ha “^hello)” ‘{“ha-mode”:”exactly”,”ha-params”:2,”ha-sync-mode”:”automatic”}’
则此时镜像队列设置成功。已经部署完成。将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一致。
1、消息队列常见功能?
2、常见的详细队列产品?
3、消息队列两种工作类型?
4、什么消息堆积?消息堆积的解决方案?
5、RabbitMQ的高可用方案?
6、rocketMQ高可用集群搭建?
