当前位置: 首页 > news >正文

站内信通知功能websoket+锁+重试机制+多线程

项目背景  在当前系统中,有周期性的任务,每天会执行一次,执行完毕就去创建一条执行日志

目前是通过定时任务自动创建的执行日志,通过定时任务去查全部的数据库,查出每一个任务,查出每一个周期性任务 查出当前时间和周期任务的执行时间是否相同 如果相同就创建一条执行日志

因为执行日志是自动创建的,所以执行人要确保去执行任务 ,就要引入站内信的功能,去通知工作人员去执行任务。采用弹窗提醒 使用websoket

1. 通知触发机制


当前系统有两个主要的通知触发点:


任务提醒通知触发(普通通知)


1. 1. genCycleTask 定时任务每分钟执行一次

    @Scheduled(cron = "0 0/1 * * * ?")public void genCycleTask() {log.info("开始执行genCycleTask");DateTime now = DateTime.now();//获取所有已发布的工作流Workflow query = new Workflow();query.setStatus(WorkflowStatus.PUBED.getType());query.setType(WorkflowType.PATROL.getType());query.setExecType(WorkflowExecType.CYCLE.getType());List<Workflow> workflows = workflowService.selectWorkflowList(query);workflows.parallelStream().forEach(workflow -> {String workflowLockKey = "workflow_" + workflow.getId();AtomicBoolean lock = taskLocks.computeIfAbsent(workflowLockKey, k -> new AtomicBoolean(false));// 尝试获取锁if (lock.compareAndSet(false, true)) {try {processWorkflowTask(workflow, now);} catch (Exception e) {log.error("处理工作流 {} 时发生异常", workflow.getId(), e);} finally {// 释放锁lock.set(false);taskLocks.remove(workflowLockKey, lock);}} else {log.debug("工作流 {} 当前正在处理中,跳过本次执行", workflow.getId());}});}


2. 2.查询所有已发布的周期性巡检工作流

 对每个工作流,检查是否到达执行时间(创建时间+偏移时间是否匹配当前时间)

   /*** 处理工作流任务的具体逻辑*/private void processWorkflowTask(Workflow workflow, DateTime now) {try {int pushCycle = workflow.getPushCycle();String pushUnit = workflow.getPushUnit();Date firstExecTime = workflow.getFirstExecTime();if (firstExecTime == null) {log.warn("工作流 {} 的首次执行时间为空,跳过处理", workflow.getId());return;}// 计算间隔天数int intervalDays = 0;if ("day".equals(pushUnit)) {intervalDays = pushCycle;} else if ("week".equals(pushUnit)) {intervalDays = pushCycle * 7;}long day = DateUtil.betweenDay(firstExecTime, now, false);if (Convert.toInt(day) % intervalDays == 0) {// 获取当前时间并计算30分钟后的时间DateTime thirtyMinutesLater = now.offset(DateField.MINUTE, 30);// 获取首次执行时间的时分秒格式String firstExecTimeStr = DateTime.of(firstExecTime).toString("HHmmss");String thirtyMinutesLaterStr = thirtyMinutesLater.toString("HHmmss");// 比较首次执行时间是否与30分钟后的时间匹配if (firstExecTimeStr.equals(thirtyMinutesLaterStr)) {// 创建任务日志并发送通知createTaskLogAndNotify(workflow);log.info("符合条件,已创建任务并发送通知: {}", workflow.getName());}}} catch (Exception e) {log.error("处理工作流任务时发生异常", e);throw e;}}


4. 4. 符合条件时,调用 createTaskLogAndNotify 方法创建任务并触发通知

    /*** 创建任务执行日志并发送通知* @param workflow 工作流信息*/private void createTaskLogAndNotify(Workflow workflow) {if (workflow == null) {log.warn("工作流信息为空,无法创建任务日志和发送通知");return;}String taskLogLockKey = "task_log_" + workflow.getId() + "_" + System.currentTimeMillis();AtomicBoolean lock = taskLocks.computeIfAbsent(taskLogLockKey, k -> new AtomicBoolean(false));// 尝试获取锁if (lock.compareAndSet(false, true)) {try {// 1. 创建任务执行日志(现有逻辑)createTaskPlan(workflow);log.info("已为工作流 {} 创建任务计划", workflow.getName());// 2. 获取执行人IDLong executorId = workflow.getExecUserId();if (executorId == null) {log.warn("工作流 {} 的执行人ID为空,无法发送通知", workflow.getName());return;}// 3. 异步发送WebSocket通知taskExecutor.submit(() -> {try {sendTaskNotification(executorId, workflow.getName());} catch (Exception e) {log.error("异步发送任务通知时发生异常", e);}});} catch (Exception e) {log.error("创建任务日志并发送通知时发生异常", e);} finally {// 释放锁lock.set(false);taskLocks.remove(taskLogLockKey, lock);}} else {log.debug("工作流 {} 的任务日志创建和通知发送正在进行中,跳过本次执行", workflow.getId());}}

   /*** 发送任务通知* @param userId 用户ID* @param taskName 任务名称*/private void sendTaskNotification(Long userId, String taskName) {sendNotification(userId, taskName, false);}
    /*** 发送任务通知* @param userId 用户ID* @param taskName 任务名称* @param isTimeout 是否为超时通知*/private void sendNotification(Long userId, String taskName, boolean isTimeout) {if (userId == null) {log.warn("用户ID为空,无法发送通知");return;}String message = isTimeout ? "任务'" + taskName + "'已经超时,赶快巡检" : "任务'" + taskName + "'将在30分钟后执行,请准备";try {Map<String, Object> msg = new HashMap<>();msg.put("inspectionReminder", message);String url = "http://localhost:8997/ws-api/sendExecMessage?userId=" + userId;String content = JSONUtil.toJsonStr(msg);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity<String> request = new HttpEntity<>(content, headers);// 添加重试机制int maxRetries = 3;int retryCount = 0;boolean success = false;while (!success && retryCount < maxRetries) {try {restTemplate.postForObject(url, request, String.class);success = true;log.info("已发送{}{}通知给用户 {}: {}", isTimeout ? "超时" : "", taskName, userId, message);} catch (Exception e) {retryCount++;if (retryCount >= maxRetries) {log.error("发送{}{}通知给用户 {} 失败,已重试{}次", isTimeout ? "超时" : "", taskName, userId, maxRetries, e);} else {log.warn("发送通知失败,正在进行第{}次重试", retryCount);// 指数退避重试Thread.sleep(1000 * (long)Math.pow(2, retryCount - 1));}}}} catch (Exception e) {log.error("构建或发送通知过程中发生异常", e);}}

这里采用后端发送请求到另一个websoket模块的接口,是因为当时在开发阶段,发现websoket存储的在线用户的map,在站内信模块调用时永远都是0,原因应该是类加载机制导致的问题,所以采用跨模块的方式,避免让他找不到在线用户

  /*** 向指定在线执行日志任务用户发送消息** @param userId 数据所属执行任务用户ID(不是单个目标用户,而是消息携带的业务归属标识)* @param content 消息内容(JSON格式字符串)*/@PostMapping("/sendExecMessage")public AjaxResult sendExecMessage(@RequestParam Long userId,@RequestBody String content) {// 直接调用 Socket 的发送方法RtcSocket.sendMsgByUserId(userId, content);return AjaxResult.success("消息已发送");}
    public static void sendMsgByUserId(Long userId, String msg) {try {WEB_SOCKET_MAP.forEach((k,v)->{if(k.startsWith(userId + "_")){try {v.session.getBasicRemote().sendText(msg);} catch (IOException e) {e.printStackTrace();}}});} catch (Exception e) {e.printStackTrace();logger.error(e.getMessage());}}

根据用户id,准确的发送到对应的用户

任务超时通知触发


1. 1.
checkTimeoutTasks 定时任务每5分钟执行一次
2. 2.
查询当天该工作流的未完成任务计划
3. 3.
计算任务应该的执行时间(创建时间+30分钟)以及超时检查时间(执行时间+15分钟)
4. 4.
如果当前时间已超过超时检查时间且任务未开始执行,调用sendTimeoutNotification 发送超时通知


2. 通知发送流程


系统通过统一的 sendNotification 方法处理所有通知,具体流程如下:

1. 1.参数校验 :检查用户ID是否为空,为空则记录警告日志并返回


2. 2. 构建消息内容 :根据是否为超时通知,构建不同的消息内容


3. 3.准备HTTP请求 :构建包含消息内容的HTTP请求实体


4. 4.执行重试发送 :

- - 设置最大重试次数为3次
- 使用 restTemplate 调用WebSocket服务发送通知
- 发送成功则记录成功日志并结束
- 发送失败则进行重试,每次重试前根据指数退避策略等待一段时间
- 超过最大重试次数后记录失败日志
- 5.
异常处理 :捕获并记录整个过程中可能发生的异常

    /*** 发送任务通知* @param userId 用户ID* @param taskName 任务名称* @param isTimeout 是否为超时通知*/private void sendNotification(Long userId, String taskName, boolean isTimeout) {if (userId == null) {log.warn("用户ID为空,无法发送通知");return;}String message = isTimeout ? "任务'" + taskName + "'已经超时,赶快巡检" : "任务'" + taskName + "'将在30分钟后执行,请准备";try {Map<String, Object> msg = new HashMap<>();msg.put("inspectionReminder", message);String url = "http://localhost:8997/ws-api/sendExecMessage?userId=" + userId;String content = JSONUtil.toJsonStr(msg);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity<String> request = new HttpEntity<>(content, headers);// 添加重试机制int maxRetries = 3;int retryCount = 0;boolean success = false;while (!success && retryCount < maxRetries) {try {restTemplate.postForObject(url, request, String.class);success = true;log.info("已发送{}{}通知给用户 {}: {}", isTimeout ? "超时" : "", taskName, userId, message);} catch (Exception e) {retryCount++;if (retryCount >= maxRetries) {log.error("发送{}{}通知给用户 {} 失败,已重试{}次", isTimeout ? "超时" : "", taskName, userId, maxRetries, e);} else {log.warn("发送通知失败,正在进行第{}次重试", retryCount);// 指数退避重试Thread.sleep(1000 * (long)Math.pow(2, retryCount - 1));}}}} catch (Exception e) {log.error("构建或发送通知过程中发生异常", e);}}

这里的重试机制,对于高并发高压力的情况下,并不完美,应该引入消息队列采用专业的重试机制和采用异步重试,这里的重试现在是同步阻塞的

3. 通知类型详解 任务提醒通知


- 触发条件 :任务将在30分钟后执行
- 消息内容 : 任务'xxx'将在30分钟后执行,请准备
- 发送时机 :任务创建时异步发送 任务超时通知
- 触发条件 :任务已超过执行时间15分钟且未开始执行
- 消息内容 : 任务'xxx'已经超时,赶快巡检
- 发送时机 :每5分钟检查一次,发现超时任务时发送

websoket

当前项目是建立在websoket的背景下进行的通信 

在前端,用户登录完毕,就进行websoket链接

function handleLogin() {proxy.$refs.loginRef.validate(valid => {if (valid) {loading.value = true; Cookies.set("username", loginForm.value.username, { expires: 30 });Cookies.set("password", encrypt(loginForm.value.password), { expires: 30 }); // 调用action的登录方法userStore.login(loginForm.value).then((token) => { router.push({ path: "/"});AndroidInterface.login(token)}).catch(() => {loading.value = false; });}});
}

后端进行配置

链接进行相关逻辑

    public static final Map<String, RtcSocket> WEB_SOCKET_MAP = Collections.synchronizedMap(new HashMap<>());

用一个map放在线用户

http://www.dtcms.com/a/390690.html

相关文章:

  • Vue 3 <script setup> 语法详解
  • Redis三种服务架构详解:主从复制、哨兵模式与Cluster集群
  • 复习1——IP网络基础
  • MATLAB中借助pdetool 实现有限元求解Possion方程
  • string::c_str()写入导致段错误?const指针的只读特性与正确用法
  • 深度解析 CopyOnWriteArrayList:并发编程中的读写分离利器
  • 直接看 rstudio里面的 rds 数据 无法看到 expr 表达矩阵的详细数据 ,有什么办法呢
  • 【示例】通义千问Qwen大模型解析本地pdf文档,转换成markdown格式文档
  • 企业级容器技术Docker 20250919总结
  • 微信小程序-隐藏自定义 tabbar
  • leetcode15.三数之和
  • 强化学习Gym库的常用API
  • ✅ Python微博舆情分析系统 Flask+SnowNLP情感分析 词云可视化 爬虫大数据 爬虫+机器学习+可视化
  • 红队渗透实战
  • 基于MATLAB的NSCT(非下采样轮廓波变换)实现
  • 创建vue3项目,npm install后,运行报错,已解决
  • 设计模式(C++)详解—外观模式(1)
  • pnpm 进阶配置:依赖缓存优化、工作区搭建与镜像管理
  • gitlab:从CentOS 7.9迁移至Ubuntu 24.04.2(版本17.2.2-ee)
  • 有哪些适合初学者的Java项目?
  • 如何开始学习Java编程?
  • 【项目实战 Day3】springboot + vue 苍穹外卖系统(菜品模块 完结)
  • 华为 ai 机考 编程题解答
  • Docker多容器通过卷共享 R 包目录
  • 【保姆级教程】MasterGo MCP + Cursor 一键实现 UI 设计稿还原
  • Unity 性能优化 之 理论基础 (Culling剔除 | Simplization简化 | Batching合批)
  • react+andDesign+vite+ts从零搭建后台管理系统
  • No007:构建生态通道——如何让DeepSeek更贴近生产与生活的真实需求
  • 力扣Hot100--206.反转链表
  • Java 生态监控体系实战:Prometheus+Grafana+SkyWalking 整合全指南(三)