当前位置: 首页 > news >正文

【无标题】spark编程

Value类型:

9) distinct

➢ 函数签名

def distinct()(implicit ord: Ordering[T] = null): RDD[T]

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

➢ 函数说明

将数据集中重复的数据去重

 

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
))
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)

 

10) coalesce

➢ 函数签名

def coalesce(numPartitions: Int, shuffle: Boolean = false,

partitionCoalescer: Option[PartitionCoalescer] = Option.empty) 

(implicit ord: Ordering[T] = null)

: RDD[T]

➢ 函数说明

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)

 

11) repartition

➢ 函数签名

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

➢ 函数说明

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)

 

12) sortBy

➢ 函数签名

def sortBy[K](

f: (T) => K,

ascending: Boolean = true,

numPartitions: Int = this.partitions.length) 

(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

➢ 函数说明

该操作用于排序数据。在排序之前,可以将数据通过f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程。

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
val dataRDD2 = dataRDD.sortBy(num=>num, true, 4)

 

 

双Value类型:

13) intersection

➢ 函数签名

def intersection(other: RDD[T]): RDD[T]

➢ 函数说明

对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)

 

14) union

➢ 函数签名

def union(other: RDD[T]): RDD[T]

➢ 函数说明

对源 RDD 和参数 RDD 求并集后返回一个新的 RDD(重复数据不会去重)

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

 

15) subtract

➢ 函数签名

def subtract(other: RDD[T]): RDD[T]

➢ 函数说明

以源 RDD 元素为主,去除两个 RDD 中重复元素,将源RDD的其他元素保留下来。(求差集)

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)

 

16) zip

➢ 函数签名

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

➢ 函数说明

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD

中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

val dataRDD1 = sparkContext.makeRDD(List("a","b","c","d"))
val dataRDD2 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD = dataRDD1.zip(dataRDD2)

flatMap

➢ 函数签名

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

➢ 函数说明

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射。

val dataRDD = sparkContext.makeRDD(List(
 List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
 list => list
)

 

map和flatMap的区别:

 

map会将每一条输入数据映射为一个新对象。

 

flatMap包含两个操作:会将每一个输入对象输入映射为一个新集合,然后把这些新集合连成一个大集合。

partitionBy

➢ 函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

➢ 函数说明

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

val rdd: RDD[(Int, String)] =
 sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

val rdd2: RDD[(Int, String)] =
 rdd.partitionBy(new HashPartitioner(2))

函数说明

将数据源的数据根据 key 对 value 进行分组

val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

可以将数据按照相同的 Key 对 Value 进行聚合

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

将数据根据不同的规则进行分区内计算和分区间计算val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 =
 dataRDD1.aggregateByKey(0)(_+_,_+_)

val dataRDD1 =
 sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

现有数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每个key的总值及每个key对应键值对的个数

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRDD: RDD[(String, (Int, Int))] = input.combineByKey(
 (_, 1), //a=>(a,1)
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //acc_1为数据源的value,acc_2为key出现的次数,二者进行分区内部的计算
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //将分区内部计算的结果进行分区间的汇总计算,得到每个key的总值以及每个key出现的次数
)

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)

 

 

 

相关文章:

  • HTTP:二.URI及相关术语
  • Linux 调试代码工具:gdb
  • 九屏图分析法以手机为例
  • OPEX baota 2024.02.26
  • NSGA-II 多目标优化 —— 理论、案例与交互式 GUI 实现
  • OpenCV 图像旋转
  • 笔记:头文件与静态库的使用及组织方式
  • 机器学习 从入门到精通 day_03
  • Android Studio Logcat V2 使用指南(适配 2024 年版本)
  • LangChain4j(2):Chat、流式与文生图模型功能
  • xHCI 上 USB 读写分析
  • Vue3 + TypeScript 的 Hooks 实用示例
  • SpringCloud Alibaba 之分布式全局事务 Seata 原理分析
  • GSO-YOLO:基于全局稳定性优化的建筑工地目标检测算法解析
  • 闭包的理解
  • 算法刷题记录——LeetCode篇(1.9) [第81~90题](持续更新)
  • JavaScript防抖与节流
  • Cloudflare教程:免费优化CDN加速配置,提升网站访问速度 | 域名访问缓存压缩视频图片媒体文件优化配置
  • Payoneer(P卡)会关联吗?如何有效防止P卡关联?
  • 第十四天 - Docker容器管理 - docker-py模块实践 - 练习:容器生命周期管理工具
  • 网站模板 整站源码下载/怎么引流推广
  • 做网站详细教程/百度竞价查询
  • 郴州网站建设公司在哪里/游戏推广对接平台
  • 老虎机网站制作/即时热榜
  • 漳州建设企业网站/怎么提交网址让百度收录
  • 香港网站建设展览/爱站网关键词密度