当前位置: 首页 > news >正文

王树森深度强化学习 DRL(六)连续控制 DDPG + 随机策略

【王树森】深度强化学习(DRL) 连续控制 -- B站

github - ddpg 代码

1. 离散控制与连续控制区别

之前的问题:像超级玛丽,动作空间只有三种动作,DQN的输出维度为 3 。

像机械臂控制问题,两个自由度对应两个维度的角度,

角度是一个连续的量(连续动作空间)

想法1:连续量离散化:划分小间隔为一个单位。

(但这么操作 会随着维度升高而指数爆炸

2. Deterministic Policy Gradient (DDPG) 深度确定性策略梯度

https://arxiv.org/pdf/1509.02971

属于一种 actor-critic 方法,策略网络直接输出连续动作的确定性取值

(而非离散动作的概率分布),突破了传统强化学习在连续动作空间的局限。

TD 算法更新价值网络,下一步的 a' 由策略网络得到。

同价值学习部分优化手段(使用目标网络、经验回放、多步TD损失)

使得 q 打分更高,训练策略网络;链式法则 q -> a -> θ

代码实现的时候:损失设置为 q(s,π(s)) 的相反数。

actor_loss = -self.critic(s, self.actor(s)).mean()

伪代码:先把目标网络参数设置为 = 主网络,后续每一次的循环中

π(s) + 扰动 -> 动作 a -> (s,a,r,s') 存入经验回放;

经验回放取一个 Batch;价值网络和策略网络更新;目标网络软更新

3. 随机策略连续控制

用正态分布的概率密度函数 作为策略函数进行抽样:

因为标准差 σ 必须非负,如果把 σ 作为优化变量,那么优化模型有约束条件,给求解造成困难。

于是我们用 方差对数 ρ = ln(σ²) 和 均值 μ,并且拿神经网络拟合。

辅助网络 f() = ln π() + C

故随机策略梯度可写为:

再使用 REINFORCE 或 actor-critic 的方法训练。

4. 倒立摆代码实践  stable-baselines3

https://stable-baselines3.readthedocs.io/en/v2.7.0/modules/ddpg.html

环境的目标:将倒立摆从随机位置摆动到竖直向上(倒立)的状态,并尽可能地保持住。

奖励在每个时间步的计算公式如下:

reward = -(θ² + 0.1 * θ'² + 0.001 * τ²)

θ: 摆杆角度与竖直向上位置的偏差

θ': 摆杆的角速度。        τ: 施加在摆杆铰链上的扭矩(动作)。

理想稳定状态:角度偏差小,角速度小,尽量少施加外力。

最后返回 200 步的总奖励值,reward 越接近 0 表现越好。

Policy 网络参数:

MlpPolicy:状态观测空间是向量的环境;多层感知机策略;Pendulum-v1, CartPole 经典控制

CnnPolicy:状态观测空间是图像的环境;卷积神经网络策略;Atari 游戏,机器人视觉导航

MultiInputPolicy:同时接收图像和向量信息的环境

# pip install stable-baselines3
import gymnasium as gym
import numpy as np
from stable_baselines3 import DDPG
from stable_baselines3.common.noise import NormalActionNoise# 1. 创建训练环境:关闭渲染,提升训练效率
env_train = gym.make("Pendulum-v1", render_mode=None)# 创建DDPG需要的动作噪声
n_actions = env_train.action_space.shape[-1]
action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))# 初始化DDPG模型,传入训练环境
model = DDPG("MlpPolicy", env_train, action_noise=action_noise, verbose=1)print("开始训练(无可视化)...")
model.learn(total_timesteps=10000, log_interval=10) # 10000步训练,10个episode输出一次日志
model.save("ddpg_pendulum")
# 关闭训练环境,释放资源
env_train.close()

日志输出示例:

| rollout/           |          |
|    ep_len_mean     | 200      |  # 平均每个episode长度 = 200步
|    ep_rew_mean     | -649     |  # 平均每个episode总奖励 = -649(默认是最近100个episode平均)| time/              |          |
|    episodes        | 30       |  # 已完成的episode总数
|    fps             | 192      |  # 每秒处理的帧数/步数
|    time_elapsed    | 31       |  # 已训练时间(秒)
|    total_timesteps | 6000     |  # 总训练步数 = 30 episodes × 200步| train/             |          |
|    actor_loss      | 62.7     |  # Actor网络损失值
|    critic_loss     | 1        |  # Critic网络损失值  
|    learning_rate   | 0.001    |  # 学习率
|    n_updates       | 5899     |  # 网络参数更新次数

2. 测试阶段:创建支持 human 渲染的环境

print("开始测试(启用可视化)...")
env_test = gym.make("Pendulum-v1", render_mode="human")
# 加载训练好的模型
model = DDPG.load("ddpg_pendulum")for test_ep in range(5):obs, _ = env_test.reset()test_reward = 0done = Falsewhile not done:# 确定性预测(关闭噪声,使动作更稳定)action, _states = model.predict(obs, deterministic=True)# 与测试环境交互obs, reward, terminated, truncated, info = env_test.step(action)test_reward += rewarddone = terminated or truncatedprint(f"测试轮{test_ep + 1} 总奖励:{test_reward:.1f}")# 关闭测试环境
env_test.close()

plus:使用 callback 追踪 episode 和日志里的两个网络损失。

from stable_baselines3.common.callbacks import BaseCallbackclass SimpleTrainingCallback(BaseCallback):def __init__(self, verbose: int = 0):super().__init__(verbose)self.episode_rewards = []  # 每个episode的总奖励self.actor_losses = []  # Actor网络损失self.critic_losses = []  # Critic网络损失self.current_episode_reward = 0  # 当前episode累计奖励def _on_step(self) -> bool:# 累加当前episode的奖励self.current_episode_reward += self.locals['rewards'][0]# 记录网络损失(如果有)log_data = self.logger.name_to_valueif "train/actor_loss" in log_data:self.actor_losses.append(log_data["train/actor_loss"])if "train/critic_loss" in log_data:self.critic_losses.append(log_data["train/critic_loss"])# episode结束 记录总的奖励if self.locals.get('dones', [False])[0]:self.episode_rewards.append(self.current_episode_reward)self.current_episode_reward = 0return Truecallback = SimpleTrainingCallback()model = DDPG("MlpPolicy",env,action_noise=action_noise,verbose=1,learning_starts=1000,train_freq=(1, "episode"))

并画出奖励和网络损失的折线图。发现 30 个 episode 后 reward就比较稳定。

5. 手搓 DDPG

1. 超参数设置上和原论文一致

import torch
# 训练相关
EPISODES = 100  # 总训练轮数
MAX_STEPS = 200  # 每轮最大步数
BATCH_SIZE = 64  # 批量采样大小
GAMMA = 0.99  # 折扣因子(未来奖励权重)# 网络更新相关
TAU = 0.005  # 目标网络软更新系数(tau越小更新越平滑)
LR_ACTOR = 1e-4  # 演员网络学习率
LR_CRITIC = 1e-3  # 评论员网络学习率# 经验回放
BUFFER_SIZE = 10000  # 经验缓冲区最大容量# 噪声探索
NOISE_SCALE = 0.1  # 高斯噪声强度(平衡探索与利用)
SEED = 42# 设备配置(自动识别GPU/CPU)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

2. 经验回放 类似之前 dqn

from collections import deque
import randomclass ReplayBuffer:def __init__(self, buffer_size):self.buffer = deque(maxlen=buffer_size)  # 固定容量,超容自动丢弃旧数据def add(self, s, a, r, s_next, done):self.buffer.append((s, a, r, s_next, done))def sample_batch(self, batch_size):"""随机采样批量经验"""if len(self.buffer) < batch_size:return None  # 样本不足时返回Nonereturn random.sample(self.buffer, batch_size)def __len__(self):return len(self.buffer)

3. net 的隐藏层维度均为 400、300

Actor 网络,输出动作值:输出值先用 激活函数 tanh 放缩到 [-1,1]

再乘以 action_bound 到 [-2,2];对输出层进行 [ -3e-3, 3e-3 ] 的初始化。

class Actor(nn.Module):def __init__(self, state_dim, action_dim, action_bound):super(Actor, self).__init__()self.action_bound = action_bound# 网络结构self.fc1 = nn.Linear(state_dim, 400)self.fc2 = nn.Linear(400, 300)self.fc3 = nn.Linear(300, action_dim)# 正交初始化nn.init.orthogonal_(self.fc1.weight, gain=nn.init.calculate_gain('relu'))nn.init.constant_(self.fc1.bias, 0.0)nn.init.orthogonal_(self.fc2.weight, gain=nn.init.calculate_gain('relu'))nn.init.constant_(self.fc2.bias, 0.0)# 输出层:小权重初始化,确保初始动作接近0nn.init.uniform_(self.fc3.weight, -3e-3, 3e-3)nn.init.uniform_(self.fc3.bias, -3e-3, 3e-3)def forward(self, x):x = torch.relu(self.fc1(x))x = torch.relu(self.fc2(x))x = torch.tanh(self.fc3(x)) # 放缩到[-1,1]return x * self.action_bound

Critic 网络,输入(s,a) 输出打分值

class Critic(nn.Module):def __init__(self, state_dim, action_dim):super(Critic, self).__init__()# 网络结构self.fc1 = nn.Linear(state_dim + action_dim, 400)self.fc2 = nn.Linear(400, 300)self.fc3 = nn.Linear(300, 1)# 正交初始化nn.init.orthogonal_(self.fc1.weight, gain=nn.init.calculate_gain('relu'))nn.init.constant_(self.fc1.bias, 0.0)nn.init.orthogonal_(self.fc2.weight, gain=nn.init.calculate_gain('relu'))nn.init.constant_(self.fc2.bias, 0.0)# 输出层:小权重初始化nn.init.uniform_(self.fc3.weight, -3e-3, 3e-3)nn.init.uniform_(self.fc3.bias, -3e-3, 3e-3)def forward(self, s, a):x = torch.cat([s, a], dim=1)x = torch.relu(self.fc1(x))x = torch.relu(self.fc2(x))return self.fc3(x)

4. 主函数为了可复现性,设定随机种子。

包含训练 + reward&网络损失可视化 + 参数保存 + 参数导入测试。

import randomdef set_seed(seed=42):"""设置所有随机种子以确保结果可重复"""# Python随机种子random.seed(seed)# Numpy随机种子np.random.seed(seed)# PyTorch随机种子torch.manual_seed(seed)torch.cuda.manual_seed(seed)torch.cuda.manual_seed_all(seed)  # 如果使用多GPU# PyTorch确定性设置torch.backends.cudnn.deterministic = Truetorch.backends.cudnn.benchmark = False

5. DDPG 智能体

包含初始化、动作选择、更新、软更新

5.1 初始化 Actor & Critic 网络,经验回放,目标网络 = 主网络

class DDPGAgent:def __init__(self, state_dim, action_dim, action_bound):# 1. 初始化 Actorself.actor = Actor(state_dim, action_dim, action_bound).to(HyperParams.DEVICE)self.actor_target = Actor(state_dim, action_dim, action_bound).to(HyperParams.DEVICE)self.actor_optim = optim.Adam(self.actor.parameters(), lr=HyperParams.LR_ACTOR)# 2. 初始化 Criticself.critic = Critic(state_dim, action_dim).to(HyperParams.DEVICE)self.critic_target = Critic(state_dim, action_dim).to(HyperParams.DEVICE)self.critic_optim = optim.Adam(self.critic.parameters(), lr=HyperParams.LR_CRITIC)# 3. 初始化经验回放缓冲区self.replay_buffer = ReplayBuffer(HyperParams.BUFFER_SIZE)# 4. 目标网络参数初始化self.soft_update(tau=1.0)  # 初始化目标网络等于主网络self.actor_losses = []self.critic_losses = []

5.2 软更新目标网络

    def soft_update(self, tau=HyperParams.TAU):"""软更新目标网络:target = tau*main + (1-tau)*target"""# Actor 网络软更新for main_param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):target_param.data.copy_(tau * main_param.data + (1 - tau) * target_param.data)# Critic 网络软更新for main_param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):target_param.data.copy_(tau * main_param.data + (1 - tau) * target_param.data)

5.3 s->a 不进行梯度更新,先转为 eval 评估模式 + with torch.no_grad();

training 模式则引入高斯噪声并裁剪到动作边界

    def select_action(self, s, is_training=True):"""选择动作:训练时加噪声探索,测试时纯策略输出"""s_tensor = torch.FloatTensor(s).unsqueeze(0).to(HyperParams.DEVICE)self.actor.eval()  # 评估模式with torch.no_grad():action = self.actor(s_tensor).cpu().numpy()[0]  # 输出动作self.actor.train()  # 恢复训练模式# 训练时添加高斯噪声(探索),并裁剪到动作边界if is_training:noise = np.random.normal(0, HyperParams.NOISE_SCALE, size=action.shape)action = np.clip(action + noise, -action_bound, action_bound)return action

5.4 取一个 batch 网络更新

使用 TD 更新 Critic 网络;a' q' 分别由 目标动作网络 和 目标价值网络得到。

    def update(self):"""从经验缓冲区采样并训练网络"""# 采样批量经验(样本不足时跳过)batch = self.replay_buffer.sample_batch(HyperParams.BATCH_SIZE)if batch is None:return# 转换批量数据为tensor(适配网络输入)s, a, r, s_next, done = (torch.FloatTensor(np.array(elem)).to(HyperParams.DEVICE) for elem in zip(*batch))r, done = r.unsqueeze(1), done.unsqueeze(1)# -------------------------- 训练critic网络 --------------------------# 计算目标Q值:r + gamma*(1-done)*Q_target(s_next, a_next)a_next = self.actor_target(s_next)  # 目标演员输出-next动作q_target = r + HyperParams.GAMMA * (1 - done) * self.critic_target(s_next, a_next)# 计算预测Q值q_pred = self.critic(s, a)# 最小化MSE损失critic_loss = nn.MSELoss()(q_pred, q_target.detach())  # detach冻结目标网络self.critic_losses.append(critic_loss.item())self.critic_optim.zero_grad()critic_loss.backward()self.critic_optim.step()

Actor 要使得 q(a) 尽量大,目标函数即为 -q(a)

# -------------------------- 训练actor网络 --------------------------# 最大化Q值(策略梯度:通过负号转为梯度上升)actor_loss = -self.critic(s, self.actor(s)).mean()self.actor_losses.append(actor_loss.item())self.actor_optim.zero_grad()actor_loss.backward()self.actor_optim.step()# -------------------------- 软更新目标网络 --------------------------self.soft_update()

进行 100 episode 的训练,reward 在 -150 上下:

http://www.dtcms.com/a/600777.html

相关文章:

  • 【SatWetCH4 第一期】全球湿地甲烷排放通量估算过程模型 SatWetCH4 介绍
  • Opencv(十一) : 图像缩放
  • 开源 Objective-C IOS 应用开发(四)Xcode工程文件结构
  • 儿童网站 源码html5网站开发教学
  • 编译类语言的特点与应用
  • Python 数组使用方法总结
  • 网站风格变化免费logo在线制作头像
  • 第三章深度解析:智能体“大脑”的底层逻辑——大语言模型技术基石全拆解
  • 100个云计算基础知识
  • 对比 DeepSeek(MLA)、Qwen 和 Llama 系列大模型在 Attention 架构/算法层面的核心设计及理解它们的本质区别。
  • 【C++】List容器模拟实现(超详细)
  • 湖南火电建设有限公司网站龙采哈尔滨建站公司
  • 【PHP反序列化】css夺旗赛
  • ServletLess架构简介
  • 安卓C语言编译器的选择与使用技巧 | 优化C语言编程体验,提升开发效率
  • (三)自然语言处理笔记——Transformer
  • iOS性能分析工具,有UI卡顿、app启动、内存、webview等性能优化解析
  • 电商网站建设 数商云招商码头无忧查询系统
  • 开源 Objective-C IOS 应用开发(三)第一个iPhone的APP
  • (11)(2.2.2) BLHeli32,AM32, and BLHeli_S ESCs(二)
  • Google Chrome v142.0.7444.135 便携增强版
  • [Windows] PDF文件浏览OCR工具1.0
  • 2025人形机器人产业链全景分析报告:核心技术与市场趋势|附130+份报告PDF、数据、可视化模板汇总下载
  • 长春教做网站带维护的培训机构淮安网站建设
  • 图文详述:MySQL的下载、安装、配置、使用
  • 把课本内容抄到PPT上就行吗?会不会太乱?
  • MySQL XtraBackup 使用文档(全量 + 增量备份与恢复)
  • 在k8s中seaweedfs中,weed 命令详细举例说明
  • 动易 网站统计 首次打开阿里云服务器学生
  • 【底层奥秘与性能艺术】让 RTOS 在 48 MHz MCU 上跑出 0.5 µs 上下文切换——一场从零开始的嵌入式“时间革命”