Spring AI-流式编程
1. 流式编程
1.1 SSE协议介绍
HTTP协议本身设计为无状态的请求-响应模式,严格来说,是无法做到服务器主动推送消息到客户端,但 通过Server-Sent Events(服务器发送事件,简称SSE)技术可实现流式传输,允许服务器主动向浏览器推送数据流。
也就是说,服务器向客户端声明,接下来要发送的是流消息(streaming),这时客户端不会关闭连接,会一直等待服务器发送过来新的数据流。
SSE(Server-Sent Events)是一种基于HTTP的轻量级实时通信协议,浏览器通过内置的EventSource API接受并处理这些实时事件。
核心特点
基于HTTP协议
复用标准HTTP/HTTPS协议,无需额外端口或协议,兼容性好且易于部署
单向通信机制
SSE仅支持服务器向客户端的单向数据推送,客户端通过普通HTTP请求建立连接后,服务器可持续发送数据流,但客户端无法通过同一连接向服务器发送数据。
自动重连机制
支持断线重连,连接中断时,浏览器会自动尝试重新连接(支持 retry 字段指定重连间隔)
自定义消息类型
客户端发起请求后,服务器保持连接开放,响应头设置 Content-Type: text/event-stream,标识为事件流格式,持续推送事件流。
数据格式
服务器向浏览器发送SSE数据,需要设置必要的HTTP头信息
Content-Type: text/event-stream;charset=utf-8Connection: keep-alive
每一次发送的消息,由若干个message组成,每个message之间由\n\n分割,每个message内部由若干行组成,每一行都是如下的格式:
[field]: value\n
Field可以取值为:
data[必需]:数据内容
event[非必需]:表示自定义的事件类型,默认是message事件
id[非必需]:数据标识符,相当于每一条数据的编号
retry[非必需]:指定浏览器重新发起连接的时间间隔
除此之外,还可以有冒号:开头的行,表示注释。
数据示例:
event: foo\n
data: a foo event\n\n
data: an unnamed event\n\n
event: end\n
data: a bar event\n\n
服务端实现
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {@RequestMapping("/data")public void data(HttpServletResponse response) throws IOException, InterruptedException {log.info("发送请求:data");response.setContentType("text/event-stream;charset=utf-8");PrintWriter printWriter=response.getWriter();for (int i = 0; i < 10; i++){String s = "data: " + new Date() + "\n\n";printWriter.write(s);printWriter.flush();Thread.sleep(1000);}}
测试接口:127.0.0.1:8080/sse/data
客户端API
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource('/sse/data');eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}// eventSource.addEventListener("foo", (event)=>{// document.getElementById("sse").innerHTML= event.data;// });// eventSource.addEventListener("end", (event)=>{// document.getElementById("sse").innerHTML= event.data;// eventSource.close();// });
</script>
</body>
</html>
1.2 Spring 中SSE实现
Spring 4.2 开始就已经支持SSE,从Spring 5开始我们可以使用WebFlux更优雅的实现SSE协议。Flux 是WebFlux的核心API。
可以把Flux想象成一条传送带
异步传送:数据像快递包裹一样逐个到达,不用等全部到齐。
灵活加工:支持途中修改数据(如过滤/转换)
弹性控制:接收方可以调速(背压价值)
快速使用
1.创建Flux:创建一个Flux,并有数据源。
2.处理数据:使用操作符对数据进行处理。
3.订阅数据:订阅Flux来消费数据,触发数据流动。
示例:
public class FluxDemoTest {public static void main(String[] args) throws InterruptedException {Flux<String> flux=Flux.just("Apple","Banana","Cherry","Pear").delayElements(Duration.ofSeconds(1));flux.map(String::toUpperCase).map(s->s+"-").subscribe(System.out::println);Thread.sleep(5000);}
}
创建Flux
使用Flux.just(...)创建一个指定元素的Flux
Flux<String> flux=Flux.just("Apple","Banana","Cherry","Pear")
自动重连代码练习
@RequestMapping("/retry")public void retry(HttpServletResponse response) throws IOException{log.info("发送请求:retry");response.setContentType("text/event-stream;charset=utf-8");PrintWriter printWriter=response.getWriter();//告诉浏览器如果断开连接2s后重传String s="retry: 2000\n\n";s += "data: " + new Date() + "\n\n";printWriter.write(s);printWriter.flush();}
测试接口:http://127.0.0.1:8080/index.html
测试结果:
每两秒重新发一次消息
自定义事件:
@RequestMapping("/event")public void event(HttpServletResponse response) throws IOException, InterruptedException {log.info("发起请求: event");response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer=response.getWriter();for (int i = 0; i < 10; i++){//自定义事件类型String s = "event: foo\n";s += "data: " + new Date() + "\n\n";writer.write(s);writer.flush();Thread.sleep(1000L);}}
测试接口:http://127.0.0.1:8080/index.html
测试结果:
有明确结束信号的持续数据推送流:
@RequestMapping("/end")public void end(HttpServletResponse response) throws IOException, InterruptedException {log.info("发起请求: end");response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer=response.getWriter();for (int i = 0; i < 10; i++){String s = "event: foo\n";s += "data: " + new Date() + "\n\n";writer.write(s);writer.flush();Thread.sleep(1000L);}//定义事件,表示当前流传输结束writer.write("event: end\ndata: EOF\n\n");writer.flush();}
流式响应接口:
需求:每隔一秒向客户端输出当前时间。
@RequestMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> stream(){return Flux.interval(Duration.ofSeconds(1)).map(s->new Date().toString());}
<div id="sse"></div>
<script>let eventSource = new EventSource("/sse/stream");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML = event.data;}
</script>
常见的操作符:
操作 | 作用 | 示例代码片段 |
map() | 元素一对一转换 | .map(String::toUpperCase) |
filter() | 条件过滤 | .filter(s -> s.length() > 5) |
take() | 限制元素数量 | .take(2) //只取前2个元素 |
merge() | 合并多个Flux(不保证顺序) | Flux.merge(Flux.just("A"), Flux.just("B")) |
concat() | 顺序拼接多个Flux(保证顺序) | Flux.concat(Flux.just("A"), Flux.just("B")) |
delayElements | 延迟元素发射 | .delayElements(Duration.ofSeconds(1)) |