【Flink in Action】Upgrading HDFS: verifying functional compatibility of Flink SQL (on-YARN mode)
Verification checklist for Flink SQL (on-YARN mode) compatibility after an HDFS upgrade
Table of Contents
- Verification checklist for Flink SQL (on-YARN mode) compatibility after an HDFS upgrade
- Background
- Table API & SQL
- Verification items
- CREATE statements
- ALTER statements
- DESCRIBE statements
- USE statements
- SHOW statements
- INSERT statements
- UPDATE statements
- ANALYZE statements
- SELECT statements
- SQL Hints
- EXPLAIN statements
- LOAD statements (not tested)
- UNLOAD statements (not tested)
- SET statements
- RESET statements
- JAR statements
- Job statements
- TRUNCATE statements
- DELETE statements
- DROP statements
- CALL statements
- DataStream API
- Verification items
- State Backends
- Working with state
- Operators
- DataStream transformations
- Physical partitioning
- Operator chains and resource groups
- Appendix 1
Background
Using Flink 1.19 as the example.
Verify that, after the HDFS upgrade, Flink itself still runs correctly and that jobs reading from and writing to HDFS remain fully functional.
Table API & SQL
Execution methods
- TableEnvironment
- SQL CLI
Planner
- Since Flink 1.14 the legacy ("old") planner has been removed; only the former Blink planner remains, so all tests use it.
The connector is filesystem; the path uses the hdfs:// scheme, pointing at the HDFS address.
Runtime mode
- Batch mode
  SET 'execution.runtime-mode' = 'batch';
- Streaming mode
  SET 'execution.runtime-mode' = 'streaming';
- Automatic switching between batch and streaming
  SET 'execution.runtime-mode' = 'automatic';
Verification items
SET 'execution.runtime-mode' = 'batch';
set execution.target=yarn-per-job;
set high-availability.storageDir='hdfs:///flinkcp/bigdata/debug/ha';
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE students001 (name STRING, age INT, gpa DECIMAL(3, 2)) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json', 'format' = 'json');
CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json', 'format' = 'json');
CREATE TEMPORARY TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json', 'format' = 'json');
CREATE TABLE orders(userid BIGINT, product STRING) PARTITIONED BY (userid) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file03', 'format' = 'json');
CREATE statements
- CREATE CATALOG
  WITH ('type' = 'generic_in_memory')
  CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/home/bigdata/software/flink-1.19.0.0/conf');
  use CATALOG myhive;
- CREATE DATABASE (not tested)
- CREATE TABLE
  - Create a temporary table
    CREATE TEMPORARY TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json', 'format' = 'json');
  - Create a non-partitioned table
    CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json', 'format' = 'json');
  - Create a partitioned table
    CREATE TABLE orders(userid BIGINT, product STRING) PARTITIONED BY (userid) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file03', 'format' = 'json');
- [CREATE OR] REPLACE TABLE (not tested)
  - Replacing temporary tables is not yet supported.
  - Specifying column information is not yet supported.
  - Specifying a watermark is not yet supported.
  - Creating partitioned tables is not yet supported.
  - Primary key constraints are not yet supported.
- CREATE VIEW (not tested)
- CREATE FUNCTION (not tested)
/home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -yt /home/bigdata/software/flink-1.19.0.0/examples/streaming/WordCount.jar
/home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -p 2 -ynm debug1234 -yjm 4096 -ytm 4096 -ys 1 -yd -yD high-availability.zookeeper.path.namespace=/hsj/debug1234 -yD sn.system.code=bigdata -yD state.backend=filesystem -yD checkpoint.interval=60000 -yD checkpoint.mode=exactly-once -yD high-availability.storageDir=hdfs:///flinkcp/bigdata/debug/ha -yD state.checkpoints.dir=hdfs:///flinkcp/bigdata/debug/checkpoints -yD state.savepoints.dir=hdfs:///flinkcp/bigdata/debug/savepoints /home/bigdata/software/flink-1.19.0.0/examples/streaming/WordCount.jar
Example
CREATE TABLE dfs_source_table ( `log` STRING ) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://user/bigdata/flink/flink1.11/source_file00', 'format' = 'raw', 'source.monitor-interval' = '3');
CREATE TABLE students001 (name STRING, age INT, gpa DECIMAL(3, 2)) WITH ( 'connector' = 'filesystem', 'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json', 'format' = 'json');
INSERT INTO students001 VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
BEGIN;
insert into dfs_source_table values('1a');
COMMIT;
EXECUTE STATEMENT SET
BEGIN
INSERT INTO hdfs_source_table VALUES ('fred flintstone'), ('barney rubble');
END;
CREATE TABLE hdfs_source_table (
  `log` STRING,
  `dt` STRING,   -- partition column: day
  `hour` STRING  -- partition column: hour
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://user/bigdata/flink/flink1.11/data/source_file01',
  'format' = 'raw'
);
CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,   -- partition column: day
  `hour` STRING  -- partition column: hour
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://user/bigdata/flink/flink1.11/data/sink_file01',
  'format' = 'raw'
);
ALTER statements
- ALTER TABLE
  - Rename a table
    ALTER TABLE students001 RENAME TO students;
  - Modify table properties. The official examples do not mention renaming columns; worth verifying:
    ALTER TABLE students RENAME gpa TO gpa1;
- ALTER DATABASE (not tested)
  - Modify database properties
- ALTER FUNCTION (not tested)
  - Modify a function's identifier or language tag
- ALTER VIEW (not tested)
  - Rename a view
  - Change a view's query clause
DESCRIBE statements
- DESCRIBE TABLE
  desc students;
- DESCRIBE VIEW (not tested)
USE statements
- USE CATALOG
  use catalog catalog_test;
- USE database
- USE MODULES (not tested)
  Modules? Since which version?
SHOW statements
- SHOW CATALOGS
  SHOW CATALOGS;
- SHOW DATABASES
  SHOW DATABASES;
- SHOW TABLES
  SHOW TABLES;
- SHOW VIEWS
  SHOW VIEWS;
- SHOW FUNCTIONS
  SHOW FUNCTIONS;
- SHOW CURRENT CATALOG
  SHOW CURRENT CATALOG;
- SHOW CURRENT DATABASE
  SHOW CURRENT DATABASE;
- SHOW CREATE TABLE
  SHOW CREATE TABLE students;
- SHOW COLUMNS
  show columns from students;
- SHOW PARTITIONS
  show partitions orders;
- SHOW PROCEDURES (fails)
  SHOW PROCEDURES;
  [ERROR] Could not execute SQL statement. Reason: java.lang.UnsupportedOperationException: listProcedures is not implemented for class org.apache.flink.table.catalog.GenericInMemoryCatalog.
  [ERROR] Could not execute SQL statement. Reason: java.lang.UnsupportedOperationException: listProcedures is not implemented for class org.apache.flink.table.catalog.hive.HiveCatalog.
- SHOW MODULES
  SHOW MODULES;
- SHOW FULL MODULES
  SHOW FULL MODULES;
- SHOW JARS
  SHOW JARS;
- SHOW JOBS (fails)
  SHOW JOBS;
  [ERROR] Could not execute SQL statement. Reason: java.lang.NullPointerException: No cluster ID found for operation 6621bc36-acf3-482b-aee4-d4154d5caef0
INSERT statements
- Write to a non-partitioned table
  - INSERT OVERWRITE
    INSERT OVERWRITE students VALUES ('fred flintstone1', 35, 1.28), ('barney rubble1', 32, 2.32);
    [ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: Streaming mode not support overwrite.
  - INSERT INTO
    INSERT INTO students VALUES ('fred flintstone2', 35, 1.28), ('barney rubble2', 32, 2.32);
  - Write query results into a table
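The "write query results into a table" case above has no statement in the original notes; a minimal sketch reusing the tables defined earlier (the filter predicate is an arbitrary illustration):

```sql
-- Sketch: insert the result of a query instead of literal VALUES
INSERT INTO students
SELECT name, age, gpa FROM students001 WHERE age > 30;
```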
- Write to partitions of a partitioned table
  - INSERT OVERWRITE
    INSERT OVERWRITE orders VALUES (1,'fred flintstone'), (2,'barney rubble');
    INSERT OVERWRITE orders PARTITION (userid=13) select 'Z';
  - INSERT INTO
    INSERT INTO orders VALUES (1,'fred flintstone2'), (2,'barney rubble2');
    INSERT INTO orders PARTITION (userid=13) select 'X';
    INSERT INTO orders PARTITION (userid=13) select 'Y';
- Insert into multiple tables with one statement set
  EXECUTE STATEMENT SET
  BEGIN
  INSERT INTO students VALUES ('fred flintstone2', 35, 1.28), ('barney rubble2', 32, 2.32);
  INSERT INTO students VALUES ('fred flintstone3', 35, 1.28), ('barney rubble3', 32, 2.32);
  END;
UPDATE statements
- UPDATE ... SET
  UPDATE students SET age = 2;
  [ERROR] Could not execute SQL statement. Reason: java.lang.UnsupportedOperationException: Can't perform update operation of the table default_catalog.default_database.students because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate.
  UPDATE orders SET product = 'product111' where userid = 1;
  [ERROR] Could not execute SQL statement. Reason: java.lang.UnsupportedOperationException: Can't perform update operation of the table default_catalog.default_database.orders because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate.
ANALYZE statements
Collect statistics for existing tables.
- Analyze a non-partitioned table
  ANALYZE TABLE students COMPUTE STATISTICS;
- Analyze a partitioned table
  ANALYZE TABLE orders PARTITION(userid) COMPUTE STATISTICS;
SELECT statements
- WITH clause
  WITH orders_with_total AS (SELECT sum(userid) AS total FROM orders) SELECT total-1 FROM orders_with_total;
- Simple queries
  - Without WHERE
    select * from orders;
  - With WHERE
    select * from orders where userid=1;
  - With a function call
    select count(1) from orders;
  - ORDER BY
    select * from orders order by userid desc;
  - LIMIT
    select * from orders limit 1;
  - DISTINCT
    select DISTINCT userid from orders;
- Aggregation queries
  - GROUP BY
    - Plain grouping columns
      select userid,count(1) from orders group by userid;
    - Window aggregation
  - OVER window
    - PRECEDING
    - CURRENT ROW
  - Window functions
    - Tumbling windows
    - Sliding windows
    - Cumulative windows
    - Session windows
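A hedged sketch of the windowing table-valued functions listed above. Note that `orders` as defined earlier has no time attribute, so the `ts` column (and its watermark) is an assumption for illustration; HOP, CUMULATE, and SESSION follow the same TVF pattern:

```sql
-- Assumes a variant of `orders` that defines a time attribute, e.g.:
--   ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
-- Tumbling-window aggregation via the TUMBLE table-valued function
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```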
  - Window aggregation
    - Window aggregation
    - Window Top-N
    - Window join
    - Window deduplication
  - Window joins
    - INNER/LEFT/RIGHT/FULL OUTER
    - Semi window join
    - Anti window join
  - Group aggregation
    - HAVING filter
    - DISTINCT
    - GROUPING SETS, ROLLUP, CUBE
  - OVER aggregation
    - order by
    - partition by
    - range
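The OVER aggregation items above (PARTITION BY / ORDER BY / RANGE) can be combined in one statement; a hedged sketch, again assuming a time attribute `ts` on `orders` that the earlier DDL does not define:

```sql
-- Count of each user's orders over the preceding hour, per row
SELECT userid, product,
  COUNT(*) OVER (
    PARTITION BY userid
    ORDER BY ts
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS cnt_last_hour
FROM orders;
```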
- Queries with user-defined functions
- Joins
  - Inner equi-join
  - Outer equi-join
  - Interval join
  - Expanding arrays into a relation
  - Join with a table function (UDTF)
    Joins a table with the results of a table function.
  - Join with a temporal table function
    A temporal table is a table that changes over time.
    A temporal table function provides access to the state of a temporal table at a specific point in time.
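A hedged sketch of a temporal table function join. Everything here is hypothetical for illustration: `rates` stands for a registered temporal table function, and `orders_ccy` with its `amount`/`currency`/`o_time` columns is an assumed table, since neither appears in the notes above:

```sql
-- Join each order against the exchange rate valid at its time attribute
SELECT o.amount * r.rate AS converted_amount
FROM orders_ccy AS o,
  LATERAL TABLE (rates(o.o_time)) AS r
WHERE o.currency = r.currency;
```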
  - Join with temporal tables
    Supported only by the Blink planner.
  - Lookup join
  - Table function
    Joins a table with the results of a table function.
- Set operations
  - UNION and UNION ALL
  - INTERSECT and INTERSECT ALL
  - EXCEPT and EXCEPT ALL
  - IN
  - EXISTS
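A sketch of two of the set operations above using the tables defined earlier; the CAST bridges the INT (`age`) and BIGINT (`userid`) columns so the branches have matching types:

```sql
-- UNION ALL of two queries with aligned column types
SELECT userid FROM orders
UNION ALL
SELECT CAST(age AS BIGINT) FROM students;

-- IN with a subquery
SELECT userid FROM orders
WHERE userid IN (SELECT CAST(age AS BIGINT) FROM students);
```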
- Top-N
  Only the Blink planner supports Top-N.
  SELECT * FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY userid ORDER BY userid DESC) AS row_num FROM orders) WHERE row_num <= 1;
- Pattern recognition (MATCH_RECOGNIZE)
  ???
- Time travel
SQL Hints
Overriding table options with an OPTIONS hint affects only the single statement.
- INSERT statements
- Query statements
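A hedged sketch of the OPTIONS hint for both cases, reusing the earlier tables; the specific option keys ('source.monitor-interval', 'sink.parallelism') are illustrative filesystem-connector options, not values taken from the notes above:

```sql
-- Query: override a source option for this statement only
SELECT * FROM orders /*+ OPTIONS('source.monitor-interval' = '10s') */;

-- INSERT: override a sink option for this statement only
INSERT INTO students /*+ OPTIONS('sink.parallelism' = '1') */
VALUES ('wilma', 30, 3.10);
```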
EXPLAIN statements
- EXPLAIN a query
  EXPLAIN select * from orders;
- EXPLAIN an INSERT
  EXPLAIN INSERT INTO orders PARTITION (userid=13) select 'X';
- With specified ExplainDetail types
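ExplainDetail types are listed before the statement being explained; a sketch with three of the documented detail types:

```sql
EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN
SELECT * FROM orders;
```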
LOAD statements (not tested)
The LOAD statement loads a built-in or user-defined module.
- LOAD MODULE module_name
UNLOAD statements (not tested)
The UNLOAD statement unloads a built-in or user-defined module.
- UNLOAD MODULE module_name
SET statements
The SET statement modifies or displays configuration.
- SET 'key' = 'value';
  SET 'execution.runtime-mode' = 'batch';
- SET;
RESET statements
The RESET statement resets a configuration option to its default value.
- RESET 'key';
  RESET 'execution.runtime-mode';
- RESET;
JAR statements
hdfs dfs -put /home/bigdata/software/java/lib/tools.jar /user/bigdata/flink/flink1.19/
- ADD JAR
  add jar '/home/bigdata/software/flink-1.19.0.0/examples/table/WordCountSQLExample.jar';
  add jar 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/tools.jar';
- SHOW JARS
  show jars;
- REMOVE JAR
  REMOVE JAR '/home/bigdata/software/flink-1.19.0.0/examples/table/WordCountSQLExample.jar';
  REMOVE JAR 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/tools.jar';
  [ERROR] Could not execute SQL statement. Reason: java.net.MalformedURLException: unknown protocol: hdfs
Job statements
- SHOW JOBS (fails)
  show jobs;
  [ERROR] Could not execute SQL statement. Reason: java.lang.NullPointerException: No cluster ID found for operation 286fcf54-8821-4115-a966-7371331890fd
- STOP JOB
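The STOP JOB item above has no statement in the notes; a hedged sketch, where the job id is a placeholder that would come from SHOW JOBS:

```sql
-- '<job-id>' is a placeholder, not a real id from these tests
STOP JOB '<job-id>' WITH SAVEPOINT;
```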
TRUNCATE statements
- TRUNCATE TABLE
  TRUNCATE TABLE orders;
  [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.TableException: TRUNCATE TABLE statement is not supported for the table default_catalog.default_database.orders since the table hasn't implemented the interface org.apache.flink.table.connector.sink.abilities.SupportsTruncate.
DELETE statements
- DELETE FROM
  DELETE FROM orders;
  [ERROR] Could not execute SQL statement. Reason: java.lang.UnsupportedOperationException: Can't perform delete operation of the table default_catalog.default_database.orders because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete.
DROP statements
- DROP TABLE
  drop table orders;
- DROP DATABASE (not tested)
- DROP VIEW (not tested)
- DROP FUNCTION (not tested)
CALL statements
- CALL a stored procedure (not tested)
/home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -yt /home/bigdata/software/flink-1.19.0.0/examples/table/flink-test-project-1.0-SNAPSHOT.jar
/home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -p 2 -ynm debug1234 -yjm 4096 -ytm 4096 -ys 1 -yd -yD high-availability.zookeeper.path.namespace=/hsj/debug1234 -yD sn.system.code=bigdata -yD state.backend=filesystem -yD checkpoint.interval=60000 -yD checkpoint.mode=exactly-once -yD high-availability.storageDir=hdfs:///flinkcp/bigdata/debug/ha -yD state.checkpoints.dir=hdfs:///flinkcp/bigdata/debug/checkpoints -yD state.savepoints.dir=hdfs:///flinkcp/bigdata/debug/savepoints -c com.test.tablesql.Test /home/bigdata/software/flink-1.19.0.0/examples/table/flink-test-project-1.0-SNAPSHOT.jar
DataStream API
Use the Hadoop FileSystem connector and the StreamingFileSink connector.
Verification items
State Backends
Enable and configure state backends and checkpointing. Use FsStateBackend to write state, checkpoints, and savepoints to a file system (HDFS).
state.backend=filesystem
high-availability.storageDir=hdfs:///flinkcp/bigdata/debug/ha
state.checkpoints.dir=hdfs:///flinkcp/bigdata/debug/checkpoints
state.savepoints.dir=hdfs:///flinkcp/bigdata/debug/savepoints
Working with state
- Keyed State
  Used via a KeyedStream.
- Operator State
  Interface to implement: CheckpointedFunction
- Broadcast State
  Interfaces to implement: BroadcastProcessFunction / KeyedBroadcastProcessFunction
Operators
Identify which of the operators below implement the CheckpointedFunction, ListCheckpointed, BroadcastProcessFunction, or KeyedBroadcastProcessFunction interfaces. If an operator implements one of them, does it need to be tested?
DataStream transformations
- Map
- FlatMap
- Filter
- KeyBy
- Reduce
- Window
- WindowAll
- Window Apply
- WindowReduce
- Union
- Window Join
- Interval Join
- Window CoGroup
- Connect
- CoMap, CoFlatMap
- Cache (currently supported only for jobs running in batch execution mode)
Physical partitioning
- Custom partitioning
- Random partitioning
- Rebalancing (round-robin partitioning)
- Rescaling
- Broadcasting
Operator chains and resource groups
- Start a new chain
- Disable chaining
- Configure slot sharing groups
Appendix 1
Official Flink documentation for this version:
https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/common/
