当前位置: 首页 > news >正文

# Pandas 与 Spark 数据操作完整教程

Pandas 与 Spark 数据操作完整教程

目录

  1. 基础介绍
  2. 简单数据操作
  3. 复杂数据操作
  4. 事务处理
  5. 窗口函数
  6. 子查询
  7. 性能优化与最佳实践
  8. 常见问题与陷阱

基础介绍

Pandas 简介

Pandas 是 Python 中用于数据分析的核心库,主要用于处理结构化数据。它提供了两种主要的数据结构:

  • Series: 一维数组
  • DataFrame: 二维表格数据

Spark 简介

Apache Spark 是一个分布式计算框架,适用于大规模数据处理。PySpark 是 Spark 的 Python API。

  • RDD: 弹性分布式数据集
  • DataFrame: 分布式的数据表
  • SQL: 支持 SQL 查询

环境配置

# Pandas 安装
pip install pandas numpy# PySpark 安装
pip install pyspark# 导入库
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

初始化 Spark

# 创建 SparkSession
spark = SparkSession.builder \.appName("DataProcessingTutorial") \.config("spark.sql.shuffle.partitions", "4") \.getOrCreate()# 设置日志级别
spark.sparkContext.setLogLevel("ERROR")

简单数据操作

1. 数据读取

Pandas 读取数据
# 读取 CSV
df_pandas = pd.read_csv('data.csv')# 读取 Excel
df_pandas = pd.read_excel('data.xlsx', sheet_name='Sheet1')# 读取 JSON
df_pandas = pd.read_json('data.json')# 读取 SQL 数据库
import sqlite3
conn = sqlite3.connect('database.db')
df_pandas = pd.read_sql_query("SELECT * FROM table_name", conn)# 从字典创建 DataFrame
data = {'name': ['Alice', 'Bob', 'Charlie', 'David'],'age': [25, 30, 35, 28],'salary': [50000, 60000, 75000, 55000],'department': ['IT', 'HR', 'IT', 'Sales']
}
df_pandas = pd.DataFrame(data)
print(df_pandas)
Spark 读取数据
# 读取 CSV
df_spark = spark.read.csv('data.csv', header=True, inferSchema=True)# 读取 JSON
df_spark = spark.read.json('data.json')# 读取 Parquet
df_spark = spark.read.parquet('data.parquet')# 从 JDBC 读取数据库
df_spark = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql://localhost:5432/mydb") \.option("dbtable", "table_name") \.option("user", "username") \.option("password", "password") \.load()# 从列表创建 DataFrame
data = [('Alice', 25, 50000, 'IT'),('Bob', 30, 60000, 'HR'),('Charlie', 35, 75000, 'IT'),('David', 28, 55000, 'Sales')
]
columns = ['name', 'age', 'salary', 'department']
df_spark = spark.createDataFrame(data, columns)
df_spark.show()

2. 数据查看

Pandas
# 查看前几行
print(df_pandas.head())# 查看后几行
print(df_pandas.tail())# 查看数据信息
print(df_pandas.info())# 查看统计信息
print(df_pandas.describe())# 查看形状
print(f"Shape: {df_pandas.shape}")# 查看列名
print(f"Columns: {df_pandas.columns.tolist()}")# 查看数据类型
print(df_pandas.dtypes)
Spark
# 显示数据
df_spark.show()# 显示指定行数
df_spark.show(5, truncate=False)# 打印 schema
df_spark.printSchema()# 查看统计信息
df_spark.describe().show()# 查看行数
print(f"Count: {df_spark.count()}")# 查看列名
print(f"Columns: {df_spark.columns}")

3. 数据选择

Pandas
# 选择单列
ages = df_pandas['age']
# 或
ages = df_pandas.age# 选择多列
subset = df_pandas[['name', 'age']]# 选择行(基于位置)
first_row = df_pandas.iloc[0]
first_three = df_pandas.iloc[0:3]# 选择行(基于标签)
row_by_label = df_pandas.loc[0]# 条件选择
high_earners = df_pandas[df_pandas['salary'] > 60000]# 多条件选择
it_high_earners = df_pandas[(df_pandas['salary'] > 55000) & (df_pandas['department'] == 'IT')
]# 使用 query 方法
result = df_pandas.query('age > 28 and salary > 50000')
Spark
# 选择单列
df_spark.select('age').show()# 选择多列
df_spark.select('name', 'age').show()# 条件过滤
df_spark.filter(df_spark['salary'] > 60000).show()
# 或使用 where
df_spark.where(df_spark['salary'] > 60000).show()# 使用 SQL 表达式
df_spark.filter("salary > 60000").show()# 多条件过滤
df_spark.filter((df_spark['salary'] > 55000) & (df_spark['department'] == 'IT')
).show()# 选择前 N 行
df_spark.limit(3).show()

4. 数据排序

Pandas
# 按单列排序
sorted_df = df_pandas.sort_values('age')# 按多列排序
sorted_df = df_pandas.sort_values(['department', 'salary'], ascending=[True, False])# 降序排序
sorted_df = df_pandas.sort_values('salary', ascending=False)# 按索引排序
sorted_df = df_pandas.sort_index()
Spark
# 按单列排序
df_spark.orderBy('age').show()# 降序排序
df_spark.orderBy(F.col('salary').desc()).show()# 按多列排序
df_spark.orderBy(['department', 'salary'], ascending=[True, False]).show()# 使用 sort
df_spark.sort(F.desc('salary')).show()

5. 数据聚合

Pandas
# 基本聚合
print(f"平均年龄: {df_pandas['age'].mean()}")
print(f"总工资: {df_pandas['salary'].sum()}")
print(f"最高工资: {df_pandas['salary'].max()}")
print(f"最低工资: {df_pandas['salary'].min()}")# 分组聚合
dept_stats = df_pandas.groupby('department').agg({'salary': ['mean', 'sum', 'count'],'age': 'mean'
})
print(dept_stats)# 多种聚合方式
result = df_pandas.groupby('department').agg(avg_salary=('salary', 'mean'),total_salary=('salary', 'sum'),employee_count=('name', 'count'),avg_age=('age', 'mean')
)
print(result)
Spark
# 基本聚合
df_spark.agg(F.mean('age').alias('avg_age'),F.sum('salary').alias('total_salary'),F.max('salary').alias('max_salary')
).show()# 分组聚合
df_spark.groupBy('department').agg(F.mean('salary').alias('avg_salary'),F.sum('salary').alias('total_salary'),F.count('name').alias('employee_count'),F.mean('age').alias('avg_age')
).show()# 多个分组字段
df_spark.groupBy('department', 'age').count().show()

复杂数据操作

1. 数据合并(Join)

Pandas
# 创建示例数据
employees = pd.DataFrame({'emp_id': [1, 2, 3, 4],'name': ['Alice', 'Bob', 'Charlie', 'David'],'dept_id': [101, 102, 101, 103]
})departments = pd.DataFrame({'dept_id': [101, 102, 103, 104],'dept_name': ['IT', 'HR', 'Sales', 'Marketing']
})# Inner Join
inner_join = pd.merge(employees, departments, on='dept_id', how='inner')
print("Inner Join:\n", inner_join)# Left Join
left_join = pd.merge(employees, departments, on='dept_id', how='left')
print("\nLeft Join:\n", left_join)# Right Join
right_join = pd.merge(employees, departments, on='dept_id', how='right')
print("\nRight Join:\n", right_join)# Outer Join
outer_join = pd.merge(employees, departments, on='dept_id', how='outer')
print("\nOuter Join:\n", outer_join)# 多键连接
# 假设有多个共同列
result = pd.merge(df1, df2, on=['key1', 'key2'], how='inner')# 不同列名连接
result = pd.merge(employees, departments, left_on='dept_id', right_on='department_id',how='inner'
)
Spark
# 创建示例数据
employees_spark = spark.createDataFrame([(1, 'Alice', 101),(2, 'Bob', 102),(3, 'Charlie', 101),(4, 'David', 103)
], ['emp_id', 'name', 'dept_id'])departments_spark = spark.createDataFrame([(101, 'IT'),(102, 'HR'),(103, 'Sales'),(104, 'Marketing')
], ['dept_id', 'dept_name'])# Inner Join
inner_join = employees_spark.join(departments_spark, 'dept_id', 'inner')
inner_join.show()# Left Join
left_join = employees_spark.join(departments_spark, 'dept_id', 'left')
left_join.show()# Right Join
right_join = employees_spark.join(departments_spark, 'dept_id', 'right')
right_join.show()# Full Outer Join
outer_join = employees_spark.join(departments_spark, 'dept_id', 'outer')
outer_join.show()# 多键连接
result = df1.join(df2, ['key1', 'key2'], 'inner')# 不同列名连接
result = employees_spark.join(departments_spark,employees_spark.dept_id == departments_spark.department_id,'inner'
)

2. 数据透视(Pivot)

Pandas
# 创建示例数据
sales_data = pd.DataFrame({'date': ['2024-01', '2024-01', '2024-02', '2024-02'],'product': ['A', 'B', 'A', 'B'],'region': ['East', 'East', 'West', 'West'],'sales': [100, 150, 120, 180]
})# 透视表
pivot_table = sales_data.pivot_table(values='sales',index='date',columns='product',aggfunc='sum',fill_value=0
)
print(pivot_table)# 多层透视
pivot_multi = sales_data.pivot_table(values='sales',index=['date', 'region'],columns='product',aggfunc=['sum', 'mean']
)
print(pivot_multi)# 逆透视(melt)
melted = pivot_table.reset_index().melt(id_vars='date',var_name='product',value_name='sales'
)
print(melted)
Spark
# 创建示例数据
sales_spark = spark.createDataFrame([('2024-01', 'A', 'East', 100),('2024-01', 'B', 'East', 150),('2024-02', 'A', 'West', 120),('2024-02', 'B', 'West', 180)
], ['date', 'product', 'region', 'sales'])# 透视表
pivot_df = sales_spark.groupBy('date').pivot('product').sum('sales')
pivot_df.show()# 指定透视值(提高性能)
pivot_df = sales_spark.groupBy('date').pivot('product', ['A', 'B']).sum('sales')
pivot_df.show()# 多个聚合函数
pivot_multi = sales_spark.groupBy('date').pivot('product').agg(F.sum('sales').alias('total_sales'),F.avg('sales').alias('avg_sales')
)
pivot_multi.show()

3. 数据转换

Pandas
# 创建新列
df_pandas['salary_k'] = df_pandas['salary'] / 1000
df_pandas['is_senior'] = df_pandas['age'] > 30# 使用 apply 函数
def categorize_age(age):if age < 25:return 'Junior'elif age < 35:return 'Mid-level'else:return 'Senior'df_pandas['age_category'] = df_pandas['age'].apply(categorize_age)# 使用 lambda
df_pandas['salary_bonus'] = df_pandas['salary'].apply(lambda x: x * 0.1)# 多列操作
df_pandas['total_comp'] = df_pandas.apply(lambda row: row['salary'] + row['salary_bonus'], axis=1
)# 条件转换 (np.where)
df_pandas['performance'] = np.where(df_pandas['salary'] > 60000, 'High', 'Normal'
)# 多条件转换 (np.select)
conditions = [df_pandas['salary'] > 70000,df_pandas['salary'] > 55000,df_pandas['salary'] <= 55000
]
choices = ['Excellent', 'Good', 'Average']
df_pandas['rating'] = np.select(conditions, choices, default='Unknown')# 字符串操作
df_pandas['name_upper'] = df_pandas['name'].str.upper()
df_pandas['name_length'] = df_pandas['name'].str.len()
Spark
# 创建新列
df_spark = df_spark.withColumn('salary_k', F.col('salary') / 1000)
df_spark = df_spark.withColumn('is_senior', F.col('age') > 30)# 使用 UDF (User Defined Function)
from pyspark.sql.types import StringTypedef categorize_age(age):if age < 25:return 'Junior'elif age < 35:return 'Mid-level'else:return 'Senior'categorize_udf = F.udf(categorize_age, StringType())
df_spark = df_spark.withColumn('age_category', categorize_udf('age'))# 使用 when-otherwise
df_spark = df_spark.withColumn('performance',F.when(F.col('salary') > 60000, 'High').otherwise('Normal')
)# 多条件
df_spark = df_spark.withColumn('rating',F.when(F.col('salary') > 70000, 'Excellent').when(F.col('salary') > 55000, 'Good').otherwise('Average')
)# 字符串操作
df_spark = df_spark.withColumn('name_upper', F.upper('name'))
df_spark = df_spark.withColumn('name_length', F.length('name'))# 多列同时处理
df_spark = df_spark.withColumn('total_comp', F.col('salary') * 1.1)df_spark.show()

4. 数据去重

Pandas
# 删除完全重复的行
df_unique = df_pandas.drop_duplicates()# 基于特定列去重
df_unique = df_pandas.drop_duplicates(subset=['department'])# 保留最后一个重复项
df_unique = df_pandas.drop_duplicates(subset=['department'], keep='last')# 保留第一个
df_unique = df_pandas.drop_duplicates(subset=['department'], keep='first')# 标记重复项
df_pandas['is_duplicate'] = df_pandas.duplicated(subset=['department'])
Spark
# 删除完全重复的行
df_unique = df_spark.dropDuplicates()# 基于特定列去重
df_unique = df_spark.dropDuplicates(['department'])# Spark 默认保留第一个,无法像 Pandas 那样选择保留哪一个
# 但可以通过排序后去重来控制
df_unique = df_spark.orderBy('salary', ascending=False) \.dropDuplicates(['department'])df_unique.show()

事务处理

Pandas 与数据库事务

Pandas 本身不直接支持事务,但可以通过数据库连接实现事务操作。

import sqlite3
import pandas as pd# 创建数据库连接
conn = sqlite3.connect('example.db')try:# 开始事务(自动)cursor = conn.cursor()# 创建表cursor.execute('''CREATE TABLE IF NOT EXISTS employees (id INTEGER PRIMARY KEY,name TEXT,salary REAL,department TEXT)''')# 插入数据df_pandas.to_sql('employees', conn, if_exists='replace', index=False)# 执行更新操作cursor.execute('''UPDATE employees SET salary = salary * 1.1 WHERE department = 'IT'''')# 提交事务conn.commit()print("事务提交成功")except Exception as e:# 回滚事务conn.rollback()print(f"事务回滚: {e}")finally:conn.close()

使用 SQLAlchemy 进行事务管理

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import pandas as pd# 创建引擎
engine = create_engine('sqlite:///example.db')
Session = sessionmaker(bind=engine)# 使用会话管理事务
session = Session()try:# 开始事务with session.begin():# 读取数据df = pd.read_sql('SELECT * FROM employees WHERE department = "IT"', session.connection())# 修改数据df['salary'] = df['salary'] * 1.1# 写回数据库df.to_sql('employees_backup', session.connection(), if_exists='replace', index=False)# 执行其他 SQL 操作session.execute(text('UPDATE employees SET updated_at = CURRENT_TIMESTAMP'))# 事务自动提交print("事务成功")except Exception as e:# 自动回滚print(f"事务失败: {e}")finally:session.close()

Spark 的事务支持

Spark 本身是一个分布式计算框架,不直接支持传统的 ACID 事务。但可以通过以下方式实现类似功能:

1. Delta Lake (推荐)
# 安装: pip install delta-sparkfrom delta import *# 配置 Spark 使用 Delta Lake
builder = SparkSession.builder \.appName("DeltaLakeExample") \.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")spark = configure_spark_with_delta_pip(builder).getOrCreate()# 写入 Delta 表
df_spark.write.format("delta").mode("overwrite").save("/path/to/delta-table")# 读取 Delta 表
delta_df = spark.read.format("delta").load("/path/to/delta-table")# 更新操作(类似事务)
from delta.tables import DeltaTabledelta_table = DeltaTable.forPath(spark, "/path/to/delta-table")# 条件更新
delta_table.update(condition = "department = 'IT'",set = {"salary": "salary * 1.1"}
)# 删除操作
delta_table.delete("age < 25")# 合并操作(UPSERT)
delta_table.alias("target").merge(source_df.alias("source"),"target.id = source.id"
).whenMatchedUpdate(set = {"salary": "source.salary"}
).whenNotMatchedInsert(values = {"id": "source.id","name": "source.name","salary": "source.salary"}
).execute()# 时间旅行(访问历史版本)
historical_df = spark.read.format("delta").option("versionAsOf", 0).load("/path/to/delta-table")
2. 使用 JDBC 的事务支持
# 写入数据库时使用事务
jdbc_url = "jdbc:postgresql://localhost:5432/mydb"
properties = {"user": "username","password": "password","driver": "org.postgresql.Driver"
}# Spark 会在批量写入时使用事务
df_spark.write \.jdbc(url=jdbc_url, table="employees", mode="append", properties=properties)# 读取时指定隔离级别
df = spark.read \.format("jdbc") \.option("url", jdbc_url) \.option("dbtable", "employees") \.option("user", "username") \.option("password", "password") \.option("isolationLevel", "READ_COMMITTED") \.load()
3. 使用检查点实现容错
# 设置检查点目录
spark.sparkContext.setCheckpointDir("/path/to/checkpoint")# 对 DataFrame 进行检查点
df_checkpoint = df_spark.checkpoint()# 这样可以在失败时从检查点恢复

窗口函数

窗口函数是高级 SQL 分析的核心,用于在不改变行数的情况下进行聚合计算。

Pandas 窗口函数

# 创建示例数据
sales_df = pd.DataFrame({'department': ['IT', 'IT', 'IT', 'HR', 'HR', 'Sales', 'Sales'],'employee': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', 'Frank', 'Grace'],'salary': [70000, 65000, 80000, 55000, 60000, 50000, 55000],'date': pd.date_range('2024-01-01', periods=7)
})# 1. 排名函数
# rank - 相同值获得相同排名,下一个排名会跳过
sales_df['rank'] = sales_df.groupby('department')['salary'].rank(method='min', ascending=False)# dense_rank - 相同值获得相同排名,下一个排名不跳过
sales_df['dense_rank'] = sales_df.groupby('department')['salary'].rank(method='dense', ascending=False)# row_number - 每行唯一编号
sales_df['row_number'] = sales_df.groupby('department').cumcount() + 1print("排名函数结果:")
print(sales_df[['department', 'employee', 'salary', 'rank', 'dense_rank', 'row_number']])# 2. 聚合窗口函数
# 计算部门内的累计和
sales_df['cumsum'] = sales_df.groupby('department')['salary'].cumsum()# 计算部门内的累计平均
sales_df['cumavg'] = sales_df.groupby('department')['salary'].expanding().mean().reset_index(level=0, drop=True)# 计算部门平均工资(每行都显示)
sales_df['dept_avg'] = sales_df.groupby('department')['salary'].transform('mean')# 计算部门最高工资
sales_df['dept_max'] = sales_df.groupby('department')['salary'].transform('max')# 与部门平均的差值
sales_df['diff_from_avg'] = sales_df['salary'] - sales_df['dept_avg']print("\n聚合窗口函数结果:")
print(sales_df[['department', 'employee', 'salary', 'dept_avg', 'dept_max', 'diff_from_avg']])# 3. 移动窗口(滚动计算)
# 按日期排序
sales_df = sales_df.sort_values('date')# 3天移动平均
sales_df['moving_avg_3'] = sales_df['salary'].rolling(window=3).mean()# 3天移动总和
sales_df['moving_sum_3'] = sales_df['salary'].rolling(window=3).sum()print("\n移动窗口结果:")
print(sales_df[['date', 'employee', 'salary', 'moving_avg_3', 'moving_sum_3']])# 4. 偏移函数
# 前一行的工资
sales_df['prev_salary'] = sales_df.groupby('department')['salary'].shift(1)# 后一行的工资
sales_df['next_salary'] = sales_df.groupby('department')['salary'].shift(-1)# 与前一行的差值
sales_df['salary_change'] = sales_df['salary'] - sales_df['prev_salary']print("\n偏移函数结果:")
print(sales_df[['department', 'employee', 'salary', 'prev_salary', 'salary_change']])# 5. 分位数和百分位
# 计算部门内的工资百分位排名
sales_df['percentile'] = sales_df.groupby('department')['salary'].rank(pct=True)print("\n百分位结果:")
print(sales_df[['department', 'employee', 'salary', 'percentile']])# 6. 复杂示例:找出每个部门工资最高的前2名
top_earners = sales_df[sales_df['dense_rank'] <= 2].sort_values(['department', 'salary'], ascending=[True, False])
print("\n每个部门工资最高的前2名:")
print(top_earners[['department', 'employee', 'salary', 'dense_rank']])

Spark 窗口函数

from pyspark.sql import Window
from pyspark.sql import functions as F# 创建示例数据
sales_spark = spark.createDataFrame([('IT', 'Alice', 70000, '2024-01-01'),('IT', 'Bob', 65000, '2024-01-02'),('IT', 'Charlie', 80000, '2024-01-03'),('HR', 'David', 55000, '2024-01-04'),('HR', 'Eve', 60000, '2024-01-05'),('Sales', 'Frank', 50000, '2024-01-06'),('Sales', 'Grace', 55000, '2024-01-07')
], ['department', 'employee', 'salary', 'date'])# 1. 定义窗口规范
# 按部门分区,按工资降序排列
window_dept = Window.partitionBy('department').orderBy(F.col('salary').desc())# 按部门分区,不排序(用于聚合)
window_dept_unordered = Window.partitionBy('department')# 2. 排名函数
sales_with_rank = sales_spark \.withColumn('rank', F.rank().over(window_dept)) \.withColumn('dense_rank', F.dense_rank().over(window_dept)) \.withColumn('row_number', F.row_number().over(window_dept))print("排名函数结果:")
sales_with_rank.orderBy('department', 'rank').show()# 3. 聚合窗口函数
sales_with_agg = sales_spark \.withColumn('dept_avg', F.avg('salary').over(window_dept_unordered)) \.withColumn('dept_max', F.max('salary').over(window_dept_unordered)) \.withColumn('dept_min', F.min('salary').over(window_dept_unordered)) \.withColumn('dept_count', F.count('salary').over(window_dept_unordered)) \.withColumn('diff_from_avg', F.col('salary') - F.col('dept_avg'))print("\n聚合窗口函数结果:")
sales_with_agg.show()# 4. 累计聚合
window_dept_rows = Window.partitionBy('department').orderBy('salary').rowsBetween(Window.unboundedPreceding, Window.currentRow)sales_with_cumsum = sales_spark \.withColumn('cumsum', F.sum('salary').over(window_dept_rows)) \.withColumn('cumavg', F.avg('salary').over(window_dept_rows))print("\n累计聚合结果:")
sales_with_cumsum.show()# 5. 移动窗口
# 定义3行移动窗口(当前行及前2行)
window_moving = Window.orderBy('date').rowsBetween(-2, 0)sales_with_moving = sales_spark \.withColumn('moving_avg_3', F.avg('salary').over(window_moving)) \.withColumn('moving_sum_3', F.sum('salary').over(window_moving))print("\n移动窗口结果:")
sales_with_moving.show()# 6. 偏移函数
window_dept_ordered = Window.partitionBy('department').orderBy('salary')sales_with_lag = sales_spark \.withColumn('prev_salary', F.lag('salary', 1).over(window_dept_ordered)) \.withColumn('next_salary', F.lead('salary', 1).over(window_dept_ordered)) \.withColumn('salary_change', F.col('salary') - F.col('prev_salary'))print("\n偏移函数结果:")
sales_with_lag.show()# 7. First 和 Last 值
sales_with_first_last = sales_spark \.withColumn('first_in_dept', F.first('salary').over(window_dept)) \.withColumn('last_in_dept', F.last('salary').over(window_dept))print("\nFirst 和 Last 值:")
sales_with_first_last.show()# 8. 复杂示例:找出每个部门工资最高的前2名
top_earners = sales_spark \.withColumn('rank', F.dense_rank().over(window_dept)) \.filter(F.col('rank') <= 2) \.orderBy('department', F.col('salary').desc())print("\n每个部门工资最高的前2名:")
top_earners.show()# 9. 百分位排名
sales_with_percentile = sales_spark \.withColumn('percent_rank', F.percent_rank().over(window_dept)) \.withColumn('cume_dist', F.cume_dist().over(window_dept))print("\n百分位排名:")
sales_with_percentile.show()# 10. 范围窗口(基于值的范围)
# 定义工资范围窗口(当前工资 ± 5000)
window_range = Window.partitionBy('department').orderBy('salary').rangeBetween(-5000, 5000)sales_with_range = sales_spark \.withColumn('salary_range_avg', F.avg('salary').over(window_range))print("\n范围窗口:")
sales_with_range.show()

窗口函数高级应用

# 示例:计算连续增长天数
time_series = spark.createDataFrame([('2024-01-01', 100),('2024-01-02', 110),('2024-01-03', 105),('2024-01-04', 120),('2024-01-05', 125),('2024-01-06', 130)
], ['date', 'value'])window_time = Window.orderBy('date')result = time_series \.withColumn('prev_value', F.lag('value').over(window_time)) \.withColumn('is_increase', F.when(F.col('value') > F.col('prev_value'), 1).otherwise(0)) \.withColumn('group', F.sum(F.when(F.col('is_increase') == 0, 1).otherwise(0)).over(window_time)) \.withColumn('consecutive_days', F.row_number().over(Window.partitionBy('group').orderBy('date')))result.show()

子查询

Pandas 子查询

Pandas 不直接支持 SQL 式的子查询,但可以通过多步操作实现相同效果。

# 创建示例数据
employees = pd.DataFrame({'emp_id': [1, 2, 3, 4, 5],'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],'department': ['IT', 'HR', 'IT', 'Sales', 'HR'],'salary': [70000, 55000, 80000, 50000, 60000]
})# 1. 标量子查询:找出工资高于平均工资的员工
avg_salary = employees['salary'].mean()
high_earners = employees[employees['salary'] > avg_salary]
print("工资高于平均值的员工:")
print(high_earners)# 2. IN 子查询:找出在特定部门的员工
it_dept = employees[employees['department'] == 'IT']['emp_id'].tolist()
it_employees = employees[employees['emp_id'].isin(it_dept)]
print("\nIT 部门员工:")
print(it_employees)# 3. 相关子查询:找出每个部门工资高于部门平均的员工
dept_avg = employees.groupby('department')['salary'].transform('mean')
above_dept_avg = employees[employees['salary'] > dept_avg]
print("\n工资高于部门平均的员工:")
print(above_dept_avg)# 4. EXISTS 子查询模拟:找出有员工的部门
departments_with_employees = employees['department'].unique()
print("\n有员工的部门:", departments_with_employees)# 5. 复杂子查询:找出工资排名前2的员工所在的部门的所有员工
# 第一步:找出工资排名前2的员工
employees['salary_rank'] = employees['salary'].rank(ascending=False, method='dense')
top_employees = employees[employees['salary_rank'] <= 2]# 第二步:找出这些员工所在的部门
top_departments = top_employees['department'].unique()# 第三步:找出这些部门的所有员工
result = employees[employees['department'].isin(top_departments)]
print("\n工资排名前2的员工所在部门的所有员工:")
print(result)# 6. 使用 merge 实现子查询
# 找出每个部门工资最高的员工
max_salary_by_dept = employees.groupby('department')['salary'].max().reset_index()
max_salary_by_dept.columns = ['department', 'max_salary']# 与原表合并
top_paid = pd.merge(employees,max_salary_by_dept,left_on=['department', 'salary'],right_on=['department', 'max_salary']
)
print("\n每个部门工资最高的员工:")
print(top_paid[['emp_id', 'name', 'department', 'salary']])

Spark SQL 子查询

Spark SQL 完全支持标准 SQL 子查询。

# 创建示例数据并注册为临时表
employees_spark = spark.createDataFrame([(1, 'Alice', 'IT', 70000),(2, 'Bob', 'HR', 55000),(3, 'Charlie', 'IT', 80000),(4, 'David', 'Sales', 50000),(5, 'Eve', 'HR', 60000)
], ['emp_id', 'name', 'department', 'salary'])employees_spark.createOrReplaceTempView('employees')# 1. 标量子查询
query1 = """
SELECT *
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees)
"""
result1 = spark.sql(query1)
print("工资高于平均值的员工:")
result1.show()# 2. IN 子查询
query2 = """
SELECT *
FROM employees
WHERE department IN (SELECT departmentFROM employeesWHERE salary > 65000
)
"""
result2 = spark.sql(query2)
print("\n有高薪员工的部门的所有员工:")
result2.show()# 3. 相关子查询
query3 = """
SELECT e1.*
FROM employees e1
WHERE e1.salary > (SELECT AVG(e2.salary)FROM employees e2WHERE e2.department = e1.department
)
"""
result3 = spark.sql(query3)
print("\n工资高于部门平均的员工:")
result3.show()# 4. EXISTS 子查询
# 创建项目表
projects = spark.createDataFrame([(1, 'Project A', 1),(2, 'Project B', 3),(3, 'Project C', 1)
], ['project_id', 'project_name', 'emp_id'])projects.createOrReplaceTempView('projects')query4 = """
SELECT *
FROM employees e
WHERE EXISTS (SELECT 1FROM projects pWHERE p.emp_id = e.emp_id
)
"""
result4 = spark.sql(query4)
print("\n有项目的员工:")
result4.show()# 5. NOT EXISTS
query5 = """
SELECT *
FROM employees e
WHERE NOT EXISTS (SELECT 1FROM projects pWHERE p.emp_id = e.emp_id
)
"""
result5 = spark.sql(query5)
print("\n没有项目的员工:")
result5.show()# 6. FROM 子句中的子查询
query6 = """
SELECT department, avg_salary
FROM (SELECT department, AVG(salary) as avg_salaryFROM employeesGROUP BY department
) dept_stats
WHERE avg_salary > 60000
"""
result6 = spark.sql(query6)
print("\n平均工资超过60000的部门:")
result6.show()# 7. WITH 子句(CTE - Common Table Expression)
query7 = """
WITH dept_avg AS (SELECT department, AVG(salary) as avg_salaryFROM employeesGROUP BY department
),
high_salary_depts AS (SELECT departmentFROM dept_avgWHERE avg_salary > 60000
)
SELECT e.*
FROM employees e
JOIN high_salary_depts h ON e.department = h.department
"""
result7 = spark.sql(query7)
print("\n高薪部门的所有员工:")
result7.show()# 8. 多层嵌套子查询
query8 = """
SELECT *
FROM employees
WHERE department IN (SELECT departmentFROM (SELECT department, MAX(salary) as max_salFROM employeesGROUP BY department) dept_maxWHERE max_sal > 70000
)
"""
result8 = spark.sql(query8)
print("\n最高工资超过70000的部门的所有员工:")
result8.show()

使用 DataFrame API 实现子查询

from pyspark.sql import functions as F# 1. 标量子查询
avg_salary = employees_spark.agg(F.avg('salary')).collect()[0][0]
result1 = employees_spark.filter(F.col('salary') > avg_salary)
print("工资高于平均值的员工:")
result1.show()# 2. IN 子查询
high_salary_depts = employees_spark \.filter(F.col('salary') > 65000) \.select('department') \.distinct()result2 = employees_spark.join(high_salary_depts, 'department', 'inner')
print("\n有高薪员工的部门的所有员工:")
result2.show()# 3. 相关子查询(使用窗口函数)
window_dept = Window.partitionBy('department')
result3 = employees_spark \.withColumn('dept_avg', F.avg('salary').over(window_dept)) \.filter(F.col('salary') > F.col('dept_avg')) \.drop('dept_avg')print("\n工资高于部门平均的员工:")
result3.show()# 4. EXISTS - 使用 semi join
result4 = employees_spark.join(projects,employees_spark.emp_id == projects.emp_id,'left_semi'
)
print("\n有项目的员工:")
result4.show()# 5. NOT EXISTS - 使用 anti join
result5 = employees_spark.join(projects,employees_spark.emp_id == projects.emp_id,'left_anti'
)
print("\n没有项目的员工:")
result5.show()# 6. 子查询作为数据源
dept_stats = employees_spark \.groupBy('department') \.agg(F.avg('salary').alias('avg_salary'))result6 = dept_stats.filter(F.col('avg_salary') > 60000)
print("\n平均工资超过60000的部门:")
result6.show()

性能优化与最佳实践

Pandas 性能优化

import pandas as pd
import numpy as np
import time# 1. 使用向量化操作而不是循环
# 慢方法:使用循环
def slow_method(df):result = []for i in range(len(df)):result.append(df.iloc[i]['value'] * 2)return result# 快方法:向量化
def fast_method(df):return df['value'] * 2# 2. 使用 apply 优化
# 对于复杂操作,使用 apply 而不是 iterrows
def process_row(row):return row['a'] + row['b'] * 2# 较慢
# for idx, row in df.iterrows():
#     result = process_row(row)# 较快
result = df.apply(process_row, axis=1)# 3. 使用合适的数据类型
# 减少内存使用
df['int_col'] = df['int_col'].astype('int32')  # 而不是 int64
df['category_col'] = df['category_col'].astype('category')  # 对于重复值多的列# 4. 使用 query 方法(对于大型 DataFrame 更快)
# 而不是:df[(df['a'] > 5) & (df['b'] < 10)]
result = df.query('a > 5 and b < 10')# 5. 避免链式索引
# 慢方法
# df[df['col'] > 0]['col2'] = value# 快方法
df.loc[df['col'] > 0, 'col2'] = value# 6. 使用 inplace 参数(但要谨慎)
df.drop_duplicates(inplace=True)  # 避免创建副本# 7. 使用 nlargest 和 nsmallest
# 而不是 sort_values().head()
top_5 = df.nlargest(5, 'salary')# 8. 批量读取大文件
# 分块读取
chunk_size = 10000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):process(chunk)# 9. 使用 eval 进行复杂计算
df.eval('new_col = col1 + col2 * col3', inplace=True)# 10. 并行处理
from multiprocessing import Pooldef process_chunk(chunk):# 处理逻辑return chunk# 分割数据
chunks = np.array_split(df, 4)# 并行处理
with Pool(4) as pool:results = pool.map(process_chunk, chunks)final_result = pd.concat(results)

Spark 性能优化

from pyspark.sql import functions as F
from pyspark.sql.types import *# 1. 缓存频繁使用的 DataFrame
df_cached = df_spark.cache()  # 或 .persist()
df_cached.count()  # 触发缓存# 使用完毕后释放
df_cached.unpersist()# 2. 广播小表(Broadcast Join)
from pyspark.sql.functions import broadcast# 当一个表很小时,广播它以避免 shuffle
result = large_df.join(broadcast(small_df), 'key')# 3. 分区优化
# 重新分区
df_repartitioned = df_spark.repartition(10)  # 增加分区# 合并分区
df_coalesced = df_spark.coalesce(5)  # 减少分区,避免 shuffle# 按列分区
df_partitioned = df_spark.repartition('department')# 4. 避免 UDF,使用内置函数
# 慢方法:UDF
from pyspark.sql.types import IntegerTypedef add_one(x):return x + 1add_one_udf = F.udf(add_one, IntegerType())
df_slow = df_spark.withColumn('new_col', add_one_udf('col'))# 快方法:内置函数
df_fast = df_spark.withColumn('new_col', F.col('col') + 1)# 5. 使用 Pandas UDF(向量化 UDF)
from pyspark.sql.functions import pandas_udf@pandas_udf(IntegerType())
def pandas_add_one(s: pd.Series) -> pd.Series:return s + 1df_faster = df_spark.withColumn('new_col', pandas_add_one('col'))# 6. 优化 Join 操作
# 指定 join 类型
result = df1.join(df2, 'key', 'inner')  # 明确指定 join 类型# 避免交叉连接
# 错误:会产生笛卡尔积
# df1.join(df2)# 正确
df1.join(df2, df1.id == df2.id)# 7. 使用列式存储格式
# 写入 Parquet(推荐)
df_spark.write.parquet('output.parquet', mode='overwrite', compression='snappy')# 读取 Parquet
df_parquet = spark.read.parquet('output.parquet')# 8. 数据倾斜处理
# 加盐技术
from pyspark.sql.functions import rand, concat, lit# 为倾斜的 key 添加随机后缀
df_salted = df_spark.withColumn('salted_key', concat(F.col('key'), lit('_'), (rand() * 10).cast('int')))# 9. 过滤下推
# 先过滤再处理
df_filtered = df_spark.filter(F.col('date') > '2024-01-01')  # 早期过滤# 10. 使用 explain 分析查询计划
df_spark.explain(True)  # 查看物理计划# 11. 配置优化
spark.conf.set("spark.sql.adaptive.enabled", "true")  # 启用自适应查询执行
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # 10MB# 12. 避免频繁的 collect()
# 慢方法
# for row in df_spark.collect():
#     process(row)# 快方法:在 Spark 端处理
df_spark.foreach(lambda row: process(row))# 13. 使用 checkpoint 打断血统
df_spark.checkpoint()  # 保存中间结果# 14. 优化窗口函数
# 限制窗口大小
window = Window.partitionBy('key').orderBy('date').rowsBetween(-7, 0)
df_window = df_spark.withColumn('rolling_avg', F.avg('value').over(window))

通用最佳实践

# 1. 数据验证
def validate_data(df, is_spark=False):"""验证数据质量"""if is_spark:# Spark 验证null_counts = df.select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df.columns])null_counts.show()# 重复值检查total = df.count()unique = df.dropDuplicates().count()print(f"Total: {total}, Unique: {unique}, Duplicates: {total - unique}")else:# Pandas 验证print("缺失值:")print(df.isnull().sum())print("\n重复值:")print(f"重复行数: {df.duplicated().sum()}")# 2. 数据质量检查
def data_quality_check(df, is_spark=False):"""数据质量检查"""if is_spark:# 基本统计df.describe().show()# 值分布df.groupBy('category_col').count().show()else:# Pandas 检查print(df.describe())print(df['category_col'].value_counts())# 3. 错误处理
def safe_division(df, col1, col2, is_spark=False):"""安全除法"""if is_spark:return df.withColumn('result',F.when(F.col(col2) != 0, F.col(col1) / F.col(col2)).otherwise(None))else:df['result'] = df[col1] / df[col2].replace(0, np.nan)return df# 4. 日志记录
import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)def process_with_logging(df):logger.info(f"开始处理,行数: {len(df) if hasattr(df, '__len__') else df.count()}")# 处理逻辑result = df  # 处理...logger.info("处理完成")return result

常见问题与陷阱

Pandas 常见问题

# 1. SettingWithCopyWarning
# 错误方式
subset = df[df['col'] > 0]
subset['new_col'] = value  # 可能产生警告# 正确方式
subset = df[df['col'] > 0].copy()
subset['new_col'] = value# 或使用 loc
df.loc[df['col'] > 0, 'new_col'] = value# 2. 链式索引问题
# 错误
# df[df['A'] > 0]['B'] = value# 正确
df.loc[df['A'] > 0, 'B'] = value# 3. inplace 参数的陷阱
# inplace=True 不一定更快,有时反而更慢
df.drop_duplicates(inplace=True)  # 修改原 DataFrame
df_new = df.drop_duplicates()  # 创建新 DataFrame(有时更好)# 4. 内存问题
# 删除不需要的列
df = df[['needed_col1', 'needed_col2']]# 优化数据类型
df['int_col'] = df['int_col'].astype('int32')
df['category'] = df['category'].astype('category')# 5. merge 的陷阱
# 忘记指定 how 参数
result = pd.merge(df1, df2, on='key')  # 默认 inner join# 应该明确指定
result = pd.merge(df1, df2, on='key', how='left')# 6. 日期时间处理
# 错误:字符串比较
df[df['date'] > '2024-01-01']  # 如果 date 是字符串会出问题# 正确:转换为 datetime
df['date'] = pd.to_datetime(df['date'])
df[df['date'] > pd.Timestamp('2024-01-01')]# 7. groupby 后的列访问
# 多级索引问题
grouped = df.groupby('key').agg({'value': ['mean', 'sum']})
# 列名变成了多级索引# 解决方法
grouped.columns = ['_'.join(col).strip() for col in grouped.columns.values]

Spark 常见问题

# 1. 延迟执行(Lazy Evaluation)
df_result = df_spark.filter(F.col('col') > 0)
# 此时还没有真正执行# 需要 action 才会触发执行
df_result.show()  # 或 .count(), .collect() 等# 2. collect() 的危险
# 错误:可能导致 OOM
# all_data = df_spark.collect()  # 如果数据很大会崩溃# 正确:使用 take 或 limit
sample = df_spark.limit(100).collect()# 3. UDF 性能问题
# UDF 很慢,优先使用内置函数
# 如果必须使用 UDF,考虑 Pandas UDF@pandas_udf(DoubleType())
def better_udf(s: pd.Series) -> pd.Series:return s * 2  # 向量化操作# 4. 分区问题
# 分区太少:无法充分利用集群
# 分区太多:调度开销大# 经验法则:分区数 = 2-3 * 核心数
optimal_partitions = spark.sparkContext.defaultParallelism * 2
df_spark = df_spark.repartition(optimal_partitions)# 5. 数据倾斜
# 检测倾斜
df_spark.groupBy('key').count().orderBy(F.desc('count')).show()# 解决方法:加盐
df_salted = df_spark.withColumn('salt', (F.rand() * 10).cast('int'))
df_salted = df_salted.withColumn('salted_key', F.concat(F.col('key'), F.lit('_'), F.col('salt')))# 6. Shuffle 问题
# Join、groupBy 等操作会触发 shuffle
# 尽量减少 shuffle# 使用 broadcast join
small_df_broadcast = F.broadcast(small_df)
result = large_df.join(small_df_broadcast, 'key')# 7. 缓存使用不当
# 缓存太多会占用大量内存
df1.cache()
df2.cache()  # 如果不常用,不要缓存# 使用完后要释放
df1.unpersist()# 8. Schema 推断问题
# 对大文件进行 schema 推断很慢
# 明确指定 schemaschema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("value", DoubleType(), True)
])df = spark.read.csv('large_file.csv', schema=schema, header=True)# 9. 空值处理
# 某些操作对 null 敏感
df_spark.na.drop()  # 删除任何包含 null 的行
df_spark.na.fill(0)  # 用 0 填充 null# 10. 时区问题
# Spark 中的时间戳可能有时区问题
df_spark = df_spark.withColumn('timestamp_utc',F.to_utc_timestamp('timestamp_col', 'America/Los_Angeles')
)

调试技巧

# Pandas 调试
import pandas as pd# 1. 设置显示选项
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)# 2. 逐步检查
print(df.shape)
print(df.dtypes)
print(df.head())
print(df.isnull().sum())# 3. 使用 pipe 进行链式调试
def debug_df(df, msg=""):print(f"{msg}: shape={df.shape}")return dfresult = (df.pipe(debug_df, "原始数据").dropna().pipe(debug_df, "删除空值后").drop_duplicates().pipe(debug_df, "去重后"))# Spark 调试
from pyspark.sql import DataFrame# 1. 查看执行计划
df_spark.explain(True)# 2. 查看物理计划
df_spark.explain('formatted')# 3. 采样调试
df_sample = df_spark.sample(0.01)  # 1% 采样
df_sample.show()# 4. 检查分区
print(f"分区数: {df_spark.rdd.getNumPartitions()}")# 5. 逐步执行
df1 = df_spark.filter(F.col('col') > 0)
print(f"过滤后: {df1.count()}")df2 = df1.select('col1', 'col2')
print(f"选择列后: {df2.count()}")# 6. Web UI 监控
# 访问 http://localhost:4040 查看 Spark UI

总结

何时使用 Pandas

  • 数据量小于可用内存
  • 单机处理足够
  • 需要快速原型开发
  • 数据探索和分析
  • 丰富的数据操作功能

何时使用 Spark

  • 大规模数据处理(TB 级)
  • 需要分布式计算
  • 数据分散在多个节点
  • ETL 流程处理
  • 流式数据处理
  • 需要与 Hadoop 生态集成

性能对比参考

操作PandasSpark推荐
小数据(<1GB)慢(启动开销)Pandas
大数据(>10GB)慢/不可能Spark
复杂聚合Spark
迭代操作Pandas
Join 操作快(大表)视情况

学习路径建议

  1. 基础阶段:掌握 Pandas 的基本操作
  2. 进阶阶段:学习窗口函数、复杂聚合
  3. 高级阶段:掌握 Spark 分布式处理
  4. 优化阶段:性能调优和最佳实践

实战建议

  1. 先用小数据集在 Pandas 中开发和测试
  2. 验证逻辑正确后迁移到 Spark
  3. 使用相同的 SQL 思维处理两个框架
  4. 重视数据质量和错误处理
  5. 持续监控和优化性能

参考资源

官方文档

  • Pandas Documentation
  • PySpark Documentation
  • Spark SQL Guide

推荐书籍

  • “Python for Data Analysis” by Wes McKinney
  • “Learning Spark” by Holden Karau
  • “Spark: The Definitive Guide”

在线资源

  • Stack Overflow
  • Databricks Community Edition
  • Kaggle Datasets

http://www.dtcms.com/a/477882.html

相关文章:

  • 大数据实战项目-基于K-Means算法与Spark的豆瓣读书数据分析与可视化系统-基于python的豆瓣读书数据分析与可视化大屏
  • AI 数字人小程序功能全拆解:从用户体验到商业落地的产品设计逻辑
  • Agent 开发设计模式(Agentic Design Patterns )第 6 章:规划设计模式 Planning
  • 厦门做网站xm37如何增加网站内链建设
  • css变量的使用。
  • 全网首发 OpenAI Apps SDK 使用教程
  • mysql_page pagesize 如何实现游标分页?
  • Hive 拉链表
  • 集宁网站建设SEO优化安卓开发基础教程
  • C++友元函数和友元类!
  • Java面向对象编程深度解析:从对象思维到系统架构的艺术
  • 多制式基站综合测试线的架构与验证实践(4)
  • 洛阳制作网站ihanshi汉口网站建设制作
  • 2025年 Varjo XR-4 升级新品发布!首款专为陆、海、空领域战备训练打造的XR头显
  • 【XR硬件系列】AR眼镜的终极形态会是“普通眼镜”吗?技术瓶颈还有哪些?
  • 发布自己的 jar 包到 Maven 中央仓库 ( mvnrepository.com )
  • 页表 vs. 组相联缓存:内存管理与性能优化的殊途同归
  • 泉州专业建站品牌校园门户网站开发需求分析
  • 版本控制器之Git理论与实战
  • 注册网站时应注意什么域名注册后 免费自建网站
  • wpf passwordbox控件 光标移到最后
  • Linux wlan网络协议栈-路由框架详解
  • 廊坊安次区网站建设公司上海高登联合建设网站
  • 凡科网站手机投票怎么做wordpress vr主题
  • 【ElasticSearch】text 和 keyword 类型区分
  • vue3的组件通信方式汇总
  • PortSwigger靶场之将 XSS 存储到onclick带有尖括号和双引号 HTML 编码以及单引号和反斜杠转义的事件中通关秘籍
  • 哪些方法可以建设网站后台网站模板下载
  • 根据PID获取K8S-Pod名称-反之POD名称获取PID
  • 做网站三年3万块钱论坛搭建一键