当前位置: 首页 > news >正文

Spark的算子

目录

一、算子

二、转换算子(Transformations)

2.1、map算子

2.2、flatMap算子 

2.3、filter算子

2.4、union算子

2.5、distinct算子

2.6、分组聚合算子

2.6.1groupByKey算子

2.6.2reduceByKey算子

2.7、排序算子

2.7.1sortBy算子

2.7.2sortByKey

2.8、重分区算子

 2.8.1repartition算子

2.8.2coalesce算子 

三、动作算子(Action)

3.1count

3.2、foreach算子

3.3、saveAsTextFile算子

3.4、first 算子

3.5、take 算子

3.6、collect 算子

3.7、reduce算子

3.8、top算子


一、算子

在 Apache Spark 中,算子(Operators)是指用于处理和转换数据的各种操作。Spark 的核心概念之一是 RDD(弹性分布式数据集),而算子则是对 RDD 进行操作的方法。算子可以分为两大类:转换算子(Transformation)和动作算子(Action)。

这篇文章要说的是转换算子

二、转换算子(Transformations)

转换算子是对 RDD 进行转换,生成新的 RDD。转换算子是惰性的(lazy),即它们不会立即执行,而是在遇到动作算子时才会触发执行。

2.1、map算子

功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,需要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果

list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x))

2.2、flatMap算子 

功能:将两层嵌套集合中的每个元素取出,扁平化处理,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素展开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]

夜曲/发如雪/东风破/七里香
十年/爱情转移/你的背包
日不落/舞娘/倒带
鼓楼/成都/吉姆餐厅/无法长大
月亮之上/荷塘月色

编写代码:
fileRdd = sc.textFile("../datas/a.txt",2)
flatRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatRdd.foreach(lambda x: print(x))

2.3、filter算子

功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进行过滤,符合条件就保留,不符合就过滤
场景:行的过滤,类似于SQL中where或者having
 def filter(self, f: T -> bool ) -> RDD[T]

1 周杰伦 0 夜曲/发如雪/东风破/七里香
2 陈奕迅 0 十年/爱情转移/你的背包
3 1 日不落/舞娘/倒带
4 赵雷 0 鼓楼/成都/吉姆餐厅/无法长大
5 凤凰传奇 -1 月亮之上/荷塘月色

代码演示:
fileRdd = sc.textFile("../datas/b.txt",2)
filterRdd = fileRdd.filter(lambda line: re.split(r"\s",line)[2] != '-1' and len(re.split("\\s",line)) == 4)  #每一行第三列不等于‘-1’的和不足4列的会被剔除
 filterRdd.foreach(lambda x: print(x))

2.4、union算子

union算子
功能:实现两个RDD中数据的合并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]

list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)

rdd3.foreach(print)

2.5、distinct算子

功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]

list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.distinct()
rdd4.foreach(print)

2.6、分组聚合算子

2.6.1groupByKey算子

xxxByKey算子,只有KV类型的RDD才能调用

功能:对KV类型的RDD按照Key进行分组,相同K的Value放入一 个集合列表中,返回一个新的RDD

语法:RDD【K,V】.groupByKey => RDD【K, List[V]】

分类:转换算子

场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑 比较复杂,不适合用reduce

特点:必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd2 = rdd1.groupByKey()  # ("word",List[10,5])
rdd2.foreach(lambda x: print(x[0], *x[1]))

2.6.2reduceByKey算子

功能:对KV类型的RDD按照Key进行分组,并对相同Key的所有
Value使用参数中的reduce函数进行聚合
要求:只有KV类型的RDD才能调用
分类:转换算子
特点:必须经过shuffle,可以指定新的RDD分区个数,可以指定分区规则
语法:
def reduceByKey(self,f: (T,T) ->T,numPartitions,partitionFunction) ->RDD[Tuple[K,V]]

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)

rdd2= rdd1.reduceByKey(lambda total,num: total * num)
 rdd2.foreach(print)

注意:能用reduceByKey就不要用groupByKey+map

reduceByKey代码更简洁,而且性能会更好

2.7、排序算子

2.7.1sortBy算子

功能:对RDD中的所有元素进行整体排序,可以指定排序规则
【按照谁排序,升序或者降序】
分类:转换算子
场景:适用于所有对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:经过Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFunc:(T) -> 0, asc: bool,numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortByKey(ascending=False).foreach(print)

2.7.2sortByKey

功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才能调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:经过Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print) #这样会根据value排序

2.8、重分区算子

 2.8.1repartition算子

功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须经过shuffle才能实现
语法:
def repartition(self,numPartitions) -> RDD[T]

list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    # 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions()) # 2
# repartition 是一个转换算子,必然经历shuffle过程
bigrdd = rdd.repartition(4)
print(bigrdd.getNumPartitions()) # 4

2.8.2coalesce算子 

功能:调整RDD的分区个数
分类:转换算子

特点:可以选择是否经过Shuffle,默认情况下不经过shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]

bigbigrdd = bigrdd.coalesce(8,shuffle=True) # 8
 print(bigbigrdd.getNumPartitions())

三、动作算子(Action)

动作算子用于触发 RDD 的计算,并返回结果或将其保存到外部存储系统。动作算子会立即执行所有之前定义的转换算子。

3.1count

count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:动作算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int

data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
count = flow.count()
print(f"RDD 中的元素数量: {count}")

3.2、foreach算子

功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出或者保存,一般用于测试打印或者保存数据到第三方系统【数据库等】

data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不等于11的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)

3.3、saveAsTextFile算子

功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不一致的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)

# 解析数据,提取手机号码、上行流量和下行流量
flow_parsed = flow_filtered.map(lambda line: (re.split(r"\s+", line)[1], (int(re.split(r"\s+", line)[2]), int(re.split(r"\s+", line)[3]))))

# 计算每个手机号码的总流量
total_traffic = flow_parsed.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# 提取总流量
total_traffic_result = total_traffic.mapValues(lambda v: v[0] + v[1])

# 将结果保存为文本文件
output_path = "D:\\pythonCode\\PySpark\\data\\output\\total_traffic"
total_traffic_result.saveAsTextFile(output_path)

3.4、first 算子

功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的第一个元素
first_element = flow.first()

# 打印结果
print(f"RDD 中的第一个元素: {first_element}")

3.5、take 算子

功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
注意:take返回的结果放入Driver内存中的,take数据量不能过大

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的前 5 个元素
first_five_elements = flow.take(5)

# 打印结果
print(f"RDD 中的前 5 个元素: {first_five_elements}")

3.6、collect 算子

collect算子
功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出

理解:假如现在有三个分区,三个分区中都有数据,假如你现在想打印数据,此时打印哪个分区呢?先收集,将数据汇总在一起,再打印。

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的所有元素
all_elements = flow.collect()

# 打印结果
print(f"RDD 中的所有元素: {all_elements}")

3.7、reduce算子

功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U

# 创建一个包含整数的 RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])

# 使用 reduce 算子计算总和
total_sum = numbers.reduce(lambda a, b: a + b)

# 打印结果
print(f"整数的总和: {total_sum}")

3.8、top算子

功能:对RDD中的所有元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
语法:def top(self,num) -> List[0]

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的前 5 个元素,按字符串长度降序排序
top_five_elements = flow.top(5, key=lambda x: len(x))

# 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度降序): {top_five_elements}")

3.9、takeOrdered算子

功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能
适合处理小数据量
语法:def takeOrdered(self,num) -> List[0]

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的前 5 个元素,按字符串长度升序排序
ordered_five_elements = flow.takeOrdered(5, key=lambda x: len(x))

# 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度升序): {ordered_five_elements}")

相关文章:

  • 250302-绿联NAS通过Docker配置SearXNG及适配Open-WebUI的yaml配置
  • Time Interval Aware Self-Attention for Sequential Recommendation
  • 2025-03-05 学习记录--C/C++-PTA 习题5-7 使用函数求余弦函数的近似值
  • WindowManagerService之Window类型篇
  • 【资料分享】清华大学-187页:AIGC发展研究3.0版
  • Mysql 数据库免费使用
  • Java高频面试之集合-03
  • unity6 打包webgl注意事项
  • Vue3多页面跳转
  • 低代码+AI双重革命:传统软件开发的破局与重生
  • 四款GIS工具箱软件解析:满足企业多样化空间数据需求
  • SPI硬件设计及通信原理解析
  • FLEXOO的传感器技术:从材料选择到生产工艺的全方位创新
  • Cursor+Claude3.7实现从原型到app开发
  • 软考中级-数据库-3.3 数据结构-树
  • 网络安全数据富化 网络数据安全处理规范
  • Windows零门槛部署DeepSeek大模型:Ollama+7B参数模型本地推理全攻略
  • Ubuntu系统安装Apache2方法
  • 2.数据结构-栈和队列
  • 蓝桥杯C组真题——巧克力
  • 中国科学院院士、我国航天液体火箭技术专家朱森元逝世
  • 秦洪看盘|交易新逻辑,银行股成A股稳定器
  • 习近平举行仪式欢迎巴西总统卢拉访华
  • 北洋“修约外交”的台前幕后——民国条约研究会档案探研
  • 牛市早报|中美日内瓦经贸会谈联合声明公布
  • 老人将房产遗赠给外孙,三个女儿却认为遗嘱应无效,法院判了