SpringBoot手动实现流式输出方案整理以及SSE规范输出详解
背景:
最近做流式输出时,一直使用python实现的,应需求方的要求,需要通过java应用做一次封装并在java侧完成系统鉴权、模型鉴权等功能后才能真正去调用智能体应用,基于此调研java实现流式输出的几种方式,并完成与python服务对接的方案。
方案:
- 使用Servlet原生API实现流式输出
- 使用ResponseBodyEmitter实现异步流式输出
- 使用SseEmitter实现服务器发送事件(SSE)
- 使用WebFlux实现响应式流式输出
- 使用Spring MVC的StreamingResponseBody
- websockt
说一下我的业务场景,我原本的前后端适配已经按照SSE规范完成了功能,因此新写接口时也采用SSE规范,避免同一个系统中前端出现多种方式的调用,而且我的python微服务采用SSE规范,当时第一反应采用Feign去调用接口返回即可,但是使用后发现Openfeign支持这种调用不友好,因此接口对接这里采用的是WebClient。因此本文着重说一下SSE规范调用
一、SSE是什么
SSE (Server-Sent Events) 是一种基于HTTP的服务器向客户端推送数据的Web技术规范,它允许服务器单向地向客户端发送事件流。以下是SSE规范的全面解析:
1.基本概念
SSE是HTML5标准的一部分,主要特点包括:
-
单向通信:仅服务器→客户端方向
-
基于HTTP:使用普通HTTP连接
-
文本协议:事件以纯文本格式传输
-
自动重连:内置连接恢复机制
-
简单易用:比WebSocket更轻量级
2. 协议格式
SSE事件流是一个UTF-8编码的文本流,包含以下字段(每个字段以\n
结尾):
event: message\n
id: 123\n
retry: 5000\n
data: {\n
data: "name": "John",\n
data: "age": 30\n
data: }\n\n
data
: 有效载荷内容(可多行,每行需加"data: "前缀)
event
: 自定义事件类型(默认"message")
id
: 事件ID(用于断线重连时定位)
retry
: 重连时间(毫秒)
服务器响为:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
3.客户端API
浏览器端JavaScript使用EventSource
接口:
const eventSource = new EventSource('/sse-endpoint');// 监听默认事件
eventSource.onmessage = (e) => {console.log('Message:', e.data);
};// 监听自定义事件
eventSource.addEventListener('customEvent', (e) => {console.log('Custom event:', e.data);
});// 错误处理
eventSource.onerror = (e) => {console.error('SSE error:', e);
};
4.与相关技术的对比
特性 | SSE | WebSocket | Long Polling |
---|---|---|---|
方向 | 单向(服务器→客户端) | 双向 | 单向(轮询) |
协议 | HTTP | WS/WSS | HTTP |
连接管理 | 自动重连 | 需手动处理 | 每次请求新建连接 |
数据格式 | 文本 | 二进制/文本 | 文本 |
复杂度 | 低 | 中 | 低 |
5. 适用场景
SSE特别适合:
-
实时通知(新闻、股价、天气)
-
日志流监控
-
进度报告(文件处理、任务执行)
-
社交媒体动态更新
-
需要简单实时功能但不需要双向通信的场景
虽然WebSocket更强大,但SSE仍有很多优势:
-
更简单的实现
-
自动利用HTTP/2的多路复用
-
不需要额外的协议升级
-
被所有现代浏览器支持(IE除外)
二、WebClient
1.概念
WebClient 是 Spring Framework 5中引入的一个基于响应式编程模型的 HTTP客户端 ,主要用于执行HTTP请求。相比传统的 RestTemplate ,WebClient采用了 Reactor库 ,支持非阻塞式(异步)调用,能够充分利用多核CPU资源,特别适合高并发场景。
2.与OpenFeign比较
推荐方案:优先使用WebClient + Service分层架构
原因:WebClient原生支持响应式流处理,更适合SSE场景,而OpenFeign更适合普通REST调用
备选方案:使用OpenFeign(需要特殊配置)
注意:需要Spring Cloud 2020.0.3+版本和响应式Feign支持
特性 | WebClient方案 | OpenFeign方案 |
---|---|---|
响应式支持 | ✅ 原生支持 | ⚠️ 需要特殊配置 |
代码复杂度 | 简单 | 较复杂 |
维护性 | 高 | 中 |
性能 | 高(非阻塞IO) | 中等 |
连接池管理 | 自动 | 需要手动配置 |
适合场景 | 高并发流式处理 | 简单接口调用 |
三、代码实现
1.基础实现
@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}
2.业务进阶
2.1 依赖配置:
在pom.xml中添加必要依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.2 WebClient配置:
-
使用
WebClient
创建HTTP客户端,支持响应式流处理 -
配置第三方SSE接口地址和必要的请求头(如认证信息)
WebClient配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient webClient() {return WebClient.builder().baseUrl("https://api.example.com").build();}
}
2.3Service层实现
@Service
public class WebClientSseService {@Autowiredprivate WebClient webClient;public Flux<String> streamEvents() {System.out.println("前置校验。。。。");Flux<String> resFlux = null;try{resFlux = webClient.get().uri("/stream").accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).map(data -> {// 处理原始SSE数据#if (data.startsWith("data:")) {#return data.substring(5).trim();#}return data;});}catch (Exception exception){resFlux = Flux.just("{'status': 'Error', 'message': '"+exception.getMessage()+"'}");}return resFlux;}
}
2.4 Controller
// application-web模块
@RestController
public class DataStreamController {@PostMapping(value = "/stream",consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestBody StreamRequest request) {return dataProcessor.streamEvents(request);}@PostMapping(value = "/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestParam(name = "file", required = false) MultipartFile file, @RequestParam Map<String, Object> jsonObject) {return dataProcessor.streamEvents(file, jsonObject);}
}
解释一下这两个参数:
consumes = MULTIPART_FORM_DATA_VALUE,
produces = TEXT_EVENT_STREAM_VALUE
consumes、produces 两个参数的作用与区别
参数 | 作用 | 示例值 |
---|---|---|
consumes = MULTIPART_FORM_DATA_VALUE | 声明接口接收的请求内容类型(客户端→服务端) | multipart/form-data |
produces = TEXT_EVENT_STREAM_VALUE | 声明接口返回的响应内容类型(服务端→客户端) | text/event-stream |
为什么需要同时声明?
-
输入输出分离原则:
-
输入(consumes):处理文件上传需要
multipart/form-data
-
输出(produces):SSE流式响应需要
text/event-stream
-
-
HTTP协议规范:
POST /upload HTTP/1.1
Content-Type: multipart/form-data ← 对应consumes
Accept: text/event-stream ← 对应produces
内容类型对照速查表
场景 | 客户端设置 | 服务端声明 |
---|---|---|
文件上传+JSON响应 | Content-Type: multipart/form-data | consumes = MULTIPART_FORM_DATA_VALUE |
文件上传+SSE流响应 | Accept: text/event-stream | produces = TEXT_EVENT_STREAM_VALUE |
JSON上传+SSE流响应 | Content-Type: application/json | consumes = APPLICATION_JSON_VALUE |
根据需要自由选择。
2.5 这里对webclient做个扩展
如果上传的是文件可以用这个方式写body的内容
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(formData))
如果不同的json类型的body请求体可以这么写
.body(BodyInserters.fromValue(res))
注意这块的细节,我就是在这里写绕了很多
四、其他方案实现
1. 使用Servlet原生API实现流式输出
@RestController
public class StreamingController {@GetMapping("/stream1")public void stream1(HttpServletResponse response) throws IOException {response.setContentType("text/plain;charset=UTF-8");try (PrintWriter writer = response.getWriter()) {for (int i = 0; i < 100; i++) {writer.write("Data line " + i + "\n");writer.flush(); // 手动刷新缓冲区Thread.sleep(100); // 模拟延迟}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
2. 使用ResponseBodyEmitter实现异步流式输出
@RestController
public class StreamingController {@GetMapping("/stream2")public ResponseBodyEmitter stream2() {ResponseBodyEmitter emitter = new ResponseBodyEmitter();CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {emitter.send("Data line " + i + "\n");Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}
3. 使用SseEmitter实现服务器发送事件(SSE)
@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}
4. 使用WebFlux实现响应式流式输出
@RestController
@RequestMapping("/reactive")
public class ReactiveStreamingController {@GetMapping("/stream")public Flux<String> streamData() {return Flux.interval(Duration.ofMillis(100)).map(sequence -> "Reactive data " + sequence + "\n").take(100); // 限制输出数量}@GetMapping(value = "/stream-file", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamLargeFile() {return Flux.using(() -> Files.lines(Paths.get("large-file.txt")),Flux::fromStream,Stream::close);}
}
5. 使用Spring MVC的StreamingResponseBody
@RestController
public class StreamingResponseBodyController {@GetMapping("/stream3")public StreamingResponseBody stream3() {return outputStream -> {Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));for (int i = 0; i < 100; i++) {writer.write("Streaming line " + i + "\n");writer.flush();Thread.sleep(100);}};}
}