当前位置: 首页 > news >正文

Flink SQL 查询 核心概念与实战指南

参考官网,Flink 2.1

https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/sql/queries/overview/

查询的核心概念和使用方法

核心概念

  1. ​查询执行方式​

    • 通过 TableEnvironment.sqlQuery()执行 SELECT/VALUES 语句,返回 Table 对象

    • 支持 SQL 与 Table API 混合使用,统一优化翻译为单一程序

  2. ​表注册机制​

    • 查询前需注册表(TableSource/Table/CREATE TABLE/DataStream)

    • Table.toString()自动生成唯一表名便于内联查询

    • 支持 Catalog 管理数据源位置

查询示例

// 内联未注册表
Table result = tableEnv.sqlQuery("SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");// 查询已注册表
tableEnv.createTemporaryView("Orders", ds);
Table result2 = tableEnv.sqlQuery("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");// 结果写入 TableSink
tableEnv.executeSql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

结果收集

  • ​执行方法​​:TableEnvironment.executeSql()Table.execute()

  • ​结果获取​​:

    • TableResult.collect():返回可关闭的行迭代器(需主动释放资源)

    • TableResult.print():输出到控制台

  • ​注意事项​​:

    • 结果数据只能访问一次

    • 流处理场景下精确一次(exactly-once)或至少一次(at-least-once)的语义保证取决于检查点配置

语法特性

  1. ​标识符规则​​:

    • 保留大小写且区分大小写

    • 反引号支持非字母数字字符(如 `my field`

  2. ​字符串字面量​​:

    • 单引号包裹,双单引号转义(It''s me

    • 支持 Unicode 字符(U&'\263A'

    • 提供 C 风格转义序列(如 \n, \u0061

支持的操作

文档列出了完整的 SQL 操作集,包括:

  • 聚合查询(分组聚合、窗口聚合、Over 聚合)

  • 连接查询(JOIN)

  • 集合操作

  • 排序限制(ORDER BY/LIMIT)

  • 高级特性(Top-N、去重、模式识别、时间旅行等)

​注意​​:使用不支持的 SQL 功能会抛出 TableException,具体支持范围需参考后续章节的批流处理特性对照表。

​Flink SQL Hints 

​1. 动态表选项(Dynamic Table Options)​
  • ​用途​​:动态覆盖表的配置选项(如数据源、连接参数等),适用于临时查询(如 SQL-CLI)。

  • ​语法​​:

    table_path /*+ OPTIONS(key=val [, key=val]*) */
  • ​示例​​:

    -- 覆盖 Kafka 源的启动模式
    SELECT id FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;-- 覆盖 JOIN 中表的选项
    SELECT * FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1JOIN kafka_table2 /*+ OPTIONS('sink.partitioner'='round-robin') */ t2ON t1.id = t2.id;

​2. 查询提示(Query Hints)​
  • ​用途​​:优化查询执行计划(当前仅支持 Join Hints)。

  • ​语法​​:

    SELECT /*+ hint [, hint ] */ ...
  • ​冲突解决​​:

    • ​键值冲突​​:最后指定的值生效(last-write-wins)。

    • ​列表冲突​​:第一个指定的提示生效(first-accept)。


​3. Join Hints​

支持 4 种 Join 策略提示:

  1. BROADCAST

    • ​适用场景​​:小表广播(无视 table.optimizer.join.broadcast-threshold)。

    • ​限制​​:仅等价连接(=),不支持全外连接。

    • ​示例​​:

      SELECT /*+ BROADCAST(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
  2. SHUFFLE_HASH

    • ​适用场景​​:中等规模表作为构建端(Build Side)。

    • ​限制​​:仅等价连接。

    • ​示例​​:

      SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
  3. SHUFFLE_MERGE

    • ​适用场景​​:大表排序合并(数据已有序时性能更佳)。

    • ​限制​​:仅等价连接。

    • ​示例​​:

      SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
  4. NEST_LOOP

    • ​适用场景​​:特殊需求(如非等价连接)。

    • ​示例​​:

      SELECT /*+ NEST_LOOP(t1) */ * FROM t1 JOIN t2 ON t1.id > t2.id;

​4. LOOKUP Hint(流处理专用)​
  • ​用途​​:优化维表关联(Lookup Join)行为。

  • ​核心功能​​:

    1. ​同步/异步模式​​:

      LOOKUP('table'='Customers', 'async'='true')
    2. ​异步参数配置​​(如超时、缓存容量):

      LOOKUP('table'='Customers', 'async'='true', 'timeout'='300s', 'capacity'='1000')
    3. ​重试策略​​(解决维表延迟更新问题):

      LOOKUP('table'='Customers', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3')
    4. ​自定义数据分布​​(优化缓存):

      LOOKUP('table'='Customers', 'shuffle'='true')

例子、

SELECT /*+ LOOKUP('table'='Customers','async'='true','output-mode'='allow_unordered',  -- 允许无序输出(提升性能)'capacity'='1000',               -- 异步队列容量'timeout'='180s'                 -- 超时时间
) */ * FROM Orders JOIN Customers ...;

​5. STATE_TTL Hint(流处理专用)​
  • ​用途​​:为有状态计算(如 Regular Join、Group Aggregation)指定算子级状态 TTL。

  • ​示例​​:

    -- Regular Join
    SELECT /*+ STATE_TTL('orders'='3d', 'lineitem'='1d') */ * 
    FROM orders LEFT JOIN lineitem ON orders.id = lineitem.id;-- Group Aggregation
    SELECT /*+ STATE_TTL('orders'='1d') */ user, SUM(amount) 
    FROM orders GROUP BY user;

​6. 冲突处理规则​
  • ​Join Hints 冲突​​:

    • 同策略冲突:选择第一个匹配的表。

    • 不同策略冲突:选择第一个匹配的提示。

  • ​STATE_TTL 冲突​​:

    • 重复键:取最后一次指定的值。

    • 多提示块:取第一次出现的值。


​关键注意事项​

  1. ​表必须存在​​:Join Hints 中指定的表需已注册。

  2. ​语法兼容性​​:采用 Oracle 风格的注释语法(/*+ HINT */)。

  3. ​流处理语义​​:LOOKUPSTATE_TTL是流处理特有功能。

  4. ​性能权衡​​:如 BROADCAST适合小表,SHUFFLE_MERGE适合大表。

通过合理使用 Hints,用户可以在 Flink SQL 中更精细地控制执行计划,优化查询性能。

​Flink 窗口表值函数(Windowing TVFs)

窗口表值函数是 Flink SQL 用于处理无限流数据的核心工具,将数据流划分为有限大小的"桶"进行计算。相比传统的分组窗口函数,TVF 更符合 SQL 标准且功能更强大。


​四种窗口类型​

​1. 滚动窗口(TUMBLE)​
  • ​特点​​:固定大小、不重叠的连续窗口

  • ​适用场景​​:固定时间段的统计(如每5分钟统计一次)

  • ​语法​​:

    TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset])
  • ​示例​​:

    SELECT * FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES);
    SELECT window_start, window_end, SUM(price) 
    FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
    GROUP BY window_start, window_end;
​2. 滑动窗口(HOP)​
  • ​特点​​:窗口可重叠,需指定窗口大小和滑动步长

  • ​适用场景​​:计算最近一段时间内的指标(如最近10分钟,每5分钟更新一次)

  • ​语法​​:

    HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset])
  • ​示例​​:

    SELECT * FROM HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES);
​3. 累积窗口(CUMULATE)​
  • ​特点​​:窗口逐步扩大直到最大尺寸,适合早期触发计算

  • ​适用场景​​:累计统计(如从00:00开始每分钟累计UV)

  • ​语法​​:

    CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size [, offset])
  • ​示例​​:

    SELECT * FROM CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES);
​4. 会话窗口(SESSION)​
  • ​特点​​:基于活动间隙的动态窗口,无数据时关闭

  • ​适用场景​​:用户行为会话分析

  • ​语法​​:

    SESSION(TABLE data [PARTITION BY(keycols)], DESCRIPTOR(timecol), gap)
  • ​示例​​:

    -- 带分区的会话窗口
    SELECT * FROM SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES);

​输出列说明​

所有窗口TVF都会在原始表基础上添加三列:

  • ​window_start​​:窗口开始时间(包含)

  • ​window_end​​:窗口结束时间(不包含)

  • ​window_time​​:窗口时间属性(= window_end - 1ms)


​高级特性​

​窗口偏移(Offset)​

  • ​作用​​:调整窗口对齐时间点

  • ​示例​​:

    -- 偏移1分钟,窗口从01分开始而不是00分
    SELECT * FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES);

​时间属性处理​

  • 流处理:支持事件时间和处理时间属性

  • 批处理:时间字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型


​应用场景扩展​

基于窗口TVF可构建更复杂的计算:

  • ​窗口聚合​​(Window Aggregation)

  • ​窗口TopN​​(Window TopN)

  • ​窗口关联​​(Window Join)

  • ​窗口去重​​(Window Deduplication)


​注意事项​

  1. ​会话窗口限制​​:

    • 批处理模式暂不支持

    • 性能调优功能有限

  2. ​语法兼容​​:支持命名参数和位置参数

  3. ​水位线影响​​:窗口偏移不影响水位线计算

通过合理选择窗口类型和参数,可以高效处理各种流式计算场景,从简单的固定窗口统计到复杂的会话分析都能很好支持。

​Flink SQL 模型推理(Model Inference)

Flink SQL 提供 ​ML_PREDICT表值函数(TVF)​​,允许直接在 SQL 查询中调用机器学习模型进行实时预测,支持流式数据处理。


基本语法​

SELECT * FROM ML_PREDICT(TABLE input_table,               -- 输入表(需包含特征列)MODEL model_name,                -- 已注册的模型名称DESCRIPTOR(feature_columns),    -- 指定特征列[CONFIG => MAP['key', 'value']]  -- 可选配置参数
);

关键参数说明​

参数

类型

描述

input_table

TABLE

输入数据表,必须包含模型所需的特征列。

model_name

STRING

已在 Catalog 中注册的模型名称。

feature_columns

DESCRIPTOR

指定输入表中哪些列作为模型的特征输入(需与模型输入维度匹配)。

config

MAP

可选配置项,支持异步模式、超时时间等。


配置选项(CONFIG)​

配置项

默认值

类型

描述

async

-

BOOLEAN

是否启用异步推理模式(true/false)。

max-concurrent-operations

-

INTEGER

异步推理的最大并发操作数。

output-mode

ORDERED

ENUM

异步输出模式:ORDERED(有序)或 ALLOW_UNORDERED(允许无序)。

timeout

-

DURATION

异步推理的超时时间(如 '100s')。


使用示例​

​基础用法(同步推理)​

SELECT * FROM ML_PREDICT(TABLE orders,                  -- 输入表MODEL fraud_detection_model,  -- 欺诈检测模型DESCRIPTOR(amount, user_risk_score)  -- 特征列
);

​异步推理配置​

SELECT * FROM ML_PREDICT(TABLE sensor_data,MODEL anomaly_detector,DESCRIPTOR(temperature, vibration),CONFIG => MAP['async', 'true','timeout', '30s','output-mode', 'ALLOW_UNORDERED']
);

​命名参数写法​

SELECT * FROM ML_PREDICT(INPUT => TABLE logs,MODEL => MODEL sentiment_analyzer,ARGS => DESCRIPTOR(text_column),CONFIG => MAP['async', 'false']
);

输出结果​

  • ​输出表结构​​:包含输入表的所有列 + 模型预测结果列。

  • ​列名冲突处理​​:若预测列名与输入列重复,自动添加索引(如 predictionprediction0)。


注意事项​

  1. ​模型注册​​:模型必须提前通过 CREATE MODEL注册到 Catalog。

  2. ​特征匹配​​:DESCRIPTOR指定的特征列必须与模型输入维度一致。

  3. ​异步支持​​:需模型实现 AsyncPredictRuntimeProvider接口。

  4. ​流式限制​​:仅支持 Append-Only 流表,不支持 CDC 变更日志表。

  5. ​错误场景​​:

    • 模型不存在 → 抛出异常。

    • 特征列数量不匹配 → 抛出异常。


性能优化建议​

  • ​高吞吐场景​​:优先使用异步模式(async=true)。

  • ​资源调优​​:合理设置 max-concurrent-operationstimeout

  • ​模型实现​​:性能依赖底层模型提供方(如 TensorFlow/PyTorch 适配器)。


相关操作​

  • ​模型管理​​:

    -- 创建模型
    CREATE MODEL my_model USING 'path/to/model';-- 修改模型
    ALTER MODEL my_model SET 'new_config';

适用场景​

  • ​实时风控​​:流式交易数据欺诈检测。

  • ​IoT 异常检测​​:传感器数据实时分析。

  • ​推荐系统​​:用户行为流式预测。

通过 ML_PREDICT,Flink SQL 实现了机器学习模型与流式数据的无缝集成,简化了实时 AI 应用的开发流程。

​Flink SQL 窗口聚合(Window Aggregation)


窗口表值函数聚合(Window TVF Aggregation)​

​核心特性​

  • ​语法结构​​:在 GROUP BY子句中包含窗口的 window_startwindow_end

  • ​输出结果​​:每个窗口输出一条最终聚合结果(不输出中间结果)

  • ​状态管理​​:窗口结束后自动清理中间状态

​基本语法​

SELECT window_start, window_end, aggregate_function(column)
FROM TABLE(窗口函数(...))
GROUP BY window_start, window_end, [其他分组键]

四种窗口类型示例​

​1. 滚动窗口(TUMBLE)​

-- 10分钟滚动窗口
SELECT window_start, window_end, SUM(price) AS total_price
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end;

​结果​​:每个10分钟区间输出一个聚合值,窗口不重叠

​2. 滑动窗口(HOP)​

-- 10分钟窗口,每5分钟滑动一次
SELECT window_start, window_end, SUM(price) AS total_price
FROM HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
GROUP BY window_start, window_end;

​结果​​:窗口重叠,数据可能属于多个窗口

​3. 累积窗口(CUMULATE)​

-- 最大10分钟,每2分钟扩展一次
SELECT window_start, window_end, SUM(price) AS total_price
FROM CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
GROUP BY window_start, window_end;

​结果​​:窗口逐步扩大,适合早期结果预览

​4. 会话窗口(SESSION)​

-- 基于2分钟不活动间隙的会话窗口
SELECT window_start, window_end, SUM(price) AS total_price
FROM SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES)
GROUP BY window_start, window_end;

​特点​​:窗口大小动态,根据数据活跃度决定


高级分组功能​

​GROUPING SETS​

SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());

​效果​​:同时输出按供应商分组和总聚合结果

​ROLLUP(层级聚合)​

-- 等价于 GROUPING SETS ((supplier_id, item), (supplier_id), ())
SELECT window_start, window_end, supplier_id, item, SUM(price)
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end, ROLLUP (supplier_id, item);

​CUBE(所有组合聚合)​

-- 所有可能的维度组合
SELECT window_start, window_end, supplier_id, item, SUM(price)
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end, CUBE (supplier_id, item);

GROUPING SETS、ROLLUP 和 CUBE 详解​

在标准 SQL 中,GROUP BY只能按固定维度分组。而 ​​GROUPING SETS​​、​​ROLLUP​​ 和 ​​CUBE​​ 是更灵活的分组方式,允许在​​一次查询中按多种维度组合​​计算聚合结果。

在 Flink 窗口聚合中,这些语法需要与窗口的 window_startwindow_end配合使用。


GROUPING SETS​

​功能​

  • 同时按​​多个维度组合​​分组计算,相当于多个 GROUP BY的联合。

  • ​空分组 ()​ 表示全局聚合(不按任何维度分组)。

​语法规则​

  • window_startwindow_end​必须​​出现在 GROUP BY子句中。

  • 其他分组维度放在 GROUPING SETS中。

​示例​

SELECT window_start, window_end, supplier_id, SUM(price) AS total_price
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());

​输出​​:

window_start

window_end

supplier_id

total_price

2020-04-15 08:00

2020-04-15 08:10

NULL

11.00

2020-04-15 08:00

2020-04-15 08:10

supplier1

6.00

2020-04-15 08:00

2020-04-15 08:10

supplier2

5.00

2020-04-15 08:10

2020-04-15 08:20

NULL

10.00

...

...

...

...

​关键点​​:

  • (supplier_id)()是两个独立的分组维度。

  • 未参与当前分组的列(如全局聚合时的 supplier_id)显示为 NULL


ROLLUP​

​功能​

  • 生成​​层级聚合​​(Hierarchical Aggregation),按维度逐步上卷。

  • 等价于 GROUPING SETS中所有​​前缀组合​​ + 全局聚合。

​语法规则​

  • window_startwindow_end​必须​​在 GROUP BY中。

  • 其他维度放在 ROLLUP中。

​示例​

-- 按 (supplier_id, item) 和 (supplier_id) 和 () 分组
SELECT window_start, window_end, supplier_id, item,SUM(price) AS total_price
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end, ROLLUP (supplier_id, item);

​等价于​​:

GROUP BY window_start, window_end, GROUPING SETS ((supplier_id, item),  -- 最细粒度(supplier_id),         -- 上卷一层()                     -- 全局聚合
)

CUBE​

​功能​

  • 生成​​全组合聚合​​,计算所有可能的维度子集。

  • 等价于 GROUPING SETS中所有可能的​​非空子集​​ + 全局聚合。

​语法规则​

  • window_startwindow_end​必须​​在 GROUP BY中。

  • 其他维度放在 CUBE中。

​示例​

-- 按 (supplier_id, item)、(supplier_id)、(item)、() 分组
SELECT window_start, window_end, supplier_id, item,SUM(price) AS total_price
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end, CUBE (supplier_id, item);

​等价于​​:

GROUP BY window_start, window_end, GROUPING SETS ((supplier_id, item),  -- 两列组合(supplier_id),         -- 仅 supplier_id(item),                -- 仅 item()                     -- 全局聚合
)

三者的对比​

语法

分组组合规则

示例输入维度

等效 GROUPING SETS

​GROUPING SETS​

手动指定任意组合

(a), (b)

GROUPING SETS ((a), (b))

​ROLLUP​

按维度顺序逐步上卷(前缀组合)

(a, b)

GROUPING SETS ((a,b), (a), ())

​CUBE​

所有可能的非空子集(幂集)

(a, b)

GROUPING SETS ((a,b), (a), (b), ())



实际应用场景​

​场景 1:多维分析仪表盘​
-- 同时展示按供应商、商品、以及全局的销售额
SELECT window_start,window_end,supplier_id,item,SUM(price) AS revenue
FROM TUMBLE(TABLE Orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR)
GROUP BY window_start, window_end, CUBE (supplier_id, item);
​场景 2:层级报表​
-- 生成国家 → 省份 → 城市的层级销售汇总
SELECT window_start,window_end,country,province,city,SUM(amount) AS total
FROM HOP(TABLE Sales, DESCRIPTOR(event_time), INTERVAL '1' DAY, INTERVAL '7' DAY)
GROUP BY window_start, window_end, ROLLUP (country, province, city);

注意事项​
  1. ​性能开销​​:CUBE的计算复杂度为 2n(n 是维度数),需谨慎使用。

  2. ​NULL 值处理​​:分组列显示为 NULL时,需区分是原始数据为 NULL 还是因聚合产生的 NULL。

  3. ​流批一体​​:语法在流式和批处理模式下一致,但流式模式下需注意状态大小。

通过灵活使用这些分组语法,可以在单次查询中实现复杂的多维分析需求,避免多次查询的冗余计算。

级联窗口聚合​

​关键概念​

  • window_time:窗口的时间属性,可用于后续时间相关操作

  • 级联聚合:将第一个窗口的结果作为第二个窗口的输入

​示例:5分钟 → 10分钟级联聚合​

-- 第一层:5分钟滚动窗口
CREATE VIEW window1 AS
SELECT window_start AS window_5mintumble_start, window_end AS window_5mintumble_end, window_time AS rowtime, SUM(price) AS partial_price
FROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)
GROUP BY supplier_id, window_start, window_end, window_time;-- 第二层:10分钟滚动窗口
SELECT window_start, window_end, SUM(partial_price) AS total_price
FROM TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)
GROUP BY window_start, window_end;

传统分组窗口聚合(已弃用)​

​⚠️ 注意:推荐使用 Window TVF​

传统方式使用函数如 TUMBLE(event_time, interval),存在以下限制:

  • 性能优化有限

  • 不支持标准 GROUPING SETS语法

  • 无法应用 Window TopN 等高级操作

​传统语法示例​

SELECT user,TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,SUM(amount) 
FROM Orders
GROUP BY TUMBLE(order_time, INTERVAL '1' DAY), user;

重要注意事项​

​时间属性要求​

  • ​流处理​​:必须使用事件时间或处理时间属性

  • ​批处理​​:必须为 TIMESTAMPTIMESTAMP_LTZ类型

  • ​会话窗口​​:目前批处理模式不支持

​列名冲突处理​

当预测列名与输入列重复时,自动添加索引(如 predictionprediction0

​状态清理​

窗口聚合会自动清理过期窗口的状态,避免状态无限增长


适用场景对比​

窗口类型

特点

适用场景

​TUMBLE​

固定大小,不重叠

固定时间段的统计(如每5分钟统计)

​HOP​

可重叠的滑动窗口

最近一段时间内的移动统计

​CUMULATE​

逐步扩大的窗口

累计统计,早期结果预览

​SESSION​

基于活动间隙的动态窗口

用户会话分析

​推荐优先使用 Window TVF 聚合​​,功能更强大且符合 SQL 标准。

​Flink SQL 分组聚合(Group Aggregation)

​核心功能​

  • ​聚合函数​​:从多行输入计算单个结果(如 COUNT、SUM、AVG、MAX、MIN)

  • ​分组聚合​​:通过 GROUP BY按指定维度分组计算

  • ​流批一体​​:支持批处理和流处理模式

流处理特性​​

​持续查询模式​

  • ​流式聚合​​:结果表随输入数据更新而​​持续更新​

  • ​状态管理​​:维护中间状态用于增量计算

  • ​内存考虑​​:状态可能无限增长,需配置 TTL

​状态大小影响因素​

聚合类型

状态开销

说明

COUNT

仅需计数状态

SUM/AVG

中等

需维护累加值

MIN/MAX

需维护所有值比较

DISTINCT

需维护去重集合


HAVING 子句​

​功能​

  • ​分组后过滤​​:对 GROUP BY产生的分组结果进行条件筛选

  • ​与 WHERE 区别​​:

    • WHERE:在分组前过滤​​单行数据​

    • HAVING:在分组后过滤​​分组结果​

​语法​

-- 过滤聚合结果
SELECT user_id, SUM(amount) AS total
FROM Orders
GROUP BY user_id
HAVING SUM(amount) > 50;  -- 只显示总额大于50的分组

​特殊情况​

-- 无 GROUP BY 的 HAVING:将所有数据视为一个分组
SELECT SUM(amount) 
FROM Orders
HAVING SUM(amount) > 100;
  • 条件为真:输出一行结果

  • 条件为假:无输出


流处理优化策略​

​状态 TTL 配置​

-- 设置状态存活时间,防止无限增长
SET 'table.exec.state.ttl' = '1h';SELECT user_id, COUNT(*) 
FROM Orders 
GROUP BY user_id;

​权衡​​:TTL 过期可能导致​​结果不准确​​,但能控制状态大小。

​性能调优建议​

  1. ​避免高基数分组​​:分组键唯一值过多会导致状态膨胀

  2. ​合理使用 DISTINCT​​:仅在必要时使用去重聚合

  3. ​选择合适聚合函数​​:优先使用状态开销小的函数(如 COUNT > SUM > MAX/MIN)


适用场景对比​

场景

推荐语法

说明

​简单分组统计​

GROUP BY col

基础分组聚合

​多维度分析​

GROUPING SETS

同时多维度查看

​层级报表​

ROLLUP

从详细到汇总的层级结构

​全维度交叉分析​

CUBE

所有组合的全面分析

​结果筛选​

HAVING

过滤聚合结果


注意事项​

​流处理限制​

  • ​状态管理​​:需关注内存使用和 TTL 配置

  • ​正确性权衡​​:TTL 可能导致近似结果而非精确结果

  • ​性能监控​​:高基数分组键可能影响系统稳定性

​语法要求​

  • HAVING中只能引用分组列或聚合函数

  • GROUPING SETS等高级分组需确保列名引用明确

通过合理运用这些分组聚合功能,可以在 Flink SQL 中实现复杂的数据分析需求,同时兼顾流处理的实时性和资源效率。

​Flink SQL 窗口聚合(Over Aggregation)

​与 GROUP BY 聚合的区别​

特性

GROUP BY 聚合

OVER 聚合

​输出行数​

每组一行

每输入一行输出一行

​结果缩减​

减少行数

保持原行数,增加聚合列

​适用场景​

分组统计

滑动窗口计算、排名、累计值

​基本语法结构​

SELECTagg_func(column) OVER ([PARTITION BY partition_col]ORDER BY order_colRANGE|ROWS BETWEEN lower_bound AND CURRENT ROW),...
FROM table_name

OVER 窗口三要素​

​1. PARTITION BY(可选)​

  • ​功能​​:按指定列分区,在每个分区内独立计算

  • ​示例​​:PARTITION BY product→ 每个产品单独计算聚合

​2. ORDER BY(必须)​

  • ​功能​​:定义行的排序顺序,决定窗口范围

  • ​限制​​:流处理中必须按​​时间属性​​或​​非时间属性升序​​排序

​3. 窗口范围定义(必须)​

支持两种定义方式:

​RANGE 间隔(基于值)​
-- 时间范围:当前行前1小时内的所有行
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW-- 无界范围:从分区开始到当前行
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
​ROWS 间隔(基于行数)​
-- 行数范围:前10行到当前行(共11行)
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW-- 无界行数:分区开始到当前行
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

实际应用示例​

​场景1:实时累计销售额​

SELECT order_id,order_time,amount,SUM(amount) OVER (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS one_hour_product_sum
FROM Orders;

​输出结果​​:

order_id

order_time

amount

one_hour_product_sum

001

08:00

100

100

002

08:30

200

300

003

09:05

150

350

​场景2:移动平均计算​

SELECT order_id,order_time,amount,AVG(amount) OVER (ORDER BY order_timeROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_avg
FROM Orders;

​输出结果​​:

order_id

order_time

amount

moving_avg

001

08:00

100

100.0

002

08:30

200

150.0

003

09:00

150

150.0

004

09:30

250

200.0


WINDOW 子句(语法糖)​

​功能​

  • 定义可重用的窗口规范

  • 提高查询可读性

  • 支持多个聚合函数共享同一窗口

​示例​

SELECT order_id,order_time,amount,SUM(amount) OVER w AS total_amount,AVG(amount) OVER w AS avg_amount,COUNT(*) OVER w AS order_count
FROM Orders
WINDOW w AS (PARTITION BY productORDER BY order_timeRANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
);

流处理特殊考虑​

​排序限制​

  • ​必须升序​​:ORDER BY列必须按升序排列

  • ​时间属性优先​​:推荐使用事件时间或处理时间属性

​窗口一致性​

-- ✅ 正确:所有OVER窗口使用相同定义
SELECT SUM(amount) OVER w,AVG(amount) OVER w  -- 使用相同的窗口w
FROM Orders
WINDOW w AS (...)-- ❌ 错误:流处理中不支持不同窗口定义
SELECT SUM(amount) OVER (ORDER BY time ROWS 5 PRECEDING),AVG(amount) OVER (ORDER BY time RANGE INTERVAL '1' HOUR PRECEDING)
FROM Orders

​状态管理​

  • ​无界窗口​​:UNBOUNDED PRECEDING可能导致状态无限增长

  • ​有界窗口​​:基于时间或行数的窗口状态可控


RANGE vs ROWS 对比​

特性

RANGE 间隔

ROWS 间隔

​依据​

基于ORDER BY列的值

基于物理行数

​时间场景​

适合时间滑动窗口

适合固定行数窗口

​结果确定性​

可能包含变长行数

固定行数

​性能​

需要维护值范围

需要维护行缓冲区

​选择建议​

  • ​时间序列分析​​:优先使用 RANGE+ 时间间隔

  • ​固定窗口统计​​:使用 ROWS+ 行数间隔

  • ​累计计算​​:使用 UNBOUNDED PRECEDING


适用场景​

​典型用例​

  1. ​移动平均/求和​​:股票价格、传感器数据平滑

  2. ​排名计算​​:销售额排名、访问量TopN

  3. ​累计统计​​:月度累计、年度累计

  4. ​差值计算​​:与前一行的差异比较

​性能优化建议​

  • ORDER BY列创建索引(批处理)

  • 合理设置窗口范围,避免无界窗口

  • 使用 WINDOW子句重用窗口定义

OVER 聚合为实时数据分析提供了强大的滑动窗口计算能力,特别适合需要保留原始行同时进行窗口计算的场景。

​Flink SQL 连接操作(Joins)


连接类型概览​

连接类型

特点

适用场景

状态管理

​Regular Join​

全量连接,任意更新可见

通用场景

需永久保留双流状态

​Interval Join​

时间范围限制的连接

订单-发货等时效关联

按时间清理旧状态

​Temporal Join​

版本表关联(时间点匹配)

汇率转换/维度表关联

按需保留版本

​Lookup Join​

外部系统实时查询

维表关联(如JDBC)

无状态

​Table Function Join​

自定义函数扩展连接

复杂数据处理

依赖UDF实现


Regular Joins(常规连接)​

​核心特性​

  • ​双流更新​​:任意一侧的INSERT/UPDATE/DELETE都会触发结果更新

  • ​状态保留​​:需永久保存左右表所有数据(可能无限增长)

  • ​语法限制​​:仅支持等值连接(=),不支持Theta Join或Cross Join

​示例​

-- INNER JOIN(仅匹配记录)
SELECT * 
FROM Orders 
JOIN Products ON Orders.product_id = Products.id;-- LEFT JOIN(保留左表全部记录)
SELECT * 
FROM Orders 
LEFT JOIN Products ON Orders.product_id = Products.id;

​流处理注意事项​

  • ​状态TTL​​:需配置 table.exec.state.ttl避免状态无限增长

  • ​性能风险​​:高基数连接键(如用户ID)易导致状态膨胀


Interval Joins(时间区间连接)​

​核心特性​

  • ​时间窗口约束​​:仅连接特定时间范围内的记录

  • ​状态清理​​:超时数据自动清除(基于事件时间)

  • ​语法要求​​:必须包含等值条件+时间范围谓词

​示例​

-- 订单创建后1小时内发货的记录
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '1' HOUR AND s.ship_time;

​时间谓词形式​

-- 精确匹配
ltime = rtime  -- 时间范围(推荐)
ltime BETWEEN rtime - INTERVAL '10' MINUTE AND rtime + INTERVAL '5' MINUTE-- 单边范围
ltime >= rtime AND ltime < rtime + INTERVAL '1' DAY

Temporal Joins(时态表连接)​

​1. Event Time Temporal Join​

​特点​

  • ​版本回溯​​:根据左表事件时间关联右表对应版本

  • ​语法​​:使用 FOR SYSTEM_TIME AS OF指定时间点

​示例(汇率转换)​

SELECT order_id, price, currency,conversion_rate
FROM Orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF Orders.order_time
ON Orders.currency = currency_rates.currency;

​要求​

  • 右表需定义主键和水位线

  • 连接条件必须包含主键

​2. Processing Time Temporal Join​

​特点​

  • ​最新版本​​:总是关联右表当前最新版本

  • ​实现方式​​:通过Lookup Join或Temporal Function

​示例​

-- 方式1:Lookup Join(JDBC维表)
SELECT o.*, c.country
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;-- 方式2:Temporal Function
SELECT o.amount, r.rate
FROM Orders o, 
LATERAL TABLE (Rates(o.proc_time)) r
WHERE o.currency = r.currency;

​时态表连接(Temporal Join)详解​


时态表连接是一种​​基于时间版本的关联操作​​,允许将动态表(如订单流)与​​随时间变化的版本表​​(如汇率表、用户信息表)关联,且能精确匹配到某个时间点的版本。

​类比现实场景​​:

假设你有一本随时间更新的汇率手册(每天版本不同),时态表连接就是让你在查某笔外币交易时,​​自动翻到交易发生当天的汇率页​​,而不是用最新汇率计算。


为什么需要时态表连接?​

​问题场景​

  • ​动态维度表​​:如汇率、商品价格、用户档案等会随时间变化。

  • ​错误关联​​:若直接用 Regular Join,会用最新版本数据关联历史数据,导致计算结果失真。

​示例​​:

订单时间

订单金额(EUR)

实时汇率(EUR→USD)

正确汇率(历史版本)

2023-01-01

100

1.20(最新)

1.10(当日实际)

2023-01-02

200

1.20

1.15(当日实际)

若用最新汇率1.2计算历史订单,会导致金额错误!

​解决方案​

时态表连接能根据左表(订单)的​​事件时间​​,从右表(汇率)中找到​​对应时间点的版本​​,确保计算结果准确。


 时态表连接的两种类型​

​(1) Event Time Temporal Join(事件时间时态连接)​
  • ​匹配逻辑​​:用左表的​​事件时间​​关联右表的历史版本。

  • ​语法​​:FOR SYSTEM_TIME AS OF left_table.event_time

  • ​要求​​:

    • 右表必须是​​版本表​​(如通过CDC捕获变更的数据库表)

    • 右表需定义​​主键​​和​​事件时间属性​​(Watermark)

​示例​​:订单按下单时间匹配对应汇率

SELECT order_id, price, currency,conversion_rate
FROM Orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF Orders.order_time
ON Orders.currency = currency_rates.currency;
​(2) Processing Time Temporal Join(处理时间时态连接)​
  • ​匹配逻辑​​:总是用右表的​​最新版本​​关联左表。

  • ​适用场景​​:关联静态维表(如用户档案),或可接受轻微延迟的实时数据。

  • ​实现方式​​:通过 Lookup JoinTemporal Table Function

​示例​​:关联最新用户信息

-- 方式1:Lookup Join
SELECT o.*, c.country
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;-- 方式2:Temporal Function
SELECT o.amount, r.rate
FROM Orders o, 
LATERAL TABLE (Rates(o.proc_time)) r
WHERE o.currency = r.currency;

技术实现原理​

​版本表存储结构​

currency_rates 表:
| currency | rate | update_time          |
|----------|------|---------------------|
| EUR      | 1.10 | 2023-01-01 00:00:00 |
| EUR      | 1.15 | 2023-01-02 00:00:00 |
| USD      | 1.00 | 2023-01-01 00:00:00 |

​执行步骤​

  1. ​水位线触发​​:当左表(订单)的 order_time水位线推进到 2023-01-01

  2. ​版本匹配​​:右表(汇率)返回 update_time <= 2023-01-01的最新版本(EUR=1.10);

  3. ​状态清理​​:早于水位线的旧版本(如1.10)可被安全删除。


典型应用场景​
  1. ​金融计算​

    • 订单金额按交易时的汇率转换

    • 股票交易按历史价格结算

  2. ​合规审计​

    • 查询用户操作时的权限配置(而非当前配置)

  3. ​实时数据分析​

    • 广告点击事件关联当时的广告计划版本


注意事项​
  • ​版本表要求​​:右表必须能提供历史版本(如通过CDC、数据库日志)

  • ​主键约束​​:连接条件必须包含右表主键(如 currency_rates.currency

  • ​水位线设置​​:需正确定义事件时间和水位线延迟

  • ​状态大小​​:版本表需控制历史版本保留周期


对比其他连接类型​

连接类型

状态开销

时间精度

适用场景

​Regular Join​

高(永久保留双流)

通用场景

​Interval Join​

中(按窗口保留)

事件时间范围

时效性关联(如订单-发货)

​Temporal Join​

低(按需保留版本)

精确时间点

版本化维度关联

时态表连接是处理​​时间敏感型维度关联​​的最佳选择,既能保证计算准确性,又能有效控制状态大小。

Lookup Join(查找连接)​

​核心特性​

  • ​外部系统查询​​:实时查询维表数据(如MySQL、HBase)

  • ​无状态​​:不维护历史状态

  • ​处理时间​​:基于 proc_time关联最新数据

​示例​

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (id INT,name STRING,country STRING,zip STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://mysqlhost:3306/customerdb','table-name' = 'customers'
);-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS oJOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id = c.id;

​连接器支持​

  • JDBC、HBase、Hive、Redis等支持Lookup的Connector

LATERAL TABLE (Rates(o_proctime))语法详解​

​1. 语法结构​

SELECT o.amount, r.rate
FROM Orders o, 
LATERAL TABLE (Rates(o.proc_time)) r  -- 关键语法
WHERE o.currency = r.currency;
  • LATERAL TABLE​:表示对左表(Orders)的每一行,调用右侧的​​表函数​​(Rates)。

  • Rates(o.proc_time)​:是一个​​表函数(Table Function)​​,接收左表的处理时间(proc_time)作为参数,返回对应时刻的汇率数据。

  • r​:是表函数返回结果的别名,后续可引用其列(如 r.rate)。

​2. 表函数(Table Function)的作用​

  • ​动态生成表​​:根据输入参数(如时间)实时计算或查询数据。

  • ​示例场景​​:

    • Rates(proc_time)可能是一个连接到外部数据库的函数,根据 proc_time返回当时的汇率快照。

    • 也可以是自定义的UDF,如解析JSON数组并展开为多行。

​3. 为什么需要 LATERAL?​

  • ​关联依赖​​:表函数的输入参数(如 o.proc_time)​​依赖左表的当前行​​(Orders o)。

  • ​执行逻辑​​:

    1. 遍历左表 Orders的每一行。

    2. 对每一行,调用 Rates(o.proc_time)获取对应的汇率数据。

    3. 将左表行与表函数返回的行按 WHERE条件关联。

​4. 对比普通 JOIN​

语法

行为

FROM A, B

笛卡尔积:A的每一行与B的所有行组合

FROM A, LATERAL TABLE(func(A.col)) B

A的每一行仅与 func(A.col)返回的行组合


为什么 Lookup Join 必须是处理时间(Processing Time)?​

​1. 设计初衷​

Lookup Join 的核心目标是:​​用外部系统的当前最新数据实时补充流数据​​。

  • ​典型场景​​:订单流关联用户维表(如MySQL),获取用户最新信息。

  • ​关键需求​​:流数据到达时,立即查询外部系统的最新值,无需关心历史版本。

​2. 处理时间的必然性​
  • ​事件时间(Event Time)的问题​​:

    • 若按事件时间关联,需查询维表的历史版本(如用户过去某时刻的档案)。

    • 但大多数外部系统(如MySQL、Redis)​​不存储历史版本​​,只能查最新数据。

  • ​处理时间的优势​​:

    • 直接关联当前最新数据,无需维表支持历史回溯。

    • 实现简单,性能高(无状态管理)。

​3. 语义示例​
-- 用处理时间关联最新用户信息
SELECT o.order_id, c.name
FROM Orders o 
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
  • ​行为​​:当订单到达时,立刻查询 Customers表的​​当前最新数据​​。

  • ​结果​​:即使用户后来改了名字,订单关联的仍是处理时刻的名字。

​4. 与 Temporal Join 的对比​

特性

Lookup Join(处理时间)

Temporal Join(事件时间)

​关联逻辑​

总是用维表最新数据

按事件时间匹配历史版本

​维表要求​

只需最新数据(如MySQL)

需存储历史版本(如CDC日志)

​状态管理​

无状态

需维护版本历史状态

​适用场景​

实时数据补充(如用户档案)

精确时间点计算(如汇率转换)


​Temporal Table Function 定义

通过 ​​Flink Table API​​ 动态创建一个时态表函数(Temporal Table Function),其核心逻辑如下:

// 从已注册的表 "currency_rates" 创建时态表函数
TemporalTableFunction ratesFunc = tEnv.from("currency_rates")                      // 指定数据源表.createTemporalTableFunction("update_time",   // 版本时间字段"currency");      // 主键字段// 注册为临时函数,命名为 "rates"
tEnv.createTemporarySystemFunction("rates", ratesFunc);

关键参数解析​

参数

作用

示例值

必须性

"currency_rates"

数据源表名

需提前用DDL注册

必填

"update_time"

版本时间字段

必须是事件时间或处理时间

必填

"currency"

主键字段

用于关联时定位版本

必填


底层实现逻辑​

当调用 createTemporalTableFunction时,Flink会:

  1. ​检查表结构​​:确认 currency_rates表存在,且包含 update_timecurrency字段。

  2. ​构建版本管理器​​:

    • update_time排序数据

    • 为每个主键值(currency)维护一个​​版本链​

      (如 EUR → [(1.10, 10:00), (1.15, 11:00)]

  3. ​注册查询接口​​:

    • 函数 rates(order_time)会:

      • 根据 order_time查找对应版本

      • 返回满足 update_time ≤ order_time的最新数据


完整定义示例​
​步骤1:定义数据源表(DDL)​
-- 汇率表(Kafka流)
CREATE TABLE currency_rates (currency STRING,rate DECIMAL(10, 2),update_time TIMESTAMP(3),WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,PRIMARY KEY (currency) NOT ENFORCED
) WITH ('connector' = 'kafka','topic' = 'rates','format' = 'json'
);-- 订单表
CREATE TABLE orders (order_id STRING,amount DECIMAL(10, 2),currency STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
​步骤2:Java代码注册函数​
// 初始化TableEnvironment
StreamTableEnvironment tEnv = ...; // 注册时态表函数
TemporalTableFunction ratesFunc = tEnv.from("currency_rates").createTemporalTableFunction("update_time", "currency");
tEnv.createTemporarySystemFunction("rates", ratesFunc);
​步骤3:SQL查询使用​
SELECT o.order_id,o.amount,o.currency,r.rate AS historical_rate
FROM orders o,LATERAL TABLE(rates(o.order_time)) r  -- 调用时态函数
WHERE o.currency = r.currency;

运行时行为​
​数据流处理过程
currency_rates流:
│ Euro │ 1.10 │ 10:00 │  ← Version 1
│ USD  │ 1.00 │ 10:00 │
│ Euro │ 1.15 │ 11:00 │  ← Version 2orders流:
│ Order1 │ 100 │ Euro │ 10:30 │ → 关联 Version 1 (1.10)
│ Order2 │ 200 │ Euro │ 11:30 │ → 关联 Version 2 (1.15)
​状态管理​
  • Flink会为每个主键(currency)维护一个​​版本历史​​(按update_time排序)

  • 通过水位线(Watermark)机制清理过期版本

    (如 WATERMARK FOR update_time AS update_time - INTERVAL '1' HOUR


常见问题解答​

​Q1: 为什么需要代码注册?不能纯SQL实现吗?​

  • Flink将时态表函数视为​​动态函数​​而非静态表,需通过API明确版本管理逻辑。

​Q2: 主键字段的作用是什么?​

  • 主键(如 currency)用于定位数据版本。例如:

    EUR的汇率在 10:0011:00有两个版本,需按主键区分。

​Q3: 如何保证版本查询性能?​

  • Flink内部对主键建立索引,版本链按时间排序,查询复杂度为 O(log n)。


高级配置​
​状态后端优化​
// 设置状态后端(如RocksDB)
tEnv.getConfig().getConfiguration().setString("state.backend", "rocksdb"
);
​空闲状态清理​
-- 设置状态TTL(避免内存泄漏)
SET 'table.exec.state.ttl' = '1h';


Table Function Join(表函数连接)​

​核心特性​

  • ​UDF扩展​​:通过自定义函数实现复杂连接逻辑

  • ​LATERAL TABLE​​ 语法:类似PostgreSQL的LATERAL JOIN

​示例​

-- INNER JOIN(空结果丢弃左表行)
SELECT order_id, res
FROM Orders,
LATERAL TABLE(MyTableFunc(order_id)) t(res);-- LEFT JOIN(保留左表空结果)
SELECT order_id, res
FROM Orders
LEFT JOIN LATERAL TABLE(MyTableFunc(order_id)) t(res)ON TRUE;

集合展开(UNNEST)​

将ARRAY/MAP/MULTISET列展开为多行

​示例​

-- 展开数组(带序号)
SELECT order_id, product, pos
FROM Orders
CROSS JOIN UNNEST(products) WITH ORDINALITY AS t(product, pos);-- 保存索引
SELECT * 
FROM (VALUES ('order_1'), ('order_2'))CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat']) WITH ORDINALITY AS t(product_name, index)id       product_name  index
=======  ============  =====
order_1  shirt             1
order_1  pants             2
order_1  hat               3
order_2  shirt             1
order_2  pants             2
order_2  hat               3-- Returns a new row for each element and its position in the array
-- assuming a Orders table with an array column `product_names`
SELECT order_id, product_name, product_index
FROM Orders CROSS JOIN UNNEST(product_names) WITH ORDINALITY AS t(product_name, product_index)-- Returns a new row each key/value pair in the map.
SELECT *
FROM (VALUES('order_1'))CROSS JOIN UNNEST(MAP['shirt', 2, 'pants', 1, 'hat', 1]) WITH ORDINALITYid       product_name  amount index
=======  ============  =====  =====
order_1  shirt             2      1
order_1  pants             1      2
order_1  hat               1      3-- Returns a new row for each instance of a element in a multiset
-- If an element has been seen twice (multiplicity is 2), it will be returned twice
WITH ProductMultiset AS(SELECT COLLECT(product_name) AS product_multisetFROM (VALUES ('shirt'), ('pants'), ('hat'), ('shirt'), ('hat')) AS t(product_name)) -- produces { 'shirt': 2, 'pants': 1, 'hat': 2 } 
SELECT id, product_name, ordinality
FROM (VALUES ('order_1'), ('order_2')) AS t(id),ProductMultisetCROSS JOIN UNNEST(product_multiset) WITHORDINALITY AS u(product_name, ordinality);id       product_name  index
=======  ============  =====
order_1  shirt             1
order_1  shirt             2
order_1  pants             3
order_1  hat               4
order_1  hat               5
order_2  shirt             1
order_2  shirt             2
order_2  pants             3
order_2  hat               4
order_1  hat               5

最佳实践指南​

​1. 状态管理优化​

  • ​Regular Join​​:设置合理TTL(如 'table.exec.state.ttl' = '7d'

  • ​Interval Join​​:根据业务需求缩小时间窗口范围

  • ​Temporal Join​​:控制版本表的历史保留周期

​2. 性能调优​

  • ​连接顺序​​:将更新频率低的表放在JOIN左侧

  • ​分区键​​:对连接键进行合理分区

  • ​异步查询​​:Lookup Join可配置异步模式提升吞吐

​3. 连接类型选择​

场景

推荐连接类型

实时订单-商品详情

Regular Join

订单-发货记录匹配

Interval Join

汇率/维度表关联

Temporal Join

外部系统数据补充

Lookup Join

复杂数据转换

Table Function Join


注意事项​

  1. ​流表JOIN限制​​:

    • 不支持非等值连接(Theta Join)

    • 不支持全外连接(FULL OUTER JOIN)的某些场景

  2. ​时间属性​​:

    • Interval Join必须使用事件时间

    • Temporal Join需正确定义水位线

  3. ​资源监控​​:Regular Join需重点监控状态大小

通过合理选择连接类型和优化配置,可以在保证语义正确性的同时实现高效流式关联分析。

​Flink SQL 窗口连接(Window Join)

窗口连接(Window Join)是一种​​基于时间窗口的关联操作​​,它将两个流中​​共享相同键(Key)且落在同一时间窗口内的数据​​进行关联。与常规连接不同,窗口连接会​​按时间分桶​​后再执行连接。

​核心特点​

  • ​时间维度​​:连接条件必须包含窗口起止时间的等值匹配(L.window_start = R.window_start AND L.window_end = R.window_end

  • ​状态管理​​:窗口结束后自动清理中间状态

  • ​输出时机​​:仅在窗口结束时输出最终结果(流模式下不输出中间结果)

  • ​支持类型​​:INNERLEFTRIGHTFULL OUTERANTISEMIJOIN


基础语法​​

SELECT ...
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L
[INNER|LEFT|RIGHT|FULL OUTER] JOIN (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) R
ON L.key = R.key AND L.window_start = R.window_start  -- 必须包含窗口时间匹配AND L.window_end = R.window_end

连接类型示例​

​(1) INNER JOIN​

仅输出两表中​​同一窗口内能匹配的数据​​:

SELECT L.id, R.id, L.window_start, L.window_end
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L
JOIN (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) R
ON L.id = R.id AND L.window_start = R.window_start AND L.window_end = R.window_end;
​(2) FULL OUTER JOIN​

输出两表所有数据,未匹配侧补NULL

SELECT COALESCE(L.id, R.id) AS id,L.window_start, L.window_end
FROM L FULL JOIN R ON ...;

​输出​​:

L.id

R.id

window_start

window_end

L1

NULL

2023-01-01 10:00

2023-01-01 10:05

NULL

R2

2023-01-01 10:00

2023-01-01 10:05

L3

R3

2023-01-01 10:00

2023-01-01 10:05

​(3) SEMI JOIN​

左表行在窗口内有右表匹配时返回:

SELECT * FROM L 
WHERE EXISTS (SELECT * FROM R WHERE L.id = R.id AND L.window_start = R.window_start AND L.window_end = R.window_end
);
​(4) ANTI JOIN​

左表行在窗口内无右表匹配时返回:

SELECT * FROM L 
WHERE NOT EXISTS (SELECT * FROM R WHERE L.id = R.id AND L.window_start = R.window_start AND L.window_end = R.window_end
);

执行原理​

​1. 数据分桶​

LeftTable流:
│ L1 │ 10:02 │ [10:00, 10:05) │
│ L2 │ 10:06 │ [10:05, 10:10) │RightTable流:
│ R1 │ 10:01 │ [10:00, 10:05) │
│ R2 │ 10:04 │ [10:00, 10:05) │
  • 按相同窗口规则(如5分钟滚动)对两表分桶

  • 窗口对齐:[10:00, 10:05)vs [10:00, 10:05)

​2. 连接过程​

  • ​键值匹配​​:在同一个窗口内,按ON条件(如L.id = R.id)关联数据

  • ​状态存储​​:窗口未关闭时,缓存两侧数据

  • ​结果输出​​:窗口结束时触发计算并输出

​3. 状态清理​

  • 窗口结束后立即清理对应的中间状态

  • 通过水位线(Watermark)机制保证时效性


关键限制​

​1. 语法限制​

  • ​必须匹配窗口时间​​:连接条件需包含 L.window_start = R.window_start AND L.window_end = R.window_end

  • ​窗口类型一致​​:两表必须使用相同的窗口TVF(如都是TUMBLE

  • ​会话窗口限制​​:批处理模式不支持会话窗口连接

​2. 流处理特性​

  • ​延迟数据处理​​:依赖水位线判断窗口结束,迟到的数据可能被丢弃

  • ​精确一次语义​​:需配置Checkpoint保证状态一致性


适用场景​

场景

推荐连接类型

示例

​实时订单-支付匹配​

INNER JOIN

5分钟内订单关联对应支付记录

​设备状态关联​

FULL OUTER JOIN

合并传感器双流数据(补NULL)

​异常检测​

ANTI JOIN

找出未收到心跳包的设备

​维度过滤​

SEMI JOIN

筛选出有权限访问的记录


优化建议​

  1. ​合理设置窗口大小​​:

    • 太小 → 状态频繁清理,增加开销

    • 太大 → 状态保留时间长,内存压力大

  2. ​选择高效键值​​:

    • 避免高基数字段(如用户ID)作为连接键

    • 优先使用分区键(如product_id

  3. ​状态后端调优​​:

    -- 设置状态TTL(单位毫秒)
    SET 'table.exec.state.ttl' = '3600000'; -- 1小时
  4. ​水位线配置​​:

    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND

总结​

  • ​窗口连接​​是流处理中​​按时间分桶后关联​​的核心操作。

  • ​精确匹配窗口时间​​是语法强制要求,确保时间对齐。

  • ​状态自动清理​​机制适合无限流处理。

  • 通过INNER/OUTER/ANTI/SEMIJOIN满足不同业务需求。

通过合理配置窗口大小和水位线,可以在保证准确性的同时实现高效流式关联分析。

集合操作与排序、限制


集合操作(Set Operations)​

​1. UNION 与 UNION ALL​

操作

描述

去重

示例结果

​UNION​

合并两个查询结果集

['a', 'b', 'c', 'd', 'e']

​UNION ALL​

合并两个查询结果集

['a', 'a', 'b', 'b', 'c', 'c', 'd', 'e']

​语法示例​​:

-- 去重合并
(SELECT s FROM t1) UNION (SELECT s FROM t2);-- 保留所有重复值
(SELECT s FROM t1) UNION ALL (SELECT s FROM t2);

​适用场景​​:

  • 合并多源数据(如日志表合并)

  • UNION ALL性能更高,优先使用(除非需明确去重)


​2. INTERSECT 与 INTERSECT ALL​

操作

描述

去重

示例结果

​INTERSECT​

返回两个查询共有的行

['a', 'b']

​INTERSECT ALL​

返回共有的行(保留重复)

['a', 'b', 'b']

​语法示例​​:

-- 去重交集
(SELECT s FROM t1) INTERSECT (SELECT s FROM t2);-- 保留重复交集
(SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);

​适用场景​​:

  • 找出共同用户(如活跃用户交集)

  • 数据一致性检查


​3. EXCEPT 与 EXCEPT ALL​

操作

描述

去重

示例结果

​EXCEPT​

返回左表有、右表无的行

['c']

​EXCEPT ALL​

返回左表有、右表无的行(保留重复)

['c', 'c']

​语法示例​​:

-- 去重差集
(SELECT s FROM t1) EXCEPT (SELECT s FROM t2);-- 保留重复差集
(SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);

​适用场景​​:

  • 找出新增用户(如今日新增 vs 昨日)

  • 数据差异分析


子查询条件​

​1. IN 子查询​

​功能​​:检查值是否存在于子查询结果中

​优化​​:Flink 会将其重写为 JOIN + GROUP BY

​流处理风险​​:需注意状态TTL配置,避免状态无限增长

​示例​​:

-- 查询新产品订单
SELECT user, amount 
FROM Orders
WHERE product IN (SELECT product FROM NewProducts);

​等效重写​​:

SELECT o.user, o.amount
FROM Orders o
JOIN (SELECT DISTINCT product FROM NewProducts) np
ON o.product = np.product;

​2. EXISTS 子查询​

​功能​​:检查子查询是否返回至少一行

​优化​​:同样重写为 JOIN + GROUP BY

​示例​​:

-- 存在新产品记录的订单
SELECT user, amount
FROM Orders o
WHERE EXISTS (SELECT 1 FROM NewProducts np WHERE np.product = o.product
);

​状态管理​​:

-- 设置状态TTL(单位毫秒)
SET 'table.exec.state.ttl' = '3600000'; -- 1小时

排序与限制​

​1. ORDER BY​

模式

限制

示例

​流处理​

第一排序字段必须是​​时间属性的升序​

ORDER BY event_time ASC, price DESC

​批处理​

无限制

ORDER BY price DESC, user_id ASC

​流处理示例​​:

-- 正确:按事件时间升序
SELECT * FROM Orders 
ORDER BY order_time ASC, amount DESC;-- 错误:流处理禁止非时间字段首排序
SELECT * FROM Orders 
ORDER BY amount DESC; -- 抛出异常

​2. LIMIT​

模式

支持

说明

​批处理​

需配合 ORDER BY保证确定性

​流处理​

不支持(因结果持续更新)

​批处理示例​​:

-- 查询价格最高的3个订单
SELECT * FROM Orders
ORDER BY amount DESC
LIMIT 3;

​流处理替代方案​​:

使用 Top-N窗口函数:

SELECT * FROM (SELECT *, ROW_NUMBER() OVER (ORDER BY amount DESC) AS row_numFROM Orders
) WHERE row_num <= 3;

状态管理与优化​

​1. 流处理状态风险​

集合操作和子查询(如 UNIONINEXISTS)在流模式下:

  • 需要维护中间状态

  • 状态大小取决于​​输入数据的基数​​(如不同键的数量)

​2. 调优建议​

-- 设置状态存活时间(避免OOM)
SET 'table.exec.state.ttl' = '86400000'; -- 24小时-- 启用微批处理(减少状态访问)
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.size' = '5000';

最佳实践总结​

操作

批处理

流处理

注意事项

​UNION ALL​

优先使用(高性能)

​INTERSECT​

注意状态大小

​ORDER BY​

⚠️ 时间列优先

流处理需时间属性

​LIMIT​

流处理用Top-N替代

​IN/EXISTS​

设置合理TTL


常见问题解答​

​Q1: 为什么流处理中 ORDER BY必须时间字段优先?​

A1: 流数据是无限的,必须按时间有序处理以保证水位线(Watermark)正确推进。

​Q2: 如何高效实现流数据的去重?​

A2: 使用 DISTINCTGROUP BY+ 状态TTL,或利用 ROW_NUMBER()窗口函数。

​Q3: INTERSECT ALLINNER JOIN的区别?​

A3:

  • INTERSECT ALL:按行匹配(保留重复次数)

  • INNER JOIN:按关联键匹配(可能产生笛卡尔积)

​示例​​:

-- INTERSECT ALL(按行匹配)
(SELECT 'a'), ('a'), ('b') INTERSECT ALL (SELECT 'a'), ('b'), ('b');
-- 结果:'a', 'b'-- INNER JOIN(按键匹配)
SELECT t1.s FROM t1 JOIN t2 ON t1.s = t2.s;
-- 结果:'a', 'a', 'b', 'b', 'b', 'b'

通过合理选择集合操作和状态配置,可以在批流一体中实现高效数据分析。

​Flink SQL Top-N 查询

Top-N 查询用于获取​​按某列排序后的前N条记录​​(最大或最小值)。Flink 通过 OVER窗口和 QUALIFY子句实现,支持​​批处理和流处理​​模式。

​核心特点​

  • ​动态更新​​:流模式下,Top-N 结果会随数据变化更新(需支持更新的存储)

  • ​分区支持​​:可对每个分组(PARTITION BY)独立计算 Top-N

  • ​状态管理​​:窗口 Top-N 会在窗口结束时清理中间状态


基础语法​

SELECT [column_list]
FROM (SELECT *,ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]  -- 可选分组ORDER BY sort_col1 [ASC|DESC]    -- 排序字段和方向) AS rownumFROM table_name
) 
WHERE rownum <= N  -- 必须包含此条件
[AND other_conditions];

​关键参数​

参数

说明

示例

PARTITION BY

分组列(如按类别分组)

PARTITION BY category

ORDER BY

排序字段和方向

ORDER BY sales DESC

rownum <= N

必须的过滤条件

WHERE rownum <= 5


Top-N 类型​

​1. 常规 Top-N​

​特点​​:

  • 流模式下持续更新结果

  • 需支持更新的外部存储(如MySQL、HBase)

​示例​​:实时销售额 Top 5 商品

SELECT product_id, sales
FROM (SELECT *,ROW_NUMBER() OVER (ORDER BY sales DESC) AS rownumFROM Sales
) 
WHERE rownum <= 5;

​2. 窗口 Top-N​

​特点​​:

  • 基于时间窗口(如每10分钟)

  • 窗口结束时输出最终结果

  • 自动清理中间状态

​示例​​:每10分钟销售额 Top 3 供应商

SELECT *
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY window_start, window_end  -- 必须包含窗口时间ORDER BY price DESC) AS rownumFROM (SELECT window_start, window_end, supplier_id, SUM(price) AS priceFROM TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)GROUP BY window_start, window_end, supplier_id)
) 
WHERE rownum <= 3;

优化技巧​

​1. 省略 rownum 输出​

减少结果表写入量(仅输出变化的记录):

-- 优化后(不输出 rownum 列)
SELECT product_id, category, sales
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownumFROM Sales
) 
WHERE rownum <= 5;

​2. 状态管理​

-- 设置状态TTL(流处理)
SET 'table.exec.state.ttl' = '3600000';  -- 1小时

​3. 唯一键约束​

确保结果表与 Top-N 查询的​​唯一键一致​​:

-- 假设 product_id 是唯一键
CREATE TABLE OutputTable (product_id STRING PRIMARY KEY,category STRING,sales BIGINT
) WITH (...);

执行原理​

​1. 常规 Top-N​

输入流: [A:10], [B:8], [C:15], [D:12], [E:9], [F:20]
状态维护:按 sales DESC 排序的 Top-N 列表
输出变化:
1. 插入 [F:20] → 输出 [F:20]
2. 插入 [C:15] → 输出 [F:20, C:15]
3. 插入 [D:12] → 输出 [F:20, C:15, D:12]
4. 更新 [A:10] → [A:25] → 触发重新排序

​2. 窗口 Top-N​

窗口 [10:00, 10:10):输入: [A:5], [B:3], [C:9], [D:7]计算: Top 2 → [C:9, D:7]
窗口结束:输出最终结果并清理状态

使用限制​

限制项

说明

​窗口类型​

仅支持滚动(TUMBLE)、滑动(HOP)、累积(CUMULATE)窗口

​会话窗口​

批处理模式不支持

​排序函数​

目前仅支持 ROW_NUMBER()(未来支持 RANK()/DENSE_RANK()

​流处理 ORDER BY​

第一排序字段必须是时间属性的升序


实战示例​

​场景1:实时热销商品排行榜​

-- 每5分钟更新全平台销量Top 10
SELECT product_id, product_name, sales
FROM (SELECT *,ROW_NUMBER() OVER (ORDER BY sales DESC) AS rnFROM ProductSales
) 
WHERE rn <= 10;

​场景2:各品类月度Top 3​

-- 按月统计每个品类的Top 3商品
SELECT category, product_id, monthly_sales
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY category, year_monthORDER BY monthly_sales DESC) AS rnFROM MonthlyCategorySales
) 
WHERE rn <= 3;

常见问题​

​Q1: 为什么流模式下 Top-N 结果会变化?​

A1: 流数据持续到达可能导致排名变化(如新记录冲榜),Flink 会通过​​撤回机制​​(Retraction)更新下游结果。

​Q2: 如何解决 Top-N 状态过大?​

A2:

  1. 增加状态TTL:SET 'table.exec.state.ttl' = '3600000'

  2. 使用窗口 Top-N 替代常规 Top-N

  3. 限制分组数量(避免高基数 PARTITION BY

​Q3: 窗口 Top-N 与普通 Top-N 如何选择?​

特性

常规 Top-N

窗口 Top-N

​输出频率​

持续更新

窗口结束时输出

​状态开销​

需长期维护

窗口结束即清理

​适用场景​

实时排行榜

时段统计(如每小时TopN)


通过合理使用 Top-N 查询,可以实现高效的实时数据分析与监控场景!

​Flink SQL 去重(Deduplication)

去重操作用于​​移除重复数据行​​,保留每组重复数据中的第一条或最后一条记录。Flink 使用 ROW_NUMBER()窗口函数实现去重,本质上是一种特殊的 Top-N 查询(N=1)。

​应用场景​

  • ​ETL 数据清洗​​:上游作业故障导致数据重复

  • ​数据质量​​:确保下游聚合计算(SUM、COUNT)的准确性

  • ​事件流处理​​:如用户行为去重、订单去重


常规去重(Regular Deduplication)​

​语法结构​

SELECT [column_list]
FROM (SELECT *,ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]  -- 去重键ORDER BY time_attr [ASC|DESC]   -- 时间属性(决定保留第一条/最后一条)) AS row_numFROM table_name
) 
WHERE row_num = 1;  -- 必须条件

​参数说明​

参数

说明

示例

PARTITION BY

去重分组键(重复判断依据)

PARTITION BY order_id

ORDER BY

时间属性(决定保留策略)

ORDER BY proctime ASC(保留第一条)

row_num = 1

必须的过滤条件

WHERE row_num = 1

​示例:订单去重​

-- 创建含处理时间的订单表
CREATE TABLE Orders (order_id STRING,user STRING,product STRING,num BIGINT,proctime AS PROCTIME()  -- 处理时间属性
) WITH (...);-- 按order_id去重,保留最先到达的记录
SELECT order_id, user, product, num
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC  -- ASC保留第一条,DESC保留最后一条) AS row_numFROM Orders
) 
WHERE row_num = 1;

窗口去重(Window Deduplication)​

​特点​

  • ​基于时间窗口​​:在固定时间窗口内去重

  • ​状态自动清理​​:窗口结束后清理中间状态

  • ​性能优化​​:适合不需要实时更新的场景

​语法结构​

SELECT [column_list]
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key...]  -- 必须包含窗口时间ORDER BY time_attr [ASC|DESC]) AS rownumFROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(time_col), INTERVAL '10' MINUTES))
) 
WHERE rownum = 1;  -- 或 rownum <= 1 或 rownum < 2

​示例:每10分钟窗口内保留最后一条投标记录​

-- 投标表(含事件时间)
CREATE TABLE Bid (bidtime TIMESTAMP(3),price DECIMAL(10, 2),item STRING,WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECOND
) WITH (...);-- 每10分钟窗口内,按item去重,保留时间最新的记录
SELECT *
FROM (SELECT bidtime, price, item, window_start, window_end,ROW_NUMBER() OVER (PARTITION BY window_start, window_end, item  -- 按窗口和item分组ORDER BY bidtime DESC  -- DESC保留最后一条(时间最大)) AS rownumFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
) 
WHERE rownum = 1;

​输出结果​​:

bidtime

price

item

window_start

window_end

rownum

08:09

5.00

D

08:00

08:10

1

08:17

6.00

F

08:10

08:20

1


去重策略对比​

​保留策略​

ORDER BY 方向

保留规则

适用场景

ASC

保留第一条(时间最早)

首次出现的数据

DESC

保留最后一条(时间最新)

最新状态的数据

​常规去重 vs 窗口去重​

特性

常规去重

窗口去重

​输出时机​

实时输出(有更新即输出)

窗口结束时输出

​状态管理​

需长期维护状态

窗口结束自动清理

​性能​

状态压力大

状态开销小

​适用场景​

需要实时去重

定时批量去重


关键技术细节​

​1. 时间属性要求​

  • ​常规去重​​:支持处理时间(PROCTIME)和事件时间

  • ​窗口去重​​:目前​​仅支持事件时间​​(未来版本支持处理时间)

​2. 语法严格性​

必须遵循以下模式,否则优化器无法识别:

-- 正确模式
SELECT * FROM (SELECT *, ROW_NUMBER() OVER (...) AS row_num FROM table
) WHERE row_num = 1;-- 错误模式(优化器无法识别)
SELECT *, ROW_NUMBER() OVER (...) AS row_num 
FROM table 
WHERE row_num = 1;  -- 缺少子查询包装

​3. 窗口类型支持​

窗口类型

是否支持

滚动窗口(TUMBLE)

滑动窗口(HOP)

累积窗口(CUMULATE)

会话窗口(SESSION)

❌(未来支持)


实战应用场景​

​场景1:用户行为去重​

-- 去除同一用户5分钟内的重复点击
SELECT user_id, page_url, click_time
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY user_id, page_urlORDER BY click_time ASC  -- 保留第一次点击) AS rnFROM UserClicks
) 
WHERE rn = 1;

​场景2:传感器数据最新状态​

-- 每1小时窗口内,保留每个传感器的最新读数
SELECT sensor_id, temperature, window_end as update_time
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY window_start, window_end, sensor_idORDER BY event_time DESC  -- 保留最新读数) AS rnFROM TABLE(TUMBLE(TABLE SensorData, DESCRIPTOR(event_time), INTERVAL '1' HOUR))
) 
WHERE rn = 1;

​场景3:订单状态去重​

-- 按订单号去重,保留最后更新的状态
SELECT order_id, status, update_time
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY order_idORDER BY update_time DESC  -- 保留最新状态) AS rnFROM OrderUpdates
) 
WHERE rn = 1;

性能优化建议​

​1. 状态管理​

-- 设置合理的状态TTL(流处理)
SET 'table.exec.state.ttl' = '3600000';  -- 1小时-- 使用窗口去重减少状态压力
SELECT ... FROM TABLE(TUMBLE(...)) WHERE rownum = 1;

​2. 分区键选择​

  • 避免高基数字段作为分区键(如用户ID)

  • 使用业务主键或复合键平衡分组粒度

​3. 时间属性选择​

  • 实时性要求高:使用处理时间(PROCTIME

  • 需要事件顺序:使用事件时间(EVENT TIME


常见问题解答​

​Q1: 去重和DISTINCT的区别?​

A1:

  • DISTINCT:基于所有选中列的完全匹配去重

  • Deduplication:可指定保留策略(第一条/最后一条),支持时间属性排序

​Q2: 窗口去重为什么需要包含window_start/end?​

A2: 这是优化器识别窗口去重模式的必要条件,确保按窗口分组去重。

​Q3: 如何处理迟到数据?​

A3: 使用事件时间+水位线机制,迟到数据会被正确处理(分配到正确窗口)。

通过合理使用去重操作,可以有效提升数据质量,确保下游分析的准确性!

​Flink SQL 模式识别(Pattern Recognition)

模式识别(Pattern Recognition)通过 MATCH_RECOGNIZE子句在数据流中​​检测复杂事件模式​​,类似于正则表达式匹配时间序列数据。

​应用场景​

  • ​金融风控​​:检测欺诈交易模式

  • ​物联网​​:识别设备异常行为序列

  • ​用户行为分析​​:发现特定用户行为路径


基础语法结构​

SELECT [column_list]
FROM table_name
MATCH_RECOGNIZE (PARTITION BY partition_key      -- 分区键(类似GROUP BY)ORDER BY time_attr              -- 时间属性(必须ASC)MEASURES                        -- 输出列定义pattern_var.column AS alias,FIRST(pattern_var.time) AS start_timeONE ROW PER MATCH               -- 输出模式AFTER MATCH skip_strategy       -- 匹配后跳转策略PATTERN (pattern_expression)   -- 模式表达式(类似正则)DEFINE                          -- 模式变量定义条件pattern_var AS condition
) AS alias

关键组件详解​

​1. PARTITION BY​

  • ​功能​​:按指定列分区,在每个分区内独立进行模式匹配

  • ​示例​​:PARTITION BY symbol(按股票代码分区)

​2. ORDER BY​

  • ​要求​​:第一排序字段必须是​​时间属性的升序​

  • ​示例​​:ORDER BY rowtime ASC

​3. PATTERN(模式表达式)​

支持类似正则表达式的语法:

操作符

含义

示例

A B

连续匹配

A后紧跟B

A+

1次或多次

贪婪匹配

A*

0次或多次

A?

0次或1次

A{3}

精确3次

A{1,5}

1到5次

​示例模式​​:

PATTERN (A B+ C?)      -- A后跟1个或多个B,可选C
PATTERN (A{3} B{2,})   -- 精确3个A后跟2个以上B

​4. DEFINE(变量定义)​

定义每个模式变量的匹配条件:

DEFINEA AS A.price > 10,                    -- 基础条件B AS B.price > A.price,              -- 引用其他变量C AS C.price > LAST(B.price, 1)      -- 使用逻辑偏移

​5. MEASURES(输出定义)​

定义匹配结果的输出列:

MEASURESFIRST(A.rowtime) AS start_time,      -- 匹配开始时间LAST(B.rowtime) AS end_time,          -- 匹配结束时间  COUNT(B.*) AS b_count                -- 统计B出现次数

完整示例分析​

​场景:股票价格V型反转检测​

-- 检测价格先下降后上升的V型模式
SELECT *
FROM Ticker
MATCH_RECOGNIZE (PARTITION BY symbolORDER BY rowtimeMEASURESFIRST(A.rowtime) AS bottom_time,A.price AS bottom_price,LAST(C.rowtime) AS recovery_timeONE ROW PER MATCHAFTER MATCH SKIP TO LAST CPATTERN (A B+ C+)DEFINEB AS B.price < LAST(B.price, 1),    -- 持续下降C AS C.price > LAST(C.price, 1)     -- 持续上升
)

​输入数据​​:

symbol | rowtime          | price
ACME   | 10:00:00 | 100
ACME   | 10:00:01 | 95    -- A
ACME   | 10:00:02 | 90    -- B
ACME   | 10:00:03 | 85    -- B  
ACME   | 10:00:04 | 80    -- B (底部)
ACME   | 10:00:05 | 85    -- C
ACME   | 10:00:06 | 90    -- C

​输出结果​​:

symbol | bottom_time | bottom_price | recovery_time
ACME   | 10:00:04   | 80          | 10:00:06

高级特性​

​1. 贪婪 vs 惰性匹配​

-- 贪婪匹配(默认):匹配尽可能多的行
PATTERN (A B* C)   -- B*会匹配所有可能的B-- 惰性匹配:匹配最少的行  
PATTERN (A B*? C)  -- B*?在满足条件时立即停止

​2. 逻辑偏移函数​

函数

描述

示例

LAST(var.col, n)

变量中倒数第n个值

LAST(B.price, 1)

FIRST(var.col, n)

变量中正数第n个值

FIRST(A.time, 1)

​3. 时间约束(WITHIN)​

限制模式匹配的时间窗口:

PATTERN (A B+ C) WITHIN INTERVAL '1' HOUR
-- 整个匹配必须在1小时内完成

​4. 聚合函数支持​

在DEFINE和MEASURES中使用聚合:

DEFINEA AS COUNT(A.*) < 5,              -- 计数B AS AVG(B.price) > 100           -- 平均值
MEASURES  SUM(A.price) AS total_price       -- 求和

匹配后策略(AFTER MATCH)​

​策略类型​

策略

描述

效果

SKIP PAST LAST ROW

跳到匹配的最后一行之后

每个事件只属于一个匹配

SKIP TO NEXT ROW

跳到匹配的起始行之后

事件可参与多个匹配

SKIP TO LAST var

跳到变量的最后一行

重叠匹配

SKIP TO FIRST var

跳到变量的第一行

可能产生无限循环

​示例比较​

-- 策略1:非重叠匹配
AFTER MATCH SKIP PAST LAST ROW-- 策略2:重叠匹配  
AFTER MATCH SKIP TO NEXT ROW

时间属性处理​

​1. 事件时间 vs 处理时间​

-- 事件时间(推荐)
ORDER BY rowtime ASC-- 处理时间
ORDER BY proctime ASC

​2. 时间函数​

MEASURESMATCH_ROWTIME() AS match_time,      -- 匹配时间属性MATCH_PROCTIME() AS proc_time       -- 处理时间属性

性能优化建议​

​1. 分区优化​

-- 合理分区减少状态大小
PARTITION BY user_id, device_type

​2. 模式设计​

  • 避免无限量词:A*→ 使用A{1,100}限制范围

  • 使用时间约束:WITHIN INTERVAL '10' MINUTE

  • 尽早使用严格条件过滤

​3. 状态管理​

-- 使用WITHIN限制匹配时间,避免状态无限增长
PATTERN (A B+ C) WITHIN INTERVAL '1' HOUR

当前限制​

​语法限制​

  • ❌ 模式分组:(A (B C)+)

  • ❌ 选择操作:(A B | C D)

  • ❌ 排列操作:PERMUTE(A, B, C)

  • ❌ 排除模式:{- A -} B

  • ❌ 惰性可选量词:A??

​功能限制​

  • ALL ROWS PER MATCH输出模式

  • DISTINCT聚合

  • ❌ Table API支持


实战案例​

​案例1:用户会话超时检测​

-- 检测用户30分钟内无活动的会话超时
SELECT user_id, session_start, timeout_time
FROM UserEvents
MATCH_RECOGNIZE (PARTITION BY user_idORDER BY event_timeMEASURESFIRST(A.event_time) AS session_start,LAST(B.event_time) AS timeout_timeONE ROW PER MATCHPATTERN (A B*) WITHIN INTERVAL '30' MINUTEDEFINEB AS B.event_time > LAST(A.event_time, 1) + INTERVAL '5' MINUTE
)

​案例2:设备连续故障检测​

-- 检测设备连续3次读数异常
SELECT device_id, error_start, error_count
FROM SensorReadings  
MATCH_RECOGNIZE (PARTITION BY device_idORDER BY reading_timeMEASURESFIRST(A.reading_time) AS error_start,COUNT(B.*) + 1 AS error_countONE ROW PER MATCHPATTERN (A B{2,})DEFINEA AS A.value > 100 OR A.value < 0,B AS B.value > 100 OR B.value < 0
)

​总结​

MATCH_RECOGNIZE是Flink SQL中最强大的复杂事件处理工具,通过类正则表达式语法实现​​时间序列模式检测​​。合理使用分区、时间约束和匹配策略可以构建高效的实时模式识别应用。

​Flink SQL 时间旅行(Time Travel)

时间旅行(Time Travel)允许查询​​历史时间点​​的表数据快照,实现​​数据版本回溯​​功能。

​应用场景​

  • ​数据审计​​:查看特定时间点的数据状态

  • ​错误恢复​​:分析数据变更历史,定位问题

  • ​版本对比​​:比较不同时间点的数据差异

  • ​合规要求​​:满足数据追溯的法规要求


基础语法​

SELECT column_list 
FROM table_name 
FOR SYSTEM_TIME AS OF timestamp_expression

​参数说明​

参数

说明

示例

table_name

目标表名

orders

timestamp_expression

时间点表达式

TIMESTAMP '2023-07-31 00:00:00'


时间表达式格式​

​1. 直接时间戳常量​

-- 查询2023年7月31日零点数据
SELECT * FROM orders 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 00:00:00';-- 查询当前时间数据(最新版本)
SELECT * FROM orders 
FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP;

​2. 时间计算表达式​

-- 查询1天前的数据
SELECT * FROM orders 
FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP - INTERVAL '1' DAY;-- 查询特定日期前3小时的数据
SELECT * FROM orders 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 00:00:00' - INTERVAL '3' HOUR;-- 查询一周前的数据
SELECT * FROM orders 
FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP - INTERVAL '7' DAY;

​3. 函数表达式(有限支持)​

-- 使用日期函数(需能简化为常量)
SELECT * FROM orders 
FOR SYSTEM_TIME AS OF DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd 00:00:00');

技术实现要求​

​1. Catalog支持​

时间旅行需要底层Catalog实现特定方法:

// Catalog必须实现的方法
Table getTable(ObjectPath tablePath, long timestamp);

​支持的Catalog类型​​:

  • ​Apache Paimon​​:原生支持时间旅行

  • ​Hive​​:3.0+版本支持ACID和快照

  • ​自定义Catalog​​:需实现时间旅行接口

​2. 表类型限制​

表类型

是否支持

说明

​物理表​

支持时间旅行

​视图(View)​

不支持

​子查询​

不支持

​内存表​

无版本管理


时区处理机制​

​关键问题​

时间表达式中的TIMESTAMP类型会​​根据本地时区​​转换为LONG类型时间戳,导致不同时区的相同查询可能返回不同结果。

​示例:时区影响​

-- 在UTC+8时区执行
SELECT * FROM logs 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 08:00:00';-- 在UTC时区执行相同查询,实际查询的是
-- UTC+8的08:00 = UTC时间的00:00点数据

​最佳实践​

-- 明确指定时区,避免歧义
SELECT * FROM logs 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 08:00:00+08:00';-- 使用UTC时间标准
SELECT * FROM logs 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 00:00:00Z';

表达式限制​

​支持的表达式类型​

类型

示例

说明

​TIMESTAMP常量​

TIMESTAMP '2023-07-31 00:00:00'

直接支持

​时间加减​

CURRENT_TIMESTAMP - INTERVAL '1' DAY

支持

​简单函数​

DATE_FORMAT(...)

有限支持

​不支持的表达式​

-- ❌ 无法在解析时简化为常量的表达式
SELECT * FROM table_name 
FOR SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ(0, 3);-- ❌ 运行时才能确定的表达式  
SELECT * FROM table_name 
FOR SYSTEM_TIME AS OF some_user_defined_function();-- ❌ 子查询结果作为时间点
SELECT * FROM table_name 
FOR SYSTEM_TIME AS OF (SELECT max(update_time) FROM other_table);

​错误信息​​:

Unsupported time travel expression: TO_TIMESTAMP_LTZ(0, 3) 
for the expression can not be reduced to a constant by Flink.

完整示例​

​场景1:数据错误排查​

-- 排查今天凌晨的数据问题
SELECT * 
FROM user_transactions 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 06:00:00'
WHERE amount > 10000;-- 对比当前数据与1小时前的差异
SELECT t1.user_id, t1.amount as current_amount, t2.amount as previous_amount
FROM user_transactions t1
LEFT JOIN user_transactions 
FOR SYSTEM_TIME AS OF CURRENT_TIMESTAMP - INTERVAL '1' HOUR t2
ON t1.user_id = t2.user_id
WHERE t1.amount != t2.amount OR t2.amount IS NULL;

​场景2:业务指标历史分析​

-- 分析上月同期的销售数据
SELECT product_category, SUM(sales_amount) as total_sales
FROM sales_records 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-06-15 00:00:00'
WHERE sale_date BETWEEN '2023-06-01' AND '2023-06-15'
GROUP BY product_category;-- 对比今年和去年同期的数据
SELECT this_year.product_id,this_year.sales as sales_2023,last_year.sales as sales_2022
FROM (SELECT product_id, SUM(amount) as salesFROM sales FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 23:59:59'WHERE sale_date LIKE '2023-07-%'GROUP BY product_id
) this_year
LEFT JOIN (SELECT product_id, SUM(amount) as sales  FROM salesFOR SYSTEM_TIME AS OF TIMESTAMP '2022-07-31 23:59:59'WHERE sale_date LIKE '2022-07-%'GROUP BY product_id
) last_year ON this_year.product_id = last_year.product_id;

流批一体支持​

​批处理模式​

-- 批处理中查询历史快照
SELECT * FROM historical_table 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-01-01 00:00:00';

​流处理模式​

-- 流处理中可查询维表的历史版本(时态表连接)
SELECT o.order_id, p.historical_price
FROM orders o
JOIN product_prices 
FOR SYSTEM_TIME AS OF o.order_time p  -- 时间旅行用于时态连接
ON o.product_id = p.product_id;

性能优化建议​

​1. 时间点选择​

-- ✅ 优先使用精确时间点
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 12:00:00'-- ❌ 避免过于频繁的时间旅行查询
-- 可能触发大量历史版本加载

​2. 数据版本管理​

  • ​合理设置数据保留策略​​:避免保存过多历史版本影响性能

  • ​定期清理过期快照​​:使用表管理命令清理旧版本

​3. 查询优化​

-- 结合分区过滤减少数据扫描
SELECT * FROM partitioned_table 
FOR SYSTEM_TIME AS OF TIMESTAMP '2023-07-31 00:00:00'
WHERE partition_date = '2023-07-31';

注意事项​

​1. 数据一致性​

  • 时间旅行查询的是​​指定时间点的快照​​,不反映后续变更

  • 在长时间运行的事务中,可能读到​​历史一致视图​

​2. 错误处理​

-- 如果指定时间点无数据版本,可能返回空结果或错误
SELECT * FROM table_name 
FOR SYSTEM_TIME AS OF TIMESTAMP '1990-01-01 00:00:00';

​3. 存储引擎要求​

不同存储引擎的时间旅行支持程度:

存储引擎

时间旅行支持

备注

​Apache Paimon​

✅ 完整支持

基于快照机制

​Apache Iceberg​

✅ 支持

时间旅行查询

​Hudi​

✅ 有限支持

增量查询模式

​JDBC​

❌ 不支持

无版本管理


未来发展方向​

​计划增强功能​

  • ​更灵活的表达式支持​​:支持更多时间计算函数

  • ​跨表时间一致性​​:多表在同一时间点查询

  • ​流式时间旅行​​:实时流中的历史数据查询

  • ​性能优化​​:增量式历史数据加载

时间旅行是Flink SQL中强大的​​数据追溯工具​​,结合版本化表存储(如Paimon),为数据审计、故障排查和时序分析提供了重要能力。

http://www.dtcms.com/a/430671.html

相关文章:

  • 建设网站的合同招远网站建设
  • 免费域名的网站有哪些可视化建网站
  • 【Linuxvs code】Xshell远程配置到VS Code环境配置指南
  • 微服务网关深度设计:从Spring Cloud Gateway到Envoy,流量治理与安全认证实战指南
  • 全新体验:利用Istio提升微服务安全与监控
  • Nuitka加快打包速度(ccache)全平台配置——持续更新中
  • 大数据毕业设计选题推荐-基于大数据的全球能源消耗量数据分析与可视化系统-大数据-Spark-Hadoop-Bigdata
  • 机械行业做网站猎头公司找的工作怎么样
  • 04_Numpy结构化数组
  • 深圳市龙华区价格优化网站建设
  • 博客标题:解密 IntelliJ IDEA 调试:当你的 List 不仅仅是 List
  • 12.如何使用 JavaScript 构建便签应用程序 | 入门项目
  • 第四届云计算、大数据应用与软件工程国际学术会议(CBASE 2025)
  • 全栈工程师项目练习记录
  • Vue CLI为何不显示webpack配置
  • 设计模式之策略模式学习
  • 自己做的网站外国人能访问吗广告设计公司有哪些渠道通路
  • 分布式专题——24 Kafka功能扩展
  • 范式革命:RDMA 如何让网络成为 “分布式内存总线”
  • 如何弄公司网站青岛专业网站制作
  • Langchain4j笔记
  • 云计算介绍
  • 什么是Redis哨兵机制?
  • Web本体语言(OWL)
  • 快学快用系列:一文学会java后端WebApi开发
  • 网站加速器免费永久网站开发学习课程
  • SpringBoot 整合Jasypt 实现配置文件加密读取操作详解
  • apache 服务器如何使用
  • CI/CD 流水线与 agentic AI:如何创建自我纠正的 monorepos
  • Coze源码分析-资源库-编辑工作流-后端源码-IDL/API/应用服务层