Reinforcement Learning - Policy Gradient Algorithm
Derivation of the Gradient Formula
First, a few basic concepts need to be made clear:
- $J(\theta)$ is the objective function, i.e. the function computed for the optimization target; we want its value to be as large as possible
- The loss function is usually defined as the negative of the objective function: $loss(\theta) = -J(\theta)$
- The policy parameters are optimized by $\theta = \theta - \alpha \cdot \nabla_{\theta} loss(\theta)$, which is equivalent to $\theta = \theta + \alpha \cdot \nabla_{\theta} J(\theta)$, where $\alpha$ is the learning rate
In the implementation we usually do not need to derive the gradient by hand; PyTorch does this automatically. We only need to compute the value of $J(\theta)$, obtain $loss(\theta)$ from it, and then call the methods PyTorch provides to compute the gradients and update the parameters.
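A minimal sketch of this pattern, with a toy network and a placeholder objective standing in for $J(\theta)$ (none of these names come from the original text):

```python
import torch

# Toy network and optimizer; the "objective" here is a placeholder scalar,
# not the actual policy gradient objective derived below.
policy_net = torch.nn.Linear(4, 2)
optimizer = torch.optim.Adam(policy_net.parameters(), lr=1e-3)

states = torch.randn(8, 4)        # dummy batch of states
J = policy_net(states).sum()      # stand-in for J(theta); must be a scalar tensor
loss = -J                         # loss(theta) = -J(theta)

optimizer.zero_grad()             # clear old gradients
loss.backward()                   # autograd computes the gradient of loss w.r.t. the parameters
optimizer.step()                  # update the parameters using those gradients
```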
Next, some basics about gradients. The gradient of a function with respect to a parameter is simply the partial derivative of the function with respect to that parameter. For example, for a function $f(x_1, x_2, \dots, x_n)$, the gradient with respect to all parameters and with respect to one specific parameter can be written as:
$$\nabla f = \left( \frac{\partial f}{\partial x_1}, \frac{\partial f}{\partial x_2}, \dots, \frac{\partial f}{\partial x_n} \right), \qquad \nabla_{x_i} f = \frac{\partial f}{\partial x_i}$$
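For example (a simple illustration, not from the original text), for $f(x_1, x_2) = x_1^2 x_2 + x_2$:
$$\nabla f = \left( 2 x_1 x_2,\; x_1^2 + 1 \right), \qquad \nabla_{x_1} f = 2 x_1 x_2$$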
$\propto$ means "proportional to": $A \propto B$ means there exists a constant $k$ such that $A = k \cdot B$.
Define $\tau$ as a trajectory, containing the state sequence, action sequence, reward sequence, and so on of one episode from start to finish. The objective function is then defined as the expected total reward over trajectories, as shown below, where $P_{\theta}(\tau)$ is the probability distribution over trajectories under the policy parameters $\theta$; it returns the probability of the trajectory being $\tau$ under policy $\theta$.
$$J(\theta) = E_{\tau \sim P_{\theta}(\tau)}\left[R(\tau)\right] = \sum_{\tau} R(\tau) P_{\theta}(\tau)$$
We now differentiate with respect to $\theta$. Since the total reward function $R(\tau)$ is determined by the environment and does not depend on $\theta$, we get:
$$\nabla_{\theta} J(\theta) = \sum_{\tau} R(\tau) \cdot \nabla_{\theta} P_{\theta}(\tau)$$
Monte Carlo estimation is an approximation method: draw a number of random samples and treat their average as an approximation of the target value. Following this idea, we can sample $N$ trajectories to approximate the expected total reward of a trajectory.
$$E_{\tau \sim P_{\theta}(\tau)}\left[R(\tau)\right] = \sum_{\tau} R(\tau) P_{\theta}(\tau) \approx \frac{1}{N} \sum_{n=1}^{N} R(\tau^{n})$$
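As a quick, self-contained illustration of the Monte Carlo idea itself (a toy example, unrelated to the trajectory setting):

```python
import random

# Toy Monte Carlo estimate: approximate the expected value of a fair
# six-sided die (true value 3.5) by averaging N random samples.
N = 10_000
samples = [random.randint(1, 6) for _ in range(N)]
print(sum(samples) / N)  # close to 3.5 for large N
```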
Therefore, we can apply the following transformation:
$$\nabla_{\theta} J(\theta) = \sum_{\tau} R(\tau) \cdot \nabla_{\theta} P_{\theta}(\tau)$$
$$\nabla_{\theta} J(\theta) = \sum_{\tau} P_{\theta}(\tau) R(\tau) \cdot \frac{\nabla_{\theta} P_{\theta}(\tau)}{P_{\theta}(\tau)}$$
$$\nabla_{\theta} J(\theta) \approx \frac{1}{N} \sum_{n=1}^{N} R(\tau^{n}) \cdot \nabla_{\theta} \log\big(P_{\theta}(\tau^{n})\big)$$
Here $\frac{\nabla_{\theta} P_{\theta}(\tau)}{P_{\theta}(\tau)} = \nabla_{\theta} \log\big(P_{\theta}(\tau)\big)$ is an application of the chain rule for composite functions: $(\log y)' = \frac{y'}{y}$.
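A quick numerical sanity check of this identity with PyTorch autograd, using a toy scalar $p(\theta) = \mathrm{sigmoid}(\theta)$ as a stand-in for $P_{\theta}(\tau)$ (illustrative only):

```python
import torch

# Check grad(log p) == grad(p) / p for p(theta) = sigmoid(theta).
theta = torch.tensor(0.3, requires_grad=True)
p = torch.sigmoid(theta)

grad_p, = torch.autograd.grad(p, theta, retain_graph=True)
grad_log_p, = torch.autograd.grad(p.log(), theta)

print(grad_log_p.item(), (grad_p / p).item())  # the two values match (about 0.4256)
```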
The probability of a trajectory equals the product of the probabilities of the individual actions in its action sequence, which gives the expression below, where $\pi_{\theta}(a_t|s_t)$ is the probability of taking action $a_t$ in state $s_t$; this is exactly what the policy network computes. (Strictly speaking, $P_{\theta}(\tau)$ also contains the environment's state-transition probabilities, but they do not depend on $\theta$, so they vanish under $\nabla_{\theta} \log P_{\theta}(\tau)$ and can be dropped here.)
$$\log\big(P_{\theta}(\tau)\big) = \log\left(\prod_{t=1}^{T} \pi_{\theta}(a_t|s_t)\right) = \sum_{t=1}^{T} \log\big(\pi_{\theta}(a_t|s_t)\big)$$
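In code this is just a sum of per-step log-probabilities; a minimal sketch, assuming a `policy_net` that maps a state tensor to a vector of action probabilities as in the implementation further below (`trajectory_log_prob` is an illustrative name, not from the original):

```python
import torch

def trajectory_log_prob(policy_net, states, actions):
    # Sum of per-step log-probabilities: log P_theta(tau), up to the
    # theta-independent transition terms, equals sum_t log pi_theta(a_t | s_t).
    log_prob = torch.tensor(0.0)
    for state, action in zip(states, actions):
        probs = policy_net(state)                  # pi_theta(. | s_t), a probability vector
        log_prob = log_prob + probs[action].log()  # add log pi_theta(a_t | s_t)
    return log_prob
```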
Substituting this back into the gradient estimate, we obtain:
$$\nabla_{\theta} J(\theta) \approx \dots = \frac{1}{N} \sum_{n=1}^{N} \sum_{t=1}^{T^{n}} R(\tau^{n}) \cdot \nabla_{\theta} \log\big(\pi_{\theta}(a_t^{n}|s_t^{n})\big)$$
There are two issues here that call for improvement:
- An action only affects the rewards that come after it, so multiplying every action $a_t$ by the total reward of the entire trajectory $R(\tau^n)$ is unreasonable.
- When judging how good an action is, the reward received immediately after executing it matters most, while future rewards should carry less and less weight as the time step advances.
Therefore, we replace $R(\tau^n)$ in the expression as shown below, where $\gamma$ is the discount factor and $r$ is the reward received after executing an action.
$$R(\tau^{n}) \rightarrow \sum_{t'=t}^{T^{n}} \gamma^{t'-t} \cdot r_{t'}^{n}$$
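This replacement is the discounted return-to-go, which can be computed for every time step in one backward sweep over the reward list (the same trick the `update()` method in the implementation below uses); a small sketch:

```python
def returns_to_go(rewards, gamma):
    # G_t = sum_{t' >= t} gamma^(t' - t) * r_{t'}, computed from the back.
    returns = [0.0] * len(rewards)
    G = 0.0
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G
        returns[t] = G
    return returns

print(returns_to_go([1.0, 1.0, 1.0], gamma=0.9))  # approximately [2.71, 1.9, 1.0]
```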
For simplicity, for now we sample only one trajectory per parameter update, i.e. $N = 1$, which gives the final expression:
$$\nabla_{\theta} J(\theta) \approx \dots = \sum_{t=1}^{T} \sum_{t'=t}^{T} \gamma^{t'-t} \cdot r_{t'} \cdot \nabla_{\theta} \log\big(\pi_{\theta}(a_t|s_t)\big)$$
To make this easier to read, we can add parentheses:
$$\nabla_{\theta} J(\theta) \approx \dots = \sum_{t=1}^{T} \left(\sum_{t'=t}^{T} \gamma^{t'-t} \cdot r_{t'}\right) \cdot \nabla_{\theta} \log\big(\pi_{\theta}(a_t|s_t)\big)$$
Therefore, since the return coefficients $\sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}$ do not depend on $\theta$, we can read off a surrogate objective whose gradient matches the estimate above:
$$J(\theta) \approx \dots = \sum_{t=1}^{T} \left(\sum_{t'=t}^{T} \gamma^{t'-t} \cdot r_{t'}\right) \cdot \log\big(\pi_{\theta}(a_t|s_t)\big) + C$$
Here $C$ is a constant; we may as well set $C = 0$, which gives the loss function:
$$loss(\theta) = -\sum_{t=1}^{T} \left(\sum_{t'=t}^{T} \gamma^{t'-t} \cdot r_{t'}\right) \cdot \log\big(\pi_{\theta}(a_t|s_t)\big)$$
This completes the derivation of the basic policy gradient algorithm: in code we compute the loss value according to the $loss(\theta)$ formula and update the parameters accordingly. The full code is shown below, where the variable `log_prob` corresponds to $\log\big(\pi_{\theta}(a_t|s_t)\big)$ and the variable `G` corresponds to $\sum_{t'=t}^{T} \gamma^{t'-t} \cdot r_{t'}$.
Code Implementation
The environment is Python 3.12, with all dependency packages at their latest versions.
import gymnasium as gym
import torch
import torch.distributions as dist
import torch.nn as nn
from torch import tensor


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        # One linear layer + activation + another linear layer
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        )
        # nn.Softmax(dim=-1) creates the softmax function.
        # For a single state the network output has shape (action_dim,);
        # for a batch it has shape (batch_size, action_dim).
        # We want the softmax over the action dimension, so we pass dim=-1,
        # i.e. softmax over the last dimension.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = self.network(x)
        return self.softmax(x)

class Agent:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, device):
        # Policy network
        self.policy_net = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        # Create the optimizer, which updates each parameter according to its gradient
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        # Discount factor
        self.gamma = gamma
        # Device used for the neural network computations
        self.device = device

    def take_action(self, state):
        # The input state is an array of length 4; convert it to a tensor
        state = tensor(state, dtype=torch.float).to(self.device)
        # Feed the state tensor into the network; the output is a tensor of
        # probabilities for each action.
        # Calling self.policy_net(state) is essentially calling forward()
        probs = self.policy_net(state)
        # Categorical builds a distribution from a sequence of probabilities;
        # its values are the indices of those probabilities.
        # sample() randomly draws one value according to the probabilities
        action = dist.Categorical(probs).sample()
        return action.item()

    def update(self, trajectory):
        # To update the policy network we need one trajectory: the actions,
        # states, rewards, etc. produced by one episode from start to finish
        reward_list = trajectory['reward_list']
        state_list = trajectory['state_list']
        action_list = trajectory['action_list']
        G = 0
        # Clear the gradients
        self.optimizer.zero_grad()
        # Traverse the states of the trajectory from the end to the beginning
        for i in reversed(range(len(reward_list))):
            reward = reward_list[i]
            action = action_list[i]
            state = tensor(state_list[i]).to(self.device)
            # G is the discounted cumulative reward from the current action
            # to the end of the episode
            G = G * self.gamma + reward
            # Only the log-probability of the single chosen action is needed here
            log_prob = self.policy_net(state)[action].log()
            # Loss value for this time step
            loss = -log_prob * G
            # Backpropagate; gradients accumulate across loop iterations
            loss.backward()
        # Update the network parameters using the gradients accumulated above
        self.optimizer.step()

if __name__ == '__main__':
    # Learning rate for the network parameter updates
    learning_rate = 1e-3
    # Number of training episodes
    num_episodes = 1000
    # Number of neurons in the hidden layer
    hidden_dim = 64
    # Discount factor used when computing the cumulative reward
    gamma = 0.98
    # Use CUDA if available, otherwise the CPU
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    env = gym.make('CartPole-v1')
    # State dimensionality, 4 for CartPole
    state_dim = env.observation_space.shape[0]
    # Number of discrete actions, 2 for CartPole
    action_dim = env.action_space.n
    # The reinforcement learning agent
    agent = Agent(state_dim, hidden_dim, action_dim, learning_rate, gamma, device)

    for episode in range(num_episodes):
        trajectory = {
            'state_list': [],
            'action_list': [],
            'next_state_list': [],
            'reward_list': [],
            'terminated_list': []
        }
        # Statistics: total reward collected by the end of the episode
        sum_reward = 0
        # reset() returns a tuple: the first element is the initial state,
        # the second is an info dict
        state = env.reset()[0]
        # Episode end signals; CartPole-v1 is also truncated at 500 steps,
        # so the loop must stop on truncation as well
        terminated, truncated = False, False
        while not (terminated or truncated):
            action = agent.take_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            # Append the current state, action, etc. to the trajectory
            trajectory['state_list'].append(state)
            trajectory['action_list'].append(action)
            trajectory['next_state_list'].append(next_state)
            trajectory['reward_list'].append(reward)
            trajectory['terminated_list'].append(terminated)
            sum_reward += reward
            # Move to the next state
            state = next_state
        # Update the policy network using the trajectory
        agent.update(trajectory)
        # Print statistics every 10 episodes
        if episode % 10 == 0:
            print(f"Episode: {episode}, Reward: {sum_reward}")