当前位置: 首页 > news >正文

大数据Spark(七十二):Transformation转换算子repartition和coalesce使用案例

文章目录

Transformation转换算子Repartition和Coalesce使用案例

一、Repartition使用案例

二、Coalesce使用案例


Transformation转换算子Repartition和Coalesce使用案例

一、Repartition使用案例

repartition可以对RDD进行重新分区,可以增加或减少分区,这个过程会产生shuffle,常用于对RDD进行增加分区,提高并行度场景。

注意:在底层,repartition(numPartitions) = coalesce(numPartitions,true)。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("repartitionTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("love1", "love2", "love3", "love4","love5", "love6", "love7", "love8","love9", "love10", "love11", "love12"
), 3);JavaRDD<String> rdd2 = rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {//index: 分区的索引,从0开始//iter: 分区中的元素@Overridepublic Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {ArrayList<String> list = new ArrayList<>();while (iter.hasNext()) {String next = iter.next();list.add("rdd1 partition index: " + index + " current value: " + next);}return list.iterator();}
}, true);//对rdd2 进行重新分区
//JavaRDD<String> rdd3 = rdd2.repartition(4); //增加分区
JavaRDD<String> rdd3 = rdd2.repartition(2);//减少分区JavaRDD<String> rdd4 = rdd3.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {//index: 分区的索引,从0开始//iter: 分区中的元素@Overridepublic Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {ArrayList<String> list = new ArrayList<>();while (iter.hasNext()) {String next = iter.next();list.add("rdd3 partition index: 【" + index + "】,current value: 【" + next + "】");}return list.iterator();}
}, true);List<String> result = rdd4.collect();
for (String s : result) {System.out.println(s);
}sc.stop();

Scala代码:

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("RepartitionTest")
val sc = new SparkContext(conf)val rdd1: RDD[String] = sc.parallelize(List("love1", "love2", "love3", "love4","love5", "love6", "love7", "love8","love9", "love10", "love11", "love12"
), 3)val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {list.append(s"rdd1 partition index: $index ,current value: ${iter.next()}")}list.iterator
})//对rdd2进行重分区
val rdd3: RDD[String] = rdd2.repartition(4) //增加分区
//val rdd3: RDD[String] = rdd2.repartition(2) //减少分区val rdd4: RDD[String] = rdd3.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {list.append(s"rdd3 partition index: 【$index】 ,current value: 【${iter.next()}】")}list.iterator
})rdd4.collect.foreach(println)sc.stop()

Java和Scala API结果如下,可见通过repartition进行增加或者减少分区操作会产生shuffle操作。

#repartition(4)结果
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love4】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 1 current value: love8】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 2 current value: love9】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 0 current value: love1】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love5】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love10】
rdd3 partition index: 【2】,current value: 【rdd1 partition index: 0 current value: love2】
rdd3 partition index: 【2】,current value: 【rdd1 partition index: 1 current value: love6】
rdd3 partition index: 【2】,current value: 【rdd1 partition index: 2 current value: love11】
rdd3 partition index: 【3】,current value: 【rdd1 partition index: 0 current value: love3】
rdd3 partition index: 【3】,current value: 【rdd1 partition index: 1 current value: love7】
rdd3 partition index: 【3】,current value: 【rdd1 partition index: 2 current value: love12】#repartition(2)结果
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love2】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love4】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 1 current value: love6】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 1 current value: love8】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 2 current value: love9】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 2 current value: love11】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 0 current value: love1】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 0 current value: love3】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love5】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love7】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love10】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love12】

二、Coalesce使用案例

coalesce也可以对RDD分区增加或者减少,常用于减少RDD的分区数量,常用于提高小数据集的执行效率。与 repartition 不同,coalesce 默认情况下不会触发 Shuffle 操作,因此在减少分区时更加高效。函数签名如下:

def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

其中,numPartitions 表示目标分区数,shuffle 参数指示是否进行 Shuffle,默认为 false。

特别注意:如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)。

Java代码:

SparkConf conf = new SparkConf().setMaster("local").setAppName("CoalesceTest");
JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("love1", "love2", "love3", "love4","love5", "love6", "love7", "love8","love9", "love10", "love11", "love12"
), 3);JavaRDD<String> rdd2 = rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {//index: 分区的索引,从0开始//iter: 分区中的元素@Overridepublic Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {ArrayList<String> list = new ArrayList<>();while (iter.hasNext()) {String next = iter.next();list.add("rdd1 partition index: " + index + " current value: " + next);}return list.iterator();}
}, true);//coalesce对rdd2 进行重新分区,没有shuffle
//JavaRDD<String> rdd3 = rdd2.coalesce(2);
//JavaRDD<String> rdd3 = rdd2.coalesce(2,true);
JavaRDD<String> rdd3 = rdd2.coalesce(4,false);JavaRDD<String> rdd4 = rdd3.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {//index: 分区的索引,从0开始//iter: 分区中的元素@Overridepublic Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {ArrayList<String> list = new ArrayList<>();while (iter.hasNext()) {String next = iter.next();list.add("rdd3 partition index: 【" + index + "】,current value: 【" + next + "】");}return list.iterator();}
}, true);List<String> result = rdd4.collect();
for (String s : result) {System.out.println(s);
}sc.stop();

Scala代码:

val conf: SparkConf = new SparkConf().setMaster("local").setAppName("CoalesceTest")
val sc = new SparkContext(conf)val rdd1: RDD[String] = sc.parallelize(List("love1", "love2", "love3", "love4","love5", "love6", "love7", "love8","love9", "love10", "love11", "love12"
), 3)val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {list.append(s"rdd1 partition index: $index ,current value: ${iter.next()}")}list.iterator
})//coalesce对rdd2进行重分区,不产生shuffle
//val rdd3: RDD[String] = rdd2.coalesce(2)
//val rdd3: RDD[String] = rdd2.coalesce(2,true)
val rdd3: RDD[String] = rdd2.coalesce(4)val rdd4: RDD[String] = rdd3.mapPartitionsWithIndex((index, iter) => {val list = new ListBuffer[String]()while (iter.hasNext) {list.append(s"rdd3 partition index: 【$index】 ,current value: 【${iter.next()}】")}list.iterator
})rdd4.collect.foreach(println)sc.stop()

Java和Scala API结果如下:

#coalesce(2)
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love1】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love2】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love3】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love4】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love5】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love6】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love7】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love8】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love9】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love10】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love11】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love12】#coalesce(2,true),等同于repartition(2)
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love2】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love4】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 1 current value: love6】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 1 current value: love8】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 2 current value: love9】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 2 current value: love11】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 0 current value: love1】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 0 current value: love3】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love5】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love7】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love10】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 2 current value: love12】#coalesce(4,false) 不起作用
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love1】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love2】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love3】
rdd3 partition index: 【0】,current value: 【rdd1 partition index: 0 current value: love4】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love5】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love6】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love7】
rdd3 partition index: 【1】,current value: 【rdd1 partition index: 1 current value: love8】
rdd3 partition index: 【2】,current value: 【rdd1 partition index: 2 current value: love9】
rdd3 partition index: 【2】,current value: 【rdd1 partition index: 2 current value: love10】
rdd3 partition index: 【2】,current value: 【rdd1 partition index: 2 current value: love11】
rdd3 partition index: 【2】,current value: 【rdd1 partition index: 2 current value: love12】

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
http://www.dtcms.com/a/585284.html

相关文章:

  • Android 16 Kotlin协程 第二部分
  • 网站建设公司兴田德润电话新县城乡规划建设局网站
  • Claude Code使用指南
  • 如何进行MSSQL提权?默认库,xp_cmdshell提权
  • 第三章 布局
  • 「数据获取」《中国口岸年鉴》(2001-2024)(2002未出版)
  • Visual Studio笔记
  • 【开题答辩全过程】以 二手手机交易平台的设计与实现为例,包含答辩的问题和答案
  • “AI+XR”赋能智慧研创中心,预见职业教育“新双高”的未来
  • 保障房建设网站首页河北信息门户网站定制
  • MySQL的IFNULL函数介绍
  • 【数据结构】从零开始认识图论 --- 单源/多源最短路算法
  • 基于PyTorch的动物识别模型训练与应用实战
  • JS之BOM与DOM操作
  • 品牌企业网站案例wordpress 漂浮广告
  • 【人工智能学习笔记 三】 AI教学之前端跨栈一:React整体分层架构
  • 【ZeroRange WebRTC】WebRTC 在 IPC(网络摄像头)中的应用:架构、实现与实践(深入指南)
  • WiFi 热点启动失败问题排查与解决
  • 手写序列化与反序列化
  • T41NQ/T41N高性能低功耗SOC芯片 软硬件资料T41NQ适用于各种AIoT应用,适用于智能安防、智能家居,机器视觉等领域方案
  • 购物网站建设要求用wordpress改
  • vector 底层模拟实现(上):核心机制全解析 + 迭代器失效深度剖析
  • mysql内置函数——了解常用的函数
  • 网站建设步骤ppt一个企业seo网站的优化流程
  • 技术演进中的开发沉思-178 JSP :前世今生(下)
  • 做网站学什么软件网页美工实例教程
  • 深入理解 Spring Boot Actuator:构建可观测性与运维友好的应用
  • 现代C++的AI革命:C++20/C++23核心特性解析与实战应用
  • 【数据结构】单链表的经典算法题
  • 网站优化要用什么软件做公司网站哪家好