当前位置: 首页 > news >正文

Spark 基础自定义分区器

(一)什么是分区

【复习提问:RDD的定义是什么?】

在 Spark 里,弹性分布式数据集(RDD)是核心的数据抽象,它是不可变的、可分区的、里面的元素并行计算的集合。

在 Spark 中,分区是指将数据集按照一定的规则划分成多个较小的子集,每个子集可以独立地在不同的计算节点上进行处理,这样可以实现数据的并行处理,提高计算效率。

可以将 Spark 中的分区类比为快递公司处理包裹的过程。假设你有一批包裹要从一个城市发送到另一个城市,快递公司会将这些包裹按照一定的规则进行分区,比如按照收件地址的区域划分。每个分区的包裹会被分配到不同的快递员或运输车辆上进行运输,这些快递员或车辆可以同时出发,并行地将包裹送到不同的区域。这就类似于 Spark 中的分区,每个分区的数据可以在不同的计算节点上同时进行处理,从而加快整个数据处理的速度。

(二)默认分区的情况

从集合创建 RDD(使用 parallelize 方法)

当使用 parallelize 方法从一个集合创建 RDD 时,默认分区数通常取决于集群的配置。

在本地模式下,默认分区数等于本地机器的 CPU 核心数;在集群模式下,默认分区数由 spark.default.parallelism 配置项决定。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("DefaultPartitionExample").setMaster("local")

val sc = new SparkContext(conf)

val data = Seq(1, 2, 3, 4, 5)

val rdd = sc.parallelize(data)

println(s"默认分区数: ${rdd.partitions.length}")

sc.stop()

2.从外部存储(如文件)创建 RDD(使用 textFile 方法)

当使用 textFile 方法从外部存储(如 HDFS、本地文件系统等)读取文件创建 RDD 时,默认分区数通常由文件的块大小决定。对于 HDFS 文件,默认分区数等于文件的块数。例如,一个 128MB 的文件在 HDFS 上被分成 2 个 64MB 的块,那么创建的 RDD 默认分区数就是 2。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("DefaultPartitionFileExample").setMaster("local")

val sc = new SparkContext(conf)

// 假设文件存在于本地

val rdd = sc.textFile("path/to/your/file.txt")

println(s"默认分区数: ${rdd.partitions.length}")

sc.stop()

【现场演示,如果文件是一个.gz文件,是一个不可拆分的文件,那么默认分区的数量就会是1】

(三)分区的作用

在 Spark 中,RDD 是数据的集合,它会被划分成多个分区,这些分区可以分布在不同的计算节点上,就像图书馆的书架分布在不同的房间一样。

这样做的好处是什么呢?

并行计算:Spark 能够同时对多个分区的数据进行处理,充分利用集群的计算资源,进而加快作业的执行速度。例如,若一个 RDD 有 10 个分区,且集群有足够的计算资源,Spark 就可以同时处理这 10 个分区的数据。

数据局部性:分区有助于实现数据局部性,也就是让计算尽量在数据所在的节点上进行,减少数据在网络间的传输,从而降低网络开销。

容错性:当某个分区的数据处理失败时,Spark 能够重新计算该分区,而不需要重新计算整个 RDD。

当使用savaAsTextFile做保存操作时,最终生成的文件个数通常和RDD的分区数一致。

object PartitionExample {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象,设置应用程序名称和运行模式

    val conf = new SparkConf().setAppName("PartitionExample").setMaster("local")

    // 使用 SparkConf 创建 SparkContext 对象

    val sc = new SparkContext(conf)

    // 创建一个包含 10 个元素的 Seq

    val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // 使用 parallelize 方法创建 RDD,并设置分区数为 3

    val rdd = sc.parallelize(data, 3)

    // 将 RDD 保存为文本文件,保存路径为 "output"

    rdd.saveAsTextFile("output")

    // 停止 SparkContext,释放资源

    sc.stop()

  }

}  

在运行代码后,output 目录下会生成与 RDD 分区数量相同的文本文件,这里 RDD 分区数设置为 3,所以会生成 3 个文件,文件名通常为 part-00000、part- 00001、part-00002 。

(四)分区器和默认分区器

分区器是 Spark 中用于决定 RDD 数据如何在不同分区之间进行分布的组件。通过定义分区规则,它能够将具有键值对类型的数据(PairRDD)按照一定策略划分到不同分区,以实现数据的合理分布,进而提高并行计算的效率。

在大多数涉及键值对的转换操作中,Spark 默认使用 HashPartitioner。例如,reduceByKey、groupByKey 等操作,如果没有显式指定分区器,就会使用 HashPartitioner。

HashPartitioner 根据键的哈希值来决定数据应该被分配到哪个分区。具体来说,它会对键的哈希值取模,模的结果就是分区的编号。假设分区数为 n,键为 key,则分区编号的计算公式为 hash(key) % n。

对于键值对 RDD,HashPartitioner 是大多数转换操作的默认分区器,而 RangePartitioner 是 sortByKey 操作的默认分区器。你也可以根据具体需求显式指定分区器来控制数据的分区方式。

(五)为什么需要自定义分区

数据倾斜:当数据分布不均匀,某些分区数据量过大,导致计算负载不均衡时,可自定义分区器,按照特定规则重新分配数据,避免数据倾斜影响计算性能。比如电商订单数据中,按地区统计销售额,若某些热门地区订单数远多于其他地区,使用默认分区器会使部分任务计算量过大。通过自定义分区器,可将热门地区进一步细分,让各分区数据量更均衡。

特定业务逻辑:若业务对数据分区有特殊要求,如按时间段将日志数据分区,不同时间段的数据存到不同分区便于后续处理分析;或在社交网络数据中,按用户关系紧密程度分区等,都需自定义分区器实现。

(六)自定义分区器的实现步骤

自定义分区器需要:继承Partitioner抽象类 + 实现其中的两个方法。

numPartitions :返回分区的数量,即整个 RDD 将被划分成多少个分区 。

getPartition(key: Any) :接收一个键值key(对于非键值对类型 RDD,可根据数据特征构造合适的键 ),根据自定义逻辑返回该键值对应的分区索引(从 0 开始,取值范围为 0 到numPartitions - 1 ) 。

(七)案例

假设要对 NBA 球队比赛信息进行分区存储,要求将湖人、火箭两队信息单独存储,其余球队信息存放在一个分区。

("勇士", "info1"),

("掘金", "info2"),

("湖人", "info3"),

("火箭", "info4")

示例代码如下:

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object CustomPartitionerExample {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("CustomPartitionerExample").setMaster("local[*]")

    val sc = new SparkContext(conf)

    // 准备数据集,数据为(球队名称, 相关信息)形式的键值对

    val rdd = sc.parallelize(List(

      ("勇士", "info1"),

      ("掘金", "info2"),

      ("湖人", "info3"),

      ("火箭", "info4")

    ))

    // 使用自定义分区器对RDD进行分区

    val partitionedRDD = rdd.partitionBy(new MyPartitioner)

    partitionedRDD.saveAsTextFile("output")

    sc.stop()

  }

}

// 自定义分区器类

class MyPartitioner extends Partitioner {

  // 定义分区数量为3

  override def numPartitions: Int = 3

  // 根据球队名称(键值)确定分区索引

  override def getPartition(key: Any): Int = {

    key match {

      case "湖人" => 0

      case "火箭" => 1

      case _ => 2

    }

  }

}

核心代码解释:

MyPartitioner类继承自Partitioner,实现了numPartitions方法指定分区数量为 3 ,实现getPartition方法,根据球队名称判断分区索引,湖人对应分区 0,火箭对应分区 1,其他球队对应分区 2 。

2.在main方法中,创建包含球队信息的 RDD,然后调用partitionBy方法并传入自定义分区器MyPartitioner,对 RDD 进行分区,最后将分区后的数据保存到指定路径。

相关文章:

  • Redis的主从架构
  • Node.js 实战六:日志系统设计 —— 不只是 console.log,而是可追溯的行为记录链
  • 单目测距和双目测距 bev 3D车道线
  • 常见面试题:Webpack的构建流程简单说一下。
  • iOS 内存分区
  • 报错System.BadImageFormatException:“试图加载格式不正确的程序。 (异常来自 HRESULT:0x8007000B)”
  • 滑动窗口算法详解与C++实现
  • 蓝桥杯1140 最小质因子之和(Hard Version)
  • 深入理解位图(Bit - set):概念、实现与应用
  • 蓝桥杯19681 01背包
  • Web开发-JavaEE应用SpringBoot栈SnakeYaml反序列化链JARWAR构建打包
  • linux本地部署ollama+deepseek过程
  • 职场方法论总结(4)-如何正确地汇报
  • 使用Python制作Lorenz吸引子的轨道生成视频
  • 《云端共生体:Flutter与AR Cloud如何改写社交交互规则》
  • 数字电子技术基础(六十)——使用Digital软件绘制脉冲触发的触发器
  • C++:static成员
  • 你引入的lodash充分利用了吗?
  • 封装、继承、多态的理解
  • 基于区块链技术的供应链溯源系统:重塑信任与透明度
  • 国际博物馆日|在辽宁省博物馆遇见敦煌
  • 第十届曹禺剧本奖上海揭晓,首次开放个人申报渠道
  • 普京调整俄陆军高层人事任命
  • 回望星河深处,唤醒文物记忆——读《发现武王墩》
  • 制造四十余年血腥冲突后,库尔德工人党为何自行解散?
  • 4月份全国企业销售收入同比增长4.3%