"""python37天打卡
知识点回顾:
过拟合的判断:测试集和训练集同步打印指标
模型的保存和加载
仅保存权重
保存权重和模型
保存全部信息checkpoint,还包含训练状态
早停策略
作业:对信贷数据集训练后保存权重,加载权重后继续训练50轮,并采取早停策略
"""
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,confusion_matrix)
import matplotlib.pyplot as plt
import seaborn as sns
import os
from pathlib import Path
from typing import Tuple, Dict, List, Optional

# --- 1. Configuration constants ---
# Path objects keep the script portable across operating systems.
BASE_DIR = Path(__file__).parent.resolve()
DATA_PATH = BASE_DIR / "data" / "credit_risk_data.csv"
MODEL_SAVE_DIR = BASE_DIR / "saved_models"
MODEL_SAVE_PATH = MODEL_SAVE_DIR / "credit_risk_model.pth"

# Make sure the checkpoint directory exists before training starts.
MODEL_SAVE_DIR.mkdir(parents=True, exist_ok=True)

# Training hyperparameters.
RANDOM_SEED = 42
BATCH_SIZE = 64
LEARNING_RATE = 0.001
NUM_EPOCHS = 50
HIDDEN_LAYER_SIZES = [128, 64, 32]  # hidden-layer widths, in order
DROPOUT_RATE = 0.3  # dropout probability used to curb overfitting

# Device selection: prefer CUDA when present.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {DEVICE}")


def set_seed(seed):
    """Seed torch/numpy (and CUDA, when available) for reproducible runs."""
    torch.manual_seed(seed)
    np.random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        # Deterministic cuDNN kernels trade a little speed for repeatability.
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False


set_seed(RANDOM_SEED)
class CreditRiskDataset(Dataset):
    """Credit-risk dataset wrapping numpy feature/label arrays as float32 tensors."""

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self.features = torch.tensor(features, dtype=torch.float32)
        # (N,) -> (N, 1) so targets match the model's single-logit output
        # and BCEWithLogitsLoss shape expectations.
        self.labels = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)

    def __len__(self) -> int:
        return len(self.labels)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.features[idx], self.labels[idx]


def load_and_preprocess_data(file_path: Path, target_col: str = 'default') -> Tuple[DataLoader, DataLoader, DataLoader, StandardScaler]:
    """Load the credit CSV, impute and scale features, and build data loaders.

    Args:
        file_path: path to the CSV data file.
        target_col: name of the binary target column.

    Returns:
        (train_loader, val_loader, test_loader, fitted StandardScaler). The
        scaler is returned so inference code can reuse identical normalization.

    Raises:
        FileNotFoundError: if ``file_path`` does not exist.
    """
    print(f"加载数据: {file_path}")
    if not file_path.exists():
        raise FileNotFoundError(f"数据文件不存在: {file_path}")

    df = pd.read_csv(file_path)
    print(f"数据形状: {df.shape}")

    # Median-impute numeric columns.
    # FIX: the original chained ``df[col].fillna(..., inplace=True)`` is
    # deprecated and operates on a temporary under pandas copy-on-write
    # (silently leaving NaNs in place); assign the result back instead.
    if df.isnull().sum().sum() > 0:
        print("处理缺失值...")
        for col in df.select_dtypes(include=np.number).columns:
            df[col] = df[col].fillna(df[col].median())

    # Split features from target.
    X = df.drop(target_col, axis=1).values
    y = df[target_col].values

    # Standardize features (fit on the full dataset, as in the original).
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    full_dataset = CreditRiskDataset(X_scaled, y)

    # 70% train / 15% val / 15% test, with a seeded generator so the split
    # is reproducible across runs.
    train_size = int(0.7 * len(full_dataset))
    val_size = int(0.15 * len(full_dataset))
    test_size = len(full_dataset) - train_size - val_size
    train_dataset, val_dataset, test_dataset = random_split(
        full_dataset, [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(RANDOM_SEED))

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

    print(f"数据集划分: 训练集 {len(train_dataset)} | 验证集 {len(val_dataset)} | 测试集 {len(test_dataset)}")
    return train_loader, val_loader, test_loader, scaler
class CreditRiskPredictor(nn.Module):
    """MLP for binary credit-default prediction; emits one raw logit per sample.

    Pair with ``nn.BCEWithLogitsLoss`` — there is no final sigmoid in the model.
    """

    def __init__(self, input_size: int, hidden_sizes: List[int],
                 output_size: int = 1, dropout_rate: Optional[float] = None):
        """
        Args:
            input_size: number of input features.
            hidden_sizes: width of each hidden layer, in order.
            output_size: number of output logits (1 for the binary task).
            dropout_rate: dropout probability. Defaults to the module-level
                ``DROPOUT_RATE`` when None, preserving the original behavior;
                passing a value makes the constant configurable per model.
        """
        super().__init__()
        if dropout_rate is None:
            dropout_rate = DROPOUT_RATE
        layers: List[nn.Module] = []
        prev_size = input_size
        # Each hidden stage: Linear -> BatchNorm -> ReLU -> Dropout.
        for h_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, h_size))
            layers.append(nn.BatchNorm1d(h_size))  # batch norm stabilizes training
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))  # dropout curbs overfitting
            prev_size = h_size
        # Output layer: raw logits only.
        layers.append(nn.Linear(prev_size, output_size))
        self.model = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.model(x)
def train_model(model, train_loader, val_loader, optimizer, criterion, epochs, device,
                patience=None, save_path=None):
    """Train the model, track train/val loss and val AUC, and checkpoint the best weights.

    Args:
        model: network to train (moved onto ``device`` here).
        train_loader: DataLoader over the training split.
        val_loader: DataLoader over the validation split.
        optimizer: optimizer updating ``model``'s parameters.
        criterion: loss taking raw logits (e.g. BCEWithLogitsLoss).
        epochs: maximum number of epochs.
        device: torch device to run on.
        patience: early-stopping patience (早停策略): if set, training stops
            once validation loss has not improved for this many consecutive
            epochs. None (default) keeps the original run-to-``epochs`` behavior.
        save_path: where the best ``state_dict`` is saved; defaults to the
            module-level ``MODEL_SAVE_PATH``.

    Returns:
        dict with per-epoch 'train_loss', 'val_loss' and 'val_auc' lists.
    """
    if save_path is None:
        save_path = MODEL_SAVE_PATH
    history = {'train_loss': [], 'val_loss': [], 'val_auc': []}
    best_val_loss = float('inf')
    epochs_no_improve = 0  # consecutive epochs without val-loss improvement
    model.to(device)

    for epoch in range(epochs):
        # --- training pass ---
        model.train()
        train_loss = 0.0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch mean is exact even when the
            # final batch is smaller.
            train_loss += loss.item() * inputs.size(0)
        train_loss = train_loss / len(train_loader.dataset)
        history['train_loss'].append(train_loss)

        # --- validation pass ---
        model.eval()
        val_loss = 0.0
        all_targets = []
        all_probs = []
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item() * inputs.size(0)
                probs = torch.sigmoid(outputs)
                all_targets.extend(targets.cpu().numpy())
                all_probs.extend(probs.cpu().numpy())
        val_loss = val_loss / len(val_loader.dataset)
        val_auc = roc_auc_score(all_targets, all_probs)
        history['val_loss'].append(val_loss)
        history['val_auc'].append(val_auc)

        # --- checkpoint best weights / early-stopping bookkeeping ---
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
            torch.save(model.state_dict(), save_path)
            print(f"保存最佳模型 @ Epoch {epoch+1}, Val Loss: {val_loss:.4f}, AUC: {val_auc:.4f}")
        else:
            epochs_no_improve += 1

        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {train_loss:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"AUC: {val_auc:.4f}")

        if patience is not None and epochs_no_improve >= patience:
            print(f"早停触发 @ Epoch {epoch+1} (连续 {patience} 轮验证损失未改善)")
            break

    # --- plot training history ---
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.subplot(1, 2, 2)
    plt.plot(history['val_auc'], 'g-', label='Validation AUC')
    plt.title('Validation AUC')
    plt.xlabel('Epoch')
    plt.ylabel('AUC')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.show()
    return history
def evaluate_model(model, test_loader, device):
    """Evaluate the model on the test loader.

    Prints accuracy/precision/recall/F1/ROC-AUC, saves a confusion-matrix
    heatmap to 'confusion_matrix.png', and returns the metrics dict.
    """
    model.eval()
    model.to(device)

    # Collect labels, hard predictions (0.5 threshold) and probabilities.
    y_true, y_pred, y_prob = [], [], []
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            logits = model(batch_x)
            prob = torch.sigmoid(logits)
            hard = (prob > 0.5).float()
            y_true.extend(batch_y.cpu().numpy())
            y_pred.extend(hard.cpu().numpy())
            y_prob.extend(prob.cpu().numpy())

    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
        'roc_auc': roc_auc_score(y_true, y_prob),
    }

    print("\n模型评估结果:")
    for metric, value in metrics.items():
        print(f"{metric.capitalize()}: {value:.4f}")

    # Confusion-matrix heatmap.
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['No Default', 'Default'],
                yticklabels=['No Default', 'Default'])
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.savefig('confusion_matrix.png')
    plt.show()
    return metrics
def main():
    """End-to-end pipeline: load data, train, evaluate best checkpoint, demo one prediction."""
    # --- data ---
    try:
        train_loader, val_loader, test_loader, scaler = load_and_preprocess_data(DATA_PATH)
    except Exception as e:
        print(f"数据加载失败: {e}")
        return

    # --- model ---
    # Infer the input width from one batch instead of hard-coding it.
    sample_features, _ = next(iter(train_loader))
    input_size = sample_features.shape[1]
    model = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES)
    print(f"模型架构:\n{model}")

    # BCEWithLogitsLoss expects raw logits (the model has no final sigmoid).
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-5)

    # --- training ---
    history = train_model(model, train_loader, val_loader, optimizer, criterion,
                          NUM_EPOCHS, DEVICE)

    # --- evaluation on the best checkpoint ---
    best_model = CreditRiskPredictor(input_size, HIDDEN_LAYER_SIZES)
    # FIX: map_location makes the checkpoint loadable regardless of the device
    # it was saved from (e.g. GPU-trained weights on a CPU-only machine).
    best_model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=DEVICE))
    best_model.to(DEVICE)
    test_metrics = evaluate_model(best_model, test_loader, DEVICE)

    # --- single-sample demo prediction ---
    sample_idx = np.random.randint(0, len(test_loader.dataset))
    sample_data, true_label = test_loader.dataset[sample_idx]
    best_model.eval()
    with torch.no_grad():
        sample_data = sample_data.unsqueeze(0).to(DEVICE)
        logit = best_model(sample_data)
        prob = torch.sigmoid(logit).item()
        pred = 1 if prob > 0.5 else 0
    print(f"\n示例预测:")
    # FIX: these features come out of the StandardScaler, so label them as
    # standardized rather than raw ("原始") values.
    print(f"标准化特征: {sample_data.cpu().numpy().squeeze()}")
    print(f"真实标签: {true_label.item()}")
    print(f"预测概率: {prob:.4f}")
    print(f"预测结果: {pred}")


if __name__ == "__main__":
    main()
# @浙大疏锦行