RocketMQ 部署;与Golang服务交互
文章目录
- RocketMQ
- 拉取优点
- 跑一个demo
- docker部署
- console
- 读写队列
- consumer
- 客户端定时统计
- producer
- 高可用
- 对比 Kafka
RocketMQ
RocketMQ 的消费流程本质上是 Pull 模式,客户端长轮询。
但是下次轮询前有消息,会直接通知客户端。
- 消费者主动向 Broker 请求消息。
- Broker 返回一批消息(可以配置每次拉取的数量)。
拉取优点
客户端自己管理 offset (消费进度)。客户端可根据自身处理能力动态调整拉取频率(如设置长轮询超时时间),适应不同场景需求。
跑一个demo
docker部署
docker-compose.yml :
端口: 左 外部访问: 右 容器端口
- NameServer
NameServer 相当于 注册中心,记录 Broker 的信息,Producer/Consumer 需要先连到 NameServer 才能找到 Broker。 - Broker
10911: Broker 的 主端口(客户端生产/消费消息的通信)。
10909: Broker 的 HA(高可用同步)端口,主从复制时使用。 - Console(管理界面)
services:namesrv:image: apache/rocketmq:5.3.0container_name: rmqnamesrvports:- "9876:9876"command: sh mqnamesrvbroker:image: apache/rocketmq:5.3.0container_name: rmqbrokerports:- "10911:10911"- "10909:10909"environment:- NAMESRV_ADDR=namesrv:9876command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-5.3.0/conf/broker.confvolumes:# 左边是你本机路径(Windows),右边是容器路径(Linux)- ./broker.conf:/home/rocketmq/rocketmq-5.3.0/conf/broker.confdepends_on:- namesrvconsole:image: styletang/rocketmq-console-ng:latestcontainer_name: rmq-consoleports:- "8080:8080"environment:- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876depends_on:- namesrv- broker
broker.conf :
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSHbrokerIP1=172.29.24.103
-
brokerId = 0
- 0 → 主节点(Master)
- >0 → 从节点(Slave)
-
deleteWhen = 04
每日定时清理,RocketMQ 会每天凌晨 04:00 检查消息过期文件并删除。 -
fileReservedTime = 48
RocketMQ 会保留最近 48 小时 的消息文件,超过这个时间的消息文件会被删除(结合 deleteWhen 执行)。 -
brokerRole = ASYNC_MASTER
- ASYNC_MASTER → 异步主节点 (异步主节点在写消息时不等待从节点确认,性能更高,但 HA 可靠性稍低。)
- SYNC_MASTER → 同步主节点
- SLAVE → 从节点
-
flushDiskType = ASYNC_FLUSH
- ASYNC_FLUSH → 异步刷盘(性能高,可能丢少量消息)
- SYNC_FLUSH → 同步刷盘(性能低,但消息安全性高)
-
brokerIP1=172.29.24.103
指定 Broker 的 IP 地址(对外提供服务)。
这个 IP 会在 NameServer 注册,用于客户端查找 Broker。
console
读写队列
Broker 和 Consumer 的调度机制 来分配工作
- writeQueueNums(写队列数)
如 Topic 有 16 个物理队列,Producer 会根据 MessageQueueSelector 或轮询算法选择写入哪条队列。
对高吞吐量场景,writeQueueNums 可以大一些,支持更多并行写入。 - readQueueNums(读队列数)
Broker 会把该 Topic 的读队列均匀分配给不同 Consumer 实例,保证每条队列只被一个 Consumer 实例消费(避免重复消费) - perm(权限 Permission)
2 → 可写(Producer 可以发送消息)
4 → 可读(Consumer 可以消费消息)
6 → 可读可写(2 + 4)
consumer
消费者
package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {c, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"172.29.24.103:9876"}),consumer.WithGroupName("TestGroup"),)if err != nil {panic(err)}err = c.Subscribe("TestTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for _, msg := range msgs {fmt.Println("Received:", string(msg.Body))}return consumer.ConsumeSuccess, nil})if err != nil {panic(err)}err = c.Start()if err != nil {panic(err)}fmt.Println("Consumer started...")// 阻塞,不让 main 退出 【不占用CPU】select {}//time.Sleep(30 * time.Second) // 拉长一点时间,方便测试//_ = c.Shutdown()
}
接收到消息:
客户端定时统计
Go 客户端默认每隔一段时间 统计一次 消息消费和拉取指标
每五秒钟一次输出:(对应控制台创建topic时设置的16个读队列)
time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=CONSUME_RT
time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=PULL_RT
time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=CONSUME_OK_TPS
time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=PULL_TPS
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=7]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=10]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=6]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%TestGroup, brokerName=broker-a, queueId=0]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=12]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1]" consumerGroup=TestGroup offset=2
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=4]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=14]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=5]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=13]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=11]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=15]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=8]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=9]" consumerGroup=TestGroup offset=0
producer
生产者
package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {// 创建 Producerp, err := rocketmq.NewProducer(producer.WithNameServer([]string{"172.29.24.103:9876"}), // 和 consumer 一致producer.WithRetry(2),)if err != nil {panic(err)}// 启动 Producererr = p.Start()if err != nil {panic(err)}// 构造消息msg := &primitive.Message{Topic: "TestTopic",Body: []byte("Hello RocketMQ from Go!"),}// 同步发送消息res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Println("Send failed:", err)} else {fmt.Println("Send success:", res.String())}// 关闭 Producer_ = p.Shutdown()time.Sleep(time.Second) // 等一会儿,防止日志没刷完
}
发送消息
高可用
多 NameServer
namesrv1:image: apache/rocketmq:5.3.0container_name: namesrv1ports: ["9876:9876"]command: sh mqnamesrvnamesrv2:image: apache/rocketmq:5.3.0container_name: namesrv2ports: ["9877:9876"]command: sh mqnamesrv
多 broker
rocketmq/├─ conf/│ ├─ broker-master.conf│ └─ broker-slave.conf
对比 Kafka
两者都采用 Pull Long Polling 机制,这是保证实时性和减少无效网络交互的关键。 【长轮询是兼顾实时性和服务端压力的一个优秀折中方案。】
-
消息到来时,rmq 会主动通知
kafka则不会,鼓励批处理 -
Kafka 原生不支持定时/延迟消息
-
Kafka集群需要借助Zookeeper
-
RocketMQ还支持Filter Server组件,用于支持消息过滤功能。这种设计使得RocketMQ在处理复杂业务逻辑时更加灵活。
-
RocketMQ单机支持最高5万个队列,这使得它在处理大量队列时仍能保持稳定的性能。这种高队列数支持得益于RocketMQ的队列模型设计,使得它在处理海量消息时具有更高的吞吐量和更低的延迟。
相比之下,Kafka在单机超过64个队列/分区时,消息发送性能可能会显著降低。这意味着在处理大量分区时,Kafka可能会遇到性能瓶颈。 -
消息顺序性:
消息顺序性对于某些业务场景至关重要,如金融交易、订单处理等。RocketMQ支持严格的消息顺序,即使在一台Broker宕机的情况下,也能通过其他机制保证消息的有序性。这种顺序性保证使得RocketMQ在需要确保消息顺序的场景中具有优势。
Kafka在某些配置下也支持消息顺序,但当一台Broker宕机后,可能会产生消息乱序的问题。这可能会影响到业务的正确性和一致性。 -
Kafka 高吞吐:
Kafka采用分区(partition)的方式存储消息,每个分区对应一个或多个数据文件。这种设计有助于实现高吞吐量的数据写入和读取。同时,Kafka支持消息压缩和删除策略,以优化存储空间和性能。
RocketMQ则主要使用一个物理文件(commitLog)来存储消息,而队列信息则维护在consumeQueue中。这种设计使得RocketMQ在消息存储和读取方面具有较高的效率。然而,随着消息量的增长,单个物理文件的大小可能会变得非常庞大,这可能对文件管理和备份带来挑战。