当前位置: 首页 > news >正文

Flink20 SQL 窗口函数概述

Flink SQL 窗口函数概述(Flink 1.20)

1. 窗口函数简介

在流处理中,数据是连续不断的,无法像批处理那样等待所有数据到达后再进行处理。窗口函数(Window Functions)是 Flink SQL 中用于将无限的数据流划分为有限的时间窗口,以便进行聚合和分析的核心工具。

本文档基于 Flink 1.20 版本,主要关注滚动窗口(TUMBLE)的事件时间和处理时间使用场景。

1.1 为什么需要窗口函数?

  • 无限数据流:流数据是连续不断的,无法等待所有数据到达
  • 时间范围聚合:需要按时间范围进行统计(如每分钟的访问量)
  • 实时分析:需要实时计算移动平均值、累计值等指标

2. 窗口类型

Flink SQL 支持三种主要的窗口类型:

2.1 滚动窗口(TUMBLE)

  • 特点:窗口之间不重叠,每个窗口的长度相同
  • 语法TUMBLE(time_attr, size_interval)
  • 示例:每 5 分钟统计一次,窗口大小为 5 分钟
窗口1: [10:00:00, 10:05:00)
窗口2: [10:05:00, 10:10:00)
窗口3: [10:10:00, 10:15:00)

2.2 滑动窗口(HOP)

  • 特点:窗口之间可以重叠,通过滑动步长控制窗口移动
  • 语法HOP(time_attr, slide_interval, size_interval)
  • 示例:每 30 秒统计一次过去 1 分钟的数据
窗口1: [10:00:00, 10:01:00)
窗口2: [10:00:30, 10:01:30)
窗口3: [10:01:00, 10:02:00)
窗口4: [10:01:30, 10:02:30)

滑动窗口的行为

  • slide_interval < size_interval:窗口重叠
  • slide_interval = size_interval:等同于滚动窗口
  • slide_interval > size_interval:窗口之间有间隙(不常用)

注意:本文档主要关注滚动窗口(TUMBLE),滚动窗口是滑动窗口的特殊情况(滑动步长等于窗口大小)。滚动窗口是 Flink SQL 中最常用的窗口类型,适用于大多数时间窗口聚合场景。

2.3 会话窗口(SESSION)

  • 特点:基于活动间隔动态定义窗口,当数据间隔超过指定时间时创建新窗口
  • 语法SESSION(time_attr, gap_interval)
  • 示例:如果数据间隔超过 5 分钟,则创建新窗口

3. 时间属性

Flink SQL 支持两种时间属性:

3.1 事件时间(Event Time)

  • 定义:事件实际发生的时间,通常由事件本身携带的时间戳表示
  • 特点
    • 需要定义水印(Watermark)来处理乱序和延迟数据
    • 结果具有确定性,不受数据到达时间影响
    • 适合需要准确时间语义的场景
  • 定义方式
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    

3.2 处理时间(Processing Time)

  • 定义:事件被 Flink 处理时的系统时间
  • 特点
    • 不需要定义水印
    • 结果可能受数据到达时间影响,每次运行可能不同
    • 延迟低,适合对时间精度要求不高的场景
  • 定义方式
    proctime AS PROCTIME()
    

3.3 事件时间 vs 处理时间对比

特性事件时间处理时间
时间来源事件本身的时间戳系统当前时间
水印需求需要不需要
结果确定性确定不确定
延迟处理支持不支持
性能开销较高较低
适用场景需要准确时间语义对时间精度要求不高

4. 窗口函数语法

4.1 滚动窗口(TUMBLE)语法

TUMBLE(time_attr, size_interval)

参数说明

  • time_attr:时间属性字段,必须是流中的一个合法时间属性字段
  • size_interval:窗口大小,格式为 INTERVAL 'num' timeUnit

时间单位

  • SECONDMINUTEHOURDAY
  • 例如:INTERVAL '30' SECONDINTERVAL '1' MINUTEINTERVAL '1' HOUR

4.2 窗口辅助函数

TUMBLE_START

返回窗口的起始时间(包含边界)

TUMBLE_START(time_attr, size_interval)
TUMBLE_END

返回窗口的结束时间(不包含边界)

TUMBLE_END(time_attr, size_interval)
TUMBLE_ROWTIME

返回窗口的结束时间(不包含边界),返回值是一个 rowtime 属性,可用于基于事件时间的操作

TUMBLE_ROWTIME(time_attr, size_interval)
TUMBLE_PROCTIME

返回窗口的结束时间(不包含边界),返回值是一个 proctime 属性,可用于基于处理时间的操作

TUMBLE_PROCTIME(time_attr, size_interval)

5. 窗口聚合查询基本结构

SELECT[窗口标识函数],[分组字段],[聚合函数]
FROM table_name
GROUP BY[分组字段],[窗口函数]

示例

SELECTTUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks
GROUP BYuser_id,TUMBLE(event_time, INTERVAL '1' MINUTE)

6. 窗口函数使用场景

6.1 滚动窗口适用场景

  • 固定时间段的统计:每小时统计一次,每 5 分钟统计一次
  • 不重叠的聚合:每个时间段独立统计
  • 周期性报表:每天、每小时、每分钟的统计报表
  • 简单的时间窗口聚合:不需要窗口重叠的场景

6.2 滑动窗口适用场景

  • 移动平均值:每 10 秒更新一次过去 5 分钟的平均值
  • 实时监控:每 30 秒统计一次过去 1 分钟的指标
  • 频繁更新的聚合:需要更频繁地输出结果

6.3 会话窗口适用场景

  • 用户会话分析:分析用户的活动会话
  • 动态窗口大小:窗口大小根据数据间隔动态调整

7. 注意事项

  1. 窗口大小:滚动窗口大小应该根据业务需求合理设置
  2. 水印策略:使用事件时间时,需要根据数据延迟情况设置合适的水印
  3. 性能考虑:滚动窗口计算开销相对较小,适合大多数场景
  4. 结果输出:窗口结果在窗口结束时输出(事件时间)或定期输出(处理时间)
  5. 窗口对齐:滚动窗口会自动对齐到时间边界(如整点、整分钟)

8. 参考资源

  • Flink 1.20 窗口函数官方文档
  • Flink 1.20 时间属性文档

事件时间滚动窗口详细说明(Flink 1.20)

1. 事件时间概述

事件时间(Event Time)是指事件实际发生的时间,通常由事件本身携带的时间戳表示。使用事件时间进行窗口操作可以确保结果的准确性,不受数据到达时间的影响。

1.1 事件时间的特点

  • 准确性:基于事件实际发生的时间,结果具有确定性
  • 乱序处理:通过水印(Watermark)机制处理乱序和延迟数据
  • 延迟容忍:可以处理一定时间范围内的延迟数据
  • 适用场景:需要准确反映事件发生时间的业务场景,如实时监控、计费、数据分析等

1.2 水印(Watermark)机制

水印是 Flink 用于处理乱序数据的重要机制:

  • 定义:水印表示"小于该时间戳的数据应该都已经到达"
  • 作用:触发窗口计算,处理延迟数据
  • 设置:通常设置为事件时间减去一个固定的延迟时间
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

这表示允许 5 秒的延迟,即如果当前水印是 10:00:10,那么 10:00:05 之前的数据应该都已经到达。

2. 滚动窗口(TUMBLE)语法

2.1 基本语法

TUMBLE(event_time_attr, size_interval)
  • event_time_attr:事件时间属性字段
  • size_interval:窗口大小,如 INTERVAL '1' MINUTEINTERVAL '10' SECOND

2.2 窗口辅助函数

-- 窗口开始时间(包含边界)
TUMBLE_START(event_time_attr, size_interval)-- 窗口结束时间(不包含边界)
TUMBLE_END(event_time_attr, size_interval)-- 窗口结束时间(不包含边界,rowtime 属性,用于下游操作)
TUMBLE_ROWTIME(event_time_attr, size_interval)

3. 完整示例:用户点击统计(使用 Kafka + Flink SQL Client)

3.1 环境准备

3.1.1 启动 Kafka

确保 Kafka 已安装并运行:

# 启动 Zookeeper(如果使用旧版本 Kafka)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
3.1.2 创建 Kafka 主题
# 创建主题 user_clicks,用于用户点击事件
kafka-topics.sh --create \--topic user_clicks \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 验证主题创建成功
kafka-topics.sh --list --bootstrap-server localhost:9092
3.1.3 启动 Flink 集群
# 启动 Flink Standalone 集群
./bin/start-cluster.sh# 启动 Flink SQL Client
./bin/sql-client.sh

3.2 在 Flink SQL Client 中创建表

3.2.1 创建 Kafka 源表(事件时间)

在 Flink SQL Client 中执行以下 SQL:

-- 创建用户点击事件表(事件时间)
CREATE TABLE user_clicks (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);

参数说明

  • connector = 'kafka':使用 Kafka 连接器
  • topic = 'user_clicks':Kafka 主题名称
  • properties.bootstrap.servers:Kafka Broker 地址
  • format = 'json':数据格式为 JSON
  • WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND:定义水印,允许 5 秒延迟
  • scan.startup.mode = 'earliest-offset':从最早的数据开始读取
3.2.2 创建结果输出表(可选,用于观察结果)
-- 创建打印表,用于观察窗口计算结果
CREATE TABLE window_results (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);

3.3 定义滚动窗口查询

3.3.1 基本滚动窗口查询

计算每 1 分钟内每个用户的点击次数:

SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
3.3.2 将结果写入打印表(用于观察)
-- 将窗口计算结果写入打印表
INSERT INTO window_results
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;

3.4 通过 Kafka 发送测试数据

3.4.1 启动 Kafka 生产者

打开新的终端窗口,执行:

kafka-console-producer.sh \--topic user_clicks \--bootstrap-server localhost:9092
3.4.2 发送测试数据

在生产者终端中,逐行输入以下 JSON 数据(注意时间戳的顺序和间隔):

{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:01:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:01:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:02:05","url":"/about"}

重要提示

  • 每行一个完整的 JSON 对象
  • 时间格式:YYYY-MM-DD HH:mm:ss
  • 时间戳按顺序递增,便于观察窗口分组
3.4.3 发送乱序数据(测试水印机制)

为了测试水印和乱序数据处理,可以发送一些延迟到达的数据:

{"user_id":"user1","click_time":"2025-11-07 10:00:15","url":"/delayed"}
{"user_id":"user2","click_time":"2025-11-07 10:00:25","url":"/delayed"}

这些数据的时间戳在之前发送的数据之间,用于测试水印机制如何处理乱序数据。

3.5 观察窗口输出结果

3.5.1 在 Flink SQL Client 中观察

执行查询后,Flink SQL Client 会实时显示窗口计算结果。输出示例:

+----+-------------------------+-------------------------+--------+-------------+
| op |          window_start   |          window_end     | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user1  |           3 |
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user2  |           2 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user1  |           2 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user2  |           2 |
| +I | 2025-11-07 10:02:00.000| 2025-11-07 10:03:00.000| user1  |           1 |
+----+-------------------------+-------------------------+--------+-------------+

输出说明

  • op 列:+I 表示插入(Insert),-U 表示更新前值,+U 表示更新后值
  • window_start:窗口开始时间(包含边界)
  • window_end:窗口结束时间(不包含边界)
  • user_id:用户ID
  • click_count:该窗口内该用户的点击次数
3.5.2 在 Flink Web UI 中观察
  1. 访问 Flink Web UI:http://localhost:8081
  2. 查看运行中的作业
  3. 点击作业名称,查看详细信息和指标
  4. 在 “Metrics” 标签页中查看窗口相关的指标

3.6 完整示例:订单金额统计

3.6.1 创建订单主题
kafka-topics.sh --create \--topic orders \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
3.6.2 创建订单表
CREATE TABLE orders (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.6.3 计算每 5 分钟内的订单总金额
SELECTTUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
3.6.4 发送订单数据
kafka-console-producer.sh \--topic orders \--bootstrap-server localhost:9092

输入数据:

{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:00:05"}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99,"order_time":"2025-11-07 10:02:10"}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99,"order_time":"2025-11-07 10:03:20"}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:04:30"}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99,"order_time":"2025-11-07 10:06:45"}

4. 窗口触发机制

4.1 水印触发

滚动窗口的触发依赖于水印:

  1. 水印推进:当新数据到达时,系统会更新水印
  2. 窗口触发:当水印超过窗口结束时间时,窗口会被触发计算
  3. 延迟数据:在水印允许的延迟范围内到达的数据仍会被包含在窗口中

4.2 示例说明

假设窗口大小为 1 分钟,水印延迟为 5 秒:

  • 窗口 [10:00:00, 10:01:00) 会在水印达到 10:01:00 时触发
  • 如果水印是 10:01:05,那么 10:00:55 之前的数据都应该已经到达
  • 如果 10:00:50 的数据在水印 10:01:05 之后到达,它可能不会被包含在窗口中(取决于具体的水印策略)

5. 最佳实践

5.1 水印延迟设置

  • 太小:可能导致延迟数据被丢弃,结果不准确
  • 太大:窗口触发延迟,实时性差
  • 建议:根据业务场景和数据延迟情况设置,通常设置为最大延迟时间的 1.5-2 倍

5.2 窗口大小选择

  • 太小:窗口数量多,计算开销大
  • 太大:结果延迟高,实时性差
  • 建议:根据业务需求和数据特征选择,常见的有 1 分钟、5 分钟、1 小时等

5.3 时间格式

  • 确保 JSON 数据中的时间格式与 SQL 中定义的时间格式一致
  • 推荐使用 TIMESTAMP(3) 精度(毫秒级)
  • 时间格式:YYYY-MM-DD HH:mm:ssYYYY-MM-DDTHH:mm:ss

5.4 错误处理

-- 忽略 JSON 解析错误
'json.ignore-parse-errors' = 'true'-- 或者使用更严格的错误处理
'json.fail-on-missing-field' = 'false'

6. 常见问题

6.1 窗口没有输出

可能原因

  • 水印没有推进(数据时间戳没有递增)
  • 窗口大小设置过大
  • 数据格式错误

解决方法

  • 检查数据时间戳是否递增
  • 减小窗口大小进行测试
  • 检查 JSON 格式是否正确

6.2 延迟数据丢失

可能原因

  • 水印延迟设置过小
  • 数据延迟超过水印允许范围

解决方法

  • 增加水印延迟时间
  • 检查数据源是否有延迟

6.3 时间格式错误

错误信息Cannot parse timestamp

解决方法

  • 确保 JSON 中的时间格式与 SQL 定义一致
  • 使用 TIMESTAMP(3) 并确保格式为 YYYY-MM-DD HH:mm:ss

7. 调试技巧

7.1 查看原始数据

-- 查看原始数据流
SELECT * FROM user_clicks;

7.2 查看水印

-- 查看当前水印(需要自定义函数或查看 Flink Web UI)
SELECT CURRENT_WATERMARK(click_time) FROM user_clicks;

7.3 使用 Flink Web UI

  • 访问 http://localhost:8081
  • 查看作业的 Metrics
  • 查看窗口相关的指标,如窗口延迟、水印延迟等

8. 参考资源

  • Flink 1.20 窗口函数官方文档
  • Flink 1.20 Kafka 连接器文档
  • Flink 1.20 时间属性文档

处理时间滚动窗口详细说明(Flink 1.20)

1. 处理时间概述

处理时间(Processing Time)是指数据被 Flink 处理时的系统时间。使用处理时间进行窗口操作时,系统不考虑数据的实际发生时间,窗口的触发完全依赖于系统处理数据的时间。

1.1 处理时间的特点

  • 简单性:不需要定义水印,实现简单
  • 实时性:窗口触发快,延迟低
  • 不确定性:结果依赖于数据到达时间,不具有确定性
  • 适用场景:对时间顺序要求不严格,关注快速响应的场景,如实时监控、日志分析等

1.2 处理时间 vs 事件时间

特性处理时间事件时间
时间来源系统处理时间事件本身携带的时间
水印不需要需要
乱序处理不考虑通过水印处理
结果确定性不确定确定
实现复杂度简单复杂
适用场景快速响应、对时间不敏感准确时间、对时间敏感

2. 滚动窗口(TUMBLE)语法

2.1 基本语法

TUMBLE(PROCTIME(), size_interval)
  • PROCTIME():处理时间属性函数
  • size_interval:窗口大小,如 INTERVAL '1' MINUTEINTERVAL '10' SECOND

2.2 窗口辅助函数

-- 窗口开始时间(包含边界)
TUMBLE_START(PROCTIME(), size_interval)-- 窗口结束时间(不包含边界)
TUMBLE_END(PROCTIME(), size_interval)-- 窗口结束时间(不包含边界,proctime 属性,用于下游操作)
TUMBLE_PROCTIME(PROCTIME(), size_interval)

3. 完整示例:用户点击统计(使用 Kafka + Flink SQL Client)

3.1 环境准备

3.1.1 启动 Kafka

确保 Kafka 已安装并运行:

# 启动 Zookeeper(如果使用旧版本 Kafka)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
3.1.2 创建 Kafka 主题
# 创建主题 user_clicks_proc,用于用户点击事件(处理时间)
kafka-topics.sh --create \--topic user_clicks_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 验证主题创建成功
kafka-topics.sh --list --bootstrap-server localhost:9092
3.1.3 启动 Flink 集群
# 启动 Flink Standalone 集群
./bin/start-cluster.sh# 启动 Flink SQL Client
./bin/sql-client.sh

3.2 在 Flink SQL Client 中创建表

3.2.1 创建 Kafka 源表(处理时间)

在 Flink SQL Client 中执行以下 SQL:

-- 创建用户点击事件表(处理时间)
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);

参数说明

  • connector = 'kafka':使用 Kafka 连接器
  • topic = 'user_clicks_proc':Kafka 主题名称
  • properties.bootstrap.servers:Kafka Broker 地址
  • format = 'json':数据格式为 JSON
  • proc_time AS PROCTIME():定义处理时间属性,系统自动生成
  • 注意:处理时间不需要时间字段,也不需要水印
3.2.2 创建结果输出表(可选,用于观察结果)
-- 创建打印表,用于观察窗口计算结果
CREATE TABLE window_results_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);

3.3 定义滚动窗口查询

3.3.1 基本滚动窗口查询

计算每 1 分钟内每个用户的点击次数(基于处理时间):

SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
3.3.2 将结果写入打印表(用于观察)
-- 将窗口计算结果写入打印表
INSERT INTO window_results_proc
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;

3.4 通过 Kafka 发送测试数据

3.4.1 启动 Kafka 生产者

打开新的终端窗口,执行:

kafka-console-producer.sh \--topic user_clicks_proc \--bootstrap-server localhost:9092
3.4.2 发送测试数据

在生产者终端中,逐行输入以下 JSON 数据(注意:处理时间不需要时间字段):

{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}

重要提示

  • 每行一个完整的 JSON 对象
  • 不需要包含时间字段,处理时间由系统自动生成
  • 可以快速连续发送多条数据,观察窗口如何按处理时间分组
3.4.3 分批发送数据(观察窗口触发)

为了更清楚地观察窗口触发机制,可以分批发送数据:

第一批(快速发送,应该在同一窗口内):

{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}

等待 1 分钟后,发送第二批(应该在新窗口内):

{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}

3.5 观察窗口输出结果

3.5.1 在 Flink SQL Client 中观察

执行查询后,Flink SQL Client 会实时显示窗口计算结果。输出示例:

+----+-------------------------+-------------------------+--------+-------------+
| op |          window_start   |          window_end     | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user1  |           3 |
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user2  |           2 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user1  |           2 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user2  |           1 |
+----+-------------------------+-------------------------+--------+-------------+

输出说明

  • op 列:+I 表示插入(Insert)
  • window_start:窗口开始时间(系统处理时间,包含边界)
  • window_end:窗口结束时间(系统处理时间,不包含边界)
  • user_id:用户ID
  • click_count:该窗口内该用户的点击次数
  • 注意:窗口时间是基于系统处理时间,不是数据中的时间
3.5.2 在 Flink Web UI 中观察
  1. 访问 Flink Web UI:http://localhost:8081
  2. 查看运行中的作业
  3. 点击作业名称,查看详细信息和指标
  4. 在 “Metrics” 标签页中查看窗口相关的指标

3.6 完整示例:订单金额统计

3.6.1 创建订单主题
kafka-topics.sh --create \--topic orders_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
3.6.2 创建订单表
CREATE TABLE orders_proc (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.6.3 计算每 5 分钟内的订单总金额
SELECTTUMBLE_START(proc_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_proc
GROUP BY TUMBLE(proc_time, INTERVAL '5' MINUTE);
3.6.4 发送订单数据
kafka-console-producer.sh \--topic orders_proc \--bootstrap-server localhost:9092

输入数据(不需要时间字段):

{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99}

4. 窗口触发机制

4.1 基于系统时间触发

处理时间滚动窗口的触发完全依赖于系统时间:

  1. 系统时间推进:窗口根据系统当前时间进行分组
  2. 窗口触发:当系统时间超过窗口结束时间时,窗口会被触发计算
  3. 实时性:窗口触发快,不需要等待延迟数据

4.2 示例说明

假设窗口大小为 1 分钟,当前系统时间是 14:30:15

  • 所有在 14:30:0014:30:15 之间处理的数据会被分配到窗口 [14:30:00, 14:31:00)
  • 当系统时间达到 14:31:00 时,窗口 [14:30:00, 14:31:00) 会被触发
  • 之后处理的数据会被分配到新窗口 [14:31:00, 14:32:00)

4.3 与事件时间的区别

  • 事件时间:窗口基于数据中的时间戳,需要等待延迟数据,结果确定
  • 处理时间:窗口基于系统时间,立即触发,结果不确定(依赖于数据到达时间)

5. 最佳实践

5.1 适用场景

处理时间窗口适用于:

  • 实时监控:需要快速响应的监控场景
  • 日志分析:对时间顺序要求不严格的日志分析
  • 快速统计:不需要准确时间语义的统计场景
  • 低延迟要求:对延迟敏感,需要快速输出的场景

5.2 窗口大小选择

  • 太小:窗口数量多,计算开销大,但实时性好
  • 太大:结果延迟高,但计算开销小
  • 建议:根据业务需求选择,常见的有 10 秒、1 分钟、5 分钟等

5.3 数据格式

  • 不需要时间字段:处理时间由系统自动生成
  • JSON 格式简单,只需要业务字段
  • 确保 JSON 格式正确,每行一个完整的 JSON 对象

5.4 错误处理

-- 忽略 JSON 解析错误
'json.ignore-parse-errors' = 'true'-- 或者使用更严格的错误处理
'json.fail-on-missing-field' = 'false'

6. 常见问题

6.1 窗口没有输出

可能原因

  • 窗口大小设置过大,还没有到触发时间
  • 数据格式错误
  • Kafka 连接问题

解决方法

  • 减小窗口大小进行测试(如 10 秒)
  • 检查 JSON 格式是否正确
  • 检查 Kafka 连接配置

6.2 窗口时间不准确

说明:这是处理时间的正常特性,窗口时间基于系统处理时间,不是数据发生时间。

解决方法

  • 如果需要准确的时间语义,应该使用事件时间
  • 处理时间适用于对时间不敏感的场景

6.3 数据顺序问题

说明:处理时间不考虑数据顺序,先到达的数据先处理。

解决方法

  • 如果需要保证数据顺序,应该使用事件时间
  • 或者在前端保证数据顺序

7. 调试技巧

7.1 查看原始数据

-- 查看原始数据流和处理时间
SELECT user_id,url,proc_time
FROM user_clicks_proc;

7.2 查看当前系统时间

-- 查看当前处理时间
SELECT PROCTIME() AS current_proc_time,user_id,url
FROM user_clicks_proc;

7.3 使用 Flink Web UI

  • 访问 http://localhost:8081
  • 查看作业的 Metrics
  • 查看窗口相关的指标,如窗口延迟、处理延迟等

7.4 测试窗口触发

  • 快速发送一批数据,观察是否在同一窗口
  • 等待窗口大小的时间后,再发送数据,观察是否在新窗口
  • 使用较小的窗口大小(如 10 秒)进行测试

8. 处理时间 vs 事件时间对比

8.1 配置对比

配置项处理时间事件时间
时间字段不需要需要
水印不需要需要
表定义proc_time AS PROCTIME()event_time TIMESTAMP(3), WATERMARK FOR event_time AS ...
窗口函数TUMBLE(proc_time, ...)TUMBLE(event_time, ...)

8.2 数据格式对比

处理时间(不需要时间字段):

{"user_id":"user1","url":"/home"}

事件时间(需要时间字段):

{"user_id":"user1","url":"/home","click_time":"2025-11-07 10:00:05"}

8.3 使用场景对比

  • 处理时间:快速响应、实时监控、日志分析
  • 事件时间:准确时间、计费系统、数据分析

9. 参考资源

  • Flink 1.20 窗口函数官方文档
  • Flink 1.20 Kafka 连接器文档
  • Flink 1.20 时间属性文档

测试数据和测试用例(Flink 1.20 滚动窗口)

本文档提供详细的测试数据、测试用例和预期结果,用于验证 Flink SQL 滚动窗口函数的正确性。所有测试均使用 Flink SQL Client 和 Kafka 进行。

1. 测试环境准备

1.1 测试数据格式

所有测试数据使用 JSON 格式,便于在 Kafka 中使用。

事件时间数据格式

{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}

处理时间数据格式

{"user_id":"user1","url":"/home"}

1.2 测试工具

  • Flink SQL Client:用于执行 SQL 查询
  • Kafka:用于数据源(必须)
  • Print Connector:用于结果输出

1.3 测试步骤

  1. 创建 Kafka 主题
  2. 在 Flink SQL Client 中创建表
  3. 定义窗口查询
  4. 通过 Kafka 发送测试数据
  5. 观察窗口输出结果
  6. 验证结果是否符合预期

2. 事件时间滚动窗口测试

2.1 测试用例 1:用户点击统计(基本功能)

测试目标

验证基于事件时间的滚动窗口能够正确统计每个用户每 1 分钟内的点击次数。

表定义
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据

通过 Kafka 生产者发送以下数据:

{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:01:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:01:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:02:05","url":"/about"}
测试查询
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
预期结果
window_startwindow_enduser_idclick_count
2025-11-07 10:00:002025-11-07 10:01:00user13
2025-11-07 10:00:002025-11-07 10:01:00user22
2025-11-07 10:01:002025-11-07 10:02:00user12
2025-11-07 10:01:002025-11-07 10:02:00user22
2025-11-07 10:02:002025-11-07 10:03:00user11
结果验证点
  1. ✅ 窗口大小正确:每个窗口覆盖 1 分钟
  2. ✅ 窗口不重叠:窗口之间没有重叠
  3. ✅ 窗口对齐:窗口对齐到整分钟(10:00:00, 10:01:00, 10:02:00)
  4. ✅ 聚合结果正确:每个窗口内的点击次数正确
  5. ✅ 分组正确:按用户分组统计
  6. ✅ 窗口时间基于事件时间:窗口时间基于数据中的 click_time 字段

2.2 测试用例 2:乱序数据处理

测试目标

验证基于事件时间的滚动窗口能够正确处理乱序数据。

表定义

同测试用例 1。

测试数据

发送包含乱序数据的测试数据:

{"user_id":"user1","click_time":"2025-11-07 10:00:10","url":"/first"}
{"user_id":"user1","click_time":"2025-11-07 10:00:30","url":"/third"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/second"}  -- 乱序:实际时间在 first 和 third 之间
{"user_id":"user1","click_time":"2025-11-07 10:00:50","url":"/fourth"}

注意:第三条数据的时间戳是 10:00:20,但到达顺序在 10:00:30 之后。

测试查询

同测试用例 1。

预期结果

如果水印延迟设置为 5 秒,且乱序数据在水印允许范围内到达,应该能正确处理:

window_startwindow_enduser_idclick_count
2025-11-07 10:00:002025-11-07 10:01:00user14

所有 4 条数据都应该被包含在窗口 [10:00:00, 10:01:00) 中。

结果验证点
  1. ✅ 乱序数据处理:乱序数据被正确分配到对应窗口
  2. ✅ 水印机制:水印机制正确处理延迟数据
  3. ✅ 结果准确性:结果准确反映事件发生时间

2.3 测试用例 3:延迟数据处理

测试目标

验证基于事件时间的滚动窗口如何处理延迟数据。

表定义

同测试用例 1(水印延迟 5 秒)。

测试数据
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/normal1"}
{"user_id":"user1","click_time":"2025-11-07 10:00:10","url":"/normal2"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/normal3"}
-- 等待窗口触发后(水印超过 10:01:00),再发送延迟数据
{"user_id":"user1","click_time":"2025-11-07 10:00:15","url":"/delayed"}  -- 延迟数据
预期结果

如果延迟数据在水印允许范围内到达(5 秒内),应该被包含在窗口中。如果超过水印允许范围,可能不会被包含。

结果验证点
  1. ✅ 延迟数据处理:延迟数据根据水印策略处理
  2. ✅ 水印延迟设置:水印延迟设置影响延迟数据的处理

2.4 测试用例 4:订单金额统计

测试目标

验证基于事件时间的滚动窗口能够正确统计订单金额。

表定义
CREATE TABLE orders_event (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:00:05"}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99,"order_time":"2025-11-07 10:02:10"}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99,"order_time":"2025-11-07 10:03:20"}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:04:30"}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99,"order_time":"2025-11-07 10:06:45"}
测试查询
SELECTTUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_event
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
预期结果
window_startwindow_endtotal_amountorder_count
2025-11-07 10:00:002025-11-07 10:05:00599.974
2025-11-07 10:05:002025-11-07 10:10:00399.991
结果验证点
  1. ✅ 金额聚合正确:SUM 函数正确计算总金额
  2. ✅ 计数正确:COUNT 函数正确计算订单数量
  3. ✅ 窗口分组正确:按 5 分钟窗口正确分组

3. 处理时间滚动窗口测试

3.1 测试用例 1:用户点击统计(基本功能)

测试目标

验证基于处理时间的滚动窗口能够正确统计每个用户每 1 分钟内的点击次数。

表定义
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据

通过 Kafka 生产者发送以下数据(不需要时间字段):

{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
测试查询
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
预期结果

窗口时间基于系统处理时间,假设当前系统时间是 14:30:xx

window_startwindow_enduser_idclick_count
2025-11-07 14:30:002025-11-07 14:31:00user15
2025-11-07 14:30:002025-11-07 14:31:00user23
2025-11-07 14:31:002025-11-07 14:32:00user12
2025-11-07 14:31:002025-11-07 14:32:00user20

注意:实际窗口时间取决于系统处理时间,每次运行可能不同。

结果验证点
  1. ✅ 窗口大小正确:每个窗口覆盖 1 分钟
  2. ✅ 窗口不重叠:窗口之间没有重叠
  3. ✅ 窗口对齐:窗口对齐到整分钟
  4. ✅ 聚合结果正确:每个窗口内的点击次数正确
  5. ✅ 分组正确:按用户分组统计
  6. ✅ 窗口时间基于处理时间:窗口时间基于系统处理时间

3.2 测试用例 2:窗口触发测试

测试目标

验证处理时间窗口的触发机制。

表定义

同测试用例 1。

测试步骤
  1. 第一批数据(快速发送,应该在同一窗口内):
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
  1. 等待 1 分钟(让窗口触发)

  2. 第二批数据(应该在新窗口内):

{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
预期结果
  • 第一批数据应该在同一窗口内
  • 等待 1 分钟后,窗口应该触发
  • 第二批数据应该在新窗口内
结果验证点
  1. ✅ 窗口触发:窗口在系统时间超过窗口结束时间时触发
  2. ✅ 窗口分组:数据按处理时间正确分配到窗口

3.3 测试用例 3:订单金额统计

测试目标

验证基于处理时间的滚动窗口能够正确统计订单金额。

表定义
CREATE TABLE orders_proc (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-test','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
测试数据
{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99}
测试查询
SELECTTUMBLE_START(proc_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_proc
GROUP BY TUMBLE(proc_time, INTERVAL '5' MINUTE);
预期结果

窗口时间基于系统处理时间,所有数据如果在同一 5 分钟窗口内处理,应该在同一窗口:

window_startwindow_endtotal_amountorder_count
2025-11-07 14:30:002025-11-07 14:35:001098.955
结果验证点
  1. ✅ 金额聚合正确:SUM 函数正确计算总金额
  2. ✅ 计数正确:COUNT 函数正确计算订单数量
  3. ✅ 窗口分组正确:按 5 分钟窗口正确分组

4. 对比测试

4.1 测试用例:相同数据,不同时间语义

测试目标

对比事件时间和处理时间对相同数据的处理结果。

测试步骤
  1. 准备相同的数据(事件时间版本):
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
  1. 发送到事件时间主题,观察结果

  2. 发送到处理时间主题(去掉时间字段),观察结果

预期差异
  • 事件时间:窗口时间基于数据中的时间(10:00:xx
  • 处理时间:窗口时间基于系统时间(可能是 14:30:xx 或其他时间)
结果验证点
  1. ✅ 时间来源不同:事件时间基于数据时间,处理时间基于系统时间
  2. ✅ 结果确定性:事件时间结果确定,处理时间结果不确定

5. 性能测试

5.1 测试用例:大量数据测试

测试目标

验证窗口函数在处理大量数据时的性能。

测试数据

生成大量测试数据(如 10000 条),通过 Kafka 发送。

测试指标
  • 吞吐量(records/second)
  • 延迟(latency)
  • 内存使用
  • CPU 使用
结果验证点
  1. ✅ 性能指标:性能指标在可接受范围内
  2. ✅ 资源使用:资源使用合理

6. 错误处理测试

6.1 测试用例:格式错误数据

测试目标

验证窗口函数如何处理格式错误的数据。

测试数据
{"user_id":"user1","click_time":"invalid-time","url":"/home"}  -- 时间格式错误
{"user_id":"user2","url":"/about"}  -- 缺少时间字段(事件时间)
{"invalid json"  -- JSON 格式错误
预期结果
  • 格式错误的数据应该被忽略或记录错误
  • 不影响其他正常数据的处理
结果验证点
  1. ✅ 错误处理:错误数据被正确处理
  2. ✅ 容错性:不影响正常数据处理

7. 测试数据文件

测试数据文件位于 test_data/ 目录:

  • user_clicks_event_time.json - 事件时间用户点击数据
  • orders_event_time.json - 事件时间订单数据
  • user_clicks_processing_time.json - 处理时间用户点击数据
  • orders_processing_time.json - 处理时间订单数据

8. 测试执行步骤

8.1 完整测试流程

  1. 环境准备

    # 启动 Kafka
    bin/kafka-server-start.sh config/server.properties# 启动 Flink
    ./bin/start-cluster.sh
    ./bin/sql-client.sh
    
  2. 创建 Kafka 主题

    kafka-topics.sh --create \--topic user_clicks_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
    
  3. 在 Flink SQL Client 中创建表

    CREATE TABLE user_clicks_event (...);
    
  4. 定义窗口查询

    INSERT INTO window_results_event
    SELECT ... FROM user_clicks_event
    GROUP BY TUMBLE(...);
    
  5. 发送测试数据

    kafka-console-producer.sh \--topic user_clicks_event \--bootstrap-server localhost:9092
    
  6. 观察和验证结果

8.2 自动化测试建议

  • 使用脚本自动化测试流程
  • 使用断言验证结果
  • 记录测试结果和性能指标

9. 参考资源

  • Flink 1.20 窗口函数官方文档
  • Flink 1.20 测试指南

Flink SQL 滚动窗口完整示例(Flink 1.20)

本文档提供可运行的完整示例,包括表定义、Kafka 命令操作、查询语句和结果说明。所有示例均使用 Flink SQL Client 执行。

1. 环境准备

1.1 前置条件

  • Flink 1.20:确保已安装 Flink 1.20 版本
  • Kafka:用于数据源(必须)
  • Flink SQL Client:用于执行 SQL 查询

1.2 启动服务

1.2.1 启动 Kafka
# 启动 Zookeeper(如果使用旧版本 Kafka)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
1.2.2 启动 Flink
# 启动 Flink Standalone 集群
./bin/start-cluster.sh# 启动 Flink SQL Client
./bin/sql-client.sh

1.3 验证环境

# 验证 Kafka 运行
kafka-topics.sh --list --bootstrap-server localhost:9092# 验证 Flink 运行
# 访问 http://localhost:8081 查看 Flink Web UI

2. 事件时间滚动窗口完整示例

2.1 示例场景

统计每个用户每 1 分钟内的点击次数,基于事件时间。

2.2 完整操作步骤

步骤 1:创建 Kafka 主题
# 创建主题
kafka-topics.sh --create \--topic user_clicks_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 验证主题创建成功
kafka-topics.sh --list --bootstrap-server localhost:9092
步骤 2:在 Flink SQL Client 中创建表
-- ============================================
-- 事件时间滚动窗口完整示例
-- ============================================-- 1. 创建源表(从 Kafka 读取数据,事件时间)
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-event','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 2. 创建结果表(输出到 Print)
CREATE TABLE window_results_event (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义窗口查询
-- 3. 执行滚动窗口查询并插入结果
INSERT INTO window_results_event
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
步骤 4:发送测试数据

打开新的终端窗口,执行:

# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_event \--bootstrap-server localhost:9092

在生产者终端中,逐行输入以下 JSON 数据:

{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:01:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:01:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:02:05","url":"/about"}

重要提示

  • 每行一个完整的 JSON 对象
  • 时间格式:YYYY-MM-DD HH:mm:ss
  • 时间戳按顺序递增
步骤 5:观察窗口输出

在 Flink SQL Client 中会看到实时输出:

+I[2025-11-07T10:00:00, 2025-11-07T10:01:00, user1, 3]
+I[2025-11-07T10:00:00, 2025-11-07T10:01:00, user2, 2]
+I[2025-11-07T10:01:00, 2025-11-07T10:02:00, user1, 2]
+I[2025-11-07T10:01:00, 2025-11-07T10:02:00, user2, 2]
+I[2025-11-07T10:02:00, 2025-11-07T10:03:00, user1, 1]

输出说明

  • +I:插入操作
  • 第一列:窗口开始时间(包含边界)
  • 第二列:窗口结束时间(不包含边界)
  • 第三列:用户ID
  • 第四列:点击次数

2.3 测试乱序数据

为了测试水印机制,可以发送一些乱序数据:

{"user_id":"user1","click_time":"2025-11-07 10:00:15","url":"/delayed1"}
{"user_id":"user2","click_time":"2025-11-07 10:00:25","url":"/delayed2"}

这些数据的时间戳在之前发送的数据之间,用于测试水印机制如何处理乱序数据。

3. 处理时间滚动窗口完整示例

3.1 示例场景

统计每个用户每 1 分钟内的点击次数,基于处理时间。

3.2 完整操作步骤

步骤 1:创建 Kafka 主题
# 创建主题
kafka-topics.sh --create \--topic user_clicks_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
步骤 2:在 Flink SQL Client 中创建表
-- ============================================
-- 处理时间滚动窗口完整示例
-- ============================================-- 1. 创建源表(从 Kafka 读取数据,处理时间)
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-proc','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 2. 创建结果表(输出到 Print)
CREATE TABLE window_results_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义窗口查询
-- 3. 执行滚动窗口查询并插入结果
INSERT INTO window_results_proc
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
步骤 4:发送测试数据

打开新的终端窗口,执行:

# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_proc \--bootstrap-server localhost:9092

在生产者终端中,逐行输入以下 JSON 数据(注意:不需要时间字段):

{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}

重要提示

  • 每行一个完整的 JSON 对象
  • 不需要包含时间字段
  • 可以快速连续发送多条数据
步骤 5:观察窗口输出

在 Flink SQL Client 中会看到实时输出:

+I[2025-11-07T14:30:00, 2025-11-07T14:31:00, user1, 5]
+I[2025-11-07T14:30:00, 2025-11-07T14:31:00, user2, 3]
+I[2025-11-07T14:31:00, 2025-11-07T14:32:00, user1, 2]
+I[2025-11-07T14:31:00, 2025-11-07T14:32:00, user2, 0]

输出说明

  • +I:插入操作
  • 第一列:窗口开始时间(系统处理时间,包含边界)
  • 第二列:窗口结束时间(系统处理时间,不包含边界)
  • 第三列:用户ID
  • 第四列:点击次数
  • 注意:窗口时间基于系统处理时间,不是数据中的时间

3.3 测试窗口触发

为了更清楚地观察窗口触发机制,可以分批发送数据:

第一批(快速发送,应该在同一窗口内):

{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}

等待 1 分钟后,发送第二批(应该在新窗口内):

{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}

4. 订单金额统计示例

4.1 事件时间版本

步骤 1:创建主题
kafka-topics.sh --create \--topic orders_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
步骤 2:创建表
-- 创建订单表(事件时间)
CREATE TABLE orders_event (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-orders','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 创建结果表
CREATE TABLE order_statistics_event (window_start TIMESTAMP(3),window_end TIMESTAMP(3),total_amount DECIMAL(10, 2),order_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义查询
-- 计算每 5 分钟内的订单总金额
INSERT INTO order_statistics_event
SELECTTUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_event
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
步骤 4:发送数据
kafka-console-producer.sh \--topic orders_event \--bootstrap-server localhost:9092

输入数据:

{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:00:05"}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99,"order_time":"2025-11-07 10:02:10"}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99,"order_time":"2025-11-07 10:03:20"}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99,"order_time":"2025-11-07 10:04:30"}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99,"order_time":"2025-11-07 10:06:45"}

4.2 处理时间版本

步骤 1:创建主题
kafka-topics.sh --create \--topic orders_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
步骤 2:创建表
-- 创建订单表(处理时间)
CREATE TABLE orders_proc (order_id BIGINT,user_id BIGINT,product_id BIGINT,amount DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-orders-proc','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);-- 创建结果表
CREATE TABLE order_statistics_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),total_amount DECIMAL(10, 2),order_count BIGINT
) WITH ('connector' = 'print'
);
步骤 3:定义查询
-- 计算每 5 分钟内的订单总金额
INSERT INTO order_statistics_proc
SELECTTUMBLE_START(proc_time, INTERVAL '5' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '5' MINUTE) AS window_end,SUM(amount) AS total_amount,COUNT(*) AS order_count
FROM orders_proc
GROUP BY TUMBLE(proc_time, INTERVAL '5' MINUTE);
步骤 4:发送数据
kafka-console-producer.sh \--topic orders_proc \--bootstrap-server localhost:9092

输入数据(不需要时间字段):

{"order_id":1001,"user_id":101,"product_id":2001,"amount":99.99}
{"order_id":1002,"user_id":102,"product_id":2002,"amount":199.99}
{"order_id":1003,"user_id":101,"product_id":2003,"amount":299.99}
{"order_id":1004,"user_id":103,"product_id":2001,"amount":99.99}
{"order_id":1005,"user_id":102,"product_id":2004,"amount":399.99}

5. 调试和验证

5.1 查看原始数据流

-- 查看事件时间数据流
SELECT * FROM user_clicks_event;-- 查看处理时间数据流
SELECT user_id,url,proc_time
FROM user_clicks_proc;

5.2 使用 Flink Web UI

  1. 访问 http://localhost:8081
  2. 查看运行中的作业
  3. 点击作业名称,查看详细信息
  4. 在 “Metrics” 标签页中查看:
    • 窗口延迟
    • 水印延迟(事件时间)
    • 处理延迟
    • 吞吐量

5.3 验证窗口行为

验证事件时间窗口
  1. 发送已知时间戳的数据
  2. 观察窗口时间是否基于数据中的时间
  3. 发送乱序数据,验证水印机制
  4. 验证结果是否确定(重复运行结果相同)
验证处理时间窗口
  1. 发送数据(不需要时间字段)
  2. 观察窗口时间是否基于系统时间
  3. 快速发送多条数据,验证是否在同一窗口
  4. 等待窗口大小时间后,再发送数据,验证是否在新窗口

6. 常见问题排查

6.1 窗口没有输出

可能原因

  • 水印没有推进(事件时间)
  • 窗口大小设置过大
  • 数据格式错误
  • Kafka 连接问题

解决方法

  • 检查数据时间戳是否递增(事件时间)
  • 减小窗口大小进行测试
  • 检查 JSON 格式是否正确
  • 检查 Kafka 连接配置

6.2 时间格式错误

错误信息Cannot parse timestamp

解决方法

  • 确保 JSON 中的时间格式为 YYYY-MM-DD HH:mm:ss
  • 使用 TIMESTAMP(3) 精度
  • 检查时间字段名称是否匹配

6.3 数据没有被包含在窗口中

可能原因

  • 水印延迟设置过小(事件时间)
  • 数据延迟超过水印允许范围(事件时间)
  • 窗口已经触发(事件时间)

解决方法

  • 增加水印延迟时间
  • 检查数据源是否有延迟
  • 重新发送数据

7. 性能优化建议

7.1 窗口大小选择

  • 太小:窗口数量多,计算开销大
  • 太大:结果延迟高,实时性差
  • 建议:根据业务需求选择,常见的有 1 分钟、5 分钟、1 小时

7.2 水印延迟设置

  • 太小:可能导致延迟数据被丢弃
  • 太大:窗口触发延迟,实时性差
  • 建议:设置为最大延迟时间的 1.5-2 倍

7.3 Kafka 配置优化

-- 优化 Kafka 消费配置
'properties.fetch.min.bytes' = '1024',
'properties.fetch.max.wait.ms' = '500',
'properties.max.partition.fetch.bytes' = '1048576'

事件时间与处理时间详细对比(Flink 1.20)

本文档详细对比事件时间(Event Time)和处理时间(Processing Time)在 Flink SQL 滚动窗口中的使用,所有示例均使用 Flink SQL Client 和 Kafka 进行演示。

1. 核心概念对比

1.1 定义对比

维度事件时间(Event Time)处理时间(Processing Time)
定义事件实际发生的时间,由事件本身携带数据被 Flink 处理时的系统时间
时间来源数据中的时间戳字段系统当前时间
确定性结果确定,可重复结果不确定,依赖于数据到达时间
水印需求需要定义水印不需要水印
乱序处理通过水印机制处理乱序和延迟数据不考虑乱序,按到达顺序处理
延迟容忍可以容忍一定范围内的延迟不处理延迟数据

1.2 适用场景对比

场景事件时间处理时间
实时监控✅ 需要准确时间语义✅ 快速响应
计费系统✅ 必须使用❌ 不适用
日志分析✅ 准确分析✅ 快速分析
数据分析✅ 准确结果❌ 不适用
快速响应⚠️ 可能有延迟✅ 延迟低
数据乱序✅ 可以处理❌ 不考虑

2. 配置对比

2.1 表定义对比

事件时间表定义
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3),  -- 事件时间字段url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND  -- 水印定义
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
);

关键点

  • 需要定义时间字段(click_time TIMESTAMP(3)
  • 必须定义水印(WATERMARK FOR ...
  • 数据中必须包含时间字段
处理时间表定义
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()  -- 处理时间属性
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
);

关键点

  • 不需要时间字段
  • 不需要水印
  • 使用 PROCTIME() 函数定义处理时间属性
  • 数据中不需要包含时间字段

2.2 窗口查询对比

事件时间窗口查询
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
处理时间窗口查询
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;

2.3 数据格式对比

事件时间数据格式
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}

要求

  • 必须包含时间字段(click_time
  • 时间格式必须正确:YYYY-MM-DD HH:mm:ss
  • 时间戳应该递增(虽然可以处理乱序)
处理时间数据格式
{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}

要求

  • 不需要时间字段
  • 只需要业务字段
  • 格式更简单

3. 完整对比示例

3.1 环境准备

3.1.1 启动 Kafka
# 启动 Kafka Broker
bin/kafka-server-start.sh config/server.properties
3.1.2 创建 Kafka 主题
# 创建事件时间主题
kafka-topics.sh --create \--topic user_clicks_event \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1# 创建处理时间主题
kafka-topics.sh --create \--topic user_clicks_proc \--bootstrap-server localhost:9092 \--partitions 1 \--replication-factor 1
3.1.3 启动 Flink SQL Client
./bin/sql-client.sh

3.2 事件时间完整示例

3.2.1 创建事件时间表
-- 在 Flink SQL Client 中执行
CREATE TABLE user_clicks_event (user_id STRING,click_time TIMESTAMP(3),url STRING,WATERMARK FOR click_time AS click_time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_clicks_event','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-event','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.2.2 创建结果表
CREATE TABLE window_results_event (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
3.2.3 定义窗口查询
INSERT INTO window_results_event
SELECTTUMBLE_START(click_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(click_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_event
GROUP BYTUMBLE(click_time, INTERVAL '1' MINUTE),user_id;
3.2.4 发送事件时间数据
# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_event \--bootstrap-server localhost:9092

输入数据(注意时间字段):

{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
{"user_id":"user1","click_time":"2025-11-07 10:00:35","url":"/contact"}
{"user_id":"user2","click_time":"2025-11-07 10:00:50","url":"/home"}
{"user_id":"user1","click_time":"2025-11-07 10:01:05","url":"/about"}
{"user_id":"user2","click_time":"2025-11-07 10:01:20","url":"/products"}
3.2.5 观察事件时间窗口输出

在 Flink SQL Client 中会看到:

+----+-------------------------+-------------------------+--------+-------------+
| op |          window_start   |          window_end     | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user1  |           3 |
| +I | 2025-11-07 10:00:00.000| 2025-11-07 10:01:00.000| user2  |           2 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user1  |           1 |
| +I | 2025-11-07 10:01:00.000| 2025-11-07 10:02:00.000| user2  |           1 |
+----+-------------------------+-------------------------+--------+-------------+

关键观察点

  • 窗口时间基于数据中的 click_time 字段
  • 窗口 [10:00:00, 10:01:00) 包含时间戳在 10:00:0010:00:59 之间的数据
  • 结果确定,可重复

3.3 处理时间完整示例

3.3.1 创建处理时间表
-- 在 Flink SQL Client 中执行
CREATE TABLE user_clicks_proc (user_id STRING,url STRING,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'user_clicks_proc','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-client-proc','format' = 'json','json.ignore-parse-errors' = 'true','scan.startup.mode' = 'earliest-offset'
);
3.3.2 创建结果表
CREATE TABLE window_results_proc (window_start TIMESTAMP(3),window_end TIMESTAMP(3),user_id STRING,click_count BIGINT
) WITH ('connector' = 'print'
);
3.3.3 定义窗口查询
INSERT INTO window_results_proc
SELECTTUMBLE_START(proc_time, INTERVAL '1' MINUTE) AS window_start,TUMBLE_END(proc_time, INTERVAL '1' MINUTE) AS window_end,user_id,COUNT(*) AS click_count
FROM user_clicks_proc
GROUP BYTUMBLE(proc_time, INTERVAL '1' MINUTE),user_id;
3.3.4 发送处理时间数据
# 启动 Kafka 生产者
kafka-console-producer.sh \--topic user_clicks_proc \--bootstrap-server localhost:9092

输入数据(不需要时间字段):

{"user_id":"user1","url":"/home"}
{"user_id":"user2","url":"/about"}
{"user_id":"user1","url":"/products"}
{"user_id":"user1","url":"/contact"}
{"user_id":"user2","url":"/home"}
{"user_id":"user1","url":"/about"}
{"user_id":"user2","url":"/products"}
3.3.5 观察处理时间窗口输出

在 Flink SQL Client 中会看到:

+----+-------------------------+-------------------------+--------+-------------+
| op |          window_start   |          window_end     | user_id| click_count |
+----+-------------------------+-------------------------+--------+-------------+
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user1  |           3 |
| +I | 2025-11-07 14:30:00.000| 2025-11-07 14:31:00.000| user2  |           2 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user1  |           1 |
| +I | 2025-11-07 14:31:00.000| 2025-11-07 14:32:00.000| user2  |           1 |
+----+-------------------------+-------------------------+--------+-------------+

关键观察点

  • 窗口时间基于系统处理时间(当前是 14:30:xx
  • 窗口 [14:30:00, 14:31:00) 包含在 14:30:0014:30:59 之间处理的数据
  • 结果依赖于数据到达时间,每次运行可能不同

4. 行为差异详细对比

4.1 窗口触发机制对比

事件时间窗口触发
  1. 水印推进:当新数据到达时,系统根据数据时间戳更新水印
  2. 窗口触发:当水印超过窗口结束时间时,窗口被触发
  3. 延迟数据:在水印允许范围内到达的延迟数据仍会被包含

示例

  • 窗口 [10:00:00, 10:01:00) 在水印达到 10:01:05(假设水印延迟 5 秒)时触发
  • 如果 10:00:50 的数据在水印 10:01:05 之后到达,可能不会被包含
处理时间窗口触发
  1. 系统时间推进:窗口根据系统当前时间进行分组
  2. 窗口触发:当系统时间超过窗口结束时间时,窗口被触发
  3. 实时性:窗口触发快,不需要等待延迟数据

示例

  • 窗口 [14:30:00, 14:31:00) 在系统时间达到 14:31:00 时触发
  • 所有在 14:30:0014:30:59 之间处理的数据都会被包含

4.2 乱序数据处理对比

事件时间处理乱序

场景:数据按以下顺序到达(但实际发生时间不同)

到达顺序:A(10:00:10) -> C(10:00:30) -> B(10:00:20) -> D(10:00:40)
实际时间:A(10:00:10) -> B(10:00:20) -> C(10:00:30) -> D(10:00:40)

处理方式

  • 通过水印机制,B 虽然晚到达,但仍会被分配到正确的窗口
  • 如果 B 在水印允许范围内到达,会被包含在窗口 [10:00:00, 10:01:00)
处理时间处理乱序

场景:数据按到达顺序处理

到达顺序:A -> C -> B -> D
处理顺序:A -> C -> B -> D(按到达顺序)

处理方式

  • 不考虑数据实际发生时间
  • 按到达顺序分配到窗口
  • 先到达的数据先处理,分配到较早的窗口

4.3 结果确定性对比

事件时间结果确定性
  • 确定性:相同的数据,无论何时处理,结果都相同
  • 可重复性:可以重复运行,结果一致
  • 准确性:结果准确反映事件发生的时间

测试方法

  1. 发送相同的数据两次
  2. 观察窗口结果是否相同
  3. 结果应该完全一致
处理时间结果不确定性
  • 不确定性:相同的数据,不同时间处理,结果可能不同
  • 不可重复:每次运行结果可能不同
  • 依赖到达时间:结果依赖于数据到达时间

测试方法

  1. 发送相同的数据两次(间隔不同时间)
  2. 观察窗口结果是否相同
  3. 结果可能不同(因为系统时间不同)

5. 性能对比

5.1 计算开销

维度事件时间处理时间
水印计算需要不需要
乱序处理需要额外开销不需要
内存占用较高(需要缓存延迟数据)较低
CPU 开销较高较低
整体性能相对较慢相对较快

5.2 延迟对比

维度事件时间处理时间
窗口触发延迟需要等待水印,有延迟立即触发,延迟低
结果输出延迟较高(等待延迟数据)较低(立即输出)
实时性相对较低相对较高

6. 选择建议

6.1 选择事件时间的场景

必须使用事件时间

  • 计费系统:需要准确的时间语义
  • 数据分析:需要准确的结果
  • 审计系统:需要可重复的结果
  • 对时间顺序有严格要求的场景

推荐使用事件时间

  • 实时监控:需要准确的时间语义
  • 日志分析:需要准确的分析结果
  • 数据质量要求高的场景

6.2 选择处理时间的场景

可以使用处理时间

  • 快速响应:对延迟敏感
  • 实时监控:对时间精度要求不高
  • 日志分析:快速分析,不需要准确时间
  • 对时间顺序要求不严格的场景

不能使用处理时间

  • 计费系统
  • 需要准确时间语义的场景
  • 需要可重复结果的场景

7. 实际测试对比

7.1 测试场景:相同数据,不同时间语义

测试步骤
  1. 准备相同的数据(事件时间版本):
{"user_id":"user1","click_time":"2025-11-07 10:00:05","url":"/home"}
{"user_id":"user2","click_time":"2025-11-07 10:00:10","url":"/about"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/products"}
  1. 发送到事件时间主题,观察结果

  2. 发送到处理时间主题(去掉时间字段),观察结果

预期差异
  • 事件时间:窗口时间基于数据中的时间(10:00:xx
  • 处理时间:窗口时间基于系统时间(可能是 14:30:xx 或其他时间)

7.2 测试场景:乱序数据

测试步骤
  1. 发送乱序数据(事件时间):
{"user_id":"user1","click_time":"2025-11-07 10:00:10","url":"/first"}
{"user_id":"user1","click_time":"2025-11-07 10:00:30","url":"/third"}
{"user_id":"user1","click_time":"2025-11-07 10:00:20","url":"/second"}  -- 乱序
  1. 观察事件时间窗口:应该能正确处理乱序

  2. 发送相同数据(处理时间,去掉时间字段)

  3. 观察处理时间窗口:按到达顺序处理

预期差异
  • 事件时间:能正确处理乱序,B 会被分配到正确的窗口
  • 处理时间:按到达顺序处理,不考虑实际时间

8. 常见问题解答

Q1: 什么时候必须使用事件时间?

A: 当业务需要准确的时间语义时,必须使用事件时间。例如:

  • 计费系统:需要准确记录事件发生时间
  • 数据分析:需要准确的分析结果
  • 审计系统:需要可重复的结果

Q2: 处理时间可以用于生产环境吗?

A: 可以,但需要满足以下条件:

  • 业务对时间精度要求不高
  • 不需要准确的时间语义
  • 可以接受结果的不确定性

Q3: 如何选择水印延迟时间?

A: 根据业务场景和数据延迟情况:

  • 观察数据延迟分布
  • 设置为最大延迟时间的 1.5-2 倍
  • 太小:可能丢失延迟数据
  • 太大:窗口触发延迟高

Q4: 事件时间和处理时间可以混用吗?

A: 可以,但需要注意:

  • 不同时间语义的窗口不能直接关联
  • 需要明确每个窗口使用的时间语义
  • 建议统一使用一种时间语义

Q5: 如何测试窗口是否正确工作?

A: 测试方法:

  1. 发送已知的数据
  2. 观察窗口输出
  3. 验证结果是否符合预期
  4. 测试乱序数据(事件时间)
  5. 测试延迟数据(事件时间)

9. 总结

9.1 核心区别

维度事件时间处理时间
时间来源数据中的时间戳系统当前时间
水印需要不需要
确定性确定不确定
乱序处理支持不支持
性能较低较高
延迟较高较低

9.2 选择原则

  1. 需要准确时间语义 → 使用事件时间
  2. 需要快速响应 → 考虑处理时间
  3. 需要可重复结果 → 使用事件时间
  4. 对时间不敏感 → 可以使用处理时间

9.3 最佳实践

  1. 优先使用事件时间:除非有特殊需求,否则优先使用事件时间
  2. 合理设置水印:根据数据延迟情况设置合适的水印
  3. 测试验证:通过实际测试验证窗口行为
  4. 监控指标:监控窗口延迟、水印延迟等指标

10. 参考资源

  • Flink 1.20 窗口函数官方文档
  • Flink 1.20 时间属性文档
  • Flink 1.20 Kafka 连接器文档
http://www.dtcms.com/a/613642.html

相关文章:

  • Java基础 | SpringBoot实现自启动的方式
  • 【ZeroRange WebRTC】UDP无序传输与丢包检测机制深度分析
  • 零基础建设网站视频教程抚州的电子商务网站建设公司
  • qt显示类控件--- Label
  • 【深度学习】基于Faster R-CNN与HRNet的豆类品种识别与分类系统
  • 专业建设网站公司东莞阿里巴巴代运营
  • 【深度学习】YOLOv10n-MAN-Faster实现包装盒flap状态识别与分类,提高生产效率
  • 网站备案需要费用吗中国容桂品牌网站建设
  • 知识图谱与中医古籍的数智化融合:中医药多智能体大模型系统的未来展望
  • wordpress全站cdn法人变更在哪个网站做公示
  • 鸿蒙开发TypeScript第六课:对象
  • 【Linux日新月异(四)】CentOS 7进程管理深度指南:掌控系统生命线
  • 如何避免新手对 instanceof 的误解?
  • 每周AI看 | OpenAI发布GPT-5.1、网易云商自研内部知识问答Agent、商汤开源空间智能大模型
  • 移动端部署噩梦终结者:动态稀疏视觉Transformer的量化实战
  • 【LeetCode刷题】找到字符串中所有字母异位词
  • 榆林城乡建设规划官方网站中国室内设计师
  • oneinstack wordpress成都官网seo服务
  • Go语言编译 | 探讨Go语言编译原理与优化技巧
  • 【深入理解】动静态库的制作、使用与加载原理(附详细操作指南)
  • OpenFeign:完整学习笔记
  • Vue 3 的Suspense组件:讲解如何使用_Suspense_处理异步组件加载状态
  • 【go.sixue.work】2.2 面向对象:接口与多态
  • 建设网站需要收费吗做淘客找单子的网站
  • 视频号直播视频录制
  • 抓取资源的网站怎么做珠海网站设计培训班
  • CPO(Co-Packaged Optics) 是整个数据中心互连范式的下一代核心
  • 1.5 ShaderFeature
  • 暄桐教练日课·10天《梦瑛篆书千字文》报名啦~
  • 从代码规范到 AI Agent:现代前端开发的智能化演进