当前位置: 首页 > news >正文

时间序列数据集构建方案Pytorch

时间序列数据集构建方案

时间序列数据集TimeSeriesDataset
时间序列数据集增强EnhancedTimeSeriesDataset

时间序列数据集的构建不同于图像、传统面板数据,其需要满足多实体、动态窗口、时间连续等性质,且容易产生数据泄漏。本文介绍了一种时间序列数据集的构建方法,可以高效地产生训练数据。

一、应用场景

1.1 典型场景示例

(1)零售销量预测

  • 场景描述:同时预测某连锁零售企业1000家门店、5000种商品未来7天的每日销量

  • 数据特征:

    • 时间维度:3年历史数据(1095天)

    • 特征维度:销量、折扣力度、天气数据、节假日标记

    • 实体维度:store_id(1000个) × product_id(5000个)= 500万时间序列

(2)能源需求预测

  • 场景描述:预测某省级电网100个变电站未来24小时的电力负荷

  • 数据特征:

    • 时间粒度:每小时数据

    • 特征维度:历史负荷、温度、湿度、工作日标记

    • 实体维度:100个变电站 × 3种电压等级 = 300个时间序列

(3)服务器集群负载预测

  • 场景描述:预测某数据中心500台服务器未来30分钟的CPU使用率

  • 数据特征:

    • 时间粒度:5分钟间隔

    • 特征维度:CPU使用率、内存占用、网络流量

    • 实体维度:500台独立服务器

二、场景核心难点

2.1 多实体并行处理

  • 挑战:

    • 每个实体(store+product组合)形成独立时间序列

    • 需要保持各序列的独立性同时实现批量处理

    • 示例:500万时间序列需高效管理内存和计算资源

  • 解决方案:

    • 使用分组索引机制(对数据集按照实体id+time column排序,groups列表存储各实体的数据区间),通过预排序和建立索引可以快速定位每个实体的数据slice

    • 预计算实体边界(start/end索引),避免重复groupby操作

2.2 动态窗口切分

  • 挑战:

    • 不同序列长度差异大(新商品可能只有30天数据)

    • 需要同时满足:

      • 最小训练长度要求(min_encoder_length + min_decoder_length)

      • 数据增强需求(随机采样不同历史窗口),其他数据增强策略比如反事实推断/时序随机波动等,可以在__getitem__中实现

  • 解决方案:

    • 动态计算可用窗口范围:max_L = remaining - D

    • 双重随机采样机制:

    D = random.randint(min_decoder, max_decoder)  # decoder长度随机
    L = random.randint(min_encoder, max_L)        # encoder长度随机
    

2.3 变长序列处理

  • 挑战:

    • 不同样本的encoder/decoder长度不同

    • 需要统一为固定维度张量进行批量处理,并使用mask标记

  • 解决方案:

    • 双端独立padding机制:

      # Encoder padding
      padded_encoder = np.full((max_encoder_length, features), padding_value)
      padded_encoder[:L] = actual_data# Decoder padding
      padded_decoder = np.full((max_decoder_length, features), padding_value)
      
    • 配套MASK机制标识有效数据:

         encoder_mask = [1]*L + [0]*(max_length-L)
      

2.4 时间连续性保证

  • 挑战:

    • 随机采样可能破坏时间序列连续性,时序预测需要保证每个实体的每个时间片段是连续的,encoder代表历史,decoder代表未来

    • 需防止数据泄露(未来信息混入历史数据)

  • 解决方案:

    • 严格的时间窗口划分:
      |---- encoder ----|---- decoder ----|t               t+L              t+L+D
    
    • 三重验证机制:

      • remaining = T - t 检查剩余长度

      • t + L + D <= T 最终边界检查

      • 特征分离:确保decoder不使用未来特征

      原始数据
      排序分组
      静态特征编码
      构建分组索引
      特征索引映射

关键步骤:

  • 排序分组:

    • 按group_ids + time_col排序保证时间连续性

    • 示例:df.sort_values([‘store_id’,‘product_id’,‘target_date’])

  • 静态特征编码:

    • 使用LabelEncoder对类别特征进行数字化

    • 存储编码器供后续使用:static_encoders字典

  • 分组索引构建:

    • 遍历数据记录实体变化点

    • 存储三元组(group_key, start_idx, end_idx)

  • 特征索引映射:

    • 建立特征名称到列索引的映射

    • 示例:encoder_features = [‘sale’,‘discount’] → indices [0,1]

  • 样本生成:

    • 在每个实体的时间序列上抽样时间切片slice
    • 格式group_id, start_idx, encoder_length, decoder_length,在group_id上切片-
      • encoder_slice=[start_idx:start_idx+encoder_length]
      • decoder_slice=[start_idx+encoder_length:start_idx+encoder_length+decoder_length]
    • 时间切片的个数就是样本数(len),在 getitem 函数通过样本生成的切片索引切分出训练数据
    • 最后进行动态mask

三、代码

import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import randomclass TimeSeriesDataset(Dataset):def __init__(self,data: pd.DataFrame,time_col: str = "target_date",target_col: str = "sale_amount",group_ids: list = ["store_id", "product_id"],encoder_features: list = ["sale_amount", "discount", "precipitation", "temperature"],decoder_features: list = ["discount", "precipitation", "temperature"],static_features: list = ["store_id", "product_id"],max_encoder_length: int = 35,min_encoder_length: int = 14,max_decoder_length: int = 7,min_decoder_length: int = 7,num_samples_per_step: int = 1,padding_value: float = 0.0,):"""优化版时间序列数据集类参数新增:num_samples_per_step: 每个时间步生成的样本数max_decoder_length: decoder最大长度min_decoder_length: decoder最小长度"""super().__init__()# 参数校验assert max_encoder_length >= min_encoder_length, "encoder长度参数非法"assert max_decoder_length >= min_decoder_length, "decoder长度参数非法"assert num_samples_per_step >= 1, "样本数需≥1"# 存储参数self.time_col = time_colself.target_col = target_colself.group_ids = group_idsself.encoder_features = encoder_featuresself.decoder_features = decoder_featuresself.static_features = static_featuresself.max_encoder_length = max_encoder_lengthself.min_encoder_length = min_encoder_lengthself.max_decoder_length = max_decoder_lengthself.min_decoder_length = min_decoder_lengthself.num_samples_per_step = num_samples_per_stepself.padding_value = padding_value# 预处理数据self._preprocess_data(data)# 生成样本索引self.samples = self._generate_samples()def _preprocess_data(self, data):"""数据预处理"""# 排序并建立索引self.data = data.sort_values(self.group_ids + [self.time_col]).reset_index(drop=True)# 编码静态特征self.static_encoders = {}for col in self.static_features:le = LabelEncoder()self.data[col] = le.fit_transform(self.data[col])self.static_encoders[col] = le# 构建分组索引self.groups = []current_group = Nonestart_idx = 0group_values = self.data[self.group_ids].valuesfor idx in range(len(group_values)):group_key = tuple(group_values[idx])if group_key != current_group:if current_group is not None:self.groups.append( (current_group, start_idx, idx) )current_group = group_keystart_idx = idxself.groups.append( (current_group, start_idx, len(group_values)) )# 特征索引映射self.all_features = list(set(self.encoder_features + self.decoder_features))self.encoder_feature_indices = [self.all_features.index(col) for col in self.encoder_features]self.decoder_feature_indices = [self.all_features.index(col) for col in self.decoder_features]def _generate_samples(self):"""样本生成"""samples = []# 按预存索引访问分组数据for group_key, start, end in self.groups:dynamic_values = self.data[self.all_features].iloc[start:end].valuestarget_values = self.data[self.target_col].iloc[start:end].valuesstatic_values = self.data[self.static_features].iloc[start].valuesT = end - start  # 时间序列长度# 生成有效时间窗口min_required = self.min_encoder_length + self.min_decoder_lengthif T < min_required:continue# 滑动窗口生成多个样本for t in range(T - min_required + 1):# 每个时间步生成多个样本for _ in range(self.num_samples_per_step):# 动态计算最大可用长度remaining = T - t# 随机选择decoder长度max_D = min(remaining - self.min_encoder_length, self.max_decoder_length)if max_D < self.min_decoder_length:continueD = random.randint(self.min_decoder_length, max_D)# 根据D确定encoder长度范围max_L = remaining - Dif max_L < self.min_encoder_length:continueL = random.randint(self.min_encoder_length, max_L)# 有效性验证if t + L + D > T:continuesamples.append({"group_id": group_key,"start_idx": t,"encoder_length": L,"decoder_length": D,"static": static_values,"dynamics": dynamic_values,"targets": target_values,})return samplesdef __len__(self):return len(self.samples)def __getitem__(self, idx):sample = self.samples[idx]L = sample["encoder_length"]D = sample["decoder_length"]t = sample["start_idx"]# Encoder部分处理encoder_data = sample["dynamics"][t:t+L, self.encoder_feature_indices]padded_encoder = np.full((self.max_encoder_length, len(self.encoder_features)), self.padding_value)padded_encoder[:L] = encoder_dataencoder_mask = np.zeros(self.max_encoder_length)encoder_mask[:L] = 1.0# Decoder部分处理decoder_start = t + Ldecoder_end = decoder_start + Ddecoder_data = sample["dynamics"][decoder_start:decoder_end, self.decoder_feature_indices]padded_decoder = np.full((self.max_decoder_length, len(self.decoder_features)), self.padding_value)padded_decoder[:D] = decoder_datadecoder_mask = np.zeros(self.max_decoder_length)decoder_mask[:D] = 1.0# 目标值处理target = sample["targets"][decoder_start:decoder_end]padded_target = np.full(self.max_decoder_length, self.padding_value)padded_target[:D] = targetreturn {# Encoder部分"encoder_input": torch.FloatTensor(padded_encoder),"encoder_mask": torch.FloatTensor(encoder_mask),# Decoder部分"decoder_input": torch.FloatTensor(padded_decoder),"decoder_mask": torch.FloatTensor(decoder_mask),# 目标值"target": torch.FloatTensor(padded_target),# 静态特征"static_features": torch.LongTensor(sample["static"]),# 实际长度(可选)"actual_lengths": (L, D)}# 使用示例
if __name__ == "__main__":# 生成测试数据dates = pd.date_range(start="2023-01-01", periods=90, name="target_date")example_data = pd.DataFrame({"store_id": np.repeat([1, 2], 45),"product_id": np.repeat([101, 102], 45),"target_date": np.tile(dates, 2),"sale_amount": np.random.randint(0, 100, 180),"discount": np.random.rand(180),"precipitation": np.random.rand(180),"temperature": np.random.rand(180),})# 初始化数据集dataset = TimeSeriesDataset(data=example_data,max_encoder_length=35,min_encoder_length=14,max_decoder_length=14,  # 扩展decoder长度范围min_decoder_length=7,num_samples_per_step=3,  # 每个时间步生成3个样本)# 验证样本sample = dataset[0]print(f"Encoder输入形状: {sample['encoder_input'].shape}")print(f"Decoder输入形状: {sample['decoder_input'].shape}")print(f"有效Decoder长度: {sample['actual_lengths'][1]}")print(f"Mask矩阵示例:\nEncoder: {sample['encoder_mask'][:5]}\nDecoder: {sample['decoder_mask'][:5]}")

相关文章:

  • Android学习总结之网络篇补充
  • ACE-Step:扩散自编码文生音乐基座模型快速了解
  • ActiveMQ 源码剖析:消息存储与通信协议实现(二)
  • 使用 Couchbase Analytics Service 的典型步骤
  • [GESP202406 七级] 黑白翻转
  • FAISS(Facebook AI Similarity Search)
  • 单片机-STM32部分:6、不同编程方式-寄存器、标准库、HAL库、LL库
  • Scrapy框架之Scrapyd部署及Gerapy分布式爬虫管理框架的使用
  • MCU缓存架构设计与优化策略
  • MySQL关于锁的面试题
  • 【详细教程】ROC曲线的计算方式与绘制方法详细介绍
  • 基于SeaFormer的YOLOv8性能提升策略—轻量高效注意力模块Sea_AttentionBlock在语义分割中的应用研究
  • 性能比拼: HTTP/2 vs. HTTP/3
  • 【算法】随机快速排序和随机选择算法
  • QT编程练习20250507
  • 【C++】C++中this指针的介绍及使用
  • k8s部署OpenELB
  • RT Thread Studio创建软件和硬件RTC工程
  • ROBOVERSE:面向可扩展和可泛化机器人学习的统一平台、数据集和基准
  • SQL 子查询
  • 花2万多在海底捞办婚礼,连锁餐企要抢酒楼的婚宴生意?
  • 拿出压箱底作品,北京交响乐团让上海观众享受音乐盛宴
  • 七大交响乐团“神仙斗法”,时代交响在上海奏出时代新声
  • 英国和美国就关税贸易协议条款达成一致
  • 习近平同俄罗斯总统普京举行会谈
  • 习近平出席俄罗斯总统举行的欢迎仪式