Spring WebFlux调用生成式AI提供的stream流式接口,实现返回实时对话
接上文在前端实现流式回复的「逐字打印」——完整演进与代码实现,后端调用生成式AI提供流式接口实现返回数据。
一、概要 / 目标
本文档目标:使用 Spring WebFlux 做一个中间代理服务,调用后端生成式 AI 的流式接口(SSE / chunked JSON / NDJSON),把流式数据实时转发给前端,同时:
- 按 chunk 实时显示,并提供“逐字打字机(typewriter)”体验;
-
在流结束时返回一个带自定义字段(如
finished
,elapsedMs
,recordId
)的最终消息,方便日志与临时存储; -
在前端实现稳健的
fetch
流式解析(避免Unexpected end of JSON input
); -
支持把来自内存或输出流的文件以 multipart/form-data 上传给后端(示例给出)。
二、关键设计要点(概览)
-
后端使用
WebClient
单例调用 AI 的流式 endpoint,并把DataBuffer
按 SSE/NDJSON 事件切分、解析、转为Flux<Map<String,Object>>
。 -
在 Controller 层:在
Flux
的doOnNext
中把每个片段拼接到StringBuilder
(用于最终合并),并用concatWith(Mono.fromCallable(...))
在流结束时追加「最终消息」(带finished=true
和elapsedMs
)。 -
前端采用
fetch
+ReadableStream.getReader()
(支持 POST),用一个 缓冲字符串 按双换行分割事件(\r\n\r\n
或\n\n
),保证只在事件完整时JSON.parse()
。 -
逐字显示用「串行队列」(typingQueue),并实现混合策略:前几个 chunk 或短片段逐字,其余直接整段显示;当收到
finished
时快速 flush 剩余队列,避免长文本被逐字慢慢打印耗时。
三、后端(完整可运行示例)
下面给出一个最小但完整的 Spring Boot WebFlux 后端实现(Java)。只需把这些类放入一个 Spring Boot 项目即可运行。
1) pom.xml
(核心依赖)
<project ...><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>webflux-ai-stream</artifactId><version>0.0.1-SNAPSHOT</version><properties><java.version>11</java.version><spring.boot.version>2.7.12</spring.boot.version></properties><dependencies><!-- WebFlux --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- Jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Reactor Netty (comes with webflux) --><!-- logging --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
注意:必须引入
spring-boot-starter-webflux
,否则WebClient.Builder
等 Bean 不会自动注册,并且ServerHttpRequest
等类型与 DispatcherServlet 会冲突(你之前遇到的问题)。
2) DTO:ChatPrompt.java
package com.example.ai;public class ChatPrompt {private String prompt;// 可扩展:temperature, maxTokens, conversationId 等public String getPrompt() { return prompt; }public void setPrompt(String prompt) { this.prompt = prompt; }
}
3) WebClient 配置 WebClientConfig.java
确保单例 builder 且增大内存 buffer:
package com.example.ai;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient.Builder webClientBuilder() {return WebClient.builder();}@Beanpublic WebClient webClient(WebClient.Builder builder) {ExchangeStrategies strategies = ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build();return builder.exchangeStrategies(strategies).build();}
}
如果你的项目没有自动注入
WebClient.Builder
,可以直接提供上面webClientBuilder()
。
4) Service:GeminiStreamService.java
(负责把后端流解析成 Flux<Map<String,Object>>
)
实现要点(具体根据不同AI模型的返回的流式响应数据结构做调整):
-
使用
WebClient
发起 POST 到模型的流式 endpoint(示例用/v1/generate/stream
,替换为真实 URL); -
以
DataBuffer
增量读取,维护一个字符串缓存StringBuilder buffer
,按 SSE 事件边界(双换行\r\n\r\n
或\n\n
)切分事件; -
每个事件中可能有多行
data:
,拼接后JSON.parse
,提取candidates[0].content.parts[0].text
等字段;若解析失败,退化为原始文本; -
若遇到服务端特殊结束标志(如
data: [DONE]
或 JSON 中finishReason
/finish_reason
),结束流。
package com.example.ai;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;@Service
public class GeminiStreamService {private final WebClient webClient;private final ObjectMapper mapper = new ObjectMapper();public GeminiStreamService(WebClient webClient) {this.webClient = webClient;}/*** 调用后端模型的流式接口并把它解析成 Flux<Map<String,Object>>(每个 map 包含 role/text 等)*/public Flux<Map<String, Object>> streamFromModel(ChatPrompt prompt) throws IOException {// 构造请求体(按模型 API 要求调整)Map<String, Object> body = new HashMap<>();body.put("prompt", prompt.getPrompt());// 使用 Flux.create 将订阅到 DataBuffer 的增量数据转为逐个 Map 推送return Flux.create(sink -> {StringBuilder buffer = new StringBuilder();AtomicBoolean finished = new AtomicBoolean(false);webClient.post().uri("https://ai-backend.example/v1/generate/stream") // 替换真实 endpoint.header("Authorization", "Bearer YOUR_API_KEY").bodyValue(body).retrieve().bodyToFlux(DataBuffer.class).subscribe(dataBuffer -> {try {int len = dataBuffer.readableByteCount();byte[] bytes = new byte[len];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);String chunk = new String(bytes, StandardCharsets.UTF_8);buffer.append(chunk);// 按事件边界切分(双换行)String all = buffer.toString();String[] events = all.split("\\r?\\n\\r?\\n");// 最后一段有可能是不完整的,保留在 bufferfor (int i = 0; i < events.length - 1; i++) {String ev = events[i].trim();if (ev.isEmpty()) continue;// 处理 eventStringBuilder dataLines = new StringBuilder();for (String line : ev.split("\\r?\\n")) {line = line.trim();if (line.startsWith("data:")) {String d = line.substring(5).trim();if ("[DONE]".equals(d)) {finished.set(true);break;}dataLines.append(d);}}if (finished.get()) {// 直接结束sink.complete();return;}String dataStr = dataLines.toString();if (dataStr.isEmpty()) continue;// 解析 JSON,尽量稳健地取出 texttry {JsonNode node = mapper.readTree(dataStr);JsonNode partText = node.path("candidates").path(0).path("content").path("parts").path(0).path("text");String text = partText.isMissingNode() ? dataStr : partText.asText();String role = node.path("candidates").path(0).path("content").path("role").asText("assistant");Map<String, Object> result = new HashMap<>();result.put("role", role);result.put("text", text);sink.next(result);// 可根据 JSON 的 finishReason 决定结束JsonNode fr = node.path("finishReason");if (!fr.isMissingNode() && "STOP".equalsIgnoreCase(fr.asText())) {sink.complete();return;}} catch (Exception ex) {// 不是 JSON,则作为原始文本返回Map<String, Object> result = new HashMap<>();result.put("role", "assistant");result.put("text", dataStr);sink.next(result);}}// 把剩下的不完整事件放回 bufferbuffer.setLength(0);buffer.append(events[events.length - 1]);} catch (Throwable ex) {sink.error(ex);}}, sink::error, () -> {// 当 upstream 完成,再处理 buffer 中残留数据(若有)String remain = buffer.toString().trim();if (!remain.isEmpty()) {// 尝试处理残留一次try {JsonNode node = mapper.readTree(remain);String text = node.path("candidates").path(0).path("content").path("parts").path(0).path("text").asText(remain);Map<String, Object> result = new HashMap<>();result.put("role", node.path("candidates").path(0).path("content").path("role").asText("assistant"));result.put("text", text);sink.next(result);} catch (Exception ex) {Map<String, Object> result = new HashMap<>();result.put("role", "assistant");result.put("text", remain);sink.next(result);}}sink.complete();});});}
}
说明 & 常见坑
用
split("\\r?\\n\\r?\\n")
来按事件边界切分:SSE 用空行分隔事件,NDJSON 每行为 JSON(你也可以按换行切分);关键是要保留不完整片段在 buffer,等待下次 DataBuffer 拼接完再处理,避免 JSON 被截断导致解析异常。
DataBuffer
读取后记得DataBufferUtils.release(dataBuffer)
释放。根据不同后端协议(SSE vs NDJSON vs chunked plain text)调整解析逻辑:SSE:
data:
前缀;NDJSON:按行 JSON;纯 chunked:每个 chunk 直接为文本。处理结束:支持
data: [DONE]
、或 JSON 中的finishReason: STOP
等作为结束标志。
5) Controller:ChatController.java
(把流转发给前端并在流结束时拼接完整文本与添加自定义属性)
要点:
-
在流的
doOnNext
中拼接到StringBuilder sb
; -
用
concatWith(Mono.fromCallable(...))
在流结束时追加 final message(带finished=true
、elapsedMs
等); -
返回
Flux<ServerSentEvent<Map<String,Object>>>
,produces = MediaType.TEXT_EVENT_STREAM_VALUE
.
package com.example.ai;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;@RestController
public class ChatController {private final GeminiStreamService geminiService;private final Logger log = LoggerFactory.getLogger(ChatController.class);public ChatController(GeminiStreamService geminiService) {this.geminiService = geminiService;}@PostMapping(value = "/api/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<Map<String, Object>>> chatStream(@RequestBody ChatPrompt chatPromptReq) throws IOException {StringBuilder sb = new StringBuilder();long start = System.currentTimeMillis();Flux<Map<String, Object>> modelFlux = geminiService.streamFromModel(chatPromptReq).doOnNext(chunk -> {Object t = chunk.get("text");if (t != null) sb.append(t.toString());}).doOnCancel(() -> log.info("Client cancelled"));// 在流结束后追加最终消息(finished + elapsedMs + 可选 recordId)Flux<Map<String, Object>> finalFlux = modelFlux.concatWith(Mono.fromCallable(() -> {Map<String, Object> finalMsg = new HashMap<>();finalMsg.put("role", "assistant");finalMsg.put("text", sb.toString());finalMsg.put("finished", true);finalMsg.put("elapsedMs", System.currentTimeMillis() - start);finalMsg.put("recordId", null); // 如果你有记录 id 可放这里return finalMsg;}));return finalFlux.map(map -> ServerSentEvent.<Map<String, Object>>builder().event("message").data(map).build()).doOnComplete(() -> {// 此处也可以做持久化:sb.toString() 为完整文本log.info("Full AI reply: {}", sb.toString());// e.g. save to DB or cache...});}
}
说明:
@RequestBody ChatPrompt
(实体)优于@RequestBody Mono
(容易出错的地方是漏写泛型导致 Jackson 试图反序列化Mono
本身)。如果你使用 MVC(
spring-boot-starter-web
)而不是 WebFlux,会报No primary or single unique constructor found for interface ServerHttpRequest
等错误 —— 所以要确保用 WebFlux。
四、前端(fetch 流 + SSE)详解与稳健实现
下面给出两个前端接收方式:
-
A. 使用 fetch + ReadableStream(支持 POST) —— 推荐用于发送 prompt(POST)并读取流;
-
B. 使用 EventSource(SSE,GET) —— 简单,但只能 GET(如果要 POST,需用会话 id + 双端点策略)。
我们重点展示 A(fetch)并给出 EventSource 的对照示例。
A) 前端(Vue 3)示例:稳健解析 + 逐字队列 + 混合展示 + 收尾快速 flush
特点:
使用
reader.read()
增量读取;用
buffer
按事件边界(双换行)进行分割,保证JSON.parse()
只在完整事件时执行,避免Unexpected end of JSON input
;使用
typingQueue
串行逐字打印,保证不并发打印导致乱序;混合策略:前 N 个 chunk 或短 chunk 采用逐字,长 chunk 直接整体渲染;
收到
finished
时快速 flush 未打印队列,避免长文本被慢慢逐字打印浪费时间。
<!-- StreamChat.vue -->
<template><div><button @click="sendPrompt" :disabled="statusDisabled">Send</button><div v-for="(m, idx) in msgList" :key="idx"><div>{{ m.text }}</div></div></div>
</template><script>
export default {data() {return {statusDisabled: false,msgList: [],typingQueue: [],isTyping: false,chunkCount: 0,flushOnFinish: false,};},methods: {refreshChatReply(list, msg) {list.push(msg);},async sendPrompt() {const prompt = { prompt: "你好,给我一个短故事" };await this.streamChat(prompt);},enqueueText(text, meta = {}) {this.chunkCount++;// 混合策略:前2个 chunk 或短文本走打字机const mode = (this.chunkCount <= 2 || text.length < 60) ? "typewriter" : "instant";this.typingQueue.push({ text, mode, meta });if (!this.isTyping) this.processQueue();},processQueue() {if (this.typingQueue.length === 0) {this.isTyping = false;return;}this.isTyping = true;const { text, mode } = this.typingQueue.shift();if (mode === "instant") {this.msgList[this.msgList.length - 1].text += text;// 继续下一个this.processQueue();return;}// typewriter modelet i = 0;const speed = 30; // ms per charconst interval = setInterval(() => {if (i < text.length) {this.msgList[this.msgList.length - 1].text += text.charAt(i);i++;} else {clearInterval(interval);// 如果收到了 finish 标志并且队列里还有项,希望快速 flush 剩余if (this.flushOnFinish) {// 直接把剩余队列立刻追加(避免慢打)while (this.typingQueue.length) {const it = this.typingQueue.shift();this.msgList[this.msgList.length - 1].text += it.text;}this.flushOnFinish = false;this.isTyping = false;return;}this.processQueue();}}, speed);},async streamChat(prompt) {const res = await fetch('/api/chat/stream', {method: 'POST',headers: {'Content-Type': 'application/json','Accept': 'text/event-stream'},body: JSON.stringify(prompt)});this.statusDisabled = true;const reader = res.body.getReader();const decoder = new TextDecoder('utf-8');let buffer = "";let done = false;// add an empty message slot for assistantconst listMsg = { align: "left", text: "", type: "text", time: Date.now() };this.refreshChatReply(this.msgList, listMsg);while (!done) {const { value, done: streamDone } = await reader.read();done = streamDone;if (value) {buffer += decoder.decode(value, { stream: true });// 按 SSE 事件边界分割(双换行)const parts = buffer.split(/\r?\n\r?\n/);buffer = parts.pop(); // 最后一段可能不完整,保留for (const ev of parts) {const trimmed = ev.trim();if (!trimmed) continue;// 获取 data: 行(可能多行)const lines = trimmed.split(/\r?\n/);let dataStr = "";for (let l of lines) {l = l.trim();if (l.startsWith('data:')) {dataStr += l.substring(5).trim();}}if (!dataStr) continue;try {const data = JSON.parse(dataStr);if (data.text) {this.enqueueText(data.text);}if (data.finished) {// 收尾:取消禁用、记录时间、快速 flushthis.statusDisabled = false;this.msgList[this.msgList.length - 1].time = data.time || Date.now();this.msgList[this.msgList.length - 1].elapsedMs = (data.elapsedMs || 0) / 1000;// 收到完成标志后,如果当前正在 typewriter 打字,希望快速把剩余全部展示if (this.isTyping) {this.flushOnFinish = true;} else {// 还没在 typing,则直接把队列快速 drain(防止一直等待小段)while (this.typingQueue.length) {const it = this.typingQueue.shift();this.msgList[this.msgList.length - 1].text += it.text;}}}} catch (e) {console.warn('JSON parse error, ignoring part:', e, dataStr);}}}}// 流结束:如果 buffer 里还有残余(通常不会),尝试解析if (buffer && buffer.trim()) {try {const evt = buffer.trim();const match = evt.match(/data:(.*)/s);if (match) {const data = JSON.parse(match[1].trim());if (data.text) this.enqueueText(data.text);}} catch (e) { /* ignore */ }}// 最终解锁按钮(安全措施)this.statusDisabled = false;}}
}
</script>
要点回顾(前端):
-
一定使用
buffer
+split(/\r?\n\r?\n/)
保证每次JSON.parse
的字符串为完整事件; -
使用
typingQueue
保证一个片段打印完才开始下个,避免并发多个打字机干扰; -
收到
finished
时flushOnFinish=true
,在当前片段结束后快速把剩余队列直接追加,避免长文本慢慢逐字打印浪费时间。
B) 另一种:EventSource(仅 GET请求,无法 POST请求)
若你后端暴露一个基于会话 id 的 GET SSE endpoint,前端可以使用 EventSource,不用处理 chunk 较简单:
const evt = new EventSource('/api/chat/stream?sessionId=abc123');
evt.addEventListener('message', e => {const data = JSON.parse(e.data);if (data.finished) { /* ... */ } else { enqueueText(data.text); }
});
优点:浏览器自动处理重连;缺点:只能 GET,不能 POST 直接携带 body。
五、常见问题与解决方法(回顾我们讨论过的 bug & 解法)
-
Could not autowire. No beans of 'Builder' type found.
-
原因:没有引入
spring-boot-starter-webflux
,导致WebClient.Builder
未注册。 -
解决:在
pom.xml
加spring-boot-starter-webflux
,或手动在配置类中@Bean public WebClient.Builder webClientBuilder()
。
-
-
Cannot construct instance of reactor.core.publisher.Mono
during @RequestBody deserialization-
原因:Controller 的签名写成了
@RequestBody Mono
(无泛型)或让 Jackson 要构造 Mono 本身。 -
解决:要么
@RequestBody ChatPrompt chatPrompt
,要么@RequestBody Mono<ChatPrompt> chatPromptMono
(带泛型)。
-
-
No primary or single unique constructor found for interface ServerHttpRequest
-
原因:在 Spring MVC(Servlet)环境尝试使用 WebFlux 类型
ServerHttpRequest
。 -
解决:使用 WebFlux(
spring-boot-starter-webflux
)或者在 MVC 下使用HttpServletRequest
/SseEmitter
。
-
-
前端
Unexpected end of JSON input
-
原因:直接
JSON.parse
不完整的 chunk(被截断)。 -
解决:使用
buffer
并按事件边界(双换行)切分,只有完整事件才JSON.parse
。
-
-
逐字打印导致后续 chunk 到来时乱序
-
原因:并发启动多个定时器打印多个 chunk。
-
解决:使用串行
typingQueue
,确保当前片段打印完后才开始下一个;并提供混合/加速策略(收到 finished 时 flush)。
-
-
如何识别流结束
-
可能的结束信号:
data: [DONE]
,或 JSON 中的finishReason
/finish_reason
/finishReason: STOP
,或服务端关闭连接(stream 完成)。后端解析器里兼顾几种方式都好。
-
-
如何上传内存/输出流而不是本地文件路径
-
使用
MultipartBodyBuilder
并传InputStreamResource
或ByteArrayResource
(也可用PipedInputStream/PipedOutputStream
做生产者-消费者实时写入)。我之前给过三个示例(ByteArrayResource、InputStreamResource、PipedInputStream 配合 PipedOutputStream)。
-
六、如何测试(curl 与 前端快速验证)
curl(POST SSE):
curl -N -X POST http://localhost:8080/api/chat/stream \-H "Content-Type: application/json" \-d '{"prompt":"你好"}'
-N
表示不缓冲输出(保持流)。
前端(fetch):使用上面 Vue 的 sendPrompt()
。
七、完整代码清单(快速一览)
(已经在上文分别给出 pom.xml
、DTO、WebClientConfig
、GeminiStreamService
、ChatController
、前端组件 StreamChat.vue
。把这些文件放在 Spring Boot 项目并替换 AI 后端 URL & API Key 即可运行。)
八、部署与监控建议(短)
-
对流式连接设置合理的超时与心跳(避免反向代理/负载均衡中断长连接);
-
限制并发流连接数(避免同一服务被 DDoS);对每个用户连接做配额;
-
日志记录:记录
elapsedMs
, token 使用量,取消率(doOnCancel
),异常率; -
指标:Expose Prometheus metrics(连接数、平均延迟、拼接文本长度等)。
我正在做一个开源通用管理系统,可以满足大部分系统的要求进行二次改造,目前已经发布在github上,后面也会继续优化。