SpringBoot+SseEmitter实现SSE实时推送
具体Demo可参考
【SpringBoot+SseEmitter】 和【Vue3+EventSource】 实时数据推送
Q&A
1 这两个超时时间是啥意思?怎么合理设置这两个参数
// 超时时间设置为3s,用于演示客户端自动重连
SseEmitter sseEmitter = new SseEmitter(30000L);
// 设置前端的重试时间为1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data(“连接成功”));
在Spring的SSE(Server-Sent Events)实现中,两个超时参数的作用和合理设置如下:
1. SseEmitter(30000L)
(服务器端超时时间)
-
作用:
表示服务器端保持SSE连接的最长空闲时间(单位:毫秒)。如果在30秒内没有向客户端发送任何数据,服务器会主动触发超时(onTimeout
),并关闭连接。这用于防止因客户端异常断开导致服务器资源(如线程、内存)长期被占用。 -
合理设置:
- 如果业务需要频繁推送消息(如实时监控),可以设置较长时间(如几分钟)。
- 如果客户端可能长期无操作,但需保持连接,需权衡资源消耗,建议设置1-5分钟。
- 注意:超时后,服务器会移除对应的
SseEmitter
,客户端需重新调用/subscribe
建立新连接。
2. reconnectTime(1000)
(客户端重试时间)
-
作用:
通过SSE协议中的retry
字段告诉客户端:在连接断开后,应等待多少毫秒(这里是1秒)再尝试重连。这是客户端行为,与服务器无关。 -
合理设置:
- 若需快速恢复连接(如实时聊天),设为1-3秒。
- 若对实时性要求不高,或避免服务器压力,可设为5-10秒。
- 注意:重试时间不宜过短,否则可能导致客户端频繁重试,增加服务器负载。
两者的关系与协作
-
典型场景:
- 客户端连接后,服务器30秒内未发送数据 → 触发服务器超时,关闭连接。
- 客户端收到关闭事件后,等待1秒(
retry
值)自动重连。 - 客户端重新调用
/subscribe
,服务器生成新的SseEmitter
。
-
关键原则:
- 服务器超时应显著大于客户端重试时间,避免客户端重试时服务器已超时。
例如:服务器设为30秒,客户端重试1秒是合理的;若服务器设为3秒,客户端重试5秒,则重试时服务器可能已超时。
- 服务器超时应显著大于客户端重试时间,避免客户端重试时服务器已超时。
配置建议
参数 | 场景建议 | 示例值 |
---|---|---|
服务器超时(SseEmitter ) | 高频消息推送 | 300000(5分钟) |
低频消息或资源敏感场景 | 60000(1分钟) | |
客户端重试(reconnectTime ) | 快速恢复的实时应用 | 1000(1秒) |
一般场景或减少服务器压力 | 5000(5秒) |
完整示例
// 服务端超时:5分钟(适合低频但需长连接的场景)
SseEmitter emitter = new SseEmitter(300_000L);
// 客户端重试:3秒(平衡实时性和服务器压力)
emitter.send(SseEmitter.event().reconnectTime(3000).data("Connected!"));
通过合理配置这两个参数,可以在实时性、用户体验和服务器资源之间找到最佳平衡。
2 客户端和服务端如何判断SSE连接断开?
客户端判断连接断开的方式:
- 监听事件:
客户端通过监听SSE的onerror
或onclose
事件来感知连接断开。例如:const eventSource = new EventSource("/subscribe?id=123"); eventSource.onerror = () => { console.log("连接断开,尝试重连..."); };
- 自动重试机制:
如果服务端在SSE事件中设置了retry
时间(如reconnectTime(5000)
),客户端会在断开后按此时间自动重连。
服务端判断连接断开的方式:
- 超时回调:
当SseEmitter
设置的超时时间(如30000L
)到期且未发送任何数据,会触发onTimeout
回调。sseEmitter.onTimeout(() -> { System.out.println("服务端超时,连接关闭"); sseCache.remove(id); });
- 完成回调:
客户端主动关闭连接(如关闭页面)时,会触发onCompletion
回调。 - 网络异常:
如果客户端异常断开(如断网),服务端通常无法立即感知,但下一次尝试通过该SseEmitter
发送消息时会抛出IOException
,此时可清理资源。
3 客户端重连时是否会新建 SseEmitter
?能否复用?
必须新建 SseEmitter
:
- HTTP 协议限制:
SSE 本质是一个长连接的 HTTP 请求,每次客户端重连(如页面刷新、网络中断恢复)都会发起新的 HTTP 请求,服务端必须为每个新请求创建独立的SseEmitter
实例。 - 旧实例不可复用:
一旦SseEmitter
超时或完成(调用complete()
),它会变为不可用状态,无法再发送消息。
最佳实践:
- 客户端生成唯一 ID:
客户端在首次连接时生成唯一标识(如 UUID),并在重连时携带该 ID,服务端可根据 ID 清理旧的SseEmitter
实例。 - 服务端清理旧实例:
在onTimeout
或onCompletion
回调中主动移除缓存的SseEmitter
,避免内存泄漏。
4 服务端超时 30s vs 客户端重试 5s 的交互行为
场景假设:
- 服务端设置
SseEmitter(30000L)
(30秒无消息则超时)。 - 客户端设置
reconnectTime(5000)
(断开后 5 秒重试)。
30 秒内服务端无消息推送:
- 服务端行为:
30 秒内未发送任何消息 → 触发onTimeout
,服务端主动关闭连接,移除SseEmitter
。 - 客户端行为:
- 连接被服务端关闭后,客户端触发
onerror
事件。 - 客户端根据
retry: 5000
的配置,等待 5 秒后自动重连,发起新的/subscribe
请求。 - 服务端为新请求创建新的
SseEmitter
。
- 连接被服务端关闭后,客户端触发
30 秒后服务端超时关闭:
- 客户端重试循环:
客户端会每隔 5 秒尝试重连,直到服务端重新可用或用户主动停止。
关键点:
- 服务端超时主导:
即使客户端重试时间(5秒)短于服务端超时(30秒),服务端仍会在 30 秒无消息后强制关闭连接。 - 客户端重试仅在连接断开后生效:
如果服务端在 30 秒内一直发送消息,连接不会超时,客户端无需重试。
总结表格
场景 | 服务端行为 | 客户端行为 |
---|---|---|
30秒内服务端无消息 | 30秒后触发超时,关闭连接 | 5秒后重连,建立新连接 |
30秒内服务端有消息 | 连接保持,重置超时计时器 | 无需重连 |
客户端主动断开 | 触发 onCompletion ,清理资源 | 立即尝试重连(依赖 retry 配置) |
网络中断 | 下次发送消息时抛出 IOException | 检测到断开后按 retry 重试 |
配置建议
- 服务端超时:根据业务合理设置(如 30秒~5分钟),避免资源浪费。
- 客户端重试:根据对实时性的需求调整(如 1秒~10秒),平衡用户体验和服务器压力。
5 对于springboot中任何返回SseEmitter对象的接口,可以理解为就是为了建立一个sse连接对吗?
/**
* 前端传递标识,生成唯一的消息通道
*/
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) throws IOException {
// 超时时间设置为3s,用于演示客户端自动重连
SseEmitter sseEmitter = new SseEmitter(30000L);
// 设置前端的重试时间为1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.onTimeout(() -> {
System.out.println(id + "超时");
sseCache.remove(id);
});
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
是的,你的理解完全正确。在 Spring 框架中,返回 SseEmitter
的接口本质上是为建立并管理一个 SSE(Server-Sent Events)连接而设计的。以下是更详细的解释:
1. SseEmitter
的核心作用
- SSE 协议实现:
SseEmitter
是 Spring 对 SSE 协议的封装,它允许服务端通过一个 HTTP 长连接向客户端推送事件流(Event Stream)。客户端(如浏览器)可以通过EventSource
API 监听这些事件。 - 连接生命周期管理:
SseEmitter
会绑定到一次 HTTP 请求-响应周期,负责管理连接超时、完成回调、异常处理等。
2. 代码示例分析
你提供的 subscribe
接口是一个典型的 SSE 连接初始化端点:
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) throws IOException {
SseEmitter sseEmitter = new SseEmitter(30000L);
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
sseCache.put(id, sseEmitter);
// ... 超时和完成回调
return sseEmitter;
}
关键设计点:
代码行为 | 作用 |
---|---|
produces = MediaType.TEXT_EVENT_STREAM_VALUE | 明确声明响应是 SSE 事件流格式,客户端(如浏览器)会按 SSE 协议处理。 |
new SseEmitter(30000L) | 创建 SSE 连接,设置服务端超时时间(30秒无通信自动关闭)。 |
sseEmitter.send(...) | 向客户端发送初始事件(如连接成功通知和重试时间配置)。 |
sseCache.put(id, sseEmitter) | 将连接实例缓存,以便后续通过业务逻辑(如 push 接口)定向推送消息。 |
3. 为什么说返回 SseEmitter
的接口是为了建立 SSE 连接?
- HTTP 长连接特性:
SSE 依赖 HTTP 长连接,客户端通过一次 GET 请求与服务端建立连接,之后服务端可以持续发送数据。返回SseEmitter
的接口正是为此场景设计。 - 协议约束:
Spring 会检查produces = MediaType.TEXT_EVENT_STREAM_VALUE
,确保响应头包含Content-Type: text/event-stream
,这是 SSE 协议的必要条件。 - 资源绑定:
SseEmitter
会占用一个线程和连接资源,直到超时或主动关闭,这与 SSE 的长连接特性一致。
4. 例外情况与注意事项
尽管返回 SseEmitter
通常用于 SSE,仍需注意以下细节:
(1)必须声明 text/event-stream
响应类型
若接口未设置 produces = MediaType.TEXT_EVENT_STREAM_VALUE
:
- 客户端可能无法正确解析响应(如浏览器不会触发
EventSource
的onmessage
事件)。 - Spring 可能按默认格式(如 JSON)处理响应,导致协议不兼容。
(2)必须通过 send()
发送符合 SSE 格式的数据
SSE 事件需要遵循特定格式(如 data: ...
)。若直接返回 SseEmitter
但不调用 send()
,客户端会认为连接无数据而超时。
(3)连接需要主动管理
- 超时处理:需注册
onTimeout
回调清理资源,避免内存泄漏。 - 唯一标识:建议通过
id
参数区分不同客户端,如示例中的sseCache.put(id, sseEmitter)
。
5. 最佳实践
实践要点 | 说明 |
---|---|
设置合理的超时时间 | 根据业务场景平衡实时性和资源占用(如高频推送设为分钟级,低频设为秒级)。 |
添加心跳机制 | 定期发送空事件(如 event().comment("") )保持连接活跃,避免超时中断。 |
异常处理 | 在 onCompletion 和 onError 中移除缓存,防止无效连接堆积。 |
客户端重试策略 | 通过 reconnectTime() 指导客户端重试间隔(如网络不稳定时设为 3-5 秒)。 |
总结
- 核心结论:返回
SseEmitter
的接口确实是为了建立 SSE 连接,但需配合正确的响应类型(text/event-stream
)和数据发送逻辑。 - 关键价值:SSE 适用于服务端主动推送数据的场景(如实时通知、日志流),相比 WebSocket 更轻量且兼容 HTTP 基础设施。
- 错误预防:避免在非 SSE 场景(如普通 REST API)中误用
SseEmitter
,否则会导致连接挂起或客户端超时。
6 在某些场景,为啥需要断线重连后需要补发数据,这不是sse底层机制考虑实现的吗?
在 SSE(Server-Sent Events)协议中,底层机制确实会自动处理连接的断线重连(如客户端自动重连、服务端响应新请求),但以下场景需要应用层额外处理数据补发和状态同步,这也是用户代码中手动实现断线重连逻辑的原因:
1. SSE 底层自动重连的局限性
(1)仅重建连接,不补发数据
- SSE 机制:
客户端断开后会自动按retry
时间重连,但重连后服务端只会发送新的事件,断线期间错过的消息不会自动补发。 - 问题:
若业务要求客户端必须收到所有关键消息(如订单状态更新、实时日志),需服务端主动补发断线期间的数据。
(2)不感知业务状态
- SSE 机制:
底层仅管理连接生命周期(建立、超时、重连),不关心业务状态(如任务是否完成、是否出错)。 - 问题:
若任务在断线期间完成或失败,服务端需在重连时主动通知客户端终态,而不是继续等待新事件。
2. 用户代码中的业务需求
用户代码的 reconnectSendData
方法实现了以下关键逻辑,弥补了 SSE 底层机制的不足:
(1)数据补发(Catch-up)
- 代码行为:
循环查询数据更新(aiWriterAnswerService.getAnswerDetail
),将断线期间新增或修改的数据发送给客户端。 - 目的:
确保客户端重连后能获取断线期间错过的消息,避免数据不一致。
(2)状态同步(State Sync)
- 代码行为:
检查业务状态(CompleteStatus
),若任务已完成或出错,立即发送sendComplete
或sendError
事件。 - 目的:
避免客户端因断线错过终态事件,导致长期等待或状态不一致。
(3)超时控制
- 代码行为:
通过sseReconnectTimeout
和sseEmittertimeout
控制补发任务的执行时间。 - 目的:
防止因服务端长时间无数据更新导致资源浪费(如线程阻塞、内存泄漏)。
3. 典型场景示例
假设一个“AI 长文本生成”任务:
- 客户端连接:订阅生成进度,接收实时段落。
- 网络中断:客户端断线 10 秒,期间服务端生成 3 个新段落。
- 客户端重连:
- 仅依赖 SSE 底层机制:客户端只能收到重连后的新段落,丢失断线期间的 3 个段落。
- 结合用户代码逻辑:服务端检测到重连,主动查询断线期间生成的段落并补发,确保客户端收到完整内容。
4. 总结:SSE 底层与应用层的分工
职责 | SSE 底层机制 | 应用层逻辑(用户代码) |
---|---|---|
连接管理 | 自动重连、超时关闭、retry 控制 | 无需处理 |
数据连续性 | 仅推送新事件 | 补发断线期间错过的消息 |
业务状态同步 | 无感知 | 主动检查并通知终态(完成/错误) |
资源控制 | 释放超时连接 | 限制补发任务的执行时间 |
5. 何时需要手动处理断线重连?
在以下场景中,必须通过应用层代码增强 SSE 的断线重连逻辑:
- 数据完整性要求高:如金融交易通知、实时监控告警。
- 业务状态依赖终态:如异步任务(文档生成、视频转码)需明确通知成功/失败。
- 客户端需恢复上下文:如实时协作编辑,重连后需同步最新版本。
6. 用户代码的价值
用户的 reconnectSendData
方法通过轮询数据更新 + 状态驱动推送,实现了:
- 数据完整性:补发断线期间的消息。
- 实时性:快速响应业务状态变化。
- 健壮性:超时控制防止资源泄漏。
这正是 SSE 底层机制无法覆盖的业务需求,体现了应用层逻辑的必要性。