当前位置: 首页 > news >正文

Kafka消费者相关原理

前言

前面已经介绍了Kafka的架构知识并引出了Kafka的相关专业名称进行解释

也介绍了Kafka生产者相关的原理

这次分享一下Kafka对消费者接收消息进行处理的运行机制和原理

消费者接收到消息之后,就是进行消费,但是消费者要维护Kafka中消费者偏移量主题的消息

所以消费者消费的时候一定是需要给消费者偏移量主题提交 消费者组/消费主题分区/偏移量这些消费者消费的信息,让Kafka维护消息偏移量信息的

消费者提交消息偏移量

自动提交

每隔 5 秒钟自动把最近拉取到的消息的 offset 提交给 Kafka 的 消费者偏移量主题

可以自己设置是否自动提交和自动提交消息偏移量的间隔

优点: 批量提交,减少提交压力

缺点:

        实时同步性差,若消费者挂掉,重新启动

                如果之前的消息没有消费完就提交了,会造成消息丢失

                如果之前的消息消费完了还没来得及提交,下次消费者还按之前的消息偏移量进行消费,会造成消息重复消费

手动提交

消费者消费完消息之后手动对偏移量进行提交

设置完手动提交需要自己提交消息偏移量,如果设置了手动提交但是不手动提交消息的偏移量此时消息偏移量没有维护,会导致重复消费

手动同步提交

手动同步提交就是当前线程提交消息偏移量阻塞,等待提交成功

手动异步提交

手动异步提交就是当前线程提交消息偏移量不阻塞,异步调用消费者提供回调方法来对异步提交结果进行处理

消费者长轮询拉取消息

拉取消息数量:默认每次拉取最多 500 条消息

长轮询等待时间:如果当前批次的消息不足 500 条,消费者会阻塞等待 最多 1 秒,期间 Broker 会推送新到达的消息

总结: 够500条直接返回,不够最多拉取一秒也返回

其中消费者长轮询拉取消息条数和长轮询等待时间是可以设置的

拉取完之后消费者就开始对消息进行消费了,当消费者消费完这次拉取的消息就会再次长轮询拉取消息

所以就引出了下面的消费者运行机制

消费者两次长轮询拉取消息间隔如果超过30秒就认定这个消费者处理业务能力差就把它剔除出消费者组。触发rebalance机制,rebalance机制会造成性能开销

消费者两次长轮询拉取消息间隔时间是可以设置的,需要根据消费者能力快慢进行设置

消费者的健康状态检查

消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出 消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费

新消费者指定分区和偏移量、时间消费

Kafka可以指定具体主题的分区进行消息的拉取

并且指定从这个主题分区的那个消息偏移量开始消费

如: 从头开始消费、指定从第几个偏移量开始消费、指定从过去的某个具体时间进行消费

其中,指定从过去某个时间进行消费的原理步骤

先通过指定过去的某个时间找到主题分区的偏移量位置,然后使用这个偏移量调用指定偏移量开始消费

所以需要配置消费者的消息偏移量配置

新消费组的消费偏移量规则

新消费组中的消费者在启动以后,默认消费新消息                          

        Latest:默认的,消费新消息

当然也可以通过配置让新消费组中的消费组从头开始消费消息

        earliest:第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

所以需要配置消费组的消息偏移量设置

其中,当消费者消息偏移量配合和消费组消息偏移量配置冲突的时候,优先消费者偏移量配置

http://www.dtcms.com/a/323824.html

相关文章:

  • 第4章 程序段的反复执行4 多重循环练习(题及答案)
  • Audio Flamingo
  • 网站升级https地址方法
  • LeetCode每日一题,2025-8-10
  • jmeter常规压测【读取csv文件】
  • BGP HCIP
  • 繁花深处:花店建设的时代意义与多元应用—仙盟创梦IDE
  • 农经权二轮延包—已有软件与后续研究
  • 线性代数1000题学习笔记
  • 从街亭失守看管理
  • Datawhale AI 夏令营——全球AI攻防挑战赛(AIGC技术-图像方向)
  • LLaMA-Adapter V2 Parameter-Efficient Visual Instruction Model
  • 快速了解DBSCAN算法
  • 分布微服务电商订单系统Rust编码开发[下]
  • 数据结构:树
  • 分布微服务电商订单系统Rust编码开发[上]
  • 代码随想录算法训练营第六十天|图论part10
  • sqllabs——Less1
  • 【每天一个知识点】深度领域对抗神经网络
  • 医防融合中心-智慧化慢病全程管理医疗AI系统开发(下)
  • 零基础学Java第二讲---数据类型与变量
  • 什么是ABA问题?
  • Day 10: Transformer完整架构详解 - 从位置编码到编解码器的全面剖析
  • 【QT】常⽤控件详解(七)容器类控件 GroupBox TabWidget 布局管理器 Spacer
  • 大型动作模型LAM:让企业重复任务实现80%效率提升的AI技术架构与实现方案
  • 复杂项目即时通讯从android 5升级android x后遗症之解决 ANR: Input dispatching timed out 问题 -优雅草卓伊凡
  • 【东枫科技】 FR2 Massive MIMO 原型验证与开发平台,8*8通道
  • Linux 系统中,如何处理信号以避免竞态条件并确保程序稳定性?
  • 【实证分析】上市公司技术创新持续性数据分析-含代码(2008-2023年)
  • 【嵌入式】嵌入式硬件相关基础知识