大数据Spark(六十八):Transformation转换算子所有Join操作和union
文章目录
Transformation转换算子所有Join操作和union
一、join/leftOuterJoin/rightOuterJoin/fullOuterJoin
二、union
Transformation转换算子所有Join操作和union
一、join/leftOuterJoin/rightOuterJoin/fullOuterJoin
join、leftOuterJoin、rightOuterJoin 和 fullOuterJoin 是用于对两个K,V格式 RDD 进行连接操作的转换算子,这些算子根据键(key)将两个RDD 进行合并,从而实现类似于关系型数据库中的连接操作。
注意:所有Join操作join后生成RDD的分区数与父RDD分区数多的那一个相同。
Java代码:
SparkConf conf = new SparkConf().setMaster("local").setAppName("JoinTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaPairRDD<String, Integer> personRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("zhangsan", 18),new Tuple2<String, Integer>("lisi", 19),new Tuple2<String, Integer>("wangwu", 20),new Tuple2<String, Integer>("maliu", 21)
),3);JavaPairRDD<String, Integer> scoreRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("zhangsan", 90),new Tuple2<String, Integer>("lisi", 80),new Tuple2<String, Integer>("wangwu", 70),new Tuple2<String, Integer>("tianqi", 60)
),4);//join算子:对两个RDD进行join操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Integer, Integer>> joinRDD = personRDD.join(scoreRDD);
System.out.println("JoinRDD 分区数:" + joinRDD.getNumPartitions());
joinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Integer, Integer>> tp) throws Exception {System.out.println(tp);}
});
/*** 结果:* (zhangsan,(18,90))* (lisi,(19,80))* (wangwu,(20,70))*///leftOuterJoin算子:对两个RDD进行leftOuterJoin操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,
// 元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> leftOuterJoinRDD = personRDD.leftOuterJoin(scoreRDD);
System.out.println("leftOuterJoinRDD 分区数:" + leftOuterJoinRDD.getNumPartitions());
leftOuterJoinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Optional<Integer>>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Integer, Optional<Integer>>> tp) throws Exception {String name = tp._1;Integer age = tp._2._1;Optional<Integer> optionalScore = tp._2._2;if(optionalScore.isPresent()){//判断是否有值Integer score = optionalScore.get();System.out.println(name + "," + age + "," + score);} else {System.out.println(name + "," + age + "," + "null");}
////如果第二个RDD中没有对应的value,则返回默认值0
//Integer score = tp._2._2.orElse(0);
//System.out.println(name + "," + age + "," + score);}
});
/*** 结果:* zhangsan,18,90* wangwu,20,70* maliu,21,0* lisi,19,80*///rightOuterJoin算子:对两个RDD进行rightOuterJoin操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,
// 元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> rightOuterJoinRDD = personRDD.rightOuterJoin(scoreRDD);
System.out.println("rightOuterJoinRDD 分区数:" + rightOuterJoinRDD.getNumPartitions());
rightOuterJoinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<Integer>, Integer>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Optional<Integer>, Integer>> tp) throws Exception {String name = tp._1;//如果第一个RDD中没有对应的value,则返回默认值0Integer age = tp._2._1.orElse(0);Integer score = tp._2._2;System.out.println(name + "," + age + "," + score);}
});/*** 结果:* zhangsan,18,90* wangwu,20,70* lisi,19,80* tianqi,0,60*///fullOuterJoin算子:对两个RDD进行fullOuterJoin操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,
// 元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> fullOuterJoinRDD = personRDD.fullOuterJoin(scoreRDD);
System.out.println("fullOuterJoinRDD 分区数:" + fullOuterJoinRDD.getNumPartitions());
fullOuterJoinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> tp) throws Exception {String name = tp._1;//如果第一个RDD中没有对应的value,则返回默认值0Integer age = tp._2._1.orElse(0);//如果第二个RDD中没有对应的value,则返回默认值0Integer score = tp._2._2.orElse(0);System.out.println(name + "," + age + "," + score);}
});/*** 结果:* zhangsan,18,90* wangwu,20,70* maliu,21,0* lisi,19,80* tianqi,0,60*/sc.stop();
Scala代码:
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("JoinTest")
val sc = new SparkContext(conf)val personRDD: RDD[(String, Int)] = sc.parallelize(List(("zhangsan", 18), ("lisi", 19), ("wangwu", 20), ("maliu", 21)),3)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(List(("zhangsan", 90), ("lisi", 80), ("wangwu", 70), ("tianqi", 60)),2)//join: 两个RDD中key相同的元素进行连接
val joinRDD: RDD[(String, (Int, Int))] = personRDD.join(scoreRDD)
println(s"joinRDD 分区数:${joinRDD.getNumPartitions}")
joinRDD.foreach(println)//leftOuterJoin: 左外连接,以左边的RDD为主,右边的RDD中key相同的元素进行连接,左边RDD中key没有的元素,右边RDD中key对应的value为None
val leftOuterJoinRDD: RDD[(String, (Int, Option[Int]))] = personRDD.leftOuterJoin(scoreRDD)
println(s"leftOuterJoinRDD 分区数:${leftOuterJoinRDD.getNumPartitions}")
leftOuterJoinRDD.foreach(println)//rightOuterJoin: 右外连接,以右边的RDD为主,左边的RDD中key相同的元素进行连接,右边RDD中key没有的元素,左边RDD中key对应的value为None
val rightOuterJoinRDD: RDD[(String, (Option[Int], Int))] = personRDD.rightOuterJoin(scoreRDD)
println(s"rightOuterJoinRDD 分区数:${rightOuterJoinRDD.getNumPartitions}")
rightOuterJoinRDD.foreach(println)//fullOuterJoin: 全外连接,以两个RDD中的key进行连接,两个RDD中key没有的元素,对应的value为None
val fullOuterJoinRDD: RDD[(String, (Option[Int], Option[Int]))] = personRDD.fullOuterJoin(scoreRDD)
println(s"fullOuterJoinRDD 分区数:${fullOuterJoinRDD.getNumPartitions}")
fullOuterJoinRDD.foreach(println)sc.stop()
二、union
合并两个RDD数据,两个RDD数据集的类型要一致,不会对数据进行去重。
注意:返回新的RDD的分区数是两个合并RDD分区数的总和。
Java代码:
SparkConf conf = new SparkConf().setMaster("local").setAppName("UnionTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 3);
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(6, 7, 8, 9, 10), 4);//union算子:对两个RDD进行union操作,返回一个新的RDD,RDD的分区数是两个RDD的分区数的总和
JavaRDD<Integer> rdd3 = rdd1.union(rdd2);
System.out.println("rdd3 分区数:" + rdd3.getNumPartitions());rdd3.foreach(x-> System.out.println(x));
sc.stop();
Scala代码:
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("UnionTest")
val sc = new SparkContext(conf)val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5),4)
val rdd2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10), 3)//union: 两个RDD中的元素进行合并,合并后的RDD的分区数为两个RDD的分区数之和
val rdd3: RDD[Int] = rdd1.union(rdd2)
println(s"rdd3 分区数:${rdd3.getNumPartitions}")
rdd3.foreach(println)sc.stop()
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨