当前位置: 首页 > news >正文

Java 网络流式编程

SSE 协议

概念

在 WebSocket 协议中,客户端与服务器之间建立连接完成后,客户端向服务器发送请求,服务器给客户端返回响应,此后服务器就不能主动与客户端进行通信。若服务器需要不断地给客户端发送响应,就需要使用 SSE 协议。

当服务器向客户端声明,接下来发送的是流消息,那么客户端就不会关闭连接,一直等待服务器发送数据。若后续服务器不需要给客户端发送数据,也可以主动关闭连接。

SSE(Server-Sent Events)是一种基于 HTTP 的轻量级实时通信协议,浏览器通过内置的 EventSource API 接收并处理这些实时事件。

核心特点

基于 HTTP 协议

使用标准 HTTP / HTTPS 协议,无需额外端口或协议,兼容性好切易于部署

单项通信机制

SSE 仅支持服务器向客户端的单向数据推送,客户端与服务器建立连接后,服务器可以给客户端持续发送数据,但是客户端不能给服务器发送数据

自动重连机制

客户端与服务器断线重连时,浏览器会自动尝试重新连接(retry 字段支持设置重连间隔)

自定义消息类型

默认是 message 类型,也可以自定义类型,前端根据不同的类型做出相应的相应。

数据格式

当服务器发送流式数据时,需要设置 Content-Type 为:

text/event-stream;charset=utf-8

每一次发送的消息,是由若干个 message 组成,message 之间使用 \n\n 进行分割,message 由若干行组成,每一行的格式如下:

[field]: value\n

field 取值如下:

  • data:数据内容,必须
  • event:表示自定义消息类型,默认为 message 类型,非必需
  • id:数据标识符,相当于每一条数据的编号,非必需
  • retry:指定浏览器发起重新连接的时间间隔,非必需

代码示例

data

后端代码:

@RestController
public class SSMController {@GetMapping("/data")public void data(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();String str = "";for (int i = 0; i < 10; i++) {str = "data: " + new Date() + "\n\n";writer.write(str);writer.flush();Thread.sleep(1000L);}}
}

前端代码:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/data");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}
</script>
</body>
</html>

event

后端代码:

@RestController
public class SSMController {@GetMapping("/event")public void event(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();for (int i = 0; i < 10; i++) {String str = "event: foo\n"; //自定义事件str += "data: " + new Date() + "\n\n";writer.write(str);writer.flush();Thread.sleep(1000L);}}
}

前端代码:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/event");eventSource.addEventListener("foo", (event)=>{document.getElementById("sse").innerHTML= event.data;});
</script>
</body>
</html>

event 结束连接

后端代码:

@RestController
public class SSMController {@GetMapping("/end")public void end(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();for (int i = 0; i < 10; i++) {String str = "event: foo\n";str += "data: " + new Date() + "\n\n";writer.write(str);writer.flush();Thread.sleep(1000L);}writer.write("event: end\ndata: EOF\n\n"); //结束writer.flush();}
}

前端代码:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/end");eventSource.addEventListener("end", (event)=>{eventSource.close();});
</script>
</body>
</html>

retry

后端代码:

@RestController
public class SSMController {@GetMapping("/retry")public void retry(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();String str = "retry: 2000\n"; //前端每隔 2000ms 重新发送请求str += "data: " + new Date() + "\n\n"; //每条消息必须以 data 结尾writer.write(str);writer.flush();}
}

前端代码:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/retry");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}
</script>
</body>
</html>

Spring 中 SSE 实现

从 Spring 5开始我们可以使用 WebFlux 更优雅的实现 SSE 协议。Flux 是 WebFlux 的核心API。

快速使用 Flux

  1. 创建 FLux
  2. 处理数据
  3. 订阅数据

代码如下:

public class FluxDemoTest {public static void main(String[] args) throws InterruptedException {Flux<String> flux = Flux.just("aaa", "bbb", "ccc").delayElements(Duration.ofSeconds(1)); //每隔1s打印一次flux.map(s -> {return s.toUpperCase(); //操作 Flux 中的元素}).map(s -> {return s + "-1";}).subscribe(System.out :: println); //订阅 Flux 中的元素Thread.sleep(5000L); //由于打印元素需要时间,需要让程序延迟 5s 停止}
}

常见操作符

map()

元素一对一转化

filter()

过滤元素

merge()

合并多个 Flux,不保证顺序,如:

Flux.merge(Flux.just("A"), Flux.just("B"))

concat()

顺序拼接多个 Flux,保证顺序,如:

Flux.concat(Flux.just("A"), Flux.just("B"))

delayElements()

延迟元素发射,如:

.delayElements(Duration.ofSecends(1))

流式响应接口

后端代码:

@RestController
public class SSMController {@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> stream() {return Flux.interval(Duration.ofSeconds(1)).map(s -> {return new Date().toString();});}
}

后端每隔 1s 给前端发送当前时间。

前端代码:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/stream");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}
</script>
</body>
</html>

http://www.dtcms.com/a/465427.html

相关文章:

  • java后端工程师进修ing(研一版‖day51)
  • JavaScript Promise 详解:从入门到精通
  • 中山建设银行招聘网站网站设计的评估
  • 深圳制作网站培训机构自己做的网站打开超慢
  • MySQL数据库优化实战提升查询性能的五大核心策略
  • libboost_system-mt-x64.so.1.76.0 和libboost_system-mt-d-x64.so.1.76.0 区别
  • 【11408学习记录】考研数学核心突破:线性代数特征值与特征向量详解+英语长难句精析
  • 深入剖析:基于epoll与主从Reactor模型的高性能服务器设计与实现
  • 非小细胞肺癌与肿瘤相关巨噬细胞:新的治疗策略
  • React Native:发现默认参数children【特殊的prop】
  • Flink进阶:从“会用”到“用明白”的踩坑与实战总结
  • 最专业的礼品网站实例网站优化费用怎么做会计分录
  • 苍穹外卖-工作台实现、Apache POI、导出Excel报表
  • 自定义类型:联合与枚举
  • Java9
  • 基于Spring Boot + Vue 3的乡村振兴综合服务平台
  • Java-145 深入浅出 MongoDB 基本操作详解:数据库查看、切换、创建集合与删除完整教程
  • disable-devtool 网络安全 禁止打开控制台
  • TCP协议的可靠性保障
  • ktv支付订房网站模板商业策划书范文6篇
  • 十一、OpenCV中图形的绘制
  • 用户中心网站设计北京社保网址
  • 安卓13_ROM修改定制化-----如何给安卓手机里安装或者内置数字证书文件 cer类型的证书文件如何转换为可内置文件
  • 仿mudou——Connection模块(连接管理)
  • vue3 + el-upload组件集成阿里云视频点播从本地上传至点播存储
  • 外贸网站是用什么软件做的法制教育网站
  • c/c++字符串比较
  • 国外建站公司上海企业自助建站系统
  • AI 生产工艺参数优化:中小型制造企业用 “智能调参“ 提升产品合格率与生产效率
  • 《Linux基础入门指令》:从零开始理解Linux系统