当前位置: 首页 > news >正文

RocketMQ 部署;与Golang服务交互

在这里插入图片描述

文章目录

    • RocketMQ
        • 拉取优点
    • 跑一个demo
      • docker部署
      • console
        • 读写队列
      • consumer
        • 客户端定时统计
      • producer
      • 高可用
    • 对比 Kafka

RocketMQ

RocketMQ 的消费流程本质上是 Pull 模式,客户端长轮询。
但是下次轮询前有消息,会直接通知客户端。

  • 消费者主动向 Broker 请求消息。
  • Broker 返回一批消息(可以配置每次拉取的数量)。
拉取优点

客户端自己管理 offset (消费进度)。客户端可根据自身处理能力动态调整拉取频率(如设置长轮询超时时间),适应不同场景需求。 ‌

跑一个demo

docker部署

在这里插入图片描述

docker-compose.yml :

端口: 左 外部访问: 右 容器端口

  • NameServer
    NameServer 相当于 注册中心,记录 Broker 的信息,Producer/Consumer 需要先连到 NameServer 才能找到 Broker。
  • Broker
    10911: Broker 的 主端口(客户端生产/消费消息的通信)。
    10909: Broker 的 HA(高可用同步)端口,主从复制时使用。
  • Console(管理界面)
services:namesrv:image: apache/rocketmq:5.3.0container_name: rmqnamesrvports:- "9876:9876"command: sh mqnamesrvbroker:image: apache/rocketmq:5.3.0container_name: rmqbrokerports:- "10911:10911"- "10909:10909"environment:- NAMESRV_ADDR=namesrv:9876command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-5.3.0/conf/broker.confvolumes:# 左边是你本机路径(Windows),右边是容器路径(Linux)- ./broker.conf:/home/rocketmq/rocketmq-5.3.0/conf/broker.confdepends_on:- namesrvconsole:image: styletang/rocketmq-console-ng:latestcontainer_name: rmq-consoleports:- "8080:8080"environment:- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876depends_on:- namesrv- broker

broker.conf :

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSHbrokerIP1=172.29.24.103
  • brokerId = 0

    • 0 → 主节点(Master)
    • >0 → 从节点(Slave)
  • deleteWhen = 04
    每日定时清理,RocketMQ 会每天凌晨 04:00 检查消息过期文件并删除。

  • fileReservedTime = 48
    RocketMQ 会保留最近 48 小时 的消息文件,超过这个时间的消息文件会被删除(结合 deleteWhen 执行)。

  • brokerRole = ASYNC_MASTER

    • ASYNC_MASTER → 异步主节点 (异步主节点在写消息时不等待从节点确认,性能更高,但 HA 可靠性稍低。)
    • SYNC_MASTER → 同步主节点
    • SLAVE → 从节点
  • flushDiskType = ASYNC_FLUSH

    • ASYNC_FLUSH → 异步刷盘(性能高,可能丢少量消息)
    • SYNC_FLUSH → 同步刷盘(性能低,但消息安全性高)
  • brokerIP1=172.29.24.103
    指定 Broker 的 IP 地址(对外提供服务)。
    这个 IP 会在 NameServer 注册,用于客户端查找 Broker。

console

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

读写队列

Broker 和 Consumer 的调度机制 来分配工作

  • writeQueueNums(写队列数)
    如 Topic 有 16 个物理队列,Producer 会根据 MessageQueueSelector 或轮询算法选择写入哪条队列。
    对高吞吐量场景,writeQueueNums 可以大一些,支持更多并行写入。
  • readQueueNums(读队列数)
    Broker 会把该 Topic 的读队列均匀分配给不同 Consumer 实例,保证每条队列只被一个 Consumer 实例消费(避免重复消费)
  • perm(权限 Permission)
    2 → 可写(Producer 可以发送消息)
    4 → 可读(Consumer 可以消费消息)
    6 → 可读可写(2 + 4)

consumer

消费者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {c, err := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"172.29.24.103:9876"}),consumer.WithGroupName("TestGroup"),)if err != nil {panic(err)}err = c.Subscribe("TestTopic", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for _, msg := range msgs {fmt.Println("Received:", string(msg.Body))}return consumer.ConsumeSuccess, nil})if err != nil {panic(err)}err = c.Start()if err != nil {panic(err)}fmt.Println("Consumer started...")// 阻塞,不让 main 退出 【不占用CPU】select {}//time.Sleep(30 * time.Second) // 拉长一点时间,方便测试//_ = c.Shutdown()
}

在这里插入图片描述

接收到消息:
在这里插入图片描述

客户端定时统计

Go 客户端默认每隔一段时间 统计一次 消息消费和拉取指标
每五秒钟一次输出:(对应控制台创建topic时设置的16个读队列)

time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=CONSUME_RT
time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=PULL_RT
time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=CONSUME_OK_TPS
time="2025-09-18T19:54:47+08:00" level=info msg="Stats In One Minute." AVGPT=0 SUM=0 TPS=0.00 statsKey=TestTopic@TestGroup statsName=PULL_TPS
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=0]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=7]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=10]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=6]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=%RETRY%TestGroup, brokerName=broker-a, queueId=0]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=12]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1]" consumerGroup=TestGroup offset=2
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=4]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=14]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=5]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=13]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=11]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=15]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=2]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=8]" consumerGroup=TestGroup offset=0
time="2025-09-18T19:54:47+08:00" level=info msg="update offset to broker success" MessageQueue="MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=9]" consumerGroup=TestGroup offset=0

producer

生产者

package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {// 创建 Producerp, err := rocketmq.NewProducer(producer.WithNameServer([]string{"172.29.24.103:9876"}), // 和 consumer 一致producer.WithRetry(2),)if err != nil {panic(err)}// 启动 Producererr = p.Start()if err != nil {panic(err)}// 构造消息msg := &primitive.Message{Topic: "TestTopic",Body:  []byte("Hello RocketMQ from Go!"),}// 同步发送消息res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Println("Send failed:", err)} else {fmt.Println("Send success:", res.String())}// 关闭 Producer_ = p.Shutdown()time.Sleep(time.Second) // 等一会儿,防止日志没刷完
}

发送消息
在这里插入图片描述

高可用

多 NameServer

namesrv1:image: apache/rocketmq:5.3.0container_name: namesrv1ports: ["9876:9876"]command: sh mqnamesrvnamesrv2:image: apache/rocketmq:5.3.0container_name: namesrv2ports: ["9877:9876"]command: sh mqnamesrv

多 broker

rocketmq/├─ conf/│   ├─ broker-master.conf│   └─ broker-slave.conf

对比 Kafka

两者都采用 Pull Long Polling 机制,这是保证实时性和减少无效网络交互的关键。 【长轮询是兼顾实时性和服务端压力的一个优秀折中方案。】

  • 消息到来时,rmq 会主动通知
    kafka则不会,鼓励批处理

  • Kafka 原生不支持定时/延迟消息

  • Kafka集群需要借助Zookeeper

  • RocketMQ还支持Filter Server组件,用于支持消息过滤功能。这种设计使得RocketMQ在处理复杂业务逻辑时更加灵活。

  • RocketMQ单机支持最高5万个队列,这使得它在处理大量队列时仍能保持稳定的性能。这种高队列数支持得益于RocketMQ的队列模型设计,使得它在处理海量消息时具有更高的吞吐量和更低的延迟。
    相比之下,Kafka在单机超过64个队列/分区时,消息发送性能可能会显著降低。这意味着在处理大量分区时,Kafka可能会遇到性能瓶颈。

  • 消息顺序性
    消息顺序性对于某些业务场景至关重要,如金融交易、订单处理等。RocketMQ支持严格的消息顺序,即使在一台Broker宕机的情况下,也能通过其他机制保证消息的有序性。这种顺序性保证使得RocketMQ在需要确保消息顺序的场景中具有优势。
    Kafka在某些配置下也支持消息顺序,但当一台Broker宕机后,可能会产生消息乱序的问题。这可能会影响到业务的正确性和一致性。

  • Kafka 高吞吐:
    Kafka采用分区(partition)的方式存储消息,每个分区对应一个或多个数据文件。这种设计有助于实现高吞吐量的数据写入和读取。同时,Kafka支持消息压缩和删除策略,以优化存储空间和性能。
    RocketMQ则主要使用一个物理文件(commitLog)来存储消息,而队列信息则维护在consumeQueue中。这种设计使得RocketMQ在消息存储和读取方面具有较高的效率。然而,随着消息量的增长,单个物理文件的大小可能会变得非常庞大,这可能对文件管理和备份带来挑战。

http://www.dtcms.com/a/389275.html

相关文章:

  • 南京某高校校园外卖点餐系统_django
  • 类的基础语法(笔记补充)
  • pycharm 连git 传文件到GitHub
  • 11 简答题-伪码转为NS图 PAD图
  • Java 中如何利用 CAS 实现原子操作?以AtomicInteger 为例
  • Custom SRP - Point And Spot Shadows
  • 无障碍前端组件实践(上):基础交互组件与色彩无障碍
  • 矩阵的导数运算
  • 微算法科技(NASDAQ:MLGO)多注意力循环网络:MARN技术如何让机器理解语言、手势与语音的微妙交互
  • 混合架构(SpringCloud+Dubbo)的整合方案与适用场景(二)
  • centos的hadoop的允许hdfs命令覆盖linux系统目录文件或生成副本
  • 跨平台开发框架全景分析:Flutter、RN、KMM 与腾讯 Kuikly 谁更值得选择?
  • 燃料电池负载均衡测试:解锁高效供能密码
  • ip地址在哪里查看?怎样查询自己电脑ip?如何找到使用内网ip,判断看本地有无公网ip?内网ip怎么给外网访问?
  • 设计模式-模板方法模式详解
  • Red Hat 8.5.0-18 部署ceph文件系统
  • 将ceph文件存储挂载给k8s使用
  • ENVI系列教程(七)——自定义 RPC 文件图像正射校正
  • 「Java EE开发指南」如何用MyEclipse开发Java EE企业应用程序?(二)
  • Linux -- 传输层协议UDP
  • 使用Android Studio中自带的手机投屏功能
  • LeetCode:19.螺旋矩阵
  • Windows 命令行:在 cd 命令中使用绝对路径与相对路径
  • 图片修改尺寸
  • 《嵌入式硬件(十五):基于IMX6ULL的统一异步收发器(UART)的操作》
  • Python爬虫实战:研究Pandas,构建苏宁易购月饼销售数据采集与智能推荐系统
  • 导购app佣金模式的分布式计算架构:实时分账与财务对账
  • Linux Bash脚本自动创建keystore和生成公钥
  • 数据库管理员偏爱哪些MySQL数据库连接工具?
  • 大数据毕业设计选题推荐-基于大数据的农产品交易数据分析与可视化系统-Spark-Hadoop-Bigdata