当前位置: 首页 > news >正文

zk管理kafkakafka-broker通信

系列文章目录

文章目录

  • 系列文章目录
  • 一、作用和不足
    • 作用
    • 不足
  • 二、zk全部宕机后,kafka集群能正常收发消息吗?
  • 三、controller功能和如何选举
  • 四、kafka发送数据到client


一、作用和不足

作用

2.8版本以前,kafka使用zk管理并存储kafka的关键元数据,每个kafka集群都有broker作为控制器,是zk watcher 选举产生的。controller不仅像其他普通节点一样存储主体分区日志和处理消费生成数据请求,而且还维护着kafka集群元数据,比如broker id,主题分区,leader, isr信息。它将这些信息持久化到zk中。

0.9版本以前,controller通过单线程循环的方式,将更新后的元数据同步给其他broker节点,生产者和消费者都需要直接连接到zk集群进行发送和消费数据,现在这种方式已经被通过 bootstrap.servers连接的方式取代

不足

在一个健康的zk集群中,每时每刻只有一个健康的leader节点,zk必须保证半数以上的节点可用,才能保证zk集群可用。浪费资源。

zk和kafka是两个不同的系统,运维工作加倍。

controller节点存在单点故障。

controller宕机的话,新controller 需要从zk拉取所有分区的元数据信息,向每个broker节点发送leaderAndISR和updateMetaData请求,时间复杂度是O(num.partitions)。

broker宕机的话, 受控关机时间长,尤其是某个broker节点上分布的分区leader非常多时,需要重选leader,重新拉取元数据信息,分区越多重启时间越长,时间复杂度是O(partitions.leader)。

二、zk全部宕机后,kafka集群能正常收发消息吗?

能够正常收发消息。但是在kafka的server log里面,会报错:xxxx拒绝连接
原因是,kafka0.9以后,zk的功能被弱化,用来帮助kafka集群选举controller节点等基本操作,zk节点宕机后,kafka集群只是不能再发现新的broker节点加入了,但是不影响既有的节点收发日志。

三、controller功能和如何选举

  1. 分区 Leader 选举
    当某个 Broker 宕机或重新加入时,Controller 负责重新选举该 Broker 上的分区 Leader。
    确保每个分区都有一个 Leader 和多个 Follower(副本)。
  2. 分区分配与重平衡
    在集群扩容、Broker 故障恢复等情况下,Controller 会重新分配分区到不同的 Broker 上,以实现负载均衡。
    控制 Rebalance(再平衡)过程,确保所有副本同步。
  3. 元数据管理
    维护并更新集群的元数据信息,如:
    Topic 的分区数量
    分区的 Leader 和副本信息
    Broker 的状态(在线/离线)
    Controller 自己的状态(主/备)
  4. 监控 Broker 状态
    检测 Broker 是否正常运行。
    如果某个 Broker 失联,Controller 会触发相应的处理逻辑(如重新分配分区)。
  5. 控制副本同步
    确保副本之间的数据同步,防止数据丢失

集群的启动中,如何选出controller: 抢占机制

集群运行中,一些broker节点压力过大,导致controller反应慢,由于没有超时,zk无法判断是否下线,但是导致某些分区选主超时,可以删除controller临时节点,zk触发controller重新选主。健康的broker节点抢先注册成为controller, epoch任期号是2.
path: /controller path:/controller_epoch

value:{broker: 1} value:2

四、kafka发送数据到client

向broker节点传输数据是通过networkclient完成的。kafka客户端的networkclient类使用java nio 实现与broker节点通信,源代码可以看Selector类,

producer获取元数据信息,判断需要发送的topic分区位于哪个broker节点,selector和具体的broker节点创建一个socket 连接。this.nioSelector = java.nio.channels.Selector.open();

具体是如何连接的呢?看源码

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {//1.检查是否和Node重复建立连接,确保每个channel当下只能有一个Node连接ensureNotRegistered(id);//2.创建SocketChannel对象SocketChannel socketChannel = SocketChannel.open();SelectionKey key = null;try {/*** 3.配置SocketChannel对象* 将SocketChannel设置为非阻塞模式,keepAlive设置为true,TCP_NODELAY设置为true* 指定发送和接收缓冲区的大小(如果不是默认值)* */configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);/*** 4.确认连接是否真正建立* 因为是非阻塞模式,scoketChannel.connect()只是尝试发起一个连接请求,并不保证连接一定建立成功* connect()返回true表示连接已经建立成功,返回false表示连接请求已经发出但还没有建立成功* 因此boolean connected会立即返回连接状态,但是并一定是true* */boolean connected = doConnect(socketChannel, address);/*** 5.注册SocketChannel对象到Selector* 注册感兴趣的事件为OP_CONNECT,表示当连接建立成功时,Selector会收到通知* */key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);if (connected) {// OP_CONNECT won't trigger for immediately connected channelslog.debug("Immediately connected to node {}", id);//6.1 将key加入immediatelyConnectedKeys集合,表示该连接已经建立成功immediatelyConnectedKeys.add(key);//6.2 将key的感兴趣事件清空,因为连接已经建立成功,不需要再监听OP_CONNECT事件key.interestOps(0);}} catch (IOException | RuntimeException e) {if (key != null)immediatelyConnectedKeys.remove(key);channels.remove(id);socketChannel.close();throw e;}}

Selector类中还有一个poll()方法,获取准备就绪的SelectionKey,代表的是当前SocketChannel上已经就绪的时间,然后就可以 遍历这些SelectionKey来执行在interestSets中的兴趣操作
在KafkaProducer和KafkaConsumer中,负责与Broker通信的线程会重复调用NetworkClient的poll(), 在poll()中,会执行selector的select()方法,并以IO多路复用的方式处理与Broker之间的通信

 if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {//获取所有准备就绪的SelectionKeySet<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

sender子线程将封装好的发送请求缓存在InFlightRequests中,然后由NIO请求将消息发送到broker
在这里插入图片描述

InFlightRequests中的一个Deque中可以缓存多个sender创建好的ProduceRequest。“飞行中的请求”指的是请求发出去,尚未收到broker端的响应
其中Deque大小由参数max.in.flight.requests.per.connection决定,默认是5.如果将幂等性参数设置为true,broker会按照请求的先后顺序处理produceRequest;反之如果设置为false,只能将max.in.flight.requests.per.connection设置为1,才能保证顺序发送消息

http://www.dtcms.com/a/393128.html

相关文章:

  • 前端开发技术趋势Web Components
  • Python tarfile库详解
  • ​​[硬件电路-287]:高性能六通道数字隔离器CA-IS3763L 功能概述与管脚定义
  • 错题集系统接口文档
  • 【RAG-LLM】InfoGain-RAG基于文档信息增益的RAG
  • Browser-Use深度解析:重新定义AI与浏览器的智能协作
  • 【Mysql】事务隔离级别、索引原理、/redolog/undolog/binlog区别、主从复制原理
  • AWS 全景速查手册
  • 小米Openvela城市沙龙
  • Python数据分析:求矩阵的秩。啥是矩阵秩?听故事学线代并用Python实现,娘来太容易学会了!
  • UI Toolkit自定义元素
  • redis未授权访问-漏洞复现
  • PR调节器与PI调节器的区别
  • Unity核心概念⑫:碰撞检测
  • 【读论文】面向工业的ASR语音大模型
  • 重谈IO——五种IO模型及其分类
  • 数据库造神计划第十七天---索引(2)
  • 【开题答辩实录分享】以《车联网位置信息管理软件》为例进行答辩实录分享
  • (3)机器学习-模型介绍
  • 如何在 Ubuntu 20.04 LTS 上安装 MySQL 8
  • MuMu模拟器使用入门实践指南:从ADB连接到Frida动态分析
  • 条款5:优先选用auto, 而非显示类型声明
  • 强化学习原理(一)
  • 解读43页PPT经营分析与决策支持系统建设方案交流及解决经验
  • ubuntu24设置证书登录及问题排查
  • MySQL 备份与恢复完全指南:从理论到实战
  • 2011/12 JLPT听力原文 问题四
  • 实战free_s:在高并发缓存系统中落地“内存释放更安全——free_s函数深度解析与free全方位对比”
  • 异步通知实验
  • 用 C 语言模拟面向对象编程