当前位置: 首页 > news >正文

强化学习-价值学习算法

Sarsa

理论解释

Sarsa是基于时序差分算法的,它的公式非常简单且易理解,不像策略梯度算法那样需要复杂的推导过程。

Sarsa的核心函数是 Q ( s , a ) Q(s, a) Q(s,a),它的含义是在状态 s s s下执行 a a a,在后续轨迹中获取的期望总奖励。时序差分算法的核心思想,就是用当前获得的奖励加上下一个状态的价值估计来作为当前状态的价值估计,因此有以下公式,其中 V ( s t + 1 ) V(s_{t+1}) V(st+1)的含义是以状态 s t + 1 s_{t+1} st+1为起点,在后续的轨迹中获取的期望总奖励。
Q ( s t , a t ) ← r t + γ ⋅ V ( s t + 1 ) Q(s_t, a_t) \leftarrow r_t + \gamma \cdot V(s_{t+1}) Q(st,at)rt+γV(st+1)

在这里我们做一步近似,在相同策略下智能体实际采取的动作为 a t + 1 a_{t + 1} at+1,那么我们认为 V ( s t + 1 ) V(s_{t+1}) V(st+1) Q ( s t + 1 , a t + 1 ) Q(s_{t+1}, a_{t+1}) Q(st+1,at+1)是近似相等的,因此我们可以得到Sarsa算法的核心公式:
Q ( s t , a t ) ← r t + γ ⋅ Q ( s t + 1 , a t + 1 ) Q(s_t, a_t) \leftarrow r_t + \gamma \cdot Q(s_{t+1}, a_{t+1}) Q(st,at)rt+γQ(st+1,at+1)

在这里,我们使用神经网络来拟合 Q ( s , a ) Q(s, a) Q(s,a),在选取动作时采用 ϵ \epsilon ϵ-greedy策略,即有 ϵ \epsilon ϵ的概率随机选取一个动作, 1 − ϵ 1 - \epsilon 1ϵ的概率选取 Q ( s , a ) Q(s, a) Q(s,a)最大的动作。

按照此策略我们在状态 s t s_t st时选取动作 a t a_t at,此时环境会返回状态 s t + 1 s_{t+1} st+1,则再按照此策略选取动作 a t + 1 a_{t+1} at+1,然后按照上述的公式来更新 Q ( s , a ) Q(s, a) Q(s,a)参数。由于这里我们使用神经网络来拟合参数,所以我们更新的方式是计算loss值,然后进行梯度下降。如下面所示,其中 l o s s f n loss_{fn} lossfn是指根据现有值和目标值来计算loss值的函数,在代码中采取的MSE均方误差函数。
q v a l u e = Q ( s , a ) q_{value} = Q(s, a) qvalue=Q(s,a)
q t a r g e t = r t + γ ⋅ Q ( s t + 1 , a t + 1 ) q_{target} = r_t + \gamma \cdot Q(s_{t+1}, a_{t+1}) qtarget=rt+γQ(st+1,at+1)
l o s s = l o s s f n ( q v a l u e , q t a r g e t ) loss = loss_{fn}(q_{value}, q_{target}) loss=lossfn(qvalue,qtarget)

代码

环境为python3.12,各依赖包均为最新版。

import random
import gymnasium as gym
import torch
import torch.nn as nn
from torch import tensor

class QNet(torch.nn.Module):
    def __init__(self, action_state_dim, hidden_dim):
        """
        网络的输入由action和state连接而成,网络的输出是长度为1的向量,代表 q value。
        action用one-hot向量表示,例如动作空间为A = {0, 1, 2}时,
        向量(1, 0, 0)和(0, 1, 0)分别代表动作a = 0和动作a = 1。
        """
        super(QNet, self).__init__()
        # 一个线性层 + 激活函数 + 一个线性层
        self.network = nn.Sequential(
            nn.Linear(action_state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x):
        x = self.network(x)
        return x

class Agent:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon):
        # 策略网络
        self.action_value_net = QNet(state_dim + action_dim, hidden_dim).to(device)
        # 创建优化器,优化器的作用是根据每个参数的梯度来更新参数
        self.optimizer = torch.optim.Adam(self.action_value_net.parameters(), lr=learning_rate)
        # 折扣因子
        self.gamma = gamma
        # 进行神经网络计算的设备
        self.device = device
        # 探索策略,有epsilon的概率随机选取动作
        self.epsilon = epsilon
        # 状态维度
        self.state_dim = state_dim
        # 动作维度
        self.action_dim = action_dim
        # 损失函数,根据当前值和目标值来计算得出损失值
        self.loss_fn = nn.MSELoss()

    def take_action(self, state):
        # 随机探索
        if random.random() < self.epsilon:
            return random.choice(range(self.action_dim))
        # 生成一个对角线矩阵,矩阵的每一行元素代表一个动作
        actions = torch.eye(self.action_dim).to(self.device)
        # 对state进行复制,actions中有多少个动作,就state复制为多少行
        state = tensor(state, dtype=torch.float).to(self.device)
        states = state.unsqueeze(0).repeat(actions.shape[0], 1)
        # 连接actions和states矩阵,得到的action_states可以看做是一个batch的动作状态向量
        action_states = torch.cat((actions, states), dim=1)
        # 将一个batch的动作状态向量输入到Q网络中,得到一组Q值
        # 注意q_values的形状是(batch_size, 1),我们将它转换成一维向量
        q_values = self.action_value_net(action_states).view(-1)
        # 获取最大Q值对应的下标,下标的值就是采取的最优动作
        max_value, max_index = torch.max(q_values, dim=0)
        return max_index.item()

    def update(self, transition):
        # 取出相关数据
        reward = torch.tensor(transition['reward']).to(self.device)
        state = torch.tensor(transition['state']).to(self.device)
        next_state = torch.tensor(transition['next_state']).to(self.device)
        terminated = transition['terminated']
        # 将数字action转换成one-hot action向量
        action = torch.zeros(self.action_dim, dtype=torch.float).to(self.device)
        action[transition['action']] = 1.
        # 将数字next_action转换成one-hot next_action向量
        next_action = torch.zeros(self.action_dim, dtype=torch.float).to(self.device)
        next_action[transition['next_action']] = 1.
        # 连接action和state向量
        action_state = torch.cat((action, state), dim=0)
        next_action_state = torch.cat((next_action, next_state), dim=0)
        # 获取Q值
        q_value = self.action_value_net(action_state)[0]
        # 计算目标Q值。一定要注意如果terminated为true,说明执行action后游戏就终止了
        # 那么next_state和next_action是无意义的,它们的Q值应该为0
        # 通过将Q值乘以(1. - float(terminated))的方式,来使其在终止时为0
        q_target = reward + self.action_value_net(next_action_state)[0] * self.gamma \
                   * (1. - float(terminated))
        # 计算损失值,第一个参数为当前Q值,第二个参数为目标Q值
        loss = self.loss_fn(q_value, q_target)
        # 更新参数
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

if __name__ == '__main__':
    # 更新网络参数的学习率
    learning_rate = 1e-3
    # 训练轮次
    num_episodes = 1000
    # 隐藏层神经元数量
    hidden_dim = 128
    # 计算累计奖励时的折扣率
    gamma = 0.98
    epsilon = 0.2
    # 如果存在cuda就用cuda,否则用cpu
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    env = gym.make('CartPole-v1')
    # 获取状态维度,为4
    state_dim = env.observation_space.shape[0]
    # 获取离散动作数量,为2
    action_dim = env.action_space.n
    # 强化学习智能体
    agent = Agent(state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon)
    for episode in range(num_episodes):
        # transition含义是,在state执行action后,环境返回reward、next_state、terminated
        # 根据next_state,继续采取next_action作为下一动作
        transition = {
            'state': None,
            'action': None,
            'next_state': None,
            'next_action': None,
            'reward': None,
            'terminated': None
        }
        # 统计信息,游戏结束时获得的总奖励
        sum_reward = 0
        # reset返回的是一个元组,第一个元素是初始state值,第二个元素是一个字典
        state = env.reset()[0]
        # 游戏终止信号
        terminated = False
        action = agent.take_action(state)
        while not terminated:
            next_state, reward, terminated, _, _ = env.step(action)
            next_action = agent.take_action(next_state)
            # 为transition中添加当前的状态、动作等信息
            transition['state'] = state
            transition['action'] = action
            transition['next_state'] = next_state
            transition['reward'] = reward
            transition['next_action'] = next_action
            transition['terminated'] = terminated
            # 一定确保这里会学习到terminated为true的那一步
            agent.update(transition)
            sum_reward += reward
            # 进入下一状态
            state = next_state
            action = next_action
        # 每10轮打印一次统计信息
        if episode % 10 == 0:
            print(f"Episode: {episode}, Reward: {sum_reward}")

DQN

理论解释

DQN全程Deep Q Learning,与Sarsa算法十分类似,依然是使用时序差分算法来优化 Q ( s , a ) Q(s, a) Q(s,a)函数。不过DQN的 Q ( s , a ) Q(s, a) Q(s,a)函数含义和优化方式与Sarsa略有不同。

DQN中 Q ( s , a ) Q(s, a) Q(s,a)的含义是在状态 s s s执行动作 a a a后,在后续的轨迹中所能获得的最大累积奖励,为了作区分也有人把DQN的 Q ( s , a ) Q(s, a) Q(s,a)表示为 Q ⋆ ( s , a ) Q^\star(s, a) Q(s,a),本文就不在作区分表示了。

DQN中 Q ( s , a ) Q(s, a) Q(s,a)的时序差分优化过程如下,其中 A A A是动作空间:
Q ( s t , a t ) ← r t + γ ⋅ max ⁡ a ′ ∈ A Q ( s t + 1 , a ′ ) Q(s_t, a_t) \leftarrow r_t + \gamma \cdot \max\limits_{a' \in A} Q(s_{t+1}, a') Q(st,at)rt+γaAmaxQ(st+1,a)

使用神经网络来拟合 Q ( s , a ) Q(s, a) Q(s,a),在选取动作时依然采用 ϵ \epsilon ϵ-greedy策略。按照此策略我们在状态 s t s_t st时选取动作 a t a_t at,此时环境会返回状态 s t + 1 s_{t+1} st+1,然后遍历所有的动作,选取 Q ( s t + 1 , a ′ ) Q(s_{t+1}, a') Q(st+1,a)最大的动作 a ′ a' a,然后计算loss值。
q v a l u e = Q ( s , a ) q_{value} = Q(s, a) qvalue=Q(s,a)
q t a r g e t = r t + γ ⋅ max ⁡ a ′ ∈ A Q ( s t + 1 , a ′ ) q_{target} = r_t + \gamma \cdot \max\limits_{a' \in A} Q(s_{t+1}, a') qtarget=rt+γaAmaxQ(st+1,a)
l o s s = l o s s f n ( q v a l u e , q t a r g e t ) loss = loss_{fn}(q_{value}, q_{target}) loss=lossfn(qvalue,qtarget)

与Sarsa相同,损失函数的计算方式依然选择MSE均方误差。

代码

环境为python3.12,各依赖包均为最新版。
实现代码与Sarsa基本相同,仅有两处做了修改,修改位置已在代码中注释。

import random
import gymnasium as gym
import torch
import torch.nn as nn
from torch import tensor

class QNet(torch.nn.Module):
    def __init__(self, action_state_dim, hidden_dim):
        """
        网络的输入由action和state连接而成,网络的输出是长度为1的向量,代表 q value。
        action用one-hot向量表示,例如动作空间为A = {0, 1, 2}时,
        向量(1, 0, 0)和(0, 1, 0)分别代表动作a = 0和动作a = 1。
        """
        super(QNet, self).__init__()
        # 一个线性层 + 激活函数 + 一个线性层
        self.network = nn.Sequential(
            nn.Linear(action_state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x):
        x = self.network(x)
        return x

class Agent:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon):
        # 策略网络
        self.action_value_net = QNet(state_dim + action_dim, hidden_dim).to(device)
        # 创建优化器,优化器的作用是根据每个参数的梯度来更新参数
        self.optimizer = torch.optim.Adam(self.action_value_net.parameters(), lr=learning_rate)
        # 折扣因子
        self.gamma = gamma
        # 进行神经网络计算的设备
        self.device = device
        # 探索策略,有epsilon的概率随机选取动作
        self.epsilon = epsilon
        # 状态维度
        self.state_dim = state_dim
        # 动作维度
        self.action_dim = action_dim
        # 损失函数,根据当前值和目标值来计算得出损失值
        self.loss_fn = nn.MSELoss()

    def take_action(self, state):
        # 随机探索
        if random.random() < self.epsilon:
            return random.choice(range(self.action_dim))
        # 生成一个对角线矩阵,矩阵的每一行元素代表一个动作
        actions = torch.eye(self.action_dim).to(self.device)
        # 对state进行复制,actions中有多少个动作,就state复制为多少行
        state = tensor(state, dtype=torch.float).to(self.device)
        states = state.unsqueeze(0).repeat(actions.shape[0], 1)
        # 连接actions和states矩阵,得到的action_states可以看做是一个batch的动作状态向量
        action_states = torch.cat((actions, states), dim=1)
        # 将一个batch的动作状态向量输入到Q网络中,得到一组Q值
        # 注意q_values的形状是(batch_size, 1),我们将它转换成一维向量
        q_values = self.action_value_net(action_states).view(-1)
        # 获取最大Q值对应的下标,下标的值就是采取的最优动作
        max_value, max_index = torch.max(q_values, dim=0)
        return max_index.item()

    def update(self, transition):
        # 取出相关数据
        reward = torch.tensor(transition['reward']).to(self.device)
        state = torch.tensor(transition['state']).to(self.device)
        next_state = torch.tensor(transition['next_state']).to(self.device)
        terminated = transition['terminated']
        # 将数字action转换成one-hot action向量
        action = torch.zeros(self.action_dim, dtype=torch.float).to(self.device)
        action[transition['action']] = 1.
        # 连接action和state向量
        action_state = torch.cat((action, state), dim=0)
        # 获取Q值
        q_value = self.action_value_net(action_state)[0]
        """
        与Sarsa算法主要不同的地方,在于q_target的计算方式:
        类似于take_action函数中的内容,这里需要把所有动作都进行one-hot操作,与状态连接
        并输入到网络中,获取所有动作的q_value中最大的值,作为计算q_target的一部分。
        """
        next_actions = torch.eye(self.action_dim).to(self.device)
        next_states = next_state.unsqueeze(0).repeat(next_actions.shape[0], 1)
        next_action_states = torch.cat((next_actions, next_states), dim=1)
        q_target = reward + torch.max(self.action_value_net(next_action_states)) \
                   * self.gamma * (1. - float(terminated))
        # 计算损失值,第一个参数为当前Q值,第二个参数为目标Q值
        loss = self.loss_fn(q_value, q_target)
        # 更新参数
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

if __name__ == '__main__':
    # 更新网络参数的学习率
    learning_rate = 1e-3
    # 训练轮次
    num_episodes = 1000
    # 隐藏层神经元数量
    hidden_dim = 128
    # 计算累计奖励时的折扣率
    gamma = 0.98
    epsilon = 0.2
    # 如果存在cuda就用cuda,否则用cpu
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    env = gym.make('CartPole-v1')
    # 获取状态维度,为4
    state_dim = env.observation_space.shape[0]
    # 获取离散动作数量,为2
    action_dim = env.action_space.n
    # 强化学习智能体
    agent = Agent(state_dim, hidden_dim, action_dim, learning_rate, gamma, device, epsilon)
    for episode in range(num_episodes):
        # transition含义是,在state执行action后,环境返回reward、next_state、terminated
        # 根据next_state,继续采取next_action作为下一动作
        transition = {
            'state': None,
            'action': None,
            'next_state': None,
            'next_action': None,
            'reward': None,
            'terminated': None
        }
        # 统计信息,游戏结束时获得的总奖励
        sum_reward = 0
        # reset返回的是一个元组,第一个元素是初始state值,第二个元素是一个字典
        state = env.reset()[0]
        # 游戏终止信号
        terminated = False
        while not terminated:
            """与Sarsa算法略有不同的地方,这里不需要再获取next_action"""
            action = agent.take_action(state)
            next_state, reward, terminated, _, _ = env.step(action)
            # 为transition中添加当前的状态、动作等信息
            transition['state'] = state
            transition['action'] = action
            transition['next_state'] = next_state
            transition['reward'] = reward
            transition['terminated'] = terminated
            # 一定确保这里会学习到terminated为true的那一步
            agent.update(transition)
            sum_reward += reward
            # 进入下一状态
            state = next_state
        # 每10轮打印一次统计信息
        if episode % 10 == 0:
            print(f"Episode: {episode}, Reward: {sum_reward}")

相关文章:

  • 电路元器件知识:稳压二极管
  • 开题报告——基于Spring Boot的社区居民健康管理平台的设计与实现
  • NPM环境搭建指南
  • 在 Ubuntu 22.04 中修改主机名称(hostname)
  • C++基础知识学习记录—模版和泛型编程
  • Three.js 快速入门教程【一】开启你的 3D Web 开发之旅
  • Redis_基础
  • C语言(结构体)
  • unity学习47:寻路和导航,unity2022后版本如何使用 Navmesh 和 bake
  • Java基础——代理模式
  • 【优先级队列】任务分配
  • sentinel集成nacos做持久化配置
  • rk3576上部署deepseek
  • systemverilog刷题小记
  • CodeGPT 使用教程(适用于 VSCode)
  • Kafka 简介
  • 波导阵列天线 学习笔记9 使用紧凑高效率馈网的宽带圆极化阵列天线
  • 【面试题】2025.02.19-前端面试题汇总
  • 图论(四):图的中心性——度中心性介数中心性紧密中心性
  • 学习总结2.19
  • 未来之城湖州,正在书写怎样的城市未来
  • 屈晓华履新四川省社科联党组书记,此前担任省国动办主任
  • 复旦发文缅怀文科杰出教授裘锡圭:曾提出治学需具备三种精神
  • 欧盟公布对美关税反制清单,瞄准美国飞机等产品
  • 普雷沃斯特当选新一任天主教罗马教皇
  • 万达电影:股东杭州臻希拟减持不超1.3927%公司股份