当前位置: 首页 > news >正文

Spark专题-第二部分:Spark SQL 入门(4)-算子介绍-Exchange

第二部分:Spark SQL 入门(4)-算子介绍-Exchange

本来没想这么快引入这个算子的,但写完上一篇Aggregate后发现很自然的引出了Exchange,那就顺手带出来吧

1. Exchange算子简介

Exchange算子是Spark SQL中负责数据重分布(Data Redistribution)的关键算子,它实现了Spark的shuffle操作。当数据需要在不同节点间重新分区时,Exchange算子会被引入到执行计划中。它本质上是Spark分布式计算中数据交换的基础,负责将数据按照特定规则重新组织,以便后续操作能够高效执行。

2. Exchange算子的类型

Spark SQL中的Exchange算子主要有以下几种类型:

2.1 HashPartitioning (哈希分区)

  • 描述:根据指定的列计算哈希值,然后按照哈希值将数据分配到不同的分区。
  • 适用场景GROUP BYJOINDISTINCT等需要按特定键进行聚合或连接的场景。
  • 优点
    • 分布相对均匀(在数据倾斜不严重的情况下)
    • 实现简单高效
    • 适合大多数聚合和连接操作
  • 缺点
    • 容易产生数据倾斜(当某些键的值特别多时)
    • 哈希冲突可能导致分布不均

2.2 RangePartitioning (范围分区)

  • 描述:根据指定的列和排序规则,将数据划分为连续的范围分配到不同分区。
  • 适用场景:需要全局排序的操作,如ORDER BY、窗口函数等。
  • 优点
    • 保持数据的全局有序性
    • 适合范围查询和排序操作
  • 缺点
    • 需要采样来确定范围边界,有额外开销
    • 边界确定不准确可能导致数据倾斜

2.3 SinglePartition (单分区)

  • 描述:将所有数据收集到单个分区中。
  • 适用场景:全局聚合、小数据量的最终结果收集。
  • 优点
    • 简单直接
    • 适合小数据量的最终操作
  • 缺点
    • 容易成为性能瓶颈(单节点处理所有数据)
    • 不适合大数据集

2.4 BroadcastExchange (广播交换)

  • 描述:将小数据集广播到所有工作节点。
  • 适用场景:广播Join、小表广播。
  • 优点
    • 避免大数据集的shuffle
    • 显著提高Join性能
  • 缺点
    • 只适用于小数据集(受广播大小限制)
    • 广播数据占用所有节点的内存

这一部分会在join详细介绍,毕竟使用广播能很好的做性能优化

2.5 RoundRobinPartitioning (轮询分区)

  • 描述:以轮询方式均匀分配数据到各个分区。
  • 适用场景:需要均匀分布数据但不需要特定排序的场景。
  • 优点
    • 数据分布均匀
    • 避免数据倾斜
  • 缺点
    • 破坏数据的原有顺序
    • 不适合需要按键聚合的场景

3. 不同类型Exchange的优缺点比较

类型优点缺点适用场景
HashPartitioning分布相对均匀,实现简单容易数据倾斜,哈希冲突GROUP BY, JOIN
RangePartitioning保持全局有序性需要采样,边界确定复杂ORDER BY, 窗口函数
SinglePartition简单直接单点瓶颈,性能差全局聚合,结果收集
BroadcastExchange避免shuffle,性能高只适合小数据集广播Join,小表广播
RoundRobinPartitioning分布均匀,避免倾斜破坏数据顺序数据重分布,均匀分配

4. Exchange算子在SQL中的出现场景

4.1 聚合操作(GROUP BY)

-- 会出现HashPartitioning Exchange
SELECT department, AVG(salary) 
FROM employees 
GROUP BY department;
Scan employees表
Partial Aggregate
计算每个分区的局部平均值
Exchange
HashPartitioning
按department哈希分区
Final Aggregate
合并各分区的聚合结果
输出最终结果

4.2 表连接(JOIN)

-- 会出现HashPartitioning Exchange
SELECT * 
FROM orders o 
JOIN customers c ON o.customer_id = c.customer_id;-- 如果customers表很小,会出现BroadcastExchange
SELECT /*+ BROADCAST(c) */ * 
FROM orders o 
JOIN customers c ON o.customer_id = c.customer_id;

4.3 排序操作(ORDER BY)

-- 会出现RangePartitioning Exchange
SELECT * 
FROM products 
ORDER BY price DESC;

4.4 去重操作(DISTINCT)

-- 会出现HashPartitioning Exchange
SELECT DISTINCT category 
FROM products;

4.5 窗口函数

-- 会出现RangePartitioning Exchange
SELECT name, department, salary,RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees;

5. Exchange算子的执行流程

Exchange详细过程
按分区写入临时文件
数据序列化
生成索引文件
Driver协调数据传输
Executor间网络传输
读取索引文件
获取对应分区数据
数据反序列化
开始Shuffle操作
Map端处理
数据按分区规则写入磁盘
Exchange传输
网络数据传输
Reduce端处理
从各个节点读取数据
后续操作
聚合/连接等处理

6. 性能优化建议

6.1 避免不必要的Exchange

-- 不好的写法:多次聚合导致多次Exchange
SELECT COUNT(*) FROM (SELECT DISTINCT category FROM products);-- 好的写法:一次完成
SELECT COUNT(DISTINCT category) FROM products;

6.2 合理设置分区数

-- 设置合适的shuffle分区数
SET spark.sql.shuffle.partitions=200;  -- 根据数据量调整-- 对于数据倾斜场景,可以使用salting技术
SELECT category, COUNT(*) 
FROM (SELECT category, CONCAT(cast(rand()*10 as int), '_') as salt FROM products
) 
GROUP BY category, salt;

6.3 利用广播避免Exchange

-- 自动广播(当小表小于spark.sql.autoBroadcastJoinThreshold时)
SELECT * FROM orders o JOIN small_table s ON o.id = s.id;-- 手动提示广播
SELECT /*+ BROADCAST(s) */ * 
FROM orders o 
JOIN small_table s ON o.id = s.id;

6.4 监控Exchange性能

通过Spark UI监控shuffle的读写大小、耗时等指标,识别性能瓶颈:

45%25%15%10%5%Shuffle性能问题分布数据倾斜分区数不合理网络瓶颈序列化效率低其他

7. 实际案例演示

7.1 查看执行计划

EXPLAIN EXTENDED
SELECT department, AVG(salary) as avg_salary
FROM employees
WHERE hire_date > '2020-01-01'
GROUP BY department
HAVING AVG(salary) > 5000
ORDER BY avg_salary DESC;

执行计划分析:

== Physical Plan ==
*(3) Sort [avg_salary#x DESC NULLS LAST], true, 0
+- *(3) Filter (avg_salary#x > 5000.0)
+- *(3) HashAggregate(keys=[department#x], functions=[avg(salary#y)])
+- Exchange hashpartitioning(department#x, 200) <-- HashPartitioning Exchange
+- *(2) HashAggregate(keys=[department#x], functions=[partial_avg(salary#y)])
+- *(2) Project [department#x, salary#y]
+- *(2) Filter (hire_date#z > 2020-01-01)
+- *(1) Scan parquet employees

7.2 执行流程说明

  1. Scan阶段:读取employees表数据
  2. Filter阶段:过滤hire_date > '2020-01-01’的记录
  3. Partial Aggregate阶段:在每个分区上计算部门的局部平均工资
  4. Exchange阶段:按department进行哈希分区,将相同部门的数据发送到同一节点
  5. Final Aggregate阶段:计算各部门的全局平均工资
  6. Filter阶段:过滤平均工资 > 5000的部门
  7. Sort阶段:按平均工资降序排序(可能触发RangePartitioning Exchange)

8. 总结

Exchange算子是Spark SQL中至关重要的shuffle操作实现,理解其不同类型和适用场景对于编写高效的Spark SQL查询至关重要。通过合理选择Exchange策略、优化分区数和避免不必要的数据移动,可以显著提升Spark作业的性能。

在实际应用中,应该:

  1. 尽量避免或减少Exchange操作
  2. 根据数据特性选择合适的Exchange类型
  3. 监控shuffle性能指标,及时发现和解决数据倾斜等问题
  4. 利用广播等机制避免不必要的shuffle
http://www.dtcms.com/a/392210.html

相关文章:

  • Spark专题-第二部分:Spark SQL 入门(3)-算子介绍-Aggregate
  • Go基础:Go语言中集合详解(包括:数组、切片、Map、列表等)
  • 《算法闯关指南:优选算法--滑动窗口》--09长度最小的子数串,10无重复字符的最长字串
  • 请卸载xshell,一款国产的终端工具,界面漂亮,功能强大,支持win,mac,linux平台,安全免费
  • 用批处理文件实现Excel和word文件的重造
  • unseping(反序列化漏洞)
  • 麒麟系统 word转为pdf
  • 【Codex CLI 配置指南(小白速通版)】
  • R及RStudio的配置与安装
  • 深度解析:基于 ODBC连接 KingbaseES 数据库的完整操作与实践
  • springboot川剧科普平台(代码+数据库+LW)
  • Vue中的监听方式
  • CentOS 7系统解决yum报错
  • GD32VW553-IOT V2开发版【温湿度检测】
  • Perplexica - 开源AI搜索引擎,让搜索更智能
  • Windows在VSCode Cline中安装Promptx
  • 深入解析 Spring AI 系列:解析返回参数处理
  • LeetCode:34.合并K个升序链表
  • 精细化关键词优化:提升SEO效果的长尾策略解析
  • Go基础:Go语言详细介绍,环境搭建,及第一个程序详解
  • 【开题答辩全过程】以 HL新闻为例,包含答辩的问题和答案
  • docker运行wonderShaper实现网卡限速
  • Windows 安装 Docker Desktop 到 D 盘完整教程(含迁移方案)
  • 基于陌讯AI检测算法本地化部署教程:基于Docker的环境配置与性能测试
  • Docker Docker Compose 完整入门与实用技巧
  • ARP协议工作原理分析(基于Wireshark)
  • CKS-CN 考试知识点分享(14) Istio网络策略
  • TCP 协议全解析:握手、挥手、重传与流控的深度剖析
  • 计算机视觉(opencv)实战二十七——目标跟踪
  • 深度学习中神经网络与损失函数优化