RocketMQ核心知识点
文章目录
- 1. 核心架构 ⭐⭐⭐⭐⭐
- 四大组件
- 核心要点
- 2. 消息发送方式 ⭐⭐⭐⭐⭐
- 三种发送方式
- 核心代码
- 核心要点
- 3. 消息消费模式 ⭐⭐⭐⭐⭐
- 两种消费模式
- 集群消费(默认)
- 广播消费
- 核心要点
- 4. 消息存储机制 ⭐⭐⭐⭐⭐
- CommitLog + ConsumeQueue 架构
- 为什么这样设计?
- 核心要点
- 5. 消息可靠性 ⭐⭐⭐⭐⭐
- 三个维度保证可靠性
- 刷盘策略
- 主从复制策略
- 消费失败重试机制
- 核心要点
- 6. 消息顺序性 ⭐⭐⭐⭐⭐
- 为什么会乱序?
- 顺序消息实现
- 顺序消费
- 核心要点
- 7. 消息堆积处理 ⭐⭐⭐⭐⭐
- 堆积原因
- 应急处理
- 核心要点
- 8. 事务消息 ⭐⭐⭐⭐⭐
- 分布式事务解决方案
- RocketMQ事务消息流程
- 核心代码
- 核心要点
- 9. 延迟消息 ⭐⭐⭐⭐
- 延迟等级
- 使用场景
- 实现原理
- 核心要点
- 10. 高可用机制 ⭐⭐⭐⭐⭐
- 主从模式
- Dledger模式(推荐)
- NameServer高可用
- 核心要点
- 面试核心总结
- 必问问题(优先级)
- 常见面试问题及答案索引
- Q1: RocketMQ和Kafka的区别?
- Q2: 消息丢失怎么办?
- Q3: 消息重复消费怎么处理?
- Q4: 消息堆积如何解决?
- Q5: 如何保证消息顺序?
- Q6: 事务消息的实现原理?
- Q7: RocketMQ的存储结构?
- Q8: 如何保证高可用?
- 快速记忆要点
1. 核心架构 ⭐⭐⭐⭐⭐
四大组件
┌─────────────┐ ┌─────────────┐
│ Producer │ │ Consumer │
│ (生产者) │ │ (消费者) │
└──────┬──────┘ └──────┬──────┘│ │├────────┬───────────┤↓ ↓ ↓
┌─────────────────────────────────┐
│ NameServer (名字服务) │
│ - 路由注册中心 │
│ - Broker注册/发现 │
└────────────┬────────────────────┘│┌─────┴─────┐↓ ↓
┌──────────┐ ┌──────────┐
│ Broker-M │ │ Broker-S │
│ (Master) │ │ (Slave) │
│ 存储消息 │ │ 备份消息 │
└──────────┘ └──────────┘
四大角色:
- Producer:消息生产者,发送消息
- Consumer:消息消费者,接收并处理消息
- NameServer:路由中心,管理Broker信息
- Broker:消息存储服务器,存储和转发消息
核心要点
"RocketMQ采用分布式架构,由四个核心组件组成:
NameServer 是轻量级的路由注册中心,类似Zookeeper但更轻量,负责Broker的注册和发现。各个节点之间不通信,每个节点都有完整的路由信息。
Broker 是消息存储服务器,负责消息的存储、投递和查询。支持主从模式,Master负责读写,Slave负责备份和读。
Producer 通过NameServer获取Broker地址,然后直接与Broker通信发送消息。
Consumer 同样通过NameServer获取路由信息,从Broker拉取消息进行消费。
这种架构的优势是NameServer无状态、易扩展,Broker可以水平扩展,整体高可用。"
2. 消息发送方式 ⭐⭐⭐⭐⭐
三种发送方式
| 方式 | 可靠性 | 性能 | 使用场景 |
|---|---|---|---|
| 同步发送 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 订单、支付、重要通知 |
| 异步发送 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 用户注册、短信通知 |
| 单向发送 | ⭐ | ⭐⭐⭐⭐⭐ | 日志采集、埋点上报 |
核心代码
// 1. 同步发送(阻塞等待结果)
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {// 发送成功
}// 2. 异步发送(立即返回,回调通知)
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult result) { }@Overridepublic void onException(Throwable e) { }
});// 3. 单向发送(无返回值,不关心结果)
producer.sendOneway(msg);
核心要点
"RocketMQ提供三种消息发送方式:
同步发送最可靠,发送后会阻塞等待Broker响应,确认消息发送成功才返回。适用于订单、支付等不能丢失的重要消息,但性能相对较低。
异步发送性能最高,发送后立即返回,通过回调接口异步获取发送结果。适用于用户注册、短信通知等场景,既保证可靠性又不阻塞主流程。
单向发送性能最好,发送后不等待任何响应,也不关心发送结果。适用于日志采集、埋点上报等允许少量丢失的场景。
我们在项目中通常用异步发送,平衡了可靠性和性能。"
3. 消息消费模式 ⭐⭐⭐⭐⭐
两种消费模式
| 模式 | 特点 | 使用场景 |
|---|---|---|
| 集群消费 | 一条消息只被消费一次 | 订单处理、支付通知 |
| 广播消费 | 每个消费者都消费全量消息 | 配置更新、缓存刷新 |
集群消费(默认)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 集群消费(默认模式)
consumer.setMessageModel(MessageModel.CLUSTERING);// 工作原理:
// Consumer1: 消费消息1、3、5
// Consumer2: 消费消息2、4、6
// → 负载均衡,每条消息只被消费一次
特点:
- ✅ 同一Group的多个Consumer平均分摊消息
- ✅ 实现负载均衡
- ✅ 提高消费能力
- ❌ 一条消息只能被一个Consumer消费
广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);// 工作原理:
// Consumer1: 消费消息1、2、3、4、5、6(全部)
// Consumer2: 消费消息1、2、3、4、5、6(全部)
// → 每个Consumer都收到全量消息
特点:
- ✅ 每个Consumer都消费全量消息
- ✅ 适合配置更新、缓存刷新
- ❌ 不会重试(失败就失败了)
- ❌ 不保存消费进度
核心要点
"RocketMQ有两种消费模式:
集群消费是默认模式,同一个Consumer Group内的多个消费者会负载均衡地消费消息,每条消息只会被其中一个消费者消费。这种模式适合订单处理、支付通知等业务场景,可以通过增加消费者来提高消费能力。
广播消费模式下,每个消费者都会收到Topic的全量消息。比如有10条消息,3个消费者,那么每个消费者都会收到这10条消息。这种模式适合配置更新、缓存刷新等需要所有节点都执行的场景。
需要注意的是,广播消费不会重试,也不保存消费进度,所以一般用于允许丢失的场景。"
4. 消息存储机制 ⭐⭐⭐⭐⭐
CommitLog + ConsumeQueue 架构
消息写入流程:
Producer → Broker → CommitLog(顺序写)↓异步构建 ConsumeQueue(索引)↓Consumer 通过 ConsumeQueue 查找↓从 CommitLog 读取完整消息存储结构:
/store/
├── commitlog/ # 消息内容(所有Topic混合存储)
│ ├── 00000000000000000000
│ ├── 00000000001073741824
│ └── ...
├── consumequeue/ # 消息索引(按Topic/Queue分开)
│ ├── TopicA/
│ │ ├── 0/ # Queue 0的索引
│ │ ├── 1/ # Queue 1的索引
│ │ └── ...
│ └── TopicB/
│ └── ...
└── index/ # Hash索引(按Key查询)
为什么这样设计?
优点:
- ✅ CommitLog顺序写:性能极高(磁盘顺序写接近内存随机写)
- ✅ 所有Topic共享CommitLog:避免文件碎片
- ✅ ConsumeQueue轻量:只存储offset、size、tagCode
- ✅ 消费快速:通过索引快速定位
核心要点
"RocketMQ采用CommitLog + ConsumeQueue的存储架构。
CommitLog 是消息的实际存储文件,所有Topic的消息都混合顺序写入CommitLog,每个文件固定1GB。顺序写的性能非常高,可以达到磁盘的极限性能。
ConsumeQueue 是消息的逻辑队列,按Topic和QueueId分开存储,每条记录只存储消息的offset、大小和tag哈希值,非常轻量。Consumer消费时先读ConsumeQueue找到消息在CommitLog中的位置,再去CommitLog读取完整消息。
这种设计的好处是:写入性能极高(顺序写),消费效率也很高(通过索引定位),还避免了文件碎片问题。
另外还有Index索引文件,用于按Key或时间范围查询消息。"
5. 消息可靠性 ⭐⭐⭐⭐⭐
三个维度保证可靠性
1. 生产端可靠性├─ 同步发送(确认发送成功)├─ 异步发送(回调确认)└─ 失败重试(默认2次)2. Broker可靠性├─ 消息持久化(刷盘)├─ 主从复制(同步/异步)└─ 高可用(Dledger)3. 消费端可靠性├─ 消费确认(ACK)├─ 消费失败重试(默认16次)└─ 死信队列(Dead Letter Queue)
刷盘策略
// 同步刷盘(可靠,慢)
flushDiskType = SYNC_FLUSH
// 流程:消息写入PageCache → 等待刷盘完成 → 返回成功
// 优点:消息不会丢失
// 缺点:性能低(每次等待磁盘IO)// 异步刷盘(快,可能丢失)
flushDiskType = ASYNC_FLUSH
// 流程:消息写入PageCache → 立即返回成功 → 后台异步刷盘
// 优点:性能高
// 缺点:Broker宕机可能丢失少量消息
主从复制策略
// 同步复制(可靠,慢)
brokerRole = SYNC_MASTER
// 流程:消息写入Master → 同步到Slave → 返回成功
// 优点:数据不丢失
// 缺点:性能低(等待网络同步)// 异步复制(快,可能丢失)
brokerRole = ASYNC_MASTER
// 流程:消息写入Master → 立即返回 → 异步同步到Slave
// 优点:性能高
// 缺点:Master宕机可能丢失未同步的消息
消费失败重试机制
// 消费失败返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试策略:
重试次数 延迟时间
1次 10秒
2次 30秒
3次 1分钟
4次 2分钟
5次 3分钟
...
16次 2小时// 16次后 → 进入死信队列(%DLQ%ConsumerGroup)
核心要点
"RocketMQ从三个维度保证消息可靠性:
生产端:提供同步发送和异步发送,同步发送会等待Broker确认,异步发送通过回调获取结果。发送失败会自动重试,默认重试2次。
Broker端:通过刷盘和主从复制保证可靠性。刷盘有同步和异步两种,同步刷盘等待数据写入磁盘才返回,不会丢失但性能低;异步刷盘立即返回,性能高但宕机可能丢失。主从复制也分同步和异步,同步复制等待数据同步到Slave,异步复制则立即返回。
消费端:消费失败会自动重试,默认最多重试16次,延迟时间逐渐增加,从10秒到2小时。16次后仍失败的消息会进入死信队列,需要人工介入处理。
生产环境通常使用:异步刷盘+异步复制,平衡性能和可靠性。金融场景会用同步刷盘+同步复制,确保消息不丢失。"
6. 消息顺序性 ⭐⭐⭐⭐⭐
为什么会乱序?
原因1:多个Queue并行消费
Queue0: 消息1、4、7
Queue1: 消息2、5、8 → 消费顺序可能是:1、2、5、4、7、8(乱序)
Queue2: 消息3、6、9原因2:消费重试
消息1 → 消费失败 → 重试(延迟10秒)
消息2 → 消费成功
消息3 → 消费成功
10秒后消息1重试 → 顺序变成:2、3、1(乱序)
顺序消息实现
/*** 发送顺序消息:确保相同订单的消息进入同一队列*/
public class OrderProducer {public void sendOrderMessage(Order order) throws Exception {Message msg = new Message("OrderTopic", order.toJson().getBytes());// 使用MessageQueueSelector选择队列SendResult result = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long orderId = (Long) arg;// 相同订单ID选择同一队列int index = (int) (orderId % mqs.size());return mqs.get(index);}}, order.getOrderId()); // 订单ID作为参数System.out.println("订单消息发送到队列: " + result.getMessageQueue().getQueueId());}
}// 使用
Order order = new Order(12345L, "创建订单");
sendOrderMessage(order); // 订单12345 → Queue 1order = new Order(12345L, "支付订单");
sendOrderMessage(order); // 订单12345 → Queue 1(同一队列)order = new Order(12345L, "完成订单");
sendOrderMessage(order); // 订单12345 → Queue 1(同一队列)
// ✓ 保证顺序:创建 → 支付 → 完成
顺序消费
/*** 顺序消费:单线程消费,保证顺序*/
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 单线程顺序处理for (MessageExt msg : msgs) {System.out.println("顺序消费: " + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}
});
核心要点
"RocketMQ的顺序消息分为全局顺序和分区顺序。
全局顺序需要Topic只有一个Queue,所有消息都进入这个Queue,单线程消费。这种方式可靠但性能差,一般不推荐。
分区顺序是实际常用的方式,通过MessageQueueSelector将相同业务ID的消息路由到同一个Queue。比如订单消息,将同一个订单ID的创建、支付、完成消息都发送到同一个Queue,然后使用MessageListenerOrderly单线程顺序消费这个Queue。
关键点是:发送时使用相同的分区键选择Queue,消费时使用顺序监听器。
需要注意的是,消费失败不能重试,否则会阻塞后续消息,所以要保证消费逻辑的幂等性和可靠性。"
7. 消息堆积处理 ⭐⭐⭐⭐⭐
堆积原因
80%的堆积来自消费端:
├── 消费速度慢(业务逻辑耗时)
├── 消费者宕机
├── 消费线程数不足
└── 消费异常频繁重试20%来自生产端:
└── 流量突增
应急处理
// 1. 增加消费者数量(最快)
原来: 1个消费者实例
扩容: 10个消费者实例(同一个group)
效果: 消费速度提升10倍 ✅// 2. 增加消费线程
consumer.setConsumeThreadMin(20); // 最小20个
consumer.setConsumeThreadMax(64); // 最大64个// 3. 批量消费
consumer.setConsumeMessageBatchMaxSize(32); // 每次拉32条// 4. 优化消费逻辑
// - 并行处理
// - 批量操作数据库
// - 异步化外部调用
// - 使用缓存// 5. 紧急方案:临时转移
// 消费堆积消息并转发到新Topic,不做业务处理
// 修复bug后再从新Topic消费
核心要点
"消息堆积主要是消费端慢导致的,我们的处理方案分应急和预防两方面。
应急处理: 首先增加消费者实例数量,这是最快的方法,比如从1个扩容到10个,消费速度立即提升10倍。其次调整消费线程数,从默认20个增加到64个,同时调大批量消费数量。如果还不够,需要优化消费逻辑,比如数据库批量操作、外部调用异步化、增加缓存等。极端情况下,可以临时启动一个快速消费者,只负责把消息转移到新Topic,不做业务处理,等修复bug后再重新消费。
预防措施: 首先要做好监控告警,堆积超过阈值立即通知。其次合理设置Topic的Queue数量,一般4-8个,高并发场景16-32个。还要做好流量控制,生产端和消费端都要限流。最重要的是提前做性能压测,确认消费能力能否支撑生产流量。"
8. 事务消息 ⭐⭐⭐⭐⭐
分布式事务解决方案
场景:下单扣库存,保证本地事务和消息发送的一致性传统方案问题:
try {orderDao.insert(order); // 本地事务producer.send(msg); // 发送消息db.commit();
} catch (Exception e) {db.rollback();
}问题:
- commit后发送消息失败 → 订单入库,库存未扣
- 发送消息后commit失败 → 消息已发,订单未入库
RocketMQ事务消息流程
阶段1:发送Half消息(对消费者不可见)↓
阶段2:执行本地事务├─ 成功 → commit消息(消费者可见)└─ 失败 → rollback消息(删除)↓
阶段3:Broker回查本地事务状态(如果超时)└─ 查询本地事务结果 → commit/rollback
核心代码
// 1. 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
producer.setNamesrvAddr("localhost:9876");// 2. 设置事务监听器
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 订单入库orderDao.insert((Order) arg);// 返回:提交消息return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {// 返回:回滚消息return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// Broker回查本地事务状态String orderId = msg.getKeys();Order order = orderDao.queryByOrderId(orderId);if (order != null) {return LocalTransactionState.COMMIT_MESSAGE; // 已入库,提交} else {return LocalTransactionState.ROLLBACK_MESSAGE; // 未入库,回滚}}
});producer.start();// 3. 发送事务消息
Order order = new Order("ORDER123", 100.0);
Message msg = new Message("OrderTopic", order.toJson().getBytes());
msg.setKeys(order.getOrderId()); // 设置Key,用于回查TransactionSendResult result = producer.sendMessageInTransaction(msg, order);
核心要点
"RocketMQ的事务消息用于解决分布式事务中的最终一致性问题。
工作流程分三个阶段:
第一阶段,发送Half消息到Broker,这个消息对消费者不可见。
第二阶段,执行本地事务,比如订单入库。如果成功,返回COMMIT,Broker会把消息改为可见状态,消费者就能消费了;如果失败,返回ROLLBACK,Broker会删除这条消息。
第三阶段,如果本地事务执行超时或未返回结果,Broker会定时回查本地事务状态,通过checkLocalTransaction方法查询本地事务是否成功,然后决定提交还是回滚消息。
这种设计保证了本地事务和消息发送的最终一致性,即使网络异常也能通过回查机制保证。
我们项目中用事务消息实现订单和库存的解耦,订单服务只负责下单,库存扣减通过消息异步完成,保证了数据一致性。"
9. 延迟消息 ⭐⭐⭐⭐
延迟等级
// RocketMQ支持18个固定的延迟等级
延迟等级 延迟时间
1 1秒
2 5秒
3 10秒
4 30秒
5 1分钟
6 2分钟
7 3分钟
8 4分钟
9 5分钟
10 6分钟
11 7分钟
12 8分钟
13 9分钟
14 10分钟
15 20分钟
16 30分钟
17 1小时
18 2小时
使用场景
// 场景1:订单超时取消(30分钟)
Message msg = new Message("OrderTopic", "订单超时检查".getBytes());
msg.setDelayTimeLevel(16); // 30分钟后消费
producer.send(msg);// 场景2:定时任务(1小时后执行)
Message msg = new Message("TaskTopic", "定时任务".getBytes());
msg.setDelayTimeLevel(17); // 1小时
producer.send(msg);// 场景3:重试延迟(5秒后重试)
Message msg = new Message("RetryTopic", "重试任务".getBytes());
msg.setDelayTimeLevel(2); // 5秒
producer.send(msg);
实现原理
发送延迟消息 → Broker接收↓
修改Topic为 SCHEDULE_TOPIC_XXXX↓
根据延迟等级存储到对应的队列↓
定时任务扫描到期消息↓
恢复原Topic并投递↓
消费者消费
核心要点
"RocketMQ的延迟消息支持18个固定的延迟等级,从1秒到2小时,不支持任意时间延迟。
使用很简单,发送消息时设置delayTimeLevel即可。常见应用场景是订单超时取消,比如用户下单后30分钟不支付就取消,我们会发送一个延迟30分钟的消息,30分钟后消费者收到消息,检查订单状态,如果未支付就取消订单。
实现原理是Broker收到延迟消息后,会先存储到SCHEDULE_TOPIC,根据延迟等级放入不同的队列,然后有定时任务不断扫描,到期后恢复原Topic并投递给消费者。
需要注意的是,只支持固定的18个等级,如果需要任意时间延迟,要么选择最接近的等级,要么使用定时任务或其他方案。"
10. 高可用机制 ⭐⭐⭐⭐⭐
主从模式
传统主从架构:
┌─────────┐ 同步/异步复制 ┌─────────┐
│Master │ ────────────> │Slave │
│读写 │ │只读 │
└─────────┘ └─────────┘问题:Master宕机,Slave不能自动切换为Master
Dledger模式(推荐)
Dledger高可用架构(基于Raft协议):
┌─────────┐
│Broker-1 │ ─┐
│(Leader) │ │
└─────────┘ │ Raft选举├─ 自动故障转移
┌─────────┐ │
│Broker-2 │ ─┤
│(Follower)│ │
└─────────┘ ││
┌─────────┐ │
│Broker-3 │ ─┘
│(Follower)│
└─────────┘优点:
✅ Leader宕机自动选举新Leader
✅ 数据强一致性(多数派提交)
✅ 无需手动切换
NameServer高可用
NameServer集群(无状态):
┌─────────┐ ┌─────────┐ ┌─────────┐
│NameSrv1 │ │NameSrv2 │ │NameSrv3 │
└─────────┘ └─────────┘ └─────────┘↑ ↑ ↑└────────────┴────────────┘各节点独立特点:
✅ 节点之间不通信
✅ 每个节点都有完整路由信息
✅ 任意节点宕机不影响使用
✅ Producer/Consumer连接多个NameServer
核心要点
"RocketMQ的高可用主要从三个层面保证:
NameServer层面:采用无状态集群,每个节点独立,互不通信,都保存完整的路由信息。Producer和Consumer会连接多个NameServer,任意节点宕机不影响使用。
Broker层面:传统的主从模式下,Master负责读写,Slave负责备份和读。但Master宕机后需要手动切换。现在推荐使用Dledger模式,基于Raft协议实现自动故障转移,当Leader宕机后会自动选举新Leader,实现真正的高可用。
消息层面:通过主从同步复制保证数据不丢失,消费端通过重试机制保证消息最终被消费成功。
Dledger模式要求至少3个节点,多数派存活即可提供服务,比如3节点允许1个宕机,5节点允许2个宕机。"
面试核心总结
必问问题(优先级)
| 优先级 | 问题 | 核心要点 |
|---|---|---|
| ⭐⭐⭐⭐⭐ | RocketMQ架构 | 4个组件、职责、通信方式 |
| ⭐⭐⭐⭐⭐ | 消息可靠性 | 3个维度、刷盘、主从复制、重试 |
| ⭐⭐⭐⭐⭐ | 消息存储 | CommitLog、ConsumeQueue、顺序写 |
| ⭐⭐⭐⭐⭐ | 消息堆积 | 原因、应急方案、预防措施 |
| ⭐⭐⭐⭐⭐ | 事务消息 | Half消息、本地事务、回查机制 |
| ⭐⭐⭐⭐ | 消息顺序 | 分区顺序、MessageQueueSelector |
| ⭐⭐⭐⭐ | 消费模式 | 集群消费、广播消费 |
| ⭐⭐⭐⭐ | 高可用 | Dledger、主从切换 |
| ⭐⭐⭐ | 延迟消息 | 18个等级、应用场景 |
常见面试问题及答案索引
Q1: RocketMQ和Kafka的区别?
核心差异:
- 定位不同:RocketMQ面向业务消息,Kafka面向日志流处理
- 性能:Kafka百万级TPS,RocketMQ十万级TPS
- 功能:RocketMQ支持事务消息、延迟消息、顺序消息,Kafka不支持
- 可靠性:RocketMQ同步刷盘可靠性更高,Kafka异步批量写入性能更高
- 场景:业务系统用RocketMQ,大数据日志用Kafka
Q2: 消息丢失怎么办?
三个维度保证(详见第5节):
生产端:
- 同步发送等待响应
- 发送失败重试(默认2次)
- 记录失败日志后续补偿
Broker端:
- 同步刷盘(消息写入磁盘才返回)
- 同步复制(数据同步到Slave才返回)
- Dledger多副本机制
消费端:
- 消费失败自动重试(最多16次)
- 重试失败进入死信队列
- 业务幂等防止重复
Q3: 消息重复消费怎么处理?
产生原因:
- 网络抖动导致重复发送
- 消费失败后重试
- Broker故障恢复
解决方案:
// 1. 消息ID去重(推荐)
String msgId = msg.getMsgId();
if (redis.exists("msg:" + msgId)) {return; // 重复消息,跳过
}
processMessage(msg);
redis.setex("msg:" + msgId, 86400, "1");// 2. 业务幂等
// 数据库唯一约束、状态机控制、版本号机制// 3. 去重表
// 保存已处理的消息ID到数据库
Q4: 消息堆积如何解决?
应急处理(详见第7节):
- 扩容消费者(最快):1个扩到10个,速度提升10倍
- 增加线程数:调大消费线程池大小
- 批量消费:调大每次拉取的消息数量
- 优化逻辑:数据库批量操作、异步化、加缓存
- 临时转移:快速消费转发到新Topic,修复后重新消费
预防措施:
- 监控告警(堆积>1万立即通知)
- 性能压测(提前验证消费能力)
- 流量控制(生产端限流)
- 合理设置Queue数量(4-16个)
Q5: 如何保证消息顺序?
分区顺序方案(详见第6节):
发送端:
// 相同业务ID的消息发送到同一队列
producer.send(msg, new MessageQueueSelector() {public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long orderId = (Long) arg;int index = (int) (orderId % mqs.size());return mqs.get(index);}
}, orderId); // 订单ID作为分区键
消费端:
// 使用顺序监听器,单线程顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ...) {// 顺序处理return ConsumeOrderlyStatus.SUCCESS;}
});
关键点:
- 相同业务ID路由到同一队列
- 队列内单线程顺序消费
- 消费失败不能重试(会阻塞)
Q6: 事务消息的实现原理?
核心流程(详见第8节):
阶段1:发送Half消息
Producer → Broker:发送Half消息(对Consumer不可见)
阶段2:执行本地事务
executeLocalTransaction() {try {orderDao.insert(order); // 执行本地事务return COMMIT_MESSAGE; // 成功:提交消息} catch (Exception e) {return ROLLBACK_MESSAGE; // 失败:回滚消息}
}
阶段3:Broker回查(如果超时)
checkLocalTransaction() {Order order = orderDao.query(orderId);if (order != null) {return COMMIT_MESSAGE; // 已入库,提交} else {return ROLLBACK_MESSAGE; // 未入库,回滚}
}
解决问题: 保证本地事务和消息发送的最终一致性
Q7: RocketMQ的存储结构?
核心设计(详见第4节):
CommitLog:
- 所有Topic的消息混合顺序写入
- 文件固定1GB大小
- 顺序写性能极高ConsumeQueue:
- 按Topic和QueueId分开存储
- 只存储offset、size、tagCode(轻量)
- 消费时通过索引快速定位Index:
- 按Key或时间范围查询
- Hash索引结构
优势:
- 顺序写性能接近内存随机写
- 避免文件碎片
- 消费效率高
Q8: 如何保证高可用?
Dledger方案(详见第10节):
架构:
3个Broker节点,基于Raft协议
├─ Leader:处理读写请求
├─ Follower:同步数据,参与选举
└─ Follower:同步数据,参与选举特点:
✅ Leader宕机自动选举新Leader
✅ 多数派存活即可提供服务(3节点允许1个宕机)
✅ 数据强一致性
✅ 无需手动切换
NameServer高可用:
无状态集群,节点独立
Producer/Consumer连接多个NameServer
任意节点宕机不影响使用
消息可靠:
- 同步刷盘:数据写入磁盘
- 同步复制:数据同步到多数派
- 消费重试:失败自动重试16次
快速记忆要点
架构设计: NameServer作为轻量级路由中心,Broker负责消息存储,Producer和Consumer通过NameServer获取路由信息后直连Broker通信,四大组件协同工作
消息发送: 支持同步、异步、单向三种方式,同步适合重要消息,异步适合高并发,单向适合日志采集
消费模式: 集群消费实现负载均衡,广播消费实现全量通知,顺序消息通过分区路由保证
存储机制: CommitLog顺序写保证高性能,ConsumeQueue作为轻量索引加速消费,所有Topic混合存储避免碎片
可靠保证: 生产端确认、Broker刷盘复制、消费端重试三重保障,同步刷盘加同步复制实现强一致
堆积处理: 水平扩容消费者最快见效,增加消费线程提升并发,优化消费逻辑治本,监控告警预防为主
事务消息: Half消息对消费者不可见,本地事务执行成功后提交消息,超时则回查本地事务状态决定提交或回滚
高可用: Dledger基于Raft协议实现自动故障转移,NameServer无状态集群任意节点可用
