当前位置: 首页 > news >正文

深入剖析 RocketMQ 中的 DefaultMQPushConsumerImpl:消息推送消费的核心实现

前言

在 Apache RocketMQ 的消息消费体系中,RocketMQ 提供了DefaultMQPushConsumer(推送消费)和DefaultMQPullConsumer(拉取消费)两种主要消费方式。DefaultMQPushConsumer与DefaultMQPullConsumer在消息获取方式,消息处理逻辑,负载均衡与消费管理都有着不同的处理逻辑,这篇博文主要进行分析DefaultMQPushConsumer。

一、DefaultMQPushConsumerImpl 的主要属性

/*** consumer重平衡的组件*/private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);/*** 过滤消息的钩子*/private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();/*** 消费者启动的时间戳*/private final long consumerStartTimestamp = System.currentTimeMillis();/*** 消费消息的钩子list*/private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();/*** rpc的钩子*/private final RPCHook rpcHook;/*** 服务状态*/private volatile ServiceState serviceState = ServiceState.CREATE_JUST;/*** 网络客户端通信*/private MQClientInstance mQClientFactory;/*** 消息拉取的Api*/private PullAPIWrapper pullAPIWrapper;/*** 是否暂停拉取消息的标识*/private volatile boolean pause = false;/*** 是否顺序消费的标识*/private boolean consumeOrderly = false;/*** 消息处理监听器*/private MessageListener messageListenerInner;/*** 消费偏移量的存储组件*/private OffsetStore offsetStore;/*** 消费消息的服务*/private ConsumeMessageService consumeMessageService;/*** 队列流量控制次数*/private long queueFlowControlTimes = 0;/*** 队列最大跨度流量控制次数*/private long queueMaxSpanFlowControlTimes = 0;

其中比较重要的属性位:

  • pullAPIWrapper:负责与Broker进行消息拉取的交互,封装了底层的网络请求与响应处理。

  • rebalanceImpl:实现消费者的负载均衡逻辑,动态分配消息队列给消费者实例

  • offsetStore:负责消费者消费进度(偏移量)的存储与管理,确保消息不重复消费、不丢失

  • consumeMessageService:管理消息消费的线程池,执行具体的消息消费任务

二、DefaultMQPushConsumerImpl 启动方法

//启动方法public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;// 检查配置合法性this.checkConfig();// 复制订阅信息this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}// 获取或创建MQClientInstancethis.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}//从磁盘加载数据this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();//向broker发送心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//立即进行rebalancethis.mQClientFactory.rebalanceImmediately();}

从DefaultMQPushConsumerImpl的start方法中可以看出来主要作用为:

1. 配置校验与状态管理

  • 配置合法性检查:验证消费者组名、NameServer 地址、订阅关系等核心配置是否正确,确保后续操作的基础条件满足。

  • 状态机控制:通过状态枚举(ServiceState)确保消费者只能从初始状态(CREATE_JUST)启动,防止重复启动或非法状态转换。

2. 核心组件初始化

  • 复制订阅信息:将用户配置的主题(Topic)和过滤表达式(Tag)复制到内部数据结构,用于后续消息过滤。

  • 获取或创建 MQClientInstance:通过单例模式获取或创建与 RocketMQ 集群通信的客户端实例,该实例负责管理网络连接、心跳机制和请求路由。

  • 注册消费者:将当前消费者注册到 MQClientInstance 的消费者注册表中,便于统一管理和协调。

3. 启动核心服务

  • 负载均衡服务:初始化并启动RebalanceImpl,定期(默认 20 秒)执行负载均衡算法,动态分配消息队列给消费者实例,确保集群内负载均匀。

  • 消息拉取服务:启动PullAPIWrapper,初始化拉取线程池,为后续从 Broker 拉取消息做准备。

  • 消息消费服务:根据消费模式(并发 / 顺序)启动对应的ConsumeMessageService,初始化消费线程池,处理拉取到的消息。

4. 状态更新与资源准备

  • 更新服务状态:将消费者状态从START_FAILED切换为RUNNING,标志启动成功。

  • 订阅信息同步:向 NameServer 同步订阅信息,确保 Broker 知晓消费者的订阅关系。

三、消息拉取PullAPIWrapper

消息拉取由pullAPIWrapper主导。pullKernelImpl方法构建请求并发送到Broker,Broker在无消息时挂起请求,超时后返回结果。消费者接收到响应后,解析PullResult。

主要属性

 /*** 网路通信组件*/private final MQClientInstance mQClientFactory;/*** 消费组*/private final String consumerGroup;/*** 单元*/private final boolean unitMode;/*** 消息队列和broker机器的映射关系*/private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(32);/*** 是否连接broker*/private volatile boolean connectBrokerByUser = false;private volatile long defaultBrokerId = MixAll.MASTER_ID;private Random random = new Random(System.currentTimeMillis());/*** 过滤消息的钩子*/private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

拉取方法

 /*** 拉取消息的核心方法* @param mq 从哪个MessageQueue中拉取消息* @param subExpression 子表达式* @param expressionType 表达式类型* @param subVersion 子版本号* @param offset 拉取偏移量* @param maxNums 拉取的最大数量* @param sysFlag 系统标志* @param commitOffset 我们已经拉取处理完毕的数据偏移量 做一个提交* @param brokerSuspendMaxTimeMillis broker挂起最大的时间戳* @param timeoutMillis 拉取消息的超时时间* @param communicationMode 通信模式* @param pullCallback 回调的接口* @return* @throws MQClientException* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//根据messageQueue获取broker地址 可以针对针对这个MessageQueue重新计算FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);//未找到broker 从nameServer中进行更新数据 然后进行再次寻找if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {{// check versionif (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}//构建拉取消息的请求头PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);requestHeader.setBname(mq.getBrokerName());String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}//拉取消息PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}

四、RebalanceImpl负载均衡

RebalanceImpl#rebalanceByTopic 是 RocketMQ 实现消费者负载均衡的核心方法,其主要作用是根据当前集群中消费者实例和消息队列的状态,动态分配每个消费者应负责消费的队列。这个过程确保了消息消费的均匀性和高可用性,避免了部分消费者过载而其他消费者空闲的情况。

 private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {//订阅的topic 有哪些queuesSet<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//查询consumer分组中有哪些consumer实例List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);//获取配置的负载均衡策略(默认:平均分配)AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {//执行负载均衡算法,计算当前消费者应分配的队列allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//更新consumer要进行处理的消息队列boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}

 核心功能概述

rebalanceByTopic 方法会针对指定主题(Topic)执行以下操作:

  • 获取当前消费组内所有活跃的消费者实例列表

  • 获取该主题下的所有消息队列(MessageQueue)

  • 根据负载均衡策略(如平均分配、轮询等)计算每个消费者应分配的队列

  • 更新本地队列分配信息,并触发消息拉取或停止不必要的拉取任务

负载均衡会在以下情况触发

  • 消费者启动时

  • 消费者实例数量变化(新实例加入或旧实例退出)

  • 主题的队列数量变化(如 Broker 动态调整队列数)

  • 定时任务(默认每 20 秒执行一次)

五、总结

通过对DefaultMQPushConsumerImpl了解,我们清晰地看到了 RocketMQ 消息推送消费的完整实现逻辑。从类结构设计到各个核心流程的源码细节,每一部分都紧密协作,共同保障了消息消费的高效性和可靠性。理解这些源码,开发者能够更好地进行性能优化、问题排查和功能扩展,让 RocketMQ 在实际项目中发挥更大价值。

相关文章:

  • SAP 在 AI 与数据统一平台上的战略转向
  • 测试(面经 八股)
  • 2025年—Comfyui聚合插件:Comfyui-LayerStyle 超多实用功能 | 附各功能模型
  • LeetCode 118 杨辉三角 (Java)
  • 【面试篇 9】c++生成可执行文件的四个步骤、悬挂指针、define和const区别、c++定义和声明、将引用作为返回值的好处、类的四个缺省函数
  • 如何使用Jmeter进行压力测试?
  • WPF 播放器(AudioPlayer 2025)
  • Java中栈的多种实现类详解
  • 23、字节对齐
  • Xxl-job——源码设计思考
  • 数据通信基础
  • Axure应用交互设计:注册登录页完整交互设计
  • 【Linux操作系统】基础开发工具(yum、vim、gcc/g++)
  • sendDefaultImpl call timeout(rocketmq)
  • 【Python】屏幕像素颜色值的获取
  • SpringBoot-17-MyBatis动态SQL标签之常用标签
  • Unity的日志管理类
  • 【CF】Day78——⭐Codeforces Round 876 (Div. 2) D (LIS | 思维 | DP)
  • 15-Oracle 23ai Vector Search Similarity Search-向量相似性和混合搜索-实操
  • go工具库:hertz api框架 hertz client的使用
  • 网站建设需要哪些工作室/重庆百度推广
  • 做封面的软件ps下载网站/seo营销网站的设计标准
  • pdf怎么做电子书下载网站/by72777最新域名查询
  • 长安做网站价格/推广一般去哪发帖
  • 南京建设银行官方网站/软件开发工资一般多少
  • 服务器网站建设维护合同/花西子网络营销案例分析