Flink20 SQL 窗口函数概述
Flink SQL 窗口函数概述(Flink 1.20)
1. 窗口函数简介
在流处理中,数据是连续不断的,无法像批处理那样等待所有数据到达后再进行处理。窗口函数(Window Functions)是 Flink SQL 中用于将无限的数据流划分为有限的时间窗口,以便进行聚合和分析的核心工具。
本文档基于 Flink 1.20 版本,主要关注滚动窗口(TUMBLE)的事件时间和处理时间使用场景。
1.1 为什么需要窗口函数?
- 无限数据流:流数据是连续不断的,无法等待所有数据到达
- 时间范围聚合:需要按时间范围进行统计(如每分钟的访问量)
- 实时分析:需要实时计算移动平均值、累计值等指标
2. 窗口类型
Flink SQL 支持三种主要的窗口类型:
2.1 滚动窗口(TUMBLE)
- 特点:窗口之间不重叠,每个窗口的长度相同
- 语法:
TUMBLE(time_attr, size_interval) - 示例:每 5 分钟统计一次,窗口大小为 5 分钟
窗口1: [10:00:00, 10:05:00)
窗口2: [10:05:00, 10:10:00)
窗口3: [10:10:00, 10:15:00)
2.2 滑动窗口(HOP)
- 特点:窗口之间可以重叠,通过滑动步长控制窗口移动
- 语法:
HOP(time_attr, slide_interval, size_interval) - 示例:每 30 秒统计一次过去 1 分钟的数据
窗口1: [10:00:00, 10:01:00)
窗口2: [10:00:30, 10:01:30)
窗口3: [10:01:00, 10:02:00)
窗口4: [10:01:30, 10:02:30)
滑动窗口的行为:
slide_interval < size_interval:窗口重叠slide_interval = size_interval:等同于滚动窗口slide_interval > size_interval:窗口之间有间隙(不常用)
注意:本文档主要关注滚动窗口(TUMBLE),滚动窗口是滑动窗口的特殊情况(滑动步长等于窗口大小)。滚动窗口是 Flink SQL 中最常用的窗口类型,适用于大多数时间窗口聚合场景。
2.3 会话窗口(SESSION)
- 特点:基于活动间隔动态定义窗口,当数据间隔超过指定时间时创建新窗口
- 语法:
SESSION(time_attr, gap_interval) - 示例:如果数据间隔超过 5 分钟,则创建新窗口
3. 时间属性
Flink SQL 支持两种时间属性:
3.1 事件时间(Event Time)
- 定义:事件实际发生的时间,通常由事件本身携带的时间戳表示
- 特点:
- 需要定义水印(Watermark)来处理乱序和延迟数据
- 结果具有确定性,不受数据到达时间影响
- 适合需要准确时间语义的场景
- 定义方式:
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
3.2 处理时间(Processing Time)
- 定义:事件被 Flink 处理时的系统时间
- 特点:
- 不需要定义水印
- 结果可能受数据到达时间影响,每次运行可能不同
- 延迟低,适合对时间精度要求不高的场景
- 定义方式:
proctime AS PROCTIME()
3.3 事件时间 vs 处理时间对比
| 特性 | 事件时间 | 处理时间 |
|---|---|---|
| 时间来源 | 事件本身的时间戳 | 系统当前时间 |
| 水印需求 | 需要 | 不需要 |
| 结果确定性 | 确定 | 不确定 |
| 延迟处理 | 支持 | 不支持 |
| 性能开销 | 较高 | 较低 |
| 适用场景 | 需要准确时间语义 | 对时间精度要求不高 |
4. 窗口函数语法
4.1 滚动窗口(TUMBLE)语法
TUMBLE(time_attr, size_interval)
参数说明:
time_attr:时间属性字段,必须是流中的一个合法时间属性字段size_interval:窗口大小,格式为INTERVAL 'num' timeUnit
时间单位:
SECOND、MINUTE、HOUR、DAY- 例如:
INTERVAL '30' SECOND、INTERVAL '1' MINUTE、INTERVAL '1' HOUR
4.2 窗口辅助函数
TUMBLE_START
返回窗口的起始时间(包含边界)
TUMBLE_START(time_attr, size_interval)
TUMBLE_END
返回窗口的结束时间(不包含边界)
TUMBLE_END(time_attr, size_interval)
TUMBLE_ROWTIME
返回窗口的结束时间(不包含边界),返回值是一个 rowtime 属性,可用于基于事件时间的操作
TUMBLE_ROWTIME(time_attr, size_interval)
TUMBLE_PROCTIME
返回窗口的结束时间(不包含边界),返回值是一个 proctime 属性,可用于基于处理时间的操作
TUMBLE_PROCTIME(time_attr, size_interval)
5. 窗口聚合查询基本结构
SELECT[窗口标识函数],[分组字段],[聚合函数]
FROM table_name
GROUP BY[分组字段],[窗口函数]
示例:
SELECTTUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks
GROUP BYuser_id,TUMBLE(event_time, INTERVAL '1' MINUTE)
6. 窗口函数使用场景
6.1 滚动窗口适用场景
- 固定时间段的统计:每小时统计一次,每 5 分钟统计一次
- 不重叠的聚合:每个时间段独立统计
- 周期性报表:每天、每小时、每分钟的统计报表
- 简单的时间窗口聚合:不需要窗口重叠的场景
6.2 滑动窗口适用场景
- 移动平均值:每 10 秒更新一次过去 5 分钟的平均值
- 实时监控:每 30 秒统计一次过去 1 分钟的指标
- 频繁更新的聚合:需要更频繁地输出结果
6.3 会话窗口适用场景
- 用户会话分析:分析用户的活动会话
- 动态窗口大小:窗口大小根据数据间隔动态调整
7. 注意事项
- 窗口大小:滚动窗口大小应该根据业务需求合理设置
- 水印策略:使用事件时间时,需要根据数据延迟情况设置合适的水印
- 性能考虑:滚动窗口计算开销相对较小,适合大多数场景
- 结果输出:窗口结果在窗口结束时输出(事件时间)或定期输出(处理时间)
- 窗口对齐:滚动窗口会自动对齐到时间边界(如整点、整分钟)
8. 参考资源
- Flink 1.20 窗口函数官方文档
- Flink 1.20 时间属性文档
事件时间滚动窗口详细说明(Flink 1.20)
1. 事件时间概述
事件时间(Event Time)是指事件实际发生的时间,通常由事件本身携带的时间戳表示。使用事件时间进行窗口操作可以确保结果的准确性,不受数据到达时间的影响。
1.1 事件时间的特点
- 准确性:基于事件实际发生的时间,结果具有确定性
- 乱序处理:通过水印(Watermark)机制处理乱序和延迟数据
- 延迟容忍:可以处理一定时间范围内的延迟数据
- 适用场景:需要准确反映事件发生时间的业务场景,如实时监控、计费、数据分析等
1.2 水印(Watermark)机制
水印是 Flink 用于处理乱序数据的重要机制:
- 定义:水印表示"小于该时间戳的数据应该都已经到达"
- 作用:触发窗口计算,处理延迟数据
- 设置:通常设置为事件时间减去一个固定的延迟时间
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
这表示允许 5 秒的延迟,即如果当前水印是 10:00:10,那么 10:00:05 之前的数据应该都已经到达。
2. 滚动窗口(TUMBLE)语法
2.1 基本语法
TUMBLE(event_time_attr, size_interval)
event_time_attr:事件时间属性字段size_interval:窗口大小,如INTERVAL '1' MINUTE、INTERVAL '10' SECOND等
2.2 窗口辅助函数
-- 窗口开始时间(包含边界)
TUMBLE_START(event_time_attr, size_interval)-- 窗口结束时间(不包含边界)
TUMBLE_END(event_time_attr, size_interval)-- 窗口结束时间(不包含边界,rowtime 属性,用于下游操作)
TUMBLE_ROWTIME(event_time_attr, size_interval)
3. 完整示例:用户点击统计(使用 Kafka + Flink SQL Client)
3.1 环境准备
3.1.1 启动 Kafka
确保 Kafka 已安装并运行:
# 启动 Zookeeper(如果使用旧版本 Kafka)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
3.1.2 创建 Kafka 主题
# 创建主题 user_clicks,用于用户点击事件
kafka-topics.sh --create \--topic user_clicks \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 验证主题创建成功
kafka-topics.sh --list --bootstrap-server localhost:9092
3.1.3 启动 Flink 集群
# 启动 Flink Standalone 集群
./bin/start-cluster.sh# 启动 Flink SQL Client
./bin/sql-client.sh
3.2 在 Flink SQL Client 中创建表
3.2.1 创建 Kafka 源表(事件时间)
在 Flink SQL Client 中执行以下 SQL:
-- 创建用户点击事件表(事件时间)
CREATE TABLE user_clicks (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
参数说明:
connector = 'kafka':使用 Kafka 连接器topic = 'user_clicks':Kafka 主题名称properties.bootstrap.servers:Kafka Broker 地址format = 'json':数据格式为 JSONWATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND:定义水印,允许 5 秒延迟scan.startup.mode = 'earliest-offset':从最早的数据开始读取
3.2.2 创建结果输出表(可选,用于观察结果)
-- 创建打印表,用于观察窗口计算结果
CREATE TABLE window_results (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
3.3 定义滚动窗口查询
3.3.1 基本滚动窗口查询
计算每 1 分钟内每个用户的点击次数:
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
3.3.2 将结果写入打印表(用于观察)
-- 将窗口计算结果写入打印表
INSERT INTO window_results
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
3.4 通过 Kafka 发送测试数据
3.4.1 启动 Kafka 生产者
打开新的终端窗口,执行:
kafka-console-producer.sh \--topic user_clicks \--bootstrap-server localhost:9092
3.4.2 发送测试数据
在生产者终端中,逐行输入以下 JSON 数据(注意时间戳的顺序和间隔):
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:01:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:01:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:02:05","url":"/about"}
重要提示:
- 每行一个完整的 JSON 对象
- 时间格式:
YYYY-MM-DD HH:mm:ss - 时间戳按顺序递增,便于观察窗口分组
3.4.3 发送乱序数据(测试水印机制)
为了测试水印和乱序数据处理,可以发送一些延迟到达的数据:
{"user_id":"user1","click_time":"2025-11-07 10:00:15","url":"/delayed"}
{"user_id":"user2","click_time":"2025-11-07 10:00:25","url":"/delayed"}
这些数据的时间戳在之前发送的数据之间,用于测试水印机制如何处理乱序数据。
3.5 观察窗口输出结果
3.5.1 在 Flink SQL Client 中观察
执行查询后,Flink SQL Client 会实时显示窗口计算结果。输出示例:
+----+-------------------------+-------------------------+--------+-------------+
| op | window_start | window_end | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user1 | 3 |
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user2 | 2 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user1 | 2 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user2 | 2 |
| +I | 2025-11-07 10:02:00.000| 2025-11-07 10:03:00.000| user1 | 1 |
+----+-------------------------+-------------------------+--------+-------------+
输出说明:
op列:+I表示插入(Insert),-U表示更新前值,+U表示更新后值window_start:窗口开始时间(包含边界)window_end:窗口结束时间(不包含边界)user_id:用户IDclick_count:该窗口内该用户的点击次数
3.5.2 在 Flink Web UI 中观察
- 访问 Flink Web UI:
http://localhost:8081 - 查看运行中的作业
- 点击作业名称,查看详细信息和指标
- 在 “Metrics” 标签页中查看窗口相关的指标
3.6 完整示例:订单金额统计
3.6.1 创建订单主题
kafka-topics.sh --create \--topic orders \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
3.6.2 创建订单表
CREATE TABLE orders (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.6.3 计算每 5 分钟内的订单总金额
SELECTTUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
3.6.4 发送订单数据
kafka-console-producer.sh \--topic orders \--bootstrap-server localhost:9092
输入数据:
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:00:05"}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99,"order_time":"2025-11-07 10:02:10"}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99,"order_time":"2025-11-07 10:03:20"}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:04:30"}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99,"order_time":"2025-11-07 10:06:45"}
4. 窗口触发机制
4.1 水印触发
滚动窗口的触发依赖于水印:
- 水印推进:当新数据到达时,系统会更新水印
- 窗口触发:当水印超过窗口结束时间时,窗口会被触发计算
- 延迟数据:在水印允许的延迟范围内到达的数据仍会被包含在窗口中
4.2 示例说明
假设窗口大小为 1 分钟,水印延迟为 5 秒:
- 窗口
[10:00:00, 10:01:00)会在水印达到10:01:00时触发 - 如果水印是
10:01:05,那么10:00:55之前的数据都应该已经到达 - 如果
10:00:50的数据在水印10:01:05之后到达,它可能不会被包含在窗口中(取决于具体的水印策略)
5. 最佳实践
5.1 水印延迟设置
- 太小:可能导致延迟数据被丢弃,结果不准确
- 太大:窗口触发延迟,实时性差
- 建议:根据业务场景和数据延迟情况设置,通常设置为最大延迟时间的 1.5-2 倍
5.2 窗口大小选择
- 太小:窗口数量多,计算开销大
- 太大:结果延迟高,实时性差
- 建议:根据业务需求和数据特征选择,常见的有 1 分钟、5 分钟、1 小时等
5.3 时间格式
- 确保 JSON 数据中的时间格式与 SQL 中定义的时间格式一致
- 推荐使用
TIMESTAMP(3)精度(毫秒级) - 时间格式:
YYYY-MM-DD HH:mm:ss或YYYY-MM-DDTHH:mm:ss
5.4 错误处理
-- 忽略 JSON 解析错误
'json.ignore-parse-errors' = 'true'-- 或者使用更严格的错误处理
'json.fail-on-missing-field' = 'false'
6. 常见问题
6.1 窗口没有输出
可能原因:
- 水印没有推进(数据时间戳没有递增)
- 窗口大小设置过大
- 数据格式错误
解决方法:
- 检查数据时间戳是否递增
- 减小窗口大小进行测试
- 检查 JSON 格式是否正确
6.2 延迟数据丢失
可能原因:
- 水印延迟设置过小
- 数据延迟超过水印允许范围
解决方法:
- 增加水印延迟时间
- 检查数据源是否有延迟
6.3 时间格式错误
错误信息:Cannot parse timestamp
解决方法:
- 确保 JSON 中的时间格式与 SQL 定义一致
- 使用
TIMESTAMP(3)并确保格式为YYYY-MM-DD HH:mm:ss
7. 调试技巧
7.1 查看原始数据
-- 查看原始数据流
SELECT * FROM user_clicks;
7.2 查看水印
-- 查看当前水印(需要自定义函数或查看 Flink Web UI)
SELECT CURRENT_WATERMARK(click_time) FROM user_clicks;
7.3 使用 Flink Web UI
- 访问
http://localhost:8081 - 查看作业的 Metrics
- 查看窗口相关的指标,如窗口延迟、水印延迟等
8. 参考资源
- Flink 1.20 窗口函数官方文档
- Flink 1.20 Kafka 连接器文档
- Flink 1.20 时间属性文档
处理时间滚动窗口详细说明(Flink 1.20)
1. 处理时间概述
处理时间(Processing Time)是指数据被 Flink 处理时的系统时间。使用处理时间进行窗口操作时,系统不考虑数据的实际发生时间,窗口的触发完全依赖于系统处理数据的时间。
1.1 处理时间的特点
- 简单性:不需要定义水印,实现简单
- 实时性:窗口触发快,延迟低
- 不确定性:结果依赖于数据到达时间,不具有确定性
- 适用场景:对时间顺序要求不严格,关注快速响应的场景,如实时监控、日志分析等
1.2 处理时间 vs 事件时间
| 特性 | 处理时间 | 事件时间 |
|---|---|---|
| 时间来源 | 系统处理时间 | 事件本身携带的时间 |
| 水印 | 不需要 | 需要 |
| 乱序处理 | 不考虑 | 通过水印处理 |
| 结果确定性 | 不确定 | 确定 |
| 实现复杂度 | 简单 | 复杂 |
| 适用场景 | 快速响应、对时间不敏感 | 准确时间、对时间敏感 |
2. 滚动窗口(TUMBLE)语法
2.1 基本语法
TUMBLE(PROCTIME(), size_interval)
PROCTIME():处理时间属性函数size_interval:窗口大小,如INTERVAL '1' MINUTE、INTERVAL '10' SECOND等
2.2 窗口辅助函数
-- 窗口开始时间(包含边界)
TUMBLE_START(PROCTIME(), size_interval)-- 窗口结束时间(不包含边界)
TUMBLE_END(PROCTIME(), size_interval)-- 窗口结束时间(不包含边界,proctime 属性,用于下游操作)
TUMBLE_PROCTIME(PROCTIME(), size_interval)
3. 完整示例:用户点击统计(使用 Kafka + Flink SQL Client)
3.1 环境准备
3.1.1 启动 Kafka
确保 Kafka 已安装并运行:
# 启动 Zookeeper(如果使用旧版本 Kafka)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
3.1.2 创建 Kafka 主题
# 创建主题 user_clicks_proc,用于用户点击事件(处理时间)
kafka-topics.sh --create \--topic user_clicks_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 验证主题创建成功
kafka-topics.sh --list --bootstrap-server localhost:9092
3.1.3 启动 Flink 集群
# 启动 Flink Standalone 集群
./bin/start-cluster.sh# 启动 Flink SQL Client
./bin/sql-client.sh
3.2 在 Flink SQL Client 中创建表
3.2.1 创建 Kafka 源表(处理时间)
在 Flink SQL Client 中执行以下 SQL:
-- 创建用户点击事件表(处理时间)
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
参数说明:
connector = 'kafka':使用 Kafka 连接器topic = 'user_clicks_proc':Kafka 主题名称properties.bootstrap.servers:Kafka Broker 地址format = 'json':数据格式为 JSONproc_time AS PROCTIME():定义处理时间属性,系统自动生成- 注意:处理时间不需要时间字段,也不需要水印
3.2.2 创建结果输出表(可选,用于观察结果)
-- 创建打印表,用于观察窗口计算结果
CREATE TABLE window_results_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
3.3 定义滚动窗口查询
3.3.1 基本滚动窗口查询
计算每 1 分钟内每个用户的点击次数(基于处理时间):
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
3.3.2 将结果写入打印表(用于观察)
-- 将窗口计算结果写入打印表
INSERT INTO window_results_proc
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
3.4 通过 Kafka 发送测试数据
3.4.1 启动 Kafka 生产者
打开新的终端窗口,执行:
kafka-console-producer.sh \--topic user_clicks_proc \--bootstrap-server localhost:9092
3.4.2 发送测试数据
在生产者终端中,逐行输入以下 JSON 数据(注意:处理时间不需要时间字段):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
重要提示:
- 每行一个完整的 JSON 对象
- 不需要包含时间字段,处理时间由系统自动生成
- 可以快速连续发送多条数据,观察窗口如何按处理时间分组
3.4.3 分批发送数据(观察窗口触发)
为了更清楚地观察窗口触发机制,可以分批发送数据:
第一批(快速发送,应该在同一窗口内):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
等待 1 分钟后,发送第二批(应该在新窗口内):
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
3.5 观察窗口输出结果
3.5.1 在 Flink SQL Client 中观察
执行查询后,Flink SQL Client 会实时显示窗口计算结果。输出示例:
+----+-------------------------+-------------------------+--------+-------------+
| op | window_start | window_end | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user1 | 3 |
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user2 | 2 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user1 | 2 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user2 | 1 |
+----+-------------------------+-------------------------+--------+-------------+
输出说明:
op列:+I表示插入(Insert)window_start:窗口开始时间(系统处理时间,包含边界)window_end:窗口结束时间(系统处理时间,不包含边界)user_id:用户IDclick_count:该窗口内该用户的点击次数- 注意:窗口时间是基于系统处理时间,不是数据中的时间
3.5.2 在 Flink Web UI 中观察
- 访问 Flink Web UI:
http://localhost:8081 - 查看运行中的作业
- 点击作业名称,查看详细信息和指标
- 在 “Metrics” 标签页中查看窗口相关的指标
3.6 完整示例:订单金额统计
3.6.1 创建订单主题
kafka-topics.sh --create \--topic orders_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
3.6.2 创建订单表
CREATE TABLE orders_proc (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.6.3 计算每 5 分钟内的订单总金额
SELECTTUMBLE_START(proc_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_proc
GROUP BY TUMBLE(proc_time, INTERVAL '5' MINUTE);
3.6.4 发送订单数据
kafka-console-producer.sh \--topic orders_proc \--bootstrap-server localhost:9092
输入数据(不需要时间字段):
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99}
4. 窗口触发机制
4.1 基于系统时间触发
处理时间滚动窗口的触发完全依赖于系统时间:
- 系统时间推进:窗口根据系统当前时间进行分组
- 窗口触发:当系统时间超过窗口结束时间时,窗口会被触发计算
- 实时性:窗口触发快,不需要等待延迟数据
4.2 示例说明
假设窗口大小为 1 分钟,当前系统时间是 14:30:15:
- 所有在
14:30:00到14:30:15之间处理的数据会被分配到窗口[14:30:00, 14:31:00) - 当系统时间达到
14:31:00时,窗口[14:30:00, 14:31:00)会被触发 - 之后处理的数据会被分配到新窗口
[14:31:00, 14:32:00)
4.3 与事件时间的区别
- 事件时间:窗口基于数据中的时间戳,需要等待延迟数据,结果确定
- 处理时间:窗口基于系统时间,立即触发,结果不确定(依赖于数据到达时间)
5. 最佳实践
5.1 适用场景
处理时间窗口适用于:
- 实时监控:需要快速响应的监控场景
- 日志分析:对时间顺序要求不严格的日志分析
- 快速统计:不需要准确时间语义的统计场景
- 低延迟要求:对延迟敏感,需要快速输出的场景
5.2 窗口大小选择
- 太小:窗口数量多,计算开销大,但实时性好
- 太大:结果延迟高,但计算开销小
- 建议:根据业务需求选择,常见的有 10 秒、1 分钟、5 分钟等
5.3 数据格式
- 不需要时间字段:处理时间由系统自动生成
- JSON 格式简单,只需要业务字段
- 确保 JSON 格式正确,每行一个完整的 JSON 对象
5.4 错误处理
-- 忽略 JSON 解析错误
'json.ignore-parse-errors' = 'true'-- 或者使用更严格的错误处理
'json.fail-on-missing-field' = 'false'
6. 常见问题
6.1 窗口没有输出
可能原因:
- 窗口大小设置过大,还没有到触发时间
- 数据格式错误
- Kafka 连接问题
解决方法:
- 减小窗口大小进行测试(如 10 秒)
- 检查 JSON 格式是否正确
- 检查 Kafka 连接配置
6.2 窗口时间不准确
说明:这是处理时间的正常特性,窗口时间基于系统处理时间,不是数据发生时间。
解决方法:
- 如果需要准确的时间语义,应该使用事件时间
- 处理时间适用于对时间不敏感的场景
6.3 数据顺序问题
说明:处理时间不考虑数据顺序,先到达的数据先处理。
解决方法:
- 如果需要保证数据顺序,应该使用事件时间
- 或者在前端保证数据顺序
7. 调试技巧
7.1 查看原始数据
-- 查看原始数据流和处理时间
SELECT user_id,url,proc_time
FROM user_clicks_proc;
7.2 查看当前系统时间
-- 查看当前处理时间
SELECT PROCTIME() AS current_proc_time,user_id,url
FROM user_clicks_proc;
7.3 使用 Flink Web UI
- 访问
http://localhost:8081 - 查看作业的 Metrics
- 查看窗口相关的指标,如窗口延迟、处理延迟等
7.4 测试窗口触发
- 快速发送一批数据,观察是否在同一窗口
- 等待窗口大小的时间后,再发送数据,观察是否在新窗口
- 使用较小的窗口大小(如 10 秒)进行测试
8. 处理时间 vs 事件时间对比
8.1 配置对比
| 配置项 | 处理时间 | 事件时间 |
|---|---|---|
| 时间字段 | 不需要 | 需要 |
| 水印 | 不需要 | 需要 |
| 表定义 | proc_time AS PROCTIME() | event_time TIMESTAMP(3), WATERMARK FOR event_time AS ... |
| 窗口函数 | TUMBLE(proc_time, ...) | TUMBLE(event_time, ...) |
8.2 数据格式对比
处理时间(不需要时间字段):
{"user_id":"user1","url":"/home"}
事件时间(需要时间字段):
{"user_id":"user1","url":"/home","click_time":"2025-11-07 10:00:05"}
8.3 使用场景对比
- 处理时间:快速响应、实时监控、日志分析
- 事件时间:准确时间、计费系统、数据分析
9. 参考资源
- Flink 1.20 窗口函数官方文档
- Flink 1.20 Kafka 连接器文档
- Flink 1.20 时间属性文档
测试数据和测试用例(Flink 1.20 滚动窗口)
本文档提供详细的测试数据、测试用例和预期结果,用于验证 Flink SQL 滚动窗口函数的正确性。所有测试均使用 Flink SQL Client 和 Kafka 进行。
1. 测试环境准备
1.1 测试数据格式
所有测试数据使用 JSON 格式,便于在 Kafka 中使用。
事件时间数据格式:
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
处理时间数据格式:
{"user_id":"user1","url":"/home"}
1.2 测试工具
- Flink SQL Client:用于执行 SQL 查询
- Kafka:用于数据源(必须)
- Print Connector:用于结果输出
1.3 测试步骤
- 创建 Kafka 主题
- 在 Flink SQL Client 中创建表
- 定义窗口查询
- 通过 Kafka 发送测试数据
- 观察窗口输出结果
- 验证结果是否符合预期
2. 事件时间滚动窗口测试
2.1 测试用例 1:用户点击统计(基本功能)
测试目标
验证基于事件时间的滚动窗口能够正确统计每个用户每 1 分钟内的点击次数。
表定义
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据
通过 Kafka 生产者发送以下数据:
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:01:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:01:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:02:05","url":"/about"}
测试查询
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
预期结果
| window_start | window_end | user_id | click_count |
|---|---|---|---|
| 2025-11-07 10:00:00 | 2025-11-07 10:01:00 | user1 | 3 |
| 2025-11-07 10:00:00 | 2025-11-07 10:01:00 | user2 | 2 |
| 2025-11-07 10:01:00 | 2025-11-07 10:02:00 | user1 | 2 |
| 2025-11-07 10:01:00 | 2025-11-07 10:02:00 | user2 | 2 |
| 2025-11-07 10:02:00 | 2025-11-07 10:03:00 | user1 | 1 |
结果验证点
- ✅ 窗口大小正确:每个窗口覆盖 1 分钟
- ✅ 窗口不重叠:窗口之间没有重叠
- ✅ 窗口对齐:窗口对齐到整分钟(10:00:00, 10:01:00, 10:02:00)
- ✅ 聚合结果正确:每个窗口内的点击次数正确
- ✅ 分组正确:按用户分组统计
- ✅ 窗口时间基于事件时间:窗口时间基于数据中的
click_time字段
2.2 测试用例 2:乱序数据处理
测试目标
验证基于事件时间的滚动窗口能够正确处理乱序数据。
表定义
同测试用例 1。
测试数据
发送包含乱序数据的测试数据:
{"user_id":"user1","click_time":"2025-11-07 10:00:10","url":"/first"}
{"user_id":"user1","click_time":"2025-11-07 10:00:30","url":"/third"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/second"} -- 乱序:实际时间在 first 和 third 之间
{"user_id":"user1","click_time":"2025-11-07 10:00:50","url":"/fourth"}
注意:第三条数据的时间戳是 10:00:20,但到达顺序在 10:00:30 之后。
测试查询
同测试用例 1。
预期结果
如果水印延迟设置为 5 秒,且乱序数据在水印允许范围内到达,应该能正确处理:
| window_start | window_end | user_id | click_count |
|---|---|---|---|
| 2025-11-07 10:00:00 | 2025-11-07 10:01:00 | user1 | 4 |
所有 4 条数据都应该被包含在窗口 [10:00:00, 10:01:00) 中。
结果验证点
- ✅ 乱序数据处理:乱序数据被正确分配到对应窗口
- ✅ 水印机制:水印机制正确处理延迟数据
- ✅ 结果准确性:结果准确反映事件发生时间
2.3 测试用例 3:延迟数据处理
测试目标
验证基于事件时间的滚动窗口如何处理延迟数据。
表定义
同测试用例 1(水印延迟 5 秒)。
测试数据
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/normal1"}
{"user_id":"user1","click_time":"2025-11-07 10:00:10","url":"/normal2"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/normal3"}
-- 等待窗口触发后(水印超过 10:01:00),再发送延迟数据
{"user_id":"user1","click_time":"2025-11-07 10:00:15","url":"/delayed"} -- 延迟数据
预期结果
如果延迟数据在水印允许范围内到达(5 秒内),应该被包含在窗口中。如果超过水印允许范围,可能不会被包含。
结果验证点
- ✅ 延迟数据处理:延迟数据根据水印策略处理
- ✅ 水印延迟设置:水印延迟设置影响延迟数据的处理
2.4 测试用例 4:订单金额统计
测试目标
验证基于事件时间的滚动窗口能够正确统计订单金额。
表定义
CREATE TABLE orders_event (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:00:05"}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99,"order_time":"2025-11-07 10:02:10"}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99,"order_time":"2025-11-07 10:03:20"}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:04:30"}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99,"order_time":"2025-11-07 10:06:45"}
测试查询
SELECTTUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_event
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
预期结果
| window_start | window_end | total_amount | order_count |
|---|---|---|---|
| 2025-11-07 10:00:00 | 2025-11-07 10:05:00 | 599.97 | 4 |
| 2025-11-07 10:05:00 | 2025-11-07 10:10:00 | 399.99 | 1 |
结果验证点
- ✅ 金额聚合正确:SUM 函数正确计算总金额
- ✅ 计数正确:COUNT 函数正确计算订单数量
- ✅ 窗口分组正确:按 5 分钟窗口正确分组
3. 处理时间滚动窗口测试
3.1 测试用例 1:用户点击统计(基本功能)
测试目标
验证基于处理时间的滚动窗口能够正确统计每个用户每 1 分钟内的点击次数。
表定义
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据
通过 Kafka 生产者发送以下数据(不需要时间字段):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
测试查询
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
预期结果
窗口时间基于系统处理时间,假设当前系统时间是 14:30:xx:
| window_start | window_end | user_id | click_count |
|---|---|---|---|
| 2025-11-07 14:30:00 | 2025-11-07 14:31:00 | user1 | 5 |
| 2025-11-07 14:30:00 | 2025-11-07 14:31:00 | user2 | 3 |
| 2025-11-07 14:31:00 | 2025-11-07 14:32:00 | user1 | 2 |
| 2025-11-07 14:31:00 | 2025-11-07 14:32:00 | user2 | 0 |
注意:实际窗口时间取决于系统处理时间,每次运行可能不同。
结果验证点
- ✅ 窗口大小正确:每个窗口覆盖 1 分钟
- ✅ 窗口不重叠:窗口之间没有重叠
- ✅ 窗口对齐:窗口对齐到整分钟
- ✅ 聚合结果正确:每个窗口内的点击次数正确
- ✅ 分组正确:按用户分组统计
- ✅ 窗口时间基于处理时间:窗口时间基于系统处理时间
3.2 测试用例 2:窗口触发测试
测试目标
验证处理时间窗口的触发机制。
表定义
同测试用例 1。
测试步骤
- 第一批数据(快速发送,应该在同一窗口内):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
-
等待 1 分钟(让窗口触发)
-
第二批数据(应该在新窗口内):
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
预期结果
- 第一批数据应该在同一窗口内
- 等待 1 分钟后,窗口应该触发
- 第二批数据应该在新窗口内
结果验证点
- ✅ 窗口触发:窗口在系统时间超过窗口结束时间时触发
- ✅ 窗口分组:数据按处理时间正确分配到窗口
3.3 测试用例 3:订单金额统计
测试目标
验证基于处理时间的滚动窗口能够正确统计订单金额。
表定义
CREATE TABLE orders_proc (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99}
测试查询
SELECTTUMBLE_START(proc_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_proc
GROUP BY TUMBLE(proc_time, INTERVAL '5' MINUTE);
预期结果
窗口时间基于系统处理时间,所有数据如果在同一 5 分钟窗口内处理,应该在同一窗口:
| window_start | window_end | total_amount | order_count |
|---|---|---|---|
| 2025-11-07 14:30:00 | 2025-11-07 14:35:00 | 1098.95 | 5 |
结果验证点
- ✅ 金额聚合正确:SUM 函数正确计算总金额
- ✅ 计数正确:COUNT 函数正确计算订单数量
- ✅ 窗口分组正确:按 5 分钟窗口正确分组
4. 对比测试
4.1 测试用例:相同数据,不同时间语义
测试目标
对比事件时间和处理时间对相同数据的处理结果。
测试步骤
- 准备相同的数据(事件时间版本):
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
-
发送到事件时间主题,观察结果
-
发送到处理时间主题(去掉时间字段),观察结果
预期差异
- 事件时间:窗口时间基于数据中的时间(
10:00:xx) - 处理时间:窗口时间基于系统时间(可能是
14:30:xx或其他时间)
结果验证点
- ✅ 时间来源不同:事件时间基于数据时间,处理时间基于系统时间
- ✅ 结果确定性:事件时间结果确定,处理时间结果不确定
5. 性能测试
5.1 测试用例:大量数据测试
测试目标
验证窗口函数在处理大量数据时的性能。
测试数据
生成大量测试数据(如 10000 条),通过 Kafka 发送。
测试指标
- 吞吐量(records/second)
- 延迟(latency)
- 内存使用
- CPU 使用
结果验证点
- ✅ 性能指标:性能指标在可接受范围内
- ✅ 资源使用:资源使用合理
6. 错误处理测试
6.1 测试用例:格式错误数据
测试目标
验证窗口函数如何处理格式错误的数据。
测试数据
{"user_id":"user1","click_time":"invalid-time","url":"/home"} -- 时间格式错误
{"user_id":"user2","url":"/about"} -- 缺少时间字段(事件时间)
{"invalid json" -- JSON 格式错误
预期结果
- 格式错误的数据应该被忽略或记录错误
- 不影响其他正常数据的处理
结果验证点
- ✅ 错误处理:错误数据被正确处理
- ✅ 容错性:不影响正常数据处理
7. 测试数据文件
测试数据文件位于 test_data/ 目录:
user_clicks_event_time.json- 事件时间用户点击数据orders_event_time.json- 事件时间订单数据user_clicks_processing_time.json- 处理时间用户点击数据orders_processing_time.json- 处理时间订单数据
8. 测试执行步骤
8.1 完整测试流程
-
环境准备
# 启动 Kafka bin/kafka-server-start.sh config/server.properties# 启动 Flink ./bin/start-cluster.sh ./bin/sql-client.sh -
创建 Kafka 主题
kafka-topics.sh --create \--topic user_clicks_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1 -
在 Flink SQL Client 中创建表
CREATE TABLE user_clicks_event (...); -
定义窗口查询
INSERT INTO window_results_event SELECT ... FROM user_clicks_event GROUP BY TUMBLE(...); -
发送测试数据
kafka-console-producer.sh \--topic user_clicks_event \--bootstrap-server localhost:9092 -
观察和验证结果
8.2 自动化测试建议
- 使用脚本自动化测试流程
- 使用断言验证结果
- 记录测试结果和性能指标
9. 参考资源
- Flink 1.20 窗口函数官方文档
- Flink 1.20 测试指南
Flink SQL 滚动窗口完整示例(Flink 1.20)
本文档提供可运行的完整示例,包括表定义、Kafka 命令操作、查询语句和结果说明。所有示例均使用 Flink SQL Client 执行。
1. 环境准备
1.1 前置条件
- Flink 1.20:确保已安装 Flink 1.20 版本
- Kafka:用于数据源(必须)
- Flink SQL Client:用于执行 SQL 查询
1.2 启动服务
1.2.1 启动 Kafka
# 启动 Zookeeper(如果使用旧版本 Kafka)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
1.2.2 启动 Flink
# 启动 Flink Standalone 集群
./bin/start-cluster.sh# 启动 Flink SQL Client
./bin/sql-client.sh
1.3 验证环境
# 验证 Kafka 运行
kafka-topics.sh --list --bootstrap-server localhost:9092# 验证 Flink 运行
# 访问 http://localhost:8081 查看 Flink Web UI
2. 事件时间滚动窗口完整示例
2.1 示例场景
统计每个用户每 1 分钟内的点击次数,基于事件时间。
2.2 完整操作步骤
步骤 1:创建 Kafka 主题
# 创建主题
kafka-topics.sh --create \--topic user_clicks_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 验证主题创建成功
kafka-topics.sh --list --bootstrap-server localhost:9092
步骤 2:在 Flink SQL Client 中创建表
-- ============================================
-- 事件时间滚动窗口完整示例
-- ============================================-- 1. 创建源表(从 Kafka 读取数据,事件时间)
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-event','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 2. 创建结果表(输出到 Print)
CREATE TABLE window_results_event (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义窗口查询
-- 3. 执行滚动窗口查询并插入结果
INSERT INTO window_results_event
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
步骤 4:发送测试数据
打开新的终端窗口,执行:
# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_event \--bootstrap-server localhost:9092
在生产者终端中,逐行输入以下 JSON 数据:
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:01:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:01:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:02:05","url":"/about"}
重要提示:
- 每行一个完整的 JSON 对象
- 时间格式:
YYYY-MM-DD HH:mm:ss - 时间戳按顺序递增
步骤 5:观察窗口输出
在 Flink SQL Client 中会看到实时输出:
+I[2025-11-07T10:00:00, 2025-11-07T10:01:00, user1, 3]
+I[2025-11-07T10:00:00, 2025-11-07T10:01:00, user2, 2]
+I[2025-11-07T10:01:00, 2025-11-07T10:02:00, user1, 2]
+I[2025-11-07T10:01:00, 2025-11-07T10:02:00, user2, 2]
+I[2025-11-07T10:02:00, 2025-11-07T10:03:00, user1, 1]
输出说明:
+I:插入操作- 第一列:窗口开始时间(包含边界)
- 第二列:窗口结束时间(不包含边界)
- 第三列:用户ID
- 第四列:点击次数
2.3 测试乱序数据
为了测试水印机制,可以发送一些乱序数据:
{"user_id":"user1","click_time":"2025-11-07 10:00:15","url":"/delayed1"}
{"user_id":"user2","click_time":"2025-11-07 10:00:25","url":"/delayed2"}
这些数据的时间戳在之前发送的数据之间,用于测试水印机制如何处理乱序数据。
3. 处理时间滚动窗口完整示例
3.1 示例场景
统计每个用户每 1 分钟内的点击次数,基于处理时间。
3.2 完整操作步骤
步骤 1:创建 Kafka 主题
# 创建主题
kafka-topics.sh --create \--topic user_clicks_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
步骤 2:在 Flink SQL Client 中创建表
-- ============================================
-- 处理时间滚动窗口完整示例
-- ============================================-- 1. 创建源表(从 Kafka 读取数据,处理时间)
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-proc','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 2. 创建结果表(输出到 Print)
CREATE TABLE window_results_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义窗口查询
-- 3. 执行滚动窗口查询并插入结果
INSERT INTO window_results_proc
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
步骤 4:发送测试数据
打开新的终端窗口,执行:
# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_proc \--bootstrap-server localhost:9092
在生产者终端中,逐行输入以下 JSON 数据(注意:不需要时间字段):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
重要提示:
- 每行一个完整的 JSON 对象
- 不需要包含时间字段
- 可以快速连续发送多条数据
步骤 5:观察窗口输出
在 Flink SQL Client 中会看到实时输出:
+I[2025-11-07T14:30:00, 2025-11-07T14:31:00, user1, 5]
+I[2025-11-07T14:30:00, 2025-11-07T14:31:00, user2, 3]
+I[2025-11-07T14:31:00, 2025-11-07T14:32:00, user1, 2]
+I[2025-11-07T14:31:00, 2025-11-07T14:32:00, user2, 0]
输出说明:
+I:插入操作- 第一列:窗口开始时间(系统处理时间,包含边界)
- 第二列:窗口结束时间(系统处理时间,不包含边界)
- 第三列:用户ID
- 第四列:点击次数
- 注意:窗口时间基于系统处理时间,不是数据中的时间
3.3 测试窗口触发
为了更清楚地观察窗口触发机制,可以分批发送数据:
第一批(快速发送,应该在同一窗口内):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
等待 1 分钟后,发送第二批(应该在新窗口内):
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
4. 订单金额统计示例
4.1 事件时间版本
步骤 1:创建主题
kafka-topics.sh --create \--topic orders_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
步骤 2:创建表
-- 创建订单表(事件时间)
CREATE TABLE orders_event (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-orders','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 创建结果表
CREATE TABLE order_statistics_event (window_start TIMESTAMP(3),window_end TIMESTAMP(3),total_amount DECIMAL(10, 2),order_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义查询
-- 计算每 5 分钟内的订单总金额
INSERT INTO order_statistics_event
SELECTTUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_event
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
步骤 4:发送数据
kafka-console-producer.sh \--topic orders_event \--bootstrap-server localhost:9092
输入数据:
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:00:05"}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99,"order_time":"2025-11-07 10:02:10"}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99,"order_time":"2025-11-07 10:03:20"}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:04:30"}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99,"order_time":"2025-11-07 10:06:45"}
4.2 处理时间版本
步骤 1:创建主题
kafka-topics.sh --create \--topic orders_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
步骤 2:创建表
-- 创建订单表(处理时间)
CREATE TABLE orders_proc (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-orders-proc','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 创建结果表
CREATE TABLE order_statistics_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),total_amount DECIMAL(10, 2),order_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义查询
-- 计算每 5 分钟内的订单总金额
INSERT INTO order_statistics_proc
SELECTTUMBLE_START(proc_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_proc
GROUP BY TUMBLE(proc_time, INTERVAL '5' MINUTE);
步骤 4:发送数据
kafka-console-producer.sh \--topic orders_proc \--bootstrap-server localhost:9092
输入数据(不需要时间字段):
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99}
5. 调试和验证
5.1 查看原始数据流
-- 查看事件时间数据流
SELECT * FROM user_clicks_event;-- 查看处理时间数据流
SELECT user_id,url,proc_time
FROM user_clicks_proc;
5.2 使用 Flink Web UI
- 访问
http://localhost:8081 - 查看运行中的作业
- 点击作业名称,查看详细信息
- 在 “Metrics” 标签页中查看:
- 窗口延迟
- 水印延迟(事件时间)
- 处理延迟
- 吞吐量
5.3 验证窗口行为
验证事件时间窗口
- 发送已知时间戳的数据
- 观察窗口时间是否基于数据中的时间
- 发送乱序数据,验证水印机制
- 验证结果是否确定(重复运行结果相同)
验证处理时间窗口
- 发送数据(不需要时间字段)
- 观察窗口时间是否基于系统时间
- 快速发送多条数据,验证是否在同一窗口
- 等待窗口大小时间后,再发送数据,验证是否在新窗口
6. 常见问题排查
6.1 窗口没有输出
可能原因:
- 水印没有推进(事件时间)
- 窗口大小设置过大
- 数据格式错误
- Kafka 连接问题
解决方法:
- 检查数据时间戳是否递增(事件时间)
- 减小窗口大小进行测试
- 检查 JSON 格式是否正确
- 检查 Kafka 连接配置
6.2 时间格式错误
错误信息:Cannot parse timestamp
解决方法:
- 确保 JSON 中的时间格式为
YYYY-MM-DD HH:mm:ss - 使用
TIMESTAMP(3)精度 - 检查时间字段名称是否匹配
6.3 数据没有被包含在窗口中
可能原因:
- 水印延迟设置过小(事件时间)
- 数据延迟超过水印允许范围(事件时间)
- 窗口已经触发(事件时间)
解决方法:
- 增加水印延迟时间
- 检查数据源是否有延迟
- 重新发送数据
7. 性能优化建议
7.1 窗口大小选择
- 太小:窗口数量多,计算开销大
- 太大:结果延迟高,实时性差
- 建议:根据业务需求选择,常见的有 1 分钟、5 分钟、1 小时
7.2 水印延迟设置
- 太小:可能导致延迟数据被丢弃
- 太大:窗口触发延迟,实时性差
- 建议:设置为最大延迟时间的 1.5-2 倍
7.3 Kafka 配置优化
-- 优化 Kafka 消费配置
'properties.fetch.min.bytes' = '1024',
'properties.fetch.max.wait.ms' = '500',
'properties.max.partition.fetch.bytes' = '1048576'
事件时间与处理时间详细对比(Flink 1.20)
本文档详细对比事件时间(Event Time)和处理时间(Processing Time)在 Flink SQL 滚动窗口中的使用,所有示例均使用 Flink SQL Client 和 Kafka 进行演示。
1. 核心概念对比
1.1 定义对比
| 维度 | 事件时间(Event Time) | 处理时间(Processing Time) |
|---|---|---|
| 定义 | 事件实际发生的时间,由事件本身携带 | 数据被 Flink 处理时的系统时间 |
| 时间来源 | 数据中的时间戳字段 | 系统当前时间 |
| 确定性 | 结果确定,可重复 | 结果不确定,依赖于数据到达时间 |
| 水印需求 | 需要定义水印 | 不需要水印 |
| 乱序处理 | 通过水印机制处理乱序和延迟数据 | 不考虑乱序,按到达顺序处理 |
| 延迟容忍 | 可以容忍一定范围内的延迟 | 不处理延迟数据 |
1.2 适用场景对比
| 场景 | 事件时间 | 处理时间 |
|---|---|---|
| 实时监控 | ✅ 需要准确时间语义 | ✅ 快速响应 |
| 计费系统 | ✅ 必须使用 | ❌ 不适用 |
| 日志分析 | ✅ 准确分析 | ✅ 快速分析 |
| 数据分析 | ✅ 准确结果 | ❌ 不适用 |
| 快速响应 | ⚠️ 可能有延迟 | ✅ 延迟低 |
| 数据乱序 | ✅ 可以处理 | ❌ 不考虑 |
2. 配置对比
2.1 表定义对比
事件时间表定义
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3), -- 事件时间字段url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND -- 水印定义
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
);
关键点:
- 需要定义时间字段(
click_time TIMESTAMP(3)) - 必须定义水印(
WATERMARK FOR ...) - 数据中必须包含时间字段
处理时间表定义
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME() -- 处理时间属性
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
);
关键点:
- 不需要时间字段
- 不需要水印
- 使用
PROCTIME()函数定义处理时间属性 - 数据中不需要包含时间字段
2.2 窗口查询对比
事件时间窗口查询
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
处理时间窗口查询
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
2.3 数据格式对比
事件时间数据格式
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
要求:
- 必须包含时间字段(
click_time) - 时间格式必须正确:
YYYY-MM-DD HH:mm:ss - 时间戳应该递增(虽然可以处理乱序)
处理时间数据格式
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
要求:
- 不需要时间字段
- 只需要业务字段
- 格式更简单
3. 完整对比示例
3.1 环境准备
3.1.1 启动 Kafka
# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
3.1.2 创建 Kafka 主题
# 创建事件时间主题
kafka-topics.sh --create \--topic user_clicks_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 创建处理时间主题
kafka-topics.sh --create \--topic user_clicks_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
3.1.3 启动 Flink SQL Client
./bin/sql-client.sh
3.2 事件时间完整示例
3.2.1 创建事件时间表
-- 在 Flink SQL Client 中执行
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-event','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.2.2 创建结果表
CREATE TABLE window_results_event (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
3.2.3 定义窗口查询
INSERT INTO window_results_event
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
3.2.4 发送事件时间数据
# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_event \--bootstrap-server localhost:9092
输入数据(注意时间字段):
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
3.2.5 观察事件时间窗口输出
在 Flink SQL Client 中会看到:
+----+-------------------------+-------------------------+--------+-------------+
| op | window_start | window_end | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user1 | 3 |
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user2 | 2 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user1 | 1 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user2 | 1 |
+----+-------------------------+-------------------------+--------+-------------+
关键观察点:
- 窗口时间基于数据中的
click_time字段 - 窗口
[10:00:00, 10:01:00)包含时间戳在10:00:00到10:00:59之间的数据 - 结果确定,可重复
3.3 处理时间完整示例
3.3.1 创建处理时间表
-- 在 Flink SQL Client 中执行
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-proc','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.3.2 创建结果表
CREATE TABLE window_results_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
3.3.3 定义窗口查询
INSERT INTO window_results_proc
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
3.3.4 发送处理时间数据
# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_proc \--bootstrap-server localhost:9092
输入数据(不需要时间字段):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
3.3.5 观察处理时间窗口输出
在 Flink SQL Client 中会看到:
+----+-------------------------+-------------------------+--------+-------------+
| op | window_start | window_end | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user1 | 3 |
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user2 | 2 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user1 | 1 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user2 | 1 |
+----+-------------------------+-------------------------+--------+-------------+
关键观察点:
- 窗口时间基于系统处理时间(当前是
14:30:xx) - 窗口
[14:30:00, 14:31:00)包含在14:30:00到14:30:59之间处理的数据 - 结果依赖于数据到达时间,每次运行可能不同
4. 行为差异详细对比
4.1 窗口触发机制对比
事件时间窗口触发
- 水印推进:当新数据到达时,系统根据数据时间戳更新水印
- 窗口触发:当水印超过窗口结束时间时,窗口被触发
- 延迟数据:在水印允许范围内到达的延迟数据仍会被包含
示例:
- 窗口
[10:00:00, 10:01:00)在水印达到10:01:05(假设水印延迟 5 秒)时触发 - 如果
10:00:50的数据在水印10:01:05之后到达,可能不会被包含
处理时间窗口触发
- 系统时间推进:窗口根据系统当前时间进行分组
- 窗口触发:当系统时间超过窗口结束时间时,窗口被触发
- 实时性:窗口触发快,不需要等待延迟数据
示例:
- 窗口
[14:30:00, 14:31:00)在系统时间达到14:31:00时触发 - 所有在
14:30:00到14:30:59之间处理的数据都会被包含
4.2 乱序数据处理对比
事件时间处理乱序
场景:数据按以下顺序到达(但实际发生时间不同)
到达顺序:A(10:00:10) -> C(10:00:30) -> B(10:00:20) -> D(10:00:40)
实际时间:A(10:00:10) -> B(10:00:20) -> C(10:00:30) -> D(10:00:40)
处理方式:
- 通过水印机制,B 虽然晚到达,但仍会被分配到正确的窗口
- 如果 B 在水印允许范围内到达,会被包含在窗口
[10:00:00, 10:01:00)中
处理时间处理乱序
场景:数据按到达顺序处理
到达顺序:A -> C -> B -> D
处理顺序:A -> C -> B -> D(按到达顺序)
处理方式:
- 不考虑数据实际发生时间
- 按到达顺序分配到窗口
- 先到达的数据先处理,分配到较早的窗口
4.3 结果确定性对比
事件时间结果确定性
- 确定性:相同的数据,无论何时处理,结果都相同
- 可重复性:可以重复运行,结果一致
- 准确性:结果准确反映事件发生的时间
测试方法:
- 发送相同的数据两次
- 观察窗口结果是否相同
- 结果应该完全一致
处理时间结果不确定性
- 不确定性:相同的数据,不同时间处理,结果可能不同
- 不可重复:每次运行结果可能不同
- 依赖到达时间:结果依赖于数据到达时间
测试方法:
- 发送相同的数据两次(间隔不同时间)
- 观察窗口结果是否相同
- 结果可能不同(因为系统时间不同)
5. 性能对比
5.1 计算开销
| 维度 | 事件时间 | 处理时间 |
|---|---|---|
| 水印计算 | 需要 | 不需要 |
| 乱序处理 | 需要额外开销 | 不需要 |
| 内存占用 | 较高(需要缓存延迟数据) | 较低 |
| CPU 开销 | 较高 | 较低 |
| 整体性能 | 相对较慢 | 相对较快 |
5.2 延迟对比
| 维度 | 事件时间 | 处理时间 |
|---|---|---|
| 窗口触发延迟 | 需要等待水印,有延迟 | 立即触发,延迟低 |
| 结果输出延迟 | 较高(等待延迟数据) | 较低(立即输出) |
| 实时性 | 相对较低 | 相对较高 |
6. 选择建议
6.1 选择事件时间的场景
✅ 必须使用事件时间:
- 计费系统:需要准确的时间语义
- 数据分析:需要准确的结果
- 审计系统:需要可重复的结果
- 对时间顺序有严格要求的场景
✅ 推荐使用事件时间:
- 实时监控:需要准确的时间语义
- 日志分析:需要准确的分析结果
- 数据质量要求高的场景
6.2 选择处理时间的场景
✅ 可以使用处理时间:
- 快速响应:对延迟敏感
- 实时监控:对时间精度要求不高
- 日志分析:快速分析,不需要准确时间
- 对时间顺序要求不严格的场景
❌ 不能使用处理时间:
- 计费系统
- 需要准确时间语义的场景
- 需要可重复结果的场景
7. 实际测试对比
7.1 测试场景:相同数据,不同时间语义
测试步骤
- 准备相同的数据(事件时间版本):
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
-
发送到事件时间主题,观察结果
-
发送到处理时间主题(去掉时间字段),观察结果
预期差异
- 事件时间:窗口时间基于数据中的时间(
10:00:xx) - 处理时间:窗口时间基于系统时间(可能是
14:30:xx或其他时间)
7.2 测试场景:乱序数据
测试步骤
- 发送乱序数据(事件时间):
{"user_id":"user1","click_time":"2025-11-07 10:00:10","url":"/first"}
{"user_id":"user1","click_time":"2025-11-07 10:00:30","url":"/third"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/second"} -- 乱序
-
观察事件时间窗口:应该能正确处理乱序
-
发送相同数据(处理时间,去掉时间字段)
-
观察处理时间窗口:按到达顺序处理
预期差异
- 事件时间:能正确处理乱序,B 会被分配到正确的窗口
- 处理时间:按到达顺序处理,不考虑实际时间
8. 常见问题解答
Q1: 什么时候必须使用事件时间?
A: 当业务需要准确的时间语义时,必须使用事件时间。例如:
- 计费系统:需要准确记录事件发生时间
- 数据分析:需要准确的分析结果
- 审计系统:需要可重复的结果
Q2: 处理时间可以用于生产环境吗?
A: 可以,但需要满足以下条件:
- 业务对时间精度要求不高
- 不需要准确的时间语义
- 可以接受结果的不确定性
Q3: 如何选择水印延迟时间?
A: 根据业务场景和数据延迟情况:
- 观察数据延迟分布
- 设置为最大延迟时间的 1.5-2 倍
- 太小:可能丢失延迟数据
- 太大:窗口触发延迟高
Q4: 事件时间和处理时间可以混用吗?
A: 可以,但需要注意:
- 不同时间语义的窗口不能直接关联
- 需要明确每个窗口使用的时间语义
- 建议统一使用一种时间语义
Q5: 如何测试窗口是否正确工作?
A: 测试方法:
- 发送已知的数据
- 观察窗口输出
- 验证结果是否符合预期
- 测试乱序数据(事件时间)
- 测试延迟数据(事件时间)
9. 总结
9.1 核心区别
| 维度 | 事件时间 | 处理时间 |
|---|---|---|
| 时间来源 | 数据中的时间戳 | 系统当前时间 |
| 水印 | 需要 | 不需要 |
| 确定性 | 确定 | 不确定 |
| 乱序处理 | 支持 | 不支持 |
| 性能 | 较低 | 较高 |
| 延迟 | 较高 | 较低 |
9.2 选择原则
- 需要准确时间语义 → 使用事件时间
- 需要快速响应 → 考虑处理时间
- 需要可重复结果 → 使用事件时间
- 对时间不敏感 → 可以使用处理时间
9.3 最佳实践
- 优先使用事件时间:除非有特殊需求,否则优先使用事件时间
- 合理设置水印:根据数据延迟情况设置合适的水印
- 测试验证:通过实际测试验证窗口行为
- 监控指标:监控窗口延迟、水印延迟等指标
10. 参考资源
- Flink 1.20 窗口函数官方文档
- Flink 1.20 时间属性文档
- Flink 1.20 Kafka 连接器文档
