当前位置: 首页 > news >正文

RabbitMQ ①-MQ | Linux安装RabbitMQ | 快速上手

在这里插入图片描述

MQ

MQ(Message Queue)即消息队列,是一种应用间通信的一种方式。消息队列是一种异步通信方式,生产者(Producer)将消息放入队列,消费者(Consumer)从队列中取出消息进行消费。

MQ 的作用:

  • 异步解耦:生产者和消费者之间不需要同步,可以异步通信,降低耦合度,提高系统的吞吐量。在业务流程中,一些操作可能非常耗时,但并不需要即时返回结果。可以借助 MQ 把这些操作异步化
  • 流量削峰:MQ 可以缓冲生产者的请求,避免请求积压,从而避免瞬时流量过大,引起系统崩溃。
  • 消息分发:MQ 可以将消息分发给多个消费者,实现负载均衡。当多个系统需要对同一数据做出响应时,可以使用 MQ 进行消息分发。
  • 延迟通知:在需要在特定时间后发送通知的场景中,可以使用 MQ 的延迟消息功能,比如在电子商务平台中,如果用户下单后一定时间内未支付,可以使用延迟队列在超时后自动取消订单。

RabbitMQ

RabbitMQ 是一个消息中间件,也是一个生产者消费者模型,它负责接收,存储并转发消息。

RabbitMQ 也是 AMQP(Advanced Message Queuing Protocol)协议的一个实现,由 Erlang 语言开发,支持多种消息队列模型。RabbitMQ 是一个开源的、跨平台的、可靠的、支持多种应用的消息队列。

核心架构图:
在这里插入图片描述

  • Producer:消息的生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ Server 发送消息。
  • Consumer:消息的消费者,是 RabbitMQ Server 的客户端,从 RabbitMQ Server 接收消息。
  • Broker:RabbitMQ Server,是消息队列服务器,接收生产者的消息,存储消息,并将消息转发给消费者。
  • Connection:连接,是客户端和 RabbitMQ 服务器之间的⼀个TCP连接。这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息
  • Channel:信道,是建立在连接之上的虚拟连接,它是消息的载体,生产者和消费者通过信道来发送和接收消息。
  • Virtual host:虚拟主机,是 RabbitMQ Server 中的一个虚拟概念,它是消息的逻辑隔离单元,它将多个用户的消息隔离开来,并提供权限控制和消息路由功能。
  • Exchange:交换机,是消息交换的中枢。message 到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 列中。
  • Queue:队列,是消息的容器,存储消息直到消费者取出。
  • Binding:绑定,是 exchangequeue 的绑定关系,它决定了 exchangequeue 的路由规则。

工作流程:

  1. 生产者(Producer)生产一个消息。
  2. 生产者(Producer)连接到 RabbitMQ Broker,建立一个 Connection,开启一个 Channel。
  3. 生产者(Producer)声明一个 Exchange,路有消息。
  4. 生产者(Producer)声明一个 Queue,将消息发送到这个 Queue。
  5. 生产者(Producer)发送消息到 RabbitMQ Broker。
  6. RabbitMQ Broker 接收到消息,将消息存储到 Queue 中。
  7. RabbitMQ Broker 接收到消息,将消息存储到 Queue 中。如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者。

AMQP

RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

AMQP 协议的三层:

  • Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
  • Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
  • TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 模型的三大组件:

  • 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
  • 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
  • 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

RabbitMQ安装(Ubuntu)

安装 Erlang

sudo apt-get updatesudo apt-get install erlang

安装 RabbitMQ

sudo apt-get install rabbitmq-server# 确认安装成功
sudo systemctl status rabbitmq-server

安装 RabbitMQ 管理插件

sudo rabbitmq-plugins enable rabbitmq_management# 重启 RabbitMQ 服务
sudo systemctl restart rabbitmq-server

访问 RabbitMQ 管理界面

通过 公网IP:15672 访问 RabbitMQ 管理界面,默认用户名密码是 guest/guest

但是,从 3.3.0 开始,RabbitMQ 禁止使用 guset/guest 权限通过 localhost 外的访问。

解决方案

额外添加账号:

sudo rabbitmqctl add_user admin admin# 设置账号的权限
sudo rabbitmqctl set_user_tags admin administrator

RabbitMQ 快速上手

通过 Maven 创建普通 Java 项目

引入依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

编写生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.94.9.33"); // ? 公网 IPconnectionFactory.setPort(5672); // ? 端口connectionFactory.setUsername("admin"); // ? 用户名connectionFactory.setPassword("admin"); // ? 密码connectionFactory.setVirtualHost("/"); // ? 虚拟主机Connection connection = connectionFactory.newConnection();//  TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明交换机(使用内置的交换机)// TODO 4. 声明队列/*** Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,*                                  Map<String, Object> arguments)* queue:队列名称* durable:是否持久化* exclusive:是否独占* autoDelete:是否自动删除* arguments:参数*/channel.queueDeclare("hello", true, false, false, null);// TODO 5. 发送消息/*** void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:交换机名称* routingKey:使用内置交换机,routingKey 和队列名保持一致* props:属性配置* body:消息体*/String msg = "hello rabbitMQ~";for (int i = 0; i < 1000; i++) {channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));}System.out.println("消息发送成功," + msg);// TODO 6. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

编写消费者代码

package mq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// TODO 1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.94.9.33"); // ? 公网 IPconnectionFactory.setPort(5672); // ? 端口connectionFactory.setUsername("admin"); // ? 用户名connectionFactory.setPassword("admin"); // ? 密码connectionFactory.setVirtualHost("/"); // ? 虚拟主机Connection connection = connectionFactory.newConnection();// TODO 2. 开启信道Channel channel = connection.createChannel();// TODO 3. 声明队列,可以省略(如果生产者未声明队列的话,消费者也未声明队列则会报错,因为不知道和哪个队列绑定了)channel.queueDeclare("hello", true, false, false, null);// TODO 4. 接收消息DefaultConsumer consumer = new DefaultConsumer(channel){/*** 从队列中,收到消息就会执行该方法* @param consumerTag the <i>consumer tag</i> associated with the consumer* @param envelope packaging data for the message 封包的消息,比如交换机,队列名称等...* @param properties content header data for the message* @param body the message body (opaque, client-specific byte array)* @throws IOException IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume("hello", true, consumer);Thread.sleep(1000);// TODO 5. 释放资源channel.close(); // ! 先关闭 channelconnection.close();}
}

相关文章:

  • 解锁健康生活:全新养身指南
  • HPE推出零信任网络与私有云运维解决方案
  • 盘古信息领德创|半导体存储与云计算存储小巨人企业IMS数字化升级项目正式启动!
  • 当智能科技遇上医疗行业会帮助疫苗如何方便管理呢?
  • Spring Boot Validation实战详解:从入门到自定义规则
  • C++负载均衡远程调用学习之集成测试与自动启动脚本
  • Axure疑难杂症:深度理解与认识“事件”“动作”(玩转交互)
  • 【Linux】Linux入门——权限
  • OpenCV 图形API(81)图像与通道拼接函数-----透视变换函数warpPerspective()
  • 相同的数(简单)
  • 基于 Spring Boot 瑞吉外卖系统开发(十)
  • Spring AI Alibaba-03- Spring AI + DeepSeek-R1 + ES/Milvus + RAG 智能对话应用开发全流程
  • 当手机开始预判你的下一步:一场正在颠覆生活的AI静默革命
  • 解决JSON.stringify方法数据丢失
  • Linux 系统上安装 Firefox 浏览器的完整指南
  • 码蹄集——直线切平面、圆切平面
  • C++入门基础(上)
  • Javase 基础加强 —— 06 Stream流
  • eNSP中路由器OSPF协议配置完整实验和命令解释
  • netty单线程并发量评估对比tomcat
  • 高进华“控股”后首份年报出炉,史丹利账上可动资金大幅缩水
  • 中方对中美就关税谈判的立场发生变化?外交部:中方立场没有任何改变
  • 南京明孝陵石兽遭涂鸦“到此一游”,景区:已恢复原貌,警方在排查
  • 今年五一档电影票房已破7亿
  • 外交部就习近平主席将应邀对俄罗斯进行国事访问并出席纪念苏联伟大卫国战争胜利80周年庆典答问
  • 光明日报头版评论:让投身西部成为青春潮流