Flink ExecutionConfig 实战并行度、序列化、对象重用与全局参数
一、什么是 ExecutionConfig?
ExecutionConfig 是作业内的运行时配置集合,通过 StreamExecutionEnvironment#getConfig() 获取;它只影响当前作业。如果你要改集群/所有作业的缺省值,请去改全局 Configuration(如 flink-conf.yaml)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();
二、常用开关与推荐实践
| 能力 | 关键 API | 默认值 | 典型场景 | 建议 |
|---|---|---|---|---|
| 并行度 | setParallelism(int) | 集群默认 | 控制算子默认并行度 | 以环境/参数注入;不要硬编码 |
| 最大并行度 | setMaxParallelism(int) | 集群默认 | 决定 key-group 上限、影响动态扩缩容 | 128/256 较稳妥;上线前定好,后改需仔细评估 |
| 执行模式 | setExecutionMode(...) | PIPELINED | 流水/批式交换 | 流处理保持默认 |
| 闭包清理 | setClosureCleanerLevel(...) | RECURSIVE | 移除匿名函数对外部类的无用引用,避免序列化失败 | 保持默认;除非你非常确定 |
| 对象重用 | enableObjectReuse() | 关闭 | 极致低 GC | 高级选项:明确做了拷贝/不可变才开启 |
| Kryo/Avro 强制 | enableForceKryo() / enableForceAvro() | 关闭 | 内置序列化器异常或想统一栈 | 少用;按需局部开启并做基准测试 |
| 类型注册 | registerKryoType / registerPojoType / addDefaultKryoSerializer | 自动注册开启 | 降低序列化元数据开销 | 热路径类型建议显式注册 |
| 全局参数 | setGlobalJobParameters(...) | - | 在 UDF 内共享只读配置 | 推荐配合 ParameterTool |
| (弃用)重试 | setNumberOfExecutionRetries / setExecutionRetryDelay | - | 故障恢复 | 用重启策略替代(env.setRestartStrategy(...)) |
| 取消轮询间隔 | setTaskCancellationInterval(ms) | 30000 | 任务取消时中断轮询间隔 | 默认即可;长 GC/IO 可适当加大 |
三、参数注入 + ExecutionConfig + UDF 读取
1、Main 中设置
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ExecutionConfig cfg = env.getConfig();// ① 并行度与最大并行度(建议从参数配置)ParameterTool pt = ParameterTool.fromArgs(args);int p = pt.getInt("parallelism", env.getParallelism());int mp = pt.getInt("maxParallelism", 128);env.setParallelism(p);cfg.setMaxParallelism(mp);// ② 闭包清理(保持默认递归)cfg.setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.RECURSIVE);// ③ 序列化优化(示例)cfg.registerPojoType(MyPojo.class);cfg.registerKryoType(org.joda.time.DateTime.class);// cfg.addDefaultKryoSerializer(MyType.class, MyKryoSerializer.class);// ④ 对象重用(谨慎)if (pt.getBoolean("objectReuse", false)) {cfg.enableObjectReuse();}// ⑤ 全局参数:UDF 中可访问cfg.setGlobalJobParameters(pt);// pipeline ...env.execute("execution-config-demo");
}
2、在 UDF 中读取全局参数
public static final class MyRichMap extends RichMapFunction<String, String> {private transient ParameterTool pt;@Overridepublic void open(Configuration parameters) {GlobalJobParameters gp = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();if (gp instanceof ParameterTool) {this.pt = (ParameterTool) gp;} else {this.pt = ParameterTool.fromMap(gp.toMap()); // 兼容处理}}@Overridepublic String map(String value) {String prefix = pt.get("logPrefix", "[default]");return prefix + ":" + value;}
}
四、性能/兼容关键点
- 让类型可被 POJO 序列化识别:字段可见、无禁止反射、无奇怪泛型;否则会落到 Kryo。
- 高频对象显式注册:
registerPojoType/registerKryoType,减少类名写入与反射成本。 - 跨版本稳定性:更换 Kryo 序列化器或注册顺序变动,可能影响落盘状态/检查点兼容;生产前演练恢复。
- 仅当必要时强制 Kryo/Avro:统一栈虽方便,但要以基准数据说话,避免“为整齐而牺牲性能”。
五、能快一截,也最容易踩坑
启用 enableObjectReuse() 后,Flink 可能重复复用同一实例传给下游。你的代码必须:
- 立刻拷贝/消费输入对象,不要缓存引用;
- 产出的对象不要在后续修改(或者每次新建)。
反例(会出错):
map(value -> {value.setField("x"); // 修改被复用的对象return value; // 下游可能看到被后续修改过的内容
});
正例:
map(value -> value.copy()); // 或创建不可变对象
建议:默认关闭。只有在有压测且代码完全遵循不可变/拷贝规范时才开启。
六、别忽略的“架构性”参数
- **最大并行度(maxParallelism)**决定 key-group 上限,它与状态分片直接相关。
- 上线前确定好
maxParallelism(常见 128/256):后续变更需要考虑状态迁移与一致性。 - 作业并行度可以灵活调;但不要超过
maxParallelism。 - 有状态作业做动态扩缩容,
maxParallelism越合理越顺滑。
七、弃用项该怎么迁移?
setNumberOfExecutionRetries、setExecutionRetryDelay已弃用。- 迁移到重启策略(示例):
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // 重启次数Time.seconds(10)
));
八、长尾任务的最后“刹车片”
config.setTaskCancellationInterval(45_000); // 默认 30s,可按需调大
适用于:你有长 IO 或阻塞的算子,在取消时需要更“温和”的中断频率。
九、常见问题与排查
-
匿名函数序列化失败
- 现象:
NotSerializableException - 方案:保持
ClosureCleanerLevel.RECURSIVE,避免捕获不可序列化对象;重构为Rich*Function#open()初始化资源。
- 现象:
-
类型推断失败/性能差
- 现象:
InvalidTypesException或大量 Kryo 反射开销 - 方案:使用
.returns(Types...)/TypeHint<>;对热类型registerPojoType/registerKryoType。
- 现象:
-
状态兼容问题
- 现象:升级或调整序列化器后恢复失败
- 方案:演练 savepoint 恢复;保持序列化器/注册表一致;必要时做有监督迁移。
-
对象重用引发“值被篡改”
- 现象:下游读取到的对象与期望不符
- 方案:关闭对象重用或严格 copy/不可变。
十、最佳实践清单(Checklist)
- 并行度、最大并行度来自外部配置(
ParameterTool/环境变量),不要硬编码 - 默认保持
ClosureCleanerLevel.RECURSIVE - 热路径类型显式注册,并有恢复演练
- 仅在充分验证后开启对象重用
- 使用重启策略而非弃用的重试 API
-
GlobalJobParameters统一注入/读取,UDF 不直接依赖系统属性 - 对“涉及序列化/状态”的变更,有回滚/演练与基准压测
