当前位置: 首页 > news >正文

Spark专题-第二部分:Spark SQL 入门(7)-算子介绍-Windows

Spark专题-第二部分:Spark SQL 入门(7)-算子介绍-Windows

Windows算子概述

窗口函数在Spark中通过WindowExec物理算子实现,它负责处理OVER子句中定义的分区、排序和框架边界。

窗口函数类型及对应SQL

1. 排名函数

  • ROW_NUMBER(): 为分区内的每行分配唯一序号
  • RANK(): 排名,相同值有相同排名但留下间隔
  • DENSE_RANK(): 密集排名,相同值有相同排名且不留间隔

2. 聚合函数

  • SUM(), AVG(), COUNT(), MAX(), MIN(): 在窗口内聚合

3. 分析函数

  • LEAD(), LAG(): 访问前后行的数据
  • FIRST_VALUE(), LAST_VALUE(): 获取窗口内第一个/最后一个值

窗口函数语法结构

SELECT column1,column2,window_function() OVER (PARTITION BY partition_column ORDER BY order_columnROWS BETWEEN start_boundary AND end_boundary) AS result_column
FROM table_name;

物理执行计划分析

WindowExec算子执行流程

WindowExec内部处理
接收排序后的分区数据
为每个分区维护滑动窗口
根据框架边界计算函数
输出带有窗口函数结果的行
TableScan
Exchange: 按partition_column分区
Sort: 按order_column排序
WindowExec: 应用窗口函数
Output

实际案例与执行流程

案例: 销售数据排名分析

-- 创建销售数据表
CREATE TABLE sales (sale_id INT,salesperson STRING,department STRING,sale_amount DOUBLE,sale_date DATE
);-- 插入示例数据
INSERT INTO sales VALUES
(1, 'Alice', 'Tech', 1000.0, '2023-01-15'),
(2, 'Bob', 'Tech', 1500.0, '2023-01-16'),
(3, 'Alice', 'Tech', 800.0, '2023-02-10'),
(4, 'Charlie', 'HR', 1200.0, '2023-01-20'),
(5, 'Bob', 'Tech', 2000.0, '2023-02-05');

使用ROW_NUMBER()的执行计划

EXPLAIN EXTENDED
SELECT salesperson,department,sale_amount,sale_date,ROW_NUMBER() OVER (PARTITION BY department ORDER BY sale_amount DESC) as rank
FROM sales;

物理执行计划:

== Physical Plan ==
Window [row_number() windowspecdefinition(department#11, sale_amount#10 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#15], [department#11], [sale_amount#10 DESC NULLS LAST]
+- *(1) Sort [department#11 ASC NULLS FIRST, sale_amount#10 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(department#11, 200)
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]
WindowExec处理细节
分区: Tech
排序数据: Bob-2000, Bob-1500, Alice-1000, Alice-800
计算行号: 1, 2, 3, 4
分区: HR
排序数据: Charlie-1200
计算行号: 1
Scan sales表
Exchange: 按department哈希分区
Sort: 按department ASC, sale_amount DESC排序
WindowExec: 计算ROW_NUMBER
Output

使用聚合窗口函数的执行计划

EXPLAIN EXTENDED
SELECT salesperson,sale_date,sale_amount,SUM(sale_amount) OVER (PARTITION BY salespersonORDER BY sale_dateROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
FROM sales;
滑动窗口处理
框架: UNBOUNDED PRECEDING to CURRENT ROW
Alice的销售记录按日期排序
计算累计和: 1000, 1800
Bob的销售记录按日期排序
计算累计和: 1500, 3500
Scan sales表
Exchange: 按salesperson分区
Sort: 按salesperson, sale_date排序
WindowExec: 计算累计和
Output

窗口框架类型

1. 行基础框架

-- 前2行到当前行
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW-- 当前行到后1行
ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING-- 所有前行到当前行
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

2. 范围基础框架

-- 金额相差100范围内的行
RANGE BETWEEN 100 PRECEDING AND CURRENT ROW-- 所有前行到当前行
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

性能优化考虑

1. 分区键选择

基数过高
基数过低
适中
选择分区键
分区键基数
太多小分区-性能差
分区过大-可能内存不足
最佳性能

2. 排序优化

-- 确保排序键上有合适的索引或统计信息
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS department, sale_amount;

3. 内存管理

-- 调整窗口函数内存配置
SET spark.sql.windowExec.buffer.spill.threshold=4096;
SET spark.sql.windowExec.buffer.in.memory.threshold=1024;

高级窗口函数应用

使用LEAD/LAG进行时间序列分析

SELECT salesperson,sale_date,sale_amount,LAG(sale_amount, 1) OVER (PARTITION BY salesperson ORDER BY sale_date) as prev_sale,LEAD(sale_amount, 1) OVER (PARTITION BY salesperson ORDER BY sale_date) as next_sale
FROM sales;

使用FIRST_VALUE/LAST_VALUE

SELECT department,salesperson,sale_amount,FIRST_VALUE(salesperson) OVER (PARTITION BY department ORDER BY sale_amount DESC) as top_salesperson
FROM sales;

执行计划解析技巧

使用EXPLAIN分析窗口函数执行计划时关注:

  1. Exchange: 分区操作,影响shuffle量
  2. Sort: 排序操作,影响性能
  3. Window: 窗口函数的具体实现
-- 查看详细的执行计划
EXPLAIN FORMATTED
SELECT department, salesperson, RANK() OVER (PARTITION BY department ORDER BY sale_amount DESC) 
FROM sales;

最佳实践

  1. 合理选择分区键: 避免数据倾斜
  2. 使用合适的框架: 根据业务需求选择行或范围框架
  3. 监控内存使用: 窗口操作可能消耗大量内存
  4. 结合索引优化: 在排序键上建立合适的数据分布

算子部分目前也就想到这么多,后面想起来其他的再单独补充,接下来会准备一些优化方面的文章

http://www.dtcms.com/a/393450.html

相关文章:

  • JavaScript 闭包(Closure)深度讲解
  • QT与Spring Boot通信:实现HTTP请求的完整指南
  • 服务器ubuntu 22.04装nvidia驱动
  • nginx流量复制
  • spring-ai-alibaba-nl2sql 学习(五)——python 分析
  • 分布式链路追踪关键指标实战:精准定位服务调用 “慢节点” 全指南(三)
  • SimpleVLA-RL:通过 RL 实现 VLA 训练的 Scaling
  • Java 大视界 -- 基于 Java 的大数据可视化在企业供应链动态监控与优化中的应用
  • 《Linux 进程控制完全指南》
  • GitHub 热榜项目 - 日榜(2025-09-21)
  • 鹿鼎记豪侠传:Rust 重塑 iOS 江湖(上)
  • echarts监听dataZoom拖动缩放事件
  • Chrome学习小记3:基于Chrome Views框架创建最小示例窗口A(从Example分析开始)
  • Chrome学习小记2:GN构建系统小记
  • Chrome性能优化指南大纲
  • 【iOS】AFNetworking学习
  • Kafka 分层存储(Tiered Storage)原理、配置、快速上手与生产落地
  • 多元函数微分学核心概念辨析:连续、偏导与可微
  • 9.21 快选|倍增|栈+贡献法
  • AI.工作助手.工作提效率.AI应用开发平台
  • 【名人简历】鲁迅
  • linux文件系统基本管理
  • 2.1 进程与线程 (答案见原书 P57)
  • SDL2 开发详解
  • c++ 深拷贝之 std::string 与 char*
  • [数理逻辑] 决定性公理与勒贝格可测性(II) 一维情况
  • [Tongyi] DeepResearch Model | MODEL_PATH
  • 儿童对话玩具模型设计与实现
  • 生成器迁移的偏差消除条件
  • LeetCode 刷题【86. 分隔链表】