完整强化学习教程:基于4x4网格世界的智能体探索之旅(一)
第二部分:
https://blog.csdn.net/hzm8341/article/details/148630875?spm=1001.2014.3001.5502
前言:智能体的冒险故事
想象一下,有一个名叫"小智"的机器人,它被放置在一个神秘的4×4网格世界中。这个世界充满了挑战:
- 起始位置:左上角 (1,1) - 小智的出生地
- 目标位置:右下角 (4,4) - 藏着宝藏的地方
- 障碍物:(2,2) 和 (3,3) 位置有不可穿越的障碍
- 可用动作:上、下、左、右、不动 (5种选择)
- 使命:找到从起点到宝藏的最优路径
网格世界地图:
+---+---+---+---+
| 🤖| | | | 🤖 = 小智起点 (1,1)
+---+---+---+---+
| | 🚫| | | 🚫 = 障碍物
+---+---+---+---+
| | | 🚫| |
+---+---+---+---+
| | | | 💎| 💎 = 宝藏目标 (4,4)
+---+---+---+---+
第一部分:为什么我们需要强化学习?
传统方法的局限性
-
静态规划的问题
- 如果我们预先编程一条路径,当环境发生变化时怎么办?
- 如果障碍物的位置经常改变,预设路径就失效了
-
不确定性的挑战
- 现实中,小智的动作可能不总是100%成功
- 比如:想向右走,但有10%的概率会滑向其他方向
-
最优化的需求
- 不仅要到达目标,还要找到最短、最安全的路径
- 需要在探索新路径和利用已知最佳路径之间平衡
-
自主学习的重要性
- 我们希望小智能够自己发现和学习
- 而不是被告知每一步该怎么做
这就是强化学习的用武之地! 强化学习让智能体通过与环境交互,在试错中学习最优策略。
第一章:状态表示 - 小智如何理解世界
1.1 什么是状态?
状态(State) 是对当前环境情况的完整描述。对小智来说,状态就是它当前在网格中的位置。
1.2 数学建模
我们用坐标 ( x , y ) (x, y) (x,y) 表示状态:
- x x x:行号(1到4)
- y y y:列号(1到4)
状态空间定义:
S = { ( x , y ) ∣ x , y ∈ { 1 , 2 , 3 , 4 } , ( x , y ) ∉ 障碍物 } S = \{(x,y) | x,y \in \{1,2,3,4\}, (x,y) \notin \text{障碍物}\} S={(x,y)∣x,y∈{1,2,3,4},(x,y)∈/障碍物}
总状态数: ∣ S ∣ = 4 × 4 − 2 = 14 |S| = 4 \times 4 - 2 = 14 ∣S∣=4×4−2=14个有效状态
1.3 状态编码系统
为了让计算机处理,我们需要将二维坐标转换为一维数字:
编码公式:
state_id = ( x − 1 ) × 4 + ( y − 1 ) \text{state\_id} = (x-1) \times 4 + (y-1) state_id=(x−1)×4+(y−1)
解码公式:
x = ⌊ state_id / 4 ⌋ + 1 x = \lfloor \text{state\_id} / 4 \rfloor + 1 x=⌊state_id/4⌋+1
y = ( state_id m o d 4 ) + 1 y = (\text{state\_id} \bmod 4) + 1 y=(state_idmod4)+1
1.4 Python实现
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import matplotlib.patches as mpatchesclass GridWorldEnvironment:"""4x4网格世界环境"""def __init__(self):self.grid_size = 4self.start_pos = (1, 1) # 1-based坐标self.goal_pos = (4, 4)self.obstacles = [(2, 2), (3, 3)]# 动作定义self.actions = {0: '上',1: '下', 2: '左',3: '右',4: '不动'}# 动作对应的坐标变化self.action_effects = {0: (-1, 0), # 上1: (1, 0), # 下2: (0, -1), # 左3: (0, 1), # 右4: (0, 0) # 不动}def pos_to_state_id(self, x, y):"""位置坐标转状态ID (1-based转0-based)"""return (x - 1) * self.grid_size + (y - 1)def state_id_to_pos(self, state_id):"""状态ID转位置坐标 (0-based转1-based)"""x = state_id // self.grid_size + 1y = state_id % self.grid_size + 1return (x, y)def is_valid_position(self, x, y):"""检查位置是否有效"""# 边界检查if x < 1 or x > self.grid_size or y < 1 or y > self.grid_size:return False# 障碍物检查if (x, y) in self.obstacles:return Falsereturn Truedef get_all_valid_states(self):"""获取所有有效状态ID"""valid_states = []for x in range(1, self.grid_size + 1):for y in range(1, self.grid_size + 1):if self.is_valid_position(x, y):valid_states.append(self.pos_to_state_id(x, y))return valid_statesdef visualize_world(self):"""可视化网格世界"""fig, ax = plt.subplots(figsize=(8, 8))# 绘制网格线for i in range(self.grid_size + 1):ax.axhline(i, color='black', linewidth=1)ax.axvline(i, color='black', linewidth=1)# 标记起点start_rect = Rectangle((self.start_pos[1]-1, self.grid_size-self.start_pos[0]), 1, 1, facecolor='lightgreen', alpha=0.7, edgecolor='green', linewidth=2)ax.add_patch(start_rect)ax.text(self.start_pos[1]-0.5, self.grid_size-self.start_pos[0]+0.5, '🤖\n起点', ha='center', va='center', fontsize=12, fontweight='bold')# 标记终点goal_rect = Rectangle((self.goal_pos[1]-1, self.grid_size-self.goal_pos[0]), 1, 1, facecolor='lightcoral', alpha=0.7, edgecolor='red', linewidth=2)ax.add_patch(goal_rect)ax.text(self.goal_pos[1]-0.5, self.grid_size-self.goal_pos[0]+0.5, '💎\n目标', ha='center', va='center', fontsize=12, fontweight='bold')# 标记障碍物for obs_x, obs_y in self.obstacles:obs_rect = Rectangle((obs_y-1, self.grid_size-obs_x), 1, 1, facecolor='gray', alpha=0.8, edgecolor='black', linewidth=2)ax.add_patch(obs_rect)ax.text(obs_y-0.5, self.grid_size-obs_x+0.5, '🚫\n障碍', ha='center', va='center', color='white', fontsize=10, fontweight='bold')# 标记状态IDfor x in range(1, self.grid_size + 1):for y in range(1, self.grid_size + 1):if self.is_valid_position(x, y):state_id = self.pos_to_state_id(x, y)ax.text(y-0.9, self.grid_size-x+0.1, f'S{state_id}', fontsize=8, color='blue', fontweight='bold')ax.set_xlim(0, self.grid_size)ax.set_ylim(0, self.grid_size)ax.set_aspect('equal')ax.set_title('4×4网格世界 - 状态表示', fontsize=16, fontweight='bold')# 添加图例legend_elements = [mpatches.Patch(color='lightgreen', label='起点 (1,1)'),mpatches.Patch(color='lightcoral', label='目标 (4,4)'),mpatches.Patch(color='gray', label='障碍物')]ax.legend(handles=legend_elements, loc='upper right', bbox_to_anchor=(1.2, 1))plt.tight_layout()return fig# 创建环境并测试
env = GridWorldEnvironment()print("=== 网格世界环境信息 ===")
print(f"网格大小: {env.grid_size}×{env.grid_size}")
print(f"起点位置: {env.start_pos}")
print(f"目标位置: {env.goal_pos}")
print(f"障碍物位置: {env.obstacles}")
print(f"可用动作: {list(env.actions.values())}")print("\n=== 状态空间分析 ===")
valid_states = env.get_all_valid_states()
print(f"有效状态ID列表: {valid_states}")
print(f"总状态数: {len(valid_states)}")print("\n=== 状态编码测试 ===")
test_positions = [(1,1), (2,3), (4,4), (3,1)]
for pos in test_positions:if env.is_valid_position(pos[0], pos[1]):state_id = env.pos_to_state_id(pos[0], pos[1])recovered_pos = env.state_id_to_pos(state_id)print(f"位置{pos} → 状态ID:{state_id} → 恢复位置:{recovered_pos}")# 可视化(这里只是代码,实际运行时会显示图像)
# fig = env.visualize_world()
# plt.show()
第二章:马尔科夫过程 - 小智的"健忘症"优势
2.1 什么是马尔科夫性质?
想象小智有一种特殊的"健忘症":它只记得现在在哪里,不记得是怎么来到这里的。这听起来像缺点,但实际上这是一个重要的数学性质!
马尔科夫性质:未来只依赖于现在,不依赖于过去的历史。
数学表达:
P ( S t + 1 = s ′ ∣ S t = s , S t − 1 = s t − 1 , . . . , S 0 = s 0 ) = P ( S t + 1 = s ′ ∣ S t = s ) P(S_{t+1} = s' | S_t = s, S_{t-1} = s_{t-1}, ..., S_0 = s_0) = P(S_{t+1} = s' | S_t = s) P(St+1=s′∣St=s,St−1=st−1,...,S0=s0)=P(St+1=s′∣St=s)
白话解释:不管小智之前走过什么路径,只要它现在在位置A,下一步到位置B的概率总是一样的。
2.2 马尔科夫过程的定义
马尔科夫过程是一个二元组: M = ( S , P ) \mathcal{M} = (\mathcal{S}, \mathcal{P}) M=(S,P)
其中:
- S \mathcal{S} S:状态空间(所有可能的位置)
- P \mathcal{P} P:状态转移概率矩阵
状态转移概率:
P s s ′ = P ( S t + 1 = s ′ ∣ S t = s ) \mathcal{P}_{ss'} = P(S_{t+1} = s' | S_t = s) Pss′=P(St+1=s′∣St=s)
重要性质:
∑ s ′ ∈ S P s s ′ = 1 \sum_{s' \in \mathcal{S}} \mathcal{P}_{ss'} = 1 s′∈S∑Pss′=1
(从任何状态出发,所有可能转移的概率和为1)
2.3 小智世界的马尔科夫过程
在我们的网格世界中,状态转移不是确定的!小智的动作有一定的随机性:
- 主要方向:70%概率按期望方向移动
- 侧滑现象:每个垂直方向15%概率滑向侧边
- 边界处理:如果滑向边界或障碍物,就停在原地
2.4 转移概率的计算
假设小智在状态 s s s选择动作"向右":
P ( s ′ ∣ s , 向右 ) = { 0.7 如果 s ′ 是右边的有效位置 0.15 如果 s ′ 是上方的有效位置 0.15 如果 s ′ 是下方的有效位置 0.7 + 无效移动概率 如果 s ′ = s (原地不动) 0 其他情况 P(s'|s, \text{向右}) = \begin{cases} 0.7 & \text{如果 } s' \text{ 是右边的有效位置} \\ 0.15 & \text{如果 } s' \text{ 是上方的有效位置} \\ 0.15 & \text{如果 } s' \text{ 是下方的有效位置} \\ 0.7 + \text{无效移动概率} & \text{如果 } s' = s \text{ (原地不动)} \\ 0 & \text{其他情况} \end{cases} P(s′∣s,向右)=⎩ ⎨ ⎧0.70.150.150.7+无效移动概率0如果 s′ 是右边的有效位置如果 s′ 是上方的有效位置如果 s′ 是下方的有效位置如果 s′=s (原地不动)其他情况
2.5 Python实现
import numpy as np
import pandas as pd
from collections import defaultdictclass MarkovGridWorld(GridWorldEnvironment):"""带有马尔科夫转移概率的网格世界"""def __init__(self, success_prob=0.7, slip_prob=0.15):super().__init__()self.success_prob = success_prob # 成功执行动作的概率self.slip_prob = slip_prob # 向两侧滑动的概率self.stay_prob = 1 - success_prob - 2 * slip_prob # 原地不动的概率# 构建状态转移矩阵self.valid_states = self.get_all_valid_states()self.num_states = len(self.valid_states)self.num_actions = len(self.actions)# 创建状态ID到索引的映射self.state_to_idx = {state: idx for idx, state in enumerate(self.valid_states)}self.idx_to_state = {idx: state for state, idx in self.state_to_idx.items()}# 构建转移概率矩阵 P[s][a][s'] = P(s'|s,a)self.transition_probs = self._build_transition_matrix()def _get_neighbors(self, x, y):"""获取位置(x,y)的四个邻居位置"""neighbors = {}for action_id, (dx, dy) in self.action_effects.items():if action_id == 4: # 跳过"不动"动作continuenew_x, new_y = x + dx, y + dyneighbors[action_id] = (new_x, new_y)return neighborsdef _build_transition_matrix(self):"""构建状态转移概率矩阵"""# P[state_idx][action][next_state_idx] = probabilityP = np.zeros((self.num_states, self.num_actions, self.num_states))for state_idx, state_id in enumerate(self.valid_states):x, y = self.state_id_to_pos(state_id)for action_id in range(self.num_actions):if action_id == 4: # "不动"动作P[state_idx][action_id][state_idx] = 1.0continue# 获取期望的下一个位置dx, dy = self.action_effects[action_id]intended_x, intended_y = x + dx, y + dy# 获取所有可能的移动方向neighbors = self._get_neighbors(x, y)# 分配概率for move_action, (next_x, next_y) in neighbors.items():# 确定这个移动方向的概率if move_action == action_id:# 期望方向prob = self.success_probelif self._are_perpendicular(action_id, move_action):# 垂直方向(侧滑)prob = self.slip_probelse:# 相反方向或其他,概率为0prob = 0.0if prob > 0:if self.is_valid_position(next_x, next_y):# 移动到有效位置next_state_id = self.pos_to_state_id(next_x, next_y)next_state_idx = self.state_to_idx[next_state_id]P[state_idx][action_id][next_state_idx] += probelse:# 移动到无效位置,停在原地P[state_idx][action_id][state_idx] += probreturn Pdef _are_perpendicular(self, action1, action2):"""检查两个动作是否垂直"""# 上下(0,1)与左右(2,3)垂直vertical_actions = {0, 1}horizontal_actions = {2, 3}return ((action1 in vertical_actions and action2 in horizontal_actions) or(action1 in horizontal_actions and action2 in vertical_actions))def get_transition_prob(self, state_id, action_id, next_state_id):"""获取状态转移概率 P(next_state|state, action)"""state_idx = self.state_to_idx[state_id]next_state_idx = self.state_to_idx[next_state_id]return self.transition_probs[state_idx][action_id][next_state_idx]def get_possible_next_states(self, state_id, action_id):"""获取给定状态和动作下所有可能的下一状态及其概率"""state_idx = self.state_to_idx[state_id]next_state_probs = self.transition_probs[state_idx][action_id]result = []for next_state_idx, prob in enumerate(next_state_probs):if prob > 0:next_state_id = self.idx_to_state[next_state_idx]result.append((next_state_id, prob))return resultdef simulate_transition(self, state_id, action_id):"""模拟状态转移"""state_idx = self.state_to_idx[state_id]next_state_probs = self.transition_probs[state_idx][action_id]# 按概率随机选择下一个状态next_state_idx = np.random.choice(self.num_states, p=next_state_probs)next_state_id = self.idx_to_state[next_state_idx]return next_state_iddef print_transition_matrix(self, state_id, action_id):"""打印特定状态和动作的转移概率"""print(f"\n状态 {state_id} ({self.state_id_to_pos(state_id)}) 执行动作 '{self.actions[action_id]}' 的转移概率:")print("-" * 50)possible_states = self.get_possible_next_states(state_id, action_id)for next_state_id, prob in possible_states:next_pos = self.state_id_to_pos(next_state_id)print(f" → 状态 {next_state_id} {next_pos}: {prob:.3f}")def verify_markov_property(self):"""验证马尔科夫性质:每行概率和为1"""print("=== 验证马尔科夫性质 ===")all_valid = Truefor state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]for action_id in range(self.num_actions):prob_sum = np.sum(self.transition_probs[state_idx][action_id])if abs(prob_sum - 1.0) > 1e-6:print(f"警告: 状态{state_id}动作{action_id}的概率和为{prob_sum:.6f}")all_valid = Falseif all_valid:print("✓ 所有转移概率和都等于1,满足马尔科夫性质")return all_valid# 创建马尔科夫网格世界
markov_env = MarkovGridWorld(success_prob=0.7, slip_prob=0.15)print("=== 马尔科夫网格世界设置 ===")
print(f"成功概率: {markov_env.success_prob}")
print(f"侧滑概率: {markov_env.slip_prob} (每边)")
print(f"状态数量: {markov_env.num_states}")
print(f"动作数量: {markov_env.num_actions}")# 验证马尔科夫性质
markov_env.verify_markov_property()# 展示几个具体的转移概率例子
print("\n=== 转移概率示例 ===")
test_cases = [(0, 3), # 起点向右(5, 1), # 中间位置向下(15, 0), # 终点向上
]for state_id, action_id in test_cases:if state_id in markov_env.valid_states:markov_env.print_transition_matrix(state_id, action_id)# 模拟一次状态转移
print("\n=== 状态转移模拟 ===")
current_state = 0 # 起点
action = 3 # 向右
print(f"当前状态: {current_state} {markov_env.state_id_to_pos(current_state)}")
print(f"执行动作: {markov_env.actions[action]}")# 模拟10次转移
print("10次模拟结果:")
for i in range(10):next_state = markov_env.simulate_transition(current_state, action)next_pos = markov_env.state_id_to_pos(next_state)print(f" 第{i+1}次: → 状态 {next_state} {next_pos}")
2.6 马尔科夫链的数学性质
2.6.1 转移矩阵的特性
对于马尔科夫链的转移矩阵 P \mathbf{P} P:
- 非负性: P i j ≥ 0 P_{ij} \geq 0 Pij≥0 (概率不能为负)
- 行随机性: ∑ j P i j = 1 \sum_j P_{ij} = 1 ∑jPij=1 (每行概率和为1)
- Chapman-Kolmogorov方程:
P i j ( n ) = ∑ k P i k ( n − 1 ) P k j P^{(n)}_{ij} = \sum_k P^{(n-1)}_{ik} P_{kj} Pij(n)=k∑Pik(n−1)Pkj
2.6.2 n步转移概率
从状态 i i i经过 n n n步转移到状态 j j j的概率:
P i j ( n ) = ( P n ) i j P^{(n)}_{ij} = (\mathbf{P}^n)_{ij} Pij(n)=(Pn)ij
其中 P n \mathbf{P}^n Pn是转移矩阵的 n n n次幂。
2.6.3 稳态分布
如果存在分布 π \pi π使得:
π = π P \pi = \pi \mathbf{P} π=πP
则 π \pi π称为稳态分布或平稳分布。
def analyze_markov_chain_properties(markov_env):"""分析马尔科夫链的数学性质"""print("=== 马尔科夫链数学性质分析 ===")# 我们需要处理每个动作对应的转移矩阵for action_id in range(markov_env.num_actions):print(f"\n动作 '{markov_env.actions[action_id]}' 的转移矩阵分析:")# 提取该动作的转移矩阵P = markov_env.transition_probs[:, action_id, :]# 计算特征值和特征向量eigenvalues, eigenvectors = np.linalg.eig(P.T) # 转置因为我们要左特征向量# 找到特征值为1的特征向量(稳态分布)steady_state_idx = np.argmax(np.real(eigenvalues))steady_state = np.real(eigenvectors[:, steady_state_idx])steady_state = steady_state / np.sum(steady_state) # 归一化print(f" 最大特征值: {np.real(eigenvalues[steady_state_idx]):.6f}")print(f" 稳态分布存在性: {'是' if abs(eigenvalues[steady_state_idx] - 1) < 1e-6 else '否'}")# 计算多步转移概率print(f" 2步转移矩阵的最大元素: {np.max(np.linalg.matrix_power(P, 2)):.6f}")print(f" 5步转移矩阵的最大元素: {np.max(np.linalg.matrix_power(P, 5)):.6f}")# 运行分析
# analyze_markov_chain_properties(markov_env)
小结
马尔科夫过程为我们提供了描述随机系统演化的数学框架。在强化学习中,这个性质让我们可以:
- 简化问题:只需考虑当前状态,不用记住全部历史
- 数学建模:用转移概率矩阵描述环境动态
- 预测未来:计算多步转移概率
- 理论分析:研究系统的长期行为
下一章我们将看到,当我们加入奖励和决策时,马尔科夫过程如何演化为马尔科夫决策过程,这是强化学习的核心数学框架。
第三章:贝尔曼方程 - 小智的价值观念
3.1 从马尔科夫过程到马尔科夫决策过程
之前我们的小智只是随机游走,现在我们要给它一个目标:获得最大的奖励!这就需要引入两个新概念:
- 奖励函数:告诉小智每个行为的好坏
- 策略:小智的决策规则
马尔科夫决策过程(MDP) 是一个五元组: M = ( S , A , P , R , γ ) \mathcal{M} = (\mathcal{S}, \mathcal{A}, \mathcal{P}, \mathcal{R}, \gamma) M=(S,A,P,R,γ)
其中:
- S \mathcal{S} S:状态空间
- A \mathcal{A} A:动作空间
- P \mathcal{P} P:状态转移概率
- R \mathcal{R} R:奖励函数
- γ \gamma γ:折扣因子(0 ≤ γ ≤ 1)
3.2 奖励函数的设计
在我们的网格世界中,奖励函数 R ( s , a , s ′ ) R(s,a,s') R(s,a,s′)定义为:
R ( s , a , s ′ ) = { + 100 如果 s ′ 是目标状态 (4,4) − 1 如果 s ′ 不是目标状态 − 10 如果撞到障碍物或边界 R(s,a,s') = \begin{cases} +100 & \text{如果 } s' \text{ 是目标状态 (4,4)} \\ -1 & \text{如果 } s' \text{ 不是目标状态} \\ -10 & \text{如果撞到障碍物或边界} \end{cases} R(s,a,s′)=⎩ ⎨ ⎧+100−1−10如果 s′ 是目标状态 (4,4)如果 s′ 不是目标状态如果撞到障碍物或边界
设计理念:
- 正奖励:鼓励到达目标
- 负奖励:鼓励尽快完成任务
- 惩罚:避免无效动作
3.3 策略的定义
策略(Policy) π ( a ∣ s ) \pi(a|s) π(a∣s) 是一个概率分布,表示在状态 s s s下选择动作 a a a的概率。
- 确定性策略: π ( s ) = a \pi(s) = a π(s)=a(在状态 s s s总是选择动作 a a a)
- 随机性策略: π ( a ∣ s ) = P ( A t = a ∣ S t = s ) \pi(a|s) = P(A_t = a | S_t = s) π(a∣s)=P(At=a∣St=s)
3.4 价值函数
3.4.1 状态价值函数
状态价值函数 V π ( s ) V^\pi(s) Vπ(s) 表示从状态 s s s开始,遵循策略 π \pi π的期望累积奖励:
V π ( s ) = E π [ ∑ t = 0 ∞ γ t R t + 1 ∣ S 0 = s ] V^\pi(s) = \mathbb{E}_\pi \left[ \sum_{t=0}^\infty \gamma^t R_{t+1} | S_0 = s \right] Vπ(s)=Eπ[t=0∑∞γtRt+1∣S0=s]
直观理解:如果小智在状态 s s s,按照策略 π \pi π行动,预期能获得多少总奖励?
3.4.2 动作价值函数
动作价值函数 Q π ( s , a ) Q^\pi(s,a) Qπ(s,a) 表示在状态 s s s执行动作 a a a,然后遵循策略 π \pi π的期望累积奖励:
Q π ( s , a ) = E π [ ∑ t = 0 ∞ γ t R t + 1 ∣ S 0 = s , A 0 = a ] Q^\pi(s,a) = \mathbb{E}_\pi \left[ \sum_{t=0}^\infty \gamma^t R_{t+1} | S_0 = s, A_0 = a \right] Qπ(s,a)=Eπ[t=0∑∞γtRt+1∣S0=s,A0=a]
直观理解:在状态 s s s执行特定动作 a a a的价值是多少?
3.5 贝尔曼方程的推导
贝尔曼方程是强化学习的核心!让我们一步步推导。
3.5.1 状态价值函数的贝尔曼方程
从价值函数的定义开始:
V π ( s ) = E π [ ∑ t = 0 ∞ γ t R t + 1 ∣ S 0 = s ] V^\pi(s) = \mathbb{E}_\pi \left[ \sum_{t=0}^\infty \gamma^t R_{t+1} | S_0 = s \right] Vπ(s)=Eπ[t=0∑∞γtRt+1∣S0=s]
将求和分解为第一项和其余项:
V π ( s ) = E π [ R 1 + γ ∑ t = 1 ∞ γ t − 1 R t + 1 ∣ S 0 = s ] V^\pi(s) = \mathbb{E}_\pi \left[ R_1 + \gamma \sum_{t=1}^\infty \gamma^{t-1} R_{t+1} | S_0 = s \right] Vπ(s)=Eπ[R1+γt=1∑∞γt−1Rt+1∣S0=s]
利用马尔科夫性质和期望的线性:
V π ( s ) = E π [ R 1 ∣ S 0 = s ] + γ E π [ ∑ t = 1 ∞ γ t − 1 R t + 1 ∣ S 0 = s ] V^\pi(s) = \mathbb{E}_\pi [R_1 | S_0 = s] + \gamma \mathbb{E}_\pi \left[ \sum_{t=1}^\infty \gamma^{t-1} R_{t+1} | S_0 = s \right] Vπ(s)=Eπ[R1∣S0=s]+γEπ[t=1∑∞γt−1Rt+1∣S0=s]
注意到第二项实际上是从下一个状态开始的价值函数:
V π ( s ) = E π [ R 1 ∣ S 0 = s ] + γ E π [ V π ( S 1 ) ∣ S 0 = s ] V^\pi(s) = \mathbb{E}_\pi [R_1 | S_0 = s] + \gamma \mathbb{E}_\pi [V^\pi(S_1) | S_0 = s] Vπ(s)=Eπ[R1∣S0=s]+γEπ[Vπ(S1)∣S0=s]
展开期望:
V π ( s ) = ∑ a π ( a ∣ s ) ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ V π ( s ′ ) ] V^\pi(s) = \sum_a \pi(a|s) \sum_{s'} P(s'|s,a) [R(s,a,s') + \gamma V^\pi(s')] Vπ(s)=a∑π(a∣s)s′∑P(s′∣s,a)[R(s,a,s′)+γVπ(s′)]
这就是贝尔曼方程!
3.5.2 动作价值函数的贝尔曼方程
类似地,我们可以推导出:
Q π ( s , a ) = ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ ∑ a ′ π ( a ′ ∣ s ′ ) Q π ( s ′ , a ′ ) ] Q^\pi(s,a) = \sum_{s'} P(s'|s,a) [R(s,a,s') + \gamma \sum_{a'} \pi(a'|s') Q^\pi(s',a')] Qπ(s,a)=s′∑P(s′∣s,a)[R(s,a,s′)+γa′∑π(a′∣s′)Qπ(s′,a′)]
3.5.3 价值函数之间的关系
V π ( s ) = ∑ a π ( a ∣ s ) Q π ( s , a ) V^\pi(s) = \sum_a \pi(a|s) Q^\pi(s,a) Vπ(s)=a∑π(a∣s)Qπ(s,a)
Q π ( s , a ) = ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ V π ( s ′ ) ] Q^\pi(s,a) = \sum_{s'} P(s'|s,a) [R(s,a,s') + \gamma V^\pi(s')] Qπ(s,a)=s′∑P(s′∣s,a)[R(s,a,s′)+γVπ(s′)]
3.6 最优价值函数
最优状态价值函数:
V ∗ ( s ) = max π V π ( s ) V^*(s) = \max_\pi V^\pi(s) V∗(s)=πmaxVπ(s)
最优动作价值函数:
Q ∗ ( s , a ) = max π Q π ( s , a ) Q^*(s,a) = \max_\pi Q^\pi(s,a) Q∗(s,a)=πmaxQπ(s,a)
贝尔曼最优方程:
V ∗ ( s ) = max a ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ V ∗ ( s ′ ) ] V^*(s) = \max_a \sum_{s'} P(s'|s,a) [R(s,a,s') + \gamma V^*(s')] V∗(s)=amaxs′∑P(s′∣s,a)[R(s,a,s′)+γV∗(s′)]
Q ∗ ( s , a ) = ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ max a ′ Q ∗ ( s ′ , a ′ ) ] Q^*(s,a) = \sum_{s'} P(s'|s,a) [R(s,a,s') + \gamma \max_{a'} Q^*(s',a')] Q∗(s,a)=s′∑P(s′∣s,a)[R(s,a,s′)+γa′maxQ∗(s′,a′)]
3.7 Python实现
import numpy as np
from collections import defaultdictclass MDPGridWorld(MarkovGridWorld):"""马尔科夫决策过程网格世界"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.9):super().__init__(success_prob, slip_prob)self.gamma = gamma # 折扣因子# 奖励函数设计self.goal_reward = 100.0 # 到达目标的奖励self.step_reward = -1.0 # 每步的成本self.collision_reward = -10.0 # 撞墙的惩罚# 构建奖励矩阵 R[s][a][s'] = rewardself.rewards = self._build_reward_matrix()def _build_reward_matrix(self):"""构建奖励矩阵"""R = np.zeros((self.num_states, self.num_actions, self.num_states))goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])goal_state_idx = self.state_to_idx[goal_state_id]for state_idx in range(self.num_states):for action_id in range(self.num_actions):for next_state_idx in range(self.num_states):if self.transition_probs[state_idx][action_id][next_state_idx] > 0:# 有转移概率的状态对if next_state_idx == goal_state_idx:# 到达目标R[state_idx][action_id][next_state_idx] = self.goal_rewardelif next_state_idx == state_idx:# 原地不动(可能是撞墙)if action_id != 4: # 不是"不动"动作但是没有移动R[state_idx][action_id][next_state_idx] = self.collision_rewardelse:R[state_idx][action_id][next_state_idx] = self.step_rewardelse:# 正常移动R[state_idx][action_id][next_state_idx] = self.step_rewardreturn Rdef get_expected_reward(self, state_id, action_id):"""计算状态-动作对的期望奖励"""state_idx = self.state_to_idx[state_id]expected_reward = 0.0for next_state_idx in range(self.num_states):prob = self.transition_probs[state_idx][action_id][next_state_idx]reward = self.rewards[state_idx][action_id][next_state_idx]expected_reward += prob * rewardreturn expected_rewarddef evaluate_policy(self, policy, max_iterations=1000, tolerance=1e-6):"""策略评估:计算给定策略的价值函数"""V = np.zeros(self.num_states) # 初始化价值函数for iteration in range(max_iterations):V_new = np.zeros(self.num_states)for state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]# 贝尔曼方程更新for action_id in range(self.num_actions):action_prob = policy[state_id][action_id]# 计算该动作的期望价值action_value = 0.0for next_state_idx in range(self.num_states):transition_prob = self.transition_probs[state_idx][action_id][next_state_idx]reward = self.rewards[state_idx][action_id][next_state_idx]next_value = V[next_state_idx]action_value += transition_prob * (reward + self.gamma * next_value)V_new[state_idx] += action_prob * action_value# 检查收敛if np.max(np.abs(V_new - V)) < tolerance:print(f"策略评估在第 {iteration + 1} 次迭代后收敛")breakV = V_new.copy()return Vdef compute_q_values(self, V):"""根据状态价值函数计算动作价值函数"""Q = np.zeros((self.num_states, self.num_actions))for state_idx in range(self.num_states):for action_id in range(self.num_actions):# Q(s,a) = E[R + γV(s')]for next_state_idx in range(self.num_states):transition_prob = self.transition_probs[state_idx][action_id][next_state_idx]reward = self.rewards[state_idx][action_id][next_state_idx]next_value = V[next_state_idx]Q[state_idx][action_id] += transition_prob * (reward + self.gamma * next_value)return Qdef value_iteration(self, max_iterations=1000, tolerance=1e-6):"""价值迭代算法求解最优价值函数"""V = np.zeros(self.num_states)for iteration in range(max_iterations):V_new = np.zeros(self.num_states)for state_idx in range(self.num_states):# 贝尔曼最优方程:V*(s) = max_a Q*(s,a)action_values = []for action_id in range(self.num_actions):action_value = 0.0for next_state_idx in range(self.num_states):transition_prob = self.transition_probs[state_idx][action_id][next_state_idx]reward = self.rewards[state_idx][action_id][next_state_idx]next_value = V[next_state_idx]action_value += transition_prob * (reward + self.gamma * next_value)action_values.append(action_value)V_new[state_idx] = max(action_values)# 检查收敛if np.max(np.abs(V_new - V)) < tolerance:print(f"价值迭代在第 {iteration + 1} 次迭代后收敛")breakV = V_new.copy()return Vdef extract_optimal_policy(self, V):"""从最优价值函数提取最优策略"""policy = {}Q = self.compute_q_values(V)for state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]# 选择Q值最大的动作best_action = np.argmax(Q[state_idx])policy[state_id] = best_actionreturn policydef print_value_function(self, V):"""打印价值函数"""print("\n=== 状态价值函数 ===")print("状态ID | 位置 | 价值")print("-" * 25)for state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]pos = self.state_id_to_pos(state_id)value = V[state_idx]print(f"{state_id:6d} | {pos} | {value:8.2f}")def print_policy(self, policy):"""打印策略"""print("\n=== 策略 ===")print("状态ID | 位置 | 最优动作")print("-" * 30)for state_id in sorted(policy.keys()):pos = self.state_id_to_pos(state_id)action_id = policy[state_id]action_name = self.actions[action_id]print(f"{state_id:6d} | {pos} | {action_name}")def visualize_policy(self, policy):"""可视化策略"""# 创建策略网格policy_grid = np.full((self.grid_size, self.grid_size), '', dtype=object)# 动作符号action_symbols = {0: '↑', # 上1: '↓', # 下2: '←', # 左3: '→', # 右4: '●' # 不动}for state_id, action_id in policy.items():x, y = self.state_id_to_pos(state_id)policy_grid[x-1][y-1] = action_symbols[action_id]# 标记障碍物和目标for obs_x, obs_y in self.obstacles:policy_grid[obs_x-1][obs_y-1] = '🚫'goal_x, goal_y = self.goal_pospolicy_grid[goal_x-1][goal_y-1] = '💎'print("\n=== 策略可视化 ===")print("符号含义: ↑上 ↓下 ←左 →右 ●不动 🚫障碍 💎目标")print("-" * 20)for i in range(self.grid_size):row = " | ".join(f"{policy_grid[i][j]:^3}" for j in range(self.grid_size))print(f"| {row} |")if i < self.grid_size - 1:print("-" * 20)# 创建MDP环境并测试
mdp_env = MDPGridWorld(gamma=0.9)print("=== MDP网格世界设置 ===")
print(f"折扣因子: {mdp_env.gamma}")
print(f"目标奖励: {mdp_env.goal_reward}")
print(f"步数惩罚: {mdp_env.step_reward}")
print(f"碰撞惩罚: {mdp_env.collision_reward}")# 测试期望奖励计算
print("\n=== 期望奖励示例 ===")
test_state = 0 # 起点
for action_id in range(mdp_env.num_actions):expected_reward = mdp_env.get_expected_reward(test_state, action_id)action_name = mdp_env.actions[action_id]print(f"状态 {test_state} 执行 '{action_name}': 期望奖励 = {expected_reward:.2f}")# 使用价值迭代求解最优策略
print("\n=== 价值迭代求解 ===")
optimal_V = mdp_env.value_iteration()
optimal_policy = mdp_env.extract_optimal_policy(optimal_V)# 显示结果
mdp_env.print_value_function(optimal_V)
mdp_env.print_policy(optimal_policy)
mdp_env.visualize_policy(optimal_policy)# 验证贝尔曼方程
print("\n=== 贝尔曼方程验证 ===")
print("检查最优价值函数是否满足贝尔曼最优方程...")
max_error = 0.0for state_idx in range(mdp_env.num_states):state_id = mdp_env.idx_to_state[state_idx]current_value = optimal_V[state_idx]# 计算max_a Q(s,a)max_q_value = float('-inf')for action_id in range(mdp_env.num_actions):q_value = 0.0for next_state_idx in range(mdp_env.num_states):prob = mdp_env.transition_probs[state_idx][action_id][next_state_idx]reward = mdp_env.rewards[state_idx][action_id][next_state_idx]next_value = optimal_V[next_state_idx]q_value += prob * (reward + mdp_env.gamma * next_value)max_q_value = max(max_q_value, q_value)error = abs(current_value - max_q_value)max_error = max(max_error, error)print(f"最大贝尔曼方程误差: {max_error:.8f}")
if max_error < 1e-6:print("✓ 贝尔曼最优方程得到满足!")
else:print("✗ 贝尔曼方程存在误差")
3.8 贝尔曼方程的意义
3.8.1 递归性质
贝尔曼方程揭示了价值函数的递归结构:
- 当前状态的价值 = 即时奖励 + 折扣后续状态的价值
- 这让我们可以通过动态规划求解
3.8.2 最优性条件
贝尔曼最优方程给出了最优策略的必要充分条件:
- 如果策略满足贝尔曼最优方程,则它是最优的
- 反之亦然
3.8.3 计算框架
贝尔曼方程为强化学习提供了统一的计算框架:
- 价值迭代:直接迭代贝尔曼最优方程
- 策略迭代:交替进行策略评估和策略改进
- Q-learning:学习动作价值函数的贝尔曼方程
小结
贝尔曼方程是强化学习的数学基础,它:
- 建立了递归关系:将复杂的长期价值分解为当前奖励和未来价值
- 提供了最优性条件:定义了什么是最优策略
- 指导算法设计:为各种强化学习算法提供了理论基础
- 保证了解的存在性:在一定条件下,贝尔曼方程有唯一解
下一章我们将学习如何使用表格方法来实际求解贝尔曼方程,这是强化学习算法的第一步。
第四章:表格方法 - 小智的记忆表格
4.1 为什么叫"表格"方法?
在前面的章节中,我们假设知道环境的转移概率和奖励函数。但现实中,小智需要通过试错来学习!
表格方法的核心思想:为每个状态(或状态-动作对)维护一个表格,记录学到的价值。
就像小智有一个笔记本,记录着:
- 状态1的价值是多少?
- 在状态2执行动作"向右"的Q值是多少?
4.2 动态规划方法回顾
在开始学习算法之前,让我们回顾一下已知环境模型时的动态规划方法:
4.2.1 策略迭代(Policy Iteration)
算法步骤:
- 策略评估:计算当前策略的价值函数
- 策略改进:根据价值函数改进策略
- 重复直到策略收敛
策略评估:
V k + 1 ( s ) = ∑ a π ( a ∣ s ) ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ V k ( s ′ ) ] V^{k+1}(s) = \sum_a \pi(a|s) \sum_{s'} P(s'|s,a)[R(s,a,s') + \gamma V^k(s')] Vk+1(s)=a∑π(a∣s)s′∑P(s′∣s,a)[R(s,a,s′)+γVk(s′)]
策略改进:
π ′ ( s ) = arg max a ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ V ( s ′ ) ] \pi'(s) = \arg\max_a \sum_{s'} P(s'|s,a)[R(s,a,s') + \gamma V(s')] π′(s)=argamaxs′∑P(s′∣s,a)[R(s,a,s′)+γV(s′)]
4.2.2 价值迭代(Value Iteration)
直接迭代贝尔曼最优方程:
V k + 1 ( s ) = max a ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ V k ( s ′ ) ] V^{k+1}(s) = \max_a \sum_{s'} P(s'|s,a)[R(s,a,s') + \gamma V^k(s')] Vk+1(s)=amaxs′∑P(s′∣s,a)[R(s,a,s′)+γVk(s′)]
4.3 蒙特卡洛方法
当我们不知道环境模型时,怎么办?蒙特卡洛方法:通过采样完整的回合来估计价值函数。
4.3.1 基本思想
如果小智玩了很多回合的游戏,我们可以:
- 记录每个回合中每个状态获得的总回报
- 对这些回报取平均,得到状态价值的估计
4.3.2 首次访问蒙特卡洛
算法:
- 对于回合中首次访问的每个状态 s s s
- 计算从该时刻开始的回报 G t = R t + 1 + γ R t + 2 + γ 2 R t + 3 + . . . G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + ... Gt=Rt+1+γRt+2+γ2Rt+3+...
- 更新价值估计: V ( s ) ← 平均 ( V ( s ) , G t ) V(s) \leftarrow \text{平均}(V(s), G_t) V(s)←平均(V(s),Gt)
4.3.3 数学推导
回报定义:
G t = R t + 1 + γ R t + 2 + γ 2 R t + 3 + . . . = ∑ k = 0 ∞ γ k R t + k + 1 G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + ... = \sum_{k=0}^\infty \gamma^k R_{t+k+1} Gt=Rt+1+γRt+2+γ2Rt+3+...=k=0∑∞γkRt+k+1
价值函数更新:
V ( s ) = 1 N ( s ) ∑ i = 1 N ( s ) G i ( s ) V(s) = \frac{1}{N(s)} \sum_{i=1}^{N(s)} G_i(s) V(s)=N(s)1i=1∑N(s)Gi(s)
其中 N ( s ) N(s) N(s)是访问状态 s s s的次数, G i ( s ) G_i(s) Gi(s)是第 i i i次访问时的回报。
4.4 时间差分学习(TD Learning)
蒙特卡洛方法需要等到回合结束,能否边玩边学?
4.4.1 TD(0)算法
核心思想:用即时奖励加上下一状态的价值估计来更新当前状态的价值。
更新规则:
V ( S t ) ← V ( S t ) + α [ R t + 1 + γ V ( S t + 1 ) − V ( S t ) ] V(S_t) \leftarrow V(S_t) + \alpha [R_{t+1} + \gamma V(S_{t+1}) - V(S_t)] V(St)←V(St)+α[Rt+1+γV(St+1)−V(St)]
其中:
- α \alpha α:学习率
- R t + 1 + γ V ( S t + 1 ) − V ( S t ) R_{t+1} + \gamma V(S_{t+1}) - V(S_t) Rt+1+γV(St+1)−V(St):TD误差
4.4.2 TD误差的含义
TD误差:
δ t = R t + 1 + γ V ( S t + 1 ) − V ( S t ) \delta_t = R_{t+1} + \gamma V(S_{t+1}) - V(S_t) δt=Rt+1+γV(St+1)−V(St)
- 如果 δ t > 0 \delta_t > 0 δt>0:实际获得的回报比预期高,增加 V ( S t ) V(S_t) V(St)
- 如果 δ t < 0 \delta_t < 0 δt<0:实际获得的回报比预期低,减少 V ( S t ) V(S_t) V(St)
4.4.3 TD vs MC
方面 | 蒙特卡洛 | 时间差分 |
---|---|---|
学习时机 | 回合结束后 | 每步都可以 |
方差 | 高 | 低 |
偏差 | 无 | 有(初期) |
收敛性 | 保证 | 通常收敛 |
4.5 Python实现
import numpy as np
import random
from collections import defaultdict
import matplotlib.pyplot as pltclass TabularMethodsGridWorld(MDPGridWorld):"""表格方法的网格世界实现"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.9):super().__init__(success_prob, slip_prob, gamma)# 初始化价值表格self.V_table = np.zeros(self.num_states) # 状态价值表self.Q_table = np.zeros((self.num_states, self.num_actions)) # Q值表self.visit_count = np.zeros(self.num_states) # 访问计数self.returns_sum = np.zeros(self.num_states) # 回报累计def reset_tables(self):"""重置所有表格"""self.V_table.fill(0)self.Q_table.fill(0)self.visit_count.fill(0)self.returns_sum.fill(0)def generate_episode(self, policy, max_steps=1000):"""生成一个回合的经验"""episode = []current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])for step in range(max_steps):current_state_idx = self.state_to_idx[current_state_id]# 根据策略选择动作if isinstance(policy, dict):# 确定性策略if current_state_id in policy:action_id = policy[current_state_id]else:action_id = random.randint(0, self.num_actions - 1)else:# 随机策略或策略函数action_id = policy(current_state_id)# 执行动作,获得奖励和下一状态next_state_id = self.simulate_transition(current_state_id, action_id)next_state_idx = self.state_to_idx[next_state_id]# 计算奖励reward = self.rewards[current_state_idx][action_id][next_state_idx]# 记录经验episode.append((current_state_id, action_id, reward))# 检查是否到达目标if next_state_id == goal_state_id:breakcurrent_state_id = next_state_idreturn episodedef monte_carlo_policy_evaluation(self, policy, num_episodes=10000, first_visit=True):"""蒙特卡洛策略评估"""print(f"开始蒙特卡洛策略评估,回合数: {num_episodes}")self.reset_tables()for episode_num in range(num_episodes):# 生成回合episode = self.generate_episode(policy)# 计算每个时刻的回报G = 0visited_states = set() if first_visit else None# 从后往前计算回报for t in reversed(range(len(episode))):state_id, action_id, reward = episode[t]G = reward + self.gamma * Gif first_visit:if state_id not in visited_states:visited_states.add(state_id)# 更新价值估计state_idx = self.state_to_idx[state_id]self.visit_count[state_idx] += 1self.returns_sum[state_idx] += Gself.V_table[state_idx] = self.returns_sum[state_idx] / self.visit_count[state_idx]else:# 每次访问都更新state_idx = self.state_to_idx[state_id]self.visit_count[state_idx] += 1self.returns_sum[state_idx] += Gself.V_table[state_idx] = self.returns_sum[state_idx] / self.visit_count[state_idx]if (episode_num + 1) % 1000 == 0:print(f" 完成 {episode_num + 1} 个回合")return self.V_table.copy()def td_policy_evaluation(self, policy, num_episodes=1000, alpha=0.1):"""TD(0)策略评估"""print(f"开始TD(0)策略评估,回合数: {num_episodes}, 学习率: {alpha}")self.reset_tables()td_errors = []for episode_num in range(num_episodes):current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])episode_td_errors = []max_steps = 1000for step in range(max_steps):current_state_idx = self.state_to_idx[current_state_id]# 选择动作if isinstance(policy, dict):if current_state_id in policy:action_id = policy[current_state_id]else:action_id = random.randint(0, self.num_actions - 1)else:action_id = policy(current_state_id)# 执行动作next_state_id = self.simulate_transition(current_state_id, action_id)next_state_idx = self.state_to_idx[next_state_id]# 获得奖励reward = self.rewards[current_state_idx][action_id][next_state_idx]# TD更新current_value = self.V_table[current_state_idx]next_value = self.V_table[next_state_idx]td_error = reward + self.gamma * next_value - current_valueself.V_table[current_state_idx] += alpha * td_errorepisode_td_errors.append(abs(td_error))# 检查终止条件if next_state_id == goal_state_id:breakcurrent_state_id = next_state_idtd_errors.append(np.mean(episode_td_errors) if episode_td_errors else 0)if (episode_num + 1) % 100 == 0:print(f" 完成 {episode_num + 1} 个回合, 平均TD误差: {td_errors[-1]:.4f}")return self.V_table.copy(), td_errorsdef compare_methods(self, true_policy=None):"""比较不同方法的性能"""if true_policy is None:# 使用之前计算的最优策略optimal_V = self.value_iteration()true_policy = self.extract_optimal_policy(optimal_V)true_V = optimal_Velse:true_V = self.evaluate_policy_exactly(true_policy)print("=== 方法比较 ===")# 蒙特卡洛方法print("\n1. 蒙特卡洛方法")mc_V = self.monte_carlo_policy_evaluation(true_policy, num_episodes=5000)mc_rmse = np.sqrt(np.mean((mc_V - true_V) ** 2))print(f" RMSE: {mc_rmse:.4f}")# TD方法print("\n2. TD(0)方法")td_V, td_errors = self.td_policy_evaluation(true_policy, num_episodes=1000, alpha=0.1)td_rmse = np.sqrt(np.mean((td_V - true_V) ** 2))print(f" RMSE: {td_rmse:.4f}")# 比较结果print("\n=== 比较结果 ===")print(f"真实策略价值 vs 蒙特卡洛估计 vs TD估计")print("-" * 50)print("状态ID | 真实值 | MC估计 | TD估计 | MC误差 | TD误差")print("-" * 60)for state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]true_val = true_V[state_idx]mc_val = mc_V[state_idx]td_val = td_V[state_idx]mc_error = abs(true_val - mc_val)td_error = abs(true_val - td_val)print(f"{state_id:6d} | {true_val:7.3f} | {mc_val:7.3f} | {td_val:7.3f} | "f"{mc_error:7.3f} | {td_error:7.3f}")return {'true_V': true_V,'mc_V': mc_V,'td_V': td_V,'mc_rmse': mc_rmse,'td_rmse': td_rmse,'td_errors': td_errors}def evaluate_policy_exactly(self, policy):"""使用已知模型精确评估策略"""# 构建策略概率矩阵policy_probs = np.zeros((self.num_states, self.num_actions))for state_id, action_id in policy.items():state_idx = self.state_to_idx[state_id]policy_probs[state_idx][action_id] = 1.0# 构建策略对应的转移矩阵和奖励向量P_pi = np.zeros((self.num_states, self.num_states))R_pi = np.zeros(self.num_states)for state_idx in range(self.num_states):for action_id in range(self.num_actions):action_prob = policy_probs[state_idx][action_id]if action_prob > 0:for next_state_idx in range(self.num_states):transition_prob = self.transition_probs[state_idx][action_id][next_state_idx]reward = self.rewards[state_idx][action_id][next_state_idx]P_pi[state_idx][next_state_idx] += action_prob * transition_probR_pi[state_idx] += action_prob * transition_prob * reward# 求解线性方程组: V = R + γPV => (I - γP)V = RI = np.eye(self.num_states)A = I - self.gamma * P_piV = np.linalg.solve(A, R_pi)return V# 创建环境并测试表格方法
print("=== 表格方法测试 ===")
tabular_env = TabularMethodsGridWorld()# 首先求解最优策略作为测试基准
optimal_V = tabular_env.value_iteration()
optimal_policy = tabular_env.extract_optimal_policy(optimal_V)print("最优策略:")
tabular_env.print_policy(optimal_policy)# 比较不同方法
results = tabular_env.compare_methods(optimal_policy)# 可视化学到的策略
print("\n=== 学到的策略可视化 ===")
tabular_env.visualize_policy(optimal_policy)# 绘制学习曲线(取消注释以显示)
# learning_fig = tabular_env.plot_learning_progress()
# plt.show()
4.6 表格方法的收敛性分析
4.6.1 蒙特卡洛方法的收敛性
大数定律保证:
lim n → ∞ 1 n ∑ i = 1 n G i ( s ) = E [ G t ∣ S t = s ] = V π ( s ) \lim_{n \to \infty} \frac{1}{n} \sum_{i=1}^n G_i(s) = \mathbb{E}[G_t | S_t = s] = V^\pi(s) n→∞limn1i=1∑nGi(s)=E[Gt∣St=s]=Vπ(s)
收敛条件:
- 每个状态被访问无穷多次
- 策略保持固定
4.6.2 TD方法的收敛性
定理:在固定策略下,如果:
- 学习率满足: ∑ t = 1 ∞ α t = ∞ \sum_{t=1}^\infty \alpha_t = \infty ∑t=1∞αt=∞ 且 ∑ t = 1 ∞ α t 2 < ∞ \sum_{t=1}^\infty \alpha_t^2 < \infty ∑t=1∞αt2<∞
- 每个状态被访问无穷多次
则TD(0)算法收敛到真实价值函数 V π V^\pi Vπ。
常用学习率:
- 常数学习率: α = 0.1 \alpha = 0.1 α=0.1(实用但不保证收敛)
- 递减学习率: α t = 1 t \alpha_t = \frac{1}{t} αt=t1(理论保证但收敛慢)
4.7 表格方法的优缺点
4.7.1 优点
- 简单直观:容易理解和实现
- 收敛保证:在理论条件下保证收敛
- 无近似误差:表格足够大时可以精确表示价值函数
- 调试方便:可以直观检查每个状态的价值
4.7.2 缺点
- 状态空间爆炸:状态数量指数增长
- 泛化能力差:相似状态的价值不能共享
- 样本效率低:需要大量样本才能收敛
- 内存需求大:需要存储所有状态的价值
4.8 改进策略
4.8.1 函数近似
用神经网络等函数近似器替代表格:
V ( s ) ≈ V ( s ; θ ) V(s) \approx V(s; \theta) V(s)≈V(s;θ)
4.8.2 特征工程
设计状态特征来减少维度:
V ( s ) = θ T ϕ ( s ) V(s) = \theta^T \phi(s) V(s)=θTϕ(s)
其中 ϕ ( s ) \phi(s) ϕ(s)是状态 s s s的特征向量。
小结
表格方法是强化学习的基础,它们:
- 奠定了理论基础:为更高级的方法提供了收敛性理论
- 提供了直观理解:帮助理解价值函数和策略的概念
- 适用于小规模问题:在状态空间较小时仍然有用
- 启发了改进方向:函数近似、经验回放等技术的起点
下一章我们将学习Q-learning,这是一种不需要知道策略就能学习最优价值函数的方法!
第五章:Q-learning - 小智的独立探索
5.1 Q-learning的革命性思想
到目前为止,我们一直在评估给定策略的价值。但是,如果我们不知道好的策略应该是什么样的呢?
Q-learning的核心思想:直接学习最优动作价值函数 Q ∗ ( s , a ) Q^*(s,a) Q∗(s,a),而不需要知道策略!
这就像小智不需要老师告诉它该怎么走,而是自己探索并记住"在每个位置,每个动作的真正价值是多少"。
5.2 从TD学习到Q-learning
5.2.1 TD学习的局限
回忆TD学习的更新公式:
V ( S t ) ← V ( S t ) + α [ R t + 1 + γ V ( S t + 1 ) − V ( S t ) ] V(S_t) \leftarrow V(S_t) + \alpha [R_{t+1} + \gamma V(S_{t+1}) - V(S_t)] V(St)←V(St)+α[Rt+1+γV(St+1)−V(St)]
但这只能评估给定策略,不能告诉我们最优策略是什么。
5.2.2 Q-learning的灵感
如果我们学习 Q ( s , a ) Q(s,a) Q(s,a)而不是 V ( s ) V(s) V(s),就可以:
- 知道每个动作的价值
- 选择价值最高的动作: π ∗ ( s ) = arg max a Q ∗ ( s , a ) \pi^*(s) = \arg\max_a Q^*(s,a) π∗(s)=argmaxaQ∗(s,a)
5.3 Q-learning算法推导
5.3.1 目标:学习最优Q函数
我们想要学习 Q ∗ ( s , a ) Q^*(s,a) Q∗(s,a),它满足贝尔曼最优方程:
Q ∗ ( s , a ) = ∑ s ′ P ( s ′ ∣ s , a ) [ R ( s , a , s ′ ) + γ max a ′ Q ∗ ( s ′ , a ′ ) ] Q^*(s,a) = \sum_{s'} P(s'|s,a)[R(s,a,s') + \gamma \max_{a'} Q^*(s',a')] Q∗(s,a)=s′∑P(s′∣s,a)[R(s,a,s′)+γa′maxQ∗(s′,a′)]
5.3.2 Q-learning更新规则
核心思想:用当前的Q估计来更新Q值。
当智能体在状态 s s s执行动作 a a a,获得奖励 r r r,转移到状态 s ′ s' s′时:
Q ( s , a ) ← Q ( s , a ) + α [ r + γ max a ′ Q ( s ′ , a ′ ) − Q ( s , a ) ] Q(s,a) \leftarrow Q(s,a) + \alpha [r + \gamma \max_{a'} Q(s',a') - Q(s,a)] Q(s,a)←Q(s,a)+α[r+γa′maxQ(s′,a′)−Q(s,a)]
5.3.3 关键特性:off-policy学习
重要发现:更新时使用的是 max a ′ Q ( s ′ , a ′ ) \max_{a'} Q(s',a') maxa′Q(s′,a′),而不是当前策略选择的动作!
这意味着:
- 行为策略:实际选择动作的策略(比如 ϵ \epsilon ϵ-贪心)
- 目标策略:学习的策略(最优策略)
- 可以边探索边学习最优策略
5.4 探索与利用的平衡
5.4.1 探索的必要性
如果小智总是选择当前Q值最高的动作,可能会:
- 错过更好的动作
- 陷入局部最优
5.4.2 ε-贪心策略
定义:
π ( a ∣ s ) = { 1 − ϵ + ϵ ∣ A ∣ 如果 a = arg max a ′ Q ( s , a ′ ) ϵ ∣ A ∣ 其他动作 \pi(a|s) = \begin{cases} 1-\epsilon + \frac{\epsilon}{|\mathcal{A}|} & \text{如果 } a = \arg\max_{a'} Q(s,a') \\ \frac{\epsilon}{|\mathcal{A}|} & \text{其他动作} \end{cases} π(a∣s)={1−ϵ+∣A∣ϵ∣A∣ϵ如果 a=argmaxa′Q(s,a′)其他动作
简化版本:
- 以概率 1 − ϵ 1-\epsilon 1−ϵ选择最佳动作(利用)
- 以概率 ϵ \epsilon ϵ随机选择动作(探索)
5.4.3 探索策略的演化
递减探索:
ϵ t = max ( ϵ min , ϵ 0 ⋅ decay t ) \epsilon_t = \max(\epsilon_{\min}, \epsilon_0 \cdot \text{decay}^t) ϵt=max(ϵmin,ϵ0⋅decayt)
开始时多探索,后来多利用。
5.5 Q-learning的收敛性
5.5.1 收敛定理
Watkins定理:如果满足以下条件,Q-learning收敛到 Q ∗ Q^* Q∗:
- 访问条件:每个状态-动作对被访问无穷多次
- 学习率条件: ∑ t = 1 ∞ α t = ∞ \sum_{t=1}^\infty \alpha_t = \infty ∑t=1∞αt=∞ 且 ∑ t = 1 ∞ α t 2 < ∞ \sum_{t=1}^\infty \alpha_t^2 < \infty ∑t=1∞αt2<∞
- 有界奖励:奖励函数有界
5.5.2 实用收敛条件
在实际应用中:
- 使用递减的 ϵ \epsilon ϵ保证充分探索
- 使用合适的学习率(如0.1)
- 运行足够多的回合
5.6 Python实现
import numpy as np
import random
import matplotlib.pyplot as plt
from collections import defaultdictclass QLearningGridWorld(TabularMethodsGridWorld):"""Q-learning算法实现"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.9):super().__init__(success_prob, slip_prob, gamma)# Q学习专用参数self.epsilon = 0.1 # 探索率self.epsilon_min = 0.01 # 最小探索率self.epsilon_decay = 0.995 # 探索率衰减self.alpha = 0.1 # 学习率# 统计信息self.episode_rewards = []self.episode_lengths = []self.q_errors = []def epsilon_greedy_action(self, state_id, epsilon=None):"""ε-贪心动作选择"""if epsilon is None:epsilon = self.epsilonstate_idx = self.state_to_idx[state_id]if random.random() < epsilon:# 探索:随机选择动作return random.randint(0, self.num_actions - 1)else:# 利用:选择Q值最大的动作return np.argmax(self.Q_table[state_idx])def q_learning_step(self, state_id, action_id, reward, next_state_id, done=False):"""单步Q-learning更新"""state_idx = self.state_to_idx[state_id]next_state_idx = self.state_to_idx[next_state_id]# 当前Q值current_q = self.Q_table[state_idx][action_id]# 计算目标Q值if done:target_q = reward # 终止状态,没有未来奖励else:next_max_q = np.max(self.Q_table[next_state_idx])target_q = reward + self.gamma * next_max_q# Q-learning更新td_error = target_q - current_qself.Q_table[state_idx][action_id] += self.alpha * td_errorreturn abs(td_error)def q_learning_episode(self, max_steps=1000):"""运行一个Q-learning回合"""current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])episode_reward = 0episode_errors = []for step in range(max_steps):# 选择动作action_id = self.epsilon_greedy_action(current_state_id)# 执行动作next_state_id = self.simulate_transition(current_state_id, action_id)# 计算奖励current_state_idx = self.state_to_idx[current_state_id]next_state_idx = self.state_to_idx[next_state_id]reward = self.rewards[current_state_idx][action_id][next_state_idx]episode_reward += reward# 检查是否完成done = (next_state_id == goal_state_id)# Q-learning更新td_error = self.q_learning_step(current_state_id, action_id, reward, next_state_id, done)episode_errors.append(td_error)episode_reward += rewardif done:breakcurrent_state_id = next_state_idreturn episode_reward, step + 1, episode_errorsdef train_q_learning(self, num_episodes=5000, verbose=True):"""Q-learning训练"""print(f"开始Q-learning训练,回合数: {num_episodes}")print(f"初始参数 - 学习率: {self.alpha}, 探索率: {self.epsilon}")self.reset_tables()self.episode_rewards = []self.episode_lengths = []self.q_errors = []for episode in range(num_episodes):# 运行一个回合episode_reward, episode_length, episode_errors = self.q_learning_episode()# 记录统计信息self.episode_rewards.append(episode_reward)self.episode_lengths.append(episode_length)self.q_errors.append(np.mean(episode_errors) if episode_errors else 0)# 更新探索率self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)# 打印进度if verbose and (episode + 1) % 500 == 0:avg_reward = np.mean(self.episode_rewards[-100:])avg_length = np.mean(self.episode_lengths[-100:])print(f"回合 {episode + 1}: 平均奖励={avg_reward:.2f}, "f"平均长度={avg_length:.1f}, 探索率={self.epsilon:.3f}")print("Q-learning训练完成!")return self.Q_table.copy()def extract_policy_from_q(self):"""从Q表提取贪心策略"""policy = {}for state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]best_action = np.argmax(self.Q_table[state_idx])policy[state_id] = best_actionreturn policydef evaluate_learned_policy(self, num_test_episodes=1000):"""评估学到的策略"""print("评估学到的策略...")policy = self.extract_policy_from_q()rewards = []lengths = []# 禁用探索进行测试old_epsilon = self.epsilonself.epsilon = 0.0for _ in range(num_test_episodes):current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])episode_reward = 0steps = 0max_steps = 1000for step in range(max_steps):action_id = policy[current_state_id]next_state_id = self.simulate_transition(current_state_id, action_id)current_state_idx = self.state_to_idx[current_state_id]next_state_idx = self.state_to_idx[next_state_id]reward = self.rewards[current_state_idx][action_id][next_state_idx]episode_reward += rewardsteps += 1if next_state_id == goal_state_id:breakcurrent_state_id = next_state_idrewards.append(episode_reward)lengths.append(steps)# 恢复探索率self.epsilon = old_epsilonavg_reward = np.mean(rewards)avg_length = np.mean(lengths)success_rate = sum(1 for r in rewards if r > 50) / len(rewards) # 假设奖励>50表示成功print(f"策略评估结果:")print(f" 平均奖励: {avg_reward:.2f}")print(f" 平均步数: {avg_length:.1f}")print(f" 成功率: {success_rate:.2%}")return {'avg_reward': avg_reward,'avg_length': avg_length,'success_rate': success_rate,'policy': policy}def compare_with_optimal(self):"""与最优策略比较"""print("\n=== Q-learning vs 最优策略比较 ===")# 计算最优策略optimal_V = self.value_iteration()optimal_policy = self.extract_optimal_policy(optimal_V)optimal_Q = self.compute_q_values(optimal_V)# Q-learning学到的策略learned_policy = self.extract_policy_from_q()# 比较Q值q_errors = []policy_match = 0print("状态ID | 最优动作 | 学习动作 | Q值误差(最大)")print("-" * 45)for state_idx in range(self.num_states):state_id = self.idx_to_state[state_idx]optimal_action = optimal_policy[state_id]learned_action = learned_policy[state_id]# 计算Q值误差q_error = np.max(np.abs(self.Q_table[state_idx] - optimal_Q[state_idx]))q_errors.append(q_error)# 检查策略是否匹配if optimal_action == learned_action:policy_match += 1match_symbol = "✓"else:match_symbol = "✗"print(f"{state_id:6d} | {self.actions[optimal_action]:6s} | "f"{self.actions[learned_action]:6s} | {q_error:8.3f} {match_symbol}")avg_q_error = np.mean(q_errors)policy_accuracy = policy_match / self.num_statesprint(f"\n总结:")print(f" 平均Q值误差: {avg_q_error:.4f}")print(f" 策略准确率: {policy_accuracy:.2%} ({policy_match}/{self.num_states})")return {'avg_q_error': avg_q_error,'policy_accuracy': policy_accuracy,'learned_policy': learned_policy,'optimal_policy': optimal_policy}def plot_learning_progress(self):"""绘制学习进度"""fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))# 回合奖励ax1.plot(self.episode_rewards)ax1.set_title('每回合奖励')ax1.set_xlabel('回合')ax1.set_ylabel('奖励')ax1.grid(True)# 移动平均奖励window = 100if len(self.episode_rewards) >= window:moving_avg = np.convolve(self.episode_rewards, np.ones(window)/window, mode='valid')ax2.plot(moving_avg)ax2.set_title(f'{window}回合移动平均奖励')ax2.set_xlabel('回合')ax2.set_ylabel('平均奖励')ax2.grid(True)# 回合长度ax3.plot(self.episode_lengths)ax3.set_title('每回合步数')ax3.set_xlabel('回合')ax3.set_ylabel('步数')ax3.grid(True)# Q误差ax4.plot(self.q_errors)ax4.set_title('TD误差')ax4.set_xlabel('回合')ax4.set_ylabel('平均TD误差')ax4.grid(True)plt.tight_layout()return fig# 创建Q-learning环境并训练
print("=== Q-learning实验 ===")
q_env = QLearningGridWorld()# 训练Q-learning
learned_Q = q_env.train_q_learning(num_episodes=3000)# 评估学到的策略
evaluation_result = q_env.evaluate_learned_policy()# 与最优策略比较
comparison_result = q_env.compare_with_optimal()# 可视化学到的策略
print("\n=== 学到的策略可视化 ===")
q_env.visualize_policy(evaluation_result['policy'])# 绘制学习曲线(取消注释以显示)
# learning_fig = q_env.plot_learning_progress()
# plt.show()
5.7 Q-learning的变种
5.7.1 Double Q-learning
问题:Q-learning会高估动作价值(最大化偏差)。
解决方案:使用两个Q表 Q 1 Q_1 Q1和 Q 2 Q_2 Q2:
- 随机选择一个表进行更新
- 使用一个表选择动作,另一个表评估价值
更新规则:
Q 1 ( s , a ) ← Q 1 ( s , a ) + α [ r + γ Q 2 ( s ′ , arg max a ′ Q 1 ( s ′ , a ′ ) ) − Q 1 ( s , a ) ] Q_1(s,a) \leftarrow Q_1(s,a) + \alpha [r + \gamma Q_2(s', \arg\max_{a'} Q_1(s',a')) - Q_1(s,a)] Q1(s,a)←Q1(s,a)+α[r+γQ2(s′,arga′maxQ1(s′,a′))−Q1(s,a)]
5.7.2 SARSA (State-Action-Reward-State-Action)
区别:SARSA是on-policy的,使用实际选择的下一个动作:
Q ( s , a ) ← Q ( s , a ) + α [ r + γ Q ( s ′ , a ′ ) − Q ( s , a ) ] Q(s,a) \leftarrow Q(s,a) + \alpha [r + \gamma Q(s',a') - Q(s,a)] Q(s,a)←Q(s,a)+α[r+γQ(s′,a′)−Q(s,a)]
其中 a ′ a' a′是在状态 s ′ s' s′实际选择的动作。
5.8 Q-learning vs SARSA实现
class SARSAGridWorld(QLearningGridWorld):"""SARSA算法实现"""def sarsa_step(self, state_id, action_id, reward, next_state_id, next_action_id, done=False):"""单步SARSA更新"""state_idx = self.state_to_idx[state_id]next_state_idx = self.state_to_idx[next_state_id]current_q = self.Q_table[state_idx][action_id]if done:target_q = rewardelse:# 使用实际选择的下一个动作(on-policy)target_q = reward + self.gamma * self.Q_table[next_state_idx][next_action_id]td_error = target_q - current_qself.Q_table[state_idx][action_id] += self.alpha * td_errorreturn abs(td_error)def sarsa_episode(self, max_steps=1000):"""运行一个SARSA回合"""current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])# 选择初始动作current_action_id = self.epsilon_greedy_action(current_state_id)episode_reward = 0episode_errors = []for step in range(max_steps):# 执行当前动作next_state_id = self.simulate_transition(current_state_id, current_action_id)# 计算奖励current_state_idx = self.state_to_idx[current_state_id]next_state_idx = self.state_to_idx[next_state_id]reward = self.rewards[current_state_idx][current_action_id][next_state_idx]episode_reward += reward# 检查是否完成done = (next_state_id == goal_state_id)if done:# 终止状态,直接更新td_error = self.sarsa_step(current_state_id, current_action_id, reward, next_state_id, 0, done=True)episode_errors.append(td_error)breakelse:# 选择下一个动作next_action_id = self.epsilon_greedy_action(next_state_id)# SARSA更新td_error = self.sarsa_step(current_state_id, current_action_id, reward, next_state_id, next_action_id, done=False)episode_errors.append(td_error)# 移动到下一个状态-动作对current_state_id = next_state_idcurrent_action_id = next_action_idreturn episode_reward, step + 1, episode_errorsdef compare_q_learning_sarsa():"""比较Q-learning和SARSA"""print("=== Q-learning vs SARSA 比较 ===")# Q-learningprint("\n训练Q-learning...")q_agent = QLearningGridWorld()q_agent.train_q_learning(num_episodes=2000, verbose=False)q_evaluation = q_agent.evaluate_learned_policy(num_test_episodes=100)# SARSAprint("训练SARSA...")sarsa_agent = SARSAGridWorld()sarsa_agent.reset_tables()# 为SARSA实现训练循环for episode in range(2000):episode_reward, episode_length, episode_errors = sarsa_agent.sarsa_episode()sarsa_agent.episode_rewards.append(episode_reward)sarsa_agent.episode_lengths.append(episode_length)sarsa_agent.epsilon = max(sarsa_agent.epsilon_min, sarsa_agent.epsilon * sarsa_agent.epsilon_decay)sarsa_evaluation = sarsa_agent.evaluate_learned_policy(num_test_episodes=100)# 比较结果print("\n=== 比较结果 ===")print(f"{'方法':<12} | {'平均奖励':<10} | {'平均步数':<10} | {'成功率':<10}")print("-" * 50)print(f"{'Q-learning':<12} | {q_evaluation['avg_reward']:<10.2f} | "f"{q_evaluation['avg_length']:<10.1f} | {q_evaluation['success_rate']:<10.2%}")print(f"{'SARSA':<12} | {sarsa_evaluation['avg_reward']:<10.2f} | "f"{sarsa_evaluation['avg_length']:<10.1f} | {sarsa_evaluation['success_rate']:<10.2%}")# 运行比较
# compare_q_learning_sarsa()
5.9 Q-learning的重要性质
5.9.1 Off-policy学习
- 优势:可以从任何策略生成的数据中学习最优策略
- 应用:经验回放、从演示中学习
5.9.2 样本效率
- Q-learning通常比策略方法更高效
- 每个样本可以用于改进多个状态-动作对的估计
5.9.3 探索策略的影响
- ϵ \epsilon ϵ-贪心:简单但不是最优的探索策略
- UCB探索:考虑不确定性的探索
- Thompson采样:贝叶斯探索方法
小结
Q-learning是强化学习的里程碑算法,它:
- 实现了无模型学习:不需要知道环境动态
- 引入了off-policy概念:行为策略和目标策略可以不同
- 提供了收敛保证:在合适条件下收敛到最优策略
- 启发了后续发展:深度Q网络、经验回放等技术的基础
下一章我们将学习策略梯度方法,这是另一类重要的强化学习方法!
第六章:策略梯度 - 小智的直接策略学习
6.1 从价值函数到策略函数
到目前为止,我们一直在学习价值函数,然后从中提取策略。但是,能否直接学习策略呢?
策略梯度方法的核心思想:直接优化参数化的策略 π ( a ∣ s ; θ ) \pi(a|s;\theta) π(a∣s;θ),通过梯度上升最大化期望回报。
这就像小智不再需要记住每个位置-动作的价值,而是直接学习"在什么情况下应该做什么"的规则。
6.2 策略梯度的优势
6.2.1 处理连续动作空间
- Q-learning问题:连续动作空间难以处理 max a Q ( s , a ) \max_a Q(s,a) maxaQ(s,a)
- 策略梯度优势:可以直接输出连续动作的概率分布
6.2.2 随机策略的自然表示
- 有些游戏中混合策略是最优的(如石头剪刀布)
- 策略梯度天然处理随机策略
6.2.3 更好的收敛性质
- 在某些问题中,策略梯度有更好的收敛保证
- 避免了价值函数近似的复杂性
6.3 策略梯度的数学基础
6.3.1 策略参数化
假设策略由参数 θ \theta θ参数化:
π ( a ∣ s ; θ ) = P ( A t = a ∣ S t = s ; θ ) \pi(a|s;\theta) = P(A_t = a | S_t = s; \theta) π(a∣s;θ)=P(At=a∣St=s;θ)
目标:找到最优参数 θ ∗ \theta^* θ∗以最大化期望回报:
J ( θ ) = E π θ [ G 0 ] = E π θ [ ∑ t = 0 ∞ γ t R t + 1 ] J(\theta) = \mathbb{E}_{\pi_\theta}[G_0] = \mathbb{E}_{\pi_\theta}\left[\sum_{t=0}^\infty \gamma^t R_{t+1}\right] J(θ)=Eπθ[G0]=Eπθ[t=0∑∞γtRt+1]
6.3.2 策略梯度定理
核心问题:如何计算 ∇ θ J ( θ ) \nabla_\theta J(\theta) ∇θJ(θ)?
策略梯度定理(Sutton等人,2000):
∇ θ J ( θ ) = E π θ [ ∑ t = 0 ∞ γ t G t ∇ θ ln π ( A t ∣ S t ; θ ) ] \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_{t=0}^\infty \gamma^t G_t \nabla_\theta \ln \pi(A_t|S_t;\theta)\right] ∇θJ(θ)=Eπθ[t=0∑∞γtGt∇θlnπ(At∣St;θ)]
6.3.3 定理推导(简化版)
我们想要计算:
∇ θ J ( θ ) = ∇ θ E π θ [ G 0 ] \nabla_\theta J(\theta) = \nabla_\theta \mathbb{E}_{\pi_\theta}[G_0] ∇θJ(θ)=∇θEπθ[G0]
将期望展开为对所有可能轨迹的积分:
J ( θ ) = ∑ τ P ( τ ; θ ) G ( τ ) J(\theta) = \sum_\tau P(\tau;\theta) G(\tau) J(θ)=τ∑P(τ;θ)G(τ)
其中 τ = ( s 0 , a 0 , r 1 , s 1 , a 1 , r 2 , . . . ) \tau = (s_0, a_0, r_1, s_1, a_1, r_2, ...) τ=(s0,a0,r1,s1,a1,r2,...)是轨迹, P ( τ ; θ ) P(\tau;\theta) P(τ;θ)是轨迹概率。
使用对数导数技巧:
∇ θ P ( τ ; θ ) = P ( τ ; θ ) ∇ θ ln P ( τ ; θ ) \nabla_\theta P(\tau;\theta) = P(\tau;\theta) \nabla_\theta \ln P(\tau;\theta) ∇θP(τ;θ)=P(τ;θ)∇θlnP(τ;θ)
轨迹概率可以分解为:
P ( τ ; θ ) = ρ 0 ( s 0 ) ∏ t = 0 ∞ P ( s t + 1 ∣ s t , a t ) π ( a t ∣ s t ; θ ) P(\tau;\theta) = \rho_0(s_0) \prod_{t=0}^\infty P(s_{t+1}|s_t,a_t) \pi(a_t|s_t;\theta) P(τ;θ)=ρ0(s0)t=0∏∞P(st+1∣st,at)π(at∣st;θ)
只有策略项包含 θ \theta θ:
∇ θ ln P ( τ ; θ ) = ∑ t = 0 ∞ ∇ θ ln π ( a t ∣ s t ; θ ) \nabla_\theta \ln P(\tau;\theta) = \sum_{t=0}^\infty \nabla_\theta \ln \pi(a_t|s_t;\theta) ∇θlnP(τ;θ)=t=0∑∞∇θlnπ(at∣st;θ)
因此:
∇ θ J ( θ ) = E π θ [ G 0 ∑ t = 0 ∞ ∇ θ ln π ( A t ∣ S t ; θ ) ] \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\left[G_0 \sum_{t=0}^\infty \nabla_\theta \ln \pi(A_t|S_t;\theta)\right] ∇θJ(θ)=Eπθ[G0t=0∑∞∇θlnπ(At∣St;θ)]
6.4 REINFORCE算法
6.4.1 基本REINFORCE
算法思想:使用蒙特卡洛采样估计策略梯度。
算法步骤:
- 用当前策略生成回合 τ \tau τ
- 计算每个时刻的回报 G t G_t Gt
- 更新策略参数:
θ ← θ + α ∑ t = 0 T G t ∇ θ ln π ( A t ∣ S t ; θ ) \theta \leftarrow \theta + \alpha \sum_{t=0}^T G_t \nabla_\theta \ln \pi(A_t|S_t;\theta) θ←θ+αt=0∑TGt∇θlnπ(At∣St;θ)
6.4.2 REINFORCE的直观理解
- 好的动作:如果 G t G_t Gt高,增加选择 A t A_t At的概率
- 坏的动作:如果 G t G_t Gt低,减少选择 A t A_t At的概率
- 更新强度:与回报成正比
6.4.3 带基线的REINFORCE
问题:高方差导致学习不稳定。
解决方案:引入基线 b ( s ) b(s) b(s):
∇ θ J ( θ ) = E π θ [ ( G t − b ( S t ) ) ∇ θ ln π ( A t ∣ S t ; θ ) ] \nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}[(G_t - b(S_t)) \nabla_\theta \ln \pi(A_t|S_t;\theta)] ∇θJ(θ)=Eπθ[(Gt−b(St))∇θlnπ(At∣St;θ)]
常用基线:状态价值函数 V ( s ) V(s) V(s)
θ ← θ + α ( G t − V ( S t ) ) ∇ θ ln π ( A t ∣ S t ; θ ) \theta \leftarrow \theta + \alpha (G_t - V(S_t)) \nabla_\theta \ln \pi(A_t|S_t;\theta) θ←θ+α(Gt−V(St))∇θlnπ(At∣St;θ)
其中 ( G t − V ( S t ) ) (G_t - V(S_t)) (Gt−V(St))称为优势函数 A ( s , a ) = Q ( s , a ) − V ( s ) A(s,a) = Q(s,a) - V(s) A(s,a)=Q(s,a)−V(s)的估计。
6.5 网格世界中的策略网络
6.5.1 策略网络设计
在我们的4×4网格世界中,我们可以用简单的神经网络:
输入:状态的one-hot编码(14维向量)
输出:5个动作的概率分布(softmax)
6.5.2 Softmax策略
π ( a ∣ s ; θ ) = exp ( θ a T ϕ ( s ) ) ∑ a ′ exp ( θ a ′ T ϕ ( s ) ) \pi(a|s;\theta) = \frac{\exp(\theta_a^T \phi(s))}{\sum_{a'} \exp(\theta_{a'}^T \phi(s))} π(a∣s;θ)=∑a′exp(θa′Tϕ(s))exp(θaTϕ(s))
其中 ϕ ( s ) \phi(s) ϕ(s)是状态特征向量。
梯度计算:
∇ θ ln π ( a ∣ s ; θ ) = ϕ ( s ) ( 1 a − π ( s ; θ ) ) \nabla_\theta \ln \pi(a|s;\theta) = \phi(s) (\mathbf{1}_{a} - \pi(s;\theta)) ∇θlnπ(a∣s;θ)=ϕ(s)(1a−π(s;θ))
其中 1 a \mathbf{1}_{a} 1a是第 a a a个位置为1的单位向量。
6.6 Python实现
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from collections import deque
import matplotlib.pyplot as pltclass PolicyNetwork(nn.Module):"""策略网络"""def __init__(self, state_size, action_size, hidden_size=64):super(PolicyNetwork, self).__init__()self.fc1 = nn.Linear(state_size, hidden_size)self.fc2 = nn.Linear(hidden_size, hidden_size)self.fc3 = nn.Linear(hidden_size, action_size)def forward(self, state):x = F.relu(self.fc1(state))x = F.relu(self.fc2(x))action_probs = F.softmax(self.fc3(x), dim=-1)return action_probsclass ValueNetwork(nn.Module):"""价值网络(用作基线)"""def __init__(self, state_size, hidden_size=64):super(ValueNetwork, self).__init__()self.fc1 = nn.Linear(state_size, hidden_size)self.fc2 = nn.Linear(hidden_size, hidden_size)self.fc3 = nn.Linear(hidden_size, 1)def forward(self, state):x = F.relu(self.fc1(state))x = F.relu(self.fc2(x))value = self.fc3(x)return valueclass PolicyGradientAgent:"""策略梯度智能体"""def __init__(self, state_size, action_size, lr_policy=1e-3, lr_value=1e-3, gamma=0.99):self.state_size = state_sizeself.action_size = action_sizeself.gamma = gamma# 神经网络self.policy_net = PolicyNetwork(state_size, action_size)self.value_net = ValueNetwork(state_size)# 优化器self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=lr_policy)self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr_value)# 经验存储self.reset_episode()# 统计信息self.episode_rewards = []self.episode_lengths = []self.policy_losses = []self.value_losses = []def reset_episode(self):"""重置回合数据"""self.states = []self.actions = []self.rewards = []self.log_probs = []def state_to_tensor(self, state_id, valid_states):"""将状态ID转换为one-hot张量"""state_vector = np.zeros(len(valid_states))if state_id in valid_states:state_vector[valid_states.index(state_id)] = 1.0return torch.FloatTensor(state_vector)def select_action(self, state_tensor):"""选择动作"""action_probs = self.policy_net(state_tensor)action_dist = torch.distributions.Categorical(action_probs)action = action_dist.sample()log_prob = action_dist.log_prob(action)return action.item(), log_probdef store_transition(self, state_tensor, action, reward, log_prob):"""存储转移"""self.states.append(state_tensor)self.actions.append(action)self.rewards.append(reward)self.log_probs.append(log_prob)def compute_returns(self):"""计算折扣回报"""returns = []G = 0for reward in reversed(self.rewards):G = reward + self.gamma * Greturns.insert(0, G)return torch.FloatTensor(returns)def update_policy(self):"""更新策略和价值网络"""if len(self.states) == 0:return# 计算回报returns = self.compute_returns()# 转换为张量states = torch.stack(self.states)log_probs = torch.stack(self.log_probs)# 计算状态价值values = self.value_net(states).squeeze()# 计算优势函数advantages = returns - values.detach()# 标准化优势(减少方差)advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)# 策略损失policy_loss = -(log_probs * advantages).mean()# 价值损失value_loss = F.mse_loss(values, returns)# 更新策略网络self.policy_optimizer.zero_grad()policy_loss.backward()self.policy_optimizer.step()# 更新价值网络self.value_optimizer.zero_grad()value_loss.backward()self.value_optimizer.step()# 记录损失self.policy_losses.append(policy_loss.item())self.value_losses.append(value_loss.item())return policy_loss.item(), value_loss.item()class PolicyGradientGridWorld(MDPGridWorld):"""策略梯度网格世界"""def __init__(self, success_prob=0.7, slip_prob=0.15, gamma=0.99):super().__init__(success_prob, slip_prob, gamma)# 策略梯度智能体self.agent = PolicyGradientAgent(state_size=self.num_states,action_size=self.num_actions,gamma=gamma)def run_episode(self, max_steps=1000):"""运行一个回合"""current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])self.agent.reset_episode()episode_reward = 0for step in range(max_steps):# 状态转换为张量state_tensor = self.agent.state_to_tensor(current_state_id, self.valid_states)# 选择动作action, log_prob = self.agent.select_action(state_tensor)# 执行动作next_state_id = self.simulate_transition(current_state_id, action)# 计算奖励current_state_idx = self.state_to_idx[current_state_id]next_state_idx = self.state_to_idx[next_state_id]reward = self.rewards[current_state_idx][action][next_state_idx]# 存储转移self.agent.store_transition(state_tensor, action, reward, log_prob)episode_reward += reward# 检查终止条件if next_state_id == goal_state_id:breakcurrent_state_id = next_state_idreturn episode_reward, step + 1def train(self, num_episodes=2000, verbose=True):"""训练策略梯度智能体"""print(f"开始策略梯度训练,回合数: {num_episodes}")for episode in range(num_episodes):# 运行回合episode_reward, episode_length = self.run_episode()# 更新策略policy_loss, value_loss = self.agent.update_policy()# 记录统计信息self.agent.episode_rewards.append(episode_reward)self.agent.episode_lengths.append(episode_length)if verbose and (episode + 1) % 200 == 0:avg_reward = np.mean(self.agent.episode_rewards[-100:])avg_length = np.mean(self.agent.episode_lengths[-100:])print(f"回合 {episode + 1}: 平均奖励={avg_reward:.2f}, "f"平均长度={avg_length:.1f}, "f"策略损失={policy_loss:.4f}, 价值损失={value_loss:.4f}")print("策略梯度训练完成!")def evaluate_policy(self, num_episodes=1000):"""评估学到的策略"""print("评估策略梯度学到的策略...")rewards = []lengths = []for _ in range(num_episodes):current_state_id = self.pos_to_state_id(self.start_pos[0], self.start_pos[1])goal_state_id = self.pos_to_state_id(self.goal_pos[0], self.goal_pos[1])episode_reward = 0steps = 0max_steps = 1000for step in range(max_steps):state_tensor = self.agent.state_to_tensor(current_state_id, self.valid_states)# 使用贪心策略(选择概率最大的动作)with torch.no_grad():action_probs = self.agent.policy_net(state_tensor)action = torch.argmax(action_probs).item()next_state_id = self.simulate_transition(current_state_id, action)current_state_idx = self.state_to_idx[current_state_id]next_state_idx = self.state_to_idx[next_state_id]reward = self.rewards[current_state_idx][action][next_state_idx]episode_reward += rewardsteps += 1if next_state_id == goal_state_id:breakcurrent_state_id = next_state_idrewards.append(episode_reward)lengths.append(steps)avg_reward = np.mean(rewards)avg_length = np.mean(lengths)success_rate = sum(1 for r in rewards if r > 50) / len(rewards)print(f"策略评估结果:")print(f" 平均奖励: {avg_reward:.2f}")print(f" 平均步数: {avg_length:.1f}")print(f" 成功率: {success_rate:.2%}")return {'avg_reward': avg_reward,'avg_length': avg_length,'success_rate': success_rate}def extract_learned_policy(self):"""提取学到的确定性策略"""policy = {}for state_id in self.valid_states:state_tensor = self.agent.state_to_tensor(state_id, self.valid_states)with torch.no_grad():action_probs = self.agent.policy_net(state_tensor)best_action = torch.argmax(action_probs).item()policy[state_id] = best_actionreturn policydef plot_training_progress(self):"""绘制训练进度"""fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))# 回合奖励ax1.plot(self.agent.episode_rewards)ax1.set_title('每回合奖励')ax1.set_xlabel('回合')ax1.set_ylabel('奖励')ax1.grid(True)# 移动平均奖励window = 100if len(self.agent.episode_rewards) >= window:moving_avg = np.convolve(self.agent.episode_rewards, np.ones(window)/window, mode='valid')ax2.plot(moving_avg)ax2.set_title(f'{window}回合移动平均奖励')ax2.set_xlabel('回合')ax2.set_ylabel('平均奖励')ax2.grid(True)# 策略损失ax3.plot(self.agent.policy_losses)ax3.set_title('策略损失')ax3.set_xlabel('回合')ax3.set_ylabel('损失')ax3.grid(True)# 价值损失ax4.plot(self.agent.value_losses)ax4.set_title('价值损失')ax4.set_xlabel('回合')ax4.set_ylabel('损失')ax4.grid(True)plt.tight_layout()return fig# 创建策略梯度环境并训练
print("=== 策略梯度实验 ===")
pg_env = PolicyGradientGridWorld()# 训练策略梯度
pg_env.train(num_episodes=1500)# 评估策略
pg_result = pg_env.evaluate_policy()# 提取并可视化学到的策略
learned_policy = pg_env.extract_learned_policy()
print("\n=== 策略梯度学到的策略 ===")
pg_env.visualize_policy(learned_policy)# 与Q-learning比较
print("\n=== 策略梯度 vs Q-learning 比较 ===")
print("重新训练Q-learning作为对比...")
q_env = QLearningGridWorld()
q_env.train_q_learning(num_episodes=1500, verbose=False)
q_result = q_env.evaluate_learned_policy(num_test_episodes=1000)print(f"\n{'方法':<15} | {'平均奖励':<10} | {'平均步数':<10} | {'成功率':<10}")
print("-" * 55)
print(f"{'策略梯度':<15} | {pg_result['avg_reward']:<10.2f} | "f"{pg_result['avg_length']:<10.1f} | {pg_result['success_rate']:<10.2%}")
print(f"{'Q-learning':<15} | {q_result['avg_reward']:<10.2f} | "f"{q_result['avg_length']:<10.1f} | {q_result['success_rate']:<10.2%}")# 绘制训练曲线(取消注释显示)
# fig = pg_env.plot_training_progress()
# plt.show()
6.7 策略梯度的优缺点
6.7.1 优点
- 直接优化目标:直接优化策略性能
- 连续动作空间:天然处理连续动作
- 随机策略:可以学习最优随机策略
- 收敛保证:在一定条件下保证收敛到局部最优
6.7.2 缺点
- 高方差:蒙特卡洛估计的方差较大
- 样本效率低:需要大量样本
- 局部最优:可能陷入局部最优解
- 超参数敏感:学习率等超参数需要仔细调节
6.8 方差减少技术
6.8.1 基线方法
- 状态价值基线: b ( s ) = V ( s ) b(s) = V(s) b(s)=V(s)
- 平均回报基线: b = G ˉ b = \bar{G} b=Gˉ
- 自适应基线:根据历史数据调整
6.8.2 优势函数
A ( s , a ) = Q ( s , a ) − V ( s ) A(s,a) = Q(s,a) - V(s) A(s,a)=Q(s,a)−V(s)
优势函数衡量动作 a a a相对于平均水平的好坏程度。
6.8.3 自然策略梯度
问题:普通梯度对参数化敏感。
解决方案:使用Fisher信息矩阵校正:
∇ ~ θ J ( θ ) = F ( θ ) − 1 ∇ θ J ( θ ) \tilde{\nabla}_\theta J(\theta) = F(\theta)^{-1} \nabla_\theta J(\theta) ∇~θJ(θ)=F(θ)−1∇θJ(θ)
小结
策略梯度方法开辟了强化学习的新方向:
- 直接策略优化:避免了价值函数的中间步骤
- 理论基础扎实:策略梯度定理提供了坚实的数学基础
- 灵活性强:可以处理连续动作和随机策略
- 发展空间大:为PPO、TRPO等高级算法奠定了基础
下一章我们将学习PPO(Proximal Policy Optimization),这是目前最成功的策略优化算法之一!# 完整强化学习教程第二部分:高级算法篇
第七章:PPO (Proximal Policy Optimization) - 小智的稳定学习
7.1 PPO的核心思想
问题:策略梯度方法步长难以控制
- 步长太小:学习慢
- 步长太大:策略突变,破坏已学知识
PPO解决方案:限制策略更新幅度,确保新策略不会偏离旧策略太远
7.2 剪切目标函数
重要性采样比:
r t ( θ ) = π θ ( a t ∣ s t ) π θ old ( a t ∣ s t ) r_t(\theta) = \frac{\pi_\theta(a_t|s_t)}{\pi_{\theta_{\text{old}}}(a_t|s_t)} rt(θ)=πθold(at∣st)πθ(at∣st)
剪切目标函数:
L CLIP ( θ ) = E t [ min ( r t ( θ ) A t , clip ( r t ( θ ) , 1 − ϵ , 1 + ϵ ) A t ) ] L^{\text{CLIP}}(\theta) = \mathbb{E}_t[\min(r_t(\theta)A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)A_t)] LCLIP(θ)=Et[min(rt(θ)At,clip(rt(θ),1−ϵ,1+ϵ)At)]
7.3 PPO算法
class PPOAgent:def __init__(self, state_size, action_size, lr=3e-4):self.policy_net = PolicyNetwork(state_size, action_size)self.value_net = ValueNetwork(state_size)self.clip_epsilon = 0.2def update_policy(self, states, actions, old_log_probs, advantages, returns):# 计算新的概率action_probs = self.policy_net(states)new_log_probs = torch.log(action_probs.gather(1, actions))# 重要性采样比ratio = torch.exp(new_log_probs - old_log_probs)# PPO剪切损失surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1-self.clip_epsilon, 1+self.clip_epsilon) * advantagespolicy_loss = -torch.min(surr1, surr2).mean()return policy_loss
第八章:深度Q网络 (DQN) - 小智的神经网络大脑
8.1 从表格到神经网络
核心思想:用神经网络 Q ( s , a ; θ ) Q(s,a;\theta) Q(s,a;θ)近似Q函数
- 输入:状态 s s s
- 输出:所有动作的Q值
8.2 DQN核心技术
8.2.1 经验回放
存储经验序列,随机采样进行学习,打破数据相关性
8.2.2 目标网络
使用固定的目标网络计算目标值,提高稳定性
8.3 DQN实现
class DQNAgent:def __init__(self, state_size, action_size):self.q_network = DQNNetwork(state_size, action_size)self.target_network = DQNNetwork(state_size, action_size)self.memory = ExperienceReplay()def learn(self):states, actions, rewards, next_states, dones = self.memory.sample(32)# 计算目标Q值target_q = rewards + 0.99 * self.target_network(next_states).max(1)[0] * ~dones# 计算当前Q值current_q = self.q_network(states).gather(1, actions)# 更新网络loss = F.mse_loss(current_q, target_q.unsqueeze(1))self.optimizer.zero_grad()loss.backward()self.optimizer.step()