java每日精进 3.12 【WebSocket进阶】
基于 SpringWebSocket 进行二次封装,实现了更加简单的使用方式。例如说,WebSocket 的认证、Session 的管理、WebSocket 集群的消息广播等等。
1. 用户认证与登录用户信息传递
1.1 Token 过滤器 (TokenAuthenticationFilter
)
① 在 WebSocket 连接建立时,通过 QueryString 的 token
参数,进行认证。例如说:ws://127.0.0.1:48080/ws?token=xxx
。
/**
* Token 过滤器,验证 token 的有效性
* 验证通过后,获得 {@link LoginUser} 信息,并加入到 Spring Security 上下文
*/
@RequiredArgsConstructor
@Slf4j
public class TokenAuthenticationFilter extends OncePerRequestFilter {
private final SecurityProperties securityProperties;
private final GlobalExceptionHandler globalExceptionHandler;
private final OAuth2TokenApi oauth2TokenApi;
@Override
@SuppressWarnings("NullableProblems")
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
throws ServletException, IOException {
// 情况一,基于 header[login-user] 获得用户,例如说来自 Gateway 或者其它服务透传
LoginUser loginUser = buildLoginUserByHeader(request);
// 情况二,基于 Token 获得用户
// 注意,这里主要满足直接使用 Nginx 直接转发到 Spring Cloud 服务的场景。
if (loginUser == null) {
String token = SecurityFrameworkUtils.obtainAuthorization(request,
securityProperties.getTokenHeader(), securityProperties.getTokenParameter());
if (StrUtil.isNotEmpty(token)) {
Integer userType = WebFrameworkUtils.getLoginUserType(request);
try {
// 1.1 基于 token 构建登录用户
loginUser = buildLoginUserByToken(token, userType);
// 1.2 模拟 Login 功能,方便日常开发调试
if (loginUser == null) {
loginUser = mockLoginUser(request, token, userType);
}
} catch (Throwable ex) {
CommonResult<?> result = globalExceptionHandler.allExceptionHandler(request, ex);
ServletUtils.writeJSON(response, result);
return;
}
}
}
// 设置当前用户
if (loginUser != null) {
SecurityFrameworkUtils.setLoginUser(loginUser, request);
}
// 继续过滤链
chain.doFilter(request, response);
}
private LoginUser buildLoginUserByToken(String token, Integer userType) {
try {
// 校验访问令牌
OAuth2AccessTokenCheckRespDTO accessToken = oauth2TokenApi.checkAccessToken(token).getCheckedData();
if (accessToken == null) {
return null;
}
// 用户类型不匹配,无权限
// 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型
// 类似 WebSocket 的 /ws/* 连接地址,是不需要比对用户类型的
if (userType != null
&& ObjectUtil.notEqual(accessToken.getUserType(), userType)) {
throw new AccessDeniedException("错误的用户类型");
}
// 构建登录用户
return new LoginUser().setId(accessToken.getUserId()).setUserType(accessToken.getUserType())
.setInfo(accessToken.getUserInfo()) // 额外的用户信息
.setTenantId(accessToken.getTenantId()).setScopes(accessToken.getScopes())
.setExpiresTime(accessToken.getExpiresTime());
} catch (ServiceException serviceException) {
// 校验 Token 不通过时,考虑到一些接口是无需登录的,所以直接返回 null 即可
return null;
}
}
/**
* 模拟登录用户,方便日常开发调试
* @param request 请求
* @param token 模拟的 token,格式为 {@link SecurityProperties#getMockSecret()} + 用户编号
* @param userType 用户类型
* @return 模拟的 LoginUser
*/
private LoginUser mockLoginUser(HttpServletRequest request, String token, Integer userType) {
if (!securityProperties.getMockEnable()) {
return null;
}
// 必须以 mockSecret 开头
if (!token.startsWith(securityProperties.getMockSecret())) {
return null;
}
// 构建模拟用户
Long userId = Long.valueOf(token.substring(securityProperties.getMockSecret().length()));
return new LoginUser().setId(userId).setUserType(userType)
.setTenantId(WebFrameworkUtils.getTenantId(request));
}
@SneakyThrows
private LoginUser buildLoginUserByHeader(HttpServletRequest request) {
String loginUserStr = request.getHeader(SecurityFrameworkUtils.LOGIN_USER_HEADER);
if (StrUtil.isEmpty(loginUserStr)) {
return null;
}
try {
loginUserStr = URLDecoder.decode(loginUserStr, StandardCharsets.UTF_8.name()); // 解码,解决中文乱码问题
return JsonUtils.parseObject(loginUserStr, LoginUser.class);
} catch (Exception ex) {
log.error("[buildLoginUserByHeader][解析 LoginUser({}) 发生异常]", loginUserStr, ex); ;
throw ex;
}
}
}
-
作用:在 WebSocket 握手之前,验证用户的 Token 有效性,并将登录用户信息(
LoginUser
)设置到 Spring Security 上下文中。 -
流程:
-
从请求头或请求参数中获取 Token。
-
调用
OAuth2TokenApi
验证 Token 的有效性。 -
如果 Token 有效,构建
LoginUser
对象,并将其设置到 Spring Security 上下文中。 -
如果 Token 无效,返回错误响应。
-
-
关键点:
-
支持从请求头或请求参数中获取 Token。
-
支持模拟登录用户(用于开发环境)。
-
支持用户类型(
userType
)的校验。
-
1.2 WebSocket 握手拦截器 (LoginUserHandshakeInterceptor
)
/**
* 登录用户的 {@link HandshakeInterceptor} 实现类
*
* 流程如下:
* 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
* 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中
*/
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) {
LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
if (loginUser != null) {
WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
// do nothing
}
}
-
作用:在 WebSocket 握手时,将 Spring Security 上下文中的
LoginUser
信息传递到 WebSocket 会话中。 -
流程:
-
在握手之前,从 Spring Security 上下文中获取
LoginUser
。 -
将
LoginUser
设置到 WebSocket 会话的attributes
中。
-
-
关键点:
-
通过
WebSocketFrameworkUtils.setLoginUser
将用户信息存储到会话中。
-
2. WebSocket 会话管理
每个前端和后端建立的 WebSocket 连接,对应后端的一个 WebSocketSession 会话对象。由于后续需要对 WebSocketSession 进行消息的发送,所以需要进行管理。
2.1 WebSocket 会话管理器 (WebSocketSessionManager
)
/**
* 默认的 {@link WebSocketSessionManager} 实现类
*/
public class WebSocketSessionManagerImpl implements WebSocketSessionManager {
/**
* id 与 WebSocketSession 映射
*
* key:Session 编号
*/
private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();
/**
* user 与 WebSocketSession 映射
*
* key1:用户类型
* key2:用户编号
*/
private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions
= new ConcurrentHashMap<>();
@Override
public void addSession(WebSocketSession session) {
// 添加到 idSessions 中
idSessions.put(session.getId(), session);
// 添加到 userSessions 中
LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
if (user == null) {
return;
}
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
if (userSessionsMap == null) {
userSessionsMap = new ConcurrentHashMap<>();
if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {
userSessionsMap = userSessions.get(user.getUserType());
}
}
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
if (sessions == null) {
sessions = new CopyOnWriteArrayList<>();
if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {
sessions = userSessionsMap.get(user.getId());
}
}
sessions.add(session);
}
@Override
public void removeSession(WebSocketSession session) {
// 移除从 idSessions 中
idSessions.remove(session.getId());
// 移除从 idSessions 中
LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
if (user == null) {
return;
}
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
if (userSessionsMap == null) {
return;
}
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
sessions.removeIf(session0 -> session0.getId().equals(session.getId()));
if (CollUtil.isEmpty(sessions)) {
userSessionsMap.remove(user.getId(), sessions);
}
}
@Override
public WebSocketSession getSession(String id) {
return idSessions.get(id);
}
@Override
public Collection<WebSocketSession> getSessionList(Integer userType) {
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
if (CollUtil.isEmpty(userSessionsMap)) {
return new ArrayList<>();
}
LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
Long contextTenantId = TenantContextHolder.getTenantId();
for (List<WebSocketSession> sessions : userSessionsMap.values()) {
if (CollUtil.isEmpty(sessions)) {
continue;
}
// 特殊:如果租户不匹配,则直接排除
if (contextTenantId != null) {
Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));
if (!contextTenantId.equals(userTenantId)) {
continue;
}
}
result.addAll(sessions);
}
return result;
}
@Override
public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
if (CollUtil.isEmpty(userSessionsMap)) {
return new ArrayList<>();
}
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);
return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();
}
}
存储 WebSocket 会话
- 维护一个
idSessions
集合,用于通过 Session ID 存储和查找WebSocketSession
。- 维护一个
userSessions
集合,用于根据 用户类型 和 用户 ID 存储WebSocketSession
,支持多端会话管理。添加会话 (
addSession
)
- 将
session
存入idSessions
,以 Session ID 作为键。- 获取
session
关联的LoginUser
信息(用户类型 & 用户 ID)。- 在
userSessions
中,以 用户类型 和 用户 ID 作为键,存储WebSocketSession
。移除会话 (
removeSession
)
- 从
idSessions
移除session
。- 从
userSessions
移除session
:
- 先找到该
session
所属的用户类型 & 用户 ID 。- 在对应的
CopyOnWriteArrayList<WebSocketSession>
中删除该session
。- 如果该用户的所有
session
都被移除,则删除该用户的映射。获取单个会话 (
getSession
)
- 通过
idSessions.get(id)
直接获取 WebSocket 连接。获取某用户类型的所有会话 (
getSessionList(Integer userType)
)
- 获取
userSessions
中属于userType
的所有session
。- 如果存在租户 ID(多租户逻辑),则排除不同租户的
session
。获取某个用户的所有会话 (
getSessionList(Integer userType, Long userId)
)
- 获取特定用户 ID 关联的
session
集合。
-
作用:管理所有 WebSocket 会话,支持按用户类型、用户编号等条件查询会话。
-
实现类:
WebSocketSessionManagerImpl
-
数据结构:
-
idSessions
:存储所有会话,键为会话 ID,值为WebSocketSession
。 -
userSessions
:按用户类型和用户编号存储会话,键为用户类型和用户编号,值为会话列表。
-
-
功能:
-
添加会话:将新会话添加到
idSessions
和userSessions
中。 -
移除会话:从
idSessions
和userSessions
中移除会话。 -
查询会话:支持按会话 ID、用户类型、用户编号查询会话。
-
2.2 WebSocket 会话装饰器 (WebSocketSessionHandlerDecorator
)
/**
* {@link WebSocketHandler} 的装饰类,实现了以下功能:
*
* 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理
* 2. 封装 {@link WebSocketSession} 支持并发操作
*/
public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {
/**
* 发送时间的限制,单位:毫秒
*/
private static final Integer SEND_TIME_LIMIT = 1000 * 5;
/**
* 发送消息缓冲上线,单位:bytes
*/
private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;
private final WebSocketSessionManager sessionManager;
public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,
WebSocketSessionManager sessionManager) {
super(delegate);
this.sessionManager = sessionManager;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) {
// 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149
session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);
// 添加到 WebSocketSessionManager 中
sessionManager.addSession(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
sessionManager.removeSession(session);
}
}
管理 WebSocket 连接
- 当
WebSocketSession
建立连接 时,使用sessionManager.addSession(session)
进行管理。- 当
WebSocketSession
关闭连接 时,使用sessionManager.removeSession(session)
进行清理。增强
WebSocketSession
,支持并发操作
- 使用
ConcurrentWebSocketSessionDecorator
包装session
,确保在高并发环境下 避免阻塞或数据丢失。- 限制消息发送时间(最大 5 秒)。
- 限制消息缓冲区大小(最大 100 KB)。
-
作用:在 WebSocket 连接建立和关闭时,调用
WebSocketSessionManager
管理会话。 -
流程:
-
在连接建立时,将
WebSocketSession
包装为ConcurrentWebSocketSessionDecorator
,支持并发操作。 -
将包装后的会话添加到
WebSocketSessionManager
中。 -
在连接关闭时,从
WebSocketSessionManager
中移除会话。
-
-
关键点:
-
使用
ConcurrentWebSocketSessionDecorator
解决 WebSocket 并发问题。
-
3. WebSocket 消息处理
3.1 JSON 消息格式 (JsonWebSocketMessage
)
/**
* JSON 格式的 WebSocket 消息帧
*/
@Data
public class JsonWebSocketMessage implements Serializable {
/**
* 消息类型
*
* 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类
*/
private String type;
/**
* 消息内容
*
* 要求 JSON 对象
*/
private String content;
}
-
作用:定义 WebSocket 消息的 JSON 格式。
-
字段:
-
type
:消息类型,用于分发到对应的消息监听器。 -
content
:消息内容,为 JSON 字符串。
-
3.2 JSON 消息处理器 (JsonWebSocketMessageHandler
)
这个类 继承 TextWebSocketHandler
,用于处理 WebSocket 文本消息(JSON 格式),并且 基于消息类型 type
,将消息分发给对应的 WebSocketMessageListener
进行处理。
/**
* JSON 格式 {@link WebSocketHandler} 实现类
*
* 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。
*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {
/**
* type 与 WebSocketMessageListener 的映射
*/
private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
@SuppressWarnings({"rawtypes", "unchecked"})
public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
listenersList.forEach((Consumer<WebSocketMessageListener>)
listener -> listeners.put(listener.getType(), listener));
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 1.1 空消息,跳过
if (message.getPayloadLength() == 0) {
return;
}
// 1.2 ping 心跳消息,直接返回 pong 消息。
if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
session.sendMessage(new TextMessage("pong"));
return;
}
// 2.1 解析消息
try {
JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
if (jsonMessage == null) {
log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
return;
}
if (StrUtil.isEmpty(jsonMessage.getType())) {
log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
return;
}
// 2.2 获得对应的 WebSocketMessageListener
WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
if (messageListener == null) {
log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());
return;
}
// 2.3 处理消息
Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
} catch (Throwable ex) {
log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
}
}
}
-
处理客户端发送的 JSON 格式消息,并根据消息类型分发到对应的消息监听器。
-
流程:
-
解析客户端发送的消息为
JsonWebSocketMessage
。 -
根据
type
字段查找对应的消息监听器(WebSocketMessageListener
)。 -
将消息内容反序列化为监听器所需的类型,并调用监听器的
onMessage
方法。
-
-
关键点:
-
支持心跳消息(
ping
->pong
)。 -
支持多租户(通过
TenantUtils.execute
设置当前租户上下文)。
-
4. WebSocket 全流程
4.1 连接建立流程
-
客户端发起 WebSocket 连接请求,携带 Token。
-
TokenAuthenticationFilter
验证 Token,并将LoginUser
设置到 Spring Security 上下文中。 -
LoginUserHandshakeInterceptor
在握手时,将LoginUser
设置到 WebSocket 会话的attributes
中。 -
WebSocketSessionHandlerDecorator
在连接建立时,将会话添加到WebSocketSessionManager
中。
4.2 消息处理流程
-
客户端发送 JSON 格式的消息。
-
JsonWebSocketMessageHandler
解析消息,并根据type
字段查找对应的消息监听器。 -
消息监听器处理消息,并执行相应的业务逻辑。
4.3 连接关闭流程
-
客户端断开 WebSocket 连接。
-
WebSocketSessionHandlerDecorator
在连接关闭时,从WebSocketSessionManager
中移除会话。
5. 关键设计
-
用户认证:通过
TokenAuthenticationFilter
和LoginUserHandshakeInterceptor
实现用户信息的传递和验证。 -
会话管理:通过
WebSocketSessionManager
统一管理所有 WebSocket 会话,支持按用户类型和用户编号查询。 -
消息分发:通过
JsonWebSocketMessageHandler
实现消息的解析和分发,支持多租户和并发处理。 -
扩展性:通过
WebSocketMessageListener
接口,支持灵活扩展消息处理逻辑。
6. 总结
上述WebSocket 应用,包括用户认证、会话管理、消息处理等功能。通过分层设计和模块化实现,代码具有良好的扩展性和可维护性。适用于需要 WebSocket 支持的实时通信场景,如在线聊天、实时通知等。