Design and Implementation of a PassGAN-Based Password Training System
1. Project Overview and Background
Password security has long been a core concern in network security. As attack techniques evolve, traditional password-strength checks show their limits: rule-based strength checkers often fail to recognize complex password patterns, and blacklist-based approaches struggle against new attack types. Against this backdrop, applying artificial intelligence, in particular generative adversarial networks (GANs), to analyze and generate password patterns offers a new direction for password-security research.
PassGAN is a GAN-based password generation model. By learning the statistical regularities of real password datasets, it generates samples that resemble real passwords. Unlike traditional approaches, PassGAN requires no hand-written generation rules; it discovers the distributional features of passwords through unsupervised learning. This lets it produce more diverse and realistic samples, useful for password-strength evaluation, password-cracking tests, and password-policy improvement.
This project builds a complete PassGAN-based password training system covering data preprocessing, model training, password generation, and evaluation, providing a practical tool for password-security research.
2. System Architecture
2.1 Overall Architecture
The system uses a modular design with the following core components:
- Data preprocessing module: cleans, formats, and vectorizes password data
- Model definition module: implements the PassGAN generator and discriminator networks
- Training module: manages the GAN training loop, including loss computation and parameter updates
- Generation module: produces password samples with the trained model
- Evaluation module: assesses generated passwords quantitatively and qualitatively
- User interface module: command-line and visualization interfaces for user interaction
2.2 Technology Stack
- Deep learning framework: PyTorch (flexible; its dynamic-graph execution suits research)
- Data processing: Pandas + NumPy
- Visualization: Matplotlib + Seaborn
- Progress display: tqdm
- Parallelism: multiprocessing/multithreading (optional)
2.3 Data Flow
Data flows through the system as follows:
Raw password data → cleaning → character encoding → batched training → model checkpointing → password generation → evaluation
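The stages above can be sketched end-to-end with plain functions. This is an illustrative toy version only; the helper names (`clean`, `encode`) and the zero-padding scheme are simplifications, not the system's actual implementation, which follows in Sections 4 and 5.

```python
def clean(passwords, min_len=4, max_len=16):
    # keep only passwords within the configured length range
    return [p for p in passwords if min_len <= len(p) <= max_len]

def encode(passwords, seq_length=16):
    # map each character to its ASCII code, right-pad with zeros
    return [[ord(c) for c in p] + [0] * (seq_length - len(p)) for p in passwords]

raw = ["abc", "password123", "x" * 20]
encoded = encode(clean(raw))
print(len(encoded))  # → 1: only "password123" survives cleaning
```

The real pipeline replaces the toy `encode` with the vocabulary-based `PasswordEncoder` of Section 4.2, which also handles start/end/pad tokens.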
3. Environment Setup and Dependency Management
3.1 Python Environment Setup
# Create a conda environment
conda create -n passgan python=3.8
conda activate passgan

# Install core dependencies
pip install torch==1.9.0+cu111 torchvision==0.10.0+cu111 -f https://download.pytorch.org/whl/torch_stable.html
pip install pandas numpy matplotlib seaborn tqdm scikit-learn

# Project directory structure
"""
passgan-system/
├── data/
│   ├── raw/            # raw data
│   ├── processed/      # processed data
│   └── generated/      # generated passwords
├── models/             # saved model checkpoints
├── src/
│   ├── preprocessing/  # data preprocessing
│   ├── models/         # model definitions
│   ├── training/       # training logic
│   ├── generation/     # password generation
│   ├── evaluation/     # evaluation module
│   └── utils/          # utility functions
├── configs/            # configuration files
├── tests/              # test code
└── docs/               # documentation
"""
3.2 Configuration File Design
# configs/default.yaml
data:
  input_path: "data/raw/rockyou.txt"
  output_path: "data/processed/encoded_passwords.npy"
  min_length: 4
  max_length: 16
  train_ratio: 0.8
  vocab_size: 256  # ASCII character set

model:
  latent_dim: 100
  gen_hidden_dim: 512
  disc_hidden_dim: 512
  seq_length: 16

training:
  batch_size: 64
  epochs: 1000
  gen_learning_rate: 0.0002
  disc_learning_rate: 0.0002
  beta1: 0.5
  beta2: 0.999
  disc_iterations: 1  # discriminator updates per generator update
  sample_interval: 100

generation:
  num_samples: 10000
  temperature: 0.8

evaluation:
  test_size: 1000
  metrics: ["unique_ratio", "novelty", "similarity"]
4. Data Preprocessing Module
4.1 Data Loading and Cleaning
# src/preprocessing/data_loader.py
import pandas as pd
import numpy as np
import re
from tqdm import tqdm
import logging

class PasswordDataLoader:
    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def load_raw_data(self, file_path):
        """Load raw password data."""
        self.logger.info(f"Loading data from {file_path}")
        try:
            with open(file_path, 'r', encoding='latin-1') as f:
                passwords = [line.strip() for line in f.readlines()]
            return passwords
        except Exception as e:
            self.logger.error(f"Error loading data: {e}")
            raise

    def clean_data(self, passwords):
        """Clean the data, filtering out invalid passwords."""
        self.logger.info("Cleaning data...")
        cleaned = []
        for pwd in tqdm(passwords):
            # Filter empty, too-short, and too-long passwords
            if not pwd or len(pwd) < self.config['data']['min_length']:
                continue
            if len(pwd) > self.config['data']['max_length']:
                continue
            # Optional: filter non-ASCII characters
            if self.config['data'].get('ascii_only', True):
                try:
                    pwd.encode('ascii')
                except UnicodeEncodeError:
                    continue
            cleaned.append(pwd)
        self.logger.info(f"Original: {len(passwords)}, Cleaned: {len(cleaned)}")
        return cleaned

    def analyze_dataset(self, passwords):
        """Compute summary statistics of the dataset."""
        self.logger.info("Analyzing dataset...")
        lengths = [len(p) for p in passwords]
        stats = {
            'total_count': len(passwords),
            'avg_length': np.mean(lengths),
            'min_length': np.min(lengths),
            'max_length': np.max(lengths),
            'unique_ratio': len(set(passwords)) / len(passwords)
        }
        self.logger.info(f"Dataset stats: {stats}")
        return stats
4.2 Character Encoding and Vectorization
# src/preprocessing/encoder.py
import numpy as np
class PasswordEncoder:
    def __init__(self, config):
        self.config = config
        self.char_to_idx = {}
        self.idx_to_char = {}
        self.vocab_size = config['data']['vocab_size']
        self.seq_length = config['model']['seq_length']
        self.build_vocab()

    def build_vocab(self):
        """Build the character vocabulary."""
        # ASCII character set
        for i in range(self.vocab_size):
            self.char_to_idx[chr(i)] = i
            self.idx_to_char[i] = chr(i)
        # Add special tokens
        self.pad_token = '<PAD>'
        self.start_token = '<START>'
        self.end_token = '<END>'
        self.unk_token = '<UNK>'
        special_tokens = [self.pad_token, self.start_token, self.end_token, self.unk_token]
        for idx, token in enumerate(special_tokens):
            actual_idx = self.vocab_size + idx
            self.char_to_idx[token] = actual_idx
            self.idx_to_char[actual_idx] = token

    def encode(self, password):
        """Encode a password as a sequence of indices."""
        encoded = [self.char_to_idx[self.start_token]]
        for char in password:
            encoded.append(self.char_to_idx.get(char, self.char_to_idx[self.unk_token]))
        encoded.append(self.char_to_idx[self.end_token])
        # Pad or truncate to seq_length
        if len(encoded) < self.seq_length:
            encoded.extend([self.char_to_idx[self.pad_token]] * (self.seq_length - len(encoded)))
        else:
            encoded = encoded[:self.seq_length - 1] + [self.char_to_idx[self.end_token]]
        return encoded

    def decode(self, encoded_seq):
        """Decode a sequence of indices back into a password string."""
        password = []
        for idx in encoded_seq:
            if idx in self.idx_to_char:
                char = self.idx_to_char[idx]
                if char == self.end_token:
                    break
                if char not in [self.pad_token, self.start_token]:
                    password.append(char)
            else:
                password.append(self.unk_token)
        return ''.join(password)

    def batch_encode(self, passwords):
        """Encode a batch of passwords into a single array."""
        encoded = np.zeros((len(passwords), self.seq_length), dtype=np.int64)
        for i, pwd in enumerate(passwords):
            encoded[i] = self.encode(pwd)
        return encoded
4.3 Dataset Class Implementation
# src/preprocessing/dataset.py
import torch
from torch.utils.data import Dataset, DataLoader

class PasswordDataset(Dataset):
    def __init__(self, encoded_passwords):
        self.data = encoded_passwords

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.long)

def create_data_loaders(encoded_data, config):
    """Create training and test data loaders."""
    dataset = PasswordDataset(encoded_data)
    # Train/test split
    train_size = int(config['data']['train_ratio'] * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
    train_loader = DataLoader(
        train_dataset,
        batch_size=config['training']['batch_size'],
        shuffle=True,
        num_workers=4,
        pin_memory=True
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=config['training']['batch_size'],
        shuffle=False,
        num_workers=4,
        pin_memory=True
    )
    return train_loader, test_loader
5. PassGAN Model Implementation
5.1 Generator Network
# src/models/generator.py
import torch
import torch.nn as nn
import torch.nn.functional as F

class Generator(nn.Module):
    def __init__(self, config, encoder):
        super(Generator, self).__init__()
        self.config = config
        self.encoder = encoder
        self.vocab_size = encoder.vocab_size + 4  # includes special tokens
        self.seq_length = config['model']['seq_length']
        self.latent_dim = config['model']['latent_dim']
        self.hidden_dim = config['model']['gen_hidden_dim']
        # Projection layer: maps the noise vector to an initial LSTM state
        self.projection = nn.Linear(self.latent_dim, 2 * self.hidden_dim)
        # LSTM layers
        self.lstm = nn.LSTM(
            input_size=self.latent_dim,
            hidden_size=self.hidden_dim,
            num_layers=2,
            batch_first=True,
            dropout=0.2,
            bidirectional=False
        )
        # Output layer
        self.fc = nn.Linear(self.hidden_dim, self.vocab_size)
        # Initialize weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.LSTM):
            for name, param in module.named_parameters():
                if 'weight_ih' in name:
                    nn.init.xavier_uniform_(param.data)
                elif 'weight_hh' in name:
                    nn.init.orthogonal_(param.data)
                elif 'bias' in name:
                    nn.init.constant_(param.data, 0)

    def forward(self, z, temperature=1.0):
        """Forward pass.

        z: noise vector [batch_size, latent_dim]
        temperature: controls generation diversity
        """
        batch_size = z.size(0)
        # Repeat the noise vector along the sequence dimension
        z_expanded = z.unsqueeze(1).repeat(1, self.seq_length, 1)
        # LSTM forward pass
        lstm_out, _ = self.lstm(z_expanded)
        # Project to output logits
        logits = self.fc(lstm_out)  # [batch_size, seq_length, vocab_size]
        # Apply the temperature
        logits = logits / temperature
        if self.training:
            # Training: Gumbel-Softmax gives a differentiable one-hot approximation
            samples = F.gumbel_softmax(logits, tau=temperature, hard=True, dim=-1)
        else:
            # Inference: sample index sequences directly from the softmax distribution
            probs = F.softmax(logits, dim=-1)
            samples = torch.multinomial(probs.view(-1, self.vocab_size), 1)
            samples = samples.view(batch_size, self.seq_length)
        return samples, logits

    def generate(self, num_samples, temperature=1.0, device='cpu'):
        """Generate password samples."""
        self.eval()
        with torch.no_grad():
            # Sample random noise
            z = torch.randn(num_samples, self.latent_dim).to(device)
            samples, _ = self.forward(z, temperature)
            # Convert index sequences back to password strings
            generated_passwords = []
            for i in range(num_samples):
                seq = samples[i].cpu().numpy()
                password = self.encoder.decode(seq)
                generated_passwords.append(password)
            return generated_passwords
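The division of the logits by `temperature` in the forward pass is the standard softmax-temperature trick: temperatures below 1 concentrate probability mass on the most likely characters, temperatures above 1 flatten the distribution. A minimal pure-Python illustration, independent of the model above:

```python
import math

def softmax(logits, temperature=1.0):
    # temperature < 1 sharpens the distribution; > 1 flattens it
    scaled = [l / temperature for l in logits]
    m = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.0]
sharp = softmax(logits, temperature=0.5)
flat = softmax(logits, temperature=2.0)
print(sharp[0] > flat[0])  # → True: lower temperature concentrates mass on the top logit
```

This is why the config's `generation.temperature: 0.8` yields slightly more conservative, "typical" passwords than sampling at temperature 1.0.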
5.2 Discriminator Network
# src/models/discriminator.py
import torch
import torch.nn as nn
import torch.nn.functional as F

class Discriminator(nn.Module):
    def __init__(self, config, encoder):
        super(Discriminator, self).__init__()
        self.config = config
        self.encoder = encoder
        self.vocab_size = encoder.vocab_size + 4
        self.seq_length = config['model']['seq_length']
        self.hidden_dim = config['model']['disc_hidden_dim']
        # Embedding layer
        self.embedding = nn.Embedding(self.vocab_size, 128)
        # Convolutional feature extractor
        self.conv_net = nn.Sequential(
            # Input: [batch_size, 128, seq_length]
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.2),
            nn.Conv1d(256, 512, kernel_size=3, padding=1),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.2),
            nn.Conv1d(512, 1024, kernel_size=3, padding=1),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.2),
            nn.AdaptiveMaxPool1d(1)
        )
        # Fully connected classifier head
        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Conv1d)):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)
        elif isinstance(module, nn.Embedding):
            nn.init.normal_(module.weight, mean=0, std=0.02)

    def forward(self, x):
        """Forward pass.

        x: either index sequences [batch_size, seq_length] (long), or
           one-hot/soft sequences [batch_size, seq_length, vocab_size] (float)
           as produced by the generator's Gumbel-Softmax during training.
        """
        if x.dtype == torch.long:
            embedded = self.embedding(x)  # [batch_size, seq_length, 128]
        else:
            # Matrix product with the embedding table keeps gradients
            # flowing through the generator's soft samples
            embedded = x @ self.embedding.weight
        embedded = embedded.transpose(1, 2)  # [batch_size, 128, seq_length]
        features = self.conv_net(embedded)   # [batch_size, 1024, 1]
        features = features.squeeze(2)       # [batch_size, 1024]
        validity = self.fc(features)         # [batch_size, 1]
        return validity
5.3 The Complete PassGAN Model
# src/models/passgan.py
import torch
import torch.nn as nn
from .generator import Generator
from .discriminator import Discriminator

class PassGAN:
    def __init__(self, config, encoder, device='cpu'):
        self.config = config
        self.encoder = encoder
        self.device = device
        # Instantiate the generator and discriminator
        self.generator = Generator(config, encoder).to(device)
        self.discriminator = Discriminator(config, encoder).to(device)
        # Optimizers
        self.optimizer_G = torch.optim.Adam(
            self.generator.parameters(),
            lr=config['training']['gen_learning_rate'],
            betas=(config['training']['beta1'], config['training']['beta2'])
        )
        self.optimizer_D = torch.optim.Adam(
            self.discriminator.parameters(),
            lr=config['training']['disc_learning_rate'],
            betas=(config['training']['beta1'], config['training']['beta2'])
        )
        # Loss function
        self.adversarial_loss = nn.BCELoss()
        # Training history
        self.history = {'d_loss': [], 'g_loss': [], 'd_real': [], 'd_fake': []}

    def train_step(self, real_passwords):
        """One training step."""
        batch_size = real_passwords.size(0)
        # Real and fake labels
        real_labels = torch.ones(batch_size, 1).to(self.device)
        fake_labels = torch.zeros(batch_size, 1).to(self.device)

        # ---------------------
        #  Train the discriminator
        # ---------------------
        self.discriminator.train()
        self.generator.eval()
        self.optimizer_D.zero_grad()
        # Loss on real samples
        real_validity = self.discriminator(real_passwords)
        d_real_loss = self.adversarial_loss(real_validity, real_labels)
        # Generate fake samples
        z = torch.randn(batch_size, self.config['model']['latent_dim']).to(self.device)
        fake_passwords, _ = self.generator(z)
        # Loss on fake samples
        fake_validity = self.discriminator(fake_passwords.detach())
        d_fake_loss = self.adversarial_loss(fake_validity, fake_labels)
        # Total discriminator loss
        d_loss = d_real_loss + d_fake_loss
        d_loss.backward()
        self.optimizer_D.step()

        # ---------------------
        #  Train the generator
        # ---------------------
        self.discriminator.eval()
        self.generator.train()
        self.optimizer_G.zero_grad()
        # Generate new samples
        z = torch.randn(batch_size, self.config['model']['latent_dim']).to(self.device)
        gen_passwords, _ = self.generator(z)
        # Generator loss: fool the discriminator into labeling fakes as real
        gen_validity = self.discriminator(gen_passwords)
        g_loss = self.adversarial_loss(gen_validity, real_labels)
        g_loss.backward()
        self.optimizer_G.step()

        # Report training metrics
        return {
            'd_loss': d_loss.item(),
            'g_loss': g_loss.item(),
            'd_real': real_validity.mean().item(),
            'd_fake': fake_validity.mean().item()
        }

    def sample(self, num_samples, temperature=1.0):
        """Generate password samples."""
        return self.generator.generate(num_samples, temperature, self.device)

    def save_models(self, path):
        """Save models and optimizer state."""
        torch.save({
            'generator': self.generator.state_dict(),
            'discriminator': self.discriminator.state_dict(),
            'optimizer_G': self.optimizer_G.state_dict(),
            'optimizer_D': self.optimizer_D.state_dict(),
            'history': self.history
        }, path)

    def load_models(self, path):
        """Load models and optimizer state."""
        checkpoint = torch.load(path, map_location=self.device)
        self.generator.load_state_dict(checkpoint['generator'])
        self.discriminator.load_state_dict(checkpoint['discriminator'])
        self.optimizer_G.load_state_dict(checkpoint['optimizer_G'])
        self.optimizer_D.load_state_dict(checkpoint['optimizer_D'])
        self.history = checkpoint['history']
6. Training Module
6.1 Training Loop
# src/training/trainer.py
import torch
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import os
import logging

class PassGANTrainer:
    def __init__(self, model, train_loader, config, device='cpu'):
        self.model = model
        self.train_loader = train_loader
        self.config = config
        self.device = device
        self.logger = logging.getLogger(__name__)
        # Create output directories
        os.makedirs('models', exist_ok=True)
        os.makedirs('results', exist_ok=True)

    def train(self):
        """Run the full training loop."""
        num_epochs = self.config['training']['epochs']
        sample_interval = self.config['training']['sample_interval']
        self.logger.info("Starting training...")
        for epoch in range(num_epochs):
            epoch_d_loss = 0
            epoch_g_loss = 0
            epoch_d_real = 0
            epoch_d_fake = 0
            batch_count = 0
            with tqdm(self.train_loader, desc=f'Epoch {epoch+1}/{num_epochs}') as pbar:
                for batch_idx, real_passwords in enumerate(pbar):
                    real_passwords = real_passwords.to(self.device)
                    # Train on one batch
                    metrics = self.model.train_step(real_passwords)
                    # Accumulate metrics
                    epoch_d_loss += metrics['d_loss']
                    epoch_g_loss += metrics['g_loss']
                    epoch_d_real += metrics['d_real']
                    epoch_d_fake += metrics['d_fake']
                    batch_count += 1
                    # Update the progress bar
                    pbar.set_postfix({
                        'D Loss': f"{metrics['d_loss']:.4f}",
                        'G Loss': f"{metrics['g_loss']:.4f}",
                        'D Real': f"{metrics['d_real']:.4f}",
                        'D Fake': f"{metrics['d_fake']:.4f}"
                    })
            # Per-epoch averages
            avg_d_loss = epoch_d_loss / batch_count
            avg_g_loss = epoch_g_loss / batch_count
            avg_d_real = epoch_d_real / batch_count
            avg_d_fake = epoch_d_fake / batch_count
            # Record history
            self.model.history['d_loss'].append(avg_d_loss)
            self.model.history['g_loss'].append(avg_g_loss)
            self.model.history['d_real'].append(avg_d_real)
            self.model.history['d_fake'].append(avg_d_fake)
            self.logger.info(
                f"Epoch {epoch+1}/{num_epochs} | "
                f"D Loss: {avg_d_loss:.4f} | G Loss: {avg_g_loss:.4f} | "
                f"D Real: {avg_d_real:.4f} | D Fake: {avg_d_fake:.4f}"
            )
            # Periodically sample and checkpoint
            if (epoch + 1) % sample_interval == 0:
                self._sample_and_save(epoch + 1)
                self._save_checkpoint(epoch + 1)
                self._plot_training_history()

    def _sample_and_save(self, epoch):
        """Generate samples and write them to disk."""
        samples = self.model.sample(100, temperature=0.8)
        sample_file = f"results/samples_epoch_{epoch}.txt"
        with open(sample_file, 'w') as f:
            for pwd in samples:
                f.write(f"{pwd}\n")
        self.logger.info(f"Saved samples to {sample_file}")

    def _save_checkpoint(self, epoch):
        """Save a training checkpoint."""
        checkpoint_path = f"models/checkpoint_epoch_{epoch}.pth"
        self.model.save_models(checkpoint_path)
        self.logger.info(f"Saved checkpoint to {checkpoint_path}")

    def _plot_training_history(self):
        """Plot training curves."""
        plt.figure(figsize=(12, 8))
        # Loss curves
        plt.subplot(2, 2, 1)
        plt.plot(self.model.history['d_loss'], label='Discriminator Loss')
        plt.plot(self.model.history['g_loss'], label='Generator Loss')
        plt.title('Training Losses')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        # Discriminator outputs
        plt.subplot(2, 2, 2)
        plt.plot(self.model.history['d_real'], label='D(x)')
        plt.plot(self.model.history['d_fake'], label='D(G(z))')
        plt.title('Discriminator Outputs')
        plt.xlabel('Epoch')
        plt.ylabel('Probability')
        plt.legend()
        # Loss ratio
        plt.subplot(2, 2, 3)
        ratio = [d / g if g != 0 else 0
                 for d, g in zip(self.model.history['d_loss'], self.model.history['g_loss'])]
        plt.plot(ratio)
        plt.title('D Loss / G Loss Ratio')
        plt.xlabel('Epoch')
        plt.ylabel('Ratio')
        plt.tight_layout()
        plt.savefig('results/training_history.png')
        plt.close()
6.2 Learning-Rate Scheduling and Early Stopping
# src/training/scheduler.py
import numpy as np

class LearningRateScheduler:
    def __init__(self, optimizer, mode='step', **kwargs):
        self.optimizer = optimizer
        self.mode = mode
        self.config = kwargs
        self.min_lr = kwargs.get('min_lr', 1e-6)
        if mode == 'step':
            self.step_size = kwargs.get('step_size', 30)
            self.gamma = kwargs.get('gamma', 0.1)
            self.epoch = 0
        elif mode == 'plateau':
            self.patience = kwargs.get('patience', 10)
            self.factor = kwargs.get('factor', 0.5)
            self.best_loss = np.inf
            self.counter = 0
        self.initial_lr = self.optimizer.param_groups[0]['lr']

    def step(self, current_loss=None):
        """Update the learning rate."""
        if self.mode == 'step':
            # Step decay: multiply by gamma every step_size epochs
            self.epoch += 1
            if self.epoch % self.step_size == 0 and self.optimizer.param_groups[0]['lr'] > self.min_lr:
                self.optimizer.param_groups[0]['lr'] *= self.gamma
        elif self.mode == 'plateau' and current_loss is not None:
            # Decay when the validation loss plateaus
            if current_loss < self.best_loss:
                self.best_loss = current_loss
                self.counter = 0
            else:
                self.counter += 1
                if self.counter >= self.patience:
                    new_lr = max(self.optimizer.param_groups[0]['lr'] * self.factor, self.min_lr)
                    self.optimizer.param_groups[0]['lr'] = new_lr
                    self.counter = 0

class EarlyStopping:
    def __init__(self, patience=10, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = np.inf
        self.early_stop = False

    def __call__(self, current_loss):
        if current_loss < self.best_loss - self.min_delta:
            self.best_loss = current_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        return self.early_stop
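The early-stopping logic can be exercised in isolation. The sketch below re-implements the same counter-based rule inline (so the example is self-contained and does not import the project module) and feeds it a toy loss sequence:

```python
class EarlyStopping:
    # same counter-based rule as in src/training/scheduler.py, inlined for the example
    def __init__(self, patience=3, min_delta=0.0):
        self.patience, self.min_delta = patience, min_delta
        self.counter, self.best_loss = 0, float('inf')

    def __call__(self, loss):
        if loss < self.best_loss - self.min_delta:
            self.best_loss, self.counter = loss, 0  # improvement: reset the counter
        else:
            self.counter += 1  # no improvement
        return self.counter >= self.patience

stopper = EarlyStopping(patience=3)
losses = [1.0, 0.8, 0.9, 0.9, 0.9]  # improves twice, then plateaus
stopped_at = next(i for i, loss in enumerate(losses) if stopper(loss))
print(stopped_at)  # → 4: stops after three consecutive non-improving epochs
```

In the trainer this would be called once per epoch with the averaged generator loss; note that GAN losses oscillate, so a generous `patience` is advisable.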
7. Password Generation and Evaluation Module
7.1 Batch Generator
# src/generation/batch_generator.py
import torch
import numpy as np
from tqdm import tqdm

class BatchPasswordGenerator:
    def __init__(self, model, encoder, device='cpu'):
        self.model = model
        self.encoder = encoder
        self.device = device

    def generate_batch(self, num_samples, batch_size=1000, temperature=1.0):
        """Generate passwords in batches."""
        all_passwords = []
        num_batches = (num_samples + batch_size - 1) // batch_size
        for i in tqdm(range(num_batches)):
            current_batch_size = min(batch_size, num_samples - i * batch_size)
            # Generate one batch (PassGAN.sample already knows its device)
            batch_passwords = self.model.sample(current_batch_size, temperature)
            all_passwords.extend(batch_passwords)
        return all_passwords

    def generate_with_constraints(self, num_samples, constraints, temperature=1.0):
        """Generate passwords that satisfy the given constraints."""
        valid_passwords = []
        attempts = 0
        max_attempts = num_samples * 10  # cap on generation attempts
        with tqdm(total=num_samples) as pbar:
            while len(valid_passwords) < num_samples and attempts < max_attempts:
                # Generate a batch of candidates
                batch = self.model.sample(100, temperature)
                for pwd in batch:
                    if self._check_constraints(pwd, constraints):
                        valid_passwords.append(pwd)
                        pbar.update(1)
                        if len(valid_passwords) >= num_samples:
                            break
                attempts += 1
        return valid_passwords

    def _check_constraints(self, password, constraints):
        """Check whether a password satisfies the constraints."""
        # Length constraints
        min_len = constraints.get('min_length', 0)
        max_len = constraints.get('max_length', float('inf'))
        if not (min_len <= len(password) <= max_len):
            return False
        # Character-class constraints
        if constraints.get('require_digit'):
            if not any(c.isdigit() for c in password):
                return False
        if constraints.get('require_upper'):
            if not any(c.isupper() for c in password):
                return False
        if constraints.get('require_lower'):
            if not any(c.islower() for c in password):
                return False
        if constraints.get('require_special'):
            special_chars = "!@#$%^&*()_-+=[]{}|;:,.<>?/"
            if not any(c in special_chars for c in password):
                return False
        return True
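The constraint check can be tried standalone. This is a simplified, self-contained version of `_check_constraints` with hypothetical default thresholds, not the class method itself:

```python
def meets_policy(pwd, require_digit=True, require_upper=True, min_length=8):
    # simplified policy check: length, digit, and uppercase requirements
    if len(pwd) < min_length:
        return False
    if require_digit and not any(c.isdigit() for c in pwd):
        return False
    if require_upper and not any(c.isupper() for c in pwd):
        return False
    return True

print(meets_policy("Passw0rd"))  # → True
print(meets_policy("password"))  # → False: no digit and no uppercase letter
```

Rejection sampling against such a policy is why `generate_with_constraints` caps itself at `max_attempts`: a strict policy combined with an undertrained generator can make valid samples arbitrarily rare.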
7.2 Evaluation Metrics
# src/evaluation/metrics.py
import numpy as np
from collections import Counter
import math
from scipy.spatial.distance import cosine

class PasswordMetrics:
    @staticmethod
    def uniqueness(generated_passwords):
        """Fraction of generated passwords that are unique."""
        unique_count = len(set(generated_passwords))
        total_count = len(generated_passwords)
        return unique_count / total_count, unique_count

    @staticmethod
    def novelty(generated_passwords, training_passwords):
        """Fraction of generated passwords not present in the training set."""
        training_set = set(training_passwords)
        novel_count = sum(1 for pwd in generated_passwords if pwd not in training_set)
        return novel_count / len(generated_passwords), novel_count

    @staticmethod
    def entropy(password):
        """Shannon entropy of a password's character distribution."""
        if not password:
            return 0
        freq = Counter(password)
        probs = [count / len(password) for count in freq.values()]
        return -sum(p * math.log2(p) for p in probs)

    @staticmethod
    def average_entropy(passwords):
        """Mean Shannon entropy over a list of passwords."""
        entropies = [PasswordMetrics.entropy(pwd) for pwd in passwords]
        return np.mean(entropies) if entropies else 0

    @staticmethod
    def length_distribution(passwords):
        """Summary statistics of password lengths."""
        lengths = [len(pwd) for pwd in passwords]
        return {
            'mean': np.mean(lengths),
            'std': np.std(lengths),
            'min': np.min(lengths),
            'max': np.max(lengths),
            'histogram': np.bincount(lengths)
        }

    @staticmethod
    def character_distribution(passwords):
        """Relative frequency of each character."""
        all_chars = ''.join(passwords)
        char_count = Counter(all_chars)
        total_chars = len(all_chars)
        return {char: count / total_chars for char, count in char_count.items()}

    @staticmethod
    def similarity_to_training(generated_passwords, training_passwords, sample_size=1000):
        """Cosine similarity between character-frequency vectors of the two sets."""
        if len(generated_passwords) > sample_size:
            gen_sample = np.random.choice(generated_passwords, sample_size, replace=False)
        else:
            gen_sample = generated_passwords
        if len(training_passwords) > sample_size:
            train_sample = np.random.choice(training_passwords, sample_size, replace=False)
        else:
            train_sample = training_passwords

        def create_char_vector(passwords, charset):
            # Build a normalized character-frequency vector over the charset
            vector = np.zeros(len(charset))
            all_chars = ''.join(passwords)
            counter = Counter(all_chars)
            for char, count in counter.items():
                if char in charset:
                    idx = charset.index(char)
                    vector[idx] = count
            return vector / len(all_chars) if len(all_chars) > 0 else vector

        # Union of characters appearing in either sample
        all_chars = sorted(set(''.join(gen_sample) + ''.join(train_sample)))
        gen_vector = create_char_vector(gen_sample, all_chars)
        train_vector = create_char_vector(train_sample, all_chars)
        # Cosine similarity = 1 - cosine distance
        similarity = 1 - cosine(gen_vector, train_vector)
        return similarity
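As a sanity check on the entropy metric, the per-character Shannon entropy can be computed by hand. The formula below matches `PasswordMetrics.entropy`, re-stated standalone:

```python
import math
from collections import Counter

def shannon_entropy(password):
    # per-character Shannon entropy, in bits
    freq = Counter(password)
    n = len(password)
    return -sum(c / n * math.log2(c / n) for c in freq.values())

print(shannon_entropy("aaaa"))  # → 0.0: a single repeated character carries no entropy
print(shannon_entropy("abcd"))  # → 2.0: four equiprobable characters give log2(4) bits
```

Note this measures only the character distribution inside one password; it says nothing about how guessable the password is relative to common password lists, which is what the novelty and similarity metrics capture.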
7.3 Comprehensive Evaluator
# src/evaluation/evaluator.py
import json
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from .metrics import PasswordMetrics

class PasswordEvaluator:
    def __init__(self, training_passwords):
        self.training_passwords = training_passwords

    def comprehensive_evaluation(self, generated_passwords, output_file=None):
        """Run the full evaluation suite on generated passwords."""
        results = {}
        # Uniqueness
        uniqueness, unique_count = PasswordMetrics.uniqueness(generated_passwords)
        results['uniqueness'] = uniqueness
        results['unique_count'] = unique_count
        # Novelty
        novelty, novel_count = PasswordMetrics.novelty(generated_passwords, self.training_passwords)
        results['novelty'] = novelty
        results['novel_count'] = novel_count
        # Average entropy
        results['average_entropy'] = PasswordMetrics.average_entropy(generated_passwords)
        # Length distribution
        results['length_distribution'] = PasswordMetrics.length_distribution(generated_passwords)
        # Character distribution
        results['character_distribution'] = PasswordMetrics.character_distribution(generated_passwords)
        # Similarity to the training set
        results['similarity_to_training'] = PasswordMetrics.similarity_to_training(
            generated_passwords, self.training_passwords)
        # Save results and charts
        if output_file:
            with open(output_file, 'w') as f:
                # default=str serializes the numpy values in the results dict
                json.dump(results, f, indent=2, default=str)
            self._generate_visualizations(generated_passwords, output_file)
        return results

    def _generate_visualizations(self, passwords, output_file):
        """Produce evaluation charts."""
        base_name = output_file.replace('.json', '')
        # Length histogram
        lengths = [len(pwd) for pwd in passwords]
        plt.figure(figsize=(10, 6))
        plt.hist(lengths, bins=range(min(lengths), max(lengths) + 2), alpha=0.7)
        plt.title('Password Length Distribution')
        plt.xlabel('Length')
        plt.ylabel('Frequency')
        plt.savefig(f'{base_name}_length_dist.png')
        plt.close()
        # Character-class distribution
        char_types = {'digit': 0, 'lower': 0, 'upper': 0, 'special': 0}
        special_chars = "!@#$%^&*()_-+=[]{}|;:,.<>?/"
        for pwd in passwords:
            for char in pwd:
                if char.isdigit():
                    char_types['digit'] += 1
                elif char.islower():
                    char_types['lower'] += 1
                elif char.isupper():
                    char_types['upper'] += 1
                elif char in special_chars:
                    char_types['special'] += 1
        total_chars = sum(char_types.values())
        if total_chars > 0:
            char_types = {k: v / total_chars for k, v in char_types.items()}
        plt.figure(figsize=(8, 6))
        plt.bar(char_types.keys(), char_types.values())
        plt.title('Character Type Distribution')
        plt.ylabel('Proportion')
        plt.savefig(f'{base_name}_char_type_dist.png')
        plt.close()
        # Top-20 character frequencies
        all_chars = ''.join(passwords)
        char_freq = Counter(all_chars)
        top_chars = dict(sorted(char_freq.items(), key=lambda x: x[1], reverse=True)[:20])
        plt.figure(figsize=(12, 6))
        plt.bar(top_chars.keys(), top_chars.values())
        plt.title('Top 20 Character Frequency')
        plt.xlabel('Character')
        plt.ylabel('Frequency')
        plt.savefig(f'{base_name}_top_chars.png')
        plt.close()
8. System Integration and User Interface
8.1 Command-Line Interface
# src/cli/main.py
import argparse
import yaml
import logging
from pathlib import Path

import torch

from preprocessing.data_loader import PasswordDataLoader
from preprocessing.encoder import PasswordEncoder
from preprocessing.dataset import create_data_loaders
from models.passgan import PassGAN
from training.trainer import PassGANTrainer
from generation.batch_generator import BatchPasswordGenerator
from evaluation.evaluator import PasswordEvaluator

def setup_logging():
    """Configure logging."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('passgan_system.log'),
            logging.StreamHandler()
        ]
    )

def load_config(config_path):
    """Load a YAML configuration file."""
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)

def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description='PassGAN Password Generation System')
    subparsers = parser.add_subparsers(dest='command', help='Command to execute')
    # train command
    train_parser = subparsers.add_parser('train', help='Train the PassGAN model')
    train_parser.add_argument('--config', type=str, required=True, help='Path to config file')
    train_parser.add_argument('--resume', type=str, help='Path to checkpoint to resume from')
    # generate command
    generate_parser = subparsers.add_parser('generate', help='Generate passwords')
    generate_parser.add_argument('--config', type=str, required=True, help='Path to config file')
    generate_parser.add_argument('--model', type=str, required=True, help='Path to trained model')
    generate_parser.add_argument('--num-samples', type=int, default=10000, help='Number of passwords to generate')
    generate_parser.add_argument('--output', type=str, required=True, help='Output file path')
    # evaluate command
    eval_parser = subparsers.add_parser('evaluate', help='Evaluate generated passwords')
    eval_parser.add_argument('--generated', type=str, required=True, help='Path to generated passwords')
    eval_parser.add_argument('--training', type=str, required=True, help='Path to training passwords')
    eval_parser.add_argument('--output', type=str, required=True, help='Output evaluation file')

    args = parser.parse_args()
    setup_logging()
    logger = logging.getLogger(__name__)

    if args.command == 'train':
        # Training mode
        config = load_config(args.config)
        logger.info("Starting training process...")
        # Load and preprocess data
        data_loader = PasswordDataLoader(config)
        raw_passwords = data_loader.load_raw_data(config['data']['input_path'])
        cleaned_passwords = data_loader.clean_data(raw_passwords)
        # Encode
        encoder = PasswordEncoder(config)
        encoded_data = encoder.batch_encode(cleaned_passwords)
        # Data loaders
        train_loader, _ = create_data_loaders(encoded_data, config)
        # Build the model
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model = PassGAN(config, encoder, device)
        # Resume if requested
        if args.resume:
            model.load_models(args.resume)
            logger.info(f"Resumed training from {args.resume}")
        # Train
        trainer = PassGANTrainer(model, train_loader, config, device)
        trainer.train()
        logger.info("Training completed!")

    elif args.command == 'generate':
        # Generation mode
        config = load_config(args.config)
        logger.info("Starting password generation...")
        # Rebuild the encoder and load the trained model
        encoder = PasswordEncoder(config)
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model = PassGAN(config, encoder, device)
        model.load_models(args.model)
        # Generate passwords at the configured temperature
        generator = BatchPasswordGenerator(model, encoder, device)
        passwords = generator.generate_batch(
            args.num_samples,
            temperature=config['generation'].get('temperature', 1.0))
        # Write output
        with open(args.output, 'w') as f:
            for pwd in passwords:
                f.write(f"{pwd}\n")
        logger.info(f"Generated {len(passwords)} passwords to {args.output}")

    elif args.command == 'evaluate':
        # Evaluation mode
        logger.info("Starting evaluation...")
        # Load the generated passwords
        with open(args.generated, 'r') as f:
            generated_passwords = [line.strip() for line in f]
        # Load the training passwords
        with open(args.training, 'r') as f:
            training_passwords = [line.strip() for line in f]
        # Evaluate
        evaluator = PasswordEvaluator(training_passwords)
        results = evaluator.comprehensive_evaluation(generated_passwords, args.output)
        logger.info(f"Evaluation completed. Results saved to {args.output}")
        logger.info(f"Uniqueness: {results['uniqueness']:.4f}")
        logger.info(f"Novelty: {results['novelty']:.4f}")
        logger.info(f"Average Entropy: {results['average_entropy']:.4f}")

    else:
        parser.print_help()

if __name__ == "__main__":
    main()
8.2 Example Configuration File
# configs/train_config.yaml
data:
  input_path: "data/raw/rockyou.txt"
  output_path: "data/processed/encoded_passwords.npy"
  min_length: 4
  max_length: 16
  train_ratio: 0.8
  vocab_size: 256
  ascii_only: true

model:
  latent_dim: 100
  gen_hidden_dim: 512
  disc_hidden_dim: 512
  seq_length: 16

training:
  batch_size: 64
  epochs: 1000
  gen_learning_rate: 0.0002
  disc_learning_rate: 0.0002
  beta1: 0.5
  beta2: 0.999
  disc_iterations: 1
  sample_interval: 100

generation:
  num_samples: 10000
  temperature: 0.8

evaluation:
  test_size: 1000
  metrics: ["unique_ratio", "novelty", "similarity"]
9. System Testing and Validation
9.1 Unit Tests
# tests/test_preprocessing.py
import unittest
import numpy as np
from src.preprocessing.data_loader import PasswordDataLoader
from src.preprocessing.encoder import PasswordEncoder

class TestPreprocessing(unittest.TestCase):
    def setUp(self):
        self.config = {
            'data': {
                'min_length': 4,
                'max_length': 16,
                'vocab_size': 256
            },
            'model': {'seq_length': 16}
        }
        self.sample_passwords = [
            "password123",
            "hello world",
            "test@123",
            "abc",  # below min_length
            "verylongpasswordthatexceedslimit"  # above max_length
        ]

    def test_data_cleaning(self):
        loader = PasswordDataLoader(self.config)
        cleaned = loader.clean_data(self.sample_passwords)
        # Passwords outside the configured length range should be filtered out
        self.assertEqual(len(cleaned), 3)
        self.assertNotIn("abc", cleaned)
        self.assertNotIn("verylongpasswordthatexceedslimit", cleaned)

    def test_encoder_decoder(self):
        encoder = PasswordEncoder(self.config)
        # Encode, then decode
        test_password = "test@123"
        encoded = encoder.encode(test_password)
        decoded = encoder.decode(encoded)
        # Round-tripping should recover the original password
        self.assertEqual(decoded, test_password)

    def test_batch_encoding(self):
        encoder = PasswordEncoder(self.config)
        encoded_batch = encoder.batch_encode(self.sample_passwords[:3])
        # Batch encoding should return an array of the expected shape
        self.assertEqual(encoded_batch.shape, (3, self.config['model']['seq_length']))

if __name__ == '__main__':
    unittest.main()
9.2 Integration Tests
# tests/test_integration.py
import unittest
import tempfile
import torch
from src.models.passgan import PassGAN
from src.preprocessing.encoder import PasswordEncoder

class TestIntegration(unittest.TestCase):
    def setUp(self):
        self.config = {
            'data': {'vocab_size': 256},
            'model': {
                'latent_dim': 10,
                'gen_hidden_dim': 32,
                'disc_hidden_dim': 32,
                'seq_length': 16
            },
            'training': {
                'gen_learning_rate': 0.0002,
                'disc_learning_rate': 0.0002,
                'beta1': 0.5,
                'beta2': 0.999
            }
        }
        self.encoder = PasswordEncoder(self.config)
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    def test_model_creation(self):
        """Test model construction and basic forward passes."""
        model = PassGAN(self.config, self.encoder, self.device)
        batch_size = 5
        z = torch.randn(batch_size, self.config['model']['latent_dim']).to(self.device)
        # In eval mode the generator returns index sequences
        model.generator.eval()
        samples, logits = model.generator(z)
        self.assertEqual(samples.shape, (batch_size, self.config['model']['seq_length']))
        self.assertEqual(
            logits.shape,
            (batch_size, self.config['model']['seq_length'], self.encoder.vocab_size + 4))
        # Discriminator forward pass on index sequences
        validity = model.discriminator(samples)
        self.assertEqual(validity.shape, (batch_size, 1))

    def test_password_generation(self):
        """Test password sampling."""
        model = PassGAN(self.config, self.encoder, self.device)
        passwords = model.sample(10, temperature=0.8)
        self.assertEqual(len(passwords), 10)
        for pwd in passwords:
            self.assertIsInstance(pwd, str)
            # At most seq_length - 2 characters (start and end tokens excluded);
            # an untrained generator may emit an immediate END token, so empty
            # strings are allowed here
            self.assertTrue(len(pwd) <= self.config['model']['seq_length'] - 2)

    def test_model_save_load(self):
        """Test checkpoint save and load."""
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            model_path = tmp.name
        try:
            model = PassGAN(self.config, self.encoder, self.device)
            # Save the model
            model.save_models(model_path)
            # Create a fresh model and load the checkpoint
            new_model = PassGAN(self.config, self.encoder, self.device)
            new_model.load_models(model_path)
            # The reloaded model should still generate
            passwords = new_model.sample(5)
            self.assertEqual(len(passwords), 5)
        finally:
            import os
            if os.path.exists(model_path):
                os.unlink(model_path)

if __name__ == '__main__':
    unittest.main()
10. Performance Optimization and Deployment
10.1 Performance Optimization Strategies
# src/utils/optimization.py
import torch
import time
from contextlib import contextmanager

@contextmanager
def torch_timing(description):
    """Context manager for timing PyTorch operations"""
    if torch.cuda.is_available():
        start = torch.cuda.Event(enable_timing=True)
        end = torch.cuda.Event(enable_timing=True)
        start.record()
    else:
        start = time.time()
    yield
    if torch.cuda.is_available():
        end.record()
        torch.cuda.synchronize()
        elapsed = start.elapsed_time(end) / 1000  # convert milliseconds to seconds
    else:
        elapsed = time.time() - start
    print(f"{description}: {elapsed:.4f} seconds")

def optimize_model_performance(model, config):
    """Apply optional performance optimizations to the model"""
    # Mixed-precision training
    if config.get('use_amp', False) and torch.cuda.is_available():
        from torch.cuda.amp import autocast, GradScaler
        model.scaler = GradScaler()
        model.use_amp = True
    else:
        model.use_amp = False
    # Data parallelism (multi-GPU)
    if torch.cuda.device_count() > 1 and config.get('data_parallel', False):
        model.generator = torch.nn.DataParallel(model.generator)
        model.discriminator = torch.nn.DataParallel(model.discriminator)
    return model

def memory_optimization_hooks():
    """Memory optimization hook"""
    # Free intermediate values after the forward pass
    def clear_memory_hook(module, input, output):
        if hasattr(module, 'intermediate_values'):
            del module.intermediate_values
    return clear_memory_hook
10.2 Model Quantization and Pruning
# src/utils/quantization.py
import torch
import torch.nn.utils.prune as prune

def quantize_model(model, quantization_bits=8):
    """Quantize the model"""
    if quantization_bits == 8:
        # Use PyTorch's dynamic quantization
        model = torch.quantization.quantize_dynamic(
            model, {torch.nn.Linear, torch.nn.LSTM}, dtype=torch.qint8
        )
    elif quantization_bits == 16 and torch.cuda.is_available():
        # Use half-precision floats
        model = model.half()
    return model

def prune_model(model, pruning_amount=0.2):
    """Prune the model"""
    parameters_to_prune = []
    # Collect all prunable parameters
    for name, module in model.named_modules():
        if isinstance(module, (torch.nn.Linear, torch.nn.Conv1d)):
            parameters_to_prune.append((module, 'weight'))
    # Global unstructured pruning
    prune.global_unstructured(
        parameters_to_prune,
        pruning_method=prune.L1Unstructured,
        amount=pruning_amount,
    )
    # Make the pruning permanent by removing the masks
    for module, param_name in parameters_to_prune:
        prune.remove(module, param_name)
    return model

def optimize_for_inference(model, example_input):
    """Optimize the model for inference"""
    # Prune
    model = prune_model(model, pruning_amount=0.1)
    # Quantize
    model = quantize_model(model, quantization_bits=8)
    # Compile with TorchScript
    model_scripted = torch.jit.trace(model, example_input)
    return model_scripted
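As a quick sanity check, `quantize_model`'s 8-bit path can be exercised on a tiny stand-in network (the layer sizes below are hypothetical, chosen only for illustration; in practice the trained generator or discriminator would be passed in):

```python
import torch
import torch.nn as nn

# A tiny stand-in network -- not the actual PassGAN discriminator
model = nn.Sequential(nn.Linear(16, 32), nn.ReLU(), nn.Linear(32, 1))

# Dynamic int8 quantization of the Linear layers (CPU inference)
quantized = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# The quantized model keeps the same input/output interface
out = quantized(torch.randn(4, 16))
print(tuple(out.shape))  # (4, 1)
```

Dynamic quantization stores weights as int8 and quantizes activations on the fly, so it needs no calibration data, which makes it a low-effort first step before trying pruning or TorchScript tracing.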
11. Security and Ethical Considerations
11.1 Secure Usage Guidelines
# docs/security_guidelines.md
# PassGAN System Secure Usage Guidelines

## 1. Legal Use
- This system is intended solely for security research and education
- Obtain all necessary authorization and permissions before use
- Any illegal or malicious use is prohibited

## 2. Data Protection
- Apply appropriate safeguards when handling real password data
- Delete or securely archive sensitive data once training is complete
- Encrypt data at rest and in transit

## 3. Model Security
- Update models regularly so they do not become stale
- Verify model integrity with digital signatures
- Restrict access to trained models

## 4. Output Management
- Store generated password samples securely
- Avoid exposing generated passwords in public environments
- Clean up generated temporary files regularly

## 5. Compliance
- Comply with local laws and regulations
- Follow industry best practices and standards
- Conduct periodic security audits

# Example security configuration
security_config = {
    'data_encryption': True,
    'model_signature_verification': True,
    'access_control': {
        'require_authentication': True,
        'role_based_access': True
    },
    'audit_logging': True,
    'automatic_data_purging': True
}
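A hypothetical helper (not part of the system described above) could fail fast at startup if a deployment configuration is missing any of these safeguards:

```python
# Keys taken from the example security_config above
REQUIRED_SECURITY_KEYS = {
    'data_encryption',
    'model_signature_verification',
    'access_control',
    'audit_logging',
    'automatic_data_purging',
}

def validate_security_config(cfg):
    """Return a list of problems; an empty list means the config passes."""
    problems = [f"missing key: {k}" for k in sorted(REQUIRED_SECURITY_KEYS - cfg.keys())]
    # Boolean flags must be explicitly enabled (access_control is a nested dict)
    for key in sorted(REQUIRED_SECURITY_KEYS & cfg.keys()):
        if key != 'access_control' and cfg[key] is not True:
            problems.append(f"{key} must be enabled")
    return problems

security_config = {
    'data_encryption': True,
    'model_signature_verification': True,
    'access_control': {'require_authentication': True, 'role_based_access': True},
    'audit_logging': True,
    'automatic_data_purging': True,
}
print(validate_security_config(security_config))  # []
```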
11.2 Ethical Considerations
# docs/ethics_considerations.md
# PassGAN System Ethical Considerations

## 1. Privacy Protection
- Anonymize all training data
- Follow the principle of data minimization
- Provide a mechanism for data deletion

## 2. Bias and Fairness
- Detect and mitigate bias in the model
- Ensure generated passwords contain no sensitive information
- Conduct regular fairness evaluations

## 3. Transparency
- Clearly state the system's capabilities and limitations
- Provide interpretable evaluation results
- Disclose the algorithms and methods used

## 4. Responsibility and Accountability
- Clearly identify the parties responsible for system use
- Establish a mechanism for reporting and responding to issues
- Keep the development process transparent

## 5. Social Impact
- Assess the system's potential impact on society
- Participate actively in industry ethics discussions
- Follow the principles of responsible innovation
12. Summary and Future Outlook
This project implements a complete PassGAN-based password training system, covering the full pipeline from data preprocessing through model training, password generation, and evaluation. The system uses a modular design with good extensibility and maintainability.
12.1 Technical Summary
- Data preprocessing: efficient data cleaning and encoding that supports large-scale password datasets
- Model architecture: LSTM- and CNN-based generator and discriminator networks that effectively learn the distribution of passwords
- Training optimization: a stable GAN training pipeline with multiple optimization strategies and monitoring mechanisms
- Evaluation framework: a comprehensive set of password quality metrics, including uniqueness, novelty, and entropy
- System integration: a complete command-line interface and configuration system for practical deployment and use
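The uniqueness and novelty metrics mentioned in the evaluation framework reduce to simple set operations. A minimal sketch, assuming the generated samples and training passwords are plain lists of strings (the function names here are illustrative, not the system's actual API):

```python
def unique_ratio(generated):
    """Fraction of generated passwords that are distinct."""
    if not generated:
        return 0.0
    return len(set(generated)) / len(generated)

def novelty(generated, training_passwords):
    """Fraction of generated passwords not seen in the training data."""
    if not generated:
        return 0.0
    train = set(training_passwords)
    return sum(1 for p in generated if p not in train) / len(generated)

gen = ["abc123", "abc123", "qwerty", "zx9!pQ"]
train = ["qwerty", "password"]
print(unique_ratio(gen))    # 0.75  (3 distinct out of 4)
print(novelty(gen, train))  # 0.75  (only "qwerty" appears in training data)
```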
12.2 Future Improvements
- Model architecture:
  - Explore newer network structures such as the Transformer
  - Introduce attention mechanisms to improve long-sequence generation
  - Try conditional generative models to support password generation under specific constraints
- Training efficiency:
  - Add distributed training support
  - Optimize memory usage to handle larger datasets
  - Explore more stable GAN training techniques
- Feature extensions:
  - Add real-time password strength evaluation
  - Support multimodal password generation (e.g. graphical passwords)
  - Develop a web interface and API service
- Security enhancements:
  - Implement differentially private training
  - Add model watermarking and provenance tracking
  - Strengthen the system's access control and auditing
12.3 Application Prospects
The system has broad application prospects in the following areas:
- Security assessment: helping organizations evaluate the effectiveness of their password policies
- Password research: providing a tool for password strength analysis and cracking-resistance studies
- User education: generating example passwords that help users understand what makes a password strong
- System development: integration into authentication systems for real-time password strength checking
With continued technical improvement and responsible use, a PassGAN-based password training system can become a valuable tool in network security and contribute to stronger digital identity protection.
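The real-time strength check mentioned under system development could start from a simple charset-entropy estimate before involving the trained model. This is a rough heuristic only, not the model's judgment:

```python
import math
import string

def estimate_entropy_bits(password):
    """Rough entropy estimate: length * log2(size of the character pools used)."""
    pool = 0
    if any(c in string.ascii_lowercase for c in password):
        pool += 26
    if any(c in string.ascii_uppercase for c in password):
        pool += 26
    if any(c in string.digits for c in password):
        pool += 10
    if any(c in string.punctuation for c in password):
        pool += len(string.punctuation)  # 32 printable ASCII symbols
    if pool == 0:
        return 0.0
    return len(password) * math.log2(pool)

print(round(estimate_entropy_bits("password"), 1))  # 37.6 = 8 * log2(26)
```

This heuristic overestimates strength for common patterns ("password" scores 37.6 bits but would fall instantly to a dictionary attack), which is exactly the gap a PassGAN-based evaluator aims to close.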