当前位置: 首页 > news >正文

dbscan 检测噪声

matplotlib

import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor


# 生成复杂轨迹函数
def generate_realistic_trajectory(num_points, speed_factors):
    """生成包含随机转向的复杂轨迹"""
    np.random.seed(42)  # 固定随机种子确保可重复性
    trajectory = []
    x, y = 400, 300  # 起始点
    direction = 0  # 初始方向(弧度)

    for factor in speed_factors:
        # 每段开始时的随机方向偏移
        direction += np.random.uniform(-np.pi / 2, np.pi / 2)

        for _ in range(num_points // len(speed_factors)):
            # 添加方向随机扰动
            direction += np.random.normal(0, 0.2)

            # 计算位移分量
            dx = factor * np.cos(direction) + np.random.normal(0, 0.3)
            dy = factor * np.sin(direction) + np.random.normal(0, 0.3)

            x += dx
            y += dy

            # 添加位置噪声(模拟传感器噪声)
            x += np.random.normal(0, 3)
            y += np.random.normal(0, 3)

            trajectory.append((x, y))

    return np.array(trajectory)


# 生成轨迹数据
trajectory = generate_realistic_trajectory(200, [5.0, 2.0, 0.5, 6, 3])


# ======================
# 方法一:滑动窗口+DBSCAN
# ======================
def sliding_window_anomaly_detection(trajectory, window_size=15, step=5, eps=0.5, min_samples=3):
    """滑动窗口异常检测主函数"""
    # 创建滑动窗口
    windows = []
    for i in range(0, len(trajectory) - window_size + 1, step):
        window = trajectory[i:i + window_size]
        windows.append(window)

    # 提取窗口特征(关键步骤)
    features = []
    for win in windows:
        # 1. 位置统计特征
        x_mean = np.mean(win[:, 0])
        y_mean = np.mean(win[:, 1])
        xy_std = np.std(win, axis=0).mean()

        # 2. 动态特征
        dx = np.diff(win[:, 0])
        dy = np.diff(win[:, 1])
        speeds = np.sqrt(dx ** 2 + dy ** 2)
        avg_speed = np.mean(speeds)
        speed_std = np.std(speeds)

        # 3. 方向变化特征
        directions = np.arctan2(dy, dx)
        dir_change = np.abs(np.diff(directions)).mean()

        features.append([x_mean, y_mean, xy_std, avg_speed, speed_std, dir_change])

    # 特征标准化
    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(features)

    # DBSCAN聚类
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    labels = dbscan.fit_predict(scaled_features)

    # 标记异常窗口
    anomaly_windows = [windows[i] for i, label in enumerate(labels) if label == -1]

    # 提取所有异常点(取每个异常窗口的最后两个点作为疑似异常)
    anomaly_points = []
    for win in anomaly_windows:
        anomaly_points.extend(win[-2:])  # 假设异常通常出现在窗口末尾

    return np.array(anomaly_points)


# ======================
# 方法二:局部离群因子(LOF)
# ======================
def lof_anomaly_detection(trajectory, neighbors=20, contamination=0.1):
    """直接使用LOF检测离群点"""
    lof = LocalOutlierFactor(n_neighbors=neighbors, contamination=contamination)
    labels = lof.fit_predict(trajectory)
    return trajectory[labels == -1]


# 执行检测
window_anomalies = sliding_window_anomaly_detection(trajectory)
lof_anomalies = lof_anomaly_detection(trajectory)

# ======================
# 可视化对比
# ======================
plt.figure(figsize=(15, 5))

# 原始轨迹
plt.subplot(131)
plt.scatter(trajectory[:, 0], trajectory[:, 1], s=10, c='blue', alpha=0.6)
plt.title("Original Trajectory")
plt.xlabel("X")
plt.ylabel("Y")

# 滑动窗口+DBSCAN检测结果
plt.subplot(132)
plt.scatter(trajectory[:, 0], trajectory[:, 1], s=10, c='blue', alpha=0.3)
plt.scatter(window_anomalies[:, 0], window_anomalies[:, 1], s=50, edgecolors='red', facecolors='none', label='Anomalies')
plt.title("Sliding Window + DBSCAN Detection")
plt.legend()

# LOF检测结果
plt.subplot(133)
plt.scatter(trajectory[:, 0], trajectory[:, 1], s=10, c='blue', alpha=0.3)
plt.scatter(lof_anomalies[:, 0], lof_anomalies[:, 1], s=50, edgecolors='green', facecolors='none', label='LOF Anomalies')
plt.title("LOF Direct Detection")
plt.legend()

plt.tight_layout()
plt.show()

# ======================
# 参数优化建议
# ======================
print("""
参数调优指导:
1. 滑动窗口大小(window_size):
   - 建议值:10-30个点(覆盖约1-3秒的连续运动)
   - 过小:噪声敏感度高,但误报率上升
   - 过大:可能错过瞬时异常

2. DBSCAN参数:
   - eps:通过K-距离图寻找拐点(尝试0.3-1.0)
   - min_samples:通常设为3-5,避免过度分割

3. LOF参数:
   - n_neighbors:根据轨迹密度调整(通常20-50)
   - contamination:预估异常比例(0.05-0.2)
""")

opencv可视化

dbscan/dbscan_2_cv.py 

import cv2
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor
def generate_realistic_trajectory(num_points, speed_factors):
    """生成包含随机转向的复杂轨迹"""
    trajectory = []
    x, y = 400, 300  # 起始点居中
    direction = 0  # 初始方向(弧度)

    for factor in speed_factors:
        # 每段开始时的随机方向偏移
        direction += np.random.uniform(-np.pi / 2, np.pi / 2)

        for _ in range(num_points // len(speed_factors)):
            # 添加方向随机扰动

            if np.random.uniform(0, 1) < 0.5:
                direction += np.random.normal(0, 0.2)
            # 计算位移分量
            dx = factor * np.cos(direction) + np.random.normal(0, 0.3)
            dy = factor * np.sin(direction) + np.random.normal(0, 0.3)

            x += dx
            y += dy

            if np.random.uniform(0, 1) < 0.5:
                # 添加位置噪声
                x += np.random.normal(0, 5)
                y += np.random.normal(0, 5)

            trajectory.append((x, y))

    return np.array(trajectory)

def sliding_window_anomaly_detection(trajectory, window_size=9, step=5, eps=1, min_samples=3):
    """滑动窗口异常检测主函数"""
    # 创建滑动窗口
    windows = []
    for i in range(0, len(trajectory) - window_size + 1, step):
        window = trajectory[i:i + window_size]
        windows.append(window)

    # 提取窗口特征(关键步骤)
    features = []
    for win in windows:
        # 1. 位置统计特征
        x_mean = np.mean(win[:, 0])
        y_mean = np.mean(win[:, 1])
        xy_std = np.std(win, axis=0).mean()

        # 2. 动态特征
        dx = np.diff(win[:, 0])
        dy = np.diff(win[:, 1])
        speeds = np.sqrt(dx ** 2 + dy ** 2)
        avg_speed = np.mean(speeds)
        speed_std = np.std(speeds)

        # 3. 方向变化特征
        directions = np.arctan2(dy, dx)
        # dir_change = np.abs(np.diff(directions)).mean()
        dir_change = np.diff(directions).mean()

        features.append([x_mean, y_mean, xy_std, avg_speed, speed_std, dir_change])

    # 特征标准化
    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(features)

    # DBSCAN聚类
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    labels = dbscan.fit_predict(scaled_features)

    # 标记异常窗口
    anomaly_windows = [windows[i] for i, label in enumerate(labels) if label == -1]

    # 提取所有异常点(取每个异常窗口的最后两个点作为疑似异常)
    anomaly_points = []
    for win in anomaly_windows:
        anomaly_points.extend(win[-2:])  # 假设异常通常出现在窗口末尾

    return np.array(anomaly_points)

def visualize_trajectory_with_opencv(trajectory, anomalies, window_size=(800, 600)):
    """使用OpenCV可视化轨迹连线"""
    # 创建白色画布
    canvas = np.ones((window_size[1], window_size[0], 3), dtype=np.uint8) * 255

    # 转换坐标为整数并确保在画布范围内
    def clamp_coordinates(point):
        x = int(np.clip(point[0], 0, window_size[0] - 1))
        y = int(np.clip(point[1], 0, window_size[1] - 1))
        return (x, y)

    # 绘制轨迹连线(带抗锯齿)
    prev_point = None
    for i, point in enumerate(trajectory):
        current_point = clamp_coordinates(point)
        if prev_point is not None:
            # 使用渐变色表示运动方向
            color = (0, int(255 * i / len(trajectory)), 0)  # 从绿到蓝的渐变
            cv2.line(canvas, prev_point, current_point, color, 2, lineType=cv2.LINE_AA)
        prev_point = current_point

    # 标记异常点
    for point in anomalies:
        center = clamp_coordinates(point)
        # 绘制红色空心圆标记异常
        # cv2.circle(canvas, center, 3, (0, 0, 255), -1)
        cv2.circle(canvas, center, 3, (0, 0, 255), 2, lineType=cv2.LINE_AA)

    # 添加坐标指示
    cv2.putText(canvas, f"Points: {len(trajectory)}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (100, 100, 100), 2)
    cv2.putText(canvas, f"Anomalies: {len(anomalies)}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    # 显示图像
    cv2.imshow("Trajectory Visualization", canvas)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

while True:

    # 生成轨迹数据(复用之前的函数)
    trajectory = generate_realistic_trajectory(200, [5.0, 2.0, 0.5, 6, 3])
    anomaly_points = sliding_window_anomaly_detection(trajectory)  # 使用之前定义的检测方法


    # 执行可视化(调整窗口尺寸适配坐标范围)
    visualize_trajectory_with_opencv(trajectory, anomaly_points, window_size=(800, 600))

http://www.dtcms.com/a/112268.html

相关文章:

  • Node.js中间件的5个注意事项
  • 【Java集合】单列集合List详解
  • 基于K8s的演示用单机ML服务部署
  • 26考研——线性表_ 线性表的链式表示_双循环链表(2)
  • 多表查询的多与一
  • 对 Python Websockets 库全方位详解
  • 企业安全——FIPs
  • 面试可能会遇到的问题回答(嵌入式软件开发部分)
  • 如何在 Windows 上安装 Python
  • 新旧iPhone相册复制 - 相册图片视频对拷 - 换机 - 迁移设备数据 - 免费开源爱思助手
  • 免费在线MBTI性格测试工具 - 探索你的性格特质
  • 什么是自动化测试框架?常用的自动化测试框架有哪些?
  • 2.3 MySQL基本内置函数
  • Cortex-M​ 函数调用的入栈与出栈操作
  • 【5】搭建k8s集群系列(二进制部署)之安装master节点组件(kube-controller-manager)
  • 盲盒小程序开发平台搭建:打造个性化、高互动性的娱乐消费新体验
  • 定长池的实现
  • 蓝桥杯 小明的背包1 小兰的神秘礼物 01背包问题 模板 C++
  • 财务税务域——企业税务系统设计
  • centos8上实现lvs集群负载均衡dr模式
  • 【学Rust写CAD】23 渐变效果(gradient_source.rs)
  • 【面试篇】Dubbo
  • NSSCTF [HGAME 2023 week1]simple_shellcode
  • 音视频入门基础:MPEG2-PS专题(8)——使用Wireshark分析GB28181的PS流
  • 第十二步:react
  • 如何用Python轻松实现快速复制或剪切文件列表中的所有文件呢?
  • 【架构艺术】Go大仓monorepo中使用wire做依赖注入的经验
  • PowerMonitor的使用步骤
  • 【jvm】GC评估指标
  • 面试手撕------智能指针