Flink 滑动窗口实战:从 KeyedProcessFunction 到 AggregateFunction WindowFunction 的完整旅程
一、业务背景
我们要在 Flink 实时流上统计 每个用户-品牌组合最近 1 小时的最晚行为时间,并且每 5 分钟更新一次结果。
数据来自 Kafka,事件类型为 CartEvent
:
public class CartEvent {public String userId;public String brandId;public long ts; // 事件时间戳
}
二、三种实现模式对比速查表
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
KeyedProcessFunction | 需要完全自己管理定时器、状态 | 最灵活,可定制任何逻辑 | 代码量大,易出错 |
AggregateFunction + WindowFunction(老 API) | 预聚合后再做最终加工 | 性能好,API 简单 | 只能拿到预聚合结果 |
Reduce/Aggregate + ProcessWindowFunction(新 API) | 预聚合后还能拿到窗口元数据 | 兼具性能与信息 | 相对复杂 |
三、KeyedProcessFunction 版本(最底层)
3.1 核心思想
- 用
MapState<String, Long>
保存每个用户-品牌的最晚时间。 - 注册事件时间定时器,1 小时后触发输出。
- 每来一条数据就更新状态并重新注册定时器(滑动要靠我们自己做)。
3.2 代码骨架
public class MaxTimeProcessFuncextends KeyedProcessFunction<String, CartEvent, Tuple3<String,String,Long>> {// 状态:userId_brandId -> maxTsprivate MapState<String, Long> maxTsState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Long> desc =new MapStateDescriptor<>("maxTs", String.class, Long.class);maxTsState = getRuntimeContext().getMapState(desc);}@Overridepublic void processElement(CartEvent value,Context ctx,Collector<Tuple3<String,String,Long>> out) throws Exception {String key = value.userId + "\001" + value.brandId;Long old = maxTsState.get(key);if (old == null || value.ts > old) {maxTsState.put(key, value.ts);}// 每 5 min 滑动 -> 注册“窗口结束时间”定时器long windowEnd = ctx.timestamp() - ctx.timestamp() % 300_000L + 300_000L;ctx.timerService().registerEventTimeTimer(windowEnd);}@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector<Tuple3<String,String,Long>> out) throws Exception {for (Map.Entry<String, Long> e : maxTsState.entries()) {String[] arr = e.getKey().split("\001");out.collect(Tuple3.of(arr[0], arr[1], e.getValue()));}// 可选:清理 1h 之前的状态}
}
缺点:得自己算窗口边界、清理过期状态,容易踩坑。
四、滑动窗口 + AggregateFunction(推荐)
4.1 需求映射
- 窗口长度 = 1 小时
- 滑动间隔 = 5 分钟
4.2 AggregateFunction 签名解析
public class MaxTimeAggimplements AggregateFunction<CartEvent, Long, Long> {@Overridepublic Long createAccumulator() {return Long.MIN_VALUE; // 初始极小值}@Overridepublic Long add(CartEvent value, Long acc) {return Math.max(acc, value.ts); // 每条数据进来取最大}@Overridepublic Long getResult(Long acc) {return acc; // 窗口触发时返回最大时间}@Overridepublic Long merge(Long acc1, Long acc2) {return Math.max(acc1, acc2); // 用于会话窗口或多线程合并}
}
泛型 | 含义 |
---|---|
CartEvent | IN:输入事件 |
Long | ACC:累加器,只保存当前最大值 |
Long | OUT:最终输出 |
注意:AggregateFunction 只能拿到 窗口内聚合后的结果,拿不到窗口元数据(起止时间、key 等),所以需要再配合 WindowFunction。
五、AggregateFunction + WindowFunction 组合
5.1 主流程
DataStream<CartEvent> source = env.addSource(...);source.keyBy(e -> e.userId + "\001" + e.brandId).window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5))).aggregate(new MaxTimeAgg(), new WindowEndFunc()).print();
5.2 WindowFunction 把窗口信息补回来
public class WindowEndFuncimplements WindowFunction<Long, WindowResult, String, TimeWindow> {@Overridepublic void apply(String key,TimeWindow window,Iterable<Long> maxTsIt,Collector<WindowResult> out) {Long maxTs = maxTsIt.iterator().next(); // 只有一条String[] arr = key.split("\001");out.collect(new WindowResult(arr[0], arr[1], maxTs,"brand", "T"));}
}
5.3 WindowResult POJO
public class WindowResult {public String userId;public String value; // brandIdpublic long time; // 最晚行为时间public String dimType; // 维度类型public String list; // 附加字段public WindowResult() {}public WindowResult(String userId, String value,long time, String dimType, String list) {this.userId = userId;this.value = value;this.time = time;this.dimType = dimType;this.list = list;}@Overridepublic String toString() {return "WindowResult{" +"userId='" + userId + '\'' +", value='" + value + '\'' +", time=" + time +", dimType='" + dimType + '\'' +", list='" + list + '\'' +'}';}
}
六、时间与输出节奏
现象 | 解释 |
---|---|
作业启动后 1 小时内 看不到任何结果 | 必须等第一条窗口完整闭合(1 小时)。 |
之后 每 5 分钟 都会输出一次 | 滑动窗口每滑动一格(5 min)触发一次。 |
七、调优 & 踩坑小结
-
状态 TTL
滑动窗口会产生大量过期窗口,建议设置StateTtlConfig
清理 1.5 小时前的状态。 -
并行度
如果 key 量大,可以把userId_brandId
做二次分区,避免热点 key。 -
迟到数据
允许迟到 1 分钟:allowedLateness(Time.minutes(1))
。 -
空闲数据源
使用WatermarkStrategy.withIdleness()
防止空闲分区不触发窗口。
八、一句话总结
- 想最省事 —— 直接
aggregate(new MaxTimeAgg(), new WindowEndFunc())
- 想最灵活 —— 用
KeyedProcessFunction
自己管定时器和状态 - 想兼顾性能与信息 —— 用
AggregateFunction + ProcessWindowFunction
(新 API)
至此,我们已经把 Flink 滑动窗口的三种写法、时间语义、状态管理和调优思路全部串了一遍。祝实战愉快!