Flink状态和容错-基础篇
1. 概念
flink的状态和容错绕不开3个概念,state backends和checkpoint、savepoint。本文重心即搞清楚这3部分内容。
容错机制是基于在状态快照的一种恢复方式。但是状态和容错要分开来看。
- 什么是状态,为什么需要状态?
流计算和批计算在数据源上最大的区别是,流计算中的数据是无边界的,数据持续不断,而批计算中数据是有边界的,在计算时可以一次性将数据全部拿到。在流计算中无法拿到全部数据进行计算结果,因此需要将历史数据的处理结果记录下来,这个就是状态。通过状态实现历史数据和未来数据的结合处理。
举个例子,在对数据求和的场景中,第n条数据的求和结果是前n-1条数据的求和结果再加上第n条数据的值。因此必须记录前n-1条数据的累加和,才能在第n条数据到达时,得到前n条数据的累加和。
记录的前n-1条数据的累加和就是状态数据。
并不是全部的流计算场景都需要状态,如单词大小写转换的场景中,每条数据的处理仅和当前数据有关,因此不需要状态。
- 什么是state backend?
state backend用于存储flink管理的状态。有多种实现方式,不同实现方式中数据结构和存储方式不同,对快照的支持也有所区别。
- 什么是checkpoint?
checkpoint可以简单理解为游戏中的进度存档,在游戏中当玩家死亡或再次打开游戏时,可以从最近的游戏存档继续,而无需重头开始。checkpoint即作业在某个时刻的快照信息(状态的快照),如某个时刻数据源(消息队列)的消费位移,算子状态等。当发生故障恢复时,会从最新(最近)的一次checkpoint来恢复整个应用程序。
checkpoint是容错机制。checkpoint默认关闭,开启后会根据配置来持续自动保存。
- 什么是savepoint?
checkpoint是根据配置项自动周期性进行的“存档,而savepoint则是需要手动触发的“存档”。savepoint是手动触发的checkpoint。
snapshot、checkpoint、savepoint在一些语境中是可以互换的,表示相同的含义。
- state backend、checkpoint和savepoint的关系?
state backend表示的是状态,而checkpoint和savepoint表示的是状态在某个时刻的快照。state backend是状态存储。checkpoint和savepoint是容错机制。
2. checkpoint
2.1. 工作原理
checkpoint是由Jobmanager中的checkpoint coordinator来协调并在TaskManager执行的。当checkpoint开始时,所有的Source将会记录数据的偏移量,并将有编号的barrier插入到流中。barrier将流分划分了前一个checkpoint和下一个checkpoint两部分。当job graph中每个运算符收到其中一个barrier时,将开始记录状态。
当具有两个Input的运算符,默认情况下会执行barrier alignment。一个算子可能有多个Input,每个Input中都会携带barrier,根据运算符是否要等待全部Input中barrier将checkpoint分成aligned和unaligned(对齐和不对齐)的checkpoint。
state backend使用copy-on-write机制允许流处理不受阻碍得继续执行,同时对旧版本的状态进行异步快照,当快照被持久化后,旧版本的状态被清理。
2.2. 精确一次语义和端到端的精确一次?
- 精确一次语义
精确一次语义的含义是,每个事件都会只影响flink管理的状态一次。并不是每个事件都只会处理一次。barrier alignment仅仅在精确一次语义提供保障,如果不需要精确一次语义,可以使用至少一次来获取一些性能。这具有禁用barrier alignment的效果。
- 端到端的精确一次
端到端的精确一次语句是指来自Source中每个每个事件恰好影响sink一次。实现这个必须具备两个条件,Source必须是可重放的(如kafka)并且sink必须支持事务或幂等的。
2.3. checkpoint storage
checkpoint期间状态快照保留在哪里取决于所配置的checkpoint storage。
checkpoint storage提供了两种实现,基于分布式文件系统和基于JobManager jvm heap
- 基于分布式文件系统,
FileSystemCheckpointStorage
- 基于Jobmanager jvm heap,
JobManagerCheckpointStorage
,默认方式。
JobManagerCheckpointStorage
使用方式
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE));
MAX_MEM_STATE_SIZE的默认值5M,当快照超过此大小时checkpoint将失败,从而避免jobmanager OOM。无论配置的状态最大值是多少,状态都不能大于akka frame size(用于控制jogmanager和taskmanager之间发送消息的最大大小)。
FileSystemCheckpointStorage,配置了checkpoint路径后将会使用此方式,在执行checkpoint期间,状态快照将写入配置的文件系统和目录中的文件中。极少的元数据存储在JobManager内存中。
使用方式
// 全局配置 state.checkpoints.dir: hdfs:///checkpoints/
// 或在代码中为每个job配置
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
// 或
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs:///checkpoints-data/"));
目录结构
/user-defined-checkpoint-dir/{job-id}|+ --shared/ (这个目录表示多个checkpoint一部分的状态)+ --taskowned/ (该目录表示jobmanager绝对无法丢失的状态)+ --chk-1/ (其他目录表示单独属于一个checkpoint的状态)+ --chk-2/+ --chk-3/...
2.4. retained checkpoint
默认情况下,flink仅仅保存最近的n个checkpoint并且取消作业或作业失败时删除它们,checkpoint结果仅用于在作业失败时自动使用其进行恢复作业。可以手动配置将checkpoint保留下来。这样在作业取消或失败时,可以手动使用保留下来的快照进行作业恢复。
使用方式
CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 可选项
// NO_EXTERNALIZED_CHECKPOINTS,默认值,禁用retained checkpoint
// DELETE_ON_CANCELLATION,作业被取消时ck将被删除。但当作业状态为JobStatus.FAILED时ck会保留
// RETAIN_ON_CANCELLATION,作业被取消时ck将保留(保留下来的数据需要手动删除)。但当作业状态为JobStatus.FAILED时ck会保留
2.5. 相关配置
- checkpoint间隔
env.enableCheckpointing(1000);
- checkpoint存储,设置存储checkpoint快照数据的位置,默认情况下使用jobmanager heap
// 当配置路径后,将使用FileSystemCheckpointStorage方式的checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");
- 外部化的checkpoint,即上文retained checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- 精确一次 至少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- 超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
- 两次checkpoint的最小时间间隔,如果将该值设置为5s,则无论持续时间和间隔时间设置为何值,下一次checkpoint都将在上一个checkpoint完成后不早于5秒启动,这个值意味间隔时间将永远不会小于该时间,这个值还以意味着checkpoint的并发量为1。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
- 可以容忍的checkpoint连续失败的个数,默认为0。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
- checkpoint的并行数量,默认情况下flink将不会再一个checkpoint进行时触发另一个checkpoint。定义了最小间隔时间后则不能使用该选项。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
- 非对齐的checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
- dag中包含有界数据源时的checkpoint,从1.14版本开始支持,1.15版本默认开启。
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
2.6. checkpointing under backpressure
通常情况下,checkpoint时间由此过程中的同步和异步部分主导。但是在作业处于背压下时,checkpoint端到端时间主要影响因数是barriers传播到所有subtask/operators的时间。
3. savepoint
savepoint也是由checkpoint机制创建的。
savepoint由两部分组成,包含二进制文件的目录(通常较大)和元数据文件(相对较小)。二进制文件是状态快照的纯净文件,元数据文件中包含了二进制文件相对路径的指针。
通过state.savepoints.dir
来指定savepoint文件路径。
env.setDefaultSavepointDir("hdfs:///flink/savepoints");
savepoint可以将整个文件目录移动或复制到其他地方使用, 而checkpoint由于包含一些绝对路径,无法使用移动或复制后文件。
3.1. savepoint和checkpoint
savepoint和checkpoint之间的不同类似于传统数据中备份与恢复日志的不同。savepoint 代表数据库中的备份,checkpoint 代表数据中的日志恢复。
checkpoint的主要意图是一种异常时的容错机制。生命周期由flink管理,无需用户交互。checkpoint被频繁触发和依赖于故障恢复,因此checkpoint主要设计目标是尽可能轻量级的创建和尽可能的尽快恢复。
尽管savepoint也是使用checkpoint机制来创建的。但是和checkpoint的概念是不同的。savepoint的设计意图是可移植性和操作灵活性,尤其是作业更改方面,用于有计划的手动操作。如升级flink版本,更改job graph等,使用savepoint对作业进行恢复。
4. state backend
flink状态存储在state backend。state backend有两种实现:基于rocksDB(本地磁盘)和基于jvm heap(内存)
- 基于rocksDB,
EmbeddedRocksDBStateBackend
,访问和更新状态涉及序列化和反序列化,成本更高,相对内存而言较慢,但是可以允许巨量的状态。支持增量快照。 - 基于Java heap,
HashMapStateBackend
,默认方式,访问和更新状态涉及在heap上读写对象。
HashMapStateBackend
状态将作为jvm heap上的对象进行存储。由于作为对象进行存储,因此对象重用(reuse Object)是不安全的。这种建议将 managed memory 设置为0。从而使JVM为用户代码分配最大内存。状态大小受限于集群可用的内容大小。
EmbeddedRocksDBStateBackend
状态存储在rocksDB数据库中,即TaskManager的本地磁盘中,数据存储为序列化的字节数组。状态大小受限于磁盘空间大小。对象重用是安全的。当前唯一支持增量checkpoint的实现方式。
使用方式
// state.backend 配置项,可选项hashmap (HashMapStateBackend), rocksdb (EmbeddedRocksDBStateBackend)
// 或
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
4.1. 旧版的state backend实现
从1.13版本开始,社区重新设计了state backend实现类,新实现类目的是为了帮助用户更好的理解状态存储和checkpoint存储的分离。并未影响flink的state backend和checkpoint进行运行时实现和特征。可以使用新api来迁移老版本的应用程序,且不会丢失状态和一致性。
- 旧
MemoryStateBackend
,等价于HashMapStateBackend
andJobManagerCheckpointStorage
. - 旧
FsStateBackend
,等价于HashMapStateBackend
andFileSystemCheckpointStorage
. - 旧
RocksDBStateBackend
,等价于EmbeddedRocksDBStateBackend
andFileSystemCheckpointStorage
.
4.2. 基于rocksDB的state backend
rocksDB的方式支持增量快照,增量快照是建立在先前旧快照基础上的。flink利用了rocksDB的内部的压缩机制对旧快照进行合并,因此增量快照的历史记录不会无限增长,旧的快照结果最终会被自动合并和修剪。
增量快照需要手动开启。开启增量快照后,在web UI上展示checkpoint data size仅仅表示增量数据的大小。
使用方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true表示开启增量快照
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
4.2.1. 内存管理
rocksDB state backend的性能很大程度上取决于它可用的内存量。增加内存对提高性能有很大帮助。或者调整内存的功能。
默认情况下,RocksDB State Backend 使用 Flink 的managed memory预算作为 RocksDB 的缓冲区和缓存(state.backend.rocksdb.memory.managed:true
)。
flink并不是直接将Manager memory的内存分配给rocksDB,而是将通过配置和限制来确保rocksDB的内存使用保持在Manager momery内存范围内。
flink在单个slot中的所有rocksDB实例之间共享缓存和写缓冲区。这种共享管理帮助限制rocksDB主要内存消耗组件的内存使用。
- block cache:用于缓存从磁盘读取的数据块
- index and bloom filters:用于加速数据检索,过滤不必要的读取
- memtables:用于暂时存储写入的数据,直到他们被刷新到磁盘。
flink提供了额外的配置来调整rocksDB中写路径和读路径之间的内存分配
- 写路径(memtable),如果发生频繁的刷新,则表示写缓存区内存不足,适当调整这部分内存。
state.backend.rocksdb.memory.write-buffer-ratio
,默认0.5,即50% - 读路径(index and bloom fliters,剩余缓存),如果发生频繁缓存未命中,则表示读操作的内存不足,适当调整这部分内存。
state.backend.rocksdb.memory.high-prio-pool-ratio
,默认0.1,即10%
在内存层面的调优,大多数情况下,增加Manager memory。
建议设置的配置。
state.backend.rocksdb.predefined-options
,这个参数允许用户选择一组预定义的 RocksDB 配置,以优化不同的使用场景。具体来说,这个参数可以帮助用户在性能和资源使用之间进行权衡,而不需要手动调整大量的 RocksDB 配置项。
- DEFAULT: 使用 RocksDB 的默认配置。
- SPINNING_DISK_OPTIMIZED: 针对传统硬盘(HDD)进行优化。建议设置成此项
- FLASH_SSD_OPTIMIZED: 针对固态硬盘(SSD)进行优化。
- SPINNING_DISK_OPTIMIZED_HIGH_MEM: 针对高内存环境下的 HDD 进行优化。单个slot的状态达到GB,且托管内存充裕,设置为此最佳。
- FLASH_SSD_OPTIMIZED_HIGH_MEM: 针对高内存环境下的 SSD 进行优化。
必要的RocksDB监控,观察是否有性能瓶颈,观察完毕后关闭它们
state.backend.rocksdb.metrics.block-cache-capacity
,显示了为块缓存分配的总内存量,帮助了解块缓存的大小是否适合当前的需求。state.backend.rocksdb.metrics.block-cache-usage
,监视块缓存内存的使用情况,帮助了解在运行时使用了多少内存来缓存数据块。state.backend.rocksdb.metrics.cur-size-all-mem-tables
,监视以字节为单位的active和unflush的不可变 memtables 的大致大小。state.backend.rocksdb.metrics.mem-table-flush-pending
,监控 RocksDB 中挂起的 memtable 刷新次数。state.backend.rocksdb.metrics.num-running-flushes
,监控当前正在运行的flush次数。state.backend.rocksdb.metrics.num-running-compactions
,监控当前运行的压缩次数。
4.3. Timers
当state backend选则rocksDB时,定时器默认也存储在rocksDB中,这是更健壮和可扩展的选则。但是这样会有较高的成本,因此flink提供了基于JVM heap memory存储的定时器的选项(state.backend.rocksdb.timer-service.factory
=heap)。当在无窗口、在ProcessFunction为使用定时器的场景下,将定时器存储在Heap中将会有更优的性能。但是可能会增加checkpoint时间,并且无法自然的扩展到内存之外。
当使用基于rocksDb的state backend和基于heap存储的定时器时,定时器不支持异步快照,其他状态仍会异步存储。