RabbitMQ ①-MQ | Linux安装RabbitMQ | 快速上手
MQ
MQ(Message Queue)即消息队列,是一种应用间通信的一种方式。消息队列是一种异步通信方式,生产者(Producer)将消息放入队列,消费者(Consumer)从队列中取出消息进行消费。
MQ
的作用:
- 异步解耦:生产者和消费者之间不需要同步,可以异步通信,降低耦合度,提高系统的吞吐量。在业务流程中,一些操作可能非常耗时,但并不需要即时返回结果。可以借助
MQ
把这些操作异步化 - 流量削峰:
MQ
可以缓冲生产者的请求,避免请求积压,从而避免瞬时流量过大,引起系统崩溃。 - 消息分发:
MQ
可以将消息分发给多个消费者,实现负载均衡。当多个系统需要对同一数据做出响应时,可以使用MQ
进行消息分发。 - 延迟通知:在需要在特定时间后发送通知的场景中,可以使用
MQ
的延迟消息功能,比如在电子商务平台中,如果用户下单后一定时间内未支付,可以使用延迟队列在超时后自动取消订单。 - …
RabbitMQ
RabbitMQ 是一个消息中间件,也是一个生产者消费者模型,它负责接收,存储并转发消息。
RabbitMQ 也是 AMQP
(Advanced Message Queuing Protocol)协议的一个实现,由 Erlang
语言开发,支持多种消息队列模型。RabbitMQ 是一个开源的、跨平台的、可靠的、支持多种应用的消息队列。
核心架构图:
Producer
:消息的生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ Server 发送消息。Consumer
:消息的消费者,是 RabbitMQ Server 的客户端,从 RabbitMQ Server 接收消息。Broker
:RabbitMQ Server,是消息队列服务器,接收生产者的消息,存储消息,并将消息转发给消费者。Connection
:连接,是客户端和 RabbitMQ 服务器之间的⼀个TCP连接。这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息Channel
:信道,是建立在连接之上的虚拟连接,它是消息的载体,生产者和消费者通过信道来发送和接收消息。Virtual host
:虚拟主机,是 RabbitMQ Server 中的一个虚拟概念,它是消息的逻辑隔离单元,它将多个用户的消息隔离开来,并提供权限控制和消息路由功能。Exchange
:交换机,是消息交换的中枢。message
到达broker
的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个Queue
列中。Queue
:队列,是消息的容器,存储消息直到消费者取出。Binding
:绑定,是exchange
和queue
的绑定关系,它决定了exchange
到queue
的路由规则。
工作流程:
- 生产者(Producer)生产一个消息。
- 生产者(Producer)连接到 RabbitMQ Broker,建立一个 Connection,开启一个 Channel。
- 生产者(Producer)声明一个 Exchange,路有消息。
- 生产者(Producer)声明一个 Queue,将消息发送到这个 Queue。
- 生产者(Producer)发送消息到 RabbitMQ Broker。
- RabbitMQ Broker 接收到消息,将消息存储到 Queue 中。
- RabbitMQ Broker 接收到消息,将消息存储到 Queue 中。如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者。
AMQP
RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。
AMQP 协议的三层:
- Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
- Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
- TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。
AMQP 模型的三大组件:
- 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
- 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
- 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
RabbitMQ安装(Ubuntu)
安装 Erlang
sudo apt-get updatesudo apt-get install erlang
安装 RabbitMQ
sudo apt-get install rabbitmq-server# 确认安装成功
sudo systemctl status rabbitmq-server
安装 RabbitMQ 管理插件
sudo rabbitmq-plugins enable rabbitmq_management# 重启 RabbitMQ 服务
sudo systemctl restart rabbitmq-server
访问 RabbitMQ 管理界面
通过 公网IP:15672
访问 RabbitMQ 管理界面,默认用户名密码是 guest/guest
。
但是,从 3.3.0 开始,RabbitMQ 禁止使用 guset/guest 权限通过 localhost 外的访问。
解决方案
额外添加账号:
sudo rabbitmqctl add_user admin admin# 设置账号的权限
sudo rabbitmqctl set_user_tags admin administrator
RabbitMQ 快速上手
通过 Maven 创建普通 Java 项目
引入依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>
编写生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.94.9.33"); // ? 公网 IPconnectionFactory.setPort(5672); // ? 端口connectionFactory.setUsername("admin"); // ? 用户名connectionFactory.setPassword("admin"); // ? 密码connectionFactory.setVirtualHost("/"); // ? 虚拟主机Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明交换机(使用内置的交换机)// TODO 4. 声明队列/*** Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,* Map<String, Object> arguments)* queue:队列名称* durable:是否持久化* exclusive:是否独占* autoDelete:是否自动删除* arguments:参数*/channel.queueDeclare("hello", true, false, false, null);// TODO 5. 发送消息/*** void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:交换机名称* routingKey:使用内置交换机,routingKey 和队列名保持一致* props:属性配置* body:消息体*/String msg = "hello rabbitMQ~";for (int i = 0; i < 1000; i++) {channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息发送成功," + msg);// TODO 6. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}
编写消费者代码
package mq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.94.9.33"); // ? 公网 IPconnectionFactory.setPort(5672); // ? 端口connectionFactory.setUsername("admin"); // ? 用户名connectionFactory.setPassword("admin"); // ? 密码connectionFactory.setVirtualHost("/"); // ? 虚拟主机Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare("hello", true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){/*** 从队列中,收到消息就会执行该方法* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message 封包的消息,比如交换机,队列名称等...* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array)* @throws IOException IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume("hello", true, consumer);Thread.sleep(1000);// TODO 5. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}