当前位置: 首页 > news >正文

RocketMQ 消费模式

在分布式消息中间件的世界里,RocketMQ 以其高吞吐、低延迟、高可靠的特性占据了重要地位。作为 Java 开发者,我们在使用 RocketMQ 时,经常会遇到一个经典问题:RocketMQ 的消息消费到底是推模式(Push)还是拉模式(Pull)?这个问题看似简单,却隐藏着 RocketMQ 底层设计的精妙之处。

本文将带你深入 RocketMQ 的消费机制,从底层原理到实际代码,全方位解析其消费模式的真相。无论你是刚接触 RocketMQ 的新手,还是有一定经验的开发者,相信都能从这篇文章中获得新的认知。

一、推模式与拉模式的本质区别

在探讨 RocketMQ 的消费模式之前,我们首先需要明确什么是推模式(Push)和拉模式(Pull),以及它们之间的核心区别。

1.1 推模式(Push)

推模式是指消息服务器在有新消息到达时,主动将消息推送给消费者。这种模式下,消费者处于被动接收的状态。

工作流程:

优点:

  • 实时性好:消息到达后能立即被处理
  • 消费者实现简单:只需专注于消息处理逻辑
  • 无需关心消息获取的时机和频率

缺点:

  • 服务器压力大:需要维护与所有消费者的连接
  • 可能导致消费者过载:当消息量突增时,服务器仍会持续推送
  • 灵活性差:无法根据消费者的处理能力动态调整消息获取速率

1.2 拉模式(Pull)

拉模式是指消费者主动从消息服务器拉取消息。这种模式下,消费者掌握着消息获取的主动权。

工作流程:

优点:

  • 消费者控制主动权:可以根据自身处理能力调整拉取频率
  • 服务器压力小:无需维护长连接和推送逻辑
  • 灵活性高:支持批量拉取、按条件拉取等高级特性

缺点:

  • 实时性差:如果拉取间隔设置不合理,可能导致消息处理延迟
  • 消费者实现复杂:需要处理消息拉取、重试、偏移量管理等逻辑
  • 可能造成资源浪费:即使没有新消息,消费者也可能频繁发起拉取请求

了解了两种模式的本质区别后,我们再来看看 RocketMQ 是如何选择和实现的。

二、RocketMQ 的消费模式:表面是推,底层是拉

RocketMQ 的消费模式设计非常巧妙,它在表面上提供了推模式的易用性,而在底层实现上却采用了拉模式的灵活性。这种 "推拉结合" 的设计,兼顾了易用性和性能。

2.1 官方定义

根据 RocketMQ 官方文档的描述:

RocketMQ 提供了两种消费模式:Push Consumer 和 Pull Consumer。其中,Push Consumer 并不是真正意义上的推送,而是由 Consumer 端主动拉取消息,只是封装了拉取过程,并在拉取到消息后触发回调函数,让用户感觉像是消息被推送过来的。

这表明,RocketMQ 的所谓 "推模式" 本质上是基于拉模式实现的,是一种 "伪推模式"。

2.2 核心设计思想

RocketMQ 采用这种设计的核心思想是:在保证易用性的同时,最大限度地提高系统的灵活性和可靠性。

  • 对用户而言,使用 Push Consumer 就像使用真正的推模式一样简单,只需注册消息监听器即可
  • 对系统而言,底层采用拉模式可以避免服务器主动推送带来的各种问题,同时让消费者可以根据自身情况调整拉取策略

2.3 工作原理

RocketMQ 的 Push Consumer 工作原理如下:

关键在于长轮询(Long Polling)机制:

  1. 消费者向 Broker 发送拉取消息的请求
  2. 如果 Broker 没有新消息,不会立即返回空结果,而是将请求挂起一段时间(默认 30 秒)
  3. 在这段时间内,如果有新消息到达,Broker 会立即将消息返回给消费者
  4. 如果超时仍没有新消息,Broker 会返回空结果,消费者收到后会立即再次发起拉取请求

这种机制既保证了消息的实时性(接近推模式),又保留了消费者的主动权(拉模式的优势)。

三、Push Consumer 详解

Push Consumer 是 RocketMQ 中最常用的消费方式,它封装了复杂的拉取逻辑,提供了简单易用的接口。下面我们来详细了解其使用方法和内部机制。

3.1 基本使用示例

首先,我们需要在 Maven 项目中引入 RocketMQ 的依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>3.2.0</version>
</dependency>
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version>
</dependency>
<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.3.0</version><scope>runtime</scope>
</dependency>
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.45</version>
</dependency>
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.1.0-jre</version>
</dependency>
<dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>2.3.0</version>
</dependency>

接下来,配置 RocketMQ:

rocketmq:name-server: 127.0.0.1:9876consumer:group: demo_consumer_group# 消费模式:BROADCASTING(广播)或CLUSTERING(集群)message-model: CLUSTERING# 批量消费最大消息数consume-batch-size: 10# 消费线程池核心线程数consume-thread-min: 20# 消费线程池最大线程数consume-thread-max: 64

然后,实现一个简单的 Push Consumer:

import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;import java.util.List;/*** 订单消息消费者* 采用Push模式消费RocketMQ消息** @author ken*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "order_topic",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "order_created || order_paid" // 只消费这两个标签的消息
)
public class OrderMessageConsumer implements RocketMQListener<String> {@Autowiredprivate OrderService orderService;/*** 处理接收到的消息** @param message 消息内容,JSON格式的订单信息*/@Overridepublic void onMessage(String message) {log.info("接收到订单消息: {}", message);try {// 解析消息OrderMessage orderMessage = JSON.parseObject(message, OrderMessage.class);if (ObjectUtils.isEmpty(orderMessage)) {log.error("消息解析失败,消息内容: {}", message);return;}// 根据消息类型处理不同的业务逻辑if ("order_created".equals(orderMessage.getEventType())) {handleOrderCreated(orderMessage);} else if ("order_paid".equals(orderMessage.getEventType())) {handleOrderPaid(orderMessage);} else {log.warn("未知的消息类型: {}", orderMessage.getEventType());}} catch (Exception e) {log.error("处理订单消息失败", e);// 如果需要重试,可以抛出异常,RocketMQ会根据重试策略进行重试throw new RuntimeException("处理订单消息失败", e);}}/*** 处理订单创建消息** @param orderMessage 订单消息*/private void handleOrderCreated(OrderMessage orderMessage) {log.info("处理订单创建消息,订单ID: {}", orderMessage.getOrderId());// 调用业务服务处理订单创建逻辑orderService.processOrderCreation(orderMessage);}/*** 处理订单支付消息** @param orderMessage 订单消息*/private void handleOrderPaid(OrderMessage orderMessage) {log.info("处理订单支付消息,订单ID: {}", orderMessage.getOrderId());// 调用业务服务处理订单支付逻辑orderService.processOrderPayment(orderMessage);}
}

上面的代码展示了一个典型的 Push Consumer 实现:

  1. 通过@RocketMQMessageListener注解指定要消费的 topic、消费组和消息选择器
  2. 实现RocketMQListener接口,并重写onMessage方法处理消息
  3. onMessage方法中解析消息并根据消息类型处理不同的业务逻辑

3.2 批量消费

RocketMQ 支持批量消费消息,可以提高消费效率。只需将RocketMQListener的泛型指定为List<T>即可:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.util.List;/*** 批量消费订单消息** @author ken*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "order_topic",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "*",consumeMode = ConsumeMode.CONCURRENTLY // 并发消费模式
)
public class BatchOrderMessageConsumer implements RocketMQListener<List<String>> {@Autowiredprivate OrderService orderService;/*** 批量处理接收到的消息** @param messages 消息列表,每个元素是JSON格式的订单信息*/@Overridepublic void onMessage(List<String> messages) {log.info("接收到批量订单消息,数量: {}", messages.size());if (CollectionUtils.isEmpty(messages)) {log.warn("接收到空的消息列表");return;}try {// 解析批量消息List<OrderMessage> orderMessages = Lists.newArrayList();for (String message : messages) {OrderMessage orderMessage = JSON.parseObject(message, OrderMessage.class);if (!ObjectUtils.isEmpty(orderMessage)) {orderMessages.add(orderMessage);} else {log.error("消息解析失败,消息内容: {}", message);}}// 批量处理订单消息orderService.batchProcessOrderMessages(orderMessages);} catch (Exception e) {log.error("批量处理订单消息失败", e);throw new RuntimeException("批量处理订单消息失败", e);}}
}

批量消费的大小可以通过consume-batch-size配置项控制,默认是 1 条,最大可以设置为 32 条。

3.3 消费模式

RocketMQ 支持两种消费模式:

  1. 集群消费(CLUSTERING):消息只会被消费组中的一个消费者消费,适用于负载均衡场景
  2. 广播消费(BROADCASTING):消息会被消费组中的所有消费者消费,适用于通知场景

可以通过message-model配置项指定,默认为集群消费。

// 在@RocketMQMessageListener注解中指定消费模式
@RocketMQMessageListener(topic = "order_topic",consumerGroup = "broadcast_consumer_group",messageModel = MessageModel.BROADCASTING // 广播消费模式
)

3.4 消费位点管理

RocketMQ 通过消费位点(Offset)来记录消费者已经消费到的位置。对于 Push Consumer,RocketMQ 会自动管理 Offset:

  • 集群消费模式:Offset 存储在 Broker 端,由 Broker 统一管理
  • 广播消费模式:Offset 存储在 Consumer 本地,每个消费者维护自己的 Offset

如果需要自定义 Offset 管理,可以实现RocketMQPushConsumerLifecycleListener接口:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;/*** 自定义Offset管理的消费者** @author ken*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "order_topic",consumerGroup = "custom_offset_consumer_group"
)
public class CustomOffsetConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {log.info("处理消息: {}", message);// 消息处理逻辑}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置从最早的消息开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 可以在这里设置自定义的Offset存储// consumer.setOffsetStore(new CustomOffsetStore(...));}
}

3.5 消息重试机制

当消息处理失败时,RocketMQ 会自动进行重试。可以通过以下方式配置重试策略:

rocketmq:consumer:# 最大重试次数max-reconsume-times: 16# 重试队列的Topic名称,默认为%RETRY%+消费组名retry-topic: "%RETRY%demo_consumer_group"

也可以在代码中指定:

@RocketMQMessageListener(topic = "order_topic",consumerGroup = "retry_consumer_group",maxReconsumeTimes = 3 // 最大重试3次
)

当消息重试达到最大次数后,会被发送到死信队列(Dead Letter Queue),死信队列的 Topic 名称格式为%DLQ%+消费组名

四、Pull Consumer 详解

虽然 Push Consumer 已经能满足大部分场景,但在某些特殊情况下,我们可能需要更灵活的消息消费方式,这时就可以使用 Pull Consumer。

4.1 基本使用示例

Pull Consumer 需要手动管理消息的拉取过程,包括拉取频率、Offset 管理等:

import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** 订单消息拉取消费者* 采用Pull模式主动拉取RocketMQ消息** @author ken*/
@Slf4j
@Component
public class OrderPullConsumer implements InitializingBean {@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.consumer.group}")private String consumerGroup;private final DefaultMQPullConsumer consumer;// 记录每个队列的消费Offsetprivate final Map<MessageQueue, Long> offsetTable = new HashMap<>();// 定时任务线程池,用于定时拉取消息private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();public OrderPullConsumer() {// 初始化Pull Consumerconsumer = new DefaultMQPullConsumer(consumerGroup);}@Overridepublic void afterPropertiesSet() throws Exception {start();}/*** 启动Pull Consumer** @throws MQClientException 如果启动失败*/public void start() throws MQClientException {consumer.setNamesrvAddr(nameServer);consumer.start();log.info("Pull Consumer启动成功,消费组: {}", consumerGroup);// 获取Topic下的所有消息队列Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order_topic");if (CollectionUtils.isEmpty(mqs)) {log.warn("没有找到order_topic的消息队列");return;}// 定时拉取消息,初始延迟1秒,之后每5秒拉取一次scheduler.scheduleAtFixedRate(() -> {try {pullMessages(mqs);} catch (Exception e) {log.error("拉取消息异常", e);}}, 1, 5, TimeUnit.SECONDS);}/*** 拉取消息** @param mqs 消息队列集合* @throws Exception 如果拉取过程中发生异常*/private void pullMessages(Set<MessageQueue> mqs) throws Exception {for (MessageQueue mq : mqs) {log.info("开始拉取消息队列: {}", mq);// 获取该队列的下一个Offsetlong offset = getMessageQueueOffset(mq);// 拉取消息PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);// 处理拉取结果handlePullResult(mq, pullResult);// 更新OffsetupdateConsumeOffset(mq, pullResult.getNextBeginOffset());}}/*** 获取消息队列的消费Offset** @param mq 消息队列* @return 消费Offset* @throws MQClientException 如果获取失败*/private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {Long offset = offsetTable.get(mq);if (ObjectUtils.isEmpty(offset)) {// 如果本地没有记录,从Broker获取offset = consumer.fetchConsumeOffset(mq, true);if (ObjectUtils.isEmpty(offset)) {offset = 0L;}}return offset;}/*** 处理拉取结果** @param mq 消息队列* @param pullResult 拉取结果*/private void handlePullResult(MessageQueue mq, PullResult pullResult) {if (ObjectUtils.isEmpty(pullResult)) {log.warn("拉取结果为空,队列: {}", mq);return;}switch (pullResult.getPullStatus()) {case FOUND:// 有消息List<MessageExt> messages = pullResult.getMsgFoundList();if (!CollectionUtils.isEmpty(messages)) {log.info("从队列 {} 拉取到 {} 条消息", mq, messages.size());processMessages(messages);}break;case NO_NEW_MSG:// 没有新消息log.debug("队列 {} 没有新消息", mq);break;case NO_MATCHED_MSG:// 没有匹配的消息log.debug("队列 {} 没有匹配的消息", mq);break;case OFFSET_ILLEGAL:// Offset非法log.error("队列 {} 的Offset非法", mq);// 重置Offset为0try {consumer.updateConsumeOffset(mq, 0L);offsetTable.put(mq, 0L);} catch (MQClientException e) {log.error("重置Offset失败", e);}break;default:break;}}/*** 处理拉取到的消息** @param messages 消息列表*/private void processMessages(List<MessageExt> messages) {for (MessageExt message : messages) {try {String msgBody = new String(message.getBody(), "UTF-8");log.info("处理拉取到的消息: {}", msgBody);// 解析消息并处理OrderMessage orderMessage = JSON.parseObject(msgBody, OrderMessage.class);if (!ObjectUtils.isEmpty(orderMessage)) {// 调用业务服务处理消息if ("order_created".equals(orderMessage.getEventType())) {orderService.processOrderCreation(orderMessage);} else if ("order_paid".equals(orderMessage.getEventType())) {orderService.processOrderPayment(orderMessage);}} else {log.error("消息解析失败: {}", msgBody);}} catch (Exception e) {log.error("处理消息失败", e);// 处理失败时,可以根据需要决定是否重试或记录到死信队列}}}/*** 更新消费Offset** @param mq 消息队列* @param offset 新的Offset* @throws MQClientException 如果更新失败*/private void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {log.debug("更新队列 {} 的Offset为: {}", mq, offset);consumer.updateConsumeOffset(mq, offset);offsetTable.put(mq, offset);}/*** 关闭消费者*/public void shutdown() {if (consumer != null) {consumer.shutdown();}if (scheduler != null) {scheduler.shutdown();}log.info("Pull Consumer已关闭,消费组: {}", consumerGroup);}
}

Pull Consumer 的使用相对复杂,需要手动处理以下事项:

  1. 初始化DefaultMQPullConsumer并启动
  2. 获取 Topic 下的所有消息队列
  3. 为每个队列维护消费 Offset
  4. 手动调用pull方法拉取消息
  5. 根据拉取结果处理消息
  6. 更新消费 Offset
  7. 处理各种异常情况

4.2 拉取策略

Pull Consumer 提供了多种拉取方法,可以根据不同需求选择:

  1. pull(MessageQueue mq, String subExpression, long offset, int maxNums):基本的拉取方法
  2. pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums):如果没有消息,会阻塞一段时间(默认 5 秒)
  3. pullAsync(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback):异步拉取,通过回调处理结果
// 异步拉取示例
consumer.pullAsync(mq, "*", offset, 32, new PullCallback() {@Overridepublic void onSuccess(PullResult pullResult) {log.info("异步拉取成功,队列: {}", mq);try {handlePullResult(mq, pullResult);updateConsumeOffset(mq, pullResult.getNextBeginOffset());} catch (Exception e) {log.error("处理异步拉取结果失败", e);}}@Overridepublic void onException(Throwable e) {log.error("异步拉取异常,队列: {}", mq, e);}
});

4.3 Offset 管理

Pull Consumer 需要手动管理 Offset,这既是劣势(增加了复杂性),也是优势(可以灵活控制消费位置)。常见的 Offset 管理策略有:

  1. 内存管理:如上面示例中使用offsetTable在内存中记录 Offset,优点是简单高效,缺点是服务重启后会丢失 Offset 信息
  2. 持久化管理:将 Offset 存储到数据库或其他持久化存储中,确保服务重启后可以恢复 Offset
/*** 从数据库获取Offset*/
private long getOffsetFromDB(MessageQueue mq) {// 假设我们有一个OffsetDao来操作数据库OffsetDO offsetDO = offsetDao.selectOne(new QueryWrapper<OffsetDO>().eq("consumer_group", consumerGroup).eq("topic", mq.getTopic()).eq("queue_id", mq.getQueueId()).eq("broker_name", mq.getBrokerName()));if (ObjectUtils.isEmpty(offsetDO)) {return 0L;}return offsetDO.getOffset();
}/*** 将Offset保存到数据库*/
private void saveOffsetToDB(MessageQueue mq, long offset) {OffsetDO offsetDO = new OffsetDO();offsetDO.setConsumerGroup(consumerGroup);offsetDO.setTopic(mq.getTopic());offsetDO.setBrokerName(mq.getBrokerName());offsetDO.setQueueId(mq.getQueueId());offsetDO.setOffset(offset);offsetDO.setUpdateTime(new Date());// 先查询是否存在OffsetDO existing = offsetDao.selectOne(new QueryWrapper<OffsetDO>().eq("consumer_group", consumerGroup).eq("topic", mq.getTopic()).eq("queue_id", mq.getQueueId()).eq("broker_name", mq.getBrokerName()));if (ObjectUtils.isEmpty(existing)) {offsetDao.insert(offsetDO);} else {offsetDO.setId(existing.getId());offsetDao.updateById(offsetDO);}
}

五、Push vs Pull:如何选择?

了解了 RocketMQ 的两种消费模式后,我们需要根据实际场景选择合适的消费方式。

5.1 适用场景对比

场景推荐模式原因
大部分常规业务场景Push Consumer简单易用,无需关心复杂的拉取逻辑
消息处理耗时较长Pull Consumer可以控制拉取频率,避免消息堆积
需要批量处理消息两者均可Push 支持批量消费,Pull 可以更灵活地控制批量大小
需要根据业务情况动态调整消费速率Pull Consumer可以根据系统负载动态调整拉取频率
需要精确控制消费位置Pull Consumer可以手动设置 Offset,实现回溯消费等功能
消费端资源有限Pull Consumer可以根据资源情况调整消费策略

5.2 性能对比

在性能方面,两种模式各有优劣:

  • Push Consumer:由于采用长轮询机制,消息实时性好,但会有一定的网络开销
  • Pull Consumer:可以减少无效的网络请求,但如果拉取策略不合理,可能导致消息延迟

实际测试表明,在正常负载下,两种模式的吞吐量差异不大。但在高负载场景下,Pull Consumer 由于可以更灵活地控制消费节奏,可能会表现出更好的稳定性。

5.3 代码复杂度对比

  • Push Consumer:代码简洁,只需实现消息处理逻辑
  • Pull Consumer:代码复杂,需要处理消息拉取、Offset 管理、异常重试等

六、RocketMQ 消费模式的底层实现

为了更深入地理解 RocketMQ 的消费模式,我们有必要了解其底层实现原理。

6.1 长轮询机制

RocketMQ 的 Push Consumer 之所以能实现类似推模式的效果,关键在于长轮询机制。其流程如下:

长轮询机制的核心是 Broker 端的PullRequestHoldService,它负责管理所有挂起的拉取请求。当有新消息到达时,会触发notifyMessageArriving方法,唤醒对应的挂起请求并返回消息。

6.2 消费进度管理

RocketMQ 通过 OffsetStore 来管理消费进度:

  • 集群消费模式:使用RemoteBrokerOffsetStore,Offset 存储在 Broker 端
  • 广播消费模式:使用LocalFileOffsetStore,Offset 存储在本地文件中

OffsetStore 的核心接口定义如下:

public interface OffsetStore {/*** 加载Offset*/void load() throws MQClientException;/*** 更新Offset到内存*/void updateOffset(MessageQueue mq, long offset, boolean increaseOnly);/*** 从内存中读取Offset*/long readOffset(MessageQueue mq, ReadOffsetType type);/*** 持久化Offset*/void persistAll(Set<MessageQueue> mqs);/*** 持久化单个队列的Offset*/void persist(MessageQueue mq);/*** 移除Offset*/void removeOffset(MessageQueue mq);/*** 克隆Offset存储*/Map<MessageQueue, Long> cloneOffsetTable(String topic);
}

6.3 消息拉取流程

Push Consumer 的消息拉取流程由DefaultMQPushConsumerImplPullMessageService共同完成:

PullMessageService是一个独立的线程,负责管理所有的拉取请求,包括发送请求、处理响应和重试等。

七、实战案例:订单状态同步系统

下面我们通过一个实际案例,展示如何在项目中选择和使用 RocketMQ 的消费模式。

7.1 需求分析

我们需要实现一个订单状态同步系统,主要功能包括:

  1. 接收订单系统发送的订单状态变更消息
  2. 将订单状态同步到多个下游系统(库存、物流、财务等)
  3. 保证消息不丢失,且下游系统最终能接收到正确的订单状态

7.2 架构设计

7.3 消费模式选择

根据需求分析,我们选择不同的消费模式:

  1. 对于实时性要求高的库存系统和物流系统,使用 Push Consumer
  2. 对于批量处理的财务系统,使用 Pull Consumer,在夜间低峰期批量同步数据

7.4 代码实现

7.4.1 实体类定义
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;import java.util.Date;/*** 订单消息实体** @author ken*/
@Data
@Schema(description = "订单消息实体")
public class OrderMessage {@Schema(description = "订单ID")private String orderId;@Schema(description = "用户ID")private String userId;@Schema(description = "订单状态")private String orderStatus;@Schema(description = "事件类型")private String eventType;@Schema(description = "消息发送时间")private Date sendTime;@Schema(description = "订单金额")private BigDecimal amount;
}/*** 订单同步记录实体** @author ken*/
@Data
@TableName("order_sync_record")
@Schema(description = "订单同步记录实体")
public class OrderSyncRecord {@TableId(type = IdType.AUTO)@Schema(description = "记录ID")private Long id;@Schema(description = "订单ID")private String orderId;@Schema(description = "同步目标系统")private String targetSystem;@Schema(description = "同步状态:0-未同步,1-已同步,2-同步失败")private Integer syncStatus;@Schema(description = "同步次数")private Integer syncCount;@Schema(description = "最后同步时间")private Date lastSyncTime;@Schema(description = "创建时间")private Date createTime;@Schema(description = "更新时间")private Date updateTime;
}
7.4.2 Push Consumer 实现(同步到库存和物流系统)
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;/*** 订单状态实时同步消费者* 同步订单状态到库存和物流系统** @author ken*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "order_status_topic",consumerGroup = "order_realtime_sync_group",selectorExpression = "order_status_changed"
)
public class OrderRealtimeSyncConsumer implements RocketMQListener<String> {@Autowiredprivate InventoryService inventoryService;@Autowiredprivate LogisticsService logisticsService;@Autowiredprivate OrderSyncRecordService syncRecordService;@Overridepublic void onMessage(String message) {log.info("接收到订单状态变更消息: {}", message);try {OrderMessage orderMessage = JSON.parseObject(message, OrderMessage.class);if (ObjectUtils.isEmpty(orderMessage)) {log.error("消息解析失败,消息内容: {}", message);return;}// 记录同步开始syncRecordService.createRecord(orderMessage.getOrderId(), "INVENTORY");syncRecordService.createRecord(orderMessage.getOrderId(), "LOGISTICS");// 同步到库存系统boolean inventorySuccess = inventoryService.syncOrderStatus(orderMessage);if (inventorySuccess) {syncRecordService.updateSuccess(orderMessage.getOrderId(), "INVENTORY");} else {syncRecordService.updateFailed(orderMessage.getOrderId(), "INVENTORY");// 同步失败,抛出异常触发重试throw new RuntimeException("同步订单状态到库存系统失败,订单ID: " + orderMessage.getOrderId());}// 同步到物流系统boolean logisticsSuccess = logisticsService.syncOrderStatus(orderMessage);if (logisticsSuccess) {syncRecordService.updateSuccess(orderMessage.getOrderId(), "LOGISTICS");} else {syncRecordService.updateFailed(orderMessage.getOrderId(), "LOGISTICS");// 同步失败,抛出异常触发重试throw new RuntimeException("同步订单状态到物流系统失败,订单ID: " + orderMessage.getOrderId());}} catch (Exception e) {log.error("处理订单状态变更消息失败", e);throw new RuntimeException("处理订单状态变更消息失败", e);}}
}
7.4.3 Pull Consumer 实现(同步到财务系统)
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** 订单状态批量同步消费者* 批量同步订单状态到财务系统** @author ken*/
@Slf4j
@Component
public class OrderBatchSyncConsumer {@Value("${rocketmq.name-server}")private String nameServer;@Value("${rocketmq.consumer.group}")private String consumerGroup;private final DefaultMQPullConsumer consumer;private final Map<MessageQueue, Long> offsetTable = new HashMap<>();private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();@Autowiredprivate FinanceService financeService;@Autowiredprivate OrderSyncRecordService syncRecordService;public OrderBatchSyncConsumer() {consumer = new DefaultMQPullConsumer(consumerGroup + "_batch");}@PostConstructpublic void init() throws MQClientException {consumer.setNamesrvAddr(nameServer);// 设置从最后一个Offset开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.start();log.info("订单批量同步消费者启动成功");// 每天凌晨2点执行批量同步scheduler.scheduleAtFixedRate(this::batchSync, calculateInitialDelay(), 24 * 60 * 60, TimeUnit.SECONDS);}/*** 计算到凌晨2点的初始延迟时间(秒)*/private long calculateInitialDelay() {long now = System.currentTimeMillis();long target = now;// 设置目标时间为今天凌晨2点Calendar calendar = Calendar.getInstance();calendar.setTimeInMillis(now);calendar.set(Calendar.HOUR_OF_DAY, 2);calendar.set(Calendar.MINUTE, 0);calendar.set(Calendar.SECOND, 0);calendar.set(Calendar.MILLISECOND, 0);target = calendar.getTimeInMillis();// 如果当前时间已经过了今天凌晨2点,则设置为明天凌晨2点if (target <= now) {target += 24 * 60 * 60 * 1000;}return (target - now) / 1000;}/*** 批量同步订单状态到财务系统*/public void batchSync() {log.info("开始批量同步订单状态到财务系统");try {Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order_status_topic");if (CollectionUtils.isEmpty(mqs)) {log.warn("没有找到订单状态主题的消息队列");return;}// 拉取并处理昨天的所有消息long endTime = System.currentTimeMillis();long startTime = endTime - 24 * 60 * 60 * 1000;for (MessageQueue mq : mqs) {log.info("开始处理消息队列: {}", mq);long offset = getMessageQueueOffset(mq);boolean hasMore = true;while (hasMore) {PullResult pullResult = consumer.pull(mq, "order_status_changed", offset, 100);switch (pullResult.getPullStatus()) {case FOUND:List<MessageExt> messages = pullResult.getMsgFoundList();if (!CollectionUtils.isEmpty(messages)) {log.info("从队列 {} 拉取到 {} 条消息", mq, messages.size());// 筛选出昨天的消息List<MessageExt> targetMessages = Lists.newArrayList();for (MessageExt msg : messages) {if (msg.getBornTimestamp() >= startTime && msg.getBornTimestamp() < endTime) {targetMessages.add(msg);} else if (msg.getBornTimestamp() >= endTime) {// 已经是今天的消息,不需要处理hasMore = false;break;}}if (!CollectionUtils.isEmpty(targetMessages)) {processBatchMessages(targetMessages);}offset = pullResult.getNextBeginOffset();updateConsumeOffset(mq, offset);}break;case NO_NEW_MSG:case NO_MATCHED_MSG:hasMore = false;break;case OFFSET_ILLEGAL:log.error("队列 {} 的Offset非法,重置为0", mq);offset = 0;updateConsumeOffset(mq, offset);break;}}}log.info("批量同步订单状态到财务系统完成");} catch (Exception e) {log.error("批量同步订单状态到财务系统失败", e);}}/*** 批量处理消息*/private void processBatchMessages(List<MessageExt> messages) {List<OrderMessage> orderMessages = Lists.newArrayList();// 解析消息for (MessageExt msg : messages) {try {String msgBody = new String(msg.getBody(), "UTF-8");OrderMessage orderMessage = JSON.parseObject(msgBody, OrderMessage.class);if (!ObjectUtils.isEmpty(orderMessage)) {orderMessages.add(orderMessage);syncRecordService.createRecord(orderMessage.getOrderId(), "FINANCE");} else {log.error("消息解析失败: {}", msgBody);}} catch (Exception e) {log.error("解析消息失败", e);}}// 批量同步到财务系统if (!CollectionUtils.isEmpty(orderMessages)) {try {List<String> successOrderIds = financeService.batchSyncOrderStatus(orderMessages);log.info("批量同步成功,订单数量: {}", successOrderIds.size());// 更新成功记录for (String orderId : successOrderIds) {syncRecordService.updateSuccess(orderId, "FINANCE");}// 处理失败的订单List<String> allOrderIds = orderMessages.stream().map(OrderMessage::getOrderId).collect(Collectors.toList());allOrderIds.removeAll(successOrderIds);for (String orderId : allOrderIds) {syncRecordService.updateFailed(orderId, "FINANCE");log.warn("订单 {} 同步到财务系统失败", orderId);}} catch (Exception e) {log.error("批量同步到财务系统失败", e);// 标记所有订单为同步失败for (OrderMessage msg : orderMessages) {syncRecordService.updateFailed(msg.getOrderId(), "FINANCE");}}}}// 其他方法(获取Offset、更新Offset等)与前面的Pull Consumer示例类似private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {// 实现从数据库获取Offset的逻辑// ...}private void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {// 实现更新Offset到数据库的逻辑// ...}@PreDestroypublic void destroy() {if (consumer != null) {consumer.shutdown();}if (scheduler != null) {scheduler.shutdown();}log.info("订单批量同步消费者已关闭");}
}
7.4.4 服务层实现
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;import java.util.Date;
import java.util.List;/*** 订单同步记录服务实现** @author ken*/
@Service
public class OrderSyncRecordServiceImpl extends ServiceImpl<OrderSyncRecordMapper, OrderSyncRecord> implements OrderSyncRecordService {@Override@Transactional(rollbackFor = Exception.class)public void createRecord(String orderId, String targetSystem) {if (StringUtils.isEmpty(orderId) || StringUtils.isEmpty(targetSystem)) {log.error("订单ID或目标系统不能为空");return;}OrderSyncRecord record = new OrderSyncRecord();record.setOrderId(orderId);record.setTargetSystem(targetSystem);record.setSyncStatus(0); // 0-未同步record.setSyncCount(0);record.setCreateTime(new Date());record.setUpdateTime(new Date());baseMapper.insert(record);}@Override@Transactional(rollbackFor = Exception.class)public void updateSuccess(String orderId, String targetSystem) {if (StringUtils.isEmpty(orderId) || StringUtils.isEmpty(targetSystem)) {log.error("订单ID或目标系统不能为空");return;}OrderSyncRecord record = baseMapper.selectOne(new QueryWrapper<OrderSyncRecord>().eq("order_id", orderId).eq("target_system", targetSystem).last("limit 1"));if (ObjectUtils.isEmpty(record)) {log.warn("未找到订单 {} 同步到 {} 系统的记录", orderId, targetSystem);return;}record.setSyncStatus(1); // 1-已同步record.setSyncCount(record.getSyncCount() + 1);record.setLastSyncTime(new Date());record.setUpdateTime(new Date());baseMapper.updateById(record);}@Override@Transactional(rollbackFor = Exception.class)public void updateFailed(String orderId, String targetSystem) {if (StringUtils.isEmpty(orderId) || StringUtils.isEmpty(targetSystem)) {log.error("订单ID或目标系统不能为空");return;}OrderSyncRecord record = baseMapper.selectOne(new QueryWrapper<OrderSyncRecord>().eq("order_id", orderId).eq("target_system", targetSystem).last("limit 1"));if (ObjectUtils.isEmpty(record)) {log.warn("未找到订单 {} 同步到 {} 系统的记录", orderId, targetSystem);return;}record.setSyncStatus(2); // 2-同步失败record.setSyncCount(record.getSyncCount() + 1);record.setLastSyncTime(new Date());record.setUpdateTime(new Date());baseMapper.updateById(record);}
}

八、常见问题与解决方案

在使用 RocketMQ 的消费模式时,可能会遇到一些常见问题,下面我们介绍这些问题的解决方案。

8.1 消息重复消费

问题描述:同一条消息被多次消费。

原因分析

  • 消费端处理消息成功,但在更新 Offset 时失败
  • 网络抖动导致 Broker 没有收到 Offset 更新请求
  • 消费者异常退出,未及时提交 Offset

解决方案

  1. 实现消息的幂等处理,确保重复消费不会导致业务异常
  2. 对于关键业务,使用事务消息或分布式锁保证一致性
  3. 调整autoCommitOffset参数,根据业务需求选择手动提交或自动提交
// 手动提交Offset示例
@RocketMQMessageListener(topic = "order_topic",consumerGroup = "manual_commit_consumer_group",consumeMode = ConsumeMode.ORDERLY // 顺序消费模式下支持手动提交
)
public class ManualCommitConsumer implements RocketMQListener<MessageExt> {@Autowiredprivate OrderService orderService;@Overridepublic void onMessage(MessageExt message) {log.info("接收到消息: {}", new String(message.getBody()));try {// 处理消息orderService.processMessage(message);// 手动提交Offsetmessage.getCommitLogOffset();// 注意:在实际使用中,需要通过MessageListenerOrderly接口才能手动提交} catch (Exception e) {log.error("处理消息失败", e);// 处理失败,不提交Offset,会导致消息重试throw new RuntimeException("处理消息失败", e);}}
}

8.2 消息堆积

问题描述:消息消费速度跟不上生产速度,导致消息堆积。

原因分析

  • 消费端处理逻辑耗时过长
  • 消费者数量不足
  • 消费线程池配置不合理

解决方案

  1. 优化消费端处理逻辑,减少处理时间
  2. 增加消费者实例数量(集群消费模式)
  3. 调整消费线程池参数,增加consume-thread-max
  4. 使用批量消费提高消费效率
  5. 对于非实时性要求的场景,使用 Pull Consumer 控制消费速度
# 优化消费线程池配置
rocketmq:consumer:consume-thread-min: 50consume-thread-max: 200consume-batch-size: 32

8.3 消息丢失

问题描述:消息发送成功,但消费者未消费到。

原因分析

  • 消费者 Offset 被意外重置
  • 消息被发送到了死信队列
  • 消费者组配置错误
  • Broker 数据丢失

解决方案

  1. 确保消费者组配置正确,不同业务使用不同的消费组
  2. 监控死信队列,及时处理死信消息
  3. 启用 Broker 的持久化配置,确保数据不丢失
  4. 定期备份 Offset 信息,以便在异常情况下恢复
// 死信队列消费者示例
@RocketMQMessageListener(topic = "%DLQ%demo_consumer_group", // 死信队列的Topic名称格式为%DLQ%+消费组名consumerGroup = "dlq_consumer_group"
)
public class DLQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("接收到死信队列消息: {}", message);// 处理死信消息,可以人工介入或自动重试}
}

九、总结

通过本文的深入分析,我们可以得出结论:RocketMQ 的消费模式表面上提供了推模式(Push Consumer)和拉模式(Pull Consumer)两种选择,但 Push Consumer 本质上是基于拉模式实现的,通过长轮询机制模拟了推模式的效果。

这种设计既保证了易用性(像使用推模式一样简单),又保留了灵活性(底层基于拉模式),是一种非常巧妙的折中方案。

在实际项目中,我们应该根据具体需求选择合适的消费模式:

  • 大部分场景下,推荐使用 Push Consumer,它简单易用且能满足大部分需求
  • 在需要更灵活控制消费过程的场景下,可以选择 Pull Consumer,但要注意处理好 Offset 管理、消息重试等复杂逻辑

无论选择哪种模式,都需要注意处理消息的幂等性、重复消费和消息堆积等问题,以确保系统的可靠性和稳定性。

http://www.dtcms.com/a/542375.html

相关文章:

  • 广东网站开发公司网店运营计划书
  • 网站备案注意网站分为
  • 菠菜网站如何做推广wordpress中常用插件安装包
  • HTML基础语法
  • 网站建设发布雅安移动网站建设
  • 王梓同亮相IMX国际音乐季,畅谈音乐创作理念
  • 企业网站管理百度网站打开
  • Hive安装部署
  • 初始网络通信
  • 从MSF载荷生成到Windows防火墙绕过
  • 成品网站设计网站网站设计优缺点
  • 广州建设工程交易中心网站软件定制网站优化 seo一站式
  • orchestrator Web API
  • 营销型网站免费企业网站模版单页网站seo如何优化
  • MySQL之慢查询sql排查及优化
  • 如何实现企业网站推广的系统性哪家公司做跳转网站
  • Redis Cluster集群理论
  • 广州网站开发定制设计北京做网站的大公司
  • 求个网站你会感谢我的西安大雁塔高多少米
  • 59网站一起做网店电商详情页设计思路
  • 网站建设预算表样本wordpress 上一篇 下一篇
  • 江西 网站 建设 开发永辉企业微信app下载安装
  • 提升学历是什么意思百度地图优化
  • 竹子林网站建设php网站开发实践指南
  • 【测试理论和实践 4.测试用例】
  • Java—接口
  • 移动网站源码兰州需要做推广的公司
  • 公司架设网站费用怎么做分录自学it怎么入门
  • 不用代码做网站杭州品牌网站设计
  • 商城建站模板广州从化建设网站官网