Gymnasium Cart Pole Environment and the REINFORCE Algorithm: Introduction to Reinforcement Learning, Part 2
Table of Contents
- I. The Gymnasium Cart Pole Environment
- II. The REINFORCE Algorithm
  - 1. Principle
  - 2. Implementation of the REINFORCE Algorithm
I. The Gymnasium Cart Pole Environment
The Gymnasium Cart Pole environment is a dynamics simulation of an inverted pendulum mounted on a cart.
State space:
0: Cart Position
1: Cart Velocity
2: Pole Angle
3: Pole Angular Velocity
Action space:
0: Push cart to the left
1: Push cart to the right
Reward:
A reward of +1 is received at every time step, which encourages keeping the pole upright for as long as possible.
Episode termination criteria:
Termination: Pole Angle is greater than ±12°
Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)
Truncation: Episode length is greater than 500 (200 for v0)
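As a quick orientation (an illustrative sketch, not part of the original post's code), the snippet below creates the environment, prints the two spaces, and runs one episode with a random policy:

import gymnasium as gym

env = gym.make('CartPole-v1')
print(env.observation_space)   # Box of 4 values: cart position, cart velocity, pole angle, pole angular velocity
print(env.action_space)        # Discrete(2): 0 = push left, 1 = push right

state, _ = env.reset(seed=0)
total_reward, done = 0.0, False
while not done:
    action = env.action_space.sample()                          # random action
    state, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward                                      # +1 per time step survived
    done = terminated or truncated
print(f'random-policy episode return: {total_reward}')
env.close()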
II. The REINFORCE Algorithm
1. Principle
For the principle of the REINFORCE algorithm and its Python implementation, we follow Foundations of Deep Reinforcement Learning: Theory and Practice in Python.
Note that we use the improved variant ("Improving REINFORCE"), which subtracts a baseline from the return:
$$\nabla_{\theta} J(\pi_\theta) \approx \sum_{t=0}^{T} \left(R_t(\tau)-b\right) \nabla_{\theta}\log\pi_\theta(a_t|s_t)$$
where $b$ is the mean return over the whole trajectory, used as a constant baseline for each trajectory:
$$b=\frac{1}{T} \sum_{t=0}^{T} R_t(\tau)$$
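To make the two formulas concrete, here is a small illustrative calculation (the reward sequence is made up) that computes the discounted returns R_t(τ) in reverse and subtracts the baseline b, mirroring what the train() function does below:

import numpy as np

gamma = 0.99
rewards = [1.0, 1.0, 1.0]              # a hypothetical 3-step episode

T = len(rewards)
rets = np.empty(T, dtype=np.float32)
future_ret = 0.0
for t in reversed(range(T)):           # R_t = r_t + gamma * R_{t+1}
    future_ret = rewards[t] + gamma * future_ret
    rets[t] = future_ret
# rets = [2.9701, 1.99, 1.0]
baseline = rets.sum() / T              # b ≈ 1.9867
advantages = rets - baseline           # [0.9834, 0.0033, -0.9867]
print(rets, baseline, advantages)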
In addition, REINFORCE training is stopped once the pole has been balanced successfully in 15 consecutive episodes, and the policy network is then saved.
At test time, the saved policy network is loaded and run for a longer horizon; it still balances the pole well.
2. Implementation of the REINFORCE Algorithm
The policy network of the REINFORCE algorithm:
class Pi(nn.Module):
    # the policy network to be optimized in reinforcement learning
    def __init__(self, in_dim, out_dim): # in_dim = 4, out_dim = 2
        # super(Pi, self).__init__()
        super().__init__()
        # a simple feed-forward policy network
        layers = [
            nn.Linear(in_dim, 64),  # 4 -> 64
            nn.ReLU(),              # activation function
            nn.Linear(64, out_dim), # 64 -> 2
        ]
        self.model = nn.Sequential(*layers)
        self.onpolicy_reset() # initialize memory
        self.train()          # set the model to training mode

    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []

    def forward(self, x): # x -> state
        pdparam = self.model(x) # forward pass
        return pdparam
        # pdparam -> parameters of a probability distribution,
        # here the logits of a categorical distribution

    def act(self, state):
        # sample an action from the policy and record its log-probability
        # convert the state from a NumPy array to a PyTorch tensor
        x = torch.from_numpy(state.astype(np.float32))
        # print("state: {}".format(state))
        pdparam = self.forward(x) # forward pass to obtain the distribution parameters
        # print("pdparam: {}".format(pdparam))
        pd = torch.distributions.Categorical(logits=pdparam) # action distribution pi(a|s)
        # print("pd.probs: {}\t pd.logits: {}".format(pd.probs, pd.logits))
        action = pd.sample() # sample a_t ~ pi(a_t|s_t)
        # log-probability of the sampled action under pd:
        # $\log(\pi_{\theta}(a_t|s_t))$, where $\pi_{\theta}$ is the policy network,
        # $a_t$ the action and $s_t$ the state at time step $t$
        log_prob = pd.log_prob(action)  # log pi(a_t|s_t)
        self.log_probs.append(log_prob) # store for training
        return action.item() # extract the value of a single-element tensor as a Python scalar
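A minimal usage sketch for the class above (assuming the imports from the full code below; the observation values are made up):

pi = Pi(in_dim=4, out_dim=2)            # 4 state variables, 2 discrete actions
state = np.array([0.01, -0.02, 0.03, 0.04], dtype=np.float32)  # a fabricated observation
action = pi.act(state)                  # returns 0 (push left) or 1 (push right)
print(action, pi.log_probs)             # the log-probability of the sampled action was stored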
Backpropagation-based training of the policy network:
def train(pi, optimizer):
    # Inner gradient-ascent loop of the REINFORCE algorithm:
    # compute the loss from a Monte Carlo estimate and update the policy parameters.
    # A proper Monte Carlo estimate averages over many sampled trajectories;
    # for simplicity a single trajectory is used here.
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32) # returns
    future_ret = 0.0
    # compute the returns efficiently in reverse order
    # R_t(\tau) = \Sigma_{t'=t}^{T} {\gamma^{t'-t} r_{t'}}
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret
    baseline = sum(rets) / T
    rets = torch.tensor(rets)
    rets = rets - baseline # subtract the constant baseline from the returns
    log_probs = torch.stack(pi.log_probs)
    # - (R_t(\tau) - b) * log(\pi_{\theta}(a_t|s_t))
    # negated so that minimizing the loss maximizes the objective
    loss = - log_probs * rets
    # - \Sigma_{t=0}^{T} [(R_t(\tau) - b) * log(\pi_{\theta}(a_t|s_t))]
    loss = torch.sum(loss)
    optimizer.zero_grad()
    # backpropagate: compute the gradients of the loss w.r.t. the policy parameters \theta
    loss.backward()
    # gradient step: update the weights of the policy network
    optimizer.step()
    return loss
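As a quick sanity check of train() (a sketch only, assuming the Pi class, numpy as np, torch.optim as optim, and gamma = 0.99 from the full code below), one can fill the memory with a fabricated episode and perform a single update:

pi = Pi(in_dim=4, out_dim=2)
optimizer = optim.Adam(pi.parameters(), lr=0.01)

# fabricate a short "episode": act() on made-up states fills pi.log_probs
for s in np.random.randn(5, 4).astype(np.float32):
    pi.act(s)
    pi.rewards.append(1.0)              # +1 reward per step, as in Cart Pole

loss = train(pi, optimizer)             # one REINFORCE update on this episode
pi.onpolicy_reset()                     # clear the memory, as done after every episode
print(loss.item())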
Multi-episode training: the whole REINFORCE training loop ends once the pole has been balanced successfully in enough consecutive episodes.
def train_main():
    env = gym.make('CartPole-v1', render_mode="human")
    in_dim = env.observation_space.shape[0] # 4
    out_dim = env.action_space.n            # 2
    pi = Pi(in_dim, out_dim) # an instance of the policy network for the REINFORCE algorithm
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    episode = 0
    continuous_solved_episode = 0
    # for epi in range(300): # fixed number of episodes
    while continuous_solved_episode <= 14:
        # state = env.reset() # gym
        state, _ = env.reset() # gymnasium
        for t in range(200): # run at most 200 steps per episode
            action = pi.act(state)
            # state, reward, done, _ = env.step(action) # gym
            state, reward, done, _, _ = env.step(action) # gymnasium: done <- terminated
            pi.rewards.append(reward)
            env.render()
            if done:
                break
        loss = train(pi, optimizer) # train once per episode
        total_reward = sum(pi.rewards)
        solved = total_reward > 195.0
        episode += 1
        if solved:
            continuous_solved_episode += 1
        else:
            continuous_solved_episode = 0
        print(f'Episode {episode}, loss: {loss}, '
              f'total_reward: {total_reward}, solved: {solved}, continuous_solved: {continuous_solved_episode}')
        pi.onpolicy_reset() # on-policy: clear memory after training on this episode
    save_model(pi)
A short screen recording of training (video omitted here).

Testing should be run with the network in evaluation mode; during testing the pole can be balanced for a longer horizon.
def test_process():
    env = gym.make('CartPole-v1', render_mode="human")
    # in_dim = env.observation_space.shape[0] # 4
    # out_dim = env.action_space.n # 2
    # pi_model = Pi(in_dim, out_dim)
    # note: recent PyTorch versions may require torch.load(model_path, weights_only=False)
    pi_model = torch.load(model_path)
    # set the model to evaluation mode
    pi_model.eval()
    # forward passes only, no gradient tracking
    with torch.no_grad():
        pi_model.onpolicy_reset() # on-policy: clear memory before the test rollout
        state, _ = env.reset() # gymnasium
        steps = 600
        for t in range(steps): # allow a longer rollout than the 200 training steps
            action = pi_model.act(state)
            state, reward, done, _, _ = env.step(action)
            pi_model.rewards.append(reward)
            env.render()
            if done:
                break
        total_reward = sum(pi_model.rewards)
        solved = total_reward >= steps
        print(f'[Test] total_reward: {total_reward}, solved: {solved}')
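Note that torch.load(model_path) above unpickles the entire Pi module that save_model() stored. A common alternative, shown here only as a hedged sketch (the file name reinforce_pi_state.pt is made up), is to save and load only the state_dict and rebuild the architecture explicitly:

torch.save(pi.state_dict(), "./reinforce_pi_state.pt")           # save only the parameters

pi_model = Pi(in_dim=4, out_dim=2)                                # rebuild the architecture first
pi_model.load_state_dict(torch.load("./reinforce_pi_state.pt"))
pi_model.eval()                                                   # evaluation mode for testing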
A short screen recording of testing (video omitted here).

Full code:
import gymnasium as gym
# import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import sys
gamma = 0.99 # discount factor
model_path = "./reinforce_pi.pt"
class Pi(nn.Module):
    # the policy network to be optimized in reinforcement learning
    def __init__(self, in_dim, out_dim): # in_dim = 4, out_dim = 2
        # super(Pi, self).__init__()
        super().__init__()
        # a simple feed-forward policy network
        layers = [
            nn.Linear(in_dim, 64),  # 4 -> 64
            nn.ReLU(),              # activation function
            nn.Linear(64, out_dim), # 64 -> 2
        ]
        self.model = nn.Sequential(*layers)
        self.onpolicy_reset() # initialize memory
        self.train()          # set the model to training mode

    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []

    def forward(self, x): # x -> state
        pdparam = self.model(x) # forward pass
        return pdparam
        # pdparam -> parameters of a probability distribution,
        # here the logits of a categorical distribution

    def act(self, state):
        # sample an action from the policy and record its log-probability
        # convert the state from a NumPy array to a PyTorch tensor
        x = torch.from_numpy(state.astype(np.float32))
        # print("state: {}".format(state))
        pdparam = self.forward(x) # forward pass to obtain the distribution parameters
        # print("pdparam: {}".format(pdparam))
        pd = torch.distributions.Categorical(logits=pdparam) # action distribution pi(a|s)
        # print("pd.probs: {}\t pd.logits: {}".format(pd.probs, pd.logits))
        action = pd.sample() # sample a_t ~ pi(a_t|s_t)
        # log-probability of the sampled action under pd:
        # $\log(\pi_{\theta}(a_t|s_t))$, where $\pi_{\theta}$ is the policy network,
        # $a_t$ the action and $s_t$ the state at time step $t$
        log_prob = pd.log_prob(action)  # log pi(a_t|s_t)
        self.log_probs.append(log_prob) # store for training
        return action.item() # extract the value of a single-element tensor as a Python scalar
def train(pi, optimizer):
    # Inner gradient-ascent loop of the REINFORCE algorithm:
    # compute the loss from a Monte Carlo estimate and update the policy parameters.
    # A proper Monte Carlo estimate averages over many sampled trajectories;
    # for simplicity a single trajectory is used here.
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32) # returns
    future_ret = 0.0
    # compute the returns efficiently in reverse order
    # R_t(\tau) = \Sigma_{t'=t}^{T} {\gamma^{t'-t} r_{t'}}
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret
    baseline = sum(rets) / T
    rets = torch.tensor(rets)
    rets = rets - baseline # subtract the constant baseline from the returns
    log_probs = torch.stack(pi.log_probs)
    # - (R_t(\tau) - b) * log(\pi_{\theta}(a_t|s_t))
    # negated so that minimizing the loss maximizes the objective
    loss = - log_probs * rets
    # - \Sigma_{t=0}^{T} [(R_t(\tau) - b) * log(\pi_{\theta}(a_t|s_t))]
    loss = torch.sum(loss)
    optimizer.zero_grad()
    # backpropagate: compute the gradients of the loss w.r.t. the policy parameters \theta
    loss.backward()
    # gradient step: update the weights of the policy network
    optimizer.step()
    return loss
def save_model(pi):
    print("pi.state_dict(): {}\n\n".format(pi.state_dict()))
    for param_tensor in pi.state_dict():
        print(param_tensor, "\t", pi.state_dict()[param_tensor].size())
    torch.save(pi, model_path)
def train_main():
    env = gym.make('CartPole-v1', render_mode="human")
    in_dim = env.observation_space.shape[0] # 4
    out_dim = env.action_space.n            # 2
    pi = Pi(in_dim, out_dim) # an instance of the policy network for the REINFORCE algorithm
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    episode = 0
    continuous_solved_episode = 0
    # for epi in range(300): # fixed number of episodes
    while continuous_solved_episode <= 14:
        # state = env.reset() # gym
        state, _ = env.reset() # gymnasium
        for t in range(200): # run at most 200 steps per episode
            action = pi.act(state)
            # state, reward, done, _ = env.step(action) # gym
            state, reward, done, _, _ = env.step(action) # gymnasium: done <- terminated
            pi.rewards.append(reward)
            env.render()
            if done:
                break
        loss = train(pi, optimizer) # train once per episode
        total_reward = sum(pi.rewards)
        solved = total_reward > 195.0
        episode += 1
        if solved:
            continuous_solved_episode += 1
        else:
            continuous_solved_episode = 0
        print(f'Episode {episode}, loss: {loss}, '
              f'total_reward: {total_reward}, solved: {solved}, continuous_solved: {continuous_solved_episode}')
        pi.onpolicy_reset() # on-policy: clear memory after training on this episode
    save_model(pi)
def usage():
    if len(sys.argv) != 2:
        print("Usage: python ./REINFORCE.py --train/--test")
        sys.exit()
    mode = sys.argv[1]
    return mode
def test_process():
    env = gym.make('CartPole-v1', render_mode="human")
    # in_dim = env.observation_space.shape[0] # 4
    # out_dim = env.action_space.n # 2
    # pi_model = Pi(in_dim, out_dim)
    # note: recent PyTorch versions may require torch.load(model_path, weights_only=False)
    pi_model = torch.load(model_path)
    # set the model to evaluation mode
    pi_model.eval()
    # forward passes only, no gradient tracking
    with torch.no_grad():
        pi_model.onpolicy_reset() # on-policy: clear memory before the test rollout
        state, _ = env.reset() # gymnasium
        steps = 600
        for t in range(steps): # allow a longer rollout than the 200 training steps
            action = pi_model.act(state)
            state, reward, done, _, _ = env.step(action)
            pi_model.rewards.append(reward)
            env.render()
            if done:
                break
        total_reward = sum(pi_model.rewards)
        solved = total_reward >= steps
        print(f'[Test] total_reward: {total_reward}, solved: {solved}')
if __name__ == '__main__':
    mode = usage()
    if mode == "--train":
        train_main()
    elif mode == "--test":
        test_process()
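With the full script saved as REINFORCE.py, training is started with "python ./REINFORCE.py --train" and the saved policy is evaluated with "python ./REINFORCE.py --test", matching the usage() helper above.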
Copyright notice: this is an original article by the blogger, released under the CC 4.0 BY license. Please include the original source link and this notice when reposting.
Original article: https://blog.csdn.net/woyaomaishu2/article/details/146382384
Author: wzf@robotics_notes