WebApplicationType.REACTIVE 模式下 WebSocket 多实例问题的处理
- 配置类
@Configuration
public class WebFluxWebSocketConfig {/** 让 Spring 注入已经带依赖的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler), // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
/**
 * Redis pub/sub wiring: subscribes the broadcast listener to the
 * {@code ws-broadcast} channel.
 */
@Configuration
public class RedisPubSubConfig {

    /**
     * Builds the listener container and registers {@code listener} on the
     * {@code ws-broadcast} topic.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param listener          listener that receives messages published to the topic
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisBroadcastListener listener) {
        ChannelTopic broadcastTopic = new ChannelTopic("ws-broadcast");

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(listener, broadcastTopic);
        return listenerContainer;
    }
}
- handler
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {log.info("websocket 连接成功,sessionId:{}", session.getId());wsSessionPool.add(session);String sid = session.getId();// 处理客户端请求消息,生成响应消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服务端广播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并两个流,确保 session.send 只调用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);log.info("websocket 开始发送消息,sessionId:{}", session.getId());return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 关闭,sessionId:{},signal:{}", session.getId(), sig);});}
}
- 连接池
@Component
@Slf4j
public class WsSessionPool {/** session 实体 */private final Map<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>();/** 消息推送通道:Replay 可以避免未订阅者时失败,limit(1) 限制内存 */private final Map<String, Sinks.Many<String>> SINKS = new ConcurrentHashMap<>();/** 添加新的连接 */public void add(WebSocketSession session) {String sid = session.getId();SESSIONS.put(sid, session);// 推荐使用 replay 降低 emit 失败风险Sinks.Many<String> sink = Sinks.many().replay().limit(1);SINKS.put(sid, sink);log.info("WS 连接上线: {}, 当前连接数={}", sid, SESSIONS.size());}/** 移除连接(主动或异常关闭) */public void remove(WebSocketSession session) {removeById(session.getId());}public void removeById(String sessionId) {SESSIONS.remove(sessionId);Sinks.Many<String> sink = SINKS.remove(sessionId);if (sink != null) {sink.tryEmitComplete(); // 通知关闭}log.info("WS 连接下线: {}, 当前连接数={}", sessionId, SESSIONS.size());}/** 获取指定 session 的推送 Flux */public Flux<String> getPersonalFlux(String sessionId) {Sinks.Many<String> sink = SINKS.get(sessionId);if (sink == null) {log.warn("sessionId={} 不存在,返回空流", sessionId);return Flux.empty();}return sink.asFlux().doOnCancel(() -> {log.info("sessionId={} Flux 被取消订阅", sessionId);removeById(sessionId);}).doOnTerminate(() -> {log.info("sessionId={} Flux 被终止", sessionId);removeById(sessionId);});}/** 广播推送消息到所有连接 */public void broadcast(String json) {for (Map.Entry<String, Sinks.Many<String>> entry : SINKS.entrySet()) {String sid = entry.getKey();Sinks.Many<String> sink = entry.getValue();Sinks.EmitResult result = sink.tryEmitNext(json);if (result.isFailure()) {log.warn("广播失败 sid={}, result={}, 自动移除连接", sid, result);removeById(sid);}}log.info("广播成功,消息: {}, 当前在线连接: {}", json, SINKS.size());}
}
- 心跳机制
/**
 * Scheduled task that broadcasts a heartbeat ("ping") message to every
 * WebSocket connection held by the session pool.
 */
@Component
@Log4j2
public class WsHeartbeatTask {

    /**
     * Timestamp layout used in the heartbeat payload. {@code DateTimeFormatter}
     * is immutable and thread-safe, so one shared instance replaces building a
     * new formatter on every tick.
     */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final WsSessionPool wsSessionPool;

    /**
     * @param wsSessionPool pool through which the heartbeat is broadcast
     */
    public WsHeartbeatTask(WsSessionPool wsSessionPool) {
        this.wsSessionPool = wsSessionPool;
    }

    /** Logs that the heartbeat task has been initialised. */
    @PostConstruct
    public void init() {
        log.info("WebSocket心跳任务已启动");
    }

    /**
     * Broadcasts a heartbeat message every 30 seconds. The payload is a JSON
     * object with {@code type} set to {@code ping} and the current local time
     * as {@code timestamp}.
     */
    @Scheduled(fixedRate = 30_000)
    public void sendHeartbeat() {
        String timeStr = LocalDateTime.now().format(TIMESTAMP_FORMAT);
        String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);
        wsSessionPool.broadcast(json);
    }
}
- listener
@Component
@Log4j2
public class RedisBroadcastListener implements MessageListener {private final WsSessionPool wsSessionPool;public RedisBroadcastListener(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@Overridepublic void onMessage(Message message, byte[] pattern) {String body = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Redis广播监听器收到消息:{}", body);wsSessionPool.broadcast(body);}
}
- 监听 Kafka 消息,并通过 Redis 发布/订阅转发为 WebSocket 广播
@Component
@Log4j2
public class MapAlarmMsgHandler implements AiDetectionScreenMessageHandler<List<FaultAlarmVO>> {// 用于发布消息@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic Integer messageType() {return AiBroadcastEventEnum.MAP_ALARM_CHARGING.getCode();}@Overridepublic void handler(AiDetectionMessageRedisEvent event) {log.info("FaultAlarmMsgHandler:{}", event);try {List<FaultAlarmVO> vo = getSource(event);Mono<WebSocketResponse<List<FaultAlarmVO>>> response = WebSocketResponse.ok(AiBroadcastEventEnum.MAP_ALARM_CHARGING.getCode(), vo);WebSocketResponse<List<FaultAlarmVO>> block = response.block();String json = JSONUtil.toJsonStr(block);redisTemplate.convertAndSend("ws-broadcast", json);log.info("FaultAlarmMsgHandler-广播消息转发到 Redis:{}", json);} catch (Exception e) {log.error("消息处理error:{}", event, e);}}@Overridepublic List<FaultAlarmVO> getSource(AiDetectionMessageRedisEvent event) {return (List<FaultAlarmVO>) event.getContent();}
}