当前位置: 首页 > news >正文

实时通信技术大比拼:长轮询、短轮询、WebSocket 与 SSE 深度解析及实战指南

在当今的 Web 应用中,实时通信已经成为不可或缺的核心功能。从即时聊天、实时数据监控到协同编辑工具,用户期望能够实时获取信息并与系统进行交互。然而,实现高效、可靠的实时通信并非易事,开发者需要在多种技术方案中做出选择。

本文将深入剖析四种主流的实时通信技术:短轮询、长轮询、WebSocket 和 SSE(服务器发送事件),从底层原理、优缺点分析到实战案例,全方位对比它们的适用场景和实现方式。无论你是前端开发者还是后端工程师,都能从本文中获得实用的知识和可直接应用的代码示例。

一、实时通信技术概述

实时通信(Real-Time Communication)指的是客户端和服务器之间能够快速交换数据,使信息能够近乎即时地更新。在传统的 HTTP 通信模式中,客户端总是主动发起请求,服务器被动响应,这种模式难以满足实时性要求高的场景。

为了解决这个问题,工程师们开发了多种实时通信技术,每种技术都有其独特的实现方式和适用场景:

这四种技术各有优劣,选择合适的技术取决于具体的应用场景、浏览器兼容性要求、服务器负载考量以及数据传输的方向和格式等因素。

二、短轮询(Short Polling)

2.1 技术原理

短轮询是实现实时通信最简单直接的方式,它基于传统的 HTTP 请求 / 响应模式:

  1. 客户端按照固定的时间间隔(如每 1 秒)向服务器发送 HTTP 请求
  2. 服务器收到请求后,立即返回最新的数据(无论数据是否有更新)
  3. 客户端处理响应数据,然后等待下一个时间间隔再次发送请求

短轮询的核心思想是通过频繁的请求来模拟 "实时" 效果,实现简单,兼容性极好,几乎所有浏览器和服务器都支持。

2.2 优缺点分析

优点:

  • 实现简单,开发成本低
  • 兼容性极佳,无浏览器限制
  • 不需要特殊的服务器配置
  • 天然支持负载均衡

缺点:

  • 实时性差,取决于轮询间隔
  • 资源浪费严重,即使没有新数据也会频繁发送请求
  • 增加服务器负担,大量无效请求
  • 网络流量大,每个请求都包含 HTTP 头信息

2.3 适用场景

短轮询适用于实时性要求不高、用户量不大的场景,例如:

  • 简单的通知提醒
  • 数据更新频率低的应用
  • 对兼容性要求极高的场景
  • 快速原型开发

2.4 实战示例

下面我们使用 Spring Boot 实现一个短轮询的示例,包括后端 API 和前端页面。

后端实现:

首先,添加必要的依赖(pom.xml):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.0</version><relativePath/></parent><groupId>com.example</groupId><artifactId>realtime-communication-demo</artifactId><version>1.0.0</version><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.32</version></dependency><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>2.2.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

创建数据模型类:

package com.example.realtime.model;import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;import java.time.LocalDateTime;/*** 消息模型类** @author ken*/
@Data
@Schema(description = "消息模型")
public class Message {@Schema(description = "消息ID")private String id;@Schema(description = "消息内容")private String content;@Schema(description = "发送时间")private LocalDateTime timestamp;
}

创建服务类:

package com.example.realtime.service;import com.example.realtime.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;/*** 消息服务类,管理消息的存储和获取** @author ken*/
@Slf4j
@Service
public class MessageService {/*** 存储消息的列表,使用CopyOnWriteArrayList保证线程安全*/private final CopyOnWriteArrayList<Message> messages = new CopyOnWriteArrayList<>();/*** 添加新消息** @param content 消息内容* @return 生成的消息对象*/public Message addMessage(String content) {Message message = new Message();message.setId(UUID.randomUUID().toString());message.setContent(content);message.setTimestamp(LocalDateTime.now());messages.add(message);log.info("添加新消息: {}", message);return message;}/*** 获取指定时间之后的所有消息** @param lastTimestamp 上次获取消息的时间戳* @return 新消息列表*/public CopyOnWriteArrayList<Message> getMessagesAfter(LocalDateTime lastTimestamp) {CopyOnWriteArrayList<Message> result = new CopyOnWriteArrayList<>();if (lastTimestamp == null) {// 如果没有上次时间戳,返回所有消息result.addAll(messages);} else {// 只返回比上次时间戳新的消息for (Message message : messages) {if (message.getTimestamp().isAfter(lastTimestamp)) {result.add(message);}}}log.info("获取到 {} 条新消息", result.size());return result;}
}

创建短轮询控制器:

package com.example.realtime.controller;import com.alibaba.fastjson2.JSON;
import com.example.realtime.model.Message;
import com.example.realtime.service.MessageService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;/*** 短轮询控制器** @author ken*/
@Slf4j
@RestController
@RequestMapping("/api/short-polling")
@RequiredArgsConstructor
@Tag(name = "短轮询API", description = "基于短轮询的实时通信接口")
public class ShortPollingController {private final MessageService messageService;private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME;/*** 获取最新消息** @param lastTimestamp 上次获取消息的时间戳,ISO格式* @return 新消息列表*/@GetMapping("/messages")@Operation(summary = "获取最新消息",description = "客户端定期调用此接口获取最新消息",parameters = @Parameter(name = "lastTimestamp", description = "上次获取消息的时间戳,ISO格式"),responses = @ApiResponse(responseCode = "200",description = "成功获取消息",content = @Content(schema = @Schema(implementation = Message.class))))public ResponseEntity<List<Message>> getMessages(@RequestParam(required = false) String lastTimestamp) {LocalDateTime lastTime = null;if (StringUtils.hasText(lastTimestamp)) {try {lastTime = LocalDateTime.parse(lastTimestamp, FORMATTER);} catch (Exception e) {log.error("解析时间戳失败", e);return ResponseEntity.badRequest().build();}}List<Message> messages = messageService.getMessagesAfter(lastTime);return ResponseEntity.ok(messages);}/*** 发送消息** @param content 消息内容* @return 发送成功的消息*/@PostMapping("/messages")@Operation(summary = "发送消息",description = "发送新消息到服务器",responses = @ApiResponse(responseCode = "200",description = "消息发送成功",content = @Content(schema = @Schema(implementation = Message.class))))public ResponseEntity<Message> sendMessage(@Parameter(description = "消息内容", required = true)@RequestBody String content) {if (!StringUtils.hasText(content)) {return ResponseEntity.badRequest().build();}Message message = messageService.addMessage(content);return ResponseEntity.ok(message);}
}

前端实现(short-polling.html):

<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><title>短轮询示例</title><script src="https://cdn.tailwindcss.com"></script><link href="https://cdn.jsdelivr.net/npm/font-awesome@4.7.0/css/font-awesome.min.css" rel="stylesheet">
</head>
<body class="bg-gray-100"><div class="container mx-auto px-4 py-8 max-w-4xl"><h1 class="text-3xl font-bold mb-6 text-center">短轮询实时通信示例</h1><div class="bg-white rounded-lg shadow-md p-6 mb-6"><h2 class="text-xl font-semibold mb-4">消息列表</h2><div id="messageContainer" class="border rounded-lg p-4 h-64 overflow-y-auto mb-4"></div><div class="flex"><input type="text" id="messageInput" class="flex-1 border rounded-l-md px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"placeholder="输入消息..."><button id="sendButton" class="bg-blue-500 text-white px-4 py-2 rounded-r-md hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"><i class="fa fa-paper-plane mr-1"></i>发送</button></div><div class="mt-4 text-sm text-gray-500"><p>轮询状态: <span id="pollStatus" class="text-yellow-500">未开始</span></p><p>轮询间隔: <span id="pollInterval">2000</span> 毫秒</p><p>最后更新时间: <span id="lastUpdate">-</span></p></div></div></div><script>// 全局变量let lastTimestamp = null;let pollInterval = 2000; // 2秒let pollTimer = null;const API_BASE_URL = '/api/short-polling';// DOM元素const messageContainer = document.getElementById('messageContainer');const messageInput = document.getElementById('messageInput');const sendButton = document.getElementById('sendButton');const pollStatus = document.getElementById('pollStatus');const pollIntervalInput = document.getElementById('pollInterval');const lastUpdateElement = document.getElementById('lastUpdate');// 初始化document.addEventListener('DOMContentLoaded', () => {startPolling();// 发送消息按钮点击事件sendButton.addEventListener('click', sendMessage);// 输入框回车事件messageInput.addEventListener('keypress', (e) => {if (e.key === 'Enter') {sendMessage();}});});/*** 开始轮询*/function startPolling() {pollStatus.textContent = '运行中';pollStatus.className = 'text-green-500';fetchMessages();}/*** 停止轮询*/function stopPolling() {pollStatus.textContent = '已停止';pollStatus.className = 'text-red-500';if (pollTimer) {clearTimeout(pollTimer);pollTimer = null;}}/*** 获取消息*/async function fetchMessages() {try {// 构建请求URL,包含上次时间戳let url = `${API_BASE_URL}/messages`;if (lastTimestamp) {url += `?lastTimestamp=${encodeURIComponent(lastTimestamp)}`;}// 发送请求const response = await fetch(url);if (!response.ok) {throw new Error(`HTTP error! status: ${response.status}`);}// 处理响应const messages = await response.json();if (messages.length > 0) {displayMessages(messages);// 更新最后时间戳为最新消息的时间lastTimestamp = messages[messages.length - 1].timestamp;}// 更新最后更新时间lastUpdateElement.textContent = new Date().toLocaleTimeString();} catch (error) {console.error('获取消息失败:', error);pollStatus.textContent = '出错';pollStatus.className = 'text-red-500';} finally {// 安排下一次轮询pollTimer = setTimeout(fetchMessages, pollInterval);}}/*** 发送消息*/async function sendMessage() {const content = messageInput.value.trim();if (!content) return;try {const response = await fetch(`${API_BASE_URL}/messages`, {method: 'POST',headers: {'Content-Type': 'text/plain',},body: content});if (!response.ok) {throw new Error(`发送消息失败: ${response.status}`);}// 清空输入框messageInput.value = '';} catch (error) {console.error('发送消息失败:', error);alert('发送消息失败,请重试');}}/*** 显示消息* @param {Array} messages 消息数组*/function displayMessages(messages) {messages.forEach(message => {const messageElement = document.createElement('div');messageElement.className = 'mb-2 p-2 bg-gray-50 rounded border';const time = new Date(message.timestamp).toLocaleTimeString();messageElement.innerHTML = `<div class="text-sm text-gray-500">${time}</div><div class="text-gray-800">${escapeHtml(message.content)}</div>`;messageContainer.appendChild(messageElement);});// 滚动到底部messageContainer.scrollTop = messageContainer.scrollHeight;}/*** HTML转义,防止XSS攻击* @param {string} text 文本内容* @return {string} 转义后的文本*/function escapeHtml(text) {return text.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/"/g, "&quot;").replace(/'/g, "&#039;");}</script>
</body>
</html>

三、长轮询(Long Polling)

3.1 技术原理

长轮询是对短轮询的改进,旨在减少无效请求,提高实时性:

  1. 客户端向服务器发送 HTTP 请求
  2. 服务器收到请求后,如果没有新数据,不会立即响应,而是将请求挂起
  3. 当有新数据可用时,服务器立即响应客户端的请求
  4. 客户端收到响应后,处理数据,并立即发送一个新的长轮询请求,保持连接

长轮询的核心思想是将多个无效的短轮询合并为一个长连接,只有当有数据更新时才返回响应,减少了网络传输和服务器负载。

3.2 优缺点分析

优点:

  • 实时性比短轮询好,数据更新后能立即推送
  • 减少了无效请求,降低了网络流量
  • 兼容性好,大多数浏览器和服务器都支持
  • 实现相对简单,不需要特殊协议支持

缺点:

  • 服务器需要维护大量挂起的连接,增加内存消耗
  • 每个响应后仍需要重新建立连接,有一定开销
  • 可能受限于服务器的连接超时设置
  • HTTP 头信息仍然会带来额外开销

3.3 适用场景

长轮询适用于以下场景:

  • 实时性要求中等的应用
  • 数据更新频率不确定的场景
  • 不能使用 WebSocket 的环境
  • 如社交媒体通知、实时评论系统等

3.4 实战示例

下面实现一个基于 Spring Boot 的长轮询示例:

后端实现:

创建长轮询控制器:

package com.example.realtime.controller;import com.alibaba.fastjson2.JSON;
import com.example.realtime.model.Message;
import com.example.realtime.service.MessageService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.*;/*** 长轮询控制器** @author ken*/
@Slf4j
@RestController
@RequestMapping("/api/long-polling")
@RequiredArgsConstructor
@Tag(name = "长轮询API", description = "基于长轮询的实时通信接口")
public class LongPollingController {private final MessageService messageService;private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME;/*** 用于存储等待消息的客户端*/private final ConcurrentMap<String, CompletableFuture<ResponseEntity<List<Message>>>> waitingClients = new ConcurrentHashMap<>();/*** 长轮询超时时间,30秒*/private static final long TIMEOUT_SECONDS = 30;/*** 获取最新消息(长轮询)** @param clientId 客户端ID,用于标识不同的客户端* @param lastTimestamp 上次获取消息的时间戳,ISO格式* @return 新消息列表,如果没有新消息则会阻塞等待*/@GetMapping("/messages")@Operation(summary = "获取最新消息(长轮询)",description = "客户端调用此接口获取最新消息,如果没有新消息,服务器会阻塞等待直到有新消息或超时",parameters = {@Parameter(name = "clientId", description = "客户端ID", required = true),@Parameter(name = "lastTimestamp", description = "上次获取消息的时间戳,ISO格式")},responses = @ApiResponse(responseCode = "200",description = "成功获取消息或超时",content = @Content(schema = @Schema(implementation = Message.class))))public CompletableFuture<ResponseEntity<List<Message>>> getMessages(@RequestParam String clientId,@RequestParam(required = false) String lastTimestamp) {// 解析上次时间戳LocalDateTime lastTime = null;if (StringUtils.hasText(lastTimestamp)) {try {lastTime = LocalDateTime.parse(lastTimestamp, FORMATTER);} catch (Exception e) {log.error("解析时间戳失败", e);return CompletableFuture.completedFuture(ResponseEntity.badRequest().build());}}// 检查是否已有新消息List<Message> messages = messageService.getMessagesAfter(lastTime);if (!messages.isEmpty()) {// 如果有新消息,立即返回return CompletableFuture.completedFuture(ResponseEntity.ok(messages));}// 如果没有新消息,创建一个CompletableFuture并存储,等待新消息CompletableFuture<ResponseEntity<List<Message>>> future = new CompletableFuture<>();waitingClients.put(clientId, future);// 设置超时ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.schedule(() -> {if (!future.isDone()) {// 超时,返回空列表future.complete(ResponseEntity.ok(List.of()));waitingClients.remove(clientId);}scheduler.shutdown();}, TIMEOUT_SECONDS, TimeUnit.SECONDS);log.info("客户端 {} 进入等待状态", clientId);return future;}/*** 发送消息,并通知等待的客户端** @param content 消息内容* @return 发送成功的消息*/@PostMapping("/messages")@Operation(summary = "发送消息",description = "发送新消息到服务器,服务器会通知所有等待的客户端",responses = @ApiResponse(responseCode = "200",description = "消息发送成功",content = @Content(schema = @Schema(implementation = Message.class))))public ResponseEntity<Message> sendMessage(@Parameter(description = "消息内容", required = true)@RequestBody String content) {if (!StringUtils.hasText(content)) {return ResponseEntity.badRequest().build();}// 添加消息Message message = messageService.addMessage(content);// 通知所有等待的客户端notifyWaitingClients();return ResponseEntity.ok(message);}/*** 通知所有等待的客户端有新消息*/private void notifyWaitingClients() {// 遍历所有等待的客户端并通知for (var entry : waitingClients.entrySet()) {String clientId = entry.getKey();CompletableFuture<ResponseEntity<List<Message>>> future = entry.getValue();if (!future.isDone()) {try {// 获取所有消息(实际应用中应该根据客户端的lastTimestamp获取)List<Message> messages = messageService.getMessagesAfter(null);future.complete(ResponseEntity.ok(messages));log.info("通知客户端 {} 有新消息", clientId);} catch (Exception e) {log.error("通知客户端 {} 失败", clientId, e);future.completeExceptionally(e);}}}// 清空等待列表waitingClients.clear();}
}

前端实现(long-polling.html):

<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><title>长轮询示例</title><script src="https://cdn.tailwindcss.com"></script><link href="https://cdn.jsdelivr.net/npm/font-awesome@4.7.0/css/font-awesome.min.css" rel="stylesheet">
</head>
<body class="bg-gray-100"><div class="container mx-auto px-4 py-8 max-w-4xl"><h1 class="text-3xl font-bold mb-6 text-center">长轮询实时通信示例</h1><div class="bg-white rounded-lg shadow-md p-6 mb-6"><h2 class="text-xl font-semibold mb-4">消息列表</h2><div id="messageContainer" class="border rounded-lg p-4 h-64 overflow-y-auto mb-4"></div><div class="flex"><input type="text" id="messageInput" class="flex-1 border rounded-l-md px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"placeholder="输入消息..."><button id="sendButton" class="bg-blue-500 text-white px-4 py-2 rounded-r-md hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"><i class="fa fa-paper-plane mr-1"></i>发送</button></div><div class="mt-4 text-sm text-gray-500"><p>客户端ID: <span id="clientId"></span></p><p>连接状态: <span id="connectionStatus" class="text-yellow-500">未连接</span></p><p>最后更新时间: <span id="lastUpdate">-</span></p></div></div></div><script>// 全局变量let lastTimestamp = null;let clientId = generateClientId();const API_BASE_URL = '/api/long-polling';let isPolling = false;// DOM元素const messageContainer = document.getElementById('messageContainer');const messageInput = document.getElementById('messageInput');const sendButton = document.getElementById('sendButton');const clientIdElement = document.getElementById('clientId');const connectionStatusElement = document.getElementById('connectionStatus');const lastUpdateElement = document.getElementById('lastUpdate');// 初始化document.addEventListener('DOMContentLoaded', () => {clientIdElement.textContent = clientId;startLongPolling();// 发送消息按钮点击事件sendButton.addEventListener('click', sendMessage);// 输入框回车事件messageInput.addEventListener('keypress', (e) => {if (e.key === 'Enter') {sendMessage();}});});/*** 生成唯一客户端ID* @returns {string} 客户端ID*/function generateClientId() {return 'client_' + Math.random().toString(36).substring(2, 10);}/*** 开始长轮询*/function startLongPolling() {if (isPolling) return;isPolling = true;connectionStatusElement.textContent = '连接中';connectionStatusElement.className = 'text-blue-500';longPoll();}/*** 停止长轮询*/function stopLongPolling() {isPolling = false;connectionStatusElement.textContent = '已断开';connectionStatusElement.className = 'text-red-500';}/*** 长轮询请求*/async function longPoll() {if (!isPolling) return;try {// 构建请求URLlet url = `${API_BASE_URL}/messages?clientId=${encodeURIComponent(clientId)}`;if (lastTimestamp) {url += `&lastTimestamp=${encodeURIComponent(lastTimestamp)}`;}// 发送请求connectionStatusElement.textContent = '等待中';connectionStatusElement.className = 'text-yellow-500';const response = await fetch(url);if (!response.ok) {throw new Error(`HTTP error! status: ${response.status}`);}// 处理响应const messages = await response.json();if (messages.length > 0) {displayMessages(messages);// 更新最后时间戳为最新消息的时间lastTimestamp = messages[messages.length - 1].timestamp;// 更新最后更新时间lastUpdateElement.textContent = new Date().toLocaleTimeString();}// 立即发起下一次长轮询setTimeout(longPoll, 0);} catch (error) {console.error('长轮询失败:', error);connectionStatusElement.textContent = '出错,重试中';connectionStatusElement.className = 'text-orange-500';// 出错后延迟重试setTimeout(longPoll, 3000);}}/*** 发送消息*/async function sendMessage() {const content = messageInput.value.trim();if (!content) return;try {const response = await fetch(`${API_BASE_URL}/messages`, {method: 'POST',headers: {'Content-Type': 'text/plain',},body: content});if (!response.ok) {throw new Error(`发送消息失败: ${response.status}`);}// 清空输入框messageInput.value = '';} catch (error) {console.error('发送消息失败:', error);alert('发送消息失败,请重试');}}/*** 显示消息* @param {Array} messages 消息数组*/function displayMessages(messages) {messages.forEach(message => {const messageElement = document.createElement('div');messageElement.className = 'mb-2 p-2 bg-gray-50 rounded border';const time = new Date(message.timestamp).toLocaleTimeString();messageElement.innerHTML = `<div class="text-sm text-gray-500">${time}</div><div class="text-gray-800">${escapeHtml(message.content)}</div>`;messageContainer.appendChild(messageElement);});// 滚动到底部messageContainer.scrollTop = messageContainer.scrollHeight;}/*** HTML转义,防止XSS攻击* @param {string} text 文本内容* @return {string} 转义后的文本*/function escapeHtml(text) {return text.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/"/g, "&quot;").replace(/'/g, "&#039;");}</script>
</body>
</html>

四、WebSocket

4.1 技术原理

WebSocket 是 HTML5 引入的一种全双工通信协议,它提供了客户端和服务器之间持久的连接,允许双向实时通信:

  1. 客户端通过 HTTP 请求发起 WebSocket 握手,请求头中包含Upgrade: websocketConnection: Upgrade等信息
  2. 服务器同意升级协议后,建立 WebSocket 连接
  3. 连接建立后,客户端和服务器可以随时向对方发送数据,无需重新建立连接
  4. 通信使用帧(frame)格式,相比 HTTP 更加轻量
  5. 连接可以通过客户端或服务器主动关闭

WebSocket 的核心优势是建立一次连接后可以双向持续通信,避免了 HTTP 的请求 / 响应模式带来的开销,是真正意义上的实时通信技术。

4.2 优缺点分析

优点:

  • 全双工通信,客户端和服务器可以随时发送数据
  • 持久连接,减少了连接建立的开销
  • 轻量级协议,数据传输效率高
  • 实时性好,延迟低
  • 可以传输二进制数据

缺点:

  • 实现相对复杂
  • 某些老旧浏览器不支持(IE 10 及以下)
  • 需要服务器支持 WebSocket 协议
  • 可能被某些防火墙拦截
  • 负载均衡配置相对复杂

4.3 适用场景

WebSocket 适用于以下场景:

  • 实时聊天应用
  • 多人在线游戏
  • 实时协作工具(如文档协作)
  • 实时数据监控和仪表盘
  • 高频数据更新的应用

4.4 实战示例

下面使用 Spring WebSocket 实现一个 WebSocket 通信示例:

后端实现:

首先,创建 WebSocket 配置类:

package com.example.realtime.config;import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import com.example.realtime.handler.MessageWebSocketHandler;
import com.example.realtime.interceptor.WebSocketHandshakeInterceptor;/*** WebSocket配置类** @author ken*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {private final MessageWebSocketHandler messageWebSocketHandler;private final WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;public WebSocketConfig(MessageWebSocketHandler messageWebSocketHandler,WebSocketHandshakeInterceptor webSocketHandshakeInterceptor) {this.messageWebSocketHandler = messageWebSocketHandler;this.webSocketHandshakeInterceptor = webSocketHandshakeInterceptor;}@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {// 注册WebSocket处理器和拦截器registry.addHandler(messageWebSocketHandler, "/ws/messages").addInterceptors(webSocketHandshakeInterceptor).setAllowedOriginPatterns("*"); // 允许所有来源,生产环境需要限制}
}

创建 WebSocket 握手拦截器:

package com.example.realtime.interceptor;import jakarta.servlet.http.HttpSession;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.util.Map;
import java.util.UUID;/*** WebSocket握手拦截器** @author ken*/
@Slf4j
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {/*** 握手前处理** @param request  请求对象* @param response 响应对象* @param handler  WebSocket处理器* @param attributes  attributes存储在WebSocket会话中* @return 是否继续握手*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler handler, Map<String, Object> attributes) {// 从请求中获取HttpSessionif (request instanceof ServletServerHttpRequest servletRequest) {HttpSession session = servletRequest.getServletRequest().getSession();if (session != null) {// 可以从session中获取用户信息// String username = (String) session.getAttribute("username");}}// 生成客户端IDString clientId = "ws_client_" + UUID.randomUUID().toString().substring(0, 8);attributes.put("clientId", clientId);log.info("客户端 {} 开始握手", clientId);return true;}/*** 握手后处理** @param request  请求对象* @param response 响应对象* @param handler  WebSocket处理器* @param exception 异常,如果有的话*/@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler handler, Exception exception) {if (exception != null) {log.error("握手失败", exception);} else {log.info("握手成功");}}
}

创建 WebSocket 处理器:

package com.example.realtime.handler;import com.alibaba.fastjson2.JSON;
import com.example.realtime.model.Message;
import com.example.realtime.service.MessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.time.LocalDateTime;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;/*** WebSocket消息处理器** @author ken*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageWebSocketHandler extends TextWebSocketHandler {private final MessageService messageService;/*** 存储所有活跃的WebSocket会话*/private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();/*** 连接建立后调用** @param session WebSocket会话*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 从会话属性中获取客户端IDString clientId = (String) session.getAttributes().get("clientId");if (clientId == null) {clientId = "ws_client_" + UUID.randomUUID().toString().substring(0, 8);session.getAttributes().put("clientId", clientId);}// 存储会话sessions.put(clientId, session);log.info("客户端 {} 连接成功,当前连接数: {}", clientId, sessions.size());// 发送连接成功消息try {Message message = new Message();message.setId(UUID.randomUUID().toString());message.setContent("连接成功");message.setTimestamp(LocalDateTime.now());session.sendMessage(new TextMessage(JSON.toJSONString(message)));} catch (Exception e) {log.error("发送连接成功消息失败", e);}}/*** 收到消息时调用** @param session WebSocket会话* @param message 收到的消息*/@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {String clientId = (String) session.getAttributes().get("clientId");log.info("收到客户端 {} 的消息: {}", clientId, message.getPayload());try {// 解析消息内容String content = message.getPayload();// 存储消息Message newMessage = messageService.addMessage(content);// 广播消息给所有客户端broadcastMessage(newMessage);} catch (Exception e) {log.error("处理消息失败", e);try {session.sendMessage(new TextMessage("处理消息失败: " + e.getMessage()));} catch (Exception ex) {log.error("发送错误消息失败", ex);}}}/*** 连接关闭后调用** @param session WebSocket会话* @param status  关闭状态*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) {String clientId = (String) session.getAttributes().get("clientId");sessions.remove(clientId);log.info("客户端 {} 断开连接,当前连接数: {}", clientId, sessions.size());}/*** 发生错误时调用** @param session WebSocket会话* @param exception 异常*/@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) {String clientId = (String) session.getAttributes().get("clientId");log.error("客户端 {} 发生错误", clientId, exception);}/*** 广播消息给所有客户端** @param message 要广播的消息*/public void broadcastMessage(Message message) {String jsonMessage = JSON.toJSONString(message);TextMessage textMessage = new TextMessage(jsonMessage);// 遍历所有会话并发送消息Set<String> clientIds = sessions.keySet();for (String clientId : clientIds) {WebSocketSession session = sessions.get(clientId);if (session != null && session.isOpen()) {try {session.sendMessage(textMessage);log.info("向客户端 {} 发送消息", clientId);} catch (Exception e) {log.error("向客户端 {} 发送消息失败", clientId, e);}}}}
}

创建用于发送消息的控制器:

package com.example.realtime.controller;import com.example.realtime.handler.MessageWebSocketHandler;
import com.example.realtime.model.Message;
import com.example.realtime.service.MessageService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;/*** WebSocket消息控制器** @author ken*/
@Slf4j
@RestController
@RequestMapping("/api/websocket")
@RequiredArgsConstructor
@Tag(name = "WebSocket API", description = "基于WebSocket的实时通信接口")
public class WebSocketController {private final MessageWebSocketHandler webSocketHandler;private final MessageService messageService;/*** 通过HTTP发送消息,然后通过WebSocket广播** @param content 消息内容* @return 发送成功的消息*/@PostMapping("/messages")@Operation(summary = "发送消息",description = "通过HTTP发送消息,服务器会通过WebSocket广播给所有连接的客户端",responses = @ApiResponse(responseCode = "200",description = "消息发送成功",content = @Content(schema = @Schema(implementation = Message.class))))public ResponseEntity<Message> sendMessage(@Parameter(description = "消息内容", required = true)@RequestBody String content) {if (!StringUtils.hasText(content)) {return ResponseEntity.badRequest().build();}// 创建消息Message message = messageService.addMessage(content);// 广播消息webSocketHandler.broadcastMessage(message);return ResponseEntity.ok(message);}
}

前端实现(websocket.html):

<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><title>WebSocket示例</title><script src="https://cdn.tailwindcss.com"></script><link href="https://cdn.jsdelivr.net/npm/font-awesome@4.7.0/css/font-awesome.min.css" rel="stylesheet">
</head>
<body class="bg-gray-100"><div class="container mx-auto px-4 py-8 max-w-4xl"><h1 class="text-3xl font-bold mb-6 text-center">WebSocket实时通信示例</h1><div class="bg-white rounded-lg shadow-md p-6 mb-6"><h2 class="text-xl font-semibold mb-4">消息列表</h2><div id="messageContainer" class="border rounded-lg p-4 h-64 overflow-y-auto mb-4"></div><div class="flex"><input type="text" id="messageInput" class="flex-1 border rounded-l-md px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"placeholder="输入消息..."><button id="sendButton" class="bg-blue-500 text-white px-4 py-2 rounded-r-md hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"><i class="fa fa-paper-plane mr-1"></i>发送</button></div><div class="mt-4 text-sm text-gray-500"><p>连接状态: <span id="connectionStatus" class="text-yellow-500">未连接</span></p><p>WebSocket状态码: <span id="readyState">-</span></p><p>最后消息时间: <span id="lastMessageTime">-</span></p></div><div class="mt-4"><button id="connectButton" class="bg-green-500 text-white px-4 py-2 rounded-md hover:bg-green-600 mr-2"><i class="fa fa-connectdevelop mr-1"></i>连接</button><button id="disconnectButton" class="bg-red-500 text-white px-4 py-2 rounded-md hover:bg-red-600"disabled><i class="fa fa-disconnect mr-1"></i>断开</button></div></div></div><script>// 全局变量let webSocket = null;let isConnected = false;const WS_BASE_URL = window.location.protocol === 'https:' ? 'wss:' : 'ws:' + '//' + window.location.host + '/ws/messages';// DOM元素const messageContainer = document.getElementById('messageContainer');const messageInput = document.getElementById('messageInput');const sendButton = document.getElementById('sendButton');const connectButton = document.getElementById('connectButton');const disconnectButton = document.getElementById('disconnectButton');const connectionStatusElement = document.getElementById('connectionStatus');const readyStateElement = document.getElementById('readyState');const lastMessageTimeElement = document.getElementById('lastMessageTime');// 初始化document.addEventListener('DOMContentLoaded', () => {// 事件监听connectButton.addEventListener('click', connect);disconnectButton.addEventListener('click', disconnect);sendButton.addEventListener('click', sendMessage);// 输入框回车事件messageInput.addEventListener('keypress', (e) => {if (e.key === 'Enter') {sendMessage();}});// 显示WebSocket状态码含义updateReadyState();});/*** 连接到WebSocket服务器*/function connect() {if (isConnected) return;try {// 创建WebSocket连接webSocket = new WebSocket(WS_BASE_URL);// 连接打开事件webSocket.onopen = function(event) {isConnected = true;connectionStatusElement.textContent = '已连接';connectionStatusElement.className = 'text-green-500';connectButton.disabled = true;disconnectButton.disabled = false;updateReadyState();logMessage('系统消息', '连接成功');};// 收到消息事件webSocket.onmessage = function(event) {updateReadyState();try {// 解析JSON消息const message = JSON.parse(event.data);displayMessage(message);// 更新最后消息时间lastMessageTimeElement.textContent = new Date().toLocaleTimeString();} catch (error) {console.error('解析消息失败:', error);logMessage('错误', '解析消息失败: ' + error.message);}};// 连接关闭事件webSocket.onclose = function(event) {isConnected = false;connectionStatusElement.textContent = `已关闭 (代码: ${event.code})`;connectionStatusElement.className = 'text-red-500';connectButton.disabled = false;disconnectButton.disabled = true;updateReadyState();logMessage('系统消息', `连接已关闭,代码: ${event.code}, 原因: ${event.reason}`);// 如果不是主动关闭,尝试重连if (event.code !== 1000) {setTimeout(connect, 3000);logMessage('系统消息', '3秒后尝试重连...');}};// 错误事件webSocket.onerror = function(error) {connectionStatusElement.textContent = '发生错误';connectionStatusElement.className = 'text-red-500';updateReadyState();logMessage('错误', 'WebSocket错误: ' + error.message);};} catch (error) {console.error('连接WebSocket失败:', error);logMessage('错误', '连接失败: ' + error.message);}}/*** 断开WebSocket连接*/function disconnect() {if (!isConnected || !webSocket) return;// 主动关闭连接webSocket.close(1000, '客户端主动关闭');isConnected = false;}/*** 发送消息*/function sendMessage() {if (!isConnected || !webSocket || webSocket.readyState !== WebSocket.OPEN) {alert('未连接到服务器,请先连接');return;}const content = messageInput.value.trim();if (!content) return;try {// 发送消息webSocket.send(content);// 清空输入框messageInput.value = '';} catch (error) {console.error('发送消息失败:', error);logMessage('错误', '发送消息失败: ' + error.message);}}/*** 显示消息* @param {Object} message 消息对象*/function displayMessage(message) {const messageElement = document.createElement('div');messageElement.className = 'mb-2 p-2 bg-gray-50 rounded border';const time = new Date(message.timestamp).toLocaleTimeString();messageElement.innerHTML = `<div class="text-sm text-gray-500">${time}</div><div class="text-gray-800">${escapeHtml(message.content)}</div>`;messageContainer.appendChild(messageElement);// 滚动到底部messageContainer.scrollTop = messageContainer.scrollHeight;}/*** 记录系统消息* @param {string} title 标题* @param {string} content 内容*/function logMessage(title, content) {const messageElement = document.createElement('div');messageElement.className = 'mb-2 p-2 bg-blue-50 rounded border border-blue-200';const time = new Date().toLocaleTimeString();messageElement.innerHTML = `<div class="text-sm text-blue-500">${time} - ${title}</div><div class="text-blue-800">${escapeHtml(content)}</div>`;messageContainer.appendChild(messageElement);// 滚动到底部messageContainer.scrollTop = messageContainer.scrollHeight;}/*** 更新WebSocket状态码显示*/function updateReadyState() {if (!webSocket) {readyStateElement.textContent = '未初始化';return;}const states = {0: 'CONNECTING (连接中)',1: 'OPEN (已打开)',2: 'CLOSING (关闭中)',3: 'CLOSED (已关闭)'};readyStateElement.textContent = states[webSocket.readyState] || `未知 (${webSocket.readyState})`;}/*** HTML转义,防止XSS攻击* @param {string} text 文本内容* @return {string} 转义后的文本*/function escapeHtml(text) {return text.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/"/g, "&quot;").replace(/'/g, "&#039;");}</script>
</body>
</html>

五、SSE(Server-Sent Events)

5.1 技术原理

SSE(Server-Sent Events,服务器发送事件)是一种允许服务器主动向客户端推送数据的技术,基于 HTTP 协议:

  1. 客户端通过 HTTP GET 请求建立与服务器的连接
  2. 服务器响应的 MIME 类型为text/event-stream
  3. 连接建立后保持打开状态,服务器可以随时向客户端发送事件
  4. 数据以特定格式的文本流形式传输
  5. 客户端可以通过关闭连接来终止通信

SSE 的核心特点是单向通信(服务器到客户端),基于标准 HTTP 协议,实现简单,非常适合需要服务器主动推送更新但客户端不需要频繁发送数据的场景。

5.2 优缺点分析

优点:

  • 实现简单,基于 HTTP 协议,不需要特殊协议支持
  • 自动重连机制,连接断开后客户端会自动尝试重连
  • 轻量级协议,数据格式简单
  • 服务器可以发送事件 ID,客户端可以记录最后接收的 ID,重连时可以请求从该 ID 开始的事件
  • 兼容性较好(除 IE 外的大部分浏览器)

缺点:

  • 单向通信,只能服务器向客户端发送数据
  • 只能传输文本数据,二进制数据需要编码
  • 受限于 HTTP 的并发连接数限制(同一域名下通常限制为 6 个连接)
  • 不支持跨域(需要服务器配置 CORS)

5.3 适用场景

SSE 适用于以下场景:

  • 新闻推送、通知提醒
  • 实时数据更新(如股票行情、天气数据)
  • 状态监控(如服务器状态、设备状态)
  • 社交媒体动态流

5.4 实战示例

下面实现一个基于 Spring Boot 的 SSE 示例:

后端实现:

创建 SSE 控制器:

package com.example.realtime.controller;import com.alibaba.fastjson2.JSON;
import com.example.realtime.model.Message;
import com.example.realtime.service.MessageService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** SSE控制器** @author ken*/
@Slf4j
@RestController
@RequestMapping("/api/sse")
@RequiredArgsConstructor
@Tag(name = "SSE API", description = "基于Server-Sent Events的实时通信接口")
public class SSEController {private final MessageService messageService;/*** 存储所有活跃的SSE发射器*/private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();/*** 定时任务线程池,用于清理过期连接*/private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public SSEController(MessageService messageService) {this.messageService = messageService;// 初始化定时清理任务,每30分钟执行一次scheduler.scheduleAtFixedRate(this::cleanupEmitters, 30, 30, TimeUnit.MINUTES);}/*** 建立SSE连接** @param clientId 客户端ID* @param lastEventId 最后接收的事件ID,用于重连时恢复* @return SSE发射器*/@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)@Operation(summary = "建立SSE连接",description = "客户端建立SSE连接,服务器会推送事件到客户端",parameters = {@Parameter(name = "clientId", description = "客户端ID"),@Parameter(name = "lastEventId", description = "最后接收的事件ID,用于重连时恢复")},responses = @ApiResponse(responseCode = "200",description = "SSE连接已建立",content = @Content(mediaType = "text/event-stream")))public SseEmitter connect(@RequestParam(required = false) String clientId,@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {// 生成客户端ID(如果未提供)if (!StringUtils.hasText(clientId)) {clientId = "sse_client_" + UUID.randomUUID().toString().substring(0, 8);}// 创建SSE发射器,设置超时时间为30分钟SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);// 存储发射器emitters.put(clientId, emitter);log.info("客户端 {} 建立SSE连接,当前连接数: {}", clientId, emitters.size());// 注册连接关闭回调emitter.onCompletion(() -> {emitters.remove(clientId);log.info("客户端 {} SSE连接已完成,当前连接数: {}", clientId, emitters.size());});emitter.onTimeout(() -> {emitters.remove(clientId);log.info("客户端 {} SSE连接超时,当前连接数: {}", clientId, emitters.size());try {emitter.complete();} catch (Exception e) {log.error("关闭超时连接失败", e);}});emitter.onError((e) -> {emitters.remove(clientId);log.error("客户端 {} SSE连接出错", clientId, e);try {emitter.completeWithError(e);} catch (Exception ex) {log.error("处理连接错误失败", ex);}});// 如果有lastEventId,发送从该ID之后的消息if (StringUtils.hasText(lastEventId)) {try {// 实际应用中应该根据lastEventId查询消息sendWelcomeMessage(emitter, clientId);} catch (Exception e) {log.error("发送历史消息失败", e);}} else {// 发送欢迎消息try {sendWelcomeMessage(emitter, clientId);} catch (IOException e) {log.error("发送欢迎消息失败", e);emitter.completeWithError(e);}}return emitter;}/*** 发送欢迎消息** @param emitter SSE发射器* @param clientId 客户端ID* @throws IOException 如果发送失败*/private void sendWelcomeMessage(SseEmitter emitter, String clientId) throws IOException {Message welcomeMessage = new Message();welcomeMessage.setId(UUID.randomUUID().toString());welcomeMessage.setContent("SSE连接已建立,客户端ID: " + clientId);welcomeMessage.setTimestamp(LocalDateTime.now());SseEventBuilder event = SseEmitter.event().id(welcomeMessage.getId()).name("welcome").data(JSON.toJSONString(welcomeMessage));emitter.send(event);}/*** 发送消息,通过SSE推送给所有客户端** @param content 消息内容* @return 发送成功的消息*/@PostMapping("/messages")@Operation(summary = "发送消息",description = "发送新消息,服务器会通过SSE推送给所有连接的客户端",responses = @ApiResponse(responseCode = "200",description = "消息发送成功",content = @Content(schema = @Schema(implementation = Message.class))))public ResponseEntity<Message> sendMessage(@Parameter(description = "消息内容", required = true)@RequestBody String content) {if (!StringUtils.hasText(content)) {return ResponseEntity.badRequest().build();}// 创建消息Message message = messageService.addMessage(content);// 广播消息给所有客户端broadcastMessage(message);return ResponseEntity.ok(message);}/*** 广播消息给所有连接的客户端** @param message 要广播的消息*/private void broadcastMessage(Message message) {String jsonMessage = JSON.toJSONString(message);// 创建SSE事件SseEventBuilder event = SseEmitter.event().id(message.getId()).name("message").data(jsonMessage).reconnectTime(5000); // 建议客户端重连时间// 遍历所有发射器并发送消息Set<String> clientIds = emitters.keySet();for (String clientId : clientIds) {SseEmitter emitter = emitters.get(clientId);if (emitter != null) {try {emitter.send(event);log.info("向客户端 {} 发送SSE消息: {}", clientId, message.getId());} catch (IOException e) {log.error("向客户端 {} 发送SSE消息失败", clientId, e);// 移除无效的发射器emitters.remove(clientId);try {emitter.completeWithError(e);} catch (Exception ex) {log.error("处理发射器错误失败", ex);}}}}}/*** 清理过期的发射器*/private void cleanupEmitters() {log.info("开始清理SSE发射器,当前数量: {}", emitters.size());int removed = 0;Set<String> clientIds = emitters.keySet();for (String clientId : clientIds) {SseEmitter emitter = emitters.get(clientId);if (emitter != null && emitter.isCompletable()) {emitters.remove(clientId);removed++;try {emitter.complete();} catch (Exception e) {log.error("清理发射器 {} 失败", clientId, e);}}}log.info("SSE发射器清理完成,移除了 {} 个过期发射器,剩余: {}", removed, emitters.size());}
}

前端实现(sse.html):

<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><title>SSE示例</title><script src="https://cdn.tailwindcss.com"></script><link href="https://cdn.jsdelivr.net/npm/font-awesome@4.7.0/css/font-awesome.min.css" rel="stylesheet">
</head>
<body class="bg-gray-100"><div class="container mx-auto px-4 py-8 max-w-4xl"><h1 class="text-3xl font-bold mb-6 text-center">SSE(Server-Sent Events)实时通信示例</h1><div class="bg-white rounded-lg shadow-md p-6 mb-6"><h2 class="text-xl font-semibold mb-4">消息列表</h2><div id="messageContainer" class="border rounded-lg p-4 h-64 overflow-y-auto mb-4"></div><div class="flex"><input type="text" id="messageInput" class="flex-1 border rounded-l-md px-4 py-2 focus:outline-none focus:ring-2 focus:ring-blue-500"placeholder="输入消息..."><button id="sendButton" class="bg-blue-500 text-white px-4 py-2 rounded-r-md hover:bg-blue-600 focus:outline-none focus:ring-2 focus:ring-blue-500 focus:ring-offset-2"><i class="fa fa-paper-plane mr-1"></i>发送</button></div><div class="mt-4 text-sm text-gray-500"><p>客户端ID: <span id="clientId"></span></p><p>连接状态: <span id="connectionStatus" class="text-yellow-500">未连接</span></p><p>最后事件ID: <span id="lastEventId">-</span></p><p>重连次数: <span id="reconnectCount">0</span></p></div><div class="mt-4"><button id="connectButton" class="bg-green-500 text-white px-4 py-2 rounded-md hover:bg-green-600 mr-2"><i class="fa fa-connectdevelop mr-1"></i>连接</button><button id="disconnectButton" class="bg-red-500 text-white px-4 py-2 rounded-md hover:bg-red-600"disabled><i class="fa fa-disconnect mr-1"></i>断开</button></div></div></div><script>// 全局变量let eventSource = null;let isConnected = false;let clientId = generateClientId();let lastEventId = '';let reconnectCount = 0;const SSE_BASE_URL = '/api/sse/connect';// DOM元素const messageContainer = document.getElementById('messageContainer');const messageInput = document.getElementById('messageInput');const sendButton = document.getElementById('sendButton');const connectButton = document.getElementById('connectButton');const disconnectButton = document.getElementById('disconnectButton');const clientIdElement = document.getElementById('clientId');const connectionStatusElement = document.getElementById('connectionStatus');const lastEventIdElement = document.getElementById('lastEventId');const reconnectCountElement = document.getElementById('reconnectCount');// 初始化document.addEventListener('DOMContentLoaded', () => {clientIdElement.textContent = clientId;// 事件监听connectButton.addEventListener('click', connect);disconnectButton.addEventListener('click', disconnect);sendButton.addEventListener('click', sendMessage);// 输入框回车事件messageInput.addEventListener('keypress', (e) => {if (e.key === 'Enter') {sendMessage();}});});/*** 生成唯一客户端ID* @returns {string} 客户端ID*/function generateClientId() {return 'sse_client_' + Math.random().toString(36).substring(2, 10);}/*** 连接到SSE服务器*/function connect() {if (isConnected) return;try {// 构建连接URLlet url = `${SSE_BASE_URL}?clientId=${encodeURIComponent(clientId)}`;// 创建EventSource对象// 注意:lastEventId会自动通过HTTP头传递,这里不需要显式添加eventSource = new EventSource(url);// 连接打开事件eventSource.onopen = function(event) {isConnected = true;connectionStatusElement.textContent = '已连接';connectionStatusElement.className = 'text-green-500';connectButton.disabled = true;disconnectButton.disabled = false;logMessage('系统消息', 'SSE连接已建立');};// 收到消息事件eventSource.onmessage = function(event) {// 更新最后事件IDif (event.lastEventId) {lastEventId = event.lastEventId;lastEventIdElement.textContent = lastEventId;}try {// 解析JSON消息const message = JSON.parse(event.data);displayMessage(message);} catch (error) {console.error('解析消息失败:', error);logMessage('错误', '解析消息失败: ' + error.message);}};// 特定事件处理eventSource.addEventListener('welcome', function(event) {if (event.lastEventId) {lastEventId = event.lastEventId;lastEventIdElement.textContent = lastEventId;}try {const data = JSON.parse(event.data);logMessage('欢迎消息', data.content);} catch (error) {console.error('解析欢迎消息失败:', error);logMessage('欢迎消息', event.data);}});// 错误事件eventSource.onerror = function(error) {connectionStatusElement.textContent = '发生错误';connectionStatusElement.className = 'text-red-500';if (eventSource.readyState === EventSource.CLOSED) {logMessage('错误', '连接已关闭,正在尝试重连...');// 手动重连reconnectCount++;reconnectCountElement.textContent = reconnectCount;setTimeout(connect, 3000);} else if (eventSource.readyState === EventSource.CONNECTING) {connectionStatusElement.textContent = '正在重连...';connectionStatusElement.className = 'text-orange-500';}};} catch (error) {console.error('连接SSE失败:', error);logMessage('错误', '连接失败: ' + error.message);// 尝试重连reconnectCount++;reconnectCountElement.textContent = reconnectCount;setTimeout(connect, 3000);}}/*** 断开SSE连接*/function disconnect() {if (!isConnected || !eventSource) return;// 关闭连接eventSource.close();eventSource = null;isConnected = false;connectionStatusElement.textContent = '已断开';connectionStatusElement.className = 'text-red-500';connectButton.disabled = false;disconnectButton.disabled = true;logMessage('系统消息', 'SSE连接已手动关闭');}/*** 发送消息(通过HTTP POST)*/function sendMessage() {const content = messageInput.value.trim();if (!content) return;try {fetch('/api/sse/messages', {method: 'POST',headers: {'Content-Type': 'text/plain',},body: content}).then(response => {if (!response.ok) {throw new Error(`发送消息失败: ${response.status}`);}// 清空输入框messageInput.value = '';}).catch(error => {console.error('发送消息失败:', error);logMessage('错误', '发送消息失败: ' + error.message);});} catch (error) {console.error('发送消息失败:', error);logMessage('错误', '发送消息失败: ' + error.message);}}/*** 显示消息* @param {Object} message 消息对象*/function displayMessage(message) {const messageElement = document.createElement('div');messageElement.className = 'mb-2 p-2 bg-gray-50 rounded border';const time = new Date(message.timestamp).toLocaleTimeString();messageElement.innerHTML = `<div class="text-sm text-gray-500">${time} (ID: ${message.id})</div><div class="text-gray-800">${escapeHtml(message.content)}</div>`;messageContainer.appendChild(messageElement);// 滚动到底部messageContainer.scrollTop = messageContainer.scrollHeight;}/*** 记录系统消息* @param {string} title 标题* @param {string} content 内容*/function logMessage(title, content) {const messageElement = document.createElement('div');messageElement.className = 'mb-2 p-2 bg-blue-50 rounded border border-blue-200';const time = new Date().toLocaleTimeString();messageElement.innerHTML = `<div class="text-sm text-blue-500">${time} - ${title}</div><div class="text-blue-800">${escapeHtml(content)}</div>`;messageContainer.appendChild(messageElement);// 滚动到底部messageContainer.scrollTop = messageContainer.scrollHeight;}/*** HTML转义,防止XSS攻击* @param {string} text 文本内容* @return {string} 转义后的文本*/function escapeHtml(text) {return text.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;").replace(/"/g, "&quot;").replace(/'/g, "&#039;");}// 页面关闭时断开连接window.addEventListener('beforeunload', () => {disconnect();});</script>
</body>
</html>

六、四种实时通信技术的综合对比

为了帮助开发者在实际项目中选择合适的实时通信技术,我们对四种技术进行全面对比:

特性短轮询长轮询WebSocketSSE
通信方式客户端定期请求客户端请求,服务器延迟响应全双工持久连接服务器单向推送
实时性低(取决于轮询间隔)中(有数据时立即响应)高(毫秒级延迟)高(毫秒级延迟)
连接类型短连接,频繁建立长连接,数据推送后关闭持久连接,一直保持持久连接,一直保持
数据方向客户端请求,服务器响应客户端请求,服务器响应双向服务器到客户端
协议HTTPHTTPWebSocketHTTP
数据格式任意 HTTP 支持的格式任意 HTTP 支持的格式文本或二进制文本(可包含 JSON 等)
开销高(每次请求有 HTTP 头)中(请求头开销,但次数少)低(连接建立后开销小)低(连接建立后开销小)
服务器负载高(大量无效请求)中(挂起连接消耗资源)低(单个连接处理多请求)低(单个连接处理多推送)
浏览器兼容性所有浏览器所有浏览器现代浏览器(IE10+)现代浏览器(IE 不支持)
自动重连需要手动实现需要手动实现需要手动实现自动支持
实现复杂度简单中等较复杂简单
适用场景简单通知,低频率更新中等频率更新,通知系统实时聊天,在线游戏新闻推送,实时数据展示

6.1 技术选择建议

  1. 简单场景,兼容性要求极高:选择短轮询

    • 优点:实现最简单,所有浏览器都支持
    • 缺点:实时性差,服务器负载高
    • 适用:内部系统,用户量小,更新频率低的场景
  2. 中等实时性要求,需要广泛兼容:选择长轮询

    • 优点:实时性比短轮询好,兼容性好
    • 缺点:服务器需要维护挂起连接
    • 适用:社交媒体通知,评论系统
  3. 高实时性,需要双向通信:选择 WebSocket

    • 优点:全双工,低延迟,高效率
    • 缺点:实现复杂,老旧浏览器不支持
    • 适用:实时聊天,多人在线游戏,协作工具
  4. 高实时性,仅需服务器推送:选择 SSE

    • 优点:实现简单,自动重连,效率高
    • 缺点:单向通信,IE 不支持
    • 适用:新闻推送,实时数据监控,股票行情

6.2 混合使用策略

在实际项目中,有时可以混合使用这些技术:

  1. 降级策略:优先使用 WebSocket,不支持则降级为长轮询,最后降级为短轮询
  2. 场景分离:不同功能使用最适合的技术,例如聊天用 WebSocket,通知用 SSE
  3. 渐进增强:基础功能用短轮询保证兼容性,高级功能用 WebSocket 提升体验

七、总结与展望

实时通信技术是现代 Web 应用不可或缺的组成部分,从简单的短轮询到先进的 WebSocket 和 SSE,每种技术都有其独特的适用场景和优缺点。

短轮询实现最简单,但效率最低,适合对实时性要求不高且需要广泛兼容的场景。长轮询在保持兼容性的同时提高了实时性和效率,是短轮询的良好替代方案。WebSocket 提供了全双工的实时通信能力,适合需要高频双向数据交换的场景。SSE 则专注于服务器到客户端的单向推送,实现简单且效率高。

http://www.dtcms.com/a/390744.html

相关文章:

  • ICML 2025|图像如何与激光雷达对齐并互补?迈向协调的多模态3D全景分割
  • 基于Web的3D工程应用图形引擎——HOOPS Communicator技术解析
  • 【每日一问】运放的失调电压是什么?对于电路有何影响?
  • 【轨物方案】轨物科技新型储能管理系统:以AIoT技术驱动储能资产全生命周期价值最大化
  • 线性回归 vs 逻辑回归:从原理到实战的全面对比
  • HashMap的底层原理
  • 股指期货超短线如何操作?
  • 【洛谷】算法竞赛中的树结构:形式、存储与遍历全解析
  • 育苗盘补苗路径规划研究
  • API Gateway :API网关组件
  • conda激活虚拟环境
  • 重构大qmt通达信板块预警自动交易系统--读取通达信成分股
  • 25.9.19 Spring AOP
  • d38: PostgreSQL 简单入门与 Vue3 动态路由实现
  • No006:订阅化时间管理——迈向个性化、生态化的AI服务模式
  • 微服务-sentinel的理论与集成springcloud
  • C++学习:哈希表unordered_set/unordered_map的封装
  • 圆柱永磁体磁场及梯度快速计算与可视化程序
  • 种群演化优化算法:原理与Python实现
  • 基于IPDRR模型能力,每个能力的概念及所要具备的能力产品
  • NUST技术漫谈:当非结构化数据遇见状态跟踪——一场静默的技术革命
  • 在技术无人区开路,OPPO的指南针是“人”
  • AI与NPC发展过程及技术
  • Redis数据库(三)—— 深入解析Redis三种高可用架构:主从复制、哨兵与集群模式
  • (leetcode) 力扣100 13最大子序和(动态规划卡达内算法分治法)
  • SpringBoot整合JUnit:单元测试从入门到精通
  • MySQL三范式详细解析
  • GitHub 仓库权限更改
  • 卷积神经网络(CNN)核心知识点总结
  • Python数据挖掘之基础分类模型_朴素贝叶斯