区分:union(),coalesce () 和 repartition ()
一、合并的对象:数据 vs 分区
Spark 中需要区分两个概念:
- 数据(Data):RDD 中的元素(如 [1, 2, 3])。
- 分区(Partitions):数据的物理存储单位,分布在集群的不同节点上。
这三个算子的 “合并” 对象不同:
- union():合并数据(将多个 RDD 的元素叠加)。
- coalesce () 和 repartition ():合并分区(调整数据的物理分布)。
二、union ():合并数据(不改变分区)
核心逻辑
- 将多个 RDD 的元素合并成一个新的 RDD。
- 分区数 = 原 RDD 分区数之和,每个 RDD 的分区保持独立。
示例
假设有两个 RDD:
scala
// RDD 1:2个分区
val rdd1 = sc.parallelize(Seq(1, 2, 3), 2) // 分区0: [1, 2], 分区1: [3]// RDD 2:3个分区
val rdd2 = sc.parallelize(Seq(4, 5, 6), 3) // 分区0: [4], 分区1: [5], 分区2: [6]
执行 union ():
scala
val unionRdd = rdd1.union(rdd2) // 共5个分区(2+3)
unionRdd.glom().collect() // 查看分区内容
结果可视化
plaintext
rdd1:分区0 -> [1, 2]分区1 -> [3]rdd2:分区0 -> [4]分区1 -> [5]分区2 -> [6]union_rdd:分区0 -> [1, 2] # rdd1的分区0分区1 -> [3] # rdd1的分区1分区2 -> [4] # rdd2的分区0分区3 -> [5] # rdd2的分区1分区4 -> [6] # rdd2的分区2
关键点
- 数据合并:rdd1 和 rdd2 的元素被放到一起。
- 分区独立:每个 RDD 的分区保持原样,只是简单叠加。
三、coalesce () 和 repartition ():合并分区(调整数据分布)
核心逻辑
- 合并分区:将同一个 RDD 的多个分区物理合并为更少的分区(或通过 shuffle 重新分布)。
- 数据可能重新分布:通过移动数据实现分区合并。
示例:coalesce (2)
假设初始 RDD 有 4 个分区:
scala
执行 coalesce (2)(合并为 2 个分区):
scala
val coalescedRdd = rdd.coalesce(2) // 合并为2个分区
coalescedRdd.glom().collect()
结果可视化
原 rdd:分区0 -> [1, 2]分区1 -> [3, 4]分区2 -> [5, 6]分区3 -> [7, 8]coalesced_rdd(合并相邻分区):分区0 -> [1, 2, 3, 4] # 合并原分区0和1分区1 -> [5, 6, 7, 8] # 合并原分区2和3
repartition () 的区别
如果用 repartition (2):
scala
val repartitionedRdd = rdd.repartition(2) // 重新分区为2个
repartitionedRdd.glom().collect()
结果可视化
repartitioned_rdd(通过shuffle均匀分布):分区0 -> [1, 3, 5, 7] # 数据被打散到新分区分区1 -> [2, 4, 6, 8]
关键点
- 分区合并:将原本分散的分区物理合并为更少的分区。
- 数据移动:coalesce () 尽量不 shuffle(合并相邻分区),而 repartition () 强制 shuffle 以保证数据均匀。
四、对比总结
算子 | 合并对象 | 是否改变分区数 | 数据是否 shuffle | 核心场景 |
---|---|---|---|---|
union() | 多个 RDD 的数据 | 是(叠加原分区数) | 否 | 快速合并多个数据集 |
coalesce() | 同一个 RDD 的分区 | 是(通常减少) | 否(默认) | 减少分区数,避免 shuffle |
repartition() | 同一个 RDD 的分区 | 是(任意调整) | 是 | 彻底重分区,解决数据倾斜 |
五、常见误区解答
1. union () 会合并分区吗?
不会!union () 只是将多个 RDD 的分区简单叠加,分区数等于原 RDD 分区数之和。例如:
scala
val rdd1 = sc.parallelize(Seq(1, 2), 1) // 1个分区
val rdd2 = sc.parallelize(Seq(3, 4), 1) // 1个分区
val unionRdd = rdd1.union(rdd2) // 2个分区(1+1)
2. coalesce () 和 repartition () 的合并有什么不同?
- coalesce():通过合并相邻分区实现,不 shuffle(默认),可能导致数据倾斜。
scala
rdd.coalesce(1) // 合并为1个分区,数据可能集中在一个节点
- repartition():通过 shuffle 重新分布数据,分区更均匀,但开销大。
scala
rdd.repartition(10) // 增加到10个分区,数据被打散
六、一句话总结
- union():多个 RDD 的数据合并(分区数叠加)。
- coalesce()/repartition():同一个 RDD 的分区合并(调整数据分布)。