深入研究SSE协议
介绍
Server-Sent Events(SSE)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。
SSE基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。
SSE的主要特点包括:
- 简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单。
- 单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据。
- 实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。
SSE的基本概念
SSE基于标准的HTTP协议,客户端通过发送一个特殊的HTTPGET请求到服务器,请求中包含Accept: text/event-stream头
,表明客户端期望接收SSE数据流
。服务器响应后保持连接打开,并可以持续向客户端推送数据。数据流由一系列事件组成,每个事件都包含事件类型、数据内容和事件ID等信息,客户端可以使用JavaScript中的EventSource接口来监听服务器发送的事件,并进行相应的处理。
SSE通信协议
SSE(Server-sent Events)是一种允许服务器主动向客户端发送新数据的技术,它基于HTTP协议,但与传统HTTP请求响应模式不同,SSE提供了一种持久连接,允许服务器在有新数据时推送给客户端。这种机制非常适合于需要实时更新信息的应用场景。
HTTP请求和响应头设置
在SSE中,客户端首先向服务器发送一个HTTP GET请求,请求头中包含Accept: text/event-stream,表明客户端准备接收SSE数据流。服务器在响应时,需要设置特定的响应头来告知客户端这是一个SSE流:
- Content-Type: text/event-stream:指明响应的内容类型为SSE格式的文本流。
- Cache-Control:no-cache:指示响应内容不应被缓存,保证数据的实时性。
- Connection:keep-alive:保持连接打开,以便服务器可以持续发送数据。
SSE字段介绍
SSE数据流由一系列的字段组成,每个字段都以键值对的形式出现,字段之间用换行符分隔:
- event: <event_name>:可选字段,用于指定事件的名称,message是默认的事件名称。
- data::必须字段,包含事件的数据内容,可以有多行,每行都以data:开头。
- id::可选字段,提供一个唯一的标识符给事件,可用于断线重连和消息追踪。
- retry: :可选字段,指定客户端在连接断开后重连的间隔时间。
SSE事件流数据示例
以下是一个SSE事件流的示例,展示了如何逐字发送消息“Hello, world”:
event:message
data:Hevent:message
data:eevent:message
data:levent:message
data:levent:message
data:oevent:message
data:,event:message
data:wevent:message
data:oevent:message
data:revent:message
data:levent:message
data:devent:end
data:
在这个示例中,每发送一个字,都会先发送一个event:message字段,然后是data:字段,后面跟着具体的字符。最后,通过发送event:end字段来标记事件流的结束。
SSE的建连和断开流程
SSE服务端主要通过TCP层连接状态变化和Netty框架的事件机制来检测客户端断开。当客户端主动断开时,操作系统TCP栈会发送相应的信号,Netty捕获这些信号并通过ChannelInactive等事件通知应用程序,从而及时清理资源并更新连接状态。
SSE与WebSocket的比较
WebSocket是另一种用于实现实时双向通信的Web技术,它与SSE在某些方面有所不同。下面是SSE和WebSocket之间的比较:
- 数据推送方向:SSE是服务器向客户端的单向通信,服务器可以主动推送数据给客户端。而WebSocket是双向通信,允许服务器和客户端之间进行实时的双向数据交换。
- 连接建立:SSE使用基于HTTP的长连接,通过普通的HTTP请求和响应来建立连接,从而实现数据的实时推送。WebSocket使用自定义的协议,通过建立WebSocket连接来实现双向通信。
- 兼容性:由于SSE基于HTTP协议,它可以在大多数现代浏览器中使用,并且不需要额外的协议升级。WebSocket在绝大多数现代浏览器中也得到了支持,但在某些特殊的网络环境下可能会遇到问题。
- 适用场景:SSE适用于服务器向客户端实时推送数据的场景,如股票价格更新、新闻实时推送等。WebSocket适用于需要实时双向通信的场景,如聊天应用、多人协同编辑等。
根据具体的业务需求和场景,选择SSE或WebSocket取决于您的实际需求。如果您只需要服务器向客户端单向推送数据,并且希望保持简单易用和兼容性好,那么SSE是一个不错的选择。如果您需要实现双向通信,或者需要更高级的功能和控制,那么WebSocket可能更适合您的需求。
进行SSE实时数据推送时的注意点
- 异步处理:由于SSE是基于长连接的机制,推送数据的过程是一个长时间的操作。为了不阻塞服务器线程,推送使用异步方式处理SSE请求。可以在控制器方法中使用@Async注解或使用CompletableFuture等异步编程方式。
- 超时处理:SSE连接可能会因为网络中断、客户端关闭等原因而发生超时。为了避免无效的连接一直保持在服务器端,您可以设置超时时间并处理连接超时的情况。可以使用SseEmitter对象的setTimeout()方法设置超时时间,并通过onTimeout()方法处理连接超时的逻辑。
- 异步处理:在实际应用中,可能会出现一些异常情况,如网络异常、推送数据失败等。您可以使用SseEmitter对象的completeWithError()方法将异常信息发送给客户端,并在客户端通过eventSource.onerror事件进行处理。
- 内存管理:使用SseEmitter时需要注意内存管理,特别是在大量并发连接的情况下。当客户端断开连接时,务必及时释放SseEmitter对象,避免造成资源泄漏和内存溢出。
- 并发性能:SSE的并发连接数可能会对服务器的性能造成影响。如果需要处理大量的并发连接,可以考虑使用线程池或其他异步处理方式,以充分利用服务器资源。
- 客户端兼容性:虽然大多数现代浏览器都支持SSE,但仍然有一些旧版本的浏览器不支持。在使用SSE时,要确保您的目标客户端支持SSE,或者提供备用的实时数据推送机制。
小案例
SseServer.java
package org.example.sse;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class SseServer {// 存储活跃的客户端通道public static final ConcurrentHashMap<ChannelId, Channel> activeChannels = new ConcurrentHashMap<>();private static final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()) // 支持异步发送大文件,但不占用过多的内存,防止发生内存溢出.addLast(new SseHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // 最大连接数.childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(8080).sync();System.out.println("SSE Server started, visit: http://localhost:8080");startPeriodicMessagePushing(); // 定时向所有活跃客户端推送消息future.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}/*** 定时向所有活跃客户端推送消息*/private static void startPeriodicMessagePushing() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {String time = sdf.format(new Date());String sseMessage = "data: Server Time - " + time + "\n\n";for (ChannelId channelId : activeChannels.keySet()) {Channel channel = activeChannels.get(channelId);if (channel == null || !channel.isActive() || !channel.isWritable()) {activeChannels.remove(channelId);continue;}ByteBuf messageBuf = Unpooled.copiedBuffer(sseMessage, StandardCharsets.UTF_8);channel.eventLoop().execute(() -> {channel.writeAndFlush(new DefaultHttpContent(messageBuf)).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {System.err.println("Message push failed: " + future.cause().getMessage());activeChannels.remove(channelId);}});});}}, 0, 1, TimeUnit.SECONDS);}
}
SseHandler.java
package org.example.sse;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;import java.nio.charset.StandardCharsets;public class SseHandler extends SimpleChannelInboundHandler<HttpObject> {private boolean isSseConnectionEstablished = false;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {if (!(msg instanceof HttpRequest)) {return;}HttpRequest request = (HttpRequest) msg;String requestUri = request.uri();if (isSseConnectionEstablished) {return;}if ("/sse".equals(requestUri)) {establishSseConnection(ctx); // 建立SSE连接isSseConnectionEstablished = true;} else if ("/".equals(requestUri)) {returnClientHtml(ctx); // 返回客户端HTML页面} else {return404(ctx);}}/*** 建立SSE连接*/private void establishSseConnection(ChannelHandlerContext ctx) {Channel channel = ctx.channel();HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK);HttpHeaders headers = response.headers();headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");ctx.writeAndFlush(response).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {SseServer.activeChannels.put(channel.id(), channel);System.out.println("New SSE client connected, total: " + SseServer.activeChannels.size());} else {System.err.println("SSE header send failed: " + future.cause().getMessage());ctx.close();}});}/*** 返回客户端HTML页面*/private void returnClientHtml(ChannelHandlerContext ctx) {String html = "<!DOCTYPE html>\n" +"<html>\n" +"<head>\n" +" <meta charset=\"UTF-8\">\n" +" <title>SSE Demo</title>\n" +" <style>body { font-family: Arial; padding: 20px; }</style>\n" +"</head>\n" +"<body>\n" +" <h1>SSE Real-Time Messages</h1>\n" +" <div id=\"messageContainer\"></div>\n" +" <script>\n" +" const source = new EventSource('http://localhost:8080/sse');\n" + // 使用EventSource创建一个SSE连接" const container = document.getElementById('messageContainer');\n" +" source.onmessage = (e) => {\n" + // 监听服务器消息,当服务器通过SSE推送消息时,会触发onmessage事件" const div = document.createElement('div');\n" +" div.textContent = e.data;\n" +" container.appendChild(div);\n" +" };\n" +" source.onerror = (err) => {\n" +" console.error('SSE Error:', err);\n" +" source.close();\n" +" };\n" +" </script>\n" +"</body>\n" +"</html>";ByteBuf htmlBuf = Unpooled.copiedBuffer(html, StandardCharsets.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,htmlBuf);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");response.headers().set(HttpHeaderNames.CONTENT_LENGTH, htmlBuf.readableBytes());ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}/*** 返回404页面*/private void return404(ChannelHandlerContext ctx) {String notFound = "<h1>404 Not Found</h1><p>Visit http://localhost:8080</p>";ByteBuf buf = Unpooled.copiedBuffer(notFound, StandardCharsets.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.NOT_FOUND,buf);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {ChannelId channelId = ctx.channel().id();if (SseServer.activeChannels.remove(channelId) != null) {System.out.println("Client disconnected, total: " + SseServer.activeChannels.size());}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("Exception: " + cause.getMessage());SseServer.activeChannels.remove(ctx.channel().id());ctx.close();}
}
参考
- 深入浅出计算机网络:什么是SSE
- SSE(Server-Sent Events)是什么?
- 基于Netty编写SSE服务器,基于豆包生成
- SSE(Server-Sent Events)圣经: 底层原理 + 应用开发 + 技术对比 (图解+秒懂+史上最全)