当前位置: 首页 > news >正文

基于Spring SSE构建实时监控系统

一、SSE技术基础

1. SSE核心特性

特性说明
协议基于HTTP的单向通信协议
数据格式text/event-stream
重连机制自动重连(默认3秒)
事件类型支持自定义事件类型
浏览器支持除IE外主流浏览器均支持

2. 与WebSocket对比

维度SSEWebSocket
协议方向单向(服务端→客户端)双向
协议复杂度简单(纯HTTP)复杂(独立协议)
断线恢复内置自动恢复需手动实现
数据格式文本为主二进制/文本
适用场景服务端推送场景双向交互场景

二、Spring SSE实现方案

1. 基础控制器实现

@RestController
@RequestMapping("/api/monitor")
public class MonitorController {private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter subscribe(@RequestParam String clientId) {SseEmitter emitter = new SseEmitter(60_000L); // 60秒超时// 注册新的客户端emitters.put(clientId, emitter);// 设置回调emitter.onCompletion(() -> emitters.remove(clientId));emitter.onTimeout(() -> emitters.remove(clientId));emitter.onError(e -> {log.error("SSE error", e);emitters.remove(clientId);});// 发送初始数据sendInitialData(emitter);return emitter;}private void sendInitialData(SseEmitter emitter) {try {emitter.send(SseEmitter.event().name("system-status").data(getCurrentStatus()).reconnectTime(5000L));} catch (IOException e) {emitter.completeWithError(e);}}
}

2. 监控数据推送服务

@Service
@RequiredArgsConstructor
public class MonitorPushService {private final SimpMessagingTemplate messagingTemplate;@Scheduled(fixedRate = 1000)public void pushSystemMetrics() {SystemMetrics metrics = collectMetrics();// 推送给所有SSE客户端messagingTemplate.convertAndSend("/topic/system-metrics", metrics);// 推送给WebSocket客户端messagingTemplate.convertAndSend("/queue/metrics", metrics);}private SystemMetrics collectMetrics() {return new SystemMetrics(ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(),Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(),ManagementFactory.getThreadMXBean().getThreadCount());}
}

三、前端集成方案

1. 原生JavaScript实现

const eventSource = new EventSource('/api/monitor/subscribe?clientId=' + uuidv4());eventSource.addEventListener('system-status', (e) => {const data = JSON.parse(e.data);updateCpuChart(data.cpuUsage);updateMemoryChart(data.memoryUsage);
});eventSource.addEventListener('error', (e) => {if (e.readyState === EventSource.CLOSED) {console.log('Connection closed');} else {console.error('EventSource error:', e);}
});// 自定义重连策略
eventSource.onerror = () => {eventSource.close();setTimeout(() => {new EventSource(eventSource.url);}, 10000);
};

2. React组件封装

import { useEffect, useState } from 'react';const SystemMonitor = () => {const [metrics, setMetrics] = useState({});useEffect(() => {const eventSource = new EventSource('/api/monitor/subscribe');eventSource.onmessage = (e) => {setMetrics(JSON.parse(e.data));};return () => eventSource.close();}, []);return (<div className="monitor-dashboard"><CpuGauge value={metrics.cpuLoad} /><MemoryChart data={metrics.memory} /><ThreadCounter count={metrics.threadCount} /></div>);
};

四、监控数据类型处理

1. 多维度监控数据模型

@Data
@Builder
public class SystemMetrics {// 基础资源private double cpuLoad;private long usedMemory;private long maxMemory;private int threadCount;// JVM信息private long gcCount;private long gcTime;// 应用指标private int activeSessions;private double requestRate;private double errorRate;// 自定义业务指标private Map<String, Object> customMetrics;
}

2. 数据分片推送策略

public void pushDetailedMetrics(SseEmitter emitter, SystemMetrics metrics) {try {// CPU数据单独推送emitter.send(SseEmitter.event().name("cpu-metrics").data(metrics.getCpuLoad()));// 内存数据单独推送emitter.send(SseEmitter.event().name("memory-metrics").data(Map.of("used", metrics.getUsedMemory(),"max", metrics.getMaxMemory())));// 其他指标批量推送emitter.send(SseEmitter.event().name("other-metrics").data(Map.of("threads", metrics.getThreadCount(),"sessions", metrics.getActiveSessions())));} catch (IOException e) {emitter.completeWithError(e);}
}

五、高级功能实现

1. 动态订阅控制

@PostMapping("/subscribe/custom")
public SseEmitter subscribeCustomMetrics(@RequestBody MetricSubscription subscription) {SseEmitter emitter = new SseEmitter();ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {if (!emitter.isComplete()) {Map<String, Object> data = new HashMap<>();subscription.getMetrics().forEach(metric -> {data.put(metric, getMetricValue(metric));});try {emitter.send(data);} catch (IOException e) {emitter.completeWithError(e);}}}, 0, subscription.getInterval(), TimeUnit.SECONDS);emitter.onCompletion(() -> {future.cancel(true);executor.shutdown();});return emitter;
}

2. 历史数据回放

@GetMapping("/replay")
public SseEmitter replayHistoricalData(@RequestParam Instant from,@RequestParam Instant to,@RequestParam(defaultValue = "1") int speed) {SseEmitter emitter = new SseEmitter();List<SystemMetrics> history = metricStore.queryRange(from, to);new Thread(() -> {try {for (SystemMetrics metrics : history) {if (emitter.isComplete()) break;emitter.send(metrics);Thread.sleep(1000 / speed);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}}).start();return emitter;
}

六、安全增强方案

1. 认证集成

@GetMapping("/secure/subscribe")
public SseEmitter secureSubscribe(@AuthenticationPrincipal User user) {SseEmitter emitter = new SseEmitter();if (!user.hasRole("MONITOR_VIEWER")) {emitter.completeWithError(new AccessDeniedException("Forbidden"));return emitter;}// 定期推送该用户有权限查看的指标scheduledExecutor.scheduleAtFixedRate(() -> {try {emitter.send(filterByPermission(user, getMetrics()));} catch (IOException e) {emitter.completeWithError(e);}}, 0, 1, TimeUnit.SECONDS);return emitter;
}

2. 流量控制

@Bean
public WebMvcConfigurer sseConfigurer() {return new WebMvcConfigurer() {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {// 每个IP最大10个SSE连接configurer.registerDeferredResultInterceptors(new SseRateLimitInterceptor(10));}};
}public class SseRateLimitInterceptor implements DeferredResultProcessingInterceptor {private final ConcurrentMap<String, AtomicInteger> ipCounts = new ConcurrentHashMap<>();private final int maxPerIp;@Overridepublic <T> void beforeConcurrentHandling(NativeWebRequest request, DeferredResult<T> deferredResult) {String ip = request.getRemoteAddr();int count = ipCounts.computeIfAbsent(ip, k -> new AtomicInteger()).incrementAndGet();if (count > maxPerIp) {throw new TooManyRequestsException("SSE connection limit exceeded");}deferredResult.onCompletion(() -> {ipCounts.get(ip).decrementAndGet();});}
}

七、性能优化策略

1. 服务端配置优化

server:tomcat:max-threads: 200max-connections: 10000compression:enabled: truemime-types: text/event-streammin-response-size: 1024spring:mvc:async:request-timeout: 60000

2. 客户端优化建议

  1. 指数退避重连:连接失败时逐步增加重试间隔
  2. 数据压缩:启用SSE数据压缩
  3. 按需订阅:只订阅必要的数据类型
  4. 心跳检测:定期检查连接状态
  5. 数据聚合:客户端本地缓存和聚合数据
class SmartEventSource {constructor(url) {this.url = url;this.retryDelay = 1000;this.maxRetryDelay = 60000;this.connect();}connect() {this.es = new EventSource(this.url);this.es.onopen = () => {this.retryDelay = 1000; // 重置重试延迟};this.es.onerror = () => {this.es.close();this.retryDelay = Math.min(this.retryDelay * 2, this.maxRetryDelay);setTimeout(() => this.connect(), this.retryDelay);};}
}

通过Spring SSE构建的实时监控系统,结合上述优化策略和安全方案,可以实现高性能、高可用的监控数据推送服务,满足从基础系统监控到复杂业务指标展示的各类场景需求。

http://www.dtcms.com/a/324977.html

相关文章:

  • Python 的列表 list 和元组 tuple 有啥本质区别?啥时候用谁更合适?
  • TC39x STM(System Timer)学习记录
  • 压力测试等工具源码包编译及使用方法
  • Vulnhub doubletrouble 靶场复现 详细攻略
  • Knuth‘s TwoSum Algorithm 原理详解
  • MyBatis 核心入门:从概念到实战,一篇掌握简单增删改查
  • 【东枫科技】FR3 可扩展测试平台,适用于 6G 研究与卫星通信,高达 1.6 GHz 的带宽
  • 【自动化运维神器Ansible】playbook案例解析:Tags组件实现任务选择性执行
  • 【01】华勤技术股份有限公司——华勤C++笔试,题目记录及解析
  • Java基础-使用反射做一个简易框架
  • Python 实例属性和类属性
  • 【PyTorch】单目标检测项目
  • vulnhub-Drippingblues靶机
  • Typora结合PicGo + 使用Gitee搭建个人免费图床
  • 计算机网络---IP(互联网协议)
  • 2025年6月电子学会全国青少年软件编程等级考试(Python六级)真题及答案
  • 二叉树进阶 之 【二叉搜索树的简介与模拟实现的前提准备】
  • 【杂谈】-智能代理+可观察性:构建下一代复杂系统监控体系
  • UE5多人MOBA+GAS 41、制作一个飞弹,添加准心索敌
  • JS实现数组扁平化
  • 计算二分类误差时的常见错误及解决方案
  • ubuntu22.04+samba
  • VMware 使用 Ubuntu 一段时间后逐渐卡顿、甚至卡死的问题
  • sqli-labs-master/Less-51~Less-61
  • 解读 GPT-5:从“博士级 AI 专家”能力到 OpenAI API Key 获取与实践(提示工程→性能调优全流程)
  • MySQL自增ID与UUID的区别及其在索引分裂中的表现与优化
  • W3D引擎游戏开发----从入门到精通【23】
  • 2013年考研数学(二)真题
  • A#语言详解
  • 相比于传统的全波分析,特征模分析具有哪些优点