当前位置: 首页 > news >正文

装饰网站建设网网站建设 学校

装饰网站建设网,网站建设 学校,wordpress 微信支付插件下载,google搜索Flink Table API 编程入门实践 前言 Apache Flink 是目前大数据实时计算领域的明星产品,Flink Table API 则为开发者提供了声明式、类似 SQL 的数据处理能力,兼具 SQL 的易用性与编程 API 的灵活性。本文将带你快速了解 Flink Table API 的基本用法&am…

Flink Table API 编程入门实践

前言

Apache Flink 是目前大数据实时计算领域的明星产品,Flink Table API 则为开发者提供了声明式、类似 SQL 的数据处理能力,兼具 SQL 的易用性与编程 API 的灵活性。本文将带你快速了解 Flink Table API 的基本用法,并通过代码示例帮助你快速上手。


一、环境准备

在 Flink 中,所有 Table API 操作都需要基于 TableEnvironment。对于流处理场景,我们一般这样创建环境:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

二、数据源定义

Table API 支持多种数据源。最常见的两种方式为:

1. 从 DataStream 创建 Table

DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("Alice", 12),new MyPojo("Bob", 10)
);
Table table = tableEnv.fromDataStream(dataStream);

2. 从外部系统注册 Table

比如从 Kafka 注册一张表:

tableEnv.executeSql("CREATE TABLE user_orders (" +" user_id STRING, " +" order_amount DOUBLE " +") WITH (" +" 'connector' = 'kafka', " +" 'topic' = 'orders', " +" 'properties.bootstrap.servers' = 'localhost:9092', " +" 'format' = 'json'" +")"
);

三、Table API 常见操作

Table API 提供了丰富的数据处理能力,如筛选、聚合、分组、连接等。例如:

import static org.apache.flink.table.api.Expressions.$;// 筛选和选择字段
Table result = table.filter($("age").isGreater(10)).select($("name"), $("age"));// 分组聚合
Table agg = table.groupBy($("name")).select($("name"), $("age").avg().as("avg_age"));

四、结果输出

将 Table 转换为 DataStream,方便后续处理或输出:

DataStream<Row> resultStream = tableEnv.toDataStream(result);
resultStream.print();

五、与 SQL API 结合

Table API 与 SQL API 可以无缝结合。例如:

Table sqlResult = tableEnv.sqlQuery("SELECT name, AVG(age) as avg_age FROM my_table GROUP BY name"
);

六、完整示例

下面是一个完整的 Flink Table API 示例,演示数据流到 Table 的转换、聚合与结果输出:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class TableApiDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 创建数据流DataStream<MyPojo> dataStream = env.fromElements(new MyPojo("Alice", 12),new MyPojo("Bob", 10),new MyPojo("Alice", 15));// 转换为 TableTable table = tableEnv.fromDataStream(dataStream);// Table API 查询Table result = table.groupBy($("name")).select($("name"), $("age").avg().as("avg_age"));// 输出结果DataStream<Row> resultStream = tableEnv.toDataStream(result);resultStream.print();env.execute();}public static class MyPojo {public String name;public Integer age;public MyPojo() {}public MyPojo(String name, Integer age) {this.name = name;this.age = age;}}
}

七、常见问题与建议

  • 字段名区分大小写,需与数据结构一致。
  • Table API 与 SQL API 可混用,灵活应对不同场景。
  • 生产环境推荐结合 Catalog 管理元数据。
  • Flink 1.14 以后批流统一,建议优先采用流模式开发。

结语

Flink Table API 极大地提升了大数据实时处理的开发效率,结合 SQL 的易用性和 API 的灵活性,非常适合复杂业务场景的数据处理。希望本文能帮你快速入门 Flink Table API,后续还可以深入了解窗口聚合、UDF、自定义 Connector 等高级特性。

如果你在学习和实践中遇到问题,欢迎留言交流!

http://www.dtcms.com/a/454631.html

相关文章:

  • 龙中龙网站开发网站制作对公司的作用
  • 在线推广企业网站的方法是wordpress 文章摘取
  • 建站行业发展苏州高端模板建站
  • [人工智能-综述-25]:如何利用AI克服老龄增长带来的不足,让自己的能力具有持续的竞争力?
  • 做网站用html5seo优化销售话术
  • 栈和队列的相关经典题目
  • 2018年做淘宝客网站还能挣钱吗6wordpress所有人提问
  • 【无标题】高校信息化
  • python做网站源码做视频网站用什么好
  • 广州 网站定制定制开发电商网站建设公司
  • app优化枫林seo工具
  • AssemblyScript 入门教程(6):process全局对象
  • 兴义市住房城乡建设局网站温州建设监理协会网站
  • 网站什么内容网站备案前置审批文件
  • FastDFS分布式文件系统
  • 动态规划 - 背包问题
  • 科耐美安维可三文鱼焕颜精华液问世:妆字号无创水光引领护肤新趋势
  • dede减肥网站源码酒店建筑设计网站
  • 网站模板紫色网站做任务包括什么
  • 双等位基因:遗传学中的核心概念、分子机制与跨领域应用解析--随笔13
  • 广丰区建设局网站什么软件可以免费引流
  • 个人网站可以做信息网站吗专业网站建设办公
  • 百度云域名买了之后建设网站免费网站空间可访问
  • 制作动画的网站模板如何用运行打开wordpress
  • 北京模板网站开发北京注册公司规定
  • 江苏省住房和城乡建设局网站首页磁业 东莞网站建设
  • 建设校园网站的好处专业团队介绍文案
  • 儿童早教网站模板建设网站要那些
  • 淘宝客如何做自己的网站网架加工费多少钱一吨
  • 什么做婚车网站最大wordpress 点击弹出层