当前位置: 首页 > news >正文

Flink DataStream × Table API 融合双向转换、变更流、批流一体与执行模型

1. 什么时候需要“混用”?

  1. 先用 Table 生态(Catalog、Connector、SQL 函数)做取数/清洗,再回到 DataStream 写低阶算子(自定义定时器、精细状态)。
  2. 某些 无状态归一化(大小写、正则清洗、简单投影)交给 SQL;复杂业务/乱序处理交给 DataStream。
  3. 历史补数(有界流)走 Batch 模式;实时(无界流)走 Streaming 模式,但同一套管道尽量复用。

跨 API 会有轻微结构转换开销(RowData ↔ Row)。绝大多数场景可以忽略,但对极致延迟敏感需关注。

2. 环境初始化与“谁来执行”

// 统一构建:先 DataStream,再桥接 Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  • DataStream 执行env.execute() 提交整套已构建的管道。
  • Table 执行TablePipeline.execute() / executeSql() / StatementSet.execute() 只提交“表内 source→sink”闭环。
  • 混合tEnv.toDataStream(...)toChangelogStream(...)物化其 Table 子管道并挂入 DataStream Builder;必须再 env.execute() 才会跑。

3. 基础:DataStream ↔ Table(Insert-Only)

3.1 DataStream → Table(插入流)

DataStream<String> ds = env.fromElements("Alice","Bob","John");// 自动派生列:f0
Table t = tEnv.fromDataStream(ds);// 注册成视图,用 SQL 继续处理
tEnv.createTemporaryView("InputTable", t);
Table upper = tEnv.sqlQuery("SELECT UPPER(f0) AS name FROM InputTable");

3.2 Table → DataStream(插入流)

// Row(默认)
DataStream<Row> out = tEnv.toDataStream(upper);
out.print();
env.execute();
// 输出示例:+I[ALICE]、+I[BOB]...

注意toDataStream 仅支持 append-only 表(只 INSERT)。若含聚合/窗口等会产生更新,需改用 toChangelogStream

4. 高阶:Changelog(变更流)处理

4.1 带更新的 Table → 变更流

// name, SUM(score) 会产生更新
Table result = tEnv.sqlQuery("SELECT name, SUM(score) AS s FROM InputTable GROUP BY name");// 变更流:RowKind 标记 +I/-U/+U/-D
DataStream<Row> changelog = tEnv.toChangelogStream(result);

4.2 变更流 → Table

DataStream<Row> cdc = env.fromElements(Row.ofKind(RowKind.INSERT, "Alice", 12),Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));Table t = tEnv.fromChangelogStream(cdc); // 默认接受全量 RowKind

优化:若上游保证 upsert 语义,可 fromChangelogStream(..., schema.withPrimaryKey(...), ChangelogMode.upsert()),减少 50% 的 UPDATE_BEFORE 流量;下游 toChangelogStream(..., upsert()) 同理。

5. 事件时间与水印的桥接

常见三种写法:

  1. 基于字段生成行时间 + 策略
Table t = tEnv.fromDataStream(ds,Schema.newBuilder().columnByExpression("rowtime","CAST(event_time AS TIMESTAMP_LTZ(3))").watermark("rowtime","rowtime - INTERVAL '10' SECOND").build());
  1. 继承 DataStream 水印
Table t = tEnv.fromDataStream(ds,Schema.newBuilder().columnByMetadata("rowtime","TIMESTAMP_LTZ(3)").watermark("rowtime","SOURCE_WATERMARK()").build());
  1. Table → DataStream 时保留时间戳
  • toDataStream(table):单一 rowtime 会写入 DataStream 的 record timestamp,并继续传播水印
  • toChangelogStream(table, Schema.columnByMetadata("rowtime", ...)):也可把时间戳当 metadata 输出,不占物理列。

6. 批流一体:Batch Runtime Mode

有界数据(历史补数)可走 BATCH,同一套逻辑在 STREAMING 下也能跑:

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 切 Streaming:env.setRuntimeMode(RuntimeExecutionMode.STREAMING);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Batch 模式要点

  • 源必须有界(如 datagen number-of-rows、Kafka 设定终止 offset)。
  • 表源目前需 insert-only
  • 操作符可能采用 阻塞交换、禁用 checkpoint,最终只产出 insert-only 结果(增量被折叠)。

实战策略:先写 Streaming 版(最通用),再在有界场景切 BATCH 拿更优算子(如 sort-merge join)。

7. 配置与依赖(桥接模块)

依赖(Java)

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>与集群一致</version><scope>provided</scope>
</dependency>

配置顺序尽量在创建 tEnv 之前统一设置 env

env.setMaxParallelism(256);
env.getCheckpointConfig().setCheckpointingConsistencyMode(EXACTLY_ONCE);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Paris"));

提醒:Scala 隐式转换虽好用(org.apache.flink.table.api.bridge.scala._),但易混淆执行边界;新项目优先 Java

8. TypeInformation ↔ DataType 对照与坑

  • DataStream 的 TypeInformation → Table 的 DataType:自动推断,但遇到 RAW/Generic(如 Row、不可反射类型)需显式声明

  • 常见修复:

    • ds.map(...).returns(TypeInformation)
    • Table 侧 Schema.newBuilder().column("f0", DataTypes.of(MyPojo.class))
    • 结构化显式声明:DataTypes.STRUCTURED(...)

结论:看 schema 打印。若出现 RAW('User', ...),多半需要显式类型或自定义序列化。

9. 在一个 Job 里拼接多条“表内管道”与 DataStream

StreamStatementSet set = tEnv.createStatementSet();// 纯 Table 管道
Table src = tEnv.from(TableDescriptor.forConnector("datagen").option("number-of-rows","3").schema(Schema.newBuilder().column("v", DataTypes.INT()).build()).build());
set.add(src.insertInto(TableDescriptor.forConnector("print").build()));// DataStream → Table → Sink
DataStream<Integer> ds = env.fromElements(1,2,3);
set.add(tEnv.fromDataStream(ds).insertInto(TableDescriptor.forConnector("print").build()));// 把所有表内管道“挂”到 DataStream Builder
set.attachAsDataStream();// 其他 DataStream 分支...
env.fromElements(4,5,6).sinkTo(new DiscardingSink<>());// 统一提交
env.execute();

10. 典型“融合”食谱(可直接套)

10.1 SQL 预处理 → 自定义低阶算子

  1. CREATE TABLE 读 Kafka,SQL 做脱敏/清洗;
  2. toDataStream() 拿干净 Row;
  3. keyBy + KeyedProcessFunction 做复杂会话/定时器/状态机;
  4. 最终 sink(JDBC/Elasticsearch)。

10.2 DataStream 产流 → Table 做时间区间 Join → 回 DataStream

  1. DS 造两路(用户、订单)并补上水印;
  2. createTemporaryView(..., Schema.watermark(...)) 注册为临时表;
  3. SQL 做 interval join
  4. toDataStream() 下游继续自定义聚合/风控规则。

BATCH/STREAMING 两种模式产出的最终表一致(批模式只会 insert)。

10.3 处理更新的聚合

  • toChangelogStream() 承接 SUM/COUNT 等会更新的表;
  • 若目标是 KV 外部存储,优先 upsert 模式 + 主键,降低网络与落库写放大。

11. 上线前检查清单

边界与语义

  • toDataStream 是否满足 append-only?否则改 toChangelogStream
  • Rowtime/Watermark 是否按需 继承 Source自行声明
  • Changelog 模式是否与下游 Sink 能力匹配(append / retract / upsert)?

类型与 Schema

  • 避免 RAW/GenericTypeInfo:显式声明 DataType 或 TypeInformation。
  • Sink 与结果表 列名/类型/可空性 一致,DDL 校验通过。

性能与稳定性

  • 批模式(有界)是否满足前置条件;算子内存/离堆配置是否充足。
  • 多 Sink 使用 StatementSet 合并优化,减少重复扫描与 shuffle。
  • 监控:吞吐/延迟/背压/水印推进/RowKind 统计/状态大小

执行与提交

  • Table 子管道转 DataStream 后,记得最终 env.execute()
  • 纯 Table 闭环,使用 pipeline.execute()executeSql();与 DS 无关。

12. 可改造示例:统一管道,批流同源

// 1) 选择运行模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // or BATCH
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 2) 两路 DS + 水印 → 注册为表
DataStream<Row> users = ...
DataStream<Row> orders = ...
tEnv.createTemporaryView("UserTable", users,Schema.newBuilder().column("ts", DataTypes.TIMESTAMP_LTZ(3)).column("uid", DataTypes.INT()).column("name", DataTypes.STRING()).watermark("ts", "ts - INTERVAL '1' SECOND").build());tEnv.createTemporaryView("OrderTable", orders,Schema.newBuilder().column("ts", DataTypes.TIMESTAMP_LTZ(3)).column("uid", DataTypes.INT()).column("amount", DataTypes.INT()).watermark("ts", "ts - INTERVAL '1' SECOND").build());// 3) SQL:时间区间 join(模式无关,结果一致)
Table joined = tEnv.sqlQuery("SELECT U.name, O.amount " +"FROM UserTable U, OrderTable O " +"WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES");// 4) 回到 DataStream,继续自定义处理
DataStream<Row> out = tEnv.toDataStream(joined);
out.keyBy(r -> r.<String>getFieldAs("name")).process(new MyDedup()).print();env.execute();

结语

  • 把 Table 当入口与加速器,把 DataStream 当控制面与精细算子;二者自由互转,让你兼得开发效率可控性能

  • 牢牢记住三件事:

    1. append-only → toDataStream;有更新 → toChangelogStream
    2. 时间与水印要么继承 Source,要么在 Schema 声明
    3. Table → DataStream 后,最终得 env.execute() 才会跑
http://www.dtcms.com/a/572940.html

相关文章:

  • 汽车配件 AI 系统:重构汽车配件管理与多语言内容生成新范式
  • 使用Requests和加密技术实现淘宝药品信息爬取
  • 分享|智能决策,精准增长:企业数据挖掘关键策略与应用全景
  • (Azure)PGSQL和redis 连通性测试 --code 备份
  • 重构增长:生成式AI如何将CRM打造为企业的销售大脑
  • 唯品会一家做特卖的网站 分析陕西印象信息技术有限公司
  • Scala与Spark算子:大数据处理的黄金搭档
  • mac Android Studio配置adb环境(使用adb报错 adb: command not found)
  • C语言应用实例:学生管理系统1(指针、结构体综合应用,动态内存分配)
  • 找制作网站公司网页制作教程 赵丰年 pdf
  • ffplay 嵌入
  • TDengine 产品组件 taosX
  • 链表相关的算法题(2)
  • 10月谷歌新政 | 涉及真金游戏、约会社交、个人贷款、医疗健康等类别App
  • python实现语音转文本STT
  • 十大免费建站app做网站公司不给源码
  • 07.docker介绍与常用命令
  • 【Docker下部署高可用】StarRocks 存算一体架构高可用部署要点
  • 小型工厂怎么找外贸客户?
  • 【Android】正式打包发布
  • 寻找做网站的合作伙伴北京北京网址建设
  • PyTorch2 Python深度学习 - 模型保存与加载
  • 南京html5网站建设今天发生的重大新闻5条
  • 台州网站排名优化公司中国石油第一建设公司官网
  • JS原型和原型链
  • Rust 赋能图片批量处理:从 ImageKit 实现到行业前沿优化实践
  • ceph osd down排查
  • Android 14 系统启动流程深度解析:内置SD卡挂载流程
  • 【Qt】大数据量表格刷新优化--只刷新可见区域
  • 基于 React 的倒计时组件实现:暴露方法供父组件状态管理