当前位置: 首页 > news >正文

带你玩转 Flink TumblingWindow:从理论到代码的深度探索

0.前言

在深入探讨 TumblingWindow 之前,我们先来了解一下流处理或流计算中“窗口”的基本概念。在数据流中,源会持续不断地生成数据,因此计算最终值是不可行的。
在大多数用例中,为了获取有意义的信息,最好使用两种方法:

  1. 计算是针对一段时间内的有限集进行的(例如每分钟的 HTTP 401 错误)

  2. 计算以滚动更新的方式进行(例如记分牌、热门话题)“窗口”定义了无界流上的有限元素集合,我们可以对其应用计算。该集合可以基于时间、元素计数、计数和时间的组合,或者某些自定义逻辑将元素分配给窗口。比如:

  3. 每分钟(固定时间)收到的订单数量

  4. 完成最近 100 个订单的平均时间(固定元素) Flink 提供了三种类型的窗口 :(a) 滚动窗口 (b) 滑动窗口 (c) 会话窗口,本文将重点深入的介绍第一种。

1.滚动窗口 TumblingWindow

这个窗口简单易懂,易于上手。它是一个固定大小的窗口,其中“大小”可以是时间(30 秒、5 分钟)或计数(100 个元素)。

图片

5 分钟的时间窗口将收集窗口中所有到达的元素,并在 5 分钟后进行计算。每 5 分钟将启动一个新窗口。100 的计数窗口将收集窗口中的 100 个元素,并在添加第 100 个元素时对窗口进行计算。 最重要的是,滚动窗口的窗口不会重叠,也不会出现重复元素,每个元素只会被分配到一个窗口。如果指定了key,Flink 会对流进行逻辑分区,并针对每个键对应的元素运行并行的窗口操作。 我们先来看一个非常简单的例子来更好地理解滚动窗口。一个简单的“IntegerGenerator”类充当源,每秒生成一个整数(从 1 开始)。以下代码初始化本地 Flink 环境并创建一个 DataStream 对象。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> intStream = env.addSource(new IntegerGenerator());
intStream.timeWindowAll(Time.second(5)).process(ProcessAllWindowFunction<Integer, Integer, TimeWindow>(){@Overridepublic void process( Context arg0, Iterable<Integer>input, Collector<Integer> output ) throws Exception{logger.info("Computing sum for ", input );int sum = 0;for(int i :input){sum += i;}output.collect(sum );}}).print();eny.execute();

其中:

  • 第 3 行 - 定义一个 5 秒的滚动窗口(大小随时间固定)

  • 第 4 行 - 使用 Flink 的 ProcessAllWindowFunction API 定义计算(业务逻辑)。这里我只是计算在给定窗口期间收集的所有整数的总和。注意:ProcessAllWindowFunction   会让 Flink 将窗口的所有元素缓冲到内存中,然后将整个元素传递给计算。这就是为什么你需要一个 Iterable<>   对象作为 process() 的输入参数   。

  • 第 15 行 - 将此窗口的结果返回给 Flink,以便进行下一步,即在控制台上打印。比如说输出如下:

  • 图片

  • 剖析一下这里的输出:

  1. 第 1 行 - 第 3 行 = 当前窗口关闭前生成了两个整数。请注意,尽管我们说的是五秒,但第一个窗口并没有运行五秒。原因是,默认情况下,Flink 会将时间四舍五入到最接近的时钟边界,在我们的例子中,该边界发生在“13:33:55”。这触发了 Flink TriggerWindow 关闭当前窗口并将其传递给下一步(Flink 的操作符)。

  2. 第 4 行 = 使用所有元素 [1, 2] 调用 process() 方法,并将总和 '3' 打印到控制台

  3. 第 5 行 - 第 10 行 = 启动新窗口并收集下一组整数。5 秒后,即“13:34:00”,窗口关闭。所有收集到的数据都会发送到进程,该进程打印接收到的整数,并计算此窗口中数字的总和 = '18'。

  4. 第 11 行 = 当前窗口总和被打印到控制台。

  5. 从第 12 行开始进一步应用类似的逻辑。

2.自定义滚动窗口的时间偏移

将上面的案例稍微调整一下,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> intStream = env.addSource(new IntegerGenerator());
intStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))).process(ProcessAllWindowFunction<Integer, Integer, TimeWindow>(){@Overridepublic void process( Context arg0, Iterable<Integer>input, Collector<Integer> output ) throws Exception{logger.info("Computing sum for ", input );int sum = 0;for(int i :input){sum += i;}output.collect(sum );}}).print();eny.execute();

这里唯一的变化是第 2 行,现在不再使用 "timeWindowAll( Time.seconds( 5 ) )" ,而现在使用更详细的 "windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) )"。 TimeWindowAll() 是一个包装方法,默认为 windowAll(TumblingProcessingTimeWindows.of(size)) ,即按时间固定大小的窗口(此时间是 Flink 作业正在运行的系统时间)。默认情况下,Flink 在时钟边界启动窗口,但使用 windowAll() 的第二个参数   我们可以自定义时钟边界。 下面显示了上述代码的示例运行:

图片

第 1 行到第 5 行 = Flink 启动一个窗口,收集整数。然而,在 19:26:37,这个窗口闭包被触发,并在第 6 行打印出 [1,2,3,4] 的和。 注意: 如果没有提供偏移量,Flink 会在“19:26:35”关闭窗口。但由于偏移量为 2 秒,因此窗口会在时钟边界后额外结束 2 秒。

3.带有事件时间的 TumblingWindow

到目前为止,在我们的讨论中,我们将“时间”作为 Flink 执行作业时的默认系统时间。然而,在许多用例中,我们希望使用事件的实际时间,即事件在事件源处创建的时间。为了处理这种情况,Flink 支持三种处理“时间”的方式。 让我们看看事件时间以及如何在 Flink 中使用。 在事件时间中,元素根据元素本身的时间戳(而不是任何系统时钟)分组到窗口中。来看这样的一个例子:首先,我定义了一个简单的 POJO 类,名为“Element”,如下所示。我使用 lombok 通过注解生成了 getter/setter 方法。

@Setter @Getter
public class Element{Integer value;Long timestamp;public Element( int counter, long currTime ){this.value = counter;this.timestamp = currTime;}@Overridepublic String toString(){return"" + value;}
}

接下来,我定义一个名为 “ElementGeneratorSource” 的简单 Source 类,它将创建 Element 类型的对象并分配随机递增的时间戳。这是为了确保我不会生成与系统时间匹配的 Element。实际上,时间戳会作为事件本身的一部分出现。

class ElementGeneratorSource implements SourceFunction<Element>{volatile boolean isRunning = true;final Logger logger = LoggerFactory.getLogger(ElementGeneratorSource.class);@Overridepublic void run( SourceContext<Element> ctx ) throws Exception{int counter = 1;   // 比flink程序开始时间晚20秒long eventStartTime = System.currentTimeMillis() - 20000;// 使用上述时间戳创建第一个事件Element element = new Element(counter++, eventStartTime);while( isRunning ){logger.info("Produced Element with value {} and timestamp {}", element.getValue(), printTime(element.getTimestamp()));ctx.collect( element );// create elements and assign timestamp with randomness so that they are not same as current system clock timeelement = new Element(counter++, element.getTimestamp() + ThreadLocalRandom.current().nextLong( 1000, 6000 ));Thread.sleep( 1000 );     }}@Overridepublic void cancel(){isRunning = false;}// 以可读格式打印纪元时间的辅助函数String printTime(long longValue){return LocalDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneId.systemDefault()).toString();}
}

现在,写一个流程序,使用 TumblingEventTime 窗口来处理这些元素。

  • 说明:我删除了类和方法的声明行,以便将注意力集中在重要的代码块上。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );elementStream.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>(){@Overridepublic long extractAscendingTimestamp( Element element ){return element.getTimestamp();}
})
.windowAll( TumblingEventTimeWindows.of( Time.seconds( 10 ) ) )       
.process( new ProcessAllWindowFunction<Element, Integer ,TimeWindow>(){@Overridepublic void process( Context arg0, Iterable<Element> input, Collector<Integer> output ) throws Exception{logger.info( "Computing sum for {}", input );int sum = 0;for(Element e : input) {sum += e.getValue();}output.collect( sum );}
})
.print();env.execute();

在这个例子中,我使用了一个非常方便的类“ AscendingTimestampExtractor ”,根据 Flink 文档,它的意思是 “一个用于时间戳单调递增的流的时间戳分配器和水印生成器。在这种情况下,流的本地watermark很容易生成,因为它们严格遵循时间戳。” 使用这个 Flink 提供的 API 的另一个好处是它会帮我生成watermark。watermark是让 Flink 知道何时关闭当前窗口(窗口的最后一个元素已到达)的一种方式。 简而言之, assignTimestampsAndWatermarks() 将允许 Flink 知道如何读取进入 Flink 的事件/元素的时间戳,最重要的是,如何计算水印。

图片

在第 1、2、3 行生成三个元素,其时间戳与系统时钟不同。(先打印系统时钟时间,再打印日志级别)。 当第三个元素在“2020-02-22T22:22:02.495”处生成时,由于水印已被突破,它会触发当前窗口关闭。如果时间窗口为 10 秒,则此处的结束时间将是“2020-02-22T22:21:59.000”。因此,当前窗口仅收集前两个值。 在下一次运行中,窗口将在“2020-02-22T22:22:09.000”关闭,这意味着值 3 和值 4 将在新窗口中收集,因为第 7 行有一个带有时间戳 > = 当前水印的元素。 通过以上三个案例,我们详细的探究了 Flink 的 TumblingWindow的各种情况,帮助大家更加深入的去理解Flink里的窗口。

相关文章:

  • DMC-1410/1411/1417USER MANUAL 手侧
  • 视频编解码学习8之视频历史
  • 艾体宝方案丨深度解析生成式 AI 安全风险,Lepide 为数据安全护航
  • 垃圾回收的三色标记算法
  • Petalinux开发Linux
  • 最新CDGP单选题(第四章)补充
  • fastjson2 json.tojsonstring 会自动忽略过滤掉 key: null的数据
  • Linux Shell编程之条件语句
  • SGLang 实战介绍 (张量并行 / Qwen3 30B MoE 架构部署)
  • 红黑树详解初版
  • 公链钱包开发:技术逻辑与产品设计实践
  • Asp.Net Core IIS发布后PUT、DELETE请求错误405
  • 飞云分仓操盘副图指标操作技术图文分解
  • Ceph PG unfound/lost 问题排查与解决
  • 101alpah_第5个alpha学习
  • 电子电气架构 --- 如何有助于提安全性并减少事故
  • Spark缓存--cache方法
  • 力扣:多数元素
  • 【C/C++】RPC与线程间通信:高效设计的关键选择
  • 香港国际交易节奏解析:结构性波动背后的信号逻辑
  • 最快3天开通一条定制公交线路!上海推出服务平台更快响应市民需求
  • 【社论】以法治力量促进民企长远健康发展
  • 陕西澄城打造“中国樱桃第一县”:从黄土高原走向海外,年产值超30亿
  • 经济日报:降准降息,提前还房贷划算吗?
  • 轿车追尾半挂车致3死1伤,事故调查报告:司机过分依赖巡航系统
  • 中国证监会:帮助受关税政策影响较大的上市公司纾困解难