强化学习-价值学习算法
Sarsa
理论解释
Sarsa是基于时序差分算法的,它的公式非常简单且易理解,不像策略梯度算法那样需要复杂的推导过程。
Sarsa的核心函数是
Q
(
s
,
a
)
Q(s, a)
Q(s,a),它的含义是在状态
s
s
s下执行
a
a
a,在后续轨迹中获取的期望总奖励。时序差分算法的核心思想,就是用当前获得的奖励加上下一个状态的价值估计来作为当前状态的价值估计,因此有以下公式,其中
V
(
s
t
+
1
)
V(s_{t+1})
V(st+1)的含义是以状态
s
t
+
1
s_{t+1}
st+1为起点,在后续的轨迹中获取的期望总奖励。
Q
(
s
t
,
a
t
)
←
r
t
+
γ
⋅
V
(
s
t
+
1
)
Q(s_t, a_t) \leftarrow r_t + \gamma \cdot V(s_{t+1})
Q(st,at)←rt+γ⋅V(st+1)
在这里我们做一步近似,在相同策略下智能体实际采取的动作为
a
t
+
1
a_{t + 1}
at+1,那么我们认为
V
(
s
t
+
1
)
V(s_{t+1})
V(st+1)和
Q
(
s
t
+
1
,
a
t
+
1
)
Q(s_{t+1}, a_{t+1})
Q(st+1,at+1)是近似相等的,因此我们可以得到Sarsa算法的核心公式:
Q
(
s
t
,
a
t
)
←
r
t
+
γ
⋅
Q
(
s
t
+
1
,
a
t
+
1
)
Q(s_t, a_t) \leftarrow r_t + \gamma \cdot Q(s_{t+1}, a_{t+1})
Q(st,at)←rt+γ⋅Q(st+1,at+1)
在这里,我们使用神经网络来拟合 Q ( s , a ) Q(s, a) Q(s,a),在选取动作时采用 ϵ \epsilon ϵ-greedy策略,即有 ϵ \epsilon ϵ的概率随机选取一个动作, 1 − ϵ 1 - \epsilon 1−ϵ的概率选取 Q ( s , a ) Q(s, a) Q(s,a)最大的动作。
按照此策略我们在状态
s
t
s_t
st时选取动作
a
t
a_t
at,此时环境会返回状态
s
t
+
1
s_{t+1}
st+1,则再按照此策略选取动作
a
t
+
1
a_{t+1}
at+1,然后按照上述的公式来更新
Q
(
s
,
a
)
Q(s, a)
Q(s,a)参数。由于这里我们使用神经网络来拟合参数,所以我们更新的方式是计算loss值,然后进行梯度下降。如下面所示,其中
l
o
s
s
f
n
loss_{fn}
lossfn是指根据现有值和目标值来计算loss值的函数,在代码中采取的MSE均方误差函数。
q
v
a
l
u
e
=
Q
(
s
,
a
)
q_{value} = Q(s, a)
qvalue=Q(s,a)
q
t
a
r
g
e
t
=
r
t
+
γ
⋅
Q
(
s
t
+
1
,
a
t
+
1
)
q_{target} = r_t + \gamma \cdot Q(s_{t+1}, a_{t+1})
qtarget=rt+γ⋅Q(st+1,at+1)
l
o
s
s
=
l
o
s
s
f
n
(
q
v
a
l
u
e
,
q
t
a
r
g
e
t
)
loss = loss_{fn}(q_{value}, q_{target})
loss=lossfn(qvalue,qtarget)
代码
环境为python3.12,各依赖包均为最新版。
import random
import gymnasium as gym
import torch
import torch.nn as nn
from torch import tensor
class QNet(torch.nn.Module):
def __init__(self, action_state_dim, hidden_dim):
"""
网络的输入由action和state连接而成,网络的输出是长度为1的向量,代表 q value。
action用one-hot向量表示,例如动作空间为A = {0, 1, 2}时,
向量(1, 0, 0)和(0, 1, 0)分别代表动作a = 0和动作a = 1。
"""
super(QNet, self).__init__()
# 一个线性层 + 激活函数 + 一个线性层
self.network = nn.Sequential(
nn.Linear(action_state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1),
)
def forward(self, x):
x = self.network(x)
return x
class Agent:
def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon):
# 策略网络
self.action_value_net = QNet(state_dim + action_dim, hidden_dim).to(device)
# 创建优化器,优化器的作用是根据每个参数的梯度来更新参数
self.optimizer = torch.optim.Adam(self.action_value_net.parameters(), lr=learning_rate)
# 折扣因子
self.gamma = gamma
# 进行神经网络计算的设备
self.device = device
# 探索策略,有epsilon的概率随机选取动作
self.epsilon = epsilon
# 状态维度
self.state_dim = state_dim
# 动作维度
self.action_dim = action_dim
# 损失函数,根据当前值和目标值来计算得出损失值
self.loss_fn = nn.MSELoss()
def take_action(self, state):
# 随机探索
if random.random() < self.epsilon:
return random.choice(range(self.action_dim))
# 生成一个对角线矩阵,矩阵的每一行元素代表一个动作
actions = torch.eye(self.action_dim).to(self.device)
# 对state进行复制,actions中有多少个动作,就state复制为多少行
state = tensor(state, dtype=torch.float).to(self.device)
states = state.unsqueeze(0).repeat(actions.shape[0], 1)
# 连接actions和states矩阵,得到的action_states可以看做是一个batch的动作状态向量
action_states = torch.cat((actions, states), dim=1)
# 将一个batch的动作状态向量输入到Q网络中,得到一组Q值
# 注意q_values的形状是(batch_size, 1),我们将它转换成一维向量
q_values = self.action_value_net(action_states).view(-1)
# 获取最大Q值对应的下标,下标的值就是采取的最优动作
max_value, max_index = torch.max(q_values, dim=0)
return max_index.item()
def update(self, transition):
# 取出相关数据
reward = torch.tensor(transition['reward']).to(self.device)
state = torch.tensor(transition['state']).to(self.device)
next_state = torch.tensor(transition['next_state']).to(self.device)
terminated = transition['terminated']
# 将数字action转换成one-hot action向量
action = torch.zeros(self.action_dim, dtype=torch.float).to(self.device)
action[transition['action']] = 1.
# 将数字next_action转换成one-hot next_action向量
next_action = torch.zeros(self.action_dim, dtype=torch.float).to(self.device)
next_action[transition['next_action']] = 1.
# 连接action和state向量
action_state = torch.cat((action, state), dim=0)
next_action_state = torch.cat((next_action, next_state), dim=0)
# 获取Q值
q_value = self.action_value_net(action_state)[0]
# 计算目标Q值。一定要注意如果terminated为true,说明执行action后游戏就终止了
# 那么next_state和next_action是无意义的,它们的Q值应该为0
# 通过将Q值乘以(1. - float(terminated))的方式,来使其在终止时为0
q_target = reward + self.action_value_net(next_action_state)[0] * self.gamma \
* (1. - float(terminated))
# 计算损失值,第一个参数为当前Q值,第二个参数为目标Q值
loss = self.loss_fn(q_value, q_target)
# 更新参数
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
if __name__ == '__main__':
# 更新网络参数的学习率
learning_rate = 1e-3
# 训练轮次
num_episodes = 1000
# 隐藏层神经元数量
hidden_dim = 128
# 计算累计奖励时的折扣率
gamma = 0.98
epsilon = 0.2
# 如果存在cuda就用cuda,否则用cpu
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env = gym.make('CartPole-v1')
# 获取状态维度,为4
state_dim = env.observation_space.shape[0]
# 获取离散动作数量,为2
action_dim = env.action_space.n
# 强化学习智能体
agent = Agent(state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon)
for episode in range(num_episodes):
# transition含义是,在state执行action后,环境返回reward、next_state、terminated
# 根据next_state,继续采取next_action作为下一动作
transition = {
'state': None,
'action': None,
'next_state': None,
'next_action': None,
'reward': None,
'terminated': None
}
# 统计信息,游戏结束时获得的总奖励
sum_reward = 0
# reset返回的是一个元组,第一个元素是初始state值,第二个元素是一个字典
state = env.reset()[0]
# 游戏终止信号
terminated = False
action = agent.take_action(state)
while not terminated:
next_state, reward, terminated, _, _ = env.step(action)
next_action = agent.take_action(next_state)
# 为transition中添加当前的状态、动作等信息
transition['state'] = state
transition['action'] = action
transition['next_state'] = next_state
transition['reward'] = reward
transition['next_action'] = next_action
transition['terminated'] = terminated
# 一定确保这里会学习到terminated为true的那一步
agent.update(transition)
sum_reward += reward
# 进入下一状态
state = next_state
action = next_action
# 每10轮打印一次统计信息
if episode % 10 == 0:
print(f"Episode: {episode}, Reward: {sum_reward}")
DQN
理论解释
DQN全程Deep Q Learning,与Sarsa算法十分类似,依然是使用时序差分算法来优化 Q ( s , a ) Q(s, a) Q(s,a)函数。不过DQN的 Q ( s , a ) Q(s, a) Q(s,a)函数含义和优化方式与Sarsa略有不同。
DQN中 Q ( s , a ) Q(s, a) Q(s,a)的含义是在状态 s s s执行动作 a a a后,在后续的轨迹中所能获得的最大累积奖励,为了作区分也有人把DQN的 Q ( s , a ) Q(s, a) Q(s,a)表示为 Q ⋆ ( s , a ) Q^\star(s, a) Q⋆(s,a),本文就不在作区分表示了。
DQN中
Q
(
s
,
a
)
Q(s, a)
Q(s,a)的时序差分优化过程如下,其中
A
A
A是动作空间:
Q
(
s
t
,
a
t
)
←
r
t
+
γ
⋅
max
a
′
∈
A
Q
(
s
t
+
1
,
a
′
)
Q(s_t, a_t) \leftarrow r_t + \gamma \cdot \max\limits_{a' \in A} Q(s_{t+1}, a')
Q(st,at)←rt+γ⋅a′∈AmaxQ(st+1,a′)
使用神经网络来拟合
Q
(
s
,
a
)
Q(s, a)
Q(s,a),在选取动作时依然采用
ϵ
\epsilon
ϵ-greedy策略。按照此策略我们在状态
s
t
s_t
st时选取动作
a
t
a_t
at,此时环境会返回状态
s
t
+
1
s_{t+1}
st+1,然后遍历所有的动作,选取
Q
(
s
t
+
1
,
a
′
)
Q(s_{t+1}, a')
Q(st+1,a′)最大的动作
a
′
a'
a′,然后计算loss值。
q
v
a
l
u
e
=
Q
(
s
,
a
)
q_{value} = Q(s, a)
qvalue=Q(s,a)
q
t
a
r
g
e
t
=
r
t
+
γ
⋅
max
a
′
∈
A
Q
(
s
t
+
1
,
a
′
)
q_{target} = r_t + \gamma \cdot \max\limits_{a' \in A} Q(s_{t+1}, a')
qtarget=rt+γ⋅a′∈AmaxQ(st+1,a′)
l
o
s
s
=
l
o
s
s
f
n
(
q
v
a
l
u
e
,
q
t
a
r
g
e
t
)
loss = loss_{fn}(q_{value}, q_{target})
loss=lossfn(qvalue,qtarget)
与Sarsa相同,损失函数的计算方式依然选择MSE均方误差。
代码
环境为python3.12,各依赖包均为最新版。
实现代码与Sarsa基本相同,仅有两处做了修改,修改位置已在代码中注释。
import random
import gymnasium as gym
import torch
import torch.nn as nn
from torch import tensor
class QNet(torch.nn.Module):
def __init__(self, action_state_dim, hidden_dim):
"""
网络的输入由action和state连接而成,网络的输出是长度为1的向量,代表 q value。
action用one-hot向量表示,例如动作空间为A = {0, 1, 2}时,
向量(1, 0, 0)和(0, 1, 0)分别代表动作a = 0和动作a = 1。
"""
super(QNet, self).__init__()
# 一个线性层 + 激活函数 + 一个线性层
self.network = nn.Sequential(
nn.Linear(action_state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 1),
)
def forward(self, x):
x = self.network(x)
return x
class Agent:
def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon):
# 策略网络
self.action_value_net = QNet(state_dim + action_dim, hidden_dim).to(device)
# 创建优化器,优化器的作用是根据每个参数的梯度来更新参数
self.optimizer = torch.optim.Adam(self.action_value_net.parameters(), lr=learning_rate)
# 折扣因子
self.gamma = gamma
# 进行神经网络计算的设备
self.device = device
# 探索策略,有epsilon的概率随机选取动作
self.epsilon = epsilon
# 状态维度
self.state_dim = state_dim
# 动作维度
self.action_dim = action_dim
# 损失函数,根据当前值和目标值来计算得出损失值
self.loss_fn = nn.MSELoss()
def take_action(self, state):
# 随机探索
if random.random() < self.epsilon:
return random.choice(range(self.action_dim))
# 生成一个对角线矩阵,矩阵的每一行元素代表一个动作
actions = torch.eye(self.action_dim).to(self.device)
# 对state进行复制,actions中有多少个动作,就state复制为多少行
state = tensor(state, dtype=torch.float).to(self.device)
states = state.unsqueeze(0).repeat(actions.shape[0], 1)
# 连接actions和states矩阵,得到的action_states可以看做是一个batch的动作状态向量
action_states = torch.cat((actions, states), dim=1)
# 将一个batch的动作状态向量输入到Q网络中,得到一组Q值
# 注意q_values的形状是(batch_size, 1),我们将它转换成一维向量
q_values = self.action_value_net(action_states).view(-1)
# 获取最大Q值对应的下标,下标的值就是采取的最优动作
max_value, max_index = torch.max(q_values, dim=0)
return max_index.item()
def update(self, transition):
# 取出相关数据
reward = torch.tensor(transition['reward']).to(self.device)
state = torch.tensor(transition['state']).to(self.device)
next_state = torch.tensor(transition['next_state']).to(self.device)
terminated = transition['terminated']
# 将数字action转换成one-hot action向量
action = torch.zeros(self.action_dim, dtype=torch.float).to(self.device)
action[transition['action']] = 1.
# 连接action和state向量
action_state = torch.cat((action, state), dim=0)
# 获取Q值
q_value = self.action_value_net(action_state)[0]
"""
与Sarsa算法主要不同的地方,在于q_target的计算方式:
类似于take_action函数中的内容,这里需要把所有动作都进行one-hot操作,与状态连接
并输入到网络中,获取所有动作的q_value中最大的值,作为计算q_target的一部分。
"""
next_actions = torch.eye(self.action_dim).to(self.device)
next_states = next_state.unsqueeze(0).repeat(next_actions.shape[0], 1)
next_action_states = torch.cat((next_actions, next_states), dim=1)
q_target = reward + torch.max(self.action_value_net(next_action_states)) \
* self.gamma * (1. - float(terminated))
# 计算损失值,第一个参数为当前Q值,第二个参数为目标Q值
loss = self.loss_fn(q_value, q_target)
# 更新参数
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
if __name__ == '__main__':
# 更新网络参数的学习率
learning_rate = 1e-3
# 训练轮次
num_episodes = 1000
# 隐藏层神经元数量
hidden_dim = 128
# 计算累计奖励时的折扣率
gamma = 0.98
epsilon = 0.2
# 如果存在cuda就用cuda,否则用cpu
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env = gym.make('CartPole-v1')
# 获取状态维度,为4
state_dim = env.observation_space.shape[0]
# 获取离散动作数量,为2
action_dim = env.action_space.n
# 强化学习智能体
agent = Agent(state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon)
for episode in range(num_episodes):
# transition含义是,在state执行action后,环境返回reward、next_state、terminated
# 根据next_state,继续采取next_action作为下一动作
transition = {
'state': None,
'action': None,
'next_state': None,
'next_action': None,
'reward': None,
'terminated': None
}
# 统计信息,游戏结束时获得的总奖励
sum_reward = 0
# reset返回的是一个元组,第一个元素是初始state值,第二个元素是一个字典
state = env.reset()[0]
# 游戏终止信号
terminated = False
while not terminated:
"""与Sarsa算法略有不同的地方,这里不需要再获取next_action"""
action = agent.take_action(state)
next_state, reward, terminated, _, _ = env.step(action)
# 为transition中添加当前的状态、动作等信息
transition['state'] = state
transition['action'] = action
transition['next_state'] = next_state
transition['reward'] = reward
transition['terminated'] = terminated
# 一定确保这里会学习到terminated为true的那一步
agent.update(transition)
sum_reward += reward
# 进入下一状态
state = next_state
# 每10轮打印一次统计信息
if episode % 10 == 0:
print(f"Episode: {episode}, Reward: {sum_reward}")