Flink DataStream API in Depth (Part 2)
Tumbling Windows
1. Event-time tumbling window
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
2. Processing-time tumbling window
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
3. Event-time tumbling window with an offset (time-zone adjustment)
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
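Core concept: the tumbling window
Characteristics
- ✅ Fixed size: every window has the same length
- ✅ Non-overlapping: each element belongs to exactly one window
- ✅ Contiguous: windows follow one another with no gaps
Window layout
Timeline (5-second tumbling windows):
|----5s----|----5s----|----5s----|----5s----|
  [0, 5)     [5, 10)    [10, 15)   [15, 20)
  Window 1   Window 2   Window 3   Window 4
Notes:
- Each window is a half-open interval [start, end): an element stamped exactly 5 s falls into [5, 10), not [0, 5)
- Windows do not overlap
- Each element belongs to exactly one window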
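The half-open, non-overlapping layout can be reproduced with a few lines of plain Java, with no Flink dependency. This is a minimal sketch: the class and method names (`TumblingWindowMath`, `windowStart`, `windowEnd`) are hypothetical, and it assumes windows aligned to the epoch with no offset, as in the diagram.

```java
/** Plain-Java sketch of how a timestamp maps to an epoch-aligned tumbling window. */
class TumblingWindowMath {

    /** Start of the window [start, start + sizeMs) that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of the window that starts at startMs. */
    static long windowEnd(long startMs, long sizeMs) {
        return startMs + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows
        for (long ts : new long[] {1_000L, 4_999L, 5_000L, 6_000L}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + windowEnd(start, size) + ")");
        }
    }
}
```

Because the interval is closed on the left and open on the right, 4999 still maps to [0, 5000) while 5000 already maps to [5000, 10000).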
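The three window types in detail
1. TumblingEventTimeWindows (event-time window)
Characteristics
input
    .keyBy(value -> value.getUserId())
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum("amount");
Key properties:
- ⏰ Based on event time: uses the timestamp carried by each record
- 🎯 Deterministic: replaying the same data produces the same result
- 📦 Tolerates out-of-order data: watermarks let a window wait for records that arrive late
- ⚠️ Needs configuration: timestamps and watermarks must be assigned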
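Complete example
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// 1. Data type. sum("amount") needs a Flink POJO: a public top-level or static nested
//    class with a public no-arg constructor and public fields (or getters/setters).
public static class Event {
    public String userId;
    public Double amount;
    public Long timestamp; // event-time timestamp in milliseconds

    public Event() {}

    public Event(String userId, Double amount, Long timestamp) {
        this.userId = userId;
        this.amount = amount;
        this.timestamp = timestamp;
    }

    // Added so that print() shows "userId: amount"
    @Override
    public String toString() {
        return userId + ": " + amount;
    }
}

// The statements below belong in main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. Event time is the default since Flink 1.12, so the deprecated call
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) is no longer needed.
DataStream<Event> events = env.fromElements(
        new Event("user1", 100.0, 1000L),  // 1 s
        new Event("user1", 200.0, 2000L),  // 2 s
        new Event("user1", 150.0, 6000L),  // 6 s (next window)
        new Event("user2", 300.0, 3000L),  // 3 s
        new Event("user2", 250.0, 4000L)); // 4 s

// 3. Assign timestamps and watermarks (tolerating 1 s of out-of-orderness)
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((event, timestamp) -> event.timestamp));

// 4. 5-second event-time tumbling window
withTimestamps
        .keyBy(event -> event.userId)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum("amount")
        .print();

/* Window assignment:
   Window [0, 5000):
     user1: 100 + 200 = 300
     user2: 300 + 250 = 550
   Window [5000, 10000):
     user1: 150

   Expected results (line order and the "n> " subtask prefix added by print() may vary):
   user1: 300.0   (window [0, 5000))
   user2: 550.0   (window [0, 5000))
   user1: 150.0   (window [5000, 10000))
*/
env.execute("Event-Time Tumbling Window");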
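How event-time windows fire
Data stream (with event time):
timestamp=1000: Event1 → assigned to window [0, 5000)
timestamp=2000: Event2 → assigned to window [0, 5000)
timestamp=6000: Event3 → assigned to window [5000, 10000)
Watermark mechanism:
- Watermark(5000) arrives → window [0, 5000) fires
- Watermark(10000) arrives → window [5000, 10000) fires
Trigger condition:
A window fires once the watermark passes its end. More precisely, the default event-time trigger fires when Watermark >= window end - 1 ms (the largest timestamp the window can contain), so Watermark(4999) is already enough for [0, 5000).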
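The trigger rule can be traced by hand with a small plain-Java simulation. This is a sketch under stated assumptions, not Flink code: the class and method names are hypothetical, the watermark formula (largest timestamp seen, minus the out-of-orderness bound, minus 1 ms) is assumed to mirror the built-in bounded-out-of-orderness generator, and the watermark is recomputed after every event, whereas Flink emits it periodically.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch: when does a bounded-out-of-orderness watermark fire a window? */
class WatermarkTriggerSketch {

    /** Watermark after seeing maxTimestampMs, given an out-of-orderness bound (assumed formula). */
    static long watermark(long maxTimestampMs, long boundMs) {
        return maxTimestampMs - boundMs - 1;
    }

    /** A window [start, end) fires once the watermark reaches its last timestamp, end - 1. */
    static boolean shouldFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    /** For each event in arrival order, reports whether the window ending at windowEndMs may fire. */
    static List<Boolean> firedAfterEach(long[] timestampsMs, long boundMs, long windowEndMs) {
        List<Boolean> fired = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestampsMs) {
            maxSeen = Math.max(maxSeen, ts); // watermarks never move backwards
            fired.add(shouldFire(windowEndMs, watermark(maxSeen, boundMs)));
        }
        return fired;
    }

    public static void main(String[] args) {
        // Events at 1 s, 2 s and 6 s, 1 s of allowed out-of-orderness, window [0, 5000)
        List<Boolean> fired = firedAfterEach(new long[] {1_000L, 2_000L, 6_000L}, 1_000L, 5_000L);
        System.out.println(fired); // [false, false, true]
    }
}
```

Only the event at 6000 ms pushes the watermark to 4999, which is the first value that satisfies the condition for window [0, 5000).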
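2. TumblingProcessingTimeWindows (processing-time window)
Characteristics
input
    .keyBy(value -> value.getUserId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .sum("amount");
Key properties:
- ⏱️ Based on processing time: uses the current system clock of the machine running the operator
- ⚡ Low latency: records are processed as soon as they arrive, with no waiting for watermarks
- ❌ Non-deterministic: replaying the same data may produce different results
- ✅ Simple setup: no timestamps or watermarks to configure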
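Complete example
// env is a StreamExecutionEnvironment, created as in the event-time example
DataStream<Tuple2<String, Double>> purchases = env.fromElements(
        new Tuple2<>("user1", 100.0),
        new Tuple2<>("user1", 200.0),
        new Tuple2<>("user2", 300.0));

// 5-second processing-time tumbling window
purchases
        .keyBy(tuple -> tuple.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .sum(1)
        .print();

/* Window assignment (based on the system clock):
   Suppose the system time starts at 2025-01-01 10:00:00
   Window [10:00:00, 10:00:05):
     - all records that arrive during these 5 seconds
   Window [10:00:05, 10:00:10):
     - records that arrive during the next 5 seconds
   Note: the timestamps carried by the records play no role here!

   Caveat: fromElements is a bounded source, so the job may finish before the
   5-second timer fires and print nothing. Use an unbounded source (socket,
   Kafka, ...) to observe processing-time window output.
*/
env.execute("Processing-Time Tumbling Window");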
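Processing time vs event time
// The same records, windowed in two different ways
// Scenario: records arrive late
Event1: eventTime=1000, processTime=5000 (arrives 4 s late)
Event2: eventTime=2000, processTime=6000 (arrives 4 s late)

// Event-time window [0, 5000):
// ✅ contains both Event1 and Event2 (assigned by eventTime)

// Processing-time window [0, 5000):
// ❌ contains neither Event1 nor Event2 (assigned by processTime)
// ✅ both fall into window [5000, 10000)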
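The comparison can be checked with the same window arithmetic applied once to the event timestamp and once to the arrival time. A minimal plain-Java sketch; `EventVsProcessingTime` and its methods are hypothetical names, and the processing-time values are taken from the scenario rather than read from a real clock.

```java
/** Plain-Java sketch: one record, two window assignments. */
class EventVsProcessingTime {

    /** Start of the epoch-aligned tumbling window containing timeMs. */
    static long windowStart(long timeMs, long sizeMs) {
        return timeMs - Math.floorMod(timeMs, sizeMs);
    }

    /** Describes which window a record lands in under each notion of time. */
    static String describe(String name, long eventTimeMs, long processTimeMs, long sizeMs) {
        long byEvent = windowStart(eventTimeMs, sizeMs);
        long byProcessing = windowStart(processTimeMs, sizeMs);
        return name
                + ": event-time window [" + byEvent + ", " + (byEvent + sizeMs) + ")"
                + ", processing-time window [" + byProcessing + ", " + (byProcessing + sizeMs) + ")";
    }

    public static void main(String[] args) {
        long size = 5_000L;
        System.out.println(describe("Event1", 1_000L, 5_000L, size));
        System.out.println(describe("Event2", 2_000L, 6_000L, size));
    }
}
```

Both records report event-time window [0, 5000) and processing-time window [5000, 10000), which is the split described in the scenario.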
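3. Windows with an offset (time offset)
Syntax
TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))
//                          ^^^^^^^^^^^^  ^^^^^^^^^^^^^^
//                          window size   offset
Purpose: handling time zones
input
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .sum("amount");
Core ideas:
- 📅 Daily windows: aggregate the data of each calendar day
- 🌍 Time-zone adjustment: the offset shifts the window boundaries
- 🇨🇳 China: Time.hours(-8) aligns daily windows with Beijing time (UTC+8); the sign is negative because Beijing midnight occurs 8 hours before UTC midnight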
Window boundary calculation
Default (no offset):
Reference point: 00:00:00 UTC
Window 1: [1970-01-01 00:00:00, 1970-01-02 00:00:00) UTC
Window 2: [1970-01-02 00:00:00, 1970-01-03 00:00:00) UTC
With offset Time.hours(-8):
Reference point: 00:00:00 UTC - 8 hours = 16:00:00 UTC (previous day)
Window 1: [1970-01-01 16:00:00 UTC, 1970-01-02 16:00:00 UTC)
        = [1970-01-02 00:00:00 CST, 1970-01-03 00:00:00 CST)   daily window in Beijing time
Window 2: [1970-01-02 16:00:00 UTC, 1970-01-03 16:00:00 UTC)
        = [1970-01-03 00:00:00 CST, 1970-01-04 00:00:00 CST)
(CST here means China Standard Time, UTC+8.)
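The shifted boundaries can be reproduced in plain Java. The sketch below is hypothetical helper code, not Flink's implementation; the formula `timestamp - floorMod(timestamp - offset, size)` is assumed to be equivalent to what Flink computes when it assigns a timestamp to an offset tumbling window.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Plain-Java sketch of daily window boundaries with and without a -8 h offset. */
class OffsetWindowSketch {

    static final long HOUR_MS = 60L * 60 * 1000;
    static final long DAY_MS = 24 * HOUR_MS;

    /** Start of the tumbling window containing timestampMs, shifted by offsetMs (assumed formula). */
    static long windowStartWithOffset(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs - offsetMs, sizeMs);
    }

    public static void main(String[] args) {
        ZoneId beijing = ZoneId.of("Asia/Shanghai");
        // An order placed at 08:00 Beijing time, which is 00:00 UTC on the same date
        long ts = LocalDateTime.of(2025, 1, 1, 8, 0).atZone(beijing).toInstant().toEpochMilli();

        long plain = windowStartWithOffset(ts, 0L, DAY_MS);
        long shifted = windowStartWithOffset(ts, -8 * HOUR_MS, DAY_MS);

        System.out.println("no offset : " + Instant.ofEpochMilli(plain).atZone(beijing));
        System.out.println("offset -8h: " + Instant.ofEpochMilli(shifted).atZone(beijing));
    }
}
```

Without the offset the window containing this order starts at 08:00 Beijing time, so an order placed at 07:59 would be counted in the previous day's window. With the offset the window starts at Beijing midnight.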
Complete example: daily sales report
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class DailyReportWithOffset {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Simulated orders; the timestamps are parsed as Beijing time
        DataStream<Order> orders = env.fromElements(
                new Order("product1", 100.0, parseTime("2025-01-01 08:00:00")),
                new Order("product1", 200.0, parseTime("2025-01-01 15:00:00")),
                new Order("product1", 150.0, parseTime("2025-01-02 09:00:00")),
                new Order("product2", 300.0, parseTime("2025-01-02 18:00:00")));

        // Assign timestamps (the input is already in ascending time order)
        DataStream<Order> withTimestamps = orders.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forMonotonousTimestamps()
                        .withTimestampAssigner((order, ts) -> order.timestamp));

        // Daily tumbling windows aligned to Beijing time (UTC+8)
        withTimestamps
                .keyBy(order -> order.productId)
                .window(TumblingEventTimeWindows.of(
                        Time.days(1),     // window size: 1 day
                        Time.hours(-8)))  // offset: -8 hours, so windows start at Beijing midnight
                .process(new ProcessWindowFunction<Order, String, String, TimeWindow>() {
                    @Override
                    public void process(String productId,
                                        Context ctx,
                                        Iterable<Order> elements,
                                        Collector<String> out) {
                        double totalAmount = 0;
                        int count = 0;
                        for (Order order : elements) {
                            totalAmount += order.amount;
                            count++;
                        }

                        // Format the window bounds in Beijing time
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
                        String windowStart = sdf.format(new Date(ctx.window().getStart()));
                        String windowEnd = sdf.format(new Date(ctx.window().getEnd()));

                        out.collect(String.format(
                                "Product: %s, Window: [%s, %s), Orders: %d, Total: %.2f",
                                productId, windowStart, windowEnd, count, totalAmount));
                    }
                })
                .print();

        env.execute("Daily Report with Offset");
    }

    static class Order {
        String productId;
        Double amount;
        Long timestamp;

        Order(String productId, Double amount, Long timestamp) {
            this.productId = productId;
            this.amount = amount;
            this.timestamp = timestamp;
        }
    }

    static long parseTime(String timeStr) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
            return sdf.parse(timeStr).getTime();
        } catch (Exception e) {
            // Fail loudly instead of silently mapping bad input to the epoch
            throw new IllegalArgumentException("Unparseable time: " + timeStr, e);
        }
    }
}

/* Output (line order and the "n> " subtask prefix added by print() may vary):
Product: product1, Window: [2025-01-01 00:00:00, 2025-01-02 00:00:00), Orders: 2, Total: 300.00
Product: product1, Window: [2025-01-02 00:00:00, 2025-01-03 00:00:00), Orders: 1, Total: 150.00
Product: product2, Window: [2025-01-02 00:00:00, 2025-01-03 00:00:00), Orders: 1, Total: 300.00
*/
Offsets for different time zones
// Beijing, UTC+8 (the offset is -8 because the window boundaries move 8 hours earlier)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))

// New York, UTC-5 (standard time; a fixed offset does not follow daylight saving time)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(5)))

// London, UTC+0 (standard time)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(0)))
// or simply omit the offset
.window(TumblingEventTimeWindows.of(Time.days(1)))

// Tokyo, UTC+9
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-9)))
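The sign flip is easy to get wrong, so it can help to derive the offset from a `java.time.ZoneId` rather than hard-coding it. The helper below is a hypothetical sketch (`ZoneOffsetHelper.windowOffsetMillis` is not a Flink API). It simply negates the zone's UTC offset at a chosen instant, so the result is only valid while that offset holds, that is, until the next daylight-saving transition.

```java
import java.time.Instant;
import java.time.ZoneId;

/** Plain-Java sketch: derive a daily-window offset from a time zone. */
class ZoneOffsetHelper {

    /**
     * Offset in milliseconds that aligns daily windows with local midnight in the
     * given zone, evaluated at the given instant. The value is the negated UTC offset.
     */
    static long windowOffsetMillis(ZoneId zone, Instant at) {
        long utcOffsetSeconds = zone.getRules().getOffset(at).getTotalSeconds();
        return -utcOffsetSeconds * 1000L;
    }

    public static void main(String[] args) {
        Instant winter = Instant.parse("2025-01-15T00:00:00Z");
        for (String id : new String[] {"Asia/Shanghai", "America/New_York", "Europe/London", "Asia/Tokyo"}) {
            long offsetMs = windowOffsetMillis(ZoneId.of(id), winter);
            System.out.println(id + " -> " + (offsetMs / 3_600_000.0) + " h");
        }
    }
}
```

In a job, the result could be passed as `Time.milliseconds(offsetMs)` in place of the hard-coded `Time.hours(...)`. Zones that observe daylight saving time would still need a different strategy, since a tumbling window takes a single fixed offset.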
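Window functions
The <windowed transformation> placeholder in the code above can be any of several window functions:
1. Simple aggregations
// sum
.window(...).sum("amount")
.window(...).sum(1) // second field of a Tuple (field indices start at 0)

// min/max: minimum/maximum of one field; the other fields of the result
// are not guaranteed to come from the element that holds the extreme value
.window(...).min("price")
.window(...).max("temperature")

// minBy/maxBy: return the whole element
.window(...).minBy("price") // the complete order with the lowest price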
2. ReduceFunction (incremental aggregation)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((value1, value2) -> {
    return new Event(
            value1.userId,
            value1.amount + value2.amount,                  // running total
            Math.max(value1.timestamp, value2.timestamp));  // keep the latest timestamp
});
3. AggregateFunction (incremental aggregation with output conversion)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Event, Tuple2<Double, Integer>, Double>() {
    // Computes the average amount; the accumulator holds (sum, count)

    @Override
    public Tuple2<Double, Integer> createAccumulator() {
        return new Tuple2<>(0.0, 0);
    }

    @Override
    public Tuple2<Double, Integer> add(Event event, Tuple2<Double, Integer> acc) {
        return new Tuple2<>(acc.f0 + event.amount, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Double, Integer> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
});
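To see what each of the four methods contributes, the same (sum, count) accumulator can be exercised outside Flink. The sketch below uses hypothetical names and plain static methods instead of the real `AggregateFunction` interface. It checks that merging two partial accumulators gives the same average as adding every element to a single one, which is the property `merge` is expected to preserve.

```java
/** Plain-Java sketch of the accumulator lifecycle behind an average aggregation. */
class AverageAccumulatorSketch {

    /** Immutable accumulator: running sum and element count. */
    static final class Acc {
        final double sum;
        final int count;

        Acc(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    static Acc createAccumulator() {
        return new Acc(0.0, 0);
    }

    static Acc add(double amount, Acc acc) {
        return new Acc(acc.sum + amount, acc.count + 1);
    }

    static Acc merge(Acc a, Acc b) {
        return new Acc(a.sum + b.sum, a.count + b.count);
    }

    static double getResult(Acc acc) {
        return acc.count == 0 ? 0.0 : acc.sum / acc.count;
    }

    /** Folds all amounts into a fresh accumulator, one add() call per element. */
    static Acc addAll(double... amounts) {
        Acc acc = createAccumulator();
        for (double amount : amounts) {
            acc = add(amount, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        double whole = getResult(addAll(100.0, 200.0, 150.0));
        double merged = getResult(merge(addAll(100.0, 200.0), addAll(150.0)));
        System.out.println(whole + " " + merged); // 150.0 150.0
    }
}
```

Only the accumulator is kept per window, not the elements themselves, which is why this style is called incremental aggregation.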
4. ProcessWindowFunction (full-window function)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
    @Override
    public void process(String key,
                        Context context,
                        Iterable<Event> elements,
                        Collector<String> out) {
        // All elements of the window are buffered and handed over at once
        int count = 0;
        double sum = 0;
        for (Event event : elements) {
            count++;
            sum += event.amount;
        }
        out.collect(String.format(
                "Key: %s, Window: [%d, %d), Count: %d, Sum: %.2f",
                key, context.window().getStart(), context.window().getEnd(), count, sum));
    }
});
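Comparison of the three window types
| Property | Event-Time | Processing-Time | Event-Time with Offset |
|---|---|---|---|
| Time basis | Record timestamp | System clock | Record timestamp + offset |
| Determinism | ✅ Deterministic | ❌ Non-deterministic | ✅ Deterministic |
| Out-of-order handling | ✅ Supported | ❌ Not supported | ✅ Supported |
| Latency | Higher | Low | Higher |
| Setup complexity | High (needs watermarks) | Low | High (needs watermarks) |
| Typical use | Analytics, reporting | Real-time monitoring | Cross-time-zone statistics |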
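Complete side-by-side example
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowComparison {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<>("user1", 10),
                new Tuple2<>("user1", 20),
                new Tuple2<>("user2", 30));

        // The tuples carry no timestamp of their own, so this demo stamps them with the
        // wall clock. A real job should extract the timestamp from the record instead.
        WatermarkStrategy<Tuple2<String, Integer>> strategy =
                WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> System.currentTimeMillis());

        // Option 1: event-time window
        input.assignTimestampsAndWatermarks(strategy)
                .keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print("Event-Time");

        // Option 2: processing-time window
        // (with this bounded input the job may end before the 5-second window fires)
        input.keyBy(tuple -> tuple.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print("Processing-Time");

        // Option 3: event-time window with an offset
        input.assignTimestampsAndWatermarks(strategy)
                .keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                .sum(1)
                .print("Event-Time with Offset");

        env.execute("Window Comparison");
    }
}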
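Key takeaways
- ✅ Tumbling windows: fixed size, non-overlapping, contiguous
- ✅ Event-time windows: based on record timestamps, deterministic, require watermarks
- ✅ Processing-time windows: based on the system clock, low latency, no extra configuration
- ✅ Offset windows: shift window boundaries to a time zone for cross-time-zone statistics
- ⚠️ Offset sign: Beijing time is UTC+8, so the offset is written as Time.hours(-8)
- ⚠️ Choosing: use event time for analytics, processing time for real-time monitoring
- ⚠️ Triggering: event-time windows fire when the watermark passes the window end; processing-time windows fire when the system clock does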
