pyspark读取hive表中数据后进行lgb建模
本文使用上篇文章中生成的稀疏向量进行建模。
因from pyspark_lightgbm import LGBMClassifier和from synapse.ml.lightgbm import LightGBMClassifier在集群上均未安装,故使用原生lgb进行建模。(理论上前两者效率更优,可并行处理数据,而原生lgb只能单机处理)
# 网格寻参
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import sys
import time
import numpy as np
import lightgbm as lgb
import joblib
from scipy.sparse import csr_matrix
from sklearn.metrics import (roc_auc_score,average_precision_score,f1_score,precision_score,recall_score,accuracy_score,confusion_matrix
)
from sklearn.model_selection import train_test_split, GridSearchCV# 配置环境变量
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf","/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.driver.memory", "48g") \.config("spark.driver.maxResultSize", "16g") \.appName("test_djj") \.enableHiveSupport() \.getOrCreate()# 计时装饰器
def timeit(func):def wrapper(*args, **kwargs):start_time = time.time()print(f"开始: {func.__name__}...")result = func(*args, **kwargs)end_time = time.time()elapsed = end_time - start_timeprint(f"完成: {func.__name__} | 耗时: {elapsed:.2f}秒")return resultreturn wrapper# 1. 数据加载并转换为CSR矩阵
@timeit
def load_and_prepare_data():print("加载数据并转换为CSR矩阵...")# 直接读取已标记的正负样本(label=1为正样本,label=0为负样本)df = spark.sql("SELECT id, aggregated_vector, label FROM database.table WHERE label IN (0, 1)")# 收集数据到Driver节点print("收集数据到Driver节点...")pdf = df.select(F.col("aggregated_vector").alias("features"),F.col("label").alias("label")).toPandas()print("提取特征和标签...")# 准备CSR矩阵的数据row_indices = []col_indices = []data_values = []labels = []# 遍历所有样本,构建CSR矩阵for i, row in pdf.iterrows():vec = row['features']size = vec['size']indices = vec['indices']values = vec['values']label = row['label']for j in range(len(indices)):row_indices.append(i)col_indices.append(indices[j])data_values.append(values[j])labels.append(label)# 获取特征维度num_features = pdf["features"].iloc[0]['size']num_samples = len(pdf)# 创建CSR矩阵X = csr_matrix((data_values, (row_indices, col_indices)),shape=(num_samples, num_features))y = np.array(labels)print(f"数据矩阵形状: {X.shape}")return X, y# 2. 准备训练数据(直接使用已标记的正负样本)
@timeit
def prepare_training_data(X, y):print("准备训练数据...")# 计算样本权重positive_count = sum(y == 1)negative_count = sum(y == 0)total_count = len(y)print(f"正样本数量: {positive_count:,}")print(f"负样本数量: {negative_count:,}")print(f"总量: {total_count:,}")# 创建样本权重数组sample_weight = np.zeros(total_count)sample_weight[y == 1] = total_count / (2.0 * positive_count)sample_weight[y == 0] = total_count / (2.0 * negative_count)return X, y, sample_weight# 3. LightGBM模型训练(带精简网格寻参)
@timeit
def train_lightgbm_model(X_train, y_train, sample_weight):print("训练LightGBM模型(带精简网格寻参)...")# 定义基础模型base_model = lgb.LGBMClassifier(objective='binary',random_state=42,n_jobs=-1 # 使用所有CPU核心)# 精简参数网格 - 专注于最重要的参数param_grid = {'num_leaves': [31, 63], # 控制模型复杂度'min_child_samples': [20, 50], # 防止过拟合'reg_alpha': [0, 0.1], # L1正则化'reg_lambda': [0, 0.1], # L2正则化}# 创建网格搜索对象grid_search = GridSearchCV(estimator=base_model,param_grid=param_grid,scoring='roc_auc',cv=3, # 3折交叉验证verbose=2,n_jobs=-1 # 使用所有CPU核心)# 执行网格搜索start_time = time.time()grid_search.fit(X_train, y_train, sample_weight=sample_weight)training_time = time.time() - start_time# 输出最佳参数print("最佳参数组合:")for param, value in grid_search.best_params_.items():print(f"{param}: {value}")print(f"最佳AUC分数: {grid_search.best_score_:.4f}")# 获取最佳模型best_model = grid_search.best_estimator_return best_model, training_time# 4. 模型评估
@timeit
def evaluate_model(model, X_test, y_test):print("模型评估...")# 使用模型进行预测start_time = time.time()y_pred_proba = model.predict_proba(X_test)[:, 1] # 正类的概率y_pred = (y_pred_proba > 0.5).astype(int)prediction_time = time.time() - start_time# 计算评估指标auc = roc_auc_score(y_test, y_pred_proba)auprc = average_precision_score(y_test, y_pred_proba)f1 = f1_score(y_test, y_pred)precision = precision_score(y_test, y_pred)recall = recall_score(y_test, y_pred)accuracy = accuracy_score(y_test, y_pred)# 生成混淆矩阵cm = confusion_matrix(y_test, y_pred)tn, fp, fn, tp = cm.ravel()# 计算各类别指标precision_pos = tp / (tp + fp) if (tp + fp) > 0 else 0recall_pos = tp / (tp + fn) if (tp + fn) > 0 else 0f1_pos = 2 * (precision_pos * recall_pos) / (precision_pos + recall_pos) if (precision_pos + recall_pos) > 0 else 0precision_neg = tn / (tn + fn) if (tn + fn) > 0 else 0recall_neg = tn / (tn + fp) if (tn + fp) > 0 else 0f1_neg = 2 * (precision_neg * recall_neg) / (precision_neg + recall_neg) if (precision_neg + recall_neg) > 0 else 0# 生成分类报告(包含所有评估指标)report = f"""================= 分类报告 =================混淆矩阵:[[{tn} {fp}][{fn} {tp}]]正样本 (1):Precision: {precision_pos:.4f}Recall: {recall_pos:.4f}F1 Score: {f1_pos:.4f}负样本 (0):Precision: {precision_neg:.4f}Recall: {recall_neg:.4f}F1 Score: {f1_neg:.4f}整体评估指标:Accuracy: {accuracy:.4f}AUC: {auc:.4f}AUPRC: {auprc:.4f}F1 Score: {f1:.4f}Precision: {precision:.4f}Recall: {recall:.4f}"""# 返回时间信息和报告return {"training_time": None, # 将在主函数中设置"prediction_time": prediction_time}, report# 主函数
def main():# 1. 加载数据并转换为CSR矩阵X, y = load_and_prepare_data()# 2. 准备训练数据X, y, sample_weight = prepare_training_data(X, y)# 计算正负样本比例(用于scale_pos_weight参数)positive_count = sum(y == 1)negative_count = sum(y == 0)print(f"正负样本比例: {negative_count / positive_count:.2f}:1")# 3. 数据分割print("数据分割...")X_train, X_test, y_train, y_test, sample_weight_train, _ = train_test_split(X, y, sample_weight, test_size=0.2, random_state=42)print(f"训练集大小: {X_train.shape[0]}")print(f"测试集大小: {X_test.shape[0]}")# 4. 训练LightGBM模型print(f"\n{'=' * 50}")print("开始训练 LightGBM 模型(带精简网格寻参)")print(f"{'=' * 50}")model, training_time = train_lightgbm_model(X_train, y_train, sample_weight_train)# 5. 模型评估results, report = evaluate_model(model, X_test, y_test)results["training_time"] = training_time# 打印报告(包含所有评估指标)print(report)# 打印时间信息print(f"\n===== 时间统计 =====")print(f"训练时间: {results['training_time']:.2f}秒")print(f"预测时间: {results['prediction_time']:.2f}秒")print("建模流程完成!")spark.stop()# [Info] 最佳参数组合:# [Info] min_child_samples: 50# [Info] num_leaves: 63# [Info] reg_alpha: 0.1# [Info] reg_lambda: 0# [Info] 最佳AUC分数: 0.6631# [Info] 完成: train_lightgbm_model | 耗时: 15272.61秒# [Info] 模型保存成功!if __name__ == "__main__":main()