import org.apache.spark.{Partitioner, SparkConf, SparkContext}object PartitionCustom {// 分区器决定哪一个元素进入某一个分区// 目标: 把10个分区器,偶数分在第一个分区,奇数分在第二个分区// 自定义分区器// 1. 创建一个类继承Partitioner// 2. 重写两个方法// 3. 在创建RDD的时候,partitionBy方法 指定分区器// 创建一个类继承Partitionerclass MyPartitioner extends Partitioner{override def numPartitions: Int = 2 // 两个分区,编号就是:0,1// key - valueoverride def getPartition(key: Any): Int = {if(key.asInstanceOf[Int] % 2 == 0){0}else{1}}}def main(args: Array[String]): Unit = {// 创建SparkContextval conf = new SparkConf().setAppName("PartitionCustom").setMaster("local[*]")val sc = new SparkContext(conf)// 初始数据val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))//val rdd = sc.parallelize(List( (1,1), (2,2))// 自定义分区器使用的前提:数据是key-value类型val rdd1 = rdd.map(num =>(num,num))// 使用自定义分区器val rdd2 = rdd1.partitionBy(new MyPartitioner)// 在分区完成之后的基础上,只保留keyval rdd3 = rdd2.map(t => t._1)rdd3.saveAsTextFile("output6")}
}