Spring Boot 整合 SSE, http长连接
1. 什么是 SSE? (30秒)
SSE (Server-Sent Events) 是一种允许服务器通过 HTTP 连接主动向客户端发送实时更新的技术。
特点:基于 HTTP,使用简单,单向通信(服务器 -> 客户端),自动重连。
对比 WebSocket:WebSocket 是双向的,更复杂;SSE 是单向的,更轻量,适用于通知、日志流、实时数据更新等场景。
2. 核心依赖与配置 (30秒)
Spring Boot 从 2.2.x 版本开始提供了对 SSE 的专用支持,主要包含在 spring-boot-starter-web
中,无需引入额外依赖。
确保你的 pom.xml
中有:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
3. 三步编写代码 (3分钟)
第一步:创建控制器 (Controller)
创建一个 @RestController
,并定义一个方法来产生 SSE 流。
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@RestController
public class SseController {// 用于保存所有连接的 SseEmitter,可以根据用户ID等关键字进行存储private static final Map<String, SseEmitter> EMITTER_MAP = new ConcurrentHashMap<>();/*** 用于客户端连接 SSE* @param clientId 客户端标识,用于区分不同客户端* @return SseEmitter*/@GetMapping(path = "/sse/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@RequestParam String clientId) {// 设置超时时间,0表示永不超时。可以根据需要设置,例如 30_000L (30秒)SseEmitter emitter = new SseEmitter(0L);// 注册回调函数,当连接完成或出错时,从Map中移除这个Emitteremitter.onCompletion(() -> EMITTER_MAP.remove(clientId));emitter.onError((e) -> EMITTER_MAP.remove(clientId));emitter.onTimeout(() -> EMITTER_MAP.remove(clientId));// 将新的 emitter 存入 MapEMITTER_MAP.put(clientId, emitter);// 可选:发送一个初始连接成功的事件try {emitter.send(SseEmitter.event().name("INIT") // 事件名称,可选.data("连接成功 for: " + clientId) // 事件数据.id("1") // 事件ID,可选,用于重连.reconnectTime(5000)); // 重连时间,可选} catch (IOException e) {e.printStackTrace();}return emitter;}
}
第二步:创建发送消息的方法
在同一个 Controller 中,添加一个 API 来模拟向特定客户端发送消息。
/*** 向指定客户端发送消息*/@GetMapping("/sse/send")public String sendMessage(@RequestParam String clientId, @RequestParam String message) {SseEmitter emitter = EMITTER_MAP.get(clientId);if (emitter != null) {try {// 构建并发送事件emitter.send(SseEmitter.event().name("MESSAGE") // 事件类型.data(message) // 事件数据.id("msg-id-" + System.currentTimeMillis())); // ID} catch (IOException e) {// 发送失败,移除 emitterEMITTER_MAP.remove(clientId);return "发送失败,客户端可能已断开";}return "发送成功 to: " + clientId;}return "客户端不存在";}
第三步:编写前端页面进行测试 (1分钟)
在 src/main/resources/static
目录下创建一个 sse-demo.html
文件。
<!DOCTYPE html>
<html>
<head><title>SSE Demo</title>
</head>
<body><h1>SSE 客户端测试</h1><label for="clientId">客户端ID: </label><input type="text" id="clientId" value="test-client-1"><button onclick="connectSSE()">连接SSE</button><button onclick="closeSSE()">断开连接</button><hr><label for="message">要发送的消息: </label><input type="text" id="message" value="Hello SSE!"><button onclick="sendMessage()">发送消息</button><hr><h3>收到的事件:</h3><div id="messages"></div><script>let eventSource;function connectSSE() {const clientId = document.getElementById('clientId').value;// 断开现有连接if (eventSource) {eventSource.close();}// 建立新的 SSE 连接eventSource = new EventSource(`/sse/connect?clientId=${clientId}`);// 监听通用消息(没有指定 event name 的消息)eventSource.onmessage = function (event) {appendMessage(`[message]: ${event.data}`);};// 监听特定名称的事件 (例如:MESSAGE)eventSource.addEventListener("MESSAGE", function (event) {appendMessage(`[MESSAGE]: ${event.data}`);});// 监听特定名称的事件 (例如:INIT)eventSource.addEventListener("INIT", function (event) {appendMessage(`[INIT]: ${event.data}`);});eventSource.onerror = function (err) {console.error("SSE error:", err);appendMessage('[错误] 连接出错');};}function closeSSE() {if (eventSource) {eventSource.close();appendMessage('[信息] 连接已关闭');eventSource = null;}}function sendMessage() {const clientId = document.getElementById('clientId').value;const message = document.getElementById('message').value;fetch(`/sse/send?clientId=${clientId}&message=${encodeURIComponent(message)}`).then(response => response.text()).then(data => console.log(data));}function appendMessage(text) {const messageDiv = document.getElementById('messages');const p = document.createElement('p');p.textContent = `${new Date().toLocaleTimeString()}: ${text}`;messageDiv.appendChild(p);}</script>
</body>
</html>
4. 运行与测试 (1分钟)
启动应用:运行你的 Spring Boot 应用。
打开页面:访问
http://localhost:8080/sse-demo.html
。进行测试:
输入一个客户端 ID(如
user1
),点击 “连接SSE”。前端会收到[INIT]
事件。在另一个浏览器标签页或使用 Postman 访问 :
http://localhost:8080/sse/send?clientId=user1&message=你好!
。观察第一个标签页,会立即收到
[MESSAGE]: 你好!
的消息。
总结
核心对象:
SseEmitter
关键注解:
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
流程:
客户端连接
/sse/connect
,服务端创建并保存SseEmitter
。服务端通过
emitter.send()
主动推送消息。客户端通过
EventSource
API 监听和处理消息。连接结束时,服务端需要清理
SseEmitter
(通过回调函数)。
现在你已经掌握了 Spring Boot 整合 SSE 的基本方法!在实际项目中,你可能需要将其与业务逻辑、身份认证(如 JWT)以及更强大的连接管理(如使用数据库或 Redis 存储 emitter)相结合。