zk管理kafkakafka-broker通信
系列文章目录
文章目录
- 系列文章目录
- 一、作用和不足
- 作用
- 不足
- 二、zk全部宕机后,kafka集群能正常收发消息吗?
- 三、controller功能和如何选举
- 四、kafka发送数据到client
一、作用和不足
作用
2.8版本以前,kafka使用zk管理并存储kafka的关键元数据,每个kafka集群都有broker作为控制器,是zk watcher 选举产生的。controller不仅像其他普通节点一样存储主体分区日志和处理消费生成数据请求,而且还维护着kafka集群元数据,比如broker id,主题分区,leader, isr信息。它将这些信息持久化到zk中。
0.9版本以前,controller通过单线程循环的方式,将更新后的元数据同步给其他broker节点,生产者和消费者都需要直接连接到zk集群进行发送和消费数据,现在这种方式已经被通过 bootstrap.servers连接的方式取代
不足
在一个健康的zk集群中,每时每刻只有一个健康的leader节点,zk必须保证半数以上的节点可用,才能保证zk集群可用。浪费资源。
zk和kafka是两个不同的系统,运维工作加倍。
controller节点存在单点故障。
controller宕机的话,新controller 需要从zk拉取所有分区的元数据信息,向每个broker节点发送leaderAndISR和updateMetaData请求,时间复杂度是O(num.partitions)。
broker宕机的话, 受控关机时间长,尤其是某个broker节点上分布的分区leader非常多时,需要重选leader,重新拉取元数据信息,分区越多重启时间越长,时间复杂度是O(partitions.leader)。
二、zk全部宕机后,kafka集群能正常收发消息吗?
能够正常收发消息。但是在kafka的server log里面,会报错:xxxx拒绝连接
原因是,kafka0.9以后,zk的功能被弱化,用来帮助kafka集群选举controller节点等基本操作,zk节点宕机后,kafka集群只是不能再发现新的broker节点加入了,但是不影响既有的节点收发日志。
三、controller功能和如何选举
- 分区 Leader 选举
当某个 Broker 宕机或重新加入时,Controller 负责重新选举该 Broker 上的分区 Leader。
确保每个分区都有一个 Leader 和多个 Follower(副本)。 - 分区分配与重平衡
在集群扩容、Broker 故障恢复等情况下,Controller 会重新分配分区到不同的 Broker 上,以实现负载均衡。
控制 Rebalance(再平衡)过程,确保所有副本同步。 - 元数据管理
维护并更新集群的元数据信息,如:
Topic 的分区数量
分区的 Leader 和副本信息
Broker 的状态(在线/离线)
Controller 自己的状态(主/备) - 监控 Broker 状态
检测 Broker 是否正常运行。
如果某个 Broker 失联,Controller 会触发相应的处理逻辑(如重新分配分区)。 - 控制副本同步
确保副本之间的数据同步,防止数据丢失
集群的启动中,如何选出controller: 抢占机制
集群运行中,一些broker节点压力过大,导致controller反应慢,由于没有超时,zk无法判断是否下线,但是导致某些分区选主超时,可以删除controller临时节点,zk触发controller重新选主。健康的broker节点抢先注册成为controller, epoch任期号是2.
path: /controller path:/controller_epoch
value:{broker: 1} value:2
四、kafka发送数据到client
向broker节点传输数据是通过networkclient完成的。kafka客户端的networkclient类使用java nio 实现与broker节点通信,源代码可以看Selector类,
producer获取元数据信息,判断需要发送的topic分区位于哪个broker节点,selector和具体的broker节点创建一个socket 连接。this.nioSelector = java.nio.channels.Selector.open();
具体是如何连接的呢?看源码
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {//1.检查是否和Node重复建立连接,确保每个channel当下只能有一个Node连接ensureNotRegistered(id);//2.创建SocketChannel对象SocketChannel socketChannel = SocketChannel.open();SelectionKey key = null;try {/*** 3.配置SocketChannel对象* 将SocketChannel设置为非阻塞模式,keepAlive设置为true,TCP_NODELAY设置为true* 指定发送和接收缓冲区的大小(如果不是默认值)* */configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);/*** 4.确认连接是否真正建立* 因为是非阻塞模式,scoketChannel.connect()只是尝试发起一个连接请求,并不保证连接一定建立成功* connect()返回true表示连接已经建立成功,返回false表示连接请求已经发出但还没有建立成功* 因此boolean connected会立即返回连接状态,但是并一定是true* */boolean connected = doConnect(socketChannel, address);/*** 5.注册SocketChannel对象到Selector* 注册感兴趣的事件为OP_CONNECT,表示当连接建立成功时,Selector会收到通知* */key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);if (connected) {// OP_CONNECT won't trigger for immediately connected channelslog.debug("Immediately connected to node {}", id);//6.1 将key加入immediatelyConnectedKeys集合,表示该连接已经建立成功immediatelyConnectedKeys.add(key);//6.2 将key的感兴趣事件清空,因为连接已经建立成功,不需要再监听OP_CONNECT事件key.interestOps(0);}} catch (IOException | RuntimeException e) {if (key != null)immediatelyConnectedKeys.remove(key);channels.remove(id);socketChannel.close();throw e;}}
Selector类中还有一个poll()方法,获取准备就绪的SelectionKey,代表的是当前SocketChannel上已经就绪的时间,然后就可以 遍历这些SelectionKey来执行在interestSets中的兴趣操作
在KafkaProducer和KafkaConsumer中,负责与Broker通信的线程会重复调用NetworkClient的poll(), 在poll()中,会执行selector的select()方法,并以IO多路复用的方式处理与Broker之间的通信
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {//获取所有准备就绪的SelectionKeySet<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
sender子线程将封装好的发送请求缓存在InFlightRequests中,然后由NIO请求将消息发送到broker
InFlightRequests中的一个Deque中可以缓存多个sender创建好的ProduceRequest。“飞行中的请求”指的是请求发出去,尚未收到broker端的响应
其中Deque大小由参数max.in.flight.requests.per.connection决定,默认是5.如果将幂等性参数设置为true,broker会按照请求的先后顺序处理produceRequest;反之如果设置为false,只能将max.in.flight.requests.per.connection设置为1,才能保证顺序发送消息