PySpark 使用pyarrow指定版本
背景说明
在 PySpark 3.1.3 环境中,当需要使用与集群环境不同版本的 PyArrow (如 1.0.0 版本)时,可以通过以下方法实现,而无需更改集群环境配置
完整操作说明
- 去pyarrow·PyPI下载对应版本的whl文件
- 后缀whl直接改成zip
- 解压后有两个文件夹,分别是pyarrow和pyarrow-1.0.0.dist-info
- 直接把那两个文件夹打包成pyarrow.zip
因为pyarrow里不是单纯的python代码,还有C扩展,所以不能用--py-files
参数,只能放在--archives
参数里
spark -submit spark-submit \--master yarn \--deploy-mode cluster \ --executor-memory 4G \--num-executors 10 \--archives /my_path/pyarrow-1.0.0.zip#pyarrow #必须添加的参数your_script.py
#pyarrow
表示在容器内解压到./pyarrow
目录
pyspark里要添加对应代码
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import pyspark.sql.types as T
import pandas as pd
import os, sys# 初始化Spark并配置Arrow支持
spark = SparkSession.builder.config("spark.sql.execution.arrow.pyspark.enabled", "true").getOrCreate()# 加载自定义PyArrow
**pyarrow_dir = os.path.join(os.getcwd(), "pyarrow") # 对应 --archives 中的解压目录
sys.path.insert(0, pyarrow_dir) # 添加到 Python 路径**import pyarrow
print("pyarrow version ", pyarrow.__version__) # 应显示 1.0.0# 定义大写转换UDF
@pandas_udf(T.StringType())
def uppercase(s: pd.Series) -> pd.Series:return s.str.upper()# 数据读取和处理
df = spark.read.load("my_path_tofile/*").select("version")
df.withColumn("hh", uppercase(F.col("version"))).show()