RocketMQ 核心知识整理:工作原理、常用命令与常见问题解决
一、RocketMQ 核心工作原理
RocketMQ 是基于 “生产者 - 消费者” 模型的分布式消息中间件,核心通过 4 个组件实现消息的收发与存储,整体流程如下:
-
核心组件分工
- Producer(生产者):负责发送消息,支持同步 / 异步 / 单向发送,可通过 “Topic” 指定消息分类,通过 “Tag” 实现消息过滤。
- Broker(消息服务器):核心存储与转发节点,一个 Broker 包含多个 “Topic”,每个 Topic 又分为多个 “Queue”(队列),通过 Queue 实现消息的负载均衡与顺序存储。
- Consumer(消费者):负责订阅并消费消息,分为 “集群模式”(同组消费者分摊消息)和 “广播模式”(同组消费者均接收全量消息)。
- NameServer(路由中心):轻量级注册中心,存储 Broker 节点信息,Producer/Consumer 通过 NameServer 获取 Broker 地址,实现动态路由。
-
消息流转核心流程
- Broker 启动后,向所有 NameServer 注册自身节点信息(IP、端口、Topic 列表等)。
- Producer 发送消息前,先向 NameServer 查询目标 Topic 对应的 Broker 地址。
- Producer 根据 Broker 地址,将消息发送到指定 Broker 的 Queue 中。
- Consumer 启动后,向 NameServer 订阅 Topic,并获取对应的 Broker 地址。
- Consumer 从 Broker 的 Queue 中拉取(默认)或接收推送消息,消费后向 Broker 发送 “消费确认”(ACK)。
二、RocketMQ 常用命令(基于 bin 目录脚本)
RocketMQ 命令通过 mqadmin
(管理命令)和 mqnamesrv
/mqbroker
(启停命令)执行,需先配置 ROCKETMQ_HOME
环境变量。
1. 服务启停命令
- 启动 NameServer:
bash
nohup sh bin/mqnamesrv & # 后台启动,日志输出到 nohup.out tail -f ~/logs/rocketmqlogs/namesrv.log # 查看 NameServer 日志
- 启动 Broker(指定 NameServer 地址):
bash
nohup sh bin/mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true & # 允许自动创建 Topic tail -f ~/logs/rocketmqlogs/broker.log # 查看 Broker 日志
- 关闭 Broker:
bash
sh bin/mqshutdown broker
- 关闭 NameServer:
bash
sh bin/mqshutdown namesrv
2. 核心管理命令(mqadmin)
- 查看 Topic 列表:
bash
sh bin/mqadmin topicList -n 127.0.0.1:9876
- 创建 Topic(指定 Broker 集群):
bash
sh bin/mqadmin updateTopic -n 127.0.0.1:9876 -t TestTopic -c DefaultCluster
- 查看 Topic 路由信息(查看对应 Broker 和 Queue):
bash
sh bin/mqadmin topicRoute -n 127.0.0.1:9876 -t TestTopic
- 查看消费组信息(查看消费者订阅状态):
sh bin/mqadmin consumerStatus -n 127.0.0.1:9876 -g TestConsumerGroup
- 查看消息消费进度(查看消费组对 Topic 的消费偏移量):
sh bin/mqadmin consumerProgress -n 127.0.0.1:9876 -g TestConsumerGroup -t TestTopic
3.也可以使用图形化管理工具操作mq
三、常见问题与解决方案
1. 问题:Producer 发送消息失败,报错 “NO_NAME_SERVER_AVAILABLE”
- 原因:Producer 未正确配置 NameServer 地址,或 NameServer 未启动。
- 解决方案:
- 检查 NameServer 是否启动:执行
ps -ef | grep namesrv
,确认进程存在。 - 发送消息时指定 NameServer:代码中配置
producer.setNamesrvAddr("127.0.0.1:9876")
,或通过 JVM 参数-Drocketmq.namesrv.addr=127.0.0.1:9876
指定。
- 检查 NameServer 是否启动:执行
2. 问题:Consumer 无法消费消息,无报错但收不到消息
- 常见原因:
- 消费组(ConsumerGroup)配置错误,与 Producer 发送的 Topic 不匹配。
- 消费模式错误(如集群模式下,同组消费者已分摊消息)。
- 消息被过滤(Consumer 订阅时指定了 Tag,而 Producer 发送的消息 Tag 不匹配)。
- 解决方案:
- 核对 Consumer 配置:确认
consumer.setConsumerGroup("正确组名")
和consumer.subscribe("TestTopic", "正确Tag")
(Tag 为*
表示订阅所有消息)。 - 查看消费组状态:执行
sh bin/mqadmin consumerStatus -n 127.0.0.1:9876 -g 消费组名
,确认消费者已在线。 - 检查消息是否堆积:执行
consumerProgress
命令,若 “消费偏移量” 远小于 “最大偏移量”,说明消息堆积,需排查 Consumer 消费速度或数量。
- 核对 Consumer 配置:确认
3. 问题:Broker 启动失败,日志报错 “Address already in use”
- 原因:Broker 占用的端口(默认 10911 通信端口、10909 快查端口)已被其他进程占用。
- 解决方案:
- 查找占用端口的进程:
bash
netstat -tuln | grep 10911 # 查看 10911 端口占用 ps -ef | grep 进程ID # 查看占用进程详情
- 停止占用进程,或修改 Broker 端口:在
conf/broker.conf
中添加配置:ini
listenPort=10912 # 自定义通信端口 fastFailIfNoBufferInStorePool=true
- 重新启动 Broker,指定配置文件:
bash
nohup sh bin/mqbroker -n 127.0.0.1:9876 -c conf/broker.conf &
- 查找占用端口的进程:
4. 问题:消息消费后重复处理(重复消费)
- 原因:RocketMQ 不保证 “Exactly-Once”(仅一次)语义,因网络波动、Consumer 重启等,可能导致 ACK 丢失,Broker 重新投递消息。
- 解决方案:
- 业务层实现 “幂等性”:通过消息唯一 ID(如
msgId
或自定义业务 ID),在消费前检查是否已处理(如存入数据库或 Redis,用 ID 作为唯一键)。 - 配置 Consumer 重试次数:在代码中设置
consumer.setMaxReconsumeTimes(3)
(超过重试次数后,消息会进入 “死信队列”,需单独处理)。
- 业务层实现 “幂等性”:通过消息唯一 ID(如
5. 问题:NameServer 启动后,Broker 注册失败,日志报错 “connect to NameServerIP:9876 failed”
- 原因:
- NameServer 与 Broker 之间网络不通(防火墙拦截、IP 错误)。
- NameServer 端口(默认 9876)未开放。
- 解决方案:
- 测试网络连通性:在 Broker 机器执行
telnet 127.0.0.1 9876
或ping 127.0.0.1
,确认网络通畅。 - 开放防火墙端口:若开启防火墙,需放行 NameServer 和 Broker 端口:
bash
# 以 CentOS 为例,开放 9876(NameServer)、10911(Broker)端口 firewall-cmd --zone=public --add-port=9876/tcp --permanent firewall-cmd --zone=public --add-port=10911/tcp --permanent firewall-cmd --reload
- 重新启动 Broker,确认注册日志:查看 Broker 日志,出现 “register broker to name server successfully” 表示注册成功。
- 测试网络连通性:在 Broker 机器执行