
Gymnasium Cart Pole Environment and the REINFORCE Algorithm: Introduction to Reinforcement Learning, Part 2



Contents

  • I. The Gymnasium Cart Pole Environment
  • II. The REINFORCE Algorithm
    • 1. Principle
    • 2. Implementation of the REINFORCE Algorithm


I. The Gymnasium Cart Pole Environment

The Gymnasium Cart Pole environment is a dynamics simulation of an inverted pendulum mounted on a cart.

State space:

0: Cart Position

1: Cart Velocity

2: Pole Angle

3: Pole Angular Velocity

Action space:

0: Push cart to the left

1: Push cart to the right

Reward:

A reward of +1 is given for every time step taken, so the longer the pole stays upright, the higher the return.

Episode end criteria:

Termination: Pole Angle is greater than ±12°

Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)

Truncation: Episode length is greater than 500 for CartPole-v1 (200 for CartPole-v0); the training code below additionally caps each episode at 200 steps.
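
These properties can be checked directly from Python. The sketch below (not part of the original article) creates the environment, prints the observation and action spaces, and runs one random-policy episode while keeping the terminated and truncated flags returned by Gymnasium separate.

import gymnasium as gym

env = gym.make("CartPole-v1")
print(env.observation_space)  # Box(4,): cart position, cart velocity, pole angle, pole angular velocity
print(env.action_space)       # Discrete(2): 0 = push left, 1 = push right

state, _ = env.reset(seed=0)
total_reward, terminated, truncated = 0.0, False, False
while not (terminated or truncated):
    action = env.action_space.sample()  # random policy
    state, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward              # +1 per time step
print(f"random-policy return: {total_reward}, terminated: {terminated}, truncated: {truncated}")
env.close()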


II. The REINFORCE Algorithm

1. Principle

For the principle of the REINFORCE algorithm and its Python implementation, we follow Foundations of Deep Reinforcement Learning: Theory and Practice in Python.
It should be noted that we use the improved REINFORCE estimator with a baseline:
$$\nabla_{\theta} J(\pi_\theta) \approx \sum_{t=0}^{T} \left(R_t(\tau)-b\right) \nabla_{\theta}\log\pi_\theta(a_t \mid s_t)$$
where $b$ is the mean return over the whole trajectory, used as a constant baseline for that trajectory:
$$b=\frac{1}{T} \sum_{t=0}^{T} R_t(\tau)$$
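
As a quick numerical illustration of the two formulas above (a toy example, not from the book), the discounted returns and the constant baseline for a 4-step episode with all rewards equal to +1 and gamma = 0.99 can be computed like this:

import numpy as np

gamma = 0.99
rewards = [1.0, 1.0, 1.0, 1.0]           # toy 4-step episode

rets = np.zeros(len(rewards))
future_ret = 0.0
for t in reversed(range(len(rewards))):  # R_t(tau) = r_t + gamma * R_{t+1}(tau)
    future_ret = rewards[t] + gamma * future_ret
    rets[t] = future_ret

baseline = rets.mean()                   # b = mean return over the trajectory
print(rets)                              # approx. [3.9404 2.9701 1.99 1.]
print(rets - baseline)                   # centered returns used in the gradient estimate
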
In addition, REINFORCE training stops once the pole has been balanced successfully in 15 consecutive episodes, and the policy network is then saved.

At test time, the saved policy network is loaded and, even over a longer test horizon, it still balances the pole well.


2. Implementation of the REINFORCE Algorithm

The policy network of the REINFORCE algorithm:

class Pi(nn.Module):
    # a policy network to be optimized in reinforcement learning
    def __init__(self, in_dim, out_dim): # in_dim = 4, out_dim = 2
        # super(Pi, self).__init__()
        super().__init__()
        # a policy network
        layers = [
            nn.Linear(in_dim, 64), # 4 -> 64
            nn.ReLU(), # activation function
            nn.Linear(64, out_dim), # 64 -> 2
        ]
        self.model = nn.Sequential(*layers) 
        self.onpolicy_reset()  # initialize memory
        self.train()  # Set the model to training mode


    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []


    def forward(self, x): # x -> state
        pdparam = self.model(x) # forward pass
        return pdparam
        # pdparam -> probability distribution
        # such as the logits of a categorical distribution


    def act(self, state):
        # Convert the state from a NumPy array to a PyTorch tensor
        # Sample an action from the policy's output distribution and record its log-probability
        x = torch.from_numpy(state.astype(np.float32)) 
        # print("state: {}".format(state))
        pdparam = self.forward(x)     # Perform a forward pass through the neural network   
        # print("pdparam: {}".format(pdparam))
        
        # to obtain the probability distribution parameters
        pd = torch.distributions.Categorical(logits=pdparam) # probability distribution
        # print("pd.probs: {}\t pd.logits: {}".format(pd.probs, pd.logits))
        action = pd.sample()            # pi(a|s) in action via pd
        #calculates the log probability of the sampled action action under the probability distribution pd
        #$\log(\pi_{\theta}(a_t|s_t))$
        #where $\pi_{\theta}$ is the policy network,
        #	$a_t$ is the action at time step $t$,
        #	$s_t$ is the state at time step $t$
        log_prob = pd.log_prob(action)  # log probability of the sampled action, log pi_theta(a_t|s_t)
        self.log_probs.append(log_prob) # store for training
        return action.item()  # extracts the value of a single-element tensor as a scalar
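
A quick way to see what act() does (a small usage sketch, assuming the imports from the complete code below) is to feed the network a dummy 4-dimensional state and inspect the stored log-probability:

pi = Pi(in_dim=4, out_dim=2)
dummy_state = np.zeros(4, dtype=np.float32)  # [cart pos, cart vel, pole angle, pole angular vel]
action = pi.act(dummy_state)                 # samples 0 (push left) or 1 (push right)
print(action)
print(pi.log_probs[-1])                      # log pi_theta(a|s), stored for the training update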

Backpropagation training of the policy network:

def train(pi, optimizer):
    # Compute the Monte Carlo estimate of the policy-gradient loss and update the policy network by gradient ascent
    # A proper Monte Carlo estimate averages over many sampled trajectories; for simplicity a single trajectory is used here
    # Inner gradient-ascent loop of REINFORCE algorithm
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32)  # Initialize returns
    future_ret = 0.0
    # compute the returns efficiently in reverse order
    # R_t(\tau) = \Sigma_{t'=t}^{T} {\gamma^{t'-t} r_{t'}}
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret


    baseline = sum(rets) / T
    rets = torch.tensor(rets)
    rets = rets - baseline  # modify the returns by subtracting a baseline
    log_probs = torch.stack(pi.log_probs)
    # - R_t(\tau) * log(\pi_{\theta}(a_t|s_t))
    # Negative for maximizing
    loss = - log_probs * rets  
    #  - \Sigma_{t=0}^{T}  [R_t(\tau) * log(\pi_{\theta}(a_t|s_t))] 
    loss = torch.sum(loss)
    optimizer.zero_grad()
    # backpropagate, compute gradients
    # computes the gradients of the loss with respect to the model's parameters (\theta)
    loss.backward()   
    # gradient-ascent, update the weights of the policy network          
    optimizer.step()            
    return loss
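
The update can be smoke-tested on a fabricated episode (a minimal sketch, assuming Pi, train, gamma and the imports from the complete code below): collect a few steps with act(), append rewards, and call train() once.

pi = Pi(4, 2)
optimizer = optim.Adam(pi.parameters(), lr=0.01)

for _ in range(5):                 # fabricate a 5-step episode with random states
    state = np.random.randn(4).astype(np.float32)
    pi.act(state)                  # stores a log-probability internally
    pi.rewards.append(1.0)         # CartPole-style +1 reward per step

loss = train(pi, optimizer)        # one REINFORCE update
print(loss.item())
pi.onpolicy_reset()                # clear memory before collecting the next episode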

Multi-episode training: the whole REINFORCE training run ends once the pole has been balanced successfully in enough consecutive episodes.

def train_main():
    env = gym.make('CartPole-v1', render_mode="human")
    in_dim = env.observation_space.shape[0] # 4
    out_dim = env.action_space.n # 2
    pi = Pi(in_dim, out_dim)   # an instance of the policy network for the REINFORCE algorithm
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    episode = 0
    continuous_solved_episode = 0
    # for epi in range(300): # episode = 300
    while continuous_solved_episode <= 14:
        # state = env.reset() # gym
        state, _ = env.reset()  # gymnasium
        for t in range(200):  # cap each training episode at 200 time steps
            action = pi.act(state)
            # state, reward, done, _ = env.step(action)  # gym
            state, reward, done, _, _ = env.step(action)  # gymnasium: done is the terminated flag; truncated is ignored
            pi.rewards.append(reward)
            env.render()
            if done:
                break

        loss = train(pi, optimizer) # train per episode
        total_reward = sum(pi.rewards)   
        solved = total_reward > 195.0
        episode += 1
        if solved:
            continuous_solved_episode += 1
        else:
            continuous_solved_episode = 0
        print(f'Episode {episode}, loss: {loss}, \
        total_reward: {total_reward}, solved: {solved}, continuous_solved: {continuous_solved_episode}')
        pi.onpolicy_reset()   # onpolicy: clear memory after training

    save_model(pi)
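
Creating the environment with render_mode="human" makes every training step render an animation window (as in the recording below); if no visualization is needed during training, constructing the environment without render_mode and removing the env.render() call speeds training up considerably.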

A short screen recording of the training:

REINFORCE_training

Testing is done with the neural network in evaluation mode; during testing the pole can be balanced over a longer time horizon.

def test_process():
    env = gym.make('CartPole-v1', render_mode="human")
    # in_dim = env.observation_space.shape[0] # 4
    # out_dim = env.action_space.n # 2
    # pi_model = Pi(in_dim, out_dim)
    pi_model = torch.load(model_path)

    # set the model to evaluation mode
    pi_model.eval()


    # run the policy forward only (no gradient tracking)
    with torch.no_grad():
        pi_model.onpolicy_reset()   # onpolicy: clear memory after training

        state, _ = env.reset()  # gymnasium
        steps = 600
        for t in range(steps):  # run the test rollout for up to 600 time steps
            action = pi_model.act(state)
            state, reward, done, _, _ = env.step(action) 
            pi_model.rewards.append(reward)
            env.render()
            if done:
                break
        
        total_reward = sum(pi_model.rewards)   
        solved = total_reward >= steps

        print(f'[Test] total_reward: {total_reward}, solved: {solved}')
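
Note that save_model() in the complete code stores the entire nn.Module with torch.save(pi, model_path), and test_process() reloads it with torch.load(model_path); depending on the PyTorch version, loading a full module this way may require torch.load(model_path, weights_only=False). A common alternative (an assumption, not what the original script does) is to save and restore only the parameters:

# saving: store only the parameters
torch.save(pi.state_dict(), model_path)

# loading: rebuild the network, then restore the parameters
pi_model = Pi(in_dim=4, out_dim=2)
pi_model.load_state_dict(torch.load(model_path))
pi_model.eval()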

A short screen recording of the testing:

REINFORCE_testing

Complete code:

import gymnasium as gym
# import gym

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import sys

gamma = 0.99 # discount factor
model_path = "./reinforce_pi.pt" 

class Pi(nn.Module):
    # a policy network to be optimized in reinforcement learning
    def __init__(self, in_dim, out_dim): # in_dim = 4, out_dim = 2
        # super(Pi, self).__init__()
        super().__init__()
        # a policy network
        layers = [
            nn.Linear(in_dim, 64), # 4 -> 64
            nn.ReLU(), # activation function
            nn.Linear(64, out_dim), # 64 -> 2
        ]
        self.model = nn.Sequential(*layers) 
        self.onpolicy_reset()  # initialize memory
        self.train()  # Set the model to training mode


    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []


    def forward(self, x): # x -> state
        pdparam = self.model(x) # forward pass
        return pdparam
        # pdparam -> probability distribution
        # such as the logits of a categorical distribution


    def act(self, state):
        # Convert the state from a NumPy array to a PyTorch tensor
        # Sample an action from the policy's output distribution and record its log-probability
        x = torch.from_numpy(state.astype(np.float32)) 
        # print("state: {}".format(state))
        pdparam = self.forward(x)     # Perform a forward pass through the neural network   
        # print("pdparam: {}".format(pdparam))
        
        # to obtain the probability distribution parameters
        pd = torch.distributions.Categorical(logits=pdparam) # probability distribution
        # print("pd.probs: {}\t pd.logits: {}".format(pd.probs, pd.logits))
        action = pd.sample()            # pi(a|s) in action via pd
        #calculates the log probability of the sampled action action under the probability distribution pd
        #$\log(\pi_{\theta}(a_t|s_t))$
        #where $\pi_{\theta}$ is the policy network,
        #	$a_t$ is the action at time step $t$,
        #	$s_t$ is the state at time step $t$
        log_prob = pd.log_prob(action)  # log probability of the sampled action, log pi_theta(a_t|s_t)
        self.log_probs.append(log_prob) # store for training
        return action.item()  # extracts the value of a single-element tensor as a scalar


def train(pi, optimizer):
    # Compute the Monte Carlo estimate of the policy-gradient loss and update the policy network by gradient ascent
    # A proper Monte Carlo estimate averages over many sampled trajectories; for simplicity a single trajectory is used here
    # Inner gradient-ascent loop of REINFORCE algorithm
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32)  # Initialize returns
    future_ret = 0.0
    # compute the returns efficiently in reverse order
    # R_t(\tau) = \Sigma_{t'=t}^{T} {\gamma^{t'-t} r_{t'}}
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret


    baseline = sum(rets) / T
    rets = torch.tensor(rets)
    rets = rets - baseline  # modify the returns by subtracting a baseline
    log_probs = torch.stack(pi.log_probs)
    # - R_t(\tau) * log(\pi_{\theta}(a_t|s_t))
    # Negative for maximizing
    loss = - log_probs * rets  
    #  - \Sigma_{t=0}^{T}  [R_t(\tau) * log(\pi_{\theta}(a_t|s_t))] 
    loss = torch.sum(loss)
    optimizer.zero_grad()
    # backpropagate, compute gradients
    # computes the gradients of the loss with respect to the model's parameters (\theta)
    loss.backward()   
    # gradient-ascent, update the weights of the policy network          
    optimizer.step()            
    return loss


def save_model(pi):
    print("pi.state_dict(): {}\n\n".format(pi.state_dict()))
    for param_tensor in pi.state_dict():
        print(param_tensor, "\t", pi.state_dict()[param_tensor].size())

    torch.save(pi, model_path)


def train_main():
    env = gym.make('CartPole-v1', render_mode="human")
    in_dim = env.observation_space.shape[0] # 4
    out_dim = env.action_space.n # 2
    pi = Pi(in_dim, out_dim)   # an instance of the policy network for the REINFORCE algorithm
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    episode = 0
    continuous_solved_episode = 0
    # for epi in range(300): # episode = 300
    while continuous_solved_episode <= 14:
        # state = env.reset() # gym
        state, _ = env.reset()  # gymnasium
        for t in range(200):  # cap each training episode at 200 time steps
            action = pi.act(state)
            # state, reward, done, _ = env.step(action)  # gym
            state, reward, done, _, _ = env.step(action)  # gymnasium: done is the terminated flag; truncated is ignored
            pi.rewards.append(reward)
            env.render()
            if done:
                break

        loss = train(pi, optimizer) # train per episode
        total_reward = sum(pi.rewards)   
        solved = total_reward > 195.0
        episode += 1
        if solved:
            continuous_solved_episode += 1
        else:
            continuous_solved_episode = 0
        print(f'Episode {episode}, loss: {loss}, \
        total_reward: {total_reward}, solved: {solved}, continuous_solved: {continuous_solved_episode}')
        pi.onpolicy_reset()   # onpolicy: clear memory after training

    save_model(pi)


def usage():
    if len(sys.argv) != 2:
        print("Usage: python ./REINFORCE.py --train/--test")
        sys.exit()

    mode = sys.argv[1]
    return mode 


def test_process():
    env = gym.make('CartPole-v1', render_mode="human")
    # in_dim = env.observation_space.shape[0] # 4
    # out_dim = env.action_space.n # 2
    # pi_model = Pi(in_dim, out_dim)
    pi_model = torch.load(model_path)

    # set the model to evaluation mode
    pi_model.eval()


    # run the policy forward only (no gradient tracking)
    with torch.no_grad():
        pi_model.onpolicy_reset()   # onpolicy: clear memory after training

        state, _ = env.reset()  # gymnasium
        steps = 600
        for t in range(steps):  # run the test rollout for up to 600 time steps
            action = pi_model.act(state)
            state, reward, done, _, _ = env.step(action) 
            pi_model.rewards.append(reward)
            env.render()
            if done:
                break
        
        total_reward = sum(pi_model.rewards)   
        solved = total_reward >= steps

        print(f'[Test] total_reward: {total_reward}, solved: {solved}')


if __name__ == '__main__':
    mode = usage()
    if mode == "--train":
        train_main()
    elif mode == "--test":
        test_process()
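
With the script saved as REINFORCE.py, training is started with python ./REINFORCE.py --train, which runs until 15 consecutive solved episodes and then saves the policy network to ./reinforce_pi.pt; python ./REINFORCE.py --test then loads the saved network and runs the evaluation rollout.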

Copyright notice: This is an original article by the blogger, released under the CC 4.0 BY license. Please include a link to the original article and this notice when reposting.
Original article: https://blog.csdn.net/woyaomaishu2/article/details/146382384
Author: wzf@robotics_notes
