强化学习之入门笔记(三)
文章目录
- 强化学习
- DDPG算法
- 参考文献
强化学习
强化学习之入门笔记(一)
强化学习之入门笔记(二)
DDPG算法
深度策略性梯度算法(Deep Deterministic Policy Gradient,DDPG):适用于连续动作空间
DDPG算法采用Actor-Critic框架,利用深度神经网络近似策略和动作价值函数,利用随机梯度法训练策略网络和价值网络模型中的参数,并通过经验回放和双重网络结构提高学习稳定性

在Actor-Critic(AC)算法中,Critic网络的任务是评估当前策略并估计价值函数,并通过最小化时序差分误差来优化这个估计⇒\Rightarrow⇒δt=Rt+1+γV(st+1)−V(st)\delta_t=R_{t+1}+\gamma V(s_{t+1})-V(s_t)δt=Rt+1+γV(st+1)−V(st),TD-Errorδt\delta_tδt衡量了状态值估计与实际获得的回报之间的差异⇒\Rightarrow⇒如果价值函数估计与实际回报之间的TD-Error较大,减少Critic网络的误差,降低Actor网络的输出概率;如果价值函数估计与实际回报之间的TD-Error较小,减少Critic网络的误差,提高Actor网络的输出概率
DDPG算法使用双重神经网络架构,对于策略函数和价值函数均使用双重神经网络架构(即Online网络和Target网络);引入经验回放机制,Actor与环境交互产生的数据样本存储到经验池中,抽取批量数据样本进行训练
比喻:学生和老师的“错题本”→\rightarrow→假设你是一个学生(主Actor网络),每天做题(生成动作),老师(Critic网络)会批改你的答案并打分(Q值)。但老师打分标准经常变,今天说答案A好,明天又说答案B好,导致你学得很混乱。于是,老师决定做一个“错题本”(目标网络),记录过去稳定的评分标准,用来更温和地指导你学习
DDPG共包含4个神经网络,用于近似表示Q值函数和策略
- Critic目标网络用于近似估计下一个时刻的状态—动作的Q值函数Qw′(St+1,πθ′(St+1))Q_{w^{\prime}}(S_{t+1},\pi_{\theta^{\prime}}(S_{t+1}))Qw′(St+1,πθ′(St+1)),其中下一个时刻的动作值πθ′(St+1)\pi_{\theta^{\prime}}(S_{t+1})πθ′(St+1)是通过Actor目标网络近似估计得到的⇒\Rightarrow⇒当前状态下Q值函数的目标值yi=ri+γQw′(Si+1,πθ′(Si+1))y_{i}=r_{i}+\gamma Q_{w^{\prime}}(S_{i+1},\pi_{\theta^{\prime}}(S_{i+1}))yi=ri+γQw′(Si+1,πθ′(Si+1))
- Critic训练网络输出当前时刻状态—动作的Q值函数Qw(St,at)Q_w(S_t,a_t)Qw(St,at),用于对当前策略进行评价。通过最小化损失值(均方误差损失)来更新Critic网络的参数,Critic网络更新时的损失函数为loss=1N∑i(yi−Qw(Si,ai))2\text{loss}=\frac{1}{N}\sum_i(y_i-Q_w(S_i,a_i))^2loss=N1∑i(yi−Qw(Si,ai))2,其中ai=πθ(Si)+εa_i=\pi_\theta(S_i)+\varepsilonai=πθ(Si)+ε,ε\varepsilonε表示行为策略上的探索噪声
- Actor目标网络用于提供下一个状态的策略,Actor训练网络则是提供当前状态的策略,结合Critic训练网络的Q值函数可以得到Actor在参数更新时的策略梯度∇πθJ=1N∑i∇aQw′(s,a)∣s=si,a=πθ(si)∇θπθ(s)∣si\nabla_{\pi_\theta}J=\frac{1}{N}\sum_i\nabla_aQ_{w'}(s,a)|_{s=s_i,a=\pi_\theta(s_i)}\nabla_\theta\pi_\theta(s)|_{s_i}∇πθJ=N1∑i∇aQw′(s,a)∣s=si,a=πθ(si)∇θπθ(s)∣si,其中∇aQw′\nabla_aQ_{w'}∇aQw′是Critic提供的“目标信号”(“应该把动作往这个方向改”)、∇θπθ(s)\nabla_\theta\pi_\theta(s)∇θπθ(s)是Actor内部的“传导机制”(“为了把动作往那个方向改,需要这样调整权重θ\thetaθ”)、1N∑i\frac{1}{N}\sum_iN1∑i表示对从经验回放池中获取的NNN个样本的梯度取平均值
演员Actor/策略网络πθ(s)\pi_{\theta}(s)πθ(s):给Actor一个状态sss,Actor会确定地输出一个具体的动作aaa,即a=πθ(s)a=\pi_{\theta}(s)a=πθ(s)
评论家Critic/Q值网络Qw′(s,a)Q_{w'}(s,a)Qw′(s,a):给Critic一个状态sss和一个动作aaa,Critic会告诉(s,a)(s,a)(s,a)组合的“价值”或“分数”
演员(Actor)πθ\pi_{\theta}πθ的目标是调整自己的参数θ\thetaθ,使得它在任意状态sss时,它所输出的动作a=πθ(s)a=\pi_{\theta}(s)a=πθ(s)能让Qw′(s,a)Q_{w'}(s,a)Qw′(s,a)的分数尽可能高⇒\Rightarrow⇒Actor的目标函数J(θ)J(\theta)J(θ)是所选动作的期望QQQ值,即J(θ)=Es∼ρ[Qw′(s,a)]J(\theta)=\mathbb{E}_{s\sim\rho}[Q_{w'}(s,a)]J(θ)=Es∼ρ[Qw′(s,a)],其中a=πθ(s)a=\pi_{\theta}(s)a=πθ(s)、ρ\rhoρ表示从经验回放池中采样的状态分布
Actor通过梯度上升最大化J(θ)J(\theta)J(θ),需要计算J(θ)J(\theta)J(θ)相对于Actor参数θ\thetaθ的梯度∇θJ\nabla_{\theta}J∇θJ
链式法则:Actor的参数θ\thetaθ决定了策略πθ\pi_{\theta}πθ→\to→策略πθ(s)\pi_{\theta}(s)πθ(s)决定了输出的动作aaa→\to→动作aaa和状态sss决定了Critic的QQQ值Qw′(s,a)Q_{w'}(s,a)Qw′(s,a)⇒\Rightarrow⇒如果稍微改变θ\thetaθ,对最终分数QQQ的影响是∂Q∂θ=∂Q∂a⏟Q 对 a 的梯度⋅∂a∂θ⏟a 对 θ的梯度\frac{\partial Q}{\partial\theta}=\underbrace{\frac{\partial Q}{\partial a}}_{\text{Q 对 a 的梯度}}\cdot\underbrace{\frac{\partial a}{\partial\theta}}_{\text{a 对 }\theta\text{ 的梯度}}∂θ∂Q=Q 对 a 的梯度∂a∂Q⋅a 对 θ 的梯度∂θ∂a
对于Critic目标网络参数w′w'w′和θ′\theta'θ′的更新,DDPG通过软更新机制保证参数可以缓慢更新,从而提高学习的稳定性⇒\Rightarrow⇒w′←ξw+(1−ξ)w′θ′=←ξθ+(1−ξ)θ′\begin{aligned} w^{\prime} & \leftarrow\xi w+(1-\xi)w^{\prime} \\ \theta^{\prime} & =\leftarrow\xi\theta+(1-\xi)\theta^{\prime} \end{aligned}w′θ′←ξw+(1−ξ)w′=←ξθ+(1−ξ)θ′,其中w′w'w′和θ′\theta'θ′是Critic目标网络的参数、www和θ\thetaθ是Critic训练网络的参数、ξ\xiξ表示Critic目标网络更新的速度
Critic目标网络的策略梯度,就是用“过去的你”生成的稳定动作,指导“现在的你”改进策略,防止被Critic的善变评价带跑偏

DDPG算法的伪代码

# -*- coding: utf-8 -*-
# @Author : 楚楚
# @File : 01DDPG.py
# @Software: PyCharmimport torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import collections
import random# 经验回放
class ReplayBuffer(object):def __init__(self, capacity):""":param capacity: 最大容量"""self.buffer = collections.deque(maxlen=capacity)def add(self, state, action, reward, next_state, done):"""在队列中添加数据:param state: 当前状态:param action: 动作:param reward: 奖励:param next_state: 下一个状态:param done: 是否完成:return:"""self.buffer.append((state, action, reward, next_state, done))def sample(self, batch_size):"""在队列中随机取样:param batch_size: batch size:return:"""transitions = random.sample(self.buffer, batch_size)state, action, reward, next_state, done = zip(*transitions)return np.array(state), action, reward, np.array(next_state), donedef size(self):return len(self.buffer)# 策略网络
class PolicyNet(nn.Module):def __init__(self, state_dim, hidden_dim, action_dim, action_bound):""":param state_dim: 状态数量:param hidden_dim: 隐层数据的维度:param action_dim: 动作数量:param action_bound: 环境可以接收的动作最大值"""super(PolicyNet, self).__init__()super(PolicyNet, self).__init__()self.action_bound = action_boundself.fc1 = nn.Linear(state_dim, hidden_dim)self.fc2 = nn.Linear(hidden_dim, action_dim)def forward(self, x):x = self.fc1(x) # [b,state_dim]-->[b,hidden_dim]x = F.relu(x)x = self.fc2(x) # [b,hidden_dim]-->[b,action_dim]x = torch.tanh(x) # 缩放到 [-1,1]x = x * self.action_bound # 缩放到 [-action_bound, action_bound]return x# 价值网络
class QValueNet(nn.Module):def __init__(self, state_dim, hidden_dim, action_dim):""":param state_dim: 状态数量:param hidden_dim: 隐层数据的维度:param action_dim: 动作数量"""super(QValueNet, self).__init__()self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)self.fc2 = nn.Linear(hidden_dim, hidden_dim)self.fc3 = nn.Linear(hidden_dim, 1)def forward(self, state, action):""":param state: 状态:param action: 动作:return:""""""拼接动作和状态: Q值函数是动作和价值的联合函数"""cat = torch.cat([state, action], dim=1) # [b, state_dim + action_dim]x = self.fc1(cat) # [b, hidden_dim]x = F.relu(x)x = self.fc2(x) # [b, hidden_dim]x = F.relu(x)x = self.fc3(x) # [b, 1]return x# DDPG
class DDPG(object):def __init__(self, state_dim, hidden_dim, action_dim, action_bound, sigma, action_lr, critic_lr, tau, gamma,device, weight_decay=5e-3):""":param state_dim: 状态数量:param hidden_dim: 隐层数据的维度:param action_dim: 动作数量:param action_bound: 环境可以接收的动作最大值:param sigma: 高斯噪声的标准差:param action_lr: 策略网络的learning rate:param critic_lr: 价值网络的learning rate:param tau: 目标网络的软更新参数:param gamma: 折扣因子:param device: 设备 cuda or cpu:param weight_decay: 权重衰减系数"""# 策略网络 -- 训练self.actor = PolicyNet(state_dim=state_dim, hidden_dim=hidden_dim, action_dim=action_dim,action_bound=action_bound).to(device)# 价值网络 -- 训练self.critic = QValueNet(state_dim=state_dim, hidden_dim=hidden_dim, action_dim=action_dim).to(device)# 策略网络 -- 目标self.target_actor = PolicyNet(state_dim=state_dim, hidden_dim=hidden_dim, action_dim=action_dim,action_bound=action_bound).to(device)# 价值网络 -- 目标self.target_critic = QValueNet(state_dim=state_dim, hidden_dim=hidden_dim, action_dim=action_dim).to(device)# 初始化价值网络的参数,两个价值网络的参数相同self.target_actor.load_state_dict(self.actor.state_dict())# 初始化策略网络的参数,两个策略网络的参数相同self.target_critic.load_state_dict(self.critic.state_dict())# 策略网络的优化器self.actor_optimizer = optim.AdamW(self.actor.parameters(), lr=action_lr, weight_decay=weight_decay)# 价值网络的优化器self.critic_optimizer = optim.AdamW(self.critic.parameters(), lr=critic_lr, weight_decay=weight_decay)self.gamma = gamma # 折扣因子self.sigma = sigma # 高斯噪声的标准差,均值设为0self.tau = tau # 目标网络的软更新参数self.action_dim = action_dimself.device = device# 动作选择def take_action(self, state):""":param state: 当前状态:return:"""# list[state_dim]-->tensor[1,state_dim]state = torch.tensor(state, dtype=torch.float32).view(1, -1).to(self.device)# 策略网络计算出当前状态下的动作价值 [1,state_dim]-->[1,1]action = self.actor(state).item()# 给动作添加噪声,增加搜索action = action + self.sigma * np.random.randn(self.action_dim)return action# 软更新,每次learn的时候更新部分参数def soft_update(self, net, target_net):# 获取训练网络和目标网络需要更新的参数for target_parameter, parameter in zip(target_net.parameters(), net.parameters()):# 训练网络的参数更新要综合考虑目标网络和训练网络"""调用 target_parameter.data.copy_(source) 时,将 source 张量的数据复制到 target_parameter 张量的 .data 属性中可以更新 target_parameter 的值,而不会影响它的梯度信息,因为梯度信息存储在 .grad 属性中,而不是 .data 属性"""target_parameter.data.copy_(target_parameter.data * (1 - self.tau) + parameter.data * self.tau)# 训练def update(self, transition_dict):""":param transition_dict: 训练集:return:"""states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) # [b,state_dim]actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device) # [b,1]rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device) # [b,1]next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device) # [b,state_dim]dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device) # [b,1]# 策略目标网络获取下一时刻的动作next_actions = self.target_actor(next_states)# 价值目标网络获取下一个时刻动作的价值next_q_values = self.target_critic(next_states, next_actions)# 当前时刻动作价值的目标值q_targets = rewards + self.gamma * next_q_values * (1 - dones)# 当前时刻动作价值的预测值q_values = self.critic(states, actions)# 预测值和目标值之间的均方差损失critic_loss = torch.mean(F.mse_loss(q_values, q_targets))# 价值网络的梯度self.critic_optimizer.zero_grad()critic_loss.backward()self.critic_optimizer.step()# 当前状态下选择的动作current_actions = self.actor(states)# 当前状态下动作的价值scores = self.critic(states, current_actions)# 计算损失# 最小化负的期望回报,即最大化期望回报actor_loss = -torch.mean(scores)self.actor_optimizer.zero_grad()actor_loss.backward()self.actor_optimizer.step()# 软更新策略网络的梯度self.soft_update(self.actor, self.target_actor)# 软更新价值网络的梯度self.soft_update(self.critic, self.target_critic)
案例:基于 OpenAI 的 gym 环境完成一个推车游戏,目标是将小车推到山顶旗子处。动作维度为1,属于连续值;状态维度为 2,分别是 x 坐标和小车速度

# -*- coding: utf-8 -*-
# @Author : 楚楚
# @File : 02DDPG_OpenAI.py
# @Software: PyCharmimport torch
import numpy as np
import gym
import matplotlib.pyplot as plt
from RL_DDPG import ReplayBuffer, DDPG# 环境加载
env_name = "MountainCarContinuous-v0" # 连续型动作
env = gym.make(env_name, render_mode="human")
state_dim = env.observation_space.shape[0] # 状态数 2
action_dim = env.action_space.shape[0] # 动作数 1
action_bound = env.action_space.high[0] # 动作的最大值 1.0hidden_dim = 32# 经验回放池最大尺寸
buffer_size = 1024
# 经验回放池的最小尺寸
min_size = 32
# batch size
batch_size = 16
# 高斯噪声的标准差
sigma = 1.0
# 策略网络的学习率
actor_lr = 5e-4
# 价值网络的学习率
critic_lr = 5e-4
# 软更新系数
tau = 0.01
# 折扣因子
gamma = 0.8device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")replay_buffer = ReplayBuffer(capacity=buffer_size)agent = DDPG(state_dim=state_dim,hidden_dim=hidden_dim,action_dim=action_dim,action_bound=action_bound,sigma=sigma,tau=tau,gamma=gamma,action_lr=actor_lr,critic_lr=critic_lr,device=device)# -------------------------------------- #
# 模型训练
# -------------------------------------- ## 记录每个回合的return
return_list = []
# 记录每个回合的return均值
mean_return_list = []for i in range(10):# 每个episode的累计奖励值episode_return = 0# 初始时的状态state = env.reset()[0]# 回合结束标记done = Falsewhile not done:# 获取当前状态的动作action = agent.take_action(state)# 环境更新next_state, reward, done, _, _ = env.step(action)# 更新经验回放池replay_buffer.add(state, action, reward, next_state, done)# 状态更新state = next_state# 累计每一步的rewardepisode_return += reward# 如果经验池超过容量,开始训练if replay_buffer.size() > min_size:states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size=batch_size)# 构造数据集transition_dict = {'states': states,'actions': actions,'rewards': rewards,'next_states': next_states,'dones': dones,}# 模型训练agent.update(transition_dict)# 保存每一个回合的回报return_list.append(episode_return)mean_return_list.append(np.mean(return_list[-10:]))# 打印回合信息print(f'iter:{i}, return:{episode_return}, mean_return:{mean_return_list}')# 关闭动画窗格
env.close()# -------------------------------------- #
# 绘图
# -------------------------------------- #x_range = list(range(len(return_list)))"""
plt.subplot(121):第一个参数 1 表示图形窗口中子图的行数(rows)第二个参数 2 表示图形窗口中子图的列数(columns)第三个参数 1(可选)表示当前激活的子图编号,编号从1开始
"""
plt.subplot(121)
plt.plot(x_range, return_list) # 每个回合return
plt.xlabel('episode')
plt.ylabel('return')plt.subplot(122)
plt.plot(x_range, mean_return_list) # 每回合return均值
plt.xlabel('episode')
plt.ylabel('mean_return')
参考文献
- DDPG 模型解析
