强化学习 PPO
4. PPO
近端策略优化 (PPO) 的思想是,我们希望通过限制在每个训练周期对策略所做的更改来提高策略的训练稳定性:我们希望避免策略更新幅度过大。
原因有二:
1)经验表明,训练期间较小的策略更新更可能收敛到最优解。
2)策略更新步幅过大可能导致“跌落悬崖”(得到一个糟糕的策略),之后需要很长时间才能恢复,甚至完全无法恢复。
因此,使用 PPO,我们保守地更新策略。为此,我们需要通过计算当前策略与先前策略之间的比率来衡量当前策略相对于先前策略的变化程度。并且我们将此比率裁剪到一个范围[1−ϵ,1+ϵ],这意味着我们消除了当前策略偏离旧策略太远的动机(因此称为近端策略)。
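这里的“比率”就是 PPO 论文中的重要性采样比率:

$$r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{old}}(a_t \mid s_t)}$$

ratio 大于 1 表示当前策略比旧策略更倾向于该动作,小于 1 则相反。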
交叉熵,KL, 熵的概念:https://blog.csdn.net/m0_62881487/article/details/133279415
4.1 回顾策略梯度的目标函数
其中 $A$ 是优势函数,$\pi$ 是策略函数。
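该目标函数(PPO 论文中记作 $L^{PG}$)为:

$$L^{PG}(\theta) = \hat{\mathbb{E}}_t\left[\log \pi_\theta(a_t \mid s_t)\,\hat{A}_t\right]$$

其中 $\hat{A}_t$ 是优势函数的估计;对该目标求梯度即得到策略梯度。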
4.2 PPO clipped surrogate 目标函数和 ratio 计算
目的就是使策略更新更稳定。
相当于把之前目标函数中的策略(对数概率)部分换成了 ratio 部分,并对 ratio 加了限制。
之前是:某个动作的优势(回报)高,就提高策略产生该动作的概率。
现在是:某个动作的优势高,就提高当前策略相对于旧策略产生该动作的概率(即提高 ratio)。
通过剪裁比率,我们确保我们不会有太大的策略更新,因为当前策略不能与旧策略相差太远。
epsilon 是一个超参数,用于定义这个剪裁范围(论文中取 ϵ=0.2)。
然后,我们取剪裁目标和未剪裁目标的最小值,因此最终目标是未剪裁目标的下界(悲观界)。
取剪裁目标和未剪裁目标的最小值意味着我们将根据比率和优势情况选择剪裁目标或未剪裁目标。
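综合起来,PPO 的 clipped surrogate 目标函数(论文中的 $L^{CLIP}$)为:

$$L^{CLIP}(\theta) = \hat{\mathbb{E}}_t\left[\min\left(r_t(\theta)\,\hat{A}_t,\ \operatorname{clip}\bigl(r_t(\theta),\,1-\epsilon,\,1+\epsilon\bigr)\,\hat{A}_t\right)\right]$$

其中 $r_t(\theta)$ 是新旧策略的概率比,$\hat{A}_t$ 是优势估计(实际实现中常用下一节的 GAE 计算)。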
4.3 Generalized Advantage Estimation (GAE) 详解(广义优势估计)
在强化学习中,优势函数($A(s,a)=Q(s,a)-V(s)$)用于衡量在状态 $s$ 下采取动作 $a$ 相对于平均水平的优势。然而,准确估计优势函数具有挑战性。传统方法包括:
- 蒙特卡洛(MC)估计:使用从当前时刻到 episode 结束的累积奖励作为回报估计,无偏但高方差。
- 时序差分(TD)估计:使用当前奖励加上下一个状态的价值估计,有偏但低方差。

为了平衡偏差和方差,通常使用 n 步优势估计,即结合 n 步的实际奖励和剩余步骤的价值估计(标准写法见下式)。但 n 的选择是一个难题,不同的 n 会导致不同的偏差-方差权衡。GAE 的提出就是为了解决这个问题:它通过一个参数 λ 平滑地结合不同 n 步的估计值,从而在偏差和方差之间取得平衡。
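作为上文“n 步优势估计”的补充,其标准写法为:

$$\hat{A}_t^{(n)} = r_t + \gamma r_{t+1} + \cdots + \gamma^{n-1} r_{t+n-1} + \gamma^n V(s_{t+n}) - V(s_t)$$

n=1 时即单步 TD 误差,n 取到回合结束时即蒙特卡洛估计。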
以下是关于Generalized Advantage Estimation (GAE) 的详细解析,结合数学原理、使用动机和实现细节:
1. GAE的核心原理
GAE是一种平衡偏差与方差的优势函数估计方法,通过引入参数λ在单步TD误差和多步蒙特卡洛回报之间进行折中。
数学定义:
$$A_t^{GAE} = \sum_{l=0}^{\infty} (\gamma \lambda)^l \delta_{t+l}$$
其中:
- $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ 是TD误差
- $\gamma$ 是折扣因子
- $\lambda$ 是GAE参数($0 \le \lambda \le 1$)
递推计算形式:
$$A_t^{GAE} = \delta_t + \gamma \lambda A_{t+1}^{GAE}$$
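一个简单的数值例子(取 γ=0.99、λ=0.95,并假设轨迹在第 1 步后终止):若 $\delta_0 = 1.0$、$\delta_1 = 0.5$,从后往前递推得

$$A_1^{GAE} = \delta_1 = 0.5,\qquad A_0^{GAE} = \delta_0 + \gamma\lambda\,A_1^{GAE} = 1.0 + 0.99 \times 0.95 \times 0.5 \approx 1.470$$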
2. 使用动机
(1) 解决偏差-方差权衡问题
| 方法 | 偏差 | 方差 | 特点 |
|---|---|---|---|
| 单步TD (λ=0) | 高 | 低 | 依赖局部价值估计 |
| 蒙特卡洛 (λ=1) | 低 | 高 | 依赖完整轨迹 |
| GAE (0<λ<1) | 中 | 中 | 平滑折中 |
直观理解:
- λ→0:更信任Critic的估计(高偏差,低方差)
- λ→1:更信任实际回报(低偏差,高方差)
(2) 减少策略梯度的高方差
策略梯度公式:
$$\nabla_\theta J(\theta) = \mathbb{E}\left[\nabla_\theta \log \pi_\theta(a \mid s)\, A(s,a)\right]$$
GAE通过多步混合估计提供更稳定的优势值,降低梯度更新的方差。
3. 算法实现步骤
(1) 计算TD误差序列
对于轨迹中的每个时间步t:
$$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$$
(2) 反向计算GAE
```python
import numpy as np

def compute_gae(rewards, values, dones, gamma=0.99, lambda_=0.95):
    """
    参数:
        rewards: 奖励序列 (T,)
        values: Critic输出的价值序列 (T+1,)
        dones: 终止标志 (T,)
        gamma: 折扣因子
        lambda_: GAE参数
    返回:
        advantages: 优势函数序列 (T,)
    """
    advantages = np.zeros_like(rewards)
    gae = 0
    # 从轨迹末尾向前递推:A_t = δ_t + γλ(1-done_t) A_{t+1}
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t + 1] * (1 - dones[t]) - values[t]
        gae = delta + gamma * lambda_ * (1 - dones[t]) * gae
        advantages[t] = gae
    return advantages
```
形状说明:
- `values[t]` 对应状态 $s_t$ 的价值估计 $V(s_t)$
- 需要额外计算 $V(s_{T+1})$ 作为轨迹终止状态的价值(因此 `values` 的长度为 T+1)
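下面是一个最小的调用示例(数值仅作演示,沿用上面定义的 compute_gae):

```python
import numpy as np

# 一条长度 T=4 的轨迹(数值仅作演示)
rewards = np.array([1.0, 1.0, 1.0, 1.0])       # (T,)
values  = np.array([0.5, 0.6, 0.7, 0.8, 0.0])  # (T+1,),最后一项是 V(s_{T+1})
dones   = np.array([0.0, 0.0, 0.0, 1.0])       # (T,),最后一步终止

advantages = compute_gae(rewards, values, dones, gamma=0.99, lambda_=0.95)
returns = advantages + values[:-1]             # 回报 = 优势 + 价值基线
print(advantages.shape, returns.shape)         # (4,) (4,)
```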
4. GAE的数学展开
将GAE展开为无限级数:
$$\begin{aligned} A_t^{GAE} &= \delta_t + \gamma \lambda \delta_{t+1} + (\gamma \lambda)^2 \delta_{t+2} + \cdots \\ &= \sum_{l=0}^{\infty} (\gamma \lambda)^l \left(r_{t+l} + \gamma V(s_{t+l+1}) - V(s_{t+l})\right) \end{aligned}$$
特殊情况:
- 当 λ=0 时:$A_t^{GAE} = \delta_t$(单步TD误差)
- 当 λ=1 时:$A_t^{GAE} = \sum_{l=0}^{\infty} \gamma^l r_{t+l} - V(s_t)$(蒙特卡洛回报)
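λ=1 的结论可以由价值项的逐项相消(telescoping)得到(假设 $\gamma^{l} V(s_{t+l})$ 随 $l\to\infty$ 趋于 0):

$$\sum_{l=0}^{\infty} \gamma^l \delta_{t+l} = \sum_{l=0}^{\infty} \gamma^l r_{t+l} + \sum_{l=0}^{\infty}\left(\gamma^{l+1} V(s_{t+l+1}) - \gamma^{l} V(s_{t+l})\right) = \sum_{l=0}^{\infty} \gamma^l r_{t+l} - V(s_t)$$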
5. 与其他方法的对比
| 方法 | 优势函数计算 | 特点 |
|---|---|---|
| TD(0) | $r_t + \gamma V(s_{t+1}) - V(s_t)$ | 高偏差,低方差 |
| MC | $\sum_{l=0}^{T} \gamma^l r_{t+l} - V(s_t)$ | 低偏差,高方差 |
| GAE | $\sum_{l=0}^{T} (\gamma \lambda)^l \delta_{t+l}$ | 可调平衡(通过λ) |
6. 实际应用建议
- 参数选择:
  - 通常设定 λ ∈ [0.9, 0.99]
  - 对于高噪声环境(如稀疏奖励),使用更大的 λ
  - 对于确定性环境,可减小 λ
- 与PPO的配合:

```python
# PPO中使用GAE的典型流程
values = critic(states)                                           # (T+1, 1)
advantages = compute_gae(rewards, values, dones, gamma, lambda_)
returns = advantages + values[:-1]                                # (T, 1)
```

- 数值稳定性:标准化优势函数:

```python
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
```
7. 理论意义
GAE实际上是TD(λ)在优势函数估计上的推广,其核心贡献在于:
- 提供了一种连续调节偏差-方差的机制
- 保持了时序差分学习的在线更新特性
- 兼容各种策略梯度算法(PPO, A2C等)
通过合理选择λ,GAE能在大多数RL任务中实现比纯TD或MC更稳定的学习性能。
代码分析
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.distributions import Categorical
from torch.utils.tensorboard import SummaryWriter
import gym
from collections import deque
import matplotlib.pyplot as plt


class PPONetwork(nn.Module):
    """PPO网络架构,包含Actor和Critic两个头"""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        """初始化PPO网络

        Args:
            state_dim: 状态空间维度
            action_dim: 动作空间维度
            hidden_dim: 隐藏层维度
        """
        super(PPONetwork, self).__init__()
        # 共享特征提取层
        # input shape: (batch_size, state_dim)
        self.shared_layers = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),   # (batch_size, state_dim) -> (batch_size, hidden_dim)
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),  # (batch_size, hidden_dim) -> (batch_size, hidden_dim)
            nn.Tanh()
        )
        # Actor头:输出动作概率分布
        # output shape: (batch_size, action_dim)
        self.actor_head = nn.Linear(hidden_dim, action_dim)
        # Critic头:输出状态价值
        # output shape: (batch_size, 1)
        self.critic_head = nn.Linear(hidden_dim, 1)
        # 初始化权重
        self._init_weights()

    def _init_weights(self):
        """初始化网络权重"""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.orthogonal_(m.weight, gain=np.sqrt(2))
                nn.init.constant_(m.bias, 0)

    def forward(self, state):
        """前向传播

        Args:
            state: 状态张量,shape: (batch_size, state_dim)
        Returns:
            action_logits: 动作logits,shape: (batch_size, action_dim)
            state_value: 状态价值,shape: (batch_size, 1)
        """
        # 特征提取
        features = self.shared_layers(state)       # shape: (batch_size, hidden_dim)
        # Actor输出:动作logits
        action_logits = self.actor_head(features)  # shape: (batch_size, action_dim)
        # Critic输出:状态价值
        state_value = self.critic_head(features)   # shape: (batch_size, 1)
        return action_logits, state_value

    def get_action_and_value(self, state, action=None):
        """获取动作和价值(用于训练和推理)

        Args:
            state: 状态张量,shape: (batch_size, state_dim)
            action: 动作张量,shape: (batch_size,),如果为None则采样新动作
        Returns:
            action: 动作,shape: (batch_size,)
            log_prob: 动作对数概率,shape: (batch_size,)
            entropy: 策略熵,shape: (batch_size,)
            value: 状态价值,shape: (batch_size,)
        """
        action_logits, value = self.forward(state)
        # 创建分布
        probs = Categorical(logits=action_logits)  # 分布对象
        if action is None:
            # 采样新动作
            action = probs.sample()        # shape: (batch_size,)
        # 计算对数概率和熵
        log_prob = probs.log_prob(action)  # shape: (batch_size,)
        entropy = probs.entropy()          # shape: (batch_size,)
        return action, log_prob, entropy, value.squeeze(-1)  # value shape: (batch_size,)


class PPOAgent:
    """PPO智能体实现"""

    def __init__(self, state_dim, action_dim, lr=3e-4, gamma=0.99,
                 gae_lambda=0.95, clip_epsilon=0.2, value_coef=0.5,
                 entropy_coef=0.01, max_grad_norm=0.5):
        """初始化PPO智能体

        Args:
            state_dim: 状态空间维度
            action_dim: 动作空间维度
            lr: 学习率
            gamma: 折扣因子
            gae_lambda: GAE参数
            clip_epsilon: PPO裁剪参数
            value_coef: 价值损失系数
            entropy_coef: 熵损失系数
            max_grad_norm: 梯度裁剪阈值
        """
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.value_coef = value_coef
        self.entropy_coef = entropy_coef
        self.max_grad_norm = max_grad_norm
        # 创建网络和优化器
        self.network = PPONetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.network.parameters(), lr=lr)
        # 存储轨迹数据
        self.reset_storage()
        # TensorBoard记录器
        self.writer = SummaryWriter('runs/ppo_training')
        self.global_step = 0

    def reset_storage(self):
        """重置存储缓冲区"""
        self.states = []     # 状态序列,每个元素shape: (state_dim,)
        self.actions = []    # 动作序列,每个元素shape: ()
        self.rewards = []    # 奖励序列,每个元素shape: ()
        self.log_probs = []  # 对数概率序列,每个元素shape: ()
        self.values = []     # 状态价值序列,每个元素shape: ()
        self.dones = []      # 终止标志序列,每个元素shape: ()

    def select_action(self, state):
        """选择动作(推理模式)

        Args:
            state: 当前状态,shape: (state_dim,)
        Returns:
            action: 选择的动作,标量
            log_prob: 动作对数概率,标量
            value: 状态价值,标量
        """
        with torch.no_grad():
            # 转换为张量并添加batch维度
            state_tensor = torch.FloatTensor(state).unsqueeze(0)  # shape: (1, state_dim)
            # 获取动作和价值
            action, log_prob, _, value = self.network.get_action_and_value(state_tensor)
        return action.item(), log_prob.item(), value.item()

    def store_transition(self, state, action, reward, log_prob, value, done):
        """存储一步转移

        Args:
            state: 状态,shape: (state_dim,)
            action: 动作,标量
            reward: 奖励,标量
            log_prob: 对数概率,标量
            value: 状态价值,标量
            done: 是否终止,布尔值
        """
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.log_probs.append(log_prob)
        self.values.append(value)
        self.dones.append(done)

    def compute_gae(self, next_value=0):
        """计算广义优势估计(GAE)

        Args:
            next_value: 下一个状态的价值
        Returns:
            advantages: 优势序列,shape: (trajectory_length,)
            returns: 回报序列,shape: (trajectory_length,)
        """
        trajectory_length = len(self.rewards)
        # 转换为numpy数组便于计算
        rewards = np.array(self.rewards)  # shape: (trajectory_length,)
        values = np.array(self.values)    # shape: (trajectory_length,)
        dones = np.array(self.dones)      # shape: (trajectory_length,)
        # 计算优势和回报
        advantages = np.zeros(trajectory_length)  # shape: (trajectory_length,)
        returns = np.zeros(trajectory_length)     # shape: (trajectory_length,)
        # 从后往前计算GAE
        gae = 0
        for t in reversed(range(trajectory_length)):
            if t == trajectory_length - 1:
                # 最后一步
                next_non_terminal = 1.0 - dones[t]
                next_value_t = next_value
            else:
                # 中间步骤
                next_non_terminal = 1.0 - dones[t]
                next_value_t = values[t + 1]
            # TD误差:δ_t = r_t + γ * V(s_{t+1}) * (1-done) - V(s_t)
            delta = rewards[t] + self.gamma * next_value_t * next_non_terminal - values[t]
            # GAE:A_t = δ_t + γ * λ * (1-done) * A_{t+1}
            gae = delta + self.gamma * self.gae_lambda * next_non_terminal * gae
            advantages[t] = gae
        # 计算回报:R_t = A_t + V(s_t)
        returns = advantages + values
        return advantages, returns

    def update(self, next_value=0, update_epochs=4, batch_size=64):
        """更新网络参数

        Args:
            next_value: 下一个状态的价值
            update_epochs: 更新轮数
            batch_size: 批次大小
        Returns:
            loss_info: 损失信息字典
        """
        if len(self.rewards) == 0:
            return {}
        # 计算优势和回报
        advantages, returns = self.compute_gae(next_value)
        # 转换为张量
        states = torch.FloatTensor(np.array(self.states))  # shape: (trajectory_length, state_dim)
        actions = torch.LongTensor(self.actions)            # shape: (trajectory_length,)
        old_log_probs = torch.FloatTensor(self.log_probs)   # shape: (trajectory_length,)
        advantages = torch.FloatTensor(advantages)           # shape: (trajectory_length,)
        returns = torch.FloatTensor(returns)                 # shape: (trajectory_length,)
        # 标准化优势
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        # 存储损失信息
        total_loss_list = []
        actor_loss_list = []
        critic_loss_list = []
        entropy_loss_list = []
        # 多轮更新
        for epoch in range(update_epochs):
            # 随机打乱数据
            indices = torch.randperm(len(states))
            # 分批更新
            for start in range(0, len(states), batch_size):
                end = start + batch_size
                batch_indices = indices[start:end]
                # 获取批次数据
                batch_states = states[batch_indices]                # shape: (batch_size, state_dim)
                batch_actions = actions[batch_indices]              # shape: (batch_size,)
                batch_old_log_probs = old_log_probs[batch_indices]  # shape: (batch_size,)
                batch_advantages = advantages[batch_indices]        # shape: (batch_size,)
                batch_returns = returns[batch_indices]              # shape: (batch_size,)
                # 前向传播
                _, new_log_probs, entropy, new_values = self.network.get_action_and_value(
                    batch_states, batch_actions)
                # new_log_probs shape: (batch_size,)
                # entropy shape: (batch_size,)
                # new_values shape: (batch_size,)
                # 计算重要性采样比率
                ratio = torch.exp(new_log_probs - batch_old_log_probs)  # shape: (batch_size,)
                # 计算PPO损失
                # 未裁剪的目标
                surr1 = ratio * batch_advantages  # shape: (batch_size,)
                # 裁剪的目标
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * batch_advantages
                # Actor损失:取最小值(保守更新)
                actor_loss = -torch.min(surr1, surr2).mean()  # 标量
                # Critic损失:均方误差
                critic_loss = F.mse_loss(new_values, batch_returns)  # 标量
                # 熵损失:鼓励探索
                entropy_loss = -entropy.mean()  # 标量
                # 总损失
                total_loss = actor_loss + self.value_coef * critic_loss + self.entropy_coef * entropy_loss
                # 反向传播
                self.optimizer.zero_grad()
                total_loss.backward()
                # 梯度裁剪
                torch.nn.utils.clip_grad_norm_(self.network.parameters(), self.max_grad_norm)
                # 更新参数
                self.optimizer.step()
                # 记录损失
                total_loss_list.append(total_loss.item())
                actor_loss_list.append(actor_loss.item())
                critic_loss_list.append(critic_loss.item())
                entropy_loss_list.append(entropy_loss.item())
        # 计算平均损失
        loss_info = {
            'total_loss': np.mean(total_loss_list),
            'actor_loss': np.mean(actor_loss_list),
            'critic_loss': np.mean(critic_loss_list),
            'entropy_loss': np.mean(entropy_loss_list),
            'advantages_mean': advantages.mean().item(),
            'advantages_std': advantages.std().item(),
            'returns_mean': returns.mean().item()
        }
        # 记录到TensorBoard
        self.log_to_tensorboard(loss_info)
        # 重置存储
        self.reset_storage()
        return loss_info

    def log_to_tensorboard(self, loss_info):
        """记录训练信息到TensorBoard

        Args:
            loss_info: 损失信息字典
        """
        for key, value in loss_info.items():
            self.writer.add_scalar(f'Loss/{key}', value, self.global_step)
        self.global_step += 1

    def close_writer(self):
        """关闭TensorBoard写入器"""
        self.writer.close()
```
训练代码:
```python
def train_ppo(env_name='CartPole-v1', total_timesteps=100000, steps_per_update=2048,
              learning_rate=3e-4, num_envs=1):
    """训练PPO智能体

    Args:
        env_name: 环境名称
        total_timesteps: 总训练步数
        steps_per_update: 每次更新的步数
        learning_rate: 学习率
        num_envs: 并行环境数量
    Returns:
        agent: 训练好的智能体
        episode_rewards: 每回合奖励列表
    """
    # 创建环境
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]  # 状态维度
    action_dim = env.action_space.n             # 动作维度
    print(f"环境: {env_name}")
    print(f"状态维度: {state_dim}, 动作维度: {action_dim}")
    # 创建智能体
    agent = PPOAgent(state_dim=state_dim, action_dim=action_dim, lr=learning_rate)
    # 训练记录
    episode_rewards = []                # 每回合奖励
    episode_lengths = []                # 每回合长度
    recent_rewards = deque(maxlen=100)  # 最近100回合奖励
    # 训练循环
    state = env.reset()                 # shape: (state_dim,)
    episode_reward = 0
    episode_length = 0
    timestep = 0
    episode_count = 0
    while timestep < total_timesteps:
        # 收集轨迹数据
        for step in range(steps_per_update):
            # 选择动作
            action, log_prob, value = agent.select_action(state)
            # 执行动作
            next_state, reward, done, info = env.step(action)
            # next_state shape: (state_dim,)
            # reward: 标量
            # done: 布尔值
            # 存储转移
            agent.store_transition(state, action, reward, log_prob, value, done)
            # 更新状态和统计
            state = next_state
            episode_reward += reward
            episode_length += 1
            timestep += 1
            # 回合结束处理
            if done:
                # 记录回合信息
                episode_rewards.append(episode_reward)
                episode_lengths.append(episode_length)
                recent_rewards.append(episode_reward)
                episode_count += 1
                # 记录到TensorBoard
                agent.writer.add_scalar('Episode/Reward', episode_reward, episode_count)
                agent.writer.add_scalar('Episode/Length', episode_length, episode_count)
                # 打印进度
                if episode_count % 10 == 0:
                    avg_reward = np.mean(recent_rewards) if recent_rewards else 0
                    print(f"Episode {episode_count}, Timestep {timestep}, "
                          f"Reward: {episode_reward:.2f}, Avg Reward: {avg_reward:.2f}")
                # 重置环境
                state = env.reset()
                episode_reward = 0
                episode_length = 0
            # 达到总步数则退出
            if timestep >= total_timesteps:
                break
        # 计算下一个状态的价值(用于GAE计算)
        with torch.no_grad():
            if done:
                next_value = 0  # 终止状态价值为0
            else:
                # 估计下一个状态的价值
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                _, _, _, next_value = agent.network.get_action_and_value(state_tensor)
                next_value = next_value.item()
        # 更新网络
        loss_info = agent.update(next_value=next_value)
        # 打印损失信息
        if loss_info and timestep % (steps_per_update * 5) == 0:
            print(f"Timestep {timestep}:")
            for key, value in loss_info.items():
                print(f"  {key}: {value:.6f}")
    # 关闭环境和TensorBoard
    env.close()
    agent.close_writer()
    return agent, episode_rewards


def test_agent(agent, env_name='CartPole-v1', num_episodes=10, render=False):
    """测试训练好的智能体

    Args:
        agent: 训练好的PPO智能体
        env_name: 环境名称
        num_episodes: 测试回合数
        render: 是否渲染
    Returns:
        test_rewards: 测试奖励列表
    """
    env = gym.make(env_name)
    test_rewards = []
    for episode in range(num_episodes):
        state = env.reset()  # shape: (state_dim,)
        episode_reward = 0
        done = False
        while not done:
            if render:
                env.render()
            # 选择动作(测试时只需要动作)
            action, _, _ = agent.select_action(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward
        test_rewards.append(episode_reward)
        print(f"Test Episode {episode + 1}: Reward = {episode_reward}")
    env.close()
    avg_test_reward = np.mean(test_rewards)
    print(f"\n平均测试奖励: {avg_test_reward:.2f}")
    return test_rewards
```
可视化代码:
```python
def plot_training_results(episode_rewards, save_path='ppo_training_results.png'):
    """绘制训练结果

    Args:
        episode_rewards: 每回合奖励列表
        save_path: 保存路径
    """
    plt.figure(figsize=(15, 5))
    # 原始奖励曲线
    plt.subplot(1, 3, 1)
    plt.plot(episode_rewards, alpha=0.6, color='blue')
    plt.title('Episode Rewards')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.grid(True)
    # 移动平均奖励曲线
    plt.subplot(1, 3, 2)
    window_size = min(50, len(episode_rewards) // 4)
    if len(episode_rewards) >= window_size:
        moving_avg = []
        for i in range(window_size - 1, len(episode_rewards)):
            moving_avg.append(np.mean(episode_rewards[i - window_size + 1:i + 1]))
        plt.plot(range(window_size - 1, len(episode_rewards)), moving_avg, color='red')
    plt.title(f'Moving Average Rewards (window={window_size})')
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.grid(True)
    # 奖励分布直方图
    plt.subplot(1, 3, 3)
    plt.hist(episode_rewards, bins=30, alpha=0.7, color='green')
    plt.title('Reward Distribution')
    plt.xlabel('Reward')
    plt.ylabel('Frequency')
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()
    # 打印统计信息
    print(f"\n训练统计:")
    print(f"总回合数: {len(episode_rewards)}")
    print(f"平均奖励: {np.mean(episode_rewards):.2f}")
    print(f"最大奖励: {np.max(episode_rewards):.2f}")
    print(f"最小奖励: {np.min(episode_rewards):.2f}")
    print(f"奖励标准差: {np.std(episode_rewards):.2f}")


def analyze_tensorboard_logs(log_dir='runs/ppo_training'):
    """分析TensorBoard日志

    Args:
        log_dir: 日志目录
    """
    print(f"\nTensorBoard日志保存在: {log_dir}")
    print("运行以下命令查看训练过程:")
    print(f"tensorboard --logdir {log_dir}")
    print("然后在浏览器中打开: http://localhost:6006")
```
完整入口
```python
def main():
    """主函数:完整的PPO训练和测试流程"""
    print("开始PPO训练...")
    # 设置随机种子
    torch.manual_seed(42)
    np.random.seed(42)
    # 训练参数
    config = {
        'env_name': 'CartPole-v1',
        'total_timesteps': 100000,
        'steps_per_update': 2048,
        'learning_rate': 3e-4,
        'gamma': 0.99,
        'gae_lambda': 0.95,
        'clip_epsilon': 0.2,
        'value_coef': 0.5,
        'entropy_coef': 0.01
    }
    print("训练配置:")
    for key, value in config.items():
        print(f"  {key}: {value}")
    # 训练智能体
    agent, episode_rewards = train_ppo(
        env_name=config['env_name'],
        total_timesteps=config['total_timesteps'],
        steps_per_update=config['steps_per_update'],
        learning_rate=config['learning_rate'])
    print("\n训练完成!开始测试...")
    # 测试智能体
    test_rewards = test_agent(agent, env_name=config['env_name'], num_episodes=10)
    # 绘制结果
    plot_training_results(episode_rewards)
    # 分析TensorBoard日志
    analyze_tensorboard_logs()
    # 保存模型
    torch.save(agent.network.state_dict(), 'ppo_model.pth')
    print("\n模型已保存为: ppo_model.pth")
    print("\n训练和测试完成!")
    return agent, episode_rewards, test_rewards


# 运行示例
if __name__ == "__main__":
    agent, train_rewards, test_rewards = main()
```
一些高级优化功能
```python
class AdvancedPPOAgent(PPOAgent):
    """高级PPO智能体,包含更多优化技巧"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 学习率调度器
        self.scheduler = optim.lr_scheduler.LinearLR(
            self.optimizer, start_factor=1.0, end_factor=0.1, total_iters=1000)
        # 早停机制
        self.best_reward = -float('inf')
        self.patience = 50
        self.patience_counter = 0

    def update_with_scheduling(self, *args, **kwargs):
        """带学习率调度的更新"""
        loss_info = self.update(*args, **kwargs)
        # 更新学习率
        self.scheduler.step()
        # 记录学习率
        current_lr = self.optimizer.param_groups[0]['lr']
        self.writer.add_scalar('Training/LearningRate', current_lr, self.global_step)
        return loss_info

    def check_early_stopping(self, current_reward):
        """检查早停条件

        Args:
            current_reward: 当前平均奖励
        Returns:
            should_stop: 是否应该停止训练
        """
        if current_reward > self.best_reward:
            self.best_reward = current_reward
            self.patience_counter = 0
            return False
        else:
            self.patience_counter += 1
            return self.patience_counter >= self.patience


# 使用示例
def train_advanced_ppo():
    """使用高级PPO智能体训练"""
    env = gym.make('CartPole-v1')
    agent = AdvancedPPOAgent(
        state_dim=env.observation_space.shape[0],
        action_dim=env.action_space.n)
    # 训练循环(简化版)
    recent_rewards = deque(maxlen=100)
    for episode in range(1000):
        # ... 训练逻辑 ...
        # 检查早停
        if len(recent_rewards) >= 100:
            avg_reward = np.mean(recent_rewards)
            if agent.check_early_stopping(avg_reward):
                print(f"Early stopping at episode {episode}")
                break
    return agent
```
PPO算法中关键变量的Shape说明:

网络输入输出:
- state: (batch_size, state_dim) - 状态批次
- action_logits: (batch_size, action_dim) - 动作logits
- state_value: (batch_size, 1) - 状态价值

轨迹数据:
- states: List[(state_dim,)] -> Tensor(trajectory_length, state_dim)
- actions: List[scalar] -> Tensor(trajectory_length,)
- rewards: List[scalar] -> Tensor(trajectory_length,)
- log_probs: List[scalar] -> Tensor(trajectory_length,)
- values: List[scalar] -> Tensor(trajectory_length,)
- dones: List[bool] -> Tensor(trajectory_length,)

GAE计算:
- advantages: (trajectory_length,) - 优势函数
- returns: (trajectory_length,) - 回报
- delta: (trajectory_length,) - TD误差

PPO更新:
- ratio: (batch_size,) - 重要性采样比率
- surr1: (batch_size,) - 未裁剪目标
- surr2: (batch_size,) - 裁剪目标
- actor_loss: scalar - Actor损失
- critic_loss: scalar - Critic损失
- entropy_loss: scalar - 熵损失

分布相关:
- probs: Categorical分布对象
- log_prob: (batch_size,) - 动作对数概率
- entropy: (batch_size,) - 策略熵