Flink DataStream「全分区窗口处理」mapPartition / sortPartition / aggregate / reduce
1. 这项能力究竟解决什么问题?
- 按并行子任务做“批式”处理:每个 subtask 自己的“全量数据”统一交由一次函数处理,适合末端汇总、一次性排序、一次性聚合等。
- 无需额外 keyBy:既支持 keyed DataStream,也支持 non-keyed DataStream 直接转成
PartitionWindowedStream
。 - API 简洁:4 个 API 覆盖常见形态:自定义批处理(
mapPartition
)、局部排序(sortPartition
)、增量聚合(aggregate
)、归并规约(reduce
)。
注意:
PartitionWindowedStream
的“完整窗口”在输入结束时触发(bounded/batch 最直观;unbounded 流通常意味着直到作业停止才会触发)。因此**更适合有界流(批处理)**或“明确的结束信号/分区闭合”场景。
2. 快速上手:把 DataStream 变成 PartitionWindowedStream
DataStream<T> ds = ...;// 关键一步:转成“按 subtask 收集全量数据”的窗口流
PartitionWindowedStream<T> pws = ds.fullWindowPartition();
拿到 PartitionWindowedStream<T>
后,就可以调用下述四类 API。
3. MapPartition:每个 subtask 的“全量批处理”
场景:在某个 subtask 内,把它收到的全部记录一次性交给 MapPartitionFunction
,例如本地求和、本地写文件、本地训练一个小模型等。
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;DataStream<Integer> input = ...;
PartitionWindowedStream<Integer> pws = input.fullWindowPartition();DataStream<Integer> sumPerSubtask = pws.mapPartition(new MapPartitionFunction<Integer, Integer>() {@Overridepublic void mapPartition(Iterable<Integer> values, Collector<Integer> out) {// values:该 subtask 收到的“所有记录”int sum = 0;for (Integer v : values) {sum += v;}// 输出本 subtask 的汇总结果out.collect(sum);}}
);
要点
MapPartitionFunction
只调用一次(在输入结束时),因此更像“每分区末尾的批处理钩子”。- 注意内存占用:如果分区数据量巨大,遍历时要小心 OOM(可边遍历边输出中间结果,或结合外部存储)。
4. SortPartition:每个 subtask 内局部排序
场景:在每个 subtask 内,对“该分区的所有记录”按某字段/比较器整体排序。这常作为两阶段排序的第一步(先局部排,再全局归并)。
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.tuple.Tuple2;DataStream<Tuple2<Integer, Integer>> input = ...;
PartitionWindowedStream<Tuple2<Integer, Integer>> pws = input.fullWindowPartition();// 以二元组的 f0(第一个字段)升序排序,得到局部有序的分区结果
DataStream<Tuple2<Integer, Integer>> sortedPerSubtask =pws.sortPartition(0, Order.ASCENDING);
要点
sortPartition
发生在分区内的全量数据上,不会跨分区打乱。- 常见用法:分区内排序 → 下游再做归并(自定义 CoGroup/合并器或使用 Batch 算子)。
- 大数据量排序需关注溢写/外部排序与内存参数(如网络缓冲、排序内存)。
5. Aggregate:分区内增量聚合(支持累加器)
aggregate
通过 AggregateFunction<IN, ACC, OUT>
做增量聚合:
ACC
是累加器;add
对每条记录累加;getResult
给出最终结果;merge
负责累加器合并(分布式汇总场景需要)。
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;DataStream<Tuple2<String, Integer>> input = ...; // (category, value)
PartitionWindowedStream<Tuple2<String, Integer>> pws = input.fullWindowPartition();DataStream<Integer> aggPerSubtask = pws.aggregate(new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() {// 初始化累加器(例如 sum=0)@Overridepublic Integer createAccumulator() {return 0;}// 每条记录如何累加@Overridepublic Integer add(Tuple2<String, Integer> value, Integer acc) {return acc + value.f1;}// 输出最终结果@Overridepublic Integer getResult(Integer acc) {return acc;}// 两个累加器如何合并(一般用于并行归并)@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}}
);
要点
- 与
mapPartition
相比,aggregate
更偏“在线累加”,遍历时就滚动聚合,峰值内存可更低。 - 如果你只需要“加总/最大/最小”,优先
aggregate
;复杂批处理用mapPartition
。
6. Reduce:分区内归并规约
reduce
适合可结合的二元规约(如 sum/max/min/拼接等),语义直观:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;DataStream<Tuple2<String, Integer>> input = ...; // (k, v)
PartitionWindowedStream<Tuple2<String, Integer>> pws = input.fullWindowPartition();DataStream<Tuple2<String, Integer>> reducedPerSubtask = pws.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,Tuple2<String, Integer> b) {// 示例:v 字段相加;k 取其一(真实业务请确保语义合理)return Tuple2.of(a.f0, a.f1 + b.f1);}}
);
要点
reduce
比aggregate
更简洁,但没有累加器模型;- 适用于“二元可结合”运算;复杂场景、需要保留额外中间信息时用
aggregate
。
7. 典型用法模式
7.1 两阶段排序(本地排序 + 全局归并)
fullWindowPartition().sortPartition(...)
做每分区局部排序;- 下游使用自定义合并器/批算子做跨分区归并,得到全局有序输出。
适合批处理或有界流。
7.2 分区内统计/聚合
aggregate
/reduce
做本地聚合,产出每分区的聚合摘要;- 下游再做二次聚合(多级汇总),减少数据量与网络开销。
7.3 分区末尾清理/压缩
mapPartition
在输入结束时触发:可以做“分区最终落盘、索引构建、一次性清理”等。
8. 实战注意事项与最佳实践
-
数据有界性:
PartitionWindowedStream
的“完整窗口”在输入结束时触发;无界流意味着很晚甚至永不触发。- 推荐用于 Batch/Bounded 或者能感知“结束信号”的流(例如自定义分区关闭)。
-
内存与状态:
mapPartition
/sortPartition
需要“遍历分区全量数据”,数据量大时需防 OOM;考虑边处理边输出,或借助外部存储/溢写策略。aggregate
/reduce
更省内存(在线累加),优先考虑。
-
并行与倾斜:
- 全分区处理是“每个 subtask 自己的全量”,并行度越高,单分区负载越小。
- 但数据倾斜可能导致个别分区非常大,需通过上游重分区/rebalance 缓解。
-
端到端语义:
- 这些 API 不会跨分区做全局操作(除非你在下游显式归并);
- 若需要全局排序/全局 TopN,请在局部阶段后增加全局阶段。
-
与常规窗口的区别:
- 常规窗口(滚动/滑动/会话)按时间或事件模式滚动;
- Full Partition Window 按分区生命周期定义,“一次性”处理分区全量。
9. 该用哪个 API?
诉求 | 首选 API | 说明 |
---|---|---|
自定义“批式”处理(一次性跑一段逻辑) | mapPartition | 最灵活,适合一次性写盘、构建索引、复杂批处理 |
分区内整体排序 | sortPartition | 常用于两阶段排序的“第一阶段” |
可累加统计(有中间态) | aggregate | 在线累加,内存友好 |
简洁的二元规约(可结合) | reduce | 代码最短,但不如 aggregate 灵活 |
10. 小结
PartitionWindowedStream
让 DataStream 直接具备“每个 subtask 的全量处理”能力,补齐了常规时间窗口难以覆盖的诉求。合理选择 mapPartition / sortPartition / aggregate / reduce
,可以在批流一体的架构中写出更贴近业务语义、同时更高效的处理逻辑。