RocketMQ【入门实践】
RocketMQ 入门实践
一、mq 命令
mqadmin 是 RocketMQ 的一个强大的命令行工具,提供了一系列用于管理和监控消息队列的命令。下面是一些常用的 mqadmin 子命令及其功能简介

| 命令 | 功能描述 |
|---|---|
| updateTopic | 更新或创建主题 |
| deleteTopic | 删除主题 |
| topicList | 列出所有主题的详细信息 |
| updateSubGroup | 更新或创建消费者组 |
| deleteSubGroup | 删除消费者组 |
| consumerProgress | 列出所有消费者组的详细信息 |
| topicStatus | 查看主题状态 |
| topicClusterList | 查看指定主题所在的集群 |
| brokerStatus | 查看Broker的运行状态 |
| queryMsgById | 根据消息ID查询消息 |
| queryMsgByKey | 根据消息键查询消息 |
| queryMsgByUniqueKey | 根据消息的唯一键查询消息 |
| queryMsgByOffset | 根据偏移量查询消息 |
| sendMessage | 发送消息 |
| consumeMessage | 消费消息 |
| checkMsgSendRT | 检查消息发送的实时状态 |
| clusterList | 查看集群列表 |
| statsAll | 查看Broker的统计消息 |
| wipeWritePerm | 清楚Broker写权限 |
| resetOffsetByTime | 根据时间重置消费者的偏移量 |
| getBrokerConfig | 获取Broker的配置 |
| updateBrokerConfig | 更新Broker的配置 |
基础命令演示
# 指定集群DefaultCluster以及nameSrv地址,列出su'p
$ ./mqadmin topicList -c DefaultCluster -n localhost:9876[root@vinjcent bin]# ./mqadmin topicList --help
usage: mqadmin topicList [-c] [-h] [-n <arg>]-c,--clusterModel clusterModel-h,--help Print help-n,--namesrvAddr <arg> Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'
二、消息分类
2.1 普通 | 基础消息
官方中文文档
说明
普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。本文为您介绍普通消息的应用场景、功能原理、使用方法和使用建议
应用场景
普通消息为 Apache RocketMQ 中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息
典型场景一:微服务异步解耦

如上图所示,以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联
具体流程:
- 用户(User):流程的开始,用户进行某种操作,如下订单
- 物流订单系统(Logistics order System):用户的操作被转换成消息,这些消息包含了必要的订单数据
- 事件到消息(Event to Message):用户事件(如点击下订单按钮)被转化为系统可以处理的消息格式,通常是以二进制数据标识
- 消息(Message):这一环节表示已经格式化的消息数据,准备发送到消息队列中
- RocketMQ:这是一个中间件,用于接受来自订单系统的消息,处理它们,并将消息传递到下一个系统。它确保消息的可靠传输,并能处理并发情况
- 物流系统(Logistics System):RocketMQ 处理的消息会传递给物流系统。物流系统根据这些消息处理订单的物理配送
- 消息事件(Message to Event):物流系统收到的消息转化为具体的操作或事件,如更新数据库或触发配送流程
- 已下单,发货(Ordered,deliver goods):在物流系统中,订单已确认并启动发货流程
- 成员系统(Member System):同时,订单信息也被用来更新会员系统中的用户信息,如积分增加、会员等级更新等
典型场景二:数据集成传输

如上图所示,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 Apache RocketMQ 。每条消息都是一段日志数据,Apache RocketMQ 不做任何处理,只需要将日志数据可靠投递到下游的存储系统和分析系统即可,后续功能由后端应用完成
- Web应用(Web app):用户在Web应用上的交互开始了整个流程。用户的每一个操作都会生成数据日志
- 日志(Log):这里收集的日志信息包括用户ID、时间、事件和事件ID。这些数据记录了用户的具体操作和操作时间
- 收集代理(Collection agent):日志数据被一个收集系统或代理捕获,然后转化成消息格式,以便进一步的处理和传输
- 日志到消息(Log to Message):将日志数据转换为统一的消息格式,这一步骤是为了将数据标准化,便于后续系统的处理
- 消息(Message):经过格式化的数据现在以消息的形式存在,准备发送到消息队列系统
- RocketMQ:这是一个中间件消息队列系统,用于接收、存储和传输消息。它能确保消息在不同系统间可靠且有效地传递
- 消息到记录(Message to Record):RocketMQ将消息传递给后端系统,如数据库和分析系统
- 数据库(Database):一部分消息会被直接存储在数据库中,用于长期存储和备份
- 分析系统(Analysis System):同时,一部分消息会传输到分析系统,这里会对数据进行进一步的处理和分析,如行为分析、用户画像等
功能原理
定义:普通消息是Apache RocketMQ基本消息功能,支持生产者和消费者的异步解耦通信

普通消息生命周期
- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态
- 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制
使用限制
普通消息仅支持使用MessageType为Normal主题,即普通消息只能发送至类型为普通消息的主题中,发送的消息的类型必须和主题的类型一致
使用示例
- 创建主题
Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:
$ sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=NORMAL$ sh mqadmin updateTopic -n localhost:9876 -t baseTopic -c DefaultCluster -a +message.type=NORMAL

- 报错异常

org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failedat org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:198)at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:164)at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:114)
Caused by: org.apache.rocketmq.remoting.exception.RemotingTimeoutException: invokeSync call the addr[127.0.0.1:9876] timeoutat org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:549)at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1961)at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:577)at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:318)at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:94)at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:171)... 2 more
该错误为在调用namesrv接口创建主题时,无法或请求接口超时,实际上是能ping的通的
- 解决办法
在CentOS中的 /etc/hosts 文件中添加主机映射本地IP

2.1.1 代码实现消息生产
- 创建一个SpringBooot工程,并引入web、rocketmq依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.5</version></dependency>
- 执行代码
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author vinjcent* @description 普通消息的生成者* @since 2024/9/5 22:30:46*/
public class TestProducer {public static void main(String[] args) throws ClientException, IOException {// 1.创建消息// 4.x版本接入点地址,需要设置成namesrv地址// 5.x版本接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081String endpoint = "192.168.159.100:8081";// 消息发送的目标Topic名称,需要提前创建,类型必须是normalString topic = "baseTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();// 2.创建一个msg对象//普通消息发送。MessageBuilder messageBuilder = new MessageBuilderImpl();Message message = messageBuilder.setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息.setKeys("uniqueKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息.setTag("messageTag")// 消息体。.setBody("hello world".getBytes())//.setDeliveryTimestamp(60 * 2L).build();// 3.通过producer发送msgtry {//发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);// 异步生产消息获取到的结果是futureCompletableFuture<SendReceipt> completableFuture = producer.sendAsync(message);// 可以通过自定义线程池来实现异步线程的处理ExecutorService executor = Executors.newCachedThreadPool();completableFuture.whenCompleteAsync((receipt, throwable) -> {// TODO 执行异步发送消息后,需要处理的逻辑}, executor);System.out.println(sendReceipt.getMessageId());} catch (ClientException e) {e.printStackTrace();System.out.println(e.getMessage());}// 4.关闭生产者producer.close();}}
- 运行异常
Expected the service ProducerImpl-0 [FAILED] to be RUNNING, but the service
【注】不能修改rocketmq服务端的broker.conf文件

- 运行结果


2.1.2 代码实现消息消费
Apache RocketMQ 支持 PushConsumer、SimpleConsumer 以及 PullConsumer 这三种类型的消费者,接下来将分别从使用方式、实现原理、可靠性重试和适用场景等方面介绍这三种类型的消费者
背景
Apahce RocketMQ面向不同的业务场景提供了不同消费者类型,每种消费者类型的集成方式和控制方式都不一样。类型如下
- 如何实现并发消费:消费者如何使用并发的多线程机制处理消息,以此提高消息处理效率
- 如何实现同步、异步消息处理:对于不同的集成场景,消费者获取消息后可能会将消息异步分发到业务逻辑中处理,此时消息异步化处理如何实现?
- 如何实现消息可靠处理:消费者处理消息时如何返回响应结果?如何在消息异常情况进行重试,保证消息的可靠处理

如上图所示, Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取—>消息处理—>消费状态提交
针对以上几个阶段,Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下:
信息
在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求
若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理
危险
生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常
| 对比项 | PushConsumer | SimpleConsumer | PullConsumer |
|---|---|---|---|
| 接口方式 | 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑 | 业务方自行实现消息处理,并主动调用接口返回消费结果 | 业务方自行按队列拉取消息,并可选择性地提交消费结果 |
| 消费并发度管理 | 由SDK管理消费并发度 | 由业务方消费逻辑自行管理消费线程 | 由业务方消费逻辑自行管理消费线程 |
| 负载均衡粒度 | 5.0 SDK是消息粒度,更均衡,早期版本是队列维度 | 消息粒度,更均衡 | 队列粒度,吞吐攒批性能更好,但容易不均衡 |
| 接口灵活度 | 高度封装,不够灵活 | 原子接口,可灵活自定义 | 原子接口,可灵活自定义 |
| 适用场景 | 适用于无自定义流程的业务消息开发场景 | 适用于需要高度自定义业务流程的业务开发场景 | 仅推荐在流处理框架场景下集成使用 |
2.1.2.1 消费示例一:PushConsumer
PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成
PushConsumer的消费监听器执行结果分为以下三种情况:
- 返回消费成功:以Java SDK为例,返回
ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。 - 返回消费失败:以Java SDK为例,返回
ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。 - 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费
PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。消息超时,请参见PushConsumer消费重试策略
信息
出现消费超时情况时,SDK虽然提交消费失败结果,但是当前消费线程可能仍然无法响应中断,还会继续处理消息
内部原理
在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑

可靠性重试
PushConsumer 消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发。消息重试具体信息,请参见PushConsumer消费重试策略
使用PushConsumer消费者消费时,不允许使用以下方式处理消息,否则 Apache RocketMQ 无法保证消息的可靠性
- 错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,Apache RocketMQ 服务端是无法感知的,因此不会进行消费重试。
- 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,Apache RocketMQ 服务端同样无法感知,因此也不会进行消费重试
顺序性保障
基于 Apache RocketMQ 顺序消息的定义,如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。业务处理逻辑无感知即可保证消息的消费顺序
信息
消息消费按照顺序处理的前提是遵循同步提交原则,如果业务逻辑自定义实现了异步分发,则Apache RocketMQ 无法保证消息的顺序性。
适用场景
PushConsumer严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:
- 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。
- 无异步化、高级定制场景:PushConsumer限制了消费逻辑的线程模型,由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程
使用示例
- 在rocketmq中创建一个消费者组 baseGroup
方式一:dashboard控制面板实现

方式二:使用mqadmin命令
$ ./mqadmin updateSubGroup -c DefaultCluster -g baseGroup -n localhost:9876
- 实现代码
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.util.Collections;
import java.util.List;/*** @author vinjcent* @description 测试simple模式消费消息* @since 2024/11/12 22:51:57*/
public class TestSimpleConsumer {private static final Logger log = LoggerFactory.getLogger(TestSimpleConsumer.class);public static void main(String[] args) throws ClientException, InterruptedException {// 1.创建一个消费者组ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081String endpoint = "192.168.159.100:8081";ClientConfiguration configuration = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint).build();// 订阅消息的过滤规则,标识订阅所有tag的消息String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建String consumerGroup = "baseGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建String topic = "baseTopic";// 初始化PushConsumer,需要绑定消费者分组baseGroup、通信参数以及订阅关系SimpleConsumer simpleConsumer = clientServiceProvider.newSimpleConsumerBuilder().setClientConfiguration(configuration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setAwaitDuration(Duration.ofSeconds(5L))//.setMessageListener(messageView -> {// // 处理消息并返回消费结果// log.info("Consumer message successfully, messageId => {}, content => {}", messageView.getMessageId(), messageView.getBody());// return ConsumeResult.SUCCESS;//}).build();// 每次获取5条消息,获取后对其它消费者不可见,且持续这种情况为10sList<MessageView> messageViews = simpleConsumer.receive(5, Duration.ofSeconds(10L));for (MessageView messageView : messageViews) {log.info("主动接受到的消息开始遍历 ===> {}", messageView);}Thread.sleep(Long.MAX_VALUE);}}
- 运行截图

2.1.2.2 消费示例二:SimpleConsumer
SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成
SimpleConsumer主要涉及以下几个接口行为:
| 接口名称 | 主要作用 | 可修改参数 |
|---|---|---|
ReceiveMessage | 消费者主动调用该接口从服务端获取消息。 说明 由于服务端存储为分布式,可能会出现服务端实际有消息,但是返回为空的现象。 一般可通过重新发起ReceiveMessage调用或提高ReceiveMessage的并发度解决 | 批量拉取消息数:SimpleConsumer可以一次性批量获取多条消息实现批量消费,该接口可修改批量获取的消息数量。 消费不可见时间:消息的最长处理耗时,该参数用于控制消费失败时的消息重试间隔。具体信息,请参见SimpleConsumer消费重试策略。消费者调用ReceiveMessage接口时需要指定消费不可见时间 |
AckMessage | 消费者成功消费消息后,主动调用该接口向服务端返回消费成功响应 | 无 |
ChangeInvisibleDuration | 消费重试场景下,消费者可通过该接口修改消息处理时长,即控制消息的重试间隔 | 消费不可见时间:调用本接口可修改ReceiveMessage接口预设的消费不可见时间的参数值。一般用于需要延长消息处理时长的场景 |
可靠性重试
SimpleConsumer消费者类型中,客户端SDK和服务端通过ReceiveMessage和AckMessage接口通信。客户端SDK如果处理消息成功则调用AckMessage接口;如果处理失败只需要不回复ACK响应,即可在定义的消费不可见时间到达后触发消费重试流程。更多信息,请参见SimpleConsumer消费重试策略
顺序性保障
基于 Apache RocketMQ 顺序消息的定义,SimpleConsumer在处理顺序消息时,会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中,如果前面的消息未处理完成,则无法获取到后面的消息
适用场景
SimpleConsumer提供原子接口,用于消息获取和提交消费结果,相对于PushConsumer方式更加灵活。SimpleConsumer适用于以下场景:
- 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改
- 需要异步化、批量消费等高级定制场景:SimpleConsumer在SDK内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景
- 需要自定义消费速率:SimpleConsumer是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率
实现用例
- 代码
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Duration;
import java.util.Collections;
import java.util.List;/*** @author vinjcent* @description 测试simple模式消费消息* @since 2024/11/12 22:51:57*/
public class TestSimpleConsumer {private static final Logger log = LoggerFactory.getLogger(TestSimpleConsumer.class);public static void main(String[] args) throws ClientException, InterruptedException {// 1.创建一个消费者组ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081String endpoint = "192.168.159.100:8081";ClientConfiguration configuration = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint).build();// 订阅消息的过滤规则,标识订阅所有tag的消息String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建String consumerGroup = "baseGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建String topic = "baseTopic";// 初始化PushConsumer,需要绑定消费者分组baseGroup、通信参数以及订阅关系SimpleConsumer simpleConsumer = clientServiceProvider.newSimpleConsumerBuilder().setClientConfiguration(configuration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setAwaitDuration(Duration.ofSeconds(5L))//.setMessageListener(messageView -> {// // 处理消息并返回消费结果// log.info("Consumer message successfully, messageId => {}, content => {}", messageView.getMessageId(), messageView.getBody());// return ConsumeResult.SUCCESS;//}).build();// 每次获取5条消息,获取后对其它消费者不可见,且持续这种情况为10sList<MessageView> messageViews = simpleConsumer.receive(5, Duration.ofSeconds(10L));for (MessageView messageView : messageViews) {System.out.println(messageView);}Thread.sleep(Long.MAX_VALUE);}}
- 运行结果

2.2 定时/延时消息
定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。因此,下文统一用定时消息描述
应用场景
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力
典型场景一:分布式定时调度

在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。传统基于数据库的定时调度方案在分布式场景下,性能不高,实现复杂。基于 Apache RocketMQ 的定时消息可以封装出多种类型的定时触发器
典型场景二:任务超时处理

以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发
基于定时消息的超时任务处理具备如下优势:
- 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重
- 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力
功能原理
什么是定时消息
定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果
定时时间设置原则
- Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长
- 定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式。具体方式,请参见Unix时间戳转换工具
- 定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息
- 定时时长最大值默认为24小时,不支持自定义修改,更多信息,请参见参数限制
- 定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息
示例如下:
- 定时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望消息在下午19:20:00定时投递,则定时时间为2022-06-09 19:20:00,转换成时间戳格式为1654773600000
- 延时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望延时1个小时后投递消息,则您需要根据当前时间和延时时长换算成定时时刻,即消息投递时间为2022-06-09 18:30:00,转换为时间戳格式为1654770600000
定时消息生命周期

- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态
- 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达
- 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制
使用限制
消息类型一致性
定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致
定时精度约束
Apache RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度
Apache RocketMQ 定时消息的状态支持持久化存储,系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递。若存储系统异常重启,可能会导致定时消息投递出现一定延迟
使用示例
1、在终端中创建一个延迟类型的主题
$ sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY

- 控制板界面查看

2、在终端中创建一个消费者组
$ sh mqadmin updateSubGroup -c DefaultCluster -g delayGroup -n localhost:9876
- 控制面板查看

- 先执行消费者代码
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Collections;/*** @author vinjcent* @description 测试push模式消费消息* @since 2024/11/12 22:51:57*/
public class TestPushConsumer {private static final Logger log = LoggerFactory.getLogger(TestPushConsumer.class);public static void main(String[] args) throws ClientException, InterruptedException {// 1.创建一个消费者组ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081String endpoint = "192.168.159.100:8081";ClientConfiguration configuration = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint).build();// 订阅消息的过滤规则,标识订阅所有tag的消息String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建String consumerGroup = "delayGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建String topic = "DelayTopic";// 初始化PushConsumer,需要绑定消费者分组baseGroup、通信参数以及订阅关系clientServiceProvider.newPushConsumerBuilder().setClientConfiguration(configuration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {// 处理消息并返回消费结果log.info("Consumer message successfully, messageId => {}, content => {}", messageView.getMessageId(), messageView.getBody());return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 3.消费消息// 4.关闭消费者}}
- 后执行生产者代码
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.Calendar;/*** @author vinjcent* @description 普通消息的生成者* @since 2024/9/5 22:30:46*/
public class TestProducer {private static final Logger log = LoggerFactory.getLogger(TestProducer.class);public static void main(String[] args) throws ClientException, IOException {// 1.创建消息// 4.x版本接入点地址,需要设置成namesrv地址// 5.x版本接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081String endpoint = "192.168.159.100:8081";// 消息发送的目标Topic名称,需要提前创建,类型为DELAYString topic = "DelayTopic";Calendar calendar = Calendar.getInstance();calendar.set(2024, Calendar.DECEMBER, 15, 22, 35, 0);// 延迟消息一:具体到未来某个时间long time = calendar.getTime().getTime();// 延迟消息二:具体到当前时间后的时间(30s后)long delayTime = System.currentTimeMillis() + 30 * 1000;ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();// 2.创建一个msg对象//普通消息发送。MessageBuilder messageBuilder = new MessageBuilderImpl();Message message = messageBuilder.setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息.setKeys("uniqueKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息.setTag("messageTag")// 消息体.setBody("hello world".getBytes())// 消息延迟到具体时间.setDeliveryTimestamp(delayTime).build();// 3.通过producer发送msgtry {//发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);//// 异步生产消息获取到的结果是future//CompletableFuture<SendReceipt> completableFuture = producer.sendAsync(message);//// 可以通过自定义线程池来实现异步线程的处理//ExecutorService executor = Executors.newCachedThreadPool();//completableFuture.whenCompleteAsync((receipt, throwable) -> {// // TODO 执行异步发送消息后,需要处理的逻辑//}, executor);log.info("发送消息成功 ===> 消息ID:[{}]", sendReceipt.getMessageId());} catch (ClientException e) {e.printStackTrace();System.out.println(e.getMessage());}// 4.关闭生产者producer.close();}}
- 运行结果
生产者

消费者

2.3 顺序消息
顺序消息为 Apache RocketMQ 中的高级特性消息
应用场景
在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性
- 典型场景一:撮合交易

以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单
- 典型场景二:数据实时增量同步
- 普通消息

- 顺序消息

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致
功能原理
什么是顺序消息
顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系
Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性
基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力
如何保证消息的顺序性
Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性
-
生产顺序性 :
Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化
如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序
- 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序
满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
- 相同消息组的消息按照先后顺序被存储在同一个队列
- 不同消息组的消息可以混合在同一个队列中,且不保证连续

如上图所示,消息组1和消息组4的消息混合存储在队列1中, Apache RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系
-
消费顺序性 :
Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理
如需保证消息消费的顺序性,则必须满足以下条件:
-
投递顺序
Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序
【备注】
消费者类型为PushConsumer时, Apache RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类。
-
有限重试
Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理
对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序
-
生产顺序性和消费顺序性组合
如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性
一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:
| 生产顺序 | 消费顺序 | 顺序性效果 |
|---|---|---|
| 设置消息组,保证消息顺序发送。 | 顺序消费 | 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。 |
| 设置消息组,保证消息顺序发送。 | 并发消费 | 并发消费,尽可能按时间顺序处理。 |
| 未设置消息组,消息乱序发送。 | 顺序消费 | 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。 |
| 未设置消息组,消息乱序发送。 | 并发消费 | 并发消费,尽可能按照时间顺序处理。 |
顺序消息生命周期

- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态
- 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制
【备注】
- 消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束
- 顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理
使用限制
顺序消息仅支持使用MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致
使用实例
Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加
$ sh mqadmin updateTopic -n localhost:9876 -t orderTopic -c DefaultCluster -o true -a +message.type=FIFO

创建订阅消费组
Apache RocketMQ 5.0版本下创建订阅消费组操作,推荐使用mqadmin工具,需要注意的是,对于订阅消费组顺序类型需要通过 -o 选项设置
$ sh mqadmin updateSubGroup -c DefaultCluster -g orderConsumerGroup -n localhost:9876 -o true

代码示例
- 先执行消费者代码
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Collections;/*** @author vinjcent* @description 测试push模式消费消息* @since 2024/11/12 22:51:57*/
public class TestPushConsumer {private static final Logger log = LoggerFactory.getLogger(TestPushConsumer.class);public static void main(String[] args) throws ClientException, InterruptedException {// 1.创建一个消费者组ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081String endpoint = "192.168.159.100:8081";ClientConfiguration configuration = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint).build();// 订阅消息的过滤规则,标识订阅所有tag的消息String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建String consumerGroup = "orderConsumerGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建String topic = "orderTopic";// 初始化PushConsumer,需要绑定消费者分组baseGroup、通信参数以及订阅关系clientServiceProvider.newPushConsumerBuilder().setClientConfiguration(configuration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {// 处理消息并返回消费结果log.info("Consumer message successfully, messageId => {}, content => {}", messageView.getMessageId(), messageView);return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 3.消费消息// 4.关闭消费者}}
- 再执行生产者代码
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;/*** @author vinjcent* @description 普通消息的生成者* @since 2024/9/5 22:30:46*/
public class TestProducer {private static final Logger log = LoggerFactory.getLogger(TestProducer.class);public static void main(String[] args) throws ClientException, IOException {// 1.创建消息// 4.x版本接入点地址,需要设置成namesrv地址// 5.x版本接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081String endpoint = "192.168.159.100:8081";// 消息发送的目标Topic名称,需要提前创建,类型为DELAYString topic = "orderTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).build();for (int i = 0; i < 100; i++) {// 2.创建一个msg对象//普通消息发送。MessageBuilder messageBuilder = new MessageBuilderImpl();Message message = messageBuilder.setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息.setKeys("uniqueKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息.setTag("messageTag")// 设置消息组的消息会被路由到同一个队列,并保证有序.setMessageGroup("orderMessageGroup")// 消息体.setBody("hello world".getBytes()).build();// 3.通过producer发送msgtry {//发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message);//// 异步生产消息获取到的结果是future//CompletableFuture<SendReceipt> completableFuture = producer.sendAsync(message);//// 可以通过自定义线程池来实现异步线程的处理//ExecutorService executor = Executors.newCachedThreadPool();//completableFuture.whenCompleteAsync((receipt, throwable) -> {// // TODO 执行异步发送消息后,需要处理的逻辑//}, executor);log.info("发送消息成功 ===> 消息ID:[{}]", sendReceipt.getMessageId());} catch (ClientException e) {e.printStackTrace();System.out.println(e.getMessage());}}// 4.关闭生产者producer.close();}}
- 运行效果


2.4 事务消息
应用场景
分布式事务的诉求
分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:
- 主分支订单系统状态更新:由未支付变更为支付成功
- 物流系统状态新增:新增待发货物流记录,创建订单物流记录
- 积分系统状态变更:变更用户积分,更新用户积分表
- 购物车系统状态变更:清空购物车,更新用户购物车记录
传统XA事务方案:性能不足
为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差
基于普通消息方案:一致性保障困难
将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度

该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:
- 消息发送成功,订单没有执行成功,需要回滚整个事务
- 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致
- 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更
基于Apache RocketMQ分布式事务消息:支持最终一致性
上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力
而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性

功能原理
什么是事务消息
事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性
事务消息处理流程
事务消息交互流程如下图所示

- 生产者将消息发送至Apache RocketMQ服务端
- Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息
- 生产者开始执行本地事务逻辑
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制
- 生产者收到消息回查后(回查代码有开发者自定义编写),需要检查对应消息的本地事务执行的最终结果
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理
事务消息生命周期

- 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态
- 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见
- 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止
- 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制
使用限制
消息类型一致性
事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致
消费事务性
Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功
中间状态可见性
Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景
事务超时机制
Apache RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制
使用示例
- 首先在终端创建一个事务主题
$ sh mqadmin updateTopic -n localhost:9876 -t transactionTopic -c DefaultCluster -a +message.type=TRANSACTION

- 创建一个消费者组
$ sh mqadmin updateSubGroup -c DefaultCluster -g transactionConsumerGroup -n localhost:9876

- 先执行消费者
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.Collections;/*** @author vinjcent* @description 测试push模式消费消息* @since 2024/11/12 22:51:57*/
public class TestPushConsumer {private static final Logger log = LoggerFactory.getLogger(TestPushConsumer.class);public static void main(String[] args) throws ClientException, InterruptedException {// 1.创建一个消费者组ClientServiceProvider clientServiceProvider = ClientServiceProvider.loadService();// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081String endpoint = "192.168.159.100:8081";ClientConfiguration configuration = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint).build();// 订阅消息的过滤规则,标识订阅所有tag的消息String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 为消费者指定所属的消费者分组,Group需要提前创建String consumerGroup = "transactionConsumerGroup";// 指定需要订阅哪个目标Topic,Topic需要提前创建String topic = "transactionTopic";// 初始化PushConsumer,需要绑定消费者分组baseGroup、通信参数以及订阅关系clientServiceProvider.newPushConsumerBuilder().setClientConfiguration(configuration).setConsumerGroup(consumerGroup).setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).setMessageListener(messageView -> {// 处理消息并返回消费结果log.info("Consumer message successfully, messageId => {}, content => {}", messageView.getMessageId(), messageView);return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);// 3.消费消息// 4.关闭消费者}}
- 在执行生产者
package com.vinjcent;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.*;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;/*** @author vinjcent* @description 普通消息的生成者* @since 2024/9/5 22:30:46*/
public class TestProducer {private static final Logger log = LoggerFactory.getLogger(TestProducer.class);public static void main(String[] args) throws ClientException, IOException {// 事务模式下的回查操作TransactionChecker checker = messageView -> {log.info("Receive transactional message check, message={}", messageView);// Return the transaction resolution according to your business logic.return TransactionResolution.COMMIT;};// 1.创建消息// 4.x版本接入点地址,需要设置成namesrv地址// 5.x版本接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081String endpoint = "192.168.159.100:8081";// 消息发送的目标Topic名称,需要提前创建,类型为TRANSACTIONString topic = "transactionTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder()//.setRequestTimeout(Duration.ofSeconds(3600L)).setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的TopicProducer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).setTransactionChecker(checker).build();// 获取transaction(开启事务)Transaction transaction = producer.beginTransaction();// 2.创建一个msg对象//普通消息发送。MessageBuilder messageBuilder = new MessageBuilderImpl();Message message = messageBuilder.setTopic(topic)// 设置消息索引键,可根据关键字精确查找某条消息.setKeys("uniqueKey")// 设置消息Tag,用于消费端根据指定Tag过滤消息.setTag("messageTag")// 消息体.setBody("hello world".getBytes()).build();// 3.通过producer发送msgtry {//发送消息,需要关注发送结果,并捕获失败等异常。SendReceipt sendReceipt = producer.send(message, transaction);//// 异步生产消息获取到的结果是future//CompletableFuture<SendReceipt> completableFuture = producer.sendAsync(message);//// 可以通过自定义线程池来实现异步线程的处理//ExecutorService executor = Executors.newCachedThreadPool();//completableFuture.whenCompleteAsync((receipt, throwable) -> {// // TODO 执行异步发送消息后,需要处理的逻辑//}, executor);log.info("发送消息成功 ===> 消息ID:[{}]", sendReceipt.getMessageId());} catch (Exception e) {e.printStackTrace();log.error("消息发送失败,并执行mq事务回滚操作 ===> {}", e.getMessage());System.out.println(e.getMessage());// Or rollback the transaction.transaction.rollback();} finally {log.info("提交事务...");// 提交事务transaction.commit();}// 4.关闭生产者producer.close();}}
- 执行效果


三、消息重试与控流
背景信息
消息发送重试
Apache RocketMQ的消息发送重试机制主要为您解答如下问题:
- 部分节点异常是否影响消息发送?
- 请求重试是否会阻塞业务调用?
- 请求重试会带来什么不足?
消息流控
Apache RocketMQ 的流控机制主要为您解答如下问题:
- 系统在什么情况下会触发流控?
- 触发流控时客户端行为是什么?
- 应该如何避免触发流控,以及如何应对突发流控?
3.1 消息发送重试机制
重试基本概念
Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果
同步发送和异步发送模式均支持消息发送重试
重试触发条件
触发消息发送重试机制的条件如下:
- 客户端消息发送请求调用失败或请求超时
- 网络异常造成连接失败或请求超时
- 服务端节点处于重启或下线等状态造成连接失败
- 服务端运行慢造成请求超时
- 服务端返回失败错误码
- 系统逻辑错误:因运行逻辑不正确造成的错误
- 系统流控错误:因容量超限造成的流控错误
【备注】
对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试
重试流程
生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应
- 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常
- 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回
重试间隔
- 若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
- INITIAL_BACKOFF: 第一次失败重试前后需等待多久,默认值:1秒
- MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6
- JITTER :随机抖动因子,默认值:0.2
- MAX_BACKOFF :等待间隔时间上限,默认值:120秒
- MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20秒
建议算法如下:
ConnectWithBackoff()// 第一次失败重试前后需等待多久current_backoff = INITIAL_BACKOFF// 当前截止时间 = 现在时间 + 等待时间current_deadline = now() + INITIAL_BACKOFF// 根据"当前截止时间"、"现在时间 + 最短重试间隔",两者之中的最大值作为重试连接等待时间,并重新连接失败while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS)// 睡眠到"当前截止时间"SleepUntil(current_deadline)// 重新计算重试失败需要等待多久 = "前一次失败重试前后需要等待时间 * 指数退避因子"、等待间隔时间上限的最小值current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF)// 重新计算"当前截止时间" = 当前时间 + 重试失败需要等待时间 + 随机抖动因子,目的为了错开重试current_deadline = now() + current_backoff + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff)
更多信息,请参见connection-backoff 策略
功能约束
- 链路耗时阻塞评估:从上述重试机制可以看出,在重试流程中生产者仅能控制最大重试次数。若由于系统异常触发了SDK内置的重试逻辑,则服务端需要等待最终重试结果,可能会导致消息发送请求链路被阻塞。对于某些实时调用类场景,您需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时
- 最终异常兜底: Apache RocketMQ 客户端内置的发送请求重试机制并不能保证消息发送一定成功。当最终重试仍然失败时,业务方调用需要捕获异常,并做好冗余保护处理,避免消息发送结果不一致
- 消息重复问题:因远程调用的不确定性,当Apache RocketMQ客户端因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果,客户端进行的消息发送重试可能会产生消息重复问题,业务逻辑需要自行处理消息重复问题(幂等性)
消息流控机制
消息流控基本概念
消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力
触发条件
Apache RocketMQ 的消息流控触发条件如下:
- 存储压力大:参考消费进度管理的原理机制,消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控
- 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力
流控行为
当系统触发消息发送流控时,客户端会收到系统限流错误和异常,错误码信息如下:
- reply-code:530
- reply-text:TOO_MANY_REQUESTS
客户端收到系统流控错误码后,会根据指数退避策略进行消息发送重试
处理建议
- 如何避免触发消息流控:触发限流的根本原因是系统容量或水位过高,您可以利用可观测性功能监控系统水位容量等,保证底层资源充足,避免触发流控机制
- 突发消息流控处理:如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议业务方将请求调用临时替换到其他系统进行应急处理
3.2 消息过滤
消费者订阅了某个主题后,Apache RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 Apache RocketMQ 服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。本文介绍消息过滤的定义、原理、分类及不同过滤方式的使用方法、配置示例等
应用场景
Apache RocketMQ 作为发布订阅模型的消息中间件广泛应用于上下游业务集成场景。在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集
使用 Apache RocketMQ 的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力
Apache RocketMQ 主要解决的单个业务域即同一个主题内不同消息子集的过滤问题,一般是基于同一业务下更具体的分类进行过滤匹配。如果是需要对不同业务域的消息进行拆分,建议使用不同主题处理不同业务域的消息
功能概述
消息过滤定义
过滤的含义指的是将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉
Apache RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 Apache RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费
消息过滤原理

消息过滤主要通过以下几个关键流程实现:
- 生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标
- 消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件
- 服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者
消息过滤分类
Apache RocketMQ 支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:
| 对比项 | Tag标签过滤 | SQL属性过滤 |
|---|---|---|
| 过滤目标 | 消息的Tag标签 | 消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性) |
| 过滤能力 | 精准匹配 | SQL语法匹配 |
| 适用场景 | 简单过滤场景、计算逻辑简单轻量 | 复杂过滤场景、计算逻辑较复杂 |
订阅关系一致性
过滤表达式属于订阅关系的一部分,Apache RocketMQ 的领域模型规定,同一消费者分组内的多个消费者的订阅关系包括过滤表达式,必须保持一致,否则可能会导致部分消息消费不到。更多信息,请参见订阅关系(Subscription)
3.3 Tag标签过滤
Tag标签过滤方式是 Apache RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的Tag标签进行匹配。生产者在发送消息时,设置消息的Tag标签,消费者需指定已有的Tag标签来进行匹配订阅
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息:
- 订单消息
- 支付消息
- 物流消息
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的下游系统所订阅:
- 支付系统:只需订阅支付消息
- 物流系统:只需订阅物流消息
- 交易成功率分析系统:需订阅订单和支付消息
- 实时计算系统:需要订阅所有和交易相关的消息
过滤效果如下图所示:

Tag标签设置
- Tag由生产者发送消息时设置,每条消息允许设置一个Tag标签
- Tag使用可见字符,建议长度不超过128字符
Tag标签过滤规则
Tag标签过滤为精准字符串匹配,过滤规则设置格式如下:
- 单Tag匹配:过滤表达式为目标Tag。表示只有消息标签为指定目标Tag的消息符合匹配条件,会被发送给消费者。
- 多Tag匹配:多个Tag之间为或的关系,不同Tag间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为Tag1或Tag2或Tag3的消息都满足匹配条件,都会被发送给消费者进行消费
- 全部匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费
使用示例
-
发送消息,设置Tag标签
Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 //该示例表示消息的Tag设置为"TagA"。 .setTag("TagA") //消息体。 .setBody("messageBody".getBytes()) .build(); -
订阅消息,匹配单个Tag标签
String topic = "Your Topic"; //只订阅消息标签为"TagA"的消息。 FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression); -
订阅消息,匹配多个Tag标签
String topic = "Your Topic"; //只订阅消息标签为"TagA"、"TagB"或"TagC"的消息。 FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression); -
订阅消息,匹配Topic中的所有消息,不进行过滤
String topic = "Your Topic"; //使用Tag标签过滤消息,订阅所有消息。 FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
3.4 SQL属性过滤
SQL属性过滤是 Apache RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置SQL语法的过滤表达式过滤多个属性
信息
Tag是一种系统属性,所以SQL过滤方式也兼容Tag标签过滤。在SQL语法中,Tag的属性名称为TAGS
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:
- 订单消息
- 物流消息
- 物流消息且地域为杭州
- 物流消息且地域为上海
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅:
- 物流系统1:只需订阅物流消息且消息地域为杭州
- 物流系统2:只需订阅物流消息且消息地域为杭州或上海
- 订单跟踪系统:只需订阅订单消息
- 实时计算系统:需要订阅所有和交易相关的消息
过滤效果如下图所示:

消息属性设置
生产者发送消息时可以自定义消息属性,每个属性都是一个自定义的键值对(Key-Value)
每条消息支持设置多个属性
SQL属性过滤规则
SQL属性过滤使用SQL92语法作为过滤规则表达式,语法规范如下:
| 语法 | 说明 | 示例 |
|---|---|---|
| IS NULL | 判断属性不存在 | a IS NULL :属性a不存在 |
| IS NOT NULL | 判断属性存在 | a IS NOT NULL:属性a存在 |
| > >= < <= | 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字 | a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。 a IS NOT NULL AND a > 'abc':错误示例,abc为字符串,不能用于比较大小 |
| BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间 | a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100 |
| NOT BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外 | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100 |
| IN (xxx, xxx) | 表示属性的值在某个集合内。集合的元素只能是字符串 | a IS NOT NULL AND (a IN ('abc', 'def')):属性a存在且属性a的值为abc或def |
| = <> | 等于和不等于。可用于比较数字和字符串 | a IS NOT NULL AND (a = 'abc' OR a<>'def'):属性a存在且属性a的值为abc或a的值不为def |
| AND OR | 逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内 | a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在 |
由于SQL属性过滤是生产者定义消息属性,消费者设置SQL过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:
- 异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值
- 空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为null
- 数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。
使用示例
-
发送消息,同时设置消息Tag标签和自定义属性
Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 //该示例表示消息的Tag设置为"messageTag"。 .setTag("messageTag") //消息也可以设置自定义的分类属性,例如环境标签、地域、逻辑分支。 //该示例表示为消息自定义一个属性,该属性为地域,属性值为杭州。 .addProperty("Region", "Hangzhou") //消息体。 .setBody("messageBody".getBytes()) .build(); -
订阅消息,根据单个自定义属性匹配消息
String topic = "topic"; //只订阅地域属性为杭州的消息。 FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression); -
订阅消息,同时根据多个自定义属性匹配消息
String topic = "topic"; //只订阅地域属性为杭州且价格属性大于30的消息。 FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression); -
订阅消息,匹配Topic中的所有消息,不进行过滤
String topic = "topic"; //订阅所有消息。 FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression);
使用建议
合理划分主题和Tag标签
从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的Tag标签及属性进行筛选。关于拆分方式的选择,应遵循以下原则:
- 消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题进行拆分,无法通过Tag标签进行分类
- 业务域是否相同:不同业务域和部门的消息应该拆分不同的主题。例如物流消息和支付消息应该使用两个不同的主题;同样是一个主题内的物流消息,普通物流消息和加急物流消息则可以通过不同的Tag进行区分
- 消息量级和重要性是否一致:如果消息的量级规模存在巨大差异,或者说消息的链路重要程度存在差异,则应该使用不同的主题进行隔离拆分
3.5 消费者负载均衡
消费者从 Apache RocketMQ 获取消息消费时,通过消费者负载均衡策略,可将主题内的消息分配给指定消费者分组中的多个消费者共同分担,提高消费并发能力和消费者的水平扩展能力
背景信息
了解消费者负载均衡策略,可以帮助您解决以下问题:
- 消息消费处理的容灾策略:您可以根据消费者负载均衡策略,明确当局部节点出现故障时,消息如何进行消费重试和容灾切换
- 消息消费的顺序性机制:通过消费者负载均衡策略,您可以进一步了解消息消费时,如何保证同一消息组内消息的先后顺序
- 消息分配的水平拆分策略:了解消费者负载均衡策略,您可以明确消息消费压力如何被分配到不同节点,有针对性地进行流量迁移和水平扩缩容
广播消费和共享消费
在 Apache RocketMQ 领域模型中,同一条消息支持被多个消费者分组订阅,同时,对于每个消费者分组可以初始化多个消费者。您可以根据消费者分组和消费者的不同组合,实现以下两种不同的消费效果:

-
消费组间广播消费 :如上图所示,每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果
该方式一般可用于网关推送、配置推送等场景
-
消费组内共享消费 :如上图所示,每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载
该方式一般可用于微服务解耦场景
什么是消费者负载均衡
如上文所述,消费组间广播消费场景下,每个消费者分组内只有一个消费者,因此不涉及消费者的负载均衡
消费组内共享消费场景下,消费者分组内多个消费者共同分担消息,消息按照哪种逻辑分配给哪个消费者,就是由消费者负载均衡策略所决定的
根据消费者类型的不同,消费者负载均衡策略分为以下两种模式:
- 消息粒度负载均衡:PushConsumer和SimpleConsumer默认负载策略
- 队列粒度负载均衡:PullConsumer默认负载策略
3.6 消息粒度负载均衡
使用范围
对于PushConsumer和SimpleConsumer类型的消费者,默认且仅使用消息粒度负载均衡策略
【备注】
上述说明是指5.0 SDK下,PushConsumer默认使用消息粒度负载均衡,对于3.x/4.x等Remoting协议SDK 仍然使用了队列粒度负载均衡。业务集成如无特殊需求,建议使用新版本机制
策略原理
消息粒度负载均衡策略中,同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费

如上图所示,消费者分组Group A中有三个消费者A1、A2和A3,这三个消费者将共同消费主题中同一队列Queue1中的多条消息。 注意 消息粒度负载均衡策略保证同一个队列的消息可以被多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,并不能指定消息被哪一个特定的消费者处理
消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费
顺序消息负载机制
在顺序消息中,消息的顺序性指的是同一消息组内的多个消息之间的先后顺序。因此,顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费。不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费

如上图所述,队列Queue1中有4条顺序消息,这4条消息属于同一消息组G1,存储顺序由M1到M4。在消费过程中,前面的消息M1、M2被消费者Consumer A1处理时,只要消费状态没有提交,消费者A2是无法并行消费后续的M3、M4消息的,必须等前面的消息提交消费状态后才能消费后面的消息
策略特点
相对于队列粒度负载均衡策略,消息粒度负载均衡策略有以下特点:
- 消费分摊更均衡:对于传统队列级的负载均衡策略,如果队列数量和消费者数量不均衡,则可能会出现部分消费者空闲,或部分消费者处理过多消息的情况。消息粒度负载均衡策略无需关注消费者和队列的相对数量,能够更均匀地分摊消息
- 对非对等消费者更友好:在线上生产环境中,由于网络机房分区延迟、消费者物理资源规格不一致等原因,消费者的处理能力可能会不一致,如果按照队列分配消息,则可能出现部分消费者消息堆积、部分消费者空闲的情况。消息粒度负载均衡策略按需分配,消费者处理任务更均衡
- 队列分配运维更方便:传统基于绑定队列的负载均衡策略必须保证队列数量大于等于消费者数量,以免产生部分消费者获取不到队列出现空转的情况,而消息粒度负载均衡策略则无需关注队列数
适用场景
消息粒度消费负载均衡策略下,同一队列内的消息离散地分布于多个消费者,适用于绝大多数在线事件处理的场景。只需要基本的消息处理能力,对消息之间没有批量聚合的诉求。而对于流式处理、聚合计算场景,需要明确地对消息进行聚合、批处理时,更适合使用队列粒度的负载均衡策略
使用示例
消息粒度负载均衡策略不需要额外设置,对于PushConsumer和SimpleConsumer消费者类型默认启用
3.7 队列粒度负载均衡
使用范围
对于历史版本(服务端4.x/3.x版本)的消费者,包括PullConsumer、DefaultPushConsumer、DefaultPullConsumer、LitePullConsumer等,默认且仅能使用队列粒度负载均衡策略
策略原理
队列粒度负载均衡策略中,同一消费者分组内的多个消费者将按照队列粒度消费消息,即每个队列仅被一个消费者消费

如上图所示,主题中的三个队列Queue1、Queue2、Queue3被分配给消费者分组中的两个消费者,每个队列只能分配给一个消费者消费,该示例中由于队列数大于消费者数,因此,消费者A2被分配了两个队列。若队列数小于消费者数量,可能会出现部分消费者无绑定队列的情况
队列粒度的负载均衡,基于队列数量、消费者数量等运行数据进行统一的算法分配,将每个队列绑定到特定的消费者,然后每个消费者按照取消息>提交消费位点>持久化消费位点的消费语义处理消息,取消息过程不提交消费状态,因此,为了避免消息被多个消费者重复消费,每个队列仅支持被一个消费者消费
【备注】
队列粒度负载均衡策略保证同一个队列仅被一个消费者处理,该策略的实现依赖消费者和服务端的信息协商机制,Apache RocketMQ 并不能保证协商结果完全强一致。因此,在消费者数量、队列数量发生变化时,可能会出现短暂的队列分配结果不一致,从而导致少量消息被重复处理。
策略特点
相对于消息粒度负载均衡策略,队列粒度负载均衡策略分配粒度较大,不够灵活。但该策略在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好
适用场景
队列粒度负载均衡策略适用于流式计算、数据聚合等需要明确对消息进行聚合、批处理的场景
使用示例
队列粒度负载均衡策略不需要额外设置,对于历史版本(服务端4.x/3.x版本)的消费者类型PullConsumer默认启用
使用建议
针对消费逻辑做消息幂等
无论是消息粒度负载均衡策略还是队列粒度负载均衡策略,在消费者上线或下线、服务端扩缩容等场景下,都会触发短暂的重新负载均衡动作。此时可能会存在短暂的负载不一致情况,出现少量消息重复的现象。因此,需要在下游消费逻辑中做好消息幂等去重处理
四、消费进度管理
背景信息
Apache RocketMQ 的生产者和消费者在进行消息收发时,必然会涉及以下场景,消息先生产后订阅或先订阅后生产。这两种场景下,消费者客户端启动后从哪里开始消费?如何标记已消费的消息?这些都是由 Apache RocketMQ 的消费进度管理机制来定义的
通过了解 Apache RocketMQ 的消费进度管理机制,可以帮助您解答以下问题:
- 消费者启动后从哪里开始消费消息?
- 消费者每次消费成功后如何标记消息状态,确保下次不会再重复处理该消息?
- 某消息被指定消费者消费过一次后,如果业务出现异常需要做故障恢复,该消息能否被重新消费?
4.1 消费进度原理
消息位点(Offset)
参考 Apache RocketMQ 主题和队列的定义,消息是按到达服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点
任意一个消息队列在逻辑上都是无限存储,即消息位点会从0到Long.MAX无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,具体关系如下图所示:

Apache RocketMQ 定义队列中最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)。虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限, Apache RocketMQ 会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化

消费位点(ConsumerOffset)
Apache RocketMQ 领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息
因此,Apache RocketMQ 通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,Apache RocketMQ 会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点
当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点
信息
消费位点的保存和恢复是基于 Apache RocketMQ 服务端的存储实现,和任何消费者无关。因此 Apache RocketMQ 支持跨消费者的消费进度恢复
队列中消息位点MinOffset、MaxOffset和每个消费者分组的消费位点ConsumerOffset的关系如下:

- ConsumerOffset≤MaxOffset:
- 当消费速度和生产速度一致,且全部消息都处理完成时,最大消息位点和消费位点相同,即ConsumerOffset=MaxOffset
- 当消费速度较慢小于生产速度时,队列中会有部分消息未消费,此时消费位点小于最大消息位点,即ConsumerOffset<MaxOffset,两者之差就是该队列中堆积的消息量
- ConsumerOffset≥MinOffset:正常情况下有效的消费位点ConsumerOffset必然大于等于最小消息位点MinOffset。消费位点小于最小消息位点时是无效的,相当于消费者要消费的消息已经从队列中删除了,是无法消费到的,此时服务端会将消费位点强制纠正到合法的消息位点
消费位点初始值
消费位点初始值指的是消费者分组首次启动消费者消费消息时,服务端保存的消费位点的初始值
Apache RocketMQ 定义消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费
4.2 重置消费位点
若消费者分组的初始消费位点或当前消费位点不符合您的业务预期,您可以通过重置消费位点调整您的消费进度。
适用场景
- 初始消费位点不符合需求:因初始消费位点为当前队列的最大消息位点,即客户端会直接从最新消息开始消费。若业务上线时需要消费部分历史消息,您可以通过重置消费位点功能消费到指定时刻前的消息
- 消费堆积快速清理:当下游消费系统性能不足或消费速度小于生产速度时,会产生大量堆积消息。若这部分堆积消息可以丢弃,您可以通过重置消费位点快速将消费位点更新到指定位置,绕过这部分堆积的消息,减少下游处理压力
- 业务回溯,纠正处理:由于业务消费逻辑出现异常,消息被错误处理。若您希望重新消费这些已被处理的消息,可以通过重置消费位点快速将消费位点更新到历史指定位置,实现消费回溯
重置功能
Apache RocketMQ 的重置消费位点提供以下能力:
- 重置到队列中的指定位点
- 重置到某一时刻对应的消费位点,匹配位点时,服务端会根据自动匹配到该时刻最接近的消费位点
使用限制
- 重置消费位点后消费者将直接从重置后的位点开始消费,对于回溯重置类场景,重置后的历史消息大多属于存储冷数据,可能会造成系统压力上升,一般称为冷读现象。因此,需要谨慎评估重置消费位点后的影响。建议严格控制重置消费位点接口的调用权限,避免无意义、高频次的消费位点重置
- Apache RocketMQ 重置消费位点功能只能重置对消费者可见的消息,不能重置定时中、重试等待中的消息。更多信息,请参见定时/延时消息和消费重试
版本兼容性
关于消费者分组的消费位点初始值,不同的服务端版本中定义如下:
- 服务端历史版本(4.x/3.x版本):消息位点初始值受当前队列消息状态的影响
- 服务端5.x版本:明确定义消费位点初始值为消费者获取消息时刻队列中的最大消息位点
因此,若您将服务端版本从历史版本升级到最新的5.x版本时,需要自行对消费者首次启动时的情况做兼容性判断
使用建议
【严格控制消费位点重置的权限】
重置消费位点会给系统带来额外处理压力,可能会影响新消息的读写性能。 因此该操作请在适用场景下谨慎执行,并提前做好合理性和必要性评估
五、消费重试
应用场景
Apache RocketMQ 的消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题,是一种为业务兜底的策略,不应该被用做业务流程控制。建议以下消费失败场景使用重试机制:
推荐使用消息重试场景如下:
- 业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功
- 消费失败的原因不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞
典型错误使用场景如下:
- 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的,因为处理逻辑已经预见了一定会大量出现该判断分支
- 消费处理中使用消费失败来做处理速率限流,是不合理的。限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路
5.1 消费重试策略概述
消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中
消息重试的触发条件
- 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常
- 消息处理超时,包括在PushConsumer中排队超时
消息重试策略主要行为
- 重试过程状态机:控制消息在重试流程中的状态和变化逻辑
- 重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间
- 最大重试次数:消息可被重试消费的最大次数
消息重试策略差异
根据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下:
| 消费者类型 | 重试过程状态机 | 重试间隔 | 最大重试次数 |
|---|---|---|---|
| PushConsumer | 已就绪 处理中 待重试 提交 * 死信 | 消费者分组创建时元数据控制。 无序消息:阶梯间隔 顺序消息:固定间隔时间 | 消费者分组创建时的元数据控制 |
| SimpleConsumer | 已就绪 处理中 提交 死信 | 通过API修改获取消息时的不可见时间 | 消费者分组创建时的元数据控制 |
具体的重试策略,请参见下文PushConsumer消费重试策略和SimpleConsumer消费重试策略
5.2 PushConsumer消费重试策略
重试状态机
PushConsumer消费消息时,消息的几个主要状态如下:

- Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费
- Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态
- WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败
- Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机
- DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复
消息重试过程中,每次重试消息状态都会经过已就绪>处理中>待重试的变化,两次消费间的间隔时间实际由消费耗时及重试间隔控制,消费耗时的最大上限受服务端系统参数控制,一般不应该超过上限时间

最大重试次数
PushConsumer的最大重试次数由消费者分组创建时的元数据控制,具体参数,请参见消费者分组
例如,最大重试次数为3次,则该消息最多可被投递4次,1次为原始消息,3次为重试投递次数
重试间隔时间
-
无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下:
第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间 1 10秒 9 7分钟 2 30秒 10 8分钟 3 1分钟 11 9分钟 4 2分钟 12 10分钟 5 3分钟 13 20分钟 6 4分钟 14 30分钟 7 5分钟 15 1小时 8 6分钟 16 2小时
信息
若重试次数超过16次,后面每次重试间隔都为2小时
- 顺序消息:重试间隔为固定时间,具体取值,请参见参数限制
使用示例
PushConsumer触发消息重试只需要返回消费失败的状态码即可,当出现非预期的异常时,也会被SDK捕获
SimpleConsumer simpleConsumer = null;//消费示例:使用PushConsumer消费普通消息,如果消费失败返回错误,即可触发重试。MessageListener messageListener = new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {System.out.println(messageView);//返回消费失败,会自动重试,直至到达最大重试次数。return ConsumeResult.FAILURE;}};
5.3 SimpleConsumer消费重试策略
重试状态机
SimpleConsumer消费消息时,消息的几个主要状态如下:

- Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费
- Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态
- Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机
- DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复
和PushConsumer消费重试策略不同的是,SimpleConsumer消费者的重试间隔是预分配的,每次获取消息消费者会在调用API时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值

由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,您可以通过API接口修改不可见时间
例如,您预设消息处理耗时最多20 ms,但实际业务中20 ms内消息处理不完,您可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制
修改消息不可见时间需要满足以下条件:
- 消息处理未超时
- 消息处理未提交消费状态
如下图所示,消息不可见时间修改后立即生效,即从调用API时刻开始,重新计算消息不可见时间

最大重试次数
SimpleConsumer的最大重试次数由消费者分组创建时的元数据控制,具体参数,请参见消费者分组。
消息重试间隔
消息重试间隔=不可见时间-消息实际处理时长
SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。例如,消息不可见时间为30 ms,实际消息处理用了10 ms就返回失败响应,则距下次消息重试还需要20 ms,此时的消息重试间隔即为20 ms;若直到30 ms消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为0 ms。
使用示例
SimpleConsumer 触发消息重试只需要等待即可。
//消费示例:使用SimpleConsumer消费普通消息,如果希望重试,只需要静默等待超时即可,服务端会自动重试。List<MessageView> messageViewList = null;try {messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView -> {System.out.println(messageView);//如果处理失败,希望服务端重试,只需要忽略即可,等待消息再次可见后即可重试获取。});} catch (ClientException e) {//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。e.printStackTrace();}
使用建议
合理重试,避免因限流等诉求触发消费重试
上文应用场景中提到,消息重试适用业务处理失败且当前消费为小概率事件的场景,不适合在连续性失败的场景下使用,例如消费限流场景
- 错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费
- 正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费
合理控制重试次数,避免无限重试
虽然Apache RocketMQ支持自定义消费重试次数,但是建议通过减少重试次数+延长重试间隔来降低系统压力,避免出现无限重试或大量重试的情况
六、消息存储和清理机制
背景信息
参考 Apache RocketMQ 中队列的定义,消息按照达到服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储
但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。因此,在实际使用中需要考虑以下问题,消息在服务端中的存储以什么维度为判定条件?消息存储以什么粒度进行管理?消息存储超过限制后如何处理?这些问题都是由消息存储和过期清理机制来定义的
了解消息存储和过期清理机制,可以从以下方面帮助您更好的进行运维管理:
- 提供消息存储时间SLA,为业务提供安全冗余空间:消息存储时间的承诺本质上代表业务侧可以自由获取消息的时间范围。对于消费时间长、消息堆积、故障恢复等场景非常关键
- 评估和控制存储成本:Apache RocketMQ 消息一般存储于磁盘介质上,您可以通过存储机制评估消息存储空间,提前预留存储资源
消息存储机制
原理机制
Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉
消息存储机制主要定义以下关键问题:
- 消息存储管理粒度:Apache RocketMQ 按存储节点管理消息的存储时长,并不是按照主题或队列粒度来管理
- 消息存储判断依据:消息存储按照存储时间作为判断依据,相对于消息数量、消息大小等条件,使用存储时间作为判断依据,更利于业务方对消息数据的价值进行评估
- 消息存储和是否消费状态无关:Apache RocketMQ 的消息存储是按照消息的生产时间计算,和消息是否被消费无关。按照统一的计算策略可以有效地简化存储机制
消息在队列中的存储情况如下:

【备注】
消息存储管理粒度说明
Apache RocketMQ 按照服务端节点粒度管理存储时长而非队列或主题,原因如下:
- 消息存储优势权衡:Apache RocketMQ 基于统一的物理日志队列和轻量化逻辑队列的二级组织方式,管理物理数据。这种机制可以带来顺序读写、高吞吐、高性能等优势,但缺点是不支持按主题和队列单独管理
- 安全生产和容量保障风险要求:即使Apache RocketMQ 按照主题或者队列独立生成存储文件,但存储层本质还是共享存储介质。单独根据主题或队列控制存储时长,这种方式看似更灵活,但实际上整个集群仍然存在容量风险,可能会导致存储时长SLA被打破。从安全生产角度考虑,最合理的方式是将不同存储时长的消息通过不同集群进行分离治理
消息存储和消费状态关系说明
Apache RocketMQ 统一管理消息的存储时长,无论消息是否被消费
当消费者不在线或消息消费异常时,会造成队列中大量消息堆积,且该现象暂时无法有效控制。若此时按照消费状态考虑将未消费的消息全部保留,则很容易导致存储空间不足,进而影响到新消息的读写速度
根据统一地存储时长管理消息,可以帮助消费者业务清晰地判断每条消息的生命周期。只要消息在有效期内可以随时被消费,或通过重置消费位点功能使消息可被消费多次
消息存储文件结构说明 Apache RocketMQ 消息默认存储在本地磁盘文件中,存储文件的根目录由配置参数 storePathRootDir 决定,存储结构如下图所示,其中 commitlog 文件夹存储消息物理文件,consumeCQueue文件夹存储逻辑队列索引,其他文件的详细作用可以参考代码解析

消息过期清理机制
在 Apache RocketMQ中,消息保存时长并不能完整控制消息的实际保存时间,因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长
使用建议
消息存储时长建议适当增加
Apache RocketMQ 按存储时长统一控制消息是否保留。建议在存储成本可控的前提下,尽可能延长消息存储时长。延长消息存储时长,可以为紧急故障恢复、应急问题排查和消息回溯带来更多的可操作空间
