当前位置: 首页 > news >正文

Flink SQL 的常用语法

Flink SQL 常用语法指南

Flink SQL 是 Apache Flink 提供的 SQL 接口，允许用户使用标准 SQL 以及 Flink 扩展的 SQL 语法，以统一的方式处理流式数据和批式数据。以下是 Flink SQL 的常用语法元素和操作：

  1. 基本查询
-- Projection of every column of a table
SELECT *
FROM table_name;

-- Projection of selected columns, filtered by a predicate
SELECT
    column1,
    column2
FROM table_name
WHERE condition;

-- Grouped aggregation: number of orders per user
SELECT
    user_id,
    COUNT(*) AS cnt
FROM orders
GROUP BY user_id;
  2. 时间属性定义
-- Processing-time attribute.
-- The computed column proc_time AS PROCTIME() declares a processing-time
-- attribute, i.e. the system time at which each row is processed.
-- amount uses DECIMAL(10, 2) (exact decimal, as in the DDL section) rather than
-- DOUBLE, which cannot represent monetary values exactly.
-- WITH (...) is a placeholder for connector options (see the connector examples).
CREATE TABLE orders (
    order_id STRING,
    product STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (...);

-- Event-time attribute and watermark.
-- This is an ALTERNATIVE definition of the same `orders` table, not a second
-- statement to run after the one above (the table name is the same).
-- The WATERMARK clause makes order_time the event-time attribute and tolerates
-- rows arriving up to 5 seconds out of order.
CREATE TABLE orders (
    order_id STRING,
    product STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
  3. 窗口操作
-- Tumbling window: fixed-size, non-overlapping 1-hour windows over order_time
-- (which must be a time attribute), summing the order amount of each window.
SELECT
    window_start,
    window_end,
    SUM(amount) AS total_amount
FROM TABLE(
    TUMBLE(
        TABLE orders,
        DESCRIPTOR(order_time),
        INTERVAL '1' HOUR
    )
)
GROUP BY
    window_start,
    window_end;

-- Hopping (sliding) window: 1-hour windows that start every 5 minutes
-- (HOP takes the slide first, then the size), so windows overlap and a row can
-- fall into several of them. Amounts are summed per user and window.
SELECT
    window_start,
    window_end,
    user_id,
    SUM(amount) AS total_amount
FROM TABLE(
    HOP(
        TABLE orders,
        DESCRIPTOR(order_time),
        INTERVAL '5' MINUTES,
        INTERVAL '1' HOUR
    )
)
GROUP BY
    window_start,
    window_end,
    user_id;

-- Session window: a user's session closes after 10 minutes without activity.
-- PARTITION BY user_id (session window TVF syntax, Flink 1.19+) declares the
-- session key on the TVF itself, matching the GROUP BY key, as in the official
-- keyed session example.
SELECT
    window_start,
    window_end,
    user_id,
    COUNT(*) AS event_count
FROM TABLE(
    SESSION(TABLE orders PARTITION BY user_id, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end, user_id;
  4. 连接操作
-- Regular join: in streaming mode both inputs are kept in state indefinitely
-- unless a state TTL is configured.
SELECT o.order_id, o.product, u.user_name
FROM orders AS o
INNER JOIN users AS u ON o.user_id = u.user_id;

-- Regular join with a range predicate. This is NOT an interval join: the bounds
-- come from two different columns of the other table (start_time / end_time)
-- rather than constant offsets from a time attribute, so Flink cannot plan it
-- as an interval join and treats it as a regular join.
SELECT
    o.order_id,
    p.promotion_name,
    o.order_time,
    o.amount
FROM orders AS o
INNER JOIN promotions AS p
    ON o.product_id = p.product_id
    AND o.order_time BETWEEN p.start_time AND p.end_time;

-- Interval join: the bounds are constant offsets from the other side's time
-- attribute (assumes order_time and ship_time are both time attributes), so
-- Flink only needs to keep rows in state while they can still match.
SELECT
    o.order_id,
    s.shipment_id,
    o.order_time,
    s.ship_time,
    TIMESTAMPDIFF(HOUR, o.order_time, s.ship_time) AS hours_to_ship
FROM orders AS o
INNER JOIN shipments AS s
    ON o.order_id = s.order_id
    AND o.order_time BETWEEN s.ship_time - INTERVAL '1' HOUR AND s.ship_time;

-- Window join: both inputs are assigned to the same window TVF and joined on
-- window_start / window_end in addition to the join key.
SELECT
    o.order_id,
    s.shipment_id,
    o.window_start,
    o.window_end
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' HOUR))
) AS o
INNER JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE shipments, DESCRIPTOR(ship_time), INTERVAL '1' HOUR))
) AS s
    ON o.order_id = s.order_id
    AND o.window_start = s.window_start
    AND o.window_end = s.window_end;
  5. 常用函数

标量函数

-- Scalar functions. Every expression gets an explicit alias: otherwise the
-- planner assigns generated names such as EXPR$0, which are awkward to
-- reference downstream (views, INSERT INTO, client output).

-- String functions
SELECT
    LOWER(name) AS name_lower,
    SUBSTRING(email, 1, 5) AS email_prefix
FROM users;

-- Math functions
SELECT
    ABS(amount) AS abs_amount,
    ROUND(price, 2) AS rounded_price
FROM products;

-- Date/time functions
SELECT
    DATE_FORMAT(order_time, 'yyyy-MM-dd') AS order_date,
    TIMESTAMPDIFF(DAY, order_time, CURRENT_TIMESTAMP) AS days_since_order
FROM orders;

聚合函数

-- Whole-table aggregates over orders: row count plus sum, mean, max and min of amount.
SELECT
      COUNT(*)    AS total_orders
    , SUM(amount) AS total_amount
    , AVG(amount) AS avg_amount
    , MAX(amount) AS max_amount
    , MIN(amount) AS min_amount
FROM orders;

窗口函数

-- OVER aggregation: moving sum of the current row and the two preceding rows,
-- per product. In streaming mode the ORDER BY column must be a time attribute
-- in ascending order, and all OVER aggregates in one SELECT must use the same
-- window, so the window is declared once with the WINDOW clause.
SELECT
    product_id,
    order_time,
    amount,
    SUM(amount) OVER w AS moving_sum
FROM orders
WINDOW w AS (
    PARTITION BY product_id
    ORDER BY order_time
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
);

-- ROW_NUMBER(): in streaming mode it is only supported as a Top-N /
-- deduplication query, i.e. its result must be filtered in an outer query.
-- Example: the first 3 orders of each product.
SELECT product_id, order_time, amount, row_num
FROM (
    SELECT
        product_id,
        order_time,
        amount,
        ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY order_time ASC) AS row_num
    FROM orders
)
WHERE row_num <= 3;
  6. DDL 语句
-- Table backed by the Kafka topic `orders`, with JSON-encoded records.
-- The WATERMARK clause turns order_time into the event-time attribute and
-- allows rows to arrive up to 5 seconds out of order.
CREATE TABLE orders (
    order_id    STRING,
    product_id  STRING,
    amount      DECIMAL(10, 2),
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'                       = 'json'
);

-- View: a named query exposing only orders whose amount exceeds 1000.
CREATE VIEW large_orders AS
SELECT *
FROM orders
WHERE amount > 1000;

-- Function: register a user-defined function backed by the class
-- com.example.MyUDF (the class must be available on the job's classpath).
CREATE FUNCTION my_udf AS 'com.example.MyUDF';
  7. DML 语句
-- Insert the result of a query into a sink table (works in streaming and batch mode).
INSERT INTO target_table
SELECT * FROM source_table WHERE amount > 100;

-- Row-level UPDATE / DELETE statements are available since Flink 1.17, in
-- batch execution mode only, and only for sink connectors that implement
-- row-level modification (SupportsRowLevelUpdate / SupportsRowLevelDelete).
-- A Kafka-backed table, for example, cannot be updated or deleted this way.
SET 'execution.runtime-mode' = 'batch';

-- Update rows (Flink 1.17+, batch mode, connector support required)
UPDATE orders SET amount = 200 WHERE order_id = '123';

-- Delete rows (Flink 1.17+, batch mode, connector support required)
DELETE FROM orders WHERE order_id = '456';
  8. 模式匹配 (MATCH_RECOGNIZE)
-- Pattern recognition per user: a starting order, followed by one or more
-- orders with a falling amount, followed by an order with a rising amount.
-- ORDER BY must use a time attribute in ascending order (assumes order_time is
-- the event-time attribute declared with WATERMARK).
SELECT *
FROM orders
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY order_time
    MEASURES
        START_ROW.order_id AS start_order,
        LAST(PRICE_DOWN.order_id) AS bottom_order,
        LAST(PRICE_UP.order_id) AS end_order
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST PRICE_UP
    -- Flink CEP rejects a greedy quantifier on the last pattern variable, so
    -- the pattern ends with a single PRICE_UP row instead of PRICE_UP+.
    PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
    DEFINE
        -- START_ROW is not defined, so any row may start a match.
        -- First PRICE_DOWN row: lower than START_ROW; later ones: lower than
        -- the previous PRICE_DOWN row.
        PRICE_DOWN AS (LAST(PRICE_DOWN.amount, 1) IS NULL AND PRICE_DOWN.amount < START_ROW.amount)
            OR PRICE_DOWN.amount < LAST(PRICE_DOWN.amount, 1),
        -- PRICE_UP: higher than the last PRICE_DOWN row.
        PRICE_UP AS PRICE_UP.amount > LAST(PRICE_DOWN.amount, 1)
) MR;
  9. 配置参数
-- Session configuration: enable mini-batch execution. Input rows are buffered
-- and processed together, and the buffer is flushed after at most 5 s or once
-- 1000 rows have accumulated, whichever comes first.
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '1000';
  10. 常用连接器配置
-- Kafka source table: consumes JSON records from `input_topic` as consumer group
-- `testGroup`, starting from the latest offset (only records written after the
-- job starts). event_time is the event-time attribute, allowing 5 s of disorder.
CREATE TABLE kafka_source (
    id          INT,
    name        STRING,
    event_time  TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'input_topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id'          = 'testGroup',
    'scan.startup.mode'            = 'latest-offset',
    'format'                       = 'json'
);

-- JDBC sink table writing to MySQL table `sink_table` in database `mydb`.
-- With a declared primary key the JDBC sink writes in upsert mode; Flink only
-- supports NOT ENFORCED keys (it does not validate uniqueness itself).
-- The credentials below are placeholders; do not commit real ones in DDL.
CREATE TABLE jdbc_sink (
    id    INT,
    name  STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://mysql:3306/mydb',
    'table-name' = 'sink_table',
    'username'   = 'user',
    'password'   = 'password'
);

Flink SQL 仍在不断演进，这里只列举了一些常用语句。部分语法（如窗口表值函数、会话窗口的 PARTITION BY、UPDATE/DELETE）依赖具体的 Flink 版本，最新的语法和功能请以官方文档为准。

相关文章:

  • C++20 统一容器擦除:std::erase 和 std::erase_if
  • 大厂算法面试 7 天冲刺:第7天-系统设计与模拟面试实战 —— 架构思维 + Java落地
  • Git 拉取时常见冲突及解决方法总结
  • MySQL---数据库基础
  • 封装公共方法,并存在异步请求接口情况 封装及调用
  • vue keep-alive 如何设置动态的页面缓存
  • Python | kelvin波的水平空间结构
  • [MySQL]复合查询
  • 408 计算机网络 知识点记忆(7)
  • 基于phpStudy/宝塔搭建pbootcms,用于公司官网 | 解决管理后台登录报错问题 runtime\\data\\xx.php
  • 一文详解ffmpeg环境搭建:Ubuntu系统ffmpeg配置nvidia硬件加速
  • 2.2.3 Spark Standalone集群
  • 各类神经网络学习:(十)注意力机制(第2/4集),pytorch 中的多维注意力机制、自注意力机制、掩码自注意力机制、多头注意力机制
  • 游戏盾IP可以被破解吗
  • [特殊字符] macOS + Lima 离线下载 Calico 镜像教程
  • UML-饮料自助销售系统(饮料已售完)序列图
  • 每日一题-力扣-2999. 统计强大整数的数目 0410
  • 预言机与数据聚合器:DeFi的数据桥梁与风险博弈
  • 云原生运维在 2025 年的发展蓝图
  • PyTorch实现多输入输出通道的卷积操作
  • 做网站需要买服务器么/天津网络推广公司
  • 可以做烟的网站吗/全国网站排名
  • 手机做网站的步骤/网上在线看视频为什么卡
  • 赛车pk10计划网站建设/免费注册
  • 正定网站建设/餐饮培训
  • 成都模版网站制作/怎么找网站