当前位置: 首页 > news >正文

Flink Table API SQL 概念、常用 API 与工程落地

1. 项目结构与最小闭环

1.1 建立 TableEnvironment(入口)

import org.apache.flink.table.api.*;EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode()   // 或 .inBatchMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);

如果需要与 DataStream 互操作,使用 StreamTableEnvironment

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // 支持流表互转

1.2 注册 Source / Sink(Catalog 中的表)

  • 临时表(temporary):仅在当前会话内存在,内存持久。
  • 永久表(permanent):依赖外部 Catalog(如 Hive Metastore),跨会话可见。

示例:使用内置 datagen 构造源表;用 blackhole 作为下游(丢弃)。

import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;tEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen").schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build());tEnv.executeSql("CREATE TEMPORARY TABLE SinkTable " +"WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS)"
);

1.3 查询并输出(Table API / SQL 任你选)

Table API:

Table t = tEnv.from("SourceTable");  // 扫描
TableResult tr = t.insertInto("SinkTable").execute(); // 直写 Sink

SQL:

Table t2 = tEnv.sqlQuery("SELECT * FROM SourceTable");
t2.insertInto("SinkTable").execute();

以上完成了从源到汇的“最小闭环”。

2. Catalog 与标识符(Identifier)的那些事

Table 的三段式标识为:catalog.database.table。你可以设置“当前 catalog / database”,省略前两段:

tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");// 注册到当前命名空间
tEnv.createTemporaryView("exampleView", tEnv.from("SourceTable"));// 指定数据库
tEnv.createTemporaryView("other_database.exampleView", tEnv.from("SourceTable"));// 标识中包含点号时用反引号转义
tEnv.createTemporaryView("`example.View`", tEnv.from("SourceTable"));// 完整三段式
tEnv.createTemporaryView("other_catalog.other_database.exampleView",tEnv.from("SourceTable"));

2.1 Temporary Shadowing(影子覆盖)

同名临时表会覆盖永久表(shadowing)。这对A/B 验证、脱敏联调很有用:先在同名临时表上验证,确认后删除临时表即可切回生产表。

3. 创建表:Virtual(视图)与 Connector(外部系统)

3.1 视图(Virtual Table / VIEW)

Table API/SQL 的查询结果是逻辑计划,可注册为临时视图

Table proj = tEnv.from("X").select($("a"), $("b")); // 逻辑查询
tEnv.createTemporaryView("projectedTable", proj);

注意:视图不会物化、也不会共享执行,多处引用会分别内联执行

3.2 Connector 表(外部系统)

既可用 Table API,也可用 SQL DDL:

final TableDescriptor source = TableDescriptor.forConnector("datagen").schema(Schema.newBuilder().column("f0", DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build();tEnv.createTable("SourceTableA", source);           // 永久表(依 catalog)
tEnv.createTemporaryTable("SourceTableB", source);  // 临时表// SQL DDL
tEnv.executeSql("CREATE TEMPORARY TABLE MyTable (...) WITH (...)");

4. 查询 Table:Table API 与 SQL 混搭

4.1 Table API(强类型、链式)

Table orders = tEnv.from("Orders");Table revenue = orders.filter($("cCountry").isEqual("FRANCE")).groupBy($("cID"), $("cName")).select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

4.2 SQL(字符串、标准兼容)

Table revenue = tEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum " +"FROM Orders WHERE cCountry = 'FRANCE' GROUP BY cID, cName"
);

4.3 混搭方式

  • 在 SQL 结果返回的 Table 上继续走 Table API。
  • 反之,把 Table API 的结果注册为视图,让 SQL 的 FROM 去引用。

5. 落地与执行:Sink、Pipeline、StatementSet

5.1 写出到 Sink(文件系统示例)

final Schema schema = Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()).column("c", DataTypes.BIGINT()).build();tEnv.createTemporaryTable("CsvSinkTable",TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/path/to/file").format(FormatDescriptor.forFormat("csv").option("field-delimiter", "|").build()).build()
);Table result = ...; // 你的查询结果TablePipeline pipeline = result.insertInto("CsvSinkTable");
pipeline.printExplain();   // 打印计划
pipeline.execute();        // 执行

5.2 多 Sink(StatementSet 合并优化)

StatementSet stmt = tEnv.createStatementSet();Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmt.add(table1.insertInto("MySink1"));Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmt.add(table2.insertInto("MySink2"));String plan = stmt.explain(); // 查看合并后的 DAG
stmt.execute();               // 一次性优化并执行

6. 何时“被翻译并执行”?

  • 翻译为 DataStream 程序的时机:

    • TableEnvironment.executeSql(...)
    • TablePipeline.execute()
    • Table.execute()(本地 collect)
    • StatementSet.execute()
  • Table ↔ DataStream 转换后,执行由 StreamExecutionEnvironment.execute() 驱动(针对流环境)。

7. 查询优化(Calcite 加持)

Flink 基于/扩展 Calcite 做规则与基于代价的优化,包括但不限于:

  • 子查询去相关(decorrelation)
  • 投影/分区裁剪过滤下推
  • 子计划复用/去重
  • 子查询改写:IN/EXISTS → left semi-join;NOT IN/NOT EXISTS → left anti-join
  • 可选 Join 重排序(table.optimizer.join-reorder-enabled

高阶用法:通过 CalciteConfig 自定义优化规则:tEnv.getConfig().setPlannerConfig(...)

8. 计划解释(Explain):看懂三张“图”

你可以在 Table / StatementSet 上直接 explain(),或用 SQL 的 EXPLAIN

  • 未优化的逻辑计划(AST)
  • 优化后的逻辑计划
  • 物理执行计划
System.out.println(table.explain());  // 单个表
System.out.println(stmtSet.explain()); // 多 Sink

9. DataStream 集成(一点提示)

  • StreamTableEnvironment 支持 fromDataStream / toDataStream 互转;
  • 转回 DataStream 后就是“普通”流作业,执行由 env.execute() 触发;
  • 注意:不同 TableEnvironment 下的表不能互联(不能直接 join/union)。

10. 工程实践清单(上线前自查)

命名与 Catalog

  • 明确 catalog.database.table,在多 catalog 环境中显式指定,避免歧义
  • 用临时表做影子覆盖(shadowing)做 A/B/脱敏验证,验证完及时清理

Schema 与兼容性

  • LIKE ... (EXCLUDING OPTIONS) 复用 schema 时,确认 connector 必填项
  • Sink 与结果表 schema 完全一致(类型、顺序、可空性)

性能与稳定性

  • StatementSet 合并多写出,减少重复扫描与 shuffle
  • 启用 filter/partition pruning 能力(取决于 connector 能力)
  • 大作业建议加 explain() 与 Web UI 检查并行度、链路与 shuffle

可观测与排障

  • 对关键视图/临时表加 explain 快照,便于问题定位
  • Source/Sink 侧指标(QPS、背压、失败率、延迟)接入监控

语言与 API

  • 避免新项目再依赖 Scala API;统一 Java Table API / SQL

11. 参考片段:从源 → 查询 → 多 Sink → Explain → 执行(可直接改造)

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);// 1) 注册两个文件源与两个文件 Sink(略:见上文 Csv/Filesystem 示例)// 2) 查询
Table s1 = tEnv.from("MySource1").where($("word").like("F%"));
Table s2 = tEnv.from("MySource2");
Table unioned = s1.unionAll(s2);// 3) 输出(多 Sink 合并执行)
StatementSet stmt = tEnv.createStatementSet();
stmt.add(s1.insertInto("MySink1"));
stmt.add(unioned.insertInto("MySink2"));// 4) 打印计划并执行
System.out.println(stmt.explain());
stmt.execute();

12. 结语

  • 记住闭环:TableEnvironment → 注册表/视图 → Table API/SQL 查询 → Sink → Explain → Execute
  • 善用 临时表 + 影子覆盖 做联调与灰度;用 StatementSet 做多写合并;用 Explain 看穿优化与物理计划。
  • 新项目优先 Java Table API/SQL,提前规避 Scala API 的弃用风险。
http://www.dtcms.com/a/569759.html

相关文章:

  • rag-mcp
  • 建筑人才网站关于建网站新闻
  • 【设计模式】UML和设计原则
  • 东莞网站开发找谁建筑网站建设公司
  • 2025进博会4310家展商名录
  • 手机软件开发网站个人网站名称有哪些
  • 名气特别高的手表网站遵义市做网站的地方
  • LNMT部署zrlog个人博客(动静分离集群)
  • 企业网站功能模块设计阜阳哪里有做网站的
  • 如何做网站赚流量钱织梦做中英文企业网站
  • 【Unet++ MobileNetv2语义分割部署至RK3588】模型训练→转换RKNN→开发板部署
  • 深圳做网站做app我想自己做网站可以赚钱
  • 新颖网站页面设计wordpress弹窗登录注册插件
  • 服务端开发的基本概念
  • 快递鸟电子面单打印接口技术对接文档
  • FreeRTOS事件组全解析:多任务同步核心技巧
  • 网站的头尾和导航的公用文件wordpress主题 问答
  • 【GlobalMapper精品教程】096:连接PostGIS数据库(Postgresql)
  • 网站备案服务商查询网站域名使用期
  • 机器学习日报12
  • 【文档】Stomp 协议
  • 自己的网站怎么接广告联盟设计库
  • 网站怎么解析域名解析网站制作公司官网南京
  • liunx文件及目录管理和vim编辑
  • [leetcode]对顶堆,对数时间添加元素,常数时间取中位数(或者第K大的数)
  • 公司软件网站建设免费dw网页模板
  • 力扣146LRU缓存
  • 网站怎么做充值系统下载网站需要写哪些内容
  • 网站有没有做网站地图怎么看vi设计案例ppt
  • 网站页面设计培训班长沙人才招聘网最新招聘2024