当前位置: 首页 > news >正文

FlinkSql(详细讲解一)

Flink SQL 是 Flink 提供的基于 SQL 语法的流批统一处理框架,它屏蔽了底层 Flink 算子的复杂性,让用户可以用熟悉的 SQL 语句处理无界流数据和有界批数据。第一次讲解会聚焦基础概念、核心组件、环境搭建、数据类型、表的创建与管理、基础查询等内容,确保易懂且实用。

一、Flink SQL 核心概念

  1. 流批统一:Flink SQL 不区分 “流处理” 和 “批处理”,而是将批数据视为 “有界流”(数据处理完就结束),流数据视为 “无界流”(持续处理新数据),用同一套 SQL 语法处理两种场景。
  2. Table & View
    • Table(表):是 Flink SQL 中数据的核心载体,可对应外部存储(如 Kafka、MySQL、文件)或中间计算结果。表有 “源表(Source,读数据)” 和 “汇表(Sink,写数据)” 之分。
    • View(视图):是基于表或其他视图的逻辑查询,不存储实际数据,仅保存查询逻辑,用于简化复杂 SQL。
  3. Catalog(元数据目录):管理表、视图、函数等元数据的组件,默认使用内存 Catalog(重启后丢失),也支持 Hive Catalog(持久化元数据)。
  4. Connector(连接器):Flink SQL 通过连接器对接外部系统(如 Kafka、MySQL、HDFS),负责数据的读取(Source Connector)和写入(Sink Connector)。
  5. Function(函数):包括内置函数(如 COUNT()SUBSTRING())和自定义函数(UDF/UDAF/UDTF),用于数据转换和计算。

二、环境搭建:如何运行 Flink SQL?

Flink SQL 有两种常用运行方式:SQL Client(命令行工具) 和 代码集成(Java/Scala/Python)。这里重点讲 SQL Client(最直观,适合入门)。

1. 安装 Flink 并启动 SQL Client
  • 下载 Flink 安装包(推荐 1.17+ 版本,流批统一支持更完善):Flink 官网
  • 解压后启动集群(本地模式,仅用于测试):
    # 进入解压目录
    cd flink-1.17.0
    # 启动本地集群(包含 1 个 JobManager + 1 个 TaskManager)
    ./bin/start-cluster.sh
    
  • 启动 SQL Client:
    ./bin/sql-client.sh embedded  # embedded 表示嵌入集群模式(本地运行)
    

    启动成功后会进入 Flink SQL> 交互界面,可直接输入 SQL 语句执行。

三、Flink SQL 数据类型

Flink SQL 支持丰富的数据类型,和 SQL 标准类似,但有部分特殊类型(尤其是时间类型,对处理流数据很重要)。

1. 基本类型(常用)
类型名说明示例值
INT32 位整数123
BIGINT64 位整数12345678901
VARCHAR(n)可变长度字符串(n 为最大长度,可省略)'flink'
DOUBLE双精度浮点数3.14
BOOLEAN布尔值TRUE / FALSE
2. 复杂类型(处理嵌套数据)
类型名说明示例定义
ARRAY<T>元素类型为 T 的数组ARRAY[1,2,3](INT 数组)
MAP<K, V>键类型 K、值类型 V 的映射MAP['a' -> 1, 'b' -> 2]
ROW<f1 T1, f2 T2>结构化数据(类似 “对象”)ROW(1, 'flink')(INT + VARCHAR)
3. 时间类型(流处理核心)

流数据需要处理 “时间”(如窗口计算、延迟数据处理),Flink SQL 支持两种时间:

  • 事件时间(Event Time):数据产生的实际时间(如日志中的 ts 字段),需通过 “水印(Watermark)” 定义(见下文表创建)。
  • 处理时间(Processing Time):数据进入 Flink 被处理的时间(无需定义,直接用 PROCTIME() 函数获取)。

时间类型语法:

类型名说明示例
TIMESTAMP(3)带毫秒精度的时间(默认 3 位)'2023-10-01 12:00:00.123'
DATE日期(年 - 月 - 日)'2023-10-01'
TIME时间(时:分: 秒)'12:00:00'

四、创建表(核心操作)

表是 Flink SQL 处理数据的入口,创建表需指定:字段名、字段类型、连接器(对接外部系统)、格式(数据序列化方式),如果是流表还需定义水印(处理事件时间)。

语法模板
CREATE TABLE [IF NOT EXISTS] 表名 (字段名1 类型1,字段名2 类型2,-- (可选)定义事件时间字段和水印(流表必用)WATERMARK FOR 事件时间字段 AS 水印规则  -- 水印规则:通常是事件时间减延迟(如 5 秒)
) WITH (-- 连接器类型(如 'kafka'、'filesystem'、'mysql')'connector' = '连接器名称',-- 连接器特有配置(如 Kafka 的 bootstrap.servers、topic)'连接器参数1' = '值1','连接器参数2' = '值2',-- 数据格式(如 'csv'、'json'、'avro')'format' = '格式名称',-- 格式特有配置(如 CSV 的字段分隔符)'格式参数1' = '值1'
);
实战示例:创建源表和汇表
示例 1:创建一个读取 CSV 文件的源表(批处理场景)

假设本地有一个 user_behavior.csv 文件,内容如下(用户行为数据)

1001,2001,300,click,2023-10-01 10:00:00.000
1002,2002,300,buy,2023-10-01 10:00:05.000

创建表读取该文件:

CREATE TABLE user_behavior (user_id BIGINT,       -- 用户IDitem_id BIGINT,       -- 商品IDcategory_id INT,      -- 商品分类IDbehavior VARCHAR,     -- 行为(click/buy)ts TIMESTAMP(3)       -- 事件时间(毫秒精度)
) WITH ('connector' = 'filesystem',  -- 连接器:文件系统'path' = 'file:///本地路径/user_behavior.csv',  -- 文件路径(本地用 file:///,HDFS 用 hdfs:///)'format' = 'csv',            -- 格式:CSV'csv.field-delimiter' = ','  -- CSV 字段分隔符(默认逗号,可省略)
);
示例 2:创建一个 Kafka 源表(流处理场景,带水印)

Kafka 中存储用户行为的 JSON 数据(格式:{"user_id":1001, "item_id":2001, "ts":1696120800000}),创建表读取并定义水印:

CREATE TABLE kafka_user_behavior (user_id BIGINT,item_id BIGINT,ts BIGINT,  -- Kafka 中 ts 是毫秒时间戳(Long 类型)-- 将 ts 转换为事件时间(TIMESTAMP(3)),并定义水印:允许数据延迟 5 秒event_time AS TO_TIMESTAMP_LTZ(ts, 3),  -- 转换为带时区的时间戳WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND  -- 水印规则
) WITH ('connector' = 'kafka','topic' = 'user_behavior_topic',  -- Kafka 主题'properties.bootstrap.servers' = 'localhost:9092',  -- Kafka 地址'properties.group.id' = 'flink_sql_group',  -- 消费者组'scan.startup.mode' = 'earliest-offset',  -- 从最早位置读取'format' = 'json',  -- 数据格式:JSON'json.fail-on-missing-field' = 'false',  -- 字段缺失不报错'json.ignore-parse-errors' = 'true'  -- 解析错误忽略
);

  • 水印(WATERMARK)作用:流数据可能乱序,水印用于标记 “当前已处理到的时间”,超过水印时间的数据会被视为 “迟到数据”(可配置处理策略)。
示例 3:创建一个 MySQL 汇表(写入结果)

将计算结果写入 MySQL 的 user_behavior_result 表:

CREATE TABLE mysql_sink (user_id BIGINT,behavior_count INT,PRIMARY KEY (user_id) NOT ENFORCED  -- MySQL 主键(Flink 不强制校验)
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/test',  -- MySQL 地址'table-name' = 'user_behavior_result',  -- 目标表名'username' = 'root',  -- 用户名'password' = '123456',  -- 密码'sink.buffer-flush.max-rows' = '100',  -- 累计 100 行批量写入'sink.buffer-flush.interval' = '10s'  -- 10 秒批量写入(取两者先满足的)
);

五、基础查询(SELECT 语句)

Flink SQL 的查询语法和标准 SQL 几乎一致,支持投影、过滤、聚合等操作,流批场景通用。

1. 简单投影与过滤

查询 user_behavior 表中 “购买(buy)” 行为的用户 ID 和商品 ID:

SELECT user_id, item_id 
FROM user_behavior 
WHERE behavior = 'buy';

  • 批处理场景:执行后直接返回文件中所有符合条件的结果。
  • 流处理场景(如查询 kafka_user_behavior):会持续监听 Kafka 新数据,每来一条符合条件的数据就输出一次。
2. 基本聚合(GROUP BY)

统计每个用户的行为总数(批处理):

SELECT user_id, COUNT(*) AS behavior_count 
FROM user_behavior 
GROUP BY user_id;

  • 流处理中的聚合:默认是 “无界聚合”(持续更新结果),例如 Kafka 中每新增一条数据,都会重新计算该用户的总数并输出。
3. 写入结果到汇表

将上述聚合结果写入 MySQL 汇表 mysql_sink

INSERT INTO mysql_sink
SELECT user_id, COUNT(*) AS behavior_count 
FROM user_behavior 
GROUP BY user_id;

  • 批处理:执行后一次性写入所有结果,任务结束。
  • 流处理:持续计算并写入最新结果(MySQL 会根据主键更新数据)。

六、视图操作

视图用于简化复杂查询,例如频繁查询 “购买行为且用户 ID> 1000” 的数据,可创建视图:

-- 创建视图
CREATE VIEW buy_behavior_view AS
SELECT user_id, item_id, ts 
FROM user_behavior 
WHERE behavior = 'buy' AND user_id > 1000;-- 查询视图(和查表格一样)
SELECT * FROM buy_behavior_view;-- 删除视图
DROP VIEW IF EXISTS buy_behavior_view;

七、表管理命令

命令作用示例
SHOW TABLES;查看当前 Catalog 中的所有表SHOW TABLES;
DESCRIBE [表名];查看表的结构(字段、类型、水印等)DESCRIBE user_behavior;
DROP TABLE [IF EXISTS] 表名;删除表(仅删除元数据,不删除外部数据)DROP TABLE IF EXISTS user_behavior;

八、执行模式切换(流 / 批)

Flink SQL 默认是 “流处理模式”,可通过配置切换为 “批处理模式”:

-- 切换为批处理模式
SET execution.runtime-mode = 'batch';-- 切换回流处理模式
SET execution.runtime-mode = 'streaming';
  • 批处理模式:适合处理有界数据(如文件),任务执行完自动结束。
  • 流处理模式:适合处理无界数据(如 Kafka),任务持续运行,直到手动停止。

第一次总结

本次讲解了 Flink SQL 的基础概念(表、视图、连接器等)、环境搭建、数据类型、表的创建(源表 / 汇表 / 水印)、基础查询(SELECT/INSERT)、视图操作和表管理命令。这些是使用 Flink SQL 的核心基础,掌握后可处理简单的流批数据场景。

下一次会深入讲解复杂操作:窗口计算(TUMBLE/HOP/SESSION)、流表 JOIN、函数(内置 / 自定义)、CDC 同步(变更数据捕获)等进阶内容。

http://www.dtcms.com/a/325619.html

相关文章:

  • C#中如何运用JWT用户认证
  • AT24C02C-SSHM-T用法
  • 什么情况下会导致日本服务器变慢?解决办法
  • 系统编程——消息队列
  • 前端实现 MD5 + AES 加密的安全登录请求
  • Nacos-1--什么是Nacos?
  • 疫情可视化:基孔肯雅热风险地图实战解析
  • Dubbo从入门到实战:分布式服务开发指南
  • WPF之绑定!
  • OpenCV计算机视觉实战(19)——特征描述符详解
  • 玩转Docker | 使用Docker部署Trilium Notes知识库工具
  • typecho博客设置浏览器标签页图标icon
  • 石材 × 设计:解锁永恒材质的四大灵感密码
  • 数据结构 双链表与LinkedList
  • 18.WEB 服务器
  • 超算中心的一台服务器上有多少个CPU,多少个核?
  • JVM基础【Java】
  • 力扣164:最大间距
  • 深入理解与灵活应用 std::optional
  • vue3中的子组件向父组件通信和父组件向子组件通信
  • python --nacos相关
  • MSE ZooKeeper:Flink高可用架构的企业级选择
  • 《图解技术体系》New generation CMDB resource model framework
  • 自然语言处理实战:用LSTM打造武侠小说生成器
  • 【AI论文】R-Zero:从零数据起步的自进化推理大语言模型
  • JavaScript 中如何实现大文件并行下载
  • AI(2)-神经网络(激活函数)
  • 支持小语种的在线客服系统,自动翻译双方语言,适合对接跨境海外客户
  • NY185NY190美光固态闪存NY193NY195
  • 《深度剖析前端框架中错误边界:异常处理的基石与进阶》