Flink 执行模式在 STREAMING 与 BATCH 之间做出正确选择
一、两种执行模式的定位
- STREAMING(默认):经典流式模式,面向无界作业;持续增量处理、低延迟、长期在线。
- BATCH:批风格模式,面向有界作业;输入固定、一次性计算、最终产出结果。
统一的流批模型保证:在有界输入上,两种模式的“最终结果”一致。区别在于过程:
STREAMING 会不断输出增量更新;BATCH 只在结束时输出一次最终结果。
二、如何选择执行模式
应使用 BATCH 的情况(输入有界更高效)
- 历史回放、离线 ETL、定时报表、固定窗口聚合(有终点)。
- 资源有限或希望阶段化执行、减少峰值资源占用。
必须使用 STREAMING 的情况(输入无界)
- 在线写入、持续监控、告警、实时风控、永不停止的计算。
特殊情形
- 状态引导:先用 STREAMING 跑有界作业产出 savepoint,再在无界作业中恢复。
- 测试:写针对无界逻辑的测试时,临时用有界 Source 跑 STREAMING 也很自然。
三、配置方式(推荐在提交时配置,而非写死在代码)
1)命令行
bin/flink run -Dexecution.runtime-mode=BATCH your-job.jar
# 其他可选:STREAMING、AUTOMATIC
2)代码(不推荐固定在程序里)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
建议:保持应用本身“模式无关”,提交时用参数切换,便于同一份代码适配不同场景。
四、执行行为差异一览(工程师视角)
1)调度与网络 Shuffle
-
STREAMING
- 所有 Task 全程在线,端到端流水处理。
- Shuffle pipelined:记录到达即向下游传输(网络层缓冲少量数据)。
- 适合持续低延迟。
-
BATCH
-
将作业切分为多个阶段,逐阶段执行(阶段之间常有 Shuffle 边界)。
-
中间结果物化到非易失存储;下游可在上游离线后读取。
-
好处:
- 故障恢复可回溯到最近的中间结果,无需全作业重启;
- 更省资源:可用更少 slot 顺序执行。
-
代价:端到端延迟更高(批处理特征)。
-
2)状态与 StateBackend
-
STREAMING:按配置的 StateBackend 管理状态与 checkpoint。
-
BATCH:忽略 StateBackend;对 keyed 操作,会对输入按 key 排序分组,逐 key 处理:
- 仅保留当前 key 的状态;
- 切换 key 时丢弃前一 key 状态。
- 优点:内存占用更小,更适合有界批。
3)处理顺序(UDF/算子感知)
-
STREAMING:不保证顺序,来一条处理一条。
-
BATCH:在某些操作上可保证一定顺序(取决于调度/Shuffle/状态实现)。
- 多输入类型顺序:先 broadcast → 再 regular → 后 keyed;
- 多个 keyed 输入:先处理完某个 key 的所有输入,再到下一个 key。
4)事件时间与 Watermark
-
STREAMING:乱序是常态,靠 Watermark(T) 断言“不会再有
< T
的事件”。 -
BATCH:可视为拥有“完美水位线”:
- 每个 key 的输入末尾(或非 keyed 流的全局末尾)发一个
MAX_WATERMARK
即可。 - 忽略用户自定义
WatermarkGenerator
;但TimestampAssigner
仍会生效。
- 每个 key 的输入末尾(或非 keyed 流的全局末尾)发一个
5)处理时间(Processing Time)
- STREAMING:常与事件时间相关(实时摄入),可用于提前触发近似结果。
- BATCH:允许获取处理时间与注册处理时间定时器,但所有处理时间定时器在输入末尾统一触发(可理解为整个执行过程中“处理时间不前进”)。
6)故障恢复
- STREAMING:基于 Checkpoint;失败时通常从 checkpoint 重启全作业。
- BATCH:优先回溯到可用中间结果,仅重启失败任务及其前驱,恢复更轻量。
五、行为变化与不支持项(BATCH 模式)
-
行为变化
-
reduce()
/sum()
这类滚动聚合:- STREAMING:每条增量输出;
- BATCH:只输出最终结果(不滚动)。
-
-
不支持
- Checkpointing 及其依赖能力(如
CheckpointListener
、Kafka EXACTLY_ONCE、FileSink 的OnCheckpointRollingPolicy
)。 - 解决方案:若需在 BATCH 下的事务语义,使用 Unified Sink API(FLIP-143) 的 Sink。
- Checkpointing 及其依赖能力(如
六、开发者清单:常见坑 & 最佳实践
(一)模式选择与提交
- ✅ 有界 →
-Dexecution.runtime-mode=BATCH
;无界 → STREAMING。 - ✅ 应用代码不写死运行模式;利用命令行切换。
- ⚠️ 自动模式(
AUTOMATIC
)依赖 Source 的有界性判断,混合场景要谨慎。
(二)时间与水位线
- ✅ BATCH 仍需
WatermarkStrategy
以赋时间戳(TimestampAssigner
); - ⚠️ 自定义
WatermarkGenerator
在 BATCH 被忽略; - ⚠️ 自定义算子里不要缓存“最后水位线”:BATCH 下按 key 处理会出现 MAX → MIN 跳变。
(三)状态与顺序
- ✅ STREAMING 使用 StateBackend;BATCH 忽略之、按 key 分组排序。
- ✅ 多输入 UDF 的顺序规则要清楚(broadcast→regular→keyed;keyed 间按 key 分批)。
- ⚠️ 自定义算子不要在内部手动改变 key。
(四)Sink 语义
- ⚠️ 需要 exactly-once 的文件输出 → 用
FileSink
(STREAMING); - ⚠️ BATCH 下的事务需求 → 选用 Unified Sink API 的实现,而非依赖 checkpoint。
(五)资源与性能
- ✅ BATCH 适合有限资源下的离线作业(阶段化、顺序执行);
- ✅ STREAMING 追求端到端低延迟;
- ✅ 小流量场景适当降低
bufferTimeout
,避免长时间滞留。
七、从 STREAMING 迁移到 BATCH 的思路
- 确认有界性:所有 Source 是否可界定数据边界?
- 改写触发逻辑:把依赖滚动输出的逻辑调整为最终输出的聚合写法。
- 检查时间语义:保留
TimestampAssigner
,删除/忽略自定义水位线。 - 替换 Sink:涉及 checkpoint 的事务写出,迁移到 Unified Sink API。
- 压测与恢复:验证失败恢复是否能阶段回溯、资源是否显著下降。
八、两个最常用的配置片段
命令行(推荐)
bin/flink run -Dexecution.runtime-mode=BATCH your-job.jar
Java(仅在确实需要时)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// ... build pipeline
env.execute("my-batch-job");
九、结语
- 有界 → BATCH 更高效;无界 → STREAMING 不可替代。
- BATCH 带来阶段化执行、中间结果物化与轻量恢复,非常适合离线与历史回放;
- STREAMING 则提供端到端低延迟与持续增量更新,是实时系统的核心基石。
- 保持应用“模式无关”,用提交参数切换,是工程上的最佳实践。
把执行模式选对,并理解它们在调度/Shuffle、状态、顺序、时间、恢复上的差异,你的 Flink 作业会更稳、更快,也更易于在不同业务形态间复用。