当前位置: 首页 > news >正文

Spring AI-流式编程

1. 流式编程

1.1 SSE协议介绍

HTTP协议本身设计为无状态的请求-响应模式,严格来说,是无法做到服务器主动推送消息到客户端,但 通过Server-Sent Events(服务器发送事件,简称SSE)技术可实现流式传输,允许服务器主动向浏览器推送数据流。

也就是说,服务器向客户端声明,接下来要发送的是流消息(streaming),这时客户端不会关闭连接,会一直等待服务器发送过来新的数据流。

SSE(Server-Sent Events)是一种基于HTTP的轻量级实时通信协议,浏览器通过内置的EventSource API接受并处理这些实时事件。

核心特点

基于HTTP协议

复用标准HTTP/HTTPS协议,无需额外端口或协议,兼容性好且易于部署

单向通信机制

SSE仅支持服务器向客户端的单向数据推送,客户端通过普通HTTP请求建立连接后,服务器可持续发送数据流,但客户端无法通过同一连接向服务器发送数据。

自动重连机制

支持断线重连,连接中断时,浏览器会自动尝试重新连接(支持 retry 字段指定重连间隔)

自定义消息类型

客户端发起请求后,服务器保持连接开放,响应头设置 Content-Type: text/event-stream,标识为事件流格式,持续推送事件流。

数据格式

服务器向浏览器发送SSE数据,需要设置必要的HTTP头信息

 Content-Type: text/event-stream;charset=utf-8Connection: keep-alive

每一次发送的消息,由若干个message组成,每个message之间由\n\n分割,每个message内部由若干行组成,每一行都是如下的格式:

[field]: value\n

Field可以取值为:

data[必需]:数据内容

event[非必需]:表示自定义的事件类型,默认是message事件

id[非必需]:数据标识符,相当于每一条数据的编号

retry[非必需]:指定浏览器重新发起连接的时间间隔

除此之外,还可以有冒号:开头的行,表示注释。

数据示例:

event: foo\n
data: a foo event\n\n
data: an unnamed event\n\n
event: end\n
data: a bar event\n\n

服务端实现

@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {@RequestMapping("/data")public void data(HttpServletResponse  response) throws IOException, InterruptedException {log.info("发送请求:data");response.setContentType("text/event-stream;charset=utf-8");PrintWriter printWriter=response.getWriter();for (int i = 0; i < 10; i++){String s = "data: " + new Date() + "\n\n";printWriter.write(s);printWriter.flush();Thread.sleep(1000);}}

测试接口:127.0.0.1:8080/sse/data

客户端API

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource('/sse/data');eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}// eventSource.addEventListener("foo", (event)=>{//     document.getElementById("sse").innerHTML= event.data;// });// eventSource.addEventListener("end", (event)=>{//   document.getElementById("sse").innerHTML= event.data;//     eventSource.close();// });
</script>
</body>
</html>

1.2 Spring 中SSE实现

Spring 4.2 开始就已经支持SSE,从Spring 5开始我们可以使用WebFlux更优雅的实现SSE协议。Flux 是WebFlux的核心API。

可以把Flux想象成一条传送带

异步传送:数据像快递包裹一样逐个到达,不用等全部到齐。

灵活加工:支持途中修改数据(如过滤/转换)

弹性控制:接收方可以调速(背压价值)

快速使用

1.创建Flux:创建一个Flux,并有数据源。

2.处理数据:使用操作符对数据进行处理。

3.订阅数据:订阅Flux来消费数据,触发数据流动。

示例:

public class FluxDemoTest {public static void main(String[] args) throws InterruptedException {Flux<String> flux=Flux.just("Apple","Banana","Cherry","Pear").delayElements(Duration.ofSeconds(1));flux.map(String::toUpperCase).map(s->s+"-").subscribe(System.out::println);Thread.sleep(5000);}
}

创建Flux

使用Flux.just(...)创建一个指定元素的Flux

Flux<String> flux=Flux.just("Apple","Banana","Cherry","Pear")

自动重连代码练习

@RequestMapping("/retry")public void retry(HttpServletResponse  response) throws IOException{log.info("发送请求:retry");response.setContentType("text/event-stream;charset=utf-8");PrintWriter printWriter=response.getWriter();//告诉浏览器如果断开连接2s后重传String s="retry: 2000\n\n";s += "data: " + new Date() + "\n\n";printWriter.write(s);printWriter.flush();}

测试接口:http://127.0.0.1:8080/index.html

测试结果:

每两秒重新发一次消息

自定义事件:

@RequestMapping("/event")public void event(HttpServletResponse  response) throws IOException, InterruptedException {log.info("发起请求: event");response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer=response.getWriter();for (int i = 0; i < 10; i++){//自定义事件类型String s = "event: foo\n";s += "data: " + new Date() + "\n\n";writer.write(s);writer.flush();Thread.sleep(1000L);}}

测试接口:http://127.0.0.1:8080/index.html

测试结果:

有明确结束信号的持续数据推送流:

@RequestMapping("/end")public void end(HttpServletResponse  response) throws IOException, InterruptedException {log.info("发起请求: end");response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer=response.getWriter();for (int i = 0; i < 10; i++){String s = "event: foo\n";s += "data: " + new Date() + "\n\n";writer.write(s);writer.flush();Thread.sleep(1000L);}//定义事件,表示当前流传输结束writer.write("event: end\ndata: EOF\n\n");writer.flush();}

流式响应接口:

需求:每隔一秒向客户端输出当前时间。

@RequestMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public  Flux<String> stream(){return Flux.interval(Duration.ofSeconds(1)).map(s->new Date().toString());}
<div id="sse"></div>
<script>let eventSource = new EventSource("/sse/stream");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML = event.data;}
</script>

常见的操作符:

操作作用示例代码片段
map()元素一对一转换.map(String::toUpperCase)
filter()条件过滤.filter(s -> s.length() > 5)
take()限制元素数量.take(2) //只取前2个元素
merge()合并多个Flux(不保证顺序)Flux.merge(Flux.just("A"), Flux.just("B"))
concat()顺序拼接多个Flux(保证顺序)Flux.concat(Flux.just("A"), Flux.just("B"))
delayElements延迟元素发射.delayElements(Duration.ofSeconds(1))
http://www.dtcms.com/a/457026.html

相关文章:

  • 手写 Promise.all 的原理与实现
  • 关于windows系统事件查看器的初步理解
  • Linux 线程概念与虚拟地址空间深度解析
  • 一套智慧工地云平台源码,支持监管端、项目管理端,Java+Spring Cloud +UniApp +MySql技术开发
  • 虚幻引擎5 GAS开发俯视角RPG游戏 P05-05 游戏效果委托
  • 音频audio播放两种方式:MediaPlayer和AudioTrack对比
  • K8s学习笔记(十五) pause容器与init容器
  • DVWA靶场之十六:未验证的重定向漏洞(Open HTTP Redirect)
  • 上海网站建设免费推做网站的软件 简单易学
  • 面部情绪识别数据集的介绍和下载
  • Golang中的HTTP请求凝聚器
  • 网站建设多少钱一平米中铁建设集团门户网登陆
  • Linux shell学习(更新中....)
  • 自动生成API文档与故障排查决策树的NLP应用
  • 手机怎么制作钓鱼网站建设文明网 联盟网站的
  • Rust 的类型自动解引用:隐藏在人体工学设计中的魔法
  • AVX-512深度实现分析:从原理到LLaMA.cpp的性能优化艺术
  • 前端玩转大模型,DeepSeek-R1 蒸馏 Llama 模型的 Bedrock 部署
  • 计算机网络-运输层
  • OSPF协议详解5:实验 - 计时器、度量值与其他高级配置
  • OpenCV(五):鼠标控制
  • Linux中权限系统
  • 网站域名到期后果四川人力资源考试官网二建
  • python爬虫(五) ---- Pyinstaller打包Python程序为exe文件及遇到的问题
  • 沈阳做网站价格自己做网站要学什么
  • 深入浅出ArkTS:HarmonyOS应用开发的现代化语法解析
  • UVa 204 Robot Crash
  • 2025 完整指南:Gemini 2.5 Computer Use 模型 - AI Agent 界面控制的革命性突破
  • 云南网站建设专业品牌网站域名怎么转
  • Vue项目中如何实现表格选中数据的 Excel 导出