FlinkSQL通解
参考文档
https://blog.csdn.net/be_racle/article/details/135921061?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522604e8b91e59f598cb3c69ae05c0628f7%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=604e8b91e59f598cb3c69ae05c0628f7&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allfirst_rank_ecpm_v1~rank_v31_ecpm-20-135921061-null-null.142v101pc_search_result_base6&utm_term=FlinkSQL&spm=1018.2226.3001.4187
1. 基本原理
2. 编码套路
使用Flink SQL时,我们通常会遵循如下编码套路,这些套路和使用Flink API的套路是一样的:
-
环境准备:初始化一个TableEnvironment对象,它是执行Flink SQL语句的核心。这个环境可以是流数据环境,也可以是批数据环境。
-
数据源定义:通过CREATE TABLE语句定义输入数据源(source),可以是Kafka、CSV文件等。
-
数据处理:编写SQL语句对数据进行处理,如查询、过滤、聚合等。
-
数据输出:通过CREATE TABLE定义输出数据源(sink),并将处理结果输出。
3、相关语法
Create关键字
Create语句用于向当前或指定的Catalog中注册库、表、视图或者函数,注册后的库、表、视图可以在后期的SQL查询中进行使用。
目前FlinkSQL中支持以下Create语句:
-
Create Table
-
Create DataBase
-
Create View
-
Create Function
官方定义
根据指定的表名创建一个表,如果同名表已经在CataLog中存在了,则无法创建。
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n][ <watermark_definition> ][ <table_constraint> ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] ]<physical_column_definition>: -- 物理列定义column_name column_type [ <column_constraint> ] [COMMENT column_comment]<column_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED<table_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED<metadata_column_definition>:column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]<computed_column_definition>:column_name AS computed_column_expression [COMMENT column_comment]<watermark_definition>:WATERMARK FOR rowtime_column_name AS watermark_strategy_expression<source_table>:[catalog_name.][db_name.]table_name<like_options>:
{{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
列定义
(1) 常规列
常规列即物理列,定义了物理介质中存储的数据中字段的名称、类型与顺序。其他类型的列也可以在物理列之间进行声明,但是不会影响最终物理列的读取。
形式举例
CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING
) WITH (...
);
(2) 元数据列
元数据列是SQL标准的拓展,允许访问数据源本身具有的一些元数据,元数据列由METADATA关键字标识。
例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。
CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,-- 读取 kafka 本身自带的时间戳`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka'...
);-- 后期数据处理
INSERT INTO MyTable
SELECT user_id, name, record_time + INTERVAL '1' SECOND
FROM MyTable;-- 如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,-- 读取 kafka 本身自带的时间戳`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH ('connector' = 'kafka'...
);
需要注意的是,每种Connecor提供的METADATA字段不一样,需要参考https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。
默认情况下,FlinkSQL认为meta列是可以读取也可以写入的,但是有些外部存储系统的元数据只能用于读取不能写入。
可以选择使用VIRTUAL关键字来标识某个元数据列不写入外部存储中。
CREATE TABLE MyTable (-- sink 时会写入`timestamp` BIGINT METADATA,-- sink 时不写入`offset` BIGINT METADATA VIRTUAL,`user_id` BIGINT,`name` STRING,
) WITH ('connector' = 'kafka'...
);
这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset 列,否则 Flink SQL 任务会直接报错。
(3) 计算列
计算列在写建表的DDL时,可以拿已有的一些列经过一些自定义的运算生成的新列。
CREATE TABLE MyTable (`user_id` BIGINT,`price` DOUBLE,`quantity` DOUBLE,-- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity`cost` AS price * quanitity,
) WITH ('connector' = 'kafka'...
);
需要注意的是:如果只是简单的四则运算的话可以直接写在DML中就可以,但是计算列一些用于定义时间属性。把输入数据的时间格式标准化,处理时间、事件时间举例如下:
-
处理时间:使用PROCTIME()函数来定义处理时间列。
-
事件时间:事件时间的时间戳可以在声明WaterMark之前进行预处理
需要注意的是:和虚拟列类型,计算列只能读取不能写入。
WaterMark定义
WaterMark是在Create Table中进行定义的,具体SQL语法标准是:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
-
rowtime_column_name:表的事件时间属性字段。该列必须是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 类,这个时间可以是一个计算列。
-
watermark_strategy_expression:定义 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。
注意:
如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。
Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由 pipeline.auto-watermark-interval 进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。
FlinkSQL定义的WATERMARK生产策略
-
有界无序:设置方式:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。此类策略用于设置最大的乱序时间,
-
严格升序: 设置方式为:。一般基本不用这种方式,如果你能保证你的数据源的时间戳是严格升序的,认为时间戳只会越来越大,也不存在相等的情况,只有相等或者小于之前的,认为迟到的数据。
-
递增:设置方式为:。一般基本不用这种方式,如果设置此类,则允许tyou
With定义
在建表时,描述数据源、数据汇的具体外部存储的元数据信息。
一般的With的配置项由FlinkSQL的Connector来定义,每种Connector提供的With配置项都不同。内置连接器参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/
CREATE TABLE KafkaTable (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka', -- 外部存储的方式'topic' = 'user_behavior',-- 主题信息'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup', -- 使用哪个组消费'scan.startup.mode' = 'earliest-offset',-- 消费方式'format' = 'csv'-- 在读入与写出时的序列化方式
)
Like定义
like字句是Create Table字句的一个延申。
CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3)
) WITH ( 'connector' = 'kafka','scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (-- 1. 添加了 WATERMARK 定义WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (-- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表
LIKE Orders;
4. 代码示例
FlinkSQL第一个例子
第一步:引用依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.17.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.17.0</version> <!-- 根据版本进行修改 -->
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.17.0</version> <!-- 根据版本进行修改 -->
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.17.0</version> <!-- Flink 1.17 版本 -->
</dependency>
第二步:编写代码
public static void main(String[] args) throws Exception {// 加载环境依赖StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings build = EnvironmentSettings.newInstance().inBatchMode() // 批处理模式.build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, build);// 定义数据源String createSourceTableDdl = "CREATE TABLE csv_source (" +" user_id INT," +" product STRING," +" order_amount DOUBLE" +") WITH (" +" 'connector' = 'filesystem'," +" 'path' = 'file:///E:/input.csv'," +" 'format' = 'csv'" +")";tableEnvironment.executeSql(createSourceTableDdl);/*String query = "SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id";tableEnvironment.executeSql(query).print();env.execute("flink SQL Demo");*/// 定义输出源String createSinkTableDdl = "CREATE TABLE csv_sink (" +" user_id INT," +" total_amount DOUBLE" +") WITH (" +" 'connector' = 'filesystem'," +" 'path' = 'file:///E:/output.csv'," +" 'format' = 'csv'" +")";tableEnvironment.executeSql(createSinkTableDdl);// 将执行结果输入到sink中String query = "INSERT INTO csv_sink " +"SELECT user_id, SUM(order_amount) as total_amount " +"FROM csv_source " +"GROUP BY user_id";tableEnvironment.executeSql(query).print();//env.execute();
}
附录
ProcTime函数
是指Flink算子执行具体操作时的机器系统时间,通常以毫秒为单位。
Watermark机制
概念
Flink SQL中的Watermark用于处理流数据中的时间相关问题,特别是在无界流中,时间的延迟、乱序以及处理的顺序会对结果产生影响。Watermark的主要作用是用来标记事件的最大时间戳,并且帮助Flink确定何时可以触发事件时间上的窗口计算。
具体用时间窗口操作:Watermark可以帮助触发窗口操作的计算。当一个事件的时间戳超过了当前Watermark时,窗口计算会被触发。
-
乱序事件处理:当事件发生顺序与事件流中的实际顺序不一致时(即乱序事件),Watermark可以帮助Flink判断何时可以安全地处理和计算事件。
-
延迟处理:Watermark允许Flink推迟处理窗口,直到某些延迟的事件到达,并且不丢失它们。
具体例子
假设我们有一个流数据系统,记录用户的购买事件,并且我们想按时间窗口统计每个用户在某个时间段内的购买次数。每个事件有一个时间戳,表示事件发生的时间,但由于网络延迟或事件乱序,事件的到达时间可能不同于实际发生的时间。
CREATE TABLE purchases (user_id STRING,amount DECIMAL(10, 2),event_time TIMESTAMP(3),WATERMARK FOR event_time time - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'user_purchases','properties.bootstrap.servers' = 'localhost:9092','format' = 'json'
);-- 每10秒的滚动窗口,统计用户购买次数
SELECT user_id, COUNT(*) AS purchase_count
FROM purchases
GROUP BY user_id, TUMBLE(event_time, INTERVAL '10' SECOND);-- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND:定义了一个Watermark策略,表示Watermark会比事件的时间戳晚5秒,以处理可能出现的延迟事件。
-- TUMBLE(event_time, INTERVAL '10' SECOND):对event_time进行每10秒的滚动窗口计算。
-- 当Flink接收到某个事件时,它会根据事件的时间戳来更新Watermark。Watermark值为事件时间戳减去5秒,这样可以处理一定时间内乱序到达的事件,确保计算窗口时不会遗漏迟到的事件。