
Reinforcement Learning Project 2: LunarLander-v3 (Dueling DQN)

Environment

LunarLander is a Box2D environment provided by Gymnasium (the maintained successor to OpenAI Gym).

Official documentation: https://gymnasium.farama.org/environments/box2d/lunar_lander/

Actions (Discrete(4)):

  • 0: do nothing
  • 1: fire the left orientation engine (pushes the lander to the right)
  • 2: fire the main engine (pushes the lander upward)
  • 3: fire the right orientation engine (pushes the lander to the left)

Corresponding state vector:

$$s = \begin{bmatrix} x \\ y \\ \dot{x} \\ \dot{y} \\ \theta \\ \dot{\theta} \\ l \\ r \end{bmatrix}$$

  • $x, y$: horizontal and vertical position
  • $\dot{x}, \dot{y}$: horizontal and vertical velocity
  • $\theta$: angle of the lander's body
  • $\dot{\theta}$: angular velocity of the lander's body
  • $l, r$: whether the left and right legs are in contact with the ground

Reward function:

  • Moving from the top of the screen to the landing pad and coming to rest: roughly $100 \sim 140$ points
  • Moving toward / away from the pad: reward increases when approaching, decreases when moving away
  • Crashing: $-100$
  • Coming to rest (soft landing): $+100$
  • Each leg in contact with the ground: $+10$
  • Each frame the main engine fires: $-0.3$
  • Each frame a side engine fires (either of the two orientation engines): $-0.03$

Setting Up the Environment

Install the package:
pip install gymnasium
Import the library and create the environment:
import gymnasium as gym
env = gym.make("LunarLander-v3", render_mode="human")
# get the state and action dimensions
state_dim  = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n
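
Before training, it can help to sanity-check the environment with a random agent. The following is a minimal sketch (not part of the original workflow) that reuses the env object created above and runs one episode with uniformly random actions:

# sanity check: one episode with a random policy (illustrative only)
obs, info = env.reset(seed=0)
print("observation shape:", env.observation_space.shape)   # (8,)
print("number of actions:", env.action_space.n)            # 4
total_reward, done = 0.0, False
while not done:
    action = env.action_space.sample()                      # random action
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    done = terminated or truncated
print("random-policy return:", total_reward)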

The VA Network

Definition

Here the $VA$ network has two shared feature-extraction layers, which feed into a value head and an advantage head.

The value head predicts, from state $s$, the expected return obtainable starting from $s$, i.e. the probability of choosing each action $a$ weighted by the corresponding $Q(s,a)$.

The advantage head measures how much better the chosen action is than the average over all actions, i.e. $q(s, a) - \frac{1}{|\mathcal{A}|}\sum_{a_{i} \in \mathcal{A}} q(s, a_{i})$, where $|\mathcal{A}|$ is the number of actions.
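
Combining the two heads gives the dueling estimate of the action values; subtracting the mean advantage (as done in forward() below) keeps the $V$/$A$ decomposition identifiable:

$$Q(s, a) = V(s) + A(s, a) - \frac{1}{|\mathcal{A}|} \sum_{a' \in \mathcal{A}} A(s, a')$$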

Two-Network Structure

To keep the $V$ and $A$ estimates stable, two networks are still used:

  • $VA$ network: estimates the $V$ and $A$ values under the current policy
  • Target network: provides stable target $V$ and $A$ values

In short, the $VA$ network produces the predictions, the target network's outputs serve as the regression targets, each training step updates only the $VA$ network, and the target network then tracks it through soft updates.
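
The soft update mentioned here is Polyak averaging: after every training step, the target parameters $\theta^{-}$ are nudged toward the online parameters $\theta$ by a small factor $\tau$ (this post uses $\tau = 0.001$):

$$\theta^{-} \leftarrow \tau\,\theta + (1 - \tau)\,\theta^{-}$$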

Implementation

The network uses two hidden layers, each of width $128$, with ReLU activations.

class VAnet(nn.Module):
    def __init__(self, hidden_dim=128):
        super(VAnet, self).__init__()
        # shared feature extractor
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
        )
        # value head: V(s)
        self.value = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        # advantage head: A(s, a)
        self.advantage = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        shared = self.shared(x)
        value = self.value(shared)
        advantage = self.advantage(shared)
        # subtract the per-state mean advantage over the action dimension
        return value + advantage - advantage.mean(dim=1, keepdim=True)
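
A quick shape check (a hypothetical snippet, not from the original post) confirms that the dueling head returns one Q-value per action; it assumes torch and nn have been imported as in the complete program at the end:

# hypothetical shape check for the dueling network
net = VAnet()
dummy_states = torch.randn(32, state_dim)   # batch of 32 fake states
q_values = net(dummy_states)
print(q_values.shape)                       # expected: torch.Size([32, action_dim])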

Experience Replay Buffer

Definition

A data structure for storing and reusing past interaction data.

It caches the experience tuples produced by the agent's interaction with the environment (typically of the form $(s, a, r, s^{\prime}, done)$) and later draws random mini-batches from them to update the policy or value function.

Implementation

The replay buffer has three methods:

  • Initialization: create a double-ended queue to hold the data and set its maximum capacity
  • Add: append an experience tuple to the deque, evicting the oldest entry first if the buffer is full
  • Sample: randomly draw batch_size transitions, convert them to tensors, and return them
class ReplayBuffer(object):
    def __init__(self, max_size=50000):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        # deque(maxlen=...) already evicts the oldest entry automatically
        if len(self) >= self.max_size:
            self.buffer.popleft()
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size, device='cpu'):
        # device must match the device the networks live on
        indices = np.random.choice(len(self.buffer), batch_size, replace=True)
        batch = [self.buffer[i] for i in indices]
        states, actions, rewards, next_states, dones = zip(*batch)
        return (torch.FloatTensor(np.array(states)).to(device),
                torch.LongTensor(actions).to(device),
                torch.FloatTensor(rewards).to(device),
                torch.FloatTensor(np.array(next_states)).to(device),
                torch.FloatTensor(dones).to(device))

    def __len__(self):
        return len(self.buffer)
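
A brief, hypothetical usage example: after pushing a few random transitions, sample() returns five tensors whose leading dimension is the batch size:

# hypothetical usage of the replay buffer
buffer = ReplayBuffer(max_size=1000)
state, _ = env.reset()
for _ in range(200):
    action = env.action_space.sample()
    next_state, reward, terminated, truncated, _ = env.step(action)
    buffer.add(state, action, reward, next_state, terminated or truncated)
    state = next_state if not (terminated or truncated) else env.reset()[0]
states, actions, rewards, next_states, dones = buffer.sample(batch_size=64, device='cpu')
print(states.shape, actions.shape)          # torch.Size([64, 8]) torch.Size([64])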

The DQN Algorithm

Definition

The core of DQN is to replace the Q-table with a neural network that predicts $Q(s,a)$. Choosing Dueling DQN only modifies the network architecture; the rest of the DQN procedure is unchanged.
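
For reference, the regression target computed later in learn() is the standard one-step TD target based on the target network:

$$y_t = r_t + \gamma \,(1 - done_t)\, \max_{a'} Q_{\text{target}}(s_{t+1}, a')$$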

Initialization

Set all required hyperparameters at construction time.

Create the two networks and set up the optimizer, the discount factor, and so on.

class DQN():
    def __init__(self, lr=3e-4, gamma=0.98, epsilon=1, tau=0.001,
                 batch_size=128, update_epochs=4):
        self.q_net = VAnet()
        self.target_q_net = VAnet()
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.tau = tau
        self.batch_size = batch_size
        self.update_epochs = update_epochs
        self.loss = nn.MSELoss()
        self.memory = ReplayBuffer()
        self.learnstep = 0

Action Selection

Actions are chosen with an $\epsilon$-greedy policy; $\epsilon$ is decayed dynamically during training.
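
Formally, the action distribution induced by the code below is the usual $\epsilon$-greedy mixture (a standard formulation, stated here for completeness):

$$\pi(a \mid s) = \begin{cases} 1 - \epsilon + \dfrac{\epsilon}{|\mathcal{A}|}, & a = \arg\max_{a'} Q(s, a') \\[6pt] \dfrac{\epsilon}{|\mathcal{A}|}, & \text{otherwise} \end{cases}$$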

def choose_action(self, state):
    state = torch.from_numpy(state).float().unsqueeze(0)
    if np.random.random() > self.epsilon:
        # greedy action from the online network (no gradients needed here)
        with torch.no_grad():
            action_values = self.q_net(state)
        action = torch.argmax(action_values).item()
    else:
        action = np.random.randint(0, action_dim)
    return action

Storing Transitions

Store the agent's interactions with the environment for later training.

def store_transition(self, state, action, reward, next_state, done):
    self.memory.add(state, action, reward, next_state, done)

Training

Once more than batch_size transitions have been collected, training can begin: sample batch_size transitions from the replay buffer and use the $Q$ network to predict $Q(s_{t}, a_{t})$.

Then use the target network to evaluate the next states and take $\max_{a'} Q(s_{t+1}, a')$ to compute the target $Q$ value.

With the predicted and target $Q$ values in hand, compute the loss and update the network.

Since the target network is no longer updated periodically but through soft updates, the original update rule has to be modified accordingly.

def learn(self):
    self.learnstep += 1
    if len(self.memory) < self.batch_size or self.learnstep % self.update_epochs != 0:
        return
    # sample a batch and compute Q(s, a)
    states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
    q_values = self.q_net(states)
    next_q_values = self.target_q_net(next_states)
    q_sa = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
    target = rewards + self.gamma * next_q_values.max(1)[0].detach() * (1 - dones)
    # compute the loss and backpropagate
    loss = self.loss(q_sa, target)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    # soft-update the target network
    for target_param, param in zip(self.target_q_net.parameters(), self.q_net.parameters()):
        target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)

Environment Interaction & Model Training

With the hyperparameters set, initialize the environment, start collecting experience, and train the model.

from tqdm import tqdm
episodes = 1000
tau = 0.001
epsilon_decay = 0.99
epsilon_start = 1
epsilon_end = 0.05
scores = []
model = DQN()
model.epsilon = epsilon_start
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:
    state, _ = env.reset()
    score = 0
    done = False
    while not done:
        action = model.choose_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        model.store_transition(state, action, reward, next_state, done)
        model.learn()
        state = next_state
        score += reward
        env.render()
    scores.append(score)
    model.epsilon = max(epsilon_end, epsilon_decay * model.epsilon)
    pbar.set_postfix(ep=episode, score=score, avg100=np.mean(scores[-100:]), ε=model.epsilon)
print(scores)
torch.save(model.q_net.state_dict(), "../../model/LunarLander-DuelingDQN.pt")
plt.plot(scores)
plt.show()
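
After training, the saved weights can be reloaded for a purely greedy evaluation run. The sketch below is hypothetical (not part of the original post); the model path and environment name follow the training script above:

# hypothetical evaluation of the trained agent
eval_env = gym.make("LunarLander-v3", render_mode="human")
agent = DQN()
agent.q_net.load_state_dict(torch.load("../../model/LunarLander-DuelingDQN.pt"))
agent.epsilon = 0.0                          # fully greedy, no exploration
state, _ = eval_env.reset()
score, done = 0.0, False
while not done:
    action = agent.choose_action(state)
    state, reward, terminated, truncated, _ = eval_env.step(action)
    score += reward
    done = terminated or truncated
print("evaluation return:", score)
eval_env.close()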

Complete Program

import gymnasium as gym, torch, torch.nn as nn, numpy as np, random, matplotlib.pyplot as plt
from collections import deque

env = gym.make("LunarLander-v3")
# env = gym.make("LunarLander-v3", render_mode = "human")
state_dim  = env.observation_space.shape[0] if len(env.observation_space.shape) == 1 else env.observation_space.n
action_dim = env.action_space.n
# print(state_dim, action_dim)
device = 'cpu'
# device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")


class VAnet(nn.Module):
    def __init__(self, hidden_dim=128):
        super(VAnet, self).__init__()
        # shared feature extractor
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
        )
        # value head: V(s)
        self.value = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        # advantage head: A(s, a)
        self.advantage = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, x):
        shared = self.shared(x)
        value = self.value(shared)
        advantage = self.advantage(shared)
        # subtract the per-state mean advantage over the action dimension
        return value + advantage - advantage.mean(dim=1, keepdim=True)


class ReplayBuffer(object):
    def __init__(self, max_size=50000):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        if self.__len__() >= self.max_size:
            self.buffer.popleft()
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        indices = np.random.choice(len(self.buffer), batch_size, replace=True)
        batch = [self.buffer[i] for i in indices]
        states, actions, rewards, next_states, dones = zip(*batch)
        return (torch.FloatTensor(np.array(states)).to(device),
                torch.LongTensor(actions).to(device),
                torch.FloatTensor(rewards).to(device),
                torch.FloatTensor(np.array(next_states)).to(device),
                torch.FloatTensor(dones).to(device))

    def __len__(self):
        return len(self.buffer)


class DQN():
    def __init__(self, lr=3e-4, gamma=0.98, epsilon=0.1, batch_size=128, update_epochs=4):
        self.q_net = VAnet().to(device)
        self.target_q_net = VAnet().to(device)
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.batch_size = batch_size
        self.update_epochs = update_epochs
        self.loss = nn.MSELoss()
        self.memory = ReplayBuffer()
        self.learnstep = 0

    def choose_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        if np.random.random() > self.epsilon:
            with torch.no_grad():
                action_values = self.q_net(state)
            action = torch.argmax(action_values).item()
        else:
            action = np.random.randint(0, action_dim)
        return action

    def store_transition(self, state, action, reward, next_state, done):
        self.memory.add(state, action, reward, next_state, done)

    def learn(self):
        self.learnstep += 1
        if len(self.memory) < self.batch_size or self.learnstep % self.update_epochs != 0:
            return
        # sample a batch and compute Q(s, a)
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        q_values = self.q_net(states)
        next_q_values = self.target_q_net(next_states)
        q_sa = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
        target = rewards + self.gamma * next_q_values.max(1)[0].detach() * (1 - dones)
        # compute the loss and backpropagate
        loss = self.loss(q_sa, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # soft-update the target network (tau is defined in the training script below)
        for target_param, param in zip(self.target_q_net.parameters(), self.q_net.parameters()):
            target_param.data.copy_(tau * param.data + (1.0 - tau) * target_param.data)


from tqdm import tqdm
episodes = 1000
tau = 0.001
epsilon_decay = 0.99
epsilon_start = 1
epsilon_end = 0.05
scores = []
model = DQN()
model.epsilon = epsilon_start
pbar = tqdm(range(episodes), desc="Training")
for episode in pbar:
    state, _ = env.reset()
    score = 0
    done = False
    while not done:
        action = model.choose_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        model.store_transition(state, action, reward, next_state, done)
        model.learn()
        state = next_state
        score += reward
        env.render()
    scores.append(score)
    model.epsilon = max(epsilon_end, epsilon_decay * model.epsilon)
    pbar.set_postfix(ep=episode, score=score, avg100=np.mean(scores[-100:]), ε=model.epsilon)
print(scores)
torch.save(model.q_net.state_dict(), "../../model/LunarLander-DuelingDQN.pt")
plt.plot(scores)
plt.show()