【数据安全】连续数据突升检测
在数据监控领域,经常需要检测时序数据的突增情况。下面记录一种简单的检测方法(以滑动基准窗口的均值和标准差作为突增阈值),仅供参考。
import java.util.*;
import java.util.stream.Collectors;

/**
 * Detects upward surges in a time series by comparing each point against a
 * rolling baseline of recent non-surge points.
 *
 * <p>A point is treated as a surge candidate when its deviation above the
 * baseline mean exceeds {@code thresholdMultiplier * stdDev}; when the baseline
 * has no spread (standard deviation of 0) the threshold falls back to
 * {@code thresholdMultiplier * |mean|}. Consecutive candidates are buffered and
 * reported as one {@link SurgePoint} once a non-surge point (or the end of the
 * data) is reached.
 */
public class AdaptiveSurgeDetector {

    /** Minimum relative increase over the baseline mean, in percent, for a surge to be reported. */
    private static final double MIN_SURGE_RATIO_PERCENT = 30.0;

    /** One reported surge: either a single point or a run of consecutive surge points. */
    public static class SurgePoint {
        String startTime;
        String endTime;
        double startValue;
        double endValue;
        /** Relative increase over the baseline mean, in percent (averaged for a run). */
        double surgeRatio;
        boolean isConsecutive;
        int consecutiveCount;
        int minShowCount;

        public SurgePoint(String startTime, String endTime,
                          double startValue, double endValue, double surgeRatio,
                          boolean isConsecutive, int consecutiveCount, int minShowCount) {
            this.startTime = startTime;
            this.endTime = endTime;
            this.startValue = startValue;
            this.endValue = endValue;
            this.surgeRatio = surgeRatio;
            this.isConsecutive = isConsecutive;
            this.consecutiveCount = consecutiveCount;
            this.minShowCount = minShowCount;
        }

        @Override
        public String toString() {
            if (isConsecutive) {
                return String.format("连续突增[%s 至 %s]: 起始值=%.2f, 结束值=%.2f, 突增比例=%.2f%%, 持续=%d次",
                        startTime, endTime, startValue, endValue, surgeRatio, consecutiveCount);
            } else {
                return String.format("单点突增[%s]: 值=%.2f, 突增比例=%.2f%%",
                        startTime, startValue, surgeRatio);
            }
        }
    }

    /**
     * Scans the series in key order and returns every detected surge.
     *
     * <p>Entries are ordered by the natural (lexicographic) order of their
     * string keys, so timestamps must use a format that sorts chronologically
     * as text, such as {@code yyyy-MM-dd HH:mm}.
     *
     * @param timeSeriesData      timestamp to value; values may be {@link Number}s or
     *                            numeric strings, anything else is read as 0.0
     * @param thresholdMultiplier multiplier applied to the baseline standard deviation
     *                            (or to the absolute baseline mean when the deviation is 0)
     * @param minShowCount        minimum number of consecutive surge points required
     *                            before a surge is reported
     * @param baselineSize        maximum number of recent non-surge points kept as the
     *                            baseline; values below 1 are treated as 1
     * @return the detected surges in chronological order, possibly empty
     * @throws NullPointerException if {@code timeSeriesData}, {@code minShowCount}
     *                              or {@code baselineSize} is null
     */
    public static List<SurgePoint> detectSurges(Map<String, Object> timeSeriesData,
                                                double thresholdMultiplier,
                                                Integer minShowCount,
                                                Integer baselineSize) {
        Objects.requireNonNull(timeSeriesData, "timeSeriesData");
        Objects.requireNonNull(minShowCount, "minShowCount");
        Objects.requireNonNull(baselineSize, "baselineSize");
        // A baseline capacity below 1 would empty the baseline after every point
        // and turn every later mean into 0, so keep at least one point.
        final int maxBaselineSize = Math.max(1, baselineSize);

        // Convert values to double and sort by timestamp key.
        List<Map.Entry<String, Double>> sortedData = timeSeriesData.entrySet().stream()
                .<Map.Entry<String, Double>>map(e ->
                        new AbstractMap.SimpleEntry<>(e.getKey(), convertToDouble(e.getValue())))
                .sorted(Map.Entry.comparingByKey())
                .collect(Collectors.toList());

        List<SurgePoint> results = new ArrayList<>();
        List<Map.Entry<String, Double>> baselineData = new ArrayList<>();
        List<Map.Entry<String, Double>> surgeBuffer = new ArrayList<>();

        for (int i = 0; i < sortedData.size(); i++) {
            Map.Entry<String, Double> current = sortedData.get(i);

            // The first point seeds the baseline and is never a surge itself.
            if (i == 0) {
                baselineData.add(current);
                continue;
            }

            double mean = calculateMean(baselineData);
            double stdDev = calculateStdDev(baselineData, mean);
            // With a flat baseline fall back to a multiple of the mean's magnitude;
            // the absolute value keeps the threshold non-negative for negative baselines.
            double threshold = stdDev > 0
                    ? thresholdMultiplier * stdDev
                    : thresholdMultiplier * Math.abs(mean);
            double deviation = current.getValue() - mean;

            if (deviation > threshold) {
                surgeBuffer.add(current);
            } else {
                // A non-surge point ends the current run: report it and reset the baseline.
                if (!surgeBuffer.isEmpty()) {
                    processSurgeBuffer(surgeBuffer, baselineData, results, minShowCount);
                    surgeBuffer.clear();
                }
                baselineData.add(current);
                // Keep only the most recent points in the baseline window.
                if (baselineData.size() > maxBaselineSize) {
                    baselineData.remove(0);
                }
            }
        }

        // The series may end while still inside a surge run.
        if (!surgeBuffer.isEmpty()) {
            processSurgeBuffer(surgeBuffer, baselineData, results, minShowCount);
        }
        return results;
    }

    /**
     * Turns the buffered surge points into at most one {@link SurgePoint} and then
     * clears the baseline so that it is rebuilt from the points following the surge.
     *
     * <p>A run is reported only if it has at least {@code minShowCount} points and its
     * (average) surge ratio exceeds {@link #MIN_SURGE_RATIO_PERCENT}. The baseline is
     * cleared whether or not anything was reported.
     */
    private static void processSurgeBuffer(List<Map.Entry<String, Double>> surgeBuffer,
                                           List<Map.Entry<String, Double>> baselineData,
                                           List<SurgePoint> results,
                                           int minShowCount) {
        final double mean = calculateMean(baselineData);

        if (surgeBuffer.size() == 1 && minShowCount == 1) {
            // Single-point surge.
            Map.Entry<String, Double> point = surgeBuffer.get(0);
            double surgeRatio = surgeRatioPercent(point.getValue(), mean);
            if (surgeRatio > MIN_SURGE_RATIO_PERCENT) {
                results.add(new SurgePoint(point.getKey(), point.getKey(),
                        point.getValue(), point.getValue(),
                        surgeRatio, false, 1, minShowCount));
            }
        } else if (surgeBuffer.size() >= minShowCount) {
            // Run of consecutive surge points: report the average ratio over the run.
            double avgSurgeRatio = surgeBuffer.stream()
                    .mapToDouble(e -> surgeRatioPercent(e.getValue(), mean))
                    .average()
                    .orElse(0);
            if (avgSurgeRatio > MIN_SURGE_RATIO_PERCENT) {
                Map.Entry<String, Double> first = surgeBuffer.get(0);
                Map.Entry<String, Double> last = surgeBuffer.get(surgeBuffer.size() - 1);
                results.add(new SurgePoint(first.getKey(), last.getKey(),
                        first.getValue(), last.getValue(),
                        avgSurgeRatio, true, surgeBuffer.size(), minShowCount));
            }
        }

        // Reset the baseline; the caller re-seeds it with the next non-surge point.
        baselineData.clear();
    }

    /**
     * Returns the increase of {@code value} over {@code mean} as a percentage of the
     * mean's magnitude. Dividing by the absolute mean keeps the sign of the result
     * equal to the sign of the deviation even for a negative baseline. A mean of 0
     * yields an infinite (or NaN) ratio, because a relative increase is undefined there.
     */
    private static double surgeRatioPercent(double value, double mean) {
        return ((value - mean) / Math.abs(mean)) * 100;
    }

    /**
     * Best-effort conversion of a raw value to {@code double}: numbers are widened,
     * other objects are parsed from their string form, and {@code null} or unparsable
     * values are read as 0.0.
     */
    private static double convertToDouble(Object value) {
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        if (value == null) {
            return 0.0;
        }
        try {
            return Double.parseDouble(value.toString());
        } catch (NumberFormatException ignored) {
            // Deliberate best effort: a malformed value is read as 0.0.
            return 0.0;
        }
    }

    /** Returns the arithmetic mean of the entry values, or 0 for an empty list. */
    private static double calculateMean(List<Map.Entry<String, Double>> data) {
        return data.stream().mapToDouble(Map.Entry::getValue).average().orElse(0);
    }

    /**
     * Returns the population standard deviation of the entry values around
     * {@code mean}, or 0 when there are fewer than two entries.
     */
    private static double calculateStdDev(List<Map.Entry<String, Double>> data, double mean) {
        if (data.size() <= 1) {
            return 0;
        }
        double variance = data.stream()
                .mapToDouble(e -> Math.pow(e.getValue() - mean, 2))
                .average()
                .orElse(0);
        return Math.sqrt(variance);
    }

    /** Runs the detector on a small sample series and prints the detected surges. */
    public static void main(String[] args) {
        // Sample series containing both a single-point surge and consecutive surges.
        Map<String, Object> timeSeriesData = new LinkedHashMap<>();
        timeSeriesData.put("2023-01-01 10:00", 10.0);
        timeSeriesData.put("2023-01-01 11:00", 10.0);
        timeSeriesData.put("2023-01-01 12:00", 50.0);
        timeSeriesData.put("2023-01-01 13:00", 13.0);
        timeSeriesData.put("2023-01-01 14:00", 12.0);
        timeSeriesData.put("2023-01-01 15:00", 10.0);
        timeSeriesData.put("2023-01-01 16:00", 11.0);
        timeSeriesData.put("2023-01-01 17:00", 16.0);
        timeSeriesData.put("2023-01-01 18:00", 100.0);
        timeSeriesData.put("2023-01-01 19:00", 10.0);
        timeSeriesData.put("2023-01-01 20:00", 140.0);
        timeSeriesData.put("2023-01-01 21:00", 110.0);
        timeSeriesData.put("2023-01-01 22:00", 110.0);
        timeSeriesData.put("2023-01-01 23:00", 110.0);
        timeSeriesData.put("2023-01-02 00:00", 60.0);
        timeSeriesData.put("2023-01-02 01:00", 22.0);
        timeSeriesData.put("2023-01-02 02:00", 25.0);
        timeSeriesData.put("2023-01-02 03:00", 28.0);

        double threshold = 2.0;
        Integer minShowCount = 1;
        Integer baselineSize = 5;
        List<SurgePoint> surges = detectSurges(timeSeriesData, threshold, minShowCount, baselineSize);

        // Print the results.
        System.out.println("时间序列数据分析结果:");
        if (surges.isEmpty()) {
            System.out.println("未检测到明显突增点");
        } else {
            surges.forEach(System.out::println);
        }
    }
}
结果:
时间序列数据分析结果:
单点突增[2023-01-01 12:00]: 值=50.00, 突增比例=400.00%
连续突增[2023-01-01 17:00 至 2023-01-01 18:00]: 起始值=16.00, 结束值=100.00, 突增比例=404.35%, 持续=2次
连续突增[2023-01-01 20:00 至 2023-01-02 00:00]: 起始值=140.00, 结束值=60.00, 突增比例=960.00%, 持续=5次