多智能体强化学习入门:从基础到 IPPO 算法—强化学习(20)
目录
1、什么是多智能体强化学习?
2、多智能体强化学习的问题建模
2.1、 核心要素
2.2、 核心挑战
2.3、目标:优化联合策略
3、多智能体强化学习的基本求解范式
3.1、 完全中心化(Centralized)
3.1.1、核心思想:
3.1.2、具体做法:
3.1.3、优点:
3.1.4、缺点:
3.2、 完全去中心化(Decentralized)
3.2.1、核心思想:
3.2.2、具体做法:
3.2.3、优点:
3.2.4、缺点:
3.3、 为什么需要 “中间方案”?
4、IPPO:多智能体版的 PPO 算法
4.1、 先回顾 PPO 的核心思想
4.2、 IPPO 如何适配多智能体场景?
4.2.1、步骤拆解:
4.3、 IPPO 的优势
5、完整代码
6、实验结果
1、什么是多智能体强化学习?
多智能体强化学习(Multi-Agent Reinforcement Learning,MARL)是研究多个智能体在同一环境中交互、协作或竞争,并通过学习优化各自策略的领域。
和单智能体强化学习(如 AlphaGo 独自学习围棋)不同,MARL 的核心是智能体之间的相互影响:比如自动驾驶中多辆车的避障(协作)、团队游戏中 5v5 的对抗(竞争 + 协作)、无人机群的协同搜救(纯协作)。
举个例子:在《王者荣耀》这样的 5v5 游戏中,每个英雄都是一个智能体,需要学习 “何时配合队友抓人”“如何分工推塔”—— 这就是典型的多智能体强化学习场景。
2、多智能体强化学习的问题建模
要理解 MARL,先明确其核心要素和挑战。我们用符号和通俗例子结合的方式说明:
2.1、 核心要素
- 智能体(Agents):多个独立决策的个体,记为
(比如 5 个游戏玩家)。
- 状态(State):环境的全局信息,记为
(比如游戏中所有英雄的位置、血量)。
- 局部观测(Observation):智能体 i 能看到的部分信息,记为
(比如你在游戏中只能看到自己视野内的敌人)。
- 动作(Action):智能体 i 选择的行为,记为
(比如 “释放技能”“移动”)。所有智能体的动作组成联合动作
。
- 奖励(Reward):
- 个体奖励
:智能体 i 获得的局部反馈(比如你击杀敌人得到的金币);
- 全局奖励 r:整个团队的反馈(比如团队推掉水晶的胜利奖励)。
- 个体奖励
- 策略(Policy):智能体 i 根据观测选择动作的规则,记为
(比如 “看到敌人残血就追击”)。
2.2、 核心挑战
多智能体的复杂之处,源于 “智能体之间的相互影响”,带来了单智能体没有的问题:
- 环境非平稳性:对单个智能体来说,其他智能体的策略在学习过程中会不断变化,相当于 “环境在动态改变”。比如你刚适应队友的打法,队友又学会了新策略,导致你之前的经验失效。
- 信用分配难题:当团队获得全局奖励(比如 “赢了比赛”),很难判断每个智能体的贡献(是辅助的控制关键,还是 ADC 的输出关键?),导致个体策略难以优化。
- 通信与协作:智能体需要通过动作间接 “通信”(比如游戏中 “信号 ping 敌方”),如何学习高效的协作模式是难点。
2.3、目标:优化联合策略
3、多智能体强化学习的基本求解范式
针对多智能体的协作与决策,有两种极端的求解思路:完全中心化和完全去中心化。
3.1、 完全中心化(Centralized)
3.1.1、核心思想:
有一个 “中央控制器”,掌握所有智能体的状态、动作和全局奖励,统一优化所有智能体的策略。
3.1.2、具体做法:
- 中央控制器拥有全局视角:知道所有智能体的观测
和环境状态 s;
- 输出联合动作:直接决定每个智能体该做什么(比如教练在场边指挥每个队员的跑位);
- 优化目标:基于全局奖励 r 调整联合策略,让团队整体收益最大。
3.1.3、优点:
- 能直接优化全局目标,协作效率高(比如无人机群在中央控制下精准避障)。
3.1.4、缺点:
- 计算量爆炸:当智能体数量 N 增加(比如 100 个无人机),联合动作空间
太大,无法处理;
- 灵活性差:中央控制器故障会导致整个系统瘫痪,且智能体缺乏独立决策能力(比如教练不在,队员不会打了)。
3.2、 完全去中心化(Decentralized)
3.2.1、核心思想:
每个智能体独立学习,只根据自己的局部观测和个体奖励决策,完全不知道其他智能体的信息。
3.2.2、具体做法:
- 每个智能体 i 只使用自己的观测
和奖励
;
- 独立优化自身策略
,比如用单智能体的 PPO、Q-learning 等算法(比如游戏中每个玩家只靠自己的经验摸索打法)。
3.2.3、优点:
- 计算量小:每个智能体的策略独立优化,不受其他智能体数量影响;
- 鲁棒性强:单个智能体故障不影响整体(比如一个无人机没电,其他仍能工作)。
3.2.4、缺点:
- 协作困难:智能体不知道队友的意图,容易出现 “各自为战”(比如游戏中队友同时抢一个 buff);
- 难以利用全局奖励:个体看不到团队整体收益,可能 “捡芝麻丢西瓜”(比如为了个人击杀放弃推塔)。
3.3、 为什么需要 “中间方案”?
完全中心化和去中心化都有明显缺陷。实际中常用 “去中心化执行,中心化评估”(Decentralized Execution, Centralized Critic,DECC):
- 执行时:每个智能体独立决策(去中心化,灵活高效);
- 评估时:有一个全局 “评委”(Centralized Critic),基于全局信息判断当前联合动作的好坏(中心化,解决信用分配问题)。
IPPO 算法就是这种中间方案的典型代表。
4、IPPO:多智能体版的 PPO 算法
IPPO(Independent PPO)是基于单智能体 PPO 扩展的多智能体算法,核心是 “让每个智能体独立用 PPO 学习,但用全局信息辅助评估”,实现高效协作。
4.1、 先回顾 PPO 的核心思想
单智能体 PPO 是通过 “信任域优化” 稳定训练的算法:每次更新策略时,限制新策略与旧策略的差异(用 “剪辑” 技巧),避免更新幅度过大导致训练崩溃。
核心公式(剪辑损失):
其中 A(s,a) 是优势函数(衡量动作的好坏),是剪辑系数(通常取 0.2)。
4.2、 IPPO 如何适配多智能体场景?
IPPO 的核心是 **“每个智能体有独立的 PPO 策略,但共享一个集中式 Critic 评估全局优势”**。
4.2.1、步骤拆解:
-
步骤 1:去中心化执行 每个智能体 i 基于自己的观测
,用独立的策略网络
选择动作(比如游戏中每个英雄自己决定技能释放)。
-
步骤 2:中心化评估 有一个全局 Critic 网络,输入所有智能体的观测和动作(即联合观测
和联合动作
),输出全局优势函数
—— 衡量当前联合动作对团队整体的价值(比如 “这波团战的操作是赚还是亏”)。
-
步骤 3:用全局优势更新个体策略 每个智能体 i 的策略损失参考 PPO 的剪辑公式,但用全局优势 A 替代单智能体的优势:
这样,每个智能体虽然独立决策,但能通过全局优势知道 “自己的动作对团队的贡献”,从而优化策略。
-
步骤 4:重复迭代 智能体不断与环境交互,收集联合观测、动作和全局奖励,更新 Critic(评估)和各自的策略(决策),直到策略收敛。
4.3、 IPPO 的优势
- 解决信用分配问题:通过全局 Critic,每个智能体能知道自己的动作在团队中的价值(比如 “我这波控制帮队友拿到了击杀,是有贡献的”);
- 训练稳定:继承 PPO 的剪辑机制,避免多智能体环境中策略突变导致的训练震荡;
- 灵活性高:去中心化执行让智能体可独立适应环境变化(比如队友策略变了,自己也能调整)。
5、完整代码
import torch
import torch.nn.functional as F
import numpy as np
import rl_utils # 自定义强化学习工具库,包含优势函数计算等功能
from tqdm import tqdm # 用于显示训练进度条
import matplotlib.pyplot as plt # 用于绘制胜率曲线
from ma_gym.envs.combat.combat import Combat # 导入多智能体战斗环境class PolicyNet(torch.nn.Module):"""策略网络,用于输出动作概率分布"""def __init__(self, state_dim, hidden_dim, action_dim):super(PolicyNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim) # 第一层全连接self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) # 第二层全连接self.fc3 = torch.nn.Linear(hidden_dim, action_dim) # 输出层,维度为动作空间大小def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x)))) # 两层ReLU激活的非线性变换return F.softmax(self.fc3(x), dim=1) # 将输出转换为概率分布class ValueNet(torch.nn.Module):"""价值网络,用于评估状态价值"""def __init__(self, state_dim, hidden_dim):super(ValueNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim) # 第一层全连接self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim) # 第二层全连接self.fc3 = torch.nn.Linear(hidden_dim, 1) # 输出层,输出单个状态价值def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x)))) # 两层ReLU激活的非线性变换return self.fc3(x) # 返回状态价值class PPO:''' PPO算法,采用截断方式 '''def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,lmbda, eps, gamma, device):self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device) # 策略网络(演员)self.critic = ValueNet(state_dim, hidden_dim).to(device) # 价值网络(评论家)self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr) # 演员优化器self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr) # 评论家优化器self.gamma = gamma # 折扣因子,控制未来奖励的重要性self.lmbda = lmbda # GAE(广义优势估计)的参数self.eps = eps # PPO中截断范围的参数,防止策略更新过大self.device = device # 运行设备(CPU或GPU)def take_action(self, state):"""根据当前状态选择动作"""state = torch.tensor([state], dtype=torch.float).to(self.device) # 将状态转换为张量probs = self.actor(state) # 获取动作概率分布action_dist = torch.distributions.Categorical(probs) # 创建分类分布action = action_dist.sample() # 从分布中采样动作return action.item() # 返回动作的标量值def update(self, transition_dict):"""更新策略网络和价值网络"""# 从字典中提取各元素并转换为张量states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)# 计算TD目标(贝尔曼目标)td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)# 计算TD误差td_delta = td_target - self.critic(states)# 计算优势函数(使用GAE方法)advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)# 计算旧策略下动作的对数概率(用于重要性采样)old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()# 计算新策略下动作的对数概率log_probs = torch.log(self.actor(states).gather(1, actions))# 计算新旧策略的概率比ratio = torch.exp(log_probs - old_log_probs)# PPO-Clip损失的两部分surr1 = ratio * advantage # 未截断的目标surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage # 截断后的目标# 演员损失(取两部分的最小值,加负号转为最大化问题)actor_loss = torch.mean(-torch.min(surr1, surr2))# 评论家损失(均方误差)critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))# 反向传播和优化self.actor_optimizer.zero_grad()self.critic_optimizer.zero_grad()actor_loss.backward()critic_loss.backward()self.actor_optimizer.step()self.critic_optimizer.step()if __name__ == '__main__':# 超参数设置actor_lr = 3e-4 # 演员网络学习率critic_lr = 1e-3 # 评论家网络学习率num_episodes = 100000 # 总训练回合数hidden_dim = 64 # 隐藏层维度gamma = 0.99 # 折扣因子lmbda = 0.97 # GAE参数eps = 0.2 # PPO截断参数device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") # 运行设备team_size = 2 # 每队智能体数量grid_size = (15, 15) # 网格世界大小# 创建Combat环境,双方各有2个智能体env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)# 获取状态空间和动作空间维度state_dim = env.observation_space[0].shape[0]action_dim = env.action_space[0].n# 创建PPO智能体(两个智能体共享同一个策略,即参数共享)agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, eps, gamma, device)win_list = [] # 存储每回合的胜负结果# 分阶段训练,每个阶段显示进度条for i in range(10):with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:for i_episode in range(int(num_episodes / 10)):# 为两个智能体分别创建经验字典transition_dict_1 = {'states': [],'actions': [],'next_states': [],'rewards': [],'dones': []}transition_dict_2 = {'states': [],'actions': [],'next_states': [],'rewards': [],'dones': []}s = env.reset() # 重置环境,获取初始状态terminal = False # 回合终止标志# 与环境交互直到回合结束while not terminal:# 两个智能体分别选择动作a_1 = agent.take_action(s[0])a_2 = agent.take_action(s[1])# 执行动作,获取下一状态、奖励、终止标志和额外信息next_s, r, done, info = env.step([a_1, a_2])# 存储智能体1的经验transition_dict_1['states'].append(s[0])transition_dict_1['actions'].append(a_1)transition_dict_1['next_states'].append(next_s[0])# 如果获胜,奖励增加100;否则减少0.1(稀疏奖励设计)transition_dict_1['rewards'].append(r[0] + 100 if info['win'] else r[0] - 0.1)transition_dict_1['dones'].append(False)# 存储智能体2的经验(同理)transition_dict_2['states'].append(s[1])transition_dict_2['actions'].append(a_2)transition_dict_2['next_states'].append(next_s[1])transition_dict_2['rewards'].append(r[1] + 100 if info['win'] else r[1] - 0.1)transition_dict_2['dones'].append(False)s = next_s # 更新状态terminal = all(done) # 当所有智能体都完成时,回合结束# 记录本回合胜负结果win_list.append(1 if info["win"] else 0)# 分别使用两个智能体的经验更新策略agent.update(transition_dict_1)agent.update(transition_dict_2)# 每100个回合更新一次进度条,显示当前胜率if (i_episode + 1) % 100 == 0:pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),'return': '%.3f' % np.mean(win_list[-100:]) # 最近100回合的平均胜率})pbar.update(1) # 更新进度条# 处理胜率数据并绘制曲线win_array = np.array(win_list)# 每100条轨迹取一次平均,平滑胜率曲线win_array = np.mean(win_array.reshape(-1, 100), axis=1)episodes_list = np.arange(win_array.shape[0]) * 100 # 计算X轴(回合数)plt.plot(episodes_list, win_array) # 绘制胜率曲线plt.xlabel('Episodes') # X轴标签plt.ylabel('Win rate') # Y轴标签plt.title('IPPO on Combat') # 标题plt.show() # 显示图像