当前位置: 首页 > news >正文

RocketMQ核心架构解析与实战指南

一、了解RocketMQ 

1.1 是什么?

RocketMQ 是阿里巴巴开源的分布式消息中间件,Apache 顶级项目,主打高吞吐、低延迟、高可靠,适用于异步通信、削峰填谷、分布式事务等场景(如购票系统、订单系统)。

1.2 核心架构:为什么要分离 NameServer 和 Broker?

这是 RocketMQ 架构设计的核心,本质是解耦 “路由管理” 与 “消息存储转发”,具体原因如下:

核心目标具体逻辑
解耦职责NameServer 只做 “路由导航”(存 Broker 地址、Topic 映射),Broker 只做 “消息仓库”(存消息、处理收发),避免功能混杂导致维护困难。
提升高可用NameServer 无状态可集群部署,某个节点宕机不影响整体;Broker 故障时,NameServer 会更新路由表,生产者 / 消费者自动切换,避免 “一损俱损”。
弹性扩展消息量暴增时,只需新增 Broker 节点(无需动 NameServer);路由请求多则新增 NameServer,按需扩容不浪费资源。

简单比喻:NameServer 是 “快递网点导航系统”(告诉你包裹该送哪个网点),Broker 是 “快递网点”(实际收寄包裹)。

二、Rocketmq的几种概念

2.1 组件角色

组件核心职责
NameServer1. 管理 Broker 路由信息;2. 给生产者 / 消费者提供 “消息在哪” 的查询服务;3. 无状态(不存消息)。
Broker1. 存储消息(接收生产者消息,写入磁盘 / 内存);2. 处理消息转发(给消费者推 / 拉消息);3. 支持主从复制(高可用)。
Producer消息生产者(如购票系统中 “用户提交订单” 模块,负责发消息到 Broker)。
Consumer消息消费者(如购票系统中 “扣减库存” 模块,负责从 Broker 读消息并处理)。

2.2 消息相关概念:Topic/Tag/ 消息体的关系

这是高频混淆点 ——Topic 和 Tag 是 “消息元数据”,不属于消息体,三者独立

概念本质作用是否属于消息体(Body)
Topic消息一级分类(如 “CONFIRM_ORDER”)决定消息路由到哪个 Broker/Queue,是收发消息的 “必选定位标识”。
Tag消息二级分类(如 “TAG_CREATE”)同一 Topic 下筛选消息(如只消费 “订单创建” 消息),避免接收无关数据。
消息体(Body)字节数组(byte [])承载业务数据(如ConfirmOrderMQDto序列化后的 JSON),仅生产者 / 消费者约定解析规则。

示例:一条购票消息的结构

  • Topic:CONFIRM_ORDER(标识 “购票相关消息”)
  • Tag:TAG_CREATE(标识 “订单创建” 类型)
  • Body:{"orderId":123,"trainCode":"G1001"}(实际业务数据)

2.3 其他关键概念

  • Queue:每个 Topic 包含多个 Queue(默认 4 个),分布在不同 Broker 上,用于负载均衡(如 4 个 Queue 分给 2 个消费者,各处理 2 个)。
  • Offset:消息在 Queue 中的 “位置下标”,消费者通过记录 Offset 确认 “已消费到哪”(如 Offset=100 表示已消费前 100 条)。
  • 死信队列:重试多次失败的消息会进入%DLQ%+消费者组名队列,需手动处理(如订单支付超时消息)。

三、环境搭建:Windows/Linux 快速启动

以 Windows 为例(Linux 流程类似,只需替换脚本后缀为.sh):

3.1 环境准备

  • JDK:1.8+(RocketMQ 5.x 支持 JDK 17)
  • 安装包:官网下载(推荐 5.0 + 版本)

3.2 配置与启动

  1. 解压安装包:如解压到D:\rocketmq-all-5.2.0-bin-release
  2. 修改 Broker 配置conf/broker.conf):

    properties

    brokerIP1=127.0.0.1  # 本机IP(集群部署填实际IP)
    namesrvAddr=127.0.0.1:9876  # NameServer地址
    
  3. 启动 NameServer:打开 CMD,进入bin目录,执行:

    cmd

    start mqnamesrv.cmd
    
    成功日志:The Name Server boot success...
  4. 启动 Broker:新打开 CMD,执行:

    cmd

    start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf
    
    成功日志:The broker[xxx, 127.0.0.1:10911] boot success...

3.3 验证服务

  • 发送测试消息:
    set NAMESRV_ADDR=127.0.0.1:9876
    tools.cmd org.apache.rocketmq.example.quickstart.Producer  # 发消息
    tools.cmd org.apache.rocketmq.example.quickstart.Consumer  # 收消息

四、两种开发方式

RocketMQ 支持原生 APISpring Boot 集成,后者更简洁(封装了细节),实际开发优先用 Spring Boot。

Spring Boot 集成步骤

1. 引入依赖
​
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>  <!-- Spring Boot 3需用2.2.0+版本 -->
</dependency>​
2. 配置文件(application.yml
​
rocketmq:name-server: 127.0.0.1:9876  # NameServer地址(必填)producer:group: my-producer-group  # 生产者组(必填,标识同一类生产者)send-message-timeout: 3000  # 发送超时时间(毫秒)​

五、发送方法

在 RocketMQ 中,消息发送方法的核心差异体现在 “可靠性”“阻塞性”“业务特性” 三个维度。根据不同的业务需求(如是否需要确认结果、是否有定时 / 事务需求),需选择合适的发送方法。以下从 “基础发送方式” 和 “特殊消息类型” 两方面详细讲解,结合实战场景说明用法和区别。

5.1基础发送方式(按 “可靠性与阻塞性” 分类)

这是最常用的三种发送方式,核心区别在于 “是否等待 Broker 响应” 和 “是否阻塞当前线程”。

1. 同步发送(Sync Send)

定义

发送消息后,阻塞当前线程,等待 Broker 返回确认结果(SendResult),直到超时或收到响应。

核心特点

  • 可靠性高:明确知道消息是否成功送达(成功 / 失败 / 超时);
  • 性能中等:因阻塞等待,TPS 低于异步发送;
  • 适用场景:核心业务(如订单创建、支付通知),必须确保消息不丢失且知道结果。

原生 API 实现(DefaultMQProducer

// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("sync-producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();// 构建消息(Topic、Tag、Body)
Message msg = new Message("order-topic",  // Topic"create",       // Tag(可选)"订单ID:1001".getBytes()  // 消息体(字节数组)
);// 同步发送(等待结果)
SendResult result = producer.send(msg);  // 超时默认3000ms,可指定:send(msg, 5000)
if (result.getSendStatus() == SendStatus.SEND_OK) {System.out.println("发送成功,消息ID:" + result.getMsgId());
} else {System.out.println("发送失败:" + result.getSendStatus());
}producer.shutdown();

Spring Boot 封装实现(RocketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;// 方式1:convertAndSend(自动序列化POJO,简化版同步发送)
public void sendSync1() {OrderDTO order = new OrderDTO(1001L, "手机");// 自动将OrderDTO转为JSON,发送到"order-topic:create"rocketMQTemplate.convertAndSend("order-topic:create", order);
}// 方式2:syncSend(更灵活,支持自定义超时、延迟等)
public void sendSync2() {OrderDTO order = new OrderDTO(1001L, "手机");// 自定义超时时间3秒SendResult result = rocketMQTemplate.syncSend("order-topic:create", order, 3000);if (result.getSendStatus() == SendStatus.SEND_OK) {System.out.println("发送成功");}
}
2. 异步发送(Async Send)

定义

发送消息后 不阻塞当前线程,立即返回,通过回调函数(SendCallback)异步处理 Broker 的响应结果(成功 / 失败)。

核心特点

  • 可靠性高:同样能获取发送结果,但通过异步回调处理;
  • 性能高:非阻塞,适合高并发场景;
  • 适用场景:响应时间敏感的业务(如用户 APP 提交订单后,无需等待库存扣减完成即可显示 “提交中”)。

原生 API 实现

​
DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();Message msg = new Message("order-topic", "pay", "订单支付:1001".getBytes());// 异步发送,通过回调处理结果
producer.sendAsync(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送成功,消息ID:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.err.println("异步发送失败:" + e.getMessage());// 失败处理:如重试、记录日志}
});// 注意:异步发送后需保持生产者运行(实际项目中无需手动shutdown,由Spring管理)
// producer.shutdown(); ​

Spring Boot 封装实现

​
public void sendAsync() {OrderDTO order = new OrderDTO(1001L, "手机");// 异步发送,回调处理结果rocketMQTemplate.asyncSend("order-topic:pay", order, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步发送成功");}@Overridepublic void onException(Throwable e) {System.err.println("异步发送失败");}});
}​
3. 单向发送(Oneway Send)

定义

发送消息后 不等待任何响应,直接返回,Broker 是否收到消息完全未知。

核心特点

  • 可靠性低:可能丢失消息(如网络故障、Broker 宕机);
  • 性能极高:无阻塞、无回调,适合超高频发送;
  • 适用场景:非核心业务(如日志收集、监控数据上报),允许偶尔丢失消息。

原生 API 实现

DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();Message msg = new Message("log-topic", "user", "用户1001登录".getBytes());// 单向发送,不关心结果
producer.sendOneway(msg); producer.shutdown();

Spring Boot 封装实现

public void sendOneway() {// 发送日志消息,不关心结果rocketMQTemplate.sendOneWay("log-topic:user", "用户1001登录");
}

5.2特殊消息类型(按 “业务特性” 分类)

针对特定业务场景(如定时任务、分布式事务、顺序依赖),RocketMQ 提供了专用发送方法。

1. 延迟消息(Delay Message)

定义

消息发送后,Broker 不会立即投递,而是 延迟指定时间后 再推送给消费者(如 “订单 30 分钟未支付自动取消”)。

核心特点

  • 延迟时间通过 “延迟级别” 控制(默认 18 级,如 1 级 = 1s、2 级 = 5s、3 级 = 10s...18 级 = 2h,可自定义配置);
  • 本质是 “临时存储 + 定时投递”:消息先存到 “延迟队列”,到达时间后转移到目标 Topic。

实现方式(Spring Boot)

​
public void sendDelay() {OrderDTO order = new OrderDTO(1001L, "手机");// 发送延迟消息:延迟级别3(默认10秒),超时时间3秒rocketMQTemplate.syncSend("order-delay-topic",  // Topicorder,                // 消息体3000,                 // 超时时间3                     // 延迟级别(3级=10秒));
}​

注意

  • 延迟级别需在 Broker 配置文件(broker.conf)中定义,默认配置:
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    
  • 延迟消息不支持任意时间(如 “7 秒”),需按级别定义。

2. 事务消息(Transaction Message)

定义

解决分布式事务问题:通过 “半事务消息 + 本地事务 + 事务回查” 机制,确保跨服务操作的一致性(如 “下单” 和 “减库存” 要么都成功,要么都失败)。

核心流程

  1. 发送 “半事务消息”(Broker 标记为 “待确认”,暂不投递);
  2. 执行本地事务(如 “创建订单”);
  3. 根据本地事务结果,发送 “提交” 或 “回滚” 指令(Broker 收到 “提交” 后投递消息,“回滚” 则删除消息);
  4. 若本地事务结果未知(如超时),Broker 会主动回查生产者,确认最终状态。

实现方式(Spring Boot)

​
// 1. 发送事务消息
public void sendTransaction() {OrderDTO order = new OrderDTO(1001L, "手机");// 参数:生产者组、Topic:Tag、消息体、自定义业务参数rocketMQTemplate.sendMessageInTransaction("order-tx-group",       // 事务生产者组(需与监听器一致)"order-tx-topic:create", // Topic:Tagorder,                  // 消息体null                    // 自定义参数(如订单ID));
}// 2. 定义事务监听器(处理本地事务和回查)
@RocketMQTransactionListener(txProducerGroup = "order-tx-group") // 与生产者组一致
public class OrderTxListener implements RocketMQLocalTransactionListener {// 执行本地事务(如创建订单)@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 本地事务逻辑:如插入订单表OrderDTO order = JSON.parseObject(new String((byte[]) msg.getPayload()), OrderDTO.class);orderMapper.insert(order); // 假设插入订单return RocketMQLocalTransactionState.COMMIT; // 本地事务成功,提交消息} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK; // 本地事务失败,回滚消息}}// 事务回查(Broker查询本地事务状态)@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {OrderDTO order = JSON.parseObject(new String((byte[]) msg.getPayload()), OrderDTO.class);// 检查订单是否存在:存在则提交,不存在则回滚OrderDTO dbOrder = orderMapper.selectById(order.getId());return dbOrder != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;}​
3. 顺序消息(Ordered Message)

定义

确保消息 按发送顺序被消费(如同一订单的 “创建→支付→发货” 消息必须按顺序处理)。

核心原理

  • RocketMQ 中 “单个 Queue 内的消息是有序的”,但 Topic 包含多个 Queue(默认 4 个);
  • 发送时通过 “业务标识”(如订单 ID)将同一类消息路由到 同一个 Queue,消费时按 Queue 顺序处理。

实现方式(Spring Boot)

​
public void sendOrderly() {String orderId = "ORDER_1001"; // 业务标识(同一订单ID)// 发送3条同一订单的消息,确保顺序rocketMQTemplate.syncSendOrderly("order-seq-topic",  // Topic"创建订单:" + orderId, // 消息1orderId             // 路由key(同一key进入同一Queue));rocketMQTemplate.syncSendOrderly("order-seq-topic", "支付订单:" + orderId, orderId); // 消息2rocketMQTemplate.syncSendOrderly("order-seq-topic", "发货订单:" + orderId, orderId); // 消息3
}// 消费者(需确保同一Queue被同一消费者处理)
@RocketMQMessageListener(topic = "order-seq-topic",consumerGroup = "order-seq-group",consumeMode = ConsumeMode.ORDERLY // 顺序消费模式(默认是并发模式)
)
public class OrderSeqConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("顺序消费:" + message); // 按“创建→支付→发货”顺序输出}
}​

注意

  • 消费者需设置 consumeMode = ConsumeMode.ORDERLY(默认是 CONCURRENTLY 并发模式);
  • 顺序消息会降低消费吞吐量(单 Queue 无法并行处理),非必要不使用。

5.3发送方法对比

发送方式阻塞性可靠性性能核心场景关键 API(Spring)
同步发送阻塞核心业务(订单、支付)convertAndSendsyncSend
异步发送非阻塞响应敏感场景(用户交互)asyncSend
单向发送非阻塞极高非核心消息(日志、监控)sendOneWay
延迟消息可阻塞定时任务(超时取消)syncSend(带 delayLevel)
事务消息可阻塞最高分布式事务(跨服务一致性)sendMessageInTransaction
顺序消息可阻塞有序依赖(订单流程)syncSendOrderly

5.4应用场景

  1. 同步发送:订单创建通知库存扣减、支付结果同步订单状态、金融交易指令对账
  2. 异步发送:APP 下单后快速响应前端、秒杀活动流量削峰、批量非实时数据同步
  3. 单向发送:用户行为日志收集、系统监控指标上报、非关键浏览记录上报
  4. 延迟消息:订单超时未支付自动取消、会议前定时提醒、接口失败延迟重试
  5. 事务消息:下单扣库存(跨服务一致性)、银行转账(A 扣钱 B 加钱)、积分兑换商品
  6. 顺序消息:同一订单的创建→支付→发货流程、物流状态流转(已揽收→运输中→签收)、有序操作日志记录

六、通过RocketMQ在项目中实现异步购票逻辑

6.1 整体架构

用户发起购票请求 → BeforeConfirmOrderService(前置校验+发MQ) → RocketMQ → ConfirmOrderConsumer(消费消息+实际购票)

6.2 关键步骤代码解析

1. 前置校验与消息发送(BeforeConfirmOrderService
@Service
public class BeforeConfirmOrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public Long beforeDoConfirm(ConfirmOrderDoReq req) {// 1. 发送MQ消息(异步触发实际购票,避免用户等待)ConfirmOrderMQDto dto = new ConfirmOrderMQDto();dto.setOrderId(confirmOrder.getId());dto.setTrainCode(req.getTrainCode());// 自动转换DTO为Message,发送到CONFIRM_ORDER TopicrocketMQTemplate.convertAndSend(RocketMQTopicEnum.CONFIRM_ORDER.getCode(), dto);return confirmOrder.getId(); // 快速返回,用户无需等待购票完成}}
2. 消息消费与实际购票(ConfirmOrderConsumer
@Component
@RocketMQMessageListener(topic = "CONFIRM_ORDER",  // 监听发送的TopicconsumerGroup = "confirm-order-consumer-group"
)
public class ConfirmOrderConsumer implements RocketMQListener<ConfirmOrderMQDto> {@Autowiredprivate ConfirmOrderService confirmOrderService;@Overridepublic void onMessage(ConfirmOrderMQDto dto) {// 实际购票逻辑:扣减库存、更新订单状态为“已确认”confirmOrderService.doConfirm(dto);}
}

6.3 RocketMQ 在案例中的价值

  1. 削峰填谷:高并发抢票时,请求先进入 MQ 队列,消费者按能力处理,避免系统过载;
  2. 异步解耦:用户无需等待 “扣减库存” 完成,快速收到响应,提升体验;
  3. 可靠性保障:即使消费者故障,MQ 会持久化消息,重启后继续处理,避免购票请求丢失。

http://www.dtcms.com/a/545217.html

相关文章:

  • Excel怎么制作下拉菜单?
  • 如何做后台网站的教程WordPress+百度+主动
  • Faster-Whisper唤醒词检测程序设计实战1
  • MPP文件处理组件Aspose.Tasks教程:使用Python在Excel中打开MPP文件
  • Optimum:onnx模型量化
  • C++ 鸭科夫手柄适配
  • dubbo和springcloud的差别
  • Linux系统编程——目录操作函数
  • MitoSOX Red 别名:Mitochondrial Superoxide Indicator; 红色线粒体超氧化物荧光探针
  • 深圳做网站做得比较好的公司struts2 做的网站
  • Reflex:用纯Python写交互式Web应用,从0到1构建你的第一个UI
  • 怎么查找网站是谁做的钢材网站模板
  • 打造高清3D虚拟世界|零基础学习Unity HDRP高清渲染管线(第十天)
  • oto电子商务网站建设网站建设的公司上海
  • 【数据结构】链式结构二叉树详解
  • Flutter兼容性问题:Could not get unknown property ‘flutter‘ for extension ‘android‘
  • 【Linux系统编程】自动化构建-make/Makefile
  • php网站打开慢青海高端网站建设
  • 仓颉TreeMap红黑树结构深度解析
  • React中Suspense的分包实践
  • 垃圾收集器
  • CSharp UI更新及跨线程更新总结
  • 两个域名同一个网站做优化淘宝优惠网站怎么做
  • 深入仓颉UI:事件处理的声明式哲学与高阶实践
  • Actix Web 入门与实战
  • 外贸soho建站云南省建设厅网站二建
  • 20251029在AIO-3576Q38开发板的Android14下使用iperf3测试WIFI模块AP6256的网速【87.8 Mbits/sec】
  • 怎么用dede建设网站网站建设开放的端口
  • 基本select语句
  • linux命令-系统信息与监控-2