Spark专题-第二部分:Spark SQL 入门(7)-算子介绍-Windows
Spark专题-第二部分:Spark SQL 入门(7)-算子介绍-Windows
Windows算子概述
窗口函数在Spark中通过WindowExec
物理算子实现,它负责处理OVER子句中定义的分区、排序和框架边界。
窗口函数类型及对应SQL
1. 排名函数
- ROW_NUMBER(): 为分区内的每行分配唯一序号
- RANK(): 排名,相同值有相同排名但留下间隔
- DENSE_RANK(): 密集排名,相同值有相同排名且不留间隔
2. 聚合函数
- SUM(), AVG(), COUNT(), MAX(), MIN(): 在窗口内聚合
3. 分析函数
- LEAD(), LAG(): 访问前后行的数据
- FIRST_VALUE(), LAST_VALUE(): 获取窗口内第一个/最后一个值
窗口函数语法结构
SELECT column1,column2,window_function() OVER (PARTITION BY partition_column ORDER BY order_columnROWS BETWEEN start_boundary AND end_boundary) AS result_column
FROM table_name;
物理执行计划分析
WindowExec算子执行流程
实际案例与执行流程
案例: 销售数据排名分析
-- 创建销售数据表
CREATE TABLE sales (sale_id INT,salesperson STRING,department STRING,sale_amount DOUBLE,sale_date DATE
);-- 插入示例数据
INSERT INTO sales VALUES
(1, 'Alice', 'Tech', 1000.0, '2023-01-15'),
(2, 'Bob', 'Tech', 1500.0, '2023-01-16'),
(3, 'Alice', 'Tech', 800.0, '2023-02-10'),
(4, 'Charlie', 'HR', 1200.0, '2023-01-20'),
(5, 'Bob', 'Tech', 2000.0, '2023-02-05');
使用ROW_NUMBER()的执行计划
EXPLAIN EXTENDED
SELECT salesperson,department,sale_amount,sale_date,ROW_NUMBER() OVER (PARTITION BY department ORDER BY sale_amount DESC) as rank
FROM sales;
物理执行计划:
== Physical Plan ==
Window [row_number() windowspecdefinition(department#11, sale_amount#10 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#15], [department#11], [sale_amount#10 DESC NULLS LAST]
+- *(1) Sort [department#11 ASC NULLS FIRST, sale_amount#10 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(department#11, 200)
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]
使用聚合窗口函数的执行计划
EXPLAIN EXTENDED
SELECT salesperson,sale_date,sale_amount,SUM(sale_amount) OVER (PARTITION BY salespersonORDER BY sale_dateROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
FROM sales;
窗口框架类型
1. 行基础框架
-- 前2行到当前行
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW-- 当前行到后1行
ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING-- 所有前行到当前行
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2. 范围基础框架
-- 金额相差100范围内的行
RANGE BETWEEN 100 PRECEDING AND CURRENT ROW-- 所有前行到当前行
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
性能优化考虑
1. 分区键选择
2. 排序优化
-- 确保排序键上有合适的索引或统计信息
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS department, sale_amount;
3. 内存管理
-- 调整窗口函数内存配置
SET spark.sql.windowExec.buffer.spill.threshold=4096;
SET spark.sql.windowExec.buffer.in.memory.threshold=1024;
高级窗口函数应用
使用LEAD/LAG进行时间序列分析
SELECT salesperson,sale_date,sale_amount,LAG(sale_amount, 1) OVER (PARTITION BY salesperson ORDER BY sale_date) as prev_sale,LEAD(sale_amount, 1) OVER (PARTITION BY salesperson ORDER BY sale_date) as next_sale
FROM sales;
使用FIRST_VALUE/LAST_VALUE
SELECT department,salesperson,sale_amount,FIRST_VALUE(salesperson) OVER (PARTITION BY department ORDER BY sale_amount DESC) as top_salesperson
FROM sales;
执行计划解析技巧
使用EXPLAIN
分析窗口函数执行计划时关注:
- Exchange: 分区操作,影响shuffle量
- Sort: 排序操作,影响性能
- Window: 窗口函数的具体实现
-- 查看详细的执行计划
EXPLAIN FORMATTED
SELECT department, salesperson, RANK() OVER (PARTITION BY department ORDER BY sale_amount DESC)
FROM sales;
最佳实践
- 合理选择分区键: 避免数据倾斜
- 使用合适的框架: 根据业务需求选择行或范围框架
- 监控内存使用: 窗口操作可能消耗大量内存
- 结合索引优化: 在排序键上建立合适的数据分布
算子部分目前也就想到这么多,后面想起来其他的再单独补充,接下来会准备一些优化方面的文章