当前位置: 首页 > news >正文

具身系列——Diffusion Policy算法实现CartPole游戏

代码原理分析

1. 核心思想

该代码实现了一个基于扩散模型(Diffusion Model)的强化学习策略网络。扩散模型通过逐步去噪过程生成动作,核心思想是:
前向过程:通过T步逐渐将专家动作添加高斯噪声,最终变成纯噪声
逆向过程:训练神经网络预测噪声,通过T步逐步去噪生成动作
数学基础:基于DDPM(Denoising Diffusion Probabilistic Models)框架

算法步骤
1.1 前向加噪:在动作空间逐步添加高斯噪声,将真实动作分布转化为高斯分布
q ( a t ∣ a t − 1 ) = N ( a t ; 1 − β t a t − 1 , β t I ) q(\mathbf{a}_t|\mathbf{a}_{t-1}) = \mathcal{N}(\mathbf{a}_t; \sqrt{1-\beta_t}\mathbf{a}_{t-1}, \beta_t\mathbf{I}) q(atat1)=N(at;1βt at1,βtI)
其中 β t \beta_t βt 为噪声调度参数(网页4][网页5][网页8])。

1.2 逆向去噪:基于观测 o t \mathbf{o}_t ot 条件去噪生成动作
p θ ( a t − 1 ∣ a t , o t ) = N ( a t − 1 ; μ θ ( a t , o t , t ) , Σ t ) p_\theta(\mathbf{a}_{t-1}|\mathbf{a}_t, \mathbf{o}_t) = \mathcal{N}(\mathbf{a}_{t-1}; \mu_\theta(\mathbf{a}_t, \mathbf{o}_t, t), \Sigma_t) pθ(at1at,ot)=N(at1;μθ(at,ot,t),Σt)
去噪网络 μ θ \mu_\theta μθ 预测噪声残差(网页5][网页6][网页8])。

1.3 训练目标:最小化噪声预测误差
L = E t , a 0 , ϵ [ ∥ ϵ − ϵ θ ( α t a 0 + 1 − α t ϵ , o t , t ) ∥ 2 ] \mathcal{L} = \mathbb{E}_{t,\mathbf{a}_0,\epsilon}\left[ \|\epsilon - \epsilon_\theta(\sqrt{\alpha_t}\mathbf{a}_0 + \sqrt{1-\alpha_t}\epsilon, \mathbf{o}_t, t)\|^2 \right] L=Et,a0,ϵ[ϵϵθ(αt a0+1αt ϵ,ot,t)2]
其中 α t = ∏ s = 1 t ( 1 − β s ) \alpha_t = \prod_{s=1}^t (1-\beta_s) αt=s=1t(1βs)(网页4][网页8][网页11])。

2. 关键数学公式

前向过程(扩散过程):

q(a_t|a_{t-1}) = N(a_t; √(α_t)a_{t-1}, (1-α_t)I)
α_t = 1 - β_t,ᾱ_t = ∏_{i=1}^t α_i
a_t = √ᾱ_t a_0 + √(1-ᾱ_t)ε,其中ε ~ N(0,I)

训练目标(噪声预测):

L = ||ε - ε_θ(a_t, s, t)||^2

逆向过程(采样过程):

p_θ(a_{t-1}|a_t) = N(a_{t-1}; μ_θ(a_t, s, t), Σ_t)
μ_θ = 1/√α_t (a_t - β_t/√(1-ᾱ_t) ε_θ)

逐行代码注释

import torch
import gymnasium as gym
import numpy as np

class DiffusionPolicy(torch.nn.Module):
    def __init__(self, state_dim=4, action_dim=2, T=20):
        super().__init__()
        self.T = T  # 扩散过程总步数
        self.betas = torch.linspace(1e-4, 0.02, T)  # 噪声方差调度
        self.alphas = 1 - self.betas  # 前向过程参数
        self.alpha_bars = torch.cumprod(self.alphas, dim=0)  # 累积乘积ᾱ
        
        # 去噪网络(输入维度:state(4) + action(2) + timestep(1) = 7)
        self.denoiser = torch.nn.Sequential(
            torch.nn.Linear(7, 64),  # 输入层
            torch.nn.ReLU(),          # 激活函数
            torch.nn.Linear(64, 2)    # 输出预测的噪声
        )
        self.optimizer = torch.optim.Adam(self.denoiser.parameters(), lr=1e-3)

    def train_step(self, states, expert_actions):
        batch_size = states.size(0)
        t = torch.randint(0, self.T, (batch_size,))  # 随机采样时间步
        alpha_bar_t = self.alpha_bars[t].unsqueeze(1)  # 获取对应ᾱ_t
        
        # 前向加噪(公式实现)
        noise = torch.randn_like(expert_actions)  # 生成高斯噪声
        noisy_actions = torch.sqrt(alpha_bar_t) * expert_actions + \
                      torch.sqrt(1 - alpha_bar_t) * noise  # 公式(2)
        
        # 输入拼接(状态、加噪动作、归一化时间步)
        inputs = torch.cat([
            states, 
            noisy_actions,
            (t.float() / self.T).unsqueeze(1)  # 时间步归一化到[0,1]
        ], dim=1)  # 最终维度:batch_size x 7
        
        pred_noise = self.denoiser(inputs)  # 预测噪声
        loss = torch.mean((noise - pred_noise)**2)  # MSE损失
        return loss

    def sample_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        a_t = torch.randn(1, 2)  # 初始化为随机噪声(动作维度2)
        
        # 逆向去噪过程(需要补全)
        for t in reversed(range(self.T)):
            # 应实现的步骤:
            # 1. 获取当前时间步参数
            # 2. 拼接输入(状态,当前动作,时间步)
            # 3. 预测噪声ε_θ
            # 4. 根据公式计算均值μ
            # 5. 采样新动作(最后一步不添加噪声)
            pass
        
        return a_t.detach().numpy()[0]  # 返回最终动作

执行过程详解

训练流程
  1. 随机采样时间步:为每个样本随机选择扩散步t ∈ [0, T-1]
  2. 前向加噪:根据公式将专家动作添加对应程度的噪声
  3. 输入构造:拼接状态、加噪动作和归一化时间步
  4. 噪声预测:神经网络预测添加的噪声
  5. 损失计算:最小化预测噪声与真实噪声的MSE
采样流程(需补全)
  1. 初始化:从高斯噪声开始
  2. 迭代去噪:从t=T到t=1逐步去噪
    • 根据当前动作和状态预测噪声
    • 计算前一步的均值
    • 添加随机噪声(最后一步除外)
  3. 输出:得到最终去噪后的动作

关键改进建议

  1. 实现逆向过程:需要补充时间步循环和去噪公式
  2. 添加方差调度:在采样时使用更复杂的方差计算
  3. 时间步嵌入:可以使用正弦位置编码代替简单归一化
  4. 网络结构优化:考虑使用Transformer或条件批归一化

该实现展示了扩散策略的核心思想,但完整的扩散策略还需要实现完整的逆向采样过程,并可能需要调整噪声调度参数以获得更好的性能。

最终可执行代码:

import torch
import gymnasium as gym
import numpy as np

class DiffusionPolicy(torch.nn.Module):
    def __init__(self, state_dim=4, action_dim=2, T=20):
        super().__init__()
        self.T = T
        self.betas = torch.linspace(1e-4, 0.02, T)
        self.alphas = 1 - self.betas
        self.alpha_bars = torch.cumprod(self.alphas, dim=0)
        
        # 去噪网络(输入维度:4+2+1=7)
        self.denoiser = torch.nn.Sequential(
            torch.nn.Linear(7, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 2)
        )
        self.optimizer = torch.optim.Adam(self.denoiser.parameters(), lr=1e-3)

    def train_step(self, states, expert_actions):
        batch_size = states.size(0)
        t = torch.randint(0, self.T, (batch_size,))
        alpha_bar_t = self.alpha_bars[t].unsqueeze(1)
        
        # 前向加噪公式[2](@ref)
        noise = torch.randn_like(expert_actions)
        noisy_actions = torch.sqrt(alpha_bar_t) * expert_actions + torch.sqrt(1 - alpha_bar_t) * noise
        
        # 输入拼接(维度对齐)[1](@ref)
        inputs = torch.cat([
            states, 
            noisy_actions,
            (t.float() / self.T).unsqueeze(1)
        ], dim=1)  # 最终维度:batch_size x 7
        
        pred_noise = self.denoiser(inputs)
        loss = torch.mean((noise - pred_noise)**2)
        return loss

    def sample_action(self, state):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        a_t = torch.randn(1, 2)  # 二维动作空间[2](@ref)
        
        # 逆向去噪过程[2](@ref)
        for t in reversed(range(self.T)):
            alpha_t = self.alphas[t]
            alpha_bar_t = self.alpha_bars[t]
            
            inputs = torch.cat([
                state_tensor,
                a_t,
                torch.tensor([[t / self.T]], dtype=torch.float32)
            ], dim=1)
            
            pred_noise = self.denoiser(inputs)
            a_t = (a_t - (1 - alpha_t)/torch.sqrt(1 - alpha_bar_t) * pred_noise) / torch.sqrt(alpha_t)
            if t > 0:
                a_t += torch.sqrt(self.betas[t]) * torch.randn_like(a_t)
        
        return torch.argmax(a_t).item()  # 离散动作选择[1](@ref)

if __name__ == "__main__":
    env = gym.make('CartPole-v1')
    policy = DiffusionPolicy()
    
    # 关键修复:确保状态数据维度统一[1,2](@ref)
    states, actions = [], []
    state, _ = env.reset()
    for _ in range(1000):
        action = env.action_space.sample()
        next_state, _, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        # 强制转换状态为numpy数组并检查维度[2](@ref)
        state = np.array(state, dtype=np.float32).flatten()
        if len(state) != 4:
            raise ValueError(f"Invalid state shape: {state.shape}")
            
        states.append(state)  # 确保每个状态是(4,)的数组
        actions.append(action)
        
        if done:
            state, _ = env.reset()
        else:
            state = next_state
    
    # 维度验证与转换[1](@ref)
    states_array = np.stack(states)  # 强制转换为(1000,4)
    if states_array.shape != (1000,4):
        raise ValueError(f"States shape error: {states_array.shape}")
    
    actions_onehot = np.eye(2)[np.array(actions)]  # 转换为one-hot编码[2](@ref)
    states_tensor = torch.FloatTensor(states_array)
    actions_tensor = torch.FloatTensor(actions_onehot)
    
    # 训练循环
    for epoch in range(100):
        loss = policy.train_step(states_tensor, actions_tensor)
        policy.optimizer.zero_grad()
        loss.backward()
        policy.optimizer.step()
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
    
    # 测试
    state, _ = env.reset()
    for _ in range(200):
        action = policy.sample_action(state)
        state, _, done, _, _ = env.step(action)
        if done: break

相关文章:

  • Jetpack LiveData 使用与原理解析
  • Hosts文件屏蔽广告/恶意网站的原理详解
  • Ansys Zemax | 联合Speos实现供应商与OEM交换黑盒光学系统
  • MongoDB 创建数据库
  • 3个版本的Unity项目的异同
  • vue3 ts 封装axios,配置axios前置拦截器,让所有axios请求携带token
  • 使用docker部署springboot、Vue分离项目,部署到主路径
  • Docker技术系列文章,第八篇——Docker 安全基础
  • 华鲲振宇天工TG225 B1国产服务器试装openEuler22.03 -SP4系统
  • LabVIEW多CAN设备连接故障
  • SICAR 标准 KUKA 机器人标准功能块说明手册
  • 激光线检测算法的FPGA实现
  • MyBatis 动态 SQL 优化:标签的实战与技巧
  • u盘文件夹删除没反应的解决办法
  • 语言合成模型Spark-TTS-0.5B学习笔记
  • Java为什么要使用线程池?
  • 【深度学习与实战】2.3、线性回归模型与梯度下降法先导案例--最小二乘法(向量形式求解)
  • 用Python和Stable Diffusion生成AI动画:从图像到视频的全流程指南
  • MYSQL基本语法使用
  • java八股文之JVM
  • 国有六大行一季度合计净赚超3444亿,不良贷款余额均上升
  • 专访|200余起诉讼,特朗普上台100天,美国已进入宪政危机
  • 普京与卢卡申科举行会晤,将扩大在飞机制造等领域合作
  • 运动健康|不同能力跑者,跑步前后营养补给差别这么大?
  • 农业农村部:把住能繁母猪存栏量“总开关”,引导养殖场户优化母猪存栏结构、合理控制产能
  • 我国首部《人工智能气象应用服务办法》今天发布