当前位置: 首页 > news >正文

大数据Spark(七十):Transformation转换算子cogroup和zip使用案例

文章目录

Transformation转换算子cogroup和zip使用案例

一、cogroup使用案例

二、zip使用案例


Transformation转换算子cogroup和zip使用案例

一、cogroup使用案例

作用到K,V格式的两个RDD上,如两个RDD类型为(K,V)和(K,W)格式数据,返回一个数据集RDD(K,(Iterable,Iterable))。

注意:子RDD的分区与父RDD多的一致。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("CogroupTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaPairRDD<String, Integer> personRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("zhangsan", 18),new Tuple2<String, Integer>("zhangsan", 180),new Tuple2<String, Integer>("lisi", 19),new Tuple2<String, Integer>("wangwu", 20),new Tuple2<String, Integer>("wangwu", 200),new Tuple2<String, Integer>("maliu", 21)
),3);JavaPairRDD<String, Integer> scoreRDD = sc.<String, Integer>parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("zhangsan", 90),new Tuple2<String, Integer>("zhangsan", 900),new Tuple2<String, Integer>("lisi", 80),new Tuple2<String, Integer>("lisi", 800),new Tuple2<String, Integer>("wangwu", 70),new Tuple2<String, Integer>("wangwu", 700),new Tuple2<String, Integer>("tianqi", 60),new Tuple2<String, Integer>("tianqi", 600)
),4);JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = personRDD.cogroup(scoreRDD);
System.out.println("cogroupRDD 分区数:" + cogroupRDD.getNumPartitions());cogroupRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> tp) throws Exception {String name = tp._1;Iterable<Integer> ages = tp._2._1;Iterable<Integer> scores = tp._2._2;System.out.println("name: " + name+" ages: " + ages + " scores: " + scores);}
});sc.stop();

Scala代码:

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("CogroupTest")
val sc = new SparkContext(conf)val personRDD: RDD[(String, Int)] = sc.parallelize(List(("zhangsan", 18),("zhangsan", 180),("lisi", 19),("lisi", 190),("wangwu", 20),("wangwu", 200),("maliu", 21)),3)val scoreRDD: RDD[(String, Int)] = sc.parallelize(List(("zhangsan", 90),("zhangsan", 900),("lisi", 80),("lisi", 800),("wangwu", 70),("wangwu", 700),("tianqi", 60)),2)//cogroup: 两个RDD中key相同的元素进行连接
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = personRDD.cogroup(scoreRDD)
println(s"cogroupRDD 分区数:${cogroupRDD.getNumPartitions}")
cogroupRDD.foreach(tp=>{val name: String = tp._1val ageList: List[Int] = tp._2._1.toListval scoreList: List[Int] = tp._2._2.toListprintln(s"name:${name}, ageList:${ageList}, scoreList:${scoreList}")
})sc.stop()

二、zip使用案例

将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("zipTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "d"));
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("e", "f", "g", "h"));
//zip: 将两个RDD中的元素按照位置一一对应,返回一个新的RDD
JavaPairRDD<String, String> zip = rdd1.zip(rdd2);
zip.foreach(tp-> System.out.println(tp));sc.stop();

Scala代码:

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("zipTest")
val sc = new SparkContext(conf)val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
val rdd2: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "e"))rdd1.zip(rdd2).foreach(println)sc.stop()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
http://www.dtcms.com/a/524301.html

相关文章:

  • Drools在java中的使用
  • 【办公类-121-02】20251024淘宝视频红包(UIBOT点击“左箭头”“视频”“消息”切换)
  • 9 种高级 RAG 技术及其实现方法
  • 计算机网络面试核心知识点大全
  • 做网站建站现在什么传奇最火电脑版
  • C语言需要掌握的基础知识点之图
  • 做一个类似京东的网站海外注册公司
  • python舆情分析可视化系统 情感分析 微博 爬虫 scrapy爬虫技术 朴素贝叶斯分类算法大数据 计算机✅
  • 压缩与缓存调优实战指南:从0到1根治性能瓶颈(六)
  • 做百度手机网站优化点asp网站制作教程
  • element+vue3 table上下左右键切换input和select
  • 元萝卜 1.0.27| 免Root,XP模块框架,支持应用多开分身,一键微信平板模式
  • 长春企业网站seo珠海企业官网设计制作
  • MySQL 函数详细说明
  • 《Memcached 连接:深入理解与优化实践》
  • C++ EigenSolver无优化模式下报错分析
  • 数据结构——折半插入排序
  • io_uring 快吗? Postgres 17 与 18 的基准测试
  • 国产数据库替代MongoDB:政务电子证照新选择
  • 甘孜建设网站集团响应式网站建设
  • 枸杞网站建设方案2024年即将上市的手机
  • Git 版本回退 reset --mixed 命令
  • 博途DWORD中包含word ,字节,位的关系
  • Java Character 类详解
  • 【数据结构】队列“0”基础知识讲解 + 实战演练
  • 【生活】秋冬季节,鼻子很干结痂,扣掉鼻孔干痂流血,鼻塞等护理方法
  • 网站关键词公司百度关键词查询
  • 大模型通识
  • 346. 执行操作后元素的最高频率 I
  • 一些常用的linux操作指令