在 Apache Flink 中,并行度(Parallelism) 是控制任务并发执行的核心参数之一。Flink 提供了 多个层级设置并行度的方式,优先级从高到低如下:
🧩 一、Flink 并行度的四个设置层级
层级 | 描述 | 设置方式 |
---|
Operator Level | 为某个具体的算子设置并行度 | operator.setParallelism(n) |
Execution Environment Level | 为整个流处理环境设置默认并行度 | env.setParallelism(n) |
Client Level(提交作业时) | 通过命令行指定全局并行度 | flink run -p n |
System Level(系统配置) | 在 flink-conf.yaml 中定义全局默认值 | parallelism.default: n |
✅ 二、各层级设置详解与示例
1. Operator Level(算子级别)
- 优先级最高
- 可以为特定算子设置不同并行度,适用于数据倾斜或资源敏感操作
🔧 示例:
DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction()).setParallelism(4).print();
✅ 适用场景:
- 某个算子计算密集,需要更多资源
- 数据源分区数较少,但后续算子可并行化处理
2. Execution Environment Level(执行环境级别)
- 设置整个 Job 的默认并行度
- 如果未对某些算子单独设置,并使用此值
🔧 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction()).print();
✅ 适用场景:
3. Client Level(客户端提交作业时)
- 使用命令行参数动态设置并行度
- 不修改代码即可适配不同运行环境(如测试/生产)
🔧 示例:
flink run -p 4 -c com.example.MyJob ./myjob.jar
✅ 适用场景:
4. System Level(系统级别)
- 在
flink-conf.yaml
中设置全局默认并行度 - 对所有提交的作业生效(除非被更高级别覆盖)
🔧 示例(flink-conf.yaml
):
parallelism.default: 4
✅ 适用场景:
📊 三、并行度优先级对比表
设置方式 | 是否推荐 | 场景 | 覆盖关系 |
---|
Operator Level | ✅✅✅ | 特定算子优化 | 最高优先级 |
Execution Environment Level | ✅✅ | 整体统一配置 | 被 Operator 覆盖 |
Client Level (-p) | ✅ | 动态部署 | 被前两者覆盖 |
System Level (flink-conf.yaml) | ⚠️ | 兜底默认值 | 最低优先级 |
💡 四、并行度设置建议
✅ 推荐做法:
- 开发/测试环境:使用
.setParallelism()
或 -p
命令行设置较小值(如1~4) - 生产环境:
- 使用
flink-conf.yaml
设置基础并行度 - 使用
env.setParallelism()
明确控制默认值 - 为关键算子单独设置更高并行度(如窗口聚合、复杂逻辑)
⚙️ 示例组合:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(8) .map(new MyMapFunction()) .keyBy(keySelector).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new MyProcessWindowFunction()) .print();
🧠 五、并行度与资源的关系
并行度 | TaskManager 数量 | Slot 数量 | 资源要求 |
---|
≤ TM × slot | ✅ 正常运行 | ✅ 正常运行 | 资源充足 |
> TM × slot | ❌ 无法启动 | ❌ 无法启动 | 资源不足 |
✅ 建议:确保总并行度 ≤ 总 slot 数量
📈 六、实际调优建议
场景 | 建议设置 |
---|
Kafka Source | 并行度 = Kafka Topic 分区数 |
Map / FlatMap | 根据 CPU 利用率设置 |
Keyed Window Aggregation | 可适当提高并行度提升吞吐 |
Join / CoGroup | 视数据分布决定是否提高并行度 |
Sink | 若写入慢可适当增加并行度 |
✅ 七、完整示例(Java + Shell)
Java 设置(Env + Operator):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);env.fromElements("a", "b", "c").map(x -> x).setParallelism(2) .print();env.execute("Parallelism Example");
Shell 设置(Client Level):
flink run -p 8 -c com.example.MyJob ./myjob.jar
✅ 八、总结
层级 | 用途 | 是否推荐使用 |
---|
Operator Level | 控制单个算子并行度 | ✅✅✅ 强烈推荐用于关键路径优化 |
Execution Environment Level | 设置默认并行度 | ✅✅ 推荐作为基础配置 |
Client Level | 动态设置并行度 | ✅ 适合多环境部署 |
System Level | 全局兜底配置 | ⚠️ 推荐配合其他方式使用 |