当前位置: 首页 > news >正文

PySpark中python环境打包和JAR包依赖

在 PySpark 中打包 Python 环境并调度到集群是处理依赖一致性的关键步骤。以下是完整的解决方案,包含环境打包、分发和配置方法:

一、环境打包方法

  1. 使用 Conda 打包环境
# 创建 Conda 环境
conda create -n pyspark_env python=3.8
conda activate pyspark_env
conda install numpy pandas pyarrow  # 安装所需依赖# 导出环境为 YAML 文件
conda env export > environment.yml# 打包环境(Linux/macOS)
conda-pack -n pyspark_env -o pyspark_env.tar.gz
  1. 使用 Virtualenv 打包环境
# 创建虚拟环境
python3 -m venv pyspark_env
source pyspark_env/bin/activate  # Linux/macOS
pyspark_env\Scripts\activate  # Windows# 安装依赖
pip install numpy pandas pyarrow# 打包环境(需使用第三方工具)
pip install virtualenv-pack
virtualenv-pack -f -o pyspark_env.tar.gz

二、分发环境到集群

方法 1:通过 --archives 参数上传
在提交作业时,使用 --archives 参数将打包的环境分发到所有节点:

# 将环境包上传到 HDFS,避免每次提交都重新传输:
hdfs dfs -put pyspark_env.tar.gz /path/in/hdfs/spark-submit \--master yarn \--deploy-mode cluster \--py-files helper.py\ # python依赖文件,比如第三方代码等--archives hdfs:///path/in/hdfs/pyspark_env.tar.gz#environment \your_script.py

三、配置 PySpark 使用打包环境

  1. 设置 Python 解释器路径
    在代码中指定 Executor 使用打包环境中的 Python:
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"  # 对应 --archives 指定的目录名
os.environ["PYSPARK_DRIVER_PYTHON"] = "./environment/bin/python"  # Cluster 模式需要,如果是client模式,driver_python配置本地python路径,比如/opt/conda/bin/python, 需注意本地python和集群打包python的版本一致from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PackagedEnvApp").getOrCreate()
  1. 编写pyspark脚本
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"from pyspark.sql import SparkSession
import pandas as pd  # 使用打包环境中的 pandasspark = SparkSession.builder.appName("PackagedEnvExample").getOrCreate()# 使用 pandas 处理数据
pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
df = spark.createDataFrame(pdf)
df.show()spark.stop()
  1. 提交作业
spark-submit \--master yarn \--deploy-mode cluster \--archives pyspark_env.tar.gz#environment \example.py
  1. 配置优先级与运行模式详解
  • 配置优先级规则
    Spark 配置的优先级从高到低如下:

    SparkSession.build.config
    spark-submit --conf参数
    spark-defaults.conf
    系统默认值
  • 关键结论:

    • SparkSession.builder.config() 优先级最高,会覆盖其他配置
    • spark-submit 参数优先级次之
    • 特殊参数例外:–master 和 --deploy-mode 在 spark-submit 中具有最高优先级
  • deploy-mode 配置规则

    设置方式是否生效说明
    spark-submit✅ 总是生效命令行参数具有最终决定权
    SparkSession 代码❌ 当 spark-submit 指定时无效spark-submit 未指定,则代码配置生效
    spark-defaults⚠️ 最低优先级仅当其他方式未配置时生效

四、替代方案

  1. Docker 容器
    使用 Docker 打包完整环境,通过 Kubernetes 调度:
# Dockerfile 示例
FROM apache/spark-py:v3.3.2
RUN pip install pandas numpy# 构建并推送镜像
docker build -t my-spark-image:v1 .
docker push myregistry/my-spark-image:v1# 提交作业
spark-submit \--master k8s://https://kubernetes-host:port \--conf spark.kubernetes.container.image=myregistry/my-spark-image:v1 \...
  1. PySpark 内置依赖管理
    通过 --py-files 参数上传 Python 文件 / 包:
spark-submit \--py-files my_module.zip,another_dep.py \your_script.py

五、Pyspark调用Xgboost或者LightGBM

5.1 调用 XGBoost 模型

  1. 准备依赖
    下载 XGBoost 的 Spark 扩展 jar 包:可以从 XGBoost 的官方 GitHub 发布页面 或者 Maven 仓库下载与你使用的 XGBoost 和 Spark 版本兼容的xgboost4j-spark和xgboost4j的 jar 包。例如,如果你使用的是 Spark 3.3.0 和 XGBoost 1.6.2,可以下载对应的版本。
    下载其他依赖:确保scala-library等相关依赖也在合适的版本,因为xgboost4j-spark会依赖它们。
  2. 配置 Spark 提交参数
    在使用spark-submit提交作业时,通过–jars参数指定上述下载的 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/xgboost4j-spark-1.6.2.jar,/path/to/xgboost4j-1.6.2.jar,/path/to/scala-library-2.12.10.jar \your_script.py

也可以将这些 jar 包上传到 HDFS,然后使用 HDFS 路径:

hdfs dfs -put /path/to/xgboost4j-spark-1.6.2.jar /lib/
hdfs dfs -put /path/to/xgboost4j-1.6.2.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/xgboost4j-spark-1.6.2.jar,hdfs:///lib/xgboost4j-1.6.2.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
  1. Python 代码示例
    在 Python 代码中,导入相关模块并使用 XGBoost 的 Spark 接口:
from pyspark.sql import SparkSession
from xgboost.spark import XGBoostClassifierspark = SparkSession.builder \.appName("XGBoostOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建XGBoost分类器
model = XGBoostClassifier(num_round=10, objective="binary:logistic")
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)

5.2 调用 LightGBM 模型

  1. 准备依赖
    下载 LightGBM 的 Spark 扩展 jar 包:从 LightGBM 的官方 GitHub 发布页面或者 Maven 仓库获取lightgbm4j-spark相关的 jar 包,以及lightgbm4j的 jar 包。注意选择与你的 Spark 和 LightGBM 版本适配的版本。
    处理其他依赖:同样要保证scala-library等依赖的兼容性。
  2. 配置 Spark 提交参数
    和 XGBoost 类似,使用spark-submit时通过–jars参数指定 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/lightgbm4j-spark-3.3.1.jar,/path/to/lightgbm4j-3.3.1.jar,/path/to/scala-library-2.12.10.jar \your_script.py

或者上传到 HDFS 后使用 HDFS 路径:

hdfs dfs -put /path/to/lightgbm4j-spark-3.3.1.jar /lib/
hdfs dfs -put /path/to/lightgbm4j-3.3.1.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/lightgbm4j-spark-3.3.1.jar,hdfs:///lib/lightgbm4j-3.3.1.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
  1. Python 代码示例
    在 Python 代码中,导入模块并使用 LightGBM 的 Spark 接口:
from pyspark.sql import SparkSession
from lightgbm4j.spark import LightGBMClassifierspark = SparkSession.builder \.appName("LightGBMOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建LightGBM分类器
params = {"objective": "binary","num_leaves": 31,"learning_rate": 0.05,"feature_fraction": 0.9
}
model = LightGBMClassifier(params=params, num_round=10)
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)
http://www.dtcms.com/a/273689.html

相关文章:

  • spark3 streaming 读kafka写es
  • Google Benchmark 介绍和使用指南
  • 流批一体的“奥卡姆剃刀”:Apache Cloudberry 增量物化视图应用解析
  • CReFT-CAD 笔记 带标注工程图dxf,png数据集
  • 【EGSR2025】材质+扩散模型+神经网络相关论文整理随笔(四)
  • Jenkins 项目类型及配置项
  • FPGA实现SDI转LVDS视频发送,基于GTP+OSERDES2原语架构,提供工程源码和技术支持
  • 资源分享-FPS, 矩阵, 骨骼, 绘制, 自瞄, U3D, UE4逆向辅助实战视频教程
  • 飞算 JavaAI 深度体验:开启 Java 开发智能化新纪元
  • 【拓扑空间】示例及详解4
  • python的社区残障人士服务系统
  • 了解环网式 CAN 转光纤中继器
  • BPE(Byte Pair Encoding)分词算法
  • leetcode-hot100(283.移动零)
  • 政安晨【零基础玩转开源AI项目】ACE-Step —— 迈向音乐生成基础模型的重要一步:AI自动谱曲与自动演唱的免费开源框架部署实践
  • RLHF:人类反馈强化学习 | 对齐AI与人类价值观的核心引擎
  • python实现DoIP基本通信(收发报文)
  • 第十二章:网络编程
  • Typescript -字面量类型
  • Linux的基础I/O
  • 买小屏幕的时候注意避坑
  • [Java 17] 无模版动态生成 PDF:图片嵌入与动态表格渲染实战
  • Linux磁盘限速(Ubuntu24实测)
  • 算法学习笔记:17.蒙特卡洛算法 ——从原理到实战,涵盖 LeetCode 与考研 408 例题
  • cnpm exec v.s. npx
  • C语言常见面试知识点详解:从入门到精通
  • 亿级流量下的缓存架构设计:Redis+Caffeine多级缓存实战
  • Web安全 - 基于 SM2/SM4 的前后端国产加解密方案详解
  • Flutter优缺点
  • Java学习第三十二部分——异常