
[Flink in Practice] Verifying the functional compatibility of Flink SQL (On-Yarn mode) after an HDFS upgrade

Scope of the compatibility verification of Flink SQL (On-Yarn mode) after upgrading HDFS

Table of Contents

  • Scope of the compatibility verification of Flink SQL (On-Yarn mode) after upgrading HDFS
    • Background
    • Table API & SQL
      • Verification items
        • CREATE statements
        • ALTER statements
        • DESCRIBE statements
        • USE statements
        • SHOW statements
        • INSERT statements
        • UPDATE statements
        • ANALYZE statements
        • SELECT statements
        • SQL Hints
        • EXPLAIN statements
        • LOAD statements (not tested)
        • UNLOAD statements (not tested)
        • SET statements
        • RESET statements
        • JAR statements
        • Job statements
        • TRUNCATE statements
        • DELETE statements
        • DROP statements
        • CALL statements
    • DataStream API
      • Verification items
        • State Backends
        • Working with State
        • Operators
          • DataStream transformations
          • Physical partitioning
          • Operator chaining and resource groups
    • Appendix 1

Background

Flink 1.19 is used as the example.

Verify that, after the HDFS upgrade, Flink itself still runs correctly and that jobs that read from and write to HDFS remain functionally compatible.

Table API & SQL

Execution entry points

  • TableEnvironment
  • SQL CLI

Two planners (note: the legacy planner was removed in Flink 1.14, so 1.19 ships only the Blink-based planner)

  • Blink planner
  • Old planner

The connector is filesystem; path uses the hdfs:// scheme and points to the HDFS address.

Runtime modes

  • Batch mode

    SET 'execution.runtime-mode' = 'batch';
    
  • Streaming mode

    SET 'execution.runtime-mode' = 'streaming';
    
  • Automatic switching between batch and streaming

    SET 'execution.runtime-mode' = 'automatic';
    

Verification items

SET 'execution.runtime-mode' = 'batch';
set execution.target=yarn-per-job;
set high-availability.storageDir='hdfs:///flinkcp/bigdata/debug/ha';
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE students001 (name STRING, age INT, gpa DECIMAL(3, 2))
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json',
  'format' = 'json');

CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2))
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json',
  'format' = 'json');

CREATE TEMPORARY TABLE students (name STRING, age INT, gpa DECIMAL(3, 2))
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json',
  'format' = 'json');

CREATE TABLE orders (userid BIGINT, product STRING) PARTITIONED BY (userid)
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file03',
  'format' = 'json');
CREATE statements
  • CREATE CATALOG

    CREATE CATALOG catalog_test WITH ('type' = 'generic_in_memory');
    CREATE CATALOG myhive WITH (
      'type' = 'hive',
      'hive-conf-dir' = '/home/bigdata/software/flink-1.19.0.0/conf');
    USE CATALOG myhive;
    
  • CREATE DATABASE (not tested)

  • CREATE TABLE

    • Create a temporary table

      CREATE TEMPORARY TABLE students (name STRING, age INT, gpa DECIMAL(3, 2))
      WITH (
        'connector' = 'filesystem',
        'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json',
        'format' = 'json');
      
    • Create a non-partitioned table

      CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2))
      WITH (
        'connector' = 'filesystem',
        'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json',
        'format' = 'json');
      
    • Create a partitioned table

      CREATE TABLE orders (userid BIGINT, product STRING) PARTITIONED BY (userid)
      WITH (
        'connector' = 'filesystem',
        'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file03',
        'format' = 'json');
      
  • [CREATE OR] REPLACE TABLE (not tested)

    • Replacing temporary tables is not yet supported.

    • Specifying column information is not yet supported.

    • Specifying watermarks is not yet supported.

    • Creating partitioned tables is not yet supported.

    • Primary key constraints are not yet supported.

  • CREATE VIEW (not tested)

  • CREATE FUNCTION (not tested)

/home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -yt /home/bigdata/software/flink-1.19.0.0/examples/streaming/WordCount.jar

    /home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -p 2 -ynm debug1234 -yjm 4096 -ytm 4096 -ys 1 -yd -yD high-availability.zookeeper.path.namespace=/hsj/debug1234 -yD sn.system.code=bigdata -yD state.backend=filesystem -yD checkpoint.interval=60000 -yD checkpoint.mode=exactly-once -yD high-availability.storageDir=hdfs:///flinkcp/bigdata/debug/ha -yD state.checkpoints.dir=hdfs:///flinkcp/bigdata/debug/checkpoints -yD state.savepoints.dir=hdfs:///flinkcp/bigdata/debug/savepoints /home/bigdata/software/flink-1.19.0.0/examples/streaming/WordCount.jar

Examples

CREATE TABLE dfs_source_table (`log` STRING)
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://user/bigdata/flink/flink1.11/source_file00',
  'format' = 'raw',
  'source.monitor-interval' = '3');

CREATE TABLE students001 (name STRING, age INT, gpa DECIMAL(3, 2))
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/source_file02.json',
  'format' = 'json');

INSERT INTO students001 VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);

BEGIN;
INSERT INTO dfs_source_table VALUES ('1a');
COMMIT;

EXECUTE STATEMENT SET
BEGIN
INSERT INTO hdfs_source_table VALUES ('fred flintstone'), ('barney rubble');
END;

CREATE TABLE hdfs_source_table (
  `log` STRING,
  `dt` STRING,    -- partition column: day
  `hour` STRING   -- partition column: hour
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://user/bigdata/flink/flink1.11/data/source_file01',
  'format' = 'raw'
);

CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,    -- partition column: day
  `hour` STRING   -- partition column: hour
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://user/bigdata/flink/flink1.11/data/sink_file01',
  'format' = 'raw'
);
ALTER statements
  • ALTER TABLE

    • Rename a table

      ALTER TABLE students001 RENAME TO students;
      
    • Modify table properties. The official examples do not mention renaming columns, so verify that here:

      ALTER TABLE students RENAME gpa TO gpa1;
      
  • ALTER DATABASE (not tested)

    • Modify database properties
  • ALTER FUNCTION (not tested)

    • Modify a function's identifier or language tag
  • ALTER VIEW (not tested)

    • Rename a view
    • Modify a view's query clause
DESCRIBE statements
  • DESCRIBE TABLE

    desc students;
    
  • DESCRIBE VIEW (not tested)

USE statements
  • USE catalog

    use catalog catalog_test;
    
  • USE database

  • USE MODULES (not tested)

    (Open question: since which Flink version are modules available?)

SHOW statements
  • SHOW CATALOGS

    SHOW CATALOGS;
    
  • SHOW DATABASES

    SHOW DATABASES;
    
  • SHOW TABLES

    SHOW TABLES;
    
  • SHOW VIEWS

    SHOW VIEWS;
    
  • SHOW FUNCTIONS

    SHOW FUNCTIONS;
    
  • SHOW CURRENT CATALOG

    SHOW CURRENT CATALOG;
    
  • SHOW CURRENT DATABASE

    SHOW CURRENT DATABASE;
    
  • SHOW CREATE TABLE

    SHOW CREATE TABLE students;
    
  • SHOW COLUMNS

    show columns from students;
    
  • SHOW PARTITIONS

    show partitions orders;
    
  • SHOW PROCEDURES (fails)

    SHOW PROCEDURES;

    [ERROR] Could not execute SQL statement. Reason:
    java.lang.UnsupportedOperationException: listProcedures is not implemented for class org.apache.flink.table.catalog.GenericInMemoryCatalog.

    [ERROR] Could not execute SQL statement. Reason:
    java.lang.UnsupportedOperationException: listProcedures is not implemented for class org.apache.flink.table.catalog.hive.HiveCatalog.
  • SHOW MODULES

    SHOW MODULES;
    
  • SHOW FULL MODULES

    SHOW FULL MODULES;
    
  • SHOW JARS

    SHOW JARS;
    
  • SHOW JOBS

    SHOW JOBS;

    [ERROR] Could not execute SQL statement. Reason:
    java.lang.NullPointerException: No cluster ID found for operation 6621bc36-acf3-482b-aee4-d4154d5caef0
    
INSERT statements
  • Insert into a non-partitioned table

    • INSERT OVERWRITE

      INSERT OVERWRITE students VALUES ('fred flintstone1', 35, 1.28), ('barney rubble1', 32, 2.32);

      [ERROR] Could not execute SQL statement. Reason:
      java.lang.IllegalStateException: Streaming mode not support overwrite.
      
    • INSERT INTO

      INSERT INTO students VALUES ('fred flintstone2', 35, 1.28), ('barney rubble2', 32, 2.32);
      
    • Insert query results into a table

  • Insert into a partitioned table

    • INSERT OVERWRITE

      INSERT OVERWRITE orders VALUES (1,'fred flintstone'), (2,'barney rubble');
      INSERT OVERWRITE orders PARTITION (userid=13) select 'Z';
      
    • INSERT INTO

      INSERT INTO orders VALUES (1,'fred flintstone2'), (2,'barney rubble2');
      INSERT INTO orders PARTITION (userid=13) select 'X';
      INSERT INTO orders PARTITION (userid=13) select 'Y';
      
  • Insert the same data into multiple tables via a statement set

    EXECUTE STATEMENT SET
    BEGIN
    INSERT INTO students VALUES ('fred flintstone2', 35, 1.28), ('barney rubble2', 32, 2.32);
    INSERT INTO students VALUES ('fred flintstone3', 35, 1.28), ('barney rubble3', 32, 2.32);
    END;
    
UPDATE statements
  • UPDATE ... SET

    UPDATE students SET age = 2;

    [ERROR] Could not execute SQL statement. Reason:
    java.lang.UnsupportedOperationException: Can't perform update operation of the table default_catalog.default_database.students because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate.

    UPDATE orders SET product = 'product111' where userid = 1;

    [ERROR] Could not execute SQL statement. Reason:
    java.lang.UnsupportedOperationException: Can't perform update operation of the table default_catalog.default_database.orders because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate.
ANALYZE statements

Collect statistics for an existing table.

  • Non-partitioned table

    ANALYZE TABLE students COMPUTE STATISTICS;
    
  • Partitioned table

    ANALYZE TABLE orders PARTITION(userid) COMPUTE STATISTICS;
    
SELECT statements
  • WITH clause

    WITH orders_with_total AS (
      SELECT sum(userid) AS total FROM orders
    )
    SELECT total - 1
    FROM orders_with_total;
    
  • Simple queries

    • Without WHERE

      select * from orders;
      
    • With WHERE

      select * from orders where userid=1;
      
    • With a function call

      select count(1) from orders;
      
    • ORDER BY

      select * from orders order by userid desc;
      
    • LIMIT

      select * from orders limit 1;
      
    • DISTINCT

      select DISTINCT userid from orders;
      
  • Aggregate queries

    • GROUP BY

      • Plain grouping columns

        select userid,count(1) from orders group by userid;
        
      • Window aggregation (see the windowing TVF sketch below)

    • OVER window (see the sketch below)

      • PRECEDING
      • CURRENT ROW
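
      A minimal OVER-window sketch using ROWS BETWEEN ... PRECEDING AND CURRENT ROW on the orders table defined earlier. In the batch mode used here an ordinary sortable column can serve as the ORDER BY key; in streaming mode a time attribute would be required:

      SELECT userid, product,
             COUNT(product) OVER (
               PARTITION BY userid
               ORDER BY userid
               ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS cnt
      FROM orders;
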
  • Windowing table-valued functions

    • Tumbling windows (see the sketch below)

    • Sliding (hopping) windows

    • Cumulative windows

    • Session windows
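
    A minimal sketch of the TUMBLE windowing TVF. It assumes a hypothetical events table with a TIMESTAMP(3) column and a watermark, since none of the tables defined above carry a time attribute:

    CREATE TABLE events (
      userid BIGINT,
      ts TIMESTAMP(3),
      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH ('connector' = 'datagen');  -- hypothetical source

    SELECT window_start, window_end, COUNT(*) AS cnt
    FROM TABLE(
      TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end;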

  • Window aggregation

    • Window aggregation

    • Window Top-N (see the sketch below)

    • Window join

    • Window deduplication
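
    A window Top-N / deduplication sketch on the hypothetical events table above: ROW_NUMBER is partitioned by the window, and only the top row per window is kept:

    SELECT *
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (
               PARTITION BY window_start, window_end
               ORDER BY ts DESC) AS rownum
      FROM TABLE(
        TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '10' MINUTES)))
    WHERE rownum <= 1;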

  • Window joins

    • INNER/LEFT/RIGHT/FULL OUTER
    • Semi window join
    • Anti window join
  • Group aggregation

    • HAVING filter

    • DISTINCT

    • GROUPING SETS, ROLLUP, CUBE (see the sketch below)
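
    A minimal GROUPING SETS plus HAVING sketch against the orders table defined earlier:

    SELECT userid, COUNT(*) AS cnt
    FROM orders
    GROUP BY GROUPING SETS ((userid), ())
    HAVING COUNT(*) >= 1;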

  • OVER aggregation

    • ORDER BY
    • PARTITION BY
    • RANGE (see the sketch below)
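
    A RANGE-based OVER aggregation sketch. A time-interval RANGE needs a time attribute, so this reuses the hypothetical events table from the windowing sketch above:

    SELECT userid,
           COUNT(*) OVER (
             PARTITION BY userid
             ORDER BY ts
             RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS cnt_1h
    FROM events;
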
  • Queries with user-defined functions

  • Joins

    • Inner equi-join

    • Outer equi-join

    • Interval join (see the sketch below)
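
      An interval join sketch. Both tables and their time attributes are hypothetical, since the tables defined above have no time columns:

      SELECT *
      FROM orders_ts o, shipments s
      WHERE o.order_id = s.order_id
        AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time;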

    • Expanding arrays into a relation

    • Join with a table function (UDTF)

      Joins a table with the results of a table function.

    • Join with a temporal table function

      A temporal table is a table that changes over time.

      A temporal table function provides access to the state of a temporal table at a specific point in time (see the sketch below).
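
      A sketch of joining against a temporal table function, shaped like the official currency-rates example; the rates function, its registration, and the orders_ts table are hypothetical:

      SELECT o.userid, o.amount * r.rate
      FROM orders_ts AS o,
           LATERAL TABLE (rates(o.order_time)) AS r
      WHERE o.currency = r.currency;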

    • Join with temporal tables

      Supported by the Blink planner only.

    • Lookup join (see the sketch below)
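
      A lookup join sketch. The dim_products table (e.g. a JDBC dimension table), the proc_time processing-time attribute, and the column names are all hypothetical; the point is the FOR SYSTEM_TIME AS OF syntax:

      SELECT o.userid, d.product_name
      FROM orders_stream AS o
        JOIN dim_products FOR SYSTEM_TIME AS OF o.proc_time AS d
          ON o.product = d.product_id;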

    • Table function

      Joins a table with the results of a table function.

  • Set operations (see the sketch below)

    • UNION and UNION ALL
    • INTERSECT and INTERSECT ALL
    • EXCEPT and EXCEPT ALL
    • IN
    • EXISTS
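
    A minimal set-operation sketch using the students and students001 tables defined earlier (they share a schema); EXCEPT requires bounded input, which holds in the batch mode used here:

    SELECT name FROM students
    UNION
    SELECT name FROM students001;

    SELECT name FROM students
    EXCEPT
    SELECT name FROM students001;
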
  • Top-N

    Only the Blink planner supports Top-N.

    SELECT *
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY userid ORDER BY userid DESC) AS row_num
      FROM orders)
    WHERE row_num <= 1;
    
  • Pattern recognition (MATCH_RECOGNIZE)

    ???
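
    As a starting point for this open item, a hedged MATCH_RECOGNIZE sketch shaped like the official Ticker example; the Ticker table and its columns are hypothetical:

    SELECT *
    FROM Ticker
      MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
          FIRST(A.rowtime) AS start_ts,
          LAST(A.price) AS last_price
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A+ B)
        DEFINE
          A AS A.price > 10
      ) AS T;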

  • Time travel
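
    A time-travel sketch (supported since Flink 1.18, and only with catalogs/connectors that implement it, such as lake-format tables); the table name is hypothetical:

    SELECT * FROM paimon_orders FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-01 00:00:00';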

SQL Hints

Overriding table OPTIONS via a hint affects only the single statement (see the sketch after this list).

  • INSERT statements

  • Query statements
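
    A minimal sketch of dynamic table options on the tables defined earlier; the option values are illustrative:

    -- query side: override a source option for this statement only
    SELECT * FROM students001 /*+ OPTIONS('source.monitor-interval' = '5s') */;

    -- insert side: override a sink option for this statement only
    INSERT INTO students /*+ OPTIONS('sink.parallelism' = '2') */
    SELECT name, age, gpa FROM students001;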

EXPLAIN statements
  • EXPLAIN a query

    EXPLAIN select * from orders;
    
  • EXPLAIN an INSERT

    EXPLAIN INSERT INTO orders PARTITION (userid=13) select 'X';
    
  • With specified ExplainDetail types (see the sketch below)
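
    A sketch of EXPLAIN with explicit ExplainDetail types:

    EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN
    select * from orders;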

LOAD statements (not tested)

The LOAD statement loads a built-in or user-defined module.

  • LOAD MODULE module_name
UNLOAD statements (not tested)

The UNLOAD statement unloads a built-in or user-defined module.

  • UNLOAD MODULE module_name
SET statements

The SET statement modifies configuration or displays the current configuration.

  • SET 'key' = 'value';

    SET 'execution.runtime-mode' = 'batch';
    
  • SET;

RESET statements

The RESET statement resets a configuration entry to its default value.

  • RESET 'key';

    RESET 'execution.runtime-mode';
    
  • RESET;

JAR statements
hdfs dfs -put /home/bigdata/software/java/lib/tools.jar /user/bigdata/flink/flink1.19/
  • ADD JAR

    add jar '/home/bigdata/software/flink-1.19.0.0/examples/table/WordCountSQLExample.jar';
    add jar 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/tools.jar';
    
  • SHOW JARS

    show jars;
    
  • REMOVE JAR

    REMOVE JAR '/home/bigdata/software/flink-1.19.0.0/examples/table/WordCountSQLExample.jar';
    REMOVE JAR 'hdfs://hadoop3test2-01.com:9000/user/bigdata/flink/flink1.19/tools.jar';

    [ERROR] Could not execute SQL statement. Reason:
    java.net.MalformedURLException: unknown protocol: hdfs
    
Job statements
  • SHOW JOBS

    show jobs;

    [ERROR] Could not execute SQL statement. Reason:
    java.lang.NullPointerException: No cluster ID found for operation 286fcf54-8821-4115-a966-7371331890fd
    
  • STOP JOB
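
    A sketch of STOP JOB (available since Flink 1.17); the job id is a placeholder to be taken from SHOW JOBS output:

    STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;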

TRUNCATE statements
  • TRUNCATE TABLE

    TRUNCATE TABLE orders;

    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.TableException: TRUNCATE TABLE statement is not supported for the table default_catalog.default_database.orders since the table hasn't implemented the interface org.apache.flink.table.connector.sink.abilities.SupportsTruncate.
    
DELETE statements
  • DELETE FROM

    DELETE FROM orders;

    [ERROR] Could not execute SQL statement. Reason:
    java.lang.UnsupportedOperationException: Can't perform delete operation of the table default_catalog.default_database.orders because the corresponding dynamic table sink has not yet implemented org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete.
    
DROP statements
  • DROP TABLE

    drop table orders;
    
  • DROP DATABASE (not tested)

  • DROP VIEW (not tested)

  • DROP FUNCTION (not tested)

CALL statements
  • CALL a stored procedure (not tested)
/home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -yt /home/bigdata/software/flink-1.19.0.0/examples/table/flink-test-project-1.0-SNAPSHOT.jar

/home/bigdata/software/flink-1.19.0.0/bin/flink run -m yarn-cluster -p 2 -ynm debug1234 -yjm 4096 -ytm 4096 -ys 1 -yd -yD high-availability.zookeeper.path.namespace=/hsj/debug1234 -yD sn.system.code=bigdata -yD state.backend=filesystem -yD checkpoint.interval=60000 -yD checkpoint.mode=exactly-once -yD high-availability.storageDir=hdfs:///flinkcp/bigdata/debug/ha -yD state.checkpoints.dir=hdfs:///flinkcp/bigdata/debug/checkpoints -yD state.savepoints.dir=hdfs:///flinkcp/bigdata/debug/savepoints -c com.test.tablesql.Test /home/bigdata/software/flink-1.19.0.0/examples/table/flink-test-project-1.0-SNAPSHOT.jar

DataStream API

Verified using the Hadoop FileSystem connector and the StreamingFileSink connector.

Verification items

State Backends

Enable and configure state backends and checkpointing. Use FsStateBackend so that state, checkpoints, and savepoints are written to a file system (HDFS).

state.backend=filesystem 
high-availability.storageDir=hdfs:///flinkcp/bigdata/debug/ha
state.checkpoints.dir=hdfs:///flinkcp/bigdata/debug/checkpoints
state.savepoints.dir=hdfs:///flinkcp/bigdata/debug/savepoints
Working with State
  • Keyed State

    Keyed state is used via a KeyedStream.

  • Operator State

    Interface to implement:

    CheckpointedFunction

  • Broadcast State

    Interfaces to implement:

    BroadcastProcessFunction

    KeyedBroadcastProcessFunction

Operators

Identify which of the operators below implement the CheckpointedFunction, ListCheckpointed, BroadcastProcessFunction, or KeyedBroadcastProcessFunction interfaces; any operator that does should be covered by the tests.

DataStream transformations
  • Map

  • FlatMap

  • Filter

  • KeyBy

  • Reduce

  • Window

  • WindowAll

  • Window Apply

  • WindowReduce

  • Union

  • Window Join

  • Interval Join

  • Window CoGroup

  • Connect

  • CoMap, CoFlatMap

  • Cache (currently only supported for jobs running in batch execution mode)

Physical partitioning
  • Custom partitioning
  • Random partitioning
  • Rebalancing (round-robin partitioning)
  • Rescaling
  • Broadcasting
Operator chaining and resource groups
  • Start a new chain
  • Disable chaining
  • Set slot sharing groups

Appendix 1

Official Flink documentation for this version:

https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/table/common/
