当前位置: 首页 > news >正文

Flink 滑动窗口实战:从 KeyedProcessFunction 到 AggregateFunction WindowFunction 的完整旅程

一、业务背景

我们要在 Flink 实时流上统计 每个用户-品牌组合最近 1 小时的最晚行为时间,并且每 5 分钟更新一次结果。
数据来自 Kafka,事件类型为 CartEvent

public class CartEvent {public String userId;public String brandId;public long   ts;      // 事件时间戳
}

二、三种实现模式对比速查表

方案适用场景优点缺点
KeyedProcessFunction需要完全自己管理定时器、状态最灵活,可定制任何逻辑代码量大,易出错
AggregateFunction + WindowFunction(老 API)预聚合后再做最终加工性能好,API 简单只能拿到预聚合结果
Reduce/Aggregate + ProcessWindowFunction(新 API)预聚合后还能拿到窗口元数据兼具性能与信息相对复杂

三、KeyedProcessFunction 版本(最底层)

3.1 核心思想

  • MapState<String, Long> 保存每个用户-品牌的最晚时间。
  • 注册事件时间定时器,1 小时后触发输出。
  • 每来一条数据就更新状态并重新注册定时器(滑动要靠我们自己做)。

3.2 代码骨架

public class MaxTimeProcessFuncextends KeyedProcessFunction<String, CartEvent, Tuple3<String,String,Long>> {// 状态:userId_brandId -> maxTsprivate MapState<String, Long> maxTsState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Long> desc =new MapStateDescriptor<>("maxTs", String.class, Long.class);maxTsState = getRuntimeContext().getMapState(desc);}@Overridepublic void processElement(CartEvent value,Context ctx,Collector<Tuple3<String,String,Long>> out) throws Exception {String key = value.userId + "\001" + value.brandId;Long old = maxTsState.get(key);if (old == null || value.ts > old) {maxTsState.put(key, value.ts);}// 每 5 min 滑动 -> 注册“窗口结束时间”定时器long windowEnd = ctx.timestamp() - ctx.timestamp() % 300_000L + 300_000L;ctx.timerService().registerEventTimeTimer(windowEnd);}@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector<Tuple3<String,String,Long>> out) throws Exception {for (Map.Entry<String, Long> e : maxTsState.entries()) {String[] arr = e.getKey().split("\001");out.collect(Tuple3.of(arr[0], arr[1], e.getValue()));}// 可选:清理 1h 之前的状态}
}

缺点:得自己算窗口边界、清理过期状态,容易踩坑。


四、滑动窗口 + AggregateFunction(推荐)

4.1 需求映射

  • 窗口长度 = 1 小时
  • 滑动间隔 = 5 分钟

4.2 AggregateFunction 签名解析

public class MaxTimeAggimplements AggregateFunction<CartEvent, Long, Long> {@Overridepublic Long createAccumulator() {return Long.MIN_VALUE;         // 初始极小值}@Overridepublic Long add(CartEvent value, Long acc) {return Math.max(acc, value.ts); // 每条数据进来取最大}@Overridepublic Long getResult(Long acc) {return acc;                    // 窗口触发时返回最大时间}@Overridepublic Long merge(Long acc1, Long acc2) {return Math.max(acc1, acc2);   // 用于会话窗口或多线程合并}
}
泛型含义
CartEventIN:输入事件
LongACC:累加器,只保存当前最大值
LongOUT:最终输出

注意:AggregateFunction 只能拿到 窗口内聚合后的结果拿不到窗口元数据(起止时间、key 等),所以需要再配合 WindowFunction。


五、AggregateFunction + WindowFunction 组合

5.1 主流程

DataStream<CartEvent> source = env.addSource(...);source.keyBy(e -> e.userId + "\001" + e.brandId).window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5))).aggregate(new MaxTimeAgg(), new WindowEndFunc()).print();

5.2 WindowFunction 把窗口信息补回来

public class WindowEndFuncimplements WindowFunction<Long, WindowResult, String, TimeWindow> {@Overridepublic void apply(String key,TimeWindow window,Iterable<Long> maxTsIt,Collector<WindowResult> out) {Long maxTs = maxTsIt.iterator().next();  // 只有一条String[] arr = key.split("\001");out.collect(new WindowResult(arr[0], arr[1], maxTs,"brand", "T"));}
}

5.3 WindowResult POJO

public class WindowResult {public String userId;public String value;   // brandIdpublic long   time;    // 最晚行为时间public String dimType; // 维度类型public String list;    // 附加字段public WindowResult() {}public WindowResult(String userId, String value,long time, String dimType, String list) {this.userId  = userId;this.value   = value;this.time    = time;this.dimType = dimType;this.list    = list;}@Overridepublic String toString() {return "WindowResult{" +"userId='" + userId + '\'' +", value='" + value + '\'' +", time=" + time +", dimType='" + dimType + '\'' +", list='" + list + '\'' +'}';}
}

六、时间与输出节奏

现象解释
作业启动后 1 小时内 看不到任何结果必须等第一条窗口完整闭合(1 小时)。
之后 每 5 分钟 都会输出一次滑动窗口每滑动一格(5 min)触发一次。

七、调优 & 踩坑小结

  1. 状态 TTL
    滑动窗口会产生大量过期窗口,建议设置 StateTtlConfig 清理 1.5 小时前的状态。

  2. 并行度
    如果 key 量大,可以把 userId_brandId 做二次分区,避免热点 key。

  3. 迟到数据
    允许迟到 1 分钟:allowedLateness(Time.minutes(1))

  4. 空闲数据源
    使用 WatermarkStrategy.withIdleness() 防止空闲分区不触发窗口。


八、一句话总结

  • 想最省事 —— 直接 aggregate(new MaxTimeAgg(), new WindowEndFunc())
  • 想最灵活 —— 用 KeyedProcessFunction 自己管定时器和状态
  • 想兼顾性能与信息 —— 用 AggregateFunction + ProcessWindowFunction(新 API)

至此,我们已经把 Flink 滑动窗口的三种写法、时间语义、状态管理和调优思路全部串了一遍。祝实战愉快!

http://www.dtcms.com/a/350463.html

相关文章:

  • vi/vim 查找字符串
  • h5和微信小程序查看pdf文件
  • 实验1 第一个微信小程序
  • Linux学习-TCP网络协议(补充)
  • 贝叶斯方法和朴素贝叶斯算法
  • tcpdump学习
  • 20250825的学习笔记
  • 2025年09月计算机二级Java选择题每日一练——第七期
  • 配置单区域 OSPF
  • 集群与集群概念
  • 自动修改excel 自动统计文件名称插入 excel辅助工具
  • 升级openssh后ORACLE RAC EM 安装失败处理
  • 【机器学习入门】1.2 初识机器学习:从数据到智能的认知之旅
  • C#_性能优化高级话题
  • MySQL数据备份与恢复全攻略
  • 10-应用调试与性能优化
  • 嵌入式与机器视觉的交叉点:构建智能化设备的实时视频通路
  • [pilot智驾系统] 控制守护进程(controlsd) | 纵向横向 | 比例-积分-微分(PID)
  • AR技术赋能农业机械智能运维
  • imx586手册和相机寄存器部分解读
  • 钉钉推出下一代AI办公应用形态:钉钉ONE
  • 智谱多模态系列:GLM-4.5V 环境配置与本地部署
  • java全局处理Date和LocalDateTime,统一响应固定格式
  • 无刷电机控制 - STM32F405+CubeMX+HAL库+SimpleFOC08,速度闭环控制(有电流环)
  • xm-select多选组件在layer.open中使用、获取、复现
  • 交叉导轨在医疗设备领域中的应用
  • 5G与6G技术演进与创新对比分析
  • 在线旅游及旅行管理系统项目SQL注入
  • 力扣(用队列实现栈)
  • STL——vector的使用(快速入门详细)