当前位置: 首页 > news >正文

spark-RDD期中

一 sc对象创建处理

from pyspark import SparkConf,SparkContext
conf=SparkConf().setMaster("local[*]").setAppName("My APP")
sc=SparkContext(conf=conf)

二 RDD对象创建

rdd1=sc.textFile("*.csv")
rdd1.disctinct().map(lambda x:x.split(",")).filter(lambda x:x[0]!="id")

三 转化算子

(一):value算子

from pyspark import SparkContextsc = SparkContext("local", "Example")# 1. filter
data = sc.parallelize([1, 2, 3, 4, 5])
filtered_data = data.filter(lambda x: x % 2 == 0)
filtered_data.collect()  # 输出: [2, 4]# 2. map
mapped_data = data.map(lambda x: x * 2)
mapped_data.collect()  # 输出: [2, 4, 6, 8, 10]# 3. flatMap
data = sc.parallelize(["hello world", "hi"])
flattened_data = data.flatMap(lambda x: x.split(" "))
flattened_data.collect()  # 输出: ['hello', 'world', 'hi']# 4. sortBy
data = sc.parallelize([1, 2, 3, 4, 5])
sorted_data = data.sortBy(lambda x: x, ascending=False)
sorted_data.collect()  # 输出: [5, 4, 3, 2, 1]# 5. groupBy
data = sc.parallelize([1, 2, 3, 4, 5])
grouped_data = data.groupBy(lambda x: x % 2)
grouped_data.collect()  # 输出: [(0, <pyspark.resultiterable.ResultIterable object at 0x...>), (1, <pyspark.resultiterable.ResultIterable object at 0x...>)]# 6. distinct
data = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
distinct_data = data.distinct()
distinct_data.collect()  # 输出: [1, 2, 3, 4, 5]# 7. union
data1 = sc.parallelize([1, 2, 3])
data2 = sc.parallelize([4, 5, 6])
union_data = data1.union(data2)
union_data.collect()  # 输出: [1, 2, 3, 4, 5, 6]# 8. intersection
data1 = sc.parallelize([1, 2, 3, 4])
data2 = sc.parallelize([3, 4, 5, 6])
intersection_data = data1.intersection(data2)
intersection_data.collect()  # 输出: [3, 4]# 9. subtract
data1 = sc.parallelize([1, 2, 3, 4])
data2 = sc.parallelize([3, 4, 5, 6])
subtract_data = data1.subtract(data2)
subtract_data.collect()  # 输出: [1, 2]# 10. zip
data1 = sc.parallelize([1, 2, 3])
data2 = sc.parallelize(['a', 'b', 'c'])
zipped_data = data1.zip(data2)
zipped_data.collect()  # 输出: [(1, 'a'), (2, 'b'), (3, 'c')]# 11. glom
data = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 个分区
glom_data = data.glom()
glom_data.collect()  # 输出: [[1, 2, 3], [4, 5]]# 12. repartition
data = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 个分区
repartitioned_data = data.repartition(3)  # 重新分区为 3 个分区
repartitioned_data.getNumPartitions()  # 输出: 3# 13. mapPartitions
def process_partition(iterator):return [x * 2 for x in iterator]data = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2 个分区
mapped_partitions_data = data.mapPartitions(process_partition)
mapped_partitions_data.collect()  # 输出: [2, 4, 6, 8, 10]
  • filter:过滤操作,用于从原始数据集中筛选出满足特定条件的元素,返回一个新的数据集,只包含符合条件的元素。

  • map:映射操作,对数据集中的每个元素应用一个指定的函数,将每个元素转换为新的值,返回一个新的数据集。

  • flatMap:扁平化映射操作,对数据集中的每个元素应用一个函数,该函数返回一个可迭代对象(如列表),然后将这些可迭代对象中的所有元素合并成一个扁平化的数据集。

  • sortBy:排序操作,根据指定的函数对数据集中的元素进行排序,可以指定升序或降序。

  • groupBy:分组操作,根据指定的函数对数据集中的元素进行分组,将具有相同键的元素归为一组,返回一个键值对的数据集,其中键是分组的依据,值是属于该组的元素集合。

  • distinct:去重操作,返回一个新的数据集,其中包含原始数据集中的唯一元素,去除重复的元素。

  • union:并集操作,将两个数据集合并为一个新的数据集,包含两个数据集中的所有元素。

  • intersection:交集操作,返回两个数据集的交集,即同时存在于两个数据集中的元素。

  • subtract:差集操作,返回第一个数据集中不在第二个数据集中的元素。

  • zip:配对操作,将两个数据集的元素按顺序配对,返回一个新的数据集,其中每个元素是一个元组,包含来自两个数据集的对应元素。

  • glom:分区聚合操作,将每个分区的数据转换为一个列表,返回一个新的数据集,其中每个元素是一个列表,表示一个分区的数据。

  • repartition:重新分区操作,将数据集重新分区为指定数量的分区,可以用于调整数据的分布和并行度。

  • mapPartitions:分区映射操作,对每个分区的数据应用一个函数,返回一个新的数据集,其中每个分区的数据是经过函数处理后的结果。

(二)Key_Value算子

from pyspark import SparkContextsc = SparkContext("local", "KeyValueExample")# 创建一个 Key-Value 类型的 RDD
data = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5), (5, 6), (3, 8)])# 1. partitionBy
partitioned_data = data.partitionBy(2)  # 根据分区器分区
partitioned_data.getNumPartitions()  # 输出: 2# 2. reduceByKey
reduced_data = data.reduceByKey(lambda a, b: a + b)
reduced_data.collect()  # 输出: [(1, 2), (3, 18), (4, 5), (5, 6)]# 3. keys
keys_data = data.keys()
keys_data.collect()  # 输出: [1, 3, 3, 4, 5, 3]# 4. values
values_data = data.values()
values_data.collect()  # 输出: [2, 4, 6, 5, 6, 8]# 5. sortByKey
sorted_data = data.sortByKey()
sorted_data.collect()  # 输出: [(1, 2), (3, 4), (3, 6), (3, 8), (4, 5), (5, 6)]# 6. groupByKey
grouped_data = data.groupByKey()
grouped_data.collect()  # 输出: [(1, <pyspark.resultiterable.ResultIterable object at 0x...>), (3, <pyspark.resultiterable.ResultIterable object at 0x...>), (4, <pyspark.resultiterable.ResultIterable object at 0x...>), (5, <pyspark.resultiterable.ResultIterable object at 0x...>)]
# 转换为更易读的格式
grouped_data.mapValues(list).collect()  # 输出: [(1, [2]), (3, [4, 6, 8]), (4, [5]), (5, [6])]# 7. mapValues
mapped_values_data = data.mapValues(lambda x: x * 2)
mapped_values_data.collect()  # 输出: [(1, 4), (3, 8), (3, 12), (4, 10), (5, 12), (3, 16)]# 8. flatMapValues
flattened_values_data = data.flatMapValues(lambda x: [x, x * 2])
flattened_values_data.collect()  # 输出: [(1, 2), (1, 4), (3, 4), (3, 8), (3, 6), (3, 12), (4, 5), (4, 10), (5, 6), (5, 12), (3, 8), (3, 16)]# 9. join
data1 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
data2 = sc.parallelize([(3, 8), (4, 5)])
joined_data = data1.join(data2)
joined_data.collect()  # 输出: [(3, (4, 8)), (3, (6, 8))]# 10. leftOuterJoin
left_joined_data = data1.leftOuterJoin(data2)
left_joined_data.collect()  # 输出: [(1, (2, None)), (3, (4, 8)), (3, (6, 8))]# 11. rightOuterJoin
right_joined_data = data1.rightOuterJoin(data2)
right_joined_data.collect()  # 输出: [(3, (4, 8)), (4, (None, 5)), (3, (6, 8))]
  • partitionBy:分区操作,根据指定的分区器对 Key-Value 类型的数据集进行分区,将具有相同键的元素分配到同一个分区中,通常用于优化后续的聚合操作。

  • reduceByKey:聚合操作,对 Key-Value 类型的数据集中的每个键对应的值进行聚合,使用指定的函数将同一个键的所有值合并为一个值,返回一个新的 Key-Value 数据集。

  • keys:提取键操作,从 Key-Value 类型的数据集中提取所有的键,返回一个新的数据集,只包含键。

  • values:提取值操作,从 Key-Value 类型的数据集中提取所有的值,返回一个新的数据集,只包含值。

  • sortByKey:按键排序操作,根据 Key-Value 类型数据集中的键对数据进行排序,可以指定升序或降序。

  • groupByKey:按键分组操作,将 Key-Value 类型的数据集中的所有值按照键进行分组,返回一个新的 Key-Value 数据集,其中每个键对应一个值的集合。

  • mapValues:映射值操作,对 Key-Value 类型的数据集中的每个值应用一个函数,返回一个新的 Key-Value 数据集,键保持不变,值被转换为新的值。

  • flatMapValues:扁平化映射值操作,对 Key-Value 类型的数据集中的每个值应用一个函数,该函数返回一个可迭代对象(如列表),然后将这些可迭代对象中的所有元素合并成一个扁平化的 Key-Value 数据集。

  • join:连接操作,对两个 Key-Value 类型的数据集进行内连接,返回一个新的 Key-Value 数据集,其中每个键对应的值是两个数据集中该键对应的值的组合。

  • leftOuterJoin:左外连接操作,对两个 Key-Value 类型的数据集进行左外连接,返回一个新的 Key-Value 数据集,其中第一个数据集中的每个键都包含在结果中,如果第二个数据集中没有对应的键,则值为 None

  • rightOuterJoin:右外连接操作,对两个 Key-Value 类型的数据集进行右外连接,返回一个新的 Key-Value 数据集,其中第二个数据集中的每个键都包含在结果中,如果第一个数据集中没有对应的键,则值为 None

四 行动算子

from pyspark import SparkContextsc = SparkContext("local", "ActionExample")# 创建一个 RDD
data = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5), (5, 6), (3, 8)])# 1. collect
collected_data = data.collect()
print(collected_data)  # 输出: [(1, 2), (3, 4), (3, 6), (4, 5), (5, 6), (3, 8)]# 2. collectAsMap
collected_map = data.collectAsMap()
print(collected_map)  # 输出: {1: 2, 3: 8, 4: 5, 5: 6}# 3. reduce
reduced_data = data.reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(reduced_data)  # 输出: (16, 31)# 4. take
taken_data = data.take(3)
print(taken_data)  # 输出: [(1, 2), (3, 4), (3, 6)]# 5. top
top_data = data.top(3, key=lambda x: x[1])
print(top_data)  # 输出: [(3, 8), (5, 6), (4, 5)]# 6. first
first_element = data.first()
print(first_element)  # 输出: (1, 2)# 7. count
count_data = data.count()
print(count_data)  # 输出: 6# 8. countByKey
count_by_key = data.countByKey()
print(count_by_key)  # 输出: defaultdict(<class 'int'>, {1: 1, 3: 3, 4: 1, 5: 1})# 9. saveAsTextFile
data.saveAsTextFile("output")# 10. aggregate
# 定义初始值
zero_value = (0, 0)
# 分区内聚合函数
seq_op = lambda acc, value: (acc[0] + value[0], acc[1] + value[1])
# 分区间聚合函数
comb_op = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
aggregated_data = data.aggregate(zero_value, seq_op, comb_op)
print(aggregated_data)  # 输出: (16, 31)
  • collect:将 RDD 中的所有元素收集到驱动程序中,返回一个列表。

  • collectAsMap:将 Key-Value 类型的 RDD 收集到驱动程序中,返回一个字典。

  • reduce:对 RDD 中的所有元素应用一个函数,将所有元素合并为一个值。

  • take:返回 RDD 中的前 n 个元素。

  • top:返回 RDD 中的前 n 个元素,根据指定的排序函数。

  • first:返回 RDD 中的第一个元素。

  • count:返回 RDD 中的元素数量。

  • countByKey:对 Key-Value 类型的 RDD,返回每个键对应的元素数量。

  • saveAsTextFile:将 RDD 保存为文本文件。

  • aggregate:对 RDD 中的元素进行聚合操作,可以指定初始值、分区内的聚合函数和分区间的聚合函数。

我特喜欢hjp(超小声)

http://www.dtcms.com/a/541568.html

相关文章:

  • Linux 网络初识
  • 易天光通信光模块认证全解析:构建全球品质信任网络
  • 计算机网络自顶向下方法12——应用层 对HTTP响应报文优先次序的答疑
  • 上海企业建设网站服务网站的首页标题在哪里设置的
  • 城市建设规划网站无锡网站建设f7wl
  • 算法题 逆波兰表达式/计算器
  • 智能体最佳实践的方法论(四):监控
  • 【java面向对象进阶】------内部类
  • 基于昇腾 NPU 的 Gemma 2 推理实测:性能评测、脚本实现与可视化分析
  • 南京设计公司郑州粒米seo顾问
  • 承接电商网站建设中文网站模板大全
  • 折半查找及其判定树的性质
  • Day 6 PPI与Cox
  • 网站dns刷新庐江县建设局网站
  • 网站的按钮怎么做 视频3g 手机网站建设
  • 豆包凶猛,深度解析字节AI战略
  • 【案例实战】HarmonyOS云开发实战:5分钟快速构建全栈应用
  • 为什么你的React项目到中等规模就开始“烂尾“?问题可能出在文件结构
  • 做思维导图好看的网站企业网络规划开题报告
  • 企业网站建设合同模板wordpress密码可见
  • 基于 Element Plus 的 TableColumnGroup 组件使用说明
  • 学校网站代码这么做3d网站
  • 国外购物网站系统出入东莞最新通知今天
  • 如何删除 AEDT 中的排队模拟?
  • 做网站的公司面试邢台企业建站
  • 万站群cms平台怎么推广技巧
  • 一加13/13T手动偷渡ColorOS16系统-享受德芙丝滑+增量包下载
  • 数据结构——三十二、最短路径问题——BFS算法(王道408)
  • 最新的高端网站建设网站结构方面主要做哪些优化
  • 电子商务静态网站建设心得网站服务合同用交印花税吗