Big Data Spark (70): Transformation Operators cogroup and zip Usage Examples

Table of Contents
Transformation operators cogroup and zip usage examples
1. cogroup usage example
2. zip usage example
Transformation operators cogroup and zip usage examples
1. cogroup usage example
cogroup operates on two K,V-format RDDs. Given RDDs of types (K, V) and (K, W), it returns an RDD of type (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)): for every key present in either RDD, all of that key's values from each side are gathered into their own Iterable.
Note: the child RDD's number of partitions equals that of whichever parent RDD has more partitions.
Java code:
SparkConf conf = new SparkConf().setMaster("local").setAppName("CogroupTest");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, Integer> personRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("zhangsan", 18),
        new Tuple2<String, Integer>("zhangsan", 180),
        new Tuple2<String, Integer>("lisi", 19),
        new Tuple2<String, Integer>("wangwu", 20),
        new Tuple2<String, Integer>("wangwu", 200),
        new Tuple2<String, Integer>("maliu", 21)
), 3);
JavaPairRDD<String, Integer> scoreRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("zhangsan", 90),
        new Tuple2<String, Integer>("zhangsan", 900),
        new Tuple2<String, Integer>("lisi", 80),
        new Tuple2<String, Integer>("lisi", 800),
        new Tuple2<String, Integer>("wangwu", 70),
        new Tuple2<String, Integer>("wangwu", 700),
        new Tuple2<String, Integer>("tianqi", 60),
        new Tuple2<String, Integer>("tianqi", 600)
), 4);
//cogroup: for each key, collect the value groups from both RDDs
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = personRDD.cogroup(scoreRDD);
System.out.println("cogroupRDD partition count: " + cogroupRDD.getNumPartitions());
cogroupRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>>() {
    @Override
    public void call(Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> tp) throws Exception {
        String name = tp._1;
        Iterable<Integer> ages = tp._2._1;
        Iterable<Integer> scores = tp._2._2;
        System.out.println("name: " + name + " ages: " + ages + " scores: " + scores);
    }
});
sc.stop();
Scala code:
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("CogroupTest")
val sc = new SparkContext(conf)
val personRDD: RDD[(String, Int)] = sc.parallelize(List(
  ("zhangsan", 18), ("zhangsan", 180),
  ("lisi", 19), ("lisi", 190),
  ("wangwu", 20), ("wangwu", 200),
  ("maliu", 21)
), 3)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(List(
  ("zhangsan", 90), ("zhangsan", 900),
  ("lisi", 80), ("lisi", 800),
  ("wangwu", 70), ("wangwu", 700),
  ("tianqi", 60)
), 2)
//cogroup: join elements with the same key across the two RDDs
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = personRDD.cogroup(scoreRDD)
println(s"cogroupRDD partition count: ${cogroupRDD.getNumPartitions}")
cogroupRDD.foreach(tp => {
  val name: String = tp._1
  val ageList: List[Int] = tp._2._1.toList
  val scoreList: List[Int] = tp._2._2.toList
  println(s"name:${name}, ageList:${ageList}, scoreList:${scoreList}")
})
sc.stop()
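The per-key merge that cogroup performs can be sketched without Spark, using plain Java collections. This is only an illustration of the semantics (a full outer union of the key sets, with every value from each side collected into that side's group), not Spark's implementation; the class and method names here are hypothetical:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CogroupSketch {
    // cogroup semantics sketch (NOT Spark code): union the key sets of both sides;
    // for every key, collect ALL matching values from each side into its own list
    // (an empty list when the key is absent on that side).
    static Map<String, List<List<Integer>>> cogroup(List<Map.Entry<String, Integer>> left,
                                                    List<Map.Entry<String, Integer>> right) {
        Map<String, List<List<Integer>>> out = new TreeMap<>();
        for (Map.Entry<String, Integer> e : left) {
            out.computeIfAbsent(e.getKey(), k -> List.of(new ArrayList<>(), new ArrayList<>()))
               .get(0).add(e.getValue());
        }
        for (Map.Entry<String, Integer> e : right) {
            out.computeIfAbsent(e.getKey(), k -> List.of(new ArrayList<>(), new ArrayList<>()))
               .get(1).add(e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> ages = List.of(
                new SimpleEntry<>("zhangsan", 18), new SimpleEntry<>("zhangsan", 180),
                new SimpleEntry<>("maliu", 21));
        List<Map.Entry<String, Integer>> scores = List.of(
                new SimpleEntry<>("zhangsan", 90), new SimpleEntry<>("zhangsan", 900),
                new SimpleEntry<>("tianqi", 60));
        // "maliu" has no score and "tianqi" has no age, yet both keys appear in the
        // result, each with one empty value list -- unlike an inner join.
        cogroup(ages, scores).forEach((name, groups) ->
                System.out.println(name + " ages=" + groups.get(0) + " scores=" + groups.get(1)));
    }
}
```

This also makes the difference from join visible: join would drop "maliu" and "tianqi", while cogroup keeps every key from either side.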
2. zip usage example
zip pairs up the elements of two RDDs (K,V-format or not) by position into one K,V-format RDD. The two RDDs must have the same number of partitions, and each pair of corresponding partitions must contain the same number of elements.
Java code:
SparkConf conf = new SparkConf().setMaster("local").setAppName("zipTest");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "d"));
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("e", "f", "g", "h"));
//zip: pair the elements of the two RDDs by position, returning a new K,V RDD
JavaPairRDD<String, String> zip = rdd1.zip(rdd2);
zip.foreach(tp -> System.out.println(tp));
sc.stop();
Scala code:
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("zipTest")
val sc = new SparkContext(conf)
val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
val rdd2: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "e"))
//zip: pair elements by position, yielding (Int, String) tuples
rdd1.zip(rdd2).foreach(println)
sc.stop()
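The positional pairing and the same-size requirement can likewise be sketched in plain Java (again a semantics illustration with hypothetical names, not Spark code; Spark enforces the element-count rule per partition, while this single-list sketch checks the total count):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ZipSketch {
    // zip semantics sketch (NOT Spark code): pair the i-th element of xs with the
    // i-th element of ys. Mirroring Spark's behavior, differing element counts are
    // an error rather than silent truncation.
    static <A, B> List<Map.Entry<A, B>> zip(List<A> xs, List<B> ys) {
        if (xs.size() != ys.size()) {
            throw new IllegalArgumentException(
                    "Can only zip collections with the same number of elements");
        }
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (int i = 0; i < xs.size(); i++) {
            out.add(new SimpleEntry<>(xs.get(i), ys.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // One K,V pair per position: (1,a), (2,b), (3,c)
        zip(List.of(1, 2, 3), List.of("a", "b", "c")).forEach(System.out::println);
    }
}
```

In Spark the mismatch only surfaces when a job runs over the zipped RDD, since transformations are lazy.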
- 📢 Blog homepage: https://lansonli.blog.csdn.net
- 📢 Written by Lansonli, first published on the CSDN blog 🙉
