当前位置: 首页 > news >正文

如何通过reactor实现流式响应接口

最近在学习 Spring-AI 框架,发现其流式响应接口使用的是 reactor 的 Flux,于是准备深入学习一番

简单样例

    @GetMapping("/mono")public Mono<String> mono() {return Mono.just("hello mono");}

中文乱码

在上面的样例中 mono 接口返回了一个字符串 “hello mono”,但是如果返回中文字符则会乱码,需要添加响应头,设置字符集

    @GetMapping("/mono-zh")public Mono<String> monoChinese(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");return Mono.just("你好 mono");}

异步返回

使用 reactor 的场景基本都是耗时较长的场景,需要异步返回

    @GetMapping("/mono-async")public Mono<String> monoAsync(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");Mono<String> mono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "你好 mono";}));return mono;}

在上述代码中,启动异步任务后并不会等待异步任务执行完毕,fromFuture 方法会将执行中的异步任务包装为 Mono 对象并立即返回

多条返回

Mono 对象只能返回一条消息,在使用大模型时往往需要返回多条消息,此时需要切换为 Flux

    @GetMapping("/flux-async")public Flux<String> fluxAsync(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");return Flux.create(sink -> {// 订阅时异步执行CompletableFuture.runAsync(() -> {try {// 模拟逐步发送多条消息Thread.sleep(3000);sink.next("第一条消息");Thread.sleep(1000);sink.next("第二条消息");Thread.sleep(1000);sink.next("第三条消息");sink.complete();} catch (Exception e) {sink.error(e);}});});}

注意:多条返回是指多次返回不同消息,并不能像ai工具一样打字式的逐字显示,但可以通过每次仅返回一个字符来实现类似的效果

Sinks

也可以使用 reactor 的 Sinks 实现类似的功能

    @GetMapping("/sink")public Flux<String> sink(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();CompletableFuture.runAsync(() -> {try {// 模拟逐步发送多条消息for (int i = 0; i < 10; i++) {Thread.sleep(6000);sink.tryEmitNext("你好 sink " + i);}} catch (Exception e) {sink.tryEmitError(e);}});return sink.asFlux();}

Sinks 提供了 many 和 one 两个方法,分别支持发送多条消息和一条消息,可以通过 asFlux 和 asMono 方法转换为 Flux 和 Mono 对象。

同时提供 unicast 和 multicast 来实现单播和广播

最后 onBackpressureBuffer 是背压策略,表示当生产速度快于消费速度时,会将数据缓存起来以避免丢失

返回的 Sinks.Many 对象,支持下列方法

  • tryEmitNext:发送一条消息
  • emitNext:发送一条消息,发送失败会抛出异常
  • tryEmitError:发送错误消息
  • emitError:发送错误消息,发送失败会抛出异常
  • tryEmitComplete:发送完成信号
  • emitComplete:发送完成信号,发送失败会抛出异常
  • asFlux:转换为 Flux 对象
http://www.dtcms.com/a/618685.html

相关文章:

  • vue-leaflet使用教程(一)
  • 江苏省徐州市建设银行网站技术培训网站
  • 如何取外贸网站域名建设网站平台费
  • python 贪心-dfs-dp
  • Android Studio - 使用 BuildConfig
  • 在ec2上部署Qwen2.5omini和Qwen3omini模型
  • 设备通信的艺术:从协议选型、性能调优到自定义实现的全维度技术实践
  • 过滤器模式、责任链模式
  • 做货源的网站郑州企业免费建站
  • HCIP笔记5--OSPF域间路由、虚链路、认证
  • Java 黑马程序员学习笔记(进阶篇27)
  • 海南网站推广建设温州市城市基础设施建设网站
  • CentOS 7 安装 unzip-6.0-21.el7.x86_64.rpm 步骤详解(附安装包)
  • 审计局网站建设管理创意设计广告
  • Goer-Docker系列-1-使用kubectl命令部署Web应用
  • php网站超市响应式网站建设方案
  • 家用路由器挑选指南:考量无线协议与Wi-Fi 7新技术参数
  • 站长工具网站提交沈阳网站优化怎么做
  • SAP FICO成本分解功能
  • 前端实现扫描大屏幕二维码上传图片并同步显示方案
  • 免费域名网站wordpress 修改子主题
  • 二叉树的前序遍历
  • 网站规划建设与管理维护课后答案四平市城乡建设局网站
  • 『大模型部署』NVIDIA Orin + bnb量化 + Qwen3-VL | 4bit、8bit量化
  • 网站建设状况vue快速搭建网站
  • 深入数据库性能优化:从参数调优到RAC高可用架构构建
  • MaxTex下载及LaTex环境配置
  • HttpServletResponse 详细指南
  • 网站建设3a模型是什么意思即墨网站建设哪家好
  • 为什么网站设计很少全屏网络维护难吗