Apache RocketMQ中 Normal Message(普通消息)的说明
这段内容是关于 Apache RocketMQ 中 Normal Message(普通消息) 的说明。我们来用通俗、清晰的方式,帮你全面理解“普通消息”是什么、它适用于哪些场景、它的生命周期以及使用限制。
一、什么是 Normal Message(普通消息)?
定义:
普通消息是 RocketMQ 中最基础、最常用的消息类型,没有特殊功能(如顺序、延迟、事务等),只保证“可靠地从生产者传送到消费者”。
📌 类比理解:
- 普通消息就像“平邮信件”:
- 能送到 → ✅ 可靠传输
- 什么时候到 → 不确定(无延迟控制)
- 是否按顺序到 → 不保证(除非特别配置)
- 内容是否成功处理 → 需要收件人回执(ACK机制)
✅ 它和以下“特殊消息”相对:
特殊消息类型 | 功能 |
---|---|
FIFO 消息 | 严格顺序消费 |
Delay 消息 | 延迟一定时间后才被消费 |
Transaction 消息 | 支持“半消息 + 事务提交”,用于分布式事务 |
所以:如果你不需要顺序、延迟或事务功能,就用普通消息!
二、适用场景(Scenarios)
普通消息适用于大多数“异步解耦、数据传输、事件驱动”的场景。
✅ 场景 1:微服务异步解耦(Asynchronous Decoupling)
订单系统 → 发送“订单创建”消息↓RocketMQ↓
库存系统 ← 订阅并扣减库存
积分系统 ← 订阅并增加积分
通知系统 ← 订阅并发送短信
📌 特点:
- 各系统之间完全解耦,互不影响。
- 消息之间独立,不需要按顺序处理。
- 只要最终能收到并处理即可,对实时性要求不高。
✔️ 典型应用:电商下单、用户注册、支付结果通知等。
✅ 场景 2:数据集成与传输(Data Integration)
前端 App → 产生日志 → 发送到 RocketMQ↓数据分析平台 ← 消费日志做分析日志存储系统 ← 消费日志存入数据库
📌 特点:
- RocketMQ 只负责“搬运”数据,不做任何处理。
- 每条消息就是一条原始数据(如日志、埋点、监控指标)。
- 强调高吞吐、高可用、不丢数据。
✔️ 典型应用:日志收集、数据同步、流式计算数据源。
三、普通消息的工作机制(Working Mechanism)
1. 普通消息的生命周期(Lifecycle)
一条普通消息在 RocketMQ 中会经历以下几个阶段:
阶段 | 说明 |
---|---|
1. Initialized(初始化) | 生产者创建消息,设置 Topic、Tag、Body、Keys 等属性,准备发送。 |
2. Ready(就绪) | 消息被成功发送到 Broker,写入 CommitLog。此时消费者可以拉取该消息。 |
3. Inflight(飞行中) | 消费者拉取了消息,正在处理。Broker 会等待消费者返回“消费结果”(ACK)。 ⚠️ 如果超时未返回 ACK,Broker 会自动重试(默认最多重试 16 次)。 |
4. Acked(已确认) | 消费者处理成功,返回 CONSUME_SUCCESS ,Broker 标记该消息为“已消费”。❗注意:消息不会立即删除,只是逻辑标记。 |
5. Deleted(已删除) | 消息保存达到保留时间(默认 72 小时)或磁盘空间不足时,Broker 会从物理文件中删除最老的消息。 |
📌 关键点:
- 消息不立即删除 → 支持“消息回溯”(Re-consume),便于排查问题或补数据。
- 消费失败会重试 → 提高系统容错能力。
- Inflight 状态防止重复消费 → 同一消息在同一时间只会被一个消费者处理。
2. 消息存储与清理(Message Storage and Cleanup)
- RocketMQ 使用 CommitLog + ConsumeQueue 的结构存储消息。
- 所有消息按到达顺序写入 CommitLog(顺序写,高性能)。
- 每个 Topic 的队列维护一个 ConsumeQueue(索引),指向 CommitLog 中的消息位置。
- 消息默认保留 72 小时(可配置),过期后自动删除。
- 删除是“滚动删除”:先删最早的消息,保证磁盘空间可用。
四、使用限制(Usage Limits)
普通消息只能发送到 MessageType 为
Normal
的 Topic。
📌 解释:
- RocketMQ 中的 Topic 有不同类型,比如:
Normal
:普通消息FIFO
:顺序消息Delay
:延迟消息Transaction
:事务消息
- 你不能把普通消息发到一个声明为
FIFO
的 Topic 上,否则会报错。
✅ 正确做法:
- 创建 Topic 时指定类型为
Normal
。 - 生产者发送消息时,目标 Topic 必须是
Normal
类型。
📌 示例(控制台或命令行):
# 创建一个普通 Topic
mqadmin updateTopic -n localhost:9876 -t OrderTopic -c DefaultCluster --type Normal
然后你的生产者就可以发送普通消息了:
Message msg = new Message("OrderTopic", "CREATE", "订单内容".getBytes());
SendResult result = producer.send(msg);
五、总结:一句话理解 Normal Message
普通消息是 RocketMQ 的“基础款”消息,它不保证顺序、不支持延迟、不参与事务,但胜在简单、高效、可靠,适用于绝大多数异步通信和数据传输场景。
六、使用建议(Best Practices)
建议 | 说明 |
---|---|
✅ 优先使用普通消息 | 大多数场景不需要复杂功能,简单即稳定 |
✅ 合理设置 Tag | 用 Tag 做一级分类(如 CREATE , PAY , CANCEL ),便于过滤 |
✅ 设置 Keys(可选) | 为重要消息设置唯一 Key(如订单ID),便于通过控制台查询 |
✅ 处理消费失败 | 在 MessageListener 中捕获异常,返回 RECONSUME_LATER 触发重试 |
✅ 监控消费延迟 | 使用 mqadmin consumerProgress 查看消费堆积情况 |
✅ 避免大消息 | 单条消息建议 < 100KB,太大影响性能 |
七、常见问题(FAQ)
❓ 普通消息会丢吗?
- 正常情况下不会丢:消息写入 Broker 的 CommitLog 后才返回成功。
- 极端情况(如 Broker 突然宕机且未刷盘)可能丢失,可通过同步刷盘 + 同步复制提高可靠性。
❓ 普通消息能重复吗?
- 可能重复!特别是在消费失败重试、Consumer 重启时。
- 所以消费者必须实现幂等性处理(如用订单ID做去重)。
❓ 普通消息能按顺序消费吗?
- 默认不能。
- 如果需要顺序消费,必须:
- 使用
MessageQueueSelector
让同一类消息发到同一个 Queue。 - 消费者使用
MessageListenerOrderly
监听器。
- 使用
如果你正在开发一个基于 RocketMQ 的系统,记住:
从普通消息开始,只有当你明确需要“顺序”、“延迟”或“事务”时,才升级到更复杂的消息类型。
如果有具体问题,比如:
- 如何发送/消费普通消息?
- 如何设置 Tag 和 Keys?
- 如何处理消费失败和重试?
欢迎继续提问!