Kafka Controller 元数据解析与故障恢复实战指南
#作者:张桐瑞
文章目录
- 1 生产案例:Controller 选举在故障恢复中的关键作用
- 1.1 问题背景
- 1.2 核心操作原理:
- 2 Controller 元数据全景:从 ZooKeeper 到内存的数据镜像
- 2.1元数据核心载体:ControllerContext 类
- 2.2核心元数据深度解析
1 生产案例:Controller 选举在故障恢复中的关键作用
1.1 问题背景
某 Kafka 集群部分核心主题分区一直处于“不可用”状态,通过kafka-toics.sh命令查看,发现分区leader一直处于Leader=-1的 “不可用” 状态。尝试重启旧 Leader 所在 Broker 无效,并且由于是生产环境,不能通过重启整个集群来随意重启,这毕竟是一个非常缺乏计划性的事情。
如何在避免重启集群的情况下,干掉已有Controller并执行新的Controller选举呢?答案就在源码中的ControllerZNode.path上,也就是ZooKeeper的/controller节点。倘若我们手动删除/controller节点,Kafka集群就会触发Controller选举。于是,我们马上实施这个方案,效果出奇得好:之前的受损分区全部恢复正常,业务数据得以正常生产和消费。
1.2 核心操作原理:
Controller 选举后会触发集群元数据全量同步(“重刷” 分区状态),原理如下:
- ZooKeeper 的/controller节点存储当前 Controller 信息,删除该节点会触发集群重新选举 Controller
- 新 Controller 上任后通过UpdateMetadataRequest向所有 Broker 同步最新元数据,修复分区状态
注意事项:此操作需谨慎,生产环境建议通过 API 或工具触发元数据更新,避免直接操作 ZooKeeper 节点。
2 Controller 元数据全景:从 ZooKeeper 到内存的数据镜像
Kafka Controller 作为集群元数据的 “真理之源副本”,其核心作用是:
- 缓存 ZooKeeper 元数据,避免 Broker 直接与 ZooKeeper 交互
- 未来将替代 ZooKeeper 成为唯一元数据中心(社区规划)
2.1元数据核心载体:ControllerContext 类
类定义路径:
core/src/main/scala/kafka/controller/ControllerContext.scala
数据结构概览:
该类封装 17 项元数据,以下是核心字段解析(按重要性排序):
2.2核心元数据深度解析
2.2.1 ControllerStats
private[controller] class ControllerStats extends KafkaMetricsGroup {// 统计每秒发生的Unclean Leader选举次数val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)// Controller事件通用的统计速率指标的方法val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>state.rateAndTimeMetricName.map { metricName =>state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))}}.toMap
}
其中,前者是计算Controller每秒执行的Unclean Leader选举数量,通常情况下,执行Unclean Leader选举可能造成数据丢失,一般不建议开启它。一旦开启,你就需要时刻关注这个监控指标的值,确保Unclean Leader选举的速率维持在一个很低的水平,否则会出现很多数据丢失的情况。
后者是统计所有Controller状态的速率和时间信息,单位是毫秒。当前,Controller定义了很多事件,比如,TopicDeletion是执行主题删除的Controller事件、ControllerChange是执行Controller重选举的事件。ControllerStats的这个指标通过在每个事件名后拼接字符串RateAndTimeMs的方式,为每类Controller事件都创建了对应的速率监控指标。
2.2.2 offlinePartitionCount
该字段统计集群中所有离线或处于不可用状态的主题分区数量。所谓的不可用状态,就是我最开始举的例子中“Leader=-1”的情况。
ControllerContext中的updatePartitionStateMetrics方法根据给定主题分区的当前状态和目标状态,来判断该分区是否是离线状态的分区。如果是,则累加offlinePartitionCount字段的值,否则递减该值。方法代码如下:
// 更新offlinePartitionCount元数据
private def updatePartitionStateMetrics(partition: TopicPartition, currentState: PartitionState,targetState: PartitionState): Unit = {// 如果该主题当前并未处于删除中状态if (!isTopicDeletionInProgress(partition.topic)) {// targetState表示该分区要变更到的状态// 如果当前状态不是OfflinePartition,即离线状态并且目标状态是离线状态// 这个if语句判断是否要将该主题分区状态转换到离线状态if (currentState != OfflinePartition && targetState == OfflinePartition) {offlinePartitionCount = offlinePartitionCount + 1// 如果当前状态已经是离线状态,但targetState不是// 这个else if语句判断是否要将该主题分区状态转换到非离线状态} else if (currentState == OfflinePartition && targetState != OfflinePartition) {offlinePartitionCount = offlinePartitionCount - 1}}
}
该方法首先要判断,此分区所属的主题当前是否处于删除操作的过程中。如果是的话,Kafka就不能修改这个分区的状态,那么代码什么都不做,直接返回。否则,代码会判断该分区是否要转换到离线状态。如果targetState是OfflinePartition,那么就将offlinePartitionCount值加1,毕竟多了一个离线状态的分区。相反地,如果currentState是offlinePartition,而targetState反而不是,那么就将offlinePartitionCount值减1。
2.2.3 shuttingDownBrokerIds
顾名思义,该字段保存所有正在关闭中的Broker ID列表。当Controller在管理集群Broker时,它要依靠这个字段来甄别Broker当前是否已关闭,因为处于关闭状态的Broker是不适合执行某些操作的,如分区重分配(Reassignment)以及主题删除等。
另外,Kafka必须要为这些关闭中的Broker执行很多清扫工作,Controller定义了一个onBrokerFailure方法,它就是用来做这个的。代码如下:
private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {info(s"Broker failure callback for ${deadBrokers.mkString(",")}")// deadBrokers:给定的一组已终止运行的Broker Id列表// 更新Controller元数据信息,将给定Broker从元数据的replicasOnOfflineDirs中移除deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)// 找出这些Broker上的所有副本对象val deadBrokersThatWereShuttingDown =deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))if (deadBrokersThatWereShuttingDown.nonEmpty)info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")// 执行副本清扫工作val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)onReplicasBecomeOffline(allReplicasOnDeadBrokers)// 取消这些Broker上注册的ZooKeeper监听器unregisterBrokerModificationsHandler(deadBrokers)
}
该方法接收一组已终止运行的Broker ID列表,首先是更新Controller元数据信息,将给定Broker从元数据的replicasOnOfflineDirs和shuttingDownBrokerIds中移除,然后为这组Broker执行必要的副本清扫工作,也就是onReplicasBecomeOffline方法做的事情。
该方法主要依赖于分区状态机和副本状态机来完成对应的工作。在后面的课程中,我们会专门讨论副本状态机和分区状态机,这里你只要简单了解下它要做的事情就行了。后面等我们学完了这两个状态机之后,你可以再看下这个方法的具体实现原理。
这个方法的主要目的是把给定的副本标记成Offline状态,即不可用状态。具体分为以下这几个步骤:
- 利用分区状态机将给定副本所在的分区标记为Offline状态;
- 将集群上所有新分区和Offline分区状态变更为Online状态;
- 将相应的副本对象状态变更为Offline。
2.2.4 liveBrokers
该字段保存当前所有运行中的Broker对象。每个Broker对象就是一个的三元组。ControllerContext中定义了很多方法来管理该字段,如addLiveBrokersAndEpochs、removeLiveBrokers和updateBrokerMetadata等。我拿updateBrokerMetadata方法进行说明,以下是源码:
def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {liveBrokers -= oldMetadataliveBrokers += newMetadata}
每当新增或移除已有Broker时,ZooKeeper就会更新其保存的Broker数据,从而引发Controller修改元数据,也就是会调用updateBrokerMetadata方法来增减Broker列表中的对象。
2.2.5 liveBrokerEpochs
该字段保存所有运行中Broker的Epoch信息。Kafka使用Epoch数据防止Zombie Broker,即一个非常老的Broker被选举成为Controller。
另外,源码大多使用这个字段来获取所有运行中Broker的ID序号,如下面这个方法定义的那样:
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet – shuttingDownBrokerIds
liveBrokerEpochs的keySet方法返回Broker序号列表,然后从中移除关闭中的Broker序号,剩下的自然就是处于运行中的Broker序号列表了。
2.2.6 epoch & epochZkVersion
这两个字段一起说,因为它们都有“epoch”字眼,放在一起说,可以帮助你更好地理解两者的区别。epoch实际上就是ZooKeeper中/controller_epoch节点的值,你可以认为它就是Controller在整个Kafka集群的版本号,而epochZkVersion实际上是/controller_epoch节点的dataVersion值。
Kafka使用epochZkVersion来判断和防止Zombie Controller。这也就是说,原先在老Controller任期内的Controller操作在新Controller不能成功执行,因为新Controller的epochZkVersion要比老Controller的大。
另外,你可能会问:“这里的两个Epoch和上面的liveBrokerEpochs有啥区别呢?”实际上,这里的两个Epoch值都是属于Controller侧的数据,而liveBrokerEpochs是每个Broker自己的Epoch值。
2.2.7 allTopics
该字段保存集群上所有的主题名称。每当有主题的增减,Controller就要更新该字段的值。
比如Controller有个processTopicChange方法,从名字上来看,它就是处理主题变更的。我们来看下它的代码实现,我把主要逻辑以注释的方式标注了出来:
private def processTopicChange(): Unit = {if (!isActive) return // 如果Contorller已经关闭,直接返回val topics = zkClient.getAllTopicsInCluster(true) // 从ZooKeeper中获取当前所有主题列表val newTopics = topics -- controllerContext.allTopics // 找出当前元数据中不存在、ZooKeeper中存在的主题,视为新增主题val deletedTopics = controllerContext.allTopics -- topics // 找出当前元数据中存在、ZooKeeper中不存在的主题,视为已删除主题controllerContext.allTopics = topics // 更新Controller元数据// 为新增主题和已删除主题执行后续处理操作registerPartitionModificationsHandlers(newTopics.toSeq)val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)deletedTopics.foreach(controllerContext.removeTopic)addedPartitionReplicaAssignment.foreach {case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)}info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +s"[$addedPartitionReplicaAssignment]")if (addedPartitionReplicaAssignment.nonEmpty)onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)}
2.2.8 partitionAssignments
该字段保存所有主题分区的副本分配情况。在我看来,这是Controller最重要的元数据了。事实上,你可以从这个字段衍生、定义很多实用的方法,来帮助Kafka从各种维度获取数据。
比如,如果Kafka要获取某个Broker上的所有分区,那么,它可以这样定义:
partitionAssignments.flatMap {case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {case (_, partitionAssignment) => partitionAssignment.replicas.contains(brokerId)}.map {case (partition, _) => new TopicPartition(topic, partition)}}.toSet
再比如,如果Kafka要获取某个主题的所有分区对象,代码可以这样写:
partitionAssignments.getOrElse(topic, mutable.Map.empty).map {case (partition, _) => new TopicPartition(topic, partition)}.toSet
实际上,这两段代码分别是ControllerContext.scala中partitionsOnBroker方法和partitionsForTopic两个方法的主体实现代码。