Flink DataStream × Table API 融合双向转换、变更流、批流一体与执行模型
1. 什么时候需要“混用”?
- 先用 Table 生态(Catalog、Connector、SQL 函数)做取数/清洗,再回到 DataStream 写低阶算子(自定义定时器、精细状态)。
- 某些 无状态归一化(大小写、正则清洗、简单投影)交给 SQL;复杂业务/乱序处理交给 DataStream。
- 历史补数(有界流)走 Batch 模式;实时(无界流)走 Streaming 模式,但同一套管道尽量复用。
跨 API 会有轻微结构转换开销(RowData ↔ Row)。绝大多数场景可以忽略,但对极致延迟敏感需关注。
2. 环境初始化与“谁来执行”
// 统一构建:先 DataStream,再桥接 Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- DataStream 执行:
env.execute()提交整套已构建的管道。 - Table 执行:
TablePipeline.execute()/executeSql()/StatementSet.execute()只提交“表内 source→sink”闭环。 - 混合:
tEnv.toDataStream(...)或toChangelogStream(...)会物化其 Table 子管道并挂入 DataStream Builder;必须再env.execute()才会跑。
3. 基础:DataStream ↔ Table(Insert-Only)
3.1 DataStream → Table(插入流)
DataStream<String> ds = env.fromElements("Alice","Bob","John");// 自动派生列:f0
Table t = tEnv.fromDataStream(ds);// 注册成视图,用 SQL 继续处理
tEnv.createTemporaryView("InputTable", t);
Table upper = tEnv.sqlQuery("SELECT UPPER(f0) AS name FROM InputTable");
3.2 Table → DataStream(插入流)
// Row(默认)
DataStream<Row> out = tEnv.toDataStream(upper);
out.print();
env.execute();
// 输出示例:+I[ALICE]、+I[BOB]...
注意:
toDataStream仅支持 append-only 表(只 INSERT)。若含聚合/窗口等会产生更新,需改用toChangelogStream。
4. 高阶:Changelog(变更流)处理
4.1 带更新的 Table → 变更流
// name, SUM(score) 会产生更新
Table result = tEnv.sqlQuery("SELECT name, SUM(score) AS s FROM InputTable GROUP BY name");// 变更流:RowKind 标记 +I/-U/+U/-D
DataStream<Row> changelog = tEnv.toChangelogStream(result);
4.2 变更流 → Table
DataStream<Row> cdc = env.fromElements(Row.ofKind(RowKind.INSERT, "Alice", 12),Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));Table t = tEnv.fromChangelogStream(cdc); // 默认接受全量 RowKind
优化:若上游保证 upsert 语义,可
fromChangelogStream(..., schema.withPrimaryKey(...), ChangelogMode.upsert()),减少 50% 的 UPDATE_BEFORE 流量;下游toChangelogStream(..., upsert())同理。
5. 事件时间与水印的桥接
常见三种写法:
- 基于字段生成行时间 + 策略
Table t = tEnv.fromDataStream(ds,Schema.newBuilder().columnByExpression("rowtime","CAST(event_time AS TIMESTAMP_LTZ(3))").watermark("rowtime","rowtime - INTERVAL '10' SECOND").build());
- 继承 DataStream 水印
Table t = tEnv.fromDataStream(ds,Schema.newBuilder().columnByMetadata("rowtime","TIMESTAMP_LTZ(3)").watermark("rowtime","SOURCE_WATERMARK()").build());
- Table → DataStream 时保留时间戳
toDataStream(table):单一 rowtime 会写入 DataStream 的 record timestamp,并继续传播水印。toChangelogStream(table, Schema.columnByMetadata("rowtime", ...)):也可把时间戳当 metadata 输出,不占物理列。
6. 批流一体:Batch Runtime Mode
有界数据(历史补数)可走 BATCH,同一套逻辑在 STREAMING 下也能跑:
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 切 Streaming:env.setRuntimeMode(RuntimeExecutionMode.STREAMING);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Batch 模式要点:
- 源必须有界(如 datagen
number-of-rows、Kafka 设定终止 offset)。 - 表源目前需 insert-only。
- 操作符可能采用 阻塞交换、禁用 checkpoint,最终只产出 insert-only 结果(增量被折叠)。
实战策略:先写 Streaming 版(最通用),再在有界场景切 BATCH 拿更优算子(如 sort-merge join)。
7. 配置与依赖(桥接模块)
依赖(Java):
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>与集群一致</version><scope>provided</scope>
</dependency>
配置顺序:尽量在创建 tEnv 之前统一设置 env。
env.setMaxParallelism(256);
env.getCheckpointConfig().setCheckpointingConsistencyMode(EXACTLY_ONCE);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Paris"));
提醒:Scala 隐式转换虽好用(
org.apache.flink.table.api.bridge.scala._),但易混淆执行边界;新项目优先 Java。
8. TypeInformation ↔ DataType 对照与坑
-
DataStream 的 TypeInformation → Table 的 DataType:自动推断,但遇到 RAW/Generic(如
Row、不可反射类型)需显式声明。 -
常见修复:
ds.map(...).returns(TypeInformation)- Table 侧
Schema.newBuilder().column("f0", DataTypes.of(MyPojo.class)) - 结构化显式声明:
DataTypes.STRUCTURED(...)。
结论:看 schema 打印。若出现
RAW('User', ...),多半需要显式类型或自定义序列化。
9. 在一个 Job 里拼接多条“表内管道”与 DataStream
StreamStatementSet set = tEnv.createStatementSet();// 纯 Table 管道
Table src = tEnv.from(TableDescriptor.forConnector("datagen").option("number-of-rows","3").schema(Schema.newBuilder().column("v", DataTypes.INT()).build()).build());
set.add(src.insertInto(TableDescriptor.forConnector("print").build()));// DataStream → Table → Sink
DataStream<Integer> ds = env.fromElements(1,2,3);
set.add(tEnv.fromDataStream(ds).insertInto(TableDescriptor.forConnector("print").build()));// 把所有表内管道“挂”到 DataStream Builder
set.attachAsDataStream();// 其他 DataStream 分支...
env.fromElements(4,5,6).sinkTo(new DiscardingSink<>());// 统一提交
env.execute();
10. 典型“融合”食谱(可直接套)
10.1 SQL 预处理 → 自定义低阶算子
CREATE TABLE读 Kafka,SQL 做脱敏/清洗;toDataStream()拿干净 Row;keyBy+KeyedProcessFunction做复杂会话/定时器/状态机;- 最终 sink(JDBC/Elasticsearch)。
10.2 DataStream 产流 → Table 做时间区间 Join → 回 DataStream
- DS 造两路(用户、订单)并补上水印;
createTemporaryView(..., Schema.watermark(...))注册为临时表;- SQL 做 interval join;
toDataStream()下游继续自定义聚合/风控规则。
BATCH/STREAMING 两种模式产出的最终表一致(批模式只会 insert)。
10.3 处理更新的聚合
- 用
toChangelogStream()承接 SUM/COUNT 等会更新的表; - 若目标是 KV 外部存储,优先 upsert 模式 + 主键,降低网络与落库写放大。
11. 上线前检查清单
边界与语义
-
toDataStream是否满足 append-only?否则改toChangelogStream。 - Rowtime/Watermark 是否按需 继承 Source 或 自行声明?
- Changelog 模式是否与下游 Sink 能力匹配(append / retract / upsert)?
类型与 Schema
- 避免
RAW/GenericTypeInfo:显式声明 DataType 或 TypeInformation。 - Sink 与结果表 列名/类型/可空性 一致,DDL 校验通过。
性能与稳定性
- 批模式(有界)是否满足前置条件;算子内存/离堆配置是否充足。
- 多 Sink 使用 StatementSet 合并优化,减少重复扫描与 shuffle。
- 监控:吞吐/延迟/背压/水印推进/RowKind 统计/状态大小。
执行与提交
- Table 子管道转 DataStream 后,记得最终
env.execute()。 - 纯 Table 闭环,使用
pipeline.execute()或executeSql();与 DS 无关。
12. 可改造示例:统一管道,批流同源
// 1) 选择运行模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // or BATCH
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 2) 两路 DS + 水印 → 注册为表
DataStream<Row> users = ...
DataStream<Row> orders = ...
tEnv.createTemporaryView("UserTable", users,Schema.newBuilder().column("ts", DataTypes.TIMESTAMP_LTZ(3)).column("uid", DataTypes.INT()).column("name", DataTypes.STRING()).watermark("ts", "ts - INTERVAL '1' SECOND").build());tEnv.createTemporaryView("OrderTable", orders,Schema.newBuilder().column("ts", DataTypes.TIMESTAMP_LTZ(3)).column("uid", DataTypes.INT()).column("amount", DataTypes.INT()).watermark("ts", "ts - INTERVAL '1' SECOND").build());// 3) SQL:时间区间 join(模式无关,结果一致)
Table joined = tEnv.sqlQuery("SELECT U.name, O.amount " +"FROM UserTable U, OrderTable O " +"WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");// 4) 回到 DataStream,继续自定义处理
DataStream<Row> out = tEnv.toDataStream(joined);
out.keyBy(r -> r.<String>getFieldAs("name")).process(new MyDedup()).print();env.execute();
结语
-
把 Table 当入口与加速器,把 DataStream 当控制面与精细算子;二者自由互转,让你兼得开发效率与可控性能。
-
牢牢记住三件事:
- append-only → toDataStream;有更新 → toChangelogStream
- 时间与水印要么继承 Source,要么在 Schema 声明
- Table → DataStream 后,最终得
env.execute()才会跑
