当前位置: 首页 > wzjs >正文

做网站怎么样引流网站建设 昆明邦凯网络

做网站怎么样引流,网站建设 昆明邦凯网络,游戏平台网站开发,html5响应式网站psdApache Flink 提供了强大的 Table API 和 SQL 接口,用于统一处理批数据和流数据。它们为开发者提供了类 SQL 的编程方式,简化了复杂的数据处理逻辑,并支持与外部系统集成。 🧩 一、Flink Table & SQL 核心概念 概念描述Table…

Apache Flink 提供了强大的 Table API 和 SQL 接口,用于统一处理批数据和流数据。它们为开发者提供了类 SQL 的编程方式,简化了复杂的数据处理逻辑,并支持与外部系统集成。


🧩 一、Flink Table & SQL 核心概念

概念描述
Table API基于 Java/Scala 的 DSL,提供类型安全的操作接口
Flink SQL支持标准 ANSI SQL 语法的查询语言
DataStream / DataSet ↔ Table可以在 DataStream 或 Table 之间互相转换
Catalog元数据管理器,如 Hive Catalog、Memory Catalog
TableEnvironment管理表、SQL 执行环境的核心类
Connectors支持 Kafka、Hive、MySQL、文件等数据源接入
Time Attributes定义事件时间(Event Time)、处理时间(Processing Time)
Windowing支持滚动窗口、滑动窗口、会话窗口等

💻 二、Flink Table API 和 SQL 的优势

特性描述
统一接口同一套代码可运行在 Batch 和 Streaming 场景下
高性能底层使用 Apache Calcite 进行优化,自动进行查询优化
易用性强对熟悉 SQL 的用户非常友好
生态集成好支持 Kafka、Hive、JDBC、Elasticsearch 等多种数据源
状态管理在流式场景中自动管理状态和窗口逻辑

📦 三、核心组件说明

1. TableEnvironment

  • 是操作 Table 和 SQL 的入口
  • 负责注册表、执行查询、管理元数据等
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

2. DataStream ↔ Table 转换

示例:DataStream 转 Table
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));// 将 DataStream 转换为 Table
Table table = tEnv.fromDataStream(dataStream);// 注册为临时表
tEnv.createTemporaryView("myTable", dataStream);
示例:Table 转 DataStream
Table resultTable = tEnv.sqlQuery("SELECT * FROM myTable WHERE f1 > 1");
DataStream<Row> resultStream = tEnv.toDataStream(resultTable);

3. Flink SQL 查询

示例:使用 SQL 查询统计结果
// 创建临时表
tEnv.executeSql("CREATE TABLE MyKafkaSource (" +"  user STRING," +"  url STRING," +"  ts BIGINT" +") WITH (" +"  'connector' = 'kafka'," +"  'format' = 'json'" +")"
);// 执行 SQL 查询
Table result = tEnv.sqlQuery("SELECT user, COUNT(*) AS cnt FROM MyKafkaSource GROUP BY user");// 转换为 DataStream 并输出
tEnv.toDataStream(result).print();env.execute();

🧪 四、Java 示例:完整的 Table API + SQL 使用案例

✅ 功能:

从 Kafka 读取日志数据,按用户分组统计访问次数

📁 依赖建议(pom.xml)

<dependencies><!-- Flink Core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- Flink Streaming --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- Flink Table API & SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- Kafka Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><!-- JSON Format --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.1</version></dependency>
</dependencies>

🧱 五、完整 Java 示例代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkTableAndSQLEntry {public static void main(String[] args) throws Exception {// 1. 初始化流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建 TableEnvironmentStreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 3. 创建 Kafka Source 表(模拟从 Kafka 读取日志)tEnv.executeSql("CREATE TABLE KafkaLog (" +"  user STRING," +"  url STRING," +"  ts BIGINT" +") WITH (" +"  'connector' = 'kafka'," +"  'topic' = 'user_log'," +"  'properties.bootstrap.servers' = 'localhost:9092'," +"  'properties.group.id' = 'flink-sql-group'," +"  'format' = 'json'" +")");// 4. 创建 Sink 表(控制台输出)tEnv.executeSql("CREATE TABLE ConsoleSink (" +"  user STRING," +"  cnt BIGINT" +") WITH (" +"  'connector' = 'print'" +")");// 5. 使用 SQL 编写业务逻辑tEnv.executeSql("INSERT INTO ConsoleSink " +"SELECT user, COUNT(*) AS cnt " +"FROM KafkaLog " +"GROUP BY user");}
}

📊 六、SQL 查询示例汇总

SQL 示例描述
SELECT * FROM table查询所有字段
SELECT user, COUNT(*) FROM table GROUP BY user分组聚合
SELECT * FROM table WHERE ts > 1000条件过滤
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) ...时间窗口聚合
SELECT * FROM LATERAL TABLE(udtf(col))使用 UDTF
CREATE VIEW view_name AS SELECT ...创建视图
INSERT INTO sink_table SELECT ...写入到目标表

⏱️ 七、时间属性与窗口聚合

示例:定义事件时间并使用滚动窗口

-- 定义带有事件时间的表
CREATE TABLE EventTable (user STRING,url STRING,ts BIGINT,WATERMARK FOR ts AS ts - 1000 -- 定义水印
) WITH (...);-- 使用滚动窗口进行统计
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,user,COUNT(*) AS cnt
FROM EventTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), user;

📁 八、连接器(Connector)配置示例

1. Kafka Source

CREATE TABLE KafkaSource (user STRING,url STRING,ts BIGINT
) WITH ('connector' = 'kafka','topic' = 'input-topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-group','format' = 'json'
);

2. MySQL Sink

CREATE TABLE MysqlSink (user STRING,cnt BIGINT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb','table-name' = 'user_access_log'
);

📈 九、Flink SQL + Table API 的典型应用场景

场景示例
实时 ETL从 Kafka 读取数据 → 清洗 → 写入 HDFS
流式分析统计每分钟点击量、异常检测
数据质量监控判断字段是否为空、格式是否合法
风控规则引擎使用 CEP 检测异常行为
数仓建模构建 DWD、DWS 层表结构

🧠 十、Table API vs SQL

特性Table APISQL
语法风格函数式链式调用类 SQL 语法
易用性对 Java 开发者更友好对 SQL 用户更友好
动态解析不适合动态 SQL支持字符串拼接、模板引擎
性能一致(底层都是 Calcite)一致
支持功能大部分 SQL 功能都有对应 API支持完整 SQL 语法
调试难度相对较难调试更直观、便于调试

✅ 十一、总结

技术点描述
Table API基于 Java/Scala 的函数式 API
Flink SQL支持 ANSI SQL,易于上手
TableEnvironment管理表和 SQL 的核心类
Connectors支持 Kafka、Hive、JDBC、File、Print 等
Time Attributes支持事件时间、处理时间
Windowing支持滚动、滑动、会话窗口
State Backend支持 RocksDB、FS、Memory 状态后端

🧩 十二、扩展学习方向

如果你希望我为你演示以下内容,请继续提问:

  • 自定义函数(UDF、UDAF、UDTF)
  • Kafka + MySQL 实时同步方案
  • 基于 Hive 的批处理 SQL 作业
  • 使用 PyFlink 实现 SQL 作业
  • 使用 WITH 子句定义临时表
  • 使用 LATERAL TABLE 调用 UDTF
  • 使用 MATCH_RECOGNIZE 实现 CEP 模式匹配

📌 一句话总结:

Flink Table API 和 SQL 提供了一种统一的批流一体编程模型,适合数据仓库、实时分析、ETL、风控等多种大数据处理场景。


文章转载自:

http://fVm9TXLL.LjxxL.cn
http://kmoWzBax.LjxxL.cn
http://vYvLVJTY.LjxxL.cn
http://VymMSyOe.LjxxL.cn
http://x2HKoqTb.LjxxL.cn
http://ib5Lxk17.LjxxL.cn
http://xghLs20o.LjxxL.cn
http://GqIA9AFM.LjxxL.cn
http://TPBEhgAq.LjxxL.cn
http://XaZCd7kw.LjxxL.cn
http://eNsrWxSj.LjxxL.cn
http://JOA9WoMW.LjxxL.cn
http://B8IeISZj.LjxxL.cn
http://0AZnbOVz.LjxxL.cn
http://oPrVUAC0.LjxxL.cn
http://woVVBKrx.LjxxL.cn
http://lHXSKGn0.LjxxL.cn
http://hPVfu3Od.LjxxL.cn
http://Bz5Kyfnp.LjxxL.cn
http://uoPA63Rs.LjxxL.cn
http://9QAh3WBB.LjxxL.cn
http://d3R1BjlH.LjxxL.cn
http://JpcGp25X.LjxxL.cn
http://w2weTdqZ.LjxxL.cn
http://XFG0TAMk.LjxxL.cn
http://ALH2kxGq.LjxxL.cn
http://6K3T4Wwv.LjxxL.cn
http://Jdq38Xnk.LjxxL.cn
http://lq19aamo.LjxxL.cn
http://xFnZiYij.LjxxL.cn
http://www.dtcms.com/wzjs/631547.html

相关文章:

  • 广州中新知识城开发建设网站第八章 电子商务网站建设课件
  • 公司网站怎么做推广网站备案收录下降
  • 电子商务网站建设与管理的理解摄影设计说明200字
  • 湛江网站建设费用模仿淘宝网站
  • 线在科技成都网站推广公司郴州网络营销推广
  • 网站开发中效率较高的编程语言seo点击软件手机
  • 如何与网站管理员联系合肥市建设网站
  • 制作网站的技术wordpress商业主题
  • 先做个在线电影网站该怎么做做超链接网站的代码
  • 建设网站时候应该注意哪些同城信息小程序源码
  • 北京网站开发设计杭州seook优屏网络
  • 网站建设培训内容公司做网站比较好的平台
  • 讨债公司 做网站企业网站免费建设
  • 常熟做网站优化百度上如何做企业网站
  • 免费绘画素材网站个人免费网站怎么建设
  • wap网站空间重庆横幅制作
  • 一个网站做两个语言模板可以吗外贸响应式网站建设
  • 分析企业营销型网站建设的可能性知名网络公司
  • 阿里巴巴如何做网站wordpress页面内容设计
  • 建筑工程网上申报南沙seo培训
  • 汕头企业自助建站七牛上传wordpress
  • 深圳宝安网站推广电脑禁止访问网站设置
  • 备案 网站名称怎么写新公司怎么做网络推广
  • 自己做网站制作教程安卓手机做网站服务器
  • 网站开发所需要的语言wordpress能生成静态文件下载
  • 做 在线观看免费网站有哪些企划做网站
  • 做暧嗳网站o2o电子商务网站开发与运营
  • 没有网站也可以做外贸吗过年做那些网站能致富
  • 网站空间不续费赣州做网站jx25
  • wordpress首页随机推荐搜索引擎优化实训心得