Flink-Source算子状态恢复分析
背景
修改 source 算子
kafka_old_topic 消费任务运行一段时间后,暂停状态并保留。然后将 uid 和 topic 都改了,消费者 offset 会从 earliest 开始。
// before
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// after
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
新增 source 算子
但是只新增一个同样的 kafka-source-new 算子(old 保留,消费者 offset 却会从最近开始。
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// 新增
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
算子(链)子任务状态列表(operatorSubtaskStates)
针对第一种情况,job 的算子状态(localStates)有三个,分别对应xxx,
当给 Task【Source: kafka-source-new -> map-heart (org.apache.flink.streaming.runtime.tasks.SourceStreamTask) 】(被修改的 source)分配状态时,该 Task 的每个算子都会绑定一个状态(OperatorState):“kafka-source-new”、“map-heart”,只不过这两个 OperatorState 有点差异:
这两个算子状态的 operatorSubtaskStates (存储算子子任务的状态信息)集合一个为空,一个不为空。原因就是在分配 “kafka-source-new” 算子状态时,由于其不在 localState,于是走了默认的构造函数创建 OperatorState 对象:
其实关键点就在 operatorSubtaskStates 的封装。
TaskStateAssignment 任务状态分配
TaskStateAssignment 的构造方法有个核心参数 hasNonFinishedState。
如果当前 Task 的子任务状态列表(operatorSubtaskStates全集)不为空,该值就为 true。
一旦该值为 true,就会执行 assignTaskStateToExecutionJobVertices:
给当前 Task 的每个 subTask 赋值状态:
那么每个 subTask 都会有一份状态(JobManagerTaskRestore,绑定 checkpointId):
JobManagerTaskRestore(JM与TM状态交互中间站)
一个 Execution 就是一个 subTask:
Task 部署阶段(JM 向 TM 提交 Task 任务),TM 会根据 TaskDeploymentDescriptor 来恢复状态和创建算子(其中 taskRestore 就是 JobManagerTaskRestore,在 setInitialState 中赋值)。
TM 接收到提交任务请求时,解析出 taskRestore 创建任务状态管理器(TaskStateManager)
TaskStateManager(TM 的任务管理器)
算子子任务状态获取
prioritizedOperatorState:传入算子 ID,即可从 JobMangerTaskRestore 获取子任务状态。
- 如果 JobMangerTaskRestore 为 null,那么返回一个空的 PrioritizedOperatorSubtaskState(checkpoint设置为null)
- 如果不为 null,则会从 JobManagerTaskRestore 中根据算子ID封装 PrioritizedOperatorSubtaskState。
StateInitializationContext(UDF-算子状态初始化上下文)
KafkaConsumerBase 在初始化状态阶段,会调用context.isRestored()
判断是否从状态恢复:
算子状态句柄(StreamOperatorStateHandler)处理算子状态的初始化,该阶段会调用 UDFKafkaConsumerBase.initializeState
初始化算子的本地状态并且 checkpointId 就是在这里被写入状态上下文 StateInitializationContext(该上下文是可以被用户访问的)。
StreamOperatorStateContext(initializeState全局上下文)
以上得知 checkpointId 来自context.getRestoredCheckpointId
,那么 context (该上下文是不可以被用户访问的)从何而来?
算子状态初始化AbstractStreamOperator.initialState
,利用 StreamTaskStateInitializer
封装 StreamOperatorStateContext:
那么 checkpointId 的封装肯定在streamTaskStateManger.streamOperatorStateContext
中:
方法中通过 taskStateManger 封装算子状态,如果 prioritizedOperatorSubtaskState 为空对象,那么这里的 checkpointId 就为 null。
针对第二种情况,task 的算子状态(有多个算子,算子链)不存在 localOperators 中,则默认使用构造方法封装 OperatorState,每个OperatorState 的 operatorSubtaskStates 集合都为空。