java配置webSocket、前端使用uniapp连接
一、这个管理系统是基于若依,配置webSocKet的maven依赖
<!--websocket--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
二、配置类配置webSocket的端点和相关的参数
1、WebSocketConfig - webSocket配置类
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate WebSocketHandler webSocketHandler;/*** 注册websocket的端点* 客户端连接格式: ws://yourdomain:port/ws/order?token=yourTokenValue* token参数必须提供,系统会通过token从Redis获取对应的openId用于用户识别* @param registry WebSocketHandlerRegistry*/@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(webSocketHandler, "/ws/order").setAllowedOrigins("*"); // 允许跨域访问}/*** 配置WebSocket服务器的参数* 包括:连接超时时间、心跳超时时间、最大消息大小等* @return ServletServerContainerFactoryBean*/@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();// 设置异步发送超时时间为25秒container.setAsyncSendTimeout(25000L);// 设置最大会话空闲时间为60秒container.setMaxSessionIdleTimeout(60000L);// 设置最大文本消息缓冲区大小为8KBcontainer.setMaxTextMessageBufferSize(8192);// 设置最大二进制消息缓冲区大小为8KBcontainer.setMaxBinaryMessageBufferSize(8192);return container;}
}
2、WebSocketHandler - webSocket处理器
@Component
@Slf4j
public class WebSocketHandler extends TextWebSocketHandler {@Autowiredprivate StringRedisTemplate stringRedisTemplate;// 用线程安全的集合来管理所有连接的 WebSocket 会话private static final Set<WebSocketSession> sessions = new CopyOnWriteArraySet<>();// 使用ConcurrentHashMap来存储openId到session的映射关系private static final Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();// 使用ConcurrentHashMap来存储session到openId的映射关系(反向映射)private static final Map<WebSocketSession, String> sessionUsers = new ConcurrentHashMap<>();// 记录每个session最后一次活跃时间private static final Map<String, Long> sessionLastActiveTime = new ConcurrentHashMap<>();// 心跳检查的定时任务执行器private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();// 心跳超时时间,单位毫秒private static final long HEARTBEAT_TIMEOUT = 50000L; // 50秒// 用于解析JSON的对象映射器private static final ObjectMapper objectMapper = new ObjectMapper();/*** 构造方法,启动心跳检测任务*/public WebSocketHandler() {// 每15秒检查一次心跳heartbeatScheduler.scheduleAtFixedRate(this::checkHeartbeats, 15, 15, TimeUnit.SECONDS);}/*** 心跳检查方法,清理那些超时的连接*/private void checkHeartbeats() {long currentTime = System.currentTimeMillis();for (Map.Entry<String, Long> entry : sessionLastActiveTime.entrySet()) {String openId = entry.getKey();long lastActive = entry.getValue();// 如果超过超时时间没有活动,则关闭会话if (currentTime - lastActive > HEARTBEAT_TIMEOUT) {WebSocketSession session = userSessions.get(openId);if (session != null && session.isOpen()) {try {log.warn("会话心跳超时,主动断开连接 - openId: {}, 上次活跃: {}ms前", openId, currentTime - lastActive);session.close(CloseStatus.NORMAL);} catch (IOException e) {log.error("关闭超时WebSocket会话异常 - openId: {}, 错误: {}", openId, e.getMessage());} finally {// 确保从会话映射中移除sessions.remove(session);sessionUsers.remove(session);userSessions.remove(openId);sessionLastActiveTime.remove(openId);}} else {// 会话已关闭或不存在,直接清理userSessions.remove(openId);sessionLastActiveTime.remove(openId);}}}}/*** 新客户端连接时,加入到 sessions 集合中* @param session 会话*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {sessions.add(session);// 从URL中获取token参数,格式应为 /ws/order?token=xxxString token = extractToken(session);if (token != null) {// 从Redis中获取对应的openIdString openId = getOpenIdFromToken(token);if (openId != null) {userSessions.put(openId, session);sessionUsers.put(session, openId);sessionLastActiveTime.put(openId, System.currentTimeMillis()); // 记录初始活跃时间log.info("WebSocket连接已建立 - token: {}, openId: {}, 当前连接数: {}", token, openId, sessions.size());} else {log.warn("找不到token对应的openId,token可能已过期 - token: {}", token);// 可以选择关闭这个无效的连接session.close(CloseStatus.NOT_ACCEPTABLE);}} else {log.warn("WebSocket连接未提供token参数,无法识别用户");// 可以选择关闭这个无效的连接session.close(CloseStatus.NOT_ACCEPTABLE);}}/*** 客户端断开连接时,从 sessions 集合中移除* @param session 会话* @param status*/@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {sessions.remove(session);// 从用户会话映射中也移除String openId = sessionUsers.remove(session);if (openId != null) {userSessions.remove(openId);sessionLastActiveTime.remove(openId);log.info("WebSocket连接已关闭 - openId: {}, 状态: {}", openId, status);}}/*** 处理收到的文本消息* 对于心跳消息进行特殊处理*/@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String openId = sessionUsers.get(session);String payload = message.getPayload();try {// 尝试解析为JSONJsonNode jsonNode = objectMapper.readTree(payload);// 检查是否是心跳消息if (jsonNode.has("type") && "ping".equals(jsonNode.get("type").asText())) {// 更新最后活跃时间if (openId != null) {sessionLastActiveTime.put(openId, System.currentTimeMillis());}// 发送pong响应session.sendMessage(new TextMessage("{\"type\":\"pong\",\"time\":" + System.currentTimeMillis() + "}"));return;}} catch (Exception e) {// 不是JSON格式的消息,忽略错误继续处理}// 更新最后活跃时间if (openId != null) {sessionLastActiveTime.put(openId, System.currentTimeMillis());}log.debug("收到消息 - openId: {}, 内容: {}", openId, payload);// 在这里可以添加其他消息处理逻辑}/*** 从WebSocketSession中提取token* @param session WebSocket会话* @return token,如果不存在则返回null*/private String extractToken(WebSocketSession session) {String query = session.getUri().getQuery();if (query != null && query.contains("token=")) {String[] params = query.split("&");for (String param : params) {String[] keyValue = param.split("=");if (keyValue.length == 2 && "token".equals(keyValue[0])) {log.info("WebSocket连接已获取token - token: {}", keyValue[1]);return keyValue[1];}}}return null;}/*** 从token获取对应的openId* @param token 用户token* @return openId,如果token无效则返回null*/private String getOpenIdFromToken(String token) {if (token == null || token.isEmpty()) {return null;}try {// 从Redis中获取token对应的openIdreturn stringRedisTemplate.opsForValue().get(WECHAT_KEY + token);} catch (Exception e) {log.error("从Redis获取token信息异常 - token: {}, 错误: {}", token, e.getMessage());return null;}}/*** 发送支付成功的通知给所有连接的客户端* @param message 消息体*/public void sendPaymentSuccessNotification(String message) {for (WebSocketSession session : sessions) {try {// 通过 WebSocket 向每个客户端发送消息session.sendMessage(new TextMessage(message));} catch (IOException e) {log.error("发送支付成功通知失败", e);}}}/*** 向指定用户发送消息* @param openId 用户的openId* @param message 消息内容* @return 是否发送成功*/public boolean sendMessageToUser(String openId, String message) {WebSocketSession session = userSessions.get(openId);if (session != null && session.isOpen()) {try {session.sendMessage(new TextMessage(message));log.info("消息已发送给用户 - openId: {}", openId);return true;} catch (IOException e) {log.error("发送消息给用户失败 - openId: {}", openId, e);return false;}} else {log.info("用户未通过WebSocket连接 - openId: {}", openId);return false;}}/*** 向所有用户发送心跳检测消息*/public void sendHeartbeat() {String heartbeatMsg = "{\"type\":\"heartbeat\",\"time\":" + System.currentTimeMillis() + "}";for (WebSocketSession session : sessions) {if (session.isOpen()) {try {session.sendMessage(new TextMessage(heartbeatMsg));} catch (IOException e) {log.error("发送心跳消息失败", e);}}}}
}
注意:这里发送消息给指定用户需要前端传递token,获取存储在redis中的openId(微信小程序用户标识)
3、发送消息我定义了一个定时器发送消息和心跳测试
3.1、根据自己业务封装的消息体
@ApiModel(value = "MessageVo",discriminator = "websocket的消息体")
public class MessageVo {@ApiModelProperty(value = "消息标题",dataType = "string")private String title;@ApiModelProperty(value = "消息内容",dataType = "string")private String content;@ApiModelProperty(value = "车牌号码",dataType = "string")private String plateNumber;@ApiModelProperty(value = "订单编号",dataType = "string")private String orderNumber;@ApiModelProperty(value = "创建时间",dataType = "date")private Date createTime;}
/*** 定时发送提醒消息给待过磅状态的用户* 每1分钟执行一次,提醒用户进行过磅操作*/public void sendWeighingReminder() {log.info("开始执行待过磅用户提醒任务");try {// 查询所有待过磅的订单WeighingRecords pendingQuery = new WeighingRecords();pendingQuery.setStatus(0L); // 待过磅List<WeighingRecords> pendingWeighingOrders = weighingRecordsMapper.selectWeighingRecordsList(pendingQuery);// 如果没有待过磅订单,直接返回if (pendingWeighingOrders == null || pendingWeighingOrders.isEmpty()) {log.info("没有查询到待过磅订单,跳过发送提醒");return;}log.info("查询到 {} 条待过磅订单,开始发送提醒", pendingWeighingOrders.size());int successCount = 0;// 遍历所有待过磅订单,发送提醒消息for (WeighingRecords order : pendingWeighingOrders) {// 检查是否有有效的openIdString openId = order.getOpenId();if (openId == null || openId.trim().isEmpty()) {log.warn("订单 {} 缺少有效的openId,无法发送提醒", order.getOrderNumber());continue;}// 创建消息体MessageVo messageVo = new MessageVo();messageVo.setTitle("过磅提醒");messageVo.setContent("您有一条待过磅的订单,请及时前往过磅点进行过磅操作。");messageVo.setOrderNumber(order.getOrderNumber());messageVo.setPlateNumber(order.getPlateNumber()); // 设置车牌号messageVo.setCreateTime(DateUtils.getNowDate());try {// 转换为JSON字符串String messageJson = objectMapper.writeValueAsString(messageVo);// 直接使用openId发送消息(WebSocketHandler内部会通过openId查找对应的会话)boolean sent = webSocketHandler.sendMessageToUser(openId, messageJson);if (sent) {successCount++;log.info("成功向用户 {} 发送过磅提醒消息,订单号: {}", openId, order.getOrderNumber());} else {log.info("用户 {} 未连接WebSocket,无法发送过磅提醒消息,订单号: {}", openId, order.getOrderNumber());}} catch (JsonProcessingException e) {log.error("消息序列化异常,订单号: {}, 错误: {}", order.getOrderNumber(), e.getMessage());} catch (Exception e) {log.error("发送消息异常,订单号: {}, 错误: {}", order.getOrderNumber(), e.getMessage());}}log.info("过磅提醒任务完成,共尝试: {} 条,成功: {} 条", pendingWeighingOrders.size(), successCount);} catch (Exception e) {log.error("过磅提醒任务异常: {}", e.getMessage(), e);}}/*** 定期发送心跳消息,保持WebSocket连接活跃* 每25秒执行一次,低于WebSocketConfig中设置的60秒超时时间*/public void sendHeartbeat() {log.debug("开始执行WebSocket心跳任务");try {webSocketHandler.sendHeartbeat();log.debug("WebSocket心跳消息发送完成");} catch (Exception e) {log.error("WebSocket心跳任务异常: {}", e.getMessage(), e);}}
4、由于这个管理系统是基于若依所以需要配置鉴权,否则会被拦截
这个是部分配置代码
@Beanprotected SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception{return httpSecurity// CSRF禁用,因为不使用session.csrf(csrf -> csrf.disable())// 禁用HTTP响应标头.headers((headersCustomizer) -> {headersCustomizer.cacheControl(cache -> cache.disable()).frameOptions(options -> options.sameOrigin());})// 认证失败处理类.exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler))// 基于token,所以不需要session.sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))// 注解标记允许匿名访问的url.authorizeHttpRequests((requests) -> {permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());// 对于登录login 注册register 验证码captchaImage 允许匿名访问requests.antMatchers("/login", "/register", "/captchaImage","/weiXin/login","/weiXin/returnNotify","/ws/**").permitAll()
..........}
注意:端点配置的是“/ws/order",所以在这了配置为”/ws/**“
三、小程序端的部分代码配置
注意:需要在路径上面传递token,为了后端获取openId向指定用户发送消息
这个是小程序的webSocket的地址示例:“wss://5aa7e45c.r11.cpolar.top/ws/order?token=${this.token}”