中州养老:Websocket实现报警通知
需求分析
原型图
通知方式:
1)报警生效时间:报警规则的生效时间,报警规则只在生效时间内才会检查监控数据是否需要报警;
2)报警沉默周期:指报警发生后如果未恢复正常,重复发送报警通知的时间间隔; 报警方式:
1)当触发报警规则时,则发送消息通知,
通知对象:设备数据类型=老人异常数据时,通知老人对应的护理员;设备数据类型=设备异常数据时,通知后勤部维修工;
渠道:站内信(见消息通知模块原型图)、短信;
下面这个就是刚刚创建的报警规则查询列表
其中在操作中可以处理报警规则(删除,编辑,启用禁用)
表结构
详细报警规则,如下图:
监测的产品为睡眠监测带,物模型为心率,过滤的是该产品下的所有设备
报警类型为老人异常数据(设备报警通知老人绑定的护理员和超级管理员)
持续周期:
持续1个周期(1周期=1分钟):表示触发报警之后,马上会保存报警数据
持续3个周期(1周期=1分钟):表示触发报警之后,连续三次都是异常数据才会保存报警数据
依此类推
阈值为65,运算符为<:表示采集的心率数据如果小于65就触发报警
沉默周期为5分钟,已经保存报警数据之后,如果后面有连续报警,5分钟之后再触发报警规则
报警生效时段为00:00:00~23:59:59:表示任意时段都会采集数据
案例二:
报警规则如下图:
监测的产品为烟雾报警器,物模型为温度,过滤的是该产品下的全部设备
报警类型为设备异常数据(设备报警通知行政和超级管理员)
持续周期为持续1个周期(1后期=1分钟):表示触发报警之后,马上会保存报警数据
阈值为55,运算符为 >=表示采集的室内温度数据大于等于55就触发报警
沉默周期为5分钟,已经保存报警数据之后,如果后面有连续报警,5分钟之后再触发报警规则
报警生效时段为00:00:00~23:59:59:表示任意时段都会采集数据
当设备中的数据触发了报警规则之后,需要及时提醒相关人员来进行处理,为了更快的让相对应的负责人收到消息,一旦监测到有异常数据,可以在当前后台管理系统中进行通知,如下效果:
Websocket介绍
WebSocket 是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工通信——浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接, 并进行双向数据传输。 HTTP协议和WebSocket协议对比:
HTTP是短连接
WebSocket是长连接
HTTP通信是单向的,基于请求响应模式
WebSocket支持双向通信
HTTP和WebSocket底层都是TCP连接
WebSocket缺点: 服务器长期维护长连接需要一定的成本 各个浏览器支持程度不一
WebSocket 是长连接,受网络限制比较大,需要处理好重连
结论:WebSocket并不能完全取代HTTP,它只适合在特定的场景下使用
适用场景:实况更新
功能实现:
第一步:导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
第二步:定义WebSocket服务和注册WebSocket
package com.zzyl.nursing.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {/*** 注册基于@ServerEndpoint声明的Websocket Endpoint* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}}
// 包声明,定义当前类所在的包路径
package com.zzyl.nursing.config;// 导入工具类 - 集合工具
import cn.hutool.core.collection.CollUtil;
// 导入工具类 - 对象工具
import cn.hutool.core.util.ObjectUtil;
// 导入工具类 - JSON工具
import cn.hutool.json.JSONUtil;
// 导入自定义异常类
import com.zzyl.common.exception.base.BaseException;
// 导入报警通知值对象
import com.zzyl.nursing.vo.AlertNotifyVo;
// 导入Lombok的日志注解
import lombok.extern.slf4j.Slf4j;
// 导入Spring组件注解
import org.springframework.stereotype.Component;
// 导入Spring集合工具类
import org.springframework.util.CollectionUtils;
// 导入WebSocket启用注解
import org.springframework.web.socket.config.annotation.EnableWebSocket;// 导入WebSocket相关注解和类
import javax.websocket.*;
// 导入路径参数注解
import javax.websocket.server.PathParam;
// 导入WebSocket服务端点注解
import javax.websocket.server.ServerEndpoint;
// 导入IO异常类
import java.io.IOException;
// 导入集合类
import java.util.Collection;
// 导入HashMap类
import java.util.HashMap;
// 导入List接口
import java.util.List;
// 导入Map接口
import java.util.Map;// Lombok注解,自动生成日志记录器
@Slf4j
// Spring注解,标识该类为组件,由Spring容器管理
@Component
// Spring注解,启用WebSocket功能
@EnableWebSocket
// WebSocket注解,定义WebSocket服务端点路径,{sid}为路径参数
@ServerEndpoint("/ws/{sid}")
public class WebSocketServer {// 静态Map,用于存储所有WebSocket会话,key为session id,value为Session对象private static Map<String, Session> sessionMap = new HashMap<>();/*** 连接建立时触发** @param session WebSocket会话对象* @param sid 路径参数,客户端标识*/@OnOpen // WebSocket注解,标识该方法在连接建立时调用public void onOpen(Session session, @PathParam("sid") String sid) {// 记录连接建立的日志信息log.info("有客户端连接到了服务器 , {}", sid);// 将客户端会话存储到Map中sessionMap.put(sid, session);}/*** 服务端接收到消息时触发** @param session WebSocket会话对象* @param message 客户端发送的消息内容* @param sid 路径参数,客户端标识*/@OnMessage // WebSocket注解,标识该方法在接收到消息时调用public void onMessage(Session session, String message, @PathParam("sid") String sid) {// 记录接收到的消息日志log.info("接收到了客户端 {} 发来的消息 : {}", sid, message);}/*** 连接关闭时触发** @param session WebSocket会话对象* @param sid 路径参数,客户端标识*/@OnClose // WebSocket注解,标识该方法在连接关闭时调用public void onClose(Session session, @PathParam("sid") String sid) {// 输出连接断开信息到控制台System.out.println("连接断开:" + sid);// 从Map中移除该客户端的会话sessionMap.remove(sid);}/*** 通信发生错误时触发** @param session WebSocket会话对象* @param sid 路径参数,客户端标识* @param throwable 异常对象*/@OnError // WebSocket注解,标识该方法在发生错误时调用public void onError(Session session, @PathParam("sid") String sid, Throwable throwable) {// 输出错误信息到控制台System.out.println("出现错误:" + sid);// 打印异常堆栈信息throwable.printStackTrace();}/*** 广播消息给所有连接的客户端** @param message 要发送的消息内容* @throws IOException 输入输出异常*/public void sendMessageToAll(String message) throws IOException {// 获取所有会话的集合Collection<Session> sessions = sessionMap.values();// 检查会话集合是否不为空if (!CollectionUtils.isEmpty(sessions)) {// 遍历所有会话for (Session session : sessions) {// 服务器向客户端发送文本消息session.getBasicRemote().sendText(message);}}}/*** 发送websocket消息给指定消费者** @param alertNotifyVo 报警消息对象* @param userIds 用户ID集合,指定要发送消息的用户* @throws IOException io异常*/public void sendMessageToConsumer(AlertNotifyVo alertNotifyVo, Collection<Long> userIds) {// 如果消费者ID集合为空,直接返回,不执行后续操作if (CollUtil.isEmpty(userIds)) {return;}// 如果WebSocket会话Map为空,直接返回,不执行后续操作if (ObjectUtil.isEmpty(sessionMap)) {return;}// 遍历消费者ID集合,发送消息给每个指定的用户userIds.forEach(userId -> {// 根据用户ID获取对应的WebSocket会话Session session = sessionMap.get(String.valueOf(userId));// 如果会话不存在,跳过当前循环,继续下一个if (ObjectUtil.isEmpty(session)) {return;}// 获取该消费者的websocket连接,并发送消息try {// 将报警消息对象转换为JSON字符串并发送给客户端session.getBasicRemote().sendText(JSONUtil.toJsonStr(alertNotifyVo));} catch (IOException e) {// 如果发送消息失败,抛出业务异常throw new BaseException("websocket推送消息失败");}});}
}
/**
* 设备数据匹配过滤规则
* @param deviceData
* @param rule*/private void deviceDataAlarmHandler(DeviceData deviceData, AlertRule rule) {//判断上报的数据是否在生效时段内 00:00:00~23:59:59 00:05:00~22:59:59String[] split = rule.getAlertEffectivePeriod().split("~");LocalTime startTime = LocalTime.parse(split[0]);LocalTime endTime = LocalTime.parse(split[1]);//数据上报的时间LocalTime localTime = deviceData.getAlarmTime().toLocalTime();//如果上报的时间不在生效时段内,则结束请求if(localTime.isBefore(startTime) || localTime.isAfter(endTime)){return;}//统计次数的keyString aggCountKey = CacheConstants.IOT_COUNT_ALERT+deviceData.getIotId()+":"+deviceData.getFunctionId()+":"+rule.getId();//判断上报的数据是否达到了规则的阈值Double dataValue = Double.valueOf(deviceData.getDataValue());Double value = rule.getValue();//工具类x,y(顺序有要求,左边是上报的数据,后边是规则定义的数据) x等于y 返回0 x>y 返回大于0的值 x<y 返回小于0的值int compare = NumberUtil.compare(dataValue, value);if((rule.getOperator().equals(">=") && compare >= 0) || (rule.getOperator().equals("<") && compare < 0)){//符合上报的规则,产生了异常数据log.info("当前数据符合报警规则");}else {log.info("正常上报的数据");redisTemplate.delete(aggCountKey);return;}//沉默周期 持续周期//设计一个redis的可以,必须唯一,代表的当前的设备、物模型、规则IDString silentKey = CacheConstants.IOT_SILENT_ALERT+deviceData.getIotId()+":"+deviceData.getFunctionId()+":"+rule.getId();//获取沉默周期String silentData = redisTemplate.opsForValue().get(silentKey);if(StringUtils.isNotEmpty(silentData)){return;}//持续周期String aggCountData = redisTemplate.opsForValue().get(aggCountKey);Integer count = StringUtils.isEmpty(aggCountData)? 1 : (Integer.parseInt(aggCountData) + 1);//当前count不等于持续周期,就累加数据,并且结束请求if(ObjectUtil.notEqual(count,rule.getDuration())){//累加数据redisTemplate.opsForValue().set(aggCountKey,count+"");return;}//到了报警的条件了,保存一份沉默周期redisTemplate.opsForValue().set(silentKey,"1",rule.getAlertSilentPeriod(), TimeUnit.MINUTES);//删除统计的次数redisTemplate.delete(aggCountKey);//保存异常数据// 判断上报数据的设备的类型,如果老人的异常数据(手表、睡眠检测带) 设备异常(烟雾报警)List<Long> userIds = new ArrayList<>();if(rule.getAlertDataType().equals(0)){//老人异常(手表、睡眠检测带) 设备id-->设备-->老人id--->护理员if(deviceData.getLocationType().equals(0)){//随身设备userIds = deviceMapper.selectNursingIdsByIotIdWithElder(deviceData.getIotId());}else if(deviceData.getLocationType().equals(1) && deviceData.getPhysicalLocationType().equals(2)){//床位设备 设备id-->设备-->床位-->老人id--->护理员userIds = deviceMapper.selectNursingIdsByIotIdWithBed(deviceData.getIotId());}}else {//设备异常 找维修人员 通过角色名称(维修工) 用户 角色 用户角色中间表userIds = sysUserRoleMapper.selectByRoleName("维修工");}//找到超级管理员List<Long> managerIds = sysUserRoleMapper.selectByRoleName("超级管理员");//合并两份用户idCollection<Long> allUserIds = CollUtil.addAll(userIds, managerIds);//去重allUserIds = CollUtil.distinct(allUserIds);//批量保存异常数据List<AlertData> alertDataList = insertAlertData(allUserIds, deviceData, rule);//websocket推送消息websocketNotity(alertDataList.get(0),rule,allUserIds);}@Autowired
private WebSocketServer webSocketServer;/**
* websocket推送消息
* @param alertData
* @param rule
* @param allUserIds*/private void websocketNotity(AlertData alertData, AlertRule rule, Collection<Long> allUserIds) {//属性拷贝AlertNotifyVo alertNotifyVo = BeanUtil.toBean(alertData, AlertNotifyVo.class);alertNotifyVo.setFunctionName(rule.getFunctionName());alertNotifyVo.setAlertDataType(rule.getAlertDataType());alertNotifyVo.setNotifyType(1);//向指定的人推送消息webSocketServer.sendMessageToConsumer(alertNotifyVo,allUserIds);}@Autowired
private IAlertDataService alertDataService;/**
* 保存异常数据
* @param allUserIds
* @param deviceData
* @param rule*/private List<AlertData> insertAlertData(Collection<Long> allUserIds, DeviceData deviceData, AlertRule rule) {//属性拷贝,从deviceData拷贝到alertDataAlertData alertData = BeanUtil.toBean(deviceData, AlertData.class);//关于规则的数据都拷贝不过去alertData.setAlertRuleId(rule.getId());//功能名称+运算符+阈值+持续周期+聚合周期String reason = CharSequenceUtil.format("{}{}{},持续了{}周期,就报警",rule.getFunctionId(),rule.getOperator(),rule.getValue(),rule.getDuration());alertData.setAlertReason(reason);//报警状态alertData.setStatus(0);alertData.setType(rule.getAlertDataType());//批量保存数据了,由于多个人List<AlertData> list = allUserIds.stream().map(userId -> {//再次拷贝AlertData dBalertData = BeanUtil.toBean(alertData, AlertData.class);dBalertData.setUserId(userId);dBalertData.setId(null);return dBalertData;}).collect(Collectors.toList());//批量保存alertDataService.saveBatch(list);return list;}
由于前端也要发请求到后端建立连接,需要在spring security中对于ws的请求放行 放行代码