当前位置: 首页 > news >正文

强化学习入门-1-CartPole-v1(DQN)

强化学习项目-1-CartPole-v1(DQN)

环境

本环境是OpenAI Gym提供的一个经典控制环境。

官网链接:https://gymnasium.farama.org/environments/classic_control/cart_pole/

观测空间(状态S)

状态共包含 4 4 4个参数:

  • 车位置(Cart Position)
  • 车速(Cart Velocity)
  • 杆子的角度(Pole Angle)
  • 角速度(Pole Angular Velocity)

动作空间(动作A)

  • 0: 推动车向左移动
  • 1: 推动车向右移动

奖励

每坚持一步,环境将会给出 1 1 1点奖励,最大可以获得 500 500 500奖励,同时只要达到 200 200 200就视为达到通过门槛。

引入环境

下载包
pip install gymnasium
导入
import gymnasium as gym
env = gym.make("CartPole-v1", render_mode="human")
# 获取状态维度和动作维度
state_dim  = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n

Q网络

定义

这里 Q Q Q网络仅为一个替代 Q Q Q函数的预测神经网络,对于状态 s s s预测所有的 Q ( s , a ) Q(s, a) Q(s,a)

双网络结构

为了确保 Q Q Q值的稳定性,一般会使用两个神经网络:

  • Q Q Q网络:用于估计当前策略的 Q Q Q值的网络
  • 目标网络:用于提高稳定的目标 Q Q Q值的网络

简单来说,就是由 Q Q Q网络输出预测值,由目标网络预测结果作为真实值,并且每次训练仅更新 Q Q Q网络,每经过若干轮训练后再将 Q Q Q网络参数复制到目标网络。

代码实现

这里网络采用两层隐藏层,维度均为 128 128 128,激活函数为Relu

class Qnet(nn.Module):def __init__(self, hidden_dim = 128):super(Qnet, self).__init__()self.net = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.ReLU(),nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),nn.Linear(hidden_dim, action_dim))def forward(self, x):return self.net(x)

经验回放池

定义

用于存储和重复利用历史交互数据的数据结构。

它把智能体与环境交互产生的经验元组(通常形如 ( s , a , r , s ′ , d o n e ) (s, a, r, s^{\prime}, done) (s,a,r,s,done))暂存起来,并在后续训练中以随机小批量的形式反复抽取,用于更新策略或价值函数。

代码实现

经验回放池共包含3个函数:

  • 初始化:创建一个双端队列存储数据,并设置最大容量
  • 添加数据:将经验元组放入双端队列,如果超过容量先进行删除操作
  • 随机采样:随机采样 b a t c h s i z e batch \; size batchsize组数据,转换后成张量后返回
class ReplayBuffer(object):def __init__(self, max_size = 50000):self.max_size = max_sizeself.buffer = deque(maxlen = max_size)def add(self, state, action, reward, next_state, done):if self.__len__() >= self.max_size:self.buffer.popleft()self.buffer.append((state, action, reward, next_state, done))def sample(self, batch_size, device = 'mps'):indices = np.random.choice(len(self.buffer), batch_size, replace=True)batch = [self.buffer[i] for i in indices]states, actions, rewards, next_states, dones = zip(*batch)return (torch.FloatTensor(states).to(device),torch.LongTensor(actions).to(device),torch.FloatTensor(rewards).to(device),torch.FloatTensor(next_states).to(device),torch.FloatTensor(dones).to(device))

DQN算法

定义

DQN算法的核心就是使用神经网络替代了Q函数,用于预测 Q ( s , a ) Q(s,a) Q(s,a)

初始化

定义时将所有需要的参数设置好。

定义好两个网络,设置好优化器,折扣因子等等。

class DQN():def __init__(self, lr = 3e-4,gamma = 0.98, epsilon = 0.1, batch_size = 128, update_epochs = 50):self.q_net = Qnet()self.target_q_net = Qnet()self.target_q_net.load_state_dict(self.q_net.state_dict())self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr)self.gamma = gammaself.epsilon = epsilonself.batch_size = batch_sizeself.update_epochs = update_epochsself.loss = nn.MSELoss()self.memory = ReplayBuffer()self.learnstep = 0

动作选择

这里使用 ϵ \epsilon ϵ贪心策略进行动作选择, ϵ \epsilon ϵ在训练时动态更新。

def choose_action(self, state):state = torch.from_numpy(state).float()state = state.unsqueeze(0)if np.random.random() > self.epsilon:action_values = self.q_net(state)action = torch.argmax(action_values).item()else:action = np.random.randint(0, action_dim)return action

状态保存

将智能体与环境的互动存储下来,用于后续的训练。

def store_transition(self, state, action, reward, next_state, done):self.memory.add(state, action, reward, next_state, done)

训练

当收集到超过 b a t c h s i z e batch \; size batchsize组信息后,就可以开始训练了,在经验回放池中随机取出 b a t c h s i z e batch \; size batchsize组信息,并通过 Q Q Q网络得到预测到 Q ( s t , a t ) Q(s_{t},a_{t}) Q(st,at)值。

然后通过目标网络得到下一状态的 Q ( s t + 1 , a t + 1 ) Q(s_{t + 1}, a_{t + 1}) Q(st+1,at+1),并计算出目标 Q Q Q值。

得到当前 Q Q Q值和目标 Q Q Q值后计算损失并更新网络,同时定期更新目标网络。

def learn(self):if len(self.memory) < self.batch_size:return# 批量计算Q(s,a)states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)q_values = self.q_net(states)next_q_values = self.target_q_net(next_states)q_sa = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)target = rewards + self.gamma * next_q_values.max(1)[0].detach() * (1 - dones)# 计算损失并反向传播loss = self.loss(q_sa, target)self.optimizer.zero_grad()loss.backward()self.optimizer.step()# 目标网络更新self.learnstep += 1if self.learnstep % self.update_epochs == 0:self.target_q_net.load_state_dict(self.q_net.state_dict())

环境交互 & 模型训练

设置好参数后就可以初始化环境开始收集信息并训练模型

from tqdm import tqdm
episodes = 1000
epsilon_dacay = 0.995
epsilon_start = 1
epsilon_end = 0.05
scores = []
model = DQN()
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:state, _ = env.reset()score = 0done = Falsewhile not done:action = model.choose_action(state)  next_state, reward, done, truncated,_ = env.step(action)done = done or truncatedmodel.store_transition(state, action, reward, next_state, done)state = next_statemodel.learn()score += rewardenv.render()scores.append(score)model.epsilon = max(epsilon_end, model.epsilon * epsilon_dacay)pbar.set_postfix(ep=episode, score=score, avg100=np.mean(scores[-100:]), ε=model.epsilon)
torch.save(model.q_net.state_dict(), "../model/cartpole.pt")
print(scores)
plt.plot(scores)
plt.show()

完整程序

import gymnasium as gym, torch, torch.nn as nn, numpy as np, random, matplotlib.pyplot as plt
from collections import dequeenv = gym.make("CartPole-v1")
# env = gym.make("CartPole-v1", render_mode="human")
state_dim  = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n
# print(state_dim, action_dim)
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
class Qnet(nn.Module):def __init__(self, hidden_dim = 128):super(Qnet, self).__init__()self.net = nn.Sequential(nn.Linear(state_dim, hidden_dim), nn.ReLU(),nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),nn.Linear(hidden_dim, action_dim))def forward(self, x):return self.net(x)class ReplayBuffer(object):def __init__(self, max_size = 50000):self.max_size = max_sizeself.buffer = deque(maxlen = max_size)def add(self, state, action, reward, next_state, done):if self.__len__() >= self.max_size:self.buffer.popleft()self.buffer.append((state, action, reward, next_state, done))def sample(self, batch_size, device = 'cpu'):indices = np.random.choice(len(self.buffer), batch_size, replace=True)batch = [self.buffer[i] for i in indices]states, actions, rewards, next_states, dones = zip(*batch)return (torch.FloatTensor(states).to(device),torch.LongTensor(actions).to(device),torch.FloatTensor(rewards).to(device),torch.FloatTensor(next_states).to(device),torch.FloatTensor(dones).to(device))def __len__(self):return len(self.buffer)class DQN():def __init__(self, lr = 3e-4,gamma = 0.98, epsilon = 0.1, batch_size = 128, update_epochs = 50):self.q_net = Qnet()self.target_q_net = Qnet()self.target_q_net.load_state_dict(self.q_net.state_dict())self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr)self.gamma = gammaself.epsilon = epsilonself.batch_size = batch_sizeself.update_epochs = update_epochsself.loss = nn.MSELoss()self.memory = ReplayBuffer()self.learnstep = 0def choose_action(self, state):state = torch.from_numpy(state).float()state = state.unsqueeze(0)if np.random.random() > self.epsilon:action_values = self.q_net(state)action = torch.argmax(action_values).item()else:action = np.random.randint(0, action_dim)return actiondef store_transition(self, state, action, reward, next_state, done):self.memory.add(state, action, reward, next_state, done)def learn(self):if len(self.memory) < self.batch_size:return# 批量计算Q(s,a)states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)q_values = self.q_net(states)next_q_values = self.target_q_net(next_states)q_sa = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)target = rewards + self.gamma * next_q_values.max(1)[0].detach() * (1 - dones)# 计算损失并反向传播loss = self.loss(q_sa, target)self.optimizer.zero_grad()loss.backward()self.optimizer.step()# 目标网络更新self.learnstep += 1if self.learnstep % self.update_epochs == 0:self.target_q_net.load_state_dict(self.q_net.state_dict())from tqdm import tqdm
episodes = 1000
epsilon_dacay = 0.995
epsilon_start = 1
epsilon_end = 0.05
scores = []
model = DQN()
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:state, _ = env.reset()score = 0done = Falsewhile not done:action = model.choose_action(state)  # 根据杆子角度简单决策next_state, reward, done, truncated,_ = env.step(action)done = done or truncatedmodel.store_transition(state, action, reward, next_state, done)state = next_statemodel.learn()score += rewardenv.render()scores.append(score)model.epsilon = max(epsilon_end, model.epsilon * epsilon_dacay)pbar.set_postfix(ep=episode, score=score, avg100=np.mean(scores[-100:]), ε=model.epsilon)
torch.save(model.q_net.state_dict(), "../model/cartpole.pt")
print(scores)
plt.plot(scores)
plt.show()
http://www.dtcms.com/a/493128.html

相关文章:

  • 精品网站源码资源程序下载新网站怎么做流畅
  • 温州专业手机网站制作多少钱郑州外贸网站制作
  • 阴阳师网站建设动漫专业就业前景
  • 企业门户网站服务器网页设计与网站建设的热点
  • 义乌公司网站制作海南网络广播电视台少儿频道
  • 企业怎么做自己的网站辽宁建设工程信息网怎么业绩加分
  • 萍缘网站建设工作曲靖房地产网站开发
  • 互联网站备案信息查询河南开展涉网暴力专项举报工作
  • 免备案域名是危险网站南京网站推广哪家便宜
  • 做电商卖玉器的网站如何自己开个网站平台
  • 南昌哪家做网站好h5制作开发哪儿
  • windows优化大师最新版本杭州市网站seo
  • 网站设计制作哪个公司的好wordpress淘宝联盟模板下载地址
  • 百度天气赋能下的湖南省湖南省空气质量WebGIS可视化关键技术与实现
  • 自己做服装搭配的网站网站cdn自己做
  • 建设银行官网网站人事百度广告投放公司
  • 公司展示型网站seo营销外包
  • 腾讯云学生机做网站四川省住房与城乡建设 厅网站
  • 广州知名网站提高网站可用性的策略有哪些
  • 自己做网站nas什么网站做博客好
  • 成都网站内容策划关于网站建设的技巧
  • 网站建设可行性及需求分析什么网站能接单做网站
  • 乐清站在那儿专门做书单的网站
  • 企业网站ui设计wordpress ftp服务器
  • 太仓建设局网站网站首页设计效果图
  • 织梦可以做视频网站么域名服务器分为
  • 北京网站优化公司哪里稳定网站网址怎么写
  • 甘肃网站建设费用工会网站建设方案
  • 热门关键词查询新上线的网站怎么做优化
  • 网站 多线买医疗产品的网站建设