Spark专题-第二部分:Spark SQL 入门(5)-算子介绍-Join
第二部分:Spark SQL 入门(5)-算子介绍-Join
前面几篇介绍的算子大多与单表查询相关,但实际工作中少不了多表关联,因此咱们这一篇就来聊一聊join相关的算子
1. Join算子类型及触发条件
1. BroadcastHashJoin
- 描述: 当一张表很小的时候,Spark会将这张表广播到所有Executor节点,然后在每个Executor上使用哈希表进行join操作。这避免了shuffle,效率高。
- 触发条件:
- 小表的大小小于
spark.sql.autoBroadcastJoinThreshold
(默认10MB)。 - 或者使用广播提示(如
BROADCAST
hint)。
- 小表的大小小于
- 对应SQL: 通常是小表和大表的join,例如:
-- 自动触发BroadcastHashJoin SELECT * FROM large_table l JOIN small_table s ON l.key = s.key;-- 使用广播提示 SELECT /*+ BROADCAST(s) */ * FROM large_table l JOIN small_table s ON l.key = s.key;
- Spark SQL写法: 使用DataFrame API时,可以自动触发或手动广播:
# 自动触发 result_df = large_df.join(small_df, "key")# 手动广播 from pyspark.sql.functions import broadcast result_df = large_df.join(broadcast(small_df), "key")
- 执行计划示例: 使用
explain()
输出大致包含:
== Physical Plan ==
*(1) BroadcastHashJoin [key], [key], Inner, BuildRight
:- *(1) Scan large_table
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *(1) Scan small_table
- 执行流程:
- Scan: 读取小表(small_table)数据。
- BroadcastExchange: 将小表数据广播到所有Executor。
- Scan: 读取大表(large_table)数据。
- BroadcastHashJoin: 在每个Executor上,使用广播的小表哈希表与大表数据进行join。
- Output: 输出结果。
2. SortMergeJoin
- 描述: 当两张表都很大时,Spark会对双方进行shuffle和排序,使得相同key的数据分布在同一个分区,然后进行排序后的merge join。这是Spark默认的大表join方式。
- 触发条件:
- 表的大小超过广播阈值。
- 没有使用广播提示或广播不适用。
- 对应SQL: 大表之间的join,例如:
SELECT * FROM large_table1 l1 JOIN large_table2 l2 ON l1.key = l2.key;
- Spark SQL写法: 通常自动触发:
result_df = large_df1.join(large_df2, "key")
- 执行计划示例: 使用
explain()
输出大致包含:
== Physical Plan ==
*(3) SortMergeJoin [key], [key], Inner
:- *(1) Sort [key ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key, 200)
: +- *(1) Scan large_table1
+- *(2) Sort [key ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key, 200)
+- *(2) Scan large_table2
- 执行流程:
- Scan: 读取两个大表的数据。
- Exchange: 对两个表都根据key进行shuffle(hash partitioning),将相同key的数据分配到同一个分区。
- Sort: 对每个分区内的数据按key排序。
- SortMergeJoin: 对排序后的数据进行merge join。
- Output: 输出结果。
3. ShuffledHashJoin
- 描述: 在shuffle后,一方表足够小到可以在内存中构建哈希表时使用。类似于BroadcastHashJoin,但需要shuffle。
- 触发条件:
- 一方表在shuffle后的大小小于
spark.sql.autoBroadcastJoinThreshold
* shuffle分区数。 - 通常较少见,因为SortMergeJoin更稳定。
- 对应SQL: 类似SortMergeJoin,但优化器可能选择此方式如果一方数据较小。
- 执行计划示例: 可能包含
ShuffledHashJoin
算子。
实际案例与完整执行流程
案例: BroadcastHashJoin示例
假设我们有两个表:orders
(大表,包含order_id, user_id, amount) 和 users
(小表,包含user_id, user_name)。我们想根据user_id连接它们。
- Spark SQL代码 (使用PySpark):
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("BroadcastJoinExample").getOrCreate()# 创建示例DataFrame - 假设users表很小
orders_data = [(1, 101, 100), (2, 102, 200), (3, 101, 150)]
users_data = [(101, "Alice"), (102, "Bob")]orders_df = spark.createDataFrame(orders_data, ["order_id", "user_id", "amount"])
users_df = spark.createDataFrame(users_data, ["user_id", "user_name"])# 执行join,自动触发BroadcastHashJoin
result_df = orders_df.join(users_df, "user_id")
result_df.show()# 查看执行计划
result_df.explain()
- 解析的执行计划: 使用
explain()
输出文本计划,但这里用流程图简化表示执行流程:
- 完整执行流程:
- Scan: 读取
users
表(小表)数据。 - BroadcastExchange: 将
users
表数据广播到所有Executor。 - Scan: 读取
orders
表(大表)数据。 - BroadcastHashJoin: 在每个Executor上,使用广播的
users
哈希表与orders
数据进行join。 - Output: 输出结果,包含order_id, user_id, amount, user_name。
配置影响
- 通过
spark.sql.autoBroadcastJoinThreshold
调整广播阈值。 - 使用提示(如
BROADCAST
)强制广播,但存在风险。