当前位置: 首页 > news >正文

RocketMQ核心知识点

文章目录

    • 1. 核心架构 ⭐⭐⭐⭐⭐
      • 四大组件
      • 核心要点
    • 2. 消息发送方式 ⭐⭐⭐⭐⭐
      • 三种发送方式
      • 核心代码
      • 核心要点
    • 3. 消息消费模式 ⭐⭐⭐⭐⭐
      • 两种消费模式
      • 集群消费(默认)
      • 广播消费
      • 核心要点
    • 4. 消息存储机制 ⭐⭐⭐⭐⭐
      • CommitLog + ConsumeQueue 架构
      • 为什么这样设计?
      • 核心要点
    • 5. 消息可靠性 ⭐⭐⭐⭐⭐
      • 三个维度保证可靠性
      • 刷盘策略
      • 主从复制策略
      • 消费失败重试机制
      • 核心要点
    • 6. 消息顺序性 ⭐⭐⭐⭐⭐
      • 为什么会乱序?
      • 顺序消息实现
      • 顺序消费
      • 核心要点
    • 7. 消息堆积处理 ⭐⭐⭐⭐⭐
      • 堆积原因
      • 应急处理
      • 核心要点
    • 8. 事务消息 ⭐⭐⭐⭐⭐
      • 分布式事务解决方案
      • RocketMQ事务消息流程
      • 核心代码
      • 核心要点
    • 9. 延迟消息 ⭐⭐⭐⭐
      • 延迟等级
      • 使用场景
      • 实现原理
      • 核心要点
    • 10. 高可用机制 ⭐⭐⭐⭐⭐
      • 主从模式
      • Dledger模式(推荐)
      • NameServer高可用
      • 核心要点
    • 面试核心总结
      • 必问问题(优先级)
      • 常见面试问题及答案索引
        • Q1: RocketMQ和Kafka的区别?
        • Q2: 消息丢失怎么办?
        • Q3: 消息重复消费怎么处理?
        • Q4: 消息堆积如何解决?
        • Q5: 如何保证消息顺序?
        • Q6: 事务消息的实现原理?
        • Q7: RocketMQ的存储结构?
        • Q8: 如何保证高可用?
    • 快速记忆要点


1. 核心架构 ⭐⭐⭐⭐⭐

四大组件

┌─────────────┐      ┌─────────────┐
│  Producer   │      │  Consumer   │
│  (生产者)    │      │  (消费者)    │
└──────┬──────┘      └──────┬──────┘│                    │├────────┬───────────┤↓        ↓           ↓
┌─────────────────────────────────┐
│      NameServer (名字服务)       │
│   - 路由注册中心                 │
│   - Broker注册/发现              │
└────────────┬────────────────────┘│┌─────┴─────┐↓           ↓
┌──────────┐ ┌──────────┐
│ Broker-M │ │ Broker-S │
│ (Master) │ │ (Slave)  │
│ 存储消息  │ │ 备份消息  │
└──────────┘ └──────────┘

四大角色:

  1. Producer:消息生产者,发送消息
  2. Consumer:消息消费者,接收并处理消息
  3. NameServer:路由中心,管理Broker信息
  4. Broker:消息存储服务器,存储和转发消息

核心要点

"RocketMQ采用分布式架构,由四个核心组件组成:

NameServer 是轻量级的路由注册中心,类似Zookeeper但更轻量,负责Broker的注册和发现。各个节点之间不通信,每个节点都有完整的路由信息。

Broker 是消息存储服务器,负责消息的存储、投递和查询。支持主从模式,Master负责读写,Slave负责备份和读。

Producer 通过NameServer获取Broker地址,然后直接与Broker通信发送消息。

Consumer 同样通过NameServer获取路由信息,从Broker拉取消息进行消费。

这种架构的优势是NameServer无状态、易扩展,Broker可以水平扩展,整体高可用。"


2. 消息发送方式 ⭐⭐⭐⭐⭐

三种发送方式

方式可靠性性能使用场景
同步发送⭐⭐⭐⭐⭐⭐⭐⭐订单、支付、重要通知
异步发送⭐⭐⭐⭐⭐⭐⭐⭐⭐用户注册、短信通知
单向发送⭐⭐⭐⭐⭐日志采集、埋点上报

核心代码

// 1. 同步发送(阻塞等待结果)
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {// 发送成功
}// 2. 异步发送(立即返回,回调通知)
producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult result) { }@Overridepublic void onException(Throwable e) { }
});// 3. 单向发送(无返回值,不关心结果)
producer.sendOneway(msg);

核心要点

"RocketMQ提供三种消息发送方式:

同步发送最可靠,发送后会阻塞等待Broker响应,确认消息发送成功才返回。适用于订单、支付等不能丢失的重要消息,但性能相对较低。

异步发送性能最高,发送后立即返回,通过回调接口异步获取发送结果。适用于用户注册、短信通知等场景,既保证可靠性又不阻塞主流程。

单向发送性能最好,发送后不等待任何响应,也不关心发送结果。适用于日志采集、埋点上报等允许少量丢失的场景。

我们在项目中通常用异步发送,平衡了可靠性和性能。"


3. 消息消费模式 ⭐⭐⭐⭐⭐

两种消费模式

模式特点使用场景
集群消费一条消息只被消费一次订单处理、支付通知
广播消费每个消费者都消费全量消息配置更新、缓存刷新

集群消费(默认)

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 集群消费(默认模式)
consumer.setMessageModel(MessageModel.CLUSTERING);// 工作原理:
// Consumer1: 消费消息1、3、5
// Consumer2: 消费消息2、4、6
// → 负载均衡,每条消息只被消费一次

特点:

  • ✅ 同一Group的多个Consumer平均分摊消息
  • ✅ 实现负载均衡
  • ✅ 提高消费能力
  • ❌ 一条消息只能被一个Consumer消费

广播消费

consumer.setMessageModel(MessageModel.BROADCASTING);// 工作原理:
// Consumer1: 消费消息1、2、3、4、5、6(全部)
// Consumer2: 消费消息1、2、3、4、5、6(全部)
// → 每个Consumer都收到全量消息

特点:

  • ✅ 每个Consumer都消费全量消息
  • ✅ 适合配置更新、缓存刷新
  • ❌ 不会重试(失败就失败了)
  • ❌ 不保存消费进度

核心要点

"RocketMQ有两种消费模式:

集群消费是默认模式,同一个Consumer Group内的多个消费者会负载均衡地消费消息,每条消息只会被其中一个消费者消费。这种模式适合订单处理、支付通知等业务场景,可以通过增加消费者来提高消费能力。

广播消费模式下,每个消费者都会收到Topic的全量消息。比如有10条消息,3个消费者,那么每个消费者都会收到这10条消息。这种模式适合配置更新、缓存刷新等需要所有节点都执行的场景。

需要注意的是,广播消费不会重试,也不保存消费进度,所以一般用于允许丢失的场景。"


4. 消息存储机制 ⭐⭐⭐⭐⭐

CommitLog + ConsumeQueue 架构

消息写入流程:
Producer → Broker → CommitLog(顺序写)↓异步构建 ConsumeQueue(索引)↓Consumer 通过 ConsumeQueue 查找↓从 CommitLog 读取完整消息存储结构:
/store/
├── commitlog/           # 消息内容(所有Topic混合存储)
│   ├── 00000000000000000000
│   ├── 00000000001073741824
│   └── ...
├── consumequeue/        # 消息索引(按Topic/Queue分开)
│   ├── TopicA/
│   │   ├── 0/           # Queue 0的索引
│   │   ├── 1/           # Queue 1的索引
│   │   └── ...
│   └── TopicB/
│       └── ...
└── index/               # Hash索引(按Key查询)

为什么这样设计?

优点:

  • CommitLog顺序写:性能极高(磁盘顺序写接近内存随机写)
  • 所有Topic共享CommitLog:避免文件碎片
  • ConsumeQueue轻量:只存储offset、size、tagCode
  • 消费快速:通过索引快速定位

核心要点

"RocketMQ采用CommitLog + ConsumeQueue的存储架构。

CommitLog 是消息的实际存储文件,所有Topic的消息都混合顺序写入CommitLog,每个文件固定1GB。顺序写的性能非常高,可以达到磁盘的极限性能。

ConsumeQueue 是消息的逻辑队列,按Topic和QueueId分开存储,每条记录只存储消息的offset、大小和tag哈希值,非常轻量。Consumer消费时先读ConsumeQueue找到消息在CommitLog中的位置,再去CommitLog读取完整消息。

这种设计的好处是:写入性能极高(顺序写),消费效率也很高(通过索引定位),还避免了文件碎片问题。

另外还有Index索引文件,用于按Key或时间范围查询消息。"


5. 消息可靠性 ⭐⭐⭐⭐⭐

三个维度保证可靠性

1. 生产端可靠性├─ 同步发送(确认发送成功)├─ 异步发送(回调确认)└─ 失败重试(默认2次)2. Broker可靠性├─ 消息持久化(刷盘)├─ 主从复制(同步/异步)└─ 高可用(Dledger)3. 消费端可靠性├─ 消费确认(ACK)├─ 消费失败重试(默认16次)└─ 死信队列(Dead Letter Queue)

刷盘策略

// 同步刷盘(可靠,慢)
flushDiskType = SYNC_FLUSH
// 流程:消息写入PageCache → 等待刷盘完成 → 返回成功
// 优点:消息不会丢失
// 缺点:性能低(每次等待磁盘IO)// 异步刷盘(快,可能丢失)
flushDiskType = ASYNC_FLUSH
// 流程:消息写入PageCache → 立即返回成功 → 后台异步刷盘
// 优点:性能高
// 缺点:Broker宕机可能丢失少量消息

主从复制策略

// 同步复制(可靠,慢)
brokerRole = SYNC_MASTER
// 流程:消息写入Master → 同步到Slave → 返回成功
// 优点:数据不丢失
// 缺点:性能低(等待网络同步)// 异步复制(快,可能丢失)
brokerRole = ASYNC_MASTER
// 流程:消息写入Master → 立即返回 → 异步同步到Slave
// 优点:性能高
// 缺点:Master宕机可能丢失未同步的消息

消费失败重试机制

// 消费失败返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试策略:
重试次数    延迟时间
11023031分钟
42分钟
53分钟
...
162小时// 16次后 → 进入死信队列(%DLQ%ConsumerGroup)

核心要点

"RocketMQ从三个维度保证消息可靠性:

生产端:提供同步发送和异步发送,同步发送会等待Broker确认,异步发送通过回调获取结果。发送失败会自动重试,默认重试2次。

Broker端:通过刷盘和主从复制保证可靠性。刷盘有同步和异步两种,同步刷盘等待数据写入磁盘才返回,不会丢失但性能低;异步刷盘立即返回,性能高但宕机可能丢失。主从复制也分同步和异步,同步复制等待数据同步到Slave,异步复制则立即返回。

消费端:消费失败会自动重试,默认最多重试16次,延迟时间逐渐增加,从10秒到2小时。16次后仍失败的消息会进入死信队列,需要人工介入处理。

生产环境通常使用:异步刷盘+异步复制,平衡性能和可靠性。金融场景会用同步刷盘+同步复制,确保消息不丢失。"


6. 消息顺序性 ⭐⭐⭐⭐⭐

为什么会乱序?

原因1:多个Queue并行消费
Queue0: 消息1、4、7
Queue1: 消息2、5、8  → 消费顺序可能是:1、2、5、4、7、8(乱序)
Queue2: 消息3、6、9原因2:消费重试
消息1 → 消费失败 → 重试(延迟10秒)
消息2 → 消费成功
消息3 → 消费成功
10秒后消息1重试  → 顺序变成:2、3、1(乱序)

顺序消息实现

/*** 发送顺序消息:确保相同订单的消息进入同一队列*/
public class OrderProducer {public void sendOrderMessage(Order order) throws Exception {Message msg = new Message("OrderTopic", order.toJson().getBytes());// 使用MessageQueueSelector选择队列SendResult result = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long orderId = (Long) arg;// 相同订单ID选择同一队列int index = (int) (orderId % mqs.size());return mqs.get(index);}}, order.getOrderId());  // 订单ID作为参数System.out.println("订单消息发送到队列: " + result.getMessageQueue().getQueueId());}
}// 使用
Order order = new Order(12345L, "创建订单");
sendOrderMessage(order);  // 订单12345 → Queue 1order = new Order(12345L, "支付订单");
sendOrderMessage(order);  // 订单12345 → Queue 1(同一队列)order = new Order(12345L, "完成订单");
sendOrderMessage(order);  // 订单12345 → Queue 1(同一队列)
// ✓ 保证顺序:创建 → 支付 → 完成

顺序消费

/*** 顺序消费:单线程消费,保证顺序*/
consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {// 单线程顺序处理for (MessageExt msg : msgs) {System.out.println("顺序消费: " + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}
});

核心要点

"RocketMQ的顺序消息分为全局顺序和分区顺序。

全局顺序需要Topic只有一个Queue,所有消息都进入这个Queue,单线程消费。这种方式可靠但性能差,一般不推荐。

分区顺序是实际常用的方式,通过MessageQueueSelector将相同业务ID的消息路由到同一个Queue。比如订单消息,将同一个订单ID的创建、支付、完成消息都发送到同一个Queue,然后使用MessageListenerOrderly单线程顺序消费这个Queue。

关键点是:发送时使用相同的分区键选择Queue,消费时使用顺序监听器。

需要注意的是,消费失败不能重试,否则会阻塞后续消息,所以要保证消费逻辑的幂等性和可靠性。"


7. 消息堆积处理 ⭐⭐⭐⭐⭐

堆积原因

80%的堆积来自消费端:
├── 消费速度慢(业务逻辑耗时)
├── 消费者宕机
├── 消费线程数不足
└── 消费异常频繁重试20%来自生产端:
└── 流量突增

应急处理

// 1. 增加消费者数量(最快)
原来: 1个消费者实例
扩容: 10个消费者实例(同一个group)
效果: 消费速度提升10倍 ✅// 2. 增加消费线程
consumer.setConsumeThreadMin(20);   // 最小20个
consumer.setConsumeThreadMax(64);   // 最大64个// 3. 批量消费
consumer.setConsumeMessageBatchMaxSize(32);  // 每次拉32条// 4. 优化消费逻辑
// - 并行处理
// - 批量操作数据库
// - 异步化外部调用
// - 使用缓存// 5. 紧急方案:临时转移
// 消费堆积消息并转发到新Topic,不做业务处理
// 修复bug后再从新Topic消费

核心要点

"消息堆积主要是消费端慢导致的,我们的处理方案分应急和预防两方面。

应急处理: 首先增加消费者实例数量,这是最快的方法,比如从1个扩容到10个,消费速度立即提升10倍。其次调整消费线程数,从默认20个增加到64个,同时调大批量消费数量。如果还不够,需要优化消费逻辑,比如数据库批量操作、外部调用异步化、增加缓存等。极端情况下,可以临时启动一个快速消费者,只负责把消息转移到新Topic,不做业务处理,等修复bug后再重新消费。

预防措施: 首先要做好监控告警,堆积超过阈值立即通知。其次合理设置Topic的Queue数量,一般4-8个,高并发场景16-32个。还要做好流量控制,生产端和消费端都要限流。最重要的是提前做性能压测,确认消费能力能否支撑生产流量。"


8. 事务消息 ⭐⭐⭐⭐⭐

分布式事务解决方案

场景:下单扣库存,保证本地事务和消息发送的一致性传统方案问题:
try {orderDao.insert(order);     // 本地事务producer.send(msg);         // 发送消息db.commit();
} catch (Exception e) {db.rollback();
}问题:
- commit后发送消息失败 → 订单入库,库存未扣
- 发送消息后commit失败 → 消息已发,订单未入库

RocketMQ事务消息流程

阶段1:发送Half消息(对消费者不可见)↓
阶段2:执行本地事务├─ 成功 → commit消息(消费者可见)└─ 失败 → rollback消息(删除)↓
阶段3:Broker回查本地事务状态(如果超时)└─ 查询本地事务结果 → commit/rollback

核心代码

// 1. 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
producer.setNamesrvAddr("localhost:9876");// 2. 设置事务监听器
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 订单入库orderDao.insert((Order) arg);// 返回:提交消息return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {// 返回:回滚消息return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// Broker回查本地事务状态String orderId = msg.getKeys();Order order = orderDao.queryByOrderId(orderId);if (order != null) {return LocalTransactionState.COMMIT_MESSAGE;  // 已入库,提交} else {return LocalTransactionState.ROLLBACK_MESSAGE; // 未入库,回滚}}
});producer.start();// 3. 发送事务消息
Order order = new Order("ORDER123", 100.0);
Message msg = new Message("OrderTopic", order.toJson().getBytes());
msg.setKeys(order.getOrderId());  // 设置Key,用于回查TransactionSendResult result = producer.sendMessageInTransaction(msg, order);

核心要点

"RocketMQ的事务消息用于解决分布式事务中的最终一致性问题。

工作流程分三个阶段:

第一阶段,发送Half消息到Broker,这个消息对消费者不可见。

第二阶段,执行本地事务,比如订单入库。如果成功,返回COMMIT,Broker会把消息改为可见状态,消费者就能消费了;如果失败,返回ROLLBACK,Broker会删除这条消息。

第三阶段,如果本地事务执行超时或未返回结果,Broker会定时回查本地事务状态,通过checkLocalTransaction方法查询本地事务是否成功,然后决定提交还是回滚消息。

这种设计保证了本地事务和消息发送的最终一致性,即使网络异常也能通过回查机制保证。

我们项目中用事务消息实现订单和库存的解耦,订单服务只负责下单,库存扣减通过消息异步完成,保证了数据一致性。"


9. 延迟消息 ⭐⭐⭐⭐

延迟等级

// RocketMQ支持18个固定的延迟等级
延迟等级    延迟时间
1          12          53          104          305          1分钟
6          2分钟
7          3分钟
8          4分钟
9          5分钟
10         6分钟
11         7分钟
12         8分钟
13         9分钟
14         10分钟
15         20分钟
16         30分钟
17         1小时
18         2小时

使用场景

// 场景1:订单超时取消(30分钟)
Message msg = new Message("OrderTopic", "订单超时检查".getBytes());
msg.setDelayTimeLevel(16);  // 30分钟后消费
producer.send(msg);// 场景2:定时任务(1小时后执行)
Message msg = new Message("TaskTopic", "定时任务".getBytes());
msg.setDelayTimeLevel(17);  // 1小时
producer.send(msg);// 场景3:重试延迟(5秒后重试)
Message msg = new Message("RetryTopic", "重试任务".getBytes());
msg.setDelayTimeLevel(2);  // 5秒
producer.send(msg);

实现原理

发送延迟消息 → Broker接收↓
修改Topic为 SCHEDULE_TOPIC_XXXX↓
根据延迟等级存储到对应的队列↓
定时任务扫描到期消息↓
恢复原Topic并投递↓
消费者消费

核心要点

"RocketMQ的延迟消息支持18个固定的延迟等级,从1秒到2小时,不支持任意时间延迟。

使用很简单,发送消息时设置delayTimeLevel即可。常见应用场景是订单超时取消,比如用户下单后30分钟不支付就取消,我们会发送一个延迟30分钟的消息,30分钟后消费者收到消息,检查订单状态,如果未支付就取消订单。

实现原理是Broker收到延迟消息后,会先存储到SCHEDULE_TOPIC,根据延迟等级放入不同的队列,然后有定时任务不断扫描,到期后恢复原Topic并投递给消费者。

需要注意的是,只支持固定的18个等级,如果需要任意时间延迟,要么选择最接近的等级,要么使用定时任务或其他方案。"


10. 高可用机制 ⭐⭐⭐⭐⭐

主从模式

传统主从架构:
┌─────────┐  同步/异步复制  ┌─────────┐
│Master   │ ────────────>  │Slave    │
│读写     │                 │只读     │
└─────────┘                 └─────────┘问题:Master宕机,Slave不能自动切换为Master

Dledger模式(推荐)

Dledger高可用架构(基于Raft协议):
┌─────────┐
│Broker-1 │ ─┐
│(Leader) │  │
└─────────┘  │  Raft选举├─ 自动故障转移
┌─────────┐  │
│Broker-2 │ ─┤
│(Follower)│  │
└─────────┘  ││
┌─────────┐  │
│Broker-3 │ ─┘
│(Follower)│
└─────────┘优点:
✅ Leader宕机自动选举新Leader
✅ 数据强一致性(多数派提交)
✅ 无需手动切换

NameServer高可用

NameServer集群(无状态):
┌─────────┐  ┌─────────┐  ┌─────────┐
│NameSrv1 │  │NameSrv2 │  │NameSrv3 │
└─────────┘  └─────────┘  └─────────┘↑            ↑            ↑└────────────┴────────────┘各节点独立特点:
✅ 节点之间不通信
✅ 每个节点都有完整路由信息
✅ 任意节点宕机不影响使用
✅ Producer/Consumer连接多个NameServer

核心要点

"RocketMQ的高可用主要从三个层面保证:

NameServer层面:采用无状态集群,每个节点独立,互不通信,都保存完整的路由信息。Producer和Consumer会连接多个NameServer,任意节点宕机不影响使用。

Broker层面:传统的主从模式下,Master负责读写,Slave负责备份和读。但Master宕机后需要手动切换。现在推荐使用Dledger模式,基于Raft协议实现自动故障转移,当Leader宕机后会自动选举新Leader,实现真正的高可用。

消息层面:通过主从同步复制保证数据不丢失,消费端通过重试机制保证消息最终被消费成功。

Dledger模式要求至少3个节点,多数派存活即可提供服务,比如3节点允许1个宕机,5节点允许2个宕机。"


面试核心总结

必问问题(优先级)

优先级问题核心要点
⭐⭐⭐⭐⭐RocketMQ架构4个组件、职责、通信方式
⭐⭐⭐⭐⭐消息可靠性3个维度、刷盘、主从复制、重试
⭐⭐⭐⭐⭐消息存储CommitLog、ConsumeQueue、顺序写
⭐⭐⭐⭐⭐消息堆积原因、应急方案、预防措施
⭐⭐⭐⭐⭐事务消息Half消息、本地事务、回查机制
⭐⭐⭐⭐消息顺序分区顺序、MessageQueueSelector
⭐⭐⭐⭐消费模式集群消费、广播消费
⭐⭐⭐⭐高可用Dledger、主从切换
⭐⭐⭐延迟消息18个等级、应用场景

常见面试问题及答案索引

Q1: RocketMQ和Kafka的区别?

核心差异:

  • 定位不同:RocketMQ面向业务消息,Kafka面向日志流处理
  • 性能:Kafka百万级TPS,RocketMQ十万级TPS
  • 功能:RocketMQ支持事务消息、延迟消息、顺序消息,Kafka不支持
  • 可靠性:RocketMQ同步刷盘可靠性更高,Kafka异步批量写入性能更高
  • 场景:业务系统用RocketMQ,大数据日志用Kafka

Q2: 消息丢失怎么办?

三个维度保证(详见第5节):

生产端:

  • 同步发送等待响应
  • 发送失败重试(默认2次)
  • 记录失败日志后续补偿

Broker端:

  • 同步刷盘(消息写入磁盘才返回)
  • 同步复制(数据同步到Slave才返回)
  • Dledger多副本机制

消费端:

  • 消费失败自动重试(最多16次)
  • 重试失败进入死信队列
  • 业务幂等防止重复

Q3: 消息重复消费怎么处理?

产生原因:

  • 网络抖动导致重复发送
  • 消费失败后重试
  • Broker故障恢复

解决方案:

// 1. 消息ID去重(推荐)
String msgId = msg.getMsgId();
if (redis.exists("msg:" + msgId)) {return;  // 重复消息,跳过
}
processMessage(msg);
redis.setex("msg:" + msgId, 86400, "1");// 2. 业务幂等
// 数据库唯一约束、状态机控制、版本号机制// 3. 去重表
// 保存已处理的消息ID到数据库

Q4: 消息堆积如何解决?

应急处理(详见第7节):

  1. 扩容消费者(最快):1个扩到10个,速度提升10倍
  2. 增加线程数:调大消费线程池大小
  3. 批量消费:调大每次拉取的消息数量
  4. 优化逻辑:数据库批量操作、异步化、加缓存
  5. 临时转移:快速消费转发到新Topic,修复后重新消费

预防措施:

  • 监控告警(堆积>1万立即通知)
  • 性能压测(提前验证消费能力)
  • 流量控制(生产端限流)
  • 合理设置Queue数量(4-16个)

Q5: 如何保证消息顺序?

分区顺序方案(详见第6节):

发送端:

// 相同业务ID的消息发送到同一队列
producer.send(msg, new MessageQueueSelector() {public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long orderId = (Long) arg;int index = (int) (orderId % mqs.size());return mqs.get(index);}
}, orderId);  // 订单ID作为分区键

消费端:

// 使用顺序监听器,单线程顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ...) {// 顺序处理return ConsumeOrderlyStatus.SUCCESS;}
});

关键点:

  • 相同业务ID路由到同一队列
  • 队列内单线程顺序消费
  • 消费失败不能重试(会阻塞)

Q6: 事务消息的实现原理?

核心流程(详见第8节):

阶段1:发送Half消息

Producer → Broker:发送Half消息(对Consumer不可见)

阶段2:执行本地事务

executeLocalTransaction() {try {orderDao.insert(order);  // 执行本地事务return COMMIT_MESSAGE;   // 成功:提交消息} catch (Exception e) {return ROLLBACK_MESSAGE; // 失败:回滚消息}
}

阶段3:Broker回查(如果超时)

checkLocalTransaction() {Order order = orderDao.query(orderId);if (order != null) {return COMMIT_MESSAGE;   // 已入库,提交} else {return ROLLBACK_MESSAGE; // 未入库,回滚}
}

解决问题: 保证本地事务和消息发送的最终一致性


Q7: RocketMQ的存储结构?

核心设计(详见第4节):

CommitLog:
- 所有Topic的消息混合顺序写入
- 文件固定1GB大小
- 顺序写性能极高ConsumeQueue:
- 按Topic和QueueId分开存储
- 只存储offset、size、tagCode(轻量)
- 消费时通过索引快速定位Index:
- 按Key或时间范围查询
- Hash索引结构

优势:

  • 顺序写性能接近内存随机写
  • 避免文件碎片
  • 消费效率高

Q8: 如何保证高可用?

Dledger方案(详见第10节):

架构:

3个Broker节点,基于Raft协议
├─ Leader:处理读写请求
├─ Follower:同步数据,参与选举
└─ Follower:同步数据,参与选举特点:
✅ Leader宕机自动选举新Leader
✅ 多数派存活即可提供服务(3节点允许1个宕机)
✅ 数据强一致性
✅ 无需手动切换

NameServer高可用:

无状态集群,节点独立
Producer/Consumer连接多个NameServer
任意节点宕机不影响使用

消息可靠:

  • 同步刷盘:数据写入磁盘
  • 同步复制:数据同步到多数派
  • 消费重试:失败自动重试16次

快速记忆要点

架构设计: NameServer作为轻量级路由中心,Broker负责消息存储,Producer和Consumer通过NameServer获取路由信息后直连Broker通信,四大组件协同工作

消息发送: 支持同步、异步、单向三种方式,同步适合重要消息,异步适合高并发,单向适合日志采集

消费模式: 集群消费实现负载均衡,广播消费实现全量通知,顺序消息通过分区路由保证

存储机制: CommitLog顺序写保证高性能,ConsumeQueue作为轻量索引加速消费,所有Topic混合存储避免碎片

可靠保证: 生产端确认、Broker刷盘复制、消费端重试三重保障,同步刷盘加同步复制实现强一致

堆积处理: 水平扩容消费者最快见效,增加消费线程提升并发,优化消费逻辑治本,监控告警预防为主

事务消息: Half消息对消费者不可见,本地事务执行成功后提交消息,超时则回查本地事务状态决定提交或回滚

高可用: Dledger基于Raft协议实现自动故障转移,NameServer无状态集群任意节点可用

http://www.dtcms.com/a/597793.html

相关文章:

  • 网站运营岗位职责描述网络优化分为
  • 【 前端 -- css 】浮动元素导致父容器高度塌陷如何解决
  • 用html5的视频网站重庆公司有哪些
  • Leessun Procreate素描画笔套装含纸张纹理数字插画创作资源
  • websocket(即时通讯)
  • 宁波cms建站网站建设的切片是什么
  • 在防火墙环境下进行LoadRunner性能测试的配置方法
  • 企业门户网站开发门户网站英文版建设
  • 【系统架构设计师-2025下半年真题】案例分析-参考答案及详解(回忆版)
  • 在家做私房菜的网站永州本地网站建设
  • MyBatis如何处理懒加载和预加载?
  • 计算机更换硬盘并新装系统
  • 高端营销型企业网站建设wordpress升级vip
  • 使用adb获取安卓模拟器日志
  • GFC-Chain 公链正式连接 GOF4生态体系,开启去中心化生态新篇章
  • PaddleOCR----制作数据集,模型训练,验证 QT部署(未完成)
  • leetcode 474 一和零
  • ADB点击实战-做一个自动点广告播放领金币的脚本app(下)
  • 系统运维Day06_RSYSLOG系统日志管理
  • LeetCodeHot100| 438.找到字符串中所有字符异位次、和为k 的子数组
  • 网络安全与数字化转型的价值投资
  • 免费网站建设教程厦门建站最新消息
  • 电子辐射能量场的具体过程
  • 住房和城乡规划建设局官方网站士兵突击网站怎么做
  • 文件名精灵2025 v1.0
  • 高端品牌型网站建设店面设计多少钱一个平方
  • git仓库管理
  • SSM基于vuejs的图书管理系统171wx(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • Qt 配置Webassemble环境
  • 维力安网站建设公司wordpress 建站 电子书