当前位置: 首页 > news >正文

Spring WebFlux调用生成式AI提供的stream流式接口,实现返回实时对话

接上文在前端实现流式回复的「逐字打印」——完整演进与代码实现,后端调用生成式AI提供流式接口实现返回数据。

一、概要 / 目标

本文档目标:使用 Spring WebFlux 做一个中间代理服务,调用后端生成式 AI 的流式接口(SSE / chunked JSON / NDJSON),把流式数据实时转发给前端,同时:

  • 按 chunk 实时显示,并提供“逐字打字机(typewriter)”体验;
  • 在流结束时返回一个带自定义字段(如 finished, elapsedMs, recordId)的最终消息,方便日志与临时存储;

  • 在前端实现稳健的 fetch 流式解析(避免 Unexpected end of JSON input);

  • 支持把来自内存或输出流的文件以 multipart/form-data 上传给后端(示例给出)。

二、关键设计要点(概览)

  • 后端使用 WebClient 单例调用 AI 的流式 endpoint,并把 DataBuffer 按 SSE/NDJSON 事件切分、解析、转为 Flux<Map<String,Object>>

  • 在 Controller 层:在 FluxdoOnNext 中把每个片段拼接到 StringBuilder(用于最终合并),并用 concatWith(Mono.fromCallable(...)) 在流结束时追加「最终消息」(带 finished=trueelapsedMs)。

  • 前端采用 fetch + ReadableStream.getReader()(支持 POST),用一个 缓冲字符串 按双换行分割事件(\r\n\r\n\n\n),保证只在事件完整时 JSON.parse()

  • 逐字显示用「串行队列」(typingQueue),并实现混合策略:前几个 chunk 或短片段逐字,其余直接整段显示;当收到 finished 时快速 flush 剩余队列,避免长文本被逐字慢慢打印耗时。


三、后端(完整可运行示例)

下面给出一个最小但完整的 Spring Boot WebFlux 后端实现(Java)。只需把这些类放入一个 Spring Boot 项目即可运行

1) pom.xml(核心依赖)

<project ...><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>webflux-ai-stream</artifactId><version>0.0.1-SNAPSHOT</version><properties><java.version>11</java.version><spring.boot.version>2.7.12</spring.boot.version></properties><dependencies><!-- WebFlux --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!-- Jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Reactor Netty (comes with webflux) --><!-- logging --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

注意:必须引入 spring-boot-starter-webflux,否则 WebClient.Builder 等 Bean 不会自动注册,并且 ServerHttpRequest 等类型与 DispatcherServlet 会冲突(你之前遇到的问题)。


2) DTO:ChatPrompt.java

package com.example.ai;public class ChatPrompt {private String prompt;// 可扩展:temperature, maxTokens, conversationId 等public String getPrompt() { return prompt; }public void setPrompt(String prompt) { this.prompt = prompt; }
}

3) WebClient 配置 WebClientConfig.java

确保单例 builder 且增大内存 buffer:

package com.example.ai;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient.Builder webClientBuilder() {return WebClient.builder();}@Beanpublic WebClient webClient(WebClient.Builder builder) {ExchangeStrategies strategies = ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(16 * 1024 * 1024)).build();return builder.exchangeStrategies(strategies).build();}
}

如果你的项目没有自动注入 WebClient.Builder,可以直接提供上面 webClientBuilder()


4) Service:GeminiStreamService.java(负责把后端流解析成 Flux<Map<String,Object>>

实现要点(具体根据不同AI模型的返回的流式响应数据结构做调整):

  • 使用 WebClient 发起 POST 到模型的流式 endpoint(示例用 /v1/generate/stream,替换为真实 URL);

  • DataBuffer 增量读取,维护一个字符串缓存 StringBuilder buffer,按 SSE 事件边界(双换行 \r\n\r\n\n\n)切分事件;

  • 每个事件中可能有多行 data:,拼接后 JSON.parse,提取 candidates[0].content.parts[0].text 等字段;若解析失败,退化为原始文本;

  • 若遇到服务端特殊结束标志(如 data: [DONE] 或 JSON 中 finishReason/finish_reason),结束流。

package com.example.ai;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;@Service
public class GeminiStreamService {private final WebClient webClient;private final ObjectMapper mapper = new ObjectMapper();public GeminiStreamService(WebClient webClient) {this.webClient = webClient;}/*** 调用后端模型的流式接口并把它解析成 Flux<Map<String,Object>>(每个 map 包含 role/text 等)*/public Flux<Map<String, Object>> streamFromModel(ChatPrompt prompt) throws IOException {// 构造请求体(按模型 API 要求调整)Map<String, Object> body = new HashMap<>();body.put("prompt", prompt.getPrompt());// 使用 Flux.create 将订阅到 DataBuffer 的增量数据转为逐个 Map 推送return Flux.create(sink -> {StringBuilder buffer = new StringBuilder();AtomicBoolean finished = new AtomicBoolean(false);webClient.post().uri("https://ai-backend.example/v1/generate/stream") // 替换真实 endpoint.header("Authorization", "Bearer YOUR_API_KEY").bodyValue(body).retrieve().bodyToFlux(DataBuffer.class).subscribe(dataBuffer -> {try {int len = dataBuffer.readableByteCount();byte[] bytes = new byte[len];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);String chunk = new String(bytes, StandardCharsets.UTF_8);buffer.append(chunk);// 按事件边界切分(双换行)String all = buffer.toString();String[] events = all.split("\\r?\\n\\r?\\n");// 最后一段有可能是不完整的,保留在 bufferfor (int i = 0; i < events.length - 1; i++) {String ev = events[i].trim();if (ev.isEmpty()) continue;// 处理 eventStringBuilder dataLines = new StringBuilder();for (String line : ev.split("\\r?\\n")) {line = line.trim();if (line.startsWith("data:")) {String d = line.substring(5).trim();if ("[DONE]".equals(d)) {finished.set(true);break;}dataLines.append(d);}}if (finished.get()) {// 直接结束sink.complete();return;}String dataStr = dataLines.toString();if (dataStr.isEmpty()) continue;// 解析 JSON,尽量稳健地取出 texttry {JsonNode node = mapper.readTree(dataStr);JsonNode partText = node.path("candidates").path(0).path("content").path("parts").path(0).path("text");String text = partText.isMissingNode() ? dataStr : partText.asText();String role = node.path("candidates").path(0).path("content").path("role").asText("assistant");Map<String, Object> result = new HashMap<>();result.put("role", role);result.put("text", text);sink.next(result);// 可根据 JSON 的 finishReason 决定结束JsonNode fr = node.path("finishReason");if (!fr.isMissingNode() && "STOP".equalsIgnoreCase(fr.asText())) {sink.complete();return;}} catch (Exception ex) {// 不是 JSON,则作为原始文本返回Map<String, Object> result = new HashMap<>();result.put("role", "assistant");result.put("text", dataStr);sink.next(result);}}// 把剩下的不完整事件放回 bufferbuffer.setLength(0);buffer.append(events[events.length - 1]);} catch (Throwable ex) {sink.error(ex);}}, sink::error, () -> {// 当 upstream 完成,再处理 buffer 中残留数据(若有)String remain = buffer.toString().trim();if (!remain.isEmpty()) {// 尝试处理残留一次try {JsonNode node = mapper.readTree(remain);String text = node.path("candidates").path(0).path("content").path("parts").path(0).path("text").asText(remain);Map<String, Object> result = new HashMap<>();result.put("role", node.path("candidates").path(0).path("content").path("role").asText("assistant"));result.put("text", text);sink.next(result);} catch (Exception ex) {Map<String, Object> result = new HashMap<>();result.put("role", "assistant");result.put("text", remain);sink.next(result);}}sink.complete();});});}
}

说明 & 常见坑

  • split("\\r?\\n\\r?\\n") 来按事件边界切分:SSE 用空行分隔事件,NDJSON 每行为 JSON(你也可以按换行切分);关键是要保留不完整片段在 buffer,等待下次 DataBuffer 拼接完再处理,避免 JSON 被截断导致解析异常。

  • DataBuffer 读取后记得 DataBufferUtils.release(dataBuffer) 释放。

  • 根据不同后端协议(SSE vs NDJSON vs chunked plain text)调整解析逻辑:SSE:data: 前缀;NDJSON:按行 JSON;纯 chunked:每个 chunk 直接为文本。

  • 处理结束:支持 data: [DONE]、或 JSON 中的 finishReason: STOP 等作为结束标志。


5) Controller:ChatController.java(把流转发给前端并在流结束时拼接完整文本与添加自定义属性)

要点:

  • 在流的 doOnNext 中拼接到 StringBuilder sb

  • concatWith(Mono.fromCallable(...)) 在流结束时追加 final message(带 finished=trueelapsedMs 等);

  • 返回 Flux<ServerSentEvent<Map<String,Object>>>produces = MediaType.TEXT_EVENT_STREAM_VALUE.

package com.example.ai;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;@RestController
public class ChatController {private final GeminiStreamService geminiService;private final Logger log = LoggerFactory.getLogger(ChatController.class);public ChatController(GeminiStreamService geminiService) {this.geminiService = geminiService;}@PostMapping(value = "/api/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<Map<String, Object>>> chatStream(@RequestBody ChatPrompt chatPromptReq) throws IOException {StringBuilder sb = new StringBuilder();long start = System.currentTimeMillis();Flux<Map<String, Object>> modelFlux = geminiService.streamFromModel(chatPromptReq).doOnNext(chunk -> {Object t = chunk.get("text");if (t != null) sb.append(t.toString());}).doOnCancel(() -> log.info("Client cancelled"));// 在流结束后追加最终消息(finished + elapsedMs + 可选 recordId)Flux<Map<String, Object>> finalFlux = modelFlux.concatWith(Mono.fromCallable(() -> {Map<String, Object> finalMsg = new HashMap<>();finalMsg.put("role", "assistant");finalMsg.put("text", sb.toString());finalMsg.put("finished", true);finalMsg.put("elapsedMs", System.currentTimeMillis() - start);finalMsg.put("recordId", null); // 如果你有记录 id 可放这里return finalMsg;}));return finalFlux.map(map -> ServerSentEvent.<Map<String, Object>>builder().event("message").data(map).build()).doOnComplete(() -> {// 此处也可以做持久化:sb.toString() 为完整文本log.info("Full AI reply: {}", sb.toString());// e.g. save to DB or cache...});}
}

说明

  • @RequestBody ChatPrompt(实体)优于 @RequestBody Mono(容易出错的地方是漏写泛型导致 Jackson 试图反序列化 Mono 本身)。

  • 如果你使用 MVC(spring-boot-starter-web)而不是 WebFlux,会报 No primary or single unique constructor found for interface ServerHttpRequest 等错误 —— 所以要确保用 WebFlux。


四、前端(fetch 流 + SSE)详解与稳健实现

下面给出两个前端接收方式:

  • A. 使用 fetch + ReadableStream(支持 POST) —— 推荐用于发送 prompt(POST)并读取流;

  • B. 使用 EventSource(SSE,GET) —— 简单,但只能 GET(如果要 POST,需用会话 id + 双端点策略)。

我们重点展示 A(fetch)并给出 EventSource 的对照示例。


A) 前端(Vue 3)示例:稳健解析 + 逐字队列 + 混合展示 + 收尾快速 flush

特点:

  • 使用 reader.read() 增量读取;

  • buffer 按事件边界(双换行)进行分割,保证 JSON.parse() 只在完整事件时执行,避免 Unexpected end of JSON input

  • 使用 typingQueue 串行逐字打印,保证不并发打印导致乱序;

  • 混合策略:前 N 个 chunk 或短 chunk 采用逐字,长 chunk 直接整体渲染;

  • 收到 finished 时快速 flush 未打印队列,避免长文本被慢慢逐字打印浪费时间。

<!-- StreamChat.vue -->
<template><div><button @click="sendPrompt" :disabled="statusDisabled">Send</button><div v-for="(m, idx) in msgList" :key="idx"><div>{{ m.text }}</div></div></div>
</template><script>
export default {data() {return {statusDisabled: false,msgList: [],typingQueue: [],isTyping: false,chunkCount: 0,flushOnFinish: false,};},methods: {refreshChatReply(list, msg) {list.push(msg);},async sendPrompt() {const prompt = { prompt: "你好,给我一个短故事" };await this.streamChat(prompt);},enqueueText(text, meta = {}) {this.chunkCount++;// 混合策略:前2个 chunk 或短文本走打字机const mode = (this.chunkCount <= 2 || text.length < 60) ? "typewriter" : "instant";this.typingQueue.push({ text, mode, meta });if (!this.isTyping) this.processQueue();},processQueue() {if (this.typingQueue.length === 0) {this.isTyping = false;return;}this.isTyping = true;const { text, mode } = this.typingQueue.shift();if (mode === "instant") {this.msgList[this.msgList.length - 1].text += text;// 继续下一个this.processQueue();return;}// typewriter modelet i = 0;const speed = 30; // ms per charconst interval = setInterval(() => {if (i < text.length) {this.msgList[this.msgList.length - 1].text += text.charAt(i);i++;} else {clearInterval(interval);// 如果收到了 finish 标志并且队列里还有项,希望快速 flush 剩余if (this.flushOnFinish) {// 直接把剩余队列立刻追加(避免慢打)while (this.typingQueue.length) {const it = this.typingQueue.shift();this.msgList[this.msgList.length - 1].text += it.text;}this.flushOnFinish = false;this.isTyping = false;return;}this.processQueue();}}, speed);},async streamChat(prompt) {const res = await fetch('/api/chat/stream', {method: 'POST',headers: {'Content-Type': 'application/json','Accept': 'text/event-stream'},body: JSON.stringify(prompt)});this.statusDisabled = true;const reader = res.body.getReader();const decoder = new TextDecoder('utf-8');let buffer = "";let done = false;// add an empty message slot for assistantconst listMsg = { align: "left", text: "", type: "text", time: Date.now() };this.refreshChatReply(this.msgList, listMsg);while (!done) {const { value, done: streamDone } = await reader.read();done = streamDone;if (value) {buffer += decoder.decode(value, { stream: true });// 按 SSE 事件边界分割(双换行)const parts = buffer.split(/\r?\n\r?\n/);buffer = parts.pop(); // 最后一段可能不完整,保留for (const ev of parts) {const trimmed = ev.trim();if (!trimmed) continue;// 获取 data: 行(可能多行)const lines = trimmed.split(/\r?\n/);let dataStr = "";for (let l of lines) {l = l.trim();if (l.startsWith('data:')) {dataStr += l.substring(5).trim();}}if (!dataStr) continue;try {const data = JSON.parse(dataStr);if (data.text) {this.enqueueText(data.text);}if (data.finished) {// 收尾:取消禁用、记录时间、快速 flushthis.statusDisabled = false;this.msgList[this.msgList.length - 1].time = data.time || Date.now();this.msgList[this.msgList.length - 1].elapsedMs = (data.elapsedMs || 0) / 1000;// 收到完成标志后,如果当前正在 typewriter 打字,希望快速把剩余全部展示if (this.isTyping) {this.flushOnFinish = true;} else {// 还没在 typing,则直接把队列快速 drain(防止一直等待小段)while (this.typingQueue.length) {const it = this.typingQueue.shift();this.msgList[this.msgList.length - 1].text += it.text;}}}} catch (e) {console.warn('JSON parse error, ignoring part:', e, dataStr);}}}}// 流结束:如果 buffer 里还有残余(通常不会),尝试解析if (buffer && buffer.trim()) {try {const evt = buffer.trim();const match = evt.match(/data:(.*)/s);if (match) {const data = JSON.parse(match[1].trim());if (data.text) this.enqueueText(data.text);}} catch (e) { /* ignore */ }}// 最终解锁按钮(安全措施)this.statusDisabled = false;}}
}
</script>

要点回顾(前端)

  • 一定使用 buffer + split(/\r?\n\r?\n/) 保证每次 JSON.parse 的字符串为完整事件;

  • 使用 typingQueue 保证一个片段打印完才开始下个,避免并发多个打字机干扰;

  • 收到 finishedflushOnFinish=true,在当前片段结束后快速把剩余队列直接追加,避免长文本慢慢逐字打印浪费时间。


B) 另一种:EventSource(仅 GET请求,无法 POST请求)

若你后端暴露一个基于会话 id 的 GET SSE endpoint,前端可以使用 EventSource,不用处理 chunk 较简单:

const evt = new EventSource('/api/chat/stream?sessionId=abc123');
evt.addEventListener('message', e => {const data = JSON.parse(e.data);if (data.finished) { /* ... */ } else { enqueueText(data.text); }
});

优点:浏览器自动处理重连;缺点:只能 GET,不能 POST 直接携带 body。


五、常见问题与解决方法(回顾我们讨论过的 bug & 解法)

  1. Could not autowire. No beans of 'Builder' type found.

    • 原因:没有引入 spring-boot-starter-webflux,导致 WebClient.Builder 未注册。

    • 解决:在 pom.xmlspring-boot-starter-webflux,或手动在配置类中 @Bean public WebClient.Builder webClientBuilder()

  2. Cannot construct instance of reactor.core.publisher.Mono during @RequestBody deserialization

    • 原因:Controller 的签名写成了 @RequestBody Mono(无泛型)或让 Jackson 要构造 Mono 本身。

    • 解决:要么 @RequestBody ChatPrompt chatPrompt,要么 @RequestBody Mono<ChatPrompt> chatPromptMono(带泛型)。

  3. No primary or single unique constructor found for interface ServerHttpRequest

    • 原因:在 Spring MVC(Servlet)环境尝试使用 WebFlux 类型 ServerHttpRequest

    • 解决:使用 WebFlux(spring-boot-starter-webflux)或者在 MVC 下使用 HttpServletRequest / SseEmitter

  4. 前端 Unexpected end of JSON input

    • 原因:直接 JSON.parse 不完整的 chunk(被截断)。

    • 解决:使用 buffer 并按事件边界(双换行)切分,只有完整事件才 JSON.parse

  5. 逐字打印导致后续 chunk 到来时乱序

    • 原因:并发启动多个定时器打印多个 chunk。

    • 解决:使用串行 typingQueue,确保当前片段打印完后才开始下一个;并提供混合/加速策略(收到 finished 时 flush)。

  6. 如何识别流结束

    • 可能的结束信号:data: [DONE],或 JSON 中的 finishReason / finish_reason / finishReason: STOP,或服务端关闭连接(stream 完成)。后端解析器里兼顾几种方式都好。

  7. 如何上传内存/输出流而不是本地文件路径

    • 使用 MultipartBodyBuilder 并传 InputStreamResourceByteArrayResource(也可用 PipedInputStream/PipedOutputStream 做生产者-消费者实时写入)。我之前给过三个示例(ByteArrayResource、InputStreamResource、PipedInputStream 配合 PipedOutputStream)。


六、如何测试(curl 与 前端快速验证)

curl(POST SSE)

curl -N -X POST http://localhost:8080/api/chat/stream \-H "Content-Type: application/json" \-d '{"prompt":"你好"}'

-N 表示不缓冲输出(保持流)。

前端(fetch):使用上面 Vue 的 sendPrompt()


七、完整代码清单(快速一览)

(已经在上文分别给出 pom.xml、DTO、WebClientConfigGeminiStreamServiceChatController、前端组件 StreamChat.vue。把这些文件放在 Spring Boot 项目并替换 AI 后端 URL & API Key 即可运行。)


八、部署与监控建议(短)

  • 对流式连接设置合理的超时与心跳(避免反向代理/负载均衡中断长连接);

  • 限制并发流连接数(避免同一服务被 DDoS);对每个用户连接做配额;

  • 日志记录:记录 elapsedMs, token 使用量,取消率(doOnCancel),异常率;

  • 指标:Expose Prometheus metrics(连接数、平均延迟、拼接文本长度等)。

我正在做一个开源通用管理系统,可以满足大部分系统的要求进行二次改造,目前已经发布在github上,后面也会继续优化。

http://www.dtcms.com/a/422874.html

相关文章:

  • 【学习笔记】高质量数据集
  • 微美全息科学院(WIMI.US):互信息赋能运动想象脑电分类,脑机接口精度迎来突破!
  • 协议 NTP UDP 获取实时网络时间
  • 公司网站可以分两个域名做吗残疾人网站服务平台
  • spark pipeline 转换n个字段,如何对某个字段反向转换
  • 学习React-18-useCallBack
  • 长沙制作网站的公司与传统市场营销的区别与联系有哪些
  • 从语言到向量:自然语言处理核心转换技术的深度拆解与工程实践导论(自然语言处理入门必读)
  • 无人设备遥控器之无线发射接收技术篇
  • 《从数组到动态顺序表:数据结构与算法如何优化内存管理?》
  • 浏览器正能量网站2021网页设计免费模板图片
  • 花生壳内网穿透网站如何做seo优化目前最好的找工作平台
  • 1-wireshark网络安全分析——VLAN基础细节详解
  • android studio 无法运行java main()
  • 如何用 Claude Code 搭建安全、可测、可自动化的 GitHub CI 流程?
  • K6的CI/CD集成在云原生应用的性能测试应用
  • Selective Kernel Networks 学习笔记
  • wordpress 浮动留言框搜索引擎优化是什么工作
  • UNIX下C语言编程与实践9-UNIX 动态库创建实战:gcc 参数 -fpic、-shared 的作用与动态库生成步骤
  • 无锡市建设工程质量监督站网站三星网上商城投诉电话
  • Cesium快速入门到精通系列教程十九:Cesium 1.95 中地图模式
  • 内网穿透部署
  • port hybrid pvid vlan vlan-id 概念及题目
  • 十大高端网站定制设计在线制作图片的软件
  • sentinel docker gateway k8s 集群 主从
  • 嘉兴高端网站定制进销存软件排行榜前十名
  • 一个wordpress的爱好者北京关键词优化平台
  • 第四部分:VTK常用类详解(第111章 vtkGlyph3D符号化类)
  • 联邦大型语言模型、多智能体大型语言模型是什么?
  • Apache Doris 入门与技术替代方案