车辆保险网站厦门关键词优化平台
基于 SpringWebSocket 进行二次封装,实现了更加简单的使用方式。例如说,WebSocket 的认证、Session 的管理、WebSocket 集群的消息广播等等。
1. 用户认证与登录用户信息传递
1.1 Token 过滤器 (TokenAuthenticationFilter
)
① 在 WebSocket 连接建立时,通过 QueryString 的 token
参数,进行认证。例如说:ws://127.0.0.1:48080/ws?token=xxx
。
/*** Token 过滤器,验证 token 的有效性* 验证通过后,获得 {@link LoginUser} 信息,并加入到 Spring Security 上下文*/
@RequiredArgsConstructor
@Slf4j
public class TokenAuthenticationFilter extends OncePerRequestFilter {private final SecurityProperties securityProperties;private final GlobalExceptionHandler globalExceptionHandler;private final OAuth2TokenApi oauth2TokenApi;@Override@SuppressWarnings("NullableProblems")protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)throws ServletException, IOException {// 情况一,基于 header[login-user] 获得用户,例如说来自 Gateway 或者其它服务透传LoginUser loginUser = buildLoginUserByHeader(request);// 情况二,基于 Token 获得用户// 注意,这里主要满足直接使用 Nginx 直接转发到 Spring Cloud 服务的场景。if (loginUser == null) {String token = SecurityFrameworkUtils.obtainAuthorization(request,securityProperties.getTokenHeader(), securityProperties.getTokenParameter());if (StrUtil.isNotEmpty(token)) {Integer userType = WebFrameworkUtils.getLoginUserType(request);try {// 1.1 基于 token 构建登录用户loginUser = buildLoginUserByToken(token, userType);// 1.2 模拟 Login 功能,方便日常开发调试if (loginUser == null) {loginUser = mockLoginUser(request, token, userType);}} catch (Throwable ex) {CommonResult<?> result = globalExceptionHandler.allExceptionHandler(request, ex);ServletUtils.writeJSON(response, result);return;}}}// 设置当前用户if (loginUser != null) {SecurityFrameworkUtils.setLoginUser(loginUser, request);}// 继续过滤链chain.doFilter(request, response);}private LoginUser buildLoginUserByToken(String token, Integer userType) {try {// 校验访问令牌OAuth2AccessTokenCheckRespDTO accessToken = oauth2TokenApi.checkAccessToken(token).getCheckedData();if (accessToken == null) {return null;}// 用户类型不匹配,无权限// 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型// 类似 WebSocket 的 /ws/* 连接地址,是不需要比对用户类型的if (userType != null&& ObjectUtil.notEqual(accessToken.getUserType(), userType)) {throw new AccessDeniedException("错误的用户类型");}// 构建登录用户return new LoginUser().setId(accessToken.getUserId()).setUserType(accessToken.getUserType()).setInfo(accessToken.getUserInfo()) // 额外的用户信息.setTenantId(accessToken.getTenantId()).setScopes(accessToken.getScopes()).setExpiresTime(accessToken.getExpiresTime());} catch (ServiceException serviceException) {// 校验 Token 不通过时,考虑到一些接口是无需登录的,所以直接返回 null 即可return null;}}/*** 模拟登录用户,方便日常开发调试* @param request 请求* @param token 模拟的 token,格式为 {@link SecurityProperties#getMockSecret()} + 用户编号* @param userType 用户类型* @return 模拟的 LoginUser*/private LoginUser mockLoginUser(HttpServletRequest request, String token, Integer userType) {if (!securityProperties.getMockEnable()) {return null;}// 必须以 mockSecret 开头if (!token.startsWith(securityProperties.getMockSecret())) {return null;}// 构建模拟用户Long userId = Long.valueOf(token.substring(securityProperties.getMockSecret().length()));return new LoginUser().setId(userId).setUserType(userType).setTenantId(WebFrameworkUtils.getTenantId(request));}@SneakyThrowsprivate LoginUser buildLoginUserByHeader(HttpServletRequest request) {String loginUserStr = request.getHeader(SecurityFrameworkUtils.LOGIN_USER_HEADER);if (StrUtil.isEmpty(loginUserStr)) {return null;}try {loginUserStr = URLDecoder.decode(loginUserStr, StandardCharsets.UTF_8.name()); // 解码,解决中文乱码问题return JsonUtils.parseObject(loginUserStr, LoginUser.class);} catch (Exception ex) {log.error("[buildLoginUserByHeader][解析 LoginUser({}) 发生异常]", loginUserStr, ex); ;throw ex;}}}
-
作用:在 WebSocket 握手之前,验证用户的 Token 有效性,并将登录用户信息(
LoginUser
)设置到 Spring Security 上下文中。 -
流程:
-
从请求头或请求参数中获取 Token。
-
调用
OAuth2TokenApi
验证 Token 的有效性。 -
如果 Token 有效,构建
LoginUser
对象,并将其设置到 Spring Security 上下文中。 -
如果 Token 无效,返回错误响应。
-
-
关键点:
-
支持从请求头或请求参数中获取 Token。
-
支持模拟登录用户(用于开发环境)。
-
支持用户类型(
userType
)的校验。
-
1.2 WebSocket 握手拦截器 (LoginUserHandshakeInterceptor
)
/*** 登录用户的 {@link HandshakeInterceptor} 实现类** 流程如下:* 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过* 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中*/
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) {LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();if (loginUser != null) {WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);}return true;}@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception) {// do nothing}}
-
作用:在 WebSocket 握手时,将 Spring Security 上下文中的
LoginUser
信息传递到 WebSocket 会话中。 -
流程:
-
在握手之前,从 Spring Security 上下文中获取
LoginUser
。 -
将
LoginUser
设置到 WebSocket 会话的attributes
中。
-
-
关键点:
-
通过
WebSocketFrameworkUtils.setLoginUser
将用户信息存储到会话中。
-
2. WebSocket 会话管理
每个前端和后端建立的 WebSocket 连接,对应后端的一个 WebSocketSession 会话对象。由于后续需要对 WebSocketSession 进行消息的发送,所以需要进行管理。
2.1 WebSocket 会话管理器 (WebSocketSessionManager
)
/*** 默认的 {@link WebSocketSessionManager} 实现类*/
public class WebSocketSessionManagerImpl implements WebSocketSessionManager {/*** id 与 WebSocketSession 映射** key:Session 编号*/private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();/*** user 与 WebSocketSession 映射** key1:用户类型* key2:用户编号*/private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions= new ConcurrentHashMap<>();@Overridepublic void addSession(WebSocketSession session) {// 添加到 idSessions 中idSessions.put(session.getId(), session);// 添加到 userSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {userSessionsMap = new ConcurrentHashMap<>();if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {userSessionsMap = userSessions.get(user.getUserType());}}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());if (sessions == null) {sessions = new CopyOnWriteArrayList<>();if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {sessions = userSessionsMap.get(user.getId());}}sessions.add(session);}@Overridepublic void removeSession(WebSocketSession session) {// 移除从 idSessions 中idSessions.remove(session.getId());// 移除从 idSessions 中LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);if (user == null) {return;}ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());if (userSessionsMap == null) {return;}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());sessions.removeIf(session0 -> session0.getId().equals(session.getId()));if (CollUtil.isEmpty(sessions)) {userSessionsMap.remove(user.getId(), sessions);}}@Overridepublic WebSocketSession getSession(String id) {return idSessions.get(id);}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容Long contextTenantId = TenantContextHolder.getTenantId();for (List<WebSocketSession> sessions : userSessionsMap.values()) {if (CollUtil.isEmpty(sessions)) {continue;}// 特殊:如果租户不匹配,则直接排除if (contextTenantId != null) {Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));if (!contextTenantId.equals(userTenantId)) {continue;}}result.addAll(sessions);}return result;}@Overridepublic Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);if (CollUtil.isEmpty(userSessionsMap)) {return new ArrayList<>();}CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();}}
存储 WebSocket 会话
- 维护一个
idSessions
集合,用于通过 Session ID 存储和查找WebSocketSession
。- 维护一个
userSessions
集合,用于根据 用户类型 和 用户 ID 存储WebSocketSession
,支持多端会话管理。添加会话 (
addSession
)
- 将
session
存入idSessions
,以 Session ID 作为键。- 获取
session
关联的LoginUser
信息(用户类型 & 用户 ID)。- 在
userSessions
中,以 用户类型 和 用户 ID 作为键,存储WebSocketSession
。移除会话 (
removeSession
)
- 从
idSessions
移除session
。- 从
userSessions
移除session
:
- 先找到该
session
所属的用户类型 & 用户 ID 。- 在对应的
CopyOnWriteArrayList<WebSocketSession>
中删除该session
。- 如果该用户的所有
session
都被移除,则删除该用户的映射。获取单个会话 (
getSession
)
- 通过
idSessions.get(id)
直接获取 WebSocket 连接。获取某用户类型的所有会话 (
getSessionList(Integer userType)
)
- 获取
userSessions
中属于userType
的所有session
。- 如果存在租户 ID(多租户逻辑),则排除不同租户的
session
。获取某个用户的所有会话 (
getSessionList(Integer userType, Long userId)
)
- 获取特定用户 ID 关联的
session
集合。
-
作用:管理所有 WebSocket 会话,支持按用户类型、用户编号等条件查询会话。
-
实现类:
WebSocketSessionManagerImpl
-
数据结构:
-
idSessions
:存储所有会话,键为会话 ID,值为WebSocketSession
。 -
userSessions
:按用户类型和用户编号存储会话,键为用户类型和用户编号,值为会话列表。
-
-
功能:
-
添加会话:将新会话添加到
idSessions
和userSessions
中。 -
移除会话:从
idSessions
和userSessions
中移除会话。 -
查询会话:支持按会话 ID、用户类型、用户编号查询会话。
-
2.2 WebSocket 会话装饰器 (WebSocketSessionHandlerDecorator
)
/*** {@link WebSocketHandler} 的装饰类,实现了以下功能:** 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理* 2. 封装 {@link WebSocketSession} 支持并发操作*/
public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {/*** 发送时间的限制,单位:毫秒*/private static final Integer SEND_TIME_LIMIT = 1000 * 5;/*** 发送消息缓冲上线,单位:bytes*/private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;private final WebSocketSessionManager sessionManager;public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,WebSocketSessionManager sessionManager) {super(delegate);this.sessionManager = sessionManager;}@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);// 添加到 WebSocketSessionManager 中sessionManager.addSession(session);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {sessionManager.removeSession(session);}}
管理 WebSocket 连接
- 当
WebSocketSession
建立连接 时,使用sessionManager.addSession(session)
进行管理。- 当
WebSocketSession
关闭连接 时,使用sessionManager.removeSession(session)
进行清理。增强
WebSocketSession
,支持并发操作
- 使用
ConcurrentWebSocketSessionDecorator
包装session
,确保在高并发环境下 避免阻塞或数据丢失。- 限制消息发送时间(最大 5 秒)。
- 限制消息缓冲区大小(最大 100 KB)。
-
作用:在 WebSocket 连接建立和关闭时,调用
WebSocketSessionManager
管理会话。 -
流程:
-
在连接建立时,将
WebSocketSession
包装为ConcurrentWebSocketSessionDecorator
,支持并发操作。 -
将包装后的会话添加到
WebSocketSessionManager
中。 -
在连接关闭时,从
WebSocketSessionManager
中移除会话。
-
-
关键点:
-
使用
ConcurrentWebSocketSessionDecorator
解决 WebSocket 并发问题。
-
3. WebSocket 消息处理
3.1 JSON 消息格式 (JsonWebSocketMessage
)
/*** JSON 格式的 WebSocket 消息帧*/
@Data
public class JsonWebSocketMessage implements Serializable {/*** 消息类型** 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类*/private String type;/*** 消息内容** 要求 JSON 对象*/private String content;}
-
作用:定义 WebSocket 消息的 JSON 格式。
-
字段:
-
type
:消息类型,用于分发到对应的消息监听器。 -
content
:消息内容,为 JSON 字符串。
-
3.2 JSON 消息处理器 (JsonWebSocketMessageHandler
)
这个类 继承 TextWebSocketHandler
,用于处理 WebSocket 文本消息(JSON 格式),并且 基于消息类型 type
,将消息分发给对应的 WebSocketMessageListener
进行处理。
/*** JSON 格式 {@link WebSocketHandler} 实现类** 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {/*** type 与 WebSocketMessageListener 的映射*/private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();@SuppressWarnings({"rawtypes", "unchecked"})public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {listenersList.forEach((Consumer<WebSocketMessageListener>)listener -> listeners.put(listener.getType(), listener));}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// 1.1 空消息,跳过if (message.getPayloadLength() == 0) {return;}// 1.2 ping 心跳消息,直接返回 pong 消息。if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {session.sendMessage(new TextMessage("pong"));return;}// 2.1 解析消息try {JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);if (jsonMessage == null) {log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());return;}if (StrUtil.isEmpty(jsonMessage.getType())) {log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());return;}// 2.2 获得对应的 WebSocketMessageListenerWebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());if (messageListener == null) {log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());return;}// 2.3 处理消息Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);Long tenantId = WebSocketFrameworkUtils.getTenantId(session);TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));} catch (Throwable ex) {log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());}}}
-
处理客户端发送的 JSON 格式消息,并根据消息类型分发到对应的消息监听器。
-
流程:
-
解析客户端发送的消息为
JsonWebSocketMessage
。 -
根据
type
字段查找对应的消息监听器(WebSocketMessageListener
)。 -
将消息内容反序列化为监听器所需的类型,并调用监听器的
onMessage
方法。
-
-
关键点:
-
支持心跳消息(
ping
->pong
)。 -
支持多租户(通过
TenantUtils.execute
设置当前租户上下文)。
-
4. WebSocket 全流程
4.1 连接建立流程
-
客户端发起 WebSocket 连接请求,携带 Token。
-
TokenAuthenticationFilter
验证 Token,并将LoginUser
设置到 Spring Security 上下文中。 -
LoginUserHandshakeInterceptor
在握手时,将LoginUser
设置到 WebSocket 会话的attributes
中。 -
WebSocketSessionHandlerDecorator
在连接建立时,将会话添加到WebSocketSessionManager
中。
4.2 消息处理流程
-
客户端发送 JSON 格式的消息。
-
JsonWebSocketMessageHandler
解析消息,并根据type
字段查找对应的消息监听器。 -
消息监听器处理消息,并执行相应的业务逻辑。
4.3 连接关闭流程
-
客户端断开 WebSocket 连接。
-
WebSocketSessionHandlerDecorator
在连接关闭时,从WebSocketSessionManager
中移除会话。
5. 关键设计
-
用户认证:通过
TokenAuthenticationFilter
和LoginUserHandshakeInterceptor
实现用户信息的传递和验证。 -
会话管理:通过
WebSocketSessionManager
统一管理所有 WebSocket 会话,支持按用户类型和用户编号查询。 -
消息分发:通过
JsonWebSocketMessageHandler
实现消息的解析和分发,支持多租户和并发处理。 -
扩展性:通过
WebSocketMessageListener
接口,支持灵活扩展消息处理逻辑。
6. 总结
上述WebSocket 应用,包括用户认证、会话管理、消息处理等功能。通过分层设计和模块化实现,代码具有良好的扩展性和可维护性。适用于需要 WebSocket 支持的实时通信场景,如在线聊天、实时通知等。