当前位置: 首页 > news >正文

PySpark 常用算子详解

PySpark 提供了丰富的算子用于分布式数据处理,主要分为转换算子(Transformations)和行动算子(Actions)。转换算子用于创建新的 RDD 或 DataFrame,而行动算子触发实际的计算并返回结果。

一、RDD 常用算子

1. 转换算子

转换算子是惰性的,不会立即执行,而是构建计算图。

1.1 基本转换
  • map(func)
    对 RDD 中的每个元素应用函数func

    rdd = sc.parallelize([1, 2, 3])
    squared = rdd.map(lambda x: x**2)  # [1, 4, 9]
    
  • filter(func)
    过滤出满足函数func条件的元素。

    even_nums = rdd.filter(lambda x: x % 2 == 0)  # [2]
    
  • flatMap(func)
    先应用func,再将结果展平。

    words = sc.parallelize(["Hello world", "Spark is fast"])
    flat_words = words.flatMap(lambda x: x.split())  # ["Hello", "world", "Spark", "is", "fast"]
    
1.2 集合操作
  • union(other)
    合并两个 RDD。

    rdd1 = sc.parallelize([1, 2])
    rdd2 = sc.parallelize([3, 4])
    union_rdd = rdd1.union(rdd2)  # [1, 2, 3, 4]
    
  • intersection(other)
    返回两个 RDD 的交集。

    rdd1 = sc.parallelize([1, 2, 3])
    rdd2 = sc.parallelize([3, 4, 5])
    intersect_rdd = rdd1.intersection(rdd2)  # [3]
    
  • distinct()
    去重。

    rdd = sc.parallelize([1, 1, 2, 2, 3])
    distinct_rdd = rdd.distinct()  # [1, 2, 3]
    
1.3 键值对操作
  • groupByKey()
    按键分组。

    pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
    grouped = pairs.groupByKey()  # [("a", [1, 2]), ("b", [3])]
    
  • reduceByKey(func)
    按键聚合值。

    summed = pairs.reduceByKey(lambda x, y: x + y)  # [("a", 3), ("b", 3)]
    
  • join(other)
    键值对 RDD 的内连接。

    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
    rdd2 = sc.parallelize([("a", 3), ("a", 4)])
    joined = rdd1.join(rdd2)  # [("a", (1, 3)), ("a", (1, 4))]
    
2. 行动算子

行动算子触发计算并返回结果或写入外部存储。

2.1 基本行动
  • collect()
    将 RDD 的所有元素收集到驱动程序。

    rdd = sc.parallelize([1, 2, 3])
    result = rdd.collect()  # [1, 2, 3]
    
  • count()
    返回 RDD 的元素个数。

    count = rdd.count()  # 3
    
  • take(n)
    返回 RDD 的前 n 个元素。

    first_two = rdd.take(2)  # [1, 2]
    
  • reduce(func)
    使用函数func聚合 RDD 元素。

    total = rdd.reduce(lambda x, y: x + y)  # 6
    
2.2 保存操作
  • saveAsTextFile(path)
    将 RDD 保存为文本文件。
    rdd.saveAsTextFile("hdfs://path/to/output")
    

二、DataFrame 常用算子

1. 转换算子

DataFrame 的转换算子基于关系代数,支持 SQL 风格操作。

1.1 选择与过滤
  • select(cols)
    选择列。

    df.select("name", "age").show()
    
  • filter(condition)
    过滤行。

    df.filter(df["age"] > 20).show()
    
  • where(condition)
    等价于filter

    df.where("age > 20").show()
    
1.2 聚合操作
  • groupBy(cols)
    按列分组。

    df.groupBy("department").avg("salary").show()
    
  • agg(expressions)
    自定义聚合。

    from pyspark.sql.functions import sum, avg
    df.agg(sum("sales"), avg("age")).show()
    
1.3 连接操作
  • join(other, on, how)
    连接两个 DataFrame。
    df1.join(df2, on="id", how="inner").show()
    
1.4 排序与去重
  • sort(cols)
    排序。

    df.sort("age", ascending=False).show()
    
  • dropDuplicates(subset)
    去重。

    df.dropDuplicates(["name", "age"]).show()
    
2. 行动算子
  • show(n)
    显示前 n 行。

    df.show(5)
    
  • count()
    统计行数。

    rows = df.count()
    
  • collect()
    收集所有行到驱动程序。

    data = df.collect()
    
  • toPandas()
    转换为 Pandas DataFrame。

    pandas_df = df.toPandas()
    

三、SQL 函数

PySpark 提供了丰富的 SQL 函数,用于复杂的数据处理。

1. 数学函数
  • sum()avg()max()min()count()
  • round()sqrt()log()exp()
2. 字符串函数
  • concat()substring()lower()upper()trim()
  • split()regexp_replace()
3. 日期时间函数
  • current_date()current_timestamp()
  • date_format()year()month()dayofmonth()
4. 条件函数
  • when()otherwise()
  • ifnull()coalesce()

示例:

from pyspark.sql.functions import when, coldf.withColumn("age_group", when(col("age") < 18, "minor").when(col("age") < 60, "adult").otherwise("senior"))

四、窗口函数

窗口函数允许在特定行组上执行计算,无需分组。

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, row_numberwindow_spec = Window.partitionBy("department").orderBy("salary")# 添加排名列
df.withColumn("rank", rank().over(window_spec)).show()

五、算子执行顺序

PySpark 采用惰性计算

  1. 转换算子构建 DAG(有向无环图)
  2. 行动算子触发 DAG 的执行
  3. 中间结果可能被缓存(使用cache()persist()

示例:

rdd = sc.parallelize([1, 2, 3, 4])
rdd = rdd.map(lambda x: x**2).filter(lambda x: x > 5)  # 转换
result = rdd.collect()  # 行动,触发计算

六、性能优化建议

  1. 避免使用collect():仅用于小数据结果,大数据会导致驱动程序内存溢出
  2. 使用广播变量:将小表广播到所有 Executor,减少 Shuffle
  3. 合理分区:通过repartition()coalesce()调整分区数
  4. 缓存重用数据:对需要多次使用的 RDD 或 DataFrame 使用cache()
  5. 优先使用 DataFrame:比 RDD 更高效(基于 Catalyst 优化器)

通过掌握这些算子,你可以高效地处理和分析大规模数据集。在实际应用中,建议结合具体业务场景选择合适的算子,并注意性能调优。


文章转载自:
http://basha.aaladrg.cn
http://bialy.aaladrg.cn
http://aglisten.aaladrg.cn
http://allelic.aaladrg.cn
http://angiotomy.aaladrg.cn
http://catholic.aaladrg.cn
http://bauk.aaladrg.cn
http://blithe.aaladrg.cn
http://cardiotachometer.aaladrg.cn
http://barkeeper.aaladrg.cn
http://anaerobic.aaladrg.cn
http://chateau.aaladrg.cn
http://agrophilous.aaladrg.cn
http://aswarm.aaladrg.cn
http://auteurism.aaladrg.cn
http://carinate.aaladrg.cn
http://abcoulomb.aaladrg.cn
http://bleacher.aaladrg.cn
http://chanceless.aaladrg.cn
http://asyndetic.aaladrg.cn
http://autographic.aaladrg.cn
http://characterological.aaladrg.cn
http://agglutinability.aaladrg.cn
http://arguer.aaladrg.cn
http://airhead.aaladrg.cn
http://bulli.aaladrg.cn
http://bondservice.aaladrg.cn
http://chalcidian.aaladrg.cn
http://africanism.aaladrg.cn
http://actinide.aaladrg.cn
http://www.dtcms.com/a/280698.html

相关文章:

  • 【BUG处理】构建APK时遇到错误:‘flutter‘ 命令未被识别。这通常表示您的系统中未安装Flutter SDK或环境变量配置不正确。
  • 牛客:HJ20 密码验证合格程序[华为机考][字符串]
  • 【源力觉醒 创作者计划】文心4.5 vs DeepSeek vs Qwen 3.0:三大能力硬核实测!谁才是王者?
  • 纸板加工制造学习1
  • CF37E Trial for Chief 题解
  • 青年科学基金项目答辩PPT模板 | 杰青优青ppt设计制作美化 | WordinPPT
  • uni-app 学习笔记:Vuex 持久化数据
  • 【C++】神奇的AVL树
  • Java单元测试JUnit
  • 使用 Java 获取 PDF 页面信息(页数、尺寸、旋转角度、方向、标签与边框)
  • 已知均数与标准差,如何生成一组正态分布数据?
  • EPLAN 电气制图(九):直流电源绘制+端子排绘制
  • 线程(二) linux 互斥
  • JVM——有哪些常见的垃圾收集器
  • Props
  • 时序数据库与AI的融合:智能时代的数据基石
  • 027_国际化与本地化
  • Spring应用抛出NoHandlerFoundException、全局异常处理、日志级别
  • FreeRTOS学习笔记——移植说明、任务创建
  • 【Ubuntu22.04】repo安装方法
  • Linux715 磁盘管理:逻辑卷
  • 聊聊MySQL中的buffer pool
  • Spring Boot目录变文件夹?3步解决!
  • Unity Editor下拉框,支持搜索,多层级
  • BGP服务器和多线服务器的不同之处
  • Python初学者笔记第十三期 -- (常用内置函数)
  • 原点安全签约金网络数科,共建一体化数据安全防护体系
  • Docker 镜像(Image)常用命令总结
  • ASP .NET Core 8结合JWT轻松实现身份验证和授权
  • CMake基础:覆盖项目开发的五大配套工具