《RocketMQ核心架构与实战解析》
一、RocketMQ 核心架构与工作原理
1.1 整体架构设计
RocketMQ 采用分布式架构设计,主要由四个核心组件构成,各组件协同工作实现消息的生产、存储和消费全流程:
- NameServer:路由注册中心,负责管理 Broker 集群信息,提供轻量级路由服务。采用无状态设计,节点间无需通信,通过 Broker 定时上报保持数据一致性。
- Broker:消息服务器,核心组件,负责消息的接收、存储、持久化和投递。支持主从架构,可水平扩展以提高吞吐量和可用性。
- Producer:消息生产者,负责创建和发送消息到 Broker。发送前需从 NameServer 获取 Broker 地址信息。
- Consumer:消息消费者,从 Broker 拉取并处理消息。支持集群和广播两种消费模式。
1.2 核心概念详解
- Topic:消息主题,用于逻辑上分类消息,是消息的一级分类单位。例如可以为订单系统创建 "OrderTopic"。
- Queue:消息队列,物理存储单元,每个 Topic 包含多个 Queue,通过多队列实现负载均衡和并行处理。
- Tag:消息标签,用于对同一 Topic 下的消息进行更细粒度的分类,实现消息的过滤。例如 "OrderTopic" 下可以有 "Create"、"Pay"、"Cancel" 等 Tag。
- Message:消息实体,包含主题、标签、键值对属性、消息体等信息。
- Offset:消息偏移量,用于标识消息在 Queue 中的位置,消费者通过记录 Offset 来确定消费进度
1.3 消息流转过程
- Producer 启动时向 NameServer 注册,并获取 Topic 对应的 Broker 路由信息
- Producer 根据负载均衡策略选择一个 Broker 发送消息
- Broker 接收消息后,将其存储在 CommitLog 中,并同步到 Slave
- Consumer 启动时向 NameServer 注册,并订阅感兴趣的 Topic
- Consumer 从 Broker 拉取消息并处理
- Consumer 处理完成后更新消费 Offset,确保消息不重复消费
1.4 消费模式解析
RocketMQ 提供两种消费模式,适用于不同业务场景:
- 集群消费
一条消息只会被同 Group 中的一个 Consumer 消费
多个 Group 同时消费一个 Topic 时,每个 Group 都会有一个 Consumer 消费到数据
2.广播消费
消息将对一个 Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
二.3种消息发送方式 :
1.1 同步发送
同步发送是最常用的方式,发送消息后会等待 Broker 的响应,确保消息成功发送:
public class SyncProducer {public static void main(String[] args) throws Exception {// 1. 创建生产者实例,指定生产者组名DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");// 2. 设置NameServer地址producer.setNamesrvAddr("localhost:9876");// 3. 启动生产者producer.start();// 4. 创建并发送消息for (int i = 0; i < 10; i++) {Message msg = new Message("TestTopic", // 主题"TagA", // 标签("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息体);// 同步发送消息SendResult sendResult = producer.send(msg);System.out.printf("发送结果:%s%n", sendResult);}// 5. 关闭生产者producer.shutdown();}
}
1.2 异步发送
异步发送不会阻塞等待响应,而是通过回调函数处理发送结果,适用于对响应时间敏感的场景:
public class AsyncProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();// 异步发送回调SendCallback callback = new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("消息发送成功:%s%n", sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.err.printf("消息发送失败:%s%n", e.getMessage());// 可以在这里实现重试逻辑}};// 发送消息for (int i = 0; i < 10; i++) {Message msg = new Message("TestTopic", "TagB", ("Async Message " + i).getBytes());producer.send(msg, callback);}// 等待异步发送完成Thread.sleep(5000);producer.shutdown();}
}
1.3 单向发送
单向发送只负责发送消息,不等待响应也不处理回调,适用于可靠性要求不高的场景如日志收集:
public class OnewayProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 10; i++) {Message msg = new Message("TestTopic", "TagC", ("Oneway Message " + i).getBytes());// 单向发送producer.sendOneway(msg);}// 给服务器一点时间处理消息Thread.sleep(1000);producer.shutdown();}
}
1.4消息消费者实现
public class MessageConsumer {public static void main(String[] args) throws MQClientException {// 1. 创建消费者实例,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer_group");// 2. 设置NameServer地址consumer.setNamesrvAddr("localhost:9876");// 3. 订阅主题,指定需要消费的标签(*表示所有标签)consumer.subscribe("TestTopic", "*");// 4. 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 处理消息for (MessageExt msg : msgs) {try {String message = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);System.out.printf("收到消息:%s, 消息ID:%s%n", message, msg.getMsgId());} catch (UnsupportedEncodingException e) {e.printStackTrace();// 消息处理失败,返回RECONSUME_LATER表示稍后重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 消息处理成功,返回CONSUME_SUCCESSreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5. 启动消费者consumer.start();System.out.println("消费者启动成功,等待接收消息...");}
}
三.常见问题解析
1.1 消息重复消费问题
消息重复消费是分布式系统中不可避免的问题,解决方法是实现消费逻辑的幂等性:
- 利用消息 ID:对于同一业务场景,相同的消息 ID 视为重复消息
- 业务唯一键:使用业务中的唯一标识(如订单号)作为去重依据
- 分布式锁:消费前获取基于业务唯一键的分布式锁,确保只有一个消费者处理
1.2 消息丢失排查
当出现消息丢失时,可按以下步骤排查:
- 检查生产者是否正确处理发送结果,是否有发送失败的日志
- 检查 Broker 是否配置了持久化,磁盘空间是否充足
- 检查 Broker 是否有主从切换,消息是否同步成功
- 检查消费者是否在消息处理完成前返回了 CONSUME_SUCCESS
1.3 NameServer 与 ZooKeeper 的区别
RocketMQ 选择自主研发的 NameServer 而非 ZooKeeper,主要基于以下考虑:
- 功能需求:RocketMQ 只需要轻量级的路由服务,不需要 ZooKeeper 的复杂功能
- 性能考虑:NameServer 更轻量,性能更好,适合高并发场景
- 架构设计:NameServer 的无状态设计更符合 RocketMQ 的高可用需求
- 部署维护:NameServer 部署和维护更简单,降低运维成本
1.4保证消息不丢失的措施
消息在 Producer 端、Broker 端、Consumer 端都可能丢失,需分别处理:
1. Producer 端
- 采取send()同步发消息,发送结果可同步感知。
- 发送失败后可重试,默认重试 3 次,且重试时会发送到其他 Broker 上。
- 采用集群部署,若当前 Broker 宕机,可发送到其他 Broker。
2. Broker 端
- 修改刷盘策略为同步刷盘(默认是异步刷盘)。
- 采用集群部署,主从模式,保证高可用。
3. Consumer 端
完全消费正常后再进行手动 ack 确认
1.5Broker 处理拉取请求的流程
Consumer 首次请求 Broker 时:
- 若 Broker 中有符合条件的消息:
- 响应 Consumer。
- 等待下次 Consumer 的请求。
- 若没有符合条件的消息:
- 执行DefaultMessageStore#ReputMessageService#run方法。
- 通过PullRequestHoldService保持连接,每 5 秒检查pullRequestTable是否有消息,有则立即推送。
- 每隔 1ms 检查 commitLog 中是否有新消息,有则写入到pullRequestTable。
- 当有新消息时返回请求。
- 挂起 Consumer 的请求,即不断开连接,也不返回数据。
- 使用 Consumer 的 offset。