当前位置: 首页 > news >正文

Spark自定义分区器-基础


在 Spark 中,RDD(弹性分布式数据集)的分区器决定了数据在各个分区的分布方式。Spark 内置了哈希分区器(Hash Partitioner)和范围分区器(Range Partitioner) ,但在一些特殊场景下,内置分区器无法满足需求,此时就需要自定义分区器。以下是关于 RDD 自定义分区器的详细介绍:

在 Spark 里,弹性分布式数据集(RDD)是核心的数据抽象,它是不可变的、可分区的、里面的元素并行计算的集合。

在 Spark 中,分区是指将数据集按照一定的规则划分成多个较小的子集,每个子集可以独立地在不同的计算节点上进行处理,这样可以实现数据的并行处理,提高计算效率。

可以将 Spark 中的分区类比为快递公司处理包裹的过程。假设你有一批包裹要从一个城市发送到另一个城市,快递公司会将这些包裹按照一定的规则进行分区,比如按照收件地址的区域划分。每个分区的包裹会被分配到不同的快递员或运输车辆上进行运输,这些快递员或车辆可以同时出发,并行地将包裹送到不同的区域。这就类似于 Spark 中的分区,每个分区的数据可以在不同的计算节点上同时进行处理,从而加快整个数据处理的速度。

默认分区的情况

  1. 从集合创建 RDD(使用 parallelize 方法)

当使用 parallelize 方法从一个集合创建 RDD 时,默认分区数通常取决于集群的配置。

在本地模式下,默认分区数等于本地机器的 CPU 核心数;在集群模式下,默认分区数由 spark.default.parallelism 配置项决定。

2.从外部存储(如文件)创建 RDD(使用 textFile 方法)

当使用 textFile 方法从外部存储(如 HDFS、本地文件系统等)读取文件创建 RDD 时,默认分区数通常由文件的块大小决定。对于 HDFS 文件,默认分区数等于文件的块数。例如,一个 128MB 的文件在 HDFS 上被分成 2 个 64MB 的块,那么创建的 RDD 默认分区数就是 2。

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
//分区器决定哪一个元素进入某一个分区!
//目标:把10个数分区,偶数分在第一个分区,奇数分在第二个分区
//自定义分区器
//1.创建一个类继承Partitioner
//2.重写两个方法
//3.在创建RDD的时候,partitionBy方法,指定分区器//创建一个类继承Partitioner
class MyPartitioner extends Partitioner{override def numPartitions: Int = 2//两个分区,编号就是:0,1
//key -valueoverride def getPartition(key: Any): Int = {if(key.asInstanceOf[Int]%2==0){0}else{1}}
}
object PartitionCustom {def main(args: Array[String]): Unit = {//创建SparkContextval conf = new SparkConf().setAppName("Partition").setMaster("local[*]")val sc = new SparkContext(conf)//初始数据val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
//val rdd = sc.parallelize(List((1,1),(2,2)))//自定义分区器使用的前提:数据是key-value类型val rdd1 = rdd.map(num => (num,num))
//使用自定义分区器val rdd2 = rdd1.partitionBy(new MyPartitioner)//在分区完成之后的基础上,只保留keyval rdd3 = rdd2.map(x => x._1)rdd3.saveAsTextFile("output3")}}

在 Spark 中,RDD 是数据的集合,它会被划分成多个分区,这些分区可以分布在不同的计算节点上,就像图书馆的书架分布在不同的房间一样。

这样做的好处是什么呢?

并行计算:Spark 能够同时对多个分区的数据进行处理,充分利用集群的计算资源,进而加快作业的执行速度。例如,若一个 RDD 有 10 个分区,且集群有足够的计算资源,Spark 就可以同时处理这 10 个分区的数据。

数据局部性:分区有助于实现数据局部性,也就是让计算尽量在数据所在的节点上进行,减少数据在网络间的传输,从而降低网络开销。

容错性:当某个分区的数据处理失败时,Spark 能够重新计算该分区,而不需要重新计算整个 RDD。

  • 分区器和默认分区器
  • 分区器是 Spark 中用于决定 RDD 数据如何在不同分区之间进行分布的组件。通过定义分区规则,它能够将具有键值对类型的数据(PairRDD)按照一定策略划分到不同分区,以实现数据的合理分布,进而提高并行计算的效率。

    在大多数涉及键值对的转换操作中,Spark 默认使用 HashPartitioner。例如,reduceByKey、groupByKey 等操作,如果没有显式指定分区器,就会使用 HashPartitioner

    HashPartitioner 根据键的哈希值来决定数据应该被分配到哪个分区。具体来说,它会对键的哈希值取模,模的结果就是分区的编号。假设分区数为 n,键为 key,则分区编号的计算公式为 hash(key) % n。

    对于键值对 RDD,HashPartitioner 是大多数转换操作的默认分区器,而 RangePartitioner 是 sortByKey 操作的默认分区器。你也可以根据具体需求显式指定分区器来控制数据的分区方式。

    • 为什么需要自定义分区
    • 【提问:如果我们希望在分区的时候,把偶数放在一个区,奇数放在另一个区,应该怎么办?】

      数据倾斜:当数据分布不均匀,某些分区数据量过大,导致计算负载不均衡时,可自定义分区器,按照特定规则重新分配数据,避免数据倾斜影响计算性能。比如电商订单数据中,按地区统计销售额,若某些热门地区订单数远多于其他地区,使用默认分区器会使部分任务计算量过大。通过自定义分区器,可将热门地区进一步细分,让各分区数据量更均衡。

      特定业务逻辑:若业务对数据分区有特殊要求,如按时间段将日志数据分区,不同时间段的数据存到不同分区便于后续处理分析;或在社交网络数据中,按用户关系紧密程度分区等,都需自定义分区器实现。

      自定义分区器的实现步骤

      自定义分区器需要:继承Partitioner抽象类 + 实现其中的两个方法。

      1. numPartitions :返回分区的数量,即整个 RDD 将被划分成多少个分区 。
      2. getPartition(key: Any) :接收一个键值key(对于非键值对类型 RDD,可根据数据特征构造合适的键 ),根据自定义逻辑返回该键值对应的分区索引(从 0 开始,取值范围为 0 到numPartitions - 1 ) 
      import org.apache.spark.{Partitioner, SparkConf, SparkContext}//定义Order类:id   price   info
      case class Order(id: Int, price: Double, info: String) {override def toString: String = s"$id, $price, $info"
      }//定义分区器
      class OrderPartitioner extends Partitioner{override def numPartitions: Int = 3override def getPartition(key: Any): Int = {//0-1000 =>1//1001-2000 =>2//3if(key.asInstanceOf[Int] <= 1000){0}else if(key.asInstanceOf[Int] <= 2000){1}else{2}}
      }
      object PartitonOrder {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("PartitionOrder").setMaster("local[*]")val sc = new SparkContext(conf)//读入data/order创建RDDval orderRDD = sc.textFile("input/order.csv")val rdd1 = orderRDD.map(line => {val fields = line.split(",")val order = Order(fields(0).toInt, fields(1).toDouble, fields(2))(order.id, order)})//自定义分区器val rdd2 = rdd1.partitionBy(new  OrderPartitioner)rdd2.map(x => x._2)saveAsTextFile("output2")//统计各个分区的订单数量及价格总和rdd2.mapPartitions(iter => {
      //定义数量为0var count = 0//定义金额为0var sum = 0.0iter.foreach(x => {sum += x._2.pricecount += 1})Iterator(s"${count}件,${sum}元)")}).saveAsTextFile("output4")}}
    1. MyPartitioner类继承自Partitioner,实现了numPartitions方法指定分区数量为 3 ,实现getPartition方法,根据球队名称判断分区索引,湖人对应分区 0,火箭对应分区 1,其他球队对应分区 2 。

    2.在main方法中,创建包含球队信息的 RDD,然后调用partitionBy方法并传入自定义分区器MyPartitioner,对 RDD 进行分区,最后将分区后的数据保存到指定路径。

     

    相关文章:

  • 订单服务拆分库表迁移实践
  • 杰理-701-手表sdk无法电脑连接经典蓝牙
  • calico.yaml+国内源
  • 《Effective Python》第2章 字符串和切片操作——深入理解Python 中的字符数据类型(bytes 与 str)的差异
  • Day1 时间复杂度
  • 【深度学习-Day 10】机器学习基石:从零入门线性回归与逻辑回归
  • 云共享虚拟主机具体是指什么?
  • “追光”植物背后的故事(二)
  • SpringBoot--springboot简述及快速入门
  • 基于 PLC 的轮式服务机器人研究
  • 医疗实时操作系统方案:手术机器人的微秒级运动控制
  • 【Hot 100】208. 实现 Trie (前缀树)
  • 基于C#+MySQL实现(WinForm)企业设备使用信息管理系统
  • niushop单商户V5多门店版V5.5.0全插件+商品称重、商家手机端+搭建环境教程
  • 从数据中台到数据飞轮:数字化转型的演进之路
  • python如何做人脸识别
  • Docker与PostgreSQL
  • 国联股份卫多多与七腾机器人签署战略合作协议
  • 基于Spring Boot+Layui构建企业级电子招投标系统实战指南
  • 工业巡检机器人 —— 机器人市场的新兴增长引擎
  • “11+2”复式票,宝山购彩者领走大乐透1170万头奖
  • 再获殊荣!IP SH跻身上海文化品牌全球传播力TOP 6
  • 尹锡悦涉嫌发动内乱案举行第三次庭审
  • 重庆三峡学院回应“85万元中标设备,网购价不到300元”:已着手解决
  • 母亲节书单|关于生育自由的未来
  • 呼和浩特推进新一轮国企重组整合:杜绝一项目一公司、一业务一公司