GA3C(GPU/CPU混合式异步优势Actor-Critic)算法实现控制倒立摆
GA3C算法实现倒立摆
完整代码在文章结尾
GA3C算法
GPU/CPU混合式异步优势AC算法,是由A3C算法进一步优化而来,为了更好利用GPU计算资源。
GA3C理论上与A3C相同,属于On-Policy。但由于存在延迟更新问题,导致用于策略更新的数据并非当前策略采样得到的,因此大致也可视作Off-Policy,而这也会导致算法的收敛困难。
GA3C算法主要由三部分组成:
- 智能体:多个智能体分别与模拟的环境进行交互,但是每个智能体不需要策略网络来选择动作,而是将当前状态发送给预测序列,预测者会根据策略网络为预测序列中的请求提供决策。智能体会将探索的经验发送到训练序列,用于网络训练。
- 预测者:从预测队列中批量获取决策请求,并输入决策网络,得到决策动作。批量的数据输入能够利用GPU的并行计算能力,提升计算效率。根据实际任务的请求数量,预测者和预测队列数目可以进行调整,来控制信息的处理速度,提升计算效率;
- 训练者:训练者会将信息存储在训练队列中,并从中批量选取数据,用于整体策略网络和价值网络的模型训练。
GA3C算法的策略网络和价值网络优化的目标和梯度的形式是相同的。
状态价值网络梯度
对于状态价值的更新,最小化真实状态值和估计值之间的误差,而真实状态值由时序差分算法进行估计,状态价值梯度:
∇ w J ( w ) = ∇ w E [ 1 2 ( v π ( S ) − v ^ π ( S ) ) 2 ] \nabla_w J(w)=\nabla_wE[\dfrac{1}{2}(v_\pi(S)-\hat{v}_\pi(S))^2] ∇wJ(w)=∇wE[21(vπ(S)−v^π(S))2]
使用时序差分算法和随机梯度法近似:
∇ w J ( w ) = − ∇ w ( r t + γ v ^ t ( s t + 1 ) − v ^ t ( s t ) ) ∇ w v ^ t ( s t ) \nabla_w J(w)=-\nabla_w (r_t + \gamma \hat{v}_t(s_{t+1})-\hat{v}_t(s_t))\nabla_w \hat{v}_t(s_t) ∇wJ(w)=−∇w(rt+γv^t(st+1)−v^t(st))∇wv^t(st)
能够用时序差分算法估计状态值,原因是贝尔曼期望方程:
v π ( s ) = E [ R t + 1 + γ v π ( S t + 1 ) ∣ S t = s ] v_\pi(s) = E[R_{t+1}+\gamma v_\pi(S_{t+1})|S_t=s] vπ(s)=E[Rt+1+γvπ(St+1)∣St=s]
策略梯度
GA3C的策略梯度表达式与A2C相同,可额外添加熵正则项。
∇ θ J ( θ ) = E [ ∇ θ ln π ( A ∣ S ) ( q π ( S , A ) − v π ( S ) ) ] + α ∇ θ H ( π ( ⋅ ∣ S ) ) \nabla_\theta J(\theta) = E[\nabla_\theta \ln\pi(A|S)(q_\pi(S,A)-v_\pi(S))] + \alpha\nabla_\theta H(\pi(·|S)) ∇θJ(θ)=E[∇θlnπ(A∣S)(qπ(S,A)−vπ(S))]+α∇θH(π(⋅∣S))
使用随机梯度法近似真实梯度:
∇ θ J ( θ ) = ∇ θ ln π ( a t ∣ s t ) ( q π ( s t , a t ) − v π ( s t ) ) + α ∇ θ ∑ i = 0 N π ( a i ∣ s t ) ln π ( a i ∣ s t ) \nabla_\theta J(\theta) = \nabla_\theta \ln\pi(a_t|s_t)(q_\pi(s_t,a_t)-v_\pi(s_t)) + \alpha\nabla_\theta \sum^N_{i=0}\pi(a_i|s_t)\ln\pi(a_i|s_t) ∇θJ(θ)=∇θlnπ(at∣st)(qπ(st,at)−vπ(st))+α∇θi=0∑Nπ(ai∣st)lnπ(ai∣st)
由于使用离散的动作空间,能够直接计算得到动作熵。
其中的动作状态价值能够用状态价值网络的进行估计,原因:
q π ( s , a ) = E [ R t + 1 + γ v π ( S t + 1 ) ∣ S t = s , A t = a ] q_\pi(s,a)=E[R_{t+1}+\gamma v_\pi(S_{t+1})|S_t=s,A_t=a] qπ(s,a)=E[Rt+1+γvπ(St+1)∣St=s,At=a]
因此策略梯度表达式:
∇ θ J ( θ ) = ∇ θ ln π ( a t ∣ s t ) ( r t + γ v ^ t ( s t + 1 ) − v ^ t ( s t ) ) + α ∇ θ ∑ i = 0 N π ( a i ∣ s t ) ln π ( a i ∣ s t ) \nabla_\theta J(\theta) = \nabla_\theta \ln\pi(a_t|s_t)(r_t + \gamma \hat{v}_t(s_{t+1})-\hat{v}_t(s_t)) + \alpha\nabla_\theta \sum^N_{i=0}\pi(a_i|s_t)\ln\pi(a_i|s_t) ∇θJ(θ)=∇θlnπ(at∣st)(rt+γv^t(st+1)−v^t(st))+α∇θi=0∑Nπ(ai∣st)lnπ(ai∣st)
策略梯度是最大化平均状态价值,因此实际使用Pytorch的反向传播时,需要取负。
GA3C算法代码实践
Actor
演员使用一层隐藏层的神经网络。
class Actor(nn.Module):def __init__(self, state_dim, action_dim, hidden_dim=256):super().__init__()self.net = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, action_dim),nn.Softmax(dim=-1))def forward(self, x):result = self.net(x)return result
Critic
评论家也使用一层隐藏层的神经网络。
class Critic(nn.Module):def __init__(self, state_dim, hidden_dim=256):super().__init__()self.net = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, 1))def forward(self, x):return self.net(x).squeeze(-1)
预测者
预测者属于多线程,需要继承于threading.Thread
。
class Predict(Thread):def __init__(self, agent, pred_queue, device):super().__init__()self.pred_queue = pred_queueself.device = deviceself.flag = Falseself.agent = agentself.daemon = Truedef run(self):while not self.flag:if self.pred_queue.empty():continueGA3C_Id, state = self.pred_queue.get(timeout=0.5)state = torch.tensor(state, dtype=torch.float32).to(self.device)with torch.no_grad():prob = self.agent.actor(state)# prob为GPU上的tentor类型,传递到队列(通常在CPU上)时,会出现错误,导致数据丢失,所以需要先转移到CPU上self.agent.workers[GA3C_Id].wait_q.put(prob.cpu())
预测者通过传入的pred_queue
,属于multiprocessing.Queue
能够在多进程之间通信,从而接收多进程中的预测要求,并记录对应的进程。
通过调用主进程中的策略网络来进行决策得到动作,重新传回对应进程的等待队列中。
注意其中由策略得到的概率分布,在GPU上,而队列Queue
在CPU上,因此在传入队列前,需要先转换到CPU上,防止数据丢失。
训练者
训练者也属于多线程,需要继承于threading.Thread
。
lass Train(Thread):def __init__(self, agent, train_queue, device, batch):super().__init__()self.train_queue = train_queueself.device = deviceself.flag = Falseself.agent = agentself.daemon = Trueself.train_list = []self.size = 0self.batch = batchdef run(self):while not self.flag:if self.train_queue.empty():continuestate, action, reward, next_state, done = self.train_queue.get(timeout=0.5)self.train_list.append((state, action, reward, next_state, done))self.size += 1if self.size >= self.batch:state, action, reward, next_state, done = zip(*self.train_list)self.train_list.clear()self.size = 0self.agent.train_model(np.array(state), np.array(action), np.array(reward), np.array(next_state), np.array(done))
用于训练的数据是成批传输的,这样有助于策略网络和价值网络的稳定更新。
交互的智能体(工作者)
与环境进行交互的智能体,也可称为工作者,需要多个独立进程,继承于multiprocessing.Process
。
class Worker(Process):def __init__(self, id, pred_queue, train_queue, episode, gamma, max_step):super().__init__()self.pred_queue = pred_queueself.train_queue = train_queueself.id = idself.episode = episodeself.gamma = gammaself.wait_q = Queue(maxsize=1)self.max_step = max_stepdef take_action(self, state):self.pred_queue.put((self.id, state))while self.wait_q.empty():continueprob = self.wait_q.get(block=True)dist = torch.distributions.Categorical(prob)action = dist.sample().item()return actiondef run(self):env = gym.make('CartPole-v1')return_list = []for num in range(self.episode):step_num = 0state, info = env.reset()done = Falsereward_list = []while not done and step_num < self.max_step:action = self.take_action(state)step_num += 1next_state, reward, done, _, __ = env.step(action)self.train_queue.put((np.array(state), np.array(action), np.array(reward), np.array(next_state), np.array(done)))state = next_statereward_list.append(reward)res = 0for j in range(len(reward_list)-1, -1, -1):res = res * self.gamma + reward_list[j]print(f'{self.id}号,{num}轮回报:{res} 游戏进行步数:{step_num}')return_list.append(res)average = [np.mean(return_list[i:i+9])for i in range(0, len(return_list)-8)]epi = [x for x in range(len(average))]plt.plot(epi, average)plt.show()
工作者除了与环境进行交互,还需要从预测队列中接收状态,并调用主进程的策略网络进行决策得到动作。因此,多进程之间通讯十分重要。
主进程
将预测者、训练者和工作者集成到一个类GA3C中,进行统筹控制。
class GA3C:def __init__(self, max_data, state_dim, action_dim, episode, gamma, device, Worker_num, Predict_num, Train_num, lr, batch, lock, max_step):self.pred_queue = Queue(maxsize=max_data)self.train_queue = Queue(maxsize=max_data)self.episode = episodeself.gamma = gammaself.Worker_num = Worker_numself.Predict_num = Predict_numself.Train_num = Train_numself.device = deviceself.actor = Actor(state_dim, action_dim).to(device)self.critic = Critic(state_dim).to(device)self.initialize_net(self.actor, self.critic)self.optimizer = torch.optim.AdamW(list(self.actor.parameters()) + list(self.critic.parameters()),lr=lr)self.workers = []self.predicts = []self.trains = []self.batch = batchself.lock = lockself.max_step = max_stepdef train_model(self, state, action, reward, next_state, done):state = torch.tensor(state, dtype=torch.float32).to(self.device)action = torch.tensor(action, dtype=torch.int64).reshape((-1, 1)).to(self.device)reward = torch.tensor(reward, dtype=torch.float32).to(self.device)next_state = torch.tensor(next_state, dtype=torch.float32).to(self.device)done = torch.tensor(done, dtype=torch.float32).to(self.device)with self.lock:with torch.no_grad():TD_target = reward + self.gamma * \self.critic(next_state) * (1 - done)advantage = TD_target - self.critic(state)critic_loss = F.mse_loss(TD_target.detach(), self.critic(state))prob = self.actor(state)actor_loss = - torch.mean(advantage.detach()* torch.log(prob.gather(-1, action)) + 1e-7) + 0.005 * torch.sum(prob * torch.log(prob + 1e-7), dim=1, keepdim=True).mean()total_loss = actor_loss + critic_lossself.optimizer.zero_grad()total_loss.backward()self.optimizer.step()def initialize_net(self, actor, critic):def init_weights(m):if isinstance(m, nn.Linear):nn.init.xavier_uniform_(m.weight)nn.init.zeros_(m.bias)actor.apply(init_weights)critic.apply(init_weights)def main(self):for i in range(self.Worker_num):self.workers.append(Worker(i, self.pred_queue, self.train_queue, self.episode, self.gamma, self.max_step))self.workers[-1].start()for i in range(self.Predict_num):self.predicts.append(Predict(self, self.pred_queue, self.device))self.predicts[-1].start()for i in range(self.Train_num):self.trains.append(Train(self, self.train_queue, self.device, self.batch))self.trains[-1].start()for w in range(len(self.workers)):self.workers[-1].join()self.workers.pop()for i in range(len(self.predicts)):self.predicts[-1].flag = Trueself.predicts[-1].join()self.predicts.pop()for i in range(len(self.trains)):self.trains[-1].flag = Trueself.trains[-1].join()self.trains.pop()
在GA3C类中需要定义全局策略网络和价值网络,还需要定义策略和价值网络的更新函数,并在main()
函数中指定好各工作者进程和预测及训练线程的开始与结束标志。
在更新函数中,策略网络损失函数计算时,需要在取对数的动作概率上加上一个大于0的小数,防止概率过小,而使得损失无穷大,导致算法无法正常收敛。
完整代码
import os
import torch
import torch.nn as nn
import numpy as np
import gym
from multiprocessing import Queue, Process
import torch.nn.functional as F
import threading
from threading import Thread
import matplotlib.pyplot as pltclass Actor(nn.Module):def __init__(self, state_dim, action_dim, hidden_dim=256):super().__init__()self.net = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, action_dim),nn.Softmax(dim=-1))def forward(self, x):result = self.net(x)return resultclass Critic(nn.Module):def __init__(self, state_dim, hidden_dim=256):super().__init__()self.net = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),nn.Linear(hidden_dim, 1))def forward(self, x):return self.net(x).squeeze(-1)class Worker(Process):def __init__(self, id, pred_queue, train_queue, episode, gamma, max_step):super().__init__()self.pred_queue = pred_queueself.train_queue = train_queueself.id = idself.episode = episodeself.gamma = gammaself.wait_q = Queue(maxsize=1)self.max_step = max_stepdef take_action(self, state):self.pred_queue.put((self.id, state))while self.wait_q.empty():continueprob = self.wait_q.get(block=True)dist = torch.distributions.Categorical(prob)action = dist.sample().item()return actiondef run(self):env = gym.make('CartPole-v1')return_list = []for num in range(self.episode):step_num = 0state, info = env.reset()done = Falsereward_list = []while not done and step_num < self.max_step:action = self.take_action(state)step_num += 1next_state, reward, done, _, __ = env.step(action)self.train_queue.put((np.array(state), np.array(action), np.array(reward), np.array(next_state), np.array(done)))state = next_statereward_list.append(reward)res = 0for j in range(len(reward_list)-1, -1, -1):res = res * self.gamma + reward_list[j]print(f'{self.id}号,{num}轮回报:{res} 游戏进行步数:{step_num}')return_list.append(res)average = [np.mean(return_list[i:i+9])for i in range(0, len(return_list)-8)]epi = [x for x in range(len(average))]plt.plot(epi, average)plt.show()class Predict(Thread):def __init__(self, agent, pred_queue, device):super().__init__()self.pred_queue = pred_queueself.device = deviceself.flag = Falseself.agent = agentself.daemon = Truedef run(self):while not self.flag:if self.pred_queue.empty():continueGA3C_Id, state = self.pred_queue.get(timeout=0.5)state = torch.tensor(state, dtype=torch.float32).to(self.device)with torch.no_grad():prob = self.agent.actor(state)# prob为GPU上的tentor类型,传递到队列(通常在CPU上)时,会出现错误,导致数据丢失,所以需要先转移到CPU上self.agent.workers[GA3C_Id].wait_q.put(prob.cpu())class Train(Thread):def __init__(self, agent, train_queue, device, batch):super().__init__()self.train_queue = train_queueself.device = deviceself.flag = Falseself.agent = agentself.daemon = Trueself.train_list = []self.size = 0self.batch = batchdef run(self):while not self.flag:if self.train_queue.empty():continuestate, action, reward, next_state, done = self.train_queue.get(timeout=0.5)self.train_list.append((state, action, reward, next_state, done))self.size += 1if self.size >= self.batch:state, action, reward, next_state, done = zip(*self.train_list)self.train_list.clear()self.size = 0self.agent.train_model(np.array(state), np.array(action), np.array(reward), np.array(next_state), np.array(done))class GA3C:def __init__(self, max_data, state_dim, action_dim, episode, gamma, device, Worker_num, Predict_num, Train_num, lr, batch, lock, max_step):self.pred_queue = Queue(maxsize=max_data)self.train_queue = Queue(maxsize=max_data)self.episode = episodeself.gamma = gammaself.Worker_num = Worker_numself.Predict_num = Predict_numself.Train_num = Train_numself.device = deviceself.actor = Actor(state_dim, action_dim).to(device)self.critic = Critic(state_dim).to(device)self.initialize_net(self.actor, self.critic)self.optimizer = torch.optim.AdamW(list(self.actor.parameters()) + list(self.critic.parameters()),lr=lr)self.workers = []self.predicts = []self.trains = []self.batch = batchself.lock = lockself.max_step = max_stepdef train_model(self, state, action, reward, next_state, done):state = torch.tensor(state, dtype=torch.float32).to(self.device)action = torch.tensor(action, dtype=torch.int64).reshape((-1, 1)).to(self.device)reward = torch.tensor(reward, dtype=torch.float32).to(self.device)next_state = torch.tensor(next_state, dtype=torch.float32).to(self.device)done = torch.tensor(done, dtype=torch.float32).to(self.device)with self.lock:with torch.no_grad():TD_target = reward + self.gamma * \self.critic(next_state) * (1 - done)advantage = TD_target - self.critic(state)critic_loss = F.mse_loss(TD_target.detach(), self.critic(state))prob = self.actor(state)actor_loss = - torch.mean(advantage.detach()* torch.log(prob.gather(-1, action)) + 1e-7) + 0.005 * torch.sum(prob * torch.log(prob + 1e-7), dim=1, keepdim=True).mean()total_loss = actor_loss + critic_lossself.optimizer.zero_grad()total_loss.backward()self.optimizer.step()def initialize_net(self, actor, critic):def init_weights(m):if isinstance(m, nn.Linear):nn.init.xavier_uniform_(m.weight)nn.init.zeros_(m.bias)actor.apply(init_weights)critic.apply(init_weights)def main(self):for i in range(self.Worker_num):self.workers.append(Worker(i, self.pred_queue, self.train_queue, self.episode, self.gamma, self.max_step))self.workers[-1].start()for i in range(self.Predict_num):self.predicts.append(Predict(self, self.pred_queue, self.device))self.predicts[-1].start()for i in range(self.Train_num):self.trains.append(Train(self, self.train_queue, self.device, self.batch))self.trains[-1].start()for w in range(len(self.workers)):self.workers[-1].join()self.workers.pop()for i in range(len(self.predicts)):self.predicts[-1].flag = Trueself.predicts[-1].join()self.predicts.pop()for i in range(len(self.trains)):self.trains[-1].flag = Trueself.trains[-1].join()self.trains.pop()if __name__ == '__main__':os.system('cls')max_data = 10000 # 预测队列和训练队列最大容量,不过一般取10左右即可Worker_num = 4Predict_num = 2Train_num = 4batch = 40 # 32max_step = 1500 # 限制单回合最大步数gamma = 0.99episode = 1300 # 500lr = 1e-4 # 极其重要,过大,可能会导致发散device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')env = gym.make('CartPole-v1')state_dim = env.observation_space.shape[0]action_dim = env.action_space.nenv.close()lock = threading.Lock()agent = GA3C(max_data, state_dim, action_dim, episode,gamma, device, Worker_num, Predict_num, Train_num, lr, batch, lock, max_step)agent.main()
注意,为了防止倒立摆游戏无限进行,需要增加最大步数限制max_step
。
学习率的取值也十分重要,较大的学习率可能会导致算法在越过局部极小值后,开始发散无法收敛,如下图。
下图是使用lr=0.0003
进行训练的结果,纵坐标是单回合的回报,横坐标是回合数。
该图就描述了算法在越过局部极小值后,开始发散,导致回报无法收敛。
而使用lr=0.0001
进行训练,除了训练速度可能降低外,算法也没出现发散的问题,回报能够正常收敛到稳定值。
注意事项:
- 一般
batch
设置在32~64之间即可,一批训练数据的大小,影响训练的稳定性。过多会导致一个随机动作被忽视,难以发现更好的动作;过少会导致算法的振荡,稳定性较差,可能难以收敛。 - 工作者、预测者和训练者的数量按照实际任务需要进行设置。一般预测者和训练者的比例可选取1:1,1:2,2:1。具体可见原论文内容;
- 虽然这里使用的单步TD进行更新,与原论文中的多步TD存在差异,但实际影响的是网络更新的稳定性,一般都能够正常更新和收敛的,影响不是很大。