1. pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
2. 相关配置
1. WebSocketConfig
package com.lxsy.framework.config.websocket.conf;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that enables WebSocket support and registers the
 * {@link ServerEndpointExporter} bean.
 *
 * <p>NOTE(review): {@code ServerEndpointExporter} is what makes Spring scan for
 * {@code @ServerEndpoint}-annotated beans when running on an embedded container;
 * confirm whether it must be omitted for deployments to an external container.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {

    /**
     * Exposes the exporter as a Spring bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
2. SemaphoreUtils (JUC信号量)
package com.lxsy.framework.config.websocket;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Semaphore;
@Slf4j
public class SemaphoreUtils
{public static boolean tryAcquire(Semaphore semaphore){boolean flag = false;try{flag = semaphore.tryAcquire();}catch (Exception e){log.error("获取信号量异常", e);}return flag;}public static void release(Semaphore semaphore){try{semaphore.release();}catch (Exception e){log.error("释放信号量异常", e);}}
}
3. WebSocketServer2
package com.lxsy.framework.config.websocket;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
@Slf4j
@Component
@ServerEndpoint("/websocket/message")
public class WebSocketServer2 {public static int socketMaxOnlineCount = 100;private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);@AllArgsConstructor@NoArgsConstructor@Dataprivate static class User {private String id; private String name; private Session session; }private static ConcurrentHashMap<String, User> users = new ConcurrentHashMap<>();private static ConcurrentHashMap<String, CopyOnWriteArrayList<String>> messageCache = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session) throws Exception {if (!SemaphoreUtils.tryAcquire(socketSemaphore)) {log.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);session.close();} else {WebSocketUsers.put(session.getId(), session);log.info("\n 建立连接 - {}", session);log.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());WebSocketUsers.sendMessageToUserByText(session, "连接成功");}}@OnClosepublic void onClose(Session session) {log.info("\n 关闭连接 - {}", session);for (User user : users.values()) {if (user.getId().equals(session.getId())) {user.setSession(null);break;}}boolean removeFlag = WebSocketUsers.remove(session.getId());if (!removeFlag) {SemaphoreUtils.release(socketSemaphore);}}@OnErrorpublic void onError(Session session, Throwable exception) throws Exception {if (session.isOpen()) {session.close();}String sessionId = session.getId();log.info("\n 连接异常 - {}", sessionId);log.info("\n 异常信息 - {}", exception);WebSocketUsers.remove(sessionId);SemaphoreUtils.release(socketSemaphore);}@OnMessagepublic void onMessage(String message, Session session) {try {ObjectMapper objectMapper = new ObjectMapper();JsonNode jsonNode = objectMapper.readTree(message);if (jsonNode.has("username") && !jsonNode.get("username").asText().isEmpty()) {String username = jsonNode.get("username").asText();boolean userExists = false;for (User user : users.values()) {if (user.getName().equals(username)) {userExists = 
true;user.setId(session.getId());user.setSession(session);searchCache(username, user);log.info("\n用户 {} 已更新", username);log.info("\n当前用户列表 {}", users);break;}}if (!userExists) {User user = new User(session.getId(), username, session);users.put(session.getId(), user);log.info("\n用户 {} 已连接", username);log.info("\n当前用户列表 {}", users);searchCache(username, user);}} else if (jsonNode.has("targetusername") && jsonNode.has("sendmessage")) {String targetusername = jsonNode.get("targetusername").asText();String sendmessage = jsonNode.get("sendmessage").asText();boolean userExists = false;for (User user : users.values()) {if (user.getName().equals(targetusername)) {if (user.getSession() != null) {userExists = true;WebSocketUsers.sendMessageToUserByText(user.getSession(), sendmessage);}break;}}log.info("\n目标用户 {}", targetusername);log.info("\n发送消息 {}", sendmessage);log.info("\n当前用户列表 {}", users);if (!userExists) {messageCache.computeIfAbsent(targetusername, k -> new CopyOnWriteArrayList<String>()).add(sendmessage);log.info("\n消息已缓存,等待用户 {} 上线", targetusername);}}} catch (Exception e) {String msg = message.replace("你", "我").replace("吗", "");WebSocketUsers.sendMessageToUserByText(session, msg);}}public void searchCache(String username, User user) {if (messageCache.containsKey(username)) {CopyOnWriteArrayList<String> cachedMessages = messageCache.get(username);for (String cachedMessage : cachedMessages) {WebSocketUsers.sendMessageToUserByText(user.getSession(), cachedMessage);log.info("\n向用户 {} 发送缓存消息 - {}", username, cachedMessage);}messageCache.remove(username);log.info("\n已清理用户 {} 的缓存消息", username);}}
}
4. WebSocketUsers
package com.lxsy.framework.config.websocket;import jakarta.websocket.Session;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class WebSocketUsers {private static Map<String, Session> USERS = new ConcurrentHashMap<>();public static void put(String key, Session session) {USERS.put(key, session);}public static boolean remove(Session session) {String key = null;boolean flag = USERS.containsValue(session);if (flag) {Set<Map.Entry<String, Session>> entries = USERS.entrySet();for (Map.Entry<String, Session> entry : entries) {Session value = entry.getValue();if (value.equals(session)) {key = entry.getKey();break;}}} else {return true;}return remove(key);}public static boolean remove(String key) {Session remove = USERS.remove(key);if (remove != null) {boolean containsValue = USERS.containsValue(remove);return containsValue;} else {return true;}}public static Map<String, Session> getUsers() {return USERS;}public static void sendMessageToUsersByText(String message) {Collection<Session> values = USERS.values();for (Session value : values) {sendMessageToUserByText(value, message);}}public static void sendMessageToUserByText(Session session, String message) {if (session != null) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("\n[发送消息异常]", e);}} else {}}
}