Spark的算子
目录
一、算子
二、转换算子(Transformations)
2.1、map算子
2.2、flatMap算子
2.3、filter算子
2.4、union算子
2.5、distinct算子
2.6、分组聚合算子
2.6.1groupByKey算子
2.6.2reduceByKey算子
2.7、排序算子
2.7.1sortBy算子
2.7.2sortByKey
2.8、重分区算子
2.8.1repartition算子
2.8.2coalesce算子
三、动作算子(Action)
3.1count
3.2、foreach算子
3.3、saveAsTextFile算子
3.4、first 算子
3.5、take 算子
3.6、collect 算子
3.7、reduce算子
3.8、top算子
一、算子
在 Apache Spark 中,算子(Operators)是指用于处理和转换数据的各种操作。Spark 的核心概念之一是 RDD(弹性分布式数据集),而算子则是对 RDD 进行操作的方法。算子可以分为两大类:转换算子(Transformation)和动作算子(Action)。
这篇文章要说的是转换算子
二、转换算子(Transformations)
转换算子是对 RDD 进行转换,生成新的 RDD。转换算子是惰性的(lazy),即它们不会立即执行,而是在遇到动作算子时才会触发执行。
2.1、map算子
功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,需要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果
list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x))
2.2、flatMap算子
功能:将两层嵌套集合中的每个元素取出,扁平化处理,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素展开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]
夜曲/发如雪/东风破/七里香
十年/爱情转移/你的背包
日不落/舞娘/倒带
鼓楼/成都/吉姆餐厅/无法长大
月亮之上/荷塘月色编写代码:
fileRdd = sc.textFile("../datas/a.txt",2)
flatRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatRdd.foreach(lambda x: print(x))
2.3、filter算子
功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进行过滤,符合条件就保留,不符合就过滤
场景:行的过滤,类似于SQL中where或者having
def filter(self, f: T -> bool ) -> RDD[T]
1 周杰伦 0 夜曲/发如雪/东风破/七里香
2 陈奕迅 0 十年/爱情转移/你的背包
3 1 日不落/舞娘/倒带
4 赵雷 0 鼓楼/成都/吉姆餐厅/无法长大
5 凤凰传奇 -1 月亮之上/荷塘月色代码演示:
fileRdd = sc.textFile("../datas/b.txt",2)
filterRdd = fileRdd.filter(lambda line: re.split(r"\s",line)[2] != '-1' and len(re.split("\\s",line)) == 4) #每一行第三列不等于‘-1’的和不足4列的会被剔除
filterRdd.foreach(lambda x: print(x))
2.4、union算子
union算子
功能:实现两个RDD中数据的合并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]
list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)rdd3.foreach(print)
2.5、distinct算子
功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]
list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.distinct()
rdd4.foreach(print)
2.6、分组聚合算子
2.6.1groupByKey算子
xxxByKey算子,只有KV类型的RDD才能调用
功能:对KV类型的RDD按照Key进行分组,相同K的Value放入一 个集合列表中,返回一个新的RDD
语法:RDD【K,V】.groupByKey => RDD【K, List[V]】
分类:转换算子
场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑 比较复杂,不适合用reduce
特点:必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则
rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd2 = rdd1.groupByKey() # ("word",List[10,5])
rdd2.foreach(lambda x: print(x[0], *x[1]))
2.6.2reduceByKey算子
功能:对KV类型的RDD按照Key进行分组,并对相同Key的所有
Value使用参数中的reduce函数进行聚合
要求:只有KV类型的RDD才能调用
分类:转换算子
特点:必须经过shuffle,可以指定新的RDD分区个数,可以指定分区规则
语法:
def reduceByKey(self,f: (T,T) ->T,numPartitions,partitionFunction) ->RDD[Tuple[K,V]]
rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd2= rdd1.reduceByKey(lambda total,num: total * num)
rdd2.foreach(print)
注意:能用reduceByKey就不要用groupByKey+map
reduceByKey代码更简洁,而且性能会更好
2.7、排序算子
2.7.1sortBy算子
功能:对RDD中的所有元素进行整体排序,可以指定排序规则
【按照谁排序,升序或者降序】
分类:转换算子
场景:适用于所有对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:经过Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFunc:(T) -> 0, asc: bool,numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序
rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortByKey(ascending=False).foreach(print)
2.7.2sortByKey
功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才能调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:经过Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]
rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print) #这样会根据value排序
2.8、重分区算子
2.8.1repartition算子
功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须经过shuffle才能实现
语法:
def repartition(self,numPartitions) -> RDD[T]
list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
# 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions()) # 2
# repartition 是一个转换算子,必然经历shuffle过程
bigrdd = rdd.repartition(4)
print(bigrdd.getNumPartitions()) # 4
2.8.2coalesce算子
功能:调整RDD的分区个数
分类:转换算子
特点:可以选择是否经过Shuffle,默认情况下不经过shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]
bigbigrdd = bigrdd.coalesce(8,shuffle=True) # 8
print(bigbigrdd.getNumPartitions())
三、动作算子(Action)
动作算子用于触发 RDD 的计算,并返回结果或将其保存到外部存储系统。动作算子会立即执行所有之前定义的转换算子。
3.1count
count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:动作算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
count = flow.count()
print(f"RDD 中的元素数量: {count}")
3.2、foreach算子
功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出或者保存,一般用于测试打印或者保存数据到第三方系统【数据库等】
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)# 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不等于11的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)
3.3、saveAsTextFile算子
功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None
# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)# 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不一致的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)# 解析数据,提取手机号码、上行流量和下行流量
flow_parsed = flow_filtered.map(lambda line: (re.split(r"\s+", line)[1], (int(re.split(r"\s+", line)[2]), int(re.split(r"\s+", line)[3]))))# 计算每个手机号码的总流量
total_traffic = flow_parsed.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))# 提取总流量
total_traffic_result = total_traffic.mapValues(lambda v: v[0] + v[1])# 将结果保存为文本文件
output_path = "D:\\pythonCode\\PySpark\\data\\output\\total_traffic"
total_traffic_result.saveAsTextFile(output_path)
3.4、first 算子
功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T
# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)# 获取 RDD 中的第一个元素
first_element = flow.first()# 打印结果
print(f"RDD 中的第一个元素: {first_element}")
3.5、take 算子
功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
注意:take返回的结果放入Driver内存中的,take数据量不能过大
# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)# 获取 RDD 中的前 5 个元素
first_five_elements = flow.take(5)# 打印结果
print(f"RDD 中的前 5 个元素: {first_five_elements}")
3.6、collect 算子
collect算子
功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出
理解:假如现在有三个分区,三个分区中都有数据,假如你现在想打印数据,此时打印哪个分区呢?先收集,将数据汇总在一起,再打印。
# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)# 获取 RDD 中的所有元素
all_elements = flow.collect()# 打印结果
print(f"RDD 中的所有元素: {all_elements}")
3.7、reduce算子
功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U
# 创建一个包含整数的 RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])# 使用 reduce 算子计算总和
total_sum = numbers.reduce(lambda a, b: a + b)# 打印结果
print(f"整数的总和: {total_sum}")
3.8、top算子
功能:对RDD中的所有元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
语法:def top(self,num) -> List[0]
# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)# 获取 RDD 中的前 5 个元素,按字符串长度降序排序
top_five_elements = flow.top(5, key=lambda x: len(x))# 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度降序): {top_five_elements}")
3.9、takeOrdered算子
功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能
适合处理小数据量
语法:def takeOrdered(self,num) -> List[0]
# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)# 获取 RDD 中的前 5 个元素,按字符串长度升序排序
ordered_five_elements = flow.takeOrdered(5, key=lambda x: len(x))# 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度升序): {ordered_five_elements}")