当前位置: 首页 > news >正文

大数据Spark(六十八):Transformation转换算子所有Join操作和union

文章目录

Transformation转换算子所有Join操作和union

一、join/leftOuterJoin/rightOuterJoin/fullOuterJoin

二、union


Transformation转换算子所有Join操作和union

一、join/leftOuterJoin/rightOuterJoin/fullOuterJoin

join、leftOuterJoin、rightOuterJoin 和 fullOuterJoin 是用于对两个K,V格式 RDD 进行连接操作的转换算子,这些算子根据键(key)将两个RDD 进行合并,从而实现类似于关系型数据库中的连接操作。

注意:所有Join操作join后生成RDD的分区数与父RDD分区数多的那一个相同。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("JoinTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaPairRDD<String, Integer> personRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("zhangsan", 18),new Tuple2<String, Integer>("lisi", 19),new Tuple2<String, Integer>("wangwu", 20),new Tuple2<String, Integer>("maliu", 21)
),3);JavaPairRDD<String, Integer> scoreRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("zhangsan", 90),new Tuple2<String, Integer>("lisi", 80),new Tuple2<String, Integer>("wangwu", 70),new Tuple2<String, Integer>("tianqi", 60)
),4);//join算子:对两个RDD进行join操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Integer, Integer>> joinRDD = personRDD.join(scoreRDD);
System.out.println("JoinRDD 分区数:" + joinRDD.getNumPartitions());
joinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Integer, Integer>> tp) throws Exception {System.out.println(tp);}
});
/*** 结果:* (zhangsan,(18,90))* (lisi,(19,80))* (wangwu,(20,70))*///leftOuterJoin算子:对两个RDD进行leftOuterJoin操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,
// 元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> leftOuterJoinRDD = personRDD.leftOuterJoin(scoreRDD);
System.out.println("leftOuterJoinRDD 分区数:" + leftOuterJoinRDD.getNumPartitions());
leftOuterJoinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Optional<Integer>>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Integer, Optional<Integer>>> tp) throws Exception {String name = tp._1;Integer age = tp._2._1;Optional<Integer> optionalScore = tp._2._2;if(optionalScore.isPresent()){//判断是否有值Integer score = optionalScore.get();System.out.println(name + "," + age + "," + score);} else {System.out.println(name + "," + age + "," + "null");}
////如果第二个RDD中没有对应的value,则返回默认值0
//Integer score = tp._2._2.orElse(0);
//System.out.println(name + "," + age + "," + score);}
});
/*** 结果:* zhangsan,18,90* wangwu,20,70* maliu,21,0* lisi,19,80*///rightOuterJoin算子:对两个RDD进行rightOuterJoin操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,
// 元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> rightOuterJoinRDD = personRDD.rightOuterJoin(scoreRDD);
System.out.println("rightOuterJoinRDD 分区数:" + rightOuterJoinRDD.getNumPartitions());
rightOuterJoinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<Integer>, Integer>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Optional<Integer>, Integer>> tp) throws Exception {String name = tp._1;//如果第一个RDD中没有对应的value,则返回默认值0Integer age = tp._2._1.orElse(0);Integer score = tp._2._2;System.out.println(name + "," + age + "," + score);}
});/*** 结果:* zhangsan,18,90* wangwu,20,70* lisi,19,80* tianqi,0,60*///fullOuterJoin算子:对两个RDD进行fullOuterJoin操作,返回一个新的RDD,
// 新RDD中的元素是元组,元组的第一个元素是key,第二个元素是一个元组,
// 元组的第一个元素是第一个RDD中的value,第二个元素是第二个RDD中的value
JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> fullOuterJoinRDD = personRDD.fullOuterJoin(scoreRDD);
System.out.println("fullOuterJoinRDD 分区数:" + fullOuterJoinRDD.getNumPartitions());
fullOuterJoinRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> tp) throws Exception {String name = tp._1;//如果第一个RDD中没有对应的value,则返回默认值0Integer age = tp._2._1.orElse(0);//如果第二个RDD中没有对应的value,则返回默认值0Integer score = tp._2._2.orElse(0);System.out.println(name + "," + age + "," + score);}
});/*** 结果:* zhangsan,18,90* wangwu,20,70* maliu,21,0* lisi,19,80* tianqi,0,60*/sc.stop();

Scala代码:

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("JoinTest")
val sc = new SparkContext(conf)val personRDD: RDD[(String, Int)] = sc.parallelize(List(("zhangsan", 18), ("lisi", 19), ("wangwu", 20), ("maliu", 21)),3)
val scoreRDD: RDD[(String, Int)] = sc.parallelize(List(("zhangsan", 90), ("lisi", 80), ("wangwu", 70), ("tianqi", 60)),2)//join: 两个RDD中key相同的元素进行连接
val joinRDD: RDD[(String, (Int, Int))] = personRDD.join(scoreRDD)
println(s"joinRDD 分区数:${joinRDD.getNumPartitions}")
joinRDD.foreach(println)//leftOuterJoin: 左外连接,以左边的RDD为主,右边的RDD中key相同的元素进行连接,左边RDD中key没有的元素,右边RDD中key对应的value为None
val leftOuterJoinRDD: RDD[(String, (Int, Option[Int]))] = personRDD.leftOuterJoin(scoreRDD)
println(s"leftOuterJoinRDD 分区数:${leftOuterJoinRDD.getNumPartitions}")
leftOuterJoinRDD.foreach(println)//rightOuterJoin: 右外连接,以右边的RDD为主,左边的RDD中key相同的元素进行连接,右边RDD中key没有的元素,左边RDD中key对应的value为None
val rightOuterJoinRDD: RDD[(String, (Option[Int], Int))] = personRDD.rightOuterJoin(scoreRDD)
println(s"rightOuterJoinRDD 分区数:${rightOuterJoinRDD.getNumPartitions}")
rightOuterJoinRDD.foreach(println)//fullOuterJoin: 全外连接,以两个RDD中的key进行连接,两个RDD中key没有的元素,对应的value为None
val fullOuterJoinRDD: RDD[(String, (Option[Int], Option[Int]))] = personRDD.fullOuterJoin(scoreRDD)
println(s"fullOuterJoinRDD 分区数:${fullOuterJoinRDD.getNumPartitions}")
fullOuterJoinRDD.foreach(println)sc.stop()

二、union

合并两个RDD数据,两个RDD数据集的类型要一致,不会对数据进行去重。

注意:返回新的RDD的分区数是两个合并RDD分区数的总和。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("UnionTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 3);
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(6, 7, 8, 9, 10), 4);//union算子:对两个RDD进行union操作,返回一个新的RDD,RDD的分区数是两个RDD的分区数的总和
JavaRDD<Integer> rdd3 = rdd1.union(rdd2);
System.out.println("rdd3 分区数:" + rdd3.getNumPartitions());rdd3.foreach(x-> System.out.println(x));
sc.stop();

Scala代码:

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("UnionTest")
val sc = new SparkContext(conf)val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5),4)
val rdd2: RDD[Int] = sc.parallelize(List(6, 7, 8, 9, 10), 3)//union: 两个RDD中的元素进行合并,合并后的RDD的分区数为两个RDD的分区数之和
val rdd3: RDD[Int] = rdd1.union(rdd2)
println(s"rdd3 分区数:${rdd3.getNumPartitions}")
rdd3.foreach(println)sc.stop()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
http://www.dtcms.com/a/473450.html

相关文章:

  • HTTP初识
  • 【Linux网络】Socket编程:TCP网络编程
  • 离线docker安装jupyter(python网页版编辑器)
  • 自己怎么做彩票网站吗网站建设招标2017
  • 达梦守护集群部署安装
  • 农村电子商务网站建设wordpress不能安装插件
  • 每天五分钟深度学习:两个角度解释正则化解决网络过拟合的原理
  • 【Android Gradle学习笔记】第二天:Gradle工程目录结构
  • 【知识拓展Trip Six】宿主OS是什么,传统虚拟机和容器又有什么区别?
  • AI眼镜:作为人机交互新范式的感知延伸与智能融合终端
  • 开发网站 语言卡片式网站
  • 长乐市住房和城乡建设局网站在线购物商城网站建设
  • qt5.14查看调试源码
  • 深度学习实战:Python水果识别 CNN算法 卷积神经网络(TensorFlow训练+Django网页源码)✅
  • J1939基础通信
  • 前端开发与后端开发的区别是什么?
  • 模块使用教程(基于STM32)——蓝牙模块
  • BaseLine与BackBone
  • 多视图几何--密集匹配--视差平面推导
  • 官网和商城结合的网站网站推广合同模板
  • 微软新模型UserLM:如何为AI助手打造一个“真实世界”模拟器
  • Linux中页面分配alloc_pages相关函数
  • Qt---布局管理器
  • 基于单片机的图书馆智能座位管理平台
  • 中国机械工业建设集团有限公司网站高端网站建设论坛
  • Envoy Gateway + ext_authz 做“入口统一鉴权”,ABP 只做资源执行
  • vscode免密码认证ssh连接virtual box虚拟机
  • 3.6 JSON Mode与JSON Schema
  • React Native::关于react的匿名函数
  • 基于JETSON ORIN+FPGA+GMSL AI相机的工业双目视觉感知方案