当前位置: 首页 > news >正文

Flink DatastreamAPI详解(二)

Tumbling Windows

1. 事件时间滚动窗口(Event-Time)

input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);

2. 处理时间滚动窗口(Processing-Time)

input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);

3. 带偏移量的事件时间窗口(带时区调整)

input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

核心概念:滚动窗口(Tumbling Window)

特点

  • 固定大小:窗口长度固定
  • 不重叠:每个数据只属于一个窗口
  • 首尾相接:窗口之间无间隙

窗口划分示意图

时间轴(5秒滚动窗口):
|----5s----|----5s----|----5s----|----5s----|
[0,  5)    [5,  10)   [10, 15)   [15, 20)窗口1      窗口2       窗口3      窗口4特点:
- 左闭右开区间:[start, end)
- 窗口不重叠
- 每条数据只属于一个窗口

三种窗口详解

1. TumblingEventTimeWindows(事件时间窗口)

特点
input.keyBy(value -> value.getUserId()).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("amount");

关键特性

  • 基于事件时间:使用数据本身的时间戳
  • 🎯 结果确定:重放数据结果相同
  • 📦 可处理乱序:配合Watermark处理延迟数据
  • ⚠️ 需要配置:需要分配时间戳和Watermark
完整示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 设置事件时间特性(Flink 1.12+默认就是EventTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 2. 定义数据结构
class Event {String userId;Double amount;Long timestamp;  // 事件时间戳public Event(String userId, Double amount, Long timestamp) {this.userId = userId;this.amount = amount;this.timestamp = timestamp;}
}DataStream<Event> events = env.fromElements(new Event("user1", 100.0, 1000L),  // 1秒new Event("user1", 200.0, 2000L),  // 2秒new Event("user1", 150.0, 6000L),  // 6秒(下一个窗口)new Event("user2", 300.0, 3000L),  // 3秒new Event("user2", 250.0, 4000L)   // 4秒
);// 3. 分配时间戳和Watermark
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((event, timestamp) -> event.timestamp)
);// 4. 5秒事件时间滚动窗口
withTimestamps.keyBy(event -> event.userId).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum("amount").print();/* 窗口划分:
窗口[0, 5000):user1: 100 + 200 = 300user2: 300 + 250 = 550窗口[5000, 10000):user1: 150输出:
user1: 300.0  (窗口[0,5000))
user2: 550.0  (窗口[0,5000))
user1: 150.0  (窗口[5000,10000))
*/env.execute("Event-Time Tumbling Window");
事件时间窗口触发机制
数据流(带事件时间):
timestamp=1000: Event1 → 进入窗口[0, 5000)
timestamp=2000: Event2 → 进入窗口[0, 5000)
timestamp=6000: Event3 → 进入窗口[5000, 10000)Watermark机制:
- Watermark(5000) 到达 → 触发窗口[0, 5000)计算
- Watermark(10000) 到达 → 触发窗口[5000, 10000)计算窗口触发条件:
当 Watermark >= 窗口结束时间时,窗口触发计算

2. TumblingProcessingTimeWindows(处理时间窗口)

特点
input.keyBy(value -> value.getUserId()).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("amount");

关键特性

  • ⏱️ 基于处理时间:使用系统当前时间
  • 低延迟:数据到达就可以处理
  • 结果不确定:重放数据结果可能不同
  • 简单配置:无需配置时间戳和Watermark
完整示例
DataStream<Tuple2<String, Double>> purchases = env.fromElements(new Tuple2<>("user1", 100.0),new Tuple2<>("user1", 200.0),new Tuple2<>("user2", 300.0)
);// 处理时间滚动窗口:5秒
purchases.keyBy(tuple -> tuple.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();/* 窗口划分(基于系统时间):
假设系统时间从 2025-01-01 10:00:00 开始窗口[10:00:00, 10:00:05):- 这5秒内到达的所有数据窗口[10:00:05, 10:00:10):- 接下来5秒到达的数据注意:与数据自带的时间戳无关!
*/env.execute("Processing-Time Tumbling Window");
处理时间 vs 事件时间对比
// 同一批数据的不同处理方式// 场景:数据延迟到达
Event1: eventTime=1000, processTime=5000 (延迟4秒到达)
Event2: eventTime=2000, processTime=6000 (延迟4秒到达)// 事件时间窗口[0, 5000):
// ✅ Event1和Event2都在窗口内(基于eventTime)// 处理时间窗口[0, 5000):
// ❌ Event1和Event2都不在窗口内(基于processTime)
// ✅ 它们在窗口[5000, 10000)内

3. 带偏移量的窗口(Time Offset)

语法
TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))
//                          ^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^
//                          窗口大小         偏移量
作用:处理时区问题
input.keyBy(event -> event.getUserId()).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).sum("amount");

核心概念

  • 📅 每日窗口:统计每天的数据
  • 🌍 时区调整:偏移量用于调整窗口边界
  • 🇨🇳 中国时区Time.hours(-8) 表示UTC-8(北京时间)
窗口边界计算
默认(无偏移):
时间基准:UTC 00:00:00
窗口1: [1970-01-01 00:00:00, 1970-01-02 00:00:00) UTC
窗口2: [1970-01-02 00:00:00, 1970-01-03 00:00:00) UTC带偏移 Time.hours(-8):
时间基准:UTC 00:00:00 - 8小时 = UTC 16:00:00 (前一天)
窗口1: [1970-01-01 16:00:00 UTC, 1970-01-02 16:00:00 UTC)= [1970-01-02 00:00:00 CST, 1970-01-03 00:00:00 CST)北京时间的每日窗口窗口2: [1970-01-02 16:00:00 UTC, 1970-01-03 16:00:00 UTC)= [1970-01-03 00:00:00 CST, 1970-01-04 00:00:00 CST)
完整示例:每日销售统计
public class DailyReportWithOffset {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟订单数据(带时间戳)DataStream<Order> orders = env.fromElements(new Order("product1", 100.0, parseTime("2025-01-01 08:00:00")),new Order("product1", 200.0, parseTime("2025-01-01 15:00:00")),new Order("product1", 150.0, parseTime("2025-01-02 09:00:00")),new Order("product2", 300.0, parseTime("2025-01-02 18:00:00")));// 分配时间戳DataStream<Order> withTimestamps = orders.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forMonotonousTimestamps().withTimestampAssigner((order, ts) -> order.timestamp));// 每日滚动窗口,按北京时间(UTC-8)withTimestamps.keyBy(order -> order.productId).window(TumblingEventTimeWindows.of(Time.days(1),      // 窗口大小:1天Time.hours(-8)     // 偏移量:-8小时(北京时间))).process(new ProcessWindowFunction<Order, String, String, TimeWindow>() {@Overridepublic void process(String productId,Context ctx,Iterable<Order> elements,Collector<String> out) {double totalAmount = 0;int count = 0;for (Order order : elements) {totalAmount += order.amount;count++;}// 格式化窗口时间SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));String windowStart = sdf.format(new Date(ctx.window().getStart()));String windowEnd = sdf.format(new Date(ctx.window().getEnd()));out.collect(String.format("Product: %s, Window: [%s, %s), Orders: %d, Total: %.2f",productId, windowStart, windowEnd, count, totalAmount));}}).print();env.execute("Daily Report with Offset");}static class Order {String productId;Double amount;Long timestamp;Order(String productId, Double amount, Long timestamp) {this.productId = productId;this.amount = amount;this.timestamp = timestamp;}}static long parseTime(String timeStr) {try {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));return sdf.parse(timeStr).getTime();} catch (Exception e) {return 0L;}}
}/* 输出:
Product: product1, Window: [2025-01-01 00:00:00, 2025-01-02 00:00:00), Orders: 2, Total: 300.00
Product: product1, Window: [2025-01-02 00:00:00, 2025-01-03 00:00:00), Orders: 1, Total: 150.00
Product: product2, Window: [2025-01-02 00:00:00, 2025-01-03 00:00:00), Orders: 1, Total: 300.00
*/
不同时区的偏移量
// 北京时间 UTC+8(实际写成-8是因为窗口边界向前偏移)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))// 纽约时间 UTC-5(冬令时)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(5)))// 伦敦时间 UTC+0
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(0)))
// 或直接省略偏移量
.window(TumblingEventTimeWindows.of(Time.days(1)))// 东京时间 UTC+9
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-9)))

窗口函数(Window Function)

上述代码中的<windowed transformation>可以使用多种窗口函数:

1. 简单聚合函数

// sum:求和
.window(...).sum("amount")
.window(...).sum(1)  // Tuple的第二个字段// min/max:最小/最大值
.window(...).min("price")
.window(...).max("temperature")// minBy/maxBy:返回整个元素
.window(...).minBy("price")  // 返回价格最低的整个订单

2. ReduceFunction(增量聚合)

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((value1, value2) -> {return new Event(value1.userId,value1.amount + value2.amount,  // 累加金额Math.max(value1.timestamp, value2.timestamp));
});

3. AggregateFunction(增量+输出转换)

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Event, Tuple2<Double, Integer>, Double>() {// 计算平均值public Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}public Tuple2<Double, Integer> add(Event event, Tuple2<Double, Integer> acc) {return new Tuple2<>(acc.f0 + event.amount, acc.f1 + 1);}public Double getResult(Tuple2<Double, Integer> acc) {return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;}public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
});

4. ProcessWindowFunction(全量窗口函数)

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {@Overridepublic void process(String key,Context context,Iterable<Event> elements,Collector<String> out) {int count = 0;double sum = 0;for (Event event : elements) {count++;sum += event.amount;}out.collect(String.format("Key: %s, Window: [%d-%d], Count: %d, Sum: %.2f",key, context.window().getStart(), context.window().getEnd(),count, sum));}
});

三种窗口类型对比总结

特性Event-TimeProcessing-TimeEvent-Time with Offset
时间基准数据时间戳系统时间数据时间戳+偏移
确定性✅ 确定❌ 不确定✅ 确定
乱序处理✅ 支持❌ 不支持✅ 支持
延迟较高较高
配置复杂度高(需Watermark)高(需Watermark)
适用场景数据分析、报表实时监控跨时区统计

完整实战示例对比

public class WindowComparison {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> input = env.fromElements(new Tuple2<>("user1", 10),new Tuple2<>("user1", 20),new Tuple2<>("user2", 30));// 方式1:事件时间窗口input.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((event, ts) -> System.currentTimeMillis())).keyBy(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print("Event-Time");// 方式2:处理时间窗口input.keyBy(tuple -> tuple.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print("Processing-Time");// 方式3:带偏移的事件时间窗口input.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((event, ts) -> System.currentTimeMillis())).keyBy(tuple -> tuple.f0).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).sum(1).print("Event-Time with Offset");env.execute("Window Comparison");}
}

关键要点总结

  1. 滚动窗口:固定大小、不重叠、首尾相接
  2. 事件时间窗口:基于数据时间戳,结果确定,需配置Watermark
  3. 处理时间窗口:基于系统时间,低延迟,无需配置
  4. 带偏移窗口:用于时区调整,实现跨时区统计
  5. ⚠️ 偏移量计算:北京时间UTC+8,偏移量写Time.hours(-8)
  6. ⚠️ 选择依据:数据分析用事件时间,实时监控用处理时间
  7. ⚠️ 窗口触发:事件时间需Watermark触发,处理时间到时间就触发
http://www.dtcms.com/a/521148.html

相关文章:

  • 丢盖网logo设计免费官网合肥网站建设优化
  • Android View, SurfaceView, GLSurfaceView 的区别
  • 数据结构---优先级队列(堆)
  • PHP反序列化漏洞
  • dw做的网站怎么发布到网上wordpress wamp
  • 信用门户网站建设方案网站建设空间申请
  • RAG性能提升:从查询优化到范式演进的系统性路径
  • 响应式网站开发方法游戏官网平台
  • 网络管理员教程(初级)第六版--第4章 Web网站建设
  • 网站导入链接创建
  • 企业网站最重要的访问对象是Vantage wordpress主题
  • SpringBoot-数据访问之Druid
  • 算法 vs 社区:Pump 与 FourMeme 的增长机制之战
  • 「PPG信号处理——(2)脉搏波信号刺激前后RMSSD心率变异性研究」2025年10月23日
  • 学网站开发可以创业吗学校网站英文
  • 哈尔滨企业自助建站php做的网站手机能看到
  • Ubuntu关于串口的操作
  • 电子商务网站建设与维护考试提供郑州网站建设
  • 内蒙建设厅网站怎么查建筑电工证门户网站底部
  • 全面掌握PostgreSQL关系型数据库 日志配置 笔记07
  • 工厂做哪个网站好鼓楼微网站开发
  • 江门网站建设方案推广wordpress地址改错了
  • 什么叫网站外链如何做企业组织架构图
  • 网页与网站设计工作内容wordpress的登陆地址修改
  • 为什么在数据库表中存储的数字是20.02,但是在前端读取的时候就会呈现20.0200000000000
  • 手机网站的文本排版是怎么做的百度一下就知道官网
  • XSS漏洞攻击 (跨站脚本攻击)
  • 龙之向导外贸经理人网站云南省住房和城乡建设局网站
  • 高端品牌网站建设九五网络家居网站模板
  • 第16章:Spring AI Alibaba Graph框架— 人类反馈