Hands-On DQN for 2048 | Part 3: Network Optimization, GPU Acceleration, and Env Reward Tuning

Video walkthrough:

Hands-On DQN for 2048 | Part 3: Network Optimization, GPU Acceleration, and Env Reward Tuning

1. Only local merge rewards are considered: the current reward is just the score gained from merging tiles, so it captures only the immediate payoff of each move and gives no extra incentive for actually reaching the 2048 tile. Without a sufficiently strong reward signal for reaching 2048, the agent may never treat it as an important goal.

2. Training makes poor use of the hardware: everything runs on the CPU by default with no GPU acceleration, so training is slow.

The code changes are as follows:

In the step function, award a large terminal bonus once a 2048 tile appears, and extend the state vector with max_tile, the largest tile currently on the board; input_size in train therefore becomes 17 (16 flattened board cells plus max_tile):

if 2048 in self.board:
    reward += 10000  # large terminal bonus for reaching the 2048 tile
    done = True
state = self.board.flatten()
max_tile = np.max(self.board)
state = np.append(state, max_tile)  # 16 board cells + max tile = 17 inputs
return state, reward, done
input_size = 17  # in train(): 16 flattened board cells + max_tile
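
Since the same three state-building lines now appear in both step and reset, it can help to factor them into a small helper on Game2048 (a sketch; the method name _get_state is hypothetical and not part of the original code):

def _get_state(self):
    # Flatten the 4x4 board into 16 values and append the max tile -> 17 inputs
    state = self.board.flatten()
    return np.append(state, np.max(self.board))

step and reset can then both build their return value via self._get_state(), so the two code paths cannot drift apart.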

Check whether the system has a usable GPU; compute on the GPU if one is available, otherwise fall back to the CPU:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
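
As a quick sanity check (optional, not part of the original changes), you can print which device was picked:

print(device)  # "cuda" if a GPU was found, otherwise "cpu"
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))  # prints the GPU model name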

In train, after creating the model instances, call .to(device) to move them onto the chosen device (GPU or CPU):

model = DQN(input_size, output_size).to(device)
target_model = DQN(input_size, output_size).to(device)
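
Note that .to(device) on an nn.Module moves its parameters in place and also returns the module, so the assignment form above is fine. To verify the move, a one-line check (a sketch):

print(next(model.parameters()).device)  # expect cuda:0 when a GPU is available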

During training and inference, move the input data (states, actions, rewards, and so on) onto the same device as well:

state = torch.FloatTensor(state).unsqueeze(0).to(device)

next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)

states = torch.FloatTensor(states).to(device)
actions = torch.LongTensor(actions).to(device)
rewards = torch.FloatTensor(rewards).to(device)
next_states = torch.FloatTensor(next_states).to(device)
dones = torch.FloatTensor(dones).to(device)
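
A side note, not from the original post: sample already returns NumPy arrays, so torch.from_numpy can avoid one extra host-side copy before the device transfer, e.g.:

states = torch.from_numpy(states).float().to(device)

The torch.FloatTensor(...) form above behaves the same; this is only a minor efficiency tweak.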

Move state and next_state back to the CPU with .cpu() before converting them to NumPy arrays with .numpy():

replay_buffer.add(state.cpu().squeeze(0).numpy(), action, reward, next_state.cpu().squeeze(0).numpy(), done)

Without this change, you will hit TypeError: can't convert cuda:0 device type tensor to numpy, because NumPy can only access tensors stored in host (CPU) memory.
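
A minimal repro of that error (a sketch, assuming a CUDA device is present):

t = torch.ones(4, device="cuda")
# t.numpy()       # raises TypeError: can't convert cuda:0 device type tensor to numpy
t.cpu().numpy()   # works: copy back to host memory first, then convert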

The complete code is as follows:

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.table import Table

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2048 game environment
class Game2048:
    def __init__(self):
        self.board = np.zeros((4, 4), dtype=int)
        self.add_random_tile()
        self.add_random_tile()

    def add_random_tile(self):
        empty_cells = np.argwhere(self.board == 0)
        if len(empty_cells) > 0:
            index = random.choice(empty_cells)
            self.board[index[0], index[1]] = 2 if random.random() < 0.9 else 4

    def move_left(self):
        reward = 0
        new_board = np.copy(self.board)
        for row in range(4):
            line = new_board[row]
            non_zero = line[line != 0]
            merged = []
            i = 0
            while i < len(non_zero):
                if i + 1 < len(non_zero) and non_zero[i] == non_zero[i + 1]:
                    merged.append(2 * non_zero[i])
                    reward += 2 * non_zero[i]
                    i += 2
                else:
                    merged.append(non_zero[i])
                    i += 1
            new_board[row] = np.pad(merged, (0, 4 - len(merged)), 'constant')
        if not np.array_equal(new_board, self.board):
            self.board = new_board
            self.add_random_tile()
        return reward

    def move_right(self):
        self.board = np.fliplr(self.board)
        reward = self.move_left()
        self.board = np.fliplr(self.board)
        return reward

    def move_up(self):
        self.board = self.board.T
        reward = self.move_left()
        self.board = self.board.T
        return reward

    def move_down(self):
        self.board = self.board.T
        reward = self.move_right()
        self.board = self.board.T
        return reward

    def step(self, action):
        if action == 0:
            reward = self.move_left()
        elif action == 1:
            reward = self.move_right()
        elif action == 2:
            reward = self.move_up()
        elif action == 3:
            reward = self.move_down()
        # Game over when the board is full and no horizontally or vertically
        # adjacent pair of tiles can still be merged
        done = not np.any(self.board == 0) and all([
            np.all(self.board[:, i] != self.board[:, i + 1]) for i in range(3)
        ]) and all([
            np.all(self.board[i, :] != self.board[i + 1, :]) for i in range(3)
        ])
        if 2048 in self.board:
            reward += 10000
            done = True
        state = self.board.flatten()
        max_tile = np.max(self.board)
        state = np.append(state, max_tile)
        return state, reward, done

    def reset(self):
        self.board = np.zeros((4, 4), dtype=int)
        self.add_random_tile()
        self.add_random_tile()
        state = self.board.flatten()
        max_tile = np.max(self.board)
        state = np.append(state, max_tile)
        return state

# Deep Q-Network
class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Experience replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)

    def __len__(self):
        return len(self.buffer)

# Board visualization
def visualize_board(board, ax):
    ax.clear()
    table = Table(ax, bbox=[0, 0, 1, 1])
    nrows, ncols = board.shape
    width, height = 1.0 / ncols, 1.0 / nrows

    # Color map: white -> yellow -> orange -> red as tile values grow
    cmap = mcolors.LinearSegmentedColormap.from_list("", ["white", "yellow", "orange", "red"])

    for (i, j), val in np.ndenumerate(board):
        color = cmap(np.log2(val + 1) / np.log2(2048 + 1)) if val > 0 else "white"
        table.add_cell(i, j, width, height, text=val if val > 0 else "",
                       loc='center', facecolor=color)

    ax.add_table(table)
    ax.set_axis_off()
    plt.draw()
    plt.pause(0.1)

# Training loop
def train():
    env = Game2048()
    input_size = 17  # 16 flattened board cells + max_tile
    output_size = 4
    model = DQN(input_size, output_size).to(device)
    target_model = DQN(input_size, output_size).to(device)
    target_model.load_state_dict(model.state_dict())
    target_model.eval()

    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    replay_buffer = ReplayBuffer(capacity=10000)
    batch_size = 32
    gamma = 0.99             # discount factor
    epsilon = 1.0            # initial exploration rate
    epsilon_decay = 0.995    # multiplicative decay per episode
    epsilon_min = 0.01       # exploration floor
    update_target_freq = 10  # sync the target network every 10 episodes

    num_episodes = 1000
    fig, ax = plt.subplots()
    for episode in range(num_episodes):
        state = env.reset()
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        done = False
        total_reward = 0
        while not done:
            visualize_board(env.board, ax)
            if random.random() < epsilon:
                action = random.randint(0, output_size - 1)
            else:
                with torch.no_grad():  # inference only, no gradient tracking needed
                    q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            next_state, reward, done = env.step(action)
            next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
            replay_buffer.add(state.cpu().squeeze(0).numpy(), action, reward, next_state.cpu().squeeze(0).numpy(), done)

            if len(replay_buffer) >= batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
                states = torch.FloatTensor(states).to(device)
                actions = torch.LongTensor(actions).to(device)
                rewards = torch.FloatTensor(rewards).to(device)
                next_states = torch.FloatTensor(next_states).to(device)
                dones = torch.FloatTensor(dones).to(device)

                q_values = model(states)
                q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)

                # Bellman target: y = r + gamma * (1 - done) * max_a' Q_target(s', a')
                with torch.no_grad():  # targets come from the frozen target network
                    next_q_values = target_model(next_states)
                    next_q_values = next_q_values.max(1)[0]
                target_q_values = rewards + gamma * (1 - dones) * next_q_values

                loss = criterion(q_values, target_q_values)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            state = next_state
            total_reward += reward

        if episode % update_target_freq == 0:
            target_model.load_state_dict(model.state_dict())

        epsilon = max(epsilon * epsilon_decay, epsilon_min)
        print(f"Episode {episode}: Total Reward = {total_reward}, Epsilon = {epsilon}")

    plt.close()

if __name__ == "__main__":
    train()
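
One more speed note not covered by the changes above: visualize_board runs on every single step, and its plt.pause(0.1) alone caps the loop at roughly 10 steps per second, which can easily swamp the GPU speedup. A possible tweak (a sketch; render_every is a hypothetical knob, e.g. set to 50 alongside the other hyperparameters) is to render only an occasional episode inside the while loop:

if episode % render_every == 0:
    visualize_board(env.board, ax)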
