DAY 53 对抗生成网络
知识点回顾:
- 对抗生成网络的思想:关注损失从何而来
- 生成器、判别器
- nn.Sequential容器:适合于按顺序运算的情况,简化前向传播写法
- LeakyReLU介绍:避免ReLU的神经元失活(dying ReLU)现象
PS:如果你学有余力,想进一步理解GAN的损失函数,建议去找找视频看看;如果只是使用,没必要深入学习。
作业:针对心脏病数据集,用GAN学习样本数较少的病人类别(少数类)并生成新的病人样本,比较不使用GAN与使用GAN增强后分类器的F1分数差异。
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt# 加载数据
def load_data(n_healthy=800, n_patients=200, n_features=13, seed=42):
    """Build a synthetic, class-imbalanced stand-in for the heart-disease data.

    Healthy rows (target 0) are drawn from N(2, 0.5^2) per feature and patient
    rows (target 1) from N(3, 0.5^2). Healthy rows come first, followed by the
    patient rows. With the default arguments the result is identical to the
    original hard-coded 800/200 x 13 dataset.

    NOTE: this is placeholder data; swap in the real dataset
    (e.g. ``pd.read_csv('heart_disease.csv')``) when it is available.

    Args:
        n_healthy: Number of majority-class (healthy) rows.
        n_patients: Number of minority-class (patient) rows.
        n_features: Number of feature columns.
        seed: Seed passed to ``np.random.seed``; note that this reseeds the
            global NumPy RNG, exactly as the original implementation did.

    Returns:
        A DataFrame with columns ``feature_0 .. feature_{n_features-1}`` and a
        float ``target`` column holding 0.0 / 1.0.
    """
    np.random.seed(seed)

    # Draw order matters for reproducibility: healthy first, then patients.
    healthy_features = np.random.randn(n_healthy, n_features) * 0.5 + 2
    patient_features = np.random.randn(n_patients, n_features) * 0.5 + 3

    features = np.vstack([healthy_features, patient_features])
    # Labels are derived from the same counts used for the features so the
    # two can never disagree.
    labels = np.hstack([np.zeros(n_healthy), np.ones(n_patients)])

    columns = [f'feature_{i}' for i in range(n_features)]
    df = pd.DataFrame(features, columns=columns)
    df['target'] = labels
    return df


# Generator network
class Generator(nn.Module):
    """MLP that maps a latent noise vector to a synthetic feature vector.

    The network is Linear -> LeakyReLU(0.2) repeated over hidden widths
    64, 128 and 64, followed by a final Linear to ``output_dim`` and a Tanh,
    so every output component lies in [-1, 1].

    Args:
        input_dim: Size of the latent noise vector.
        output_dim: Number of features in a generated sample.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()

        stack = []
        width_in = input_dim
        for width_out in (64, 128, 64):
            stack.append(nn.Linear(width_in, width_out))
            stack.append(nn.LeakyReLU(0.2))
            width_in = width_out

        stack.append(nn.Linear(width_in, output_dim))
        stack.append(nn.Tanh())  # squashes outputs into [-1, 1]

        self.model = nn.Sequential(*stack)

    def forward(self, z):
        """Return generated samples for the batch of latent vectors ``z``."""
        return self.model(z)


# Discriminator network
class Discriminator(nn.Module):
    """MLP that scores a feature vector with a value in (0, 1).

    Two hidden stages (widths 64 and 32), each Linear -> LeakyReLU(0.2) ->
    Dropout(0.3), are followed by a single-unit Linear and a Sigmoid. The
    training loop in this file uses 1 as the target for real samples and 0
    for generated ones.

    Args:
        input_dim: Number of features in each sample.
    """

    def __init__(self, input_dim):
        super().__init__()

        stack = []
        width_in = input_dim
        for width_out in (64, 32):
            stack += [
                nn.Linear(width_in, width_out),
                nn.LeakyReLU(0.2),
                nn.Dropout(0.3),
            ]
            width_in = width_out

        stack += [nn.Linear(width_in, 1), nn.Sigmoid()]  # probability-like output

        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return one score per row of ``x``, shaped ``(batch, 1)``."""
        return self.model(x)


# GAN training
def _real_samples_from_batch(batch):
    """Return the feature tensor from a DataLoader batch.

    A ``TensorDataset`` with one tensor yields ``[features]``; one with
    features and labels yields ``[features, labels]``. A bare tensor is
    passed through unchanged.
    """
    if isinstance(batch, (list, tuple)):
        return batch[0]
    return batch


def _save_loss_plot(g_losses, d_losses, path='gan_loss.png'):
    """Plot per-epoch generator/discriminator losses and save them to ``path``."""
    plt.figure(figsize=(10, 5))
    plt.plot(g_losses, label='Generator Loss')
    plt.plot(d_losses, label='Discriminator Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('GAN Training Loss')
    plt.savefig(path)
    plt.close()


def train_gan(generator, discriminator, dataloader, n_epochs, latent_dim, device):
    """Train ``generator`` against ``discriminator`` with the BCE GAN loss.

    Each batch performs one discriminator step (real samples labelled 1,
    generated samples labelled 0) followed by one generator step that tries
    to make the discriminator output 1 for generated samples. Mean losses are
    printed every 10 epochs and the loss curves are written to
    ``gan_loss.png`` in the current working directory.

    Args:
        generator: Module mapping ``(batch, latent_dim)`` noise to samples.
        discriminator: Module mapping samples to ``(batch, 1)`` values in
            (0, 1), as required by ``nn.BCELoss``.
        dataloader: Iterable of batches. Each batch is either a tensor of
            real samples or a list/tuple whose first element is that tensor;
            any further elements (e.g. labels) are ignored.
        n_epochs: Number of passes over ``dataloader``.
        latent_dim: Size of the noise vector fed to the generator.
        device: Device on which the batches, noise and labels are placed. The
            models are expected to already live on this device.

    Returns:
        The same ``generator`` instance, trained in place.

    Raises:
        ValueError: If ``dataloader`` yields no batches during an epoch.
    """
    g_optimizer = optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
    d_optimizer = optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))
    criterion = nn.BCELoss()

    # Make sure dropout in the discriminator is active even if the caller
    # previously switched either model to eval mode.
    generator.train()
    discriminator.train()

    g_losses = []
    d_losses = []

    for epoch in range(n_epochs):
        epoch_g_loss = 0.0
        epoch_d_loss = 0.0
        n_batches = 0

        for batch in dataloader:
            # Real data must be on the same device as the models.
            real_samples = _real_samples_from_batch(batch).to(device)
            batch_size = real_samples.size(0)

            real_labels = torch.ones(batch_size, 1, device=device)
            fake_labels = torch.zeros(batch_size, 1, device=device)

            # ---- Discriminator step ----
            d_optimizer.zero_grad()
            d_loss_real = criterion(discriminator(real_samples), real_labels)

            z = torch.randn(batch_size, latent_dim, device=device)
            fake_samples = generator(z)
            # detach(): the discriminator step must not backprop into the generator.
            d_loss_fake = criterion(discriminator(fake_samples.detach()), fake_labels)

            d_loss = d_loss_real + d_loss_fake
            d_loss.backward()
            d_optimizer.step()

            # ---- Generator step ----
            g_optimizer.zero_grad()
            # The generator is rewarded when its samples are scored as real.
            g_loss = criterion(discriminator(fake_samples), real_labels)
            g_loss.backward()
            g_optimizer.step()

            epoch_d_loss += d_loss.item()
            epoch_g_loss += g_loss.item()
            n_batches += 1

        if n_batches == 0:
            raise ValueError("dataloader yielded no batches; cannot train the GAN")

        epoch_d_loss /= n_batches
        epoch_g_loss /= n_batches
        g_losses.append(epoch_g_loss)
        d_losses.append(epoch_d_loss)

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{n_epochs}], D Loss: {epoch_d_loss:.4f}, G Loss: {epoch_g_loss:.4f}')

    _save_loss_plot(g_losses, d_losses)
    return generator


# Sample generation with the trained GAN
def generate_samples(generator, n_samples, latent_dim, scaler, device):
    """Draw ``n_samples`` synthetic rows from ``generator``.

    Standard-normal noise is fed through the generator without gradient
    tracking, and the output is mapped back through
    ``scaler.inverse_transform``. The generator is switched to eval mode and
    is left in that mode on return.

    Args:
        generator: Trained module mapping ``(n, latent_dim)`` noise to samples.
        n_samples: Number of rows to generate.
        latent_dim: Size of the noise vector expected by ``generator``.
        scaler: Object exposing ``inverse_transform(ndarray)``.
        device: Device on which the noise is placed before the forward pass.

    Returns:
        Whatever ``scaler.inverse_transform`` returns for the generated array.
    """
    generator.eval()

    noise = torch.randn(n_samples, latent_dim).to(device)
    with torch.no_grad():
        scaled_output = generator(noise)

    # Move to host memory before handing the data to the NumPy-based scaler.
    scaled_array = scaled_output.cpu().numpy()
    return scaler.inverse_transform(scaled_array)


# Model evaluation
def evaluate_model(X_train, y_train, X_test, y_test):
    """Fit a random forest on the training split and return its test F1 score.

    The classifier is created with ``random_state=42`` and otherwise default
    settings; the score is ``f1_score`` with its default arguments.

    Args:
        X_train: Training features.
        y_train: Training labels.
        X_test: Held-out features.
        y_test: Held-out labels.

    Returns:
        The F1 score of the predictions on ``X_test`` against ``y_test``.
    """
    forest = RandomForestClassifier(random_state=42)
    forest.fit(X_train, y_train)
    predictions = forest.predict(X_test)
    return f1_score(y_test, predictions)


# Entry point
def main():
    """Compare random-forest F1 with and without GAN-generated patient rows.

    Steps: build the dataset, make a stratified 80/20 split, train a GAN on
    the minority (patient) rows of the training split, generate enough
    synthetic patients to balance the classes, then report the F1 score of a
    classifier trained on the original versus the augmented training data.
    Both classifiers are evaluated on the same untouched test split.
    """
    # Local import: only this function needs MinMaxScaler.
    from sklearn.preprocessing import MinMaxScaler

    # Everything else in the script is seeded with 42; seed torch as well so
    # the GAN (weight init, noise, shuffling) is repeatable on CPU.
    torch.manual_seed(42)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')

    df = load_data()
    print(f"数据形状: {df.shape}")
    print(f"类别分布:\n{df['target'].value_counts()}")

    X = df.drop('target', axis=1).values
    y = df['target'].values

    # stratify=y keeps the class ratio the same in the train and test splits,
    # which matters for an imbalanced target.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    patient_mask = y_train == 1
    n_patients = int(patient_mask.sum())
    n_healthy = int((y_train == 0).sum())
    n_samples_to_generate = n_healthy - n_patients

    print(f"健康样本数量: {n_healthy}")
    print(f"病人样本数量: {n_patients}")
    print(f"需要生成的病人样本数量: {n_samples_to_generate}")

    if n_samples_to_generate <= 0:
        print("数据已经平衡,不需要生成额外样本")
        return

    # The generator ends in Tanh, so it can only emit values in [-1, 1].
    # Scale the patient rows (and only those) into that range; a scaler
    # fitted on all classes would leave the patient features largely outside
    # what the generator can produce.
    gan_scaler = MinMaxScaler(feature_range=(-1, 1))
    X_patients = gan_scaler.fit_transform(X_train[patient_mask])

    # Supply (features, labels) pairs and place them on the training device,
    # so the loader matches the conventional (x, y) batch layout.
    patient_dataset = TensorDataset(
        torch.as_tensor(X_patients, dtype=torch.float32, device=device),
        torch.ones(n_patients, device=device),
    )
    patient_dataloader = DataLoader(patient_dataset, batch_size=32, shuffle=True)

    input_dim = X_patients.shape[1]
    latent_dim = 10
    generator = Generator(latent_dim, input_dim).to(device)
    discriminator = Discriminator(input_dim).to(device)

    print("开始训练GAN...")
    trained_generator = train_gan(
        generator,
        discriminator,
        patient_dataloader,
        n_epochs=100,
        latent_dim=latent_dim,
        device=device,
    )

    print("生成新的病人样本...")
    # inverse_transform maps generated rows back to the raw feature units, so
    # they can be stacked directly onto the unscaled X_train.
    generated_patients = generate_samples(
        trained_generator, n_samples_to_generate, latent_dim, gan_scaler, device
    )
    generated_labels = np.ones(n_samples_to_generate)

    X_train_augmented = np.vstack([X_train, generated_patients])
    y_train_augmented = np.hstack([y_train, generated_labels])

    print(f"增强后的训练数据形状: {X_train_augmented.shape}")
    print(f"增强后的类别分布: {np.bincount(y_train_augmented.astype(int))}")

    # Baseline: classifier trained on the original training split.
    f1_original = evaluate_model(X_train, y_train, X_test, y_test)
    print(f"原始数据上的F1分数: {f1_original:.4f}")

    # Classifier trained on the GAN-augmented training split.
    f1_augmented = evaluate_model(X_train_augmented, y_train_augmented, X_test, y_test)
    print(f"增强数据上的F1分数: {f1_augmented:.4f}")

    print(f"\nF1分数提升: {f1_augmented - f1_original:.4f}")
    # A relative gain is undefined when the baseline F1 is zero.
    if f1_original > 0:
        print(f"提升百分比: {(f1_augmented - f1_original) / f1_original * 100:.2f}%")
    else:
        print("提升百分比: N/A (原始F1分数为0)")


if __name__ == "__main__":
    main()