当前位置: 首页 > news >正文

pyspark读取hive表中数据后进行lgb建模

本文使用上篇文章中生成的稀疏向量进行建模。

由于集群上均未安装 pyspark_lightgbm(from pyspark_lightgbm import LGBMClassifier)和 SynapseML(from synapse.ml.lightgbm import LightGBMClassifier),故改用原生 LightGBM 进行建模。(理论上前两者效率更高,可在集群上分布式并行处理数据;而原生 LightGBM 只能在单机上训练,需要先把数据收集到 Driver 节点。)

# 网格寻参
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import sys
import time
import numpy as np
import lightgbm as lgb
import joblib
from scipy.sparse import csr_matrix
from sklearn.metrics import (roc_auc_score,average_precision_score,f1_score,precision_score,recall_score,accuracy_score,confusion_matrix
)
from sklearn.model_selection import train_test_split, GridSearchCV# 配置环境变量
# Configure environment variables: point both the PySpark driver and the
# workers at the interpreter running this script, so they use the same Python.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Hive-enabled Spark session. The driver gets a large heap and result-size
# limit because load_and_prepare_data() collects the whole labelled table to
# the driver with toPandas().
# NOTE(review): spark.driver.memory only takes effect if the driver JVM has not
# started yet (e.g. launched via plain `python`); under spark-submit in client
# mode it must be passed on the command line instead -- confirm how this runs.
spark = (
    SparkSession.builder
    .config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties")
    .config("spark.driver.memory", "48g")
    .config("spark.driver.maxResultSize", "16g")
    .appName("test_djj")
    .enableHiveSupport()
    .getOrCreate()
)


# Timing decorator
def timeit(func):
    """Decorate *func* so each call prints a start line and its elapsed time.

    Arguments and the return value pass through unchanged. If *func* raises,
    the exception propagates and no completion line is printed.
    """
    from functools import wraps

    @wraps(func)  # keep func's __name__, __doc__ and __wrapped__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, unlike time.time(),
        # so system clock adjustments cannot skew the measured duration.
        start_time = time.perf_counter()
        print(f"开始: {func.__name__}...")
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(f"完成: {func.__name__} | 耗时: {elapsed:.2f}秒")
        return result

    return wrapper


# 1. Load data and convert it to a CSR matrix
def _vectors_to_csr(features, labels):
    """Stack sparse-vector records into a CSR feature matrix and a label array.

    Args:
        features: pandas Series whose elements support ``vec['size']``,
            ``vec['indices']`` and ``vec['values']`` (presumably Spark ``Row``
            structs from the upstream table -- TODO confirm the schema).
        labels: pandas Series of class labels, aligned with *features*.

    Returns:
        (X, y): a ``scipy.sparse.csr_matrix`` of shape
        ``(len(features), features.iloc[0]['size'])`` and a NumPy label array.

    Raises:
        ValueError: if *features* is empty, since the feature dimension is
            taken from the first row.
    """
    num_samples = len(features)
    if num_samples == 0:
        raise ValueError("no labelled samples were loaded; cannot build the feature matrix")

    # Every vector is assumed to share the first row's dimension.
    num_features = features.iloc[0]['size']

    # Build the COO triplets with NumPy instead of appending one Python object
    # per non-zero: each row position is repeated once per non-zero in that row.
    col_chunks = [np.asarray(vec['indices'], dtype=np.int64) for vec in features]
    value_chunks = [np.asarray(vec['values'], dtype=np.float64) for vec in features]
    row_lengths = [len(chunk) for chunk in col_chunks]
    row_indices = np.repeat(np.arange(num_samples, dtype=np.int64), row_lengths)
    col_indices = np.concatenate(col_chunks)
    data_values = np.concatenate(value_chunks)

    # The (data, (row, col)) constructor keeps the previous semantics:
    # entries are sorted and duplicate (row, col) pairs are summed.
    X = csr_matrix((data_values, (row_indices, col_indices)), shape=(num_samples, num_features))
    y = labels.to_numpy()
    return X, y


@timeit
def load_and_prepare_data():
    """Load labelled samples from Hive and convert them to a CSR matrix.

    Reads ``id, aggregated_vector, label`` from ``database.table`` for rows
    with ``label IN (0, 1)`` (1 = positive, 0 = negative). It then collects the
    vector and label columns to the driver with ``toPandas()`` and stacks the
    vectors into a sparse matrix.

    Returns:
        (X, y): ``scipy.sparse.csr_matrix`` of features and a NumPy label array.

    Raises:
        ValueError: if the query returns no rows.
    """
    print("加载数据并转换为CSR矩阵...")
    # Read the already-labelled samples directly (label=1 positive, label=0 negative).
    df = spark.sql("SELECT id, aggregated_vector, label FROM database.table WHERE label IN (0, 1)")

    # Everything below runs on the driver; the whole table must fit in its memory.
    print("收集数据到Driver节点...")
    pdf = df.select(
        F.col("aggregated_vector").alias("features"),
        F.col("label").alias("label"),
    ).toPandas()

    print("提取特征和标签...")
    X, y = _vectors_to_csr(pdf["features"], pdf["label"])
    print(f"数据矩阵形状: {X.shape}")
    return X, y


# 2. Prepare training data (uses the labelled positive/negative samples directly)
@timeit
def prepare_training_data(X, y):
    """Compute class-balanced sample weights for binary 0/1 labels.

    Each sample of class ``c`` gets weight ``len(y) / (2 * count(c))``. This is
    the same formula as scikit-learn's ``class_weight='balanced'`` for two
    classes, so both classes contribute equally to the training loss.

    Args:
        X: feature matrix; returned unchanged.
        y: NumPy array of 0/1 labels.

    Returns:
        (X, y, sample_weight): ``sample_weight`` is a float array of length
        ``len(y)``. Samples whose label is neither 0 nor 1 keep weight 0.
    """
    print("准备训练数据...")
    # np.count_nonzero counts in C; builtin sum() walks the mask element by element.
    positive_count = np.count_nonzero(y == 1)
    negative_count = np.count_nonzero(y == 0)
    total_count = len(y)

    print(f"正样本数量: {positive_count:,}")
    print(f"负样本数量: {negative_count:,}")
    print(f"总量: {total_count:,}")

    sample_weight = np.zeros(total_count)
    # Skip an absent class rather than dividing by zero (nothing to weight).
    if positive_count:
        sample_weight[y == 1] = total_count / (2.0 * positive_count)
    if negative_count:
        sample_weight[y == 0] = total_count / (2.0 * negative_count)
    return X, y, sample_weight


# 3. LightGBM training (with a compact grid search)
@timeit
def train_lightgbm_model(X_train, y_train, sample_weight, param_grid=None, cv=3):
    """Tune and fit a binary LGBMClassifier with a small grid search.

    Args:
        X_train: training feature matrix (dense or scipy sparse).
        y_train: 0/1 training labels.
        sample_weight: per-sample weights. GridSearchCV splits array-like fit
            parameters of length n_samples across the CV folds and forwards
            them to ``LGBMClassifier.fit``.
        param_grid: optional search grid. Defaults to 16 combinations over
            num_leaves, min_child_samples, reg_alpha and reg_lambda.
        cv: number of cross-validation folds (default 3).

    Returns:
        (best_model, training_time): the estimator refit on all of
        ``X_train`` with the parameters that had the best mean ROC AUC, and
        the wall-clock seconds spent in ``GridSearchCV.fit``.
    """
    print("训练LightGBM模型(带精简网格寻参)...")
    base_model = lgb.LGBMClassifier(
        objective='binary',
        random_state=42,
        n_jobs=-1,  # each LightGBM fit uses all CPU cores
    )

    if param_grid is None:
        # Compact grid focused on the most influential parameters.
        param_grid = {
            'num_leaves': [31, 63],         # model complexity
            'min_child_samples': [20, 50],  # guards against overfitting
            'reg_alpha': [0, 0.1],          # L1 regularisation
            'reg_lambda': [0, 0.1],         # L2 regularisation
        }

    grid_search = GridSearchCV(
        estimator=base_model,
        param_grid=param_grid,
        scoring='roc_auc',
        cv=cv,
        verbose=2,
        # Run the CV fits one after another. Each fit is already multi-threaded
        # (n_jobs=-1 above). Parallelising the search too would start up to one
        # worker per core, each running one LightGBM thread per core, which
        # heavily oversubscribes the CPU.
        n_jobs=1,
    )

    start_time = time.time()
    grid_search.fit(X_train, y_train, sample_weight=sample_weight)
    training_time = time.time() - start_time

    print("最佳参数组合:")
    for param, value in grid_search.best_params_.items():
        print(f"{param}: {value}")
    print(f"最佳AUC分数: {grid_search.best_score_:.4f}")

    return grid_search.best_estimator_, training_time


# 4. Model evaluation
def _precision_recall_f1(tp, fp, fn):
    """Return (precision, recall, f1) from confusion counts, using 0 where a ratio is undefined."""
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1


@timeit
def evaluate_model(model, X_test, y_test, threshold=0.5):
    """Score *model* on the test set and build a text classification report.

    Args:
        model: fitted classifier exposing ``predict_proba``; column 1 is used
            as the positive-class probability.
        X_test: test feature matrix.
        y_test: 0/1 test labels.
        threshold: a sample is predicted positive when its probability is
            strictly greater than this (default 0.5).

    Returns:
        (results, report):
            - ``results`` is a dict with ``"training_time"`` and
              ``"prediction_time"``. ``"training_time"`` is ``None`` and is
              filled in by the caller. ``"prediction_time"`` is the seconds
              spent in ``predict_proba`` plus thresholding.
            - ``report`` is the formatted multi-line report string.
    """
    print("模型评估...")
    start_time = time.time()
    y_pred_proba = model.predict_proba(X_test)[:, 1]  # positive-class probability
    y_pred = (y_pred_proba > threshold).astype(int)
    prediction_time = time.time() - start_time

    # Ranking metrics use the probabilities (threshold-independent).
    auc = roc_auc_score(y_test, y_pred_proba)
    auprc = average_precision_score(y_test, y_pred_proba)
    # Threshold-dependent metrics; sklearn's default positive label is 1.
    f1 = f1_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)

    # labels=[0, 1] always yields a 2x2 matrix, so the 4-way unpack is valid
    # even when only one class appears in y_test and y_pred.
    cm = confusion_matrix(y_test, y_pred, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    precision_pos, recall_pos, f1_pos = _precision_recall_f1(tp, fp, fn)
    # Viewed from the negative class: tn are its hits, fn its false alarms,
    # fp its misses.
    precision_neg, recall_neg, f1_neg = _precision_recall_f1(tn, fn, fp)

    report_lines = [
        "================= 分类报告 =================",
        "混淆矩阵:",
        f"[[{tn} {fp}]",
        f" [{fn} {tp}]]",
        "",
        "正样本 (1):",
        f"Precision: {precision_pos:.4f}",
        f"Recall:    {recall_pos:.4f}",
        f"F1 Score:  {f1_pos:.4f}",
        "",
        "负样本 (0):",
        f"Precision: {precision_neg:.4f}",
        f"Recall:    {recall_neg:.4f}",
        f"F1 Score:  {f1_neg:.4f}",
        "",
        "整体评估指标:",
        f"Accuracy:  {accuracy:.4f}",
        f"AUC:       {auc:.4f}",
        f"AUPRC:     {auprc:.4f}",
        f"F1 Score:  {f1:.4f}",
        f"Precision: {precision:.4f}",
        f"Recall:    {recall:.4f}",
    ]
    report = "\n".join(report_lines)

    return {
        "training_time": None,  # set by the caller
        "prediction_time": prediction_time,
    }, report


# Main function
def main():
    """Run the whole pipeline: load, weight, split, grid-search train, evaluate.

    The Spark session is stopped when main() exits, whether it finishes
    normally or a step raises.
    """
    try:
        # 1. Load data and convert it to a CSR matrix
        X, y = load_and_prepare_data()

        # 2. Prepare training data (class-balanced sample weights)
        X, y, sample_weight = prepare_training_data(X, y)

        # Negative:positive ratio, printed for information only. It is not
        # passed to the model (e.g. as scale_pos_weight); the sample weights
        # already balance the classes.
        positive_count = np.count_nonzero(y == 1)
        negative_count = np.count_nonzero(y == 0)
        if positive_count:
            print(f"正负样本比例: {negative_count / positive_count:.2f}:1")

        # 3. Train/test split. stratify=y keeps the class ratio the same in
        # both parts, so the test metrics do not depend on a lucky split.
        print("数据分割...")
        X_train, X_test, y_train, y_test, sample_weight_train, _ = train_test_split(
            X, y, sample_weight, test_size=0.2, random_state=42, stratify=y
        )
        print(f"训练集大小: {X_train.shape[0]}")
        print(f"测试集大小: {X_test.shape[0]}")

        # 4. Train the LightGBM model
        print(f"\n{'=' * 50}")
        print("开始训练 LightGBM 模型(带精简网格寻参)")
        print(f"{'=' * 50}")
        model, training_time = train_lightgbm_model(X_train, y_train, sample_weight_train)

        # 5. Evaluate the model
        results, report = evaluate_model(model, X_test, y_test)
        results["training_time"] = training_time

        print(report)

        print("\n===== 时间统计 =====")
        print(f"训练时间: {results['training_time']:.2f}秒")
        print(f"预测时间: {results['prediction_time']:.2f}秒")
        print("建模流程完成!")
    finally:
        # Release the Spark session even if a step above fails.
        spark.stop()


# Results logged by a previous run:
#   best params: min_child_samples=50, num_leaves=63, reg_alpha=0.1, reg_lambda=0
#   best CV AUC: 0.6631
#   train_lightgbm_model took 15272.61 s
#   That log also showed a "model saved successfully" message, but this script
#   has no save step (joblib is imported but never used).

if __name__ == "__main__":
    main()


文章转载自:

http://JAVRxGHe.dpppx.cn
http://I1vCgDN5.dpppx.cn
http://C9sUoenu.dpppx.cn
http://sYvlraiG.dpppx.cn
http://1jWQ879n.dpppx.cn
http://xDpSKlwl.dpppx.cn
http://MbVzCVGu.dpppx.cn
http://2wj3Ee1Q.dpppx.cn
http://NAUHprgb.dpppx.cn
http://FuVXr4jr.dpppx.cn
http://ncdurZHK.dpppx.cn
http://4ZQqYHOS.dpppx.cn
http://jMYulFsq.dpppx.cn
http://BUWHdoQg.dpppx.cn
http://MtjaaTGz.dpppx.cn
http://iujV1xs5.dpppx.cn
http://pFYPzoVs.dpppx.cn
http://1SfH8JJY.dpppx.cn
http://v7i4PEL4.dpppx.cn
http://V6idIqHf.dpppx.cn
http://cXHnelTx.dpppx.cn
http://UkNzGl4E.dpppx.cn
http://5i1ArqZt.dpppx.cn
http://8D0jkFc1.dpppx.cn
http://1HLe7Aeo.dpppx.cn
http://j2RwJKAz.dpppx.cn
http://yPq1z9I8.dpppx.cn
http://682twK6q.dpppx.cn
http://Xmg9qPAj.dpppx.cn
http://Xar3FWxo.dpppx.cn
http://www.dtcms.com/a/376898.html

相关文章:

  • LeetCode 热题 42.接雨水(双指针写法)
  • 带你走进vue的响应式底层
  • 【算法--链表】117.填充每个节点的下一个右侧节点指针Ⅱ--通俗讲解
  • BFS与FloodFill算法简介与实战
  • 闭包面试题
  • el-table表头做过滤
  • LaTeX 中给单个/部分参考文献标记颜色(BibTeX 文献引用)
  • 深入探讨讲解MOS管工作原理-ASIM阿赛姆
  • 环境变量_进程地址空间
  • 文档抽取技术:革新合同管理,提升效率、准确性和智能化水平
  • 关于CSDN中图片无法粘贴的问题解决办法
  • 初始python
  • webshell上传方式
  • 图论2 图的数据结构表示
  • 09使用Python操作MySQL
  • 视频加水印,推荐使用运营大管家-视频批量加水印软件
  • Golang适配器模式详解
  • 【Linux】jar文件软链接和硬链接的操作区别
  • java控制台手动
  • Java入门级教程16——集合
  • docker桌面版 镜像配置
  • JVM 全面详解:深入理解 Java 的核心运行机制
  • JVM分代收集:原理与调优策略
  • 使用.NET标准库实现多任务并行处理的详细过程
  • 软件测试:功能测试详解
  • 数字图像处理-图像编码
  • 基于RDMA 通信的可负载均衡高性能服务架构
  • java多线程场景3-并发处理和异步请求
  • <uniapp><指针组件>基于uniapp,编写一个自定义箭头指针组件
  • 新手向:中文语言识别的进化之路