当前位置: 首页 > news >正文

消息队列 2.RabbitMQ

RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息中间件,主要用于实现分布式系统中的消息传递,支持异步通信、系统解耦、流量削峰等场景。在 Java 生态中,RabbitMQ 被广泛应用,其 Java 客户端提供了简洁的 API,方便开发者快速集成。

AMQP 协议

核心概念

1. 消息模型
AMQP 采用生产者 - 消费者模型,但引入了更复杂的路由机制:
  • 生产者(Producer):发送消息的应用
  • 消费者(Consumer):接收消息的应用
  • 消息中间件(Broker):负责接收、存储和转发消息
2. 核心组件

AMQP(Advanced Message Queuing Protocol)是一种开放标准的应用层协议,专为消息队列设计。它定义了客户端与消息中间件之间的通信规范,确保不同厂商的实现可以互操作。

+----------+    +---------+    +----------+
| Producer | -> | Exchange| -> | Queue    | -> Consumer
+----------+    +---------+    +----------+|v+---------+| Binding |+---------+
  • Exchange(交换器)

接收生产者的消息

根据规则(Binding)将消息路由到队列

类型包括:Direct、Topic、Fanout、Headers

  • Queue(队列)

存储消息直到被消费

支持多个消费者竞争消费

消息可持久化存储

  • Binding(绑定)

定义 Exchange 与 Queue 之间的关联

通过 Binding Key(绑定键)和 Routing Key(路由键)匹配

工作流程

1.生产者发送消息

        指定消息的 Routing Key

        将消息发送到特定的 Exchange

2.Exchange 路由逻辑

        Direct Exchange:按 Routing Key 精确匹配

        Topic Exchange:按 Routing Key 的模式匹配(支持*#通配符)

        Fanout Exchange:将消息广播到所有绑定的队列

        Headers Exchange:按消息头部属性匹配

3.消费者接收消息

        从队列中拉取或订阅消息

        处理完成后发送确认(ACK)

RabbitMQ 核心概念

在使用 RabbitMQ 前,需先理解其核心组件和消息流转逻辑:

组件

作用

生产者(Producer)

消息的发送方,负责创建并发送消息到 RabbitMQ 服务器。

消费者(Consumer)

消息的接收方,监听队列并处理接收到的消息。

队列(Queue)

消息的存储容器,位于 RabbitMQ 服务器中,消息最终会被投递到队列中等待消费。

交换机(Exchange)

接收生产者发送的消息,并根据绑定规则(Binding)将消息路由到对应的队列。

绑定(Binding)

定义交换机与队列之间的关联关系,包含路由键(Routing Key)和匹配规则。

路由键(Routing Key)

生产者发送消息时指定的键,交换机根据该键和绑定规则路由消息。

RabbitMQ 消息流转流程

消息从生产者到消费者的完整路径为:
生产者 → 交换机(根据 Routing Key 和绑定规则)→ 队列 → 消费者
  • 生产者发送消息时,需指定交换机名称路由键
  • 交换机根据自身类型(如 Direct、Topic 等)和绑定规则,将消息转发到匹配的队列;
  • 消费者监听队列,获取并处理消息。

Java 操作 RabbitMQ 基础示例

1. 连接 RabbitMQ 服务器

所有操作的前提是建立与 RabbitMQ 的连接,需指定服务器地址、端口、账号密码(默认账号guest仅允许本地连接,远程连接需配置新用户)。

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConnection {// RabbitMQ连接配置private static final String HOST = "localhost"; // 服务器地址private static final int PORT = 5672; // 默认端口private static final String USERNAME = "guest";private static final String PASSWORD = "guest";// 获取连接public static Connection getConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory.newConnection();}
}

2. 生产者发送消息

生产者需完成以下步骤:
  1. 创建连接和通道(Channel);
  2. 声明交换机(可选,若使用默认交换机则无需声明);
  3. 声明队列(指定队列名称、是否持久化等);
  4. 绑定交换机与队列(若使用自定义交换机);
  5. 发送消息(指定交换机、路由键、消息内容)。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 队列名称(需与消费者一致)private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 获取连接Connection connection = RabbitMQConnection.getConnection();// 2. 创建通道(RabbitMQ的操作大多通过通道完成)Channel channel = connection.createChannel();// 3. 声明队列(参数:队列名、是否持久化、是否排他、是否自动删除、附加参数)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4. 消息内容String message = "Hello, RabbitMQ from Java!";// 5. 发送消息(参数:交换机名、路由键、消息属性、消息字节数组)// 此处使用默认交换机(""),路由键需与队列名一致channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生产者发送消息:" + message);// 6. 关闭资源channel.close();connection.close();}
}

3. 消费者接收消息

消费者需持续监听队列,步骤如下:
  1. 创建连接和通道;
  2. 声明队列(需与生产者队列名一致);
  3. 定义消息处理逻辑(通过DefaultConsumer回调);
  4. 开启消费(指定队列、是否自动确认消息)。
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer {private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 获取连接Connection connection = RabbitMQConnection.getConnection();// 2. 创建通道Channel channel = connection.createChannel();// 3. 声明队列(需与生产者一致,重复声明不会报错)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("消费者已启动,等待接收消息...");// 4. 定义消息处理回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消费者接收消息:" + message);};// 5. 开启消费(参数:队列名、是否自动确认、消息接收回调、取消消费回调)// 自动确认(autoAck=true):消息被接收后自动从队列删除;false则需手动确认channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

Spring AMQP简化 RabbitMQ

在 Spring Boot 项目中,可通过Spring AMQP简化 RabbitMQ 的使用,其封装了底层 API,提供注解驱动开发:

1.引入依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置application.yml

spring:rabbitmq:host: localhostport: 5673username: guestpassword: guest

3. 生产者(使用RabbitTemplate):

@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("queue_name", message);
}

4.消费者(使用@RabbitListener注解):

@RabbitListener(queues = "queue_name")
public void receiveMessage(String message) {System.out.println("接收消息:" + message);
}

交换机类型及 Java 实现

RabbitMQ 的交换机负责消息路由,不同类型的交换机路由规则不同,需根据场景选择:
1. Direct 交换机(精确匹配)
  • 路由规则:消息的路由键与绑定的路由键完全一致时,消息被路由到对应队列。
  • 适用场景:一对一通信(如订单通知)。
// 生产者声明Direct交换机并绑定队列
String EXCHANGE_NAME = "direct_exchange";
String ROUTING_KEY = "order.notify";
// 声明Direct交换机(参数:交换机名、类型、是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
// 绑定交换机与队列(参数:队列名、交换机名、路由键)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送消息(指定交换机和路由键)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
2. Topic 交换机(模糊匹配)
  • 路由规则:路由键支持通配符(*匹配一个单词,#匹配多个单词,单词以.分隔)。
  • 适用场景:多规则匹配(如日志分类:log.errorlog.warn)。
    // 生产者声明Topic交换机
    String EXCHANGE_NAME = "topic_exchange";
    // 路由键为"log.error"(匹配绑定键"log.*"或"log.#")
    String ROUTING_KEY = "log.error";
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);
    // 绑定队列到交换机,绑定键为"log.#"(匹配所有以log.开头的路由键)
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
3. Fanout 交换机(广播)
  • 路由规则:忽略路由键,将消息路由到所有绑定的队列。
  • 适用场景:一对多通信(如广播通知)。
// 生产者声明Fanout交换机
String EXCHANGE_NAME = "fanout_exchange";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
// 绑定多个队列到交换机(无需指定路由键)
channel.queueBind(QUEUE1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE2, EXCHANGE_NAME, "");
// 发送消息(路由键无效,可设为空)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

RabbitMQ 应用场景

  • 异步通信:如用户注册后异步发送邮件 / 短信通知;
  • 系统解耦:订单系统与库存系统通过消息通信,避免直接依赖;
  • 流量削峰:秒杀场景中,通过队列缓冲请求,避免服务器过载;
  • 日志收集:多服务日志通过 Fanout 交换机广播到日志处理服务。

总结

RabbitMQ 凭借其灵活的路由机制、可靠的消息传递和丰富的特性,成为 Java 分布式系统中消息中间件的首选之一。通过 Java 客户端或 Spring AMQP,开发者可快速实现消息的生产、消费及高级功能,有效提升系统的可扩展性和稳定性。

http://www.dtcms.com/a/286473.html

相关文章:

  • Elasticsearch:ES|QL 改进的时间线
  • [3-02-01].第01章:框架概述 - Spring生态
  • 表单、表格字段,输入完毕后立即点击【保存】,导致数据未更新就被保存
  • 【教程】基于无人机的大豆光合效率研究
  • 赛思SLIC芯片、语音芯片原厂 赛思SLIC语音芯片ASX630:国产强“芯”赋能FTTR全光网络​
  • vscode 一直连不上远程,网络是通的,ssh 也能直接登录远程
  • 【科研绘图系列】R语言绘制分组箱线图
  • SDC Specical check setting的描述 - false path
  • Docker笔记-部署Redis集群
  • leetcode15.三数之和题解:逻辑清晰带你分析
  • AWS(基础)
  • 网络基础10 :ACL真机实验
  • Redis原理之哨兵机制(Sentinel)
  • 【洛谷P1417】烹调方案 题解
  • ONNX模型使用指南:从零开始掌握跨领域模型部署
  • 图片平铺下去总是有个缝隙的解决方案
  • 塞舌尔公司的查册报告Certificate of Official Search是什么?有什么信息
  • 瀚高数据库开启Oracle兼容模块
  • vue2 面试题及详细答案150道(101 - 120)
  • ubuntu20.04使用unity3d做机器人仿真环境搭建
  • Python单例模式详解:从原理到实战的完整指南
  • 详解Linux(Ubuntu/RedHat/CentOS)及国产服务器统一加域管理方案
  • GoC 上册课程
  • java+vue+SpringBoot集团门户网站(程序+数据库+报告+部署教程+答辩指导)
  • docker--程序自启动
  • HIMA X-DO3201模块的冗余配置方法
  • Python Pandas 实践学习笔记(1)
  • Chainlit + FasiAPI+ LlamaIndex 实现RAG(一)
  • 深入解析:短连接 vs 长连接 vs 短轮询 vs 长轮询
  • keil5使用技巧----keil-build-viewer.exe插件使用