Spark专题-第二部分:Spark SQL 入门(4)-算子介绍-Exchange
第二部分:Spark SQL 入门(4)-算子介绍-Exchange
本来没想这么快引入这个算子的,但写完上一篇Aggregate后发现很自然的引出了Exchange,那就顺手带出来吧
1. Exchange算子简介
Exchange
算子是Spark SQL中负责数据重分布(Data Redistribution)的关键算子,它实现了Spark的shuffle操作。当数据需要在不同节点间重新分区时,Exchange算子会被引入到执行计划中。它本质上是Spark分布式计算中数据交换的基础,负责将数据按照特定规则重新组织,以便后续操作能够高效执行。
2. Exchange算子的类型
Spark SQL中的Exchange算子主要有以下几种类型:
2.1 HashPartitioning (哈希分区)
- 描述:根据指定的列计算哈希值,然后按照哈希值将数据分配到不同的分区。
- 适用场景:
GROUP BY
、JOIN
、DISTINCT
等需要按特定键进行聚合或连接的场景。 - 优点:
- 分布相对均匀(在数据倾斜不严重的情况下)
- 实现简单高效
- 适合大多数聚合和连接操作
- 缺点:
- 容易产生数据倾斜(当某些键的值特别多时)
- 哈希冲突可能导致分布不均
2.2 RangePartitioning (范围分区)
- 描述:根据指定的列和排序规则,将数据划分为连续的范围分配到不同分区。
- 适用场景:需要全局排序的操作,如
ORDER BY
、窗口函数等。 - 优点:
- 保持数据的全局有序性
- 适合范围查询和排序操作
- 缺点:
- 需要采样来确定范围边界,有额外开销
- 边界确定不准确可能导致数据倾斜
2.3 SinglePartition (单分区)
- 描述:将所有数据收集到单个分区中。
- 适用场景:全局聚合、小数据量的最终结果收集。
- 优点:
- 简单直接
- 适合小数据量的最终操作
- 缺点:
- 容易成为性能瓶颈(单节点处理所有数据)
- 不适合大数据集
2.4 BroadcastExchange (广播交换)
- 描述:将小数据集广播到所有工作节点。
- 适用场景:广播Join、小表广播。
- 优点:
- 避免大数据集的shuffle
- 显著提高Join性能
- 缺点:
- 只适用于小数据集(受广播大小限制)
- 广播数据占用所有节点的内存
这一部分会在join详细介绍,毕竟使用广播能很好的做性能优化
2.5 RoundRobinPartitioning (轮询分区)
- 描述:以轮询方式均匀分配数据到各个分区。
- 适用场景:需要均匀分布数据但不需要特定排序的场景。
- 优点:
- 数据分布均匀
- 避免数据倾斜
- 缺点:
- 破坏数据的原有顺序
- 不适合需要按键聚合的场景
3. 不同类型Exchange的优缺点比较
类型 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
HashPartitioning | 分布相对均匀,实现简单 | 容易数据倾斜,哈希冲突 | GROUP BY, JOIN |
RangePartitioning | 保持全局有序性 | 需要采样,边界确定复杂 | ORDER BY, 窗口函数 |
SinglePartition | 简单直接 | 单点瓶颈,性能差 | 全局聚合,结果收集 |
BroadcastExchange | 避免shuffle,性能高 | 只适合小数据集 | 广播Join,小表广播 |
RoundRobinPartitioning | 分布均匀,避免倾斜 | 破坏数据顺序 | 数据重分布,均匀分配 |
4. Exchange算子在SQL中的出现场景
4.1 聚合操作(GROUP BY)
-- 会出现HashPartitioning Exchange
SELECT department, AVG(salary)
FROM employees
GROUP BY department;
4.2 表连接(JOIN)
-- 会出现HashPartitioning Exchange
SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id;-- 如果customers表很小,会出现BroadcastExchange
SELECT /*+ BROADCAST(c) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id;
4.3 排序操作(ORDER BY)
-- 会出现RangePartitioning Exchange
SELECT *
FROM products
ORDER BY price DESC;
4.4 去重操作(DISTINCT)
-- 会出现HashPartitioning Exchange
SELECT DISTINCT category
FROM products;
4.5 窗口函数
-- 会出现RangePartitioning Exchange
SELECT name, department, salary,RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees;
5. Exchange算子的执行流程
6. 性能优化建议
6.1 避免不必要的Exchange
-- 不好的写法:多次聚合导致多次Exchange
SELECT COUNT(*) FROM (SELECT DISTINCT category FROM products);-- 好的写法:一次完成
SELECT COUNT(DISTINCT category) FROM products;
6.2 合理设置分区数
-- 设置合适的shuffle分区数
SET spark.sql.shuffle.partitions=200; -- 根据数据量调整-- 对于数据倾斜场景,可以使用salting技术
SELECT category, COUNT(*)
FROM (SELECT category, CONCAT(cast(rand()*10 as int), '_') as salt FROM products
)
GROUP BY category, salt;
6.3 利用广播避免Exchange
-- 自动广播(当小表小于spark.sql.autoBroadcastJoinThreshold时)
SELECT * FROM orders o JOIN small_table s ON o.id = s.id;-- 手动提示广播
SELECT /*+ BROADCAST(s) */ *
FROM orders o
JOIN small_table s ON o.id = s.id;
6.4 监控Exchange性能
通过Spark UI监控shuffle的读写大小、耗时等指标,识别性能瓶颈:
7. 实际案例演示
7.1 查看执行计划
EXPLAIN EXTENDED
SELECT department, AVG(salary) as avg_salary
FROM employees
WHERE hire_date > '2020-01-01'
GROUP BY department
HAVING AVG(salary) > 5000
ORDER BY avg_salary DESC;
执行计划分析:
== Physical Plan ==
*(3) Sort [avg_salary#x DESC NULLS LAST], true, 0
+- *(3) Filter (avg_salary#x > 5000.0)
+- *(3) HashAggregate(keys=[department#x], functions=[avg(salary#y)])
+- Exchange hashpartitioning(department#x, 200) <-- HashPartitioning Exchange
+- *(2) HashAggregate(keys=[department#x], functions=[partial_avg(salary#y)])
+- *(2) Project [department#x, salary#y]
+- *(2) Filter (hire_date#z > 2020-01-01)
+- *(1) Scan parquet employees
7.2 执行流程说明
- Scan阶段:读取employees表数据
- Filter阶段:过滤hire_date > '2020-01-01’的记录
- Partial Aggregate阶段:在每个分区上计算部门的局部平均工资
- Exchange阶段:按department进行哈希分区,将相同部门的数据发送到同一节点
- Final Aggregate阶段:计算各部门的全局平均工资
- Filter阶段:过滤平均工资 > 5000的部门
- Sort阶段:按平均工资降序排序(可能触发RangePartitioning Exchange)
8. 总结
Exchange算子是Spark SQL中至关重要的shuffle操作实现,理解其不同类型和适用场景对于编写高效的Spark SQL查询至关重要。通过合理选择Exchange策略、优化分区数和避免不必要的数据移动,可以显著提升Spark作业的性能。
在实际应用中,应该:
- 尽量避免或减少Exchange操作
- 根据数据特性选择合适的Exchange类型
- 监控shuffle性能指标,及时发现和解决数据倾斜等问题
- 利用广播等机制避免不必要的shuffle