带你玩转 Flink TumblingWindow:从理论到代码的深度探索
0.前言
在深入探讨 TumblingWindow 之前,我们先来了解一下流处理或流计算中“窗口”的基本概念。在数据流中,源会持续不断地生成数据,因此计算最终值是不可行的。
在大多数用例中,为了获取有意义的信息,最好使用两种方法:
-
计算是针对一段时间内的有限集进行的(例如每分钟的 HTTP 401 错误)
-
计算以滚动更新的方式进行(例如记分牌、热门话题)“窗口”定义了无界流上的有限元素集合,我们可以对其应用计算。该集合可以基于时间、元素计数、计数和时间的组合,或者某些自定义逻辑将元素分配给窗口。比如:
-
每分钟(固定时间)收到的订单数量
-
完成最近 100 个订单的平均时间(固定元素) Flink 提供了三种类型的窗口 :(a) 滚动窗口 (b) 滑动窗口 (c) 会话窗口,本文将重点深入的介绍第一种。
1.滚动窗口 TumblingWindow
这个窗口简单易懂,易于上手。它是一个固定大小的窗口,其中“大小”可以是时间(30 秒、5 分钟)或计数(100 个元素)。
5 分钟的时间窗口将收集窗口中所有到达的元素,并在 5 分钟后进行计算。每 5 分钟将启动一个新窗口。100 的计数窗口将收集窗口中的 100 个元素,并在添加第 100 个元素时对窗口进行计算。 最重要的是,滚动窗口的窗口不会重叠,也不会出现重复元素,每个元素只会被分配到一个窗口。如果指定了key,Flink 会对流进行逻辑分区,并针对每个键对应的元素运行并行的窗口操作。 我们先来看一个非常简单的例子来更好地理解滚动窗口。一个简单的“IntegerGenerator”类充当源,每秒生成一个整数(从 1 开始)。以下代码初始化本地 Flink 环境并创建一个 DataStream 对象。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> intStream = env.addSource(new IntegerGenerator());
intStream.timeWindowAll(Time.second(5)).process(ProcessAllWindowFunction<Integer, Integer, TimeWindow>(){@Overridepublic void process( Context arg0, Iterable<Integer>input, Collector<Integer> output ) throws Exception{logger.info("Computing sum for ", input );int sum = 0;for(int i :input){sum += i;}output.collect(sum );}}).print();eny.execute();
其中:
-
第 3 行 - 定义一个 5 秒的滚动窗口(大小随时间固定)
-
第 4 行 - 使用 Flink 的 ProcessAllWindowFunction API 定义计算(业务逻辑)。这里我只是计算在给定窗口期间收集的所有整数的总和。注意:ProcessAllWindowFunction 会让 Flink 将窗口的所有元素缓冲到内存中,然后将整个元素传递给计算。这就是为什么你需要一个 Iterable<> 对象作为 process() 的输入参数 。
-
第 15 行 - 将此窗口的结果返回给 Flink,以便进行下一步,即在控制台上打印。比如说输出如下:
-
-
剖析一下这里的输出:
-
第 1 行 - 第 3 行 = 当前窗口关闭前生成了两个整数。请注意,尽管我们说的是五秒,但第一个窗口并没有运行五秒。原因是,默认情况下,Flink 会将时间四舍五入到最接近的时钟边界,在我们的例子中,该边界发生在“13:33:55”。这触发了 Flink TriggerWindow 关闭当前窗口并将其传递给下一步(Flink 的操作符)。
-
第 4 行 = 使用所有元素 [1, 2] 调用 process() 方法,并将总和 '3' 打印到控制台
-
第 5 行 - 第 10 行 = 启动新窗口并收集下一组整数。5 秒后,即“13:34:00”,窗口关闭。所有收集到的数据都会发送到进程,该进程打印接收到的整数,并计算此窗口中数字的总和 = '18'。
-
第 11 行 = 当前窗口总和被打印到控制台。
-
从第 12 行开始进一步应用类似的逻辑。
2.自定义滚动窗口的时间偏移
将上面的案例稍微调整一下,如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> intStream = env.addSource(new IntegerGenerator());
intStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))).process(ProcessAllWindowFunction<Integer, Integer, TimeWindow>(){@Overridepublic void process( Context arg0, Iterable<Integer>input, Collector<Integer> output ) throws Exception{logger.info("Computing sum for ", input );int sum = 0;for(int i :input){sum += i;}output.collect(sum );}}).print();eny.execute();
这里唯一的变化是第 2 行,现在不再使用 "timeWindowAll( Time.seconds( 5 ) )" ,而现在使用更详细的 "windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) )"。 TimeWindowAll() 是一个包装方法,默认为 windowAll(TumblingProcessingTimeWindows.of(size)) ,即按时间固定大小的窗口(此时间是 Flink 作业正在运行的系统时间)。默认情况下,Flink 在时钟边界启动窗口,但使用 windowAll() 的第二个参数 我们可以自定义时钟边界。 下面显示了上述代码的示例运行:
第 1 行到第 5 行 = Flink 启动一个窗口,收集整数。然而,在 19:26:37,这个窗口闭包被触发,并在第 6 行打印出 [1,2,3,4] 的和。 注意: 如果没有提供偏移量,Flink 会在“19:26:35”关闭窗口。但由于偏移量为 2 秒,因此窗口会在时钟边界后额外结束 2 秒。
3.带有事件时间的 TumblingWindow
到目前为止,在我们的讨论中,我们将“时间”作为 Flink 执行作业时的默认系统时间。然而,在许多用例中,我们希望使用事件的实际时间,即事件在事件源处创建的时间。为了处理这种情况,Flink 支持三种处理“时间”的方式。 让我们看看事件时间以及如何在 Flink 中使用。 在事件时间中,元素根据元素本身的时间戳(而不是任何系统时钟)分组到窗口中。来看这样的一个例子:首先,我定义了一个简单的 POJO 类,名为“Element”,如下所示。我使用 lombok 通过注解生成了 getter/setter 方法。
@Setter @Getter
public class Element{Integer value;Long timestamp;public Element( int counter, long currTime ){this.value = counter;this.timestamp = currTime;}@Overridepublic String toString(){return"" + value;}
}
接下来,我定义一个名为 “ElementGeneratorSource” 的简单 Source 类,它将创建 Element 类型的对象并分配随机递增的时间戳。这是为了确保我不会生成与系统时间匹配的 Element。实际上,时间戳会作为事件本身的一部分出现。
class ElementGeneratorSource implements SourceFunction<Element>{volatile boolean isRunning = true;final Logger logger = LoggerFactory.getLogger(ElementGeneratorSource.class);@Overridepublic void run( SourceContext<Element> ctx ) throws Exception{int counter = 1; // 比flink程序开始时间晚20秒long eventStartTime = System.currentTimeMillis() - 20000;// 使用上述时间戳创建第一个事件Element element = new Element(counter++, eventStartTime);while( isRunning ){logger.info("Produced Element with value {} and timestamp {}", element.getValue(), printTime(element.getTimestamp()));ctx.collect( element );// create elements and assign timestamp with randomness so that they are not same as current system clock timeelement = new Element(counter++, element.getTimestamp() + ThreadLocalRandom.current().nextLong( 1000, 6000 ));Thread.sleep( 1000 ); }}@Overridepublic void cancel(){isRunning = false;}// 以可读格式打印纪元时间的辅助函数String printTime(long longValue){return LocalDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneId.systemDefault()).toString();}
}
现在,写一个流程序,使用 TumblingEventTime 窗口来处理这些元素。
-
说明:我删除了类和方法的声明行,以便将注意力集中在重要的代码块上。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );elementStream.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>(){@Overridepublic long extractAscendingTimestamp( Element element ){return element.getTimestamp();}
})
.windowAll( TumblingEventTimeWindows.of( Time.seconds( 10 ) ) )
.process( new ProcessAllWindowFunction<Element, Integer ,TimeWindow>(){@Overridepublic void process( Context arg0, Iterable<Element> input, Collector<Integer> output ) throws Exception{logger.info( "Computing sum for {}", input );int sum = 0;for(Element e : input) {sum += e.getValue();}output.collect( sum );}
})
.print();env.execute();
在这个例子中,我使用了一个非常方便的类“ AscendingTimestampExtractor ”,根据 Flink 文档,它的意思是 “一个用于时间戳单调递增的流的时间戳分配器和水印生成器。在这种情况下,流的本地watermark很容易生成,因为它们严格遵循时间戳。” 使用这个 Flink 提供的 API 的另一个好处是它会帮我生成watermark。watermark是让 Flink 知道何时关闭当前窗口(窗口的最后一个元素已到达)的一种方式。 简而言之, assignTimestampsAndWatermarks() 将允许 Flink 知道如何读取进入 Flink 的事件/元素的时间戳,最重要的是,如何计算水印。
在第 1、2、3 行生成三个元素,其时间戳与系统时钟不同。(先打印系统时钟时间,再打印日志级别)。 当第三个元素在“2020-02-22T22:22:02.495”处生成时,由于水印已被突破,它会触发当前窗口关闭。如果时间窗口为 10 秒,则此处的结束时间将是“2020-02-22T22:21:59.000”。因此,当前窗口仅收集前两个值。 在下一次运行中,窗口将在“2020-02-22T22:22:09.000”关闭,这意味着值 3 和值 4 将在新窗口中收集,因为第 7 行有一个带有时间戳 > = 当前水印的元素。 通过以上三个案例,我们详细的探究了 Flink 的 TumblingWindow的各种情况,帮助大家更加深入的去理解Flink里的窗口。