当前位置: 首页 > news >正文

Kafka Consumer工作流程

 Kafka Consumer工作流程图

 

  • 1、启动与加入组

    • 消费者启动后,会向 Kafka 集群中的某个 Broker 发送请求,请求加入特定消费者组。这个 Broker 中的消费者协调器(Consumer Coordinator)负责管理消费者组相关事宜。
  • 2、组内分区分配(Rebalance)

    • 消费者协调器会对消费者组内的消费者进行分区分配。一个消费者组订阅某个 Topic 时,该 Topic 的每个分区只能由组内一个消费者消费 ,但一个消费者可消费多个分区数据 。比如图中TopicA的不同分区,会分配给组内不同消费者。当组内消费者数量变化,或 Topic 分区数量改变时,会触发 Rebalance,重新分配分区。
  • 3、确定消费位置(获取 Offset)

    • 消费者从系统主题__consumer_offsets中获取自己上次提交的偏移量(Offset ),它标识着消费者在分区中上次消费到的位置。若首次消费或没有可查询的偏移量记录,可能从分区起始位置(最早消息 )或最新位置(最新消息 )开始消费,这取决于配置策略。
  • 4、消息拉取

    • 消费者根据分配到的分区,向对应分区的 Leader 副本所在 Broker 发起拉取请求(如向图中broker0上的TopicA - partition0 - leader拉取 )。消费者可配置每次拉取消息的最大数量、最大字节数等参数。若 Broker 当前没有新消息,消费者可能收到空响应,也可设置等待策略,直到有新数据才返回 。
  • 5、消息处理

    • 反序列化:拉取到的消息通常是序列化的字节数组,消费者利用配置的key.deserializervalue.deserializer进行反序列化,将其转换为程序可处理的对象格式。
    • 业务逻辑处理:对反序列化后的消息,依据具体业务需求进行处理,如写入数据库、进行计算分析等。处理过程中要兼顾可靠性和性能,防止消息积压。
  • 6、偏移量提交

    • 消费者处理完消息后,需将当前消费到的偏移量提交到__consumer_offsets 。可选择自动提交(配置enable.auto.commit=true ,默认每 5 秒提交一次 ),优点是简单,但可能导致重复消费或消息丢失;也可手动提交,开发者在确保消息处理完成后提交,能更精准控制消费位置,保证消息准确消费 。

文章转载自:
http://antipsychotic.apjjykv.cn
http://apartheid.apjjykv.cn
http://apostleship.apjjykv.cn
http://brelogue.apjjykv.cn
http://abyssal.apjjykv.cn
http://arkansan.apjjykv.cn
http://bicyclist.apjjykv.cn
http://caddish.apjjykv.cn
http://caravel.apjjykv.cn
http://agnosia.apjjykv.cn
http://autocoherer.apjjykv.cn
http://anticarcinogenic.apjjykv.cn
http://bale.apjjykv.cn
http://calque.apjjykv.cn
http://aetatis.apjjykv.cn
http://campaigner.apjjykv.cn
http://batt.apjjykv.cn
http://accept.apjjykv.cn
http://achromate.apjjykv.cn
http://attap.apjjykv.cn
http://alopecia.apjjykv.cn
http://berber.apjjykv.cn
http://aarnet.apjjykv.cn
http://chicana.apjjykv.cn
http://abbreviate.apjjykv.cn
http://bifrost.apjjykv.cn
http://calabrian.apjjykv.cn
http://bytecode.apjjykv.cn
http://ceremonially.apjjykv.cn
http://cardiocirculatory.apjjykv.cn
http://www.dtcms.com/a/212877.html

相关文章:

  • JVM 的类加载机制
  • 贪心算法应用:贝尔曼-福特松弛问题详解
  • 贪心算法应用:Ford-Fulkerson最大流问题详解
  • 自训练NL-SQL模型
  • webpack优化方法
  • Linux系统之----磁盘硬件
  • 【C++进阶篇】红黑树的封装(赋源码)
  • 线程池实战——数据库连接池
  • Python中字典(dict)知识详解应用
  • Vue.extend
  • CentOS7更新 GLIBC 2.25
  • 区块链可投会议CCF C--APSEC 2025 截止7.13 附录用率
  • ISO 26262-5 区分失效模式
  • 阿里千问系列:Qwen3技术报告解读(下)
  • 英语科研词汇现象及语言演变探讨
  • 用 Python 构建自动驾驶的实时通信系统:让车辆“交流”起来!
  • YOLOV8涨点技巧之空间通道协作注意力(SCCA)-应用于自动驾驶领域
  • 类欧几里得算法(floor_sum)
  • git 把一个分支A的某一个 commit 应用到另一个分支B上
  • LLM 使用本地模型 提取新生成 文本 的token ID序列
  • 使用中文作为map的可以,需要注意什么
  • 差分数组知识笔记
  • java 加密算法的简单使用
  • 医学写作人才管理策略
  • Leetcode 刷题记录 11 —— 二叉树第二弹
  • 获取 Stream 对象的方式
  • 内存管理(第五、六章)
  • RocketMQ 深度解析:消息中间件核心原理与实践指南
  • AUTOSAR图解==>AUTOSAR_SRS_ICUDriver
  • 关于 Web 安全:5. 认证绕过与权限控制分析