RocketMQ详解,消息队列实战
本文详细介绍了RocketMQ的核心概念、工作原理、安装部署以及特性。
1. 为什么需要消息队列?
典型场景:双十一秒杀
想象一下双十一零点,海量用户瞬间涌入淘宝下单的场景:
-
没有使用MQ的问题(同步调用)
用户下单后,订单系统必须同步等待库存系统扣减库存、支付系统生成支付单、物流系统创建物流单等一系列操作全部完成后,才能给用户返回"下单成功"。
这就好比一家奶茶店只有一个点单员和一个制作员。点单员收完钱,必须等着制作员一杯杯做完,才能服务下一个客户,效率极低,队伍排成长龙。 -
引入MQ的解决方案(异步解耦)
订单系统(生产者)在接单后,只需要将订单信息(消息)快速写入一个"中间站"(消息队列),就可以立即返回结果给用户,无需等待后续处理。
库存、支付、物流等系统(消费者)根据自己的处理能力,从"中间站"里拉取订单消息,慢慢进行后续处理。
MQ的三大核心价值
- 异步:用户无需漫长等待,下单响应极快。
- 削峰填谷:海量下单请求先积压在消息队列中,后端系统按照自身能力处理,防止系统被瞬时流量冲垮。
- 解耦:订单系统不需要知道也不依赖库存、支付等系统。即使物流系统暂时宕机了,也不会影响用户下单,只需恢复后继续处理积压的订单消息即可。
2. RocketMQ介绍:阿里巴巴的超级快递系统
RocketMQ是阿里巴巴开源的一款分布式消息中间件,后来捐赠给Apache基金会,成为顶级项目。它是为电商巨量订单场景量身定做的、高度可靠且高效的"订单调度中心"。
核心特点
- 高吞吐量:单机可达10万级QPS,满足电商等大数据场景。
- 高可用:支持多主多从的集群部署,任何一台机器宕机都不影响服务。
- 消息可靠:提供强大的消息持久化机制,保证消息不丢失。
- 功能丰富:支持顺序消息、事务消息、延迟消息等复杂业务场景。
3. 基本概念解析
概念 | 说明 | 类比 |
---|---|---|
消息 | 要传递的数据 | 订单数据:{“orderId”: 10001, “userId”: 123, “status”: “created”} |
主题 | 消息的分类 | 订单消息发往Order_Topic,物流消息发往Logistics_Topic |
队列 | Topic下的子分区 | 订单主题下的多条传送带,实现并行处理 |
Message ID | 系统生成的唯一ID | 消息的唯一标识 |
Key | 业务标识 | 订单ID(10001),用于查询消息处理轨迹 |
4. RocketMQ工作流程:"订单调度中心"如何运作
启动流程
- NameServer启动:服务发现中心先启动,记录所有"快递网点"地址。
- Broker注册:存储节点启动后,向所有NameServer注册自己的地址和服务。
消息发送流程
- 查询路由:Producer向NameServer查询:“哪个Broker可以接收Order_Topic的订单?”
- 返回地址:NameServer返回Broker地址。
- 发送消息:Producer将消息发送给对应Broker,Broker将其存入指定Topic的队列中。
消息消费流程
- 查询路由:Consumer向NameServer查询:“哪个Broker有Order_Topic的订单?”
- 返回地址:NameServer返回Broker地址。
- 拉取消费:Consumer从Broker的队列中拉取消息进行消费,例如扣减库存。
5. Topic的创建模式
自动创建(开发用)
当Producer第一次向不存在的Topic发送消息时,RocketMQ会自动创建它。
手动创建(生产环境)
通过控制台预先创建Topic,指定队列数量,进行更好的资源规划和权限控制。
6. 快速搭建RocketMQ服务
方式一:CentOS7安装
- 下载:
https://rocketmq.apache.org/zh/download/
- 安装JDK
- 上传zip包到服务器并解压:
unzip rocketmq-all-5.3.2-bin-release.zip
- 修改启动的JVM内存大小,配置文件在bin目录下
vi runbroker.sh
vi runserver.sh# 参考设置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m 0XX:MetaspaceSize=128m -Xx:mMaxMetaspaceSize=320m"
- 启动NameServer
# 启动NameServer
nohup sh mqnamesrv &
# 查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
- 启动Broker
# 启动Broker
nohup sh mqbroker -n 127.0.0.1:9876 -c /opt/rocketmq/rocketmq-all-5.3.2-bin-release/conf/broker.conf &
# 查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
- 关闭RocketMQ
# 关闭NameServer
sh mqshutdown namesrv
# 关闭Broker
sh mqshutdown broker
方式二:Windows本地安装
确保已经安装了JDK
- 下载地址:https://rocketmq.apache.org/zh/download/
- 选择Binary下载二进制ZIP包,下载完之后进行解压
- 新建环境变量
变量名:ROCKETMQ_HOME
变量值:安装目录,例如:D:\Java\rocketmq-all-5.3.2-bin-release\rocketmq-all-5.3.2-bin-release
- PATH环境变量新增:
%ROKCETMQ_HOME%/bin
- 修改启动参数,默认参数比较大
bin/runserve.cmdbin/runborker.cmdbin/tools.sh
- 启动NameServer,启动后不要关闭cmd窗口
# cd到解压后的安装目录的bin文件夹
cd D:\rocketmq-all-5.3.3-bin-release\bin# 启动NameServer
mqnamesrv.cmd# 成功提示
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
7. 配置broker的conf文件
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=172.16.60.204:9876# 存储路径配置,根据自己实际路径进行修改
storePathRootDir=D:/JMY/rocketmq-all-5.3.2-bin-release/mqstore
storePathCommitLog=D:/JMY/rocketmq-all-5.3.2-bin-release/mqstore/commitlog
storePathConsumeQueue=D:/JMY/rocketmq-all-5.3.2-bin-release/mqstore/consumequeue
storePathIndex=D:/JMY/rocketmq-all-5.3.2-bin-release/mqstore/index
storePathCheckpoint=D:/JMY/rocketmq-all-5.3.2-bin-release/mqstore/checkpoint
storePathScheduleMessage=D:/JMY/rocketmq-all-5.3.2-bin-release/mqstore/schedule# 自动创建主题
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
- 启动Broker,启动后不要关闭cmd窗口
# 启动Broker
mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf# 成功提示
The broker[broker-a, 172.16.60.33:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
- 停止服务,直接关闭或者使用命令
mqshutdown.cmd broker
mqshutdown.cmd namesrv
- autoCreateTopicEnable=true
autoCreateTopicEnable
:控制当 Producer 发送消息到不存在的 Topic 时,是否自动创建该 Topic
7. Java客户端集成
- 添加Maven依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version>
</dependency>
- 配置
application.yml
rocketmq:name-server: 127.0.0.1:9876producer:group: platform-base-producer-groupsend-message-timeout: 3000retry-times-when-send-failed: 2
- 消息发送示例
@RestController
@RequestMapping("/mq")
public class SendController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendMessage")public CommonResult sendMessage() {try {rocketMQTemplate.syncSend("test-topic", "Hello, World!", 3000);return new CommonResult(CommonResultEmnu.OK);} catch (Exception e) {return new CommonResult(CommonResultEmnu.ERROR, "消息发送失败: " + e.getMessage());}}
}
- 消息消费示例
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "demo-consumer-group")
public class ConsumeService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到消息: " + message);// 业务处理逻辑}
}
8. Dashboard可视化管理平台
-
下载地址:
https://rocketmq.apache.org/zh/download#rocketmq-dashboard
-
使用IDEA启动项目,注意JDK版本要使用1.8
-
如果连接提示:
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.16.60.33:10909> failed
- 解决:修改RocketMQ安装目录下conf文件夹下的broker.conf文件,添加borkerIP1=本机IP
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
borkerIP1=192.168.226.188
-
重启时候,先启动namesrv,再启动消息服务器broker
-
直接访问,默认端口为8080
9. 可视化管理平台各个参数介绍
- Cluster:查看Broker集群状态
- Topic:管理主题,监控消息堆积量
- Consumer:查看消费组进度和延迟情况
- Message:按ID或Key查询消息
- Connection:查看客户端连接信息
10. 客户端消息确认机制
RocketMQ通过双重确认保证消息可靠性:
-
Broker → Producer:发送后返回SendResult告知发送状态
-
Consumer → Broker:消费成功后返回CONSUME_SUCCESS,失败返回RECONSUME_LATER
-
消息最多重试16次,仍失败则进入死信队列,需要人工干预。
11. 广播模式 vs 集群模式
默认是集群模式:同一个消费者组内,一条消息只能被一个消费者实例消费。
广播模式:同一个消费者组内,一条消息会被组内每一个消费者实例消费一次。
广播设置方式:messageModel = MessageModel.BROADCASTING
应用场景:比如有一个“配置更新”的消息,需要通知到所有服务实例,让每个实例都刷新自己的本地缓存。
集群模式设置
@Service
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "demo-consumer-group2",messageModel = MessageModel.BROADCASTING)
class ConsumeService2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("消费者2收到消息: " + message);}
}
广播模式设置
@Service
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "demo-consumer-group",messageModel = MessageModel.CLUSTERING)
class ConsumeService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("消费者1收到消息: " + message);}
}
注意:一条消息只会被投递给同一个消费者组内的一个消费者,但会被投递给每一个订阅了该Topic的消费者组。
12. 消息过滤机制
消费者可以只订阅Topic下的符合特定条件的消息。
TAG过滤:为消息打上Tag标签,消费者可以按Tag订阅。
// 生产者发送时设置Tag
Message msg = new Message("Order_Topic", "Paid", "订单001已支付".getBytes());// 消费者按Tag订阅
consumer.subscribe("Order_Topic", "Paid || Created");
- SQL表达式过滤
更复杂的过滤方式,根据消息属性进行过滤(需Broker开启功能)
13. 顺序消息机制
保证关键消息顺序处理,如订单的创建 → 付款 → 发货
实现原理
- 生产者:通过MessageQueueSelector将同一批消息发送到同一Queue
- 消费者:以拉取锁方式消费Queue,保证单线程顺序处理
发送示例
package com.platform.base.demo.mq;import com.alibaba.fastjson.JSON;
import com.platform.common.domain.emnu.CommonResultEmnu;
import com.platform.common.domain.result.CommonResult;
import com.platform.common.util.CommonUtil;
import jakarta.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Optional;@RestController
@RequestMapping("/v1/base/pc/mq")
public class SendController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendMessage")public CommonResult sendMessage(){String param = CommonUtil.getObjFromReq(String.class);HashMap<String, Object> parse = JSON.parseObject(param, HashMap.class);// 获取消息数量Integer num = Optional.ofNullable(parse.get("mqNum")).map(Object::toString).map(Integer::valueOf).orElse(1);// 获取用于分区的key(顺序消息的关键)String shardingKey = Optional.ofNullable(parse.get("shardingKey")).map(Object::toString).orElse("defaultKey");try {for (int i = 0; i < num; i++) {// 使用syncSendOrderly发送顺序消息rocketMQTemplate.syncSendOrderly("test-topic", "Hello, World! " + i, shardingKey, 3000);}return new CommonResult(CommonResultEmnu.OK);} catch (Exception e) {return new CommonResult(CommonResultEmnu.ERROR, "顺序消息发送失败: " + e.getMessage());}}}
消费示例
package com.platform.base.demo.mq;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** @Author WangHan* @Date 2025/7/24* @Description 消费者*/
@Service
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "demo-consumer-group",consumeMode = ConsumeMode.ORDERLY)
class ConsumeService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理顺序消息System.out.println("Received orderly message: " + message);}
}
结果
Received orderly message: Hello, World! 0
Received orderly message: Hello, World! 1
Received orderly message: Hello, World! 2
Received orderly message: Hello, World! 3
Received orderly message: Hello, World! 4
14. 延迟消息
消息发送后,不会立刻被消费,而是在指定的延迟时间后才能被消费者拉到。
应用场景:订单超时未支付自动取消、定时提醒等。
支持延迟级别
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
对应级别为1-18
发送示例
package com.platform.base.demo.mq;import com.alibaba.fastjson.JSON;
import com.platform.common.domain.emnu.CommonResultEmnu;
import com.platform.common.domain.result.CommonResult;
import com.platform.common.util.CommonUtil;
import jakarta.annotation.Resource;
import lombok.Data;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Optional;@RestController
@RequestMapping("/v1/base/pc/mq")
public class SendController {/*RocketMQ支持以下延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h对应级别为1-18*/@Resourceprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendMessage")public CommonResult sendMessage(){String param = CommonUtil.getObjFromReq(String.class);HashMap<String, Object> parse = JSON.parseObject(param, HashMap.class);// 获取消息内容String messageContent = Optional.ofNullable(parse.get("message")).map(Object::toString).orElse("Default delay message");// 获取延迟级别Integer delayLevel = Optional.ofNullable(parse.get("delayLevel")).map(Object::toString).map(Integer::valueOf).orElse(3); // 默认延迟级别3(大约10秒)try {// 构造Message对象org.springframework.messaging.Message<String> message = org.springframework.messaging.support.MessageBuilder.withPayload(messageContent).build();// 发送延迟消息System.out.println("发送延迟消息开始" + new Date());rocketMQTemplate.syncSend("test-topic", message, 3000, delayLevel);return new CommonResult(CommonResultEmnu.OK, "延迟消息发送成功,延迟级别: " + delayLevel);} catch (Exception e) {return new CommonResult(CommonResultEmnu.ERROR, "延迟消息发送失败: " + e.getMessage());}}}
接收示例
package com.platform.base.demo.mq;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.util.Date;/*** @Author WangHan* @Date 2025/7/24* @Description 消费者*/
@Service
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "demo-consumer-group",consumeMode = ConsumeMode.ORDERLY)
class ConsumeService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理顺序消息System.out.println("接收到消息: " + message + ",处理完成" + new Date());}
}
结果
2025-09-09 19:17:03|INFO |com.platform.base.config.Interceptor.AdminInterceptor:41|send GET request to http://127.0.0.1:9205/v1/base/pc/mq/sendMessage
发送延迟消息开始Tue Sep 09 19:17:03 GMT+08:00 2025
接收到消息: Default delay message,处理完成Tue Sep 09 19:17:13 GMT+08:00 2025
15. 批量消息
将多条消息打包一次性发送,减少网络IO,提高性能
注意:单次批量消息总大小不能超过4MB
发送示例
package com.platform.base.demo.mq;import com.alibaba.fastjson.JSON;
import com.platform.common.domain.emnu.CommonResultEmnu;
import com.platform.common.domain.result.CommonResult;
import com.platform.common.util.CommonUtil;
import jakarta.annotation.Resource;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.*;@RestController
@RequestMapping("/v1/base/pc/mq")
public class SendController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/sendMessage")public CommonResult sendMessage(){String param = CommonUtil.getObjFromReq(String.class);HashMap<String, Object> parse = JSON.parseObject(param, HashMap.class);String messageContent = Optional.ofNullable(parse.get("message")).map(Object::toString).orElse("Default batch delay message");Integer batchCount = Optional.ofNullable(parse.get("batchCount")).map(Object::toString).map(Integer::valueOf).orElse(5);Integer delayLevel = Optional.ofNullable(parse.get("delayLevel")).map(Object::toString).map(Integer::valueOf).orElse(3);try {List<SendResult> results = new ArrayList<>();for (int i = 0; i < batchCount; i++) {Message<String> message = MessageBuilder.withPayload(messageContent + " - " + i).build();SendResult result = rocketMQTemplate.syncSend("test-topic", message, 3000, delayLevel);results.add(result);}return new CommonResult(CommonResultEmnu.OK, "批量延迟消息发送成功,共发送: " + results.size() + " 条消息");} catch (Exception e) {return new CommonResult(CommonResultEmnu.ERROR, "批量延迟消息发送失败: " + e.getMessage());}}}
接收示例
package com.platform.base.demo.mq;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;import java.util.Date;/*** @Author WangHan* @Date 2025/7/24* @Description 消费者*/
@Service
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "demo-consumer-group",consumeMode = ConsumeMode.ORDERLY)
class ConsumeService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理顺序消息System.out.println("接收到消息: " + message + ",处理完成" + new Date());}
}
结果
接收到消息: Default batch delay message - 0,处理完成Tue Sep 09 19:22:09 GMT+08:00 2025
接收到消息: Default batch delay message - 3,处理完成Tue Sep 09 19:22:09 GMT+08:00 2025
接收到消息: Default batch delay message - 1,处理完成Tue Sep 09 19:22:09 GMT+08:00 2025
接收到消息: Default batch delay message - 2,处理完成Tue Sep 09 19:22:09 GMT+08:00 2025
接收到消息: Default batch delay message - 4,处理完成Tue Sep 09 19:22:09 GMT+08:00 2025
16. 事务消息
分布式事务的经典解决方案,用于保证本地事务和消息发送的最终一致性。
流程(半消息):
1. 生产者先向Broker发送一条半消息
,此时消费者不可见。
2. 生产者执行本地事务(如扣减库存)。
3. 根据本地事务执行结果,生产者向Broker提交确认
或回滚
。
4. Broker如果收到确认,则将半消息变为正式消息,可供消费;如果收到回滚,则删除半消息。
RocketMQ提供了事务回查机制:如果生产者执行完本地事务后宕机,没有提交确认/回滚,Broker会定时回查生产者,询问该消息的最终状态。
事务消息在以下情况下会存在于MQ中:
-
Half消息阶段:消息已发送到MQ但消费者不可见
-
UNKNOWN状态:事务状态未确定,等待回查
-
回查期间:Broker正在等待应用返回最终状态
-
COMMIT状态:消息对消费者可见,直到被成功消费
事务消息不会存在于MQ中的情况:
- ROLLBACK状态:消息会被立即删除
- 消费成功:消息被消费者确认后删除
- 达到最大回查次数仍未确定:根据配置可能被删除
因此,如果事务消息没有成功处理(即没有返回COMMIT状态),消息会一直存在于MQ中直到得到最终状态或达到最大回查次数。
场景:
发送支付请求,接收到回调时直接支付成功,同时发送一条事务消息到mq中,mq消费时执行扣减库存的动作和增加销量的动作
发送示例
package com.platform.base.demo.mq;import com.alibaba.fastjson.JSON;
import com.platform.common.domain.emnu.CommonResultEmnu;
import com.platform.common.domain.result.CommonResult;
import com.platform.common.util.CommonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import jakarta.annotation.Resource;
import java.util.HashMap;
import java.util.Optional;@RestController
@RequestMapping("/v1/base/pc/payment")
public class PaymentCallbackController {@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 支付回调接口* 接收到支付成功回调后,发送事务消息处理库存扣减和销量增加*/@GetMapping("/callback")public CommonResult paymentCallback() {try {// 获取支付回调参数String param = CommonUtil.getObjFromReq(String.class);HashMap<String, Object> parse = JSON.parseObject(param, HashMap.class);// 解析支付信息String orderId = Optional.ofNullable(parse.get("orderId")).map(Object::toString).orElse("ORDER_" + System.currentTimeMillis());String productId = Optional.ofNullable(parse.get("productId")).map(Object::toString).orElse("PRODUCT_001");Integer quantity = Optional.ofNullable(parse.get("quantity")).map(Object::toString).map(Integer::valueOf).orElse(1);Double amount = Optional.ofNullable(parse.get("amount")).map(Object::toString).map(Double::valueOf).orElse(0.0);System.out.println("接收到支付成功回调,订单ID: " + orderId + ", 商品ID: " + productId + ", 数量: " + quantity + ", 金额: " + amount);// 构造事务消息内容:商品ID:数量String transactionPayload = productId + ":" + quantity;// 构造事务消息,确保使用字符串作为payloadMessage<String> message = MessageBuilder.withPayload(transactionPayload) // 确保是String类型.setHeader("orderId", orderId).setHeader("productId", productId).setHeader("quantity", quantity).build();// 发送事务消息,处理库存扣减和销量增加System.out.println("发送事务消息处理库存和销量,订单ID: " + orderId);SendResult sendResult = rocketMQTemplate.sendMessageInTransaction("inventory-topic", message, orderId);return new CommonResult(CommonResultEmnu.OK, "支付处理成功,事务消息已发送: " + sendResult.getMsgId());} catch (Exception e) {System.err.println("支付回调处理失败: " + e.getMessage());e.printStackTrace();return new CommonResult(CommonResultEmnu.ERROR, "支付回调处理失败: " + e.getMessage());}}
}
事务监听器
// TransactionListenerImpl.java
package com.platform.base.demo.mq;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {// 模拟数据库表private static final Map<String, String> ORDER_STATUS = new ConcurrentHashMap<>(); // 订单状态private static final Map<String, Integer> PRODUCT_INVENTORY = new ConcurrentHashMap<>(); // 商品库存private static final Map<String, Integer> PRODUCT_SALES = new ConcurrentHashMap<>(); // 商品销量// 事务状态存储private final Map<String, RocketMQLocalTransactionState> transactionStateMap = new ConcurrentHashMap<>();// 失败事务记录(用于补偿)private final Map<String, FailedTransactionInfo> failedTransactions = new ConcurrentHashMap<>();/*** 执行本地事务 - 扣减库存和增加销量*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String transactionId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);String orderId = (String) msg.getHeaders().get("orderId");String productId = (String) msg.getHeaders().get("productId");Integer quantity = 0;System.out.println("=== 事务执行开始 ===");System.out.println("事务ID: " + transactionId);System.out.println("订单ID: " + orderId);System.out.println("消息状态: Half消息已发送到MQ,消费者不可见");try {// 正确处理消息体,支持字节数组和字符串String payload;Object payloadObj = msg.getPayload();if (payloadObj instanceof byte[]) {payload = new String((byte[]) payloadObj, StandardCharsets.UTF_8);} else {payload = payloadObj.toString();}// 解析消息内容String[] parts = payload.split(":");String productIdFromPayload = parts[0];quantity = Integer.valueOf(parts[1]);// 模拟数据库初始化数据PRODUCT_INVENTORY.putIfAbsent(productIdFromPayload, 100);PRODUCT_SALES.putIfAbsent(productIdFromPayload, 0);System.out.println("开始执行库存扣减和销量增加,商品ID: " + productIdFromPayload + ", 数量: " + quantity);// 执行库存扣减Integer currentInventory = PRODUCT_INVENTORY.get(productIdFromPayload);if (currentInventory < quantity) {System.out.println("库存不足,回滚事务。当前库存: " + currentInventory + ", 需要: " + quantity);recordFailedTransaction(transactionId, orderId, productIdFromPayload, quantity, "库存不足");if (transactionId != null) {transactionStateMap.put(transactionId, RocketMQLocalTransactionState.ROLLBACK);}return RocketMQLocalTransactionState.ROLLBACK;}// 模拟可能的业务异常if (shouldSimulateBusinessException()) {throw new RuntimeException("模拟业务处理异常");}// 扣减库存PRODUCT_INVENTORY.put(productIdFromPayload, currentInventory - quantity);System.out.println("库存扣减成功,剩余库存: " + PRODUCT_INVENTORY.get(productIdFromPayload));// 增加销量Integer currentSales = PRODUCT_SALES.get(productIdFromPayload);PRODUCT_SALES.put(productIdFromPayload, currentSales + quantity);System.out.println("销量增加成功,当前销量: " + PRODUCT_SALES.get(productIdFromPayload));// 更新订单状态ORDER_STATUS.put(orderId, "PROCESSED");System.out.println("订单处理完成,订单ID: " + orderId);// 事务执行成功if (transactionId != null) {transactionStateMap.put(transactionId, RocketMQLocalTransactionState.COMMIT);}return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {System.err.println("执行本地事务异常: " + e.getMessage());e.printStackTrace();// 记录失败的事务信息recordFailedTransaction(transactionId, orderId, productId, quantity, e.getMessage());if (transactionId != null) {transactionStateMap.put(transactionId, RocketMQLocalTransactionState.ROLLBACK);}return RocketMQLocalTransactionState.ROLLBACK;}}/*** 检查本地事务状态*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String transactionId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);System.out.println("检查本地事务状态,事务ID: " + transactionId);if (transactionId != null) {RocketMQLocalTransactionState state = transactionStateMap.get(transactionId);if (state != null) {System.out.println("事务状态检查结果: " + state);return state;}}System.out.println("未找到事务状态,返回未知状态");return RocketMQLocalTransactionState.UNKNOWN;}/*** 记录失败的事务信息*/private void recordFailedTransaction(String transactionId, String orderId, String productId,Integer quantity, String errorMessage) {FailedTransactionInfo failedInfo = new FailedTransactionInfo();failedInfo.setTransactionId(transactionId);failedInfo.setOrderId(orderId);failedInfo.setProductId(productId);failedInfo.setQuantity(quantity);failedInfo.setErrorMessage(errorMessage);failedInfo.setFailTime(System.currentTimeMillis());failedTransactions.put(orderId, failedInfo);System.err.println("记录失败事务: " + failedInfo);}/*** 模拟业务异常(用于测试)*/private boolean shouldSimulateBusinessException() {// 5%概率模拟异常return Math.random() < 0.05;}/*** 失败事务信息类*/public static class FailedTransactionInfo {private String transactionId;private String orderId;private String productId;private Integer quantity;private String errorMessage;private Long failTime;// getters and setterspublic String getTransactionId() { return transactionId; }public void setTransactionId(String transactionId) { this.transactionId = transactionId; }public String getOrderId() { return orderId; }public void setOrderId(String orderId) { this.orderId = orderId; }public String getProductId() { return productId; }public void setProductId(String productId) { this.productId = productId; }public Integer getQuantity() { return quantity; }public void setQuantity(Integer quantity) { this.quantity = quantity; }public String getErrorMessage() { return errorMessage; }public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }public Long getFailTime() { return failTime; }public void setFailTime(Long failTime) { this.failTime = failTime; }@Overridepublic String toString() {return "FailedTransactionInfo{" +"transactionId='" + transactionId + '\'' +", orderId='" + orderId + '\'' +", productId='" + productId + '\'' +", quantity=" + quantity +", errorMessage='" + errorMessage + '\'' +", failTime=" + failTime +'}';}}
}
结果
2025-09-10 11:00:06|INFO |com.platform.base.config.Interceptor.AdminInterceptor:41|send GET request to http://127.0.0.1:9205/v1/base/pc/payment/callback
接收到支付成功回调,订单ID: ORDER_20250724001, 商品ID: PRODUCT_001, 数量: 2, 金额: 199.99
发送事务消息处理库存和销量,订单ID: ORDER_20250724001
=== 事务执行开始 ===
事务ID: null
订单ID: ORDER_20250724001
消息状态: Half消息已发送到MQ,消费者不可见
开始执行库存扣减和销量增加,商品ID: PRODUCT_001, 数量: 2
库存扣减成功,剩余库存: 98
销量增加成功,当前销量: 2
订单处理完成,订单ID: ORDER_20250724001
接收到库存处理消息: PRODUCT_001:2,处理时间: Wed Sep 10 11:00:06 GMT+08:00 2025
执行库存扣减,商品ID: PRODUCT_001, 数量: 2
执行销量增加,商品ID: PRODUCT_001, 数量: 2
库存处理完成,商品ID: PRODUCT_001
17. SpringBoot整合RocketMQ
使用
rocketmq-spring-boot-starter
可以无缝集成。
配置简化yaml
# application.yml
rocketmq:name-server: localhost:9876producer:group: my-springboot-producer-group
简洁的发送接收
@RestController
public class OrderController {@Resourceprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/order")public String createOrder() {rocketMQTemplate.convertAndSend("Order_Topic", "订单消息");return "success";}
}@Service
@RocketMQMessageListener(topic = "Order_Topic", consumerGroup = "my-springboot-consumer-group")
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到消息: " + message);}
}
18. 客户端注意事项
-
生产者/消费者组名:在集群内必须唯一
-
消费者线程数:默认20个,根据配置和业务耗时调整
-
消息大小:单条消息建议不超过4MB
-
资源释放:应用退出时调用shutdown()方法
19. 消息可靠性保障
从三个层面保证消息不丢失:
-
生产阶段:同步发送 + 失败重试
-
存储阶段:同步刷盘 + 主从同步
-
消费阶段:消费成功后才手动ACK确认
20. 消息幂等性处理
RocketMQ不直接提供幂等性,需要业务方自行实现:
- 方案一:数据库唯一键
// 利用订单ID唯一性保证幂等
INSERT INTO orders (order_id, ...) VALUES (10001, ...);
- 方案二:Redis原子操作
// 使用setnx操作
Boolean result = redisTemplate.opsForValue().setIfAbsent("order:10001", "1");
if (Boolean.TRUE.equals(result)) {// 第一次处理
} else {// 已处理过,直接返回
}
21. 消息积压处理
当Topic中堆积大量消息时,应急方案:
-
紧急扩容:增加Consumer实例数量(需有足够Queue)
-
提高消费能力:优化业务逻辑,减少单消息处理耗时
-
跳过非重要消息:临时程序消费并丢弃积压消息
22. 死信队列处理
RocketMQ 的死信队列(Dead-Letter Queue, DLQ)用于存放经过最大重试次数(默认16次)后仍然消费失败的消息。
死信队列命名规则
- 死信Topic:%DLQ% + 消费者组名
- 例如:%DLQ%inventory-consumer-group
查看死信消息
-
通过 RocketMQ Dashboard
- 打开 Dashboard(通常是 http://localhost:8080)
- 在 Topic 页面,你会看到所有以
%DLQ%
开头的死信Topic - 点击进入具体的死信Topic,可以查看:
- 消息堆积数量
- 具体的消息内容
- 消息的原始Topic和Tag
-
通过命令行查看
# 查看死信Topic列表
./mqadmin topicList -n localhost:9876 | grep "%DLQ%"# 查询死信队列中的消息
./mqadmin queryMsgByKey -n localhost:9876 -t %DLQ%inventory-consumer-group -k "你的消息Key"
死信监控服务
-
创建死信队列监控服务
// DeadLetterQueueMonitorService.java package com.platform.base.demo.mq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import java.util.List; import java.util.concurrent.atomic.AtomicLong;@Service public class DeadLetterQueueMonitorService {@Value("${rocketmq.name-server:127.0.0.1:9876}")private String nameServer;@Value("${rocketmq.producer.group:payment-producer-group}")private String producerGroup;private DefaultMQPushConsumer dlqConsumer;private AtomicLong dlqMessageCount = new AtomicLong(0);@PostConstructpublic void init() {try {// 初始化死信队列消费者initDeadLetterQueueConsumer();} catch (Exception e) {System.err.println("初始化死信队列监控失败: " + e.getMessage());}}/*** 初始化死信队列消费者*/private void initDeadLetterQueueConsumer() throws MQClientException {// 死信队列的Topic命名规则: %DLQ% + consumerGroupString dlqTopic = "%DLQ%" + "inventory-consumer-group"; // 根据您的消费者组名调整dlqConsumer = new DefaultMQPushConsumer("dlq-monitor-consumer");dlqConsumer.setNamesrvAddr(nameServer);// 订阅死信队列dlqConsumer.subscribe(dlqTopic, "*");// 设置监听器dlqConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {return handleDeadLetterMessages(msgs, context);}});dlqConsumer.start();System.out.println("死信队列监控已启动,监听Topic: " + dlqTopic);}/*** 处理死信消息*/private ConsumeConcurrentlyStatus handleDeadLetterMessages(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {dlqMessageCount.incrementAndGet();System.out.println("=== 检测到死信消息 ===");System.out.println("消息ID: " + msg.getMsgId());System.out.println("Topic: " + msg.getTopic());System.out.println("Tag: " + msg.getTags());System.out.println("Keys: " + msg.getKeys());System.out.println("延迟时间: " + (System.currentTimeMillis() - msg.getBornTimestamp()) + "ms");System.out.println("重试次数: " + msg.getReconsumeTimes());System.out.println("消息体: " + new String(msg.getBody()));// 记录死信消息信息recordDeadLetterMessage(msg);// 根据业务需求决定处理方式processDeadLetterMessage(msg);}// 返回成功,避免死信消息继续堆积return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}/*** 记录死信消息信息*/private void recordDeadLetterMessage(MessageExt msg) {DeadLetterMessageInfo dlqInfo = new DeadLetterMessageInfo();dlqInfo.setMsgId(msg.getMsgId());dlqInfo.setTopic(msg.getTopic());dlqInfo.setTags(msg.getTags());dlqInfo.setKeys(msg.getKeys());dlqInfo.setBody(new String(msg.getBody()));dlqInfo.setBornTimestamp(msg.getBornTimestamp());dlqInfo.setStoreTimestamp(msg.getStoreTimestamp());dlqInfo.setReconsumeTimes(msg.getReconsumeTimes());dlqInfo.setProcessTime(System.currentTimeMillis());// 实际项目中应该保存到数据库或日志系统System.out.println("记录死信消息: " + dlqInfo);}/*** 处理死信消息*/private void processDeadLetterMessage(MessageExt msg) {try {// 根据业务需求处理死信消息// 1. 人工审核// 2. 自动重试特定类型的消息// 3. 发送告警通知// 4. 记录到特殊处理队列String orderId = msg.getProperty("orderId");if (orderId != null) {System.out.println("死信消息关联订单: " + orderId);// 可以触发人工处理流程或发送告警sendAlertForDeadLetter(msg, orderId);}} catch (Exception e) {System.err.println("处理死信消息异常: " + e.getMessage());}}/*** 发送死信告警*/private void sendAlertForDeadLetter(MessageExt msg, String orderId) {System.err.println("发送死信告警 - 订单: " + orderId + ", 消息ID: " + msg.getMsgId());// 实际项目中可以发送邮件、短信或调用监控系统API}/*** 获取死信消息统计*/public long getDeadLetterMessageCount() {return dlqMessageCount.get();}/*** 清理死信消息(根据条件)*/public int cleanDeadLetterMessages(String condition) {System.out.println("清理死信消息,条件: " + condition);// 实际的清理逻辑需要根据具体需求实现// 这里只是示例,实际可能需要操作RocketMQ的管理接口return 0;}@PreDestroypublic void destroy() {if (dlqConsumer != null) {dlqConsumer.shutdown();System.out.println("死信队列监控已关闭");}} }
-
死信消息信息类
// DeadLetterMessageInfo.java package com.platform.base.demo.mq;public class DeadLetterMessageInfo {private String msgId;private String topic;private String tags;private String keys;private String body;private Long bornTimestamp;private Long storeTimestamp;private Integer reconsumeTimes;private Long processTime;// getters and setterspublic String getMsgId() { return msgId; }public void setMsgId(String msgId) { this.msgId = msgId; }public String getTopic() { return topic; }public void setTopic(String topic) { this.topic = topic; }public String getTags() { return tags; }public void setTags(String tags) { this.tags = tags; }public String getKeys() { return keys; }public void setKeys(String keys) { this.keys = keys; }public String getBody() { return body; }public void setBody(String body) { this.body = body; }public Long getBornTimestamp() { return bornTimestamp; }public void setBornTimestamp(Long bornTimestamp) { this.bornTimestamp = bornTimestamp; }public Long getStoreTimestamp() { return storeTimestamp; }public void setStoreTimestamp(Long storeTimestamp) { this.storeTimestamp = storeTimestamp; }public Integer getReconsumeTimes() { return reconsumeTimes; }public void setReconsumeTimes(Integer reconsumeTimes) { this.reconsumeTimes = reconsumeTimes; }public Long getProcessTime() { return processTime; }public void setProcessTime(Long processTime) { this.processTime = processTime; }@Overridepublic String toString() {return "DeadLetterMessageInfo{" +"msgId='" + msgId + '\'' +", topic='" + topic + '\'' +", tags='" + tags + '\'' +", keys='" + keys + '\'' +", body='" + body + '\'' +", bornTimestamp=" + bornTimestamp +", storeTimestamp=" + storeTimestamp +", reconsumeTimes=" + reconsumeTimes +", processTime=" + processTime +'}';} }
-
添加监控接口
@Autowired private DeadLetterQueueMonitorService dlqMonitorService;/*** 查询死信队列统计信息*/ @GetMapping("/dlqStats") public CommonResult getDeadLetterQueueStats() {try {long count = dlqMonitorService.getDeadLetterMessageCount();HashMap<String, Object> result = new HashMap<>();result.put("deadLetterMessageCount", count);result.put("monitorStatus", "RUNNING");return new CommonResult(CommonResultEmnu.OK, result);} catch (Exception e) {return new CommonResult(CommonResultEmnu.ERROR, "查询失败: " + e.getMessage());} }/*** 清理死信队列消息*/ @GetMapping("/cleanDlq") public CommonResult cleanDeadLetterQueue() {try {String param = CommonUtil.getObjFromReq(String.class);HashMap<String, Object> parse = JSON.parseObject(param, HashMap.class);String condition = Optional.ofNullable(parse.get("condition")).map(Object::toString).orElse("all");int cleanedCount = dlqMonitorService.cleanDeadLetterMessages(condition);HashMap<String, Object> result = new HashMap<>();result.put("cleanedCount", cleanedCount);result.put("condition", condition);return new CommonResult(CommonResultEmnu.OK, result);} catch (Exception e) {return new CommonResult(CommonResultEmnu.ERROR, "清理失败: " + e.getMessage());} }
-
配置私信队列参数
rocketmq:name-server: 127.0.0.1:9876producer:group: payment-producer-group# 死信队列相关配置dead-letter-policy:max-redelivery-times: 16 # 最大重试次数dlq-topic: "%DLQ%inventory-consumer-group" # 死信队列Topic
-
完善消费者配置以支持死信队列
// 确保您的消费者配置了适当的重试策略 @RocketMQMessageListener(topic = "inventory-topic",consumerGroup = "inventory-consumer-group",consumeThreadMax = 10,consumeThreadMin = 5,maxReconsumeTimes = 16 // 设置最大重试次数 ) @Service public class InventoryConsumeService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {try {System.out.println("处理库存消息: " + message);// 模拟处理逻辑processInventoryMessage(message);} catch (Exception e) {System.err.println("库存消息处理失败: " + e.getMessage());// 抛出异常触发重试机制throw new RuntimeException("处理失败", e);}}private void processInventoryMessage(String message) {// 实际的业务处理逻辑// 如果处理失败会自动重试,超过重试次数后进入死信队列} }
-
定时监控任务
// DeadLetterQueueScheduledTask.java package com.platform.base.demo.mq;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;@Component public class DeadLetterQueueScheduledTask {@Autowiredprivate DeadLetterQueueMonitorService dlqMonitorService;/*** 定时检查死信队列状态(每小时执行一次)*/@Scheduled(fixedDelay = 3600000)public void checkDeadLetterQueueStatus() {try {long count = dlqMonitorService.getDeadLetterMessageCount();if (count > 0) {System.out.println("死信队列监控报告 - 当前死信消息数量: " + count);// 可以根据数量发送不同级别的告警if (count > 100) {sendHighPriorityAlert(count);}}} catch (Exception e) {System.err.println("检查死信队列状态异常: " + e.getMessage());}}private void sendHighPriorityAlert(long count) {System.err.println("高优先级告警 - 死信消息数量过多: " + count);// 实际项目中发送告警通知} }
-
在主应用类上添加:
@EnableScheduling
死信消息处理策略
- 人工审核干预
- 自动重试特定类型消息
- 发送告警通知
- 记录到特殊处理队列
总结
RocketMQ作为阿里巴巴开源的分布式消息中间件,以其高吞吐、高可用、高可靠的特性,成为企业级应用的首选消息队列解决方案。
在实际项目中,建议根据业务场景选择合适的消息模式,合理配置监控和告警机制,确保消息系统的稳定可靠运行。