当前位置: 首页 > news >正文

Flink中 Window解析

Window: 窗口的抽象基类

在 Flink 中,Window 是一个抽象类,它是所有具体窗口类型(如时间窗口、计数窗口)的父类。可以把它理解为定义“窗口”这一概念的顶层设计。

  1. 数据分桶: Flink 是一个流处理引擎,数据流是无限的。为了在无界流上进行聚合等计算,必须将流切分成有限大小的“桶”(buckets),而 Window 就是这些桶的逻辑表示。
  2. 状态命名空间: 在 Flink 的窗口计算中,数据会根据 Key 和分配到的窗口进行分组。Window 对象本身,连同 Key,共同构成了一个唯一的命名空间。Flink 的窗口状态(例如,窗口内的数据、聚合的中间结果)就是存储在这个命名空间下的。这意味着,WindowA 的状态和 WindowB 的状态是完全隔离的。
  3. 定义窗口生命周期Window 类有一个核心的抽象方法 maxTimestamp()
    • public abstract long maxTimestamp();
    • 这个方法返回该窗口所能包含的元素的最大时间戳。对于时间窗口,它返回的是窗口的结束时间。对于非时间窗口(如 CountWindow),返回一个超大整数表示无效。这个返回值对于 Flink 的事件时间处理机制(特别是 watermark)至关重要,Flink 会用它来判断一个窗口是否已经“过时”,从而可以安全地清理其状态。

Window 类还要求其子类实现 equals() 和 hashCode() 方法,以确保可以正确地将窗口作为 Map 的 Key 或在集合中进行比较。同时,它实现了 Comparable 接口,使得窗口之间可以排序。

CountWindow: 基于计数的窗口实现

现在我们来看 CountWindow.java,它是 Window 的一个具体实现,专门用于表示基于计数的窗口(Count Window)。

public class CountWindow extends Window {private final long id;public CountWindow(long id) {this.id = id;}/** Gets the id (0-based) of the window. */public long getId() {return id;}@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}@Overridepublic boolean equals(Object o) {// ...CountWindow window = (CountWindow) o;return id == window.id;}@Overridepublic int hashCode() {return MathUtils.longToIntWithBitMixing(id);}// .../** The serializer used to write the CountWindow type. */public static class Serializer extends TypeSerializerSingleton<CountWindow> {// ...@Overridepublic void serialize(CountWindow record, DataOutputView target) throws IOException {target.writeLong(record.id);}@Overridepublic CountWindow deserialize(DataInputView source) throws IOException {return new CountWindow(source.readLong());}// ...}
}

核心属性与方法

  • private final long id;: 这是 CountWindow 最核心的属性。与 TimeWindow 使用 start 和 end 时间戳来定义不同,CountWindow 仅用一个唯一的 long 型 id 来标识一个窗口。正如注释所说:

    For each count window, we will assign a unique id. Thus this CountWindow can act as namespace part in state. 这个 id 是由 WindowAssigner(例如  CountSlidingWindowAssigner)在分配窗口时计算出来的。

  • maxTimestamp(): 这个方法返回 Long.MAX_VALUE。这是一个非常关键的设计。它告诉 Flink 的事件时间处理器:“这个窗口永远不会基于 Watermark 过期”。因为计数窗口的生命周期是由元素的数量决定的,而不是时间。所以,它的状态清理必须由其他机制(如自定义 Trigger 或 State TTL)来管理,而不能依赖 Watermark。

  • equals() 和 hashCode(): 这两个方法都只基于 id 来实现。这意味着,两个 CountWindow 对象当且仅当它们的 id 相同时,才被认为是同一个窗口。这对于 Flink 将窗口状态正确地关联到对应的窗口至关重要。

  • 没有 start 和 end: 请注意,CountWindow 没有像 TimeWindow 那样的 getStart() 或 getEnd() 方法。因为它不代表一个时间区间。这也解释了为什么在测试代码中会出现这样的错误:

    "Window start and Window end cannot be selected for a row-count tumble window." 来源: GroupWindowValidationTest.scala

序列化器 (Serializer)

内部类 Serializer 负责 CountWindow 对象的序列化和反序列化,以便在网络传输或写入状态后端时使用。

  • TypeSerializerSingleton: 它继承自 TypeSerializerSingleton,表明这个序列化器是无状态且线程安全的,因此在整个 JVM 中只需要一个实例,这是一种性能优化。
  • isImmutableType(): 返回 true,因为 CountWindow 只有一个 final 字段 id,是不可变对象。这告诉 Flink 在处理时可以省去不必要的对象拷贝,也是一种性能优化。
  • serialize() / deserialize(): 实现非常高效,仅仅是写入或读取一个 long 类型的 id

总结与关联

  1. Window 是抽象,CountWindow 是具体Window 定义了所有窗口的通用行为和目的,而 CountWindow 是专门为“数据驱动”(data-driven)的场景设计的具体实现。

  2. CountWindow 由 Assigner 创建CountWindow 的实例(特别是它的 id)是由 CountTumblingWindowAssigner 或 CountSlidingWindowAssigner 在处理每个元素时动态计算并创建的。

  3. 简单而强大CountWindow 的设计非常简洁,仅用一个 id 就完成了对一个计数窗口的全部定义。这种简单性带来了高性能(序列化开销小,比较快),同时通过 maxTimestamp() 的巧妙设计,清晰地将自己与基于时间的窗口体系分离开来,避免了逻辑上的混淆。

  4. 应用场景: 当你的业务逻辑需要“每N个元素计算一次”或者“计算最近N个元素”时,底层使用的就是 CountWindow。例如,在 WindowWordCount.java 示例中调用的 .countWindow(windowSize, slideSize) 就会使用 CountWindow

TimeWindow 

TimeWindow 是 Window 抽象类的一个核心具体实现,专门用于表示一个时间区间。当你在 Flink 中使用滚动窗口(Tumbling Windows)、滑动窗口(Sliding Windows)或会话窗口(Session Windows)时,其底层的窗口表示就是 TimeWindow

TimeWindow 的设计非常直观,它代表一个左闭右开的时间区间 [start, end)

// ... existing code ...
/*** A {@link Window} that represents a time interval from {@code start} (inclusive) to {@code end}* (exclusive).*/
public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}
// ... existing code ...
  • 核心属性:
    • start: 窗口的起始时间戳(包含在窗口内),单位是毫秒。
    • end: 窗口的结束时间戳(不包含在窗口内),单位是毫秒。
  • 时间语义无关性TimeWindow 本身与**事件时间(Event Time)处理时间(Processing Time)**无关。它只是一个时间区间的表示。具体这个区间是根据事件时间还是处理时间来划分,是由 WindowAssigner(如 TumblingEventTimeWindows 或 SlidingProcessingTimeWindows)决定的。

我们来分析 TimeWindow.java 中的一些关键方法实现。

maxTimestamp()

// ... existing code .../*** Gets the largest timestamp that still belongs to this window.** <p>This timestamp is identical to {@code getEnd() - 1}.** @return The largest timestamp that still belongs to this window.* @see #getEnd()*/@Overridepublic long maxTimestamp() {return end - 1;}
// ... existing code ...

这是从父类 Window 继承的非常重要的方法。它返回此窗口能包含的最大时间戳。由于 end 是开区间,所以属于该窗口的最大时间戳就是 end - 1。这个返回值对于 Flink 的事件时间处理机制至关重要:

  • 窗口触发与清理: Flink 的 Watermark(水位线)会与 maxTimestamp() 的值进行比较。当 Watermark 超过了 window.maxTimestamp(),Flink 就知道这个窗口的所有常规数据都已到达,可以触发窗口计算。
  • 生命周期: 当 Watermark 超过 window.maxTimestamp() + allowedLateness 时,Flink 认为这个窗口已经彻底结束,可以安全地清理该窗口的所有状态。

hashCode()

// ... existing code ...@Overridepublic int hashCode() {// inspired from Apache BEAM// The end values are themselves likely to be arithmetic sequence, which// is a poor distribution to use for a hashtable, so we// add a highly non-linear transformation.return (int) (start + modInverse((int) (end << 1) + 1));}/** Compute the inverse of (odd) x mod 2^32. */private int modInverse(int x) {// Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x.int inverse = x * x * x;// Newton iteration doubles correct bits at each step.inverse *= 2 - x * inverse;inverse *= 2 - x * inverse;inverse *= 2 - x * inverse;return inverse;}
// ... existing code ...

hashCode() 的实现并非简单地组合 start 和 end。注释中提到,这是借鉴了 Apache Beam 的实现。因为窗口的结束时间 end 通常是等差数列(例如,每5分钟一个窗口,end 可能是 10:0510:1010:15...),这种序列的哈希分布性很差,容易导致大量哈希冲突,影响用作 HashMap Key 时的性能。因此,这里通过一个非线性的 modInverse 变换来打乱分布,使得哈希码更均匀。

intersects() 和 cover()

// ... existing code .../** Returns {@code true} if this window intersects the given window. */public boolean intersects(TimeWindow other) {return this.start <= other.end && this.end >= other.start;}/** Returns the minimal window covers both this window and the given window. */public TimeWindow cover(TimeWindow other) {return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));}
// ... existing code ...

这两个方法主要用于可合并窗口的场景,最典型的就是 SessionWindowAssigner。当两个会话窗口的间隔足够近时,它们需要被合并成一个更大的窗口。intersects 用于判断是否需要合并,cover 则用于执行合并操作。

getWindowStartWithOffset()

// ... existing code .../*** Method to get the window start for a timestamp.** @param timestamp epoch millisecond to get the window start.* @param offset The offset which window start would be shifted by.* @param windowSize The size of the generated windows.* @return window start*/public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {final long remainder = (timestamp - offset) % windowSize;// handle both positive and negative casesif (remainder < 0) {return timestamp - (remainder + windowSize);} else {return timestamp - remainder;}}
// ... existing code ...

这是一个极其重要的静态工具方法,它是所有固定大小窗口(滚动、滑动)分配算法的核心。它根据任意一个时间戳,计算出该时间戳所属窗口的起始时间。

  • offset: 偏移量。这个参数非常灵活,最常见的用途就是处理时区。例如,一个按天聚合的窗口,在默认 UTC 时区下,窗口是 [00:00, 24:00)。如果我想在 Asia/Shanghai (UTC+8) 时区下按天聚合,就需要一个 -8 小时的偏移量,这样窗口的起始点就变成了 UTC 时间的 16:00
  • 计算逻辑(timestamp - offset) % windowSize 计算出时间戳在一个对齐后的时间轴上,距离上一个窗口边界的余数。用时间戳减去这个余数,就得到了窗口的起始点。代码中对负数取余的特殊处理确保了在处理公元元年(epoch)之前的时间戳时也能得到正确结果。

与时区的关系

TimeWindow 与时区的关系是一个重要主题。

  • 内部表示TimeWindow 的 start 和 end 始终是 UTC 毫秒时间戳
  • 外部影响: 用户指定的 table.local-time-zone 会影响窗口边界的计算。
    • 当使用 TIMESTAMP (不带时区信息) 作为事件时间属性时,Flink 视其为 UTC 时间。无论 table.local-time-zone 设置成什么,计算出的 window_start 和 window_end 的 UTC 值都是相同的。
    • 当使用 TIMESTAMP_LTZ (带本地时区) 作为事件时间属性时,Flink 会根据 table.local-time-zone 来解析时间戳并计算窗口边界。因此,切换时区会导致计算出的 window_start 和 window_end 的 UTC 值发生变化。例如,对于 Asia/Shanghai 时区,TUMBLE(ts_ltz, INTERVAL '1' DAY) 的窗口会从北京时间的 00:00 开始,这对应的是前一天的 16:00 UTC。

总结

TimeWindow 是 Flink 时间窗口计算的核心数据结构。它通过 [start, end) 毫秒时间戳清晰地定义了一个时间区间。它的 maxTimestamp() 方法与 Flink 的 Watermark 机制紧密结合,共同驱动着事件时间窗口的触发和生命周期管理。其内部精巧的 hashCode 实现和强大的 getWindowStartWithOffset 工具方法,分别在性能和功能(尤其是时区支持)上提供了坚实的保障。理解 TimeWindow 是深入理解 Flink 所有时间相关操作的基础。

GlobalWindow

与 TimeWindow 或 CountWindow 不同,它不将数据流切分成多个小窗口,而是将所有具有相同 Key 的元素都分配到同一个、唯一的、全局的窗口中。

GlobalWindow 的实现采用了单例模式,这完美地体现了其“全局唯一”的核心思想。

public class GlobalWindow extends Window {private static final GlobalWindow INSTANCE = new GlobalWindow();private GlobalWindow() {}public static GlobalWindow get() {return INSTANCE;}// ...
}
  • private static final GlobalWindow INSTANCE: 在类加载时就创建了唯一的实例。
  • private GlobalWindow(): 私有构造函数防止外部直接创建新实例。
  • public static GlobalWindow get(): 提供全局唯一的访问点。

这意味着在整个 Flink 作业的生命周期中,逻辑上只存在一个 GlobalWindow。所有被分配到全局窗口的数据,无论它们的 Key 是什么,它们所属的窗口对象都是同一个 GlobalWindow.INSTANCE

关键方法分析

GlobalWindow 的方法实现进一步强化了它的特性:

// ... existing code ...@Overridepublic long maxTimestamp() {return Long.MAX_VALUE;}@Overridepublic int hashCode() {return 0;}@Overridepublic boolean equals(Object o) {return this == o || !(o == null || getClass() != o.getClass());}
// ... existing code ...
  • maxTimestamp(): 返回 Long.MAX_VALUE。这个返回值至关重要,它向 Flink 的事件时间系统传递了一个明确的信号:“这个窗口永远不会基于 Watermark 过期”。因此,你不能依赖 Watermark 来触发全局窗口的计算或清理。正如文档中所说:

    使用 GlobalWindows 时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是 Long.MAX_VALUE。 来源: docs/content.zh/docs/dev/datastream/operators/windows.md:1185-1210

  • equals() 和 hashCode()hashCode 总是返回0,equals 判断是否为 GlobalWindow 类型。这确保了在任何情况下,GlobalWindow 实例都被视为同一个,这对于在状态后端中作为命名空间的一部分至关重要。

窗口分配器:GlobalWindows

与 GlobalWindow 配套使用的是 GlobalWindows 分配器。它的逻辑非常简单:

// ... existing code ...@Overridepublic Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {return Collections.singletonList(GlobalWindow.get());}
// ... existing code ...

无论什么元素、什么时间戳进来,assignWindows 方法总是返回同一个 GlobalWindow.INSTANCE 实例。

必须与触发器 (Trigger) 配合使用

这是理解和使用 GlobalWindow 最关键的一点。由于 GlobalWindow 没有天然的结束点,它自身永远不会触发计算。必须为它指定一个自定义的触发器(Trigger)来定义计算和清空的条件。

Flink 的 GlobalWindows 分配器默认使用 NeverTrigger

// ... existing code .../** A trigger that never fires, as default Trigger for GlobalWindows. */@Internalpublic static class NeverTrigger extends Trigger<Object, GlobalWindow> {// ... 所有 onElement, onEventTime, onProcessingTime 方法都只返回 CONTINUE}
// ... existing code ...

这意味着,如果你仅仅写了 .window(GlobalWindows.create()) 而不指定触发器,你的窗口计算将永远不会发生

文档中也明确指出了这一点:

这样的窗口模式仅在你指定了自定义的 trigger 时有用。否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。 来源: docs/content.zh/docs/dev/datastream/operators/windows.md:395-427

常见的触发器策略包括:

  • 基于数量: 使用 .trigger(CountTrigger.of(N)),每当窗口中有 N 个元素时触发一次。
  • 基于事件: 自定义一个 Trigger,当收到一个特定的“终止”事件时触发。
  • 流末端触发 (End of Stream): 对于有界流(批处理),可以使用 GlobalWindows.createWithEndOfStreamTrigger(),它会在数据流结束时触发一次计算。

在 Table API / SQL 中的角色

在执行计划(XML 和 JSON 文件)中,我们频繁看到 GlobalWindowAggregate。例如:

GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(slice_end=[$slice_end], size=[15 min])], ...) 来源: WindowJoinTest.xml

这里需要特别注意,执行计划中的 GlobalWindowAggregate 并不等同于 用户代码中使用的 GlobalWindow

在 Flink Table/SQL 的窗口聚合优化中,通常采用两阶段聚合(也叫 Local-Global Aggregation)来减少网络 shuffle 的数据量:

  1. LocalWindowAggregate: 在上游 TaskManager 中,对数据进行本地的、小范围的预聚合。
  2. Shuffle: 将预聚合的结果按 group by 的 key 进行 shuffle。
  3. GlobalWindowAggregate: 在下游 TaskManager 中,对来自不同上游节点的数据进行最终的合并和计算。

这里的 "Global" 指的是聚合的第二个阶段是“全局”的,汇集了所有相关数据,而不是指窗口类型是 GlobalWindow。从计划中可以看到,它处理的仍然是 TUMBLEHOP 等时间窗口。

总结

GlobalWindow 是一个强大但需要小心使用的工具。它的核心特点可以概括为:

  • 全局唯一: 所有数据(按 Key 分组后)都进入同一个窗口实例。
  • 单例实现: 设计上保证了全局只有一个 GlobalWindow 对象,非常高效。
  • 永不过期maxTimestamp 为 Long.MAX_VALUE,不受 Watermark 影响。
  • 依赖触发器: 其本身没有触发逻辑,必须配合自定义 Trigger 才能进行计算,否则将不会有任何输出。

它最适合的场景是需要自定义非常灵活的窗口触发逻辑,或者在有界流上进行全量聚合的场景。

http://www.dtcms.com/a/394739.html

相关文章:

  • 医疗数据互操作性与联邦学习的python编程方向研究(下)
  • 摄像头视频云存储与回放系统架构
  • C# 压缩解压文件的常用方法
  • .NET驾驭Word之力:打造专业文档 - 页面设置与打印控制完全指南
  • 为什么要创建音频地图?——探索Highcharts可视化的声音创新
  • Sass开发【四】
  • 从图片到实时摄像头:OpenCV EigenFace 人脸识别实战教程
  • kotlin 为什么要有协程作用域
  • MySQL二进制安装
  • 基于Java(SSH)+ Oracle 实现的(Web)视频教学平台
  • 西门子 S7-200 SMART PLC 结构化编程核心:子程序、中断程序与库概念详解
  • 树上LCA和树链剖分(未完待续)
  • 开发避坑指南(54):Mybatis plus查询指定的列
  • SQL注入可能用到的语句
  • 【论文阅读】GR00T N1:面向通用人形机器人的开放基础模型
  • 关于debian老系统安装软件失败的问题
  • ahooks:一套高质量、可靠的 React Hooks 库
  • 【一天一个Web3概念】Uniswap:去中心化金融(DeFi)的自动做市商革命
  • ROS2_YAML参数系统完整指南
  • day01电路基础
  • 贪心算法:以局部最优达成全局最优的艺术
  • Rancher学习
  • 华为认证HCIA备考:Vlan间通信,原理、三层交换机配置实验
  • 104、23种设计模式之访问者模式(13/23)
  • 什么是Mvcc
  • 如何在同一站点支持多版本的 reCAPTCHA 的兼容性方案
  • 管家预约字段修复说明
  • java面试day3 | 框架篇、Spring、SpringMVC、SpringBoot、MyBatis、注解、AOP、Bean
  • 【log4j2】log4j2插件挂载变更msg格式(工作实战,原理详解)
  • MVCC(多版本并发控制):InnoDB 高并发的核心技术