dbscan 检测噪声
matplotlib
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor
# 生成复杂轨迹函数
def generate_realistic_trajectory(num_points, speed_factors):
"""生成包含随机转向的复杂轨迹"""
np.random.seed(42) # 固定随机种子确保可重复性
trajectory = []
x, y = 400, 300 # 起始点
direction = 0 # 初始方向(弧度)
for factor in speed_factors:
# 每段开始时的随机方向偏移
direction += np.random.uniform(-np.pi / 2, np.pi / 2)
for _ in range(num_points // len(speed_factors)):
# 添加方向随机扰动
direction += np.random.normal(0, 0.2)
# 计算位移分量
dx = factor * np.cos(direction) + np.random.normal(0, 0.3)
dy = factor * np.sin(direction) + np.random.normal(0, 0.3)
x += dx
y += dy
# 添加位置噪声(模拟传感器噪声)
x += np.random.normal(0, 3)
y += np.random.normal(0, 3)
trajectory.append((x, y))
return np.array(trajectory)
# 生成轨迹数据
trajectory = generate_realistic_trajectory(200, [5.0, 2.0, 0.5, 6, 3])
# ======================
# 方法一:滑动窗口+DBSCAN
# ======================
def sliding_window_anomaly_detection(trajectory, window_size=15, step=5, eps=0.5, min_samples=3):
"""滑动窗口异常检测主函数"""
# 创建滑动窗口
windows = []
for i in range(0, len(trajectory) - window_size + 1, step):
window = trajectory[i:i + window_size]
windows.append(window)
# 提取窗口特征(关键步骤)
features = []
for win in windows:
# 1. 位置统计特征
x_mean = np.mean(win[:, 0])
y_mean = np.mean(win[:, 1])
xy_std = np.std(win, axis=0).mean()
# 2. 动态特征
dx = np.diff(win[:, 0])
dy = np.diff(win[:, 1])
speeds = np.sqrt(dx ** 2 + dy ** 2)
avg_speed = np.mean(speeds)
speed_std = np.std(speeds)
# 3. 方向变化特征
directions = np.arctan2(dy, dx)
dir_change = np.abs(np.diff(directions)).mean()
features.append([x_mean, y_mean, xy_std, avg_speed, speed_std, dir_change])
# 特征标准化
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
# DBSCAN聚类
dbscan = DBSCAN(eps=eps, min_samples=min_samples)
labels = dbscan.fit_predict(scaled_features)
# 标记异常窗口
anomaly_windows = [windows[i] for i, label in enumerate(labels) if label == -1]
# 提取所有异常点(取每个异常窗口的最后两个点作为疑似异常)
anomaly_points = []
for win in anomaly_windows:
anomaly_points.extend(win[-2:]) # 假设异常通常出现在窗口末尾
return np.array(anomaly_points)
# ======================
# 方法二:局部离群因子(LOF)
# ======================
def lof_anomaly_detection(trajectory, neighbors=20, contamination=0.1):
"""直接使用LOF检测离群点"""
lof = LocalOutlierFactor(n_neighbors=neighbors, contamination=contamination)
labels = lof.fit_predict(trajectory)
return trajectory[labels == -1]
# 执行检测
window_anomalies = sliding_window_anomaly_detection(trajectory)
lof_anomalies = lof_anomaly_detection(trajectory)
# ======================
# 可视化对比
# ======================
plt.figure(figsize=(15, 5))
# 原始轨迹
plt.subplot(131)
plt.scatter(trajectory[:, 0], trajectory[:, 1], s=10, c='blue', alpha=0.6)
plt.title("Original Trajectory")
plt.xlabel("X")
plt.ylabel("Y")
# 滑动窗口+DBSCAN检测结果
plt.subplot(132)
plt.scatter(trajectory[:, 0], trajectory[:, 1], s=10, c='blue', alpha=0.3)
plt.scatter(window_anomalies[:, 0], window_anomalies[:, 1], s=50, edgecolors='red', facecolors='none', label='Anomalies')
plt.title("Sliding Window + DBSCAN Detection")
plt.legend()
# LOF检测结果
plt.subplot(133)
plt.scatter(trajectory[:, 0], trajectory[:, 1], s=10, c='blue', alpha=0.3)
plt.scatter(lof_anomalies[:, 0], lof_anomalies[:, 1], s=50, edgecolors='green', facecolors='none', label='LOF Anomalies')
plt.title("LOF Direct Detection")
plt.legend()
plt.tight_layout()
plt.show()
# ======================
# 参数优化建议
# ======================
print("""
参数调优指导:
1. 滑动窗口大小(window_size):
- 建议值:10-30个点(覆盖约1-3秒的连续运动)
- 过小:噪声敏感度高,但误报率上升
- 过大:可能错过瞬时异常
2. DBSCAN参数:
- eps:通过K-距离图寻找拐点(尝试0.3-1.0)
- min_samples:通常设为3-5,避免过度分割
3. LOF参数:
- n_neighbors:根据轨迹密度调整(通常20-50)
- contamination:预估异常比例(0.05-0.2)
""")
opencv可视化
dbscan/dbscan_2_cv.py
import cv2
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor
def generate_realistic_trajectory(num_points, speed_factors):
"""生成包含随机转向的复杂轨迹"""
trajectory = []
x, y = 400, 300 # 起始点居中
direction = 0 # 初始方向(弧度)
for factor in speed_factors:
# 每段开始时的随机方向偏移
direction += np.random.uniform(-np.pi / 2, np.pi / 2)
for _ in range(num_points // len(speed_factors)):
# 添加方向随机扰动
if np.random.uniform(0, 1) < 0.5:
direction += np.random.normal(0, 0.2)
# 计算位移分量
dx = factor * np.cos(direction) + np.random.normal(0, 0.3)
dy = factor * np.sin(direction) + np.random.normal(0, 0.3)
x += dx
y += dy
if np.random.uniform(0, 1) < 0.5:
# 添加位置噪声
x += np.random.normal(0, 5)
y += np.random.normal(0, 5)
trajectory.append((x, y))
return np.array(trajectory)
def sliding_window_anomaly_detection(trajectory, window_size=9, step=5, eps=1, min_samples=3):
"""滑动窗口异常检测主函数"""
# 创建滑动窗口
windows = []
for i in range(0, len(trajectory) - window_size + 1, step):
window = trajectory[i:i + window_size]
windows.append(window)
# 提取窗口特征(关键步骤)
features = []
for win in windows:
# 1. 位置统计特征
x_mean = np.mean(win[:, 0])
y_mean = np.mean(win[:, 1])
xy_std = np.std(win, axis=0).mean()
# 2. 动态特征
dx = np.diff(win[:, 0])
dy = np.diff(win[:, 1])
speeds = np.sqrt(dx ** 2 + dy ** 2)
avg_speed = np.mean(speeds)
speed_std = np.std(speeds)
# 3. 方向变化特征
directions = np.arctan2(dy, dx)
# dir_change = np.abs(np.diff(directions)).mean()
dir_change = np.diff(directions).mean()
features.append([x_mean, y_mean, xy_std, avg_speed, speed_std, dir_change])
# 特征标准化
scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)
# DBSCAN聚类
dbscan = DBSCAN(eps=eps, min_samples=min_samples)
labels = dbscan.fit_predict(scaled_features)
# 标记异常窗口
anomaly_windows = [windows[i] for i, label in enumerate(labels) if label == -1]
# 提取所有异常点(取每个异常窗口的最后两个点作为疑似异常)
anomaly_points = []
for win in anomaly_windows:
anomaly_points.extend(win[-2:]) # 假设异常通常出现在窗口末尾
return np.array(anomaly_points)
def visualize_trajectory_with_opencv(trajectory, anomalies, window_size=(800, 600)):
"""使用OpenCV可视化轨迹连线"""
# 创建白色画布
canvas = np.ones((window_size[1], window_size[0], 3), dtype=np.uint8) * 255
# 转换坐标为整数并确保在画布范围内
def clamp_coordinates(point):
x = int(np.clip(point[0], 0, window_size[0] - 1))
y = int(np.clip(point[1], 0, window_size[1] - 1))
return (x, y)
# 绘制轨迹连线(带抗锯齿)
prev_point = None
for i, point in enumerate(trajectory):
current_point = clamp_coordinates(point)
if prev_point is not None:
# 使用渐变色表示运动方向
color = (0, int(255 * i / len(trajectory)), 0) # 从绿到蓝的渐变
cv2.line(canvas, prev_point, current_point, color, 2, lineType=cv2.LINE_AA)
prev_point = current_point
# 标记异常点
for point in anomalies:
center = clamp_coordinates(point)
# 绘制红色空心圆标记异常
# cv2.circle(canvas, center, 3, (0, 0, 255), -1)
cv2.circle(canvas, center, 3, (0, 0, 255), 2, lineType=cv2.LINE_AA)
# 添加坐标指示
cv2.putText(canvas, f"Points: {len(trajectory)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (100, 100, 100), 2)
cv2.putText(canvas, f"Anomalies: {len(anomalies)}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
# 显示图像
cv2.imshow("Trajectory Visualization", canvas)
cv2.waitKey(0)
cv2.destroyAllWindows()
while True:
# 生成轨迹数据(复用之前的函数)
trajectory = generate_realistic_trajectory(200, [5.0, 2.0, 0.5, 6, 3])
anomaly_points = sliding_window_anomaly_detection(trajectory) # 使用之前定义的检测方法
# 执行可视化(调整窗口尺寸适配坐标范围)
visualize_trajectory_with_opencv(trajectory, anomaly_points, window_size=(800, 600))