分布式专题——39 RocketMQ客户端编程模型
1 回顾
-
RocketMQ的运行架构图:
-
RocketMQ的消息模型:
2 RocketMQ的消息模型
2.1 RocketMQ客户端基本流程
-
依赖配置:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version> </dependency>
-
一个最为简单的消息生产者代码如下:
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {// 初始化消息生产者DefaultMQProducer,并指定生产者组名DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 指定 NameServer 地址producer.setNamesrvAddr("192.168.65.112:9876");// 启动消息生产者服务producer.start();for (int i = 0; i < 2; i++) {try {// 创建消息对象Message,消息由Topic、Tag和消息体(body)组成,其中body是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息,获取发送结果SendResultSendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}// 消息发送完成后,停止消息生产者服务,释放资源producer.shutdown();} }
-
一个最为简单的消息消费者代码如下:
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 构建消息消费者DefaultMQPushConsumer,必须指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// 指定 NameServer 地址consumer.setNamesrvAddr("192.168.65.112:9876");// 设置从何处开始消费(如CONSUME_FROM_LAST_OFFSET,从最后一个偏移量开始消费)consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅感兴趣的主题Topic和Tag,需与消息的topic等一致consumer.subscribe("TopicTest", "*");// 注册消息回调函数(MessageListenerConcurrently),消费到消息后触发回调consumer.registerMessageListener(new MessageListenerConcurrently() {// 在回调中处理消息(如打印消息内容),并返回消费状态(如CONSUME_SUCCESS表示消费成功)@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者服务,消费者会一直挂起,持续处理消息consumer.start();System.out.print("Consumer Started");} }
-
RocketMQ客户端编程模型相对固定,掌握消息生产者和消费者的固定步骤,对学习其他复杂消息模型有帮助;
-
NameServer很关键,客户端只需指定NameServer地址,无需指定具体的Broker地址。指定NameServer地址有两种方式:
- 在客户端直接指定(如
consumer.setNameSrvAddr("127.0.0.1:9876")
) - 通过读取系统环境变量
NAMESRV_ADDR
指定,但客户端直接指定的方式优先级更高
- 在客户端直接指定(如
2.2 消息确认机制
- RocketMQ要支持互联网金融场景,消息安全有两方面要求:一是生产者要能确保将消息发送到Broker;二是消费者要能确保从Broker上争取获取到消息;
2.2.1 生产者保证消息发送
- 针对消息发送的不确定性,封装了三种发送消息的方式;
2.2.1.1 单向发送
-
消息生产者只管往Broker发送消息,全然不关心Broker端有没有成功接收到消息,类似生产者向Broker发电子邮件,不管Broker是否处理;
-
代码示例:
OnewayProducer
类中,创建DefaultMQProducer
,启动后发送消息,sendOneway
方法无返回值,发送失败生产者无法补救;public class OnewayProducer {public static void main(String[] args)throws Exception{DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.start();Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);Thread.sleep(50000);producer.shutdown();} }
-
适用场景:追求消息发送效率、允许消息丢失的业务场景,比如日志。
2.2.1.2 同步发送
-
消息生产者往Broker发送消息后,会阻塞当前线程,等待Broker端的响应结果;
-
代码及反馈:调用
producer.send(msg)
,返回SendResult
,其中SendStatus
枚举类型包含Broker端的各种情况,SEND_OK
表示消息成功发送到Broker,其他枚举值表示消息在Broker端处理失败;SendResult sendResult = producer.send(msg);
public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE, }
-
如果发现发送失败可进行补救(如重新发送);
- 注意:若
SendStatus
不是SEND_OK
,不代表消息一定不会推送给下游消费者,只是Broker端未完全正确处理,重新发送消息最好带唯一系统标识(如业务含义的OrderId
),以便消费者端做幂等判断;
- 注意:若
-
优缺点:能很大程度保证消息发送的安全性,但发送效率较低,若网速慢,同步发送耗时会很长。
2.2.1.3 异步发送
-
生产者向Broker发送消息时,会注册一个回调函数,不等待Broker响应,Broker端有响应数据过来时,自动触发回调函数处理;
-
代码及回调:调用
producer.send(msg, new SendCallback(){...})
,SendCallback
接口有onSuccess
和onException
方法,Broker端返回成功响应时调用onSuccess
,处理消息超时或失败时调用onException
,生产者可在onException
中进行补救;producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();} });
-
注意事项:
- 触发
onException
不一定表示消息不会向消费者推送,若Broker端返回响应信息太慢,超过默认3秒(可通过producer.setSendMsgTimeout
定制)的超时时间,也会触发,超时原因可能有消息太大造成网络拥堵、网速慢、Broker端处理慢等; - 在
SendCallback
对应方法触发前,生产者不能调用shutdown()
方法(停止消息生产者服务),否则若消息处理完之前生产者线程关闭,SendCallback
的执行线程随主线程停止,对应方法无法执行;
- 触发
-
适用与评价:能较好兼容消息安全性以及生产者的高吞吐需求,很多MQ产品(如RabbitMQ、Kafka)都支持,但对消息生产者的主线业务有侵入,使用需根据业务场景考虑。
2.2.1.4 方式选择建议
- RocketMQ的三种发送方式无绝对好坏,需根据业务场景选择;
- 例如:
- 电商下单场景,要优先保证数据安全,可选择同步发送或异步发送;
- 若下单场景并发高、业务繁忙,优先选异步发送,同时对下单服务业务进行优化定制,适应异步发送机制要求,以可靠发送订单消息到RocketMQ。
2.2.2 消费保证收到消息
-
状态确认机制:
-
消费者通过注册消息监听器(
MessageListenerConcurrently
)来处理消息,处理后返回一个枚举值(ConsumeConcurrentlyStatus
),有CONSUME_SUCCESS
(消息处理成功)和RECONSUME_LATER
(消息处理失败,需后续重试)两个选项; -
若返回
CONSUME_SUCCESS
,消息处理结束;若返回RECONSUME_LATER
,Broker会过一段时间再次发起消息重试;
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} });
-
-
消息重试机制:
-
重试次数限制:
- Broker会记录每个消息的重试次数,默认最大重试次数为16次;
- 若消息经过多次重试后,消费者仍无法正常处理,Broker会将该消息推入消费者组对应的死信Topic(类似Windows的垃圾桶,可人工介入补救或删除消息);
-
重试Topic设计:
- 为避免重试消息影响Topic下其他正常消息,Broker会给每个消费者组自动生成对应的重试Topic;
- 需要重试的消息会先移动到该重试Topic中,后续Broker从重试Topic里拿消息往消费者组重新推送,这样就不会阻塞原MessageQueue(具有严格FIFO特性),不影响其他正常消息处理;
-
消费者组基础:
- RocketMQ设定消费者组是订阅主题和消费逻辑相同的服务备份,所以消息重试时,Broker只需往消费者组中任意一个实例推送即可;
- 但客户端实现时,
DefaultMQConsumer
未强制规定消费者组不能重复,若组成消费组的服务订阅主题和消费逻辑不同,RocketMQ不会报错,但消息处理逻辑不一致,会给业务带来麻烦,实际应用需注意;
-
状态返回与业务执行:
- Broker仅通过消费者组返回的状态确定消息是否处理成功,无法知晓消费者组自身业务执行是否正常;
- 因此,实现消费者业务逻辑时,应尽量使用同步方式,保证业务处理完成后再向Broker返回状态,避免用异步方式处理业务逻辑。
-
2.2.3 消费者组自行指定起始消费位点
-
Broker端通过Consumer返回的状态来推进所属消费者组对应的Offset,但存在一种分裂情况:
- Broker决定把哪条消息发给哪个Consumer,Consumer像一个服务员,后厨(Broker)做什么菜,它就得上什么菜;
- 它不知道自己下一个要处理的是什么,也无法提前准备。它只能被动地接收,然后开始处理;
- 这在某些对消息顺序、状态有严格要求的场景下会很麻烦;
-
解决方式:可创建新的消费者组,通过
ConsumerFromWhere
属性指定消费起点,让新消费者组消费之前发送过的历史消息;public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET, // 从对列的最后一条消息开始消费CONSUME_FROM_FIRST_OFFSET, // 从对列的第一条消息开始消费CONSUME_FROM_TIMESTAMP; // 从某一个时间点开始重新消费 }
-
若指定了枚举值
CONSUME_FROM_TIMESTAMP
,就表示从一个具体时间开始消费,具体时间点需通过Consumer的ConsumerTimestamp
属性传入一个表示时间的字符串,例:consumer.setConsumerTimestamp("20131223171201")
-
总结:从客户端角度分析了保证消息安全性的方式,但消息安全是体系化问题,还涉及服务端配合,后续会继续探讨。
2.3 广播消息
-
应用场景:
- 广播模式和集群模式是RocketMQ消费者端处理消息的两种基本模式;
- 集群模式下,一个消息只会被一个消费者组中的多个消费者实例共同处理一次;
- 广播模式下,一个消息会推送给所有消费者实例处理,不再关注消费者组;
-
消费者核心代码:设置消费者为广播模式,启动多个消费者时,这些消费者都会消费一次消息;
consumer.setMessageModel(MessageModel.BROADCASTING);
-
实现思路:
- 默认的集群模式下,Broker端会给每个
ConsumerGroup
维护一个统一的Offset
,保证一个消息在同一个ConsumerGroup
内只会被消费一次; - 广播模式的本质是将
Offset
转移到Consumer端自行保管,包括Offset
的记录以及更新,全部放到客户端。Broker推送消息时,不再管ConsumerGroup
,只要Consumer来拉取消息,就返回对应的消息;
- 默认的集群模式下,Broker端会给每个
-
注意:
- Broker端不维护消费进度,若消费者处理消息失败,将无法进行消息重试;
- Consumer端维护
Offset
的作用是在服务重启时,按照上一次消费的进度,处理后面没有消费过的消息。若Offset
丢失,Consumer依然可以拉取消息,但只能获取丢失Offset
之后的消息; - 例如:生产者发送1 - 10号消息,消费者消费到6号时宕机,重启后Broker已推送完10号消息,若Consumer端维护了
Offset
,可重新申请6 - 10号消息;若Offset
丢失,只能获取10号以后的消息。
2.4 过滤消息
-
应用场景:
- 在同一个Topic下存在多种不同消息时,消费者仅希望关注某一类消息;
- 例如仓储系统的Topic下有入库、出库等不同消息,不同业务消费者需过滤出感兴趣的消息进行业务操作;
-
过滤方式1——简单过滤(基于Tag)
-
生产者端:发送消息时增加Tag属性,比如下面示例代码中循环发送带不同Tag(TagA、TagB、TagC)的消息;
String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 15; i++) {Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult); }
-
消费者端:通过订阅感兴趣的Tag(如TagA),后续仅处理该Tag的消息;
consumer.subscribe("TagFilterTest", "TagA")
-
-
过滤方式2——SQL过滤
-
适用场景:需进行更复杂的消息过滤(如数字比较、模糊匹配等)时使用;
-
生产者端:发送消息时,除Tag属性外,还可增加自定义属性,比如下面示例代码中通过
msg.putUserProperty("a", String.valueOf(i))
添加自定义属性a
;String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 15; i++) {Message msg = new Message("SqlFilterTest",tags[i % tags.length],("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult); }
-
消费者端:指定标准SQL语句定制复杂过滤规则,比如下面示例代码中过滤出
TAGS
在('TagA', 'TagB')
且自定义属性a
在0
到3
之间的消息。同时,若要使用自定义参数过滤,需在Broker端将enablePropertyFilter
参数设置为true
(默认false
);consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));
-
-
实现思路:
-
Tags和用户自定义属性随消息一起传递,消费者端可获取消息的Tags和自定义属性(如通过
msg.getTags()
和msg.getProperties()
);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(msg.getTags());System.out.println(msg.getProperties());}System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} });
-
Broker在往Consumer推送消息时,会在Broker端进行消息过滤,只推送Consumer感兴趣的消息;
-
其中,Tag属性直接匹配,SQL语句通过ANLTR引擎解析后进行过滤(ANLTR是开源SQL语句解析框架,被ShardingSphere、Flink等开源产品使用);
-
-
注意:
-
Tag过滤:若要匹配多个Tag,可用两个竖线(
||
)连接多个Tag值;也可用星号(*
)匹配所有Tag; -
SQL过滤:SQL语句按SQL92标准执行,支持一些常见的基本操作,比如:
- 数值比较(如
>
,>=
,<
,<=
,BETWEEN
,=
) - 字符比较(如
=
,<>
,IN
) IS NULL
/IS NOT NULL
- 逻辑符号
AND
,OR
,NOT
- 数值比较(如
-
过滤端选择:消息过滤在Broker端和Consumer端都可进行,Consumer端可自行获取用户属性,对不感兴趣的消息返回不成功状态跳过,但RocketMQ建议在Broker端完成过滤,可减少不必要的网络IO,不过会加大服务端压力;
-
未被消费消息处理:Consumer不感兴趣的消息不直接丢弃,通常需在同一消费者组定制其他消费者实例消费剩余消息;若一直无其他Consumer,Broker端仍会推进Offset。
-
2.5 顺序消息机制
-
应用场景:在订单业务中,每个订单有下单、锁库存、支付、下物流等业务步骤,每个步骤由消息生产者通知下游服务,需保证每个订单的业务处理顺序不乱;
-
示例代码:
-
生产者核心代码:通过循环创建不同
orderId
和步骤的消息,利用MessageQueueSelector
将orderId
相同的消息转发到同一个MessageQueue
中,确保同一订单的消息在同一队列;for (int i = 0; i < 10; i++) {int orderId = i;for(int j = 0 ; j <= 5 ; j ++){Message msg =new Message("OrderTopicTest","order_" + orderId, "KEY" + orderId,("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);} }
-
消费者核心代码:注册
MessageListenerOrderly
实现类,在消费消息时设置自动提交,遍历消息并打印内容,最后返回ConsumeOrderlyStatus.SUCCESS
表示消费成功;consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true); // 自动提交for(MessageExt msg:msgs){System.out.println("收到消息内容 "+new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS; // 消费成功} });
-
-
实现思路:RocketMQ实现消息顺序消费需要生产者和消费者配合
-
生产者:将一批有顺序要求的消息放到同一个
MessageQueue
上,借助MessageQueue
的FIFO(先进先出)特性保证这一批消息的顺序;若不指定MessageQueueSelector
对象,生产者会采用轮询方式将多条消息依次发送到不同的MessageQueue
上; -
消费者:需要实现
MessageListenerOrderly
接口,在服务端处理该接口时,会给一个MessageQueue
加锁,拿到该MessageQueue
上所有的消息后,再去读取下一个MessageQueue
的消息;
-
-
注意:
-
局部有序与全局有序:大部分业务场景需要的是局部有序,若要保持全局有序,需要只保留一个
MessageQueue
,性能会非常低: -
消息分散:生产者端尽可能将有序消息打散到不同的
MessageQueue
上,避免过于集中导致数据热点竞争; -
消费者重试:消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试;但如果消费者一直处理失败,超过最大重试次数,RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序;
-
异常处理:消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
作为替代。
-
2.6 延迟消息
-
应用场景:延迟消息发送是指消息发送到RocketMQ后,不期望立马投递,而是延迟一定时间后才投递到Consumer进行消费。相比RabbitMQ(需通过死信队列变相实现或加装插件)和Kafka(不太好实现延迟消息),这是RocketMQ很有特色的功能;
-
核心方法:当前版本RocketMQ提供两种实现延迟消息的机制
-
指定固定的延迟级别:生产者端通过
message.setDelayTimeLevel(level)
方法设置,例如message.setDelayTimeLevel(3)
表示10秒后发送;// 指定固定的延迟级别 Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8)); message.setDelayTimeLevel(3); // 10秒之后发送
RocketMQ定制了18个默认的延迟级别,应用可根据业务要求选择对应的延迟级别;
-
指定消息发送时间:生产者端通过
message.setDeliverTimeMs(time)
方法设置,例如message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L)
表示指定10秒之后的时间点发送;// 指定消息发送时间 Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8)); message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L); // 指定10秒之后的时间点发送
-
-
实现思路:
-
指定固定延迟级别的延迟消息:RocketMQ预设一个系统Topic(
SCHEDULE_TOPIC_XXXX
),在这个Topic下预设了18个MessageQueue
,每个队列对应一种延迟级别。通过扫描这18个队列里的消息,进行延迟操作; -
指定时间点的延迟消息:RocketMQ通过时间轮算法实现。
-
2.7 批量消息
-
应用场景:当生产者要发送的消息比较多时,可将多条消息合并成一个批量消息一次性发送出去,以此减少网络IO,提升消息发送的吞吐量;
-
示例代码(生产者核心代码)
// 创建一个ArrayList类型的messages列表,用于存储要发送的消息 List<Message> messages = new ArrayList<>(MESSAGE_COUNT); // 通过循环向messages中添加多条消息,每条消息指定了Topic、Tag、消息内容等 for (int i = 0; i < MESSAGE_COUNT; i++) {messages.add(new Message(TOPIC, TAG, "OrderID" + i, ("Hello world " + i).getBytes(StandardCharsets.UTF_8))); }// 使用ListSplitter将大量消息分割成小的批次 ListSplitter splitter = new ListSplitter(messages); // 在while循环中,每次获取分割后的小批次消息列表listItem,调用producer.send(listItem)方法发送,并打印发送结果sendResult while (splitter.hasNext()) {List<Message> listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s", sendResult); }
-
注意:
-
同一批消息的
Topic
必须相同; -
不支持延迟消息;
-
批量消息的大小不要超过1M,如果太大就需要自行分割;
-
当前版本中,RocketMQ在尝试实现一种自动化的消息分割机制,基于客户端内部一个新增的
ProduceAccumulator
组件,可查看org.apache.rocketmq.client.producer.ProduceAccumulatorTest
的testProduceAccumulator_async
和testProduceAccumulator_sync
方法了解详情。
-
2.8 事务消息
-
应用场景
-
事务消息是RocketMQ的一个高级功能,其基础诉求是通过事务机制保证上下游数据一致性;
-
以电商为例,用户支付订单时涉及下游物流发货、积分变更、购物车状态清空等多个子系统变更,适合用RocketMQ的解耦功能来串联,且要保证相关业务同时成功或同时失败。若直接将多个服务作为分布式事务控制很麻烦,而用RocketMQ串联后,因RocketMQ与消费者端有失败重试机制,消息成功发送到RocketMQ后,各分支步骤(如物流变更、积分变更等)可保证最终数据一致性,此时一个复杂分布式事务问题就简化为“Main Branch1(更新订单)”和“Branch2(发送 RocketMQ 消息触发后续子系统变更)”两个步骤的分布式事务问题;
-
在此基础上,RocketMQ提出事务消息机制,采用两阶段提交思路,保证“Main Branch1”和“Branch2”之间的事务一致性;
-
-
实现思路:
-
生产者将消息发送至Apache RocketMQ服务端;
-
Apache RocketMQ服务端将消息持久化成功后,向生产者返回Ack确认消息发送成功,此时消息被标记为“暂不能投递”,即半事务消息;
-
生产者开始执行本地事务逻辑;
-
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或Rollback):
-
若为Commit,服务端将半事务消息标记为可投递,并投递给消费者;
-
若为Rollback,服务端将回滚事务,不会将半事务消息投递给消费者;
-
-
在断网或生产者应用重启等特殊情况下,若服务端未收到发送者提交的二次确认结果,或收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对生产者集群中任一生产者实例发起消息回查;
-
生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果;
-
生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理;
-
-
示例代码
org.apache.rocketmq.example.transaction.TransactionProducer
:TransactionMQProducer
:RocketMQ 专门用于发送事务消息的生产者类。它继承自普通的DefaultMQProducer
,但添加了事务相关的功能,比如事务监听器的设置、本地事务执行和事务状态回查等机制。通过它来创建事务消息生产者实例,指定生产者组名等信息;TransactionListener
:事务监听器接口,用于定义本地事务的执行逻辑以及事务状态回查的逻辑。在下面代码中通过TransactionListenerImpl
实现该接口,来处理事务消息的本地事务执行和回查操作;
public class TransactionProducer {public static final String PRODUCER_GROUP = "please_rename_unique_group_name";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TopicTest1234";public static final int MESSAGE_COUNT = 10;public static void main(String[] args) throws MQClientException, InterruptedException {// 1、创建事务监听器和生产者// 实例化 TransactionListenerImpl 作为事务监听器,用于处理事务相关的逻辑TransactionListener transactionListener = new TransactionListenerImpl();// 创建 TransactionMQProducer 实例,指定生产者组名 PRODUCER_GROUP 和相关主题列表TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));// 2、配置生产者// 设置线程池 executorService,用于处理事务消息的相关任务,比如事务状态的回查等// 线程池的核心线程数、最大线程数等参数可根据实际需求调整ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});// 将事务监听器和线程池设置到 TransactionMQProducer 中,使生产者具备处理事务消息的能力producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);// 启动生产者producer.start();// 定义消息的标签数组 tags,用于区分不同类型的消息String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};// 循环发送多条事务消息for (int i = 0; i < MESSAGE_COUNT; i++) {try {Message msg =new Message(TOPIC, tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送事务消息,该方法会触发事务监听器中本地事务的执行逻辑SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}// 关闭生产者,释放资源producer.shutdown();} }
- 本地事务执行与事务监听器:
- 通过实现
TransactionListener
接口,在executeLocalTransaction
方法中执行本地事务(如数据库操作等),并返回本地事务的状态(提交、回滚或未知); - 在
checkLocalTransaction
方法中处理事务状态回查,当 RocketMQ 服务端在一定时间内未收到事务的二次确认(提交或回滚)时,会调用该方法来检查本地事务的最终状态;
- 通过实现
- 事务状态管理:RocketMQ 服务端会根据事务监听器返回的状态,决定是提交事务消息(使其对消费者可见)还是回滚事务消息(不投递该消息)。若返回未知状态,服务端会在后续进行事务状态回查,以确保事务的最终一致性;
-
注意:
-
半消息特性:半消息对消费者不可见,RocketMQ会将半消息转到系统Topic(
RMQ_SYS_TRANS_HALF_TOPIC
)中; -
本地事务回查机制:
- 回查次数通过
transactionCheckMax
参数设定,默认15次; - 回查间隔通过
transactionCheckInterval
参数设定,默认60秒; - 若超过最大回查次数,消息会被丢弃;
- 回查次数通过
-
了解事务消息机制后,可根据实际情况对事务流程进行适当调整,以更好地适配业务需求。以订单系统为例:
-
-
面试题:
// 找出代码中存在的问题 /*** 创建订单** @param goodsId 商品ID* @param userId 用户ID* @return 订单ID*/ @Transactional(rollbackFor = Exception.class) public Long createOrder(Long goodsId, Long userId) {// 新增订单Order order = new Order();order.setStockCode(stockCode);order.setUserId(userId);order.setGoodsId(goodsId);mapper.insert(order);// 发送MQmq.send("ORDER_CREATE", order.getId());// 同步订单信息给第三方,并且回写第三方IDString thirdId = http.syncThirdSystem(order);order.setThirdSystemId(thirdId);mapper.updateThirdId(thirdId, order.getId());return order.getId(); }
- 若在发送MQ消息或同步第三方信息过程中出现异常,虽然方法上的
@Transactional
会回滚数据库操作(新增订单),但MQ消息可能已经成功发送,这就会导致数据库中订单记录回滚了,而下游服务却收到了订单创建的MQ消息,从而造成数据不一致。因为普通的MQ发送与数据库事务没有结合RocketMQ的事务消息机制,无法保证两者的原子性。
- 若在发送MQ消息或同步第三方信息过程中出现异常,虽然方法上的
2.9 ACL权限控制机制
-
应用场景:RocketMQ提供针对队列、用户等不同维度的全面权限管理机制。通常作为内部服务时无需权限控制,但进行跨部门甚至跨公司合作时,权限控制的重要性就会凸显;
-
权限控制体系有:
- Topic权限控制
- Broker端权限控制
2.9.1 Topic权限控制
-
RocketMQ针对每个Topic有完整的权限控制,在控制平台可方便为每个Topic配置权限。
perm
字段表示Topic的权限,有三个可选项:2
:禁写禁订阅4
:可订阅,不能写6
:可写可订阅
2.9.2 Broker端权限控制
-
开启ACL:在
broker.conf
中设置aclEnable=true
,开启ACL标志; -
配置文件:使用Broker提供的
plain_acl.yml
进行权限配置,且该配置文件是热加载的,修改配置无需重启Broker服务; -
plain_acl.yml
配置内容:# 全局白名单,不受ACL控制,通常将主从架构中的所有节点加入 globalWhiteRemoteAddresses: - 10.10.103.* - 192.168.0.*# 账户配置 accounts: # 第一个账户 - accessKey: RocketMQsecretKey: 12345678whiteRemoteAddress: # 白名单IPadmin: false # 是否为管理员,管理员可访问所有资源defaultTopicPerm: DENY # 默认Topic访问策略(拒绝)defaultGroupPerm: SUB # 默认Group访问策略(只允许订阅)# 各Topic的权限topicPerms:- topicA=DENY # topicA拒绝- topicB=PUB|SUB # topicB允许发布和订阅消息- topicC=SUB # topicC只允许订阅# 各Group的权限groupPerms:- groupA=DENY- groupB=PUB|SUB- groupC=SUB # 第二个账户,只要是来自192.168.1.*的IP,就可以访问所有资源 - accessKey: rocketmq2secretKey: 12345678whiteRemoteAddress: 192.168.1.*admin: true
-
客户端使用
-
依赖引入:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.1</version> </dependency>
-
身份提交:通过
accessKey
和secretKey
提交身份信息。在声明客户端(如DefaultMQProducer
)时,传入一个RPCHook
(通过AclClientRPCHook
,结合SessionCredentials
传入ACL_ACCESS_KEY
和ACL_SECRET_KEY
);// 声明时传入RPCHook DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());private static final String ACL_ACCESS_KEY = "RocketMQ"; private static final String ACL_SECRET_KEY = "1234567"; static RPCHook getAclRPCHook() {return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY)); }
-
3 SpringBoot整合RocketMQ
3.1 快速实战
-
引入依赖:
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.0.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>3.0.4</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency> </dependencies>
-
启动类:
@SpringBootApplication public class RocketMQSBApplication {public static void main(String[] args) {SpringApplication.run(RocketMQSBApplication.class,args);} }
-
配置文件:
rocketmq.name-server=192.168.65.112:9876 rocketmq.producer.group=springBootGroup# 如果在此处的生产者配置不配,那就需要在消费者配置中配 # rocketmq.consumer.topic= rocketmq.consumer.group=testGroup server.port=9000
-
声明生产者,直接使用RocketMQTemplate进行消息发送
这个
rocketMQTemplate
不光可以发消息,还可以主动拉消息;如果需要拉取消息,要配置
rocketmq.consumer.topic和rocketmq.consumer.group
参数@Component public class SpringProducer {@Resourceprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);} }
-
声明消费者,所有属性通过
@RocketMQMessageListener
注解声明@Component @RocketMQMessageListener(consumerGroup = "MyConsumerGroup",topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY,messageModel= MessageModel.BROADCASTING ) public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received message : " + message);} }
-
需要注意的是,SpringBoot 框架对消息的封装与原生 API 的消息封装不同。
3.2 如何处理各种消息类型
-
基础消息发送机制:
- 同步发送:等待发送结果返回
- 异步发送:通过回调函数处理发送结果
- 单向发送:不关心发送结果
- 顺序消息:通过hashKey保证消息顺序
- 延迟消息:指定延迟级别实现定时发送
- 批量消息:一次性发送多条消息
- 请求-响应模式:发送消息并等待响应
- 消息过滤:通过TAG实现消息分类
@RunWith(SpringRunner.class) @SpringBootTest public class SpringRocketTest {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Testpublic void sendMessageTest(){String springTopic="TestTopic";// 同步发送字符串消息到指定主题SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);// 同步发送User对象消息,RocketMQ会自动序列化对象sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);// 同步发送带消息头的User对象消息,明确指定内容类型为JSONsendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);// 异步发送订单支付事件对象消息,通过回调函数处理发送结果rocketMQTemplate.asyncSend(springTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {@Overridepublic void onSuccess(SendResult var1) {System.out.printf("async onSucess SendResult=%s %n", var1);}@Overridepublic void onException(Throwable var1) {System.out.printf("async onException Throwable=%s %n", var1);}});// 发送带TAG的消息,TAG用于消息过滤,不同TAG的消息可以被不同的消费者处理rocketMQTemplate.convertAndSend(springTopic + ":tag0", "I'm from tag0"); // tag0 will not be consumer-selectedSystem.out.printf("syncSend topic %s tag %s %n", springTopic, "tag0");rocketMQTemplate.convertAndSend(springTopic + ":tag1", "I'm from tag1");System.out.printf("syncSend topic %s tag %s %n", springTopic, "tag1");// 同步发送请求消息并等待String类型的响应(请求-响应模式)String replyString = rocketMQTemplate.sendAndReceive(springTopic, "request string", String.class);System.out.printf("send %s and receive %s %n", "request string", replyString);// 同步发送请求消息并等待byte[]类型的响应,设置超时时间为3000毫秒byte[] replyBytes = rocketMQTemplate.sendAndReceive(springTopic, MessageBuilder.withPayload("request byte[]").build(), byte[].class, 3000);System.out.printf("send %s and receive %s %n", "request byte[]", new String(replyBytes));// 同步发送带hashKey的请求消息(用于保证消息顺序),并等待User类型的响应User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");User requestUser2 = new User().setUserAge((byte) 9).setUserName("requestUserName");User requestUser3 = new User().setUserAge((byte) 9).setUserName("requestUserName");User requestUser4 = new User().setUserAge((byte) 9).setUserName("requestUserName");User replyUser = rocketMQTemplate.sendAndReceive(springTopic, requestUser, User.class, "order-id");User replyUser2 = rocketMQTemplate.sendAndReceive(springTopic, requestUser2, User.class, "order-id");User replyUser3 = rocketMQTemplate.sendAndReceive(springTopic, requestUser3, User.class, "order-id");User replyUser4 = rocketMQTemplate.sendAndReceive(springTopic, requestUser4, User.class, "order-id");System.out.printf("send %s and receive %s %n", requestUser, replyUser);// 同步发送带延迟级别的消息(延迟消息),并返回泛型结果ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(springTopic, "request generic",new TypeReference<ProductWithPayload<String>>() {}.getType(), 30000, 2); // 延迟级别2,超时时间30秒System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);// 异步发送请求消息并通过回调处理String类型的响应rocketMQTemplate.sendAndReceive(springTopic, "request string", new RocketMQLocalRequestCallback<String>() {@Override public void onSuccess(String message) {System.out.printf("send %s and receive %s %n", "request string", message);}@Override public void onException(Throwable e) {e.printStackTrace();}});// 异步发送User对象请求消息并通过回调处理响应,设置超时时间为5000毫秒rocketMQTemplate.sendAndReceive(springTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {@Override public void onSuccess(User message) {System.out.printf("send user object and receive %s %n", message.toString());}@Override public void onException(Throwable e) {e.printStackTrace();}}, 5000);// 发送批量消息:构建10条消息的列表,每条消息都设置唯一的KEYList<Message> msgs = new ArrayList<Message>();for (int i = 0; i < 10; i++) {msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());}// 同步发送批量消息,设置超时时间为60秒SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);System.out.printf("--- Batch messages send result :" + sr);} }
-
多 Topic 消息发送:一个
RocketMQTemplate
实例默认只包含一个生产者,也只能往一个 Topic 下发送消息。如果需要往另外一个 Topic 下发送消息,就需要通过@ExtRocketMQTemplateConfiguration()
注解另外声明一个子类实例; -
事务消息机制:最关键的事务监听器需要通过
@RocketMQTransactionListener
注解注入到 Spring 容器中,在这个注解中可以通过rocketMQTemplateBeanName
属性,指向具体的RocketMQTemplate
子类。
3.3 实现原理
3.3.1 RocketMQTemplate的注入过程
- 条件化配置:根据类路径和配置属性决定是否创建RocketMQ相关bean
- 生产者配置:创建DefaultMQProducer并设置各种参数(超时、重试、消息大小等)
- 消费者配置:创建DefaultLitePullConsumer并设置订阅信息
- 模板创建:创建RocketMQTemplate,它是Spring集成RocketMQ的核心工具类
- 依赖注入:将生产者和消费者注入到模板中,提供统一的消息操作接口
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package org.apache.rocketmq.spring.autoconfigure;import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ConfigurationCondition.ConfigurationPhase;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;// RocketMQ自动配置类,负责自动创建和配置RocketMQ相关组件
@Configuration
@EnableConfigurationProperties({RocketMQProperties.class}) // 启用配置属性绑定
@ConditionalOnClass({MQAdmin.class}) // 类路径下存在MQAdmin类时才生效
@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server"},matchIfMissing = true // name-server属性存在或缺失时都生效
)
@Import({ // 导入其他配置类MessageConverterConfiguration.class, // 消息转换器配置ListenerContainerConfiguration.class, // 监听器容器配置ExtProducerResetConfiguration.class, // 扩展生产者重置配置ExtConsumerResetConfiguration.class, // 扩展消费者重置配置RocketMQTransactionConfiguration.class, // 事务消息配置RocketMQListenerConfiguration.class // 消息监听器配置
})
@AutoConfigureAfter({MessageConverterConfiguration.class}) // 在消息转换器配置之后自动配置
@AutoConfigureBefore({RocketMQTransactionConfiguration.class}) // 在事务配置之前自动配置
public class RocketMQAutoConfiguration implements ApplicationContextAware {private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class);public static final String ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME = "rocketMQTemplate"; // RocketMQTemplate的默认bean名称public static final String PRODUCER_BEAN_NAME = "defaultMQProducer"; // 默认生产者的bean名称public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer"; // 默认消费者的bean名称private final Environment environment; // Spring环境对象,用于读取配置属性private ApplicationContext applicationContext; // Spring应用上下文public RocketMQAutoConfiguration(Environment environment) {this.environment = environment;this.checkProperties(); // 构造函数中检查必要属性}public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext; // 保存应用上下文引用}// 检查必要的配置属性public void checkProperties() {String nameServer = (String)this.environment.getProperty("rocketmq.name-server", String.class);log.debug("rocketmq.nameServer = {}", nameServer);if (nameServer == null) {log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");}}// 创建默认MQ生产者bean@Bean({"defaultMQProducer"})@ConditionalOnMissingBean({DefaultMQProducer.class}) // 当没有DefaultMQProducer bean时才创建@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server", "producer.group"} // 必须配置name-server和producer.group属性)public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();String nameServer = rocketMQProperties.getNameServer();String groupName = producerConfig.getGroup();Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); // 验证nameServer不为空Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); // 验证groupName不为空String accessChannel = rocketMQProperties.getAccessChannel();String ak = rocketMQProperties.getProducer().getAccessKey();String sk = rocketMQProperties.getProducer().getSecretKey();boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();// 使用工具类创建默认MQ生产者DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);producer.setNamesrvAddr(nameServer); // 设置NameServer地址if (StringUtils.hasLength(accessChannel)) {producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); // 设置访问通道}// 设置生产者各种配置参数producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());producer.setMaxMessageSize(producerConfig.getMaxMessageSize());producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());producer.setUseTLS(producerConfig.isTlsEnable());if (StringUtils.hasText(producerConfig.getNamespace())) {producer.setNamespace(producerConfig.getNamespace()); // 设置命名空间}if (StringUtils.hasText(producerConfig.getNamespaceV2())) {producer.setNamespaceV2(producerConfig.getNamespaceV2()); // 设置V2命名空间}producer.setInstanceName(producerConfig.getInstanceName()); // 设置实例名称log.info("a producer ({}) init on namesrv {}", groupName, nameServer);return producer;}// 创建默认轻量级拉取消费者bean@Bean({"defaultLitePullConsumer"})@ConditionalOnMissingBean({DefaultLitePullConsumer.class}) // 当没有DefaultLitePullConsumer bean时才创建@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server", "pull-consumer.group", "pull-consumer.topic"} // 必须配置name-server、group和topic属性)public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties) throws MQClientException {RocketMQProperties.PullConsumer consumerConfig = rocketMQProperties.getPullConsumer();String nameServer = rocketMQProperties.getNameServer();String groupName = consumerConfig.getGroup();String topicName = consumerConfig.getTopic();Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");String accessChannel = rocketMQProperties.getAccessChannel();MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel()); // 消息模式(广播/集群)SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType()); // 选择器类型String selectorExpression = consumerConfig.getSelectorExpression(); // 选择表达式String ak = consumerConfig.getAccessKey();String sk = consumerConfig.getSecretKey();int pullBatchSize = consumerConfig.getPullBatchSize(); // 拉取批量大小boolean useTLS = consumerConfig.isTlsEnable();// 使用工具类创建轻量级拉取消费者DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel, groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace()); // 设置消息轨迹litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic()); // 设置自定义轨迹主题if (StringUtils.hasText(consumerConfig.getNamespace())) {litePullConsumer.setNamespace(consumerConfig.getNamespace()); // 设置命名空间}if (StringUtils.hasText(consumerConfig.getNamespaceV2())) {litePullConsumer.setNamespaceV2(consumerConfig.getNamespaceV2()); // 设置V2命名空间}litePullConsumer.setInstanceName(consumerConfig.getInstanceName()); // 设置实例名称log.info("a pull consumer({} sub {}) init on namesrv {}", new Object[]{groupName, topicName, nameServer});return litePullConsumer;}// 创建RocketMQTemplate bean - 这是核心的Spring集成模板类@Bean(destroyMethod = "destroy" // 指定销毁方法)@Conditional({ProducerOrConsumerPropertyCondition.class}) // 自定义条件:生产者或消费者存在@ConditionalOnMissingBean(name = {"rocketMQTemplate"} // 当没有名为rocketMQTemplate的bean时才创建)public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) {RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();// 如果存在生产者bean,则设置到模板中if (this.applicationContext.containsBean("defaultMQProducer")) {rocketMQTemplate.setProducer((DefaultMQProducer)this.applicationContext.getBean("defaultMQProducer"));}// 如果存在消费者bean,则设置到模板中if (this.applicationContext.containsBean("defaultLitePullConsumer")) {rocketMQTemplate.setConsumer((DefaultLitePullConsumer)this.applicationContext.getBean("defaultLitePullConsumer"));}// 设置消息转换器rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());return rocketMQTemplate;}// 自定义条件类:检查生产者或消费者是否存在static class ProducerOrConsumerPropertyCondition extends AnyNestedCondition {public ProducerOrConsumerPropertyCondition() {super(ConfigurationPhase.REGISTER_BEAN); // 在bean注册阶段检查}// 条件1:存在DefaultLitePullConsumer bean@ConditionalOnBean({DefaultLitePullConsumer.class})static class DefaultLitePullConsumerExistsCondition {DefaultLitePullConsumerExistsCondition() {}}// 条件2:存在DefaultMQProducer bean@ConditionalOnBean({DefaultMQProducer.class})static class DefaultMQProducerExistsCondition {DefaultMQProducerExistsCondition() {}}}
}
3.3.2 Push模式
-
Push模式对于
@RocketMQMessageListener
注解的处理方式,入口在rocketmq-spring-boot-2.3.1.jar
中的org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration
类中:- 该配置类会向Spring容器中注入
RocketMQMessageListenerContainerRegistrar
对象; - 注入时,会根据
RocketMQProperties
等参数来创建RocketMQMessageListenerContainerRegistrar
,用于后续容器的注册等操作;
@Configuration @ConditionalOnMissingBean(RocketMQMessageListenerContainerRegistrar.class) public class ListenerContainerConfiguration {@Beanpublic RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties);} }
- 该配置类会向Spring容器中注入
-
注入
RocketMQMessageListenerContainerRegistrar
后,rocketmq-spring-boot-2.3.1.jar
中会另外注入一个RocketMQMessageListenerBeanPostProcessor
对象,该对象实现了SmartLifecycle
接口,在初始化完成后,会调用自身的start
方法,进而调用RocketMQMessageListenerContainerRegistrar
的startContainer
方法;@Override public void start() {if (!isRunning()) {this.setRunning(true);listenerContainerRegistrar.startContainer();} }
-
startContainer
方法:遍历所有的DefaultRocketMQListenerContainer
容器,对未运行的容器调用start
方法启动。若启动过程中出现异常,会进行日志记录并抛出运行时异常;public void startContainer() {for (DefaultRocketMQListenerContainer container : containers) {if (!container.isRunning()) {try {container.start();} catch (Exception e) {log.error("Started container failed. {}", container, e);throw new RuntimeException(e);}}} }
- 这个**
DefaultRocketMQListenerContainer
**实际上是对RocketMQ原生DefaultMQPushConsumer
的封装。其start
方法本质上是启动一个RocketMQ的消费者;
- 这个**
-
创建
DefaultMQPushConsumer
实例的关键逻辑在DefaultRocketMQListenerContainer
的afterPropertiesSet
方法中。该方法会调用initRocketMQPushConsumer
方法来初始化消费者:public void afterPropertiesSet() throws Exception {initRocketMQPushConsumer();this.messageType = getMessageType();this.methodParameter = getMethodParameter();log.debug("RocketMQ messageType: {}", messageType); }
private void initRocketMQPushConsumer() throws MQClientException {// ……// 1、根据是否配置了RPC Hook来创建不同的消费者实例// RPC Hook用于消息发送和消费的拦截处理,如认证、监控等if (Objects.nonNull(rpcHook)) {// 使用RPC Hook创建消费者,包含消息队列分配策略和消息轨迹配置consumer = new DefaultMQPushConsumer(consumerGroup,rpcHook,new AllocateMessageQueueAveragely(), // 平均分配消息队列策略enableMsgTrace, // 是否启用消息轨迹this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()) // 解析自定义轨迹主题,支持占位符);consumer.setVipChannelEnabled(false); // 禁用VIP通道,避免连接VIP端口} else {// 没有配置RPC Hook时创建基础消费者实例log.debug("Access-key or secret-key not configure in " + this + ".");consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, // 是否启用消息轨迹this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()) // 解析自定义轨迹主题);}// 设置消费者实例名称,基于NameServer地址生成唯一标识// 这有助于在多个实例中区分不同的消费者consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));// ……// 2、根据配置的消息模式设置消费者的消息模型// 消息模式决定消息在消费者组中的分发方式switch (messageModel) {case BROADCASTING:// 广播模式:每条消息会被消费者组中的每个消费者实例都消费一次consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);break;case CLUSTERING:// 集群模式:每条消息只会被消费者组中的某一个消费者实例消费一次// 这是默认模式,用于实现负载均衡consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException("Property 'messageModel' was wrong.");}// 设置消费者的其他配置属性,如NameServer地址、消费线程数、超时时间等// ……// 3、根据消费模式设置消息监听器// 消费模式决定消息的处理方式(顺序或并发)switch (consumeMode) {case ORDERLY:// 顺序消费模式:保证同一个消息队列中的消息按顺序处理// 适用于需要严格顺序的业务场景,如订单状态变更consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:// 并发消费模式:消息可以并行处理,不保证顺序但吞吐量更高// 适用于对顺序不敏感的高吞吐场景consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");} }
3.3.3 Pull模式
-
Pull模式通过在
RocketMQTemplate
实例中注入DefaultLitePullConsumer
实例来实现。注入并启动该实例后,可通过RocketMQTemplate
的receive
方法调用DefaultLitePullConsumer
的poll
方法,主动拉取消息; -
初始化
DefaultLitePullConsumer
的代码在rocketmq-spring-boot-2.3.1.jar
包中,处理类是org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
,该配置类通过SpringBoot的自动装载机制(配置在jar包的spring.factories
文件中)加载进来;// 通过@Bean注解创建DefaultLitePullConsumer Bean @Bean(CONSUMER_BEAN_NAME) // 使用@ConditionalOnMissingBean确保仅在没有该 Bean 时创建 @ConditionalOnMissingBean(DefaultLitePullConsumer.class) // @ConditionalOnProperty解析 SpringBoot 配置属性(前缀为rocketmq,需配置name-server、consumer.group、consumer.topic等) @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"}) public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)throws MQClientException {// 从RocketMQProperties获取消费者配置、nameServer、groupName、topicName等RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();String nameServer = rocketMQProperties.getNameServer();String groupName = consumerConfig.getGroup();String topicName = consumerConfig.getTopic();// 通过断言确保这些配置项不为空Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");// ……// 创建消费者 DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);// 并进行相关属性设置(如是否启用消息轨迹、自定义轨迹主题、命名空间等)litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());litePullConsumer.setNamespace(consumerConfig.getNamespace());return litePullConsumer; }
-
RocketMQUtil.createDefaultLitePullConsumer
方法维护DefaultLitePullConsumer
实例,该实例是RocketMQ原生API中提供的拉模式客户端; -
实际开发中Pull模式用得较少,但RocketMQ对拉模式做了很多优化。原本有
DefaultMQPullConsumer
类用于拉模式消息消费,DefaultLitePullConsumer
在此基础上做了诸多优化。
3.3.4 两种模式对比
3.3.4.1 实现原理
- Push模式:
- 本质上是一种长轮询机制,基于
DefaultMQPushConsumer
实现。在Spring Boot整合中,RocketMQMessageListenerContainerRegistrar
等配置类和RocketMQMessageListenerBeanPostProcessor
等后置处理器协同工作,最终创建并启动DefaultRocketMQListenerContainer
,而它实际上是对DefaultMQPushConsumer
的封装 ; - 消费者启动后,会不断向Broker端请求消息;
- 如果Broker端有可消费的消息,就会立即推送给消费者;
- 如果没有,Broker会保持这个请求一段时间(长轮询等待),期间一旦有新消息到达,就会立刻将消息推送给消费者;
- 本质上是一种长轮询机制,基于
- Pull模式:
- 基于
DefaultLitePullConsumer
实现,通过在RocketMQTemplate
实例中注入DefaultLitePullConsumer
实例来使用; - 消费者主动调用
poll
方法从Broker拉取消息,类似于主动去询问Broker “有没有消息可以给我” ,如果有消息则拉取回来,没有则返回空或等待一段时间后再次尝试拉取。
- 基于
3.3.4.2 使用场景
- Push模式:
- 适用于对消息实时性要求较高的场景,比如订单支付成功后的消息通知,物流状态变更通知等,消费者需要尽快收到消息并进行处理;
- 当消息处理逻辑相对简单,消费者能够及时处理接收到的消息,不会因为大量消息的突然涌入而导致处理能力瓶颈时,Push模式能很好地满足需求;
- Pull模式:
- 适用于消息处理能力不稳定,或者需要根据自身处理能力来灵活控制消息获取节奏的场景。例如,在一些复杂的业务处理中,消费者处理消息的速度会受到多种因素影响(如依赖的外部服务响应时间),Pull模式可以让消费者根据自身的负载情况,合理地控制拉取消息的频率和数量;
- 对于一些资源有限的系统,Pull模式可以避免因Broker大量推送消息而导致系统资源耗尽的情况。
3.3.4.3 优点
- Push模式:
- 消息实时性高,消费者能迅速接收到新消息,保证业务处理的及时性;
- 代码实现相对简单,开发人员无需过多关注消息拉取的频率和时机,框架会自动处理;
- Pull模式:
- 消费者可自主控制消息拉取节奏,根据自身的处理能力调整拉取频率和数量,避免因消息积压导致处理不过来的情况,对系统资源的管理更加灵活;
- 适用于复杂业务场景下的消息消费,能更好地应对消息处理速度不稳定的情况。
3.3.4.4 缺点
- Push模式:
- 如果消费者处理消息的速度跟不上Broker推送消息的速度,容易造成消息积压,进而可能导致消费者端的内存占用过高,甚至系统崩溃;
- 对消费者的处理能力要求较高,需要提前评估好消费者的最大处理能力,以应对可能出现的消息高峰;
- Pull模式:
- 如果拉取频率设置不合理,可能会导致消息处理延迟,影响消息的实时性。例如,拉取间隔设置过长,会使消息不能及时被处理;
- 代码实现相对复杂,需要开发者自行处理消息拉取的逻辑,包括设置拉取间隔、处理拉取不到消息的情况等。
3.3.4.5 消息确认机制
- Push模式:消费者在处理完消息后,会根据配置自动或手动向Broker发送消息确认,告知Broker该消息已被成功处理,以便Broker进行消息删除或标记等操作;
- Pull模式:消费者拉取到消息并处理完成后,也需要向Broker发送确认,表明消息已处理,否则Broker可能会认为消息未被消费而再次推送。但与Push模式相比,Pull模式下消费者对消息确认的控制更加自主,可以根据业务逻辑在合适的时机进行确认。
4 RocketMQ客户端注意事项
4.1 消息的ID、Key和Tag
-
有个小细节需要注意,Producer端发送的是Message对象,而Consumer消费端处理的却是MessageExt对象。也就是说,虽然都是传递消息,但是Consumer端拿到的信息会比Producer端发送的消息更多。这里就有几个重点的参数需要理解,即MessageId,Key和Tag;
-
消息的ID(MessageId)
-
Producer发送的
Message
对象没有msgId
属性,Broker端接收到Producer发送的消息后,会给每条消息单独分配一个唯一的msgId
,可作为消息的唯一键使用; -
但由于客户端不清楚
msgId
的生成机制,且RocketMQ内部针对批量消息、事务消息等特殊消息机制有特殊的msgId
分配机制,所以在复杂业务场景下,不建议使用msgId
作为消息的唯一索引,建议采用Key
属性自行指定业务层面的唯一索引;
-
-
Key属性
-
Producer发送
Message
消息时,Message
对象本身没有key
属性,设置的key
是以RocketMQ中消息的补充属性形式插入的(通过putProperty
方法,将key
相关信息存入消息的properties
集合中);public void setKeys(String keys) {this.putProperty(MessageConst.PROPERTY_KEYS, keys); }void putProperty(final String name, final String value) {if (null == this.properties) {this.properties = new HashMap<>();}this.properties.put(name, value); }
-
key
属性本质是Message
中的补充信息,也可以像使用key
一样往消息中添加自定义属性,RocketMQ内部也大量运用了这些自定义属性(可参考源码中的MessageConst
类); -
建议在业务中添加带有业务唯一性的数据作为
MessageId
的补充,RocketMQ基于Keys
属性实现了消息溯源、消息压缩等一系列功能;
-
-
Tag属性
-
Tag
属性是Producer发送的Message
对象的固有属性,主要作用是进行消息过滤; -
RocketMQ的服务端会把消息的
Tag
信息以某种形式(如hashCode
)写入到检索消息的ConsumeQueue
索引中,这样Consumer消费消息时,可通过过滤ConsumeQueue
索引中的Tag
属性,快速找到自己感兴趣的消息,因此通过Tag
进行消息过滤性能非常高,这也是官方推荐的最佳实践。
-
4.2 最佳实践
-
一个应用尽量用一个Topic,消息子类型用tags标识(可由应用自由设置)。只有生产者发送消息时设置tags(如
message.setTags("TagA")
),消费方订阅时才可利用tags通过broker做消息过滤; -
Kafka因Topic过多会导致Partition文件过多,进而影响性能,而RocketMQ的Topic虽不影响消息转发性能,但过多会加大元数据维护的性能消耗,所以要合理分配Topic;
-
使用Tag区分消息时,尽量直接用Tag过滤,避免复杂SQL过滤。因为消息过滤虽能减少网络IO,但会加大Broker端的消息处理压力,所以过滤逻辑越简单越好。
4.3 消费者端进行幂等控制
- 在RocketMQ中,消息幂等有三种实现语义:
- at most once(最多一次):每条消息最多被消费一次。RocketMQ可通过异步发送、
sendOneWay
等方式保证; - at least once(至少一次):每条消息至少被消费一次。RocketMQ可通过同步发送、事务消息等方式保证;
- exactly once(刚好一次):每条消息只确定消费一次。这是MQ中最理想也最难保证的语义,RocketMQ只能保证
at least once
,无法保证exactly once
,需业务系统自行保证消息幂等性,官网也明确说明RocketMQ确保所有消息至少传递一次,大多数情况下消息不重复;
- at most once(最多一次):每条消息最多被消费一次。RocketMQ可通过异步发送、
- 消息幂等的必要性。在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
- 发送时消息重复:消息成功发送到服务端并持久化后,因网络中断或客户端宕机导致服务端应答失败,生产者重试发送,消费者会收到两条内容和Message ID都相同的消息;
- 投递时消息重复:消息投递到消费者并完成业务处理后,客户端给服务端反馈应答时网络中断,为保证至少消费一次,服务端网络恢复后会再次投递已处理的消息,消费者会收到重复消息;
- 负载均衡时消息重复:RocketMQ的Broker或客户端重启、扩容/缩容时触发Rebalance,消费者可能收到重复消息;
- 处理方式
- 从上面的分析中可知,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性;
- RocketMQ每条消息有唯一的MessageId,但它无法保证全局唯一,存在冲突可能。所以在幂等性要求严格的场景,最好用业务上唯一的标识(如订单ID),可通过Message的Key传递该业务标识。
4.4 关注错误消息重试
-
当RocketMQ消费者端处理消息失败时,Broker会重新投递消息,且为每个消费者组创建对应的重试队列,重试消息进入“%RETRY% + ConsumeGroup”的队列中。通过关注重试队列,能及时了解消费者端的运行情况,若队列中有大量消息,说明消费者运行出现问题,需及时干预;
-
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间不同,且重试时间跟延迟消息的延迟级别相对应,取的是延迟级别的后16级别,具体重试次数与间隔时间如下:
可以将源码中
org.apache.rocketmq.example.quickstart.Consumer
里的消息监听器返回状态改为RECONSUME_LATER
来测试重试时间;重试次数 与上次重试的间隔时间 重试次数 与上次重试的间隔时间 1 10秒 9 7分钟 2 30秒 10 8分钟 3 1分钟 11 9分钟 4 2分钟 12 10分钟 5 3分钟 13 20分钟 6 4分钟 14 30分钟 7 5分钟 15 1小时 8 6分钟 16 2小时 -
如果消息重试16次后仍然失败,消息将不再投递,转为进入死信队列;
-
RocketMQ的重试次数可以定制,例如通过
consumer.setMaxReconsumeTimes(20);
将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时; -
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效,并且最后启动的Consumer会覆盖之前启动的Consumer的配置。
4.5 手动处理死信队列
-
当一条消息消费失败,RocketMQ会自动进行消息重试。若消息超过最大重试次数,RocketMQ会认为该消息有问题,但不会立刻丢弃,而是将其发送到对应消费者组的特殊队列——死信队列(名称为
%DLQ% + ConsumeGroup
); -
通常,消息进入死信队列意味着消费处理过程出现严重且无法自行恢复的错误,一般需要人工查看死信队列中的消息,排查错误原因,之后对死信消息进行处理,比如转发到正常Topic重新消费,或者丢弃;
-
死信队列的特征:
-
一个死信队列对应一个
ConsumeGroup
,而非某个消费者实例; -
若一个
ConsumeGroup
未产生死信消息,RocketMQ不会为其创建死信队列; -
一个死信队列包含对应
ConsumeGroup
里的所有死信消息,不管消息属于哪个Topic; -
死信队列中的消息不会再被消费者正常消费;
-
死信队列的有效期和正常消息相同,默认3天,对应
broker.conf
中的fileReservedTime
属性,超过该时间的消息会被删除,不管是否消费过;
-
-
默认创建的死信队列,里面的消息无法读取,在控制台和消费者中都不可读。这是因为默认死信队列的权限
perm
被设置成了2(2代表禁读,4代表禁写,6代表可读可写),需要手动将死信队列的权限配置成6,才能被消费(可通过mqadmin
指定或者web控制台操作)。