分布式专题——22 Kafka集群工作机制详解
1 ZooKeeper 集群数据梳理
-
Kafka 与 ZooKeeper 的关联基础
- Kafka 把状态信息存到 ZooKeeper 里,这些信息能体现每个 Kafka Broker 服务的差异,助力集群化业务能力;
- 由于这些数据需在集群各 Broker 间达成共识,所以得存在所有集群都能访问的第三方存储(ZooKeeper)中;
- 而且这些共识数据要保持强一致性(保证各个 Broker 之间的分工是同步且清晰的),基于 CP 实现的 ZooKeeper 是合适之选,另外 ZooKeeper 的 Watcher 机制还能减少 Broker 读取 ZooKeeper 的次数;
-
Kafka 在 ZooKeeper 上管理的数据相关
- Kafka 整体集群结构里,Producer 向 Broker 推送数据,Consumer 从 Broker 拉取数据;
- 集群中有两个关键状态信息:
- 一是多个 Broker 要选出一个担任 Controller 角色,由其管理整个集群的分区和副本状态;
- 二是同一 Topic 下的多个 Partition 要选出一个 Leader 角色,由该 Leader 分区负责与客户端进行数据交互;
- 这些状态信息都注册到了 ZooKeeper 中;
-
ZooKeeper 中 Kafka 相关数据结构
-
从 ZooKeeper 数据结构来看,有多个与 Kafka 相关的节点,如下图;
-
查看 ZooKeeper 数据可使用 IDEA 中的 ZooKeeper Manager 插件;
-
-
ZooKeeper 中 Kafka 相关节点的特点与示例
- Kafka 往 ZooKeeper 注册的节点大多比较简明;
- 例如集群中每个 Broker 启动后,会往 ZooKeeper 注册临时节点
/brokers/ids/<BrokerId>
,若 Kafka 服务停止,ZooKeeper 上对应的临时节点就会注销。
2 Controller Broker 选举机制
-
在 Kafka 集群开展工作前,需要选举出一个 Broker 来担任 Controller 角色,该角色负责整体管理集群内的分区和副本状态;
-
选举 Controller 的过程是通过抢占 Zookeeper 的
/controller
临时节点来实现的;-
当集群内的 Kafka 服务启动时,会尝试在 Zookeeper 上创建一个
/controller
临时节点,并将自身的 brokerid 写入这个节点,节点内容包含version
、brokerid
、timestamp
、kraftControllerEpoch
等信息; -
节点内容比如:
{"version":2,"brokerid":2,"timestamp":"1723447688383","kraftControllerEpoch":-1}
-
-
Zookeeper 会保证在一个集群中,只有一个 broker 能够成功创建这个
/controller
临时节点。后续其他 broker 创建不成功时,就会注册一个监听,一旦/controller
临时节点被删除,就会重新开始注册/controller
节点,争取成为新的 controller; -
注册成功的 broker 成为集群中的 Controller 节点后,Zookeeper 会维护与这个 broker 的心跳连接,broker 会定期向 Zookeeper 发送心跳以保持连接状态。一旦 Zookeeper 长时间检测不到这个 broker 的心跳信息,就会删除该临时节点。这样就会有下一个 broker 成功注册
/controller
,同时更新version
,成为新的 Controller,这就是 Kafka 基于 Zookeeper 的 Controller 选举机制。 -
选举产生的 Controller 节点,会负责监听 Zookeeper 中的一些关键节点,触发集群的相关管理工作,例如:
-
监听 Zookeeper 中的
/brokers/ids
节点,感知 Broker 增减变化; -
监听
/brokers/topics
,感知 topic 以及对应的 partition 的增减变化; -
监听
/admin/delete_topic
节点,处理删除 topic 的动作; -
另外,Controller 还需要负责将元数据推送给其他 Broker。
-
3 Leader Partition 选举机制
3.1 Leader Partition 的状态是如何记录的?
-
在 Kafka 里,一个 Topic 下的所有消息会分散存储在不同 Partition 中。使用
kafka - topics.sh
脚本创建 Topic 时,可通过--partitions
参数指定 Topic 包含的 Partition 数量,还能通过--replication - factors
参数指定每个 Partition 的备份数量; -
在一个 Partition 的众多备份里,要选举出一个 Leader Partition,它负责对接所有客户端请求,先保存消息,之后再通知其他 Follower Partition 同步消息;
-
先看几个相关基础概念
-
AR(Assigned Replicas):代表 Kafka 分区中的所有副本,不管存活与否;
-
ISR(In - Sync Replicas):指在所有 AR 中,服务正常且与 Leader 保持同步的 Follower 集合;
- 若 Follower 长时间(超时时间由
replica.lag.time.max.ms
参数设定,默认 30 秒)没向 Leader 发送通信请求,就会被踢出 ISR; - 老版本 Kafka 还会考虑 Partition 与 Leader Partition 之间同步的消息差值,大于
replica.lag.max.messages
条也会被移除 ISR,不过现在版本已移除该参数;
- 若 Follower 长时间(超时时间由
-
OSR(Out - of - Sync Replicas):是从 ISR 中踢出的节点,记录的是那些服务有问题、延迟过多的副本;
-
-
其中 AR 和 ISR 比较关键,可通过
kafka - topics.sh --describe
指令查看,结果里 AR 就是Replicas
列中的 Broker 集合;[root@192-168-65-112 kafka_2.13-3.8.0]# bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic disTopic [2024-08-12 15:42:57,462] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient) Topic: disTopic TopicId: CNrWfmEgSBqc9gLClemrXw PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: disTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 Elr: N/A LastKnownElr: N/ATopic: disTopic Partition: 1 Leader: 2 Replicas: 0,2 Isr: 2,0 Elr: N/A LastKnownElr: N/ATopic: disTopic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1 Elr: N/A LastKnownElr: N/A
-
而该指令中的所有信息,实际都记录在 Zookeeper 中,从 Zookeeper 节点(如
brokers/topics/disTopic/partitions
下各分区的state
节点)能看到包含leader
、isr
等信息的数据,比如有"leader":1, "isr":[0,1]
这样的内容,体现了 Leader Partition 以及 ISR 等状态信息在 Zookeeper 中的存储情况;
3.2 选举机制
-
Kafka 是如何在 Partition 中选举产生 Leader Partition的呢?下面来做一个实验:
# 1、创建 Topic:创建一个备份因子为 3、包含 4 个 Partition 的 Topic(secondTopic) [root@192-168-65-112 kafka_2.13-3.8.0]# bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 3 --partitions 4 --topic secondTopic Created topic secondTopic.# 2、查看初始 Partition 情况。可以看到,默认的 Leader 就是 Replicas 中的第一个 [root@192-168-65-112 kafka_2.13-3.8.0]# bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic [2024-08-12 16:50:33,594] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient) Topic: secondTopic TopicId: DNNw-hXqQCOW61shM7zZ2Q PartitionCount: 4 ReplicationFactor: 3 Configs: Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Elr: N/A LastKnownElr: N/A# 3、在worker3上停掉kafka服务 [root@192-168-65-193 kafka_2.13-3.8.0]# bin/kafka-server-stop.sh# 4、再次执行描述 Topic 的指令,观察到 Leader 依然是 Replicas 中第一个存活的 Broker [root@192-168-65-112 kafka_2.13-3.8.0]# bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic [2024-08-12 16:52:51,510] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient) Topic: secondTopic TopicId: DNNw-hXqQCOW61shM7zZ2Q PartitionCount: 4 ReplicationFactor: 3 Configs: Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,2 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 1 Leader: 2 Replicas: 0,2,1 Isr: 2,1 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,2 Elr: N/A LastKnownElr: N/A
-
从上面的实验中可以看到:
-
当 BrokerId=0 的 kafka 服务停止后,0 号 BrokerId 从所有 Partition 的 ISR 列表中剔除。原本 Partition 1 的 Leader 节点是 Broker 0,在 Broker 0 服务停止后,重新进行了 Leader 选举。Partition 1 优先评估
Replicas
列表中 Broker 0 后面的 Broker 2,由于 Broker 2 在 ISR 列表中,所以最终选举 Broker 2 成为 Leader; -
由此得出 Kafka 选举 Leader Partition 的机制:选举时按照 AR(Assigned Replicas,分区的所有副本)中的排名顺序,靠前的优先选举。只要当前 Partition 在 ISR(In - Sync Replicas,与 Leader 保持同步的 Follower 集合)列表中(即存活),该节点就会被选举成为 Leader Partition;
-
当 Partition 选举完成后,Zookeeper 中的信息会及时更新,使得选举结果能在集群所有 Broker 中达成共识,例如 Zookeeper 上的
/brokers/topics/secondTopic
节点会记录相关的分区、副本等信息;# Zookeeper上的/brokers/topics/secondTopic {"partitions":{"0":[1,0,2],"1":[0,2,1],"2":[2,1,0],"3":[1,2,0]},"topic_id":"DNNw-hXqQCOW61shM7zZ2Q","adding_replicas":{},"removing_replicas":{},"version":3}
-
3.3 Leader Partition 自动平衡机制
-
Leader Partition 选举机制能保证每个 Partition 同一时刻只有一个 Leader Partition,但仅分配好 Leader Partition 还不够;
-
在一组 Partition 中,Leader Partition 需负责与客户端的数据交互以及向 Follower 同步数据,默认情况下 Kafka 会尽量将 Leader Partition 分配到不同 Broker 节点,以保证集群性能压力平均;
-
不过经过 Leader Partition 选举后,这种平衡可能被打破,导致 Leader Partition 过多集中在同一个 Broker 上,使该 Broker 压力远高于其他 Broker,影响集群整体性能;
经过 Leader Partition 选举后,Leader Partition 过多集中在同一个 Broker 上,主要有以下几方面原因:
-
初始分配与故障恢复逻辑:Kafka 在初始为 Partition 分配 Leader 时,通常会按照一定策略(比如优先选择 AR(Assigned Replicas,分区的所有副本)中靠前的副本作为 Leader)进行分配,以保证集群性能均衡。但当集群中发生 Broker 故障时,故障 Broker 上的 Leader Partition 会重新选举。在选举过程中,会从 ISR(In - Sync Replicas,与 Leader 保持同步的 Follower 集合)中选择合适的副本成为新的 Leader。如果某个 Broker 比较**“健壮”**,在多次故障恢复的选举中,容易被选中成为多个 Partition 的 Leader,从而导致 Leader 过多集中在该 Broker 上;
-
副本同步与可用性因素。ISR 中的副本是与 Leader 保持同步的副本,在选举 Leader 时,会优先从 ISR 中选择。如果某个 Broker 上的副本总是能快速与 Leader 同步,保持在 ISR 中,那么在其他 Broker 出现故障时,该 Broker 上的副本就更有可能被选举为新的 Leader,长此以往,会使得该 Broker 承载的 Leader Partition 数量增多;
-
集群负载与资源分布。集群中不同 Broker 的负载和资源(如 CPU、内存、磁盘 I/O 等)分布可能存在差异。某些 Broker 资源更充足、性能更好,在处理客户端请求和同步数据时更高效,这会让其在 Leader 选举中更具优势,进而吸引更多的 Leader Partition 集中到该 Broker 上;
-
-
-
Kafka 设计了 Leader Partition 自动平衡机制,当发现 Leader 分配不均衡时,会自动进行调整;
- Kafka 进行 Leader Partition 自动平衡的逻辑是:认为 AR(Assigned Replicas,分区的所有副本)中的第一个节点应该是 Leader 节点,这种选举结果称为 preferred election(理想选举结果);
- Controller 会定期检测集群的 Partition 平衡情况,检测时会依次检查所有 Broker,当发现某个 Broker 上不平衡的 Partition 比例高于
leader.imbalance.per.broker.percentage
阈值时,就会触发一次 Leader Partition 的自动平衡;
-
这个机制涉及到 Broker 中
server.properties
配置文件中的几个重要参数:# 自动平衡开关,默认true # 开启后,后台线程会按 leader.imbalance.check.interval.seconds 配置的时间间隔检查 Partition 领导者的分布情况,若领导者不均衡程度超过 leader.imbalance.per.broker.percentage,就会触发向首选领导者的再平衡。 auto.leader.rebalance.enable Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered. Type: boolean Default: true Valid Values: Importance: high Update Mode: read-only # 自动平衡扫描间隔,Controller 触发分区再平衡检查的频率,默认 300 秒 leader.imbalance.check.interval.seconds The frequency with which the partition rebalance check is triggered by the controller Type: long Default: 300 Valid Values: [1,...] Importance: high Update Mode: read-only # 自动平衡触发比例 # 每个 Broker 允许的领导者不均衡比例,超过该值 Controller 会触发领导者再平衡,默认 10% leader.imbalance.per.broker.percentage The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.Type: int Default: 10 Valid Values: Importance: high Update Mode: read-only
- 这些参数可在 broker 的
server.properties
文件中修改,且需修改集群中所有 broker 的文件并重启 Kafka 服务才能生效;
- 这些参数可在 broker 的
-
也可通过手动调用
kafka-leader-election.sh
脚本触发一次自动平衡,例如:# 启动worker3上的Kafka服务,Broker上线 # secondTopic的partion1不是理想状态(理想 Leader 应为 Replicas 中的 0,此时 0 已在 ISR 列表中) [root@192-168-65-112 kafka_2.13-3.8.0]# bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic [2024-08-12 17:16:48,966] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient) Topic: secondTopic TopicId: DNNw-hXqQCOW61shM7zZ2Q PartitionCount: 4 ReplicationFactor: 3 Configs: Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,2,0 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 1 Leader: 2 Replicas: 0,2,1 Isr: 2,1,0 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Elr: N/A LastKnownElr: N/A# 手动触发所有Topic的Leader Partition自平衡 [root@192-168-65-112 kafka_2.13-3.8.0]# bin/kafka-leader-election.sh --bootstrap-server worker1:9092 --election-type preferred --topic secondTopic --partition 1 Valid replica already elected for partitions secondTopic-1# 自平衡后secondTopic的partition2就变成理想状态了 [root@192-168-65-112 kafka_2.13-3.8.0]# bin/kafka-topics.sh -bootstrap-server worker1:9092 --describe --topic secondTopic [2024-08-12 17:18:50,015] WARN [AdminClient clientId=adminclient-1] The DescribeTopicPartitions API is not supported, using Metadata API to describe topics. (org.apache.kafka.clients.admin.KafkaAdminClient) Topic: secondTopic TopicId: DNNw-hXqQCOW61shM7zZ2Q PartitionCount: 4 ReplicationFactor: 3 Configs: Topic: secondTopic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,2,0 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 2,1,0 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Elr: N/A LastKnownElr: N/A Topic: secondTopic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Elr: N/A LastKnownElr: N/A
-
实际应用注意点:Leader Partition 自动平衡过程涉及大量消息的转移与同步,是很重的操作。所以在很多对性能要求高的线上环境,会选择将
auto.leader.rebalance.enable
参数设置为false
,关闭 Kafka 的 Leader Partition 自动平衡操作,而采用其他运维方式,在业务不繁忙的时间段手动进行 Leader Partition 自动平衡,尽量减少自动平衡过程对业务的影响。
4 Kafka 的 Partition 故障恢复机制
-
为保证消息在多个 Partition 中保持数据同步,Kafka 内部记录了两个关键数据:
-
LEO(Log End Offset,日志末端偏移量):每个 Partition 的最后一个 Offset。Leader Partition 收到并记录生产者发送的一条消息,就将 LEO 加 1,Follower Partition 从 Leader Partition 同步消息,每同步到一个消息,自己的 LEO 就加 1,通过 LEO 值能知晓各个 Follower Partition 与 Leader Partition 之间的消息差距;
-
HW(High Watermark,高水位):一组 Partition 中最小的 LEO。Follower Partition 每次往 Leader Partition 同步消息时,都会同步自己的 LEO 给 Leader Partition,Leader Partition 据此计算出 HW 值,并同步给各个 Follower Partition。Leader Partition 认为 HW 值以前的消息在所有 Follower Partition 之间完成了同步,是安全的,可被消费者拉取;HW 值之后的消息是不安全的,可能丢失,若被消费者拉取消费,可能造成数据不一致;
-
-
当 Follower 发生故障时,处理相对简单:
-
将故障的 Follower 节点临时提出 ISR(In - Sync Replicas,与 Leader 保持同步的 Follower 集合)集合,其他 Leader 和 Follower 继续正常接收消息;
-
故障的 Follower 节点恢复后,不会立即加入 ISR 集合。该 Follower 节点会读取本地记录的上一次的 HW,将自己的日志中高于 HW 的部分信息全部删除,然后从 HW 开始,向 Leader 进行消息同步;
-
等到该 Follower 的 LEO 大于等于整个 Partition 的 HW 后,就重新加入到 ISR 集合中,此时该 Follower 的消息进度追上了 Leader;
-
-
当 Leader 节点出现故障时,处理相对复杂:
-
Leader 发生故障,会从 ISR 中进行选举,将一个原本是 Follower 的 Partition 提升为新的 Leader。由于消息可能没有完成同步,新的 Leader 的 LEO 会低于之前 Leader 的 LEO;
-
Kafka 中的消息只能以 Leader 中的备份为准,其他 Follower 会将各自的 Log 文件中高于 HW 的部分全部清理掉,然后从新的 Leader 中同步数据;
-
旧的 Leader 恢复后,将作为 Follower 节点,进行数据恢复;
-
-
在 Leader 故障恢复过程中,Kafka 注重保护多个副本之间的数据一致性,但消息的安全性得不到保障,可能会有消息丢失(如上图中原本 Partition0 中的 4、5、6、7 号消息丢失);
- 这表明在服务极端不稳定的极端情况下,Kafka 为了保证高性能,牺牲了数据安全性,并没有保证消息绝对安全(RocketMQ 在这方面做了改善,优先保证数据安全);
- 若要提升消息的安全性,基本思路是服务端处理不了,就交给客户端自己处理,例如将 Producer 的 ACKs 参数设置成 all 或者 1,然后 Producer 根据每次发消息的返回值,自行进行消息确认或者重复投递;
-
机制中有一个重要前提是各个 Broker 中记录的 HW 是一致的,但 HW 和 LEO 同样是分布式的值,关于如何保证 HW 在多个 Broker 中是一致的,是需要思考的问题。
5 HW一致性保障-Epoch更新机制
-
有了 HW 机制后,各个 Partition 的数据能较好保持统一,但实际中,HW 值在一组 Partition 里并非总是一致;
- Leader Partition 计算 HW 值需要保留所有 Follower Partition 的 LEO 值。Follower Partition 需先从 Leader Partition 拉取消息到本地,才能向 Leader Partition 上报 LEO 值。所有 Follower Partition 上报后,Leader Partition 才能更新 HW 值,之后 Follower Partition 在下次拉取消息时,才能更新自身 HW 值。这导致 Leader Partition 上更新 HW 值的时刻与 Follower Partition 上更新 HW 值的时刻存在延迟;
- 若有多个 Follower Partition,这些 Partition 保存的 HW 值会不统一。正常情况下,最终 Leader Partition 会正常推进 HW,保证 HW 最终一致性,但当 Leader Partition 出现切换,所有 Follower Partition 都按自己的 HW 进行数据恢复时,就会出现数据不一致的情况;
Step1:Follower 拉取消息并上报 LEO。Broker1(Follower)从 Broker0(Leader)拉取 4 号消息,然后向 Leader 上报自己当前的 LEO为 4。此时,Follower 还未完成对该消息的保存,只是告知 Leader 自己当前的日志偏移情况;
Step2:Follower 保存消息并更新 LEO。Broker1 成功保存从 Leader 拉取的 4 号消息,随后将自己的 LEO 从 4 更新为 5。这表明 Follower 已经将消息写入本地日志,日志的末端偏移量前进了;
Step3:Follower 上报新 LEO 并拉取 Leader 的 HW。Broker1 向 Leader 上报自己更新后的 LEO 为 5,同时从 Leader 拉取当前的 HW 值,此时拉取到的 HW 为 4;
Step4:Leader 推进 HW。Leader(Broker0)综合所有 Follower 上报的 LEO 情况(这里只有 Broker1 这一个 Follower,且其 LEO 为 5),将 HW 推进到 5。因为 HW 是一组 Partition 中最小的 LEO,此时 Leader 自身的 LEO 大于等于 5,Follower 的 LEO 也达到 5,所以 HW 可以推进到 5;
最终交由 Leader 推进 HW。如果 Leader 故障发生切换,HW 就会不一致。因为 HW 的推进由 Leader 主导,若 Leader 故障切换,新的 Leader 可能基于不同的 Follower 同步状态来处理 HW,容易导致 HW 出现不一致的情况。
-
为保证 HW 一致性,Kafka 设计了 Epoch 机制:
-
Epoch 的定义:Epoch 是单调递增的版本号,每当 Leader Partition 发生变更时,该版本号就会更新。当有多个 Epoch 时,只有最新的 Epoch 是有效的,其他 Epoch 对应的 Leader Partition 是过期、无用的;
-
Leader Partition 上任的 Epoch 记录:每个 Leader Partition 在上任之初,都会新增一个新的 Epoch 记录。该记录包含更新后的 epoch 版本号,以及当前 Leader Partition 写入的第一个消息的偏移量(例如 (1,100),表示 epoch 版本号是 1,当前 Leader Partition 写入的第一条消息是 100)。Broker 会将这个 epoch 数据保存到内存中,并且持久化到本地一个
leader-epoch-checkpoint
文件当中; -
Epoch 记录的同步:
leader-epoch-checkpoint
会在所有 Follower Partition 中同步。当 Leader Partition 有变更时,新的 Leader Partition 会读取这个 Epoch 记录,更新后添加自己的 Epoch 记录; -
Follower Partition 数据更新依据:其他 Follower Partition 要更新数据时,不再依靠自己记录的 HW 值判断拉取消息的起点,而是根据最新的 epoch 条目来判断;
-
-
leader-epoch-checkpoint
文件-
这个关键文件保存在 Broker 上每个 partition 对应的本地目录中,是一个文本文件,可直接查看。其内容大致为:
[root@192-168-65-193 secondTopic-1]# pwd /app/kafka/logs/secondTopic-1 [root@192-168-65-193 secondTopic-1]# cat leader-epoch-checkpoint 0 1 2 0
- 第一行是版本号,第二行表示下面的记录数,这两行数据实际意义不大;
- 从第三行开始,能看到两个数字,即 epoch 和 offset,epoch 表示 leader 的 epoch 版本(从 0 开始,leader 变更一次 epoch 就会 +1),offset 则对应该 epoch 版本的 leader 写入第一条消息的 offset,可理解为用户可以消费到的最早的消息 offset;
-
例如之前创建的 secondTopic,partition1 经过了两次 Leader 切换,epoch 更新为 2,由于还没有写入消息,所以切换时的 offset 是 0。
-