Flink中 Window解析
Window
: 窗口的抽象基类
在 Flink 中,Window
是一个抽象类,它是所有具体窗口类型(如时间窗口、计数窗口)的父类。可以把它理解为定义“窗口”这一概念的顶层设计。
- 数据分桶: Flink 是一个流处理引擎,数据流是无限的。为了在无界流上进行聚合等计算,必须将流切分成有限大小的“桶”(buckets),而
Window
就是这些桶的逻辑表示。 - 状态命名空间: 在 Flink 的窗口计算中,数据会根据 Key 和分配到的窗口进行分组。
Window
对象本身,连同 Key,共同构成了一个唯一的命名空间。Flink 的窗口状态(例如,窗口内的数据、聚合的中间结果)就是存储在这个命名空间下的。这意味着,WindowA
的状态和WindowB
的状态是完全隔离的。 - 定义窗口生命周期:
Window
类有一个核心的抽象方法maxTimestamp()
。public abstract long maxTimestamp();
- 这个方法返回该窗口所能包含的元素的最大时间戳。对于时间窗口,它返回的是窗口的结束时间。对于非时间窗口(如
CountWindow
),返回一个超大整数表示无效。这个返回值对于 Flink 的事件时间处理机制(特别是 watermark)至关重要,Flink 会用它来判断一个窗口是否已经“过时”,从而可以安全地清理其状态。
Window
类还要求其子类实现 equals()
和 hashCode()
方法,以确保可以正确地将窗口作为 Map 的 Key 或在集合中进行比较。同时,它实现了 Comparable
接口,使得窗口之间可以排序。
CountWindow
: 基于计数的窗口实现
现在我们来看 CountWindow.java
,它是 Window
的一个具体实现,专门用于表示基于计数的窗口(Count Window)。
public class CountWindow extends Window {private final long id;public CountWindow(long id) {this.id = id;}/** Gets the id (0-based) of the window. */public long getId() {return id;}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}@Overridepublic boolean equals(Object o) {// ...CountWindow window = (CountWindow) o;return id == window.id;}@Overridepublic int hashCode() {return MathUtils.longToIntWithBitMixing(id);}// .../** The serializer used to write the CountWindow type. */public static class Serializer extends TypeSerializerSingleton<CountWindow> {// ...@Overridepublic void serialize(CountWindow record, DataOutputView target) throws IOException {target.writeLong(record.id);}@Overridepublic CountWindow deserialize(DataInputView source) throws IOException {return new CountWindow(source.readLong());}// ...}
}
核心属性与方法
private final long id;
: 这是CountWindow
最核心的属性。与TimeWindow
使用start
和end
时间戳来定义不同,CountWindow
仅用一个唯一的long
型id
来标识一个窗口。正如注释所说:For each count window, we will assign a unique id. Thus this CountWindow can act as namespace part in state. 这个
id
是由WindowAssigner
(例如CountSlidingWindowAssigner
)在分配窗口时计算出来的。maxTimestamp()
: 这个方法返回Long.MAX_VALUE
。这是一个非常关键的设计。它告诉 Flink 的事件时间处理器:“这个窗口永远不会基于 Watermark 过期”。因为计数窗口的生命周期是由元素的数量决定的,而不是时间。所以,它的状态清理必须由其他机制(如自定义 Trigger 或 State TTL)来管理,而不能依赖 Watermark。equals()
和hashCode()
: 这两个方法都只基于id
来实现。这意味着,两个CountWindow
对象当且仅当它们的id
相同时,才被认为是同一个窗口。这对于 Flink 将窗口状态正确地关联到对应的窗口至关重要。没有
start
和end
: 请注意,CountWindow
没有像TimeWindow
那样的getStart()
或getEnd()
方法。因为它不代表一个时间区间。这也解释了为什么在测试代码中会出现这样的错误:"Window start and Window end cannot be selected for a row-count tumble window." 来源:
GroupWindowValidationTest.scala
序列化器 (Serializer)
内部类 Serializer
负责 CountWindow
对象的序列化和反序列化,以便在网络传输或写入状态后端时使用。
TypeSerializerSingleton
: 它继承自TypeSerializerSingleton
,表明这个序列化器是无状态且线程安全的,因此在整个 JVM 中只需要一个实例,这是一种性能优化。isImmutableType()
: 返回true
,因为CountWindow
只有一个final
字段id
,是不可变对象。这告诉 Flink 在处理时可以省去不必要的对象拷贝,也是一种性能优化。serialize()
/deserialize()
: 实现非常高效,仅仅是写入或读取一个long
类型的id
。
总结与关联
Window
是抽象,CountWindow
是具体:Window
定义了所有窗口的通用行为和目的,而CountWindow
是专门为“数据驱动”(data-driven)的场景设计的具体实现。CountWindow
由Assigner
创建:CountWindow
的实例(特别是它的id
)是由CountTumblingWindowAssigner
或CountSlidingWindowAssigner
在处理每个元素时动态计算并创建的。简单而强大:
CountWindow
的设计非常简洁,仅用一个id
就完成了对一个计数窗口的全部定义。这种简单性带来了高性能(序列化开销小,比较快),同时通过maxTimestamp()
的巧妙设计,清晰地将自己与基于时间的窗口体系分离开来,避免了逻辑上的混淆。应用场景: 当你的业务逻辑需要“每N个元素计算一次”或者“计算最近N个元素”时,底层使用的就是
CountWindow
。例如,在WindowWordCount.java
示例中调用的.countWindow(windowSize, slideSize)
就会使用CountWindow
。
TimeWindow
TimeWindow
是 Window
抽象类的一个核心具体实现,专门用于表示一个时间区间。当你在 Flink 中使用滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)或会话窗口(Session Windows)时,其底层的窗口表示就是 TimeWindow
。
TimeWindow
的设计非常直观,它代表一个左闭右开的时间区间 [start, end)
。
// ... existing code ...
/*** A {@link Window} that represents a time interval from {@code start} (inclusive) to {@code end}* (exclusive).*/
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}
// ... existing code ...
- 核心属性:
start
: 窗口的起始时间戳(包含在窗口内),单位是毫秒。end
: 窗口的结束时间戳(不包含在窗口内),单位是毫秒。
- 时间语义无关性:
TimeWindow
本身与**事件时间(Event Time)或处理时间(Processing Time)**无关。它只是一个时间区间的表示。具体这个区间是根据事件时间还是处理时间来划分,是由WindowAssigner
(如TumblingEventTimeWindows
或SlidingProcessingTimeWindows
)决定的。
我们来分析 TimeWindow.java
中的一些关键方法实现。
maxTimestamp()
// ... existing code .../*** Gets the largest timestamp that still belongs to this window.** <p>This timestamp is identical to {@code getEnd() - 1}.** @return The largest timestamp that still belongs to this window.* @see #getEnd()*/@Overridepublic long maxTimestamp() {return end - 1;}
// ... existing code ...
这是从父类 Window
继承的非常重要的方法。它返回此窗口能包含的最大时间戳。由于 end
是开区间,所以属于该窗口的最大时间戳就是 end - 1
。这个返回值对于 Flink 的事件时间处理机制至关重要:
- 窗口触发与清理: Flink 的 Watermark(水位线)会与
maxTimestamp()
的值进行比较。当 Watermark 超过了window.maxTimestamp()
,Flink 就知道这个窗口的所有常规数据都已到达,可以触发窗口计算。 - 生命周期: 当 Watermark 超过
window.maxTimestamp() + allowedLateness
时,Flink 认为这个窗口已经彻底结束,可以安全地清理该窗口的所有状态。
hashCode()
// ... existing code ...@Overridepublic int hashCode() {// inspired from Apache BEAM// The end values are themselves likely to be arithmetic sequence, which// is a poor distribution to use for a hashtable, so we// add a highly non-linear transformation.return (int) (start + modInverse((int) (end << 1) + 1));}/** Compute the inverse of (odd) x mod 2^32. */private int modInverse(int x) {// Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x.int inverse = x * x * x;// Newton iteration doubles correct bits at each step.inverse *= 2 - x * inverse;inverse *= 2 - x * inverse;inverse *= 2 - x * inverse;return inverse;}
// ... existing code ...
hashCode()
的实现并非简单地组合 start
和 end
。注释中提到,这是借鉴了 Apache Beam 的实现。因为窗口的结束时间 end
通常是等差数列(例如,每5分钟一个窗口,end
可能是 10:05
, 10:10
, 10:15
...),这种序列的哈希分布性很差,容易导致大量哈希冲突,影响用作 HashMap
Key 时的性能。因此,这里通过一个非线性的 modInverse
变换来打乱分布,使得哈希码更均匀。
intersects()
和 cover()
// ... existing code .../** Returns {@code true} if this window intersects the given window. */public boolean intersects(TimeWindow other) {return this.start <= other.end && this.end >= other.start;}/** Returns the minimal window covers both this window and the given window. */public TimeWindow cover(TimeWindow other) {return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));}
// ... existing code ...
这两个方法主要用于可合并窗口的场景,最典型的就是 SessionWindowAssigner
。当两个会话窗口的间隔足够近时,它们需要被合并成一个更大的窗口。intersects
用于判断是否需要合并,cover
则用于执行合并操作。
getWindowStartWithOffset()
// ... existing code .../*** Method to get the window start for a timestamp.** @param timestamp epoch millisecond to get the window start.* @param offset The offset which window start would be shifted by.* @param windowSize The size of the generated windows.* @return window start*/public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {final long remainder = (timestamp - offset) % windowSize;// handle both positive and negative casesif (remainder < 0) {return timestamp - (remainder + windowSize);} else {return timestamp - remainder;}}
// ... existing code ...
这是一个极其重要的静态工具方法,它是所有固定大小窗口(滚动、滑动)分配算法的核心。它根据任意一个时间戳,计算出该时间戳所属窗口的起始时间。
offset
: 偏移量。这个参数非常灵活,最常见的用途就是处理时区。例如,一个按天聚合的窗口,在默认 UTC 时区下,窗口是[00:00, 24:00)
。如果我想在Asia/Shanghai
(UTC+8) 时区下按天聚合,就需要一个-8
小时的偏移量,这样窗口的起始点就变成了 UTC 时间的16:00
。- 计算逻辑:
(timestamp - offset) % windowSize
计算出时间戳在一个对齐后的时间轴上,距离上一个窗口边界的余数。用时间戳减去这个余数,就得到了窗口的起始点。代码中对负数取余的特殊处理确保了在处理公元元年(epoch)之前的时间戳时也能得到正确结果。
与时区的关系
TimeWindow
与时区的关系是一个重要主题。
- 内部表示:
TimeWindow
的start
和end
始终是 UTC 毫秒时间戳。 - 外部影响: 用户指定的
table.local-time-zone
会影响窗口边界的计算。- 当使用
TIMESTAMP
(不带时区信息) 作为事件时间属性时,Flink 视其为 UTC 时间。无论table.local-time-zone
设置成什么,计算出的window_start
和window_end
的 UTC 值都是相同的。 - 当使用
TIMESTAMP_LTZ
(带本地时区) 作为事件时间属性时,Flink 会根据table.local-time-zone
来解析时间戳并计算窗口边界。因此,切换时区会导致计算出的window_start
和window_end
的 UTC 值发生变化。例如,对于Asia/Shanghai
时区,TUMBLE(ts_ltz, INTERVAL '1' DAY)
的窗口会从北京时间的00:00
开始,这对应的是前一天的16:00
UTC。
- 当使用
总结
TimeWindow
是 Flink 时间窗口计算的核心数据结构。它通过 [start, end)
毫秒时间戳清晰地定义了一个时间区间。它的 maxTimestamp()
方法与 Flink 的 Watermark 机制紧密结合,共同驱动着事件时间窗口的触发和生命周期管理。其内部精巧的 hashCode
实现和强大的 getWindowStartWithOffset
工具方法,分别在性能和功能(尤其是时区支持)上提供了坚实的保障。理解 TimeWindow
是深入理解 Flink 所有时间相关操作的基础。
GlobalWindow
与 TimeWindow
或 CountWindow
不同,它不将数据流切分成多个小窗口,而是将所有具有相同 Key 的元素都分配到同一个、唯一的、全局的窗口中。
GlobalWindow
的实现采用了单例模式,这完美地体现了其“全局唯一”的核心思想。
public class GlobalWindow extends Window {private static final GlobalWindow INSTANCE = new GlobalWindow();private GlobalWindow() {}public static GlobalWindow get() {return INSTANCE;}// ...
}
private static final GlobalWindow INSTANCE
: 在类加载时就创建了唯一的实例。private GlobalWindow()
: 私有构造函数防止外部直接创建新实例。public static GlobalWindow get()
: 提供全局唯一的访问点。
这意味着在整个 Flink 作业的生命周期中,逻辑上只存在一个 GlobalWindow
。所有被分配到全局窗口的数据,无论它们的 Key 是什么,它们所属的窗口对象都是同一个 GlobalWindow.INSTANCE
。
关键方法分析
GlobalWindow
的方法实现进一步强化了它的特性:
// ... existing code ...@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}@Overridepublic int hashCode() {return 0;}@Overridepublic boolean equals(Object o) {return this == o || !(o == null || getClass() != o.getClass());}
// ... existing code ...
maxTimestamp()
: 返回Long.MAX_VALUE
。这个返回值至关重要,它向 Flink 的事件时间系统传递了一个明确的信号:“这个窗口永远不会基于 Watermark 过期”。因此,你不能依赖 Watermark 来触发全局窗口的计算或清理。正如文档中所说:使用
GlobalWindows
时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是Long.MAX_VALUE
。 来源:docs/content.zh/docs/dev/datastream/operators/windows.md:1185-1210
equals()
和hashCode()
:hashCode
总是返回0,equals
判断是否为GlobalWindow
类型。这确保了在任何情况下,GlobalWindow
实例都被视为同一个,这对于在状态后端中作为命名空间的一部分至关重要。
窗口分配器:GlobalWindows
与 GlobalWindow
配套使用的是 GlobalWindows
分配器。它的逻辑非常简单:
// ... existing code ...@Overridepublic Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {return Collections.singletonList(GlobalWindow.get());}
// ... existing code ...
无论什么元素、什么时间戳进来,assignWindows
方法总是返回同一个 GlobalWindow.INSTANCE
实例。
必须与触发器 (Trigger) 配合使用
这是理解和使用 GlobalWindow
最关键的一点。由于 GlobalWindow
没有天然的结束点,它自身永远不会触发计算。必须为它指定一个自定义的触发器(Trigger)来定义计算和清空的条件。
Flink 的 GlobalWindows
分配器默认使用 NeverTrigger
:
// ... existing code .../** A trigger that never fires, as default Trigger for GlobalWindows. */@Internalpublic static class NeverTrigger extends Trigger<Object, GlobalWindow> {// ... 所有 onElement, onEventTime, onProcessingTime 方法都只返回 CONTINUE}
// ... existing code ...
这意味着,如果你仅仅写了 .window(GlobalWindows.create())
而不指定触发器,你的窗口计算将永远不会发生。
文档中也明确指出了这一点:
这样的窗口模式仅在你指定了自定义的 trigger 时有用。否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。 来源:
docs/content.zh/docs/dev/datastream/operators/windows.md:395-427
常见的触发器策略包括:
- 基于数量: 使用
.trigger(CountTrigger.of(N))
,每当窗口中有 N 个元素时触发一次。 - 基于事件: 自定义一个 Trigger,当收到一个特定的“终止”事件时触发。
- 流末端触发 (End of Stream): 对于有界流(批处理),可以使用
GlobalWindows.createWithEndOfStreamTrigger()
,它会在数据流结束时触发一次计算。
在 Table API / SQL 中的角色
在执行计划(XML 和 JSON 文件)中,我们频繁看到 GlobalWindowAggregate
。例如:
GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], ...)
来源:WindowJoinTest.xml
这里需要特别注意,执行计划中的 GlobalWindowAggregate
并不等同于 用户代码中使用的 GlobalWindow
。
在 Flink Table/SQL 的窗口聚合优化中,通常采用两阶段聚合(也叫 Local-Global Aggregation)来减少网络 shuffle 的数据量:
LocalWindowAggregate
: 在上游 TaskManager 中,对数据进行本地的、小范围的预聚合。- Shuffle: 将预聚合的结果按
group by
的 key 进行 shuffle。 GlobalWindowAggregate
: 在下游 TaskManager 中,对来自不同上游节点的数据进行最终的合并和计算。
这里的 "Global" 指的是聚合的第二个阶段是“全局”的,汇集了所有相关数据,而不是指窗口类型是 GlobalWindow
。从计划中可以看到,它处理的仍然是 TUMBLE
, HOP
等时间窗口。
总结
GlobalWindow
是一个强大但需要小心使用的工具。它的核心特点可以概括为:
- 全局唯一: 所有数据(按 Key 分组后)都进入同一个窗口实例。
- 单例实现: 设计上保证了全局只有一个
GlobalWindow
对象,非常高效。 - 永不过期:
maxTimestamp
为Long.MAX_VALUE
,不受 Watermark 影响。 - 依赖触发器: 其本身没有触发逻辑,必须配合自定义
Trigger
才能进行计算,否则将不会有任何输出。
它最适合的场景是需要自定义非常灵活的窗口触发逻辑,或者在有界流上进行全量聚合的场景。