当前位置: 首页 > news >正文

WebApplicationType.REACTIVE 的webSocket 多实例问题处理

  1. 配置类
@Configuration
public class WebFluxWebSocketConfig {/** 让 Spring 注入已经带依赖的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler),   // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
@Configuration
public class RedisPubSubConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,RedisBroadcastListener listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listener, new ChannelTopic("ws-broadcast"));return container;}
}
  1. handler
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {log.info("websocket 连接成功,sessionId:{}", session.getId());wsSessionPool.add(session);String sid = session.getId();// 处理客户端请求消息,生成响应消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服务端广播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并两个流,确保 session.send 只调用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);log.info("websocket 开始发送消息,sessionId:{}", session.getId());return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 关闭,sessionId:{},signal:{}", session.getId(), sig);});}
}

3.连接池

@Component
@Slf4j
public class WsSessionPool {/** session 实体 */private final Map<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>();/** 消息推送通道:Replay 可以避免未订阅者时失败,limit(1) 限制内存 */private final Map<String, Sinks.Many<String>> SINKS = new ConcurrentHashMap<>();/** 添加新的连接 */public void add(WebSocketSession session) {String sid = session.getId();SESSIONS.put(sid, session);// 推荐使用 replay 降低 emit 失败风险Sinks.Many<String> sink = Sinks.many().replay().limit(1);SINKS.put(sid, sink);log.info("WS 连接上线: {}, 当前连接数={}", sid, SESSIONS.size());}/** 移除连接(主动或异常关闭) */public void remove(WebSocketSession session) {removeById(session.getId());}public void removeById(String sessionId) {SESSIONS.remove(sessionId);Sinks.Many<String> sink = SINKS.remove(sessionId);if (sink != null) {sink.tryEmitComplete(); // 通知关闭}log.info("WS 连接下线: {}, 当前连接数={}", sessionId, SESSIONS.size());}/** 获取指定 session 的推送 Flux */public Flux<String> getPersonalFlux(String sessionId) {Sinks.Many<String> sink = SINKS.get(sessionId);if (sink == null) {log.warn("sessionId={} 不存在,返回空流", sessionId);return Flux.empty();}return sink.asFlux().doOnCancel(() -> {log.info("sessionId={} Flux 被取消订阅", sessionId);removeById(sessionId);}).doOnTerminate(() -> {log.info("sessionId={} Flux 被终止", sessionId);removeById(sessionId);});}/** 广播推送消息到所有连接 */public void broadcast(String json) {for (Map.Entry<String, Sinks.Many<String>> entry : SINKS.entrySet()) {String sid = entry.getKey();Sinks.Many<String> sink = entry.getValue();Sinks.EmitResult result = sink.tryEmitNext(json);if (result.isFailure()) {log.warn("广播失败 sid={}, result={}, 自动移除连接", sid, result);removeById(sid);}}log.info("广播成功,消息: {}, 当前在线连接: {}", json, SINKS.size());}
}
  1. 心跳机制
@Component
@Log4j2
public class WsHeartbeatTask {private final WsSessionPool wsSessionPool;public WsHeartbeatTask(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@PostConstructpublic void init() {log.info("WebSocket心跳任务已启动");}// 每30秒广播一个心跳消息@Scheduled(fixedRate = 30_000)public void sendHeartbeat() {String timeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);wsSessionPool.broadcast(json);}
}
  1. listener
@Component
@Log4j2
public class RedisBroadcastListener implements MessageListener {private final WsSessionPool wsSessionPool;public RedisBroadcastListener(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@Overridepublic void onMessage(Message message, byte[] pattern) {String body = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Redis广播监听器收到消息:{}", body);wsSessionPool.broadcast(body);}
}
  1. 监听kafka 转成redis发送websocket消息
@Component
@Log4j2
public class MapAlarmMsgHandler implements AiDetectionScreenMessageHandler<List<FaultAlarmVO>> {// 用于发布消息@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic Integer messageType() {return AiBroadcastEventEnum.MAP_ALARM_CHARGING.getCode();}@Overridepublic void handler(AiDetectionMessageRedisEvent event) {log.info("FaultAlarmMsgHandler:{}", event);try {List<FaultAlarmVO> vo = getSource(event);Mono<WebSocketResponse<List<FaultAlarmVO>>> response = WebSocketResponse.ok(AiBroadcastEventEnum.MAP_ALARM_CHARGING.getCode(), vo);WebSocketResponse<List<FaultAlarmVO>> block = response.block();String json = JSONUtil.toJsonStr(block);redisTemplate.convertAndSend("ws-broadcast", json);log.info("FaultAlarmMsgHandler-广播消息转发到 Redis:{}", json);} catch (Exception e) {log.error("消息处理error:{}", event, e);}}@Overridepublic List<FaultAlarmVO> getSource(AiDetectionMessageRedisEvent event) {return (List<FaultAlarmVO>) event.getContent();}
}

文章转载自:
http://ate.zzyjnl.cn
http://balancer.zzyjnl.cn
http://carbamoyl.zzyjnl.cn
http://chasmal.zzyjnl.cn
http://bantering.zzyjnl.cn
http://another.zzyjnl.cn
http://amphiploid.zzyjnl.cn
http://adat.zzyjnl.cn
http://autocollimator.zzyjnl.cn
http://chiaroscurist.zzyjnl.cn
http://alkylic.zzyjnl.cn
http://amianthus.zzyjnl.cn
http://astrometry.zzyjnl.cn
http://bumble.zzyjnl.cn
http://asbolite.zzyjnl.cn
http://boxer.zzyjnl.cn
http://bureaux.zzyjnl.cn
http://bierhaus.zzyjnl.cn
http://acini.zzyjnl.cn
http://arista.zzyjnl.cn
http://altorilievo.zzyjnl.cn
http://ceremonious.zzyjnl.cn
http://authorware.zzyjnl.cn
http://anorthic.zzyjnl.cn
http://affine.zzyjnl.cn
http://caducei.zzyjnl.cn
http://chez.zzyjnl.cn
http://branchiae.zzyjnl.cn
http://baal.zzyjnl.cn
http://boric.zzyjnl.cn
http://www.dtcms.com/a/280867.html

相关文章:

  • MySQL数据库----约束
  • C# 构建动态查询表达式(含查询、排序、分页)
  • C语言基础第6天:分支循环
  • Ubuntu24 辅助系统-屏幕键盘的back按键在网页文本框删除不正常的问题解决方法
  • CentOS7 Docker安装MySQL全过程,包括配置远程连接账户
  • fastApi连接数据库
  • 如何正确分配及设置香港站群服务器IP?
  • 深入解析 Java AQS (AbstractQueuedSynchronizer) 的实现原理
  • LeetCode 3136.有效单词:遍历模拟
  • [实战] 基8 FFT/IFFT算法原理与实现(完整C代码)
  • 【每天一个知识点】多模态信息(Multimodal Information)
  • 【知识扫盲】tokenizer.json中的vocab和merges是什么?
  • 【机器学习】第二章 Python入门
  • 【Unity】MiniGame编辑器小游戏(十四)基础支持模块(游戏窗口、游戏对象、物理系统、动画系统、射线检测)
  • 数学中的教学思想
  • MySQL 8.0 OCP 1Z0-908 题目解析(24)
  • P3842 [TJOI2007] 线段
  • Sharding-JDBC 分布式事务实战指南:XA/Seata 方案解析
  • sqli-labs靶场通关笔记:第18-19关 HTTP头部注入
  • 【C++】初识C++(1)
  • 课题学习笔记1——文本问答与信息抽取关键技术研究论文阅读(用于无结构化文本问答的文本生成技术)
  • Java 大视界 -- Java 大数据机器学习模型在金融风险传染路径分析与防控策略制定中的应用(347)
  • QT——QList的详细讲解
  • Redis的下载安装+基础操作+redis客户端的安装
  • 使用 1Panel PHP 运行环境部署 WordPress
  • 辨析git reset三种模式以及和git revert的区别:回退到指定版本和撤销指定版本的操作
  • 零样本轴承故障诊断SC - GAN模型
  • 【PCIe 总线及设备入门学习专栏 5.1.2 -- PCIe EP core_rst_n 与 app_rst_n】
  • React-router
  • 未来大模型在中小型企业如何实现普及