
Controlling an Inverted Pendulum with GA3C (Hybrid GPU/CPU Asynchronous Advantage Actor-Critic)

GA3C on the Inverted Pendulum (CartPole)

The complete code is at the end of the article.

The GA3C Algorithm

GA3C, the hybrid GPU/CPU asynchronous advantage actor-critic algorithm, is a further optimization of A3C designed to make better use of GPU compute resources.
In theory GA3C is identical to A3C and is on-policy. However, because updates are delayed, the data used for a policy update is not always sampled from the current policy, so the algorithm can roughly be regarded as off-policy, which also makes convergence harder.

GA3C consists of three main components:

  • Agents (workers): multiple agents each interact with their own copy of the simulated environment. An agent does not need its own policy network to select actions; instead it sends its current state to the prediction queue, and a predictor answers the queued requests using the policy network. Each agent also sends the experience it collects to the training queue for network training.
  • Predictors: a predictor pulls decision requests from the prediction queue in batches and feeds them to the policy network to obtain actions. Batched inputs exploit the GPU's parallelism and improve computational efficiency. The number of predictors (and prediction queues) can be adjusted to the request volume of the task to control processing speed.
  • Trainers: a trainer accumulates the transitions arriving on the training queue and selects batches from them to train the global policy and value networks. A minimal sketch of this data flow follows the list.
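The following is a conceptual sketch only, with hypothetical names; the actual classes used in this article are implemented in full further below.

# Each role is essentially a loop around two shared queues.

def worker_loop(worker_id, state, prediction_queue, training_queue, wait_queue):
    prediction_queue.put((worker_id, state))   # ask the predictors for a decision
    action_probs = wait_queue.get()            # receive the answer for this worker
    # ...step the environment with the sampled action, then:
    # training_queue.put((state, action, reward, next_state, done))

def predictor_loop(prediction_queue, policy, wait_queues):
    worker_id, state = prediction_queue.get()  # batched in the real implementation
    action_probs = policy(state)               # one forward pass on the GPU
    wait_queues[worker_id].put(action_probs)   # route the answer back to its worker

def trainer_loop(training_queue, agent, batch_size):
    batch = [training_queue.get() for _ in range(batch_size)]
    agent.train_model(batch)                   # update the global actor and critic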

The optimization objectives and gradient forms of GA3C's policy and value networks are the same as in A3C.

State-Value Network Gradient

For the value update, we minimize the error between the true state value and its estimate, with the true state value approximated by temporal-difference (TD) learning. The value gradient is:
$$\nabla_w J(w)=\nabla_w E\left[\dfrac{1}{2}\left(v_\pi(S)-\hat{v}_\pi(S)\right)^2\right]$$

Approximating this with TD learning and stochastic gradient descent (the TD target is treated as a constant):
$$\nabla_w J(w) \approx -\left(r_t + \gamma \hat{v}_t(s_{t+1})-\hat{v}_t(s_t)\right)\nabla_w \hat{v}_t(s_t)$$

TD learning can be used to estimate the state value because of the Bellman expectation equation:
$$v_\pi(s) = E\left[R_{t+1}+\gamma v_\pi(S_{t+1})\mid S_t=s\right]$$
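As a minimal sketch (a hypothetical helper, not this article's exact update code; the full train_model method appears later), the semi-gradient TD update for the critic can be written as:

import torch
import torch.nn.functional as F

def critic_td_loss(critic, state, reward, next_state, done, gamma=0.99):
    # critic: any network mapping a batch of states to a 1-D tensor of values
    # state/next_state: (batch, state_dim); reward/done: (batch,)
    with torch.no_grad():
        # the TD target r + gamma * v(s') is treated as a constant (semi-gradient)
        td_target = reward + gamma * critic(next_state) * (1 - done)
    # minimizing the squared error yields the gradient -(td_target - v(s)) * grad v(s)
    return F.mse_loss(critic(state), td_target)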

Policy Gradient

The policy-gradient expression of GA3C is the same as in A2C, optionally with an additional entropy regularization term:
$$\nabla_\theta J(\theta) = E\left[\nabla_\theta \ln\pi(A\mid S)\,\big(q_\pi(S,A)-v_\pi(S)\big)\right] + \alpha\nabla_\theta H(\pi(\cdot\mid S))$$

Approximating the true gradient with a stochastic sample (writing the entropy explicitly as $H=-\sum_i \pi\ln\pi$, hence the minus sign on the last term):
$$\nabla_\theta J(\theta) \approx \nabla_\theta \ln\pi(a_t\mid s_t)\,\big(q_\pi(s_t,a_t)-v_\pi(s_t)\big) - \alpha\nabla_\theta \sum_{i=0}^{N}\pi(a_i\mid s_t)\ln\pi(a_i\mid s_t)$$

Because the action space is discrete, the action entropy can be computed directly in closed form.
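For example (toy probabilities, for illustration only), the entropy of a discrete action distribution is just a finite sum:

import torch

prob = torch.tensor([0.7, 0.2, 0.1])         # pi(a_i | s_t) over three actions
entropy = -(prob * torch.log(prob)).sum()    # H(pi) = -sum_i p_i ln p_i, about 0.80 here
# torch.distributions gives the same value:
same = torch.distributions.Categorical(probs=prob).entropy()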

The action-value term can itself be estimated with the state-value network, because:
$$q_\pi(s,a)=E\left[R_{t+1}+\gamma v_\pi(S_{t+1})\mid S_t=s, A_t=a\right]$$

The policy gradient therefore becomes:
$$\nabla_\theta J(\theta) \approx \nabla_\theta \ln\pi(a_t\mid s_t)\,\big(r_t + \gamma \hat{v}_t(s_{t+1})-\hat{v}_t(s_t)\big) - \alpha\nabla_\theta \sum_{i=0}^{N}\pi(a_i\mid s_t)\ln\pi(a_i\mid s_t)$$

The policy gradient maximizes the average state value, so when using PyTorch's backpropagation (which minimizes a loss), the objective must be negated.
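A minimal sketch of this sign convention (hypothetical names; the article's actual loss is computed in train_model below):

import torch

def actor_loss(prob, a_t, td_error, alpha=0.005, eps=1e-7):
    # prob: pi(.|s_t) from the actor, shape (action_dim,); a_t: chosen action index
    # td_error: r_t + gamma*v(s_{t+1}) - v(s_t), detached from the critic's graph
    log_prob = torch.log(prob[a_t] + eps)            # ln pi(a_t | s_t)
    entropy = -(prob * torch.log(prob + eps)).sum()  # H(pi(.|s_t))
    # gradient ascent on J(theta)  <=>  gradient descent on -J(theta)
    return -(td_error * log_prob + alpha * entropy)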

GA3C Code in Practice

Actor

The actor is a small fully connected network with two hidden layers and a softmax output.

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.net(x)
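A quick usage sketch (dummy observation; assumes the imports from the complete listing at the end of the article):

actor = Actor(state_dim=4, action_dim=2)   # CartPole: 4 state dimensions, 2 actions
state = torch.rand(4)                      # a dummy observation
prob = actor(state)                        # action probabilities, sums to 1
action = torch.distributions.Categorical(prob).sample().item()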
Critic

The critic is a similar fully connected network with two hidden layers and a scalar output.

class Critic(nn.Module):
    def __init__(self, state_dim, hidden_dim=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        return self.net(x).squeeze(-1)
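A corresponding shape check (dummy batch; again assuming the imports from the complete listing):

critic = Critic(state_dim=4)
states = torch.rand(8, 4)     # a batch of 8 dummy CartPole observations
values = critic(states)       # shape (8,): squeeze(-1) removes the trailing dimension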
Predictor

The predictor runs as a thread in the main process and therefore inherits from threading.Thread.

class Predict(Thread):
    def __init__(self, agent, pred_queue, device):
        super().__init__()
        self.pred_queue = pred_queue
        self.device = device
        self.flag = False
        self.agent = agent
        self.daemon = True

    def run(self):
        while not self.flag:
            if self.pred_queue.empty():
                continue
            GA3C_Id, state = self.pred_queue.get(timeout=0.5)
            state = torch.tensor(state, dtype=torch.float32).to(self.device)
            with torch.no_grad():
                prob = self.agent.actor(state)
            # prob is a tensor on the GPU; putting it straight on the (CPU-side) queue
            # can fail and lose data, so move it to the CPU first
            self.agent.workers[GA3C_Id].wait_q.put(prob.cpu())

The predictor receives the pred_queue passed in, a multiprocessing.Queue that can communicate across processes; through it the predictor receives prediction requests from the worker processes along with the id of the requesting worker.

It calls the policy network living in the main process to obtain a decision and sends the result back through the wait queue of the corresponding worker process.

Note that the probability distribution produced by the policy lives on the GPU, while the Queue is CPU-side; the tensor must be moved to the CPU before being put on the queue to avoid data loss.
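A tiny illustration of the pattern (a throwaway queue created just for this example):

import torch
from multiprocessing import Queue

q = Queue()
t = torch.rand(2, device='cuda') if torch.cuda.is_available() else torch.rand(2)
q.put(t.cpu())      # move to the CPU before crossing the process boundary
print(q.get())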

Trainer

The trainer is also a thread and inherits from threading.Thread.

class Train(Thread):
    def __init__(self, agent, train_queue, device, batch):
        super().__init__()
        self.train_queue = train_queue
        self.device = device
        self.flag = False
        self.agent = agent
        self.daemon = True
        self.train_list = []
        self.size = 0
        self.batch = batch

    def run(self):
        while not self.flag:
            if self.train_queue.empty():
                continue
            state, action, reward, next_state, done = self.train_queue.get(timeout=0.5)
            self.train_list.append((state, action, reward, next_state, done))
            self.size += 1
            if self.size >= self.batch:
                state, action, reward, next_state, done = zip(*self.train_list)
                self.train_list.clear()
                self.size = 0
                self.agent.train_model(np.array(state), np.array(action), np.array(reward),
                                       np.array(next_state), np.array(done))

Training data is passed to the networks in batches, which helps stabilize the updates of the policy and value networks.
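For reference, this is how the trainer's zip(*...) call turns a list of transitions into batched arrays (toy transitions, for illustration only):

import numpy as np

train_list = [((1.0, 2.0), 0, 1.0, (3.0, 4.0), False),
              ((5.0, 6.0), 1, 0.5, (7.0, 8.0), True)]
state, action, reward, next_state, done = zip(*train_list)
print(np.array(state).shape)   # (2, 2): a batch of two 2-dimensional states
print(np.array(done))          # [False  True]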

Interacting Agent (Worker)

The agents that interact with the environment, also called workers, run as multiple independent processes and inherit from multiprocessing.Process.

class Worker(Process):
    def __init__(self, id, pred_queue, train_queue, episode, gamma, max_step):
        super().__init__()
        self.pred_queue = pred_queue
        self.train_queue = train_queue
        self.id = id
        self.episode = episode
        self.gamma = gamma
        self.wait_q = Queue(maxsize=1)
        self.max_step = max_step

    def take_action(self, state):
        self.pred_queue.put((self.id, state))
        while self.wait_q.empty():
            continue
        prob = self.wait_q.get(block=True)
        dist = torch.distributions.Categorical(prob)
        action = dist.sample().item()
        return action

    def run(self):
        env = gym.make('CartPole-v1')
        return_list = []
        for num in range(self.episode):
            step_num = 0
            state, info = env.reset()
            done = False
            reward_list = []
            while not done and step_num < self.max_step:
                action = self.take_action(state)
                step_num += 1
                next_state, reward, done, _, __ = env.step(action)
                self.train_queue.put((np.array(state), np.array(action), np.array(reward),
                                      np.array(next_state), np.array(done)))
                state = next_state
                reward_list.append(reward)
            res = 0
            for j in range(len(reward_list) - 1, -1, -1):
                res = res * self.gamma + reward_list[j]
            print(f'worker {self.id}, episode {num}: return {res}, steps {step_num}')
            return_list.append(res)
        average = [np.mean(return_list[i:i + 9]) for i in range(0, len(return_list) - 8)]
        epi = [x for x in range(len(average))]
        plt.plot(epi, average)
        plt.show()

Besides interacting with the environment, a worker sends its state to the prediction queue and then waits on its own wait queue for the action distribution computed by the main process's policy network. Communication between processes is therefore essential.

Main Process

The predictors, trainers, and workers are brought together in a GA3C class that coordinates them.

class GA3C:
    def __init__(self, max_data, state_dim, action_dim, episode, gamma, device,
                 Worker_num, Predict_num, Train_num, lr, batch, lock, max_step):
        self.pred_queue = Queue(maxsize=max_data)
        self.train_queue = Queue(maxsize=max_data)
        self.episode = episode
        self.gamma = gamma
        self.Worker_num = Worker_num
        self.Predict_num = Predict_num
        self.Train_num = Train_num
        self.device = device
        self.actor = Actor(state_dim, action_dim).to(device)
        self.critic = Critic(state_dim).to(device)
        self.initialize_net(self.actor, self.critic)
        self.optimizer = torch.optim.AdamW(
            list(self.actor.parameters()) + list(self.critic.parameters()), lr=lr)
        self.workers = []
        self.predicts = []
        self.trains = []
        self.batch = batch
        self.lock = lock
        self.max_step = max_step

    def train_model(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float32).to(self.device)
        action = torch.tensor(action, dtype=torch.int64).reshape((-1, 1)).to(self.device)
        reward = torch.tensor(reward, dtype=torch.float32).to(self.device)
        next_state = torch.tensor(next_state, dtype=torch.float32).to(self.device)
        done = torch.tensor(done, dtype=torch.float32).to(self.device)
        with self.lock:
            with torch.no_grad():
                # one-step TD target, treated as a constant
                TD_target = reward + self.gamma * self.critic(next_state) * (1 - done)
                advantage = TD_target - self.critic(state)
            critic_loss = F.mse_loss(self.critic(state), TD_target.detach())
            prob = self.actor(state)
            # epsilon inside the log prevents log(0); squeeze keeps shapes aligned with advantage
            log_prob = torch.log(prob.gather(-1, action).squeeze(-1) + 1e-7)
            entropy_term = torch.sum(prob * torch.log(prob + 1e-7), dim=1).mean()
            actor_loss = -torch.mean(advantage.detach() * log_prob) + 0.005 * entropy_term
            total_loss = actor_loss + critic_loss
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()

    def initialize_net(self, actor, critic):
        def init_weights(m):
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.zeros_(m.bias)
        actor.apply(init_weights)
        critic.apply(init_weights)

    def main(self):
        for i in range(self.Worker_num):
            self.workers.append(Worker(i, self.pred_queue, self.train_queue,
                                       self.episode, self.gamma, self.max_step))
            self.workers[-1].start()
        for i in range(self.Predict_num):
            self.predicts.append(Predict(self, self.pred_queue, self.device))
            self.predicts[-1].start()
        for i in range(self.Train_num):
            self.trains.append(Train(self, self.train_queue, self.device, self.batch))
            self.trains[-1].start()
        for w in range(len(self.workers)):
            self.workers[-1].join()
            self.workers.pop()
        for i in range(len(self.predicts)):
            self.predicts[-1].flag = True
            self.predicts[-1].join()
            self.predicts.pop()
        for i in range(len(self.trains)):
            self.trains[-1].flag = True
            self.trains[-1].join()
            self.trains.pop()

The GA3C class holds the global policy and value networks and their update function, and in main() it starts the worker processes and the predictor and trainer threads and signals them to stop once training finishes.

In the update function, when computing the policy loss, a small positive constant must be added to the action probability before taking the logarithm; otherwise a vanishing probability makes the loss infinite and prevents the algorithm from converging.
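A two-line illustration of the problem and the fix (toy values):

import torch

prob = torch.tensor([0.0, 1.0])
print(torch.log(prob))          # -inf for the zero probability -> NaNs in backprop
print(torch.log(prob + 1e-7))   # roughly -16.1 instead of -inf, keeping the loss finite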

Complete Code

import os
import torch
import torch.nn as nn
import numpy as np
import gym
from multiprocessing import Queue, Process
import torch.nn.functional as F
import threading
from threading import Thread
import matplotlib.pyplot as plt


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.net(x)


class Critic(nn.Module):
    def __init__(self, state_dim, hidden_dim=256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.LeakyReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        return self.net(x).squeeze(-1)


class Worker(Process):
    def __init__(self, id, pred_queue, train_queue, episode, gamma, max_step):
        super().__init__()
        self.pred_queue = pred_queue
        self.train_queue = train_queue
        self.id = id
        self.episode = episode
        self.gamma = gamma
        self.wait_q = Queue(maxsize=1)
        self.max_step = max_step

    def take_action(self, state):
        self.pred_queue.put((self.id, state))
        while self.wait_q.empty():
            continue
        prob = self.wait_q.get(block=True)
        dist = torch.distributions.Categorical(prob)
        action = dist.sample().item()
        return action

    def run(self):
        env = gym.make('CartPole-v1')
        return_list = []
        for num in range(self.episode):
            step_num = 0
            state, info = env.reset()
            done = False
            reward_list = []
            while not done and step_num < self.max_step:
                action = self.take_action(state)
                step_num += 1
                next_state, reward, done, _, __ = env.step(action)
                self.train_queue.put((np.array(state), np.array(action), np.array(reward),
                                      np.array(next_state), np.array(done)))
                state = next_state
                reward_list.append(reward)
            res = 0
            for j in range(len(reward_list) - 1, -1, -1):
                res = res * self.gamma + reward_list[j]
            print(f'worker {self.id}, episode {num}: return {res}, steps {step_num}')
            return_list.append(res)
        average = [np.mean(return_list[i:i + 9]) for i in range(0, len(return_list) - 8)]
        epi = [x for x in range(len(average))]
        plt.plot(epi, average)
        plt.show()


class Predict(Thread):
    def __init__(self, agent, pred_queue, device):
        super().__init__()
        self.pred_queue = pred_queue
        self.device = device
        self.flag = False
        self.agent = agent
        self.daemon = True

    def run(self):
        while not self.flag:
            if self.pred_queue.empty():
                continue
            GA3C_Id, state = self.pred_queue.get(timeout=0.5)
            state = torch.tensor(state, dtype=torch.float32).to(self.device)
            with torch.no_grad():
                prob = self.agent.actor(state)
            # prob is a tensor on the GPU; move it to the CPU before putting it
            # on the (CPU-side) queue, otherwise data can be lost
            self.agent.workers[GA3C_Id].wait_q.put(prob.cpu())


class Train(Thread):
    def __init__(self, agent, train_queue, device, batch):
        super().__init__()
        self.train_queue = train_queue
        self.device = device
        self.flag = False
        self.agent = agent
        self.daemon = True
        self.train_list = []
        self.size = 0
        self.batch = batch

    def run(self):
        while not self.flag:
            if self.train_queue.empty():
                continue
            state, action, reward, next_state, done = self.train_queue.get(timeout=0.5)
            self.train_list.append((state, action, reward, next_state, done))
            self.size += 1
            if self.size >= self.batch:
                state, action, reward, next_state, done = zip(*self.train_list)
                self.train_list.clear()
                self.size = 0
                self.agent.train_model(np.array(state), np.array(action), np.array(reward),
                                       np.array(next_state), np.array(done))


class GA3C:
    def __init__(self, max_data, state_dim, action_dim, episode, gamma, device,
                 Worker_num, Predict_num, Train_num, lr, batch, lock, max_step):
        self.pred_queue = Queue(maxsize=max_data)
        self.train_queue = Queue(maxsize=max_data)
        self.episode = episode
        self.gamma = gamma
        self.Worker_num = Worker_num
        self.Predict_num = Predict_num
        self.Train_num = Train_num
        self.device = device
        self.actor = Actor(state_dim, action_dim).to(device)
        self.critic = Critic(state_dim).to(device)
        self.initialize_net(self.actor, self.critic)
        self.optimizer = torch.optim.AdamW(
            list(self.actor.parameters()) + list(self.critic.parameters()), lr=lr)
        self.workers = []
        self.predicts = []
        self.trains = []
        self.batch = batch
        self.lock = lock
        self.max_step = max_step

    def train_model(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float32).to(self.device)
        action = torch.tensor(action, dtype=torch.int64).reshape((-1, 1)).to(self.device)
        reward = torch.tensor(reward, dtype=torch.float32).to(self.device)
        next_state = torch.tensor(next_state, dtype=torch.float32).to(self.device)
        done = torch.tensor(done, dtype=torch.float32).to(self.device)
        with self.lock:
            with torch.no_grad():
                # one-step TD target, treated as a constant
                TD_target = reward + self.gamma * self.critic(next_state) * (1 - done)
                advantage = TD_target - self.critic(state)
            critic_loss = F.mse_loss(self.critic(state), TD_target.detach())
            prob = self.actor(state)
            # epsilon inside the log prevents log(0); squeeze keeps shapes aligned with advantage
            log_prob = torch.log(prob.gather(-1, action).squeeze(-1) + 1e-7)
            entropy_term = torch.sum(prob * torch.log(prob + 1e-7), dim=1).mean()
            actor_loss = -torch.mean(advantage.detach() * log_prob) + 0.005 * entropy_term
            total_loss = actor_loss + critic_loss
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()

    def initialize_net(self, actor, critic):
        def init_weights(m):
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.zeros_(m.bias)
        actor.apply(init_weights)
        critic.apply(init_weights)

    def main(self):
        for i in range(self.Worker_num):
            self.workers.append(Worker(i, self.pred_queue, self.train_queue,
                                       self.episode, self.gamma, self.max_step))
            self.workers[-1].start()
        for i in range(self.Predict_num):
            self.predicts.append(Predict(self, self.pred_queue, self.device))
            self.predicts[-1].start()
        for i in range(self.Train_num):
            self.trains.append(Train(self, self.train_queue, self.device, self.batch))
            self.trains[-1].start()
        for w in range(len(self.workers)):
            self.workers[-1].join()
            self.workers.pop()
        for i in range(len(self.predicts)):
            self.predicts[-1].flag = True
            self.predicts[-1].join()
            self.predicts.pop()
        for i in range(len(self.trains)):
            self.trains[-1].flag = True
            self.trains[-1].join()
            self.trains.pop()


if __name__ == '__main__':
    os.system('cls')
    max_data = 10000  # capacity of the prediction and training queues; around 10 is usually enough
    Worker_num = 4
    Predict_num = 2
    Train_num = 4
    batch = 40  # 32
    max_step = 1500  # cap on the number of steps per episode
    gamma = 0.99
    episode = 1300  # 500
    lr = 1e-4  # critical: too large a value may cause divergence
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    env = gym.make('CartPole-v1')
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    env.close()
    lock = threading.Lock()
    agent = GA3C(max_data, state_dim, action_dim, episode, gamma, device,
                 Worker_num, Predict_num, Train_num, lr, batch, lock, max_step)
    agent.main()

Note that a maximum step limit max_step is imposed to prevent a CartPole episode from running forever.

The learning rate also matters a great deal: a learning rate that is too large may cause the algorithm to diverge after escaping a local minimum and never converge, as shown below.

The figure below shows training with lr=0.0003; the y-axis is the per-episode return and the x-axis is the episode index.
[Figure: per-episode return vs. episode, lr=0.0003]

The plot shows the algorithm diverging after passing a local minimum, so the return never converges.

With lr=0.0001, apart from possibly slower training, the algorithm does not diverge and the return converges to a stable value.
[Figure: per-episode return vs. episode, lr=0.0001]

Notes

  • A batch size between 32 and 64 is usually sufficient. The batch size affects training stability: too large a batch can wash out the effect of a single exploratory action, making it harder to discover better actions; too small a batch causes oscillation and poor stability, and the algorithm may fail to converge.
  • Set the numbers of workers, predictors, and trainers according to the task. A predictor-to-trainer ratio of 1:1, 1:2, or 2:1 is typical; see the original paper for details.
  • This implementation updates with one-step TD, whereas the original paper uses multi-step TD. The difference mainly affects the stability of the network updates; training usually still proceeds and converges, so the impact is small. A sketch of a multi-step target is given after this list.
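As a hedged sketch of what a multi-step target could look like (not part of this article's implementation; rewards and bootstrap_value are hypothetical inputs, and episode termination is ignored for brevity):

def n_step_td_target(rewards, bootstrap_value, gamma=0.99):
    # discounted n-step return: r_t + gamma*r_{t+1} + ... + gamma^n * v(s_{t+n})
    target = bootstrap_value
    for r in reversed(rewards):
        target = r + gamma * target
    return target

print(n_step_td_target([1.0, 1.0, 1.0], bootstrap_value=10.0))  # a 3-step target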
