Java 网络流式编程
SSE 协议
概念
在 WebSocket 协议中,客户端与服务器之间建立连接完成后,客户端向服务器发送请求,服务器给客户端返回响应,此后服务器就不能主动与客户端进行通信。若服务器需要不断地给客户端发送响应,就需要使用 SSE 协议。
当服务器向客户端声明,接下来发送的是流消息,那么客户端就不会关闭连接,一直等待服务器发送数据。若后续服务器不需要给客户端发送数据,也可以主动关闭连接。
SSE(Server-Sent Events)是一种基于 HTTP 的轻量级实时通信协议,浏览器通过内置的 EventSource API 接收并处理这些实时事件。
核心特点
基于 HTTP 协议
使用标准 HTTP / HTTPS 协议,无需额外端口或协议,兼容性好切易于部署
单项通信机制
SSE 仅支持服务器向客户端的单向数据推送,客户端与服务器建立连接后,服务器可以给客户端持续发送数据,但是客户端不能给服务器发送数据
自动重连机制
客户端与服务器断线重连时,浏览器会自动尝试重新连接(retry 字段支持设置重连间隔)
自定义消息类型
默认是 message 类型,也可以自定义类型,前端根据不同的类型做出相应的相应。
数据格式
当服务器发送流式数据时,需要设置 Content-Type 为:
text/event-stream;charset=utf-8
每一次发送的消息,是由若干个 message 组成,message 之间使用 \n\n 进行分割,message 由若干行组成,每一行的格式如下:
[field]: value\n
field 取值如下:
- data:数据内容,必须
- event:表示自定义消息类型,默认为 message 类型,非必需
- id:数据标识符,相当于每一条数据的编号,非必需
- retry:指定浏览器发起重新连接的时间间隔,非必需
代码示例
data
后端代码:
@RestController
public class SSMController {@GetMapping("/data")public void data(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();String str = "";for (int i = 0; i < 10; i++) {str = "data: " + new Date() + "\n\n";writer.write(str);writer.flush();Thread.sleep(1000L);}}
}
前端代码:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/data");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}
</script>
</body>
</html>
event
后端代码:
@RestController
public class SSMController {@GetMapping("/event")public void event(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();for (int i = 0; i < 10; i++) {String str = "event: foo\n"; //自定义事件str += "data: " + new Date() + "\n\n";writer.write(str);writer.flush();Thread.sleep(1000L);}}
}
前端代码:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/event");eventSource.addEventListener("foo", (event)=>{document.getElementById("sse").innerHTML= event.data;});
</script>
</body>
</html>
event 结束连接
后端代码:
@RestController
public class SSMController {@GetMapping("/end")public void end(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();for (int i = 0; i < 10; i++) {String str = "event: foo\n";str += "data: " + new Date() + "\n\n";writer.write(str);writer.flush();Thread.sleep(1000L);}writer.write("event: end\ndata: EOF\n\n"); //结束writer.flush();}
}
前端代码:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/end");eventSource.addEventListener("end", (event)=>{eventSource.close();});
</script>
</body>
</html>
retry
后端代码:
@RestController
public class SSMController {@GetMapping("/retry")public void retry(HttpServletResponse response) throws IOException, InterruptedException {response.setContentType("text/event-stream;charset=utf-8");PrintWriter writer = response.getWriter();String str = "retry: 2000\n"; //前端每隔 2000ms 重新发送请求str += "data: " + new Date() + "\n\n"; //每条消息必须以 data 结尾writer.write(str);writer.flush();}
}
前端代码:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/retry");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}
</script>
</body>
</html>
Spring 中 SSE 实现
从 Spring 5开始我们可以使用 WebFlux 更优雅的实现 SSE 协议。Flux 是 WebFlux 的核心API。
快速使用 Flux
- 创建 FLux
- 处理数据
- 订阅数据
代码如下:
public class FluxDemoTest {public static void main(String[] args) throws InterruptedException {Flux<String> flux = Flux.just("aaa", "bbb", "ccc").delayElements(Duration.ofSeconds(1)); //每隔1s打印一次flux.map(s -> {return s.toUpperCase(); //操作 Flux 中的元素}).map(s -> {return s + "-1";}).subscribe(System.out :: println); //订阅 Flux 中的元素Thread.sleep(5000L); //由于打印元素需要时间,需要让程序延迟 5s 停止}
}
常见操作符
map()
元素一对一转化
filter()
过滤元素
merge()
合并多个 Flux,不保证顺序,如:
Flux.merge(Flux.just("A"), Flux.just("B"))
concat()
顺序拼接多个 Flux,保证顺序,如:
Flux.concat(Flux.just("A"), Flux.just("B"))
delayElements()
延迟元素发射,如:
.delayElements(Duration.ofSecends(1))
流式响应接口
后端代码:
@RestController
public class SSMController {@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> stream() {return Flux.interval(Duration.ofSeconds(1)).map(s -> {return new Date().toString();});}
}
后端每隔 1s 给前端发送当前时间。
前端代码:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SSE</title>
</head>
<body>
<div id="sse"></div>
<script>let eventSource = new EventSource("/stream");eventSource.onmessage = function(event){document.getElementById("sse").innerHTML= event.data;}
</script>
</body>
</html>