当前位置: 首页 > news >正文

【Dataset】如何高效处理海量数据并从中智能筛选出有代表性的样本?

  • 海量数据构建高质量数据集
    • Step 1. 流式批特征提取
    • Step 2. Incremental PCA(在线降维) + MiniBatchKMeans(大规模聚类)
      • Incremental PCA(增量主成分分析)
      • MiniBatchKMeans(小批量 K-Means)
    • Step 3. Cluster 内分层采样(Stratified Sampling within Clusters)并使用概率抽样(softmax temperature)
  • 举例子总结
    • Transformer 编码器
    • 自监督预训练(Self-Supervised Learning, SSL)
      • 掩码重建(Masked Autoencoder)
      • 对比学习(Contrastive Learning)
    • 拒绝神经网络,手工设计统计特征
  • 附录
    • HDF5 数据模型
      • 使用 Python 创建 HDF5 文件和数据集
    • 批处理(Batch processing)
      • 批处理的工作原理
      • 批处理 VS 流处理(Stream processing)

数据和特征决定了机器学习的上限,而模型和算法只是逼近这个上限而已,所以数据特征是否全面,数据量是否足够对于算法来说是非常重要的。

本文要谈论的:从海量未标注数据中 高效提取特征、发现结构,并 基于多目标价值评估进行有策略的样本采样,服务于主动学习、异常检测、数据压缩或 高质量数据集构建 等任务。

海量数据构建高质量数据集

首先得有海量数据昂。

四步走战略:原始信号 → 特征提取 → (可选)降维 → 聚类/采样

Step 1. 流式批特征提取

原始 .h5 文件包含的是 高维、原始、难以直接使用的信号数据(如音频波形、图像像素);经过 流式批特征提取 后,.h5 文件中存储的是 低维、紧凑、语义丰富的 embedding 向量(如 512 维特征),可以直接用于聚类、降维、采样等下游任务,且整个过程 内存友好、只需遍历原始数据一次

这一步希望完成以下任务:从一个 超大的 HDF5 文件 中读取原始数据 → 提取特征(如通过 CNN、手工特征或统计量)→ 将提取出的特征保存为另一个文件(如新的 HDF5 或 numpy 格式),全程仅遍历原始数据一次,且 不耗尽内存(内存不足以一次性加载所有数据)。

在这里插入图片描述

适用场景举例:

  • 语音识别:从百万条语音中提取 Wav2Vec 特征用于 聚类
  • 医学影像:从 MRI slice 序列中提取 CNN embedding
  • 工业传感器:对振动信号做 autoencoder 编码用于 故障检测
  • 行为识别:从 IMU 数据中提取时空特征用于 用户分群

除了数值数据,对于分类、 字符串数据,不是简单的将其转换为数值型,而是将其 转换为一个向量 embedding
图源文章 Introducing TensorFlow Feature Columns
在这里插入图片描述

接下来是 具体实现步骤,假设你有一个 HDF5 文件 raw_data.h5,其中包含:

with h5py.File('raw_data.h5', 'r') as f:print(f.keys())  # ['signals', 'labels', 'metadata']print(f['signals'].shape)  # (1_000_000, 128, 6) → 百万段信号,每段 128 时间步,6 通道

我们要从中提取特征,比如用一个神经网络将 (128,6) 映射为 (512,) 的 embedding

Step 1: 定义特征提取模型

import torch
import torch.nn as nnclass SimpleEncoder(nn.Module):def __init__(self):super().__init__()self.conv = nn.Sequential(nn.Conv1d(6, 32, kernel_size=3),nn.ReLU(),nn.AdaptiveAvgPool1d(32))self.fc = nn.Linear(32*32, 512)def forward(self, x):# x shape: (B, T, C) → (B, C, T)x = x.transpose(1, 2)x = self.conv(x)x = x.flatten(start_dim=1)return self.fc(x)

假设已经训练好了,加载模型并置于设备上:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleEncoder().to(device)
model.eval()  # 推理模式

Step 2: 设置输入和输出路径,创建输出 HDF5 文件

import h5py
import numpy as npinput_path = 'raw_data.h5'
output_path = 'enhanced_data.h5'  # 如果只需要保存提取的 feature, 则保存为 'features.h5'feat_dim = 512  # 输出特征维度
chunk_size = 256  # 每次处理 256 条# 在外层同时管理两个文件:从旧文件读取 并 写入新文件
with h5py.File(input_path, 'r') as infile, h5py.File(output_path, 'w') as outfile:N = infile['signals'].shape[0]raw_data = infile['signals']  # ← 只是一个“指针”,不是数组!# Step 1: 复制原始数据(按需保留)infile.copy('signals', outfile)   # ← 保留 signalsif 'labels' in infile:outfile['labels'] = infile['labels'][:]  # 保留 labelsif 'metadata' in infile:infile.copy('metadata', outfile)         # 递归复制 metadata# Step 2: 创建新的 features 数据集features_dset = outfile.create_dataset('features',shape=(N, feat_dim),dtype='float32',chunks=True,compression='gzip')# Step 3: 流式提取并写入 featuresfor i in range(0, N, chunk_size):end_idx = min(i + chunk_size, N)batch_x = raw_data[i:end_idx]  # shape: (chunk_size, T, C)  ← 此刻才从硬盘读取这 256 条数据batch_tensor = torch.tensor(batch_x, dtype=torch.float32).to(device)  # 转为 tensor 并移到 GPUwith torch.no_grad():   # 前向传播,获取特征features = model(batch_tensor)  # shape: (B, 512)features_dset[i:end_idx] = features.cpu().numpy()  # 写回硬盘# 打印进度if i % (chunk_size * 10) == 0:print(f"Processed {i}/{N} samples")print("✅ All data (signals, labels, metadata, features) saved.")

关键点:infile['signals'] 并不加载数据!

raw_data 是一个 h5py._hl.dataset.Dataset 对象,它 指向磁盘上的数据块只有切片访问时才真正读入内存

在这个例子中:

  • 原始 raw_data.h5 包含的是 百万条原始 (128,6) 时间序列信号及其标签与元数据
  • 处理后的 features.h5 包含的是对应的 (512,) embedding 特征向量,并保留了原有的 signalslabelsmetadata,实现了 从“原始观测”到“语义表示”的转换,支持后续高效、低资源消耗的分析任务。
enhanced_data.h5
├── signals      → 原始信号 (1_000_000, 128, 6)
├── labels       → 标签 (1_000_000,)
├── metadata     → 元数据 Group(subject_id, session_time...)
└── features     → 新提取的 embedding (1_000_000, 512)
  • features聚类 / 可视化 / 训练模型
  • signals 做信号分析 / 检查原始波形
  • labelsmetadata 做分组统计、回溯分析

整个过程仅遍历原始数据一次,使用 批处理HDF5 的懒加载机制 保证内存友好,是一种 工业级的大规模特征工程标准做法

Step 2. Incremental PCA(在线降维) + MiniBatchKMeans(大规模聚类)

这套组合是专为大规模、高维、无法一次性加载进内存的数据集设计的 在线学习 方案。

  • 普通方法 PCA.fit(features) 需要将全部数据载入内存 → OOM(Out of Memory),解决方法:使用 Incremental PCA
  • 标准 K-Means 每次迭代遍历全量数据 → 太慢,解决方法:使用 MiniBatchKMeans

📌 类比:就像一边看书一边做笔记总结,而不是等看完整本书才开始写摘要。

Incremental PCA(增量主成分分析)

Incremental PCA 是标准 PCA 的在线版本。不要求一次性输入所有数据,支持 partial_fit(X_batch) 方法,逐批次更新主成分方向,最终可将高维特征投影到低维空间(如 512 → 64)。这需要两遍扫描,

  • 第一遍扫描(First Pass):学习 PCA 的主成分方向(即“哪些维度最重要”)
  • 第二遍扫描(Second Pass):应用这些主成分,把原始数据投 影到低维空间,得到最终的降维结果。
from sklearn.decomposition import IncrementalPCAipca = IncrementalPCA(n_components=64, batch_size=256)
with h5py.File('features.h5', 'r') as f:X = f['features']N = len(X)chunk_size = 256for i in range(0, N, chunk_size):batch = X[i:i+chunk_size]ipca.partial_fit(batch)  # ← 增量学习主成分print("✅ IPCA training completed.")

由于 IncrementalPCA 不支持直接对 HDF5 dataset 转换,需要再次流式应用:

with h5py.File('features.h5', 'r') as f, h5py.File('reduced_features.h5', 'w') as outf:X = f['features']N = len(X)chunk_size = 256dset = outf.create_dataset('embedding_64d', (N, 64), dtype='float32')  # 创建输出 datasetfor i in range(0, N, chunk_size):batch = X[i:i+chunk_size]reduced = ipca.transform(batch)  # ← 应用降维dset[i:i+chunk_size] = reducedprint("✅ All data transformed to 64D.")

假设 embedding 是由两个主要模式驱动的:是否发生快速上升(电压突变)、是否出现震荡衰减。

第一遍扫描 的目标就是让 PCA 发现这两个“潜在因子”作为主成分方向。一旦发现,第二遍扫描 就可以 用这两个方向来描述每条轨迹

MiniBatchKMeans(小批量 K-Means)

这是从 降维后的语义空间 走向 结构发现 的关键一步。

K-Means 的变体,每次只用一个小 batch 更新聚类中心。使用梯度近似方式更新 centroid,速度快、内存省。

from sklearn.cluster import MiniBatchKMeans
import numpy as npkmeans = MiniBatchKMeans(n_clusters=100, batch_size=256, max_iter=100, random_state=42)
with h5py.File('features.h5', 'r') as f:X = f['features']N = len(X)chunk_size = 256# 第一次调用需要初始化init_batch = X[:kmeans.batch_size]kmeans.fit(init_batch)  # 先 fit 初始化 centroids# 然后用 partial_fit 增量更新for i in range(0, N, chunk_size):batch = X[i:i+chunk_size]kmeans.partial_fit(batch)print("✅ MiniBatchKMeans clustering completed.")

💡 注意partial_fit持续更新模型,包括 centroidinertia

每次调用 partial_fit,MiniBatchKMeans 执行以下步骤:

  1. 随机选择一部分样本作为“新观测”
  2. 将每个样本分配给最近的 cluster center
  3. 以滑动平均方式更新该 center
    cj←(1−η)⋅cj+η⋅1∣Bj∣∑i∈Bjxi\mathbf{c}_j \leftarrow (1 - \eta) \cdot \mathbf{c}_j + \eta \cdot \frac{1}{|B_j|} \sum_{i \in B_j} \mathbf{x}_i cj(1η)cj+ηBj1iBjxi
    其中:η\etaη 是学习率,BjB_jBj 是当前 batch 中属于第 j 个 cluster 的样本。

训练完成后,可以获取每个样本的 cluster label(需 再次流式预测),

with h5py.File('features.h5', 'r') as f, h5py.File('clusters.h5', 'w') as outf:X = f['features']N = len(X)chunk_size = 256dset = outf.create_dataset('cluster_id', (N,), dtype='int32')for i in range(0, N, chunk_size):batch = X[i:i+chunk_size]preds = kmeans.predict(batch)dset[i:i+chunk_size] = predsprint("✅ Cluster assignments saved.")

查看聚类中心:

centers = kmeans.cluster_centers_  # shape: (100, 512)

可用于后续“查找最典型样本”或“生成合成样本”。

直接在原始空间聚类,保留更多细节不损失信息,但需要更大 batch size 和更多迭代,适用于确信 高维结构很重要的场景(如细粒度类别区分)。

聚类本身只依赖低维 embedding,所有 scoring 都是事后分析,不影响聚类结果

如何评估聚类质量?(无监督指标)

  • Inertia:样本到其 cluster 中心的距离平方和,越小越好。
  • Silhouette Score:衡量簇间分离度(可对 batch 估算),越大越好。

Step 3. Cluster 内分层采样(Stratified Sampling within Clusters)并使用概率抽样(softmax temperature)

这一步的目标是:在每一个 cluster 中,有策略地选出最具代表性的 k 个样本,用于后续人工标注、异常分析或 构建高质量子集。 每个 cluster 是一层,每层独立采样,确保覆盖所有类型。

举个例子,可以为每个样本计算下面这些 score,并加权组合成一个 综合得分

  • Stability Score(稳定性):衡量一个样本是否 长期稳定 属于这个 cluster。高 stability = 长期一致归属某类 → 典型代表;低 stability = 经常被分到不同 cluster → 边界/模糊样本。

    方法:使用 余弦相似度 到 cluster center,

    from sklearn.metrics.pairwise import cosine_similaritysimilarity = cosine_similarity(X_cluster, center.reshape(1, -1)).flatten()
    stability_score = similarity  # 越接近 center 越稳定
    

    Stability 的用途

    • 构建基准数据集:选 high-stability 样本作为“标准工况”;
    • 数据清洗:low-stability 可能是标签错误或噪声;
    • 工况分类:每个 cluster 中挑最稳定的做 prototype。
  • Novelty Score(新颖性):衡量一个样本是否 “与众不同” —— 可能是异常或罕见模式。定义是样本到其所属 cluster center 的距离,距离越远,表示该样本越“偏离典型模式”,可能是异常、边界样本或新行为。

    方法:使用 欧氏距离 到 cluster center,

    distances = np.linalg.norm(X_cluster - center, axis=1)
    novelty_score = distances  # 距离越远越“新”
    
  • Energy Score(能量强度):适用于电流/电压信号,反映事件的 显著程度

    方法:计算 原始信号的能量(如 RMS),

    # 取对应 cluster 的原始信号
    raw_cluster = raw_signals[mask]  # (M, 32, 3)# 计算总 RMS 能量(可按通道加权)
    rms = np.sqrt(np.mean(raw_cluster ** 2, axis=(1, 2)))  # (M,)
    energy_score = (rms - rms.min()) / (rms.max() - rms.min() + 1e-8)  # 归一化
    

然后将多个 score 加权融合:

 # 归一化所有 score 到 [0,1]
def normalize(x):return (x - x.min()) / (x.max() - x.min() + 1e-8)s_scores = normalize(stability_score)      # 高分:典型
n_scores = normalize(novelty_score)        # 高分:异常
e_scores = normalize(energy_score)         # 高分:强信号# 加权求和(注意符号  正向:越高越好;负向:越低越好)
composite_score = (weights['stability'] * s_scores + weights['novelty'] * n_scores + weights['energy'] * e_scores)

使用 Softmax + Temperature 进行概率抽样是要引入可控的随机性。不能直接取 top-k,那样会失去多样性。

# 使用 softmax 转换为概率分布
probabilities = softmax(composite_score / temperature)# 温度控制探索 vs 利用
# - temperature >> 1: 更均匀,探索性强
# - temperature << 1: 集中在高分样本,利用性强# 概率化抽样(不放回)
selected_local_idx = np.random.choice(np.arange(M),size=k,replace=False,p=probabilities
)# 映射回全局 index
selected_global_idx = indices[selected_local_idx]return selected_global_idx.tolist()
  • T = 0.1:几乎只选最高分样本,挑选最典型代表
  • T = 1.0:平衡选择,通用推荐
  • T = 5.0:接近均匀随机,探索未知、防偏见
  • T → ∞:完全随机,纯探索模式
  • T → 0:总是选最大值,贪心策略

对所有 cluster 执行分层采样:

all_selected = []for cid in np.unique(cluster_labels):selected_in_cluster = cluster_internal_sampling(features=features,cluster_labels=cluster_labels,cluster_centers=cluster_centers,target_cluster=cid,k=10,  # 每簇选 10 个temperature=1.5,weights={'stability': 0.5,'novelty': 1.0,   # 更关注异常'energy': 0.8     # 偏好强信号})all_selected.extend(selected_in_cluster)print(f"Total selected: {len(all_selected)} samples")

不要只挑最好的,也不要随便乱挑;而要在一个有意义的群体内部,按照多维价值评分,用带温度的概率方式智能抽取。

举例子总结

举个例子,面对一个 超大规模时间序列数据集N=2000万N=2000万N=2000 样本),每个样本是 C=3C=3C=3 通道 × T=32T=32T=32 时间步 的物理信号(电流、电压、位置),该如何设计特征提取与降维策略?

直接使用原始 (3,32),存在问题如 缺乏语义表达力(原始波形变化大,但语义可能相似,如相位偏移),信息冗余等。

推荐用轻量神经网络 提取 embedding,如 1D CNN 或 Transformer Encoder(小规模)。

关键矛盾点:“我想用 Transformer Encoder 提取特征,但它需要训练 —— 而我没有标注数据。”

Transformer 编码器

数据是:(B, C=3, T=32) 的时间序列(电流、电压、位置),每条是一个短时序。设计一个 无需标签也能使用 的 Transformer 编码器,想用一个 Transformer Encoder 将每条轨迹压缩成一个 全局语义 embedding(如 (B, T, C) -> (B, dim),以便后续:聚类分析或分布可视化等。

第一步,正弦位置编码,为每个时间步生成一个唯一的、基于正弦和余弦函数的“位置 ID”,并将其加到输入特征上,从而使 Transformer 能够感知 时间顺序

在这里插入图片描述

class PositionalEncoding(nn.Module):def __init__(self, d_model, max_len=512):super().__init__()pe = torch.zeros(max_len, d_model)   # (max_len, d_model)pos = torch.arange(0, max_len).unsqueeze(1).float()   # [0, 1, 2, ..., max_len-1]  -> (max_len, 1)div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))  # [0, 2, 4, ..., d_model-2] pe[:, 0::2] = torch.sin(pos * div_term)  # 偶数列:sinpe[:, 1::2] = torch.cos(pos * div_term)  # 奇数列:cosself.register_buffer('pe', pe.unsqueeze(0))  # (1, max_len, d_model)def forward(self, x):return x + self.pe[:, :x.size(1), :]

其中 (-math.log(10000.0) / d_model) 是一个常数,控制频率衰减速度。整体是
div_term[i]=exp⁡(−2i⋅ln⁡(10000)dmodel)=1100002i/dmodel​\text{div\_term[i]}=\exp \big( \frac{−2i\cdot \ln(10000)}{d_{model}} \big)= \frac{1}{10000^{2i/d_{model}​}}div_term[i]=exp(dmodel2iln(10000))=100002i/dmodel1

它实际上是在构造一组 指数衰减的频率,用于 生成不同波长的正弦波。假设 d_model=8,那么 div_term ≈ [1.0000, 0.1000, 0.0100, 0.0010],是四个不同的缩放因子,对应低频到高频的振荡。

第二步,定义模型,

class TimeSeriesTransformerEncoder(nn.Module):def __init__(self, in_channels=3, d_model=64, n_layers=4, n_heads=4, dim_ff=256, dropout=0.1, max_len=128, cls_token=True):super().__init__()self.cls_token = cls_token # 是否使用 `[CLS]` tokenself.in_proj = nn.Linear(in_channels, d_model)  # 线性投影层 (B, T, C=3) → (B, T, d_model=128)self.pos_enc = PositionalEncoding(d_model, max_len=max_len)  # 加了 PEencoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads, dim_feedforward=dim_ff, dropout=dropout, activation='gelu')self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)  # 堆叠 n_layers 层if cls_token:   # 类比 BERT, 最后取 [CLS] 的输出作为句子 embeddingself.cls = nn.Parameter(torch.randn(1, 1, d_model))   # 可学习的特殊 token (1,1,D) def forward(self, x):# x: (B, C, T)x = x.permute(0, 2, 1)  # -> (B, T, C)h = self.in_proj(x)  # (B, T, d_model)h = self.pos_enc(h)  # (B, T, d_model)if self.cls_token:B = h.shape[0]cls = self.cls.expand(B, -1, -1)  # (1,1,D) -> (B,1,D)h = torch.cat([cls, h], dim=1)  # (B, T+1, d_model)# transformer expects (S, N, E)h = h.permute(1, 0, 2)out = self.encoder(h)  # (S, N, E)out = out.permute(1, 0, 2)  # (B, S, E)if self.cls_token:emb = out[:, 0, :]  # (B, E)else:emb = out.mean(dim=1)return emb, out  # emb: global embedding, out: sequence embeddings

nn.TransformerEncoderLayer 是 PyTorch 内置的一个标准编码器层,包含:Multi-Head Self-Attention、Add & Norm、Feed-Forward Network (FFN)、Add & Norm。

encoder 是把 TransformerEncoderLayer 重复 n_layers 层,每一层都能捕捉更抽象的时间模式,第 1 层看局部相关性,第 4 层可能看到整个控制过程的动态规律。

torch.cat([cls, h], dim=1)序列最前面插入 [CLS] token,目的是让 attention 机制有机会把所有信息汇总到 [CLS]

原始:   [x₀, x₁, x₂, ..., x_{T-1}]        # (B, T, D)
插入后: [cls, x₀, x₁, ..., x_{T-1}]       # (B, T+1, D)

每一层中,[CLS] 都能 attend 到所有真实时间步,经过多层 Transformer 层传递,[CLS] 逐渐积累整条轨迹的语义信息

最后,推荐使用 [CLS],因为它是一个“主动聚合器”,而不是被动平均。

  • 如果用了 [CLS],取第一个 token 的输出:out[:, 0, :],这就是要的 global embedding,代表整条轨迹的语义;
  • 如果没用 [CLS],对所有时间步平均池化:mean(dim=1),也是一种 pooling 方式,但不如 [CLS] 灵活。

直观理解,假设训练好了这个 encoder,输入一条轨迹:电压突然上升 → 垂直位置快速抬升 → 然后震荡衰减 → 最终稳定在 z=0,Transformer 会通过 self-attention 发现这些因果关系:u[t]z[t+Δt] 之间有强 attention weight,[CLS] token 学会总结:这是一次典型的快速响应控制,不同轨迹之间的相似性体现在 emb 的距离上。最终,相似的控制行为 → embedding 靠近,不同的控制策略 → embedding 分开

每条样本映射为 64 维 compact embedding。为什么选择 64 维? 比原始 (3,32)=96 维更紧凑、足够保留动态模式(上升沿、振荡、过冲等)、不至于过度压缩导致信息丢失。

为了降低注意力层的二次复杂度,新 transformer 引入了 ProbSparse Attention 的概念。通过 让注意力层仅使用最重要的数据点来计算权重和概率,而不是全部数据点,ProbSparse 大幅缩短了计算注意力所需的时间。
在这里插入图片描述

自监督预训练(Self-Supervised Learning, SSL)

必须训练才能有效提取语义特征,但好消息是:你不需要人工标注!可以用无监督方式训练它

使用 自监督预训练(Self-supervised Pretraining),即使没有标签,也可以通过以下方法“教会”Transformer 学习有用的时序模式。

一个 混合自监督训练策略,可以同时利用

  • 掩码重建:学习动态规律;
  • 对比学习:学习不变表示。
方法掩码重建(MAE)对比学习(Contrastive)
输入部分遮盖的序列完整但增强的序列
输出重建原始信号embedding 之间的相对距离
学习方式逐点预测全局排序
关注点局部一致性全局语义相似性
是否需要 decoder✅ 是❌ 否
类比填空题多选题

掩码重建(Masked Autoencoder)

Masked Autoencoder(MAE)风格重建任务,原理:

  • 随机遮盖一部分时间步(比如 mask 掉 75%)
  • 让模型根据剩余部分重建原始信号
  • 迫使模型学习上下文依赖和动态规律
# 伪代码示例
model.train()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)for batch_x in dataloader:  # shape: (B, 32, 3)# 随机遮盖mask_ratio = 0.75mask = torch.rand(batch_x.shape[:2]) < mask_ratio  # (B, 32)masked_x = batch_x.clone()masked_x[mask] = 0.0  # 或填充均值# 前向传播得到 embeddinglatent = model.encoder(masked_x)  # 只取 [CLS]pred = model.decoder(latent)      # 解码回 (32,3)loss = F.mse_loss(pred, batch_x)optimizer.zero_grad()loss.backward()optimizer.step()

其中 mask = torch.rand(batch_x.shape[:2]) < mask_ratio # (B, T) 是在创建一个 布尔类型的随机掩码,用于标记哪些时间步将被遮盖(masked),即从输入中移除或置为零。

先生成一个形状为 (B, T) 的张量,假设 B=2, T=5,每个元素是从均匀分布 U(0,1)U(0,1)U(0,1) 中随机采样的浮点数(范围在 0 到 1 之间),然后对每个元素判断是否小于 mask_ratio(比如 0.15),则 mask 就是,

[[0.34>0.15→False, 0.87>0.15→False, 0.12<0.15→True, 0.95>0.15→False, 0.46>0.15→False],[0.77>0.15→False, 0.03<0.15→True,  0.68>0.15→False, 0.21>0.15→False, 0.99>0.15→False]]

mask[b, t] == True → 第 b 条序列的第 t 个时间步 要被遮盖mask[b, t] == False → 保留该时间步。

Masked Autoencoder (MAE):重建被遮盖的部分。把 mask 的部分填充为 0, 然后把 masked_x 输入模型,让模型预测原始完整信号,损失函数 只计算被遮盖位置的误差。目标是训练模型理解时间序列的上下文依赖,学会“脑补”缺失部分。

模型会自动学会“从局部推测整体”,学到的是物理信号的内在结构。

对比学习(Contrastive Learning)

一句话理解 对比学习让相似的东西靠近,不相似的东西远离

  • 相似的:同一条轨迹经过 不同增强(如加噪声、裁剪)→ 应该有相近的 embedding;
  • 不相似的:完全不同控制行为的轨迹 → embedding 应该远。

模型通过这种方式学会 提取语义上有意义的特征,而不是记住原始波形。

对比学习是一种“自我监督”的方式,通过人为制造“同一事物的不同观察视角”,然后训练模型识别这些视角属于同一个实体。

  • 在图像中:不同裁剪、旋转、颜色抖动 → 同一张图
  • 在时间序列中:加噪声、缩放、裁剪 → 同一段控制过程

核心机制:正样本 vs 负样本

  • 正样本对(Positive Pair):语义相同但细节不同的样本,同一条轨迹 + 不同增强;
  • 负样本对(Negative Pair):完全不同的样本,其他 batch 中的任意轨迹。
z1 = augmentation(x)  # view 1: jitter + scale + crop
z2 = augmentation(x)  # view 2: 另一套随机参数

z1z2 是一个正样本对!它们来自同一个原始轨迹 x,所以语义一致,但由于增强不同,外观略有差异。

def nt_xent_loss(z1, z2, temperature=0.2):z1 = F.normalize(z1, dim=1)  # L2 normalizez2 = F.normalize(z2, dim=1)batch_size = z1.size(0)z = torch.cat([z1, z2], dim=0)  # (2B, D)  拼接成大矩阵sim = torch.matmul(z, z.T) / temperature  # (2B, 2B)  计算相似度矩阵labels = torch.arange(batch_size, device=z.device)positives = torch.cat([torch.diag(sim, batch_size),   # z1[i] vs z2[i]torch.diag(sim, -batch_size)   # z2[i] vs z1[i]], dim=0)  # (2B,)# 构造分母(负样本求和)mask = (~torch.eye(2 * batch_size, dtype=torch.bool, device=z.device)).float()    # 排除自相似exp_sim = torch.exp(sim) * maskdenom = exp_sim.sum(dim=1)  # (2B,) 每行求和loss = -torch.log(torch.exp(positives) / (denom + 1e-12)).mean()return loss

这是 InfoNCE Loss 的一种简化实现,全称是 Normalized Temperature-scaled Cross Entropy Loss (NT-Xent)。

sim = torch.matmul(z, z.T) / temperaturesim[i,j] 表示第 i 个和第 j 个 embedding 的相似度,除以 temperature 控制分布锐度:温度小 → 差异放大;温度大 → 更平滑。

目标函数:InfoNCE / NT-Xent Loss(本质是分类),虽然叫“损失函数”,但它其实是在做一个分类任务:给定一个 query embedding(比如 z1[0]),从一堆候选中找出它的“配对项”(即 z2[0]

L=−log⁡exp⁡(sim(zi,zj)/τ)∑k=12BIk≠i⋅exp⁡(sim(zi,zk)/τ)\mathcal{L} = -\log \frac{ \exp(\text{sim}(z_i, z_j)/\tau) }{ \sum_{k=1}^{2B} \mathbb{I}_{k \neq i} \cdot \exp(\text{sim}(z_i, z_k)/\tau) } L=logk=12BIk=iexp(sim(zi,zk)/τ)exp(sim(zi,zj)/τ)

其中:

  • zi,zjz_i, z_jzi,zj:一对正样本(如 z1[0], z2[0]
  • sim(a,b)=aTb\text{sim}(a,b) = a^T bsim(a,b)=aTb:cosine 相似度
  • τ\tauτ:temperature 温度超参(控制分布锐度)
  • 分母包含所有其他样本(包括负样本)

负样本求和,即:
denomi=∑j≠iexp⁡(sim[i,j])\text{denom}_i = \sum_{j \neq i} \exp(\text{sim}[i,j]) denomi=j=iexp(sim[i,j])

📌 目标:最大化这个概率 → 让正样本得分远高于负样本

举个例子(B=2)

ViewEmbedding
v1_0z1[0]
v2_0z2[0] ← 正样本对
v1_1z1[1]
v2_1z2[1] ← 正样本对

目标:cos(z1[0], z2[0]) 要高,cos(z1[0], z1[1]), cos(z1[0], z2[1]) 要低。NT-Xent 自动完成这一优化。

拒绝神经网络,手工设计统计特征

不用神经网络,也能提取特征?

如果连训练都不想做,还有更简单的选择:手工设计统计特征(Hand-crafted Features)

对每条 (32,3) 序列提取如下特征:

  • 时域统计:均值、标准差、峰值、峰峰值、RMS、斜率
  • 频域特征:FFT 主频、能量集中度、谱熵
  • 变化率:差分最大值、上升时间、过冲量
  • 相关性:电流 vs 电压 相关系数、相位差估计
def extract_stats_features(x):# x: (32, 3) → current, voltage, positionmean = x.mean(axis=0)           # (3,)std = x.std(axis=0)             # (3,)peak = x.max(axis=0)            # (3,)rms = np.sqrt(np.mean(x**2, axis=0))  # (3,)diff = np.diff(x, axis=0)slew_rate = np.abs(diff).max(axis=0)  # 最大变化率return np.concatenate([mean, std, peak, rms, slew_rate])  # (15,)
  • 优点:完全无需训练、可解释性强、计算极快
  • 缺点:表达能力有限、无法捕捉复杂模式(如振荡频率变化)

总结流程

原始数据 (20e6, 32, 3)↓
【方案 A(推荐)】
→ 使用轻量 CNN / Transformer + MAE 自重建预训练(无监督)
→ 提取 64D embedding
→ 流式保存到 features.h5【方案 B(简单可靠)】
→ 手工提取统计特征(均值、RMS、变化率等)→ 得到 20~50D 特征
→ 直接用于 IPCA + KMeans↓
Incremental PCA → 降维至 32D(可选)↓
MiniBatchKMeans 聚类 → 获取 cluster_id↓
Cluster 内按 novelty/stability 分层采样

如果 embedding_dim > 32,且数据量巨大 → 建议用 IPCA 再压缩一下;如果只是为了聚类,64D 可直接上 MiniBatchKMeans。

附录

HDF5 数据模型

HDF5(Hierarchical Data Format version 5) 是一种专为 大规模数值数据 存储与高效访问设计的文件格式。

HDF5 文件(本身就是一个对象)可以被视为一个 容器(或组),用于存放 各种异构数据对象(或数据集)。这些数据集可以是图像、表格、图形,甚至是文档,例如 PDF 或 Excel:
在这里插入图片描述

HDF5 数据模型中的两个主要对象是 数据集。HDF5 数据模型中还有多种其他对象来支持组和数据集,包括 数据类型数据空间属性特性

假设上述 HDF5 文件中有两个组:VizSimOut

  • Viz 组下包含各种图像以及一张与 SimOut 组共享的表格。
  • SimOut 组包含一个三维数组、一个二维数组,以及一个链接到另一个 HDF5 文件中二维数组的链接。

在这里插入图片描述
对组及其成员的操作在许多方面类似于在 UNIX 中对目录和文件的操作。与 UNIX 的目录和文件一样,HDF5 文件中的对象通常通过提供它们的完整(或绝对)路径名来描述。

  • / 表示根组。
  • /foo 表示根组中名为 foo 的成员。
  • /foo/zoo 表示组 foo 的成员,而 foo 组本身是根组的成员。

HDF5 数据集组织并包含 raw 数据值。一个数据集除了数据本身外,还包括 描述数据的元数据

在这张图中,数据被存储为大小为 4 × 5 × 6 的三维数据集,使用整数数据类型。它包含属性 TimePressure,且该数据集采用了块状存储并进行了压缩。

在这里插入图片描述

使用 Python 创建 HDF5 文件和数据集

使用 Python 来 创建 HDF5 文件,必须:

  • 指定属性列表(或使用默认值)
  • 创建文件
  • 关闭文件(如有需要,也关闭属性列表)。

下面的 Python 示例创建了一个文件 file.h5,然后关闭它。生成的 HDF5 文件只包含一个根组:

import h5py
file = h5py.File ('file.h5', 'w')
file.close ()

在这里插入图片描述
使用 w 作为文件访问标志调用 h5py.File 将创建一个新的 HDF5 文件,并覆盖同名的已有文件。file 是打开文件后返回的文件句柄。文件使用完毕后必须将其关闭。当未指定属性列表时,将使用默认的属性列表。

如前所述,HDF5 数据集由原始数据以及描述数据的元数据(数据类型、空间信息和属性)组成。要 创建数据集,必须:

  • 定义数据集特性(数据类型、数据空间、属性)。
  • 决定将数据集附加到哪个组。
  • 创建数据集。
  • 关闭步骤 3 中的数据集句柄。

下面的代码摘录显示了在文件 dset.h5 中创建一个 4×6 整数数据集 dset 所需的调用。该数据集将位于根组中:

dataset = file.create_dataset("dset",(4, 6), h5py.h5t.STD_I32BE)

在这里插入图片描述
使用 Python 时,数据空间的创建作为参数包含在数据集创建方法中。只需一次调用即可创建一个 4 × 6 的整数数据集 dset。指定了预定义的 大端 32 位整数数据类型create_dataset 方法在根组(文件对象)中创建该数据集。该数据集由 Python 接口关闭。

创建或打开数据集后,可以向其 写入数据

data = np.zeros((4,6))
for i in range(4):for j in range(6):data[i][j]= i*6+j+1dataset[...] = data       <-- Write data to dataset
dataset[...] = data       <-- 将数据写入数据集
data_read = dataset[...]  <-- Read data from dataset
data_read = dataset[...]  <-- 从数据集读取数据

HDF5 组是一种结构,包含零个或多个 HDF5 对象。在 创建组 之前,必须获取要创建该组的位置标识符。下面是所需的步骤:

  • 决定将组放置在哪里——在“根组”(或文件标识符)中,还是在其他组中。如果组尚未打开,则打开它。
  • 定义属性或使用默认值。
  • 创建组。
  • 关闭组。

下面的代码 以读写权限打开数据集 dset.h5,并 在根组中创建一个名为 MyGroup 的组。由于未指定属性,因此使用默认值:

import h5py
file = h5py.File('dset.h5', 'r+')  
group = file.create_group ('MyGroup')
file.close()

在这里插入图片描述
创建属性,必须先打开希望附加属性的对象。然后可以根据需要创建、访问并关闭该属性:

  • 打开想要添加属性的对象。
  • 创建属性
  • 写入属性
  • 关闭属性及其所附属的对象。

在这里插入图片描述
在 Python 中创建属性的调用中指定了数据空间、数据类型和数据:

dataset.attrs["Units"] = 'Meters per second'  <-- Create string
dataset.attrs["Units"] = 'Meters per second'  <-- 创建字符串
attr_data = np.zeros((2,))
attr_data[0] = 100
attr_data[1] = 200
dataset.attrs.create("Speed", attr_data, (2,), 'i') <-- Create Integer
dataset.attrs.create("Speed", attr_data, (2,), 'i') <-- 创建整数

批处理(Batch processing)

批处理的工作原理

  • 数据收集:数据随时间从 各种来源 收集,并存储直至准备好进行处理;
  • 批处理执行:批处理作业 依据计划(例如,每夜)或定义的数据阈值触发;
  • 输出生成:批处理执行后的结果以多种形式生成,包括报告和 数据库更新

在这里插入图片描述

批处理 VS 流处理(Stream processing)

在批处理和流处理之间的选择体现了 时效性全面性 之间的权衡。

  • 批处理在预定窗口内以 大块、离散 的形式处理数据,这些块称为 批次。批处理最适用于 数据完整性 至关重要的场景,如日终报告或库存管理。
  • 流式处理在 数据实时到达时进行处理,毫无延迟。流式处理在需要 即时洞察 的场景中表现出色,如欺诈检测系统或实时仪表盘。

微批处理(Micro-batch processing)是一种混合方法,结合了批处理和流处理的优势。

  • 在这种方式中,数据以 小批次频繁的间隔 内进行处理,从而在提供 更快洞察 的同时,仍然保持批处理所具备的 数据完整性
  • 该技术通常用于需要 实时或近实时分析 的场景,但 数据量过大,传统流处理方法无法处理。
http://www.dtcms.com/a/524445.html

相关文章:

  • 攻防世界-Web-Confusion1
  • python:怎样用 Django 开发电子商务程序
  • 【u-boot】u-boot驱动模型-struct uclass_driver
  • 昌吉网站建设公司怎么用php安装wordpress
  • 山西网站建设营销什么价格html模板在哪找
  • MATLAB 实现基于短时傅里叶变换 (STFT) 的音频信号时频分析与可视化
  • 第十章-Tomcat性能测试与实战案例
  • 1.Linux初识
  • 如何在亚马逊做公司网站wordpress文档chm
  • 免费中英文网站源码想做个网站都需要什么
  • 【小程序】指定元素滚动到中间
  • 百度PaddleOCR-VL:基于0.9B超紧凑视觉语言模型,支持109种语言,性能超越GPT-4o等大模型
  • (论文速读)InteractVLM: 基于2D基础模型的3D交互推理
  • 网络基础知识简易急速理解---OSPF开放式最短路径优先协议
  • VTK入门:vtkImageData——3D体素/2D像素的“规则收纳盒”
  • 插入区间--leetcode
  • 网络构建与访问控制实验
  • 利用建e网全景生成VR全景链接
  • 【项目与八股】复习整理笔记
  • 企业门为什么要建设门户网站天津进口网站建设电话
  • OGNL语法实践
  • 二叉树的直径,二叉树中的最大路径和
  • 【无标题】Verilog中generate的用法
  • 代码随想录 105.从前序与中序遍历构造二叉树
  • 微信网站公司用wordpress还是用框架
  • 电子电气架构 --- 汽车软件开发基础V模型
  • 国产数据库替代MongoDB的技术实践过程:金仓多模数据库在电子证照系统中的深度应用
  • 【MATLAB例程】自适应渐消卡尔曼滤波,背景为二维雷达目标跟踪,基于扩展卡尔曼(EKF)|附完整代码的下载链接
  • 【开题答辩全过程】以 博客系统的设计与实现为例,包含答辩的问题和答案
  • 基于 OpenHarmony 分布式数据服务重构 BCI 脑机接口通信系统