PySpark中DataFrame应用升阶及UDF使用
目录
- 1. 加载数据
- 2. 列常见操作
- 2.1 添加新列
- 2.2 重命名列
- 2.3 删除指定列
- 2.4 修改数据
 
- 3 空值处理
- 3.1 丢弃空值
- 3.2 空值填充
 
- 4 聚合操作
- 4.1 分组聚合
 
- 5 用户自定义函数(UDF)
- 5.1 传统UDF函数
- 5.2 Pandas UDF(向量化UDF)
 
- 参考资料

import findspark
findspark.init() 
from pyspark.sql import SparkSession
# 创建 Spark 会话
spark = SparkSession.builder \.appName("Test PySpark") \.master("local[*]") \.getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'
1. 加载数据
mes_df = spark.read.csv('spark练习数据.csv',inferSchema=True,header=True)
mes_df.columns
['销售年份','车辆用途','电池形状','动力类型','电池类型','行驶里程(km)','容量保持率(%)','电量(KWh)','质量(kg)','车辆数(辆)']
print(mes_df.printSchema())
mes_df.show(3)
root|-- 销售年份: integer (nullable = true)|-- 车辆用途: string (nullable = true)|-- 电池形状: string (nullable = true)|-- 动力类型: string (nullable = true)|-- 电池类型: string (nullable = true)|-- 行驶里程(km): integer (nullable = true)|-- 容量保持率(%): double (nullable = true)|-- 电量(KWh): double (nullable = true)|-- 质量(kg): double (nullable = true)|-- 车辆数(辆): integer (nullable = true)None
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+
only showing top 3 rows
2. 列常见操作
2.1 添加新列
mes_df2 = mes_df.withColumn('功率',mes_df['电量(KWh)']/mes_df['质量(kg)'])
mes_df2.show(3)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|              功率|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5|0.0827491408934708|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows
2.2 重命名列
mes_df2 = mes_df2.withColumnsRenamed({'功率':'功率(wh)'})
mes_df2.show(5)
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|动力类型|电池类型|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|  纯电动|磷酸铁锂|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+--------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
2.3 删除指定列
mes_df3 = mes_df2.drop('电池类型','动力类型')
mes_df3.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
2.4 修改数据
mes_f = mes_df3.replace({'私人乘用车':'私人'})
mes_f.show(3)
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|销售年份|车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|          功率(wh)|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
|    2013|    私人|    方形|       50000|         92.2|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|    私人|    方形|       85000|         91.6|    128.0|  1525.0|         5|0.0839344262295082|
|    2013|    私人|    方形|       30000|         94.3|    120.4|  1455.0|         5|0.0827491408934708|
+--------+--------+--------+------------+-------------+---------+--------+----------+------------------+
only showing top 3 rows
3 空值处理
3.1 丢弃空值
mes_df4 = mes_df3.dropna(how = 'all',subset=['车辆用途'])
mes_df4.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
3.2 空值填充
mes_df5 = mes_df4.fillna({'销售年份':1970,'车辆用途':'私人乘用车'})
mes_df5.show(5)
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|销售年份|  车辆用途|电池形状|行驶里程(km)|容量保持率(%)|电量(KWh)|质量(kg)|车辆数(辆)|           功率(wh)|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
|    2013|私人乘用车|    方形|       50000|         92.2|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       85000|         91.6|    128.0|  1525.0|         5| 0.0839344262295082|
|    2013|私人乘用车|    方形|       30000|         94.3|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       70000|         87.4|    120.4|  1455.0|         5| 0.0827491408934708|
|    2013|私人乘用车|    方形|       75000|         87.5|     94.8|  1150.0|         4|0.08243478260869565|
+--------+----------+--------+------------+-------------+---------+--------+----------+-------------------+
only showing top 5 rows
4 聚合操作
4.1 分组聚合
g_df = mes_df.groupBy('销售年份').agg({'电量(KWh)':'sum','质量(kg)':'sum'})
# g_df = g_df.withColumnRenamed({'sum(质量(kg))':'总质量','sum(电量(KWh))':'总电量'})
g_df =  g_df.withColumnRenamed('sum(质量(kg))','总质量').withColumnRenamed('sum(电量(KWh))','总电量')
g_df.show(3)
+--------+--------------------+-------------------+
|销售年份|              总质量|             总电量|
+--------+--------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|
|    2015|           2551879.0| 240302.29999999897|
|    2013|             20373.6| 1867.0999999999992|
+--------+--------------------+-------------------+
only showing top 3 rows
g_df.show()
+--------+--------------------+-------------------+
|销售年份|       sum(质量(kg))|     sum(电量(KWh))|
+--------+--------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|
|    2015|           2551879.0| 240302.29999999897|
|    2013|             20373.6| 1867.0999999999992|
|    2014|            220269.0| 20847.199999999968|
|    2019| 2.346486377000002E8|3.259483540000003E7|
|    2016| 4.531763910000002E7|  4402030.500000004|
|    2017|1.7762183689999998E8|1.887332539999987E7|
+--------+--------------------+-------------------+
5 用户自定义函数(UDF)
PySpark 有两种 UDF:传统UDF(非向量化UDF) 和 Pandas UDF(向量化UDF)
传统UDF(非向量化UDF) :通过Python函数逐行处理数据,使用pyspark.sql.functions.udf注册
- 优点:
- 适合简单逻辑(如字符串处理、数值转换)
- 在所有Spark版本(≥1.3)中均可使用
- 逐行调试方便,便于通过print或日志逐行调试逻辑。
- 缺点:
- 性能差,高延迟,尤其在大数据集上可能成为瓶颈。
- 需手动处理Spark数据类型与Python类型的映射,易因类型不匹配出错。
- 无法批量处理数据,无法利用现代CPU的SIMD指令加速。
Pandas UDF(向量化UDF):将整个列或分块数据转换为Pandas Series/DataFrame,批量处理。基于Apache Arrow的批量处理模式,使用pyspark.sql.functions.pandas_udf定义。
- 优点:
- 高性能,利用Pandas向量化操作,数据通过Arrow以零拷贝方式传输,减少序列化开销。
- 支持复杂操作,适合处理整列或分组数据(如时间窗口计算、分组聚合)
- Arrow优化内存布局,减少内存占用。
- 缺点:
- 依赖Arrow和Pandas。需要安装PyArrow和Pandas,且版本需与Spark兼容。
- 调试困难,批量处理逻辑出错时,难以定位具体行的问题。
- 型处理隐式,需熟悉Pandas与Spark类型的隐式转换规则,类型错误可能更隐晦。
5.1 传统UDF函数
from pyspark.sql.functions import udf # 传统udfdef powerCal(num,num2):return num/num2# udf创建
power_udf = udf(powerCal,DoubleType())
g_df2 = g_df.withColumn('功率',power_udf(g_df['总电量'],g_df['总质量']))
g_df2.show()
+--------+--------------------+-------------------+-------------------+
|销售年份|              总质量|             总电量|               功率|
+--------+--------------------+-------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
|    2015|           2551879.0| 240302.29999999897|0.09416680806574253|
|    2013|             20373.6| 1867.0999999999992|0.09164310676561822|
|    2014|            220269.0| 20847.199999999968|0.09464427586269501|
|    2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
|    2016| 4.531763910000002E7|  4402030.500000004|0.09713724252682886|
|    2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+
5.2 Pandas UDF(向量化UDF)
from pyspark.sql.functions import pandas_udf # 向量化udfdef powerCal2(num,num2):return num/num2# udf创建
power_pudf = pandas_udf(powerCal,DoubleType())
g_df3 = g_df.withColumn('功率',power_pudf(g_df['总电量'],g_df['总质量']))
g_df3.show()
+--------+--------------------+-------------------+-------------------+
|销售年份|              总质量|             总电量|               功率|
+--------+--------------------+-------------------+-------------------+
|    2018|2.4133957670000014E8|3.040895040000008E7|0.12600067844570834|
|    2015|           2551879.0| 240302.29999999897|0.09416680806574253|
|    2013|             20373.6| 1867.0999999999992|0.09164310676561822|
|    2014|            220269.0| 20847.199999999968|0.09464427586269501|
|    2019| 2.346486377000002E8|3.259483540000003E7|0.13890911841419995|
|    2016| 4.531763910000002E7|  4402030.500000004|0.09713724252682886|
|    2017|1.7762183689999998E8|1.887332539999987E7|0.10625565937945705|
+--------+--------------------+-------------------+-------------------+
spark.stop()
参考资料
《Python+Spark 2.0+Hadoop机器学习与大数据实战》, 林大贵,清华大学出版社,2017-12,9787302490739

