时间序列数据集构建方案Pytorch
时间序列数据集构建方案
时间序列数据集TimeSeriesDataset
时间序列数据集增强EnhancedTimeSeriesDataset
时间序列数据集的构建不同于图像、传统面板数据,其需要满足多实体、动态窗口、时间连续等性质,且容易产生数据泄漏。本文介绍了一种时间序列数据集的构建方法,可以高效地产生训练数据。
一、应用场景
1.1 典型场景示例
(1)零售销量预测
-
场景描述:同时预测某连锁零售企业1000家门店、5000种商品未来7天的每日销量
-
数据特征:
-
时间维度:3年历史数据(1095天)
-
特征维度:销量、折扣力度、天气数据、节假日标记
-
实体维度:store_id(1000个) × product_id(5000个)= 500万时间序列
-
(2)能源需求预测
-
场景描述:预测某省级电网100个变电站未来24小时的电力负荷
-
数据特征:
-
时间粒度:每小时数据
-
特征维度:历史负荷、温度、湿度、工作日标记
-
实体维度:100个变电站 × 3种电压等级 = 300个时间序列
-
(3)服务器集群负载预测
-
场景描述:预测某数据中心500台服务器未来30分钟的CPU使用率
-
数据特征:
-
时间粒度:5分钟间隔
-
特征维度:CPU使用率、内存占用、网络流量
-
实体维度:500台独立服务器
-
二、场景核心难点
2.1 多实体并行处理
-
挑战:
-
每个实体(store+product组合)形成独立时间序列
-
需要保持各序列的独立性同时实现批量处理
-
示例:500万时间序列需高效管理内存和计算资源
-
-
解决方案:
-
使用分组索引机制(对数据集按照实体id+time column排序,groups列表存储各实体的数据区间),通过预排序和建立索引可以快速定位每个实体的数据slice
-
预计算实体边界(start/end索引),避免重复groupby操作
-
2.2 动态窗口切分
-
挑战:
-
不同序列长度差异大(新商品可能只有30天数据)
-
需要同时满足:
-
最小训练长度要求(min_encoder_length + min_decoder_length)
-
数据增强需求(随机采样不同历史窗口),其他数据增强策略比如反事实推断/时序随机波动等,可以在__getitem__中实现
-
-
-
解决方案:
-
动态计算可用窗口范围:max_L = remaining - D
-
双重随机采样机制:
D = random.randint(min_decoder, max_decoder) # decoder长度随机 L = random.randint(min_encoder, max_L) # encoder长度随机
-
2.3 变长序列处理
-
挑战:
-
不同样本的encoder/decoder长度不同
-
需要统一为固定维度张量进行批量处理,并使用mask标记
-
-
解决方案:
-
双端独立padding机制:
# Encoder padding padded_encoder = np.full((max_encoder_length, features), padding_value) padded_encoder[:L] = actual_data# Decoder padding padded_decoder = np.full((max_decoder_length, features), padding_value)
-
配套MASK机制标识有效数据:
encoder_mask = [1]*L + [0]*(max_length-L)
-
2.4 时间连续性保证
-
挑战:
-
随机采样可能破坏时间序列连续性,时序预测需要保证每个实体的每个时间片段是连续的,encoder代表历史,decoder代表未来
-
需防止数据泄露(未来信息混入历史数据)
-
-
解决方案:
- 严格的时间窗口划分:
|---- encoder ----|---- decoder ----|t t+L t+L+D
-
三重验证机制:
-
remaining = T - t 检查剩余长度
-
t + L + D <= T 最终边界检查
-
特征分离:确保decoder不使用未来特征
-
关键步骤:
-
排序分组:
-
按group_ids + time_col排序保证时间连续性
-
示例:df.sort_values([‘store_id’,‘product_id’,‘target_date’])
-
-
静态特征编码:
-
使用LabelEncoder对类别特征进行数字化
-
存储编码器供后续使用:static_encoders字典
-
-
分组索引构建:
-
遍历数据记录实体变化点
-
存储三元组(group_key, start_idx, end_idx)
-
-
特征索引映射:
-
建立特征名称到列索引的映射
-
示例:encoder_features = [‘sale’,‘discount’] → indices [0,1]
-
-
样本生成:
- 在每个实体的时间序列上抽样时间切片slice
- 格式group_id, start_idx, encoder_length, decoder_length,在group_id上切片-
- encoder_slice=[start_idx:start_idx+encoder_length]
- decoder_slice=[start_idx+encoder_length:start_idx+encoder_length+decoder_length]
- 时间切片的个数就是样本数(len),在 getitem 函数通过样本生成的切片索引切分出训练数据
- 最后进行动态mask
三、代码
import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import randomclass TimeSeriesDataset(Dataset):def __init__(self,data: pd.DataFrame,time_col: str = "target_date",target_col: str = "sale_amount",group_ids: list = ["store_id", "product_id"],encoder_features: list = ["sale_amount", "discount", "precipitation", "temperature"],decoder_features: list = ["discount", "precipitation", "temperature"],static_features: list = ["store_id", "product_id"],max_encoder_length: int = 35,min_encoder_length: int = 14,max_decoder_length: int = 7,min_decoder_length: int = 7,num_samples_per_step: int = 1,padding_value: float = 0.0,):"""优化版时间序列数据集类参数新增:num_samples_per_step: 每个时间步生成的样本数max_decoder_length: decoder最大长度min_decoder_length: decoder最小长度"""super().__init__()# 参数校验assert max_encoder_length >= min_encoder_length, "encoder长度参数非法"assert max_decoder_length >= min_decoder_length, "decoder长度参数非法"assert num_samples_per_step >= 1, "样本数需≥1"# 存储参数self.time_col = time_colself.target_col = target_colself.group_ids = group_idsself.encoder_features = encoder_featuresself.decoder_features = decoder_featuresself.static_features = static_featuresself.max_encoder_length = max_encoder_lengthself.min_encoder_length = min_encoder_lengthself.max_decoder_length = max_decoder_lengthself.min_decoder_length = min_decoder_lengthself.num_samples_per_step = num_samples_per_stepself.padding_value = padding_value# 预处理数据self._preprocess_data(data)# 生成样本索引self.samples = self._generate_samples()def _preprocess_data(self, data):"""数据预处理"""# 排序并建立索引self.data = data.sort_values(self.group_ids + [self.time_col]).reset_index(drop=True)# 编码静态特征self.static_encoders = {}for col in self.static_features:le = LabelEncoder()self.data[col] = le.fit_transform(self.data[col])self.static_encoders[col] = le# 构建分组索引self.groups = []current_group = Nonestart_idx = 0group_values = self.data[self.group_ids].valuesfor idx in range(len(group_values)):group_key = tuple(group_values[idx])if group_key != current_group:if current_group is not None:self.groups.append( (current_group, start_idx, idx) )current_group = group_keystart_idx = idxself.groups.append( (current_group, start_idx, len(group_values)) )# 特征索引映射self.all_features = list(set(self.encoder_features + self.decoder_features))self.encoder_feature_indices = [self.all_features.index(col) for col in self.encoder_features]self.decoder_feature_indices = [self.all_features.index(col) for col in self.decoder_features]def _generate_samples(self):"""样本生成"""samples = []# 按预存索引访问分组数据for group_key, start, end in self.groups:dynamic_values = self.data[self.all_features].iloc[start:end].valuestarget_values = self.data[self.target_col].iloc[start:end].valuesstatic_values = self.data[self.static_features].iloc[start].valuesT = end - start # 时间序列长度# 生成有效时间窗口min_required = self.min_encoder_length + self.min_decoder_lengthif T < min_required:continue# 滑动窗口生成多个样本for t in range(T - min_required + 1):# 每个时间步生成多个样本for _ in range(self.num_samples_per_step):# 动态计算最大可用长度remaining = T - t# 随机选择decoder长度max_D = min(remaining - self.min_encoder_length, self.max_decoder_length)if max_D < self.min_decoder_length:continueD = random.randint(self.min_decoder_length, max_D)# 根据D确定encoder长度范围max_L = remaining - Dif max_L < self.min_encoder_length:continueL = random.randint(self.min_encoder_length, max_L)# 有效性验证if t + L + D > T:continuesamples.append({"group_id": group_key,"start_idx": t,"encoder_length": L,"decoder_length": D,"static": static_values,"dynamics": dynamic_values,"targets": target_values,})return samplesdef __len__(self):return len(self.samples)def __getitem__(self, idx):sample = self.samples[idx]L = sample["encoder_length"]D = sample["decoder_length"]t = sample["start_idx"]# Encoder部分处理encoder_data = sample["dynamics"][t:t+L, self.encoder_feature_indices]padded_encoder = np.full((self.max_encoder_length, len(self.encoder_features)), self.padding_value)padded_encoder[:L] = encoder_dataencoder_mask = np.zeros(self.max_encoder_length)encoder_mask[:L] = 1.0# Decoder部分处理decoder_start = t + Ldecoder_end = decoder_start + Ddecoder_data = sample["dynamics"][decoder_start:decoder_end, self.decoder_feature_indices]padded_decoder = np.full((self.max_decoder_length, len(self.decoder_features)), self.padding_value)padded_decoder[:D] = decoder_datadecoder_mask = np.zeros(self.max_decoder_length)decoder_mask[:D] = 1.0# 目标值处理target = sample["targets"][decoder_start:decoder_end]padded_target = np.full(self.max_decoder_length, self.padding_value)padded_target[:D] = targetreturn {# Encoder部分"encoder_input": torch.FloatTensor(padded_encoder),"encoder_mask": torch.FloatTensor(encoder_mask),# Decoder部分"decoder_input": torch.FloatTensor(padded_decoder),"decoder_mask": torch.FloatTensor(decoder_mask),# 目标值"target": torch.FloatTensor(padded_target),# 静态特征"static_features": torch.LongTensor(sample["static"]),# 实际长度(可选)"actual_lengths": (L, D)}# 使用示例
if __name__ == "__main__":# 生成测试数据dates = pd.date_range(start="2023-01-01", periods=90, name="target_date")example_data = pd.DataFrame({"store_id": np.repeat([1, 2], 45),"product_id": np.repeat([101, 102], 45),"target_date": np.tile(dates, 2),"sale_amount": np.random.randint(0, 100, 180),"discount": np.random.rand(180),"precipitation": np.random.rand(180),"temperature": np.random.rand(180),})# 初始化数据集dataset = TimeSeriesDataset(data=example_data,max_encoder_length=35,min_encoder_length=14,max_decoder_length=14, # 扩展decoder长度范围min_decoder_length=7,num_samples_per_step=3, # 每个时间步生成3个样本)# 验证样本sample = dataset[0]print(f"Encoder输入形状: {sample['encoder_input'].shape}")print(f"Decoder输入形状: {sample['decoder_input'].shape}")print(f"有效Decoder长度: {sample['actual_lengths'][1]}")print(f"Mask矩阵示例:\nEncoder: {sample['encoder_mask'][:5]}\nDecoder: {sample['decoder_mask'][:5]}")