一篇文章详解Kafka Broker
目录
工作流程
副本
故障处理
Follower 故障
Leader 故障
Leader Partition自动平衡
文件存储
记录寻找
文件清理
高效读写数据
工作流程

1)Broker 启动与注册(Zookeeper 侧)
Broker 启动后,会在 Zookeeper 的/brokers/ids/路径下注册自己的 ID(图中为[0,1,2])。
2)Controller 选举(Zookeeper 侧)
在 Zookeeper 的controller节点中,所有broker中谁是最早注册的 controller 就是谁。图中选举出brokerid:0的节点作为 Controller。
3)Controller 监听 Broker 变化
被选举的 Controller 会监听 Zookeeper 中brokers节点的变化,以感知集群中 Broker 的存活状态。
4)节点信息上传与同步(Zookeeper 与 Kafka 集群)
Controller 会将 Kafka 集群的节点信息(如分区的 Leader、ISR 等)上传到 Zookeeper 的对应路径(如图中/brokers/topics/first/partitions/0/state,记录了leader:0和isr:[1,0,2])。
集群中其他潜在的 Controller 会从 Zookeeper 同步这些相关信息,保证信息一致性。
5)Leader 故障与感知(Kafka 集群侧)
假设broker1中的TopicA-Partition0的 Leader 挂了(图中用红色叉号标记),Controller 会监听并感知到这一节点变化。
6)Leader 选举
Controller 按照规则发起新的 Leader 选举:
选举前提:候选节点必须在 ISR(同步副本列表) 中存活。
选举规则:在满足前提的情况下,按照 **AR(分区的所有副本列表)** 中排在前面的优先级轮询。例如图中 AR 为[1,0,2]、ISR 为[1,0,2],则按1→0→2的顺序选举。
7)更新 Leader 及 ISR
选举出新的 Leader 后,Controller 会更新 Zookeeper 中对应节点的 Leader 和 ISR 信息,完成故障转移。
8)生产者交互(额外流程)
Kafka Producer 会向当前的 Leader(如初始的broker1的TopicA-Partition0 Leader)发送消息,Leader 处理后向 Producer 应答消息;当 Leader 故障转移后,Producer 会与新选举的 Leader 继续交互。
副本
副本目的是提高数据的可靠性。 Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader, 然后 Follower 找 Leader 进行同步数据。 Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。 AR = ISR + OSR。
ISR:表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被移到 OSR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。
故障处理
LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO 。
Follower 故障
- Follower 发生故障后会被临时踢出 ISR
- 这个期间 Leader 和 Follower 继续接收数据
- 该 Follower 恢复后,Follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步。
- 等该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上 Leader 之后,就可以重新加入 ISR 了。
Leader 故障
- Leader 发生故障之后,会从 ISR 中选出一个新的 Leader
- 为保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据。
这只能保证副本之间的数据一致性,不能保证数据不丢失或者不重复!
Leader Partition自动平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,导致负载不均衡。

Kafka提供了Leader Partition自动平衡:
- auto.leader.rebalance.enable,默认是true。 自动Leader Partition 平衡。
- leader.imbalance.per.broker.percentage, 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过 了这个值,控制器会触发leader的平衡。
- leader.imbalance.check.interval.seconds, 默认值300秒。检查leader负载是否平衡的间隔时间。
文件存储
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。
每个segment包括:.index文件、.log文件和.timeindex等文件。 .log :日志文件 ;.index :偏移量索引文件; .timeindex :时间戳索引文件。

记录寻找

1)根据目标 offset 定位 Segment 文件
Kafka 的日志按 Segment 分段存储,每个 Segment 对应一段连续的 offset 范围。首先确定目标 offset=600 所属的 Segment 文件,图中对应的是Segement-1 [offset:522-1004],其索引文件为00000000000000000522.index。
2)找到小于等于目标 offset 的最大 offset 对应的索引项
在.index稀疏索引文件中,查找小于等于目标 offset=600 的最大绝对 offset。图中找到的是587,其对应的相对 Offset 为65,Position 为6410。
3)定位到 log 文件
根据索引项中的 Position(6410),在对应的.log文件(00000000000000000522.log)中定位到该位置。
4)向下遍历找到目标 Record
从定位到的位置开始,在.log 文件中向下遍历 RecordBatch,最终找到 offset=600 对应的 Record。
文件清理
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- log.retention.hours,最低优先级小时,默认 7 天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
有两种日志清理策略。
delete 日志删除:将过期数据删除
log.cleanup.policy = delete 所有数据启用删除策略
- 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
- 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。 log.retention.bytes,默认等于-1,表示无穷大。
compact 日志压缩
compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
压缩后的offset可能是不连续的,这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息 集里就保存了所有用户最新的资料
高效读写数据
- Kafka 本身是分布式集群,可以采用分区技术,并行度高。
- 读数据采用稀疏索引,可以快速定位要消费的数据。
- 顺序写磁盘,写入log的过程是追加写。
- 页缓存 + 零拷贝技术
什么是页缓存 + 零拷贝技术?

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Broker只负责存储和转发,不关心数据具体是什么,因此数据的处理不走应用层。
PageCache页缓存:是操作系统底层的功能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存来使用。
