Flink 并行度与最大并行度从 0 到弹性扩缩容
1. 为什么要关心并行度?
- 吞吐:更高的并行度意味着更多并行实例同时处理数据,整体吞吐更高。
- 延迟:恰当的并行度能缩短排队时间,降低端到端延迟。
- 成本:并行度直接对应到 TaskSlot/CPU/内存消耗,设置过大或过小都会浪费资源或成为瓶颈。
- 弹性:配合 Savepoint,可以在不丢状态的前提下在线扩缩容(更改并行度)。
2. 核心概念速览
- 并行度(parallelism):某个任务(算子/源/汇)的并行实例数。
- 最大并行度(max parallelism):并行度的上限,与 键组(key-groups) 数量直接相关;Flink 会将 Keyed State 分片到 key-groups 中,以支持可伸缩的状态重分配。
- 默认最大并行度:约为
operatorParallelism + (operatorParallelism / 2),
但不小于 128,不大于 32768。 - 重要警告:从原始作业恢复时显式修改最大并行度会导致状态不兼容。扩缩容通常只改并行度,不改最大并行度。
3. 四个层级设置并行度(越具体优先生效)
| 层级 | 作用范围 | 设置方式 | 备注 |
|---|---|---|---|
| 算子级 | 单个 Operator/Source/Sink | .setParallelism(n) | 覆盖所有更上层默认值 |
| 执行环境级 | 当前作业中所有 Operator/Source/Sink 的默认并行度 | env.setParallelism(n) | 被算子级覆盖 |
| 客户端级 | 提交作业时统一指定默认并行度 | CLI -p n 或客户端 API | 被算子/环境覆盖 |
| 系统级 | 集群范围的默认并行度 | 配置 parallelism.default | 被上面各层覆盖 |
4. 代码与命令一把梭
4.1 算子级并行度(Java)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = /* ... */;
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))).sum(1).setParallelism(5); // 只给当前算子设并行度wordCounts.print();
env.execute("Word Count Example");
4.2 执行环境级默认并行度(Java)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 本作业默认并行度DataStream<String> text = /* ... */;
DataStream<Tuple2<String, Integer>> wordCounts = /* ... */;
wordCounts.print();env.execute("Word Count Example");
4.3 客户端级(CLI)
# 以并行度 10 提交
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
4.4 客户端级(Java 客户端程序)
try {PackagedProgram program = new PackagedProgram(file, args);InetSocketAddress addr = RemoteExecutor.getInetFromHostport("localhost:6123");Configuration config = new Configuration();Client client = new Client(addr, config, program.getUserCodeClassLoader());// 提交时设默认并行度为 10client.run(program, 10, true);
} catch (ProgramInvocationException e) {e.printStackTrace();
}
4.5 系统级(配置文件)
# flink-conf.yaml
parallelism.default: 4
5. 最大并行度(Max Parallelism)怎么设?
5.1 设置位置
- 与设置并行度相同的地方(除客户端级与系统级):
使用.setMaxParallelism(x)为算子设定最大并行度。
示例(Java):
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0)// ....sum(1).setMaxParallelism(256) // 最大并行度.setParallelism(32); // 运行并行度(可在 1~256 内调整)
5.2 默认规则与性能注意
- 默认:约等于
parallelism + parallelism/2,但下限 128,上限 32768。 - 太大有性能风险:状态后端为每个 key-group 维护内部结构,数量越多内存/CPU 开销越大。
- 不要在从原作业恢复时修改
maxParallelism,否则会触发状态不兼容。
实战建议:
- 上线前就想清楚未来可能的最大并行度,合理设置
setMaxParallelism。- 保持
parallelism ≤ maxParallelism;扩缩容时只改parallelism即可。- 如果不确定,先取一个适度的上限(例如
128或256),避免盲目取极大值。
6. 搭配 Savepoint 做“无损”扩缩容
6.1 标准流程
-
触发保存点并停止作业(不同部署模式命令略有差异):
./bin/flink stop -s file:///savepoints/jobA <jobId> -
以新的并行度从保存点启动(不修改
maxParallelism):./bin/flink run -p 32 \-s file:///savepoints/jobA/savepoint-xxx \path/to/your.jar --your.args ...
6.2 常见陷阱
maxParallelism改了:恢复时报 state incompatibility。- 新并行度大于最大并行度:直接报错或无法恢复。
- 不同版本或不同拓扑变更引入状态不兼容:需确保 UID 不变、状态 schema 未破坏。
7. 典型场景与实践
场景 A:首发上线
- 估算业务峰值吞吐与资源预算,设定
env.setParallelism(n)。 - 预留扩容空间:对核心 Keyed 算子设置
setMaxParallelism(128/256)。 - 接入 Checkpoint 与 Savepoint 流程,演练一次恢复。
场景 B:业务增长,扩容
- 先做一次 stop-with-savepoint。
- 用更大的
-p重新提交(不动maxParallelism)。 - 观察背压与延迟;必要时只对热点算子提升并行度。
场景 C:成本优化,缩容
- 方法同上,降低
-p即可;确保仍满足 SLO。 - 观察处理延迟与 GC 指标,避免过度缩容导致拥塞。
8. 故障排查对照表
| 现象 / 报错 | 可能原因 | 处理思路 |
|---|---|---|
| 恢复时报 state incompatibility | 改了 maxParallelism 或破坏了状态 schema/UID | 恢复到旧的 maxParallelism;保持 UID 与状态结构不变 |
| “Parallelism exceeds max parallelism” | 新并行度 > maxParallelism | 降低并行度或提高 maxParallelism(非恢复场景下改) |
| 扩容后内存暴涨/延迟升高 | maxParallelism 过大导致 key-group 太多;状态后端开销变大 | 合理下调 maxParallelism(仅限全新任务,非恢复场景);核对 RocksDB/HashMap 状态配置 |
| 执行环境设了并行度却不生效 | 算子级覆盖了环境级;或客户端 -p 覆盖了系统级 | 查找具体算子 .setParallelism();核对提交参数 |
9. 速查清单(Cheat Sheet)
- 首选原则:越具体越优先(算子级 > 执行环境级 > 客户端级 > 系统级)。
- 扩缩容:只改
parallelism,不要在恢复时改maxParallelism。 - 默认最大并行度:约
p + p/2,[128, 32768] 之间。 - 设
maxParallelism要克制:既要可扩展,又要控制状态后端开销。 - 上线前做一次 savepoint 恢复演练,确保流程可靠。
10. 可复用模板
(A)最小可用骨架
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // 作业默认并行度DataStream<String> text = /* source */;
SingleOutputStreamOperator<Tuple2<String, Integer>> agg = text.flatMap(new LineSplitter()).keyBy(v -> v.f0).sum(1).setMaxParallelism(256) // 预留扩容上限.setParallelism(16); // 当前并行度agg.print();
env.execute("MyJob");
(B)CLI 扩容示例
# 1) 停止并保存点
./bin/flink stop -s file:///savepoints/myjob <jobId># 2) 用更高并行度从保存点恢复(maxParallelism 不变)
./bin/flink run -p 32 -s file:///savepoints/myjob/savepoint-xxx path/to/myjob.jar
