Flink Table API SQL 概念、常用 API 与工程落地
1. 项目结构与最小闭环
1.1 建立 TableEnvironment(入口)
import org.apache.flink.table.api.*;EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode() // 或 .inBatchMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);
如果需要与 DataStream 互操作,使用
StreamTableEnvironment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 支持流表互转
1.2 注册 Source / Sink(Catalog 中的表)
- 临时表(temporary):仅在当前会话内存在,内存持久。
- 永久表(permanent):依赖外部 Catalog(如 Hive Metastore),跨会话可见。
示例:使用内置 datagen 构造源表;用 blackhole 作为下游(丢弃)。
import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;tEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen").schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build());tEnv.executeSql("CREATE TEMPORARY TABLE SinkTable " +"WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS)"
);
1.3 查询并输出(Table API / SQL 任你选)
Table API:
Table t = tEnv.from("SourceTable"); // 扫描
TableResult tr = t.insertInto("SinkTable").execute(); // 直写 Sink
SQL:
Table t2 = tEnv.sqlQuery("SELECT * FROM SourceTable");
t2.insertInto("SinkTable").execute();
以上完成了从源到汇的“最小闭环”。
2. Catalog 与标识符(Identifier)的那些事
Table 的三段式标识为:catalog.database.table。你可以设置“当前 catalog / database”,省略前两段:
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");// 注册到当前命名空间
tEnv.createTemporaryView("exampleView", tEnv.from("SourceTable"));// 指定数据库
tEnv.createTemporaryView("other_database.exampleView", tEnv.from("SourceTable"));// 标识中包含点号时用反引号转义
tEnv.createTemporaryView("`example.View`", tEnv.from("SourceTable"));// 完整三段式
tEnv.createTemporaryView("other_catalog.other_database.exampleView",tEnv.from("SourceTable"));
2.1 Temporary Shadowing(影子覆盖)
同名临时表会覆盖永久表(shadowing)。这对A/B 验证、脱敏联调很有用:先在同名临时表上验证,确认后删除临时表即可切回生产表。
3. 创建表:Virtual(视图)与 Connector(外部系统)
3.1 视图(Virtual Table / VIEW)
Table API/SQL 的查询结果是逻辑计划,可注册为临时视图:
Table proj = tEnv.from("X").select($("a"), $("b")); // 逻辑查询
tEnv.createTemporaryView("projectedTable", proj);
注意:视图不会物化、也不会共享执行,多处引用会分别内联执行。
3.2 Connector 表(外部系统)
既可用 Table API,也可用 SQL DDL:
final TableDescriptor source = TableDescriptor.forConnector("datagen").schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build();tEnv.createTable("SourceTableA", source); // 永久表(依 catalog)
tEnv.createTemporaryTable("SourceTableB", source); // 临时表// SQL DDL
tEnv.executeSql("CREATE TEMPORARY TABLE MyTable (...) WITH (...)");
4. 查询 Table:Table API 与 SQL 混搭
4.1 Table API(强类型、链式)
Table orders = tEnv.from("Orders");Table revenue = orders.filter($("cCountry").isEqual("FRANCE")).groupBy($("cID"), $("cName")).select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
4.2 SQL(字符串、标准兼容)
Table revenue = tEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders WHERE cCountry = 'FRANCE' GROUP BY cID, cName"
);
4.3 混搭方式
- 在 SQL 结果返回的
Table上继续走 Table API。 - 反之,把 Table API 的结果注册为视图,让 SQL 的
FROM去引用。
5. 落地与执行:Sink、Pipeline、StatementSet
5.1 写出到 Sink(文件系统示例)
final Schema schema = Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()).column("c", DataTypes.BIGINT()).build();tEnv.createTemporaryTable("CsvSinkTable",TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/path/to/file").format(FormatDescriptor.forFormat("csv").option("field-delimiter", "|").build()).build()
);Table result = ...; // 你的查询结果TablePipeline pipeline = result.insertInto("CsvSinkTable");
pipeline.printExplain(); // 打印计划
pipeline.execute(); // 执行
5.2 多 Sink(StatementSet 合并优化)
StatementSet stmt = tEnv.createStatementSet();Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmt.add(table1.insertInto("MySink1"));Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmt.add(table2.insertInto("MySink2"));String plan = stmt.explain(); // 查看合并后的 DAG
stmt.execute(); // 一次性优化并执行
6. 何时“被翻译并执行”?
-
翻译为 DataStream 程序的时机:
TableEnvironment.executeSql(...)TablePipeline.execute()Table.execute()(本地 collect)StatementSet.execute()
-
Table ↔ DataStream 转换后,执行由
StreamExecutionEnvironment.execute()驱动(针对流环境)。
7. 查询优化(Calcite 加持)
Flink 基于/扩展 Calcite 做规则与基于代价的优化,包括但不限于:
- 子查询去相关(decorrelation)
- 投影/分区裁剪、过滤下推
- 子计划复用/去重
- 子查询改写:
IN/EXISTS→ left semi-join;NOT IN/NOT EXISTS→ left anti-join - 可选 Join 重排序(
table.optimizer.join-reorder-enabled)
高阶用法:通过
CalciteConfig自定义优化规则:tEnv.getConfig().setPlannerConfig(...)。
8. 计划解释(Explain):看懂三张“图”
你可以在 Table / StatementSet 上直接 explain(),或用 SQL 的 EXPLAIN:
- 未优化的逻辑计划(AST)
- 优化后的逻辑计划
- 物理执行计划
System.out.println(table.explain()); // 单个表
System.out.println(stmtSet.explain()); // 多 Sink
9. DataStream 集成(一点提示)
StreamTableEnvironment支持fromDataStream/toDataStream互转;- 转回 DataStream 后就是“普通”流作业,执行由
env.execute()触发; - 注意:不同 TableEnvironment 下的表不能互联(不能直接 join/union)。
10. 工程实践清单(上线前自查)
命名与 Catalog
- 明确
catalog.database.table,在多 catalog 环境中显式指定,避免歧义 - 用临时表做影子覆盖(shadowing)做 A/B/脱敏验证,验证完及时清理
Schema 与兼容性
-
LIKE ... (EXCLUDING OPTIONS)复用 schema 时,确认 connector 必填项 - Sink 与结果表 schema 完全一致(类型、顺序、可空性)
性能与稳定性
- StatementSet 合并多写出,减少重复扫描与 shuffle
- 启用 filter/partition pruning 能力(取决于 connector 能力)
- 大作业建议加
explain()与 Web UI 检查并行度、链路与 shuffle
可观测与排障
- 对关键视图/临时表加 explain 快照,便于问题定位
- Source/Sink 侧指标(QPS、背压、失败率、延迟)接入监控
语言与 API
- 避免新项目再依赖 Scala API;统一 Java Table API / SQL
11. 参考片段:从源 → 查询 → 多 Sink → Explain → 执行(可直接改造)
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);// 1) 注册两个文件源与两个文件 Sink(略:见上文 Csv/Filesystem 示例)// 2) 查询
Table s1 = tEnv.from("MySource1").where($("word").like("F%"));
Table s2 = tEnv.from("MySource2");
Table unioned = s1.unionAll(s2);// 3) 输出(多 Sink 合并执行)
StatementSet stmt = tEnv.createStatementSet();
stmt.add(s1.insertInto("MySink1"));
stmt.add(unioned.insertInto("MySink2"));// 4) 打印计划并执行
System.out.println(stmt.explain());
stmt.execute();
12. 结语
- 记住闭环:TableEnvironment → 注册表/视图 → Table API/SQL 查询 → Sink → Explain → Execute。
- 善用 临时表 + 影子覆盖 做联调与灰度;用 StatementSet 做多写合并;用 Explain 看穿优化与物理计划。
- 新项目优先 Java Table API/SQL,提前规避 Scala API 的弃用风险。
