I. Principles of Multi-Agent Reinforcement Learning
1. Core Ideas of Multi-Agent Systems
Multi-Agent Reinforcement Learning (MARL) addresses the problem of multiple agents learning to cooperate or compete in a shared environment. It differs from single-agent reinforcement learning as follows:

| Dimension | Single-Agent RL | Multi-Agent RL |
| --- | --- | --- |
| Objective | A single agent optimizes its own policy | Multiple agents optimize policies cooperatively or competitively |
| Environment | Static environment | Dynamic environment (influenced by the other agents' behavior) |
| Challenges | Exploration vs. exploitation | Non-stationarity, communication, cooperation and competition |
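To make the "dynamic environment" distinction concrete, the sketch below contrasts the single-agent Gymnasium step API with PettingZoo's parallel multi-agent API, where each step takes a dictionary of actions keyed by agent name. This is a minimal sketch; `CartPole-v1` stands in for any single-agent task, and `simple_spread_v3` is the MPE environment used later in this article.

```python
import gymnasium as gym
from pettingzoo.mpe import simple_spread_v3

# Single-agent: one observation in, one action out per step
single_env = gym.make("CartPole-v1")
obs, info = single_env.reset()
obs, reward, terminated, truncated, info = single_env.step(single_env.action_space.sample())

# Multi-agent (parallel API): dictionaries keyed by agent name
multi_env = simple_spread_v3.parallel_env(N=3, continuous_actions=True)
observations, infos = multi_env.reset()
actions = {agent: multi_env.action_space(agent).sample() for agent in multi_env.agents}
observations, rewards, terminations, truncations, infos = multi_env.step(actions)
```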
2. Categories of Multi-Agent Reinforcement Learning
- Cooperative: agents work toward a common goal (e.g., team collaboration)
- Competitive: agents are in an adversarial relationship (e.g., games)
- Mixed: both cooperation and competition are present (e.g., market trading)
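PettingZoo's MPE suite provides ready-made examples of these settings. The sketch below instantiates one environment of each rough kind: `simple_spread` is cooperative, `simple_tag` is a predator-prey (competitive) task, and `simple_adversary` mixes cooperation among the "good" agents with competition against an adversary. The `_v3` version suffixes are assumed; check the version shipped with your PettingZoo installation.

```python
from pettingzoo.mpe import simple_spread_v3, simple_tag_v3, simple_adversary_v3

envs = {
    "cooperative (simple_spread)": simple_spread_v3.parallel_env(N=3),
    "competitive (simple_tag)": simple_tag_v3.parallel_env(),
    "mixed (simple_adversary)": simple_adversary_v3.parallel_env(),
}
for name, env in envs.items():
    observations, infos = env.reset()
    print(name, "->", env.agents)
```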
II. The MADDPG Algorithm Framework
Multi-Agent Deep Deterministic Policy Gradient (MADDPG) is a classic MARL algorithm that extends DDPG:
- Centralized training, decentralized execution:
  - During training, each agent may access global information
  - During execution, each agent relies only on its local observation
- Critic network: evaluates action values using global information
- Actor network: selects actions based on local observations

Mathematical formulation:
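The standard MADDPG objectives from the original paper (Lowe et al., 2017) are reproduced here for completeness. Agent $i$ has a deterministic policy $\mu_i$ with parameters $\theta_i$, $o_i$ denotes its local observation, $\mathbf{x}$ the global state (e.g., the concatenation of all observations), $\mathcal{D}$ the replay buffer, and $\mu'_j$ the target policies. The policy gradient with a centralized critic is

$$
\nabla_{\theta_i} J(\mu_i) = \mathbb{E}_{\mathbf{x}, a \sim \mathcal{D}} \Big[ \nabla_{\theta_i} \mu_i(o_i) \, \nabla_{a_i} Q_i^{\mu}(\mathbf{x}, a_1, \dots, a_N) \big|_{a_i = \mu_i(o_i)} \Big]
$$

and the centralized critic is trained by minimizing the TD error

$$
\mathcal{L}(\theta_i) = \mathbb{E}_{\mathbf{x}, a, r, \mathbf{x}'} \Big[ \big( Q_i^{\mu}(\mathbf{x}, a_1, \dots, a_N) - y \big)^2 \Big], \qquad
y = r_i + \gamma \, Q_i^{\mu'}(\mathbf{x}', a'_1, \dots, a'_N) \big|_{a'_j = \mu'_j(o'_j)}.
$$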
III. MADDPG Implementation Steps (Based on Gymnasium and PettingZoo)
We will implement MADDPG using the Multi-Agent Particle Environment (MPE) as an example:
- Define the multi-agent environment: create an environment shared by several agents (a short sketch follows this list)
- Build Actor-Critic networks: each agent owns its own Actor and Critic networks
- Implement centralized training: the Critic uses global information, the Actor uses local observations
- Test decentralized execution: verify how the agents behave with local observations only
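As a quick preview of the first step, the standalone snippet below (assuming PettingZoo with the MPE environments is installed) creates the `simple_spread_v3` parallel environment used in the full implementation and prints each agent's observation and action spaces; reported dimensions may vary across PettingZoo versions.

```python
from pettingzoo.mpe import simple_spread_v3

env = simple_spread_v3.parallel_env(N=3, continuous_actions=True, max_cycles=100)
observations, infos = env.reset(seed=0)

for agent in env.agents:
    obs_space = env.observation_space(agent)
    act_space = env.action_space(agent)
    print(agent, "obs:", obs_space.shape, "action:", act_space.shape,
          "action range:", act_space.low.min(), "to", act_space.high.max())
```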
IV. Code Implementation
```python
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
from pettingzoo.mpe import simple_spread_v3
from gymnasium.spaces import Discrete, Box
import time


# ================== Configuration ==================
class MADDPGConfig:
    env_name = "simple_spread_v3"   # PettingZoo environment
    num_agents = 3                  # number of agents
    hidden_dim = 256                # hidden layer width
    actor_lr = 1e-4                 # Actor learning rate
    critic_lr = 1e-3                # Critic learning rate
    gamma = 0.95                    # discount factor
    tau = 0.05                      # soft-update coefficient
    buffer_size = 100000            # replay buffer capacity
    batch_size = 1024               # batch size
    max_episodes = 1000             # maximum number of training episodes
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    noise_scale_init = 0.2          # initial exploration noise scale
    noise_scale_decay = 0.995       # noise decay factor


# ================== Actor network ==================
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, action_dim),
            nn.Sigmoid()  # output range [0, 1]
        )

    def forward(self, state):
        return self.net(state)


# ================== Critic network ==================
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, num_agents):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim * num_agents + action_dim * num_agents, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, MADDPGConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(MADDPGConfig.hidden_dim, 1)
        )

    def forward(self, state, action):
        # Flatten states and actions: [batch, agents, dim] -> [batch, agents * dim]
        state_flat = state.view(state.size(0), -1)
        action_flat = action.view(action.size(0), -1)
        x = torch.cat([state_flat, action_flat], dim=-1)
        return self.net(x)


# ================== MADDPG agent ==================
class MADDPGAgent:
    def __init__(self, state_dim, action_dim, num_agents, agent_id, env_action_space):
        self.id = agent_id
        self.actor = Actor(state_dim, action_dim).to(MADDPGConfig.device)
        self.target_actor = Actor(state_dim, action_dim).to(MADDPGConfig.device)
        self.critic = Critic(state_dim, action_dim, num_agents).to(MADDPGConfig.device)
        self.target_critic = Critic(state_dim, action_dim, num_agents).to(MADDPGConfig.device)
        self.env_action_space = env_action_space  # keep a handle on the environment's action space

        # Initialize target networks with the same weights
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=MADDPGConfig.actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=MADDPGConfig.critic_lr)

    def act(self, state, noise_scale=None):
        if noise_scale is None:
            noise_scale = 0.0  # or MADDPGConfig.noise_scale_init
        noise_scale = float(noise_scale)

        # ==== Action generation ====
        state = torch.FloatTensor(state).to(MADDPGConfig.device)
        action = self.actor(state).cpu().data.numpy()
        # Truncated Gaussian exploration noise
        noise = np.clip(np.random.randn(*action.shape), -3.0, 3.0) * noise_scale
        # Clip the action into the valid range
        action = np.clip(action + noise, 0.0, 1.0)
        return action

    def update(self, replay_buffer, agents):
        if len(replay_buffer) < MADDPGConfig.batch_size:
            return

        # Sample a batch from the buffer
        states, actions, rewards, next_states, dones = replay_buffer.sample(MADDPGConfig.batch_size)

        # Extract this agent's rewards and termination flags
        agent_rewards = rewards[:, self.id, :]  # shape: (batch_size, 1)
        agent_dones = dones[:, self.id, :]      # shape: (batch_size, 1)

        # Compute target Q values
        with torch.no_grad():
            next_actions = torch.cat(
                [agent.target_actor(next_states[:, i, :]) for i, agent in enumerate(agents)],
                dim=1  # concatenate along the feature dimension: (batch_size, num_agents * action_dim)
            )
            target_q_values = self.target_critic(next_states, next_actions)
            target_q = agent_rewards + MADDPGConfig.gamma * (1 - agent_dones) * target_q_values

        # Update the Critic
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q.detach())
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), max_norm=0.5)
        self.critic_optimizer.step()

        # Update the Actor
        actor_actions = torch.cat(
            [self.actor(states[:, i, :]) if i == self.id
             else agents[i].actor(states[:, i, :]).detach()
             for i in range(len(agents))],
            dim=1  # concatenate along the feature dimension: (batch_size, num_agents * action_dim)
        )
        actor_loss = -self.critic(states, actor_actions).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.actor.parameters(), max_norm=0.5)
        self.actor_optimizer.step()

        # Soft-update the target networks
        for param, target_param in zip(self.actor.parameters(), self.target_actor.parameters()):
            target_param.data.copy_(MADDPGConfig.tau * param.data + (1 - MADDPGConfig.tau) * target_param.data)
        for param, target_param in zip(self.critic.parameters(), self.target_critic.parameters()):
            target_param.data.copy_(MADDPGConfig.tau * param.data + (1 - MADDPGConfig.tau) * target_param.data)


# ================== Experience replay buffer ==================
class ReplayBuffer:
    def __init__(self, num_agents, state_dim, action_dim):
        self.buffer = deque(maxlen=MADDPGConfig.buffer_size)
        self.num_agents = num_agents
        self.state_dim = state_dim
        self.action_dim = action_dim

    def add(self, state, action, reward, next_state, done):
        # Convert to numpy arrays and validate shapes
        state = np.array(state, dtype=np.float32)
        assert state.shape == (self.num_agents, self.state_dim), \
            f"State shape mismatch: {state.shape} vs {(self.num_agents, self.state_dim)}"
        action = np.array(action, dtype=np.float32)
        assert action.shape == (self.num_agents, self.action_dim), \
            f"Action shape mismatch: {action.shape} vs {(self.num_agents, self.action_dim)}"
        next_state = np.array(next_state, dtype=np.float32)
        assert next_state.shape == (self.num_agents, self.state_dim), \
            f"Next state shape mismatch: {next_state.shape} vs {(self.num_agents, self.state_dim)}"
        self.buffer.append((
            state,
            action,
            np.array(reward, dtype=np.float32),
            next_state,
            np.array(done, dtype=np.float32)
        ))

    def sample(self, batch_size):
        samples = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)

        # Convert to numpy arrays with fixed shapes
        states = np.array(states, dtype=np.float32)        # shape: (batch, num_agents, state_dim)
        actions = np.array(actions, dtype=np.float32)      # shape: (batch, num_agents, action_dim)
        rewards = np.array(rewards, dtype=np.float32)      # shape: (batch, num_agents)
        next_states = np.array(next_states, dtype=np.float32)
        dones = np.array(dones, dtype=np.float32)          # shape: (batch, num_agents)

        # Convert to tensors and add a trailing dimension where needed
        return (
            torch.FloatTensor(states).to(MADDPGConfig.device),
            torch.FloatTensor(actions).to(MADDPGConfig.device),
            torch.FloatTensor(rewards).unsqueeze(-1).to(MADDPGConfig.device),  # shape: (batch, num_agents, 1)
            torch.FloatTensor(next_states).to(MADDPGConfig.device),
            torch.FloatTensor(dones).unsqueeze(-1).to(MADDPGConfig.device)     # shape: (batch, num_agents, 1)
        )

    def __len__(self):
        return len(self.buffer)


# ================== Training system ==================
class MADDPGTrainer:
    def __init__(self):
        self.env = simple_spread_v3.parallel_env(
            N=3,
            continuous_actions=True,
            max_cycles=100,
            render_mode="rgb_array"
        )
        self.env_action_space = self.env.action_space("agent_0")
        print("Action space low:", self.env_action_space.low)    # expected: [0.0, 0.0, ...]
        print("Action space high:", self.env_action_space.high)  # expected: [1.0, 1.0, ...]
        # Validate the action space
        assert np.allclose(self.env_action_space.low, 0.0), "Action space lower bound should be 0.0"
        assert np.allclose(self.env_action_space.high, 1.0), "Action space upper bound should be 1.0"

        self.state_dim = self.env.observation_space("agent_0").shape[0]
        self.action_space = self.env.action_space("agent_0")
        if isinstance(self.action_space, Box):
            self.action_dim = self.action_space.shape[0]
        else:
            raise ValueError("Continuous action space required")

        self.num_agents = MADDPGConfig.num_agents
        self.agents = [
            MADDPGAgent(self.state_dim, self.action_dim, self.num_agents,
                        agent_id=i, env_action_space=self.env_action_space)
            for i in range(self.num_agents)
        ]
        self.replay_buffer = ReplayBuffer(self.num_agents, self.state_dim, self.action_dim)
        self.noise_scale = MADDPGConfig.noise_scale_init  # initial exploration noise scale

    def train(self):
        for episode in range(MADDPGConfig.max_episodes):
            states, _ = self.env.reset()
            episode_reward = 0
            current_noise_scale = self.noise_scale
            self.noise_scale *= MADDPGConfig.noise_scale_decay  # decay the exploration noise

            while True:
                # Snapshot the environment's current agent list
                current_agents = list(self.env.agents)

                # Generate actions
                actions = {}
                for i, agent_id in enumerate(current_agents):
                    agent_state = states[agent_id]
                    actions[agent_id] = self.agents[i].act(agent_state, noise_scale=current_noise_scale)

                # Step the environment
                next_states, rewards, terminations, truncations, _ = self.env.step(actions)

                # Convert to arrays and validate shapes
                state_matrix = np.stack([states[agent_id] for agent_id in current_agents])
                next_state_matrix = np.stack([next_states[agent_id] for agent_id in current_agents])
                action_matrix = np.stack([actions[agent_id] for agent_id in current_agents])
                reward_array = np.array([rewards[agent_id] for agent_id in current_agents], dtype=np.float32)
                done_array = np.array(
                    [terminations[agent_id] or truncations[agent_id] for agent_id in current_agents],
                    dtype=np.float32
                )

                # Store the experience (shape checks happen inside add)
                self.replay_buffer.add(
                    state_matrix,
                    action_matrix,
                    reward_array,
                    next_state_matrix,
                    done_array
                )

                # Advance the state
                states = next_states
                episode_reward += np.sum(reward_array)

                # Update every agent
                for agent in self.agents:
                    agent.update(self.replay_buffer, self.agents)

                # Stop when all agents are done
                if all(terminations[agent_id] or truncations[agent_id] for agent_id in current_agents):
                    break

            # Training progress
            if (episode + 1) % 100 == 0:
                print(f"Episode {episode+1} | Total Reward: {episode_reward:.2f}")


if __name__ == "__main__":
    # ==== Suppress noisy logging ====
    import logging
    import warnings
    logging.basicConfig(level=logging.CRITICAL)
    logging.getLogger('gymnasium').setLevel(logging.CRITICAL)
    warnings.filterwarnings("ignore")

    start_time = time.time()
    print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")
    print("Initializing training environment...")
    trainer = MADDPGTrainer()
    print("Starting training...")
    trainer.train()
    end_time = time.time()
    print(f"Training finished at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}")
    print(f"Total training time: {end_time - start_time:.2f} s")
```
V. Key Code Analysis
- Actor-Critic networks
  - Actor: outputs an action from the agent's local observation
  - Critic: outputs a Q value from the global state and the actions of all agents
- Centralized training, decentralized execution
  - During training, the Critic uses global information while the Actor uses only local observations
  - During execution, each agent selects actions from its local observation alone (see the evaluation sketch after this list)
- Experience replay buffer
  - Stores the experiences of all agents (state, action, reward, next state, done flag)
  - Batches are sampled at random for training
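The training script above does not include the decentralized execution test mentioned in step 4. The sketch below shows one way to run the trained actors greedily (no exploration noise, no critic), reusing the imports and classes from the listing above; the helper name `evaluate` and the episode count are illustrative assumptions, not part of the original code.

```python
def evaluate(trainer, episodes=5):
    """Run the trained actors with local observations only (decentralized execution)."""
    env = simple_spread_v3.parallel_env(N=3, continuous_actions=True, max_cycles=100)
    for ep in range(episodes):
        observations, _ = env.reset()
        total_reward = 0.0
        while env.agents:  # the agent list empties once the episode ends
            actions = {
                agent_id: trainer.agents[i].act(observations[agent_id], noise_scale=0.0)
                for i, agent_id in enumerate(env.agents)
            }
            observations, rewards, terminations, truncations, _ = env.step(actions)
            total_reward += sum(rewards.values())
        print(f"Eval episode {ep + 1}: total reward {total_reward:.2f}")

# evaluate(trainer)  # call after trainer.train()
```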
VI. Sample Training Output
```
Start time: 2025-03-23 08:27:49
Initializing training environment...
Action space low: [0. 0. 0. 0. 0.]
Action space high: [1. 1. 1. 1. 1.]
Starting training...
Episode 100 | Total Reward: -322.41
Episode 200 | Total Reward: -402.97
Episode 300 | Total Reward: -388.02
Episode 400 | Total Reward: -384.98
Episode 500 | Total Reward: -261.25
Episode 600 | Total Reward: -215.34
Episode 700 | Total Reward: -623.22
Episode 800 | Total Reward: -255.72
Episode 900 | Total Reward: -506.97
Episode 1000 | Total Reward: -285.47
Training finished at: 2025-03-23 09:35:24
Total training time: 4055.36 s
```
VII. Summary and Extensions
This article implemented MADDPG, a core multi-agent reinforcement learning algorithm, and demonstrated how agents can learn cooperatively in a shared environment. Readers may want to explore the following extensions:
- Complex environments
  - Test the algorithm's performance in more complex environments (such as StarCraft II)
- Communication mechanisms
  - Add a communication mechanism between agents to improve coordination (a minimal sketch follows this list)
- Mixed tasks
  - Design tasks that combine cooperation and competition to test the algorithm's generality
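As a starting point for the communication extension, here is one simple design sketch, not part of MADDPG itself: each agent's actor emits a physical action plus a small message vector, and every agent's observation is augmented with the messages broadcast by the others on the previous step. The class name `CommActor` and the dimensions are illustrative assumptions (the `obs_dim=18`, `action_dim=5` values in the usage check roughly match `simple_spread` with `N=3`).

```python
import torch
import torch.nn as nn

class CommActor(nn.Module):
    """Actor that outputs an action plus a message to broadcast to the other agents."""
    def __init__(self, obs_dim, action_dim, msg_dim=4, num_agents=3, hidden_dim=256):
        super().__init__()
        # The observation is augmented with the other agents' previous messages
        in_dim = obs_dim + msg_dim * (num_agents - 1)
        self.body = nn.Sequential(
            nn.Linear(in_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim), nn.ReLU(),
        )
        self.action_head = nn.Sequential(nn.Linear(hidden_dim, action_dim), nn.Sigmoid())
        self.message_head = nn.Sequential(nn.Linear(hidden_dim, msg_dim), nn.Tanh())

    def forward(self, obs, incoming_messages):
        # incoming_messages: concatenation of the other agents' previous messages
        x = self.body(torch.cat([obs, incoming_messages], dim=-1))
        return self.action_head(x), self.message_head(x)

# Minimal usage check with dummy tensors
actor = CommActor(obs_dim=18, action_dim=5)
obs = torch.zeros(1, 18)
msgs = torch.zeros(1, 4 * 2)  # messages from the other two agents
action, message = actor(obs, msgs)
```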
In the next article, we will explore Multi-Task Reinforcement Learning (Multi-Task RL) and implement a policy-optimization algorithm based on shared representations!
Notes
- Install the dependencies:
pip install gymnasium pettingzoo
The code above also requires PyTorch and NumPy. Depending on your PettingZoo version, the MPE environments may be packaged as an extra, in which case `pip install "pettingzoo[mpe]"` is needed.