WebSocket细谈
WebSocket 全景笔记(Spring 原生版)
覆盖配置、握手、收发、推送、安全、性能一条线
基于代码 → WebSocketConfig
+ WebSocketInterceptor
+ UnifiedWebSocketHandler
一、核心角色
组件 | 作用 |
---|---|
WebSocketConfig | 注册 URL 路径与处理器、CORS、拦截器 |
HandshakeInterceptor | 握手阶段鉴权(HTTP 层),失败直接 401 |
TextWebSocketHandler | 生命周期管理 + 业务消息路由 + 向后推送 |
二、配置类(总闸门)
@Configuration
@EnableWebSocket // ① 开启模块
public class WebSocketConfig implements WebSocketConfigurer {public void registerWebSocketHandlers(WebSocketHandlerRegistry r) {r.addHandler(unifiedHandler(), "/ws") // 路径.addInterceptors(handshakeInterceptor()) // 鉴权.setAllowedOrigins("https://xiaoyua.com"); // 生产收窄}
}
可注册 多条路径 指向同一个处理器(兼容旧版)
少了
@EnableWebSocket
→ 404
三、握手拦截器(HTTP 阶段)
public class WebSocketInterceptor implements HandshakeInterceptor {boolean beforeHandshake(ServerHttpRequest req, ServerHttpResponse resp,WebSocketHandler ws, Map<String,Object> attributes){// 1. 三处取 tokenString token = req.getHeaders().getFirst("Authorization"); // Bearerif (token == null) token = extractFromSubProtocol(req); // Sec-WebSocket-Protocolif (token == null) token = extractFromQuery(req); // ?token=xxx// 2. 验签 → 失败直接返回 false → 浏览器收到 401,不会升级 TCPLong userId = jwtUtil.getUserIdFromToken(token);if (userId == null) return false;// 3. 把结果塞进 attributes,后续处理器随时拿attributes.put("userId", userId);return true;}
}
from 字段来源:attributes.get("userId")
就是发送者,无需客户端再传。
四、处理器生命周期
public class UnifiedWebSocketHandler extends TextWebSocketHandler {/* 连接建立 */afterConnectionEstablished(session){Long userId = (Long) session.getAttributes().get("userId");USER_SESSIONS.put(userId, session); // 内存快表redisTemplate.setEx("user:online:" + userId, 30min, "1");sendMessageToUser(userId, Map.of("type","connected"));}/* 收到文本消息 */handleMessage(session, TextMessage msg){Map<String,Object> data = objectMapper.readValue(msg.getPayload());switch ((String) data.get("type")){case "ping" -> handlePing(userId);case "heartbeat" -> handleHeartbeat(userId);case "send_message"-> handleSendMessage(userId, data); // 业务default -> sendError(userId, "不支持类型");}}/* 连接关闭 */afterConnectionClosed(session, status){USER_SESSIONS.remove(userId);redisTemplate.delete("user:online:" + userId);}
}
单线程模型 → 无需自己加锁,但禁止阻塞 IO(可扔线程池)
支持部分二进制 → 重写
handleBinaryMessage
五、向后推送(任意位置可调用)
public boolean sendMessageToUser(Long userId, Object payload){WebSocketSession session = USER_SESSIONS.get(userId);if (session==null || !session.isOpen()) return false;try{String json = objectMapper.writeValueAsString(payload);session.sendMessage(new TextMessage(json));return true;}catch (IOException e){// 失败时清理无效会话,防止内存泄漏USER_SESSIONS.remove(userId);redisTemplate.delete("user:online:" + userId);return false;}
}
线程安全:
ConcurrentHashMap
+ 快速失败删除失败即清理:避免“僵尸连接”占坑
六、客户端主动发私信流程(代码已实现)
前端发送
{"type":"send_message","to_id":123,"content":"你好","temp_id":"uuid"}
处理器解析 → 调现有 MQ 业务(离线、重试、格式统一)→ 不直接推送
回包给发送者
{"type":"message_sent","temp_id":"uuid","message_id":456,"from_user_id":789,"status":"success","timestamp":...}
→ from_user_id 已补,前端可直接显示“我发的”。
七、安全与性能
话题 | 做法 |
---|---|
CORS | 生产 setAllowedOrigins("https://xxx.com") |
消息大小 | configureWebSocketTransport(r -> r.setMaxMessageSize(512KB)) |
心跳保活 | 前端每 30s 发 {type:"heartbeat"} ,后端刷新 Redis TTL |
批量推送 | 把 sendMessageToUser 包装成 batchSend(List<Long> userIds, payload) |
集群扩容 | 推消息时先查 Redis 在线节点,再通过 MQ 广播到对应节点再推 |
八、前端最小示例
const token = localStorage.getItem("token");
const ws = new WebSocket(`wss://xiaoyua.com/ws?token=${token}`);ws.onopen = () => ws.send(JSON.stringify({type:"heartbeat"}));
ws.onmessage = (e) => {const msg = JSON.parse(e.data);if (msg.type === "private_message") {console.log(`来自 ${msg.from_user_id}: ${msg.content}`);}
};
ws.onclose = () => console.log("断开");
九、一句话速记
“配置类注册路径,拦截器握手鉴权,处理器管生命周期,
推送调sendMessageToUser
,from 字段就是session.getAttributes("userId")
,
心跳保活,失败即清理,集群用 MQ 广播!”