当前位置: 首页 > news >正文

Spring Boot SSE实战:SseEmitter实现多客户端事件广播与心跳保活

1. 添加依赖 (pom.xml)

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

2. 事件服务 (EventService.java) 

import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;@Service
public class EventService {// 线程安全的Emitter存储private final ConcurrentMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();// 心跳调度器private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();// 事件计数器private final AtomicInteger eventCounter = new AtomicInteger(0);public EventService() {// 启动心跳任务 (每25秒发送一次)heartbeatExecutor.scheduleAtFixedRate(this::broadcastHeartbeat, 0, 25, TimeUnit.SECONDS);}// 客户端订阅public SseEmitter subscribe() {String clientId = UUID.randomUUID().toString();SseEmitter emitter = new SseEmitter(60_000L); // 1分钟超时// 注册事件处理器emitter.onCompletion(() -> removeEmitter(clientId));emitter.onTimeout(() -> {removeEmitter(clientId);emitter.complete();});emitter.onError(ex -> removeEmitter(clientId));emitters.put(clientId, emitter);return emitter;}// 广播事件public void broadcast(String eventName, Object data) {emitters.forEach((clientId, emitter) -> {try {emitter.send(SseEmitter.event().id(String.valueOf(eventCounter.incrementAndGet())).name(eventName).data(data));} catch (IOException | IllegalStateException e) {removeEmitter(clientId); // 发送失败则移除}});}// 广播心跳private void broadcastHeartbeat() {emitters.forEach((clientId, emitter) -> {try {emitter.send(SseEmitter.event().comment("heartbeat") // 发送注释类型的心跳);} catch (Exception ignored) {// 心跳失败不移除,等待超时机制处理}});}// 移除客户端private void removeEmitter(String clientId) {SseEmitter emitter = emitters.remove(clientId);if (emitter != null) {emitter.complete();}}// 关闭服务 (资源清理)public void shutdown() {// 1. 停止心跳线程heartbeatExecutor.shutdownNow();// 2. 关闭所有连接emitters.forEach((id, emitter) -> {try {emitter.send(SseEmitter.event().name("system").data(Map.of("action", "shutdown")));} catch (Exception ignored) {} finally {emitter.complete();}});// 3. 清空集合emitters.clear();}
}

3. 控制器 (EventController.java)

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@RestController
@RequestMapping("/events")
public class EventController {private final EventService eventService;public EventController(EventService eventService) {this.eventService = eventService;}// 客户端订阅入口@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter subscribe() {return eventService.subscribe();}// 广播消息入口(这里是模拟消息推送过来,会把该条消息都放入到已订阅的客户端)@PostMapping("/broadcast")public void broadcast(@RequestParam String message) {eventService.broadcast("message", Map.of("content", message,"timestamp", System.currentTimeMillis()));}
}

4. 应用配置 (Application.java)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;@SpringBootApplication
public class SseApplication {public static void main(String[] args) {ApplicationContext context = SpringApplication.run(SseApplication.class, args);// 注册优雅关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {EventService eventService = context.getBean(EventService.class);eventService.shutdown();System.out.println("SSE资源已清理完成");}));}
}

5. 客户端示例 (JavaScript)

<!DOCTYPE html>
<html>
<body><h1>SSE客户端</h1><div id="messages"></div><script>const messageContainer = document.getElementById('messages');let eventSource;function connect() {eventSource = new EventSource('http://localhost:8080/events');eventSource.addEventListener('message', (e) => {const data = JSON.parse(e.data);addMessage(`消息: ${data.content} [${new Date(data.timestamp).toLocaleTimeString()}]`);});eventSource.addEventListener('system', (e) => {const data = JSON.parse(e.data);if (data.action === 'shutdown') {addMessage('系统通知: 服务即将关闭');eventSource.close();}});eventSource.onerror = (e) => {addMessage('连接错误,3秒后重连...');setTimeout(connect, 3000);};}function addMessage(text) {const p = document.createElement('p');p.textContent = text;messageContainer.appendChild(p);messageContainer.scrollTop = messageContainer.scrollHeight;}// 初始连接connect();</script>
</body>
</html>

关键机制说明

  1. 心跳机制

    • 每25秒发送一次空注释事件 :heartbeat

    • 防止代理或负载均衡器关闭空闲连接

    • 客户端可通过监听所有事件检测心跳

  2. 关闭流程

  3. 客户端重连

    • 使用事件ID支持断线续传

    • 客户端错误时自动重连

    • 服务端关闭时发送系统通知

http://www.dtcms.com/a/298899.html

相关文章:

  • Spring Boot 实战:用 Apache Commons CSV 优雅解析 CSV 文件
  • x86汇编语言入门基础(三)汇编指令篇5 串操作
  • OpenCV学习探秘之一 :了解opencv技术及架构解析、数据结构与内存管理​等基础
  • 技术赋能与营销创新:开源链动2+1模式AI智能名片S2B2C商城小程序的流量转化路径研究
  • 嵌入式硬件篇---zigbee无线串口通信问题解决方法
  • Claude 4.0 终极编程指南:模型对比、API配置与IDE集成实战
  • CMakeLists.txt 怎么写
  • 39.Python 中 list.sort() 与 sorted() 的本质区别与最佳实践
  • 数据库索引详解:原理、设计原则与应用场景
  • NLua和C#交互
  • 6G通感算
  • Spring Boot DFS、HDFS、AI、PyOD、ECOD、Junit、嵌入式实战指南
  • 学习游戏制作记录(剑投掷技能)7.26
  • Kotlin 数据容器 - List 扩展(转换操作、过滤操作、排序操作、分组操作、集合操作、归纳操作、窗口操作)
  • 一款基于react-native harmonyOS 封装的【文档】文件预览查看开源库(基于Harmony 原生文件预览服务进行封装)
  • 【深度之眼机器学习笔记】04-01-决策树简介、熵,04-02-条件熵及计算举例,04-03-信息增益、ID3算法
  • OpenCV图像梯度、边缘检测、轮廓绘制、凸包检测大合集
  • 今天凌晨,字节开源 Coze,如何白嫖?
  • 【Vue2】结合chrome与element-ui的网页端条码打印
  • 使用Spring Boot创建Web项目
  • QT开发---网络编程上
  • 【CTF-WEB-反序列化】利用__toString魔术方法读取flag.php
  • 传统框架与减震楼盖框架地震动力响应分析与有限元模拟
  • USB Type-c
  • 《P3313 [SDOI2014] 旅行》
  • 关于我司即将对商业间谍行为进行法律诉讼的通知
  • C++学习笔记(十:类与对象基础)
  • 洛谷刷题7.25
  • TwinCAT3编程入门1
  • 【Mybatis】分页插件及其原理