Flink State vs. Checkpoint: Differences and Relationship (with Source Code)
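- 1. Essential difference: runtime vs. persistence
  - 1.1 State: the runtime "working memory"
  - 1.2 Checkpoint: a "snapshot archive" of state
- 2. An analogy
- 3. The relationship at the source-code level
  - 3.1 CheckpointableKeyedStateBackend: the bridge between the two
  - 3.2 StateHandle: the metadata of a checkpoint
  - 3.3 HeapKeyedStateBackend: the concrete implementation
- 4. The complete lifecycle
  - 4.1 During normal operation
  - 4.2 When a checkpoint is triggered
  - 4.3 During failure recovery
- 5. Key differences at a glance (comparison table)
- 6. How the two cooperate in the source code
  - 6.1 Checkpoint option configuration
  - 6.2 Different types of checkpoints
- 7. Hands-on example: the complete flow
- 8. Summary of the core relationship
  - 8.1 Dependency
  - 8.2 Collaboration
  - 8.3 Performance trade-offs
  - 8.4 Unified abstraction
- 9. Key takeaways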
1. Essential difference: runtime vs. persistence
1.1 State: the runtime "working memory"
package org.apache.flink.api.common.state;

import org.apache.flink.annotation.PublicEvolving;

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the key of
 * the current element. That way, the system can handle stream and state partitioning consistently
 * together.
 */
@PublicEvolving
public interface State {

    /** Removes the value mapped under the current key. */
    void clear();
}
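Characteristics of State:
- Location: kept in the TaskManager's memory, or on its local disk when RocksDB is used
- Purpose: the operator's "working memory" while it processes data
- Access: frequent and real-time, with roughly microsecond-level latency
- Lifetime: exists for as long as the job is running
- Mutability: may be updated by every record that is processed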
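To make "the key is automatically supplied by the system" concrete, here is a minimal plain-Java sketch of keyed state. It is not Flink's implementation: the names `KeyedStateSketch`, `MiniBackend` and `MiniValueState` are hypothetical, and a `HashMap` stands in for a heap state backend. The "framework" sets the current key before each record, so user code only calls `value()`, `update()` and `clear()`, and `clear()` follows the same contract as `State.clear()` above.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedStateSketch {

    /** Hypothetical stand-in for a backend: it only tracks the current key. */
    static class MiniBackend<K> {
        private K currentKey;

        // Called by the "framework" before each record is processed.
        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        K getCurrentKey() {
            return currentKey;
        }
    }

    /** Hypothetical ValueState-like view: always reads and writes the current key's slot. */
    static class MiniValueState<K, V> {
        private final MiniBackend<K> backend;
        private final Map<K, V> table = new HashMap<>();

        MiniValueState(MiniBackend<K> backend) {
            this.backend = backend;
        }

        V value() {
            return table.get(backend.getCurrentKey());
        }

        void update(V v) {
            table.put(backend.getCurrentKey(), v);
        }

        // Same contract as State.clear(): removes only the value under the current key.
        void clear() {
            table.remove(backend.getCurrentKey());
        }

        int size() {
            return table.size();
        }
    }

    /** Counts records per key; the "user function" never passes the key explicitly. */
    static Map<String, Integer> countPerKey(String[] records) {
        MiniBackend<String> backend = new MiniBackend<>();
        MiniValueState<String, Integer> count = new MiniValueState<>(backend);
        Map<String, Integer> result = new HashMap<>();
        for (String record : records) {
            backend.setCurrentKey(record);     // framework side
            Integer c = count.value();         // user side
            count.update(c == null ? 1 : c + 1);
            result.put(record, count.value());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(countPerKey(new String[] {"a", "b", "a"}));
    }
}
```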
1.2 Checkpoint: a "snapshot archive" of state
@Internal
public interface Snapshotable<S extends StateObject> {

    /**
     * Operation that writes a snapshot into a stream that is provided by the given {@link
     * CheckpointStreamFactory} and returns a @{@link RunnableFuture} that gives a state handle to
     * the snapshot. It is up to the implementation if the operation is performed synchronous or
     * asynchronous. In the later case, the returned Runnable must be executed first before
     * obtaining the handle.
     *
     * @param checkpointId The ID of the checkpoint.
     * @param timestamp The timestamp of the checkpoint.
     * @param streamFactory The factory that we can use for writing our state to streams.
     * @param checkpointOptions Options for how to perform this checkpoint.
     * @return A runnable future that will yield a {@link StateObject}.
     */
    @Nonnull
    RunnableFuture<S> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception;
}
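Characteristics of a Checkpoint:
- Location: durable storage (HDFS, S3, OSS, etc.)
- Purpose: the restore point used for fault-tolerant recovery
- Access: infrequent and periodic, driven by the checkpoint interval (e.g., once per minute)
- Lifetime: stored independently of the running TaskManagers, so it survives process failures and is read back during recovery
- Immutability: once a checkpoint completes, it is never modified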
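The `Snapshotable#snapshot` contract above returns a `RunnableFuture`: the implementation may do all the work synchronously, or hand back a future that must be run (typically on another thread) before the state handle can be obtained. The sketch below models the common pattern of a short synchronous phase that captures an immutable copy, followed by an asynchronous phase that writes it out. It is plain Java (10+), not Flink code: `SnapshotSketch`, its `snapshot(...)` method and the `String` "handle" are hypothetical simplifications, the "durable storage" is an in-memory map, and real backends use cheaper techniques than a full copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotSketch {

    /** Hypothetical "durable storage": checkpoint id -> immutable snapshot. */
    static final Map<Long, Map<String, Integer>> STORAGE = new HashMap<>();

    /**
     * Synchronous phase: copy the live state now. Asynchronous phase (the returned future):
     * "persist" the copy and yield a handle. Nothing is persisted until the future runs.
     */
    static RunnableFuture<String> snapshot(long checkpointId, Map<String, Integer> liveState) {
        Map<String, Integer> copy = Map.copyOf(liveState); // immutable copy taken now
        return new FutureTask<>(() -> {
            synchronized (STORAGE) {
                STORAGE.put(checkpointId, copy);
            }
            return "chk-" + checkpointId; // stand-in for a StateHandle
        });
    }

    public static void main(String[] args) throws Exception {
        Map<String, Integer> live = new HashMap<>();
        live.put("a", 1);

        RunnableFuture<String> future = snapshot(1L, live);
        live.put("a", 2); // processing continues; the snapshot is unaffected

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(future);         // the Runnable must run before the handle exists
        String handle = future.get(); // blocks until the async phase is done
        pool.shutdown();

        System.out.println(handle + " -> " + STORAGE.get(1L) + ", live=" + live);
    }
}
```

The split mirrors why Flink can keep processing while a checkpoint is being written: only the capture step has to happen at the checkpoint barrier, while the slow upload runs in the background.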
2. An analogy
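State = the Word document you are editing (in memory)
    ↓ every so often
Checkpoint = a copy of the document saved to disk
    ↓ if the program crashes
Recovery = reload from the most recent save (back into memory)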
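The save/crash/reload cycle in the analogy can be sketched as a small simulation. One detail the analogy leaves out: in Flink a checkpoint also records source positions (for example, Kafka offsets), so after state is restored the sources rewind and the records processed after the checkpoint are replayed rather than lost. `RecoverySketch`, `Checkpoint` and `run(...)` are hypothetical; the "source" is an array and the "state" is a single running sum.

```java
public class RecoverySketch {

    /** Hypothetical checkpoint: operator state plus the source position it belongs to. */
    static final class Checkpoint {
        final long sum;
        final int sourceOffset;

        Checkpoint(long sum, int sourceOffset) {
            this.sum = sum;
            this.sourceOffset = sourceOffset;
        }
    }

    /**
     * Sums the input, taking a checkpoint every `interval` records and crashing once when the
     * source reaches `crashAt`. On the crash, state and offset come from the latest checkpoint.
     */
    static long run(int[] input, int interval, int crashAt) {
        Checkpoint latest = new Checkpoint(0, 0); // "empty" checkpoint before any data
        long sum = 0;      // live state (the document being edited)
        int offset = 0;    // live source position
        boolean crashed = false;
        while (offset < input.length) {
            if (!crashed && offset == crashAt) {
                crashed = true;               // simulate a failure, only once
                sum = latest.sum;             // reload state from the last checkpoint
                offset = latest.sourceOffset; // rewind the source; later records are replayed
                continue;
            }
            sum += input[offset++];
            if (offset % interval == 0) {
                latest = new Checkpoint(sum, offset); // never modified afterwards
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6, 7};
        System.out.println(run(input, 3, 5)); // prints 28, same as a failure-free run
    }
}
```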
3. The relationship at the source-code level
3.1 CheckpointableKeyedStateBackend: the bridge between the two
/**
 * Interface that combines both, the {@link KeyedStateBackend} interface, which encapsulates methods
 * responsible for keyed state management and the {@link Snapshotable} which tells the system how to
 * snapshot the underlying state.
 *
 * <p><b>NOTE:</b> State backends that need to be notified of completed checkpoints can additionally
 * implement the {@link CheckpointListener} interface.
 *
 * @param <K> Type of the key by which state is keyed.
 */
public interface CheckpointableKeyedStateBackend<K>
        extends KeyedStateBackend<K>,
                Snapshotable<SnapshotResult<KeyedStateHandle>>,
                Closeable {

    /** Returns the key groups which this state backend is responsible for. */
    KeyGroupRange getKeyGroupRange();

    /**
     * Returns a {@link SavepointResources} that can be used by {@link SavepointSnapshotStrategy} to
     * write out a savepoint in the common/unified format.
     */
    @Nonnull
    SavepointResources<K> savepoint() throws Exception;
}
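Design rationale:
- KeyedStateBackend: manages reads and writes of the runtime state
- Snapshotable: provides the ability to snapshot that state
- CheckpointableKeyedStateBackend: combines both capabilities in one object (and is Closeable, so its resources can be released)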
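The design above is plain interface composition: one object exposes both the read/write API used during processing and the snapshot API used by checkpointing. The sketch below mirrors that shape with deliberately simplified, hypothetical interfaces (`MiniKeyedBackend`, `MiniSnapshotable`, `MiniCheckpointableBackend`). They are not Flink's real signatures, which also involve serializers, namespaces, `CheckpointStreamFactory`, `CheckpointOptions` and an asynchronous `RunnableFuture`; here the snapshot is synchronous for brevity. The key-group range is modeled as an inclusive `{start, end}` pair, as Flink's `KeyGroupRange` is inclusive on both ends.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

public class CompositionSketch {

    /** Hypothetical: the runtime read/write side (the role KeyedStateBackend plays). */
    interface MiniKeyedBackend<K> {
        void setCurrentKey(K key);
        Integer get();
        void put(Integer value);
    }

    /** Hypothetical: the snapshot side (the role Snapshotable plays). */
    interface MiniSnapshotable<S> {
        S snapshot(long checkpointId);
    }

    /** Hypothetical combination, similar in shape to CheckpointableKeyedStateBackend. */
    interface MiniCheckpointableBackend<K>
            extends MiniKeyedBackend<K>, MiniSnapshotable<Map<K, Integer>>, Closeable {
        int[] getKeyGroupRange(); // inclusive {start, end}
    }

    static final class HeapBackend<K> implements MiniCheckpointableBackend<K> {
        private final Map<K, Integer> table = new HashMap<>();
        private final int start;
        private final int end;
        private K currentKey;
        private boolean closed;

        HeapBackend(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override public void setCurrentKey(K key) { currentKey = key; }
        @Override public Integer get() { return table.get(currentKey); }
        @Override public void put(Integer value) { table.put(currentKey, value); }

        // Returns an immutable copy (checkpointId unused in this sketch); later writes do not change it.
        @Override public Map<K, Integer> snapshot(long checkpointId) { return Map.copyOf(table); }

        @Override public int[] getKeyGroupRange() { return new int[] {start, end}; }
        @Override public void close() { closed = true; table.clear(); }
        boolean isClosed() { return closed; }
    }

    public static void main(String[] args) {
        HeapBackend<String> backend = new HeapBackend<>(0, 63);
        // Checkpointing code only needs the snapshot view of the same object.
        MiniSnapshotable<Map<String, Integer>> snapshotView = backend;
        backend.setCurrentKey("user-1");
        backend.put(42);
        Map<String, Integer> snap = snapshotView.snapshot(1L);
        backend.put(43);
        System.out.println(snap + " vs live " + backend.get()); // prints {user-1=42} vs live 43
        backend.close();
    }
}
```

Because the task holds a single backend object, the operator code and the checkpointing code each see only the interface they need, yet both operate on exactly the same state.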
3.2 StateHandle: the metadata of a checkpoint
/**
 * Base for the handles of the checkpointed states in keyed streams. When recovering from failures,
 * the handle will be passed to all tasks whose key group ranges overlap with it.
 */
public interface KeyedStateHandle extends CompositeStateHandle {

    /** Returns the range of the key groups contained in the state. */
    KeyGroupRange getKeyGroupRange();

    /**
     * Returns a state over a range that is the intersection between this handle's key-group range
     * and the provided key-group range.
     *
     * @param keyGroupRange The key group range to intersect with, will return null if the
     *     intersection of this handle's key-group and the provided key-group is empty.
     */
    @Nullable
    KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);

    /**
     * Returns a unique state handle id to distinguish with other keyed state handles.