当前位置: 首页 > news >正文

Flink状态和容错-基础篇

1. 概念

flink的状态和容错绕不开3个概念,state backends和checkpoint、savepoint。本文重心即搞清楚这3部分内容。

容错机制是基于在状态快照的一种恢复方式。但是状态和容错要分开来看。

  • 什么是状态,为什么需要状态?

流计算和批计算在数据源上最大的区别是,流计算中的数据是无边界的,数据持续不断,而批计算中数据是有边界的,在计算时可以一次性将数据全部拿到。在流计算中无法拿到全部数据进行计算结果,因此需要将历史数据的处理结果记录下来,这个就是状态。通过状态实现历史数据和未来数据的结合处理。

举个例子,在对数据求和的场景中,第n条数据的求和结果是前n-1条数据的求和结果再加上第n条数据的值。因此必须记录前n-1条数据的累加和,才能在第n条数据到达时,得到前n条数据的累加和。
记录的前n-1条数据的累加和就是状态数据。

并不是全部的流计算场景都需要状态,如单词大小写转换的场景中,每条数据的处理仅和当前数据有关,因此不需要状态。

  • 什么是state backend?

state backend用于存储flink管理的状态。有多种实现方式,不同实现方式中数据结构和存储方式不同,对快照的支持也有所区别。

  • 什么是checkpoint?

checkpoint可以简单理解为游戏中的进度存档,在游戏中当玩家死亡或再次打开游戏时,可以从最近的游戏存档继续,而无需重头开始。checkpoint即作业在某个时刻的快照信息(状态的快照),如某个时刻数据源(消息队列)的消费位移,算子状态等。当发生故障恢复时,会从最新(最近)的一次checkpoint来恢复整个应用程序。

checkpoint是容错机制。checkpoint默认关闭,开启后会根据配置来持续自动保存。

  • 什么是savepoint?

checkpoint是根据配置项自动周期性进行的“存档,而savepoint则是需要手动触发的“存档”。savepoint是手动触发的checkpoint。

snapshot、checkpoint、savepoint在一些语境中是可以互换的,表示相同的含义。

  • state backend、checkpoint和savepoint的关系?

state backend表示的是状态,而checkpoint和savepoint表示的是状态在某个时刻的快照。state backend是状态存储。checkpoint和savepoint是容错机制。

2. checkpoint

2.1. 工作原理

checkpoint是由Jobmanager中的checkpoint coordinator来协调并在TaskManager执行的。当checkpoint开始时,所有的Source将会记录数据的偏移量,并将有编号的barrier插入到流中。barrier将流分划分了前一个checkpoint和下一个checkpoint两部分。当job graph中每个运算符收到其中一个barrier时,将开始记录状态。

当具有两个Input的运算符,默认情况下会执行barrier alignment。一个算子可能有多个Input,每个Input中都会携带barrier,根据运算符是否要等待全部Input中barrier将checkpoint分成aligned和unaligned(对齐和不对齐)的checkpoint。

state backend使用copy-on-write机制允许流处理不受阻碍得继续执行,同时对旧版本的状态进行异步快照,当快照被持久化后,旧版本的状态被清理。

2.2. 精确一次语义和端到端的精确一次?

  • 精确一次语义

精确一次语义的含义是,每个事件都会只影响flink管理的状态一次。并不是每个事件都只会处理一次。barrier alignment仅仅在精确一次语义提供保障,如果不需要精确一次语义,可以使用至少一次来获取一些性能。这具有禁用barrier alignment的效果。

  • 端到端的精确一次

端到端的精确一次语句是指来自Source中每个每个事件恰好影响sink一次。实现这个必须具备两个条件,Source必须是可重放的(如kafka)并且sink必须支持事务或幂等的。

2.3. checkpoint storage

checkpoint期间状态快照保留在哪里取决于所配置的checkpoint storage。

checkpoint storage提供了两种实现,基于分布式文件系统和基于JobManager jvm heap

  • 基于分布式文件系统,FileSystemCheckpointStorage
  • 基于Jobmanager jvm heap,JobManagerCheckpointStorage,默认方式。

JobManagerCheckpointStorage

使用方式

env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage(MAX_MEM_STATE_SIZE));

MAX_MEM_STATE_SIZE的默认值5M,当快照超过此大小时checkpoint将失败,从而避免jobmanager OOM。无论配置的状态最大值是多少,状态都不能大于akka frame size(用于控制jogmanager和taskmanager之间发送消息的最大大小)。

FileSystemCheckpointStorage,配置了checkpoint路径后将会使用此方式,在执行checkpoint期间,状态快照将写入配置的文件系统和目录中的文件中。极少的元数据存储在JobManager内存中。

使用方式

// 全局配置 state.checkpoints.dir: hdfs:///checkpoints/
// 或在代码中为每个job配置
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
// 或
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs:///checkpoints-data/"));

目录结构

/user-defined-checkpoint-dir/{job-id}|+ --shared/ (这个目录表示多个checkpoint一部分的状态)+ --taskowned/ (该目录表示jobmanager绝对无法丢失的状态)+ --chk-1/ (其他目录表示单独属于一个checkpoint的状态)+ --chk-2/+ --chk-3/...  

2.4. retained checkpoint

默认情况下,flink仅仅保存最近的n个checkpoint并且取消作业或作业失败时删除它们,checkpoint结果仅用于在作业失败时自动使用其进行恢复作业。可以手动配置将checkpoint保留下来。这样在作业取消或失败时,可以手动使用保留下来的快照进行作业恢复。

使用方式

CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 可选项
// NO_EXTERNALIZED_CHECKPOINTS,默认值,禁用retained checkpoint
// DELETE_ON_CANCELLATION,作业被取消时ck将被删除。但当作业状态为JobStatus.FAILED时ck会保留
// RETAIN_ON_CANCELLATION,作业被取消时ck将保留(保留下来的数据需要手动删除)。但当作业状态为JobStatus.FAILED时ck会保留

2.5. 相关配置

  • checkpoint间隔
env.enableCheckpointing(1000);
  • checkpoint存储,设置存储checkpoint快照数据的位置,默认情况下使用jobmanager heap
// 当配置路径后,将使用FileSystemCheckpointStorage方式的checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir");
  • 外部化的checkpoint,即上文retained checkpoint
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • 精确一次 至少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • 超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
  • 两次checkpoint的最小时间间隔,如果将该值设置为5s,则无论持续时间和间隔时间设置为何值,下一次checkpoint都将在上一个checkpoint完成后不早于5秒启动,这个值意味间隔时间将永远不会小于该时间,这个值还以意味着checkpoint的并发量为1。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • 可以容忍的checkpoint连续失败的个数,默认为0。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
  • checkpoint的并行数量,默认情况下flink将不会再一个checkpoint进行时触发另一个checkpoint。定义了最小间隔时间后则不能使用该选项。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • 非对齐的checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
  • dag中包含有界数据源时的checkpoint,从1.14版本开始支持,1.15版本默认开启。
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);

2.6. checkpointing under backpressure

通常情况下,checkpoint时间由此过程中的同步和异步部分主导。但是在作业处于背压下时,checkpoint端到端时间主要影响因数是barriers传播到所有subtask/operators的时间。

3. savepoint

savepoint也是由checkpoint机制创建的。

savepoint由两部分组成,包含二进制文件的目录(通常较大)和元数据文件(相对较小)。二进制文件是状态快照的纯净文件,元数据文件中包含了二进制文件相对路径的指针。

通过state.savepoints.dir来指定savepoint文件路径。

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

savepoint可以将整个文件目录移动或复制到其他地方使用, 而checkpoint由于包含一些绝对路径,无法使用移动或复制后文件。

3.1. savepoint和checkpoint

savepoint和checkpoint之间的不同类似于传统数据中备份与恢复日志的不同。savepoint 代表数据库中的备份,checkpoint 代表数据中的日志恢复。

checkpoint的主要意图是一种异常时的容错机制。生命周期由flink管理,无需用户交互。checkpoint被频繁触发和依赖于故障恢复,因此checkpoint主要设计目标是尽可能轻量级的创建和尽可能的尽快恢复。

尽管savepoint也是使用checkpoint机制来创建的。但是和checkpoint的概念是不同的。savepoint的设计意图是可移植性和操作灵活性,尤其是作业更改方面,用于有计划的手动操作。如升级flink版本,更改job graph等,使用savepoint对作业进行恢复。

4. state backend

flink状态存储在state backend。state backend有两种实现:基于rocksDB(本地磁盘)和基于jvm heap(内存)

  • 基于rocksDB,EmbeddedRocksDBStateBackend,访问和更新状态涉及序列化和反序列化,成本更高,相对内存而言较慢,但是可以允许巨量的状态。支持增量快照。
  • 基于Java heap,HashMapStateBackend,默认方式,访问和更新状态涉及在heap上读写对象。

HashMapStateBackend

状态将作为jvm heap上的对象进行存储。由于作为对象进行存储,因此对象重用(reuse Object)是不安全的。这种建议将 managed memory 设置为0。从而使JVM为用户代码分配最大内存。状态大小受限于集群可用的内容大小。

EmbeddedRocksDBStateBackend

状态存储在rocksDB数据库中,即TaskManager的本地磁盘中,数据存储为序列化的字节数组。状态大小受限于磁盘空间大小。对象重用是安全的。当前唯一支持增量checkpoint的实现方式。

使用方式

// state.backend 配置项,可选项hashmap (HashMapStateBackend), rocksdb (EmbeddedRocksDBStateBackend)
// 或
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

4.1. 旧版的state backend实现

从1.13版本开始,社区重新设计了state backend实现类,新实现类目的是为了帮助用户更好的理解状态存储和checkpoint存储的分离。并未影响flink的state backend和checkpoint进行运行时实现和特征。可以使用新api来迁移老版本的应用程序,且不会丢失状态和一致性。

  • MemoryStateBackend,等价于HashMapStateBackend and JobManagerCheckpointStorage.
  • FsStateBackend,等价于HashMapStateBackend and FileSystemCheckpointStorage.
  • RocksDBStateBackend,等价于EmbeddedRocksDBStateBackend and FileSystemCheckpointStorage.

4.2. 基于rocksDB的state backend

rocksDB的方式支持增量快照,增量快照是建立在先前旧快照基础上的。flink利用了rocksDB的内部的压缩机制对旧快照进行合并,因此增量快照的历史记录不会无限增长,旧的快照结果最终会被自动合并和修剪。

增量快照需要手动开启。开启增量快照后,在web UI上展示checkpoint data size仅仅表示增量数据的大小。

使用方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// true表示开启增量快照
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
4.2.1. 内存管理

rocksDB state backend的性能很大程度上取决于它可用的内存量。增加内存对提高性能有很大帮助。或者调整内存的功能。

默认情况下,RocksDB State Backend 使用 Flink 的managed memory预算作为 RocksDB 的缓冲区和缓存(state.backend.rocksdb.memory.managed:true)。

flink并不是直接将Manager memory的内存分配给rocksDB,而是将通过配置和限制来确保rocksDB的内存使用保持在Manager momery内存范围内。

flink在单个slot中的所有rocksDB实例之间共享缓存和写缓冲区。这种共享管理帮助限制rocksDB主要内存消耗组件的内存使用。

  • block cache:用于缓存从磁盘读取的数据块
  • index and bloom filters:用于加速数据检索,过滤不必要的读取
  • memtables:用于暂时存储写入的数据,直到他们被刷新到磁盘。

flink提供了额外的配置来调整rocksDB中写路径和读路径之间的内存分配

  • 写路径(memtable),如果发生频繁的刷新,则表示写缓存区内存不足,适当调整这部分内存。state.backend.rocksdb.memory.write-buffer-ratio,默认0.5,即50%
  • 读路径(index and bloom fliters,剩余缓存),如果发生频繁缓存未命中,则表示读操作的内存不足,适当调整这部分内存。state.backend.rocksdb.memory.high-prio-pool-ratio,默认0.1,即10%

在内存层面的调优,大多数情况下,增加Manager memory。

建议设置的配置。

state.backend.rocksdb.predefined-options,这个参数允许用户选择一组预定义的 RocksDB 配置,以优化不同的使用场景。具体来说,这个参数可以帮助用户在性能和资源使用之间进行权衡,而不需要手动调整大量的 RocksDB 配置项。

  • DEFAULT: 使用 RocksDB 的默认配置。
  • SPINNING_DISK_OPTIMIZED: 针对传统硬盘(HDD)进行优化。建议设置成此项
  • FLASH_SSD_OPTIMIZED: 针对固态硬盘(SSD)进行优化。
  • SPINNING_DISK_OPTIMIZED_HIGH_MEM: 针对高内存环境下的 HDD 进行优化。单个slot的状态达到GB,且托管内存充裕,设置为此最佳。
  • FLASH_SSD_OPTIMIZED_HIGH_MEM: 针对高内存环境下的 SSD 进行优化。

必要的RocksDB监控,观察是否有性能瓶颈,观察完毕后关闭它们

  • state.backend.rocksdb.metrics.block-cache-capacity,显示了为块缓存分配的总内存量,帮助了解块缓存的大小是否适合当前的需求。
  • state.backend.rocksdb.metrics.block-cache-usage,监视块缓存内存的使用情况,帮助了解在运行时使用了多少内存来缓存数据块。
  • state.backend.rocksdb.metrics.cur-size-all-mem-tables,监视以字节为单位的active和unflush的不可变 memtables 的大致大小。
  • state.backend.rocksdb.metrics.mem-table-flush-pending,监控 RocksDB 中挂起的 memtable 刷新次数。
  • state.backend.rocksdb.metrics.num-running-flushes,监控当前正在运行的flush次数。
  • state.backend.rocksdb.metrics.num-running-compactions,监控当前运行的压缩次数。
4.3. Timers

当state backend选则rocksDB时,定时器默认也存储在rocksDB中,这是更健壮和可扩展的选则。但是这样会有较高的成本,因此flink提供了基于JVM heap memory存储的定时器的选项(state.backend.rocksdb.timer-service.factory=heap)。当在无窗口、在ProcessFunction为使用定时器的场景下,将定时器存储在Heap中将会有更优的性能。但是可能会增加checkpoint时间,并且无法自然的扩展到内存之外。

当使用基于rocksDb的state backend和基于heap存储的定时器时,定时器不支持异步快照,其他状态仍会异步存储。

相关文章:

  • 哪里能找到免费网站seo学徒是做什么
  • 东莞做商城网站建设免费聊天软件
  • wordpress站点的根目录百度seo学院
  • 网站建设技术发展现状网站推广策划思路的内容
  • 义乌市建设银行分行网站发布外链的步骤
  • 网站可信认证在哪里做全网营销方案
  • Golang Kratos 系列:领域层model定义是自洽还是直接依赖第三方(三)
  • 帮助装修公司拓展客户资源的微信装修小程序怎么做?
  • 重点解析(软件工程)
  • MonkeyOCR在Win习题部署指南和报错提醒
  • 谷歌 Gemini 2.5 系列模型:性能、功能与应用全方位解析​
  • 深入理解RAG:大语言模型时代的知识增强架构
  • pyqt多界面
  • 人机协作新篇章:艾利特按摩机器人如何重塑健康生活
  • 【JS】整理常复用的JS函数合集
  • python有哪些常用的GUI(图形用户界面)库及选择指南
  • SpringCloud系列(34)--使用Hystrix进行服务熔断
  • c++ 类型擦除技术
  • 使用预训练权重在YOLO模型上训练新数据集的完整指南
  • 数字图像处理——滤波器核(kernel)
  • Jetson家族横向对比:如何选择你的边缘计算设备
  • Rust 项目实战:多线程 Web 服务器
  • 前端后端文件下载防抖实现方案
  • 基于大模型预测的化脓性阑尾炎诊疗方案研究报告
  • 【微信小程序】9、用户拒绝授权地理位置后再次请求授权
  • 【数据结构与算法】数据结构初阶:详解顺序表和链表(二)