当前位置: 首页 > news >正文

RocketMQ核心编程模型

RocketMQ核心编程模型与SpringBoot整合深度解析

笔记整理自RocketMQ官方文档与实战经验 | 图灵楼兰出品
配套视频课程学习效果更佳

一、RocketMQ架构核心回顾

RocketMQ采用经典发布-订阅模型,核心组件包括:

  • NameServer:轻量级服务发现中心(无状态)
  • Broker:消息存储与转发节点(主从架构)
  • Producer:消息生产者
  • Consumer:消息消费者

二、深入消息模型

1. 客户端基础流程

生产者固定步骤
// 1. 创建生产者(指定组名)
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 2. 配置NameServer地址
producer.setNamesrvAddr("192.168.65.112:9876"); 
// 3. 启动服务
producer.start();
// 4. 构建消息(Topic/Tag/Body)
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
// 5. 发送消息
SendResult result = producer.send(msg);
// 6. 关闭生产者
producer.shutdown();
消费者固定步骤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("192.168.65.112:9876");
consumer.subscribe("TopicTest", "*"); // 订阅Topic
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {msgs.forEach(msg -> System.out.println(new String(msg.getBody())));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费状态
});
consumer.start();

2. 消息确认机制

三种发送方式对比
发送方式特点适用场景
单向发送不关心结果,吞吐量最高日志收集等低可靠性场景
同步发送阻塞等待Broker响应金融交易等高可靠性场景
异步发送回调处理结果,平衡性能与可靠性电商下单等并发场景
消费端重试策略
  • 返回RECONSUME_LATER触发重试
  • 最大重试次数默认16次(可配置)
  • 重试消息进入专属重试Topic:%RETRY%+ConsumerGroup

3. 高级消息类型

顺序消息(局部有序)
// 生产者:相同订单号的消息发往同一队列
Message msg = new Message("OrderTopic", "PAY", orderId.getBytes(), body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;return mqs.get(id % mqs.size()); // 自定义队列选择}
}, orderId);// 消费者:实现MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {// 处理顺序消息...
});
事务消息(两阶段提交)
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.UNKNOW; // 执行本地事务}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.COMMIT_MESSAGE; // 事务回查}
});
延迟消息
// 指定延迟级别(1-18对应预设时间)
message.setDelayTimeLevel(3); // 10秒后投递// 指定精确时间点(5.0+版本)
message.setDeliverTimeMs(System.currentTimeMillis() + 30_000); // 30秒后

4. ACL权限控制

启用步骤:

  1. Broker端开启aclEnable=true
  2. 配置plain_acl.yml
accounts:
- accessKey: RocketMQsecretKey: 12345678topicPerms:- topicA=DENY- topicB=PUB|SUB
  1. 客户端添加认证Hook:
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678"));
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);

三、SpringBoot整合实战

1. 快速集成

依赖配置:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version>
</dependency>

配置文件application.yml

rocketmq:name-server: 192.168.65.112:9876producer:group: springboot-group

2. 消息生产与消费

// 生产者模板
@Autowired private RocketMQTemplate rocketMQTemplate;public void sendMessage() {rocketMQTemplate.convertAndSend("TestTopic", "Hello SpringBoot");
}// 消费者监听
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode = ConsumeMode.CONCURRENTLY
)
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received: " + message);}
}

3. 事务消息整合

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {return RocketMQLocalTransactionState.UNKNOWN;}
}

四、客户端最佳实践

1. 消息三要素规范

属性作用
MessageIdBroker生成的消息唯一标识(不建议作业务主键)
Key业务唯一键(如订单ID),用于消息追踪
Tag消息标签,用于高效过滤(性能远高于SQL过滤

2. 消费者幂等设计

重复消息场景:

  • 网络闪断导致生产者重试
  • 消费端ACK失败触发重投
  • Rebalance过程消息重复

解决方案示例:

consumer.registerMessageListener((msgs, context) -> {MessageExt msg = msgs.get(0);String orderId = msg.getKeys(); // 获取业务主键// 分布式锁或数据库唯一索引校验if (orderService.isProcessed(orderId)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 处理业务逻辑...
});

3. 死信队列处理

  • 命名规则:%DLQ%+ConsumerGroup
  • 运维注意:默认权限禁读,需手动改为可读
  • 处理方式:
    1. 查询死信原因:sh mqadmin queryMsgById
    2. 修复后重新投递到正常Topic
    3. 设置单独消费者处理死信

4. 重试策略优化

// 调整最大重试次数(超过16次间隔固定2小时)
consumer.setMaxReconsumeTimes(10); // 重试间隔配置(需Broker端配合)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 30m 1h

官方说明:RocketMQ 仅保证 At Least Once,业务需自行实现幂等
源码位置:org.apache.rocketmq.common.consumer.ConsumeFromWhere


总结
RocketMQ的客户端设计充分考虑了金融级场景需求,通过多种消息模型组合可满足复杂业务场景。在实际使用中需特别注意:

  1. 生产环境务必启用ACL
  2. 顺序消息避免单队列堆积
  3. 死信队列监控不可或缺
  4. 消费者幂等是系统稳定性的生命线
http://www.dtcms.com/a/289531.html

相关文章:

  • Python基础-列表
  • 氛围编码(Vice Coding)的工具选择方式
  • 攻防世界-web习题-unseping-flag获取详解、总结
  • snmp攻击
  • 【CUDA进阶】Tensor Core实战教程(下)
  • 【JS逆向基础】数据库之redis
  • TypeScript环境安装和操作
  • 将 VHD/VHDX 转换为物理磁盘
  • 无 sudo 权限的环境下将 nvcc (CUDA Toolkit) 安装到个人目录 linux
  • 虚拟地址空间
  • rman清理归档
  • 2024年全国青少年信息素养大赛Scratch编程挑战赛 小低组初赛
  • 【JDK内置工具】常用工具和实战指令
  • 贝叶斯分类器的相关理论学习
  • 力扣面试150(34/150)
  • 人脸识别:AI 如何精准 “认人”?
  • Florence2-通用表征完成多种视觉任务的视觉基础模型
  • 最新轻量美化表白墙系统源码v2.0 带后台版 附搭建教程
  • 分治算法---归并
  • 智能制造——48页毕马威:汽车营销与研发数字化研究【附全文阅读】
  • Muduo库中单例模式详解
  • 【Anaconda】Conda 虚拟环境打包迁移教程
  • 基于ACPs协议的智能体互联网示例(多智能体旅游规划)
  • JMeter连接数据库
  • Linux操作系统从入门到实战(十一)回车换行问题与用户缓冲区问题
  • C++虚函数易错点整理
  • 20250720-4-Kubernetes 调度-指定节点调度:nodeSelectornodeAffinity笔记
  • LeetCode 3202.找出有效子序列的最大长度 II:取模性质(动态规划)
  • JDK8默认垃圾回收器
  • (Python)类和类的方法进阶(基础教程介绍)(Python基础教程)