当前位置: 首页 > news >正文

若依 springboot websocket

1. pom

<!-- SpringBoot Websocket --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

2. 相关配置

1. WebSocketConfig

package com.lxsy.framework.config.websocket.conf;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** websocket 配置*/
@Configuration
@EnableWebSocket
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}

2. SemaphoreUtils (JUC信号量)

package com.lxsy.framework.config.websocket;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Semaphore;/*** 信号量相关处理*/
@Slf4j
public class SemaphoreUtils
{/*** 获取信号量* @param semaphore* @return*/public static boolean tryAcquire(Semaphore semaphore){boolean flag = false;try{flag = semaphore.tryAcquire();}catch (Exception e){log.error("获取信号量异常", e);}return flag;}/*** 释放信号量* @param semaphore*/public static void release(Semaphore semaphore){try{semaphore.release();}catch (Exception e){log.error("释放信号量异常", e);}}
}

3. WebSocketServer

package com.lxsy.framework.config.websocket;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;/*** WebSocketServer 消息处理类,用于实现 WebSocket 服务端功能*/
@Slf4j
@Component
@ServerEndpoint("/websocket/message")
public class WebSocketServer2 {/*** 默认最多允许同时在线人数 100*/public static int socketMaxOnlineCount = 100;/*** 信号量,用于控制同时在线人数*/private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);// 内部静态类 User,用于存储用户信息@AllArgsConstructor@NoArgsConstructor@Dataprivate static class User {private String id; // 用户 IDprivate String name; // 用户名private Session session; // 用户会话}/*** 存储用户信息的 ConcurrentHashMap,键为用户 ID,值为 User 对象*/private static ConcurrentHashMap<String, User> users = new ConcurrentHashMap<>();/*** 消息缓存的 ConcurrentHashMap,键为用户名,值为缓存消息列表*/private static ConcurrentHashMap<String, CopyOnWriteArrayList<String>> messageCache = new ConcurrentHashMap<>();/*** 连接建立成功时调用的方法* @param session 用户会话*/@OnOpenpublic void onOpen(Session session) throws Exception {// 尝试获取信号量if (!SemaphoreUtils.tryAcquire(socketSemaphore)) {// 未获取到信号量,说明在线人数已达上限log.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);session.close();} else {// 将用户会话存储到 WebSocketUsers 中WebSocketUsers.put(session.getId(), session);log.info("\n 建立连接 - {}", session);log.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());WebSocketUsers.sendMessageToUserByText(session, "连接成功");}}/*** 连接关闭时调用的方法** @param session 用户会话*/@OnClosepublic void onClose(Session session) {log.info("\n 关闭连接 - {}", session);// 移除用户信息,移除用户时,将该用户的session值置为null// 用于判断该用户是否离线for (User user : users.values()) {if (user.getId().equals(session.getId())) {user.setSession(null);break;}}boolean removeFlag = WebSocketUsers.remove(session.getId());if (!removeFlag) {// 如果移除失败,则释放信号量SemaphoreUtils.release(socketSemaphore);}}/*** 抛出异常时调用的方法** @param session     用户会话* @param exception   异常信息* @throws Exception  可能抛出的异常*/@OnErrorpublic void onError(Session session, Throwable exception) throws Exception {if (session.isOpen()) {// 如果会话仍然开启,则关闭会话session.close();}String sessionId = session.getId();log.info("\n 连接异常 - {}", sessionId);log.info("\n 异常信息 - {}", exception);// 移除用户信息并释放信号量WebSocketUsers.remove(sessionId);SemaphoreUtils.release(socketSemaphore);}/*** 服务器接收到客户端消息时调用的方法** @param message 消息内容* @param session 用户会话*/@OnMessagepublic void onMessage(String message, Session session) {try {// 使用 Jackson 解析 JSON 消息ObjectMapper objectMapper = new ObjectMapper();JsonNode jsonNode = objectMapper.readTree(message);if (jsonNode.has("username") && !jsonNode.get("username").asText().isEmpty()) {String username = jsonNode.get("username").asText();// 检查用户是否已存在boolean userExists = false;for (User user : users.values()) {if (user.getName().equals(username)) {userExists = true;// 更新用户的 ID 和会话user.setId(session.getId());user.setSession(session);// 检查该用户是否有缓存消息searchCache(username, user);log.info("\n用户 {} 已更新", username);log.info("\n当前用户列表 {}", users);break;}}if (!userExists) {// 新建用户并存储到 users 中User user = new User(session.getId(), username, session);users.put(session.getId(), user);log.info("\n用户 {} 已连接", username);log.info("\n当前用户列表 {}", users);// 检查是否有缓存消息searchCache(username, user);}} else if (jsonNode.has("targetusername") && jsonNode.has("sendmessage")) {String targetusername = jsonNode.get("targetusername").asText();String sendmessage = jsonNode.get("sendmessage").asText();boolean userExists = false;// 查找目标用户,如果用户的session为空,则表示用户离线// 将通知存放到缓存消息列表中,如果不为空则表示用户在线,直接发送通知for (User user : users.values()) {if (user.getName().equals(targetusername)) {if (user.getSession() != null) {userExists = true;WebSocketUsers.sendMessageToUserByText(user.getSession(), sendmessage);}break;}}log.info("\n目标用户 {}", targetusername);log.info("\n发送消息 {}", sendmessage);log.info("\n当前用户列表 {}", users);if (!userExists) {// 如果目标用户不在线,则缓存通知messageCache.computeIfAbsent(targetusername, k -> new CopyOnWriteArrayList<String>()).add(sendmessage);log.info("\n消息已缓存,等待用户 {} 上线", targetusername);}}} catch (Exception e) {// 如果解析失败,则按普通消息处理String msg = message.replace("你", "我").replace("吗", "");WebSocketUsers.sendMessageToUserByText(session, msg);}}/*** 检查一个用户的缓存通知并发送** @param username 用户名* @param user     用户对象*/public void searchCache(String username, User user) {if (messageCache.containsKey(username)) {CopyOnWriteArrayList<String> cachedMessages = messageCache.get(username);for (String cachedMessage : cachedMessages) {WebSocketUsers.sendMessageToUserByText(user.getSession(), cachedMessage);log.info("\n向用户 {} 发送缓存消息 - {}", username, cachedMessage);}// 清理缓存通知messageCache.remove(username);log.info("\n已清理用户 {} 的缓存消息", username);}}
}

4. WebSocketUsers

package com.lxsy.framework.config.websocket;import jakarta.websocket.Session;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** websocket 客户端用户集*/
@Slf4j
public class WebSocketUsers {/*** 用户集*/private static Map<String, Session> USERS = new ConcurrentHashMap<>();/*** 存储用户*/public static void put(String key, Session session) {USERS.put(key, session);}/*** 移除用户*/public static boolean remove(Session session) {String key = null;boolean flag = USERS.containsValue(session);if (flag) {Set<Map.Entry<String, Session>> entries = USERS.entrySet();for (Map.Entry<String, Session> entry : entries) {Session value = entry.getValue();if (value.equals(session)) {key = entry.getKey();break;}}} else {return true;}return remove(key);}/*** 移出用户* @param key 键*/public static boolean remove(String key) {//log.info("\n 正在移出用户 - {}", key);Session remove = USERS.remove(key);if (remove != null) {boolean containsValue = USERS.containsValue(remove);//log.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");return containsValue;} else {return true;}}/*** 获取在线用户列表* @return 返回用户集合*/public static Map<String, Session> getUsers() {return USERS;}/*** 群发消息文本消息* @param message 消息内容*/public static void sendMessageToUsersByText(String message) {Collection<Session> values = USERS.values();for (Session value : values) {sendMessageToUserByText(value, message);}}/*** 发送文本消息*/public static void sendMessageToUserByText(Session session, String message) {if (session != null) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {log.error("\n[发送消息异常]", e);}} else {//log.info("\n[你已离线]");}}
}
http://www.dtcms.com/a/406465.html

相关文章:

  • 开源 C# 快速开发(三)复杂控件
  • Visual Studio使用C++配置OpenCV环境,同时添加模板以4.12为例
  • JUnit 4 + Spring Boot 测试依赖
  • HTML应用指南:利用POST请求获取全国索尼体验型零售店位置信息
  • html网站源码 html网页模板下载
  • 做网站接广告了解基本的php wordpress
  • 房地产手机网站模板网站推广公司ihanshi
  • 推荐一个网站
  • 前端可视化第一章:PixiJS入门指南
  • 时间序列分析新视角:单变量预训练 多变量微调
  • coqui-ai/TTS 安装
  • linux命令dd单刷镜像文件
  • 奔驰押注中国AI,国产大模型上车
  • 笔记(C++篇)—— Day 11
  • Cursor推出全新文档中心:甚至提供详细的中文版本
  • 选择合肥网站建设html的基本结构
  • Linux文件系统调用详解:底层操作到高级应用
  • 基于51单片机的供电保护系统
  • 网站建设技术交流制作公司网页价钱
  • 前端Bug实录:为什么表格筛选条件在刷新时神秘消失?
  • 关于做视频网站的一些代码网站备案号是什么样子
  • 专业定制网站开发上海手机网站建设价格
  • 《postman》软件下载_《postman》安装包下载_《postman》安装教程下载_《postman》网盘下载
  • 双模更超模!飞利浦双模办公娱乐显示器27E2N5900RW优雅登场!
  • TDengine 聚合函数 HYPERLOGLOG 用户手册
  • 威海网站优化公司济南简单的网站制作
  • 书法网站开发的前景西部数码网站管理助手2
  • 使用rabbitmq发送消息时消息体转换报错
  • rabbitmq分布式事务
  • vue动态插槽 #[i] 和 v-slot:[i] 对于Prettier的区别