websocket入门到实战(详解websocket,实战聊天室,消息推送,springboot+vue)
入门websocket的基础应该掌握一下问题:
1、什么是握手?
2、什么是websocket?
3、websocket和http的区别,应用场景
4、websocket的使用
5、使用vue+elementui打造简单聊天室
6、使用websocket进行信息实时推送,使用mysql数据库进行存储(若依Ruoyi+websocket)
7、【进阶】使用STOMP(websocket的高级封装)-配合Spring-security服用
8、【进阶】整合rabbitMQ进行信息处理
1、什么是握手?
“握手”在计算机网络中是一个比喻性的术语,用来描述两个设备或程序在通信开始前,互相确认彼此身份、能力和准备状态的过程,就像现实中人们见面时通过“握手”表示友好和确认一样。
简单来说就是:
握手 = 建立通信前的“问好+确认”步骤
目的:确保双方都准备好、安全、可靠地通信。
什么是TCP的三次握手?
“三次握手”是指在建立 TCP连接 时,客户端与服务器之间进行的三个步骤,用于确保双方都能正常发送和接收数据。这是 TCP 协议中非常基础而重要的概念。
为什么要三次握手?
主要目的是为了 确保双方都具备发送和接收能力,并为数据传输建立可靠的连接。
三次握手的详细解析
可以将客户端理解成浏览器,服务器就是后端
客户端 服务器
| |
| --------------------- SYN, Seq = 100 -----------> | 第一次握手
| |
| <--- SYN+ACK, Seq = 200, Ack = 101 ---> | 第二次握手
| |
| ------ ACK, Seq = 101, Ack = 201 -----------> | 第三次握手
| |
模拟的三次抓包详细
第一次握手:客户端 → 服务器
-
客户端向服务器发送一个 SYN(同步)包,表示希望建立连接。
-
包含客户端的初始序列号(
Seq = 100
)。
第二次握手:服务器 → 客户端
-
服务器收到 SYN 后,回复一个 SYN+ACK 包。
-
表示“我同意建立连接”,并告诉客户端自己的初始序列号(
Seq = 200
)。 -
同时对客户端的 SYN 进行确认(
Ack = 101
)。
第三次握手:客户端 → 服务器
-
客户端收到 SYN+ACK 后,再发送一个 ACK 包,表示连接建立完成。
-
确认服务器的序列号(Seq=101,
Ack = 201
)。
现实生活中的例子
小陈:你好,我是 小陈,能听见我吗?(SYN)
小霜:你好 小陈,我是 小霜,能听见你,我这边也能说话,你能听我吗?(SYN+ACK)
小陈:能听见,咱们开始说话吧!(ACK)
2、什么是websocket?
websocket简介
WebSocket 是一种在 单个 TCP 连接上进行全双工通信的协议,它被设计为在客户端(通常是浏览器)和服务器之间建立持久连接,从而实现实时通信。
websocket应用场景
场景:当我们想做一个聊天室功能,建立一个聊天信息表,发信息(新增)的时候,都会执行一次查询聊天信息表,如果想获取到别人发的信息,是不是就要再查询一次数据库,获取到最新的信息?使用普通的http请求来做的话,要么点刷新、要么就做轮询(每几秒查询一次数据库执行查询操作)这样显得非常繁琐,且极大消耗服务器资源(反复的发请求会消耗带宽)。这时候我们就需要websocket来进行全双工通信。
websocket工作原理
WebSocket 使用 HTTP 协议进行 一次性握手,然后升级为 WebSocket 协议(握手升级协议)
客户端(浏览器)发送握手请求
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务器(后端)相应握手成功:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
3、websocket和http的区别
对比维度 | HTTP | WebSocket |
---|---|---|
协议层 | 应用层协议(基于 TCP) | 应用层协议(基于 TCP) |
设计目的 | 客户端发起请求,获取数据 | 实时、低延迟、双向通信 |
通信模式 | 单向通信(客户端发起,请求-响应) | 双向通信(全双工,任意一方可主动发送) |
连接方式 | 短连接(默认),可使用 Keep-Alive | 长连接(握手成功后持续存在) |
连接建立 | 直接建立 HTTP 请求 | 先通过 HTTP 握手,然后升级协议 |
协议升级 | 不支持 | 通过 HTTP 101 Switching Protocols 升级 |
数据传输格式 | 文本(HTML、JSON、XML),需携带完整头部 | 轻量帧结构,支持文本和二进制(Blob、ArrayBuffer) |
头部开销 | 每次请求都发送完整 HTTP Header(较大) | 初始握手有一次 HTTP Header,之后数据帧头部极小 |
服务端主动推送 | 不支持 | 支持(服务端可随时向客户端发送消息) |
实时性 | 差(依赖轮询或长轮询) | 高(事件驱动、低延迟) |
资源消耗 | 轻量(请求结束即释放连接) | 高(每个客户端都占用一个 TCP 连接) |
并发能力 | 强,连接瞬时断开 | 并发多时需优化连接管理与线程池 |
跨域支持 | 同源策略限制较多 | 可跨域连接,需配置服务端 |
安全传输 | HTTPS(HTTP over TLS) | WSS(WebSocket over TLS) |
认证机制 | 基于 Cookie、Token、HTTP Header | 通常通过 URL 参数或连接后的自定义认证逻辑 |
浏览器支持 | 所有浏览器 | 现代浏览器均已支持(IE10+、Chrome、Firefox、Safari 等) |
常见应用场景 | 表单提交、文件下载、网页加载、RESTful API | 聊天室、推送通知、在线游戏、实时数据监控、协作编辑等 |
示例协议头 | GET /api/data HTTP/1.1 | Upgrade: websocket Connection: Upgrade |
4、websocket的使用
一、maven引入websocket
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
二、新建websocketHandler(websocket的处理类)
websocketHandler的使用通常有两种用法
一种是实现 WebSocketHandler,一种是继承 TextWebSocketHandler
WebSocketHandler 是一个底层接口,你必须实现所有的方法,一般是实现对websocket的细节有更强的控制需求。
@Component
public class ConnectWebsocketHandler implements WebSocketHandler {@Override// 连接建立时触发public void afterConnectionEstablished(WebSocketSession session) throws Exception {System.out.println("连接建立成功:"+session.getId());}@Override// 接收到消息时触发public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {System.out.println("收到消息: " + message.getPayload());}@Override// 出现异常时触发public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {System.out.println("发生错误: " + exception.getMessage());}@Override// 连接关闭时触发public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {System.out.println("连接关闭: " + session.getId());}@Override// 是否支持部分消息public boolean supportsPartialMessages() {return false;//默认返回false}
}
TextWebSocketHandler 是一个简化版的实现类,适用于只处理文本信息的场景
public class ConnectWebsocketHandler extends TextWebSocketHandler {@Overridepublic void afterConnectionEstablished(WebSocketSession session) {// 连接建立后}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {// 只处理文本消息}
}
应用场景对比
特性 | WebSocketHandler (接口) | TextWebSocketHandler (类) |
---|---|---|
复杂度 | 高,需要实现全部方法 | 低,继承并重写想用的方法 |
消息支持 | 文本、二进制、分片都可以 | 仅支持文本消息 |
推荐场景 | 高级控制,处理多类型消息 | 聊天系统、通知推送等 |
三、配置websocket
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {//连接websocket测试registry.addHandler(new ConnectWebsocketHandler(), "/websocket").setAllowedOrigins("*"); // 允许跨域// //聊天室
// registry.addHandler(new ChatRoomWebsocketHandler(), "/chatRoom")
// .setAllowedOrigins("*"); // 允许跨域
//
// //消息推送
// registry.addHandler(new PushMessageWebsocketHandler(), "/pushMessage")
// .setAllowedOrigins("*"); // 允许跨域}
}
给websocket注册处理器,处理的接口为 "/websocket"。
ConnectWebsocketHandler 是使用了第一种方法,实现了websocketHandler,方便理解,如果没有复杂业务,直接继承TextWebSocketHandler就可以。
@Component
public class ConnectWebsocketHandler implements WebSocketHandler {@Override// 连接建立时触发public void afterConnectionEstablished(WebSocketSession session) throws Exception {System.out.println("连接建立成功:"+session.getId());}@Override// 接收到消息时触发public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {System.out.println("收到消息: " + message.getPayload());}@Override// 出现异常时触发public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {System.out.println("发生错误: " + exception.getMessage());}@Override// 连接关闭时触发public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {System.out.println("连接关闭: " + session.getId());}@Override// 是否支持部分消息public boolean supportsPartialMessages() {return false;//默认返回false}
}
项目结构目录截图
前端代码
<template><div style="padding: 20px"><el-input v-model="message" placeholder="输入消息" style="width: 300px" @keyup.enter.native="send" /><el-button type="primary" @click="send">发送</el-button><el-button type="success" @click="connect" >连接websocket</el-button><el-button type="danger" @click="disconnect">断开websocket</el-button><el-divider>消息记录</el-divider><div v-for="(msg, index) in messages" :key="index">{{ msg }}</div></div>
</template><script>
export default {data() {return {ws: null,message: '',messages: []};},mounted() {this.connect();},beforeDestroy() {if (this.ws) this.ws.close();},methods: {connect() {this.ws = new WebSocket('ws://localhost:8080/websocket');this.ws.onopen = () => {this.messages.push('WebSocket连接成功');};this.ws.onmessage = (event) => {this.messages.push('收到:' + event.data);};this.ws.onclose = () => {this.messages.push('WebSocket连接关闭');};this.ws.onerror = (err) => {this.messages.push('WebSocket连接异常');console.error(err);};},disconnect() {if (this.ws) {this.ws.close(1000, "用户主动断开");this.ws = null;}},send() {if (this.ws && this.message) {this.ws.send(this.message);this.messages.push('发送:' + this.message);this.message = '';}else{alert("未连接websocket或消息为空")}}}
};
</script>
运行结果
后台执行结果
5、实现聊天室功能
效果图:
为了更深的了解websocket,接下来准备使用websocket常用的方法来实现下面的功能清单
功能清单:
功能模块 | 描述 |
---|---|
🧑🤝🧑 在线用户列表 | 实时显示当前在线用户 |
💬 公聊(群聊) | 所有人共享一个频道,任何人发送的消息都广播给所有人 |
💌 私聊(点对点聊天) | 用户之间一对一聊天,消息只发送给指定接收者 |
🛎️ 通知广播 | 系统广播(如:某人上线、下线、踢人) |
🔒 用户鉴权 | 用户登录后建立 WebSocket 连接,鉴权校验(如 token) |
🔁 重连机制 | 断线后自动重连,保持体验流畅 |
❤️ 心跳机制 | 保活机制,防止连接超时关闭 |
🧾 消息已读/未读 | 显示消息状态,比如是否已读、已送达 |
需要用到的机制:
WebSocket机制 | 用于支撑哪些功能 |
---|---|
onopen | 建立连接后触发,通常用于用户上线通知、鉴权 |
onmessage | 收发消息的核心方法(群聊、私聊、系统通知) |
onclose | 用户下线、主动退出、断线等清理操作 |
onerror | 连接异常处理 |
send() | 客户端主动发送消息(聊天、心跳) |
close() | 主动断开连接(退出聊天室) |
广播推送机制 | 服务端将消息同时发送给所有连接的用户 |
用户会话管理 | 服务端需要保存每个用户的 WebSocketSession ,支持点对点通信 |
心跳机制 | 定时 ping/pong 保持连接(客户端或服务端主动发心跳) |
有两个必须要理解和掌握的接口和类:WebSocketSession 和 TextMessage
我们查看源码:
WebSokcetSession
解析:
方法/属性 | 类型 | 说明 |
---|---|---|
getId() | String | 会话的唯一标识符,Spring 自动生成(如 "1a2b3c..." ) |
getUri() | URI | 客户端连接时的 URI,通常是 /chat 这类路径 |
getPrincipal() | Principal | 获取当前用户的认证信息(如登录用户)— 需要集成 Spring Security |
getAttributes() | Map<String, Object> | 存放连接时设置的自定义属性,可在 HandshakeInterceptor 中初始化 |
getHandshakeHeaders() | HttpHeaders | 获取握手时的请求头信息(例如 Token、Cookie) |
isOpen() | boolean | 当前连接是否打开 |
sendMessage(WebSocketMessage<?> message) | void | 向客户端发送消息(常用) |
close() | void | 主动关闭连接 |
getRemoteAddress() | InetSocketAddress | 获取客户端的 IP 地址和端口 |
getLocalAddress() | InetSocketAddress | 获取本地服务端地址 |
getTextMessageSizeLimit() | int | 文本消息大小限制(单位:字节) |
setTextMessageSizeLimit(int limit) | void | 设置文本消息大小限制 |
getBinaryMessageSizeLimit() | int | 二进制消息大小限制 |
setBinaryMessageSizeLimit(int limit) | void | 设置二进制消息大小限制 |
在WebSocketConfig中新增配置
//聊天室 -- sessionId版registry.addHandler(new ChatRoomSessionIdWebsocketHandler(), "/websocket/chatRoomSessionId").setAllowedOrigins("*"); // 允许跨域
新增handler类ChatRoomSessionIdWebsocketHandler
@Component
public class ChatRoomSessionIdWebsocketHandler extends TextWebSocketHandler {Map<String, WebSocketSession> userSessions = new ConcurrentHashMap<>();@Override// 连接建立时触发public void afterConnectionEstablished(WebSocketSession session) throws Exception {//连接成功后,加入集合WebSocketSessionUtil.addSession(session);System.out.println("连接建立成功:"+session.getId());System.out.println("当前在线人数:"+WebSocketSessionUtil.getOnlineCount());WebSocketSessionUtil.getAllSessions().forEach(s -> {System.out.println("在线SessionID:"+s.getId());});broadcastMessage("【广播信息】"+"["+session.getId()+"]"+"-连接成功");// 推送最新用户列表broadcastUserList();}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {// 断开连接时移除WebSocketSessionUtil.removeSession(session);System.out.println("连接关闭:" + session.getId());System.out.println("当前在线人数:"+WebSocketSessionUtil.getOnlineCount());WebSocketSessionUtil.getAllSessions().forEach(s -> {System.out.println("在线SessionID:"+s.getId());});broadcastMessage("【广播信息】"+"["+session.getId()+"]"+"-退出连接");// 推送最新用户列表broadcastUserList();}@Overridepublic void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {JSONObject json = JSON.parseObject(message.getPayload());String type = json.getString("type");String content = json.getString("content");String fromUserId = session.getId();System.out.println(session.getId() + " 发送消息:" + content);if ("broadcast".equals(type)) {// 广播给所有在线用户for (WebSocketSession s : WebSocketSessionUtil.getAllSessions()) {if (s.isOpen()) {s.sendMessage(new TextMessage("[群聊] " + fromUserId + ":" + content));}}} else if ("private".equals(type)) {String toUserId = json.getString("toUserId");WebSocketSession toSession = WebSocketSessionUtil.getSession(toUserId);if (toSession != null && toSession.isOpen()) {toSession.sendMessage(new TextMessage("[私聊] " + fromUserId + ":" + content));} else {session.sendMessage(new TextMessage("用户 " + toUserId + " 不在线"));}}}private void broadcastUserList() {List<String> onlineUsers = WebSocketSessionUtil.getAllSessions().stream().map(WebSocketSession::getId) // 你可以替换成用户ID(如 session.getAttributes().get("userId")).collect(Collectors.toList());JSONObject json = new JSONObject();json.put("type", "userList");json.put("users", onlineUsers);TextMessage userListMessage = new TextMessage(json.toJSONString());for (WebSocketSession s : WebSocketSessionUtil.getAllSessions()) {if (s.isOpen()) {try {s.sendMessage(userListMessage);} catch (Exception e) {e.printStackTrace();}}}}private void broadcastMessage(String message) {for (WebSocketSession s : WebSocketSessionUtil.getAllSessions()) {if (s.isOpen()) {try {s.sendMessage(new TextMessage(message));} catch (Exception e) {e.printStackTrace();}}}}}
前端目录:
前端代码
<template><div style="padding: 20px"><el-input v-model="toUserId" placeholder="接收用户ID(留空表示群发)" style="width: 300px; margin-bottom: 10px" /><el-input v-model="message" placeholder="输入消息" style="width: 300px" @keyup.enter.native="send" /><el-button type="primary" @click="send">发送</el-button><el-button type="success" @click="connect">连接WebSocket</el-button><el-button type="danger" @click="disconnect">断开WebSocket</el-button><el-divider>在线用户</el-divider><div v-if="userList.length === 0">暂无在线用户</div><el-tagv-for="user in userList":key="user"type="info"@click="selectUser(user)">{{ user }}</el-tag><el-divider>消息记录</el-divider><div v-for="(msg, index) in messages" :key="index">{{ msg }}</div></div>
</template><script>
export default {name: 'WebSocketChatRoomSessionId',data() {return {ws: null,message: '',messages: [],toUserId: '',userList: []};},mounted() {this.connect();},beforeDestroy() {if (this.ws) this.ws.close();},methods: {connect() {this.ws = new WebSocket('ws://localhost:8080/websocket/chatRoomSessionId');this.ws.onopen = () => {this.messages.push('✅ WebSocket 连接成功');};this.ws.onmessage = (event) => {try {const data = JSON.parse(event.data);if (data.type === 'userList') {this.userList = data.users;} else if (data.type === 'message') {this.messages.push(`[${data.fromUserId || '系统'}]: ${data.content}`);}} catch (e) {this.messages.push(event.data);}};this.ws.onclose = () => {this.messages.push('❌ WebSocket 连接关闭');};this.ws.onerror = (err) => {this.messages.push('⚠️ WebSocket 连接异常');console.error(err);};},disconnect() {if (this.ws) {this.ws.close(1000, "用户主动断开");this.ws = null;}},send() {if (this.ws && this.message) {const payload = {type: this.toUserId ? 'private' : 'broadcast',toUserId: this.toUserId || null,content: this.message};this.ws.send(JSON.stringify(payload));this.message = '';} else {alert("未连接 WebSocket 或消息为空");}},selectUser(userId) {this.toUserId = userId;}}
};
</script>
6、实现消息推送功能
来了,来了,大家做系统应该是最关心这个功能。
【思路】
需求:对全系统【所有的业务操作】进行消息推送,有【群发】、【私发】功能、处理【消息状态(未读/已读)】,websocket持续链接防止因其他故障中断【心跳机制】
【后端篇】
1、确定自己系统的需求,先做数据表
通过代码生成,对后续推送的信息进行保存,通过is_read字段来对消息进行已读未读操作
添加mapper
/*** 设为已读* @param id 消息的id* @return 结果* */public int updateWbNoticeMessageReadStatus(Long id);
添加service
/*** 设为已读* @param id 消息的id* @return 结果* */public int updateWbNoticeMessageReadStatus(Long id);
添加serviceImpl
/*** 更新消息的阅读状态* @param id 消息的id* @return*/@Overridepublic int updateWbNoticeMessageReadStatus(Long id) {return wbNoticeMessageMapper.updateWbNoticeMessageReadStatus(id);}
添加mapper.xml下的方法
<update id="updateWbNoticeMessageReadStatus" parameterType="Long">update wb_notice_messageset is_read = '1'where id = #{id}
</update>
2、明确websocket链接
消息的推送,肯定是有推送人和被推送人,根据如何获取这些数据来确定你的websocket链接
// const token // 需要鉴权
const currentUserId = this.$store.state.user.id;
const currentUserNickName = this.$store.state.user.nickName;
const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址
this.socket = new WebSocket(wsUrl);
这是我的websocket链接,可以看出我是通过前端拼接的userId和userName来获取到推送人信息的。
ps:实际开发过程中最好是通过token来获取,并解析出用户,进行后续的操作,此处是为了方便理解和通用
3、配置WebSocketConfig
package com.ruoyi.websocket.config;import com.ruoyi.websocket.handler.ChatRoomSessionIdWebsocketHandler;
import com.ruoyi.websocket.handler.ChatRoomUserIdWebsocketHandler;
import com.ruoyi.websocket.handler.ConnectWebsocketHandler;
import com.ruoyi.websocket.handler.PushMessageWebsocketHandler;
import com.ruoyi.websocket.interceptor.WebSocketInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Autowiredprivate PushMessageWebsocketHandler pushMessageWebsocketHandler;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {//连接websocket测试registry.addHandler(new ConnectWebsocketHandler(), "/websocket").setAllowedOrigins("*"); // 允许跨域//聊天室 -- sessionId版registry.addHandler(new ChatRoomSessionIdWebsocketHandler(), "/websocket/chatRoomSessionId").setAllowedOrigins("*"); // 允许跨域//聊天室 -- UserId版registry.addHandler(new ChatRoomUserIdWebsocketHandler(), "/websocket/chatRoomUserId").addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid.setAllowedOrigins("*"); // 允许跨域//消息推送registry.addHandler(pushMessageWebsocketHandler, "/websocket/pushMessage").addInterceptors(new WebSocketInterceptor())//拦截器用来获取前端传递过来的userid.setAllowedOrigins("*"); // 允许跨域}}
4、添加拦截器 WebSocketInterceptor 来获取到webocket链接携带的userId和nickName
package com.ruoyi.websocket.interceptor;import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.net.URI;
import java.util.Map;@Component
public class WebSocketInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Map<String, Object> attributes) throws Exception {URI uri = request.getURI();String query = uri.getQuery(); // userId=xxx&nickName=yyyif (query == null) return false;Map<String, String> paramMap = parseQuery(query);String userId = paramMap.get("userId");String nickName = paramMap.get("nickName");if (userId == null || nickName == null) {return false; // 拒绝握手}// 放入 WebSocketSession attributes,后面 WebSocketHandler 可取attributes.put("userId", userId);attributes.put("nickName", nickName);return true; // 允许握手}@Overridepublic void afterHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Exception exception) {// 握手完成后进行的操作}//拆分传递的参数private Map<String, String> parseQuery(String query) {Map<String, String> map = new java.util.HashMap<>();if (query == null || query.isEmpty()) return map;String[] pairs = query.split("&");for (String pair : pairs) {int idx = pair.indexOf('=');if (idx > 0) {String key = pair.substring(0, idx);String value = pair.substring(idx + 1);map.put(key, value);}}return map;}}
5、添加 PushMessageWebsocketHandler 来处理推送信息
package com.ruoyi.websocket.handler;import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.SysUser;
import com.ruoyi.system.mapper.SysUserMapper;
import com.ruoyi.websocket.domain.WbNoticeMessage;
import com.ruoyi.websocket.service.IWbNoticeMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;/*** 消息推送 WebSocket Handler*/
@Component
public class PushMessageWebsocketHandler extends TextWebSocketHandler {@Autowiredprivate IWbNoticeMessageService wbNoticeMessageService;@Autowiredprivate SysUserMapper userMapper;// 存储所有连接的会话private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {sessions.add(session);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {//获取前端发送的messageString payload = message.getPayload();// 解析整个 JSON 对象JSONObject jsonObject = JSONObject.parseObject(payload);// 心跳检测String type = jsonObject.getString("type");if ("ping".equalsIgnoreCase(type)) {session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));return;}//获取websocket携带的参数的userId和nickName// todo 前端可以通过token携带参数,然后使用ruoyi封装的token方法获取到当前用户,这里方便演示和通用性直接使用前端传递的UserId和nickNameString userId = (String) session.getAttributes().get("userId");String nickName = (String) session.getAttributes().get("nickName");// 提取 data 对象--从这里添加前端所需要推送的字段JSONObject data = jsonObject.getJSONObject("data");String title = data.getString("title");String content = data.getString("content");Long receiverId = data.getLong("receiverId");String receiverName = data.getString("receiverName");// 1. 如果receiverId为空则是群发,否则是单发,保存消息到数据库// todo 可以自行根据前端传递的type来判断是群发还是单发,这里为了方便演示直接通过receiverId是否为空来判断if (receiverId != null) {WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();wbNoticeMessage.setTitle(title);wbNoticeMessage.setContent(content);wbNoticeMessage.setSenderId(Long.parseLong(userId));wbNoticeMessage.setSenderName(nickName);wbNoticeMessage.setReceiverId(receiverId);wbNoticeMessage.setReceiverName(receiverName);wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);} else {SysUser user = new SysUser();List<SysUser> userList = userMapper.selectUserList(user);for (SysUser sysUser : userList) {WbNoticeMessage wbNoticeMessage = new WbNoticeMessage();wbNoticeMessage.setTitle(title);wbNoticeMessage.setContent(content);wbNoticeMessage.setSenderId(Long.parseLong(userId));wbNoticeMessage.setSenderName(nickName);wbNoticeMessage.setReceiverId(sysUser.getUserId());wbNoticeMessage.setReceiverName(receiverName);wbNoticeMessageService.insertWbNoticeMessage(wbNoticeMessage);}}// 2. 给所有在线客户端广播消息for (WebSocketSession s : sessions) {if (s.isOpen()) {s.sendMessage(new TextMessage(payload));}}// todo 3.重要的信息还可以通过邮件等其他方式通知用户}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {sessions.remove(session);}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {exception.printStackTrace();sessions.remove(session);if (session.isOpen()) {session.close();}}
}
【前端篇】
1、创建消息铃铛样式,封装成组件
InfoBell.vue代码
<template><div><el-tooltip :content="noticeContent" effect="dark" placement="bottom"><el-badge :value="noticeCount" class="right-menu-item hover-effect" :class="{ 'badge-custom': noticeCount > 0 }"><i class="el-icon-message-solid" @click="toNoticePage"></i></el-badge></el-tooltip></div></template><script>import { listWbNoticeMessage } from "@/api/websocket/WbNoticeMessage";export default {name: "InfoBell",props: {refreshNoticeCount: {type: Boolean,default: false}},data() {return {noticeContent: "", // 通知内容noticeCount: 0, // 通知数量socket: null, // WebSocket 实例// 查询参数queryParams: {pageNum: 1,pageSize: 10,title: null,content: null,type: null,senderId: null,senderName: null,receiverId: this.$store.state.user.id,receiverName: null,isRead: null,readTime: null,priority: null,targetUrl: null,bizType: null,bizId: null},};},created() {this.getList();},mounted() {this.initWebSocket();},beforeDestroy() {this.closeWebSocket();},watch: {refreshNoticeCount(val) {if (val) {this.getList();}}},methods: {/**---------------------websocket专栏-------------------- *//** 初始化/连接 WebSocket */initWebSocket() {// const token // 需要鉴权const currentUserId = this.$store.state.user.id;const currentUserNickName = this.$store.state.user.nickName;const wsUrl = `ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`; // 替换为你的 WebSocket 地址this.socket = new WebSocket(wsUrl);this.socket.onopen = () => {console.log("头部导航消息铃铛-WebSocket 连接已建立");this.startHeartbeat();//启用心跳机制};this.socket.onmessage = (event) => {try {const msg = JSON.parse(event.data);if (msg.type === "pong") {console.log("收到心跳 pong");return;}} catch (e) {// 非 JSON 消息,继续执行}this.getList();};this.socket.onerror = (error) => {console.error("头部导航消息铃铛-WebSocket 发生错误:", error);};this.socket.onclose = () => {console.log("头部导航消息铃铛-WebSocket 已关闭");this.stopHeartbeat();this.tryReconnect();};},/** 关闭 WebSocket */closeWebSocket() {if (this.socket) {this.socket.close();this.socket = null;}this.stopHeartbeat();if (this.reconnectTimer) {clearInterval(this.reconnectTimer);this.reconnectTimer = null;}},/** 启动心跳 */startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.socket && this.socket.readyState === WebSocket.OPEN) {this.socket.send(JSON.stringify({ type: "ping" }));console.log("发送心跳 ping");}}, 30000); // 每 30 秒},/** 停止心跳 */stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}},/** 尝试重连 */tryReconnect() {if (this.reconnectTimer) return;this.reconnectTimer = setInterval(() => {console.log("尝试重连 InfoBell-WebSocket...");this.initWebSocket();if (this.socket && this.socket.readyState === WebSocket.OPEN) {clearInterval(this.reconnectTimer);this.reconnectTimer = null;}}, 5000); // 每 5 秒重连一次},/** -------------------------- 业务处理专栏---------------------- *//** 查询通知信息框列表 */getList() {this.queryParams.isRead = 0;listWbNoticeMessage(this.queryParams).then(response => {this.noticeCount = response.total;this.noticeContent = `您有${this.noticeCount}条未读的信息`;})},/** 跳转到通知页面 */toNoticePage() {this.$router.push("/websocket/pushMessage");},},};</script><style lang="scss" scoped>::v-deep .el-badge__content {margin-top: 9px;margin-right: 1px;}.badge-custom {animation: blink-animation 0.5s infinite alternate;}@keyframes blink-animation {0% {opacity: 1;}100% {opacity: 0.1;}}</style>
2、在顶部导航引用消息铃铛组件(InfoBell)
引入组件后,页面就完成了
3、创建推送信息查看页面
pushMessage.vue代码
<template><div style="padding: 50px;"><el-row :gutter="20"><el-col :span="5" ><el-card><h3>消息推送(快捷创建)</h3><el-form ref="form" :model="form" label-width="90px"><el-form-item label="通知标题" prop="title"><el-input v-model="form.title" placeholder="请输入通知标题" /></el-form-item><el-form-item label="通知内容"><el-input v-model="form.content" placeholder="请输入通知标题" type="textarea" /></el-form-item><el-form-item label="接收人ID" prop="receiverId"><el-input v-model="form.receiverId" placeholder="请输入接收人ID" /></el-form-item><el-form-item label="接收人昵称" prop="receiverName"><el-input v-model="form.receiverName" placeholder="请输入接收人昵称" /></el-form-item></el-form><div style="color: red;font-weight: 600;font-size: 14px;">PS:不填接受人id则视为群发</div><el-button type="primary" @click="sendMessage" style="margin-top: 10px;">推送消息</el-button><el-divider></el-divider><div style="height: 300px; overflow-y: auto; border: 1px solid #ebeef5; padding: 10px;"><div v-for="(msg, index) in messages" :key="index" style="margin-bottom: 8px;"><el-tag type="info" size="small">消息 {{ index + 1 }}</el-tag><span style="margin-left: 8px;">{{ msg }}</span></div></div></el-card></el-col><el-col :span="19"><el-card><el-tabs v-model="activeName" @tab-click="handleClick"><el-tab-pane label="未读" name="unread"><el-table v-loading="loading" :data="WbNoticeMessageList"><el-table-column label="id" align="center" prop="id" /><el-table-column label="通知标题" align="center" prop="title" /><el-table-column label="通知内容" align="center" prop="content" /><el-table-column label="消息类型" align="center" prop="type" /><el-table-column label="发送人ID" align="center" prop="senderId" /><el-table-column label="发送人名称" align="center" prop="senderName" /><el-table-column label="接受者ID" align="center" prop="receiverId" /><el-table-column label="接受者名称" align="center" prop="receiverName" /><el-table-column label="是否已读" align="center" prop="isRead" /><el-table-column label="阅读时间" align="center" prop="readTime" width="100"><template slot-scope="scope"><span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span></template></el-table-column><el-table-column label="优先级" align="center" prop="priority" /><el-table-column label="业务类型" align="center" prop="bizType" /><el-table-column label="业务ID" align="center" prop="bizId" /><el-table-column label="操作" align="center" class-name="small-padding fixed-width"><template slot-scope="scope"><el-buttonsize="mini"type="text"icon="el-icon-edit"@click="handleUpdateReadStatus(scope.row)">设为已读</el-button></template></el-table-column></el-table><pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" /></el-tab-pane><el-tab-pane label="已读" name="read"><el-table v-loading="loading" :data="WbNoticeMessageList" ><el-table-column label="id" align="center" prop="id" /><el-table-column label="通知标题" align="center" prop="title" /><el-table-column label="通知内容" align="center" prop="content" /><el-table-column label="消息类型" align="center" prop="type" /><el-table-column label="发送人ID" align="center" prop="senderId" /><el-table-column label="发送人名称" align="center" prop="senderName" /><el-table-column label="接受者ID" align="center" prop="receiverId" /><el-table-column label="接受者名称" align="center" prop="receiverName" /><el-table-column label="是否已读" align="center" prop="isRead" /><el-table-column label="阅读时间" align="center" prop="readTime" width="100"><template slot-scope="scope"><span>{{ parseTime(scope.row.readTime, '{y}-{m}-{d}') }}</span></template></el-table-column><el-table-column label="优先级" align="center" prop="priority" /><el-table-column label="业务类型" align="center" prop="bizType" /><el-table-column label="业务ID" align="center" prop="bizId" /></el-table><pagination v-show="total > 0" :total="total" :page.sync="queryParams.pageNum" :limit.sync="queryParams.pageSize" @pagination="getList" /></el-tab-pane></el-tabs></el-card></el-col></el-row><div v-show="false"><info-bell :refreshNoticeCount="isRefreshNoticeCount" /></div></div>
</template><script>
import { listWbNoticeMessage,updateReadStatus} from "@/api/websocket/WbNoticeMessage"
import InfoBell from "@/components/InfoBell";export default {name:"pushMesage",components: { InfoBell },data() {return {ws: null,message: '',messages: [],loading: true,total: 0,WbNoticeMessageList: [],form:{},// 查询参数queryParams: {pageNum: 1,pageSize: 10,title: null,content: null,type: null,senderId: null,senderName: null,receiverId: this.$store.state.user.id,receiverName: null,isRead: null,readTime: null,priority: null,targetUrl: null,bizType: null,bizId: null},activeName: 'unread',isRefreshNoticeCount:false,//是否刷新通知数量};},methods: {connectWebSocket() {// 连接 WebSocket,地址根据后端实际情况调整const currentUserId = this.$store.state.user.id;const currentUserNickName = this.$store.state.user.nickName;this.ws = new WebSocket(`ws://localhost:8080/websocket/pushMessage?userId=${currentUserId}&nickName=${currentUserNickName}`);this.ws.onopen = () => {console.log("推送信息-WebSocket 已连接");this.addMessage("推送信息-WebSocket 已连接");};this.ws.onmessage = event => {console.log("收到消息:", event.data);this.addMessage(event.data);};this.ws.onclose = () => {this.addMessage("推送信息-WebSocket 已关闭");};this.ws.onerror = error => {this.addMessage("推送信息-WebSocket 发生错误");};},sendMessage() {if (!this.form.content.trim()) {this.$message.warning("请输入消息内容");return;}if (this.ws && this.ws.readyState === WebSocket.OPEN) {// 发送整个表单内容this.ws.send(JSON.stringify({data: this.form}));this.$message.success("消息发送成功");// 因为websocket发送请求是异步的,为了方便显示这里使用了延时,实际情况还是要在后端通过返回值来显示getListsetTimeout(() => {this.getList();}, 500);} else {this.$message.error("WebSocket 未连接");}},addMessage(msg) {this.messages.push(msg);this.$nextTick(() => {// 自动滚动到底部const container = this.$el.querySelector("div[style*='overflow-y']");if (container) container.scrollTop = container.scrollHeight;});},/** --------------------------------- 信息模块 --------------------- */handleClick(){this.getList();},/** 查询通知信息框列表 */getList() {this.loading = truethis.queryParams.isRead = this.activeName === 'unread' ? 0 : 1;console.log(this.queryParams);listWbNoticeMessage(this.queryParams).then(response => {this.WbNoticeMessageList = response.rowsthis.total = response.totalthis.loading = false})},handleUpdateReadStatus(row){if (row.id != null) {updateReadStatus(row.id).then(response => {this.isRefreshNoticeCount = true;console.log(this.$store);this.$modal.msgSuccess("该信息已标记为已读~")this.getList();})}}},created() {this.getList();},mounted() {this.connectWebSocket();},beforeDestroy() {if (this.ws) {this.ws.close();}}
};
</script><style scoped>
</style>
以下是快捷创建推送信息的页面
4、详解【心跳机制】
一、详解
WebSocket 的心跳机制,是一种保持连接活跃、防止断线、检测对方是否存活的机制。特别是在使用 WebSocket 建立了长连接之后,如果网络设备(如代理、网关、防火墙)或者服务端/客户端本身在长时间没有数据传输时自动断开连接,就会导致推送失败、消息丢失的问题。
二、为什么要使用心跳机制?
1、防止连接被中间设备断开
很多中间设备(比如 Nginx、CDN、防火墙)会在一段时间内没有数据传输时,主动断开“看起来闲置”的连接。
2、检测对方是否在线
如果客户端意外断线(如:网络断了、电脑睡眠、浏览器崩溃),服务器端并不知道,继续保留 WebSocket 会话资源,浪费内存。
3、实现自动重连
通过心跳,可以判断连接是否断开,如果断了,客户端就能自动发起重连。
三、心跳机制怎么工作?
通常的设计方式如下:
角色 | 行为说明 |
---|---|
客户端 | 每隔一段时间(如 30 秒)发送一个特定的“心跳包”消息,如 { "type": "ping" } |
服务端 | 收到 ping 后立即回复 { "type": "pong" } ,表示“我还活着” |
客户端 | 若在预期时间内未收到 pong ,说明可能断线,可以发起重连 |
四、代码实操
【浏览器】,每隔30秒向【后端】发送ping信号,后端接收到了返回pong信号表示通信正常,不做任何业务处理。
可以理解成这是一个地震的救援过程:
遇难者被埋在了地底下,救援人员在进行挖地救援,遇难者每隔30秒向救援人员叫喊一声:ping!,救援人员听到了遇难者的声音得知遇难者还活着,随之回复一声:pong!。表示别怕,我正在救援。表示通信正常。
【前端发起心跳】
/** 启动心跳 */startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.socket && this.socket.readyState === WebSocket.OPEN) {this.socket.send(JSON.stringify({ type: "ping" }));console.log("发送心跳 ping");}}, 30000); // 每 30 秒},/** 停止心跳 */stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}},/** 尝试重连 */tryReconnect() {if (this.reconnectTimer) return;this.reconnectTimer = setInterval(() => {console.log("正在尝试重连 InfoBell-WebSocket...");this.initWebSocket();if (this.socket && this.socket.readyState === WebSocket.OPEN) {clearInterval(this.reconnectTimer);this.reconnectTimer = null;}}, 5000); // 每 5 秒重连一次},
【后端接收心跳】
// 心跳检测String type = jsonObject.getString("type");if ("ping".equalsIgnoreCase(type)) {session.sendMessage(new TextMessage("{\"type\":\"pong\"}"));return;}
代码将整理成 ruoyi-vue-websocket上传到git~