当前位置: 首页 > news >正文

Kafka Controller 元数据解析与故障恢复实战指南

#作者:张桐瑞

文章目录

  • 1 生产案例:Controller 选举在故障恢复中的关键作用
    • 1.1 问题背景
    • 1.2 核心操作原理:
  • 2 Controller 元数据全景:从 ZooKeeper 到内存的数据镜像
    • 2.1元数据核心载体:ControllerContext 类
    • 2.2核心元数据深度解析

1 生产案例:Controller 选举在故障恢复中的关键作用

1.1 问题背景

某 Kafka 集群部分核心主题分区一直处于“不可用”状态,通过kafka-toics.sh命令查看,发现分区leader一直处于Leader=-1的 “不可用” 状态。尝试重启旧 Leader 所在 Broker 无效,并且由于是生产环境,不能通过重启整个集群来随意重启,这毕竟是一个非常缺乏计划性的事情。

如何在避免重启集群的情况下,干掉已有Controller并执行新的Controller选举呢?答案就在源码中的ControllerZNode.path上,也就是ZooKeeper的/controller节点。倘若我们手动删除/controller节点,Kafka集群就会触发Controller选举。于是,我们马上实施这个方案,效果出奇得好:之前的受损分区全部恢复正常,业务数据得以正常生产和消费。

1.2 核心操作原理:

Controller 选举后会触发集群元数据全量同步(“重刷” 分区状态),原理如下:

  1. ZooKeeper 的/controller节点存储当前 Controller 信息,删除该节点会触发集群重新选举 Controller
  2. 新 Controller 上任后通过UpdateMetadataRequest向所有 Broker 同步最新元数据,修复分区状态
    注意事项:此操作需谨慎,生产环境建议通过 API 或工具触发元数据更新,避免直接操作 ZooKeeper 节点。

2 Controller 元数据全景:从 ZooKeeper 到内存的数据镜像

Kafka Controller 作为集群元数据的 “真理之源副本”,其核心作用是:

  • 缓存 ZooKeeper 元数据,避免 Broker 直接与 ZooKeeper 交互
  • 未来将替代 ZooKeeper 成为唯一元数据中心(社区规划)

2.1元数据核心载体:ControllerContext 类

类定义路径:
core/src/main/scala/kafka/controller/ControllerContext.scala
数据结构概览:
该类封装 17 项元数据,以下是核心字段解析(按重要性排序):
在这里插入图片描述

2.2核心元数据深度解析

2.2.1 ControllerStats

private[controller] class ControllerStats extends KafkaMetricsGroup {// 统计每秒发生的Unclean Leader选举次数val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)// Controller事件通用的统计速率指标的方法val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>state.rateAndTimeMetricName.map { metricName =>state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))}}.toMap
}

其中,前者是计算Controller每秒执行的Unclean Leader选举数量,通常情况下,执行Unclean Leader选举可能造成数据丢失,一般不建议开启它。一旦开启,你就需要时刻关注这个监控指标的值,确保Unclean Leader选举的速率维持在一个很低的水平,否则会出现很多数据丢失的情况。

后者是统计所有Controller状态的速率和时间信息,单位是毫秒。当前,Controller定义了很多事件,比如,TopicDeletion是执行主题删除的Controller事件、ControllerChange是执行Controller重选举的事件。ControllerStats的这个指标通过在每个事件名后拼接字符串RateAndTimeMs的方式,为每类Controller事件都创建了对应的速率监控指标。

2.2.2 offlinePartitionCount
该字段统计集群中所有离线或处于不可用状态的主题分区数量。所谓的不可用状态,就是我最开始举的例子中“Leader=-1”的情况。

ControllerContext中的updatePartitionStateMetrics方法根据给定主题分区的当前状态和目标状态,来判断该分区是否是离线状态的分区。如果是,则累加offlinePartitionCount字段的值,否则递减该值。方法代码如下:

// 更新offlinePartitionCount元数据
private def updatePartitionStateMetrics(partition: TopicPartition, currentState: PartitionState,targetState: PartitionState): Unit = {// 如果该主题当前并未处于删除中状态if (!isTopicDeletionInProgress(partition.topic)) {// targetState表示该分区要变更到的状态// 如果当前状态不是OfflinePartition,即离线状态并且目标状态是离线状态// 这个if语句判断是否要将该主题分区状态转换到离线状态if (currentState != OfflinePartition && targetState == OfflinePartition) {offlinePartitionCount = offlinePartitionCount + 1// 如果当前状态已经是离线状态,但targetState不是// 这个else if语句判断是否要将该主题分区状态转换到非离线状态} else if (currentState == OfflinePartition && targetState != OfflinePartition) {offlinePartitionCount = offlinePartitionCount - 1}}
}

该方法首先要判断,此分区所属的主题当前是否处于删除操作的过程中。如果是的话,Kafka就不能修改这个分区的状态,那么代码什么都不做,直接返回。否则,代码会判断该分区是否要转换到离线状态。如果targetState是OfflinePartition,那么就将offlinePartitionCount值加1,毕竟多了一个离线状态的分区。相反地,如果currentState是offlinePartition,而targetState反而不是,那么就将offlinePartitionCount值减1。

2.2.3 shuttingDownBrokerIds
顾名思义,该字段保存所有正在关闭中的Broker ID列表。当Controller在管理集群Broker时,它要依靠这个字段来甄别Broker当前是否已关闭,因为处于关闭状态的Broker是不适合执行某些操作的,如分区重分配(Reassignment)以及主题删除等。
另外,Kafka必须要为这些关闭中的Broker执行很多清扫工作,Controller定义了一个onBrokerFailure方法,它就是用来做这个的。代码如下:

private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {info(s"Broker failure callback for ${deadBrokers.mkString(",")}")// deadBrokers:给定的一组已终止运行的Broker Id列表// 更新Controller元数据信息,将给定Broker从元数据的replicasOnOfflineDirs中移除deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)// 找出这些Broker上的所有副本对象val deadBrokersThatWereShuttingDown =deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))if (deadBrokersThatWereShuttingDown.nonEmpty)info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")// 执行副本清扫工作val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)onReplicasBecomeOffline(allReplicasOnDeadBrokers)// 取消这些Broker上注册的ZooKeeper监听器unregisterBrokerModificationsHandler(deadBrokers)
}

该方法接收一组已终止运行的Broker ID列表,首先是更新Controller元数据信息,将给定Broker从元数据的replicasOnOfflineDirs和shuttingDownBrokerIds中移除,然后为这组Broker执行必要的副本清扫工作,也就是onReplicasBecomeOffline方法做的事情。

该方法主要依赖于分区状态机和副本状态机来完成对应的工作。在后面的课程中,我们会专门讨论副本状态机和分区状态机,这里你只要简单了解下它要做的事情就行了。后面等我们学完了这两个状态机之后,你可以再看下这个方法的具体实现原理。
这个方法的主要目的是把给定的副本标记成Offline状态,即不可用状态。具体分为以下这几个步骤:

  1. 利用分区状态机将给定副本所在的分区标记为Offline状态;
  2. 将集群上所有新分区和Offline分区状态变更为Online状态;
  3. 将相应的副本对象状态变更为Offline。

2.2.4 liveBrokers
该字段保存当前所有运行中的Broker对象。每个Broker对象就是一个的三元组。ControllerContext中定义了很多方法来管理该字段,如addLiveBrokersAndEpochs、removeLiveBrokers和updateBrokerMetadata等。我拿updateBrokerMetadata方法进行说明,以下是源码:

def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {liveBrokers -= oldMetadataliveBrokers += newMetadata}

每当新增或移除已有Broker时,ZooKeeper就会更新其保存的Broker数据,从而引发Controller修改元数据,也就是会调用updateBrokerMetadata方法来增减Broker列表中的对象。

2.2.5 liveBrokerEpochs
该字段保存所有运行中Broker的Epoch信息。Kafka使用Epoch数据防止Zombie Broker,即一个非常老的Broker被选举成为Controller。
另外,源码大多使用这个字段来获取所有运行中Broker的ID序号,如下面这个方法定义的那样:
def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet – shuttingDownBrokerIds

liveBrokerEpochs的keySet方法返回Broker序号列表,然后从中移除关闭中的Broker序号,剩下的自然就是处于运行中的Broker序号列表了。

2.2.6 epoch & epochZkVersion
这两个字段一起说,因为它们都有“epoch”字眼,放在一起说,可以帮助你更好地理解两者的区别。epoch实际上就是ZooKeeper中/controller_epoch节点的值,你可以认为它就是Controller在整个Kafka集群的版本号,而epochZkVersion实际上是/controller_epoch节点的dataVersion值。

Kafka使用epochZkVersion来判断和防止Zombie Controller。这也就是说,原先在老Controller任期内的Controller操作在新Controller不能成功执行,因为新Controller的epochZkVersion要比老Controller的大。
另外,你可能会问:“这里的两个Epoch和上面的liveBrokerEpochs有啥区别呢?”实际上,这里的两个Epoch值都是属于Controller侧的数据,而liveBrokerEpochs是每个Broker自己的Epoch值。

2.2.7 allTopics
该字段保存集群上所有的主题名称。每当有主题的增减,Controller就要更新该字段的值。

比如Controller有个processTopicChange方法,从名字上来看,它就是处理主题变更的。我们来看下它的代码实现,我把主要逻辑以注释的方式标注了出来:

private def processTopicChange(): Unit = {if (!isActive) return // 如果Contorller已经关闭,直接返回val topics = zkClient.getAllTopicsInCluster(true) // 从ZooKeeper中获取当前所有主题列表val newTopics = topics -- controllerContext.allTopics // 找出当前元数据中不存在、ZooKeeper中存在的主题,视为新增主题val deletedTopics = controllerContext.allTopics -- topics // 找出当前元数据中存在、ZooKeeper中不存在的主题,视为已删除主题controllerContext.allTopics = topics // 更新Controller元数据// 为新增主题和已删除主题执行后续处理操作registerPartitionModificationsHandlers(newTopics.toSeq)val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)deletedTopics.foreach(controllerContext.removeTopic)addedPartitionReplicaAssignment.foreach {case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)}info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +s"[$addedPartitionReplicaAssignment]")if (addedPartitionReplicaAssignment.nonEmpty)onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)}

2.2.8 partitionAssignments
该字段保存所有主题分区的副本分配情况。在我看来,这是Controller最重要的元数据了。事实上,你可以从这个字段衍生、定义很多实用的方法,来帮助Kafka从各种维度获取数据。

比如,如果Kafka要获取某个Broker上的所有分区,那么,它可以这样定义:

partitionAssignments.flatMap {case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {case (_, partitionAssignment) => partitionAssignment.replicas.contains(brokerId)}.map {case (partition, _) => new TopicPartition(topic, partition)}}.toSet

再比如,如果Kafka要获取某个主题的所有分区对象,代码可以这样写:

partitionAssignments.getOrElse(topic, mutable.Map.empty).map {case (partition, _) => new TopicPartition(topic, partition)}.toSet

实际上,这两段代码分别是ControllerContext.scala中partitionsOnBroker方法和partitionsForTopic两个方法的主体实现代码。

http://www.dtcms.com/a/264997.html

相关文章:

  • UI前端大数据处理策略优化:基于云计算的数据存储与计算
  • leetcode:416.分割等和子集【01背包】【动态规划】
  • 光照解耦和重照明
  • 接口测试用例设计
  • Shader Graph学习——屏幕uv采样
  • 智能学号抽取系统 V3.7.5 —— 一个基于 Vue.js 的交互式网页应用
  • Arduino CH552 PWM的使用
  • 项目开发基本流程
  • 深入理解Unicode:字符编码的终极指南
  • RGB下的色彩变换:用线性代数解构色彩世界
  • vue3 JavaScript localeCompare 比较两个字符串 localeCompare is not a function
  • 如何将文件从 iPhone 传输到 Android(新指南)
  • Spring Boot 集成 GeoTools 详解
  • 昇腾机器节点磁盘状态检查与问题处理方法
  • 智能攻击原理和架构
  • 深入Flink核心概念:解锁大数据流处理的奥秘
  • vue-36(为组件编写单元测试:属性、事件和方法)
  • 【Linux】Rocky Linux 安装教程
  • vscode基本使用
  • armv8汇编码分析
  • QGIS+CesiumIon
  • 多模态进化论:GPT-5V图文推理能力在工业质检中的颠覆性应用
  • 媲美 GPT-4o,Kontext 实现高效文本驱动图像编辑
  • vscode、openocd 使用
  • Excel 如何让数据自动按要求排序或筛选?
  • Learning PostgresSQL读书笔记: 第16章 Configuration and Monitoring
  • PostgreSQL大表创建分区实战
  • Arduino CH552 ADC的使用
  • NumPy 或 PyTorch/TensorFlow 中的张量理解
  • Servlet开发流程(包含IntelliJ IDEA项目添加Tomcat依赖的详细教程)