当前位置: 首页 > news >正文

WebApplicationType.REACTIVE 的webSocket

通用请求体类

@Data
@ApiModel("websocket请求消息")
public class WebSocketRequest<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 参考:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum;*/private @NotNull(message = "业务操作类型不能为空") Integer aiBroadcastEventEnum;private T data;public T getRealData(Class<T> clazz) {if (this.data == null) {return null;} else {String jsonStr = JsonUtil.toJsonStr(this.data);return (T) JsonUtil.parseObject(jsonStr, clazz);}}}

通用响应类

@ApiModel("websocket响应消息")
@Data
public class WebSocketResponse<T> implements Serializable {private static final long serialVersionUID = 1L;/*** 参考枚举:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer aiBroadcastEventEnum;private String code;private String msg;private T data;public static <T> Mono<WebSocketResponse<T>> ok(Integer eventEnum, T data) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());response.setData(data);return Mono.just(response);}public static Mono<WebSocketResponse<Void>> ok(Integer eventEnum) {WebSocketResponse<Void> response = new WebSocketResponse<Void>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(RespStatusEnum.OK.getCode());response.setMsg(RespStatusEnum.OK.getMsg());return Mono.just(response);}public static <T> Mono<WebSocketResponse<T>> fail(Integer eventEnum, RespStatusEnum status, String err) {WebSocketResponse<T> response = new WebSocketResponse<T>();response.setAiBroadcastEventEnum(eventEnum);response.setCode(status.getCode());response.setMsg(err);return Mono.just(response);}}

连接类型类

@Data
@Accessors(chain = true)
public class ConnectDTO {/*** 连接类型 参考枚举:com.mcmcnet.gacne.basic.service.common.pojo.enumeration.screen.AiBroadcastEventEnum*/private Integer type;}
  1. 配置类
@Configuration
public class WebFluxWebSocketConfig {/** 让 Spring 注入已经带依赖的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler),   // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
  1. 实现类
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {wsSessionPool.add(session);String sid = session.getId();// 处理客户端请求消息,生成响应消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服务端广播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并两个流,确保 session.send 只调用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 关闭,sessionId:{},signal:{}", session.getId(), sig);});}
}

3.处理类 aiBroadcastEventEnum 是枚举类型,根据不同的枚举类型进入不同的处理类进行处理不同的消息返回


@Component
@Slf4j
public class AiBroadcastEventHandlerDispatcher<T, R> {private final Map<Integer, AiBroadcastEventHandler<T, R>> eventMap = new HashMap<>();/** 由 Spring 注入所有事件处理器 */public AiBroadcastEventHandlerDispatcher(List<AiBroadcastEventHandler<T, R>> handlers) {handlers.forEach(h -> eventMap.put(h.aiBroadcastEvent(), h));}/*** 处理前端发来的 Payload 并把响应写回当前 session** @param session 当前 WebFlux Session* @param payload 文本 JSON* @return Mono<Void> 供调用方链式订阅*/public Mono<String> doDispatch(WebSocketSession session, String payload) {WebSocketRequest<R> webSocketRequest = JsonUtil.parseObject(payload, new TypeReference<WebSocketRequest<R>>() {});log.info("webSocketRequest:{}, sessionID:{}", webSocketRequest, session.getId());// 发送响应并记录日志return handlerRequest(webSocketRequest, session).map(JsonUtil::toJson).doOnNext(json -> log.info("准备发送: {}", json)).onErrorResume(e -> {log.error("发送异常", e);return Mono.just(JsonUtil.toJson(WebSocketResponse.fail(webSocketRequest != null ? webSocketRequest.getAiBroadcastEventEnum() : null,RespStatusEnum.INTERNAL_SERVICE_ERROR,"系统异常:" + e.getMessage())));});}/** 实际路由到具体 Handler */private Mono<WebSocketResponse<T>> handlerRequest(WebSocketRequest<R> req, WebSocketSession session) {if (ObjectUtil.isNull(req) || !eventMap.containsKey(req.getAiBroadcastEventEnum())) {return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,"aiBroadcastEventEnum not find");}try {return eventMap.get(req.getAiBroadcastEventEnum()).handler(req, session);} catch (Exception e) {log.error("websocket 处理消息异常,webSocketRequest:{}, sessionID:{}",req, session.getId(), e);return WebSocketResponse.fail(req.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR, e.getMessage());}}}
  1. 接口
public interface AiBroadcastEventHandler<T, R> {/*** 对应事件枚举值(例如 MAP_ALARM_CHARGING 的 code)*/Integer aiBroadcastEvent();/*** 执行处理逻辑,并返回响应,最终由 dispatcher 推送给前端** @param webSocketRequest 请求体* @param session          当前连接* @return Mono<WebSocketResponse<T>> 最终会转成 JSON 发给前端*/Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);/*** 校验参数*/void validateParam(WebSocketRequest<R> webSocketRequest) throws ParameterException;
  1. 通用处理逻辑
public abstract class AbstractAiBroadcastEventHandler<T, R>implements AiBroadcastEventHandler<T, R> {/*** websocket 请求事件处理统一流程:参数校验 → 业务处理*/@Overridepublic Mono<WebSocketResponse<T>> handler(WebSocketRequest<R> webSocketRequest,WebSocketSession session) {try {validateParam(webSocketRequest);return doHandler(webSocketRequest, session);} catch (ParameterException e) {return WebSocketResponse.fail(webSocketRequest.getAiBroadcastEventEnum(),RespStatusEnum.INTERNAL_SERVICE_ERROR,e.getMessage());}}/*** 子类实现真正的业务逻辑:*   1. 更新本地 sessionId / Redis 映射*   2. 查询并返回最新业务数据*/public abstract Mono<WebSocketResponse<T>> doHandler(WebSocketRequest<R> webSocketRequest, WebSocketSession session);
  1. 实现类
@Component
@Slf4j
public class SubscribeFireContentHandler extends AbstractAiBroadcastEventHandler<VO, ConnectDTO> implements AiBroadcastEventHandler<VO, ConnectDTO>{@Autowiredprivate BizServiceSafeScreenClient bizServiceSafeScreenClient;@Overridepublic Integer aiBroadcastEvent() {return AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode();}@Overridepublic Mono<WebSocketResponse<VO>> doHandler(WebSocketRequest<ConnectDTO> webSocketRequest, WebSocketSession session) {log.info("SubscribeFireContentHandler doHandler start");//session订阅该订单数据ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (!AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode().equals(dto.getType())) {return Mono.error(new RespException("参数异常", RespStatusEnum.CLIENT_ERROR));}// 查询火灾站 前端拼接内容FinalResultVO<VO> xxx = 调用接口获取数据;if (xxx  != null) {Mono<WebSocketResponse<FireStationVO>> ok = WebSocketResponse.ok(AiBroadcastEventEnum.AI_FIRE_ALARM_CONTENT.getCode(), xxx);return ok;}return Mono.empty();}@Overridepublic void validateParam(WebSocketRequest<ConnectDTO> webSocketRequest) {ConnectDTO dto = webSocketRequest.getRealData(ConnectDTO.class);if (ObjUtil.isNull(dto.getType())) {throw new RespException("参数异常", RespStatusEnum.CLIENT_ERROR);}}

7.心跳

@Component
@Log4j2
public class WsHeartbeatTask {private final WsSessionPool wsSessionPool;public WsHeartbeatTask(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@PostConstructpublic void init() {log.info("WebSocket心跳任务已启动");}// 每30秒广播一个心跳消息@Scheduled(fixedRate = 30_000)public void sendHeartbeat() {String timeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);wsSessionPool.broadcast(json);}
}

文章转载自:
http://calcific.hdqtgc.cn
http://anatomical.hdqtgc.cn
http://brilliant.hdqtgc.cn
http://case.hdqtgc.cn
http://antifertility.hdqtgc.cn
http://caliology.hdqtgc.cn
http://albomycin.hdqtgc.cn
http://adherence.hdqtgc.cn
http://becalmed.hdqtgc.cn
http://aurorean.hdqtgc.cn
http://biomagnification.hdqtgc.cn
http://bordello.hdqtgc.cn
http://audibly.hdqtgc.cn
http://bristle.hdqtgc.cn
http://acumen.hdqtgc.cn
http://aluminothermics.hdqtgc.cn
http://autoindex.hdqtgc.cn
http://cavalryman.hdqtgc.cn
http://azc.hdqtgc.cn
http://alf.hdqtgc.cn
http://asiatic.hdqtgc.cn
http://abolition.hdqtgc.cn
http://ascensiontide.hdqtgc.cn
http://cecal.hdqtgc.cn
http://apartheid.hdqtgc.cn
http://adriatic.hdqtgc.cn
http://agonic.hdqtgc.cn
http://chillness.hdqtgc.cn
http://bagged.hdqtgc.cn
http://brethren.hdqtgc.cn
http://www.dtcms.com/a/281388.html

相关文章:

  • dotnet命令详解
  • linux的数据库与web服务器
  • LSTM入门案例(时间序列预测)
  • 平升智慧水务整体解决方案,大数据驱动的智慧水务,让城市供水更智能
  • 康谋分享 | 破解数据瓶颈:智能汽车合成数据架构与应用实践
  • 改进_开源证券_VCF_多尺度量价背离检测因子!
  • 【从0-1的JavaScript】第1篇:JavaScript的引入方式和基础语法
  • 第五章 管道工程 5.2 燃气管道
  • 数据库第三次作业
  • 脚手架新建Vue2/Vue3项目时,项目文件内容的区别
  • yolo-world环境配置
  • 【PCIe 总线及设备入门学习专栏 5.1.1 -- PCIe PERST# 信号的作用】
  • 关于实习的经验贴
  • eSearch识屏 · 搜索 v14.3.0
  • Redis集群搭建(主从、哨兵、读写分离)
  • netstat -tlnp | grep 5000
  • 3.创建表-demo
  • 进程的内存映像,只读区,可读写区,堆,共享库,栈详解
  • 23.将整数转换为罗马数字
  • 磁悬浮轴承的“眼睛”:位移测量核心技术深度解析
  • 【监控实战】Grafana自动登录如何实现
  • 关于tresos Studio(EB)的MCAL配置之FEE
  • dataLoader是不是一次性的
  • 文心一言4.5企业级部署实战:多模态能力与Docker容器化测评
  • 告别手动迁移:使用 PowerShell 一键导出 IIS 配置,让服务器迁移更轻松
  • LSA链路状态通告
  • QT——文件选择对话框 QFileDialog
  • Transformer是什么 - 李沐论文《Attention Is All You Need》精读
  • 内网穿透实例:在 NAT 环境下通过 FRP 配置 ThinLinc 远程桌面 实现外网登录
  • zynq串口的例子