2.9 超参数自动调优(Optuna / Hyperopt)
引言
在机器学习项目中,超参数调优是提升模型性能的关键环节。传统的手动调优方法耗时耗力且难以找到最优配置。本章将深入探讨自动超参数优化技术,重点介绍 Optuna 和 Hyperopt 这两个强大的调优框架,帮助你系统化、自动化地寻找最佳超参数组合。
通过本章学习,你将能够:
- 掌握 Optuna 和 Hyperopt 的核心概念和工作原理
- 设计高效的超参数搜索空间
- 实现并行化超参数搜索
- 分析和可视化调优结果
- 构建完整的自动调优流水线
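在展开细节之前,先通过一个最小可运行的草图感受 Optuna "定义目标函数 → 创建 study → 优化"的基本流程(假设已通过 pip install optuna 安装依赖;目标函数仅为演示用的一维二次函数,实际项目中应替换为模型训练与验证评估):

import optuna

def objective(trial):
    # 每次试验(trial)从搜索空间中采样一组超参数
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    # 实际项目中这里应训练模型并返回验证指标;此处用简单函数代替
    return (lr - 0.01) ** 2

study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=20)
print(study.best_params, study.best_value)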
Optuna 基础与核心概念
Optuna 框架详解
import optuna
from optuna import Trial
from optuna.samplers import TPESampler, RandomSampler, CmaEsSampler
from optuna.pruners import HyperbandPruner, MedianPruner, SuccessiveHalvingPruner
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score, f1_score
import torch
import torch.nn as nn
import warnings
# 补充:后文用到的标准库与第三方依赖
import time
from typing import Any, Dict

import pandas as pd

warnings.filterwarnings('ignore')


class OptunaFundamentals:
    """Optuna 基础概念和核心组件"""

    def __init__(self):
        self.study = None
        self.best_params = None
        self.best_value = None

    def demonstrate_study_creation(self):
        """演示 Optuna Study 的创建和配置"""
        print("Optuna Study 创建演示")
        print("=" * 50)

        # 创建研究(Study)
        study = optuna.create_study(
            direction='maximize',    # 最大化目标函数
            sampler=TPESampler(),    # 使用 TPE 采样器
            pruner=MedianPruner()    # 使用中位数剪枝器
        )

        print(f"Study 名称: {study.study_name}")
        print(f"优化方向: {study.direction}")
        print(f"采样器: {type(study.sampler).__name__}")
        print(f"剪枝器: {type(study.pruner).__name__}")

        self.study = study
        return study

    def objective_function(self, trial: Trial) -> float:
        """示例目标函数"""
        # 定义超参数搜索空间
        x = trial.suggest_float('x', -10, 10)
        y = trial.suggest_float('y', -10, 10)

        # 简单的目标函数(示例)
        result = (x - 2) ** 2 + (y + 3) ** 2 + np.sin(x * 3) + np.cos(y * 2)

        # 模拟计算时间
        time.sleep(0.01)

        # Study 的方向是 maximize,因此取负号把最小化问题转成最大化问题
        return -result

    def run_optimization(self, n_trials: int = 100):
        """运行优化过程"""
        if self.study is None:
            self.demonstrate_study_creation()

        print(f"\n开始优化,试验次数: {n_trials}")
        print("-" * 40)

        # 运行优化
        self.study.optimize(self.objective_function, n_trials=n_trials)

        # 获取最佳结果
        self.best_params = self.study.best_params
        self.best_value = self.study.best_value

        print(f"最佳参数: {self.best_params}")
        print(f"最佳值: {self.best_value:.4f}")
        print(f"完成的试验数: {len(self.study.trials)}")

        return self.study

    def analyze_optimization_process(self):
        """分析优化过程"""
        if self.study is None:
            print("请先运行优化")
            return

        trials = self.study.trials

        print("\n优化过程分析")
        print("=" * 40)

        # 基本统计
        completed_trials = [t for t in trials if t.state == optuna.trial.TrialState.COMPLETE]
        pruned_trials = [t for t in trials if t.state == optuna.trial.TrialState.PRUNED]

        print(f"完成试验: {len(completed_trials)}")
        print(f"剪枝试验: {len(pruned_trials)}")
        print(f"总试验: {len(trials)}")

        # 性能统计
        values = [t.value for t in completed_trials]
        print(f"目标值范围: {min(values):.4f} -> {max(values):.4f}")

        return {
            'completed_trials': completed_trials,
            'pruned_trials': pruned_trials,
            'all_trials': trials
        }


# Optuna 基础演示
def demonstrate_optuna_basics():
    """演示 Optuna 基础功能"""
    print("Optuna 基础功能演示")
    print("=" * 50)

    optuna_fundamentals = OptunaFundamentals()

    # 创建 study
    study = optuna_fundamentals.demonstrate_study_creation()

    # 运行优化
    study = optuna_fundamentals.run_optimization(n_trials=50)

    # 分析过程
    analysis = optuna_fundamentals.analyze_optimization_process()

    return optuna_fundamentals


# 运行基础演示
optuna_basics = demonstrate_optuna_basics()
超参数搜索空间设计
class HyperparameterSpaceDesign:
    """超参数搜索空间设计"""

    @staticmethod
    def suggest_mlp_parameters(trial: Trial) -> Dict[str, Any]:
        """MLP 网络超参数搜索空间"""
        params = {}

        # 网络结构参数
        params['hidden_size'] = trial.suggest_int('hidden_size', 32, 512)
        params['num_layers'] = trial.suggest_int('num_layers', 1, 5)
        params['dropout_rate'] = trial.suggest_float('dropout_rate', 0.0, 0.5)

        # 激活函数选择
        params['activation'] = trial.suggest_categorical('activation', ['relu', 'tanh', 'leaky_relu'])

        # 优化器参数
        params['learning_rate'] = trial.suggest_float('learning_rate', 1e-5, 1e-1, log=True)
        params['weight_decay'] = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)

        # 批次大小(分类建议)
        params['batch_size'] = trial.suggest_categorical('batch_size', [16, 32, 64, 128])

        return params

    @staticmethod
    def suggest_cnn_parameters(trial: Trial) -> Dict[str, Any]:
        """CNN 网络超参数搜索空间"""
        params = {}

        # 卷积层参数
        params['conv_filters'] = []
        num_conv_layers = trial.suggest_int('num_conv_layers', 1, 4)
        for i in range(num_conv_layers):
            filters = trial.suggest_int(f'conv_filters_{i}', 16, 128)
            params['conv_filters'].append(filters)

        # 全连接层参数
        params['dense_units'] = trial.suggest_int('dense_units', 32, 256)

        # 正则化参数
        params['dropout_rate'] = trial.suggest_float('dropout_rate', 0.0, 0.5)
        params['l2_regularization'] = trial.suggest_float('l2_regularization', 1e-6, 1e-2, log=True)

        # 优化器参数
        params['learning_rate'] = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)

        return params

    @staticmethod
    def suggest_transformer_parameters(trial: Trial) -> Dict[str, Any]:
        """Transformer 超参数搜索空间"""
        params = {}

        # 模型结构参数
        params['hidden_size'] = trial.suggest_categorical('hidden_size', [128, 256, 512, 768])
        params['num_attention_heads'] = trial.suggest_int('num_attention_heads', 4, 16)
        params['num_hidden_layers'] = trial.suggest_int('num_hidden_layers', 2, 8)
        params['intermediate_size'] = trial.suggest_int('intermediate_size', 512, 2048)

        # 训练参数
        params['learning_rate'] = trial.suggest_float('learning_rate', 1e-6, 1e-3, log=True)
        params['warmup_ratio'] = trial.suggest_float('warmup_ratio', 0.0, 0.2)
        params['weight_decay'] = trial.suggest_float('weight_decay', 0.0, 0.1)

        # 正则化
        params['attention_dropout'] = trial.suggest_float('attention_dropout', 0.0, 0.2)
        params['hidden_dropout'] = trial.suggest_float('hidden_dropout', 0.0, 0.2)

        return params

    @staticmethod
    def suggest_sklearn_parameters(trial: Trial, model_type: str) -> Dict[str, Any]:
        """Scikit-learn 模型超参数搜索空间"""
        params = {}

        if model_type == 'random_forest':
            params['n_estimators'] = trial.suggest_int('n_estimators', 50, 500)
            params['max_depth'] = trial.suggest_int('max_depth', 3, 20)
            params['min_samples_split'] = trial.suggest_int('min_samples_split', 2, 20)
            params['min_samples_leaf'] = trial.suggest_int('min_samples_leaf', 1, 10)
            params['max_features'] = trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
        elif model_type == 'svm':
            params['C'] = trial.suggest_float('C', 0.1, 10.0, log=True)
            params['kernel'] = trial.suggest_categorical('kernel', ['linear', 'rbf', 'poly'])
            if params['kernel'] == 'rbf':
                params['gamma'] = trial.suggest_float('gamma', 1e-4, 1.0, log=True)
            elif params['kernel'] == 'poly':
                params['degree'] = trial.suggest_int('degree', 2, 5)
        elif model_type == 'xgboost':
            params['max_depth'] = trial.suggest_int('max_depth', 3, 15)
            params['learning_rate'] = trial.suggest_float('learning_rate', 0.01, 0.3, log=True)
            params['subsample'] = trial.suggest_float('subsample', 0.5, 1.0)
            params['colsample_bytree'] = trial.suggest_float('colsample_bytree', 0.5, 1.0)
            params['reg_alpha'] = trial.suggest_float('reg_alpha', 0, 1.0)
            params['reg_lambda'] = trial.suggest_float('reg_lambda', 0, 1.0)

        return params


class AdvancedSearchSpace:
    """高级搜索空间设计技术"""

    @staticmethod
    def conditional_search_space(trial: Trial) -> Dict[str, Any]:
        """条件搜索空间示例"""
        params = {}

        # 选择模型类型(同时记录到 params,便于下游读取)
        model_type = trial.suggest_categorical('model_type', ['mlp', 'cnn', 'transformer'])
        params['model_type'] = model_type

        if model_type == 'mlp':
            params.update(HyperparameterSpaceDesign.suggest_mlp_parameters(trial))
        elif model_type == 'cnn':
            params.update(HyperparameterSpaceDesign.suggest_cnn_parameters(trial))
        else:  # transformer
            params.update(HyperparameterSpaceDesign.suggest_transformer_parameters(trial))

        # 共同参数
        params['batch_size'] = trial.suggest_categorical('batch_size', [16, 32, 64, 128])
        params['optimizer'] = trial.suggest_categorical('optimizer', ['adam', 'adamw', 'sgd'])

        # 条件学习率调度器
        use_scheduler = trial.suggest_categorical('use_scheduler', [True, False])
        if use_scheduler:
            params['scheduler_type'] = trial.suggest_categorical('scheduler_type', ['cosine', 'linear', 'step'])

        return params

    @staticmethod
    def hierarchical_search_space(trial: Trial) -> Dict[str, Any]:
        """分层搜索空间示例"""
        params = {}

        # 第一层:选择架构
        architecture = trial.suggest_categorical('architecture', ['simple', 'complex'])
        params['architecture'] = architecture

        if architecture == 'simple':
            params['hidden_layers'] = trial.suggest_int('hidden_layers', 1, 2)
            params['hidden_units'] = trial.suggest_int('hidden_units', 64, 256)
        else:  # complex
            params['hidden_layers'] = trial.suggest_int('hidden_layers', 3, 5)
            params['hidden_units'] = trial.suggest_int('hidden_units', 128, 512)

            # 第二层:复杂架构的额外参数
            params['use_residual'] = trial.suggest_categorical('use_residual', [True, False])
            params['use_batch_norm'] = trial.suggest_categorical('use_batch_norm', [True, False])

        # 共同参数
        params['learning_rate'] = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
        params['dropout'] = trial.suggest_float('dropout', 0.0, 0.5)

        return params


# 搜索空间设计演示
def demonstrate_search_space_design():
    """演示搜索空间设计"""
    print("超参数搜索空间设计演示")
    print("=" * 50)

    # 创建测试 study;每个示例使用独立的 trial,避免同名参数在同一 trial 中出现分布冲突
    study = optuna.create_study(direction='maximize')

    print("1. MLP 搜索空间示例:")
    trial = study.ask()
    mlp_params = HyperparameterSpaceDesign.suggest_mlp_parameters(trial)
    for key, value in mlp_params.items():
        print(f"  {key}: {value}")

    print("\n2. 条件搜索空间示例:")
    trial = study.ask()
    conditional_params = AdvancedSearchSpace.conditional_search_space(trial)
    print(f"  选择的模型类型: {conditional_params.get('model_type', 'N/A')}")
    print(f"  参数量: {len(conditional_params)}")

    print("\n3. 分层搜索空间示例:")
    trial = study.ask()
    hierarchical_params = AdvancedSearchSpace.hierarchical_search_space(trial)
    architecture = hierarchical_params.get('architecture', 'N/A')
    print(f"  架构类型: {architecture}")
    print(f"  隐藏层数: {hierarchical_params.get('hidden_layers', 'N/A')}")

    # 可视化搜索空间
    def visualize_parameter_distribution():
        """可视化参数分布(示意图)"""
        fig, axes = plt.subplots(2, 2, figsize=(12, 8))

        # 学习率分布(对数尺度)
        learning_rates = np.logspace(-5, -1, 100)
        axes[0, 0].plot(learning_rates, np.ones_like(learning_rates), 'b-', alpha=0.7)
        axes[0, 0].set_xscale('log')
        axes[0, 0].set_title('学习率搜索空间(对数尺度)')
        axes[0, 0].set_xlabel('Learning Rate')
        axes[0, 0].set_ylabel('密度')

        # 隐藏层大小分布
        hidden_sizes = np.arange(32, 513, 16)
        axes[0, 1].bar(hidden_sizes, np.ones_like(hidden_sizes), width=15, alpha=0.7)
        axes[0, 1].set_title('隐藏层大小搜索空间')
        axes[0, 1].set_xlabel('Hidden Size')
        axes[0, 1].set_ylabel('密度')

        # 分类参数分布
        activations = ['relu', 'tanh', 'leaky_relu']
        activation_counts = [1, 1, 1]  # 均匀分布
        axes[1, 0].bar(activations, activation_counts, alpha=0.7, color=['red', 'green', 'blue'])
        axes[1, 0].set_title('激活函数搜索空间')
        axes[1, 0].set_ylabel('密度')

        # Dropout 率分布
        dropout_rates = np.linspace(0.0, 0.5, 20)
        axes[1, 1].plot(dropout_rates, np.ones_like(dropout_rates), 'purple', alpha=0.7)
        axes[1, 1].set_title('Dropout 率搜索空间')
        axes[1, 1].set_xlabel('Dropout Rate')
        axes[1, 1].set_ylabel('密度')

        plt.tight_layout()
        plt.show()

    visualize_parameter_distribution()

    return {
        'mlp_params': mlp_params,
        'conditional_params': conditional_params,
        'hierarchical_params': hierarchical_params
    }


# 运行搜索空间设计演示
search_space_demo = demonstrate_search_space_design()
Optuna 实战应用
机器学习模型调优
class SklearnOptunaOptimizer:
    """Scikit-learn 模型 Optuna 优化器"""

    def __init__(self, X, y, model_type='random_forest', cv_folds=5):
        self.X = X
        self.y = y
        self.model_type = model_type
        self.cv_folds = cv_folds
        self.study = None
        self.best_model = None

    def objective(self, trial: Trial) -> float:
        """Optuna 目标函数"""
        # 获取超参数
        params = HyperparameterSpaceDesign.suggest_sklearn_parameters(trial, self.model_type)

        # 创建模型
        if self.model_type == 'random_forest':
            model = RandomForestClassifier(**params, random_state=42)
        elif self.model_type == 'svm':
            model = SVC(**params, random_state=42)
        else:
            raise ValueError(f"不支持的模型类型: {self.model_type}")

        # 交叉验证评估
        scores = cross_val_score(model, self.X, self.y, cv=self.cv_folds,
                                 scoring='accuracy', n_jobs=-1)

        return scores.mean()

    def optimize(self, n_trials: int = 100, timeout: int = None):
        """运行优化"""
        print(f"开始优化 {self.model_type} 模型")
        print("=" * 40)

        self.study = optuna.create_study(
            direction='maximize',
            sampler=TPESampler(seed=42),
            pruner=HyperbandPruner()
        )

        self.study.optimize(self.objective, n_trials=n_trials, timeout=timeout)

        # 训练最佳模型
        best_params = self.study.best_params
        if self.model_type == 'random_forest':
            self.best_model = RandomForestClassifier(**best_params, random_state=42)
        elif self.model_type == 'svm':
            self.best_model = SVC(**best_params, random_state=42)

        self.best_model.fit(self.X, self.y)

        print("优化完成!")
        print(f"最佳准确率: {self.study.best_value:.4f}")
        print(f"最佳参数: {self.study.best_params}")

        return self.study

    def plot_optimization_history(self):
        """绘制优化历史"""
        if self.study is None:
            print("请先运行优化")
            return

        # 注意:optuna.visualization 下的函数返回 Plotly 图,不接受 ax 参数;
        # 这里使用 optuna.visualization.matplotlib 后端,逐图绘制
        from optuna.visualization.matplotlib import (
            plot_optimization_history, plot_param_importances,
            plot_parallel_coordinate, plot_slice
        )

        plots = [
            ('优化历史', plot_optimization_history),
            ('参数重要性', plot_param_importances),
            ('平行坐标图', plot_parallel_coordinate),
            ('参数切片图', plot_slice),
        ]
        for name, plot_fn in plots:
            try:
                plot_fn(self.study)
                plt.suptitle(name)
                plt.tight_layout()
                plt.show()
            except Exception:
                print(f"{name}不可用")


class PyTorchOptunaOptimizer:
    """PyTorch 模型 Optuna 优化器"""

    def __init__(self, train_loader, val_loader, input_size, output_size):
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.input_size = input_size
        self.output_size = output_size
        self.study = None
        self.best_model = None

    def create_model(self, params: Dict[str, Any]) -> nn.Module:
        """根据参数创建模型"""
        layers = []
        input_dim = self.input_size

        # 添加隐藏层
        for i in range(params['num_layers']):
            layers.append(nn.Linear(input_dim, params['hidden_size']))

            # 激活函数
            if params['activation'] == 'relu':
                layers.append(nn.ReLU())
            elif params['activation'] == 'tanh':
                layers.append(nn.Tanh())
            elif params['activation'] == 'leaky_relu':
                layers.append(nn.LeakyReLU(0.1))

            # Dropout
            if params['dropout_rate'] > 0:
                layers.append(nn.Dropout(params['dropout_rate']))

            input_dim = params['hidden_size']

        # 输出层
        layers.append(nn.Linear(input_dim, self.output_size))

        return nn.Sequential(*layers)

    def objective(self, trial: Trial) -> float:
        """Optuna 目标函数"""
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # 获取超参数
        params = HyperparameterSpaceDesign.suggest_mlp_parameters(trial)

        # 创建模型
        model = self.create_model(params).to(device)

        # 优化器
        if params.get('optimizer', 'adam') == 'adam':
            optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'],
                                         weight_decay=params['weight_decay'])
        else:
            optimizer = torch.optim.SGD(model.parameters(), lr=params['learning_rate'],
                                        weight_decay=params['weight_decay'])

        criterion = nn.CrossEntropyLoss()

        # 训练模型
        model.train()
        for epoch in range(10):  # 快速训练
            for batch_idx, (data, target) in enumerate(self.train_loader):
                data, target = data.to(device), target.to(device)

                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()

        # 验证模型
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for data, target in self.val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                pred = output.argmax(dim=1)
                correct += (pred == target).sum().item()
                total += target.size(0)

        accuracy = correct / total
        return accuracy

    def optimize(self, n_trials: int = 50):
        """运行优化"""
        print("开始优化 PyTorch 模型")
        print("=" * 40)

        self.study = optuna.create_study(
            direction='maximize',
            sampler=TPESampler(seed=42)
        )

        self.study.optimize(self.objective, n_trials=n_trials)

        print("优化完成!")
        print(f"最佳准确率: {self.study.best_value:.4f}")

        # 创建最佳模型
        best_params = self.study.best_params
        self.best_model = self.create_model(best_params)

        return self.study


# Optuna 实战演示
def demonstrate_optuna_practical():
    """演示 Optuna 实战应用"""
    print("Optuna 实战应用演示")
    print("=" * 50)

    # 创建示例数据
    X, y = make_classification(n_samples=1000, n_features=20, n_informative=15,
                               n_redundant=5, n_classes=3, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    print("1. Scikit-learn 模型优化")
    print("-" * 30)

    # 随机森林优化
    rf_optimizer = SklearnOptunaOptimizer(X_train, y_train, 'random_forest')
    rf_study = rf_optimizer.optimize(n_trials=30)

    # 评估最佳模型
    rf_accuracy = accuracy_score(y_test, rf_optimizer.best_model.predict(X_test))
    print(f"测试集准确率: {rf_accuracy:.4f}")

    # 绘制优化结果
    rf_optimizer.plot_optimization_history()

    print("\n2. PyTorch 模型优化")
    print("-" * 30)

    # 创建 PyTorch 数据加载器
    class SimpleDataset(torch.utils.data.Dataset):
        def __init__(self, X, y):
            self.X = torch.FloatTensor(X)
            self.y = torch.LongTensor(y)

        def __len__(self):
            return len(self.X)

        def __getitem__(self, idx):
            return self.X[idx], self.y[idx]

    train_dataset = SimpleDataset(X_train, y_train)
    val_dataset = SimpleDataset(X_test, y_test)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)

    # PyTorch 模型优化
    pytorch_optimizer = PyTorchOptunaOptimizer(train_loader, val_loader,
                                               input_size=20, output_size=3)
    pytorch_study = pytorch_optimizer.optimize(n_trials=20)

    return {
        'rf_optimizer': rf_optimizer,
        'pytorch_optimizer': pytorch_optimizer,
        'rf_study': rf_study,
        'pytorch_study': pytorch_study
    }


# 运行实战演示
practical_demo = demonstrate_optuna_practical()
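上面的 PyTorchOptunaOptimizer.objective 总是训练满 10 个 epoch。一个常见改进是逐 epoch 上报中间指标,让剪枝器提前终止前景不佳的试验(下一节会系统比较不同剪枝器)。下面是一个最小草图,展示训练循环中的上报与剪枝检查;其中 train_one_epoch 和 evaluate 是假设的辅助函数,并假设 study 创建时配置了剪枝器(如 optuna.create_study(pruner=MedianPruner())):

# 逐 epoch 上报中间精度并检查剪枝(示意片段,置于 objective 内部)
for epoch in range(10):
    train_one_epoch(model, train_loader, optimizer, criterion)  # 假设的训练辅助函数
    accuracy = evaluate(model, val_loader)                      # 假设的验证辅助函数

    trial.report(accuracy, step=epoch)   # 向 Optuna 上报中间结果
    if trial.should_prune():             # 剪枝器判定前景不佳时提前终止本次试验
        raise optuna.TrialPruned()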
高级特性和技巧
class AdvancedOptunaFeatures:
    """Optuna 高级特性和技巧"""

    @staticmethod
    def multi_objective_optimization():
        """多目标优化演示"""
        print("多目标优化演示")
        print("=" * 40)

        def multi_objective_function(trial):
            x = trial.suggest_float('x', -10, 10)
            y = trial.suggest_float('y', -10, 10)

            # 两个目标函数
            f1 = (x - 2) ** 2 + (y - 3) ** 2  # 最小化到点 (2, 3) 的距离
            f2 = (x + 1) ** 2 + (y + 1) ** 2  # 最小化到点 (-1, -1) 的距离

            return f1, f2

        # 创建多目标研究
        study = optuna.create_study(
            directions=['minimize', 'minimize'],
            sampler=TPESampler(seed=42)
        )

        study.optimize(multi_objective_function, n_trials=50)

        print(f"帕累托前沿解数量: {len(study.best_trials)}")

        # 可视化帕累托前沿
        try:
            fig = optuna.visualization.plot_pareto_front(study)
            fig.show()
        except Exception:
            print("无法显示帕累托前沿图")

        return study

    @staticmethod
    def pruning_strategies_demo():
        """剪枝策略演示"""
        print("\n剪枝策略演示")
        print("=" * 40)

        def objective_with_pruning(trial):
            x = trial.suggest_float('x', -10, 10)
            y = trial.suggest_float('y', -10, 10)

            # 中间结果报告(用于剪枝)
            intermediate_value = 0.0
            for step in range(10):
                intermediate_value = (x - 2) ** 2 + (y - 3) ** 2 + step * 0.1
                trial.report(intermediate_value, step)

                # 如果应该剪枝,抛出 TrialPruned 异常
                if trial.should_prune():
                    raise optuna.TrialPruned()

            return intermediate_value

        # 使用不同的剪枝器
        pruners = {
            'MedianPruner': MedianPruner(),
            'HyperbandPruner': HyperbandPruner(),
            'SuccessiveHalvingPruner': SuccessiveHalvingPruner()
        }

        results = {}
        for pruner_name, pruner in pruners.items():
            print(f"\n使用 {pruner_name}:")

            study = optuna.create_study(
                direction='minimize',
                pruner=pruner
            )

            study.optimize(objective_with_pruning, n_trials=20)

            completed = len([t for t in study.trials
                             if t.state == optuna.trial.TrialState.COMPLETE])
            pruned = len([t for t in study.trials
                          if t.state == optuna.trial.TrialState.PRUNED])

            results[pruner_name] = {
                'best_value': study.best_value,
                'completed_trials': completed,
                'pruned_trials': pruned
            }

            print(f"  最佳值: {study.best_value:.4f}")
            print(f"  完成试验: {completed}, 剪枝试验: {pruned}")

        return results

    @staticmethod
    def custom_sampler_demo():
        """自定义采样器演示"""
        print("\n自定义采样器演示")
        print("=" * 40)

        class CustomSampler(optuna.samplers.BaseSampler):
            def __init__(self):
                self.rng = np.random.RandomState(42)

            def infer_relative_search_space(self, study, trial):
                # 不使用相对采样,返回空搜索空间(BaseSampler 要求实现此方法)
                return {}

            def sample_relative(self, study, trial, search_space):
                return {}

            def sample_independent(self, study, trial, param_name, param_distribution):
                # 简单的均匀采样
                if isinstance(param_distribution, optuna.distributions.FloatDistribution):
                    return self.rng.uniform(param_distribution.low, param_distribution.high)
                elif isinstance(param_distribution, optuna.distributions.IntDistribution):
                    return self.rng.randint(param_distribution.low, param_distribution.high + 1)
                else:
                    return self.rng.choice(param_distribution.choices)

        def objective(trial):
            x = trial.suggest_float('x', -10, 10)
            y = trial.suggest_int('y', 0, 10)
            z = trial.suggest_categorical('z', ['A', 'B', 'C'])
            return (x - 2) ** 2 + (y - 5) ** 2

        # 使用自定义采样器
        study = optuna.create_study(sampler=CustomSampler())
        study.optimize(objective, n_trials=10)

        print(f"使用自定义采样器的最佳值: {study.best_value:.4f}")

        return study

    @staticmethod
    def parallel_optimization_demo():
        """并行优化演示"""
        print("\n并行优化演示")
        print("=" * 40)

        # 注意:跨进程、跨机器的并行优化需要共享存储(见下文);这里演示并行优化的概念

        def objective(trial):
            x = trial.suggest_float('x', -10, 10)
            y = trial.suggest_float('y', -10, 10)

            # 模拟耗时计算
            time.sleep(0.1)

            return (x - 2) ** 2 + (y - 3) ** 2

        # 串行执行(基准)
        start_time = time.time()
        study_serial = optuna.create_study()
        study_serial.optimize(objective, n_trials=10)
        serial_time = time.time() - start_time

        print(f"串行执行时间: {serial_time:.2f}秒")
        print(f"串行最佳值: {study_serial.best_value:.4f}")

        # 并行执行(在实际环境中使用 n_jobs 参数)
        print("\n在实际环境中使用:")
        print("study.optimize(objective, n_trials=100, n_jobs=4)")
        print("这将使用 4 个线程并行执行试验")

        return study_serial


# 高级特性演示
def demonstrate_advanced_features():
    """演示 Optuna 高级特性"""
    print("Optuna 高级特性演示")
    print("=" * 50)

    advanced_features = AdvancedOptunaFeatures()

    # 多目标优化
    multi_obj_study = advanced_features.multi_objective_optimization()

    # 剪枝策略
    pruning_results = advanced_features.pruning_strategies_demo()

    # 自定义采样器
    custom_sampler_study = advanced_features.custom_sampler_demo()

    # 并行优化
    parallel_demo = advanced_features.parallel_optimization_demo()

    return {
        'multi_objective': multi_obj_study,
        'pruning_results': pruning_results,
        'custom_sampler': custom_sampler_study,
        'parallel_demo': parallel_demo
    }


# 运行高级特性演示
advanced_features_demo = demonstrate_advanced_features()
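n_jobs 只提供单进程内的线程级并行。若要跨进程甚至跨机器并行,Optuna 官方支持通过共享的关系型数据库存储来协调多个 worker;同一套机制也实现了"断点续跑"。下面是一个基于 Optuna 公开 API 的最小草图(其中 SQLite 文件路径 optuna_demo.db 与 study 名称 shared_demo 为示例假设):

import optuna

def objective(trial):
    x = trial.suggest_float('x', -10, 10)
    return (x - 2) ** 2

# 多个进程/机器使用相同的 study_name 和 storage,即可共同优化同一个 study
study = optuna.create_study(
    study_name='shared_demo',
    storage='sqlite:///optuna_demo.db',  # 多机场景通常换成 MySQL/PostgreSQL
    load_if_exists=True,                 # study 已存在时直接加载,实现断点续跑
    direction='minimize'
)
study.optimize(objective, n_trials=50)
print(study.best_value)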
Hyperopt 框架详解
Hyperopt 基础与应用
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK, STATUS_FAIL
from hyperopt.pyll.base import scope
from hyperopt.early_stop import no_progress_loss


class HyperoptFundamentals:
    """Hyperopt 基础概念和核心组件"""

    def __init__(self):
        self.trials = Trials()
        self.best_result = None

    def demonstrate_search_space(self):
        """演示 Hyperopt 搜索空间定义"""
        print("Hyperopt 搜索空间定义")
        print("=" * 50)

        # 定义搜索空间
        space = {
            'learning_rate': hp.loguniform('learning_rate', np.log(1e-5), np.log(1e-1)),
            'hidden_size': scope.int(hp.quniform('hidden_size', 32, 512, 16)),
            'num_layers': scope.int(hp.quniform('num_layers', 1, 5, 1)),
            'dropout_rate': hp.uniform('dropout_rate', 0.0, 0.5),
            'activation': hp.choice('activation', ['relu', 'tanh', 'sigmoid']),
            'optimizer': hp.choice('optimizer', ['adam', 'sgd', 'rmsprop']),
            'batch_size': hp.choice('batch_size', [16, 32, 64, 128])
        }

        print("定义的搜索空间:")
        for key, value in space.items():
            print(f"  {key}: {value}")

        return space

    def objective_function(self, params):
        """Hyperopt 目标函数"""
        try:
            # 模拟模型训练和评估
            x = params.get('x', 0)
            y = params.get('y', 0)

            # 简单的目标函数
            loss = (x - 2) ** 2 + (y - 3) ** 2

            # 模拟计算时间
            time.sleep(0.01)

            return {'loss': loss, 'status': STATUS_OK, 'params': params}
        except Exception as e:
            return {'loss': float('inf'), 'status': STATUS_FAIL, 'error': str(e)}

    def run_optimization(self, max_evals: int = 100):
        """运行 Hyperopt 优化"""
        print(f"\n开始 Hyperopt 优化,最大评估次数: {max_evals}")
        print("-" * 50)

        # 定义搜索空间
        space = {
            'x': hp.uniform('x', -10, 10),
            'y': hp.uniform('y', -10, 10)
        }

        # 运行优化(较新版本的 hyperopt 要求 rstate 为 numpy Generator)
        best = fmin(
            fn=self.objective_function,
            space=space,
            algo=tpe.suggest,
            max_evals=max_evals,
            trials=self.trials,
            rstate=np.random.default_rng(42),
            early_stop_fn=no_progress_loss(50)  # 连续 50 次迭代无改进则停止
        )

        self.best_result = best

        print("优化完成!")
        print(f"最佳参数: {best}")

        # 找到最佳损失值
        best_trial = min(self.trials.trials, key=lambda t: t['result']['loss'])
        best_loss = best_trial['result']['loss']
        print(f"最佳损失: {best_loss:.4f}")

        return best

    def analyze_trials(self):
        """分析试验结果"""
        if not self.trials.trials:
            print("没有试验数据")
            return

        trials = self.trials.trials

        print("\n试验分析")
        print("=" * 40)

        # 基本统计
        successful_trials = [t for t in trials if t['result']['status'] == STATUS_OK]
        failed_trials = [t for t in trials if t['result']['status'] == STATUS_FAIL]

        print(f"成功试验: {len(successful_trials)}")
        print(f"失败试验: {len(failed_trials)}")
        print(f"总试验: {len(trials)}")

        # 损失统计
        losses = [t['result']['loss'] for t in successful_trials]
        if losses:
            print(f"最小损失: {min(losses):.4f}")
            print(f"最大损失: {max(losses):.4f}")
            print(f"平均损失: {np.mean(losses):.4f}")

        return {
            'successful_trials': successful_trials,
            'failed_trials': failed_trials,
            'losses': losses
        }


class HyperoptAdvanced:
    """Hyperopt 高级功能"""

    @staticmethod
    def conditional_space():
        """条件搜索空间示例"""
        print("Hyperopt 条件搜索空间")
        print("=" * 40)

        space = hp.choice('model_type', [
            {
                'type': 'mlp',
                'hidden_size': scope.int(hp.quniform('mlp_hidden', 64, 512, 32)),
                'num_layers': scope.int(hp.quniform('mlp_layers', 1, 5, 1)),
                'learning_rate': hp.loguniform('mlp_lr', np.log(1e-5), np.log(1e-2))
            },
            {
                'type': 'cnn',
                'filters': scope.int(hp.quniform('cnn_filters', 16, 128, 16)),
                'kernel_size': scope.int(hp.quniform('cnn_kernel', 3, 7, 2)),
                'learning_rate': hp.loguniform('cnn_lr', np.log(1e-5), np.log(1e-2))
            }
        ])

        return space

    @staticmethod
    def sklearn_optimization(X, y, max_evals: int = 50):
        """Scikit-learn 模型 Hyperopt 优化"""
        print("Scikit-learn 模型 Hyperopt 优化")
        print("=" * 50)

        # 定义搜索空间
        space = {
            'n_estimators': scope.int(hp.quniform('n_estimators', 50, 500, 50)),
            'max_depth': scope.int(hp.quniform('max_depth', 3, 20, 1)),
            'min_samples_split': scope.int(hp.quniform('min_samples_split', 2, 20, 1)),
            'min_samples_leaf': scope.int(hp.quniform('min_samples_leaf', 1, 10, 1)),
            'max_features': hp.choice('max_features', ['sqrt', 'log2', None])
        }

        def objective(params):
            try:
                # 创建模型
                model = RandomForestClassifier(
                    n_estimators=params['n_estimators'],
                    max_depth=params['max_depth'],
                    min_samples_split=params['min_samples_split'],
                    min_samples_leaf=params['min_samples_leaf'],
                    max_features=params['max_features'],
                    random_state=42,
                    n_jobs=-1
                )

                # 交叉验证(hyperopt 做最小化,故取负的准确率作为损失)
                scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')
                accuracy = scores.mean()

                return {'loss': -accuracy, 'status': STATUS_OK, 'accuracy': accuracy}
            except Exception as e:
                return {'loss': float('inf'), 'status': STATUS_FAIL, 'error': str(e)}

        # 运行优化
        trials = Trials()
        best = fmin(
            fn=objective,
            space=space,
            algo=tpe.suggest,
            max_evals=max_evals,
            trials=trials,
            rstate=np.random.default_rng(42)
        )

        print("优化完成!")
        print(f"最佳参数: {best}")

        # 找到最佳准确率
        best_trial = min(trials.trials, key=lambda t: t['result']['loss'])
        best_accuracy = -best_trial['result']['loss']
        print(f"最佳准确率: {best_accuracy:.4f}")

        return best, trials


# Hyperopt 演示
def demonstrate_hyperopt():
    """演示 Hyperopt 框架"""
    print("Hyperopt 框架演示")
    print("=" * 50)

    hyperopt_basics = HyperoptFundamentals()

    # 搜索空间演示
    space = hyperopt_basics.demonstrate_search_space()

    # 运行优化
    best_result = hyperopt_basics.run_optimization(max_evals=30)

    # 分析结果
    analysis = hyperopt_basics.analyze_trials()

    # 高级功能演示
    print("\nHyperopt 高级功能")
    print("-" * 30)

    conditional_space = HyperoptAdvanced.conditional_space()
    print("条件搜索空间定义完成")

    # Scikit-learn 优化演示
    X, y = make_classification(n_samples=500, n_features=20, n_classes=2, random_state=42)
    best_params, trials = HyperoptAdvanced.sklearn_optimization(X, y, max_evals=20)

    # 可视化结果
    def plot_hyperopt_results(trials):
        """绘制 Hyperopt 结果"""
        losses = [t['result']['loss'] for t in trials.trials
                  if t['result']['status'] == STATUS_OK]
        iterations = range(1, len(losses) + 1)

        plt.figure(figsize=(10, 6))
        plt.plot(iterations, losses, 'b-', alpha=0.7, linewidth=2)
        plt.xlabel('迭代次数')
        plt.ylabel('损失')
        plt.title('Hyperopt 优化过程')
        plt.grid(True, alpha=0.3)

        # 标记最佳点
        best_idx = int(np.argmin(losses))
        plt.scatter(iterations[best_idx], losses[best_idx], color='red', s=100, zorder=5)
        plt.annotate(f'最佳\n{losses[best_idx]:.4f}',
                     (iterations[best_idx], losses[best_idx]),
                     xytext=(10, 10), textcoords='offset points')

        plt.tight_layout()
        plt.show()

    plot_hyperopt_results(trials)

    return {
        'hyperopt_basics': hyperopt_basics,
        'best_result': best_result,
        'sklearn_optimization': (best_params, trials)
    }


# 运行 Hyperopt 演示
hyperopt_demo = demonstrate_hyperopt()
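需要注意,fmin 返回的 best 字典中,hp.choice 类型的参数给出的是选项索引而不是取值本身(例如 max_features 可能显示为 0 而不是 'sqrt')。可以用 hyperopt 自带的 space_eval 把索引还原成实际配置。下面是一个最小草图(假设 space 与 best_params 即上面 sklearn_optimization 中的搜索空间和 fmin 返回值):

from hyperopt import space_eval

# 将 fmin 返回的索引形式结果映射回真实参数值
real_params = space_eval(space, best_params)
print(real_params)  # 例如 {'max_features': 'sqrt', 'n_estimators': 300, ...}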
框架对比与选择指南
Optuna vs Hyperopt 对比
class FrameworkComparison:
    """Optuna 和 Hyperopt 框架对比"""

    @staticmethod
    def feature_comparison():
        """功能特性对比"""
        comparison_data = {
            '特性': ['搜索空间定义', '采样算法', '剪枝功能', '并行优化', '可视化', '易用性', '社区支持'],
            'Optuna': ['丰富灵活', 'TPE, Random, CMA-ES', '强大', '优秀', '优秀', '优秀', '活跃'],
            'Hyperopt': ['灵活', 'TPE, Random', '有限', '良好', '有限', '良好', '稳定']
        }

        df_comparison = pd.DataFrame(comparison_data)

        print("Optuna vs Hyperopt 功能对比")
        print("=" * 60)
        print(df_comparison.to_string(index=False))

        return df_comparison

    @staticmethod
    def performance_benchmark():
        """性能基准测试"""
        print("\n性能基准测试")
        print("=" * 40)

        def objective(params):
            x = params.get('x', 0)
            y = params.get('y', 0)
            return (x - 2) ** 2 + (y - 3) ** 2

        # Optuna 目标函数
        def optuna_objective(trial):
            x = trial.suggest_float('x', -10, 10)
            y = trial.suggest_float('y', -10, 10)
            return objective({'x': x, 'y': y})

        # Hyperopt 搜索空间与目标函数
        hyperopt_space = {
            'x': hp.uniform('x', -10, 10),
            'y': hp.uniform('y', -10, 10)
        }

        def hyperopt_objective(params):
            return objective(params)

        # 运行基准测试
        n_trials = 50

        # Optuna
        start_time = time.time()
        optuna_study = optuna.create_study(direction='minimize')
        optuna_study.optimize(optuna_objective, n_trials=n_trials)
        optuna_time = time.time() - start_time

        # Hyperopt
        start_time = time.time()
        hyperopt_trials = Trials()
        fmin(hyperopt_objective, hyperopt_space, algo=tpe.suggest,
             max_evals=n_trials, trials=hyperopt_trials,
             rstate=np.random.default_rng(42), show_progressbar=False)
        hyperopt_time = time.time() - start_time

        print(f"Optuna 执行时间: {optuna_time:.2f}秒")
        print(f"Hyperopt 执行时间: {hyperopt_time:.2f}秒")
        print(f"Optuna 最佳值: {optuna_study.best_value:.4f}")

        hyperopt_best = min(hyperopt_trials.trials, key=lambda t: t['result']['loss'])
        print(f"Hyperopt 最佳值: {hyperopt_best['result']['loss']:.4f}")

        return {
            'optuna': {'time': optuna_time, 'best_value': optuna_study.best_value},
            'hyperopt': {'time': hyperopt_time, 'best_value': hyperopt_best['result']['loss']}
        }

    @staticmethod
    def use_case_recommendations():
        """使用场景推荐"""
        recommendations = {
            '研究项目': {
                '推荐框架': 'Optuna',
                '理由': '强大的可视化、灵活的搜索空间、活跃的社区支持',
                '关键特性': ['参数重要性分析', '并行优化', '多种剪枝策略']
            },
            '生产环境': {
                '推荐框架': 'Hyperopt',
                '理由': '稳定可靠、轻量级、易于集成',
                '关键特性': ['简单的API', '可靠的TPE算法', '良好的兼容性']
            },
            '复杂搜索空间': {
                '推荐框架': 'Optuna',
                '理由': '支持条件搜索空间、分层参数、多目标优化',
                '关键特性': ['条件参数', '多目标优化', '自定义采样器']
            },
            '快速原型': {
                '推荐框架': 'Hyperopt',
                '理由': '上手快速、配置简单、文档完善',
                '关键特性': ['简洁的API', '快速的TPE算法', '易于调试']
            }
        }

        print("\n使用场景推荐")
        print("=" * 40)
        for scenario, info in recommendations.items():
            print(f"\n{scenario}:")
            print(f"  推荐框架: {info['推荐框架']}")
            print(f"  理由: {info['理由']}")
            print(f"  关键特性: {', '.join(info['关键特性'])}")

        return recommendations


# 框架对比演示
def demonstrate_framework_comparison():
    """演示框架对比"""
    print("Optuna vs Hyperopt 框架对比")
    print("=" * 50)

    comparison = FrameworkComparison()

    # 功能对比
    feature_df = comparison.feature_comparison()

    # 性能基准测试
    performance_results = comparison.performance_benchmark()

    # 使用场景推荐
    recommendations = comparison.use_case_recommendations()

    # 可视化对比
    def plot_comparison_chart(performance_results):
        """绘制对比图表"""
        frameworks = list(performance_results.keys())
        times = [performance_results[f]['time'] for f in frameworks]
        values = [performance_results[f]['best_value'] for f in frameworks]

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # 执行时间对比
        bars1 = ax1.bar(frameworks, times, color=['skyblue', 'lightcoral'], alpha=0.7)
        ax1.set_title('执行时间对比')
        ax1.set_ylabel('时间 (秒)')
        for bar, time_val in zip(bars1, times):
            ax1.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.1,
                     f'{time_val:.2f}s', ha='center', va='bottom')

        # 最佳值对比
        bars2 = ax2.bar(frameworks, values, color=['skyblue', 'lightcoral'], alpha=0.7)
        ax2.set_title('最佳值对比')
        ax2.set_ylabel('损失值')
        for bar, value in zip(bars2, values):
            ax2.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
                     f'{value:.4f}', ha='center', va='bottom')

        plt.tight_layout()
        plt.show()

    plot_comparison_chart(performance_results)

    return {
        'feature_comparison': feature_df,
        'performance_results': performance_results,
        'recommendations': recommendations
    }


# 运行框架对比演示
framework_comparison = demonstrate_framework_comparison()
最佳实践与调优策略
自动调优最佳实践
class AutoTuningBestPractices:
    """自动调优最佳实践"""

    @staticmethod
    def get_best_practices():
        """获取最佳实践指南"""
        practices = {
            "搜索空间设计": [
                "根据先验知识设置合理的参数范围",
                "使用对数尺度搜索学习率等参数",
                "考虑参数之间的依赖关系",
                "使用条件搜索空间处理复杂场景"
            ],
            "优化策略": [
                "从小规模试验开始,逐步扩大",
                "使用剪枝提前终止没有希望的试验",
                "考虑多目标优化平衡不同指标",
                "使用并行化加速优化过程"
            ],
            "资源管理": [
                "设置合理的超时和最大试验次数",
                "监控优化过程,及时调整策略",
                "使用检查点保存和恢复优化状态",
                "考虑计算成本和收益的平衡"
            ],
            "结果分析": [
                "分析参数重要性,理解模型行为",
                "可视化优化过程,识别模式",
                "验证最佳参数在独立测试集上的表现",
                "记录完整的优化过程和结果"
            ]
        }

        return practices

    @staticmethod
    def create_optimization_pipeline(model_type: str, dataset_size: str) -> Dict[str, Any]:
        """创建优化流水线配置"""
        configurations = {
            "小型数据集": {
                "n_trials": 50,
                "timeout": 3600,  # 1 小时
                "cv_folds": 5,
                "early_stopping_rounds": 10
            },
            "中型数据集": {
                "n_trials": 100,
                "timeout": 7200,  # 2 小时
                "cv_folds": 3,
                "early_stopping_rounds": 5
            },
            "大型数据集": {
                "n_trials": 50,  # 更少的试验,每个试验更长时间
                "timeout": 14400,  # 4 小时
                "cv_folds": 2,
                "early_stopping_rounds": 3
            }
        }

        model_specific = {
            "神经网络": {
                "framework": "Optuna",
                "sampler": "TPESampler",
                "pruner": "HyperbandPruner",
                "key_parameters": ["learning_rate", "batch_size", "hidden_size"]
            },
            "树模型": {
                "framework": "Hyperopt",
                "sampler": "tpe",
                "pruner": "none",
                "key_parameters": ["n_estimators", "max_depth", "learning_rate"]
            },
            "传统机器学习": {
                "framework": "Optuna",
                "sampler": "TPESampler",
                "pruner": "MedianPruner",
                "key_parameters": ["C", "kernel", "alpha"]
            }
        }

        config = configurations.get(dataset_size, configurations["中型数据集"])
        model_config = model_specific.get(model_type, model_specific["传统机器学习"])

        pipeline = {
            "model_type": model_type,
            "dataset_size": dataset_size,
            "optimization_config": config,
            "framework_config": model_config,
            "description": f"{model_type}在{dataset_size}上的优化配置"
        }

        return pipeline

    @staticmethod
    def common_pitfalls_and_solutions():
        """常见陷阱和解决方案"""
        pitfalls = {
            "搜索空间过大": {
                "问题": "参数范围太广,难以找到最优解",
                "解决方案": "基于领域知识缩小范围,使用对数尺度"
            },
            "忽略参数依赖": {
                "问题": "参数之间相互影响未被考虑",
                "解决方案": "使用条件搜索空间,考虑参数交互"
            },
            "过早收敛": {
                "问题": "优化过早收敛到局部最优",
                "解决方案": "使用不同的采样器,增加探索性试验"
            },
            "计算资源不足": {
                "问题": "优化过程耗时过长",
                "解决方案": "使用剪枝、并行化、设置超时限制"
            },
            "过拟合验证集": {
                "问题": "在验证集上过拟合",
                "解决方案": "使用交叉验证,在独立测试集上验证"
            }
        }

        return pitfalls


class OptimizationMonitoring:
    """优化过程监控"""

    def __init__(self):
        self.metrics_history = []

    def log_optimization_step(self, trial_number: int, params: Dict, score: float, duration: float):
        """记录优化步骤"""
        self.metrics_history.append({
            'trial_number': trial_number,
            'params': params,
            'score': score,
            'duration': duration,
            'timestamp': time.time()
        })

    def plot_optimization_progress(self):
        """绘制优化进度图"""
        if not self.metrics_history:
            print("没有优化历史数据")
            return

        trials = [m['trial_number'] for m in self.metrics_history]
        scores = [m['score'] for m in self.metrics_history]
        durations = [m['duration'] for m in self.metrics_history]

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # 分数进展
        ax1.plot(trials, scores, 'b-', alpha=0.7, linewidth=2)
        ax1.set_xlabel('试验次数')
        ax1.set_ylabel('分数')
        ax1.set_title('优化分数进展')
        ax1.grid(True, alpha=0.3)

        # 标记最佳点
        best_idx = int(np.argmax(scores))
        ax1.scatter(trials[best_idx], scores[best_idx], color='red', s=100, zorder=5)
        ax1.annotate(f'最佳\n{scores[best_idx]:.4f}',
                     (trials[best_idx], scores[best_idx]),
                     xytext=(10, 10), textcoords='offset points')

        # 持续时间分布
        ax2.hist(durations, bins=20, alpha=0.7, color='green')
        ax2.set_xlabel('持续时间 (秒)')
        ax2.set_ylabel('频次')
        ax2.set_title('试验持续时间分布')
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def generate_optimization_report(self) -> Dict[str, Any]:
        """生成优化报告"""
        if not self.metrics_history:
            return {"error": "没有优化数据"}

        best_trial = max(self.metrics_history, key=lambda x: x['score'])
        total_duration = sum(m['duration'] for m in self.metrics_history)

        report = {
            'total_trials': len(self.metrics_history),
            'best_score': best_trial['score'],
            'best_params': best_trial['params'],
            'total_duration': total_duration,
            'average_duration_per_trial': total_duration / len(self.metrics_history),
            'optimization_efficiency': best_trial['score'] / total_duration if total_duration > 0 else 0
        }

        return report


# 最佳实践总结
def summarize_best_practices():
    """总结自动调优最佳实践"""
    print("自动超参数调优最佳实践总结")
    print("=" * 60)

    best_practices = AutoTuningBestPractices()

    # 显示最佳实践
    practices = best_practices.get_best_practices()
    print("最佳实践指南:")
    for category, items in practices.items():
        print(f"\n{category}:")
        for item in items:
            print(f"  • {item}")

    # 显示优化流水线配置
    print("\n优化流水线配置示例:")
    scenarios = [
        ("神经网络", "中型数据集"),
        ("树模型", "大型数据集"),
        ("传统机器学习", "小型数据集")
    ]

    for model_type, dataset_size in scenarios:
        pipeline = best_practices.create_optimization_pipeline(model_type, dataset_size)
        print(f"\n{pipeline['description']}:")
        print(f"  试验次数: {pipeline['optimization_config']['n_trials']}")
        print(f"  超时时间: {pipeline['optimization_config']['timeout']}秒")
        print(f"  推荐框架: {pipeline['framework_config']['framework']}")

    # 显示常见陷阱
    print("\n常见陷阱和解决方案:")
    pitfalls = best_practices.common_pitfalls_and_solutions()
    for pitfall, info in pitfalls.items():
        print(f"\n{pitfall}:")
        print(f"  问题: {info['问题']}")
        print(f"  解决方案: {info['解决方案']}")

    # 创建监控示例
    print("\n优化过程监控示例:")
    monitor = OptimizationMonitoring()

    # 模拟一些监控数据
    for i in range(10):
        monitor.log_optimization_step(
            trial_number=i,
            params={'lr': 0.001, 'hidden_size': 128},
            score=0.8 + i * 0.02,
            duration=10 + i * 2
        )

    # 绘制监控图表
    monitor.plot_optimization_progress()

    # 生成报告
    report = monitor.generate_optimization_report()
    print("\n优化报告示例:")
    for key, value in report.items():
        if key != 'best_params':
            print(f"  {key}: {value}")


# 运行最佳实践总结
best_practices_summary = summarize_best_practices()


# 最终总结
def final_auto_tuning_summary():
    """最终自动调优总结"""
    print("\n" + "=" * 70)
    print("超参数自动调优 - 关键要点总结")
    print("=" * 70)

    key_insights = [
        "1. Optuna 提供丰富的功能和优秀的可视化,适合研究和复杂场景",
        "2. Hyperopt 稳定轻量,适合生产环境和快速原型",
        "3. 合理的搜索空间设计是成功调优的关键",
        "4. 剪枝策略可以大幅提高优化效率",
        "5. 多目标优化帮助平衡不同的性能指标",
        "6. 并行化可以显著加速优化过程",
        "7. 监控和分析优化过程有助于理解模型行为",
        "8. 基于领域知识的参数范围设置比盲目搜索更有效",
        "9. 考虑使用集成方法组合多个优化结果",
        "10. 始终在独立测试集上验证最佳参数"
    ]

    print("关键要点:")
    for insight in key_insights:
        print(f"  {insight}")

    print("\n通过本章学习,你已经掌握了:")
    skills = [
        "Optuna 和 Hyperopt 的核心概念和使用方法",
        "各种类型模型的超参数搜索空间设计",
        "高级优化技术和策略",
        "优化过程的监控和分析",
        "框架选择和实践指南"
    ]

    for i, skill in enumerate(skills, 1):
        print(f"  {i}. {skill}")


# 运行最终总结
final_auto_tuning_summary()
总结
通过本章的深入学习,你已经掌握了超参数自动调优的完整知识体系:
核心技术
- Optuna 框架:强大的优化框架,支持多种采样器和剪枝策略
- Hyperopt 框架:稳定可靠的优化库,基于 TPE(树状 Parzen 估计)的贝叶斯优化
- 搜索空间设计:灵活的参数空间定义,支持条件参数和分层结构
- 优化策略:剪枝、并行化、多目标优化等高级技术
实践能力
- 框架使用:熟练使用 Optuna 和 Hyperopt 进行模型调优
- 空间设计:为不同模型类型设计合适的搜索空间
- 性能优化:实施高效的优化策略和监控机制
- 结果分析:理解和解释优化结果,指导模型改进
业务价值
- 效率提升:自动化调优大幅减少人工调参时间
- 性能优化:系统化搜索找到更优的超参数组合
- 资源优化:智能剪枝和并行化提高计算资源利用率
- 决策支持:基于数据的参数选择提高模型可靠性
这些技能将帮助你在实际项目中高效地进行模型调优,提升模型性能并加速AI项目的开发周期。
