
Flink Savepoints Summary

Official documentation

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/ops/state/savepoints/

Test job

First, start a test job; we reuse the cdc_mysql2mysql job from the earlier article:

bin/sql-client.sh -f sql/cdc_mysql2mysql.sql
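
For context, a script of this kind typically pairs a mysql-cdc source with a JDBC sink, roughly as in the hedged sketch below (all table names, columns, and connection parameters are illustrative stand-ins; the real file is defined in the earlier article):

-- checkpointing so the CDC job produces consistent state (interval is illustrative)
SET 'execution.checkpointing.interval' = '30s';

-- CDC source reading the MySQL binlog
CREATE TABLE src (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'user',
  'password' = 'pass',
  'database-name' = 'db',
  'table-name' = 'src'
);

-- JDBC sink writing to the target MySQL
CREATE TABLE dst (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-host:3306/db',
  'table-name' = 'dst',
  'username' = 'user',
  'password' = 'pass'
);

-- the continuous sync job whose savepoints we trigger below
INSERT INTO dst SELECT * FROM src;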

Triggering a Savepoint on YARN

According to the official documentation, the syntax is:

bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

So the command should be:

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql -yid application_1750755047138_0076

This fails with:

java.lang.NoSuchMethodError: org.apache.commons.cli.CommandLine.hasOption(Lorg/apache/commons/cli/Option;)Z
    at org.apache.flink.client.cli.SavepointOptions.<init>(SavepointOptions.java:45)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:738)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1118)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1198)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1198)

Check which jars under lib/ contain the conflicting class:

grep -rl "org.apache.commons.cli.CommandLine" lib/*
lib/flink-dist-1.15.3.jar
lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar
lib/flink-sql-connector-hbase-2.2-1.15.3.jar
lib/hudi-flink1.15-bundle-0.13.0.jar
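
To narrow this down, one can list the class entries bundled inside each candidate jar and compare (a sketch; any of the jars above can be substituted):

# list commons-cli class entries shaded into the suspect jar
unzip -l lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar | grep "commons/cli/CommandLine"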

The conflict turns out to come from flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar, so rename it out of the way:

mv lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar.bak

Trying again, it now fails with a different error:

org.apache.flink.util.FlinkException: No cluster id was specified. Please specify a cluster to which you would like to connect.
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1038)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:784)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1118)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1198)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1198)

This is the same error you get when -yid is omitted entirely:

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql

Apparently -yid has no effect, so switch to -Dyarn.application.id:

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql -Dyarn.application.id=application_1750755047138_0076

This fails with:

Caused by: java.io.IOException: Failed to create savepoint directory at /savepoint/cdc_mysql2mysql

Check the savepoint-related logs:

yarn logs -applicationId application_1750755047138_0076 | grep "Savepoint"
2025-06-27 10:45:31,471 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.initializeLocationForSavepoint(AbstractFsCheckpointStorageAccess.java:191) ~[flink-dist-1.15.3.jar:1.15.3]

At this point, although creating the savepoint directory failed, the savepoint itself had already been triggered, which is why savepoint-related log entries exist (the stray at ... line is from the failure's stack trace, which the grep happened to match).

Prefix the savepoint path with hdfs:// so that it unambiguously resolves to HDFS instead of the client's default filesystem:
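
If in doubt about what an unprefixed path resolves to, the cluster's default filesystem can be checked first (a quick sketch with standard Hadoop tooling):

hdfs getconf -confKey fs.defaultFS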

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 hdfs:///savepoint/cdc_mysql2mysql -Dyarn.application.id=application_1750755047138_0076

Success:

Triggering savepoint for job 24f61a106d31205a122b66e45b2984e7.
Waiting for response...
Savepoint completed. Path: hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
You can resume your program from this savepoint with the run command.

Inspect the savepoint on HDFS:

hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 1 items
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82

Check the savepoint-related logs again:

2025-06-27 10:20:02,159 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750990802141 for job 24f61a106d31205a122b66e45b2984e7.
2025-06-27 10:45:31,471 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.initializeLocationForSavepoint(AbstractFsCheckpointStorageAccess.java:191) ~[flink-dist-1.15.3.jar:1.15.3]
2025-06-27 10:46:02,645 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 19 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992362637 for job 24f61a106d31205a122b66e45b2984e7.

One more savepoint-related log entry has appeared.

The latest savepoint can also be seen in the web UI (the screenshot here was taken from a different job).

Why -yid has no effect

In Flink 1.10 and later, the -yid parameter is deprecated; the YARN application ID must be passed via -Dyarn.application.id. Around 2020 (version 1.10), Flink reworked its command-line argument parsing and unified all YARN-related parameters into -D-prefixed configuration options, in order to:

  • Simplify the option set: no need to memorize special prefixes such as -m, -yid, or -ytm.
  • Unify configuration: every option can be passed via -D, consistent with flink-conf.yaml.
  • Reduce compatibility issues between versions.
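
Concretely, the two spellings of the same savepoint trigger look like this (reusing the job and application IDs from this test; the first form is what older documentation shows):

# pre-1.10 prefix style (no longer effective here):
bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 hdfs:///savepoint/cdc_mysql2mysql -yid application_1750755047138_0076
# unified -D style (works):
bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 hdfs:///savepoint/cdc_mysql2mysql -Dyarn.application.id=application_1750755047138_0076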

Cancelling a job with a savepoint (cancel)

bin/flink cancel -s hdfs:///savepoint/cdc_mysql2mysql 24f61a106d31205a122b66e45b2984e7 -Dyarn.application.id=application_1750755047138_0076
Cancelled job 24f61a106d31205a122b66e45b2984e7. Savepoint stored in hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a.

Here -s is the short form of --withSavepoint.

Check the savepoint-related logs:

2025-06-27 10:45:31,471 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.initializeLocationForSavepoint(AbstractFsCheckpointStorageAccess.java:191) ~[flink-dist-1.15.3.jar:1.15.3]
2025-06-27 10:46:02,645 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 19 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992362637 for job 24f61a106d31205a122b66e45b2984e7.
2025-06-27 10:48:05,420 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 33 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992485404 for job 24f61a106d31205a122b66e45b2984e7.
2025-06-27 10:48:05,594 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Savepoint stored in hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a. Now cancelling 24f61a106d31205a122b66e45b2984e7.

The savepoint result:

hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 2 items
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:48 /savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82

In the web UI, the job status shows as CANCELED.

Stopping a job with a savepoint (stop)

First, try stopping the job we just cancelled:

bin/flink stop -p hdfs:///savepoint/cdc_mysql2mysql/ 24f61a106d31205a122b66e45b2984e7 -Dyarn.application.id=application_1750755047138_0076

Since that job has already been cancelled, this fails with:

Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (24f61a106d31205a122b66e45b2984e7)

So start a new job and stop that one instead:

bin/flink stop -p hdfs:///savepoint/cdc_mysql2mysql/ 37a54dad1f7c781ffc0001555b5dcee6 -Dyarn.application.id=application_1750755047138_0077
Savepoint completed. Path: hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-37a54d-0e07c7bd195d

Here -p is the short form of --savepointPath.

Check the logs:

2025-06-27 10:59:34,479 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1750993174460 for job 37a54dad1f7c781ffc0001555b5dcee6.

Check the result:

hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 3 items
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:48 /savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:59 /savepoint/cdc_mysql2mysql/savepoint-37a54d-0e07c7bd195d

The listing shows that the middle segment of each savepoint directory name is the first six characters of the jobId.

In the web UI, the job status shows as FINISHED.

SET 'table.dml-sync' = 'true';

For jobs submitted through sql-client with default settings, after we cancel or stop the job only the TaskManagers exit; the JobManager stays alive, so the YARN application remains RUNNING. Setting SET 'table.dml-sync' = 'true'; makes the corresponding YARN application transition to FINISHED after the job is cancelled or stopped.
See the official documentation for details: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sqlclient/

The default behavior matches that of flink run's --detached flag (short form: -d).
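
As a sketch of where the setting goes, in the SQL script it must precede the INSERT it should apply to (the INSERT below is a generic stand-in, not the exact contents of cdc_mysql2mysql.sql):

SET 'table.dml-sync' = 'true';
-- the client now waits for the DML to finish, so cancel/stop also ends the YARN application
INSERT INTO dst SELECT * FROM src;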

cancel vs. stop

By default, without specifying a savepoint:

bin/flink cancel 15312da78d337525a36ca3cf40f04ff9 -Dyarn.application.id=application_1750755047138_0092
bin/flink stop 15312da78d337525a36ca3cf40f04ff9 -Dyarn.application.id=application_1750755047138_0092 

Here cancel succeeds:

Cancelled job 15312da78d337525a36ca3cf40f04ff9.

but stop fails with:

org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided.

This shows that stop must be given a savepoint path; alternatively, if state.savepoints.dir is configured for the job, a savepoint is generated there automatically.
Add to the SQL:

set state.savepoints.dir=hdfs:///flink/savepoints;

Then stop again:

Savepoint completed. Path: hdfs://cluster1/flink/savepoints/savepoint-9324cd-af49c550593d
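
The same default can instead be configured cluster-wide in flink-conf.yaml, so individual jobs do not each have to set it (the path below mirrors the one used above; pick one appropriate for your cluster):

state.savepoints.dir: hdfs:///flink/savepoints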
Feature          | flink cancel                                                   | flink stop
Termination      | Forceful: interrupts the job immediately                       | Graceful: waits for in-flight data to finish processing
Savepoint        | Not created by default; request one with -s / --withSavepoint  | Created by default; target directory set with -p / --savepointPath
Typical use      | Emergency termination of a failing job; job state not needed   | Smooth decommissioning; resuming later (with the savepoint)
Final job status | CANCELED                                                       | FINISHED (if all operators shut down successfully)

See the official documentation for more details: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/cli/

Resuming from a savepoint

This works the same way as resuming from a Flink checkpoint.

SQL

SET execution.savepoint.path = hdfs:///savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a;
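
A hedged sketch of the full sql-client sequence (the savepoint path is the one produced by the cancel above; the INSERT stands in for the original job's DML):

SET 'execution.savepoint.path' = 'hdfs:///savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a';
-- any job submitted after this SET in the same session restores from that savepoint
INSERT INTO dst SELECT * FROM src;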

CLI

-s,--fromSavepoint <savepointPath>

Example:

bin/flink run -m yarn-cluster \
  -d \
  --fromSavepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-e90151-d60a4ba04076 \
  -c com.dkl.flink.Test \
  /opt/dkl/flink-demo-1.0.jar

Changing job operators

If the job has changed, for example by adding or removing set pipeline.operator-chaining=false;, restoring it throws an exception:

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

The --allowNonRestoredState option (short form: -n) skips state that cannot be mapped to the new program, but exactly-once guarantees no longer hold when it is used.

-n,--allowNonRestoredState

Example:

bin/flink run -m yarn-cluster \
  -d \
  --fromSavepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-e90151-d60a4ba04076 \
  -c com.dkl.flink.Test \
  --allowNonRestoredState \
  /opt/dkl/flink-demo-1.0.jar

Jobs without checkpointing configured

Savepoints are supported whether or not checkpointing is configured. The logs below show that when checkpointing is disabled, the checkpoint counter values assigned to savepoints are consecutive.

yarn logs -applicationId application_1750755047138_0093 | grep "Savepoint"
25/06/27 15:14:40 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
25/06/27 15:14:40 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
2025-06-27 15:12:42,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1751008362762 for job 94b5cf983749eec750ab8e9d78b0cd7d.
2025-06-27 15:13:04,664 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 2 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1751008384640 for job 94b5cf983749eec750ab8e9d78b0cd7d.

Migration support

Feature           | Checkpoint                              | Savepoint
Migration support | Limited; requires strict conditions     | Designed for cross-environment migration
Format stability  | Version-sensitive; easily broken        | Version-independent; backward compatible
Topology changes  | Generally unsupported                   | Supports adding/removing operators and changing parallelism
Manual effort     | High; requires config and code changes  | Low; directly supported by CLI commands