当前位置: 首页 > news >正文

SpringBoot手动实现流式输出方案整理以及SSE规范输出详解

背景:

最近做流式输出时,一直使用python实现的,应需求方的要求,需要通过java应用做一次封装并在java侧完成系统鉴权、模型鉴权等功能后才能真正去调用智能体应用,基于此调研java实现流式输出的几种方式,并完成与python服务对接的方案。

方案:

  • 使用Servlet原生API实现流式输出
  • 使用ResponseBodyEmitter实现异步流式输出
  • 使用SseEmitter实现服务器发送事件(SSE)
  • 使用WebFlux实现响应式流式输出
  • 使用Spring MVC的StreamingResponseBody
  • websockt

说一下我的业务场景,我原本的前后端适配已经按照SSE规范完成了功能,因此新写接口时也采用SSE规范,避免同一个系统中前端出现多种方式的调用,而且我的python微服务采用SSE规范,当时第一反应采用Feign去调用接口返回即可,但是使用后发现Openfeign支持这种调用不友好,因此接口对接这里采用的是WebClient。因此本文着重说一下SSE规范调用

一、SSE是什么

SSE (Server-Sent Events) 是一种基于HTTP的服务器向客户端推送数据的Web技术规范,它允许服务器单向地向客户端发送事件流。以下是SSE规范的全面解析:

1.基本概念

SSE是HTML5标准的一部分,主要特点包括:

  • 单向通信:仅服务器→客户端方向

  • 基于HTTP:使用普通HTTP连接

  • 文本协议:事件以纯文本格式传输

  • 自动重连:内置连接恢复机制

  • 简单易用:比WebSocket更轻量级

2. 协议格式

SSE事件流是一个UTF-8编码的文本流,包含以下字段(每个字段以\n结尾):

event: message\n
id: 123\n
retry: 5000\n
data: {\n
data: "name": "John",\n
data: "age": 30\n
data: }\n\n

  • data: 有效载荷内容(可多行,每行需加"data: "前缀)

  • event: 自定义事件类型(默认"message")

  • id: 事件ID(用于断线重连时定位)

  • retry: 重连时间(毫秒)

服务器响为:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive 

3.客户端API

浏览器端JavaScript使用EventSource接口:

const eventSource = new EventSource('/sse-endpoint');// 监听默认事件
eventSource.onmessage = (e) => {console.log('Message:', e.data);
};// 监听自定义事件
eventSource.addEventListener('customEvent', (e) => {console.log('Custom event:', e.data);
});// 错误处理
eventSource.onerror = (e) => {console.error('SSE error:', e);
};

4.与相关技术的对比

特性SSEWebSocketLong Polling
方向单向(服务器→客户端)双向单向(轮询)
协议HTTPWS/WSSHTTP
连接管理自动重连需手动处理每次请求新建连接
数据格式文本二进制/文本文本
复杂度

5. 适用场景

SSE特别适合:

  • 实时通知(新闻、股价、天气)

  • 日志流监控

  • 进度报告(文件处理、任务执行)

  • 社交媒体动态更新

  • 需要简单实时功能但不需要双向通信的场景

虽然WebSocket更强大,但SSE仍有很多优势:

  • 更简单的实现

  • 自动利用HTTP/2的多路复用

  • 不需要额外的协议升级

  • 被所有现代浏览器支持(IE除外)

二、WebClient ‌

1.概念

WebClient ‌是 Spring Framework 5中引入的一个基于响应式编程模型的 HTTP客户端 ,主要用于执行HTTP请求。相比传统的 RestTemplate ,WebClient采用了 Reactor库 ,支持非阻塞式(异步)调用,能够充分利用多核CPU资源,特别适合高并发场景。

2.与OpenFeign比较

推荐方案:优先使用WebClient + Service分层架构
原因:WebClient原生支持响应式流处理,更适合SSE场景,而OpenFeign更适合普通REST调用

备选方案:使用OpenFeign(需要特殊配置)
注意:需要Spring Cloud 2020.0.3+版本和响应式Feign支持

特性WebClient方案OpenFeign方案
响应式支持✅ 原生支持⚠️ 需要特殊配置
代码复杂度简单较复杂
维护性
性能高(非阻塞IO)中等
连接池管理自动需要手动配置
适合场景高并发流式处理简单接口调用

三、代码实现

1.基础实现

@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

2.业务进阶

2.1 依赖配置

在pom.xml中添加必要依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.2 WebClient配置

  • 使用WebClient创建HTTP客户端,支持响应式流处理

  • 配置第三方SSE接口地址和必要的请求头(如认证信息)

    WebClient配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient webClient() {return WebClient.builder().baseUrl("https://api.example.com").build();}
}

2.3Service层实现

@Service
public class WebClientSseService {@Autowiredprivate WebClient webClient;public Flux<String> streamEvents() {System.out.println("前置校验。。。。");Flux<String> resFlux = null;try{resFlux = webClient.get().uri("/stream").accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).map(data -> {// 处理原始SSE数据#if (data.startsWith("data:")) {#return data.substring(5).trim();#}return data;});}catch (Exception exception){resFlux = Flux.just("{'status': 'Error', 'message': '"+exception.getMessage()+"'}");}return resFlux;}
}

2.4 Controller

// application-web模块
@RestController
public class DataStreamController {@PostMapping(value = "/stream",consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestBody StreamRequest request) {return dataProcessor.streamEvents(request);}@PostMapping(value = "/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestParam(name = "file", required = false) MultipartFile file, @RequestParam Map<String, Object> jsonObject) {return dataProcessor.streamEvents(file, jsonObject);}
}

解释一下这两个参数:

consumes = MULTIPART_FORM_DATA_VALUE,

produces = TEXT_EVENT_STREAM_VALUE

consumes、produces 两个参数的作用与区别
参数作用示例值
consumes = MULTIPART_FORM_DATA_VALUE声明接口接收的请求内容类型(客户端→服务端)multipart/form-data
produces = TEXT_EVENT_STREAM_VALUE声明接口返回的响应内容类型(服务端→客户端)text/event-stream
为什么需要同时声明?
  1. 输入输出分离原则

    • 输入(consumes):处理文件上传需要 multipart/form-data

    • 输出(produces):SSE流式响应需要 text/event-stream

  2. HTTP协议规范

POST /upload HTTP/1.1
Content-Type: multipart/form-data  ← 对应consumes
Accept: text/event-stream         ← 对应produces
内容类型对照速查表
场景客户端设置服务端声明
文件上传+JSON响应Content-Type: multipart/form-dataconsumes = MULTIPART_FORM_DATA_VALUE
文件上传+SSE流响应Accept: text/event-streamproduces = TEXT_EVENT_STREAM_VALUE
JSON上传+SSE流响应Content-Type: application/jsonconsumes = APPLICATION_JSON_VALUE

根据需要自由选择。

2.5 这里对webclient做个扩展

如果上传的是文件可以用这个方式写body的内容

.contentType(MediaType.MULTIPART_FORM_DATA)

.body(BodyInserters.fromMultipartData(formData))

如果不同的json类型的body请求体可以这么写

.body(BodyInserters.fromValue(res)) 

注意这块的细节,我就是在这里写绕了很多

四、其他方案实现

1. 使用Servlet原生API实现流式输出

@RestController
public class StreamingController {@GetMapping("/stream1")public void stream1(HttpServletResponse response) throws IOException {response.setContentType("text/plain;charset=UTF-8");try (PrintWriter writer = response.getWriter()) {for (int i = 0; i < 100; i++) {writer.write("Data line " + i + "\n");writer.flush(); // 手动刷新缓冲区Thread.sleep(100); // 模拟延迟}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

2. 使用ResponseBodyEmitter实现异步流式输出

@RestController
public class StreamingController {@GetMapping("/stream2")public ResponseBodyEmitter stream2() {ResponseBodyEmitter emitter = new ResponseBodyEmitter();CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {emitter.send("Data line " + i + "\n");Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

3. 使用SseEmitter实现服务器发送事件(SSE)

@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}

4. 使用WebFlux实现响应式流式输出

@RestController
@RequestMapping("/reactive")
public class ReactiveStreamingController {@GetMapping("/stream")public Flux<String> streamData() {return Flux.interval(Duration.ofMillis(100)).map(sequence -> "Reactive data " + sequence + "\n").take(100); // 限制输出数量}@GetMapping(value = "/stream-file", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamLargeFile() {return Flux.using(() -> Files.lines(Paths.get("large-file.txt")),Flux::fromStream,Stream::close);}
}

5. 使用Spring MVC的StreamingResponseBody

@RestController
public class StreamingResponseBodyController {@GetMapping("/stream3")public StreamingResponseBody stream3() {return outputStream -> {Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));for (int i = 0; i < 100; i++) {writer.write("Streaming line " + i + "\n");writer.flush();Thread.sleep(100);}};}
}

相关文章:

  • JavaSE知识总结(集合篇) ~个人笔记以及不断思考~持续更新
  • 学习经验分享【40】目标检测热力图制作
  • [HTML5]快速掌握canvas
  • (Python网络爬虫);抓取B站404页面小漫画
  • 智慧零工平台前端开发实战:从uni-app到跨平台应用
  • uniapp路由跳转toolbar页面
  • 通俗易懂解析:@ComponentScan 与 @MapperScan 的异同与用法
  • Java连接Redis和基础操作命令
  • 微软markitdown PDF/WORD/HTML文档转Markdown格式软件整合包下载
  • GODOT引擎学习日志
  • Gartner《Emerging Patterns for Building LLM-Based AIAgents》学习心得
  • 线程间和进程间是如何进行通信
  • 复变函数 $w = z^2$ 的映射图像演示
  • 端到端的导航技术NeuPAN论文讲解
  • 《AI Agent项目开发实战》DeepSeek R1模型蒸馏入门实战
  • 达梦数据库 Windows 系统安装教程
  • HTML 中 class 属性介绍、用法
  • 【学习笔记】On the Biology of a Large Language Model
  • ffmpeg 的视频格式转换 c# win10
  • 使用免费wordpress成品网站模板需要注意点什么
  • 重庆网站营销靠谱/百度图片收录提交入口
  • 嘉兴专业做网站的公司/网上教育培训机构哪家好
  • 广州企业网站模板建站/镇江优化推广
  • 网站备案失败/seo基础入门视频教程
  • 关于机关单位网站的建设/重庆疫情最新数据
  • 企业网站建设制作多少钱/百度网站提交了多久收录