当前位置: 首页 > news >正文

【Spark征服之路-2.8-Spark-Core编程(四)】

Key-Value类型:

17. partitionBy

➢ 函数签名

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

➢ 函数说明

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

val rdd: RDD[(Int, String)] =
  sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)

val rdd2: RDD[(Int, String)] =
  rdd.partitionBy(new HashPartitioner(2))

18. groupByKey

➢ 函数签名

def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

➢ 函数说明

将数据源的数据根据 key 对 value 进行分组

val dataRDD1 =
  sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

19. reduceByKey

➢ 函数签名

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

➢ 函数说明

可以将数据按照相同的 Key 对 Value 进行聚合

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

reduceByKey 和 groupByKey 的区别:

从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。

从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚

合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那

么还是只能使用 groupByKey

20. aggregateByKey

➢ 函数签名

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,

 combOp: (U, U) => U): RDD[(K, U)]

➢ 函数说明

将数据根据不同的规则进行分区内计算和分区间计算val dataRDD1 =
  sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 =
  dataRDD1.aggregateByKey(0)(_+_,_+_)

21. foldByKey

➢ 函数签名

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

➢ 函数说明

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

val dataRDD1 =
  sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

22. combineByKey

➢ 函数签名

def combineByKey[C](

 createCombiner: V => C,//将当前值作为参数进行附加操作并返回

 mergeValue: (C, V) => C,// 在分区内部进行,将新元素V合并到第一步操作得到的C中

 mergeCombiners: (C, C) => C): RDD[(K, C)]//将第二步操作得到的C进行分区间计算

➢ 函数说明

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于

aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

示例:现有数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每个key的总值及每个key对应键值对的个数

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRDD: RDD[(String, (Int, Int))] = input.combineByKey(
  (_, 1), //a=>(a,1)
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //acc_1为数据源的value,acc_2为key出现的次数,二者进行分区内部的计算
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //将分区内部计算的结果进行分区间的汇总计算,得到每个key的总值以及每个key出现的次数
)

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:

reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同

FoldByKey: 每一个key 对应的数据和初始值进行分区内计算,分区内和分区间计算规则相

AggregateByKey:每一个 key 对应的数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同

CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区

内和分区间计算规则不相同。

23. sortByKey

➢ 函数签名

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)

 : RDD[(K, V)]

➢ 函数说明

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)

24. join

➢ 函数签名

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

➢ 函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的

(K,(V,W))的 RDD

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)

25. leftOuterJoin

➢ 函数签名

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

➢ 函数说明

类似于 SQL 语句的左外连接

val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",4)))
val dataRDD2 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

26. cogroup

➢ 函数签名

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

➢ 函数说明

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD

val dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =
  dataRDD1.cogroup(dataRDD2)

相关文章:

  • Qwen3-Embedding-Reranker本地部署教程:8B 参数登顶 MTEB 多语言榜首,100 + 语言跨模态检索无压力!
  • 奇异值分解
  • 深度学习:人工神经网络基础概念
  • 数据驱动SEO:8种自动化实践,精准提升排名与监控效能
  • ECharts:柱状图背景虚线
  • 16. 线性表的链式表示和实现(5)
  • git如何强制拉取远程分支覆盖本地分支
  • 数据仓库面试题合集⑤
  • 在IntelliJ IDEA中使用Maven配置Tomcat环境
  • 条件查询详细说明
  • 如何在 Android 上备份音乐:保护歌曲的 5 种方法
  • OceanBase上架 KubeSphere Marketplace!打造云原生数据库新范式
  • 把springboot打包为maven可引入的jar
  • VsCode 常用快捷键设置方法
  • arcpy数据分析自动化(2)
  • 【Mini-F5265-OB开发板试用测评】基于ST7735STFT屏幕的LVGL9移植
  • 【Linux驱动开发 ---- 1.1_Linux 基础操作入门】
  • 苍穹外卖--添加购物车
  • Websocket 数据实时更新(消息提醒功能)异步+事件发布
  • 我是如何使用Claude Code
  • 广州动态网站开发/写软文的app
  • 大兴安岭商城网站开发设计/免费b站推广网站2022
  • 免费设计房子的软件/网站seo方案策划书
  • 云营销网站建设电话咨询/广东网络推广运营
  • 品牌建设 凝心/360优化大师历史版本
  • 情人做网站/加快实施创新驱动发展战略