当前位置: 首页 > news >正文

Spark专题-第二部分:Spark SQL 入门(8)-算子介绍-sort

Spark专题-第二部分:Spark SQL 入门(8)-算子介绍-sort

要不是在工作中发现了性能极差的cluster by写法,真把sort算子给忘了,赶紧补上

Sort算子概述

在Spark SQL中,排序操作通过SortExec物理算子实现。不同的SQL排序语法会产生不同的执行计划,涉及不同的数据分布和排序策略。

排序相关SQL语法及对应算子

1. ORDER BY

  • 描述: 全局排序,对整个数据集进行排序
  • 对应算子: SortExec + Exchange(单分区)
  • 执行特点: 需要将所有数据收集到单个Executor进行排序
  • SQL写法:
    SELECT * FROM sales ORDER BY sale_amount DESC;
    

2. SORT BY

  • 描述: 在每个分区内排序,不保证全局有序
  • 对应算子: SortExec(无Exchange)
  • 执行特点: 分区内排序,性能较好但结果不是全局有序
  • SQL写法:
    SELECT * FROM sales SORT BY sale_amount DESC;
    

3. DISTRIBUTE BY

  • 描述: 按指定列重新分布数据,但不排序
  • 对应算子: Exchange(哈希分区)
  • 执行特点: 仅重新分区,不进行排序
  • SQL写法:
    SELECT * FROM sales DISTRIBUTE BY department;
    

4. CLUSTER BY

  • 描述: DISTRIBUTE BY和SORT BY的组合,按相同列分区和排序
  • 对应算子: Exchange + SortExec
  • 执行特点: 先按列分区,然后在每个分区内按同一列排序
  • SQL写法:
    SELECT * FROM sales CLUSTER BY department;
    

物理执行计划对比

ORDER BY执行流程

数据流示例
分区1: Bob-1500, Alice-1000
分区2: Charlie-1200, Alice-800
Exchange收集所有数据到单个分区
单分区: Charlie-1200, Bob-1500, Alice-1000, Alice-800
SortExec按sale_amount DESC排序
结果: Bob-1500, Charlie-1200, Alice-1000, Alice-800
TableScan sales
Exchange: 单分区全局收集
SortExec: 全局排序
Output

SORT BY执行流程

数据流示例
分区1原始: Bob-1500, Alice-1000
分区1排序后: Bob-1500, Alice-1000
分区2原始: Charlie-1200, Alice-800
分区2排序后: Charlie-1200, Alice-800
输出不是全局有序
TableScan sales
SortExec: 分区内排序
Output

DISTRIBUTE BY执行流程

数据分布示例
原始分区
按department重新分区
Tech分区: Alice-1000, Bob-1500, Alice-800, Bob-2000
HR分区: Charlie-1200
分区内无序
TableScan sales
Exchange: 按department哈希分区
Output

CLUSTER BY执行流程

数据流示例
原始数据分布在多个分区
按department哈希分区
Tech分区: Alice-1000, Bob-1500, Alice-800, Bob-2000
Tech分区排序后: Alice-1000, Alice-800, Bob-1500, Bob-2000
HR分区: Charlie-1200
TableScan sales
Exchange: 按department哈希分区
SortExec: 分区内按department排序
Output

实际案例与执行计划分析

案例: 销售数据分析

-- 创建示例表
CREATE TABLE sales (sale_id INT,salesperson STRING,department STRING,sale_amount DOUBLE,sale_date DATE
);-- 插入数据
INSERT INTO sales VALUES
(1, 'Alice', 'Tech', 1000.0, '2023-01-15'),
(2, 'Bob', 'Tech', 1500.0, '2023-01-16'),
(3, 'Alice', 'Tech', 800.0, '2023-02-10'),
(4, 'Charlie', 'HR', 1200.0, '2023-01-20'),
(5, 'Bob', 'Tech', 2000.0, '2023-02-05');

ORDER BY执行计划

EXPLAIN EXTENDED
SELECT * FROM sales ORDER BY sale_amount DESC;

物理执行计划:

== Physical Plan ==
*(2) Sort [sale_amount#10 DESC NULLS LAST], true, 0
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]

SORT BY执行计划

EXPLAIN EXTENDED
SELECT * FROM sales SORT BY sale_amount DESC;

物理执行计划:
== Physical Plan ==
*(1) Sort [sale_amount#10 DESC NULLS LAST], false, 0
± *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]

DISTRIBUTE BY + SORT BY执行计划

EXPLAIN EXTENDED
SELECT * FROM sales 
DISTRIBUTE BY department 
SORT BY sale_amount DESC;

物理执行计划:

== Physical Plan ==
*(2) Sort [sale_amount#10 DESC NULLS LAST], false, 0
+- Exchange hashpartitioning(department#11, 200), ENSURE_REQUIREMENTS, [id=#15]
+- *(1) Scan ExistingRDD[sale_id#8,salesperson#9,sale_amount#10,department#11,sale_date#12]

性能优化策略

1. 避免全局排序(ORDER BY)

小数据
大数据
需要全局排序?
数据量大小
使用ORDER BY
考虑替代方案
使用SORT BY + 后续处理
使用CLUSTER BY
使用SORT BY

2. 分区策略优化

-- 调整分区数避免数据倾斜
SET spark.sql.shuffle.partitions=100;-- 对于已知数据分布,使用范围分区
SET spark.sql.adaptive.enabled=true;
SET spark.sql.adaptive.coalescePartitions.enabled=true;

3. 内存管理优化

-- 调整排序内存
SET spark.sql.sort.spill.numElementsForceSpillThreshold=1000000;
SET spark.sql.execution.sort.spill.initialMemoryThreshold=100000;-- 启用外部排序
SET spark.sql.sort.enableRadixSort=true;
SET spark.sql.useExternalSort=true;

不同场景下的选择策略

场景1: 生成排序报告(需要全局有序)

-- 小数据量:使用ORDER BY
SELECT * FROM daily_sales ORDER BY sale_amount DESC;-- 大数据量:分阶段处理
CREATE TABLE temp_sorted AS
SELECT * FROM big_sales SORT BY sale_amount DESC;-- 然后对较小结果集进行全局排序(如果需要)
SELECT * FROM temp_sorted ORDER BY sale_amount DESC;

场景2: 为后续聚合操作准备数据

-- 使用CLUSTER BY优化后续的窗口函数
SELECT salesperson,SUM(sale_amount) OVER (PARTITION BY department ORDER BY sale_date) 
FROM sales 
CLUSTER BY department, sale_date;

场景3: 数据重新分布

-- 使用DISTRIBUTE BY优化数据分布
INSERT OVERWRITE TABLE sales_by_dept
SELECT * FROM sales DISTRIBUTE BY department;

执行计划解析技巧

识别排序类型

在物理执行计划中关注:

  • 全局排序: Sort [column ASC/DESC], true, 0 + Exchange SinglePartition
  • 分区内排序: Sort [column ASC/DESC], false, 0
  • 分区排序: Sort [column ASC/DESC], false, 0 + Exchange hashpartitioning

性能瓶颈诊断

-- 查看排序操作的详细统计
EXPLAIN COST
SELECT * FROM sales ORDER BY sale_amount DESC;-- 监控排序过程中的数据倾斜
SET spark.sql.adaptive.skew.enabled=true;
SET spark.sql.adaptive.logLevel=DEBUG;

最佳实践总结

  1. 小数据全局排序: 使用ORDER BY,但注意内存限制
  2. 大数据局部排序: 使用SORT BY或CLUSTER BY
  3. 优化数据分布: 使用DISTRIBUTE BY为后续操作准备数据
  4. 监控内存使用: 排序操作容易导致内存溢出,需要合理配置
  5. 利用自适应查询: 启用Spark的自适应查询执行优化排序过程

这下应该没有遗漏的重要算子了

http://www.dtcms.com/a/414194.html

相关文章:

  • 知识体系_分布式内存计算框架_spark
  • 银行 网站开发 干什么wordpress路径错误
  • QML 语法基础详解
  • ExcelVBA一键生成智能散点趋势图
  • ✨WPF编程基础【1.4】:类型转换器(含示例及源码)
  • 公链分析报告 - 模块化区块链2
  • 数图实战项目(十五-2:第一阶段:从RAW数据到ISP管道,听不懂在说啥?---> 那就盘它):从奥运大屏,到手机小屏,快来挖一挖里面都有什么
  • 网站开发常见面试东莞网站优化关键词推广
  • GauGAN详解与实现
  • Word如何一次性合并多个文档
  • 互联网技术服务优化大师优化项目有
  • 状态管理库 Zustand 的接入流程与注意点
  • 河北网站建设推广电话wordpress网址导航主题
  • NFS 服务器 iSCSI 服务器
  • display this 概念、故障排错及题目
  • whisper-large-v3部署详细步骤,包括cpu和gpu方式,跟着做一次成功
  • 个人用云计算学习笔记 --16(DHCP 服务器)
  • 【Linux】基础IO与文件描述符
  • ​​FFmpeg 教程:从入门到精通,探索多媒体处理的瑞士军刀​
  • 使用ffmpeg8.0的whisper模块语音识别
  • 免费版Markdown 编辑器:Typora
  • 个人建网站有什么好处网站运营需要 做哪些工作
  • MySQL库、表的操作
  • FileProvider 配置必须针对 Android 7.0+(API 24+)做兼容
  • 混合止损策略在加密货币交易中的应用
  • Java模拟实现socket通信
  • iSCSI服务器
  • PyQt5 界面美化:从基础到高级的完整指南
  • 【Linux系列】让 Vim “跑”起来:实现一个会动的进度条
  • 上海商务网站建设wordpress 云相册