实时大数据计算中,windowDuration,slideDuration,trigger,watermark的关系
目录
- 1.开发环境
- 2.几句话先概括
- 3.例子说明
- 3.1.参数配置
- 3.2.窗口是如何产生的
- 3.3.Trigger触发机制
- 3.4.迟到的消息数据
最近做了个实时大数据分析的项目,发现很多东西都忘记了,属实没有好好整理笔记之过,趁眼下闲暇,做个回忆和记录。
1.开发环境
这次环境采用Java17+,Scala2.13,Spark的版本为4.0.0,且基于Kafka创建读取流。其它环境可参考以下maven pom。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.edata.bigdata</groupId><artifactId>edata</artifactId><packaging>pom</packaging><version>1.0</version><properties><java.version>17</java.version><spark.version>4.0.0</spark.version><hadoop.version>3.4.1</hadoop.version><flink.version>2.0.0</flink.version><scala.version>2.13</scala.version><zookeeper.version>3.9.4</zookeeper.version><maven.compiler.version>3.8.1</maven.compiler.version></properties><!--其它配置-->
</project>
该pom所在工程是我的私人仓库,不过没事,这篇blog仅记录windowDuration,slideDuration,trigger,watermark的底层运行逻辑,不涉及太多代码。
2.几句话先概括
- windowDuration 控制“聚合粒度”。
- slideDuration 控制“聚合频率”。
- watermark控制“迟到的数据还能否被计算”。
- trigger控制“何时处理一个微批”。
3.例子说明
实际上每一条消息都自带时间字段,该字段称为事件时间(Event Time),一般是在数据产生时就被打上的时间戳,与Spark,kafka无关,是Spark是否将其纳入窗口计算的依据。
假设连续的消息组成了时间序列为(每一秒产生一条数据):
事件时间(Event Time)00:00 00:01 00:02 ... 00:09 00:10 00:11 ... 00:19 00:20 ...
Spark处理消息时也会产生连续的处理时间(Processing Time),假设为:
数据到达 (Processing Time) → 09:00 09:01 09:02 ... 09:09 09:10 09:11 ... 09:19 09:20 ...
事件时间一开始可能是由其他字段名进行标识,例如以下代码。
Dataset<Row> windowed = data.withColumn("event_time", col("timestamp").cast("timestamp")).withColumn("data", col("value").cast("string")).select("data", "event_time");
原本的时间字段为timestamp,且为字符串,我转成了timestamp类型,且更改了字段名为event_time,真正的消息体原本是value字段,我改成了data,最终仅选择data和event_time进行后续的计算。
3.1.参数配置
假设在本例子中的各项重要参数的配置如下所示。
| 参数 | 值 | 说明 |
|---|---|---|
| windowDuration | 10 seconds | 每个窗口覆盖 10秒分钟事件时间 |
| slideDuration | 5 seconds | 每 5 秒产生一个新窗口(重叠) |
| trigger | ProcessingTime(“1 second”) | 每 1 秒 Spark 处理一次微批 |
| watermark | 20 seconds | 迟到 ≤20秒的数据仍可进入窗口 |
参数的设置方法如下所示
public Dataset<Row> applyWindowAndWatermark(Dataset<Row> data,String windowDuration,String slideDuration,String watermarkDelay) {//将timestamp列的字符串转成timestamp格式,并改名为“event_time”//将value列的数据解码,转成字符串,并改名为“data”//选择data,event_time两列,其他列丢掉Dataset<Row> windowed = data.withColumn("event_time", col("timestamp").cast("timestamp")).withColumn("data", col("value").cast("string")).select("data", "event_time");if (watermarkDelay != null && !watermarkDelay.isBlank()) {windowed = windowed.withWatermark("event_time", watermarkDelay);}Column windowCol = window(col("event_time"),windowDuration,slideDuration != null ? slideDuration : windowDuration);return windowed.withColumn("window", windowCol);
}
3.2.窗口是如何产生的
根据参数配置, Spark每5秒(slideDuration)产生一个10秒(windowDuration)。通过示意图来表达,大概如下所示
事件时间(Event Time) 00:00 ... 00:05 ... 00:10 ... 00:15 ... 00:20 ... 00:25 ...
数据到达(Processing Time) 09:00 ... 09:05 ... 09:10 ... 09:15 ... 09:20 ... 09:25 ...
处理窗口(Processing windows): [--------10s--------)----5s----[--------10s--------)----5s----[--------10s--------)----5s----[--------10s--------)
通过上图可以知道,生成的窗口区间分别为W1:[9:00,9:10),W2:[9:05,9:15),W3:[9:10,9:20),W4:[9:15,9:25),在这些窗口内,会获取对应的消息进行计算。窗口生成的过程也叫做窗口生成时间轴。
通过观察可以发现,窗口之间有重叠的部分,如果不希望窗口内计算的消息有所重叠,则将slideDuration和windowDuration设置为相等即可。
3.3.Trigger触发机制
Trigger的作用是多久“输出”一次计算结果,这个“输出”可能是多方面的,可能是输出到console,可能是写入文件,可能是写入数据库。假设是输出到console,且按照触发时间是3s来算,我们可以得到触发器的触发时间序列如下
事件时间(Event Time) :00:00 ... 00:05 ... 00:10 ... 00:15 ... 00:20 ... 00:25 ...
数据到达(Processing Time) :09:00 ... 09:05 ... 09:10 ... 09:15 ... 09:20 ... 09:25 ...
处理窗口(Processing windows) :[--------10s--------)| ----5s----[--------10s--------)| ----5s----[--------10s--------)| ----5s----[--------10s--------)
触发序列(Trigger Time) : 09:03 09:06 09:09 09:12 09:15 09:18 09:21......
从以上时间轴我们可以看到,触发器分别在09:03,09:06,09:09,09:12,09:15,09:18等时间点开始触发。在这些时间点,有以下结论
- 在09:03,输出W1的部分计算结果。
- 在09:06,输出W1,W2的部分计算结果。
- 在09:09,输出W1,W2的部分计算结果。
- 在09:12,输出W1的完整计算结果(关闭),输出W2,W3的部分计算结果
- 在09:15,输出W2的完整计算结果(关闭),输出W3,W4的部分计算结果
- 在09:18,输出W3,W4的部分计算结果
- 在09:21,输出W3的完整计算结果,输出W4的部分计算结果。
- … …
3.4.迟到的消息数据
在实时计算中,有时会遇到某些消息的事件时间与数据到达时间相差很远,此时可以通过设置watermask来决定是否让该数据回到自己的窗口进行计算。
假设允许迟到的时间为10s,并且某条迟到消息的事件时间为00:02,到达时间为09:15。则
(1)Spark先判断该消息的所属窗口W1:[00:00,00:10),数据尝试回到W1进行计算。
(2)由于09:02+10s=09:12,说明该消息最晚应该在09:12分到达,但实际上它09:15才到达,所以丢弃。
如果该条数据的事件时间为00:07,到达时间为09:15,则
(1)该条数据所属窗口为W1:[00:00,00:10)和W2:[00:05,00:15)。
(2)由于09:07+10s=09:17,说明该消息最晚可以在09:17分到达,09:15分仍在许可范围内。
