当前位置: 首页 > news >正文

Flink-Source算子状态恢复分析

背景
修改 source 算子

kafka_old_topic 消费任务运行一段时间后,暂停状态并保留。然后将 uid 和 topic 都改了,消费者 offset 会从 earliest 开始。

// before
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// after
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
新增 source 算子

但是只新增一个同样的 kafka-source-new 算子(old 保留,消费者 offset 却会从最近开始。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// 新增
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
算子(链)子任务状态列表(operatorSubtaskStates)

针对第一种情况,job 的算子状态(localStates)有三个,分别对应xxx,

当给 Task【Source: kafka-source-new -> map-heart (org.apache.flink.streaming.runtime.tasks.SourceStreamTask) 】(被修改的 source)分配状态时,该 Task 的每个算子都会绑定一个状态(OperatorState):“kafka-source-new”、“map-heart”,只不过这两个 OperatorState 有点差异:

这两个算子状态的 operatorSubtaskStates (存储算子子任务的状态信息)集合一个为空,一个不为空。原因就是在分配 “kafka-source-new” 算子状态时,由于其不在 localState,于是走了默认的构造函数创建 OperatorState 对象:

其实关键点就在 operatorSubtaskStates 的封装。

TaskStateAssignment 任务状态分配

TaskStateAssignment 的构造方法有个核心参数 hasNonFinishedState。

如果当前 Task 的子任务状态列表(operatorSubtaskStates全集)不为空,该值就为 true。

一旦该值为 true,就会执行 assignTaskStateToExecutionJobVertices:

给当前 Task 的每个 subTask 赋值状态:

那么每个 subTask 都会有一份状态(JobManagerTaskRestore,绑定 checkpointId):

JobManagerTaskRestore(JM与TM状态交互中间站)

一个 Execution 就是一个 subTask:

Task 部署阶段(JM 向 TM 提交 Task 任务),TM 会根据 TaskDeploymentDescriptor 来恢复状态和创建算子(其中 taskRestore 就是 JobManagerTaskRestore,在 setInitialState 中赋值)。

TM 接收到提交任务请求时,解析出 taskRestore 创建任务状态管理器(TaskStateManager)

TaskStateManager(TM 的任务管理器)
算子子任务状态获取

prioritizedOperatorState:传入算子 ID,即可从 JobMangerTaskRestore 获取子任务状态。

  1. 如果 JobMangerTaskRestore 为 null,那么返回一个空的 PrioritizedOperatorSubtaskState(checkpoint设置为null

  1. 如果不为 null,则会从 JobManagerTaskRestore 中根据算子ID封装 PrioritizedOperatorSubtaskState。

StateInitializationContext(UDF-算子状态初始化上下文)

KafkaConsumerBase 在初始化状态阶段,会调用context.isRestored()判断是否从状态恢复:

算子状态句柄(StreamOperatorStateHandler)处理算子状态的初始化,该阶段会调用 UDFKafkaConsumerBase.initializeState初始化算子的本地状态并且 checkpointId 就是在这里被写入状态上下文 StateInitializationContext(该上下文是可以被用户访问的)。

StreamOperatorStateContext(initializeState全局上下文)

以上得知 checkpointId 来自context.getRestoredCheckpointId,那么 context (该上下文是不可以被用户访问的)从何而来?

算子状态初始化AbstractStreamOperator.initialState,利用 StreamTaskStateInitializer

封装 StreamOperatorStateContext:

那么 checkpointId 的封装肯定在streamTaskStateManger.streamOperatorStateContext中:

方法中通过 taskStateManger 封装算子状态,如果 prioritizedOperatorSubtaskState 为空对象,那么这里的 checkpointId 就为 null


针对第二种情况,task 的算子状态(有多个算子,算子链)不存在 localOperators 中,则默认使用构造方法封装 OperatorState,每个OperatorState 的 operatorSubtaskStates 集合都为空。

http://www.dtcms.com/a/267454.html

相关文章:

  • 机器视觉对位中的常见模型与技术原理
  • HTML网页应用打包Android App 完整实践指南
  • 【Project】基于kafka的高可用分布式日志监控与告警系统
  • openstack安装并初始化
  • 智能自主运动体的革命:当AI学会奔跑与协作 ——从单机定位到群体智能的跨越
  • 2025年的前后端一体化CMS框架优选方案
  • 未来趋势:AI与量子计算对服务器安全的影响
  • 博弈论基础-笔记
  • RTX5可以在中断中调用的API
  • 08_容器化与微服务:构建弹性架构
  • Ubuntu 22.04 修改默认 Python 版本为 Python3 笔记
  • Hbase2.6.2集群部署(最新版)
  • spring-initializer
  • OneCode MQTT插件开发实战:基于Paho.Client的物联网通信解决方案
  • python使用fastmcp包编写mcp服务端(mcp server)
  • ServiceNow CAD项目实战详细解析
  • PPT文字精简与视觉化技巧
  • StarRocks × Tableau 连接器完整使用指南 | 高效数据分析从连接开始
  • Eureka和Nacos都可以作为注册中心,它们之间的区别
  • DIODON HP30 防水充气无人机:海上侦察的创新利器
  • 进阶篇:18-使用 Kaniko 在无 Docker Daemon 环境中构建镜像
  • 《数据维度的视觉重构:打造交互式高维数据可视化的黄金法则》
  • 告别 undefined is not a function:TypeScript 前端开发优势与实践指南
  • 缓存解决方案
  • vuedraggable在iframe中无法使用问题
  • MySQL基础和 表的‘CRUD’(基础版)
  • 基础数据结构第04天:单向链表(概念篇)
  • ubuntu手动编译VTK9.3 Generating qmltypes file 失败
  • 解决URL编码兼容性问题:空格转义与HTML实体解码实战
  • 基于企业私有数据实现智能问答