当前位置: 首页 > news >正文

Kafka消费者组

消费者总体工作流程

        Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化流程

        1、coordinator:辅助实现消费者组的初始化和分区的分配。 coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量) 例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset;

        2、coordinator选出一个 consumer作为leader;

        3、coordinator把要消费的topic情况发送给leader消费者;

        4、leader会负责制定消费方案;

        5、把消费方案发给coordinator;

        6、Coordinator就把消费方 案下发给各个consumer;

        7、每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),也会触发再平衡

消费者组详细消费流程

        左侧为Kafka集群,右侧为消费者组,消费者创建网络连接客户端,消费者组调用sendFetches,抓取数据,同时还会准备两个参数,Fetch.min.bytes:每批次最小抓取大小,默认1字节,fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms,任一条件满足,都会拉取数据;Fetch.max.bytes每批次最 大抓取大小,默认50m

        send->拉取数据将数据放进completedFetches队列,消费者一批次拉取默认500条进行处理:反序列化->拦截器->处理数据

package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {

        //配置
        Properties properties = new Properties();

        //链接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //1.创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2。订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        //3.消费数据
        while(true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//拉数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

相关文章:

  • 链接未来:深入理解链表数据结构(二.c语言实现带头双向循环链表)
  • react v-18父组件调用子组件的方法和数据
  • 11种方法判断​软件的安全可靠性​
  • CentOS 7 Tomcat服务的安装
  • 关于“Python”的核心知识点整理大全31
  • 55.0/CSS 的应用(详细版)
  • [Unity]接入Firebase 并且关联支付埋点
  • R语言【cli】——cli_warn可以更便捷的在控制台输出警告信息
  • 数据管理平台Splunk Enterprise本地部署结合内网穿透实现远程访问
  • IDEA版SSM入门到实战(Maven+MyBatis+Spring+SpringMVC) -Spring的AOP前奏
  • 图像处理—小波变换
  • Apache Pulsar 技术系列 - PulsarClient 实现解析
  • 【Spring实战】配置单数据源
  • ICC2:Less than minimum edge length和Concave convex edge enclosure
  • Backend - Django 项目创建 运行
  • 基于查表法的水流量算法设计与实现
  • 漫谈UNIX、Linux、UNIX-Like
  • 2024年第二届“华数杯”国际大学生数学建模竞赛思路及代码
  • 神经网络可以计算任何函数的可视化证明
  • SearchWP WordPress高级网站内容搜索插件
  • 国家统计局:4月份居民消费价格同比下降0.1%
  • 欧洲理事会前主席米歇尔受聘中欧国际工商学院特聘教授,上海市市长龚正会见
  • 暴利之下:宠物殡葬行业的冰与火之歌
  • 晶圆销量上升,中芯国际一季度营收增长近三成,净利增超1.6倍
  • 陕南多地供水形势严峻:有的已呼吁启用自备水井
  • 央行:5月8日起,下调个人住房公积金贷款利率0.25个百分点