当前位置: 首页 > news >正文

Ruoyi-vue-plus-5.x第六篇Web开发与前后端交互: 6.4 WebSocket实时通信

👋 大家好,我是 阿问学长!专注于分享优质开源项目解析、毕业设计项目指导支持、幼小初高教辅资料推荐等,欢迎关注交流!🚀

WebSocket实时通信

前言

WebSocket是一种在单个TCP连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在现代Web应用中,WebSocket广泛应用于实时聊天、消息推送、实时监控、在线协作等场景。RuoYi-Vue-Plus框架集成了WebSocket支持,提供了完整的实时通信解决方案。本文将详细介绍WebSocket服务端配置、消息推送机制、客户端集成以及实时监控实现等内容。

WebSocket服务端配置

基础配置

/*** WebSocket配置类*/
@Configuration
@EnableWebSocket
@Slf4j
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate WebSocketHandshakeInterceptor handshakeInterceptor;@Autowiredprivate WebSocketHandler webSocketHandler;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {// 注册WebSocket处理器registry.addHandler(webSocketHandler, "/websocket/{userId}").addInterceptors(handshakeInterceptor).setAllowedOrigins("*"); // 生产环境应该配置具体的域名// 注册SockJS支持(兼容不支持WebSocket的浏览器)registry.addHandler(webSocketHandler, "/sockjs/websocket/{userId}").addInterceptors(handshakeInterceptor).setAllowedOrigins("*").withSockJS();}/*** WebSocket任务调度器*/@Beanpublic TaskScheduler webSocketTaskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(10);scheduler.setThreadNamePrefix("websocket-");scheduler.setWaitForTasksToCompleteOnShutdown(true);scheduler.setAwaitTerminationSeconds(60);return scheduler;}
}/*** WebSocket握手拦截器*/
@Component
@Slf4j
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {log.info("WebSocket握手开始: {}", request.getURI());// 获取用户IDString path = request.getURI().getPath();String userId = extractUserIdFromPath(path);if (StringUtils.isEmpty(userId)) {log.warn("WebSocket握手失败: 用户ID为空");return false;}// 验证用户身份if (!validateUser(request, userId)) {log.warn("WebSocket握手失败: 用户身份验证失败, userId={}", userId);return false;}// 将用户信息存储到WebSocket会话属性中attributes.put("userId", userId);attributes.put("connectTime", System.currentTimeMillis());// 获取客户端信息String userAgent = request.getHeaders().getFirst("User-Agent");String clientIp = getClientIp(request);attributes.put("userAgent", userAgent);attributes.put("clientIp", clientIp);log.info("WebSocket握手成功: userId={}, clientIp={}", userId, clientIp);return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception) {if (exception != null) {log.error("WebSocket握手后处理异常", exception);}}/*** 从路径中提取用户ID*/private String extractUserIdFromPath(String path) {// 路径格式: /websocket/{userId} 或 /sockjs/websocket/{userId}String[] segments = path.split("/");if (segments.length > 0) {return segments[segments.length - 1];}return null;}/*** 验证用户身份*/private boolean validateUser(ServerHttpRequest request, String userId) {try {// 从请求头或参数中获取tokenString token = getTokenFromRequest(request);if (StringUtils.isEmpty(token)) {return false;}// 验证token并检查用户ID是否匹配return TokenUtils.validateToken(token, userId);} catch (Exception e) {log.error("验证用户身份异常", e);return false;}}/*** 从请求中获取token*/private String getTokenFromRequest(ServerHttpRequest request) {// 从请求头获取String authorization = request.getHeaders().getFirst("Authorization");if (StringUtils.hasText(authorization) && authorization.startsWith("Bearer ")) {return authorization.substring(7);}// 从查询参数获取String query = request.getURI().getQuery();if (StringUtils.hasText(query)) {String[] params = query.split("&");for (String param : params) {String[] kv = param.split("=");if (kv.length == 2 && "token".equals(kv[0])) {return kv[1];}}}return null;}/*** 获取客户端IP*/private String getClientIp(ServerHttpRequest request) {String xForwardedFor = request.getHeaders().getFirst("X-Forwarded-For");if (StringUtils.hasText(xForwardedFor)) {return xForwardedFor.split(",")[0].trim();}String xRealIp = request.getHeaders().getFirst("X-Real-IP");if (StringUtils.hasText(xRealIp)) {return xRealIp;}if (request instanceof ServletServerHttpRequest) {return ((ServletServerHttpRequest) request).getServletRequest().getRemoteAddr();}return "unknown";}
}/*** WebSocket处理器*/
@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {@Autowiredprivate WebSocketSessionManager sessionManager;@Autowiredprivate MessageHandler messageHandler;@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {String userId = (String) session.getAttributes().get("userId");String clientIp = (String) session.getAttributes().get("clientIp");log.info("WebSocket连接建立: userId={}, sessionId={}, clientIp={}", userId, session.getId(), clientIp);// 注册会话sessionManager.addSession(userId, session);// 发送连接成功消息WebSocketMessage welcomeMessage = WebSocketMessage.builder().type(MessageType.SYSTEM).content("连接成功").timestamp(System.currentTimeMillis()).build();sendMessage(session, welcomeMessage);// 发送未读消息sendUnreadMessages(userId, session);}@Overridepublic void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String userId = (String) session.getAttributes().get("userId");String payload = message.getPayload();log.info("收到WebSocket消息: userId={}, message={}", userId, payload);try {// 解析消息WebSocketMessage wsMessage = JsonUtils.parseObject(payload, WebSocketMessage.class);wsMessage.setSenderId(userId);wsMessage.setTimestamp(System.currentTimeMillis());// 处理消息messageHandler.handleMessage(wsMessage, session);} catch (Exception e) {log.error("处理WebSocket消息异常: userId={}, message={}", userId, payload, e);// 发送错误消息WebSocketMessage errorMessage = WebSocketMessage.builder().type(MessageType.ERROR).content("消息处理失败: " + e.getMessage()).timestamp(System.currentTimeMillis()).build();sendMessage(session, errorMessage);}}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {String userId = (String) session.getAttributes().get("userId");log.error("WebSocket传输异常: userId={}, sessionId={}", userId, session.getId(), exception);// 移除会话sessionManager.removeSession(userId, session.getId());}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {String userId = (String) session.getAttributes().get("userId");Long connectTime = (Long) session.getAttributes().get("connectTime");long duration = System.currentTimeMillis() - connectTime;log.info("WebSocket连接关闭: userId={}, sessionId={}, duration={}ms, closeStatus={}", userId, session.getId(), duration, closeStatus);// 移除会话sessionManager.removeSession(userId, session.getId());}@Overridepublic boolean supportsPartialMessages() {return false;}/*** 发送消息*/private void sendMessage(WebSocketSession session, WebSocketMessage message) {try {if (session.isOpen()) {String json = JsonUtils.toJsonString(message);session.sendMessage(new TextMessage(json));}} catch (Exception e) {log.error("发送WebSocket消息失败", e);}}/*** 发送未读消息*/private void sendUnreadMessages(String userId, WebSocketSession session) {try {List<WebSocketMessage> unreadMessages = messageHandler.getUnreadMessages(userId);for (WebSocketMessage message : unreadMessages) {sendMessage(session, message);}if (!unreadMessages.isEmpty()) {log.info("发送未读消息: userId={}, count={}", userId, unreadMessages.size());}} catch (Exception e) {log.error("发送未读消息失败: userId={}", userId, e);}}
}

会话管理

/*** WebSocket会话管理器*/
@Component
@Slf4j
public class WebSocketSessionManager {// 用户ID -> WebSocket会话列表的映射private final Map<String, Set<WebSocketSession>> userSessions = new ConcurrentHashMap<>();// 会话ID -> 用户ID的映射private final Map<String, String> sessionUsers = new ConcurrentHashMap<>();/*** 添加会话*/public void addSession(String userId, WebSocketSession session) {userSessions.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(session);sessionUsers.put(session.getId(), userId);log.info("添加WebSocket会话: userId={}, sessionId={}, totalSessions={}", userId, session.getId(), getTotalSessionCount());}/*** 移除会话*/public void removeSession(String userId, String sessionId) {Set<WebSocketSession> sessions = userSessions.get(userId);if (sessions != null) {sessions.removeIf(session -> session.getId().equals(sessionId));if (sessions.isEmpty()) {userSessions.remove(userId);}}sessionUsers.remove(sessionId);log.info("移除WebSocket会话: userId={}, sessionId={}, totalSessions={}", userId, sessionId, getTotalSessionCount());}/*** 获取用户的所有会话*/public Set<WebSocketSession> getUserSessions(String userId) {return userSessions.getOrDefault(userId, Collections.emptySet());}/*** 获取会话对应的用户ID*/public String getSessionUser(String sessionId) {return sessionUsers.get(sessionId);}/*** 检查用户是否在线*/public boolean isUserOnline(String userId) {Set<WebSocketSession> sessions = userSessions.get(userId);if (sessions == null || sessions.isEmpty()) {return false;}// 检查是否有有效的会话return sessions.stream().anyMatch(WebSocketSession::isOpen);}/*** 获取在线用户列表*/public Set<String> getOnlineUsers() {return userSessions.entrySet().stream().filter(entry -> entry.getValue().stream().anyMatch(WebSocketSession::isOpen)).map(Map.Entry::getKey).collect(Collectors.toSet());}/*** 获取总会话数*/public int getTotalSessionCount() {return userSessions.values().stream().mapToInt(Set::size).sum();}/*** 获取在线用户数*/public int getOnlineUserCount() {return getOnlineUsers().size();}/*** 清理无效会话*/@Scheduled(fixedRate = 60000) // 每分钟执行一次public void cleanupInvalidSessions() {int removedCount = 0;Iterator<Map.Entry<String, Set<WebSocketSession>>> userIterator = userSessions.entrySet().iterator();while (userIterator.hasNext()) {Map.Entry<String, Set<WebSocketSession>> entry = userIterator.next();String userId = entry.getKey();Set<WebSocketSession> sessions = entry.getValue();Iterator<WebSocketSession> sessionIterator = sessions.iterator();while (sessionIterator.hasNext()) {WebSocketSession session = sessionIterator.next();if (!session.isOpen()) {sessionIterator.remove();sessionUsers.remove(session.getId());removedCount++;}}if (sessions.isEmpty()) {userIterator.remove();}}if (removedCount > 0) {log.info("清理无效WebSocket会话: removedCount={}, totalSessions={}", removedCount, getTotalSessionCount());}}
}/*** WebSocket消息类*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WebSocketMessage {/*** 消息类型*/private MessageType type;/*** 发送者ID*/private String senderId;/*** 接收者ID*/private String receiverId;/*** 消息内容*/private String content;/*** 消息数据*/private Object data;/*** 时间戳*/private Long timestamp;/*** 消息ID*/private String messageId;/*** 扩展属性*/private Map<String, Object> extra;
}/*** 消息类型枚举*/
public enum MessageType {SYSTEM,      // 系统消息CHAT,        // 聊天消息NOTIFICATION, // 通知消息HEARTBEAT,   // 心跳消息ERROR,       // 错误消息BROADCAST,   // 广播消息PRIVATE      // 私聊消息
}

消息推送机制

消息处理器

/*** WebSocket消息处理器*/
@Component
@Slf4j
public class MessageHandler {@Autowiredprivate WebSocketSessionManager sessionManager;@Autowiredprivate MessageService messageService;@Autowiredprivate NotificationService notificationService;/*** 处理消息*/public void handleMessage(WebSocketMessage message, WebSocketSession session) {switch (message.getType()) {case HEARTBEAT:handleHeartbeat(message, session);break;case CHAT:handleChatMessage(message);break;case PRIVATE:handlePrivateMessage(message);break;case NOTIFICATION:handleNotificationMessage(message);break;default:log.warn("未知消息类型: {}", message.getType());}}/*** 处理心跳消息*/private void handleHeartbeat(WebSocketMessage message, WebSocketSession session) {try {WebSocketMessage pong = WebSocketMessage.builder().type(MessageType.HEARTBEAT).content("pong").timestamp(System.currentTimeMillis()).build();String json = JsonUtils.toJsonString(pong);session.sendMessage(new TextMessage(json));} catch (Exception e) {log.error("处理心跳消息失败", e);}}/*** 处理聊天消息*/private void handleChatMessage(WebSocketMessage message) {try {// 保存消息messageService.saveMessage(message);// 广播消息给所有在线用户broadcastMessage(message);} catch (Exception e) {log.error("处理聊天消息失败", e);}}/*** 处理私聊消息*/private void handlePrivateMessage(WebSocketMessage message) {try {// 保存消息messageService.saveMessage(message);// 发送给指定用户sendToUser(message.getReceiverId(), message);// 如果接收者不在线,发送离线通知if (!sessionManager.isUserOnline(message.getReceiverId())) {notificationService.sendOfflineNotification(message);}} catch (Exception e) {log.error("处理私聊消息失败", e);}}/*** 处理通知消息*/private void handleNotificationMessage(WebSocketMessage message) {try {// 保存通知notificationService.saveNotification(message);// 发送给指定用户或广播if (StringUtils.hasText(message.getReceiverId())) {sendToUser(message.getReceiverId(), message);} else {broadcastMessage(message);}} catch (Exception e) {log.error("处理通知消息失败", e);}}/*** 发送消息给指定用户*/public void sendToUser(String userId, WebSocketMessage message) {Set<WebSocketSession> sessions = sessionManager.getUserSessions(userId);if (sessions.isEmpty()) {log.debug("用户不在线,无法发送消息: userId={}", userId);return;}String json = JsonUtils.toJsonString(message);TextMessage textMessage = new TextMessage(json);int successCount = 0;for (WebSocketSession session : sessions) {try {if (session.isOpen()) {session.sendMessage(textMessage);successCount++;}} catch (Exception e) {log.error("发送消息失败: userId={}, sessionId={}", userId, session.getId(), e);}}log.debug("发送消息给用户: userId={}, sessionCount={}, successCount={}", userId, sessions.size(), successCount);}/*** 广播消息给所有在线用户*/public void broadcastMessage(WebSocketMessage message) {Set<String> onlineUsers = sessionManager.getOnlineUsers();if (onlineUsers.isEmpty()) {log.debug("没有在线用户,无法广播消息");return;}int successCount = 0;for (String userId : onlineUsers) {try {sendToUser(userId, message);successCount++;} catch (Exception e) {log.error("广播消息失败: userId={}", userId, e);}}log.info("广播消息: totalUsers={}, successCount={}", onlineUsers.size(), successCount);}/*** 获取未读消息*/public List<WebSocketMessage> getUnreadMessages(String userId) {try {return messageService.getUnreadMessages(userId);} catch (Exception e) {log.error("获取未读消息失败: userId={}", userId, e);return Collections.emptyList();}}
}/*** WebSocket消息推送服务*/
@Service
@Slf4j
public class WebSocketMessagePushService {@Autowiredprivate MessageHandler messageHandler;@Autowiredprivate WebSocketSessionManager sessionManager;/*** 推送系统通知*/public void pushSystemNotification(String title, String content) {WebSocketMessage message = WebSocketMessage.builder().type(MessageType.NOTIFICATION).content(content).data(Map.of("title", title, "level", "info")).timestamp(System.currentTimeMillis()).messageId(UUID.randomUUID().toString()).build();messageHandler.broadcastMessage(message);log.info("推送系统通知: title={}, content={}", title, content);}/*** 推送用户通知*/public void pushUserNotification(String userId, String title, String content) {WebSocketMessage message = WebSocketMessage.builder().type(MessageType.NOTIFICATION).receiverId(userId).content(content).data(Map.of("title", title, "level", "info")).timestamp(System.currentTimeMillis()).messageId(UUID.randomUUID().toString()).build();messageHandler.sendToUser(userId, message);log.info("推送用户通知: userId={}, title={}, content={}", userId, title, content);}/*** 推送警告消息*/public void pushWarningMessage(String userId, String message) {WebSocketMessage wsMessage = WebSocketMessage.builder().type(MessageType.NOTIFICATION).receiverId(userId).content(message).data(Map.of("level", "warning")).timestamp(System.currentTimeMillis()).messageId(UUID.randomUUID().toString()).build();messageHandler.sendToUser(userId, wsMessage);log.info("推送警告消息: userId={}, message={}", userId, message);}/*** 推送错误消息*/public void pushErrorMessage(String userId, String message) {WebSocketMessage wsMessage = WebSocketMessage.builder().type(MessageType.ERROR).receiverId(userId).content(message).data(Map.of("level", "error")).timestamp(System.currentTimeMillis()).messageId(UUID.randomUUID().toString()).build();messageHandler.sendToUser(userId, wsMessage);log.info("推送错误消息: userId={}, message={}", userId, message);}/*** 推送业务数据更新通知*/public void pushDataUpdateNotification(String userId, String dataType, Object data) {WebSocketMessage message = WebSocketMessage.builder().type(MessageType.NOTIFICATION).receiverId(userId).content("数据更新通知").data(Map.of("dataType", dataType, "data", data)).timestamp(System.currentTimeMillis()).messageId(UUID.randomUUID().toString()).build();messageHandler.sendToUser(userId, message);log.info("推送数据更新通知: userId={}, dataType={}", userId, dataType);}/*** 获取在线用户统计*/public Map<String, Object> getOnlineStatistics() {Map<String, Object> stats = new HashMap<>();stats.put("onlineUserCount", sessionManager.getOnlineUserCount());stats.put("totalSessionCount", sessionManager.getTotalSessionCount());stats.put("onlineUsers", sessionManager.getOnlineUsers());return stats;}
}

客户端集成

JavaScript客户端

/*** WebSocket客户端类*/
class WebSocketClient {constructor(options = {}) {this.url = options.url || this.buildWebSocketUrl();this.token = options.token || this.getToken();this.userId = options.userId || this.getUserId();this.reconnectInterval = options.reconnectInterval || 5000;this.maxReconnectAttempts = options.maxReconnectAttempts || 10;this.heartbeatInterval = options.heartbeatInterval || 30000;this.ws = null;this.reconnectAttempts = 0;this.heartbeatTimer = null;this.isConnected = false;this.messageHandlers = new Map();this.eventListeners = new Map();this.init();}/*** 初始化WebSocket连接*/init() {try {const wsUrl = `${this.url}/${this.userId}?token=${this.token}`;this.ws = new WebSocket(wsUrl);this.ws.onopen = this.onOpen.bind(this);this.ws.onmessage = this.onMessage.bind(this);this.ws.onclose = this.onClose.bind(this);this.ws.onerror = this.onError.bind(this);} catch (error) {console.error('WebSocket初始化失败:', error);this.scheduleReconnect();}}/*** 连接打开事件*/onOpen(event) {console.log('WebSocket连接已建立');this.isConnected = true;this.reconnectAttempts = 0;// 启动心跳this.startHeartbeat();// 触发连接事件this.emit('connected', event);}/*** 接收消息事件*/onMessage(event) {try {const message = JSON.parse(event.data);console.log('收到WebSocket消息:', message);// 处理心跳响应if (message.type === 'HEARTBEAT' && message.content === 'pong') {return;}// 调用消息处理器const handler = this.messageHandlers.get(message.type);if (handler) {handler(message);}// 触发消息事件this.emit('message', message);} catch (error) {console.error('解析WebSocket消息失败:', error);}}/*** 连接关闭事件*/onClose(event) {console.log('WebSocket连接已关闭:', event.code, event.reason);this.isConnected = false;// 停止心跳this.stopHeartbeat();// 触发断开事件this.emit('disconnected', event);// 尝试重连if (event.code !== 1000) { // 非正常关闭this.scheduleReconnect();}}/*** 连接错误事件*/onError(event) {console.error('WebSocket连接错误:', event);this.emit('error', event);}/*** 发送消息*/send(message) {if (!this.isConnected) {console.warn('WebSocket未连接,无法发送消息');return false;}try {const messageStr = typeof message === 'string' ? message : JSON.stringify(message);this.ws.send(messageStr);return true;} catch (error) {console.error('发送WebSocket消息失败:', error);return false;}}/*** 发送聊天消息*/sendChatMessage(content, receiverId = null) {const message = {type: 'CHAT',content: content,receiverId: receiverId,timestamp: Date.now()};return this.send(message);}/*** 发送私聊消息*/sendPrivateMessage(receiverId, content) {const message = {type: 'PRIVATE',receiverId: receiverId,content: content,timestamp: Date.now()};return this.send(message);}/*** 启动心跳*/startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.isConnected) {this.send({type: 'HEARTBEAT',content: 'ping',timestamp: Date.now()});}}, this.heartbeatInterval);}/*** 停止心跳*/stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}}/*** 安排重连*/scheduleReconnect() {if (this.reconnectAttempts >= this.maxReconnectAttempts) {console.error('WebSocket重连次数已达上限');this.emit('reconnectFailed');return;}this.reconnectAttempts++;console.log(`WebSocket将在${this.reconnectInterval}ms后进行第${this.reconnectAttempts}次重连`);setTimeout(() => {this.init();}, this.reconnectInterval);}/*** 手动重连*/reconnect() {this.close();this.reconnectAttempts = 0;this.init();}/*** 关闭连接*/close() {if (this.ws) {this.ws.close(1000, '客户端主动关闭');}this.stopHeartbeat();}/*** 注册消息处理器*/onMessage(type, handler) {this.messageHandlers.set(type, handler);}/*** 注册事件监听器*/on(event, listener) {if (!this.eventListeners.has(event)) {this.eventListeners.set(event, []);}this.eventListeners.get(event).push(listener);}/*** 移除事件监听器*/off(event, listener) {if (this.eventListeners.has(event)) {const listeners = this.eventListeners.get(event);const index = listeners.indexOf(listener);if (index > -1) {listeners.splice(index, 1);}}}/*** 触发事件*/emit(event, data) {if (this.eventListeners.has(event)) {this.eventListeners.get(event).forEach(listener => {try {listener(data);} catch (error) {console.error('事件监听器执行失败:', error);}});}}/*** 构建WebSocket URL*/buildWebSocketUrl() {const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';const host = window.location.host;return `${protocol}//${host}/websocket`;}/*** 获取Token*/getToken() {return localStorage.getItem('token') || sessionStorage.getItem('token');}/*** 获取用户ID*/getUserId() {const userInfo = JSON.parse(localStorage.getItem('userInfo') || '{}');return userInfo.userId || userInfo.id;}/*** 获取连接状态*/getConnectionState() {return {isConnected: this.isConnected,reconnectAttempts: this.reconnectAttempts,readyState: this.ws ? this.ws.readyState : WebSocket.CLOSED};}
}/*** WebSocket管理器*/
class WebSocketManager {constructor() {this.client = null;this.messageQueue = [];this.isInitialized = false;}/*** 初始化WebSocket*/init(options = {}) {if (this.isInitialized) {return;}this.client = new WebSocketClient(options);// 注册默认事件处理器this.client.on('connected', () => {console.log('WebSocket管理器:连接已建立');this.processMessageQueue();});this.client.on('disconnected', () => {console.log('WebSocket管理器:连接已断开');});this.client.on('message', (message) => {this.handleMessage(message);});// 注册默认消息处理器this.registerDefaultHandlers();this.isInitialized = true;}/*** 注册默认消息处理器*/registerDefaultHandlers() {// 系统消息处理器this.client.onMessage('SYSTEM', (message) => {this.showNotification('系统消息', message.content, 'info');});// 通知消息处理器this.client.onMessage('NOTIFICATION', (message) => {const data = message.data || {};this.showNotification(data.title || '通知', message.content, data.level || 'info');});// 错误消息处理器this.client.onMessage('ERROR', (message) => {this.showNotification('错误', message.content, 'error');});// 聊天消息处理器this.client.onMessage('CHAT', (message) => {this.handleChatMessage(message);});}/*** 发送消息*/send(message) {if (this.client && this.client.isConnected) {return this.client.send(message);} else {// 连接未建立时,将消息加入队列this.messageQueue.push(message);return false;}}/*** 处理消息队列*/processMessageQueue() {while (this.messageQueue.length > 0) {const message = this.messageQueue.shift();this.client.send(message);}}/*** 处理接收到的消息*/handleMessage(message) {// 可以在这里添加全局消息处理逻辑console.log('WebSocket管理器收到消息:', message);}/*** 处理聊天消息*/handleChatMessage(message) {// 显示聊天消息console.log('收到聊天消息:', message);// 可以触发自定义事件document.dispatchEvent(new CustomEvent('chatMessage', {detail: message}));}/*** 显示通知*/showNotification(title, content, level = 'info') {// 这里可以集成具体的通知组件console.log(`[${level.toUpperCase()}] ${title}: ${content}`);// 如果支持浏览器通知if ('Notification' in window && Notification.permission === 'granted') {new Notification(title, {body: content,icon: '/favicon.ico'});}}/*** 销毁WebSocket连接*/destroy() {if (this.client) {this.client.close();this.client = null;}this.messageQueue = [];this.isInitialized = false;}
}// 创建全局WebSocket管理器实例
window.wsManager = new WebSocketManager();// 页面加载完成后初始化WebSocket
document.addEventListener('DOMContentLoaded', () => {window.wsManager.init();
});// 页面卸载前关闭WebSocket连接
window.addEventListener('beforeunload', () => {window.wsManager.destroy();
});

实时监控实现

系统监控WebSocket

/*** 系统监控WebSocket控制器*/
@RestController
@RequestMapping("/api/v1/monitor")
@Slf4j
public class MonitorWebSocketController {@Autowiredprivate WebSocketMessagePushService messagePushService;@Autowiredprivate SystemMonitorService systemMonitorService;/*** 获取在线用户统计*/@GetMapping("/online-stats")public ApiResponse<Map<String, Object>> getOnlineStats() {Map<String, Object> stats = messagePushService.getOnlineStatistics();return ApiResponse.success(stats);}/*** 推送系统监控数据*/@PostMapping("/push-system-data")public ApiResponse<Void> pushSystemData() {try {Map<String, Object> systemData = systemMonitorService.getSystemData();WebSocketMessage message = WebSocketMessage.builder().type(MessageType.NOTIFICATION).content("系统监控数据更新").data(systemData).timestamp(System.currentTimeMillis()).build();messagePushService.broadcastMessage(message);return ApiResponse.success("系统监控数据推送成功");} catch (Exception e) {log.error("推送系统监控数据失败", e);return ApiResponse.error("推送失败: " + e.getMessage());}}/*** 推送告警消息*/@PostMapping("/push-alert")public ApiResponse<Void> pushAlert(@RequestBody AlertMessage alertMessage) {try {if (StringUtils.hasText(alertMessage.getUserId())) {// 推送给指定用户messagePushService.pushWarningMessage(alertMessage.getUserId(), alertMessage.getMessage());} else {// 广播告警messagePushService.pushSystemNotification("系统告警", alertMessage.getMessage());}return ApiResponse.success("告警消息推送成功");} catch (Exception e) {log.error("推送告警消息失败", e);return ApiResponse.error("推送失败: " + e.getMessage());}}
}/*** 系统监控数据定时推送*/
@Component
@Slf4j
public class SystemMonitorScheduler {@Autowiredprivate WebSocketMessagePushService messagePushService;@Autowiredprivate SystemMonitorService systemMonitorService;/*** 定时推送系统监控数据*/@Scheduled(fixedRate = 30000) // 每30秒推送一次public void pushSystemMonitorData() {try {Map<String, Object> systemData = systemMonitorService.getSystemData();WebSocketMessage message = WebSocketMessage.builder().type(MessageType.NOTIFICATION).content("系统监控数据").data(Map.of("type", "systemMonitor","data", systemData,"updateTime", System.currentTimeMillis())).timestamp(System.currentTimeMillis()).build();// 只推送给有监控权限的用户Set<String> monitorUsers = getMonitorUsers();for (String userId : monitorUsers) {messagePushService.sendToUser(userId, message);}} catch (Exception e) {log.error("推送系统监控数据失败", e);}}/*** 获取有监控权限的用户*/private Set<String> getMonitorUsers() {// 这里应该从数据库或缓存中获取有监控权限的用户列表// 为了演示,返回一个模拟的用户集合return Set.of("admin", "monitor");}
}/*** 告警消息类*/
@Data
public class AlertMessage {private String userId;private String message;private String level;private Map<String, Object> data;
}

总结

本文详细介绍了WebSocket实时通信,包括:

  1. WebSocket服务端配置:基础配置、握手拦截器、消息处理器、会话管理
  2. 消息推送机制:消息处理器、推送服务、消息类型处理
  3. 客户端集成:JavaScript客户端实现、WebSocket管理器、事件处理
  4. 实时监控实现:系统监控WebSocket、定时数据推送、告警消息处理

WebSocket为Web应用提供了强大的实时通信能力,通过合理的架构设计和实现,可以构建高效、稳定的实时通信系统。

至此,第六篇《Web开发与前后端交互》的内容就完成了。

参考资料

  • WebSocket协议规范
  • Spring WebSocket官方文档
  • WebSocket API - MDN
http://www.dtcms.com/a/374420.html

相关文章:

  • vlan(局部虚拟网)
  • MissionPlanner架构梳理之(十)-参数编辑器
  • Hadoop Windows客户端配置与实践指南
  • 【NVIDIA-B200】 ‘CUDA driver version is insufficient for CUDA runtime version‘
  • 从源码视角全面解析 Chrome UI 布局系统及 Views 框架的定制化实现方法与实践经验
  • 9.9 ajax的请求和封装
  • CTFshow系列——PHP特性Web101-104
  • MCP学习一——UV安装使用教程
  • 【Java实战㊳】Spring Boot实战:从打包到监控的全链路攻略
  • Go语言实战案例-开发一个Markdown转HTML工具
  • idea、服务器、数据库环境时区不一致问题
  • HarmonyOS 5.1.1版本图片上传功能
  • 2025最新超详细FreeRTOS入门教程:第八章 FreeRTOS任务通知
  • Puter+CPolar低成本替代商业网盘,打造私有云新势力
  • Deepoc科技之暖:智能助盲设备如何为视障家人点亮生活
  • 详细的vmware虚拟机安装教程
  • uni-app 项目中使用自定义字体
  • springboot maven 多环境配置入门与实战
  • 时序数据库选型指南:基于大数据视角的IoTDB应用优势分析详解!
  • 炫光活体检测技术:通过光学技术实现高效、安全的身份验证,有效防御多种伪造手段。
  • sqlite3的加解密全过程
  • Django REST Framework 中 @action 装饰器详解
  • 【Docker】一键将运行中的容器打包成镜像并导出
  • LLVM 数据结构简介
  • MCP与http、websocket的关系
  • 【modbus学习】
  • 【linux】sed/awk命令检索区间日志
  • 瑞派虹泰环城总院 | 打造“一站式宠物诊疗空间”,定义全国宠物医疗新高度
  • 数据分析画图显示中文
  • 嵌入式ARM架构学习3——启动代码