【RabbitMQ】简介
目录
- 一、MQ简介
- 二、RabbitMQ简介
- 三、Linux下 安装 RabbitMQ
- 3.1 Ubuntu 环境安装
- 3.2 CentOS 安装
- 四、工作流程
- 五、核心概念
- 六、快速上手示例
- 6.1 引入依赖
- 6.2 生产者
- 6.2.1 建立连接
- 6.2.2 开启信道 创建Channel
- 6.2.3 声明一个交换机和一个队列queue
- 6.2.4 发送消息
- 6.2.5 释放资源
- 6.2.6 运⾏
- 6.3 消费者

一、MQ简介
MQ:message queue,本质是一个先进先出的队列,只不过里面储存的是消息而已。消息可以是文本,JSON,也可以是内嵌对象等等。通常用于分布式系统之间的系统通信。
MQ的作用:接收和转发消息。
- 异步解耦:在业务流程中, ⼀些操作可能⾮常耗时, 但并不需要即时返回结果. 可以借助MQ把这些操作异步化, ⽐如 ⽤⼾注册后发送注册短信或邮件通知, 可以作为异步任务处理, ⽽不必等待这些操作完成后才告知⽤⼾注册成功.
- 流量削峰: 在访问量剧增的情况下, 应⽤仍然需要继续发挥作⽤, 但是这样的突发流量并不常⻅. 如果以能处理这类峰值为标准⽽投⼊资源,⽆疑是巨⼤的浪费. 使⽤MQ能够使关键组件⽀撑突发访问压⼒, 不会因为突发流量⽽崩溃. ⽐如秒杀或者促销活动, 可以使⽤MQ来控制流量, 将请求排队, 然后系统根据⾃⼰的处理能⼒逐步处理这些请求.
- 消息分发: 当多个系统需要对同⼀数据做出响应时, 可以使⽤MQ进⾏消息分发. ⽐如⽀付成功后, ⽀付系统可以向MQ发送消息, 其他系统订阅该消息, ⽽⽆需轮询数据库.
- 延迟通知: 在需要在特定时间后发送通知的场景中, 可以使⽤MQ的延迟消息功能, ⽐如在电⼦商务平台中,如果⽤⼾下单后⼀定时间内未⽀付,可以使⽤延迟队列在超时后⾃动取消订单.
二、RabbitMQ简介
RabbitMQ :采⽤Erlang语⾔实现AMQP(Advanced Message Queuing Protocol,⾼级消息队列协议)的消息中间件。
三、Linux下 安装 RabbitMQ
3.1 Ubuntu 环境安装
- 安装Erlang:
#更新软件包
sudo apt-get update
#安装erlang
sudo apt-get install erlang
查看erlang版本:erl
退出命令:halt().
- 安装RabbitMQ:
#更新软件包
sudo apt-get update
#安装rabbitmq
sudo apt-get install rabbitmq-server
#确认安装结果
systemctl status rabbitmq-server
- 安装RabbitMQ管理界⾯:
rabbitmq-plugins enable rabbitmq_management
- 添加管理员⽤⼾
rabbitmqctl add_user ${账号} ${密码}
- 给⽤⼾添加权限
rabbitmqctl set_user_tags ${账号} ${⻆⾊名称}
RabbitMQ⽤⼾⻆⾊分为Administrator、Monitoring、Policymaker、Management、Impersonator、None共六种⻆⾊
- Administrator 超级管理员,可登陆管理控制台(启⽤management plugin的情况下),可查看所有的信息,并且可以对⽤⼾,策略(policy)进⾏操作
- Monitoring 监控者,可登陆管理控制台(启⽤management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使⽤情况,磁盘使⽤情况等)。
- Policymaker 策略制定者,可登陆管理控制台(启⽤management plugin的情况下),同时可以对policy进⾏管理。但⽆法查看节点的相关信息.
- Management 普通管理者,仅可登陆管理控制台(启⽤management plugin的情况下),⽆法看到节点信息,也⽆法对策略进⾏管理.
- Impersonator 模拟者,⽆法登录管理控制台。
- None 其他⽤⼾,⽆法登陆管理控制台,通常就是普通的⽣产者和消费者。
- 通过 IP:port 访问界⾯,默认端口号15672
- 重启RabbitMQ:
sudo systemctl restart rabbitmq-server
- 服务操作:
#启动服务 sudo systemctl start rabbitmq-server #停⽌服务 sudo systemctl stop rabbitmq-server #重启服务 sudo systemctl restart rabbitmq-server #添加开机启动服务sudo systemctl enable rabbitmq-server #检查服务状态 sudo systemctl status rabbitmq-server
- 卸载RabbitMQ:
9.1. 停⽌RabbitMQ服务:sudo systemctl stop rabbitmq-server
9.2. 查找RabbitMQ安装情况dpkg -l | grep rabbitmq
9.3. 卸载rabbitmq已安装的相关内容sudo apt-get purge --auto-remove rabbitmq-server
9.4. 卸载Erlang
9.4.1. 查看erlang安装的相关列表:dpkg -l | grep erlang
9.4.2. 卸载erlang已安装的相关内容sudo apt-get purge --auto-remove erlang
3.2 CentOS 安装
- 安装Erlang
1.1. 查看系统版本:cat /etc/redhat-release
1.2. 下载 Erlang 的 rpm 包:wget --content-disposition "https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm?distro_version_id=140"
1.3. 安装 Erlang :yum localinstall erlang-23.3.4.11-1.el7.x86_64.rpm
- 安装 RabbitMQ
2.1. 下载 RabbitMQ 客⼾端wget --content-disposition "https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmqserver-3.8.30-1.el7.noarch.rpm/download.rpm?distro_version_id=140"
2.2. 安装 RabbitMQ 客⼾端: 导入签名秘钥:rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
,⽤ yum 进⾏安装yum localinstall rabbitmq-server-3.8.30-1.el7.noarch.rpm
- 安装 RabbitMQ 管理界⾯:
rabbitmq-plugins enable rabbitmq_management
- 重启RabbitMQ:
sudo service rabbitmq-server restart
- 服务操作:
#启动服务 service rabbitmq-server start #停⽌服务 sudo systemctl stop rabbitmq-server #重启服务 service rabbitmq-server restart #添加开机启动服务 chkconfig rabbitmq-server on
- 卸载RabbitMQ:
6.1. 停⽌RabbitMQ服务:service rabbitmq-server stop
6.2. 查看RabbitMQ安装列表:yum list|grep rabbitmq
6.3 卸载rabbitmq已安装的相关内容yum -y remove rabbitmq-server.noarch
6.4. 删除RabbitMQ相关⽂件rm -rf /var/lib/rabbitmq/
rm -rf /usr/local/rabbitmq
6.5. 卸载Erlang
6.5.1. 查看erlang安装的相关列表yum list | grep erlang
6.5.2. 卸载erlang已安装的相关内容yum -y remove erlang.x86_64
6.5.3. 删除Erlang相关⽂件rm -rf /usr/lib64/erlang/
rm -rf /usr/local/erlang
四、工作流程
流程图:
- 角色与组件
Broker(RabbitMQ Server)
├─ Virtual Host(vhost)——逻辑隔离单元,相当于数据库里的“库”
│ ├─ Exchange ——消息入口,负责路由
│ ├─ Queue ——消息缓冲,真正存储消息
│ ├─ Channel ——轻量级 TCP 子连接,生产/消费都在 channel 上发命令
│ ├─ Producer ——消息发布者
│ └─ Consumer ——消息接收者
└─ Connection ——TCP 长连接,一个 Connection 可开多条 Channel - 一次完整的消息流转(单 vhost 内)
① Producer 建立 TCP Connection → 在 Connection 上创建 Channel
② 通过 Channel 把消息发给 Exchange,并指定 routing-key 等参数
③ Exchange 根据自身类型(direct / topic / fanout / headers)和 Binding 规则,把消息路由到 0~N 个 Queue
④ Queue 把消息按 FIFO 缓冲;若设置了持久化,则落地磁盘
⑤ Consumer 建立 Connection/Channel → 订阅(basic.consume)或单条获取(basic.get)Queue
⑥ Queue 把消息推/拉给 Consumer,Consumer 回复 ack(或自动 ack)
⑦ RabbitMQ 收到 ack 后删除该消息;若未收到 ack 且 Consumer 断开,消息重新入队(requeue)
多 vhost 与多角色并发
图中出现多个“Virtual Host”字样,说明可在同一 Broker 内跑多套隔离环境;不同 vhost 的 Exchange/Queue 完全隔离,互不可见。 - 多个 Producer1、Producer2、Consumer… 表明:
– 一个 Exchange 可对接多个 Producer;
– 一个 Queue 可对接多个 Consumer(平均分摊消息,实现负载均衡);
– 一个 Channel 只能串行收发,但应用可建多 Channel 并发。 - 一句话总结
“生产者把消息先给交换器,交换器按规则把消息转存到队列,队列再把消息分发给消费者”——这就是 RabbitMQ 最简也最核心的工作流程;而 Virtual Host、Connection、Channel 只是为了让这条流程在多租户、多线程、高并发场景下更安全、更高效。
五、核心概念
-
Producer 和 Consumer
Producer: ⽣产者, 是RabbitMQ Server的客⼾端, 向RabbitMQ发送消息
Consumer: 消费者, 也是RabbitMQ Server的客⼾端, 从RabbitMQ接收消息
Broker:其实就是RabbitMQ Server, 主要是接收和收发消息
-
Connection和Channel
Connection: 连接. 是客⼾端和RabbitMQ服务器之间的⼀个TCP连接. 这个连接是建⽴消息传递的基础, 它负责传输客⼾端和服务器之间的所有数据和控制信息.
Channel: 通道, 信道. Channel是在Connection之上的⼀个抽象层. 在 RabbitMQ 中, ⼀个TCP连接可以有多个Channel, 每个Channel都是独⽴的虚拟连接. 消息的发送和接收都是基于 Channel的.
通道的主要作⽤是将消息的读写操作复⽤到同⼀个TCP连接上,这样可以减少建⽴和关闭连接的开销,提⾼性能.
-
Virtual host
Virtual host: 虚拟主机. 这是⼀个虚拟概念. 它为消息队列提供了⼀种逻辑上的隔离机制. 对于RabbitMQ⽽⾔, ⼀个 BrokerServer 上可以存在多个 Virtual Host. 当多个不同的⽤⼾使⽤同⼀个 RabbitMQ Server 提供的服务时,可以虚拟划分出多个 vhost,每个⽤⼾在⾃⼰的 vhost 创建 exchange/queue 等 -
Queue
Queue: 队列, 是RabbitMQ的内部对象, ⽤于存储消息.
-
Exchange
Exchange: 交换机. message 到达 broker 的第⼀站, 它负责接收⽣产者发送的消息, 并根据特定的规则把这些消息路由到⼀个或多个Queue列中.
Exchange起到了消息路由的作⽤,它根据类型和规则来确定如何转发接收到的消息.
六、快速上手示例
6.1 引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.18.0</version>
</dependency>
6.2 生产者
6.2.1 建立连接
建立连接需要:IP,端口号,账号、密码、虚拟主机。
我们使用com.rabbitmq.client
包下的ConnectionFactory
连接工厂类设置需要的配置,并且创建连接。
//连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("101.43.47.137");//ip地址connectionFactory.setPort(5672);//默认端口号connectionFactory.setUsername("study");//用户connectionFactory.setPassword("study");//用户密码connectionFactory.setVirtualHost("study");//虚拟主机//获取连接Connection connection = connectionFactory.newConnection();
6.2.2 开启信道 创建Channel
//开启信道Channel channel = connection.createChannel();
6.2.3 声明一个交换机和一个队列queue
交换机我们使用RabbitMQ自带的,
声明队列使用Channel类的queueDeclare方法。 queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数说明:
- queue:队列名
- durable: 是否持久化.true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
- exclusive:是否独占,只能有一个消费者监听队列
- autoDelete: 是否⾃动删除, 当没有消费者时, ⾃动删除掉
- arguments:⼀些参数 。
//声明队列channel.queueDeclare("hello",true,false,true,null);
6.2.4 发送消息
通过Channel类的basicPublish方法发送消息到队列中 basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
参数说明:
- exchange:交换机名称, 简单模式下, 交换机会使⽤默认的""
- routingKey:路由名称,等于队列名称
- props:配置信息
- body:发送的消息
//发送消息String msg = "hello rabbitMQ";channel.basicPublish("","hello",null,msg.getBytes());
6.2.5 释放资源
释放信道和连接。
//资源释放channel.close();connection.close();
6.2.6 运⾏
总代码:
package org.example.rabbitmq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("101.43.47.137");//ip地址connectionFactory.setPort(5672);//默认端口号connectionFactory.setUsername("study");//用户connectionFactory.setPassword("study");//用户密码connectionFactory.setVirtualHost("study");//虚拟主机//获取连接Connection connection = connectionFactory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("hello",true,false,true,null);//发送消息String msg = "hello rabbitMQ";channel.basicPublish("","hello",null,msg.getBytes());//资源释放channel.close();connection.close();}
}
结果:
6.3 消费者
步骤:
- 创建连接
- 创建Channel
- 声明⼀个队列Queue
- 消费消息
- 释放资源
消费消息:使用Channel类的basicConsume方法basicConsume(String queue, boolean autoAck, Consumer callback)
参数说明:
- queue: 队列名称
- autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认
- callback: 回调对象
Consumer类说明:
Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了Consumer 接⼝的对象.
DefaultConsumer类 是 RabbitMQ提供的⼀个默认消费者, 实现了Consumer 接⼝.
核⼼⽅法:
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
: 从队列接收到消息时, 会⾃动调⽤该⽅法.
在这个⽅法中, 我们可以定义如何处理接收到的消息, 例如打印消息内容, 处理业务逻辑或者将消息存储到数据库等.
参数说明如下:
- consumerTag :消费者标签,通常是消费者在订阅队列时指定的.
- envelope :包含消息的封包信息,如队列名称,交换机等.
- properties :⼀些配置信息
- body :消息的具体内容
总代码:
package org.example.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("101.43.47.137");//ip地址connectionFactory.setPort(5672);//默认端口号connectionFactory.setUsername("study");//用户connectionFactory.setPassword("study");//用户密码connectionFactory.setVirtualHost("study");//虚拟主机//创建连接Connection connection = connectionFactory.newConnection();//开启信道//创建信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("hello",true,false,true,null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume("hello",true, consumer);//释放资源channel.close();;connection.close();}
}
运行: