当前位置: 首页 > news >正文

RocketMQ 4.9.3源码解读-客户端Consumer消费者组件启动流程分析

作者源码阅读笔记主要采用金山云文档记录的,所有的交互图和代码阅读笔记都是记录在云文档里面,本平台的文档编辑实在不方便,会导致我梳理的交互图和文档失去原来的格式,所以整理在文档里面,供大家阅读交流

【金山文档 | WPS云文档】 Consumer组件启动流程分析

相关重要类介绍

DefaultMQPushConsumer Push消费者

名称

描述

默认值

consumerGroup

消费组

messageModel

消息模型

  1. 集群消费(Clustering):同一个消息主题的消息会被集群中的多个消费者消费,但是一条消息只会被其中一个消费者消费。
  2. 广播消费(Broadcasting):同一个消息主题的消息会被集群中的所有消费者广播式地消费,即每个消息会被所有消费者消费一次。

默认CLUSTERING

consumeFromWhere

表示从什么位点开始消费

CONSUME_FROM_LAST_OFFSET:从上次消费的位点开始消费,相当于断点继续。

CONSUME_FROM_FIRST_OFFSET:从 ConsumeQueue 的最小位点开始消费。

CONSUME_FROM_TIMESTAMP:从指定时间开始消费。

默认CONSUME_FROM_LAST_OFFSET

consumeTimestamp

表示从哪一时刻开始消费,格式为 yyyyMMDDHHmmss

默认为半小时前。

consumeFromWhere = consumeTimestamp 时,consumeTimestamp 设置的值才生效。

allocateMessageQueueStrategy

消费Queue分配策略管理器

subscription

订阅关系,表示当前消费者订阅了哪些 Topic 的哪些 Tag

messageListener

消息 Push 回调监听器

offsetStore

存储和管理消费者消费进度的组件。

集群模式位点会持久化到 Broker 中。

广播模式持久化到本地文件中。

两个实现类:

RemoteBrokerOffsetStoreLocalFileOffsetStore

consumeThreadMin

消费者消费消息最小线程数量

[1, 1000]

默认20

consumeThreadMax

消费者消费消息最大线程数量

[1, 1000]

默认20

adjustThreadPoolNumsThreshold

动态调整消费线程池的线程数大小

默认 100000

consumeConcurrentlyMaxSpan

单队列并行消费允许的最大跨度

[1, 65535]

默认 2000

pullThresholdForQueue

队列级别的流量控制阈值,拉消息本地队列缓存消息最大数

[1, 65535]

默认 1000

pullThresholdSizeForQueue

在队列级别限制缓存的消息大小

默认情况下每个消息队列最多缓存100MiB消息

[1, 1024]

默认 100M

pullThresholdForTopic

Topic的拉取消息数量的阈值

[1, 6553500]

默认-1,不限制

pullThresholdSizeForTopic

Topic的拉取消息大小的阈值,单位是MB。默认为-1

[1, 102400]

默认-1,不限制

pullInterval

拉取间隔,单位为ms

[0, 65535]

默认 0

consumeMessageBatchMaxSize

消费者每次批量消费时,最多消费多少条消息

[1, 1024]

默认 1

pullBatchSize

一次最多拉取多少条消息

[1, 1024]

默认 32

postSubscriptionWhenPull

每次拉取消息时是否更新订阅关系

默认 false

maxReconsumeTimes

最大重试次数

In concurrently mode, -1 means 16;

In orderly mode, -1 means Integer.MAX_VALUE.

默认 -1

http://www.dtcms.com/a/332060.html

相关文章:

  • 具身智能Scaling Law缺失:机器人界的“摩尔定律“何时诞生?
  • Ansible企业级实战
  • centos部署chrome和chromedriver
  • C#WPF实战出真汁03--登录界面设计
  • C#WPF实战出真汁04--登录功能实现
  • 单目操作符与逗号表达式
  • CoreShop商城框架开启多租户(2)
  • 莫队 + 离散化 Ann and Books
  • 【19-模型训练细节 】
  • 业务敏捷性对SAP驱动型企业意味着什么?如何保持企业敏捷性?
  • 零信任架构(Zero Trust Architecture, ZTA)(通过动态验证和最小权限控制,实现对所有访问请求的严格授权和持续监控)
  • latex 中破折号的输入
  • 介绍java中atomic及相关类
  • PERC初探暨小试牛刀
  • Vue3 vxeTree树形组件完全指南:从入门到精通的完整使用教程
  • QT6(可视化UI设计代码实现)
  • MATLAB实现图像增强(直方图均衡化)
  • 数学分析| 极限论| 1.数列极限常用方法总结
  • App冷启动阶段Open Dexfiles实现原理【ART虚拟机系列2】
  • docker nginx 定时脚本保存30天日志信息
  • MFC的使用——使用ChartCtrl绘制曲线
  • 2025.8.13~14 实习总结
  • 计算机网络技术学习-day1《网络乾坤:从比特洪流到协议星河的奇幻之旅》​
  • MCU中的LTDC(LCD-TFT Display Controller)
  • 网卡聚合teamdctl
  • 大模型技术栈全景
  • Java 图片像素碰撞检测
  • Linux软件编程-进程(1)
  • 【嵌入式C语言】四
  • 【PCB设计经验】3D模型在线预览!效率便捷!