Flink之Table API
Apache Flink 的 Table API 是 Flink 提供的一种高级抽象,用于以声明式方式处理批处理和流处理数据。它是基于关系模型的 API,用户可以像编写 SQL 一样,以简洁、类型安全的方式编写数据处理逻辑。
一、基本概念
1. 什么是 Table API?
Table API 是 Flink 中用于处理结构化数据的 关系型编程接口,它支持:
-
批处理(Batch)
-
流处理(Streaming)
Table API 提供了类似 SQL 的语法风格,但以函数式 API 方式表达,具备更好的类型安全和 IDE 友好性。
二、核心组件
1. Table
-
Flink 中的
Table
是对结构化数据的一种抽象。 -
相当于数据库中的表,可以进行过滤、聚合、连接等操作。
2. TableEnvironment
-
Table API 的执行上下文。
-
创建表、注册 UDF、执行 SQL/Table API 操作等都依赖它。
3. Schema(模式)
-
定义表结构,包括字段名、数据类型、主键、水位线等。
三、编程模型
// 1. 创建 TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);// 2. 注册表(从外部数据源)
tableEnv.executeSql("""CREATE TABLE source_table (id STRING,ts TIMESTAMP(3),val INT,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH ('connector' = 'kafka','topic' = 'test',...)
""");// 3. 使用 Table API 处理数据
Table result = tableEnv.from("source_table").filter($("val").isGreater(10)).groupBy($("id")).select($("id"), $("val").avg().as("avg_val"));// 4. 输出结果到目标表
tableEnv.executeSql("""CREATE TABLE sink_table (id STRING,avg_val DOUBLE) WITH ('connector' = 'print')
""");result.executeInsert("sink_table");
四、常用操作
操作类型 | 示例 |
---|---|
过滤 | table.filter($("age").isGreater(18)) |
投影 | table.select($("name"), $("age")) |
聚合 | table.groupBy($("city")).select($("city"), $("salary").avg()) |
连接 | table1.join(table2).where(...).select(...) |
去重 | table.distinct() |
排序 | table.orderBy($("time").desc()) |
窗口 | table.window(...) (见下文) |
五、时间和窗口支持
Table API 支持两种时间语义:
-
事件时间(Event Time)
-
处理时间(Processing Time)
常见的窗口类型:
-
滚动窗口(Tumble)
-
滑动窗口(Slide)
-
会话窗口(Session)
示例:
table.window(Tumble.over(lit(10).minutes()).on($("ts")).as("w")).groupBy($("id"), $("w")).select($("id"), $("w").start(), $("val").sum());
六、与 SQL 的关系
Table API 与 SQL 是等价的抽象:
-
SQL 更加声明式,适合数据分析人员;
-
Table API 更加灵活、支持编程逻辑,适合开发者。
两者可以混合使用,例如:
Table result = tableEnv.sqlQuery("SELECT id, COUNT(*) FROM source GROUP BY id");
七、数据源和连接器支持
Table API 支持多种数据源和 sink,通过 Flink Connector 实现:
常见的:
-
Kafka
-
HDFS
-
MySQL / JDBC
-
Elasticsearch
-
Hive
-
Iceberg / Delta / Hudi
-
Redis 等
通过 SQL 创建表:
CREATE TABLE example (...
) WITH ('connector' = 'kafka',...
);
八、UDF 扩展
可以定义自定义函数:
-
ScalarFunction(标量函数)
-
TableFunction(表函数)
-
AggregateFunction(聚合函数)
-
TableAggregateFunction(表聚合函数)
示例:
public class HashCode extends ScalarFunction {public int eval(String s) {return s.hashCode();}
}tableEnv.createTemporarySystemFunction("HashCode", HashCode.class);
table.select(call("HashCode", $("name")));
九、批与流统一
Flink 的 Table API 实现了 批流统一语义,相同的 API 可用于处理批或流数据,只需切换 EnvironmentSettings
即可。
十、优点总结
-
统一的 API:批流统一,开发逻辑一致
-
类型安全:Java/Scala 函数式风格,IDE 友好
-
与 SQL 无缝切换
-
可插拔的连接器与格式支持
-
强大的时间与窗口语义支持
-
与 Flink Runtime 深度整合,性能高效