python学习打卡:DAY 37 早停策略和模型权重的保存
浙大疏锦行-CSDN博客
知识点回顾:
- 过拟合的判断:测试集和训练集同步打印指标
- 模型的保存和加载
- 仅保存权重
- 保存权重和模型
- 保存全部信息checkpoint,还包含训练状态
- 早停策略
作业:对信贷数据集训练后保存权重,加载权重后继续训练50轮,并采取早停策略
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
import pandas as pd
import numpy as npwarnings.filterwarnings("ignore")# 1. 数据加载和预处理
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")# 假设 data.csv 在当前目录下
try:data = pd.read_csv('data.csv')
except FileNotFoundError:print("错误:'data.csv' 文件未找到。请确保该文件与脚本在同一目录下。")exit()# 离散特征映射
home_ownership_mapping = {'Own Home': 1,'Rent': 2,'Have Mortgage': 3,'Home Mortgage': 4
}
data['Home Ownership'] = data['Home Ownership'].map(home_ownership_mapping)years_in_job_mapping = {'< 1 year': 1, '1 year': 2, '2 years': 3, '3 years': 4,'4 years': 5, '5 years': 6, '6 years': 7, '7 years': 8,'8 years': 9, '9 years': 10, '10+ years': 11
}
data['Years in current job'] = data['Years in current job'].map(years_in_job_mapping)term_mapping = {'Short Term': 0,'Long Term': 1
}
data['Term'] = data['Term'].map(term_mapping)# 对 'Purpose' 列进行独热编码
data = pd.get_dummies(data, columns=['Purpose'], dummy_na=False)# 填充所有特征列的缺失值
for feature in data.columns:if data[feature].isnull().sum() > 0:# 对于数值型特征,使用中位数填充;对于分类型,使用众数if data[feature].dtype == 'float64' or data[feature].dtype == 'int64':mode_value = data[feature].median()else:mode_value = data[feature].mode()[0]data[feature].fillna(mode_value, inplace=True)# 划分特征和目标变量
X = data.drop(['Credit Default'], axis=1)
y = data['Credit Default']# 检查目标变量的类别数
num_classes = len(np.unique(y))
print(f"目标变量中的类别数量: {num_classes}")# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)# 特征缩放
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)# 转换为PyTorch张量
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train.values).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test.values).to(device)# 2. 模型定义 (在数据准备好之后,动态获取维度)
class MLP(nn.Module):def __init__(self, input_dim, output_dim):super(MLP, self).__init__()# 【修正1】使用动态的输入维度self.fc1 = nn.Linear(input_dim, 128) # 增加了隐藏层节点数self.relu1 = nn.ReLU()self.dropout1 = nn.Dropout(0.5) # 添加Dropout防止过拟合self.fc2 = nn.Linear(128, 64)self.relu2 = nn.ReLU()self.dropout2 = nn.Dropout(0.5)# 【修正2】使用动态的输出维度(类别数)self.fc3 = nn.Linear(64, output_dim)def forward(self, x):out = self.fc1(x)out = self.relu1(out)out = self.dropout1(out)out = self.fc2(out)out = self.relu2(out)out = self.dropout2(out)out = self.fc3(out)return out# 使用正确的维度实例化模型
input_features = X_train.shape[1]
model = MLP(input_dim=input_features, output_dim=num_classes).to(device)
print("\n模型结构:")
print(model)# 3. 训练过程
# 【修正3】修正了变量名 `criterion` -> `criterionm`
criterionm = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) # 使用 Adam 优化器可能效果更好num_epochs = 10000 # 减少轮次,因为有早停机制
train_losses = []
test_losses = []
epochs_list = []
best_test_loss = float('inf')
best_epoch = 0
patience = 20 # 早停的耐心值
counter = 0
early_stopped = Falseprint(f"\n开始训练,共 {num_epochs} 轮,输入特征数: {input_features}...")
start_time = time.time()with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:for epoch in range(num_epochs):model.train() # 确保模型在训练模式outputs = model(X_train)train_loss = criterionm(outputs, y_train)optimizer.zero_grad()train_loss.backward()optimizer.step()if (epoch + 1) % 10 == 0: # 评估频率可以提高model.eval() # 切换到评估模式with torch.no_grad():test_outputs = model(X_test)test_loss = criterionm(test_outputs, y_test)train_losses.append(train_loss.item())test_losses.append(test_loss.item())epochs_list.append(epoch + 1)pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})if test_loss.item() < best_test_loss:best_test_loss = test_loss.item()best_epoch = epoch + 1counter = 0torch.save(model.state_dict(), 'best_model.pth')else:counter += 1if counter >= patience:print(f"\n早停触发!在第 {epoch+1} 轮,测试集损失已有 {patience*10} 轮(评估间隔)未改善。")print(f"最佳测试集损失出现在第 {best_epoch} 轮,损失值为 {best_test_loss:.4f}")early_stopped = Truebreakpbar.update(1)time_all = time.time() - start_time
print(f'训练用时: {time_all:.2f} 秒')if early_stopped:print(f"加载第 {best_epoch} 轮的最佳模型进行最终评估...")model.load_state_dict(torch.load('best_model.pth'))# 4. 绘图和评估
plt.figure(figsize=(10, 6))
plt.plot(epochs_list, train_losses, label='Train Loss')
plt.plot(epochs_list, test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Test Loss over Epochs')
plt.legend()
plt.grid(True)
plt.show()model.eval()
with torch.no_grad():outputs = model(X_test)_, predicted = torch.max(outputs, 1)correct = (predicted == y_test).sum().item()accuracy = correct / y_test.size(0)print(f'测试集最终准确率: {accuracy * 100:.2f}%')torch.save(model.state_dict(), "model_weights.pth")resumed_model = MLP(input_dim=input_features, output_dim=num_classes).to(device)
print("模型结构已重新创建。")try:resumed_model.load_state_dict(torch.load(WEIGHTS_FILE))print(f"加载权重")
except FileNotFoundError:print(f"错误: 权重文件不存在。请先完成初始训练。")exit()resumed_model.train()
optimizer = optim.Adam(resumed_model.parameters(), lr=0.0001)
print("3. 模型已设置为训练模式,优化器已准备就绪。")num_additional_epochs = 50
print(f"开始进行额外的 {num_additional_epochs} 轮训练...")for epoch in range(num_additional_epochs):outputs = resumed_model(X_train_tensor)loss = criterion(outputs, y_train_tensor)optimizer.zero_grad()loss.backward()optimizer.step()if (epoch + 1) % 10 == 0:print(f'额外训练周期 [{epoch+1}/{num_additional_epochs}], 损失: {loss.item():.4f}')print("额外训练完成!")print("评估经过额外训练后的最终模型")
resumed_model.eval()
with torch.no_grad():final_outputs = resumed_model(X_test_tensor)_, predicted = torch.max(final_outputs, 1)correct = (predicted == y_test_tensor).sum().item()accuracy = correct / y_test_tensor.size(0)print(f'测试集最终准确率: {accuracy * 100:.2f}%')