站内信通知功能websoket+锁+重试机制+多线程
项目背景 在当前系统中,有周期性的任务,每天会执行一次,执行完毕就去创建一条执行日志
目前是通过定时任务自动创建的执行日志,通过定时任务去查全部的数据库,查出每一个任务,查出每一个周期性任务 查出当前时间和周期任务的执行时间是否相同 如果相同就创建一条执行日志
因为执行日志是自动创建的,所以执行人要确保去执行任务 ,就要引入站内信的功能,去通知工作人员去执行任务。采用弹窗提醒 使用websoket
1. 通知触发机制
当前系统有两个主要的通知触发点:
任务提醒通知触发(普通通知)
1. 1. genCycleTask 定时任务每分钟执行一次
@Scheduled(cron = "0 0/1 * * * ?")public void genCycleTask() {log.info("开始执行genCycleTask");DateTime now = DateTime.now();//获取所有已发布的工作流Workflow query = new Workflow();query.setStatus(WorkflowStatus.PUBED.getType());query.setType(WorkflowType.PATROL.getType());query.setExecType(WorkflowExecType.CYCLE.getType());List<Workflow> workflows = workflowService.selectWorkflowList(query);workflows.parallelStream().forEach(workflow -> {String workflowLockKey = "workflow_" + workflow.getId();AtomicBoolean lock = taskLocks.computeIfAbsent(workflowLockKey, k -> new AtomicBoolean(false));// 尝试获取锁if (lock.compareAndSet(false, true)) {try {processWorkflowTask(workflow, now);} catch (Exception e) {log.error("处理工作流 {} 时发生异常", workflow.getId(), e);} finally {// 释放锁lock.set(false);taskLocks.remove(workflowLockKey, lock);}} else {log.debug("工作流 {} 当前正在处理中,跳过本次执行", workflow.getId());}});}
2. 2.查询所有已发布的周期性巡检工作流
对每个工作流,检查是否到达执行时间(创建时间+偏移时间是否匹配当前时间)
/*** 处理工作流任务的具体逻辑*/private void processWorkflowTask(Workflow workflow, DateTime now) {try {int pushCycle = workflow.getPushCycle();String pushUnit = workflow.getPushUnit();Date firstExecTime = workflow.getFirstExecTime();if (firstExecTime == null) {log.warn("工作流 {} 的首次执行时间为空,跳过处理", workflow.getId());return;}// 计算间隔天数int intervalDays = 0;if ("day".equals(pushUnit)) {intervalDays = pushCycle;} else if ("week".equals(pushUnit)) {intervalDays = pushCycle * 7;}long day = DateUtil.betweenDay(firstExecTime, now, false);if (Convert.toInt(day) % intervalDays == 0) {// 获取当前时间并计算30分钟后的时间DateTime thirtyMinutesLater = now.offset(DateField.MINUTE, 30);// 获取首次执行时间的时分秒格式String firstExecTimeStr = DateTime.of(firstExecTime).toString("HHmmss");String thirtyMinutesLaterStr = thirtyMinutesLater.toString("HHmmss");// 比较首次执行时间是否与30分钟后的时间匹配if (firstExecTimeStr.equals(thirtyMinutesLaterStr)) {// 创建任务日志并发送通知createTaskLogAndNotify(workflow);log.info("符合条件,已创建任务并发送通知: {}", workflow.getName());}}} catch (Exception e) {log.error("处理工作流任务时发生异常", e);throw e;}}
4. 4. 符合条件时,调用 createTaskLogAndNotify 方法创建任务并触发通知
/*** 创建任务执行日志并发送通知* @param workflow 工作流信息*/private void createTaskLogAndNotify(Workflow workflow) {if (workflow == null) {log.warn("工作流信息为空,无法创建任务日志和发送通知");return;}String taskLogLockKey = "task_log_" + workflow.getId() + "_" + System.currentTimeMillis();AtomicBoolean lock = taskLocks.computeIfAbsent(taskLogLockKey, k -> new AtomicBoolean(false));// 尝试获取锁if (lock.compareAndSet(false, true)) {try {// 1. 创建任务执行日志(现有逻辑)createTaskPlan(workflow);log.info("已为工作流 {} 创建任务计划", workflow.getName());// 2. 获取执行人IDLong executorId = workflow.getExecUserId();if (executorId == null) {log.warn("工作流 {} 的执行人ID为空,无法发送通知", workflow.getName());return;}// 3. 异步发送WebSocket通知taskExecutor.submit(() -> {try {sendTaskNotification(executorId, workflow.getName());} catch (Exception e) {log.error("异步发送任务通知时发生异常", e);}});} catch (Exception e) {log.error("创建任务日志并发送通知时发生异常", e);} finally {// 释放锁lock.set(false);taskLocks.remove(taskLogLockKey, lock);}} else {log.debug("工作流 {} 的任务日志创建和通知发送正在进行中,跳过本次执行", workflow.getId());}}
/*** 发送任务通知* @param userId 用户ID* @param taskName 任务名称*/private void sendTaskNotification(Long userId, String taskName) {sendNotification(userId, taskName, false);}
/*** 发送任务通知* @param userId 用户ID* @param taskName 任务名称* @param isTimeout 是否为超时通知*/private void sendNotification(Long userId, String taskName, boolean isTimeout) {if (userId == null) {log.warn("用户ID为空,无法发送通知");return;}String message = isTimeout ? "任务'" + taskName + "'已经超时,赶快巡检" : "任务'" + taskName + "'将在30分钟后执行,请准备";try {Map<String, Object> msg = new HashMap<>();msg.put("inspectionReminder", message);String url = "http://localhost:8997/ws-api/sendExecMessage?userId=" + userId;String content = JSONUtil.toJsonStr(msg);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity<String> request = new HttpEntity<>(content, headers);// 添加重试机制int maxRetries = 3;int retryCount = 0;boolean success = false;while (!success && retryCount < maxRetries) {try {restTemplate.postForObject(url, request, String.class);success = true;log.info("已发送{}{}通知给用户 {}: {}", isTimeout ? "超时" : "", taskName, userId, message);} catch (Exception e) {retryCount++;if (retryCount >= maxRetries) {log.error("发送{}{}通知给用户 {} 失败,已重试{}次", isTimeout ? "超时" : "", taskName, userId, maxRetries, e);} else {log.warn("发送通知失败,正在进行第{}次重试", retryCount);// 指数退避重试Thread.sleep(1000 * (long)Math.pow(2, retryCount - 1));}}}} catch (Exception e) {log.error("构建或发送通知过程中发生异常", e);}}
这里采用后端发送请求到另一个websoket模块的接口,是因为当时在开发阶段,发现websoket存储的在线用户的map,在站内信模块调用时永远都是0,原因应该是类加载机制导致的问题,所以采用跨模块的方式,避免让他找不到在线用户
/*** 向指定在线执行日志任务用户发送消息** @param userId 数据所属执行任务用户ID(不是单个目标用户,而是消息携带的业务归属标识)* @param content 消息内容(JSON格式字符串)*/@PostMapping("/sendExecMessage")public AjaxResult sendExecMessage(@RequestParam Long userId,@RequestBody String content) {// 直接调用 Socket 的发送方法RtcSocket.sendMsgByUserId(userId, content);return AjaxResult.success("消息已发送");}
public static void sendMsgByUserId(Long userId, String msg) {try {WEB_SOCKET_MAP.forEach((k,v)->{if(k.startsWith(userId + "_")){try {v.session.getBasicRemote().sendText(msg);} catch (IOException e) {e.printStackTrace();}}});} catch (Exception e) {e.printStackTrace();logger.error(e.getMessage());}}
根据用户id,准确的发送到对应的用户
任务超时通知触发
1. 1.
checkTimeoutTasks 定时任务每5分钟执行一次
2. 2.
查询当天该工作流的未完成任务计划
3. 3.
计算任务应该的执行时间(创建时间+30分钟)以及超时检查时间(执行时间+15分钟)
4. 4.
如果当前时间已超过超时检查时间且任务未开始执行,调用sendTimeoutNotification 发送超时通知
2. 通知发送流程
系统通过统一的 sendNotification 方法处理所有通知,具体流程如下:
1. 1.参数校验 :检查用户ID是否为空,为空则记录警告日志并返回
2. 2. 构建消息内容 :根据是否为超时通知,构建不同的消息内容
3. 3.准备HTTP请求 :构建包含消息内容的HTTP请求实体
4. 4.执行重试发送 :
- - 设置最大重试次数为3次
- 使用 restTemplate 调用WebSocket服务发送通知
- 发送成功则记录成功日志并结束
- 发送失败则进行重试,每次重试前根据指数退避策略等待一段时间
- 超过最大重试次数后记录失败日志
- 5.
异常处理 :捕获并记录整个过程中可能发生的异常
/*** 发送任务通知* @param userId 用户ID* @param taskName 任务名称* @param isTimeout 是否为超时通知*/private void sendNotification(Long userId, String taskName, boolean isTimeout) {if (userId == null) {log.warn("用户ID为空,无法发送通知");return;}String message = isTimeout ? "任务'" + taskName + "'已经超时,赶快巡检" : "任务'" + taskName + "'将在30分钟后执行,请准备";try {Map<String, Object> msg = new HashMap<>();msg.put("inspectionReminder", message);String url = "http://localhost:8997/ws-api/sendExecMessage?userId=" + userId;String content = JSONUtil.toJsonStr(msg);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);HttpEntity<String> request = new HttpEntity<>(content, headers);// 添加重试机制int maxRetries = 3;int retryCount = 0;boolean success = false;while (!success && retryCount < maxRetries) {try {restTemplate.postForObject(url, request, String.class);success = true;log.info("已发送{}{}通知给用户 {}: {}", isTimeout ? "超时" : "", taskName, userId, message);} catch (Exception e) {retryCount++;if (retryCount >= maxRetries) {log.error("发送{}{}通知给用户 {} 失败,已重试{}次", isTimeout ? "超时" : "", taskName, userId, maxRetries, e);} else {log.warn("发送通知失败,正在进行第{}次重试", retryCount);// 指数退避重试Thread.sleep(1000 * (long)Math.pow(2, retryCount - 1));}}}} catch (Exception e) {log.error("构建或发送通知过程中发生异常", e);}}
这里的重试机制,对于高并发高压力的情况下,并不完美,应该引入消息队列采用专业的重试机制和采用异步重试,这里的重试现在是同步阻塞的
3. 通知类型详解 任务提醒通知
- 触发条件 :任务将在30分钟后执行
- 消息内容 : 任务'xxx'将在30分钟后执行,请准备
- 发送时机 :任务创建时异步发送 任务超时通知
- 触发条件 :任务已超过执行时间15分钟且未开始执行
- 消息内容 : 任务'xxx'已经超时,赶快巡检
- 发送时机 :每5分钟检查一次,发现超时任务时发送
websoket
当前项目是建立在websoket的背景下进行的通信
在前端,用户登录完毕,就进行websoket链接
function handleLogin() {proxy.$refs.loginRef.validate(valid => {if (valid) {loading.value = true; Cookies.set("username", loginForm.value.username, { expires: 30 });Cookies.set("password", encrypt(loginForm.value.password), { expires: 30 }); // 调用action的登录方法userStore.login(loginForm.value).then((token) => { router.push({ path: "/"});AndroidInterface.login(token)}).catch(() => {loading.value = false; });}});
}
后端进行配置
链接进行相关逻辑
public static final Map<String, RtcSocket> WEB_SOCKET_MAP = Collections.synchronizedMap(new HashMap<>());
用一个map放在线用户