当前位置: 首页 > news >正文

深入研究SSE协议

介绍

Server-Sent Events(SSE)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。

SSE基于HTTP协议,允许服务器将数据以事件流Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。

SSE的主要特点包括:

  1. 简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单。
  2. 单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据。
  3. 实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。

SSE的基本概念

SSE基于标准的HTTP协议,客户端通过发送一个特殊的HTTPGET请求到服务器,请求中包含Accept: text/event-stream头,表明客户端期望接收SSE数据流。服务器响应后保持连接打开,并可以持续向客户端推送数据。数据流由一系列事件组成,每个事件都包含事件类型、数据内容和事件ID等信息,客户端可以使用JavaScript中的EventSource接口来监听服务器发送的事件,并进行相应的处理。

SSE通信协议

SSE(Server-sent Events)是一种允许服务器主动向客户端发送新数据的技术,它基于HTTP协议,但与传统HTTP请求响应模式不同,SSE提供了一种持久连接,允许服务器在有新数据时推送给客户端。这种机制非常适合于需要实时更新信息的应用场景。

HTTP请求和响应头设置

在SSE中,客户端首先向服务器发送一个HTTP GET请求,请求头中包含Accept: text/event-stream,表明客户端准备接收SSE数据流。服务器在响应时,需要设置特定的响应头来告知客户端这是一个SSE流:

  • Content-Type: text/event-stream:指明响应的内容类型为SSE格式的文本流。
  • Cache-Control:no-cache:指示响应内容不应被缓存,保证数据的实时性。
  • Connection:keep-alive:保持连接打开,以便服务器可以持续发送数据。

SSE字段介绍

SSE数据流由一系列的字段组成,每个字段都以键值对的形式出现,字段之间用换行符分隔:

  • event: <event_name>:可选字段,用于指定事件的名称,message是默认的事件名称。
  • data::必须字段,包含事件的数据内容,可以有多行,每行都以data:开头。
  • id::可选字段,提供一个唯一的标识符给事件,可用于断线重连和消息追踪。
  • retry: :可选字段,指定客户端在连接断开后重连的间隔时间。

SSE事件流数据示例

以下是一个SSE事件流的示例,展示了如何逐字发送消息“Hello, world”:

event:message
data:Hevent:message
data:eevent:message
data:levent:message
data:levent:message
data:oevent:message
data:,event:message
data:wevent:message
data:oevent:message
data:revent:message
data:levent:message
data:devent:end
data:

在这个示例中,每发送一个字,都会先发送一个event:message字段,然后是data:字段,后面跟着具体的字符。最后,通过发送event:end字段来标记事件流的结束。

SSE的建连和断开流程

在这里插入图片描述

SSE服务端主要通过TCP层连接状态变化和Netty框架的事件机制来检测客户端断开。当客户端主动断开时,操作系统TCP栈会发送相应的信号,Netty捕获这些信号并通过ChannelInactive等事件通知应用程序,从而及时清理资源并更新连接状态。

SSE与WebSocket的比较

WebSocket是另一种用于实现实时双向通信的Web技术,它与SSE在某些方面有所不同。下面是SSE和WebSocket之间的比较:

  1. 数据推送方向:SSE是服务器向客户端的单向通信,服务器可以主动推送数据给客户端。而WebSocket是双向通信,允许服务器和客户端之间进行实时的双向数据交换。
  2. 连接建立:SSE使用基于HTTP的长连接,通过普通的HTTP请求和响应来建立连接,从而实现数据的实时推送。WebSocket使用自定义的协议,通过建立WebSocket连接来实现双向通信。
  3. 兼容性:由于SSE基于HTTP协议,它可以在大多数现代浏览器中使用,并且不需要额外的协议升级。WebSocket在绝大多数现代浏览器中也得到了支持,但在某些特殊的网络环境下可能会遇到问题。
  4. 适用场景:SSE适用于服务器向客户端实时推送数据的场景,如股票价格更新、新闻实时推送等。WebSocket适用于需要实时双向通信的场景,如聊天应用、多人协同编辑等。

根据具体的业务需求和场景,选择SSE或WebSocket取决于您的实际需求。如果您只需要服务器向客户端单向推送数据,并且希望保持简单易用和兼容性好,那么SSE是一个不错的选择。如果您需要实现双向通信,或者需要更高级的功能和控制,那么WebSocket可能更适合您的需求。

进行SSE实时数据推送时的注意点

  1. 异步处理:由于SSE是基于长连接的机制,推送数据的过程是一个长时间的操作。为了不阻塞服务器线程,推送使用异步方式处理SSE请求。可以在控制器方法中使用@Async注解或使用CompletableFuture等异步编程方式。
  2. 超时处理:SSE连接可能会因为网络中断、客户端关闭等原因而发生超时。为了避免无效的连接一直保持在服务器端,您可以设置超时时间并处理连接超时的情况。可以使用SseEmitter对象的setTimeout()方法设置超时时间,并通过onTimeout()方法处理连接超时的逻辑。
  3. 异步处理:在实际应用中,可能会出现一些异常情况,如网络异常、推送数据失败等。您可以使用SseEmitter对象的completeWithError()方法将异常信息发送给客户端,并在客户端通过eventSource.onerror事件进行处理。
  4. 内存管理:使用SseEmitter时需要注意内存管理,特别是在大量并发连接的情况下。当客户端断开连接时,务必及时释放SseEmitter对象,避免造成资源泄漏和内存溢出。
  5. 并发性能:SSE的并发连接数可能会对服务器的性能造成影响。如果需要处理大量的并发连接,可以考虑使用线程池或其他异步处理方式,以充分利用服务器资源。
  6. 客户端兼容性:虽然大多数现代浏览器都支持SSE,但仍然有一些旧版本的浏览器不支持。在使用SSE时,要确保您的目标客户端支持SSE,或者提供备用的实时数据推送机制。

小案例

SseServer.java

package org.example.sse;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedWriteHandler;import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class SseServer {// 存储活跃的客户端通道public static final ConcurrentHashMap<ChannelId, Channel> activeChannels = new ConcurrentHashMap<>();private static final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()) // 支持异步发送大文件,但不占用过多的内存,防止发生内存溢出.addLast(new SseHandler());}}).option(ChannelOption.SO_BACKLOG, 128)        // 最大连接数.childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(8080).sync();System.out.println("SSE Server started, visit: http://localhost:8080");startPeriodicMessagePushing();      // 定时向所有活跃客户端推送消息future.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}/*** 定时向所有活跃客户端推送消息*/private static void startPeriodicMessagePushing() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {String time = sdf.format(new Date());String sseMessage = "data: Server Time - " + time + "\n\n";for (ChannelId channelId : activeChannels.keySet()) {Channel channel = activeChannels.get(channelId);if (channel == null || !channel.isActive() || !channel.isWritable()) {activeChannels.remove(channelId);continue;}ByteBuf messageBuf = Unpooled.copiedBuffer(sseMessage, StandardCharsets.UTF_8);channel.eventLoop().execute(() -> {channel.writeAndFlush(new DefaultHttpContent(messageBuf)).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {System.err.println("Message push failed: " + future.cause().getMessage());activeChannels.remove(channelId);}});});}}, 0, 1, TimeUnit.SECONDS);}
}

SseHandler.java

package org.example.sse;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;import java.nio.charset.StandardCharsets;public class SseHandler extends SimpleChannelInboundHandler<HttpObject> {private boolean isSseConnectionEstablished = false;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {if (!(msg instanceof HttpRequest)) {return;}HttpRequest request = (HttpRequest) msg;String requestUri = request.uri();if (isSseConnectionEstablished) {return;}if ("/sse".equals(requestUri)) {establishSseConnection(ctx);    // 建立SSE连接isSseConnectionEstablished = true;} else if ("/".equals(requestUri)) {returnClientHtml(ctx);          // 返回客户端HTML页面} else {return404(ctx);}}/*** 建立SSE连接*/private void establishSseConnection(ChannelHandlerContext ctx) {Channel channel = ctx.channel();HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK);HttpHeaders headers = response.headers();headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);headers.set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");ctx.writeAndFlush(response).addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {SseServer.activeChannels.put(channel.id(), channel);System.out.println("New SSE client connected, total: " + SseServer.activeChannels.size());} else {System.err.println("SSE header send failed: " + future.cause().getMessage());ctx.close();}});}/*** 返回客户端HTML页面*/private void returnClientHtml(ChannelHandlerContext ctx) {String html = "<!DOCTYPE html>\n" +"<html>\n" +"<head>\n" +"    <meta charset=\"UTF-8\">\n" +"    <title>SSE Demo</title>\n" +"    <style>body { font-family: Arial; padding: 20px; }</style>\n" +"</head>\n" +"<body>\n" +"    <h1>SSE Real-Time Messages</h1>\n" +"    <div id=\"messageContainer\"></div>\n" +"    <script>\n" +"        const source = new EventSource('http://localhost:8080/sse');\n" +      // 使用EventSource创建一个SSE连接"        const container = document.getElementById('messageContainer');\n" +"        source.onmessage = (e) => {\n" +                                       // 监听服务器消息,当服务器通过SSE推送消息时,会触发onmessage事件"            const div = document.createElement('div');\n" +"            div.textContent = e.data;\n" +"            container.appendChild(div);\n" +"        };\n" +"        source.onerror = (err) => {\n" +"            console.error('SSE Error:', err);\n" +"            source.close();\n" +"        };\n" +"    </script>\n" +"</body>\n" +"</html>";ByteBuf htmlBuf = Unpooled.copiedBuffer(html, StandardCharsets.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,htmlBuf);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");response.headers().set(HttpHeaderNames.CONTENT_LENGTH, htmlBuf.readableBytes());ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}/*** 返回404页面*/private void return404(ChannelHandlerContext ctx) {String notFound = "<h1>404 Not Found</h1><p>Visit http://localhost:8080</p>";ByteBuf buf = Unpooled.copiedBuffer(notFound, StandardCharsets.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.NOT_FOUND,buf);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {ChannelId channelId = ctx.channel().id();if (SseServer.activeChannels.remove(channelId) != null) {System.out.println("Client disconnected, total: " + SseServer.activeChannels.size());}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("Exception: " + cause.getMessage());SseServer.activeChannels.remove(ctx.channel().id());ctx.close();}
}

参考

  • 深入浅出计算机网络:什么是SSE
  • SSE(Server-Sent Events)是什么?
  • 基于Netty编写SSE服务器,基于豆包生成
  • SSE(Server-Sent Events)圣经: 底层原理 + 应用开发 + 技术对比 (图解+秒懂+史上最全)
http://www.dtcms.com/a/400576.html

相关文章:

  • 建网站书籍建行网站是多少呢
  • 泊头市网站建设广东小程序系统开发
  • 备案网站服务内容世界互联网巨头
  • 北京工程工程建设交易信息网站门户网站都在哪推广
  • 厦门有没网站建设的公司南坪做网站
  • 如何搭建高并发的在线教育系统?源码与APP开发的底层逻辑解析
  • 北京朝阳区最好的小区微信seo排名优化软件
  • 网站右侧广告家乡网站建设策划书模板
  • dede网站被黑什么是网站优化主要包括那几个
  • 香橙派(orangepi)zero扩充空间
  • 网站和app软件制作公司建筑工程网上报建流程
  • Linux 交换空间管理
  • 网站网页设计培训做企业平台的网站有哪些内容
  • 网站开发 前端 外包网站出现风险如何处理方法
  • 北京规划网站wordpress 去掉 index.php
  • 如何搭建网站赚点击网站开发作业总结
  • 赤峰是住房和城乡建设局网站萍乡网站建设公司
  • 淘宝网站开发实训报告企业网站开发报告
  • 莘县网站建设公司宝安做网站公司乐云seo
  • 如何评价网站是否做的好坏用手机自创游戏
  • 余姚建设公司网站苏州建设工程协会网站
  • 阿克苏市建设银行网站深圳58同城招聘网最新招聘
  • 东莞国网站建设云南省疾控中心最新提示
  • 邢台网站建设联系电话北京网站推广价格
  • 简述如何让网站排名快速提升网站建设方案 规划
  • Wan2.2-Animate开源发布:一个模型通吃两大视频玩法
  • 直接进网站的浏览器打开网站右侧浮动代码
  • 湖南做网站问磐石网络专业一键生成广告
  • 允许发外链的网站珠海网站制作公
  • 网站优化外包费用465端口 WordPress