当前位置: 首页 > news >正文

Spring WebFlux 流式数据拉取与推送的实现

Spring WebFlux流式数据拉取与推送的实现

本文介绍了使用Spring WebFlux实现流式数据拉取与推送的方案。文章首先展示了流式返回数据的格式(类似DeepSeek大模型的推送模式),然后详细讲解了三个核心实现部分:1)通过Flux.create实现流式响应数据的桥接转发;2)配置OkHttpClient的HTTP客户端参数(特别是readTimeout和callTimeout设为0以支持流式传输);3)核心数据获取方法queryDifficultFaultMessage的实现,包括异步请求处理、错误处理和取消订阅机制。该方案实现了后端对原始数

前言

1,流式返回数据类型如下,是不断的推送数据,类似于主流DeepSeek大模型模式,推送数据一点点推送,直至推送结束或者主动点击停止

data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""}data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""}data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"故障"}data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"根"}data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"因"}
....
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":":\n\n"}data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"message_end","answer":"一"}```

2,流式接口又称基于响应式编程(Reactive Programming)的服务器端到服务器端(Service-to-Service)的流式数据拉取与推送的实现。

3,它的核心作用是:从一个流式API(SSE)消费数据,并立即将其作为流式响应转发给客户端

4,功能要求,后段不做数据处理,大模型接口返回什么数据,直接回传给前端,分阻塞返回,【当时做的效果是:大模型返回数据,后端自动拼接结束后返回给前端,这种效果很不友好,导致请求的时间比较长】

功能实现

1.流式响应数据

public Flux<DifficultFaultMessageVo> streamingQueryDifficultFault(DifficultFaultMessageDto paramDto) {// 这个 Flux 代表了从下游服务获取的原始数据流。Flux<DifficultFaultMessageVo> faults = queryDifficultFaultMessage(paramDto);return Flux.create(sink -> {// faults.subscribeOn(Schedulers.boundedElastic()): 这行代码至关重要。它告诉上游的 faults 流在 boundedElastic 调度器上执行其订阅操作(即执行网络请求和处理响应)。faults.subscribeOn(Schedulers.boundedElastic())// 这里创建了一个新的 Flux。create 方法允许我们手动控制如何向这个流中发射数据。我们传入一个 Consumer,它接收一个 FluxSink 对象(这里的参数名为 sink)作为参数。Sink(汇)就是数据流的出口,我们可以通过它发射数据 (next)、错误 (error) 或完成信号 (complete)。.subscribe(sink::next, sink::error, sink::complete);});
}

代码解析:
subscribe(sink::next, sink::error, sink::complete): 这里订阅了从 queryDifficultFaults 返回的原始流。
当原始流 (faults) 产生一个数据 (DifficultFaultMessageVo 对象) 时,就通过 sink::next 将它转发给我们新创建的流的 Sink。
当原始流发生错误时,通过 sink::error 将错误转发给新流的 Sink。
当原始流结束时,通过 sink::complete 结束新流的 Sink。

**总结:**这个方法的作用可以理解为 “流的桥接” 。它将在一个弹性线程上执行的、可能阻塞的原始数据流,桥接成一个适合在WebFlux等响应式Web框架中返回的响应式流。外部调用者(如Controller)只需返回这个方法的返回值,框架就会自动处理流的订阅和HTTP响应体的流式写入。

2.HTTP客户端配置 okhttpclient

private final OkHttpClient httpClient = new OkHttpClient.Builder().connectTimeout(120, TimeUnit.SECONDS) // 连接超时2分钟.readTimeout(0, TimeUnit.SECONDS)      // 读取超时:0(无限等待,对于流式响应关键!).writeTimeout(120, TimeUnit.SECONDS)   // 写入超时2分钟.callTimeout(0, TimeUnit.SECONDS)      // 整个调用超时:0(无限等待).retryOnConnectionFailure(true)        // 自动重试连接失败.connectionPool(new ConnectionPool(20, 5, TimeUnit.MINUTES)) // 连接池(20个空闲连接,存活5分钟).build();

代码解读:
这个配置是流式HTTP客户端的灵魂,每一项都针对长连接和流式传输进行了优化:

readTimeout(0): 这是最关键的配置。普通的HTTP请求需要设置读取超时,但流式响应是一个长时间存在的连接,数据会分块持续发送。设置为 0 表示永不超时,客户端会一直等待服务器发送更多数据,直到连接被服务器或自己主动关闭。
callTimeout(0): 同理,整个调用的总时间也不应设限。
connectTimeout 和 writeTimeout: 这两个仍然需要设置一个合理的值,分别控制建立TCP连接的时间和发送请求体的时间,这些操作不应该无限等待。
connectionPool: 使用连接池可以复用TCP连接,避免为每个请求都进行三次握手,极大提升性能。这里配置了最多保持20个空闲连接,每个空闲连接最多存活5分钟。
retryOnConnectionFailure: 网络抖动时自动重试,提高鲁棒性。

3.核心数据获取方法

private Flux<DifficultFaultMessageVo> queryDifficultFaultMessage(DifficultFaultMessageDto paramDto) {return Flux.create(emitter -> { // 这个emitter是内部Flux的SinkRequest request = buildRequest(paramDto);Call call = httpClient.newCall(request);call.enqueue(new Callback() { // 异步执行HTTP请求@Overridepublic void onFailure(...) {if (!emitter.isCancelled()) {emitter.error(...); // 网络失败,向上游发射错误}}@Overridepublic void onResponse(...) throws IOException {if (!response.isSuccessful()) {if (!emitter.isCancelled()) {emitter.error(...); // HTTP状态码非2xx,向上游发射错误}return;}// 成功响应,开始处理流式响应体processResponseStream(response, emitter);}});// 重要:注册取消回调emitter.onCancel(() -> {if (!call.isCanceled()) {call.cancel(); // 如果下游取消订阅(如客户端断开),则取消OkHttp请求}});});
}
构建请求
private Request buildRequest(DifficultFaultMessageDto paramDto) {// 2.组装请求头信息MediaType parse = MediaType.parse("application/json;charset=UTF-8");// 3.组装请求体信息JSONObject requestBody = new JSONObject();requestBody.put("x x x", paramDto.getxxxType());requestBody.put("apiKey", paramDto.getApiKey());// 省略业务代码... log.info("API请求体: {}", requestBody);return new Request.Builder().url("http://xxx.x.xx.xx:8000/servicexxx/r/postApi").post(body).addHeader("X-APP-ID", "xx09xx3xx0d7").addHeader("X-APP-KEY", "9jfksjfjkxxkkssdc").addHeader("Content-Type", "application/json").build();}

代码解读:
目的:创建一个 Flux,用于封装对下游服务的异步HTTP调用和流式响应处理。
执行流程:

构建请求 (buildRequest) 和调用对象 (Call)。
异步执行 (call.enqueue) HTTP请求。
在回调中:

失败 (onFailure): 检查内部的 FluxSink (emitter) 是否还未被取消(即下游是否还在关心结果),如果是,则发射一个错误信号。
成功 (onResponse): 检查HTTP状态码,如果不成功则发射错误;如果成功,则调用 processResponseStream 开始处理响应体流。
emitter.onCancel(…): 这是响应式编程中资源清理的关键。它注册了一个回调,当这个 Flux 的下游订阅者取消订阅时(例如,前端用户关闭了浏览器标签页),这个回调会被触发。回调里会取消底层的OkHttp Call 对象,从而立即关闭网络连接,避免资源泄漏。这是一种“背压”(Backpressure)传播,体现了响应式的优点。

4.流式响应体处理 【最核心部分】

private void processResponseStream(Response response, FluxSink<DifficultFaultsVo> emitter) {try (ResponseBody responseBody = response.body()) { // 使用try-with-resources确保资源关闭...BufferedSource source = responseBody.source(); // 获取缓冲数据源AtomicBoolean isComplete = new AtomicBoolean(false); // 标志位,是否收到结束事件try {while (!emitter.isCancelled()) { // 循环,只要下游没有取消就继续读String line = source.readUtf8Line(); // 读取一行UTF-8文本if (line == null) break; // 读到null表示流自然结束(服务器关闭连接)if (StringUtils.isBlank(line)) continue; // 忽略空行// SSE协议格式:每段数据以"data: "开头if (line.startsWith("data:")) {String jsonData = line.substring(6); // 截取"data: "后面的JSON字符串// 过滤:只处理包含"answer"或"message_end"的数据行if (!jsonData.contains("answer") && !jsonData.contains("message_end")) {continue;}DifficultFaultMessageVo vo = processModelResponse(jsonData); // 解析JSON为值对象if (vo != null) {emitter.next(vo); // 解析成功,立即发射给下游(最终到前端)// 如果遇到结束事件,标记完成并结束循环if ("message_end".equals(vo.getEvent())) {isComplete.set(true);emitter.complete();break;}}}}} catch (Exception e) {// 处理读取和解析过程中的异常if (!emitter.isCancelled()){emitter.error(e);}} finally {// 确保流最终完成if (!isComplete.get() && !emitter.isCancelled()){emitter.complete();}}...} catch (Exception e) {...} finally {response.close(); // 最终确保HTTP响应被关闭}
}private DifficultFaultMessageVo processModelResponse(String jsonData) {try {// 1. 解析JSONDifficultFaultMessageVo rawResponse = JSONUtil.toBean(jsonData,DifficultFaultMessageVo.class,false);// 2. 过滤非消息事件String event = rawResponse.getEvent();// 3. 创建前端响应对象DifficultFaultMessageVo result = new DifficultFaultMessageVo();result.setConversationId(rawResponse.getConversationId());// 4. 处理结束事件if ("message_end".equals(event)) {result.setEvent("message_end");result.setAnswer("");return result;}result.setAnswer(rawResponse.getAnswer());result.setEvent(rawResponse.getEvent());return result;} catch (Exception e) {log.error("JSON解析错误: {} - {}", jsonData, e.getMessage());return null;}}

代码解读:
核心任务:从 ResponseBody 的流中逐行读取并解析SSE格式。
SSE格式简介:通常为 data: {json}\n\n。代码只关心以 data: 开头的行。
关键点:

逐行读取: source.readUtf8Line() 是阻塞方法,这就是为什么必须在 boundedElastic 线程上执行的原因。
过滤: 并非所有 data: 行都需要处理,这里通过检查内容来过滤。
JSON解析: processModelResponse(jsonData) 方法(代码未给出)负责将JSON字符串解析为 DifficultFaultsVo 对象。
实时发射: 一旦解析成功,立即通过 emitter.next(vo) 将数据推送给下游,实现了数据块的零延迟转发。
结束条件:

显式结束: 收到 “message_end” 事件,调用 emitter.complete()。
隐式结束: 服务器关闭连接(readUtf8Line() 返回 null),跳出循环。
异常结束: 捕获到任何异常,调用 emitter.error(e)。
健壮性保证: 大量的 if (!emitter.isCancelled()) 检查确保了在下游已经不感兴趣的情况下,不会进行无效的操作(发射数据、错误或完成信号)。finally 块确保了在任何情况下流最终都会被关闭,防止资源泄漏。

总结:

这段代码实现了一个高效、健壮的双重流式处理管道:

下游流 (OkHttp -> 本服务):使用配置了长超时的OkHttp客户端,异步调用外部流式API,并在独立的弹性线程上阻塞地、逐行读取SSE响应。
上游流 (本服务 -> 客户端):通过Project Reactor的 Flux 和 Sink,将下游获取到的数据块立即、实时地转发给最终的客户端(如Web浏览器)。
关键特性:

非阻塞IO: 通过将阻塞操作卸载到专用线程池,保护了Web容器的核心线程。
背压传播: 下游的取消订阅会向上传播,最终取消OkHttp请求,及时释放资源。
全面错误处理: 对网络错误、HTTP错误、解析错误、连接意外关闭等都有处理。
资源安全: 广泛使用 try-with-resources 和 finally 块确保网络连接和响应体被正确关闭。
这是一种在Spring WebFlux等响应式框架中集成传统阻塞式HTTP客户端以消费流式服务的标准且优雅的模式。

喜欢我的文章记得点个在看,或者点赞,持续更新中ing…


文章转载自:

http://rqG6YF9X.krdxz.cn
http://6WbfYAM0.krdxz.cn
http://4bw27Oxg.krdxz.cn
http://NDswhqwq.krdxz.cn
http://bZ5SPour.krdxz.cn
http://2E3mpiUO.krdxz.cn
http://IuSzdNTi.krdxz.cn
http://oQHW7F12.krdxz.cn
http://0pZ8KvJo.krdxz.cn
http://HVCEqsBD.krdxz.cn
http://pbwVy913.krdxz.cn
http://eNC5AREH.krdxz.cn
http://CD9351lH.krdxz.cn
http://2YSXHWPX.krdxz.cn
http://ZAksOnbm.krdxz.cn
http://FY3mDqdp.krdxz.cn
http://RKyGxjbO.krdxz.cn
http://naWgNQrs.krdxz.cn
http://rCiKD3dg.krdxz.cn
http://jBntxqLB.krdxz.cn
http://NVf401bt.krdxz.cn
http://LgDEgsnB.krdxz.cn
http://g2Ljvzhs.krdxz.cn
http://RUqIwCQe.krdxz.cn
http://8bCrIOeW.krdxz.cn
http://Hgq85tVd.krdxz.cn
http://dc9pksBD.krdxz.cn
http://kxLnxbZP.krdxz.cn
http://xYKdqNoY.krdxz.cn
http://Iyq8D3SB.krdxz.cn
http://www.dtcms.com/a/366244.html

相关文章:

  • 【算法--链表】25.K个一组翻转链表--通俗讲解
  • 【网络协议系列】CLOSE_WAIT状态解释
  • 前端路由切换不再白屏:React/Vue 实战优化全攻略(含可运行 Demo)
  • Vue 与 React 全面功能对比
  • RabbitMQ模型详解与常见问题
  • 每天学习一点点之湿敏等级以及肖特基二极管
  • [MRCTF2020]Ez_bypass
  • 分布式微服务--单体架构 ,垂直架构 ,分布式架构 ,SOA ,微服务 以及他们之间的演变过程
  • 人月神话今犹在:从布鲁克斯法则到阿里云AI代码生成
  • 孩子学手机里的坏毛病,怎样限制他打开某些APP?
  • [免费]基于Python的Django+Vue图书借阅推荐系统【论文+源码+SQL脚本】
  • 2025年人工智能政策剖析:GEO新赛道,硕芽科技助力前行
  • 光谱相机在手机行业的应用
  • 怎样让外网计算机访问局域网计算机?通过公网地址访问不同内网服务的设置方法
  • 在 ASP.NET Core 8 Web API 中实现基于角色的授权 安全且可扩展 API 的最佳实践
  • 安装3DS MAX 2026后,无法运行,提示缺少.net core的解决方案
  • 基于阿里云部署 RustDesk 自托管服务器
  • 电子病历空缺句的语言学特征描述与自动分类探析(以GPT-5为例)(下)
  • 从根源破解“找不到 vcruntime140.dll 无法执行”问题:原因分析、安全修复工具推荐及预防指南
  • 服务器监控不用盯屏幕:Ward+Cpolar让异常告警主动找到你
  • 【LeetCode热题100道笔记】旋转图像
  • 从零开始的云计算生活——第五十八天,全力以赴,Jenkins部署
  • [Linux] Linux标准块设备驱动详解:从原理到实现
  • 如何将两个网段互相打通
  • ⸢ 肆 ⸥ ⤳ 默认安全:安全建设方案 ➭ b.安全资产建设
  • 算法模板(Java版)_字符串、并查集和堆
  • 云数据库服务(参考自腾讯云计算工程师认证课程)更新中......
  • 如何在Linux上部署1Panel面板并远程访问内网Web端管理界面
  • vue3存储/获取本地或会话存储,封装存储工具,结合pina使用存储
  • [数据结构] 链表