A New Approach to Grain Storage Temperature Prediction! Implementing the FEBL Model in Code: A Complete LSTM + Attention + Ridge Regression Pipeline
This post implements a Feature-Enhanced Broad Learning (FEBL) model for the task of predicting grain storage temperatures. Below is a module-by-module walkthrough of the code and what each part does.
1. Importing Dependencies
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import time
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
- PyTorch: builds the neural network modules.
- NumPy: numerical computation.
- scikit-learn: data normalization (`MinMaxScaler`) and train/test splitting (`train_test_split`).
2. Data Processing Class GrainData
class GrainData:
    """Grain storage data handler: data generation, preprocessing, and splitting."""
What it does:
- Generates simulated data: `load_data` produces an array of shape `(num_samples, num_features)`; the first 4 features simulate grain temperatures drifting over time.
- Builds the time-series dataset: `create_dataset` converts the raw data into sliding windows, where each sample contains `seq_length` time steps and the label is the temperature at the next time step (a shape-check sketch follows the code segment below).
- Preprocesses the data: `preprocess` normalizes everything with `MinMaxScaler` and splits it into training and test sets.
Key code segment:
# Generate data
X = np.random.rand(num_samples, num_features)
for i in range(1, num_samples):
    X[i, :4] = X[i-1, :4] + np.random.normal(0, 0.1)

# Build the time-series dataset
def create_dataset(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length, 0])  # assume the first feature is the prediction target
    return np.array(X), np.array(y)
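To make the windowing concrete, here is a minimal, self-contained shape check. It mirrors the `create_dataset` helper above; the 100×8 array is an illustrative size, not the article's configuration:

```python
import numpy as np

def create_dataset(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])   # one window of seq_length rows
        y.append(data[i + seq_length, 0])  # the next step's first feature (grain temperature)
    return np.array(X), np.array(y)

data = np.random.rand(100, 8)              # 100 time steps, 8 features (illustrative sizes)
X, y = create_dataset(data, seq_length=30)
print(X.shape, y.shape)                    # (70, 30, 8) (70,)
```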
3. Feature Enhancement Module FeatureEnhancer
class FeatureEnhancer(nn.Module):
    def __init__(self, input_size, hidden_size, num_heads):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.attention = nn.MultiheadAttention(embed_dim=hidden_size,
                                               num_heads=num_heads,
                                               batch_first=True)
What it does:
- LSTM: extracts temporal features from the window.
- Multi-head self-attention (`MultiheadAttention`): re-weights the sequence to emphasize the most informative features (e.g., correlated temperature changes). A shape-check sketch follows the code segment below.
- Output handling: takes the features of the last time step and passes them through a linear layer to adjust the dimension.
Key code segment:
def forward(self, x):
    # LSTM extracts temporal features
    out, _ = self.lstm(x)
    # Multi-head self-attention enhancement
    attention_out, _ = self.attention(out, out, out)
    # Keep only the last time step's features
    last_feature = attention_out[:, -1, :]
    return last_feature
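A minimal standalone sketch of the same LSTM → self-attention → last-time-step flow, just to confirm the tensor shapes (batch size and dimensions are illustrative, chosen to match the configuration used later):

```python
import torch
import torch.nn as nn

batch, seq_len, input_size, hidden = 16, 30, 8, 128   # illustrative sizes
lstm = nn.LSTM(input_size, hidden, batch_first=True)
attn = nn.MultiheadAttention(embed_dim=hidden, num_heads=2, batch_first=True)

x = torch.randn(batch, seq_len, input_size)
out, _ = lstm(x)                        # (16, 30, 128)
attn_out, _ = attn(out, out, out)       # (16, 30, 128)
last = attn_out[:, -1, :]               # (16, 128): one enhanced feature vector per sample
print(out.shape, attn_out.shape, last.shape)
```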
4. Broad Learning System (BLS) Module BroadLearningSystem
class BroadLearningSystem(nn.Module):
    def __init__(self, input_size, feature_nodes, enhancement_nodes):
        super().__init__()
        self.feature_layer = nn.Linear(input_size, feature_nodes)
        self.enhancement_layer = nn.Linear(feature_nodes, enhancement_nodes)
What it does:
- Feature nodes: a linear map projects the input into `feature_nodes` dimensions, followed by a non-linearity.
- Enhancement nodes: a further expansion to `enhancement_nodes` dimensions widens the representation and increases model capacity.

Note that this excerpt uses trainable `nn.Linear` layers for brevity; the complete code at the end uses fixed, randomly initialized BLS weights, in line with the usual BLS setup where only the output weights are learned. A shape check follows the code segment below.
Key code segment:
def forward(self, x):
    # Feature nodes
    feature_out = torch.tanh(self.feature_layer(x))
    # Enhancement nodes
    enhancement_out = torch.tanh(self.enhancement_layer(feature_out))
    return torch.cat([feature_out, enhancement_out], dim=1)
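A quick standalone check that the concatenated BLS output has width `feature_nodes + enhancement_nodes` (here 40 + 400 = 440, matching the configuration used later; the batch size is illustrative):

```python
import torch
import torch.nn as nn

feature_layer = nn.Linear(128, 40)        # lstm_hidden -> feature_nodes
enhancement_layer = nn.Linear(40, 400)    # feature_nodes -> enhancement_nodes

x = torch.randn(16, 128)                  # (batch, lstm_hidden), batch size illustrative
Z = torch.tanh(feature_layer(x))          # (16, 40)
H = torch.tanh(enhancement_layer(Z))      # (16, 400)
A = torch.cat([Z, H], dim=1)              # (16, 440): feature matrix fed to ridge regression
print(A.shape)
```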
5. Integrated Model FEBLModel
class FEBLModel(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.feature_enhancer = FeatureEnhancer(config['input_size'],
                                                config['lstm_hidden'],
                                                config['num_heads'])
        self.bls = BroadLearningSystem(config['lstm_hidden'],
                                       config['feature_nodes'],
                                       config['enhancement_nodes'])
What it does:
- Composition: chains the `FeatureEnhancer` and `BroadLearningSystem` modules.
- Ridge-regression output: `compute_ridge_weights` solves for the output-layer weights in closed form from the training feature matrix and labels (a numerical sanity check follows the code segment below).
Key code segment:
def compute_ridge_weights(self, A_train, y_train, lambda_reg=1e-10):
    # Ridge regression for the output weights
    I = torch.eye(A_train.size(1))
    A_pinv = torch.linalg.pinv(A_train.T @ A_train + lambda_reg * I) @ A_train.T
    self.W_m = A_pinv @ y_train
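The closed form being computed is the standard ridge solution W_m = (AᵀA + λI)⁻¹ Aᵀ y. A minimal sanity-check sketch (matrix sizes and λ are illustrative) comparing the pseudo-inverse route used above against an equivalent direct solve:

```python
import torch

torch.manual_seed(0)
A = torch.randn(200, 40, dtype=torch.float64)   # (samples, BLS nodes), sizes illustrative
y = torch.randn(200, 1, dtype=torch.float64)
lam = 1e-2                                       # illustrative regularization strength

I = torch.eye(A.size(1), dtype=torch.float64)
W_pinv = torch.linalg.pinv(A.T @ A + lam * I) @ A.T @ y   # route used by compute_ridge_weights
W_solve = torch.linalg.solve(A.T @ A + lam * I, A.T @ y)  # equivalent direct solve

print(torch.allclose(W_pinv, W_solve, atol=1e-8))         # True: same ridge solution
```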
6. Training and Evaluation Pipeline train_and_evaluate
def train_and_evaluate():
    # Configuration
    config = {
        'input_size': 8,
        'seq_length': 30,
        'lstm_hidden': 128,
        'num_heads': 2,
        'feature_nodes': 40,
        'enhancement_nodes': 400
    }

    # Data preprocessing
    data_handler = GrainData(seq_length=config['seq_length'])
    X_train, X_test, y_train, y_test = data_handler.preprocess()

    # Model initialization
    model = FEBLModel(config)

    # Stage 1: pre-train the feature enhancer
    optimizer = optim.Adam(model.feature_enhancer.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    for epoch in range(50):
        features = model.feature_enhancer(X_train)
        A = model.bls(features)
        outputs = temp_fc(A)  # temporary fully connected head (defined in the complete code below)
        loss = criterion(outputs, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Stage 2: compute the BLS output weights by ridge regression
    with torch.no_grad():
        A_train = model.bls(model.feature_enhancer(X_train))
        model.compute_ridge_weights(A_train, y_train)

    # Evaluate on the test set
    y_pred = model(X_test)
    mae = torch.mean(torch.abs(y_pred - y_test)).item()
    rmse = torch.sqrt(torch.mean((y_pred - y_test) ** 2)).item()
    mape = torch.mean(torch.abs((y_pred - y_test) / y_test)).item() * 100
    print(f"MAE: {mae:.4f}, RMSE: {rmse:.4f}, MAPE: {mape:.2f}%")
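Because `preprocess` fits `MinMaxScaler` on all 8 features before windowing, `y_pred` and the metrics above live in the normalized [0, 1] space. If you want predictions back in degrees Celsius, one possible helper is sketched below; `inverse_scale_temperature` and its zero-padding trick are illustrative additions, not part of the article's code:

```python
import numpy as np

def inverse_scale_temperature(y_scaled, scaler, num_features=8, target_col=0):
    """Undo MinMaxScaler for one target column by padding the other columns with zeros."""
    y_scaled = np.asarray(y_scaled).reshape(-1)
    padded = np.zeros((len(y_scaled), num_features))
    padded[:, target_col] = y_scaled
    # inverse_transform works column-wise, so the zero padding does not affect the target column
    return scaler.inverse_transform(padded)[:, target_col]

# Example (after train_and_evaluate has produced data_handler and y_pred):
# y_pred_celsius = inverse_scale_temperature(y_pred.detach().numpy(), data_handler.scaler)
```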
7. Summary of Model Characteristics
Module | Role |
---|---|
LSTM | Extracts temporal features (e.g., long-term dependencies in temperature changes) |
Multi-head self-attention | Emphasizes key features (e.g., correlations among temperature fluctuations) |
BLS (Broad Learning System) | Expands model capacity via feature nodes and enhancement nodes |
Ridge regression | Stabilizes the output weights and guards against overfitting |
The complete code is given below.
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import time
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split


class GrainData:
    """Grain storage data handler: data generation, preprocessing, and splitting."""

    def __init__(self, seq_length=30):
        """
        Initialize the data handler.
        :param seq_length: size of the sliding time window
        """
        self.seq_length = seq_length
        self.scaler = MinMaxScaler(feature_range=(0, 1))

    def load_data(self, num_samples=800, num_features=8):
        """
        Generate simulated grain storage data.
        :param num_samples: number of samples
        :param num_features: number of features
        :return: simulated data array
        """
        # Base data in the 15-45 degree range
        data = np.random.rand(num_samples, num_features) * 30 + 15
        # Add temporal correlation: temperature trends over time
        for i in range(1, num_samples):
            # The first 4 features (grain temperatures) drift over time with random noise
            data[i, :4] = data[i - 1, :4] + np.random.randn(4) * 0.5
        return data

    def create_dataset(self, data):
        """
        Build the time-series dataset used for training.
        :param data: raw data
        :return: (features, labels)
        """
        X, y = [], []
        # Rolling-window dataset
        for i in range(len(data) - self.seq_length - 1):
            X.append(data[i:i + self.seq_length])   # window sequence
            y.append(data[i + self.seq_length, 0])  # grain temperature at the next step
        return np.array(X), np.array(y)

    def preprocess(self):
        """
        Normalize the data and split it into training/test sets.
        :return: (X_train, X_test, y_train, y_test)
        """
        # 1. Load the data
        data = self.load_data()
        # 2. Normalize
        scaled_data = self.scaler.fit_transform(data)
        # 3. Build the dataset
        X, y = self.create_dataset(scaled_data)
        # 4. Train/test split (kept in time order)
        return train_test_split(X, y, test_size=0.2, shuffle=False)


class FeatureEnhancer(nn.Module):
    """Feature enhancer: LSTM + multi-head self-attention."""

    def __init__(self, input_size, hidden_size, num_heads):
        """
        Initialize the feature enhancer.
        :param input_size: input feature dimension
        :param hidden_size: number of LSTM hidden units
        :param num_heads: number of attention heads
        """
        super().__init__()
        # LSTM layer extracts temporal features
        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            batch_first=True)
        # Multi-head self-attention emphasizes key features
        self.attention = nn.MultiheadAttention(embed_dim=hidden_size,
                                               num_heads=num_heads,
                                               batch_first=True)
        # Linear layer keeps the output dimension consistent
        self.linear_adjust = nn.Linear(hidden_size, hidden_size)

    def forward(self, x):
        """
        Forward pass.
        :param x: input data (batch, seq_len, input_size)
        :return: enhanced features (batch, hidden_size)
        """
        # 1. LSTM over the time window
        lstm_out, _ = self.lstm(x)
        # 2. Multi-head self-attention over the LSTM outputs
        # Shape check: make sure the input is 3-D (batch, seq, features)
        if lstm_out.dim() == 2:
            lstm_out = lstm_out.unsqueeze(0)
        attn_output, _ = self.attention(lstm_out, lstm_out, lstm_out)
        # 3. Take the last time step and adjust the dimension
        return self.linear_adjust(attn_output[:, -1, :])


class BroadLearningSystem(nn.Module):
    """Broad Learning System: feature nodes and enhancement nodes."""

    def __init__(self, input_size, feature_nodes, enhancement_nodes):
        """
        Initialize the BLS.
        :param input_size: input feature dimension
        :param feature_nodes: number of feature nodes
        :param enhancement_nodes: number of enhancement nodes
        """
        super().__init__()
        # Randomly initialized (fixed) weights and biases
        self.W_e = torch.randn(input_size, feature_nodes) * 2 - 1
        self.bias_e = torch.randn(1, feature_nodes) * 2 - 1
        self.W_h = torch.randn(feature_nodes, enhancement_nodes) * 2 - 1
        self.bias_h = torch.randn(1, enhancement_nodes) * 2 - 1

    def forward(self, x):
        """
        Forward pass.
        :param x: input features (batch, input_size)
        :return: feature + enhancement nodes (batch, feature_nodes + enhancement_nodes)
        """
        # 1. Feature nodes: non-linear transform
        Z = torch.tanh(torch.mm(x, self.W_e) + self.bias_e)
        # 2. Enhancement nodes: non-linear transform
        H = torch.tanh(torch.mm(Z, self.W_h) + self.bias_h)
        # 3. Concatenate into the final feature matrix
        return torch.cat((Z, H), dim=1)


class FEBLModel(nn.Module):
    """Feature-Enhanced Broad Learning (FEBL) model."""

    def __init__(self, config):
        """
        Initialize the FEBL model.
        :param config: configuration dictionary
        """
        super().__init__()
        # Feature enhancer
        self.feature_enhancer = FeatureEnhancer(config['input_size'],
                                                config['lstm_hidden'],
                                                config['num_heads'])
        # Broad learning system
        self.bls = BroadLearningSystem(config['lstm_hidden'],
                                       config['feature_nodes'],
                                       config['enhancement_nodes'])
        # Output weights (computed by ridge regression)
        self.W_m = None

    def forward(self, x):
        """
        Forward pass.
        :param x: input data
        :return: predictions
        """
        # 1. Feature enhancement
        features = self.feature_enhancer(x)
        # 2. Ensure a (batch, features) shape
        if features.dim() > 2:
            features = features.squeeze(0)
        # 3. Broad learning expansion
        A = self.bls(features)
        # 4. Prediction (once the output weights have been computed)
        return torch.mm(A, self.W_m) if self.W_m is not None else A

    def compute_ridge_weights(self, A, Y, lambda_reg=1e-10):
        """
        Compute the output weights by ridge regression.
        :param A: feature matrix
        :param Y: labels
        :param lambda_reg: regularization coefficient
        """
        # Make sure the inputs are PyTorch tensors
        if not isinstance(A, torch.Tensor):
            A = torch.tensor(A, dtype=torch.float32)
        if not isinstance(Y, torch.Tensor):
            Y = torch.tensor(Y, dtype=torch.float32)
        # Check 1: sample counts must match
        if A.size(0) != Y.size(0):
            raise ValueError(f"Feature matrix A has {A.size(0)} samples but labels Y has {Y.size(0)} samples")
        # Check 2: labels must be 2-D
        if Y.dim() == 1:
            Y = Y.unsqueeze(1)
        # 1. Pseudo-inverse (ridge regression)
        I = torch.eye(A.size(1))
        A_pinv = torch.linalg.pinv(A.T @ A + lambda_reg * I) @ A.T
        # 2. Output weights
        self.W_m = A_pinv @ Y
        # 3. Final dimension check
        if self.W_m.size(0) != A.size(1) or self.W_m.size(1) != Y.size(1):
            raise RuntimeError(f"Output weight matrix has wrong dimensions: W_m {self.W_m.shape}, A {A.shape}, Y {Y.shape}")


def train_and_evaluate():
    """Model training and evaluation pipeline."""
    # Configuration
    config = {
        'input_size': 8,           # number of input features
        'seq_length': 30,          # sliding window size
        'lstm_hidden': 128,        # LSTM hidden units
        'num_heads': 2,            # attention heads
        'feature_nodes': 40,       # BLS feature nodes
        'enhancement_nodes': 400   # BLS enhancement nodes
    }

    print("Starting data preprocessing...")
    data_handler = GrainData(seq_length=config['seq_length'])
    X_train, X_test, y_train, y_test = data_handler.preprocess()

    # Convert to PyTorch tensors and adjust dimensions
    X_train = torch.tensor(X_train).float()
    y_train = torch.tensor(y_train).float().unsqueeze(1)  # add a second dimension
    X_test = torch.tensor(X_test).float()
    y_test = torch.tensor(y_test).float().unsqueeze(1)

    print(f"Training set shapes: X_train {X_train.shape}, y_train {y_train.shape}")
    print(f"Test set shapes: X_test {X_test.shape}, y_test {y_test.shape}")

    # Initialize the model
    model = FEBLModel(config)
    print(f"Model initialized: {config['feature_nodes']} feature nodes + {config['enhancement_nodes']} enhancement nodes")

    # === Stage 1: pre-train the feature enhancer ===
    print("\nPre-training the feature enhancer (50 epochs)...")
    optimizer = optim.Adam(model.feature_enhancer.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    # Temporary output head used only during pre-training
    bls_output_dim = config['feature_nodes'] + config['enhancement_nodes']
    temp_fc = nn.Linear(bls_output_dim, 1)

    start_time = time.time()
    for epoch in range(50):
        # Forward pass
        features = model.feature_enhancer(X_train)
        A = model.bls(features)
        outputs = temp_fc(A)
        # Loss
        loss = criterion(outputs, y_train)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch + 1}/50], Loss: {loss.item():.6f}, Time: {time.time() - start_time:.1f}s")

    # === Stage 2: compute the BLS output weights ===
    print("\nComputing the BLS ridge-regression weights...")
    with torch.no_grad():
        # 1. Training-set feature matrix
        train_features = model.feature_enhancer(X_train)
        A_train = model.bls(train_features)
        # 2. Shape check
        print(f"Training feature matrix shapes: A_train {A_train.shape}, y_train {y_train.shape}")
        if A_train.shape[0] != y_train.shape[0]:
            print(f"Warning: sample count mismatch, A_train {A_train.shape[0]} vs y_train {y_train.shape[0]}")
        # 3. Ridge regression for the output weights
        try:
            model.compute_ridge_weights(A_train, y_train)
            print(f"Weights computed successfully: W_m shape {model.W_m.shape}")
        except Exception as e:
            print(f"Weight computation failed: {e}")
            return None
        # 4. Test-set prediction
        test_features = model.feature_enhancer(X_test)
        A_test = model.bls(test_features)
        print(f"Test feature matrix shape: A_test {A_test.shape}")
        y_pred = model(X_test)
        # 5. Metrics
        mae = torch.mean(torch.abs(y_pred - y_test)).item()
        rmse = torch.sqrt(torch.mean((y_pred - y_test) ** 2)).item()
        mape = torch.mean(torch.abs((y_pred - y_test) / y_test)).item() * 100
        print("\nTest results:")
        print(f"MAE: {mae:.4f}")
        print(f"RMSE: {rmse:.4f}")
        print(f"MAPE: {mape:.2f}%")
    print(f"Total training time: {time.time() - start_time:.2f}s")
    return model


if __name__ == "__main__":
    print("=" * 60)
    print("Feature-Enhanced Broad Learning (FEBL) grain temperature prediction model")
    print("=" * 60)
    model = train_and_evaluate()