如何通过reactor实现流式响应接口
最近在学习 Spring-AI 框架,发现其流式响应接口使用的是 reactor 的 Flux,于是准备深入学习一番
简单样例
@GetMapping("/mono")public Mono<String> mono() {return Mono.just("hello mono");}中文乱码
在上面的样例中 mono 接口返回了一个字符串 “hello mono”,但是如果返回中文字符则会乱码,需要添加响应头,设置字符集
@GetMapping("/mono-zh")public Mono<String> monoChinese(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");return Mono.just("你好 mono");}异步返回
使用 reactor 的场景基本都是耗时较长的场景,需要异步返回
@GetMapping("/mono-async")public Mono<String> monoAsync(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");Mono<String> mono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}return "你好 mono";}));return mono;}在上述代码中,启动异步任务后并不会等待异步任务执行完毕,fromFuture 方法会将执行中的异步任务包装为 Mono 对象并立即返回
多条返回
Mono 对象只能返回一条消息,在使用大模型时往往需要返回多条消息,此时需要切换为 Flux
@GetMapping("/flux-async")public Flux<String> fluxAsync(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");return Flux.create(sink -> {// 订阅时异步执行CompletableFuture.runAsync(() -> {try {// 模拟逐步发送多条消息Thread.sleep(3000);sink.next("第一条消息");Thread.sleep(1000);sink.next("第二条消息");Thread.sleep(1000);sink.next("第三条消息");sink.complete();} catch (Exception e) {sink.error(e);}});});}注意:多条返回是指多次返回不同消息,并不能像ai工具一样打字式的逐字显示,但可以通过每次仅返回一个字符来实现类似的效果
Sinks
也可以使用 reactor 的 Sinks 实现类似的功能
@GetMapping("/sink")public Flux<String> sink(HttpServletResponse response) {response.setCharacterEncoding("UTF-8");Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();CompletableFuture.runAsync(() -> {try {// 模拟逐步发送多条消息for (int i = 0; i < 10; i++) {Thread.sleep(6000);sink.tryEmitNext("你好 sink " + i);}} catch (Exception e) {sink.tryEmitError(e);}});return sink.asFlux();}Sinks 提供了 many 和 one 两个方法,分别支持发送多条消息和一条消息,可以通过 asFlux 和 asMono 方法转换为 Flux 和 Mono 对象。
同时提供 unicast 和 multicast 来实现单播和广播
最后 onBackpressureBuffer 是背压策略,表示当生产速度快于消费速度时,会将数据缓存起来以避免丢失
返回的 Sinks.Many 对象,支持下列方法
- tryEmitNext:发送一条消息
- emitNext:发送一条消息,发送失败会抛出异常
- tryEmitError:发送错误消息
- emitError:发送错误消息,发送失败会抛出异常
- tryEmitComplete:发送完成信号
- emitComplete:发送完成信号,发送失败会抛出异常
- asFlux:转换为 Flux 对象
