当前位置: 首页 > news >正文

Flink 系列之二十九- Flink SQL - 中间算子:窗口聚合

之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。

注意由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19
代码参考:https://github.com/forever1986/flink-study.git

目录

  • 1 Window Top-N
    • 1.1 说明
    • 1.2 示例演示
  • 2 Window Deduplication
    • 2.1 说明
    • 2.2 示例演示

上一章针对了FlinkSQL中的水位线和窗口进行了讲解,那么这一章结合前面讲到的基于OVER函数的聚合TopN和Deduplication,实现在有窗口定义下的使用,下面统一做演示。

1 Window Top-N

1.1 说明

在《系列之二十七 - Flink SQL - 中间算子:OVER聚合》中讲过使用OVER函数来实现Top-N。这里加入上一章讲解的窗口,按照需求查询每个窗口的Top-N,以下是Top-N的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE rownum <= N [AND conditions]

注意事项

  • 1)其中table_name替换为VTF窗口的写法,即可做到开窗求Top-N
  • 2)Window Top-N 需要 PARTITION BY 子句包含 窗口表值函数 或 窗口聚合 产生的 window_start 和 window_end
  • 3)不支持会话窗口

1.2 示例演示

示例说明:通过随机生成服务器cpu值,进行滚动窗口5秒内排名top-2的cpu值。为了演示说明方便,这里只生成一台服务器的数据

1)创建log表:使用它datagen的connector随机生成cpu值

CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='1','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);

在这里插入图片描述

2)结合Top-N和窗口查询:5秒长度滚动窗口,窗口内排名前2的cpu值

SET sql-client.execution.result-mode=TABLEAU; 
SELECTid,cpu,ts,rownum,window_start, window_end
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY cpu desc) as rownum,window_start, window_endFROM TABLE(TUMBLE(TABLE log, DESCRIPTOR(ts), INTERVAL '5' SECOND))
)
WHERE rownum<=2;

在这里插入图片描述

说明:从上图可以看到每个窗口不断地打印出前2的cpu值,[15,20)的窗口只打印一条是刚好这个窗口只有1条数据(19秒的时候生产的)

2 Window Deduplication

2.1 说明

在《系列之二十七 - Flink SQL - 中间算子:OVER聚合》中讲过使用OVER函数来实现Deduplication。这里加入上一章讲解的窗口,按照需求查询每个窗口的去重Deduplication,以下是Deduplication的语法:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name) -- relation applied windowing TVF
WHERE (rownum = 1 | rownum <=1 | rownum < 2) [AND conditions]

注意事项

  • 1)其中table_name替换为VTF窗口的写法,即可做到开窗去重
  • 2)Window Top-N 需要 PARTITION BY 子句包含 窗口表值函数 或 窗口聚合 产生的 window_start 和 window_end
  • 3)不支持会话窗口。不支持处理时间,只支持事件时间

2.2 示例演示

示例说明:通过随机生成服务器cpu值,进行滚动窗口5秒内每台服务器最大cpu值。

1)创建log表:使用它datagen的connector随机生成cpu值

CREATE TABLE log(id INT,cpu DOUBLE,ts AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR ts AS ts
) WITH ('connector' = 'datagen','rows-per-second'='1','fields.id.kind'='random','fields.id.min'='1','fields.id.max'='2','fields.cpu.kind'='random','fields.cpu.min'='1','fields.cpu.max'='100'
);

在这里插入图片描述

2)结合Top-N和窗口查询:5秒长度滚动窗口,窗口内不同服务器最大的cpu值

SET sql-client.execution.result-mode=TABLEAU;
SELECTid,cpu,ts,rownum,window_start, window_end
FROM
(SELECT id , cpu, ts, ROW_NUMBER() OVER(PARTITION BY window_start, window_end, id ORDER BY cpu desc) as rownum,window_start, window_endFROM TABLE(TUMBLE(TABLE log, DESCRIPTOR(ts), INTERVAL '5' SECOND))
)
WHERE rownum = 1;

在这里插入图片描述

说明:从上图可以看到每个窗口不断地打印出最大的cpu值

结语:本章主要是将《系列之二十七 - Flink SQL - 中间算子:OVER聚合》中的Top-N和Deduplication场景在有窗口定义下的情况进行演示。下章还是继续回到FlinkSQL的中间算子。

相关文章:

  • 51la工具有哪些功能?悟空统计的核心功能呢?
  • NVIDIA Isaac GR00T N1.5 适用于 LeRobot SO-101 机械臂
  • 【Python打卡Day35】模型可视化与推理@浙大疏锦行
  • Ntfs!NtfsAllocateRestartTableIndex函数分析和Ntfs!DIRTY_PAGE_ENTRY_V0结构的关系
  • Nacos服务注册失败解决方案
  • Unity Mecanim C# 动画切换实践:实现随机播放待机动画
  • 大数据学习(139)-数仓设计
  • 高动态范围成像
  • 论文略读:HR-Extreme: A High-Resolution Dataset for Extreme Weather Forecasting
  • 【2025】Ubuntu22.04深度学习环境搭建记录
  • Flash数据写入及ECC纠错关键函数:Fapi_issueProgrammingCommand()
  • Acrobat 中使用 JavaScript 禁止 PDF 打印
  • CSS实现元素撑满剩余空间的5种方法
  • linux中的locate命令
  • 探索MCP Server Chart:AI赋能的统计图表自动生成工具
  • 柑橘检测模型
  • 中国AI Top30 访问量排行榜 - 2025年05月
  • 基于mapreduce的气候分析系统设计与实现
  • 刀客doc:WPP走下神坛
  • 论文参考文献干干货
  • 微网站的定义/吉林seo技术交流
  • 国内设计师个人网页/seo怎么优化效果更好
  • 政府网站建设应用工作方案/a站
  • 哈尔滨专业做网站公司/怎么在百度上发布个人文章
  • 行业网站建设哪家好/sem优化服务公司
  • 新开传奇网站刚开/竞价排名是什么意思