当前位置: 首页 > news >正文

java每日精进 3.12 【WebSocket进阶】

基于 SpringWebSocket 进行二次封装,实现了更加简单的使用方式。例如说,WebSocket 的认证、Session 的管理、WebSocket 集群的消息广播等等。

1. 用户认证与登录用户信息传递

1.1 Token 过滤器 (TokenAuthenticationFilter)

① 在 WebSocket 连接建立时,通过 QueryString 的 token 参数,进行认证。例如说:ws://127.0.0.1:48080/ws?token=xxx

/**
 * Token 过滤器,验证 token 的有效性
 * 验证通过后,获得 {@link LoginUser} 信息,并加入到 Spring Security 上下文
 */
@RequiredArgsConstructor
@Slf4j
public class TokenAuthenticationFilter extends OncePerRequestFilter {

    private final SecurityProperties securityProperties;

    private final GlobalExceptionHandler globalExceptionHandler;

    private final OAuth2TokenApi oauth2TokenApi;

    @Override
    @SuppressWarnings("NullableProblems")
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
            throws ServletException, IOException {
        // 情况一,基于 header[login-user] 获得用户,例如说来自 Gateway 或者其它服务透传
        LoginUser loginUser = buildLoginUserByHeader(request);

        // 情况二,基于 Token 获得用户
        // 注意,这里主要满足直接使用 Nginx 直接转发到 Spring Cloud 服务的场景。
        if (loginUser == null) {
            String token = SecurityFrameworkUtils.obtainAuthorization(request,
                    securityProperties.getTokenHeader(), securityProperties.getTokenParameter());
            if (StrUtil.isNotEmpty(token)) {
                Integer userType = WebFrameworkUtils.getLoginUserType(request);
                try {
                    // 1.1 基于 token 构建登录用户
                    loginUser = buildLoginUserByToken(token, userType);
                    // 1.2 模拟 Login 功能,方便日常开发调试
                    if (loginUser == null) {
                        loginUser = mockLoginUser(request, token, userType);
                    }
                } catch (Throwable ex) {
                    CommonResult<?> result = globalExceptionHandler.allExceptionHandler(request, ex);
                    ServletUtils.writeJSON(response, result);
                    return;
                }
            }
        }

        // 设置当前用户
        if (loginUser != null) {
            SecurityFrameworkUtils.setLoginUser(loginUser, request);
        }
        // 继续过滤链
        chain.doFilter(request, response);
    }

    private LoginUser buildLoginUserByToken(String token, Integer userType) {
        try {
            // 校验访问令牌
            OAuth2AccessTokenCheckRespDTO accessToken = oauth2TokenApi.checkAccessToken(token).getCheckedData();
            if (accessToken == null) {
                return null;
            }
            // 用户类型不匹配,无权限
            // 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型
            // 类似 WebSocket 的 /ws/* 连接地址,是不需要比对用户类型的
            if (userType != null
                    && ObjectUtil.notEqual(accessToken.getUserType(), userType)) {
                throw new AccessDeniedException("错误的用户类型");
            }
            // 构建登录用户
            return new LoginUser().setId(accessToken.getUserId()).setUserType(accessToken.getUserType())
                    .setInfo(accessToken.getUserInfo()) // 额外的用户信息
                    .setTenantId(accessToken.getTenantId()).setScopes(accessToken.getScopes())
                    .setExpiresTime(accessToken.getExpiresTime());
        } catch (ServiceException serviceException) {
            // 校验 Token 不通过时,考虑到一些接口是无需登录的,所以直接返回 null 即可
            return null;
        }
    }

    /**
     * 模拟登录用户,方便日常开发调试
     * @param request 请求
     * @param token 模拟的 token,格式为 {@link SecurityProperties#getMockSecret()} + 用户编号
     * @param userType 用户类型
     * @return 模拟的 LoginUser
     */
    private LoginUser mockLoginUser(HttpServletRequest request, String token, Integer userType) {
        if (!securityProperties.getMockEnable()) {
            return null;
        }
        // 必须以 mockSecret 开头
        if (!token.startsWith(securityProperties.getMockSecret())) {
            return null;
        }
        // 构建模拟用户
        Long userId = Long.valueOf(token.substring(securityProperties.getMockSecret().length()));
        return new LoginUser().setId(userId).setUserType(userType)
                .setTenantId(WebFrameworkUtils.getTenantId(request));
    }

    @SneakyThrows
    private LoginUser buildLoginUserByHeader(HttpServletRequest request) {
        String loginUserStr = request.getHeader(SecurityFrameworkUtils.LOGIN_USER_HEADER);
        if (StrUtil.isEmpty(loginUserStr)) {
            return null;
        }
        try {
            loginUserStr = URLDecoder.decode(loginUserStr, StandardCharsets.UTF_8.name()); // 解码,解决中文乱码问题
            return JsonUtils.parseObject(loginUserStr, LoginUser.class);
        } catch (Exception ex) {
            log.error("[buildLoginUserByHeader][解析 LoginUser({}) 发生异常]", loginUserStr, ex);  ;
            throw ex;
        }
    }

}
  • 作用:在 WebSocket 握手之前,验证用户的 Token 有效性,并将登录用户信息(LoginUser)设置到 Spring Security 上下文中。

  • 流程

    1. 从请求头或请求参数中获取 Token。

    2. 调用 OAuth2TokenApi 验证 Token 的有效性。

    3. 如果 Token 有效,构建 LoginUser 对象,并将其设置到 Spring Security 上下文中。

    4. 如果 Token 无效,返回错误响应。

  • 关键点

    • 支持从请求头或请求参数中获取 Token。

    • 支持模拟登录用户(用于开发环境)。

    • 支持用户类型(userType)的校验。

1.2 WebSocket 握手拦截器 (LoginUserHandshakeInterceptor)

/**
 * 登录用户的 {@link HandshakeInterceptor} 实现类
 *
 * 流程如下:
 * 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
 * 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中
 */
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) {
        LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
        if (loginUser != null) {
            WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);
        }
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
        // do nothing
    }

}
  • 作用:在 WebSocket 握手时,将 Spring Security 上下文中的 LoginUser 信息传递到 WebSocket 会话中。

  • 流程

    1. 在握手之前,从 Spring Security 上下文中获取 LoginUser

    2. 将 LoginUser 设置到 WebSocket 会话的 attributes 中。

  • 关键点

    • 通过 WebSocketFrameworkUtils.setLoginUser 将用户信息存储到会话中。

2. WebSocket 会话管理

每个前端和后端建立的 WebSocket 连接,对应后端的一个 WebSocketSession 会话对象。由于后续需要对 WebSocketSession 进行消息的发送,所以需要进行管理。

2.1 WebSocket 会话管理器 (WebSocketSessionManager)

/**
 * 默认的 {@link WebSocketSessionManager} 实现类
 */
public class WebSocketSessionManagerImpl implements WebSocketSessionManager {

    /**
     * id 与 WebSocketSession 映射
     *
     * key:Session 编号
     */
    private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();

    /**
     * user 与 WebSocketSession 映射
     *
     * key1:用户类型
     * key2:用户编号
     */
    private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions
            = new ConcurrentHashMap<>();

    @Override
    public void addSession(WebSocketSession session) {
        // 添加到 idSessions 中
        idSessions.put(session.getId(), session);
        // 添加到 userSessions 中
        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
        if (user == null) {
            return;
        }
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
        if (userSessionsMap == null) {
            userSessionsMap = new ConcurrentHashMap<>();
            if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {
                userSessionsMap = userSessions.get(user.getUserType());
            }
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
        if (sessions == null) {
            sessions = new CopyOnWriteArrayList<>();
            if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {
                sessions = userSessionsMap.get(user.getId());
            }
        }
        sessions.add(session);
    }

    @Override
    public void removeSession(WebSocketSession session) {
        // 移除从 idSessions 中
        idSessions.remove(session.getId());
        // 移除从 idSessions 中
        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
        if (user == null) {
            return;
        }
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
        if (userSessionsMap == null) {
            return;
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
        sessions.removeIf(session0 -> session0.getId().equals(session.getId()));
        if (CollUtil.isEmpty(sessions)) {
            userSessionsMap.remove(user.getId(), sessions);
        }
    }

    @Override
    public WebSocketSession getSession(String id) {
        return idSessions.get(id);
    }

    @Override
    public Collection<WebSocketSession> getSessionList(Integer userType) {
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
        if (CollUtil.isEmpty(userSessionsMap)) {
            return new ArrayList<>();
        }
        LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
        Long contextTenantId = TenantContextHolder.getTenantId();
        for (List<WebSocketSession> sessions : userSessionsMap.values()) {
            if (CollUtil.isEmpty(sessions)) {
                continue;
            }
            // 特殊:如果租户不匹配,则直接排除
            if (contextTenantId != null) {
                Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));
                if (!contextTenantId.equals(userTenantId)) {
                    continue;
                }
            }
            result.addAll(sessions);
        }
        return result;
    }

    @Override
    public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
        if (CollUtil.isEmpty(userSessionsMap)) {
            return new ArrayList<>();
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);
        return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();
    }

}
  • 存储 WebSocket 会话

    • 维护一个 idSessions 集合,用于通过 Session ID 存储和查找 WebSocketSession
    • 维护一个 userSessions 集合,用于根据 用户类型用户 ID 存储 WebSocketSession,支持多端会话管理。
  • 添加会话 (addSession)

    • session 存入 idSessions,以 Session ID 作为键。
    • 获取 session 关联的 LoginUser 信息(用户类型 & 用户 ID)。
    • userSessions 中,以 用户类型用户 ID 作为键,存储 WebSocketSession
  • 移除会话 (removeSession)

    • idSessions 移除 session
    • userSessions 移除 session
      • 先找到该 session 所属的用户类型 & 用户 ID 。
      • 在对应的 CopyOnWriteArrayList<WebSocketSession> 中删除该 session
      • 如果该用户的所有 session 都被移除,则删除该用户的映射。
  • 获取单个会话 (getSession)

    • 通过 idSessions.get(id) 直接获取 WebSocket 连接。
  • 获取某用户类型的所有会话 (getSessionList(Integer userType))

    • 获取 userSessions 中属于 userType 的所有 session
    • 如果存在租户 ID(多租户逻辑),则排除不同租户的 session
  • 获取某个用户的所有会话 (getSessionList(Integer userType, Long userId))

    • 获取特定用户 ID 关联的 session 集合。
  • 作用:管理所有 WebSocket 会话,支持按用户类型、用户编号等条件查询会话。

  • 实现类WebSocketSessionManagerImpl

  • 数据结构

    • idSessions:存储所有会话,键为会话 ID,值为 WebSocketSession

    • userSessions:按用户类型和用户编号存储会话,键为用户类型和用户编号,值为会话列表。

  • 功能

    • 添加会话:将新会话添加到 idSessions 和 userSessions 中。

    • 移除会话:从 idSessions 和 userSessions 中移除会话。

    • 查询会话:支持按会话 ID、用户类型、用户编号查询会话。

2.2 WebSocket 会话装饰器 (WebSocketSessionHandlerDecorator)

/**
 * {@link WebSocketHandler} 的装饰类,实现了以下功能:
 *
 * 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理
 * 2. 封装 {@link WebSocketSession} 支持并发操作
 */
public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {

    /**
     * 发送时间的限制,单位:毫秒
     */
    private static final Integer SEND_TIME_LIMIT = 1000 * 5;
    /**
     * 发送消息缓冲上线,单位:bytes
     */
    private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;

    private final WebSocketSessionManager sessionManager;

    public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,
                                            WebSocketSessionManager sessionManager) {
        super(delegate);
        this.sessionManager = sessionManager;
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149
        session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);
        // 添加到 WebSocketSessionManager 中
        sessionManager.addSession(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
        sessionManager.removeSession(session);
    }

}

  • 管理 WebSocket 连接

    • WebSocketSession 建立连接 时,使用 sessionManager.addSession(session) 进行管理。
    • WebSocketSession 关闭连接 时,使用 sessionManager.removeSession(session) 进行清理。
  • 增强 WebSocketSession,支持并发操作

    • 使用 ConcurrentWebSocketSessionDecorator 包装 session,确保在高并发环境下 避免阻塞或数据丢失
    • 限制消息发送时间(最大 5 秒)。
    • 限制消息缓冲区大小(最大 100 KB)。
  • 作用:在 WebSocket 连接建立和关闭时,调用 WebSocketSessionManager 管理会话。

  • 流程

    1. 在连接建立时,将 WebSocketSession 包装为 ConcurrentWebSocketSessionDecorator,支持并发操作。

    2. 将包装后的会话添加到 WebSocketSessionManager 中。

    3. 在连接关闭时,从 WebSocketSessionManager 中移除会话。

  • 关键点

    • 使用 ConcurrentWebSocketSessionDecorator 解决 WebSocket 并发问题。

3. WebSocket 消息处理

3.1 JSON 消息格式 (JsonWebSocketMessage)

/**
 * JSON 格式的 WebSocket 消息帧
 */
@Data
public class JsonWebSocketMessage implements Serializable {

    /**
     * 消息类型
     *
     * 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类
     */
    private String type;
    /**
     * 消息内容
     *
     * 要求 JSON 对象
     */
    private String content;

}
  • 作用:定义 WebSocket 消息的 JSON 格式。

  • 字段

    • type:消息类型,用于分发到对应的消息监听器。

    • content:消息内容,为 JSON 字符串。

3.2 JSON 消息处理器 (JsonWebSocketMessageHandler)

这个类 继承 TextWebSocketHandler,用于处理 WebSocket 文本消息(JSON 格式),并且 基于消息类型 type,将消息分发给对应的 WebSocketMessageListener 进行处理

/**
 * JSON 格式 {@link WebSocketHandler} 实现类
 *
 * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。
 */
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {

    /**
     * type 与 WebSocketMessageListener 的映射
     */
    private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();

    @SuppressWarnings({"rawtypes", "unchecked"})
    public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
        listenersList.forEach((Consumer<WebSocketMessageListener>)
                listener -> listeners.put(listener.getType(), listener));
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 1.1 空消息,跳过
        if (message.getPayloadLength() == 0) {
            return;
        }
        // 1.2 ping 心跳消息,直接返回 pong 消息。
        if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
            session.sendMessage(new TextMessage("pong"));
            return;
        }

        // 2.1 解析消息
        try {
            JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
            if (jsonMessage == null) {
                log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
                return;
            }
            if (StrUtil.isEmpty(jsonMessage.getType())) {
                log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
                return;
            }
            // 2.2 获得对应的 WebSocketMessageListener
            WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
            if (messageListener == null) {
                log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());
                return;
            }
            // 2.3 处理消息
            Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
            Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
            Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
            TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
        } catch (Throwable ex) {
            log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
        }
    }

}
  • 处理客户端发送的 JSON 格式消息,并根据消息类型分发到对应的消息监听器。

  • 流程

    1. 解析客户端发送的消息为 JsonWebSocketMessage

    2. 根据 type 字段查找对应的消息监听器(WebSocketMessageListener)。

    3. 将消息内容反序列化为监听器所需的类型,并调用监听器的 onMessage 方法。

  • 关键点

    • 支持心跳消息(ping -> pong)。

    • 支持多租户(通过 TenantUtils.execute 设置当前租户上下文)。

4. WebSocket 全流程

4.1 连接建立流程

  1. 客户端发起 WebSocket 连接请求,携带 Token。

  2. TokenAuthenticationFilter 验证 Token,并将 LoginUser 设置到 Spring Security 上下文中。

  3. LoginUserHandshakeInterceptor 在握手时,将 LoginUser 设置到 WebSocket 会话的 attributes 中。

  4. WebSocketSessionHandlerDecorator 在连接建立时,将会话添加到 WebSocketSessionManager 中。

4.2 消息处理流程

  1. 客户端发送 JSON 格式的消息。

  2. JsonWebSocketMessageHandler 解析消息,并根据 type 字段查找对应的消息监听器。

  3. 消息监听器处理消息,并执行相应的业务逻辑。

4.3 连接关闭流程

  1. 客户端断开 WebSocket 连接。

  2. WebSocketSessionHandlerDecorator 在连接关闭时,从 WebSocketSessionManager 中移除会话。


5. 关键设计

  • 用户认证:通过 TokenAuthenticationFilter 和 LoginUserHandshakeInterceptor 实现用户信息的传递和验证。

  • 会话管理:通过 WebSocketSessionManager 统一管理所有 WebSocket 会话,支持按用户类型和用户编号查询。

  • 消息分发:通过 JsonWebSocketMessageHandler 实现消息的解析和分发,支持多租户和并发处理。

  • 扩展性:通过 WebSocketMessageListener 接口,支持灵活扩展消息处理逻辑。


6. 总结

上述WebSocket 应用,包括用户认证、会话管理、消息处理等功能。通过分层设计和模块化实现,代码具有良好的扩展性和可维护性。适用于需要 WebSocket 支持的实时通信场景,如在线聊天、实时通知等。

相关文章:

  • LabVIEW正弦信号三参数最小二乘拟合
  • sensor数据在整个rk平台的框架流程是怎么样,
  • Maven 构建 项目测试
  • 前端PayPal支付按钮集成(Vue3)
  • AI + 游戏开发:如何用 DeepSeek 打造高性能开心消消乐游戏
  • React篇之three渲染
  • MB90540/540G/545/545G Series
  • 锤头线和倒锤头线
  • OpenHarmony 5.0 拨号键盘自定义暗码启动其他应用
  • Linux常见问题与分析
  • OBJ文件生成PCD文件(python 实现)
  • MTK Android12 安装app添加密码锁限制
  • Android Framwork 之深入理解 IPC Binder机制
  • vue2安装scss
  • C语言实现十六进制转十进制
  • 谷歌 DeepMind 重磅出击,多款 AI 模型震撼登场
  • 【A2DP】蓝牙音频编解码器互操作性要求详解
  • keepalived系列-自启动配置无效
  • Celery - 入门(get-started)
  • DeepSeek选择方向的优势
  • 怎么样注册自己的网站/长沙seo霸屏
  • 弹窗网站制作器/洛阳网站建设
  • 河南手机网站建设公司/人工智能培训课程
  • 使用php做的网站/有效获客的六大渠道
  • 英文网站怎么设置中文/湖南长沙今日疫情
  • wordpress cosy2.04/郑州百度seo网站优化