当前位置: 首页 > news >正文

基于Spring Boot和SSE的实时消息推送系统

一、SSE技术深度解析

1.1 协议工作原理

ClientServerGET /stream (Accept: text/event-stream)HTTP/1.1 200 OKContent-Type: text/event-streamdata: 消息1\\n\\ndata: 消息2\\n\\nClientServer

1.2 与WebSocket对比

特性SSEWebSocket
协议HTTPWS/WSS
方向单向(服务端→客户端)双向
重连自动需手动实现
二进制仅文本支持二进制
复杂度中高

二、Spring Boot服务端实现

2.1 增强型SSE控制器

@RestController
@RequestMapping("/api/sse")
public class EnhancedSseController {private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();private final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(4);@GetMapping("/subscribe/{clientId}")public SseEmitter subscribe(@PathVariable String clientId) {SseEmitter emitter = new SseEmitter(30_000L);// 心跳机制ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> sendHeartbeat(emitter),10, 10, TimeUnit.SECONDS);emitter.onCompletion(() -> {heartbeat.cancel(true);emitters.remove(clientId);});emitter.onTimeout(() -> {heartbeat.cancel(true);emitters.remove(clientId);});emitters.put(clientId, emitter);return emitter;}@PostMapping("/broadcast")public void broadcast(@RequestBody MessageDto message) {emitters.forEach((id, emitter) -> {try {emitter.send(SseEmitter.event().id(UUID.randomUUID().toString()).name("message").data(message).reconnectTime(5000L));} catch (IOException e) {emitter.complete();emitters.remove(id);}});}
}

2.2 消息实体设计

public class MessageDto {private MessageType type;private String from;private String content;private Instant timestamp;public enum MessageType {NOTIFICATION, ALERT, SYSTEM}
}

三、前端高级实现

3.1 带重连机制的EventSource

class ResilientEventSource {constructor(url, options = {}) {this.url = url;this.retryDelay = options.retryDelay || 3000;this.maxRetries = options.maxRetries || 5;this.eventHandlers = {};this.connect();}connect() {this.es = new EventSource(this.url);this.retryCount = 0;this.es.onopen = () => {this.retryCount = 0;this.onOpen?.();};this.es.onerror = () => {this.es.close();if (this.retryCount++ < this.maxRetries) {setTimeout(() => this.connect(), this.retryDelay);} else {this.onError?.();}};Object.entries(this.eventHandlers).forEach(([type, handler]) => {this.es.addEventListener(type, handler);});}addEventListener(type, handler) {this.eventHandlers[type] = handler;if (this.es) this.es.addEventListener(type, handler);}
}

3.2 Vue3集成示例

<template><div class="sse-container"><div v-for="msg in messages" :key="msg.id":class="['message', msg.type]"><span class="timestamp">{{ formatTime(msg.timestamp) }}</span><span class="content">{{ msg.content }}</span></div></div>
</template><script setup>
import { ref, onMounted, onUnmounted } from 'vue';const messages = ref([]);
let eventSource;onMounted(() => {eventSource = new ResilientEventSource('/api/sse/subscribe/user123', {retryDelay: 5000,maxRetries: Infinity});eventSource.addEventListener('message', (e) => {messages.value.push(JSON.parse(e.data));});
});onUnmounted(() => {eventSource?.close();
});
</script>

四、生产环境最佳实践

4.1 性能优化方案

  1. 连接池管理:限制最大连接数
  2. 消息批处理:合并短时间内的多个事件
  3. 压缩传输:启用gzip压缩
  4. 负载均衡:Nginx配置SSE支持

4.2 Nginx配置示例

server {location /api/sse {proxy_pass <http://backend>;proxy_http_version 1.1;proxy_set_header Connection '';proxy_buffering off;proxy_cache off;proxy_read_timeout 24h;}
}

五、安全增强措施

5.1 认证授权

@GetMapping("/secure-subscribe")
public SseEmitter secureSubscribe(@RequestHeader("Authorization") String token) {if (!jwtUtil.validateToken(token)) {throw new SecurityException("Invalid token");}String userId = jwtUtil.extractUserId(token);return sseService.subscribe(userId);
}

5.2 消息加密

public String encryptMessage(String raw) {return Base64.getEncoder().encodeToString(aesCipher.doFinal(raw.getBytes(StandardCharsets.UTF_8)));
}

六、监控与运维

6.1 关键监控指标

指标采集方式告警阈值
活跃连接数Micrometer>1000
消息延迟Prometheus>1s
错误率ELK>5%

6.2 健康检查端点

@GetMapping("/health")
public Map<String, Object> health() {return Map.of("status", "UP","connections", emitters.size(),"lastMessage", lastMessageTime);
}

七、扩展应用场景

7.1 实时日志监控

@Bean
public ApplicationListener<LoggingEvent> logListener() {return event -> {if (event.getLevel().isGreaterOrEqual(Level.WARN)) {sseService.broadcast(new MessageDto("SYSTEM",event.getLoggerName(),event.getFormattedMessage()));}};
}

7.2 股票行情推送

@Scheduled(fixedRate = 1_000)
public void pushStockQuotes() {stockService.getLatestQuotes().forEach(quote -> {sseService.sendToUser(quote.getUserId(),new MessageDto("STOCK",quote.getSymbol(),quote.getPrice().toString()));});
}

总结

本文实现的SSE实时消息推送系统具有以下优势:

  1. 高效实时:毫秒级消息延迟
  2. 资源友好:单连接持续复用
  3. 弹性可靠:自动重连机制
  4. 易于扩展:支持水平扩展

在实际应用中建议:

  • 根据业务需求选择合适的消息格式(JSON/Protobuf)
  • 实施完善的监控告警
  • 定期进行压力测试
  • 考虑消息持久化方案

通过合理设计和优化,该方案可支持从中小规模到百万级连接的消息推送场景。

http://www.dtcms.com/a/324740.html

相关文章:

  • 三数之和 Java
  • 人工智能系列(7)人工神经网络中的无监督学习
  • C语言-数组和指针练习题合集(一)
  • C语言深度剖析
  • 网页五子棋测试
  • VUE+SPRINGBOOT从0-1打造前后端-前后台系统-关于我们
  • 2025最新免费的大模型和免费的大模型API有哪些?(202508更新)
  • 秋招春招实习百度笔试百度管培生笔试题库百度非技术岗笔试|笔试解析和攻略|题库分享
  • 冒泡排序实现以及优化
  • WebSocket集群方案解析与实现
  • My APK 安卓版:高效管理手机应用的工具软件
  • windows的cmd命令【持续更新】
  • Linux应用软件编程---文件操作1(fopen、fclose、fgetc/fputc、fgets/fputs)
  • 什么是浏览器标识?
  • 【Docker进阶实战】从多容器编排到集群部署
  • TSF应用开发与运维部署
  • 个人笔记Mybatis2
  • 医学统计(现况调查的统计分析策略1)
  • 电脑使用“碎片整理”程序的作用
  • 基于ECharts的智慧社区数据可视化
  • 【npm、yarn、pnpm】特点对比,按需选择
  • Java设计模式之开闭原则介绍与说明
  • 【RocketMQ 生产者和消费者】- ConsumeMessageOrderlyService 顺序消费消息
  • Vue.js设计于实现 - 概览(二)
  • 跑酷小游戏2.0
  • C语言(长期更新)第10讲:操作符详解(二)
  • 麻溜启动Oracle实例demo
  • 【渲染流水线】[几何阶段]-[归一化NDC]以UnityURP为例
  • 基于Spring Boot和WebSocket的实时聊天系统
  • Openlayers基础教程|从前端框架到GIS开发系列课程(21)geojson实现线要素和区要素