当前位置: 首页 > news >正文

java每日精进 5.11【WebSocket】

1.纯Websocket实现消息发送

1.1一对一发送

前端

  1. 用户在输入框输入消息内容(sendText)

  2. 选择特定接收用户(sendUserId)

  3. 点击发送按钮触发handlerSend方法

  4. 构造消息内容JSON:

    {text: "Hello", // 消息内容toUserId: 123   // 目标用户ID
    }
  5. 包装为WebSocket标准格式:

    {type: "demo-message-send", // 消息类型content: '{"text":"Hello","toUserId":123}' // 字符串化的内容
    }
  6. 通过send()方法发送

  • 前端在setup函数中,使用useWebSocket方法,根据server变量(WebSocket 服务地址)建立连接。server地址由VITE_BASE_URL(环境变量)、/infra/ws路径和token(通过getRefreshToken获取)组成。
  • 设置autoReconnecttrue,表示自动重连;heartbeattrue,表示开启心跳机制。
  • 当用户在前端输入消息并点击发送按钮时,handlerSend函数被调用。
  • 首先将发送内容sendText和接收用户sendUserId进行 JSON 化处理,构建消息内容messageContent
  • 然后将消息类型typedemo-message-send)和消息内容messageContent再次 JSON 化,形成最终的消息jsonMessage
  • 最后使用send函数将jsonMessage发送到后端。
const server = ref((import.meta.env.VITE_BASE_URL + '/infra/ws').replace('http', 'ws') +'?token=' +getRefreshToken() // 使用 getRefreshToken() 方法,而不使用 getAccessToken() 方法的原因:WebSocket 无法方便的刷新访问令牌
) // WebSocket 服务地址
const getIsOpen = computed(() => status.value === 'OPEN') // WebSocket 连接是否打开
const getTagColor = computed(() => (getIsOpen.value ? 'success' : 'red')) // WebSocket 连接的展示颜色/** 发起 WebSocket 连接 */
const { status, data, send, close, open } = useWebSocket(server.value, {autoReconnect: true,heartbeat: true
})
/** 发送消息 */
const sendText = ref('') // 发送内容
const sendUserId = ref('') // 发送人
const handlerSend = () => {// 1.1 先 JSON 化 message 消息内容const messageContent = JSON.stringify({text: sendText.value,toUserId: sendUserId.value})// 1.2 再 JSON 化整个消息const jsonMessage = JSON.stringify({type: 'demo-message-send',content: messageContent})// 2. 最后发送消息send(jsonMessage)sendText.value = ''
}

后端

  • 注册监听器DemoWebSocketMessageListener 类通过实现 WebSocketMessageListener<DemoSendMessage> 接口,并使用 @Component 注解将自己注册为 Spring Bean。框架启动时会扫描所有实现了该接口的 Bean,并将它们注册到消息处理器中。
/*** WebSocket 示例:单发消息*/
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {@Resourceprivate WebSocketMessageSender webSocketMessageSender;@Overridepublic void onMessage(WebSocketSession session, DemoSendMessage message) {Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);// 情况一:单发if (message.getToUserId() != null) {DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户"demo-message-receive", toMessage);return;}// 情况二:群发DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false);webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户"demo-message-receive", toMessage);}@Overridepublic String getType() {return "demo-message-send";}}
  • 消息类型绑定getType() 方法返回 "demo-message-send",这表明该监听器专门处理类型为 "demo-message-send" 的消息。当后端接收到消息时,会根据消息类型路由到对应的监听器。

当 WebSocket 服务器接收到消息后:

  1. 消息解析:框架首先解析消息的 JSON 格式,提取 type 字段(如 "demo-message-send")。
  2. 类型匹配:后端框架会自动将 type 为 "demo-message-send" 的消息路由到 DemoWebSocketMessageListener 的 onMessage 方法。
  3. 调用回调:将消息反序列化为 DemoSendMessage 对象,并调用监听器的 onMessage 方法。
/*** JSON 格式 {@link WebSocketHandler} 实现类* 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {/*** type 与 WebSocketMessageListener 的映射* 用于存储不同消息类型对应的监听器,键为消息类型,值为对应的监听器实例*/private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();@SuppressWarnings({"rawtypes", "unchecked"})public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {// 遍历传入的监听器列表listenersList.forEach((Consumer<WebSocketMessageListener>)listener -> {// 将监听器的类型(通过 getType() 方法获取)作为键,监听器实例作为值,存入 listeners 映射中listeners.put(listener.getType(), listener);});}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 1.1 空消息,跳过// 如果消息的负载长度为 0,说明是一个空消息,直接返回,不进行后续处理if (message.getPayloadLength() == 0) {return;}// 1.2 ping 心跳消息,直接返回 pong 消息。// 如果消息的负载长度为 4 且负载内容为 "ping",则向客户端发送 "pong" 消息,表示响应心跳if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {session.sendMessage(new TextMessage("pong"));return;}// 2.1 解析消息try {// 将文本消息的负载解析为 JsonWebSocketMessage 对象JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);// 如果解析后的消息为空,记录错误日志并返回,不进行后续处理if (jsonMessage == null) {log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());return;}// 如果解析后的消息类型为空,记录错误日志并返回,不进行后续处理if (StrUtil.isEmpty(jsonMessage.getType())) {log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());return;}// 2.2 获得对应的 WebSocketMessageListener// 根据消息类型从 listeners 映射中获取对应的监听器WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());// 如果没有找到对应的监听器,记录错误日志并返回,不进行后续处理if (messageListener == null) {log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());return;}// 2.3 处理消息// 获取监听器泛型参数类型Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);// 将消息内容解析为对应类型的对象Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);// 获取当前会话的租户 IDLong tenantId = WebSocketFrameworkUtils.getTenantId(session);// 执行租户相关的操作,调用监听器的 onMessage 方法处理消息TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));} catch (Throwable ex) {// 如果在处理消息过程中发生异常,记录错误日志log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());}}}

WebSocketMessageListener 之所以能监听消息,是因为:

  1. 接口契约:实现 WebSocketMessageListener 接口并指定消息类型(getType())。
  2. 框架支持:Spring 框架自动扫描并注册监听器,实现消息的解析和分发。
  3. 类型匹配:前端发送的消息 type 与后端监听器的 getType() 一致,触发回调。

这个过程类似于 HTTP 请求的路由机制,只不过 WebSocket 是长连接,需要持续监听消息。

通常,WebSocket 框架(如 Spring WebSocket)会提供以下核心组件:

  • 消息解码器:将二进制数据转换为 Java 对象(如 DemoSendMessage)。
  • 消息路由器:根据消息类型将消息路由到对应的监听器。
  • 会话管理器:维护所有 WebSocket 会话(WebSocketSession),并提供获取用户信息的工具(如 WebSocketFrameworkUtils.getLoginUserId)。
  • 后端的DemoWebSocketMessageListener类实现了WebSocketMessageListener接口的onMessage方法。
  • 当有消息到达时,onMessage方法被调用,从WebSocketSession中获取登录用户 ID(fromUserId)。
  • 根据消息中的toUserId判断是单发还是群发:
    • 如果toUserId不为空,则创建DemoReceiveMessage对象,设置fromUserIdtextsingletrue,通过webSocketMessageSendersendObject方法将消息发送给指定用户。
    • 如果toUserId为空,则创建DemoReceiveMessage对象,设置fromUserIdtextsinglefalse,通过webSocketMessageSendersendObject方法将消息发送给所有用户。
  1. JsonWebSocketMessageHandler接收并解析消息

  2. 根据type="demo-message-send"找到DemoWebSocketMessageListener

  3. 调用onMessage方法:

    • 从Session中获取发送者ID(fromUserId)

    • 检查message.getToUserId()不为null,进入单发逻辑

  4. 构造响应消息:

    new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true)
  5. 通过webSocketMessageSender发送给指定用户:

    webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用户类型message.getToUserId(),         // 目标用户ID"demo-message-receive",       // 消息类型toMessage                    // 消息内容
    )

实际示例:

  • 用户A(ID:100)发送"下午开会"给用户B(ID:101)

  • 前端发送:

    {"type":"demo-message-send","content":"{\"text\":\"下午开会\",\"toUserId\":101}"}
  • 后端处理后发送给用户B:

    {"type":"demo-message-receive","content":"{\"fromUserId\":100,\"text\":\"下午开会\",\"single\":true}"}

1.2一对多发送

前端

  1. 用户在输入框输入消息内容(sendText)

  2. 不选择特定用户(或选择"所有人")

  3. 点击发送按钮触发handlerSend方法

  4. 构造消息内容JSON:

    {text: "系统维护通知", // 消息内容toUserId: ""      // 空表示群发
    }
  5. 包装为WebSocket标准格式并发送

后端

  1. 同上接收解析流程

  2. onMessage方法中检查message.getToUserId()为null,进入群发逻辑

  3. 构造响应消息:

    new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(false)
  4. 通过webSocketMessageSender发送给所有用户:

    webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 用户类型"demo-message-receive",       // 消息类型toMessage                    // 消息内容
    )

实际示例:

  • 管理员发送"系统即将升级"给所有用户

  • 前端发送:

    {"type":"demo-message-send","content":"{\"text\":\"系统即将升级\",\"toUserId\":\"\"}"}

  • 后端处理后广播:

    {"type":"demo-message-receive","content":"{\"fromUserId\":1,\"text\":\"系统即将升级\",\"single\":false}"}

相关文章:

  • Python爬虫第20节-使用 Selenium 爬取小米商城空调商品
  • 运用数组和矩阵对数据进行存取和运算——NumPy模块 之四
  • 现代化水库运行管理矩阵平台如何建设?
  • 【Linux篇章】Linux 进程信号2:解锁系统高效运作的 “隐藏指令”,开启性能飞跃新征程(精讲捕捉信号及OS运行机制)
  • 【文心智能体】使用文心一言来给智能体设计一段稳定调用工作流的提示词
  • 《数据结构初阶》【堆 + 堆排序 + TOP-K】
  • C++色彩博弈的史诗:红黑树
  • AI时代还需要目视解译吗?——目视解译详解
  • 横向移动(上)
  • [特殊字符] 本地大模型编程实战(29):用大语言模型LLM查询图数据库NEO4J(2)
  • OpenHarmony 开源鸿蒙南向开发——linux下使用make交叉编译第三方库——nettle库
  • 键盘输出希腊字符方法
  • Neo4j 入门级使用
  • CSS3 伪类和使用场景
  • 【番外】02:Windows 编译带 DNN_CUDA 功能的 OpenCV 动态链接库
  • 系统架构设计(五):构件
  • 69、微服务保姆教程(十二)容器化与云原生
  • 【C#】ToArray的使用
  • 当 AI 邂逅丝路:揭秘「丝路智旅」,用 RAG 重塑中阿文化旅游体验
  • springboot旅游小程序-计算机毕业设计源码76696
  • 干部任职公示:陕西宁强、镇安两县县长拟进一步使用
  • 中美经贸高层会谈在日内瓦结束,中国代表团将举行发布会
  • 婚姻登记“全国通办”首日观察:数据多跑路,群众少跑腿
  • 咸宁市委常委、市纪委书记官书云调任湖北省司法厅副厅长
  • 国常会:研究深化国家级经济技术开发区改革创新有关举措等
  • 罕见沙尘再度入川,官方:沙尘传输高度达到平流层,远超以往