当前位置: 首页 > news >正文

Flink 并行度与最大并行度从 0 到弹性扩缩容

1. 为什么要关心并行度?

  • 吞吐:更高的并行度意味着更多并行实例同时处理数据,整体吞吐更高。
  • 延迟:恰当的并行度能缩短排队时间,降低端到端延迟。
  • 成本:并行度直接对应到 TaskSlot/CPU/内存消耗,设置过大或过小都会浪费资源或成为瓶颈。
  • 弹性:配合 Savepoint,可以在不丢状态的前提下在线扩缩容(更改并行度)。

2. 核心概念速览

  • 并行度(parallelism):某个任务(算子/源/汇)的并行实例数。
  • 最大并行度(max parallelism):并行度的上限,与 键组(key-groups) 数量直接相关;Flink 会将 Keyed State 分片到 key-groups 中,以支持可伸缩的状态重分配。
  • 默认最大并行度:约为
    operatorParallelism + (operatorParallelism / 2)
    不小于 128,不大于 32768
  • 重要警告从原始作业恢复时显式修改最大并行度会导致状态不兼容。扩缩容通常只改并行度,不改最大并行度。

3. 四个层级设置并行度(越具体优先生效)

层级作用范围设置方式备注
算子级单个 Operator/Source/Sink.setParallelism(n)覆盖所有更上层默认值
执行环境级当前作业中所有 Operator/Source/Sink 的默认并行度env.setParallelism(n)被算子级覆盖
客户端级提交作业时统一指定默认并行度CLI -p n 或客户端 API被算子/环境覆盖
系统级集群范围的默认并行度配置 parallelism.default被上面各层覆盖

4. 代码与命令一把梭

4.1 算子级并行度(Java)

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = /* ... */;
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Duration.ofSeconds(5))).sum(1).setParallelism(5); // 只给当前算子设并行度wordCounts.print();
env.execute("Word Count Example");

4.2 执行环境级默认并行度(Java)

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 本作业默认并行度DataStream<String> text = /* ... */;
DataStream<Tuple2<String, Integer>> wordCounts = /* ... */;
wordCounts.print();env.execute("Word Count Example");

4.3 客户端级(CLI)

# 以并行度 10 提交
./bin/flink run -p 10 ../examples/*WordCount-java*.jar

4.4 客户端级(Java 客户端程序)

try {PackagedProgram program = new PackagedProgram(file, args);InetSocketAddress addr = RemoteExecutor.getInetFromHostport("localhost:6123");Configuration config = new Configuration();Client client = new Client(addr, config, program.getUserCodeClassLoader());// 提交时设默认并行度为 10client.run(program, 10, true);
} catch (ProgramInvocationException e) {e.printStackTrace();
}

4.5 系统级(配置文件)

# flink-conf.yaml
parallelism.default: 4

5. 最大并行度(Max Parallelism)怎么设?

5.1 设置位置

  • 与设置并行度相同的地方(除客户端级与系统级):
    使用 .setMaxParallelism(x) 为算子设定最大并行度。

示例(Java):

DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new LineSplitter()).keyBy(value -> value.f0)// ....sum(1).setMaxParallelism(256)  // 最大并行度.setParallelism(32);     // 运行并行度(可在 1~256 内调整)

5.2 默认规则与性能注意

  • 默认:约等于 parallelism + parallelism/2,但下限 128,上限 32768
  • 太大有性能风险:状态后端为每个 key-group 维护内部结构,数量越多内存/CPU 开销越大。
  • 不要在从原作业恢复时修改 maxParallelism,否则会触发状态不兼容

实战建议:

  • 上线前就想清楚未来可能的最大并行度,合理设置 setMaxParallelism
  • 保持 parallelism ≤ maxParallelism;扩缩容时只改 parallelism 即可。
  • 如果不确定,先取一个适度的上限(例如 128256),避免盲目取极大值。

6. 搭配 Savepoint 做“无损”扩缩容

6.1 标准流程

  1. 触发保存点并停止作业(不同部署模式命令略有差异):

    ./bin/flink stop -s file:///savepoints/jobA <jobId>
    
  2. 以新的并行度从保存点启动不修改 maxParallelism):

    ./bin/flink run -p 32 \-s file:///savepoints/jobA/savepoint-xxx \path/to/your.jar --your.args ...
    

6.2 常见陷阱

  • maxParallelism 改了:恢复时报 state incompatibility
  • 新并行度大于最大并行度:直接报错或无法恢复。
  • 不同版本或不同拓扑变更引入状态不兼容:需确保 UID 不变、状态 schema 未破坏。

7. 典型场景与实践

场景 A:首发上线

  • 估算业务峰值吞吐与资源预算,设定 env.setParallelism(n)
  • 预留扩容空间:对核心 Keyed 算子设置 setMaxParallelism(128/256)
  • 接入 Checkpoint 与 Savepoint 流程,演练一次恢复。

场景 B:业务增长,扩容

  • 先做一次 stop-with-savepoint
  • 用更大的 -p 重新提交(不动 maxParallelism)。
  • 观察背压与延迟;必要时只对热点算子提升并行度。

场景 C:成本优化,缩容

  • 方法同上,降低 -p 即可;确保仍满足 SLO。
  • 观察处理延迟与 GC 指标,避免过度缩容导致拥塞。

8. 故障排查对照表

现象 / 报错可能原因处理思路
恢复时报 state incompatibility改了 maxParallelism 或破坏了状态 schema/UID恢复到旧的 maxParallelism;保持 UID 与状态结构不变
“Parallelism exceeds max parallelism”新并行度 > maxParallelism降低并行度或提高 maxParallelism非恢复场景下改)
扩容后内存暴涨/延迟升高maxParallelism 过大导致 key-group 太多;状态后端开销变大合理下调 maxParallelism(仅限全新任务,非恢复场景);核对 RocksDB/HashMap 状态配置
执行环境设了并行度却不生效算子级覆盖了环境级;或客户端 -p 覆盖了系统级查找具体算子 .setParallelism();核对提交参数

9. 速查清单(Cheat Sheet)

  • 首选原则:越具体越优先(算子级 > 执行环境级 > 客户端级 > 系统级)。
  • 扩缩容:只改 parallelism不要在恢复时改 maxParallelism
  • 默认最大并行度:约 p + p/2[128, 32768] 之间。
  • maxParallelism克制:既要可扩展,又要控制状态后端开销。
  • 上线前做一次 savepoint 恢复演练,确保流程可靠。

10. 可复用模板

(A)最小可用骨架

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // 作业默认并行度DataStream<String> text = /* source */;
SingleOutputStreamOperator<Tuple2<String, Integer>> agg = text.flatMap(new LineSplitter()).keyBy(v -> v.f0).sum(1).setMaxParallelism(256) // 预留扩容上限.setParallelism(16);    // 当前并行度agg.print();
env.execute("MyJob");

(B)CLI 扩容示例

# 1) 停止并保存点
./bin/flink stop -s file:///savepoints/myjob <jobId># 2) 用更高并行度从保存点恢复(maxParallelism 不变)
./bin/flink run -p 32 -s file:///savepoints/myjob/savepoint-xxx path/to/myjob.jar
http://www.dtcms.com/a/540079.html

相关文章:

  • STL list深度解析:从原理到手写实现
  • AI驱动数据分析革新:奥威BI一键生成智能报告
  • day20_权限控制
  • Flutter 状态管理详解:深入理解与使用 Bloc
  • Spring Boot 移除 Undertow 深度解析:技术背景、迁移方案与性能优化实践
  • c# stateless介绍
  • 烽火台网站网站优化要从哪些方面做
  • 建设一个网站需要多少钱网页版游戏在线玩2022
  • 基于Flask的穷游网酒店数据分析系统(源码+论文+部署+安装)
  • Linux系统--线程的同步与互斥
  • 智慧校园顶层设计与规划方案PPT(71页)
  • 滨州网站建设费用学校网站管理系统 php
  • Spring Boot3零基础教程,定制 Health 健康端点,笔记83
  • Linux 反向 Shell 分析
  • Go Web 编程快速入门 11 - WebSocket实时通信:实时消息推送和双向通信
  • 科研数据可视化工具:助力学术成果清晰呈现
  • 基于GIS的智慧畜牧数据可视化监控平台
  • 热力图可视化为何被广泛应用?| 图扑数字孪生
  • 个人简历网页html代码做网站优化最快的方式
  • Jenkins 已成过去式!新兴替代工具GitHub Actions即将崛起
  • 数组-环形数组【arr2】
  • 打开AI黑箱:SHAP让医疗AI决策更清晰的编程路径
  • 营销型商务网站wordpress html5 主题
  • 知识掘金者:API+Dify工作流,开启「深度思考」的搜索革命
  • 《道德经》第三十八章
  • 企业网站管理系统湖南岚鸿搜狗网站入口
  • 汕头网站推广制作怎么做济南源聚网络公司
  • webrtc代码走读(十)-QOS-Sender Side BWE原理
  • 102-Spring AI Alibaba RAG Pgvector 示例
  • 【刷机分享】解决K20Pro刷入PixelOS后“网络连接”受限问题(附详细ADB命令)