基于Spring Boot和SSE的实时消息推送系统
一、SSE技术深度解析
1.1 协议工作原理
1.2 与WebSocket对比
特性 | SSE | WebSocket |
---|---|---|
协议 | HTTP | WS/WSS |
方向 | 单向(服务端→客户端) | 双向 |
重连 | 自动 | 需手动实现 |
二进制 | 仅文本 | 支持二进制 |
复杂度 | 低 | 中高 |
二、Spring Boot服务端实现
2.1 增强型SSE控制器
@RestController
@RequestMapping("/api/sse")
public class EnhancedSseController {private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();private final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(4);@GetMapping("/subscribe/{clientId}")public SseEmitter subscribe(@PathVariable String clientId) {SseEmitter emitter = new SseEmitter(30_000L);// 心跳机制ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> sendHeartbeat(emitter),10, 10, TimeUnit.SECONDS);emitter.onCompletion(() -> {heartbeat.cancel(true);emitters.remove(clientId);});emitter.onTimeout(() -> {heartbeat.cancel(true);emitters.remove(clientId);});emitters.put(clientId, emitter);return emitter;}@PostMapping("/broadcast")public void broadcast(@RequestBody MessageDto message) {emitters.forEach((id, emitter) -> {try {emitter.send(SseEmitter.event().id(UUID.randomUUID().toString()).name("message").data(message).reconnectTime(5000L));} catch (IOException e) {emitter.complete();emitters.remove(id);}});}
}
2.2 消息实体设计
public class MessageDto {private MessageType type;private String from;private String content;private Instant timestamp;public enum MessageType {NOTIFICATION, ALERT, SYSTEM}
}
三、前端高级实现
3.1 带重连机制的EventSource
class ResilientEventSource {constructor(url, options = {}) {this.url = url;this.retryDelay = options.retryDelay || 3000;this.maxRetries = options.maxRetries || 5;this.eventHandlers = {};this.connect();}connect() {this.es = new EventSource(this.url);this.retryCount = 0;this.es.onopen = () => {this.retryCount = 0;this.onOpen?.();};this.es.onerror = () => {this.es.close();if (this.retryCount++ < this.maxRetries) {setTimeout(() => this.connect(), this.retryDelay);} else {this.onError?.();}};Object.entries(this.eventHandlers).forEach(([type, handler]) => {this.es.addEventListener(type, handler);});}addEventListener(type, handler) {this.eventHandlers[type] = handler;if (this.es) this.es.addEventListener(type, handler);}
}
3.2 Vue3集成示例
<template><div class="sse-container"><div v-for="msg in messages" :key="msg.id":class="['message', msg.type]"><span class="timestamp">{{ formatTime(msg.timestamp) }}</span><span class="content">{{ msg.content }}</span></div></div>
</template><script setup>
import { ref, onMounted, onUnmounted } from 'vue';const messages = ref([]);
let eventSource;onMounted(() => {eventSource = new ResilientEventSource('/api/sse/subscribe/user123', {retryDelay: 5000,maxRetries: Infinity});eventSource.addEventListener('message', (e) => {messages.value.push(JSON.parse(e.data));});
});onUnmounted(() => {eventSource?.close();
});
</script>
四、生产环境最佳实践
4.1 性能优化方案
- 连接池管理:限制最大连接数
- 消息批处理:合并短时间内的多个事件
- 压缩传输:启用gzip压缩
- 负载均衡:Nginx配置SSE支持
4.2 Nginx配置示例
server {location /api/sse {proxy_pass <http://backend>;proxy_http_version 1.1;proxy_set_header Connection '';proxy_buffering off;proxy_cache off;proxy_read_timeout 24h;}
}
五、安全增强措施
5.1 认证授权
@GetMapping("/secure-subscribe")
public SseEmitter secureSubscribe(@RequestHeader("Authorization") String token) {if (!jwtUtil.validateToken(token)) {throw new SecurityException("Invalid token");}String userId = jwtUtil.extractUserId(token);return sseService.subscribe(userId);
}
5.2 消息加密
public String encryptMessage(String raw) {return Base64.getEncoder().encodeToString(aesCipher.doFinal(raw.getBytes(StandardCharsets.UTF_8)));
}
六、监控与运维
6.1 关键监控指标
指标 | 采集方式 | 告警阈值 |
---|---|---|
活跃连接数 | Micrometer | >1000 |
消息延迟 | Prometheus | >1s |
错误率 | ELK | >5% |
6.2 健康检查端点
@GetMapping("/health")
public Map<String, Object> health() {return Map.of("status", "UP","connections", emitters.size(),"lastMessage", lastMessageTime);
}
七、扩展应用场景
7.1 实时日志监控
@Bean
public ApplicationListener<LoggingEvent> logListener() {return event -> {if (event.getLevel().isGreaterOrEqual(Level.WARN)) {sseService.broadcast(new MessageDto("SYSTEM",event.getLoggerName(),event.getFormattedMessage()));}};
}
7.2 股票行情推送
@Scheduled(fixedRate = 1_000)
public void pushStockQuotes() {stockService.getLatestQuotes().forEach(quote -> {sseService.sendToUser(quote.getUserId(),new MessageDto("STOCK",quote.getSymbol(),quote.getPrice().toString()));});
}
总结
本文实现的SSE实时消息推送系统具有以下优势:
- 高效实时:毫秒级消息延迟
- 资源友好:单连接持续复用
- 弹性可靠:自动重连机制
- 易于扩展:支持水平扩展
在实际应用中建议:
- 根据业务需求选择合适的消息格式(JSON/Protobuf)
- 实施完善的监控告警
- 定期进行压力测试
- 考虑消息持久化方案
通过合理设计和优化,该方案可支持从中小规模到百万级连接的消息推送场景。