当前位置: 首页 > news >正文

【深度强化学习】MIP-DQN 实现案例(完整Python代码)

目录

  • MIP-DQN 算法概述
    • 建模基础
    • 训练阶段(Training)
    • 部署阶段(Online Execution)
    • DNN 网络转化为 MIP 表达式
    • 性能指标
  • 完整 Python 代码实现
    • 主函数:random_generator_battery
    • 模型函数:MIP_DQN
      • 基础/专用库包安装
      • 模型运行(完整Python代码)
    • 参数设置函数:Parameters
  • 参考

本博客根据论文《Optimal energy system scheduling using a constraint-aware reinforcement learning algorithm》(2023)中提出的 MIP-DQN 算法,对其完整研究方法进行详细介绍。

内容包括:输入、输出,状态空间与动作空间定义,训练过程,在线部署过程,以及如何将 DNN 转换为 MIP 进行约束优化。

MIP-DQN 算法概述

MIP-DQN(Mixed-Integer Programming Deep Q-Network)是一种 值函数驱动的深度强化学习算法,与传统 深度强化学习DRL 不同,它在 执行阶段能够严格满足操作约束,例如功率平衡、爬坡限制等。

建模基础

输入(State 状态空间):

每个时刻 ( t ) 的状态 ( s_t ) 包括:

  • ( P^V_t ):当前时刻的光伏出力(向量)
  • ( P^L_t ):当前时刻的用户负荷(向量)
  • ( P^G_{t-1} ):上一时刻 DG(分布式发电机)出力(向量)
  • ( SOC_t ):当前时刻储能系统(ESS)电量状态(向量)
s_t = (P^V_t, P^L_t, P^G_{t-1}, SOC_t)

在这里插入图片描述


输出(Action 动作空间):

每个时刻 ( t ) 的动作 ( a_t ) 包括:

  • ( P^G_{i,t} ):每个 DG 单元的出力(连续变量)
  • ( P^B_{j,t} ):每个 ESS 的充放电功率(连续变量)
a_t = (P^G_{i,t}, P^B_{j,t})

注意:系统与电网的交易功率 ( P^N_t ) 不由 DRL 控制,它由系统自动调整以维持功率平衡。

训练阶段(Training)

🎯 目标:
学习一个 Q 函数,估计在状态 ( s_t ) 下采取动作 ( a_t ) 所带来的期望回报。
在这里插入图片描述

奖励函数定义(Reward Function):

R(s_t, a_t) = -σ_1 × \text{运行成本} - σ_2 × \text{功率不平衡}
  • 运行成本 = DG 成本 + 与电网交易成本
  • 功率不平衡 ( ΔP_t ):公式如下:
ΔP_t = \left| \sum_i P^G_{i,t} + \sum_j P^B_{j,t} + \sum_m P^V_{m,t} + P^N_t - \sum_k P^L_{k,t} \right|

在这里插入图片描述
训练流程(见算法 1):

  1. 初始化 Q 网络 ( Q_\theta(s, a) )、目标网络 ( Q_{\theta_{target}} ),以及策略网络 ( \pi_\omega )
  2. 采样动作:从策略网络 ( \pi_\omega(s) ) 加上噪声(用于探索)
  3. 与环境交互:得到 ( (s_t, a_t, r_t, s_{t+1}) )
  4. 存入回放缓冲区 Replay Buffer
  5. 采样 mini-batch 批量更新:
    • 更新 Q 网络:使用目标网络计算 target Q 值
    • 更新策略网络:最大化 Q 网络的期望值
  6. 定期软更新目标网络:( \theta_{target} = \tau \theta + (1 - \tau)\theta_{target} )
    在这里插入图片描述

部署阶段(Online Execution)

训练完成后,丢弃策略网络 ( \pi_\omega ),仅保留 Q 网络 ( Q_\theta(s, a) ),并将其转化为 带约束的整数规划问题(MIP) 进行决策。

步骤如下(见算法 2):

  1. 将训练好的 DNN ( Q_\theta(s, a) ) 转化为 MIP 表达形式(见下节)
  2. 添加操作约束:
    • 功率平衡约束(等式)
    • DG 出力上下限、爬坡约束
    • ESS 充放电限制、SOC 约束等
  3. 使用商业 MIP 求解器(如 Gurobi)求解:
\max_{a \in \mathcal{A}} Q_\theta(s, a) \quad \text{subject to all constraints}

在这里插入图片描述

求解结果中的 ( a^* ) 即为当前状态下最优可行动作!

在这里插入图片描述

DNN 网络转化为 MIP 表达式

假设 Q 网络结构为 多层前馈神经网络(ReLU 激活),则每一层输出满足:

x^k = \text{ReLU}(W^{k-1} x^{k-1} + b^{k-1})

ReLU 可通过如下线性混合整数形式建模:

对于每一个神经元 ( x ):

x = \max(0, \hat{x}) \Rightarrow
\begin{cases}
x \geq 0 \\
x \geq \hat{x} \\
x \leq M z \\
x - \hat{x} \leq M (1 - z) \\
z \in \{0, 1\}
\end{cases}

其中 ( z ) 是二进制变量,( M ) 是足够大的常数。

性能指标

DRL 算法性能评估指标:

  • 运行成本:目标函数(越小越好)
  • 功率不平衡(ΔP):是否满足功率平衡(越小越好)
  • 算法运行时间:是否满足实时性要求

完整 Python 代码实现

所用工具:

  • PyTorch:训练 Q 网络和策略网络
  • OMLT(Optimization and Machine Learning Toolkit):将 PyTorch 模型转为 MIP 形式
  • Gurobi / CPLEX / CBC:商业混合整数规划求解器
  • Pyomo:建模数学规划问题
  • OpenAI Gym / 自定义环境:训练环境

主函数:random_generator_battery

此段代码构建了一个 能源管理环境 ESSEnv,用于模拟以下多个能源设备的运行:

  • 🔋 电池系统(Battery)
  • ⚡ 三台分布式发电机(DG1、DG2、DG3)
  • 🌞 光伏发电(PV)
  • 🔌 电网供电(Grid)
  • 📈 能源价格、负荷、电量数据(通过 DataManager 管理)

该环境继承自 gym.Env,兼容 OpenAI Gym 强化学习 API,支持 reset()step()render() 等接口,用于训练强化学习智能体。

模型函数:MIP_DQN

基础/专用库包安装

安装基础依赖:

conda install numpy pandas matplotlib scikit-learn -y
conda install pytorch torchvision -c pytorch -y

安装 Pyomo(用于建模 MIP):

conda install -c conda-forge pyomo -y

安装 Gurobi(用于求解 MIP)

⚠️注意: Gurobi 是商业软件,需注册并获取 Academic License(免费用于学术用途)

官网-Gurobi Optimization。Gurobi的安装及 license 获取可参考我的另一博客-。

1. 安装 Gurobi

conda install -c gurobi gurobi -y

2. 设置 license(首次使用)

grbgetkey <your-license-key>grbgetkeyex: grbgetkey ae36ac20-16e6-acd2-f242-4da6e765fa0a

然后按提示操作。若你已有 gurobi.lic 文件,请放在 ~/.gurobi/ 或 C:\gurobi\ 目录下。
在这里插入图片描述


安装 OMLT(Optimization and Machine Learning Toolkit,优化与机器学习集成工具)

pip install omltconda install omlt

注意:OMLT 依赖 pyomo 和 onnx,会自动安装。


安装 ONNX(用于神经网络转模型)

pip install onnx onnxruntime

说明,onnx 是模型格式,onnxruntime 是运行时推理引擎


安装 Weights & Biases(可选,日志可视化工具)
wandb 是一个轻量级第三方 Python 包,不依赖底层 C/C++ 库,pip 安装非常稳定,适合用于 Conda 环境中。

官网-wandb,进去官网后注册即可获取个人API。

下载命令如下:

pip install wandb

输入以下命令,按照指示输入个人API即可。

wandb login

模型运行(完整Python代码)

import pickle
import torch
import os
import numpy as np
import numpy.random as rd
import pandas as pd
import pyomo.environ as pyo
import pyomo.kernel as pmo
from omlt import OmltBlock
from gurobipy import *
from omlt.neuralnet import NetworkDefinition, FullSpaceNNFormulation,ReluBigMFormulation
from omlt.io.onnx import write_onnx_model_with_bounds,load_onnx_neural_network_with_bounds
import tempfile
import torch.onnx
import torch.nn as nn
from copy import deepcopy
import wandb
from random_generator_battery import ESSEnv## define net
# 经验回放缓冲区 ReplayBuffer:用于存储 agent 与环境交互的轨迹,以供后续训练使用。
class ReplayBuffer:def __init__(self, max_len, state_dim, action_dim, gpu_id=0):self.now_len = 0self.next_idx = 0self.if_full = Falseself.max_len = max_lenself.data_type = torch.float32self.action_dim = action_dimself.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")other_dim = 1 + 1 + self.action_dimself.buf_other = torch.empty(size=(max_len, other_dim), dtype=self.data_type, device=self.device)if isinstance(state_dim, int):  # state is pixelself.buf_state = torch.empty((max_len, state_dim), dtype=torch.float32, device=self.device)elif isinstance(state_dim, tuple):self.buf_state = torch.empty((max_len, *state_dim), dtype=torch.uint8, device=self.device)else:raise ValueError('state_dim')# extend_buffer():添加新数据def extend_buffer(self, state, other):  # CPU array to CPU arraysize = len(other)next_idx = self.next_idx + sizeif next_idx > self.max_len:self.buf_state[self.next_idx:self.max_len] = state[:self.max_len - self.next_idx]self.buf_other[self.next_idx:self.max_len] = other[:self.max_len - self.next_idx]self.if_full = Truenext_idx = next_idx - self.max_lenself.buf_state[0:next_idx] = state[-next_idx:]self.buf_other[0:next_idx] = other[-next_idx:]else:self.buf_state[self.next_idx:next_idx] = stateself.buf_other[self.next_idx:next_idx] = otherself.next_idx = next_idx# sample_batch():采样 mini-batch(训练用)def sample_batch(self, batch_size) -> tuple:indices = rd.randint(self.now_len - 1, size=batch_size)r_m_a = self.buf_other[indices]return (r_m_a[:, 0:1],r_m_a[:, 1:2],r_m_a[:, 2:],self.buf_state[indices],self.buf_state[indices + 1])def update_now_len(self):self.now_len = self.max_len if self.if_full else self.next_idxclass Arguments:def __init__(self, agent=None, env=None):self.agent = agent  # Deep Reinforcement Learning algorithmself.env = env  # the environment for trainingself.cwd = None  # current work directory. None means set automaticallyself.if_remove = False  # remove the cwd folder? (True, False, None:ask me)self.visible_gpu = '0,1,2,3'  # for example: os.environ['CUDA_VISIBLE_DEVICES'] = '0, 2,'self.worker_num = 2  # rollout workers number pre GPU (adjust it to get high GPU usage)self.num_threads = 8  # cpu_num for evaluate model, torch.set_num_threads(self.num_threads)self.if_per_or_gae = False'''Arguments for training'''self.num_episode=3000self.gamma = 0.995  # discount factor of future rewardsself.learning_rate = 1e-4  # 2 ** -14 ~= 6e-5self.soft_update_tau = 1e-2  # 2 ** -8 ~= 5e-3self.net_dim = 64  # the network width 256self.batch_size = 256  # num of transitions sampled from replay buffer.self.repeat_times = 2 ** 3  # repeatedly update network to keep critic's loss smallself.target_step = 1000 # collect target_step experiences , then update network, 1024self.max_memo = 50000  # capacity of replay buffer## arguments for controlling explorationself.explorate_decay=0.99self.explorate_min=0.3'''Arguments for evaluate'''self.random_seed_list=[1234,2234,3234,4234,5234]# self.random_seed_list=[2234]self.run_name='MIP_DQN_experiments''''Arguments for save'''self.train=Trueself.save_network=Truedef init_before_training(self, if_main):if self.cwd is None:agent_name = self.agent.__class__.__name__self.cwd = f'./{agent_name}/{self.run_name}'if if_main:import shutil  # remove history according to bool(if_remove)if self.if_remove is None:self.if_remove = bool(input(f"| PRESS 'y' to REMOVE: {self.cwd}? ") == 'y')elif self.if_remove:shutil.rmtree(self.cwd, ignore_errors=True)print(f"| Remove cwd: {self.cwd}")os.makedirs(self.cwd, exist_ok=True)np.random.seed(self.random_seed)torch.manual_seed(self.random_seed)torch.set_num_threads(self.num_threads)torch.set_default_dtype(torch.float32)os.environ['CUDA_VISIBLE_DEVICES'] = str(self.visible_gpu)# control how many GPU is used  # 模型定义 Actor 网络(策略网络)
class Actor(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net=nn.Sequential(nn.Linear(state_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,action_dim))def forward(self,state):return self.net(state).tanh()# make the data from -1 to 1# 用于探索时加入噪声def get_action(self,state,action_std):#action=self.net(state).tanh()noise=(torch.randn_like(action)*action_std).clamp(-0.5,0.5)#return (action+noise).clamp(-1.0,1.0)# 模型定义 CriticQ 网络(双 Q 网络)
class CriticQ(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net_head=nn.Sequential(nn.Linear(state_dim+action_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU())self.net_q1=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q1 valueself.net_q2=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q2 valuedef forward(self,value):mid=self.net_head(value)return self.net_q1(mid)def get_q1_q2(self,value):mid=self.net_head(value)return self.net_q1(mid),self.net_q2(mid)class AgentBase:def __init__(self):self.state = Noneself.device = Noneself.action_dim = Noneself.if_off_policy = Noneself.explore_noise = Noneself.trajectory_list = Noneself.explore_rate = 1.0self.criterion = torch.nn.SmoothL1Loss()def init(self, net_dim, state_dim, action_dim, learning_rate=1e-4, _if_per_or_gae=False, gpu_id=0):self.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")self.action_dim = action_dimself.cri = self.ClassCri(net_dim, state_dim, action_dim).to(self.device)self.act = self.ClassAct(net_dim, state_dim, action_dim).to(self.device) if self.ClassAct else self.criself.cri_target = deepcopy(self.cri) if self.if_use_cri_target else self.criself.act_target = deepcopy(self.act) if self.if_use_act_target else self.actself.cri_optim = torch.optim.Adam(self.cri.parameters(), learning_rate)self.act_optim = torch.optim.Adam(self.act.parameters(),learning_rate) if self.ClassAct else self.cridel self.ClassCri, self.ClassActdef select_action(self, state) -> np.ndarray:states = torch.as_tensor((state,), dtype=torch.float32, device=self.device)action = self.act(states)[0]if rd.rand()<self.explore_rate:action = (action + torch.randn_like(action) * self.explore_noise).clamp(-1, 1)return action.detach().cpu().numpy()def explore_env(self, env, target_step):trajectory = list()state = self.statefor _ in range(target_step):action = self.select_action(state)state, next_state, reward, done, = env.step(action)trajectory.append((state, (reward, done, *action)))state = env.reset() if done else next_stateself.state = statereturn trajectory@staticmethoddef optim_update(optimizer, objective):optimizer.zero_grad()objective.backward()optimizer.step()@staticmethoddef soft_update(target_net, current_net, tau):for tar, cur in zip(target_net.parameters(), current_net.parameters()):tar.data.copy_(cur.data * tau + tar.data * (1.0 - tau))def save_or_load_agent(self, cwd, if_save):def load_torch_file(model_or_optim, _path):state_dict = torch.load(_path, map_location=lambda storage, loc: storage)model_or_optim.load_state_dict(state_dict)name_obj_list = [('actor', self.act), ('act_target', self.act_target), ('act_optim', self.act_optim),('critic', self.cri), ('cri_target', self.cri_target), ('cri_optim', self.cri_optim), ]name_obj_list = [(name, obj) for name, obj in name_obj_list if obj is not None]if if_save:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"torch.save(obj.state_dict(), save_path)else:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"load_torch_file(obj, save_path) if os.path.isfile(save_path) else Nonedef _update_exploration_rate(self,explorate_decay,explore_rate_min):self.explore_rate = max(self.explore_rate * explorate_decay, explore_rate_min)'''this function is used to update the explorate probability when select action'''# 模型定义 AgentMIPDQN 算法(继承基础类 AgentBase)
class AgentMIPDQN(AgentBase):def __init__(self):super().__init__()self.explore_noise = 0.5  # standard deviation of exploration noiseself.policy_noise = 0.2  # standard deviation of policy noiseself.update_freq = 2  # delay update frequencyself.if_use_cri_target = self.if_use_act_target = Trueself.ClassCri = CriticQself.ClassAct = Actor# update_net():更新策略网络和双 Q 网络def update_net(self, buffer, batch_size, repeat_times, soft_update_tau) -> tuple:buffer.update_now_len()obj_critic = obj_actor = Nonefor update_c in range(int(buffer.now_len / batch_size * repeat_times)):# we update too much time?obj_critic, state = self.get_obj_critic(buffer, batch_size)self.optim_update(self.cri_optim, obj_critic)action_pg = self.act(state)  # policy gradientobj_actor = -self.cri_target(torch.cat((state, action_pg),dim=-1)).mean()  # use cri_target instead of cri for stable trainingself.optim_update(self.act_optim, obj_actor)if update_c % self.update_freq == 0:  # delay updateself.soft_update(self.cri_target, self.cri, soft_update_tau)self.soft_update(self.act_target, self.act, soft_update_tau)return obj_critic.item() / 2, obj_actor.item()# get_obj_critic():获取目标 Q 值并计算 critic lossdef get_obj_critic(self, buffer, batch_size) -> (torch.Tensor, torch.Tensor):with torch.no_grad():reward, mask, action, state, next_s = buffer.sample_batch(batch_size)next_a = self.act_target.get_action(next_s, self.policy_noise)  # policy noise,next_q = torch.min(*self.cri_target.get_q1_q2(torch.cat((next_s, next_a),dim=-1)))  # twin criticsq_label = reward + mask * next_qq1, q2 = self.cri.get_q1_q2(torch.cat((state, action),dim=-1))obj_critic = self.criterion(q1, q_label) + self.criterion(q2, q_label)  # twin criticsreturn obj_critic, statedef update_buffer(_trajectory):ten_state = torch.as_tensor([item[0] for item in _trajectory], dtype=torch.float32)ary_other = torch.as_tensor([item[1] for item in _trajectory])ary_other[:, 0] = ary_other[:, 0]   # ten_rewardary_other[:, 1] = (1.0 - ary_other[:, 1]) * gamma  # ten_mask = (1.0 - ary_done) * gammabuffer.extend_buffer(ten_state, ary_other)_steps = ten_state.shape[0]_r_exp = ary_other[:, 0].mean()  # other = (reward, mask, action)return _steps, _r_expdef get_episode_return(env, act, device):'''get information of one episode during the training'''episode_return = 0.0  # sum of rewards in an episodeepisode_unbalance=0.0episode_operation_cost=0.0state = env.reset()for i in range(24):s_tensor = torch.as_tensor((state,), device=device)a_tensor = act(s_tensor)action = a_tensor.detach().cpu().numpy()[0]  # not need detach(), because with torch.no_grad() outsidestate, next_state, reward, done,= env.step(action)state=next_stateepisode_return += rewardepisode_unbalance+=env.real_unbalanceepisode_operation_cost+=env.operation_costif done:breakreturn episode_return,episode_unbalance,episode_operation_cost# 网络导出与 MIP 调用模块 Actor_MIP
# 用于将训练好的 Q 网络转换成 ONNX + OMLT + Pyomo 表达形式,并结合 Gurobi 求解最优动作
class Actor_MIP:'''this actor is used to get the best action and Q function, the only input should be batch tensor state, action, and network, while the output should bebatch tensor max_action, batch tensor max_Q'''def __init__(self,scaled_parameters,batch_size,net,state_dim,action_dim,env,constrain_on=False):self.batch_size = batch_sizeself.net = netself.state_dim = state_dimself.action_dim =action_dimself.env = envself.constrain_on=constrain_onself.scaled_parameters=scaled_parametersdef get_input_bounds(self,input_batch_state):batch_size = self.batch_sizebatch_input_bounds = []lbs_states = input_batch_state.detach().numpy()ubs_states = lbs_statesfor i in range(batch_size):input_bounds = {}for j in range(self.action_dim + self.state_dim):if j < self.state_dim:input_bounds[j] = (float(lbs_states[i][j]), float(ubs_states[i][j]))else:input_bounds[j] = (float(-1), float(1))batch_input_bounds.append(input_bounds)return batch_input_boundsdef predict_best_action(self, state):state=state.detach().cpu().numpy()v1 = torch.zeros((1, self.state_dim+self.action_dim), dtype=torch.float32)'''this function is used to get the best action based on current net'''model = self.net.to('cpu')input_bounds = {}lb_state = stateub_state = statefor i in range(self.action_dim + self.state_dim):if i < self.state_dim:input_bounds[i] = (float(lb_state[0][i]), float(ub_state[0][i]))else:input_bounds[i] = (float(-1), float(1))with tempfile.NamedTemporaryFile(suffix='.onnx', delete=False) as f:# export neural network to ONNXtorch.onnx.export(model,v1,f,input_names=['state_action'],output_names=['Q_value'],dynamic_axes={'state_action': {0: 'batch_size'},'Q_value': {0: 'batch_size'}})# write ONNX model and its bounds using OMLTwrite_onnx_model_with_bounds(f.name, None, input_bounds)# load the network definition from the ONNX modelnetwork_definition = load_onnx_neural_network_with_bounds(f.name)# global optimalityformulation = ReluBigMFormulation(network_definition)m = pyo.ConcreteModel()m.nn = OmltBlock()m.nn.build_formulation(formulation)'''# we are now building the surrogate model between action and state'''# constrain for battery,if self.constrain_on:m.power_balance_con1 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\((m.nn.inputs[8] * self.scaled_parameters[1])+m.nn.inputs[4]*self.scaled_parameters[5]) +\((m.nn.inputs[9] * self.scaled_parameters[2])+m.nn.inputs[5]*self.scaled_parameters[6]) +\((m.nn.inputs[10] * self.scaled_parameters[3])+m.nn.inputs[6]*self.scaled_parameters[7])>=\m.nn.inputs[3] *self.scaled_parameters[4]-self.env.grid.exchange_ability))m.power_balance_con2 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\(m.nn.inputs[8] * self.scaled_parameters[1]+m.nn.inputs[4]*self.scaled_parameters[5]) +\(m.nn.inputs[9] * self.scaled_parameters[2]+m.nn.inputs[5]*self.scaled_parameters[6]) +\(m.nn.inputs[10] * self.scaled_parameters[3]+m.nn.inputs[6]*self.scaled_parameters[7])<=\m.nn.inputs[3] *self.scaled_parameters[4]+self.env.grid.exchange_ability))m.obj = pyo.Objective(expr=(m.nn.outputs[0]), sense=pyo.maximize)pyo.SolverFactory('gurobi').solve(m, tee=False)best_input = pyo.value(m.nn.inputs[:])best_action = (best_input[self.state_dim::])return best_action# define test function
if __name__ == '__main__':args = Arguments()'''here record real unbalance'''reward_record = {'episode': [], 'steps': [], 'mean_episode_reward': [], 'unbalance': [],'episode_operation_cost': []}loss_record = {'episode': [], 'steps': [], 'critic_loss': [], 'actor_loss': [], 'entropy_loss': []}args.visible_gpu = '2'for seed in args.random_seed_list:args.random_seed = seed# set different seedargs.agent = AgentMIPDQN()agent_name = f'{args.agent.__class__.__name__}'args.agent.cri_target = Trueargs.env = ESSEnv()args.init_before_training(if_main=True)'''init agent and environment'''agent = args.agentenv = args.envagent.init(args.net_dim, env.state_space.shape[0], env.action_space.shape[0], args.learning_rate,args.if_per_or_gae)'''init replay buffer'''buffer = ReplayBuffer(max_len=args.max_memo, state_dim=env.state_space.shape[0],action_dim=env.action_space.shape[0])'''start training'''cwd = args.cwdgamma = args.gammabatch_size = args.batch_size  # how much data should be used to update nettarget_step = args.target_step  # how manysteps of one episode should stoprepeat_times = args.repeat_times  # how many times should update for one batch size datasoft_update_tau = args.soft_update_tauagent.state = env.reset()'''collect data and train and update network'''num_episode = args.num_episodeargs.train=Trueargs.save_network=True# 自动记录每集的 reward、loss、unbalance 等wandb.init(project='MIP_DQN_experiments',name=args.run_name,settings=wandb.Settings(start_method="fork"))wandb.config = {"epochs": num_episode,"batch_size": batch_size}wandb.define_metric('custom_step')if args.train:collect_data = Truewhile collect_data:print(f'buffer:{buffer.now_len}')with torch.no_grad():trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)buffer.update_now_len()if buffer.now_len >= 10000:collect_data = Falsefor i_episode in range(num_episode):critic_loss, actor_loss = agent.update_net(buffer, batch_size, repeat_times, soft_update_tau)wandb.log({'critic loss':critic_loss,'custom_step':i_episode})wandb.log({'actor loss': actor_loss,'custom_step':i_episode})loss_record['critic_loss'].append(critic_loss)loss_record['actor_loss'].append(actor_loss)with torch.no_grad():episode_reward, episode_unbalance, episode_operation_cost = get_episode_return(env, agent.act,agent.device)wandb.log({'mean_episode_reward': episode_reward,'custom_step':i_episode})wandb.log({'unbalance':episode_unbalance,'custom_step':i_episode})wandb.log({'episode_operation_cost':episode_operation_cost,'custom_step':i_episode})reward_record['mean_episode_reward'].append(episode_reward)reward_record['unbalance'].append(episode_unbalance)reward_record['episode_operation_cost'].append(episode_operation_cost)print(f'curren epsiode is {i_episode}, reward:{episode_reward},unbalance:{episode_unbalance},buffer_length: {buffer.now_len}')if i_episode % 10 == 0:# target_stepwith torch.no_grad():agent._update_exploration_rate(args.explorate_decay,args.explorate_min)trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)wandb.finish()if args.update_training_data:loss_record_path = f'{args.cwd}/loss_data.pkl'reward_record_path = f'{args.cwd}/reward_data.pkl'with open(loss_record_path, 'wb') as tf:pickle.dump(loss_record, tf)with open(reward_record_path, 'wb') as tf:pickle.dump(reward_record, tf)act_save_path = f'{args.cwd}/actor.pth'cri_save_path = f'{args.cwd}/critic.pth'print('training data have been saved')if args.save_network:# 模型保存与结果存储torch.save(agent.act.state_dict(), act_save_path)torch.save(agent.cri.state_dict(), cri_save_path)print('training finished and actor and critic parameters have been saved')

参数设置函数:Parameters

1、电池参数(battery_parameters)

battery_parameters = {'capacity': 500,           # 电池总容量(kWh)'max_charge': 100,         # 最大充电功率(kW)'max_discharge': 100,      # 最大放电功率(kW)'efficiency': 0.9,         # 充放电效率(90%)'degradation': 0,          # 电池退化成本(€/kW,未启用)'max_soc': 0.8,            # 最大SOC(80%)'min_soc': 0.2,            # 最小SOC(20%)'initial_capacity': 0.2    # 初始SOC(20%)
}

参数解释:

参数名含义用途
capacity电池的总电量容量(单位 kWh)用于计算 SOC 的绝对值
max_charge每小时最大充电功率(kW)限制动作空间中充电方向的动作上限
max_discharge每小时最大放电功率(kW)限制动作空间中放电方向的动作下限
efficiency充/放电的能量转换效率影响 SOC 更新公式,通常 < 1
degradation每单位放电引起的电池退化成本可选项,未启用
max_socSOC 最大值(占比)建模约束,防止过充(如 0.8×500 kWh)
min_socSOC 最小值(占比)建模约束,防止过放
initial_capacity初始时刻的 SOC(占比)环境初始化状态使用

在 MIP-DQN 中的作用:

  • 在状态空间中,SOC 是环境的一个组成变量;
  • 在动作空间中,充放电功率是 agent 决策的一部分;
  • 在约束中,必须满足:
    • min_soc × capacity ≤ SOC_t ≤ max_soc × capacity
    • -max_discharge ≤ P_battery_t ≤ max_charge
    • SOC_{t+1} = SOC_t + η × P_battery_t × Δt / capacity

2、发电机参数(dg_parameters

结构是一个字典嵌套字典,每一个 key (如 'gen_1')代表一个 DG 单元,value 是该 DG 的参数。

dg_parameters = {'gen_1': {...},'gen_2': {...},'gen_3': {...}
}

参数结构(以 gen_1 为例):

{'a': 0.0034,         # 成本函数二次项系数'b': 3,              # 成本函数一次项系数'c': 30,             # 成本函数常数项'd': 0.03,           # 热电参数(未使用)'e': 4.2,'f': 0.031,'power_output_max': 150,   # 最大出力(kW)'power_output_min': 0,     # 最小出力(kW)'heat_output_max': None,   # 若为热电联产系统使用(未启用)'heat_output_min': None,'ramping_up': 100,         # 每小时最大爬坡(上升)能力(kW/h)'ramping_down': 100,       # 每小时最大爬坡(下降)能力(kW/h)'min_up': 2,               # 最小连续开机时间(小时)'min_down': 1              # 最小连续关机时间(小时)
}

成本函数定义:

发电成本函数为二次型:

C_{DG}(P) = a × P^2 + b × P + c

gen_1 为例:

C_{DG_1}(P) = 0.0034 × P^2 + 3 × P + 30

这在论文中公式 (2) 中有体现。

约束相关参数:

参数名含义用途
power_output_max/min发电出力上下限控制动作空间边界
ramping_up/down爬坡约束限制连续两个时刻出力差值
min_up/down连续开/关机约束状态转换约束(若考虑启停状态)

注意:最小启停时间在本文中未显式建模,若要考虑,需要引入二进制变量进行建模(Unit Commitment 问题)。

参考

http://www.dtcms.com/a/288181.html

相关文章:

  • [spring6: IntroductionAdvisor IntroductionInterceptor]-源码分析
  • C++编程学习(第11天)
  • Patch-wise Structural:一种引入局部统计特性的时序预测损失函数
  • eNSP综合实验(DNCP、NAT、TELET、HTTP、DNS)
  • 定时器中BDTR死区时间和刹车功能配置
  • debian的pulseaudio删掉也没事
  • Go语言pprof性能分析指南
  • SIMATIC WinCC Unified 使用 KPI 优化流程
  • 永磁同步电机无速度算法--脉振正弦注入法
  • Kakfa集群部署及主题创建
  • haproxy七层代理
  • day7--绑定媒资、课程发布
  • kafka--基础知识点--6--AR、ISR、OSR
  • Mysql系列--3、数据类型
  • RTDETR融合DECS-Net中的FFM模块
  • Verilog *2* SPI-立创逻辑派G1测试-1
  • 多表查询-8-练习总结
  • 【LeetCode 热题 100】437. 路径总和 III——(解法一)递归递归!
  • 【Linux】mmap的介绍和使用
  • [硬件电路-36]:模拟电路的基本组成要素以及模拟信号处理
  • Python条件控制艺术:侦探破解犯罪谜题逻辑
  • 浏览器渲染原理——计算属性和布局过程常考内容
  • 如何实现一个定时任务
  • LibreTv在线观影项目部署开箱即用
  • 如何解决Flink CDC同步时间类型字段8小时时间差的问题,以MySQL为例
  • 相似度度量方法
  • 车载刷写框架 --- 关于私有节点刷写失败未报引起的反思
  • 暑期算法训练.4
  • 用虚拟机体验纯血鸿蒙所有机型!
  • 【成品设计】基于STM32的水资源监控系列项目