构建一个简单的有监督的异常点检测项目
目录
异常检测的逻辑:
生成数据集:
模型结构:
训练模型:
模型性能评估:
做了一个简单的有监督的异常点检测项目,使用的是自己构建的符合高斯分布的数据集,使用简单的自编码器对异常进行检测并评估性能。
整个项目的代码架构如下:
异常检测的逻辑:
训练数据中只包含正常数据。
测试数据中包含了正常数据和异常数据,将测试数据输入到模型后得到的输出与输入的测试数据计算出重构误差(使用均方误差),再将这个均方误差与阈值比较,超过阈值的定义为异常,(这里的阈值,不是简单规定一个数,阈值也是一个误差,也需要重构出来计算,用3sigma原则,即阈值为重构误差的均值加上3倍重构误差的标准差)。
我们需要构造一个根据test的异常情况构造一个异常标签矩阵,正常位为0,异常为1,如下代码。
注:如果使用公开的数据集,通常会给出labels文件,但还需要转化为和test匹配的矩阵格式。
# 生成异常标签矩阵,正常数据位0,异常数据位1y_test = np.hstack([np.zeros(len(X_test_normal)), np.ones(len(anomalous_data))])
在评估我们的检测效果时,我们就需要将检测出的异常和这个异常标签矩阵进行比较。
# 评估性能metrics = evaluate_performance(y_test, anomalies)
生成数据集:
本次项目由于比较基础简单,使用自己随机生成一些高斯分布的数据,就不用公开的数据集还要经过预处理和归一化操作。
使用均值和方差不同的高斯分布构建正常数据集和异常数据集,正常数据集构造一个1000×10的矩阵,异常数据集构造一个100×10矩阵,代码如下。
def generate_normal_data(n_samples=1000, n_features=10):# 生成均值为0,标准差为1,(标准差越小,越集中在均值附近)的正常数据return np.random.normal(loc=0, scale=1, size=(n_samples, n_features))def generate_anomalous_data(n_samples=100, n_features=10):return np.random.normal(loc=5, scale=3, size=(n_samples, n_features))
下面展示我们随机生成的训练数据。
import numpy as npdef generate_normal_data(n_samples=1000, n_features=10):# 生成均值为0,标准差为1,(标准差越小,越集中在均值附近)的正常数据return np.random.normal(loc=0, scale=1, size=(n_samples, n_features))# 调用函数生成数据
data = generate_normal_data(n_samples=5, n_features=3)# 打印生成的数据
print("生成的正态分布随机数据:")
print(data)# 查看数据的基本统计信息
mean = np.mean(data, axis=0)
std = np.std(data, axis=0)
print("\n每列数据的均值:", mean)
print("每列数据的标准差:", std)
模型结构:
构建了很基础的一个自编码解码器的对称结构,编码器将输入数据映射到低维空间,解码器再映射会高维空间,实现重构数据,模型代码如下:
class Autoencoder(nn.Module):def __init__(self, input_dim):super(Autoencoder, self).__init__()self.encoder = nn.Sequential(nn.Linear(input_dim, 8),nn.ReLU(),nn.Linear(8, 4),nn.ReLU())self.decoder = nn.Sequential(nn.Linear(4, 8),nn.ReLU(),nn.Linear(8, input_dim),nn.Tanh())def forward(self, x):x = self.encoder(x)x = self.decoder(x)return x
训练模型:
训练只使用正常数据集,将正常数据集中的20%和异常数据集拼接作为测试数据集。
训练模型函数最后返回的是model,定义训练轮次,每次进行一轮训练,每一轮里再分批次处理数据,将数据输入到模型得到输出后计算损失,将损失反向传播后进行梯度更新参数,计算每一轮的总损失累加。
训练模型的主代码:
model.train()for epoch in range(num_epochs):running_loss = 0.0for data in train_loader:inputs, = data # 从元组中解包# 每个批次训练前,梯度清零optimizer.zero_grad()# 计算损失loss = criterion(model(inputs), inputs)loss.backward()# 优化器optimizer.step()running_loss += loss.item()# 假设数据总共100个(train_loader),训练10轮,每一轮中都训练这100个数据,每次取出10个(data)数据进行一批次训练# running_loss就是训练一轮的损失# 训练完轮次后打印信息if (epoch + 1) % 10 == 0:# 总损失除一轮的损失批次数print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.6f}')
模型性能评估:
计算准确率,召回率,F1分数已经被封装好了,可以直接用,测试数据集中的异常已经被提前打了标签,参考下面的公式。
运行结果:
可视化结果图:
本文的代码可以在我的github上下载。
链接:https://github.com/Zik-code/Autoencoder_detection
作者水平有限,有错误或者任何问题欢迎留言与我交流 !