当前位置: 首页 > news >正文

网站建设维护面试英雄联盟网站模板

网站建设维护面试,英雄联盟网站模板,本地搭建linux服务器做网站,职业生涯规划大赛活动目的消费者总体工作流程 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费…

消费者总体工作流程

        Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化流程

        1、coordinator:辅助实现消费者组的初始化和分区的分配。 coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量) 例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset;

        2、coordinator选出一个 consumer作为leader;

        3、coordinator把要消费的topic情况发送给leader消费者;

        4、leader会负责制定消费方案;

        5、把消费方案发给coordinator;

        6、Coordinator就把消费方 案下发给各个consumer;

        7、每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),也会触发再平衡

消费者组详细消费流程

        左侧为Kafka集群,右侧为消费者组,消费者创建网络连接客户端,消费者组调用sendFetches,抓取数据,同时还会准备两个参数,Fetch.min.bytes:每批次最小抓取大小,默认1字节,fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms,任一条件满足,都会拉取数据;Fetch.max.bytes每批次最 大抓取大小,默认50m

        send->拉取数据将数据放进completedFetches队列,消费者一批次拉取默认500条进行处理:反序列化->拦截器->处理数据

package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {//配置Properties properties = new Properties();//链接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2。订阅主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//3.消费数据while(true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//拉数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

http://www.dtcms.com/a/536061.html

相关文章:

  • 网络安全:金盾 RASP 应用防护
  • Cursor MCP Java程序员从零开始实战教程 第一章-第四节-MCP服务器安装与配置
  • __金仓数据库替代MongoDB护航医疗隐私:医院患者随访记录安全存储实践__
  • 有没有教做衣服的网站济南建设工程交易中心网站
  • MongoDB使用命令行导出导入索引
  • __金仓数据库平替MongoDB全栈安全实战:从文档存储到多模一体化的演进之路__
  • Python爬虫实战:新闻数据抓取与MongoDB存储全流程
  • 一站式搭建WordPress网站与Nginx RTMP流媒体服务
  • 使用 EasyExcel 进行 多 Sheet 导出
  • 做游戏网站赚钱么云服务器怎么用详细步骤
  • 建设网站的技术回龙观手机网站开发服务
  • 边缘计算与物联网中的 MDM和OTA
  • Linux物联网常用7天循环视频录制软件架构解决方案
  • Arguments: ls-remote --tags --heads git://github.com/adobe-webplatform/eve.git
  • Glide 图片缓存:异步更新 + 动画支持 + 自定义目录+自定义刷新时效
  • SWAT模型应用
  • 界面控件DevExpress WPF v25.1 - 官宣支持Avalonia XPF
  • HarmonyOS应用日志HiLog:从基础使用到高级调试技术
  • 系统架构设计师备考第55天——数据库设计融合物联网层次架构案例分析
  • 加查网站建设乌海seo
  • 北京金港建设股份有限公司网站wordpress缓存清理
  • Deepseek大模型结合Chrome搜索爬取2025AI投资趋势数据
  • 基于 ComfyUI 的 Stable Diffusion 本地部署与使用教程(Windows + CUDA12.0)
  • HTTPS 端口,443 之外的那些坑与排查实战
  • Stable Diffusion 短视频制作算力需求与优化策略研究
  • ComfyUI本地部署Stable Diffusion:核心组件(Python、PyTorch、CUDA)版本与显卡配置全指南
  • Adobe Pro DC裁剪PDF流程介绍
  • 如何编写 Chrome 插件(Chrome Extension)
  • 怎样做网站吸引人深圳网站seo优化排名公司
  • 比wordpress更好的网站程序外贸高端网站建设