基于 WebSocket 实现两台服务器之间的通信
服务器端:
<!-- JWT (jjwt) -->
<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt</artifactId>
    <version>0.9.1</version>
</dependency>
<!-- Spring Boot WebSocket starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Spring Boot web starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
WebSocketMessage:
package com.aoto.config.websocket;import com.alibaba.fastjson2.JSON;
import lombok.Data;
import java.util.UUID;@Data
public class WebSocketMessage {private String messageId;private String type;private String from;private String to;private Object data;private Long timestamp;// 确保这个create方法正确定义public static WebSocketMessage create(String type, String from, String to, Object data) {WebSocketMessage message = new WebSocketMessage();message.setMessageId(UUID.randomUUID().toString());message.setType(type);message.setFrom(from);message.setTo(to);message.setData(data);message.setTimestamp(System.currentTimeMillis());return message;}public String toJson() {return JSON.toJSONString(this);}public static WebSocketMessage fromJson(String json) {return JSON.parseObject(json, WebSocketMessage.class);}
}
ServerBWebSocketConfig
package com.aoto.config.websocket;import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration
@EnableWebSocket
public class ServerBWebSocketConfig implements WebSocketConfigurer {private final ServerBWebSocketHandler serverBWebSocketHandler;public ServerBWebSocketConfig(ServerBWebSocketHandler serverBWebSocketHandler) {this.serverBWebSocketHandler = serverBWebSocketHandler;}@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(serverBWebSocketHandler, "/ws/{userId}").setAllowedOrigins("*");System.out.println("Server B WebSocket服务已启动: ws://localhost:8081/ws/{userId}");}
}
ServerBWebSocketHandler
package com.aoto.config.websocket;
import com.aoto.service.impl.WhiteUserServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;import java.net.InetSocketAddress;@Component
public class ServerBWebSocketHandler extends TextWebSocketHandler {@Value("${server.identity:ServerB}")private String serverIdentity;// 存储所有连接的客户端会话,key 改为 userIdprivate final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();// 存储 sessionId 到 userId 的映射private final Map<String, String> sessionToUserMap = new ConcurrentHashMap<>();private final static Logger logger = LoggerFactory.getLogger(ServerBWebSocketHandler.class);@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// 从路径参数中获取 userIdString userId = extractUserIdFromUri(session.getUri());String sessionId = session.getId();if (userId == null || userId.trim().isEmpty()) {logger.error("错误: 未提供用户ID,关闭连接");session.close(CloseStatus.NOT_ACCEPTABLE.withReason("用户ID不能为空"));return;}// 检查该用户是否已经连接if (userSessions.containsKey(userId)) {logger.info("用户 " + userId + " 已存在连接,关闭旧连接");WebSocketSession oldSession = userSessions.get(userId);if (oldSession != null && oldSession.isOpen()) {oldSession.close(CloseStatus.NORMAL.withReason("新连接建立"));}}// 存储会话userSessions.put(userId, session);sessionToUserMap.put(sessionId, userId);String clientAddress = getClientAddress(session);logger.info(serverIdentity + ": 用户 " + userId + " 连接建立, Session ID: " + sessionId);logger.info("客户端地址: " + clientAddress);// 发送欢迎消息sendWelcomeMessage(session, userId);}/*** 从 WebSocket URI 中提取 userId*/private String extractUserIdFromUri(java.net.URI uri) {try {UriComponents components = UriComponentsBuilder.fromUri(uri).build();return components.getPathSegments().get(components.getPathSegments().size() - 1);} catch (Exception e) {logger.error("提取用户ID失败: " + e.getMessage());return null;}}private String getClientAddress(WebSocketSession session) {try {InetSocketAddress remoteAddress = session.getRemoteAddress();return remoteAddress != null ? 
remoteAddress.toString() : "未知";} catch (Exception e) {return "未知";}}private void sendWelcomeMessage(WebSocketSession session, String userId) throws IOException {WebSocketMessage welcomeMsg = WebSocketMessage.create("WELCOME",serverIdentity,userId,"欢迎用户 " + userId + " 连接到 " + serverIdentity);session.sendMessage(new TextMessage(welcomeMsg.toJson()));logger.info(serverIdentity + " 发送欢迎消息给用户: " + userId);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();String userId = sessionToUserMap.get(session.getId());logger.info(serverIdentity + " 收到用户 " + userId + " 的消息: " + payload);// 处理客户端消息processClientMessage(session, payload, userId);}private void processClientMessage(WebSocketSession session, String message, String userId) {// 这里可以处理客户端发送的消息logger.info(serverIdentity + " 处理用户 " + userId + " 的消息: " + message);}/*** 向指定用户发送消息*/public void sendToUser(String userId, String message) throws IOException {if (userId == null || userId.trim().isEmpty()) {throw new IllegalArgumentException("用户ID不能为空");}WebSocketSession session = userSessions.get(userId);if (session != null && session.isOpen()) {session.sendMessage(new TextMessage(message));logger.info(serverIdentity + " 发送消息到用户: " + userId);} else {logger.info("用户连接不存在或已关闭: " + userId);}}/*** 向所有用户广播消息*/public void broadcastToUsers(String message) {userSessions.forEach((userId, session) -> {try {if (session.isOpen()) {session.sendMessage(new TextMessage(message));}} catch (IOException e) {logger.error("向用户 " + userId + " 发送消息失败: " + e.getMessage());}});logger.info(serverIdentity + " 向所有用户广播消息");}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {String sessionId = session.getId();String userId = sessionToUserMap.get(sessionId);if (userId != null) {userSessions.remove(userId);sessionToUserMap.remove(sessionId);logger.info(serverIdentity + ": 用户 " + userId + " 连接关闭, 原因: " + 
status.getReason());} else {logger.info(serverIdentity + ": 未知用户连接关闭, Session ID: " + sessionId);}}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {String userId = sessionToUserMap.get(session.getId());logger.error(serverIdentity + " 用户 " + userId + " 传输错误: " + exception.getMessage());}/*** 获取用户会话(用于调试)*/public WebSocketSession getUserSession(String userId) {return userSessions.get(userId);}/*** 获取所有已连接的用户ID*/public String[] getAllConnectedUsers() {return userSessions.keySet().toArray(new String[0]);}/*** 获取连接的客户端数量*/public int getConnectedUserCount() {return (int) userSessions.values().stream().filter(WebSocketSession::isOpen).count();}/*** 检查用户是否在线*/public boolean isUserOnline(String userId) {WebSocketSession session = userSessions.get(userId);return session != null && session.isOpen();}
}
ServerBController
package com.aoto.config.websocket;import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;@RestController
@RequestMapping("/api")
public class ServerBController {@Value("${server.identity:ServerB}")private String serverIdentity;private final ServerBWebSocketHandler webSocketHandler;public ServerBController(ServerBWebSocketHandler webSocketHandler) {this.webSocketHandler = webSocketHandler;}@GetMapping("/status")public Map<String, Object> getStatus() {Map<String, Object> status = new HashMap<>();status.put("server", serverIdentity);status.put("connectedUsers", webSocketHandler.getConnectedUserCount());status.put("onlineUsers", webSocketHandler.getAllConnectedUsers());status.put("timestamp", System.currentTimeMillis());return status;}@GetMapping("/users/{userId}/status")public Map<String, Object> getUserStatus(@PathVariable String userId) {Map<String, Object> response = new HashMap<>();response.put("userId", userId);response.put("isOnline", webSocketHandler.isUserOnline(userId));response.put("timestamp", System.currentTimeMillis());return response;}@PostMapping("/send-to-user")public Map<String, Object> sendToUser(@RequestBody Map<String, String> request) {String userId = request.get("userId");String message = request.get("message");Map<String, Object> response = new HashMap<>();// 参数验证if (userId == null || userId.trim().isEmpty()) {response.put("status", "error");response.put("message", "userId 参数不能为空");response.put("onlineUsers", webSocketHandler.getAllConnectedUsers());return response;}if (message == null || message.trim().isEmpty()) {response.put("status", "error");response.put("message", "message 参数不能为空");return response;}try {webSocketHandler.sendToUser(userId, message);response.put("status", "success");response.put("message", "消息已发送到用户: " + userId);response.put("userId", userId);} catch (IOException e) {response.put("status", "error");response.put("message", "发送消息失败: " + e.getMessage());response.put("onlineUsers", webSocketHandler.getAllConnectedUsers());} catch (IllegalArgumentException e) {response.put("status", "error");response.put("message", "参数错误: " + e.getMessage());}return 
response;}@PostMapping("/broadcast")public Map<String, Object> broadcastMessage(@RequestBody Map<String, String> request) {String message = request.get("message");if (message == null || message.trim().isEmpty()) {Map<String, Object> response = new HashMap<>();response.put("status", "error");response.put("message", "message 参数不能为空");return response;}webSocketHandler.broadcastToUsers(message);Map<String, Object> response = new HashMap<>();response.put("status", "success");response.put("message", "广播消息已发送");response.put("usersCount", webSocketHandler.getConnectedUserCount());return response;}@PostMapping("/send-structured")public Map<String, Object> sendStructuredMessage(@RequestBody Map<String, Object> request) {String userId = (String) request.get("userId");String messageType = (String) request.get("type");Object data = request.get("data");Map<String, Object> response = new HashMap<>();if (userId == null || userId.trim().isEmpty()) {response.put("status", "error");response.put("message", "userId 参数不能为空");return response;}WebSocketMessage message = WebSocketMessage.create(messageType, serverIdentity, userId, data);try {webSocketHandler.sendToUser(userId, message.toJson());response.put("status", "success");response.put("message", "结构化消息已发送");response.put("type", messageType);response.put("userId", userId);} catch (IOException e) {response.put("status", "error");response.put("message", "发送消息失败: " + e.getMessage());}return response;}
}
客户端:
ServerAWebSocketClient
package com.aoto.virtual_studio.utils.websocket;import com.aoto.virtual_studio.controller.BoroomController;
import com.aoto.virtual_studio.utils.RedisUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.*;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.TextWebSocketHandler;import javax.annotation.PostConstruct;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@Service
public class ServerAWebSocketClient {private String serverIdentity = "ServerA";private String serverBWebSocketUrl = "ws://metabox.aoto.com:8086/ws";private boolean messageHandlerEnabled = true;private WebSocketSession serverSession;private boolean connected = false;// 存储接收到的消息private final List<String> receivedMessages = new CopyOnWriteArrayList<>();// 消息监听器private final List<MessageListener> messageListeners = new CopyOnWriteArrayList<>();private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();private volatile boolean reconnecting = false;private final static Logger logger = LoggerFactory.getLogger(ServerAWebSocketClient.class);@Autowiredprivate RedisUtil redisUtil;// @PostConstruct
// public void init() {
// connectToServerB();
// }public void connectToServerB(String url) {url = serverBWebSocketUrl + "/" + url;try {WebSocketClient client = new StandardWebSocketClient();WebSocketHttpHeaders headers = new WebSocketHttpHeaders();// 保存外部类引用final ServerAWebSocketClient outer = this;WebSocketHandler handler = new TextWebSocketHandler() {@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {outer.serverSession = session;outer.connected = true;logger.info(outer.serverIdentity + ": 成功连接到 Server B");// 通过外部类引用调用方法outer.notifyConnectionEstablished();}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();System.out.println(outer.serverIdentity + " 收到 Server B 的消息: " + payload);// 存储消息outer.receivedMessages.add(payload);// 处理消息outer.processReceivedMessage(payload);// 通知所有监听器outer.notifyMessageReceived(payload);}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {System.err.println(outer.serverIdentity + ": 传输错误: " + exception.getMessage());outer.connected = false;outer.notifyConnectionLost();
// outer.scheduleReconnect();}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {System.out.println(outer.serverIdentity + ": 与 Server B 的连接关闭: " + closeStatus.getReason());outer.connected = false;outer.notifyConnectionLost();
// outer.scheduleReconnect();}};logger.info(serverIdentity + ": 尝试连接到 " + url);client.doHandshake(handler, headers, URI.create(url));} catch (Exception e) {logger.error(serverIdentity + ": 连接失败: " + e.getMessage());
// scheduleReconnect();}}private void processReceivedMessage(String message) {if (!messageHandlerEnabled) {return;}try {WebSocketMessage wsMessage = WebSocketMessage.fromJson(message);String messageType = wsMessage.getType();Object data = wsMessage.getData();logger.info(serverIdentity + " 处理消息 - 类型: " + messageType + ", 来自: " + wsMessage.getFrom());switch (messageType) {case "WELCOME":handleWelcomeMessage(wsMessage);break;case "NOTIFICATION":handleNotificationMessage(wsMessage);break;case "BROADCAST":handleBroadcastMessage(wsMessage);break;default:handleDefaultMessage(wsMessage);}} catch (Exception e) {// 如果不是JSON格式,按普通文本处理logger.info(serverIdentity + " 处理文本消息: " + message);}}private void handleWelcomeMessage(WebSocketMessage message) {logger.info(serverIdentity + " 收到欢迎消息: " + message.getData());// 可以在这里执行连接成功后的初始化操作}private void handleNotificationMessage(WebSocketMessage message) {Object data = message.getData();logger.info(serverIdentity + " 收到通知: " + data);// 这里可以添加通知处理逻辑,比如显示通知、记录日志等if (data instanceof Map) {Map<?, ?> notification = (Map<?, ?>) data;String title = String.valueOf(notification.get("title"));String content = String.valueOf(notification.get("content"));String priority = String.valueOf(notification.get("priority"));logger.info("=== 通知 ===");logger.info("标题: " + title);logger.info("内容: " + content);logger.info("优先级: " + priority);logger.info("============");}}private void handleBroadcastMessage(WebSocketMessage message) {logger.info(serverIdentity + " 收到广播消息: " + message.getData());// 处理广播消息}private void handleDefaultMessage(WebSocketMessage message) {logger.info(serverIdentity + " 处理默认消息: " + message.getData());// 处理其他类型的消息}// private void scheduleReconnect() {
// ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
// scheduler.schedule(() -> {
// System.out.println(serverIdentity + ": 尝试重新连接 Server B");
// connectToServerB("111");
// }, 5, TimeUnit.SECONDS);
// }// private void scheduleReconnect() {
// if (reconnecting) {
// logger.info(serverIdentity + ": 重连已在进行中,跳过本次调度");
// return;
// }
//
// reconnecting = true;
// scheduler.schedule(() -> {
// try {
// logger.info(serverIdentity + ": 尝试重新连接 Server B");
// Object webscoketNotify = redisUtil.get("webscoketNotify");
// if (webscoketNotify != null){
// String key = String.valueOf(webscoketNotify);
// connectToServerB(key); // 考虑使用配置参数
// reconnecting = false;
// }
// } catch (Exception e) {
// logger.error(serverIdentity + ": 重连失败: " + e.getMessage());
// reconnecting = false;
// // 可以考虑在这里实现指数退避重试
// }
// }, 5, TimeUnit.SECONDS);
// }// 消息监听器相关方法public void addMessageListener(MessageListener listener) {messageListeners.add(listener);}public void removeMessageListener(MessageListener listener) {messageListeners.remove(listener);}private void notifyMessageReceived(String message) {for (MessageListener listener : messageListeners) {listener.onMessageReceived(message);}}private void notifyConnectionEstablished() {for (MessageListener listener : messageListeners) {listener.onConnectionEstablished();}}private void notifyConnectionLost() {for (MessageListener listener : messageListeners) {listener.onConnectionLost();}}public boolean isConnected() {return connected && serverSession != null && serverSession.isOpen();}public List<String> getReceivedMessages() {return new ArrayList<>(receivedMessages);}public void clearReceivedMessages() {receivedMessages.clear();}public int getReceivedMessageCount() {return receivedMessages.size();}
}/*** 消息监听器接口*/
interface MessageListener {void onMessageReceived(String message);void onConnectionEstablished();void onConnectionLost();
}
ServerAController
package com.aoto.virtual_studio.utils.websocket;import com.alibaba.fastjson.JSONObject;
import com.aoto.virtual_studio.constant.Constant;
import com.aoto.virtual_studio.entity.SystemMessage;
import com.aoto.virtual_studio.service.MetaBoxApiService;
import com.aoto.virtual_studio.service.SystemService;
import com.aoto.virtual_studio.service.impl.MetaBoxApiServiceImpl;
import com.aoto.virtual_studio.utils.ApiResult;
import com.aoto.virtual_studio.utils.RedisUtil;
import com.aoto.virtual_studio.utils.WebSocketServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@RestController
@RequestMapping("/api")
public class ServerAController {private final static Logger logger = LoggerFactory.getLogger(ServerAController.class);private String serverIdentity = "ServerA";private final ServerAWebSocketClient webSocketClient;@Autowiredprivate SystemService systemService;@Resource(name = "jarPath")private String rootPath;@Autowiredprivate RedisUtil redisUtil;@Autowiredprivate MetaBoxApiService metaBoxApiService;public ServerAController(ServerAWebSocketClient webSocketClient) {this.webSocketClient = webSocketClient;// 添加消息监听器示例webSocketClient.addMessageListener(new MessageListener() {@Overridepublic void onMessageReceived(String message) {logger.info("监听器: 收到新消息: " + message);if ("自动下线通知:该账号已经在其他设备上登录".equals(message)){logger.info("onMessageReceived:{}", message);Object phoneNumberObject = redisUtil.get("phoneNumber");if (phoneNumberObject != null){metaBoxApiService.loginSuccessNotify(0);redisUtil.del("phoneNumber");// 发送websocket通知前端JSONObject jsonObject = new JSONObject();jsonObject.put("loginStatus", "offline");WebSocketServerUtil.sendInfoToOneUser(JSONObject.toJSONString(jsonObject), "content");}}if ("loginOutStatus".equals(message)){logger.info("onMessageReceived:{}", message);Object phoneNumberObject = redisUtil.get("phoneNumber");if (phoneNumberObject != null){metaBoxApiService.loginSuccessNotify(0);redisUtil.del("phoneNumber");// 发送websocket通知前端JSONObject jsonObject = new JSONObject();jsonObject.put("loginStatus", "loginExpired");WebSocketServerUtil.sendInfoToOneUser(JSONObject.toJSONString(jsonObject), "content");}}}@Overridepublic void onConnectionEstablished() {System.out.println("监听器: 连接已建立");}@Overridepublic void onConnectionLost() {System.out.println("监听器: 连接丢失");}});}@GetMapping("/status")public Map<String, Object> getStatus() {Map<String, Object> status = new HashMap<>();status.put("server", serverIdentity);status.put("connected", webSocketClient.isConnected());status.put("receivedMessageCount", webSocketClient.getReceivedMessageCount());status.put("timestamp", 
System.currentTimeMillis());return status;}@GetMapping("/messages")public Map<String, Object> getReceivedMessages() {List<String> messages = webSocketClient.getReceivedMessages();Map<String, Object> response = new HashMap<>();response.put("server", serverIdentity);response.put("messageCount", messages.size());response.put("messages", messages);response.put("timestamp", System.currentTimeMillis());return response;}@PostMapping("/messages/clear")public Map<String, Object> clearMessages() {webSocketClient.clearReceivedMessages();Map<String, Object> response = new HashMap<>();response.put("status", "success");response.put("message", "已清空接收到的消息");return response;}@GetMapping("/messages/latest")public Map<String, Object> getLatestMessage() {List<String> messages = webSocketClient.getReceivedMessages();String latestMessage = messages.isEmpty() ? "无消息" : messages.get(messages.size() - 1);Map<String, Object> response = new HashMap<>();response.put("server", serverIdentity);response.put("hasMessages", !messages.isEmpty());response.put("latestMessage", latestMessage);response.put("timestamp", System.currentTimeMillis());return response;}@PostMapping("/loginNotify")public ApiResult loginNotify() {SystemMessage systemMessage = systemService.getSystemMessage();String path = rootPath + Constant.RootPath.PROJECT + systemMessage.getProjectName();// 发送websocket通知前端JSONObject jsonObject = new JSONObject();jsonObject.put("type", "true");jsonObject.put("path", path);WebSocketServerUtil.sendInfoToOneUser(JSONObject.toJSONString(jsonObject), "isLogin");return ApiResult.success();}
}
WebSocketMessage
package com.aoto.virtual_studio.utils.websocket;import com.alibaba.fastjson2.JSON;
import lombok.Data;import java.util.UUID;@Data
/**
 * Client-side copy of the envelope for messages exchanged over the WebSocket link.
 *
 * <p>Instances are normally built with {@link #create} and converted to and from JSON
 * text with {@link #toJson()} / {@link #fromJson(String)}.
 */
public class WebSocketMessage {

    /** Message identifier; {@link #create} fills it with a random UUID string. */
    private String messageId;

    /** Message type label, e.g. {@code "WELCOME"}, {@code "NOTIFICATION"} or {@code "BROADCAST"}. */
    private String type;

    /** Sender identity. */
    private String from;

    /** Recipient identity. */
    private String to;

    /** Arbitrary payload carried by the message. */
    private Object data;

    /** Creation time in epoch milliseconds; {@link #create} fills it from the system clock. */
    private Long timestamp;

    /**
     * Builds a message with a freshly generated id and the current time as its timestamp.
     *
     * @param type message type label
     * @param from sender identity
     * @param to   recipient identity
     * @param data payload
     * @return the populated message
     */
    public static WebSocketMessage create(String type, String from, String to, Object data) {
        WebSocketMessage msg = new WebSocketMessage();
        msg.messageId = UUID.randomUUID().toString();
        msg.type = type;
        msg.from = from;
        msg.to = to;
        msg.data = data;
        msg.timestamp = System.currentTimeMillis();
        return msg;
    }

    /**
     * Serializes this message to a JSON string.
     *
     * @return JSON representation of this message
     */
    public String toJson() {
        return JSON.toJSONString(this);
    }

    /**
     * Parses a JSON string into a message.
     *
     * @param json JSON text
     * @return the parsed message
     */
    public static WebSocketMessage fromJson(String json) {
        return JSON.parseObject(json, WebSocketMessage.class);
    }
}