从轮询到实时推送:将站内消息接口改造为 WebSocket 服务
在现代 Web 应用中,站内消息功能是提升用户体验的关键组件。传统的轮询方式不仅浪费服务器资源,还无法做到消息的实时推送。本文将带你全面了解如何将一个普通的查询站内消息接口改造为基于 WebSocket 的实时推送服务,从底层原理到完整实现,让你的应用消息系统迈入实时时代。
一、为什么需要将消息接口改造为 WebSocket?
在开始改造之前,我们先搞清楚为什么需要做这个改造。让我们通过对比传统轮询方案和 WebSocket 方案,理解背后的技术逻辑。
1.1 传统轮询方案的痛点
传统的站内消息查询通常采用以下两种方式:
- 定时轮询:客户端每隔固定时间(如 30 秒)发送一次 HTTP 请求查询新消息
- 长轮询:客户端发送请求后,服务器保持连接直到有新消息或超时才返回
这两种方式都存在明显缺陷:
- 资源浪费:大量无效请求(没有新消息时)占用服务器 CPU、内存和网络带宽
- 实时性差:消息延迟至少为轮询间隔时间的一半(平均情况下)
- 服务器压力大:每一次请求都需要重新建立连接、进行身份验证等操作
- 移动端体验差:频繁的网络请求会消耗更多电量
1.2 WebSocket 方案的优势
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它解决了 HTTP 协议的局限性:
- 持久连接:一次握手后保持连接状态,避免重复建立连接的开销
- 双向通信:服务器可以主动向客户端推送消息,无需客户端请求
- 低延迟:消息可以实时传递,延迟通常在毫秒级
- 轻量级:数据帧头部较小,相比 HTTP 节省带宽
1.3 两种方案的架构对比
下面的架构图直观展示了两种方案的区别:
二、WebSocket 核心原理详解
在动手改造之前,我们需要先理解 WebSocket 的工作原理,这有助于我们更好地设计和实现系统。
2.1 WebSocket 握手过程
WebSocket 并不是全新的协议,而是借助 HTTP 协议完成握手,之后使用自定义的帧格式进行通信:
握手请求的关键 HTTP 头部:
Upgrade: websocket
:表示希望升级到 WebSocket 协议Connection: Upgrade
:配合 Upgrade 头部使用Sec-WebSocket-Key
:客户端生成的随机字符串,用于验证服务器是否支持 WebSocketSec-WebSocket-Version: 13
:指定 WebSocket 协议版本
2.2 WebSocket 数据帧格式
WebSocket 通信使用帧 (frame) 作为数据传输的基本单位,帧格式如下:
0 1 2 30 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1+-+-+-+-+-------+-+-------------+-------------------------------+|F|R|R|R| opcode|M| Payload len | Extended payload length ||I|S|S|S| (4) |A| (7) | (16/64) ||N|V|V|V| |S| | (if payload len==126/127) || |1|2|3| |K| | |+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +| Extended payload length continued, if payload len == 127 |+ - - - - - - - - - - - - - - - +-------------------------------+| |Masking-key, if MASK set to 1 |+-------------------------------+-------------------------------+| Masking-key (continued) | Payload Data |+-------------------------------- - - - - - - - - - - - - - - - +: Payload Data continued ... :+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| Payload Data continued ... |+---------------------------------------------------------------+
关键字段说明:
FIN
:1 表示这是消息的最后一帧opcode
:表示帧的类型(0x1 表示文本帧,0x2 表示二进制帧)MASK
:客户端发送的帧必须设置为 1,表示数据经过掩码处理Payload Data
:实际传输的数据
2.3 WebSocket 与 HTTP 的本质区别
特性 | HTTP | WebSocket |
---|---|---|
连接方式 | 短连接,每次请求建立新连接 | 长连接,一次握手后保持连接 |
通信方向 | 单向,客户端请求 - 服务器响应 | 双向,服务器可主动推送 |
头部开销 | 每个请求头部较大 | 初始握手后头部开销小 |
实时性 | 差,依赖轮询间隔 | 好,毫秒级延迟 |
适用场景 | 普通网页请求 | 实时通信(消息、聊天、通知等) |
三、改造方案设计
现在我们开始设计具体的改造方案。假设我们已有一个基于 HTTP 的站内消息查询接口,我们需要将其改造为 WebSocket 服务。
3.1 需求分析
我们的站内消息系统需要实现以下功能:
- 客户端连接 WebSocket 后,自动接收未读消息
- 服务器有新消息时,主动推送给相关用户
- 支持客户端标记消息为已读
- 支持查询历史消息
- 连接断开后重连机制
- 身份认证与权限控制
3.2 系统架构设计
改造后的系统架构如下:
核心组件说明:
- 连接管理器:维护用户与 WebSocket 连接的映射关系
- 消息服务:处理消息的 CRUD 操作和推送逻辑
- WebSocket 服务器:处理连接、消息收发
- API 网关:处理认证、路由等功能
3.3 数据模型设计
我们需要设计消息相关的数据表,这里使用 MySQL:
-- 消息表
CREATE TABLE `message` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',`sender_id` bigint NOT NULL COMMENT '发送者ID',`receiver_id` bigint NOT NULL COMMENT '接收者ID',`content` varchar(2000) NOT NULL COMMENT '消息内容',`type` tinyint NOT NULL DEFAULT 1 COMMENT '消息类型:1-系统消息,2-通知消息,3-私信',`status` tinyint NOT NULL DEFAULT 0 COMMENT '状态:0-未读,1-已读,2-已删除',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),KEY `idx_receiver_status` (`receiver_id`,`status`) COMMENT '接收者ID和状态索引,用于查询未读消息',KEY `idx_create_time` (`create_time`) COMMENT '创建时间索引,用于排序'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='站内消息表';-- 消息已读记录表(用于批量标记已读)
CREATE TABLE `message_read_record` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',`user_id` bigint NOT NULL COMMENT '用户ID',`last_read_time` datetime NOT NULL COMMENT '最后已读时间',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `uk_user_id` (`user_id`) COMMENT '用户ID唯一索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息已读记录表';
四、环境准备与依赖配置
接下来我们开始搭建项目环境,使用 Maven 管理依赖,采用当前最新的稳定版本。
4.1 项目结构
src/
├── main/
│ ├── java/
│ │ └── com/
│ │ └── example/
│ │ ├── MessageApplication.java
│ │ ├── config/
│ │ │ ├── WebSocketConfig.java
│ │ │ ├── SwaggerConfig.java
│ │ │ └── SecurityConfig.java
│ │ ├── controller/
│ │ │ ├── MessageController.java
│ │ │ └── WebSocketController.java
│ │ ├── dto/
│ │ │ ├── MessageDTO.java
│ │ │ └── WebSocketMessage.java
│ │ ├── entity/
│ │ │ ├── Message.java
│ │ │ └── MessageReadRecord.java
│ │ ├── enums/
│ │ │ ├── MessageStatusEnum.java
│ │ │ └── MessageTypeEnum.java
│ │ ├── exception/
│ │ │ ├── BusinessException.java
│ │ │ └── GlobalExceptionHandler.java
│ │ ├── mapper/
│ │ │ ├── MessageMapper.java
│ │ │ └── MessageReadRecordMapper.java
│ │ ├── service/
│ │ │ ├── MessageService.java
│ │ │ ├── WebSocketService.java
│ │ │ └── impl/
│ │ │ ├── MessageServiceImpl.java
│ │ │ └── WebSocketServiceImpl.java
│ │ └── util/
│ │ └── SecurityUtil.java
│ └── resources/
│ ├── application.yml
│ └── mybatis/
│ └── mapper/
│ ├── MessageMapper.xml
│ └── MessageReadRecordMapper.xml
└── pom.xml
4.2 Maven 依赖配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.2.0</version><relativePath/></parent><groupId>com.example</groupId><artifactId>message-websocket-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>message-websocket-demo</name><description>将站内消息接口改造为WebSocket服务的示例项目</description><properties><java.version>17</java.version><mybatis-plus.version>3.5.5</mybatis-plus.version><fastjson2.version>2.0.45</fastjson2.version><guava.version>32.1.3-jre</guava.version><lombok.version>1.18.30</lombok.version><springdoc.version>2.2.0</springdoc.version></properties><dependencies><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot WebSocket --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- Spring Boot Security --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><!-- MyBatis-Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><!-- MySQL Driver --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- FastJSON2 --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson2.version}</version></dependency><!-- Guava --><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>${guava.version}</version></dependency><!-- Swagger3 --><dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-starter-webmvc-ui</artifactId><version>${springdoc.version}</version></dependency><!-- Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>
4.3 配置文件
spring:application:name: message-websocket-demodatasource:url: jdbc:mysql://localhost:3306/message_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Drivermybatis-plus:mapper-locations: classpath:mybatis/mapper/*.xmltype-aliases-package: com.example.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImplglobal-config:db-config:id-type: autologic-delete-field: deletedlogic-delete-value: 1logic-not-delete-value: 0server:port: 8080servlet:context-path: /springdoc:api-docs:path: /api-docsswagger-ui:path: /swagger-ui.htmloperationsSorter: methodpackages-to-scan: com.example.controllerlogging:level:root: INFOcom.example: DEBUGorg.springframework.web.socket: INFO
五、核心代码实现
现在我们开始实现核心代码,按照分层架构的思想,从实体类、数据访问层、服务层到控制器逐层实现。
5.1 实体类与枚举
首先定义消息相关的实体类和枚举类型:
5.1.1 消息类型枚举
package com.example.enums;import lombok.AllArgsConstructor;
import lombok.Getter;/*** 消息类型枚举** @author ken*/
@Getter
@AllArgsConstructor
public enum MessageTypeEnum {/*** 系统消息*/SYSTEM(1, "系统消息"),/*** 通知消息*/NOTICE(2, "通知消息"),/*** 私信*/PRIVATE(3, "私信");/*** 类型编码*/private final int code;/*** 类型描述*/private final String desc;/*** 根据编码获取枚举** @param code 编码* @return 枚举*/public static MessageTypeEnum getByCode(int code) {for (MessageTypeEnum type : values()) {if (type.code == code) {return type;}}return null;}
}
5.1.2 消息状态枚举
package com.example.enums;import lombok.AllArgsConstructor;
import lombok.Getter;/*** 消息状态枚举** @author ken*/
@Getter
@AllArgsConstructor
public enum MessageStatusEnum {/*** 未读*/UNREAD(0, "未读"),/*** 已读*/READ(1, "已读"),/*** 已删除*/DELETED(2, "已删除");/*** 状态编码*/private final int code;/*** 状态描述*/private final String desc;/*** 根据编码获取枚举** @param code 编码* @return 枚举*/public static MessageStatusEnum getByCode(int code) {for (MessageStatusEnum status : values()) {if (status.code == code) {return status;}}return null;}
}
5.1.3 消息实体类
package com.example.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;import java.time.LocalDateTime;/*** 消息实体类** @author ken*/
@Data
@TableName("message")
public class Message {/*** 主键ID*/@TableId(type = IdType.AUTO)private Long id;/*** 发送者ID*/private Long senderId;/*** 接收者ID*/private Long receiverId;/*** 消息内容*/private String content;/*** 消息类型:1-系统消息,2-通知消息,3-私信* @see com.example.enums.MessageTypeEnum*/private Integer type;/*** 状态:0-未读,1-已读,2-已删除* @see com.example.enums.MessageStatusEnum*/private Integer status;/*** 创建时间*/private LocalDateTime createTime;/*** 更新时间*/private LocalDateTime updateTime;
}
5.1.4 消息已读记录实体类
package com.example.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;import java.time.LocalDateTime;/*** 消息已读记录实体类** @author ken*/
@Data
@TableName("message_read_record")
public class MessageReadRecord {/*** 主键ID*/@TableId(type = IdType.AUTO)private Long id;/*** 用户ID*/private Long userId;/*** 最后已读时间*/private LocalDateTime lastReadTime;/*** 创建时间*/private LocalDateTime createTime;/*** 更新时间*/private LocalDateTime updateTime;
}
5.2 数据传输对象 (DTO)
定义前后端数据交互的 DTO 类:
5.2.1 消息 DTO
package com.example.dto;import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;import java.time.LocalDateTime;/*** 消息DTO** @author ken*/
@Data
@Schema(description = "消息数据传输对象")
public class MessageDTO {/*** 消息ID*/@Schema(description = "消息ID")private Long id;/*** 发送者ID*/@Schema(description = "发送者ID")private Long senderId;/*** 接收者ID*/@Schema(description = "接收者ID")private Long receiverId;/*** 消息内容*/@Schema(description = "消息内容")private String content;/*** 消息类型:1-系统消息,2-通知消息,3-私信*/@Schema(description = "消息类型:1-系统消息,2-通知消息,3-私信")private Integer type;/*** 消息类型名称*/@Schema(description = "消息类型名称")private String typeName;/*** 状态:0-未读,1-已读,2-已删除*/@Schema(description = "状态:0-未读,1-已读,2-已删除")private Integer status;/*** 状态名称*/@Schema(description = "状态名称")private String statusName;/*** 创建时间*/@Schema(description = "创建时间")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")private LocalDateTime createTime;
}
5.2.2 WebSocket 消息封装类
package com.example.dto;import com.alibaba.fastjson2.JSON;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;/*** WebSocket消息封装类** @author ken*/
@Data
@Schema(description = "WebSocket消息封装类")
public class WebSocketMessage<T> {/*** 消息类型:CONNECT-连接成功,MESSAGE-新消息,READ-已读确认,ERROR-错误信息*/@Schema(description = "消息类型:CONNECT-连接成功,MESSAGE-新消息,READ-已读确认,ERROR-错误信息")private String type;/*** 消息内容*/@Schema(description = "消息内容")private T data;/*** 时间戳*/@Schema(description = "时间戳")private Long timestamp = System.currentTimeMillis();/*** 创建连接成功消息** @param message 消息内容* @return WebSocketMessage*/public static WebSocketMessage<String> connect(String message) {WebSocketMessage<String> webSocketMessage = new WebSocketMessage<>();webSocketMessage.setType("CONNECT");webSocketMessage.setData(message);return webSocketMessage;}/*** 创建新消息** @param data 消息数据* @return WebSocketMessage*/public static <T> WebSocketMessage<T> message(T data) {WebSocketMessage<T> webSocketMessage = new WebSocketMessage<>();webSocketMessage.setType("MESSAGE");webSocketMessage.setData(data);return webSocketMessage;}/*** 创建已读确认消息** @param messageId 消息ID* @return WebSocketMessage*/public static WebSocketMessage<Long> read(Long messageId) {WebSocketMessage<Long> webSocketMessage = new WebSocketMessage<>();webSocketMessage.setType("READ");webSocketMessage.setData(messageId);return webSocketMessage;}/*** 创建错误消息** @param error 错误信息* @return WebSocketMessage*/public static WebSocketMessage<String> error(String error) {WebSocketMessage<String> webSocketMessage = new WebSocketMessage<>();webSocketMessage.setType("ERROR");webSocketMessage.setData(error);return webSocketMessage;}/*** 转换为JSON字符串** @return JSON字符串*/public String toJson() {return JSON.toJSONString(this);}
}
5.3 异常处理
定义业务异常和全局异常处理器:
5.3.1 业务异常类
package com.example.exception;import lombok.Getter;/*** 业务异常类** @author ken*/
@Getter
public class BusinessException extends RuntimeException {/*** 错误码*/private final int code;/*** 构造方法** @param code 错误码* @param message 错误信息*/public BusinessException(int code, String message) {super(message);this.code = code;}/*** 构造方法** @param message 错误信息*/public BusinessException(String message) {this(500, message);}
}
5.3.2 全局异常处理器
package com.example.exception;import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;/*** 全局异常处理器** @author ken*/
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {/*** 处理业务异常** @param e 业务异常* @return 错误响应*/@ExceptionHandler(BusinessException.class)public JSONObject handleBusinessException(BusinessException e) {log.error("业务异常: {}", e.getMessage(), e);JSONObject result = new JSONObject();result.put("code", e.getCode());result.put("message", e.getMessage());return result;}/*** 处理其他异常** @param e 异常* @return 错误响应*/@ExceptionHandler(Exception.class)@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)public JSONObject handleException(Exception e) {log.error("系统异常: {}", e.getMessage(), e);JSONObject result = new JSONObject();result.put("code", 500);result.put("message", "系统异常,请联系管理员");return result;}
}
5.4 数据访问层
使用 MyBatis-Plus 实现数据访问层:
5.4.1 消息 Mapper 接口
package com.example.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.dto.MessageDTO;
import com.example.entity.Message;
import org.apache.ibatis.annotations.Param;import java.time.LocalDateTime;
import java.util.List;/*** 消息Mapper接口** @author ken*/
public interface MessageMapper extends BaseMapper<Message> {/*** 分页查询用户消息** @param page 分页参数* @param receiverId 接收者ID* @param status 消息状态* @return 分页消息列表*/IPage<MessageDTO> selectUserMessages(Page<MessageDTO> page,@Param("receiverId") Long receiverId,@Param("status") Integer status);/*** 查询用户未读消息数量** @param receiverId 接收者ID* @return 未读消息数量*/Integer selectUnreadCount(@Param("receiverId") Long receiverId);/*** 查询用户指定时间后的未读消息** @param receiverId 接收者ID* @param time 时间点* @return 未读消息列表*/List<MessageDTO> selectUnreadMessagesAfterTime(@Param("receiverId") Long receiverId,@Param("time") LocalDateTime time);/*** 批量更新消息状态为已读** @param receiverId 接收者ID* @param status 原状态* @param newStatus 新状态* @param time 时间点* @return 更新数量*/Integer batchUpdateStatus(@Param("receiverId") Long receiverId,@Param("status") Integer status,@Param("newStatus") Integer newStatus,@Param("time") LocalDateTime time);
}
5.4.2 消息已读记录 Mapper 接口
package com.example.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.entity.MessageReadRecord;
import org.apache.ibatis.annotations.Param;/*** 消息已读记录Mapper接口** @author ken*/
public interface MessageReadRecordMapper extends BaseMapper<MessageReadRecord> {/*** 根据用户ID查询最后已读时间** @param userId 用户ID* @return 最后已读时间记录*/MessageReadRecord selectLastReadTimeByUserId(@Param("userId") Long userId);/*** 更新用户最后已读时间** @param userId 用户ID* @param lastReadTime 最后已读时间* @return 更新数量*/Integer updateLastReadTime(@Param("userId") Long userId, @Param("lastReadTime") LocalDateTime lastReadTime);
}
5.4.3 消息 Mapper XML 实现
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mapper.MessageMapper"><sql id="messageDTOColumns">m.id,m.sender_id as senderId,m.receiver_id as receiverId,m.content,m.type,(case m.type when 1 then '系统消息' when 2 then '通知消息' when 3 then '私信' else '' end) as typeName,m.status,(case m.status when 0 then '未读' when 1 then '已读' when 2 then '已删除' else '' end) as statusName,m.create_time as createTime</sql><select id="selectUserMessages" resultType="com.example.dto.MessageDTO">select<include refid="messageDTOColumns"/>from message mwhere m.receiver_id = #{receiverId}<if test="status != null">and m.status = #{status}</if>order by m.create_time desc</select><select id="selectUnreadCount" resultType="java.lang.Integer">select count(1) from messagewhere receiver_id = #{receiverId}and status = 0</select><select id="selectUnreadMessagesAfterTime" resultType="com.example.dto.MessageDTO">select<include refid="messageDTOColumns"/>from message mwhere m.receiver_id = #{receiverId}and m.status = 0and m.create_time > #{time}order by m.create_time asc</select><update id="batchUpdateStatus">update messageset status = #{newStatus},update_time = now()where receiver_id = #{receiverId}and status = #{status}and create_time <= #{time}</update>
</mapper>
5.4.4 消息已读记录 Mapper XML 实现
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mapper.MessageReadRecordMapper"><select id="selectLastReadTimeByUserId" resultType="com.example.entity.MessageReadRecord">select id, user_id as userId, last_read_time as lastReadTime, create_time as createTime, update_time as updateTimefrom message_read_recordwhere user_id = #{userId}limit 1</select><update id="updateLastReadTime">update message_read_recordset last_read_time = #{lastReadTime},update_time = now()where user_id = #{userId}</update>
</mapper>
5.5 服务层
实现业务逻辑,包括消息的 CRUD 和 WebSocket 相关操作:
5.5.1 消息服务接口
package com.example.service;import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.dto.MessageDTO;
import com.example.entity.Message;
import com.baomidou.mybatisplus.extension.service.IService;import java.time.LocalDateTime;
import java.util.List;/*** 消息服务接口** @author ken*/
public interface MessageService extends IService<Message> {/*** 发送消息** @param senderId 发送者ID* @param receiverId 接收者ID* @param content 消息内容* @param type 消息类型* @return 消息DTO*/MessageDTO sendMessage(Long senderId, Long receiverId, String content, Integer type);/*** 分页查询用户消息** @param page 分页参数* @param receiverId 接收者ID* @param status 消息状态,null表示查询所有状态* @return 分页消息列表*/IPage<MessageDTO> getUserMessages(Page<MessageDTO> page, Long receiverId, Integer status);/*** 获取用户未读消息数量** @param userId 用户ID* @return 未读消息数量*/Integer getUnreadCount(Long userId);/*** 标记消息为已读** @param userId 用户ID* @param messageId 消息ID* @return 是否成功*/boolean markAsRead(Long userId, Long messageId);/*** 批量标记消息为已读** @param userId 用户ID* @param time 时间点,该时间点之前的所有未读消息将被标记为已读* @return 标记数量*/int batchMarkAsRead(Long userId, LocalDateTime time);/*** 获取用户指定时间后的未读消息** @param userId 用户ID* @param time 时间点* @return 未读消息列表*/List<MessageDTO> getUnreadMessagesAfterTime(Long userId, LocalDateTime time);/*** 获取用户最后已读时间** @param userId 用户ID* @return 最后已读时间*/LocalDateTime getLastReadTime(Long userId);/*** 更新用户最后已读时间** @param userId 用户ID* @param time 最后已读时间*/void updateLastReadTime(Long userId, LocalDateTime time);
}
5.5.2 WebSocket 服务接口
package com.example.service;import com.example.dto.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;import java.io.IOException;/*** WebSocket服务接口** @author ken*/
public interface WebSocketService {/*** 连接建立后处理** @param session WebSocket会话* @param userId 用户ID*/void handleOpen(WebSocketSession session, Long userId);/*** 处理收到的消息** @param session WebSocket会话* @param message 消息内容* @param userId 用户ID* @throws IOException IO异常*/void handleMessage(WebSocketSession session, String message, Long userId) throws IOException;/*** 连接关闭后处理** @param session WebSocket会话* @param userId 用户ID*/void handleClose(WebSocketSession session, Long userId);/*** 处理错误** @param session WebSocket会话* @param error 错误* @param userId 用户ID*/void handleError(WebSocketSession session, Throwable error, Long userId);/*** 向指定用户发送消息** @param userId 用户ID* @param message 消息* @return 是否发送成功*/boolean sendToUser(Long userId, WebSocketMessage<?> message);/*** 广播消息** @param message 消息*/void broadcast(WebSocketMessage<?> message);
}
5.5.3 消息服务实现类
package com.example.service.impl;import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.dto.MessageDTO;
import com.example.entity.Message;
import com.example.entity.MessageReadRecord;
import com.example.enums.MessageStatusEnum;
import com.example.enums.MessageTypeEnum;
import com.example.exception.BusinessException;
import com.example.mapper.MessageMapper;
import com.example.mapper.MessageReadRecordMapper;
import com.example.service.MessageService;
import com.example.service.WebSocketService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;import java.time.LocalDateTime;
import java.util.List;/*** 消息服务实现类** @author ken*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message> implements MessageService {private final MessageMapper messageMapper;private final MessageReadRecordMapper messageReadRecordMapper;private final WebSocketService webSocketService;/*** 发送消息** @param senderId 发送者ID* @param receiverId 接收者ID* @param content 消息内容* @param type 消息类型* @return 消息DTO*/@Override@Transactional(rollbackFor = Exception.class)public MessageDTO sendMessage(Long senderId, Long receiverId, String content, Integer type) {log.info("发送消息:senderId={}, receiverId={}, type={}", senderId, receiverId, type);// 参数校验if (ObjectUtils.isEmpty(senderId)) {throw new BusinessException("发送者ID不能为空");}if (ObjectUtils.isEmpty(receiverId)) {throw new BusinessException("接收者ID不能为空");}if (ObjectUtils.isEmpty(content)) {throw new BusinessException("消息内容不能为空");}if (ObjectUtils.isEmpty(type) || MessageTypeEnum.getByCode(type) == null) {throw new BusinessException("消息类型无效");}// 创建消息Message message = new Message();message.setSenderId(senderId);message.setReceiverId(receiverId);message.setContent(content);message.setType(type);message.setStatus(MessageStatusEnum.UNREAD.getCode());message.setCreateTime(LocalDateTime.now());message.setUpdateTime(LocalDateTime.now());// 保存消息int insert = messageMapper.insert(message);if (insert <= 0) {throw new BusinessException("消息发送失败");}// 转换为DTOMessageDTO messageDTO = convertToDTO(message);// 通过WebSocket推送消息给接收者webSocketService.sendToUser(receiverId, com.example.dto.WebSocketMessage.message(messageDTO));return messageDTO;}/*** 分页查询用户消息** @param page 分页参数* @param receiverId 接收者ID* @param status 消息状态,null表示查询所有状态* @return 分页消息列表*/@Overridepublic IPage<MessageDTO> getUserMessages(Page<MessageDTO> page, Long receiverId, Integer status) {log.info("分页查询用户消息:receiverId={}, status={}, page={}", receiverId, status, page.getCurrent());if (ObjectUtils.isEmpty(receiverId)) {throw new BusinessException("接收者ID不能为空");}return messageMapper.selectUserMessages(page, receiverId, status);}/*** 获取用户未读消息数量** @param userId 用户ID* @return 未读消息数量*/@Overridepublic Integer getUnreadCount(Long userId) {log.info("获取用户未读消息数量:userId={}", userId);if (ObjectUtils.isEmpty(userId)) {throw new BusinessException("用户ID不能为空");}return messageMapper.selectUnreadCount(userId);}/*** 标记消息为已读** @param userId 用户ID* @param messageId 消息ID* @return 是否成功*/@Override@Transactional(rollbackFor = Exception.class)public boolean markAsRead(Long userId, Long messageId) {log.info("标记消息为已读:userId={}, messageId={}", userId, messageId);if (ObjectUtils.isEmpty(userId)) {throw new BusinessException("用户ID不能为空");}if (ObjectUtils.isEmpty(messageId)) {throw new BusinessException("消息ID不能为空");}// 查询消息Message message = messageMapper.selectById(messageId);if (ObjectUtils.isEmpty(message)) {throw new BusinessException("消息不存在");}// 验证消息所有者if (!message.getReceiverId().equals(userId)) {throw new BusinessException("无权操作此消息");}// 如果已经是已读状态,直接返回成功if (message.getStatus().equals(MessageStatusEnum.READ.getCode())) {return true;}// 更新消息状态message.setStatus(MessageStatusEnum.READ.getCode());message.setUpdateTime(LocalDateTime.now());int update = messageMapper.updateById(message);// 更新最后已读时间updateLastReadTime(userId, LocalDateTime.now());return update > 0;}/*** 批量标记消息为已读** @param userId 用户ID* @param time 时间点,该时间点之前的所有未读消息将被标记为已读* @return 标记数量*/@Override@Transactional(rollbackFor = Exception.class)public int batchMarkAsRead(Long userId, LocalDateTime time) {log.info("批量标记消息为已读:userId={}, time={}", userId, time);if (ObjectUtils.isEmpty(userId)) {throw new BusinessException("用户ID不能为空");}if (ObjectUtils.isEmpty(time)) {throw new BusinessException("时间点不能为空");}// 批量更新消息状态int count = messageMapper.batchUpdateStatus(userId,MessageStatusEnum.UNREAD.getCode(),MessageStatusEnum.READ.getCode(),time);// 更新最后已读时间updateLastReadTime(userId, LocalDateTime.now());return count;}/*** 获取用户指定时间后的未读消息** @param userId 用户ID* @param time 时间点* @return 未读消息列表*/@Overridepublic List<MessageDTO> getUnreadMessagesAfterTime(Long userId, LocalDateTime time) {log.info("获取用户指定时间后的未读消息:userId={}, time={}", userId, time);if (ObjectUtils.isEmpty(userId)) {throw new BusinessException("用户ID不能为空");}if (ObjectUtils.isEmpty(time)) {throw new BusinessException("时间点不能为空");}return messageMapper.selectUnreadMessagesAfterTime(userId, time);}/*** 获取用户最后已读时间** @param userId 用户ID* @return 最后已读时间*/@Overridepublic LocalDateTime getLastReadTime(Long userId) {log.info("获取用户最后已读时间:userId={}", userId);if (ObjectUtils.isEmpty(userId)) {throw new BusinessException("用户ID不能为空");}MessageReadRecord record = messageReadRecordMapper.selectLastReadTimeByUserId(userId);return ObjectUtils.isEmpty(record) ? LocalDateTime.of(2000, 1, 1, 0, 0, 0) : record.getLastReadTime();}/*** 更新用户最后已读时间** @param userId 用户ID* @param time 最后已读时间*/@Override@Transactional(rollbackFor = Exception.class)public void updateLastReadTime(Long userId, LocalDateTime time) {log.info("更新用户最后已读时间:userId={}, time={}", userId, time);if (ObjectUtils.isEmpty(userId)) {throw new BusinessException("用户ID不能为空");}if (ObjectUtils.isEmpty(time)) {throw new BusinessException("时间点不能为空");}MessageReadRecord record = messageReadRecordMapper.selectLastReadTimeByUserId(userId);if (ObjectUtils.isEmpty(record)) {// 不存在则创建record = new MessageReadRecord();record.setUserId(userId);record.setLastReadTime(time);record.setCreateTime(LocalDateTime.now());record.setUpdateTime(LocalDateTime.now());messageReadRecordMapper.insert(record);} else {// 存在则更新messageReadRecordMapper.updateLastReadTime(userId, time);}}/*** 将实体转换为DTO** @param message 消息实体* @return 消息DTO*/private MessageDTO convertToDTO(Message message) {MessageDTO dto = new MessageDTO();dto.setId(message.getId());dto.setSenderId(message.getSenderId());dto.setReceiverId(message.getReceiverId());dto.setContent(message.getContent());dto.setType(message.getType());MessageTypeEnum typeEnum = MessageTypeEnum.getByCode(message.getType());if (typeEnum != null) {dto.setTypeName(typeEnum.getDesc());}dto.setStatus(message.getStatus());MessageStatusEnum statusEnum = MessageStatusEnum.getByCode(message.getStatus());if (statusEnum != null) {dto.setStatusName(statusEnum.getDesc());}dto.setCreateTime(message.getCreateTime());return dto;}
}
5.5.4 WebSocket 服务实现类
package com.example.service.impl;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.example.dto.WebSocketMessage;
import com.example.exception.BusinessException;
import com.example.service.MessageService;
import com.example.service.WebSocketService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;/*** WebSocket服务实现类** @author ken*/
@Slf4j
@Service
@RequiredArgsConstructor
public class WebSocketServiceImpl implements WebSocketService {/*** 用户ID与WebSocketSession的映射关系* 一个用户可能有多个连接(如多个浏览器标签页)*/private final Map<Long, List<WebSocketSession>> userSessionMap = new ConcurrentHashMap<>();private final MessageService messageService;/*** 连接建立后处理** @param session WebSocket会话* @param userId 用户ID*/@Overridepublic void handleOpen(WebSocketSession session, Long userId) {log.info("WebSocket连接建立:userId={}, sessionId={}", userId, session.getId());if (ObjectUtils.isEmpty(userId)) {sendErrorMessage(session, "用户ID不能为空");try {session.close();} catch (IOException e) {log.error("关闭WebSocket连接失败", e);}return;}// 将会话添加到用户的会话列表中userSessionMap.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>()).add(session);try {// 获取用户最后已读时间LocalDateTime lastReadTime = messageService.getLastReadTime(userId);// 查询最后已读时间之后的未读消息List<com.example.dto.MessageDTO> unreadMessages = messageService.getUnreadMessagesAfterTime(userId, lastReadTime);// 发送连接成功消息session.sendMessage(new TextMessage(WebSocketMessage.connect("连接成功").toJson()));// 如果有未读消息,推送给用户if (!CollectionUtils.isEmpty(unreadMessages)) {for (com.example.dto.MessageDTO message : unreadMessages) {session.sendMessage(new TextMessage(WebSocketMessage.message(message).toJson()));}// 批量标记为已读messageService.batchMarkAsRead(userId, LocalDateTime.now());}} catch (Exception e) {log.error("处理WebSocket连接建立事件失败", e);sendErrorMessage(session, "连接处理失败:" + e.getMessage());}}/*** 处理收到的消息** @param session WebSocket会话* @param message 消息内容* @param userId 用户ID* @throws IOException IO异常*/@Overridepublic void handleMessage(WebSocketSession session, String message, Long userId) throws IOException {log.info("收到WebSocket消息:userId={}, message={}", userId, message);try {// 解析消息JSONObject jsonObject = JSON.parseObject(message);if (ObjectUtils.isEmpty(jsonObject)) {sendErrorMessage(session, "消息格式无效");return;}String type = jsonObject.getString("type");if (ObjectUtils.isEmpty(type)) {sendErrorMessage(session, "消息类型不能为空");return;}// 处理不同类型的消息switch (type) {case "READ":// 处理已读确认Long messageId = jsonObject.getLong("data");if (ObjectUtils.isEmpty(messageId)) {sendErrorMessage(session, "消息ID不能为空");return;}boolean success = messageService.markAsRead(userId, messageId);if (success) {session.sendMessage(new TextMessage(WebSocketMessage.read(messageId).toJson()));} else {sendErrorMessage(session, "标记已读失败");}break;case "BATCH_READ":// 处理批量已读确认Long timestamp = jsonObject.getLong("data");if (ObjectUtils.isEmpty(timestamp)) {sendErrorMessage(session, "时间戳不能为空");return;}LocalDateTime time = LocalDateTime.ofEpochSecond(timestamp / 1000, 0, java.time.ZoneOffset.of("+8"));int count = messageService.batchMarkAsRead(userId, time);session.sendMessage(new TextMessage(WebSocketMessage.message("成功标记" + count + "条消息为已读").toJson()));break;default:sendErrorMessage(session, "不支持的消息类型:" + type);}} catch (BusinessException e) {log.error("处理WebSocket消息业务异常", e);sendErrorMessage(session, e.getMessage());} catch (Exception e) {log.error("处理WebSocket消息失败", e);sendErrorMessage(session, "处理消息失败:" + e.getMessage());}}/*** 连接关闭后处理** @param session WebSocket会话* @param userId 用户ID*/@Overridepublic void handleClose(WebSocketSession session, Long userId) {log.info("WebSocket连接关闭:userId={}, sessionId={}", userId, session.getId());if (!ObjectUtils.isEmpty(userId)) {List<WebSocketSession> sessions = userSessionMap.get(userId);if (!CollectionUtils.isEmpty(sessions)) {sessions.remove(session);// 如果用户没有任何连接了,从映射中移除if (sessions.isEmpty()) {userSessionMap.remove(userId);}}}}/*** 处理错误** @param session WebSocket会话* @param error 错误* @param userId 用户ID*/@Overridepublic void handleError(WebSocketSession session, Throwable error, Long userId) {log.error("WebSocket错误:userId={}, sessionId={}", userId, session.getId(), error);sendErrorMessage(session, "发生错误:" + error.getMessage());}/*** 向指定用户发送消息** @param userId 用户ID* @param message 消息* @return 是否发送成功*/@Overridepublic boolean sendToUser(Long userId, WebSocketMessage<?> message) {log.info("向用户发送WebSocket消息:userId={}, messageType={}", userId, message.getType());if (ObjectUtils.isEmpty(userId) || ObjectUtils.isEmpty(message)) {log.error("发送消息失败:用户ID或消息为空");return false;}List<WebSocketSession> sessions = userSessionMap.get(userId);if (CollectionUtils.isEmpty(sessions)) {log.info("用户没有活跃的WebSocket连接:userId={}", userId);return false;}boolean allSuccess = true;String jsonMessage = message.toJson();TextMessage textMessage = new TextMessage(jsonMessage);// 向用户的所有连接发送消息for (WebSocketSession session : sessions) {try {if (session.isOpen()) {session.sendMessage(textMessage);log.info("向用户发送消息成功:userId={}, sessionId={}", userId, session.getId());} else {log.warn("WebSocket会话已关闭,无法发送消息:userId={}, sessionId={}", userId, session.getId());allSuccess = false;// 移除已关闭的会话sessions.remove(session);}} catch (IOException e) {log.error("向用户发送消息失败:userId={}, sessionId={}", userId, session.getId(), e);allSuccess = false;}}return allSuccess;}/*** 广播消息** @param message 消息*/@Overridepublic void broadcast(WebSocketMessage<?> message) {log.info("广播WebSocket消息:messageType={}", message.getType());if (ObjectUtils.isEmpty(message)) {log.error("广播消息失败:消息为空");return;}String jsonMessage = message.toJson();TextMessage textMessage = new TextMessage(jsonMessage);// 向所有用户的所有连接发送消息for (Map.Entry<Long, List<WebSocketSession>> entry : userSessionMap.entrySet()) {Long userId = entry.getKey();List<WebSocketSession> sessions = entry.getValue();for (WebSocketSession session : sessions) {try {if (session.isOpen()) {session.sendMessage(textMessage);log.info("向用户广播消息成功:userId={}, sessionId={}", userId, session.getId());} else {log.warn("WebSocket会话已关闭,无法广播消息:userId={}, sessionId={}", userId, session.getId());// 移除已关闭的会话sessions.remove(session);}} catch (IOException e) {log.error("向用户广播消息失败:userId={}, sessionId={}", userId, session.getId(), e);}}// 如果用户没有任何连接了,从映射中移除if (sessions.isEmpty()) {userSessionMap.remove(userId);}}}/*** 发送错误消息** @param session WebSocket会话* @param errorMessage 错误消息*/private void sendErrorMessage(WebSocketSession session, String errorMessage) {try {if (session.isOpen()) {session.sendMessage(new TextMessage(WebSocketMessage.error(errorMessage).toJson()));}} catch (IOException e) {log.error("发送错误消息失败", e);}}
}
5.6 控制器层
实现 HTTP 接口和 WebSocket 处理器:
5.6.1 消息控制器(HTTP 接口)
package com.example.controller;import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.dto.MessageDTO;
import com.example.service.MessageService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;/*** 消息控制器(HTTP接口)* 保留部分HTTP接口用于兼容和特殊场景** @author ken*/
@Slf4j
@RestController
@RequestMapping("/api/messages")
@RequiredArgsConstructor
@Tag(name = "消息管理", description = "消息相关的HTTP接口")
public class MessageController {private final MessageService messageService;/*** 发送消息** @param senderId 发送者ID* @param receiverId 接收者ID* @param content 消息内容* @param type 消息类型* @return 消息DTO*/@PostMapping("/send")@Operation(summary = "发送消息", description = "发送一条新消息")public MessageDTO sendMessage(@Parameter(description = "发送者ID", required = true) @RequestParam Long senderId,@Parameter(description = "接收者ID", required = true) @RequestParam Long receiverId,@Parameter(description = "消息内容", required = true) @RequestParam String content,@Parameter(description = "消息类型:1-系统消息,2-通知消息,3-私信", required = true) @RequestParam Integer type) {return messageService.sendMessage(senderId, receiverId, content, type);}/*** 分页查询用户消息** @param userId 用户ID* @param status 消息状态:0-未读,1-已读,2-已删除,null-所有* @param pageNum 页码* @param pageSize 每页大小* @return 分页消息列表*/@GetMapping("/user")@Operation(summary = "分页查询用户消息", description = "分页查询指定用户的消息")public IPage<MessageDTO> getUserMessages(@Parameter(description = "用户ID", required = true) @RequestParam Long userId,@Parameter(description = "消息状态:0-未读,1-已读,2-已删除,null-所有") @RequestParam(required = false) Integer status,@Parameter(description = "页码,默认1") @RequestParam(defaultValue = "1") Integer pageNum,@Parameter(description = "每页大小,默认10") @RequestParam(defaultValue = "10") Integer pageSize) {Page<MessageDTO> page = new Page<>(pageNum, pageSize);return messageService.getUserMessages(page, userId, status);}/*** 获取用户未读消息数量** @param userId 用户ID* @return 未读消息数量*/@GetMapping("/unread/count")@Operation(summary = "获取用户未读消息数量", description = "获取指定用户的未读消息数量")public Integer getUnreadCount(@Parameter(description = "用户ID", required = true) @RequestParam Long userId) {return messageService.getUnreadCount(userId);}/*** 标记消息为已读** @param userId 用户ID* @param messageId 消息ID* @return 是否成功*/@PutMapping("/read")@Operation(summary = "标记消息为已读", description = "将指定消息标记为已读")public Boolean markAsRead(@Parameter(description = "用户ID", required = true) @RequestParam Long userId,@Parameter(description = "消息ID", required = true) @RequestParam Long messageId) {return messageService.markAsRead(userId, messageId);}
}
5.6.2 WebSocket 控制器
package com.example.controller;import com.example.service.WebSocketService;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.server.support.WebSocketHandlerRegistry;import java.util.Map;/*** WebSocket控制器** @author ken*/
@Slf4j
@RestController
@RequestMapping("/ws")
@RequiredArgsConstructor
@Tag(name = "WebSocket消息", description = "WebSocket消息相关接口")
public class WebSocketController implements HandshakeInterceptor {private final WebSocketService webSocketService;/*** 注册WebSocket处理器** @param registry WebSocket处理器注册表*/@org.springframework.web.socket.config.annotation.WebSocketHandlerRegistrypublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(new CustomWebSocketHandler(webSocketService), "/messages").addInterceptors(this).setAllowedOrigins("*"); // 实际生产环境中应指定具体的允许跨域的域名}/*** 握手前处理** @param request 请求* @param response 响应* @param wsHandler WebSocket处理器* @param attributes 属性* @return 是否继续握手* @throws Exception 异常*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {log.info("WebSocket握手开始");// 从请求参数中获取用户IDif (request instanceof ServletServerHttpRequest servletRequest) {String userIdStr = servletRequest.getServletRequest().getParameter("userId");if (userIdStr == null || userIdStr.isEmpty()) {log.error("WebSocket握手失败:用户ID为空");return false;}try {Long userId = Long.parseLong(userIdStr);attributes.put("userId", userId);log.info("WebSocket握手:userId={}", userId);return true;} catch (NumberFormatException e) {log.error("WebSocket握手失败:用户ID格式无效", e);return false;}}log.error("WebSocket握手失败:请求类型不支持");return false;}/*** 握手后处理** @param request 请求* @param response 响应* @param wsHandler WebSocket处理器* @param exception 异常*/@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Exception exception) {log.info("WebSocket握手完成");}/*** 自定义WebSocket处理器*/public static class CustomWebSocketHandler extends org.springframework.web.socket.handler.TextWebSocketHandler {private final WebSocketService webSocketService;public CustomWebSocketHandler(WebSocketService webSocketService) {this.webSocketService = webSocketService;}/*** 连接建立后调用** @param session WebSocket会话* @throws Exception 异常*/@Overridepublic void afterConnectionEstablished(org.springframework.web.socket.WebSocketSession session) throws Exception {Long userId = (Long) session.getAttributes().get("userId");webSocketService.handleOpen(session, userId);}/*** 收到消息时调用** @param session WebSocket会话* @param message 消息* @throws Exception 异常*/@Overrideprotected void handleTextMessage(org.springframework.web.socket.WebSocketSession session,org.springframework.web.socket.TextMessage message) throws Exception {Long userId = (Long) session.getAttributes().get("userId");webSocketService.handleMessage(session, message.getPayload(), userId);}/*** 连接关闭后调用** @param session WebSocket会话* @param status 状态* @throws Exception 异常*/@Overridepublic void afterConnectionClosed(org.springframework.web.socket.WebSocketSession session,org.springframework.web.socket.CloseStatus status) throws Exception {Long userId = (Long) session.getAttributes().get("userId");webSocketService.handleClose(session, userId);}/*** 发生错误时调用** @param session WebSocket会话* @param exception 异常* @throws Exception 异常*/@Overridepublic void handleTransportError(org.springframework.web.socket.WebSocketSession session,Throwable exception) throws Exception {Long userId = (Long) session.getAttributes().get("userId");webSocketService.handleError(session, exception, userId);}}
}
5.7 配置类
实现必要的配置:
5.7.1 WebSocket 配置
package com.example.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** WebSocket配置类** @author ken*/
@Configuration
public class WebSocketConfig {/*** 注册ServerEndpointExporter,自动注册使用@ServerEndpoint注解的Bean** @return ServerEndpointExporter*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
5.7.2 Swagger3 配置
package com.example.config;import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Info;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Swagger3配置类** @author ken*/
@Configuration
public class SwaggerConfig {/*** 配置OpenAPI信息** @return OpenAPI*/@Beanpublic OpenAPI customOpenAPI() {return new OpenAPI().info(new Info().title("站内消息WebSocket服务API").version("1.0").description("将站内消息接口改造为WebSocket服务的示例项目API文档"));}
}
5.7.3 安全配置
package com.example.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.web.SecurityFilterChain;/*** 安全配置类* 实际项目中应根据需求完善安全配置** @author ken*/
@Configuration
@EnableWebSecurity
public class SecurityConfig {/*** 配置安全过滤器链** @param http HttpSecurity* @return SecurityFilterChain* @throws Exception 异常*/@Beanpublic SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {http.csrf(csrf -> csrf.disable()) // 为了简化示例,禁用CSRF.authorizeHttpRequests(auth -> auth.anyRequest().permitAll() // 允许所有请求访问,实际项目中应根据需求配置权限);return http.build();}
}
5.8 工具类
实现安全相关的工具类:
package com.example.util;import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;/*** 安全工具类** @author ken*/
public class SecurityUtil {/*** 获取当前登录用户ID* 实际项目中应根据认证方式实现** @return 当前登录用户ID*/public static Long getCurrentUserId() {// 这里只是示例,实际项目中应从SecurityContext中获取真实的用户IDAuthentication authentication = SecurityContextHolder.getContext().getAuthentication();if (authentication == null) {return null;}// 假设用户名是用户ID的字符串形式try {return Long.parseLong(authentication.getName());} catch (NumberFormatException e) {return null;}}
}
5.9 应用启动类
package com.example;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.EnableAspectJAutoProxy;/*** 应用启动类** @author ken*/
@SpringBootApplication
@MapperScan("com.example.mapper")
@EnableAspectJAutoProxy(exposeProxy = true)
public class MessageApplication {public static void main(String[] args) {SpringApplication.run(MessageApplication.class, args);}
}
六、前端实现示例
为了完整展示 WebSocket 的使用,我们提供一个简单的前端页面示例,使用原生 JavaScript 实现 WebSocket 客户端:
<!DOCTYPE html>
<html lang="zh-CN">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>站内消息WebSocket示例</title><style>body {font-family: Arial, sans-serif;max-width: 800px;margin: 0 auto;padding: 20px;}.container {border: 1px solid #ccc;border-radius: 5px;padding: 20px;}.connection-status {color: #666;margin-bottom: 10px;}.connected {color: green;}.disconnected {color: red;}.messages {border: 1px solid #eee;height: 400px;overflow-y: auto;margin-bottom: 20px;padding: 10px;}.message {margin-bottom: 10px;padding: 10px;border-radius: 5px;background-color: #f5f5f5;}.message.unread {background-color: #e3f2fd;border-left: 3px solid #2196f3;}.message .info {font-size: 0.8em;color: #666;margin-bottom: 5px;}.input-area {display: flex;gap: 10px;}#messageContent {flex: 1;padding: 10px;border: 1px solid #ccc;border-radius: 5px;}button {padding: 10px 20px;background-color: #2196f3;color: white;border: none;border-radius: 5px;cursor: pointer;}button:hover {background-color: #0b7dda;}.controls {margin-bottom: 20px;display: flex;gap: 10px;}</style>
</head>
<body><div class="container"><h1>站内消息WebSocket示例</h1><div class="controls"><input type="number" id="userId" placeholder="输入用户ID" value="1"><button onclick="connect()">连接WebSocket</button><button onclick="disconnect()">断开连接</button><span id="status" class="connection-status disconnected">未连接</span></div><div class="messages" id="messagesContainer"></div><div class="input-area"><input type="number" id="receiverId" placeholder="接收者ID" value="2"><input type="text" id="messageContent" placeholder="输入消息内容"><button onclick="sendMessage()">发送消息</button></div></div><script>let webSocket;let userIdInput = document.getElementById('userId');let receiverIdInput = document.getElementById('receiverId');let messageContentInput = document.getElementById('messageContent');let messagesContainer = document.getElementById('messagesContainer');let statusElement = document.getElementById('status');/*** 连接WebSocket*/function connect() {let userId = userIdInput.value.trim();if (!userId) {alert('请输入用户ID');return;}// 关闭已有的连接if (webSocket) {webSocket.close();}// 创建WebSocket连接const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';const wsUrl = `${wsProtocol}//${window.location.host}/ws/messages?userId=${userId}`;webSocket = new WebSocket(wsUrl);// 连接成功事件webSocket.onopen = function(event) {console.log('WebSocket连接已建立');statusElement.textContent = '已连接';statusElement.className = 'connection-status connected';addMessage('系统消息', '连接成功,可以接收消息了', new Date(), true);};// 收到消息事件webSocket.onmessage = function(event) {console.log('收到消息:', event.data);try {let message = JSON.parse(event.data);handleReceivedMessage(message);} catch (e) {console.error('解析消息失败:', e);addMessage('系统错误', '收到无效格式的消息', new Date(), false);}};// 连接关闭事件webSocket.onclose = function(event) {console.log('WebSocket连接已关闭,代码:', event.code, '原因:', event.reason);statusElement.textContent = '未连接';statusElement.className = 'connection-status disconnected';addMessage('系统消息', '连接已关闭', new Date(), true);};// 连接错误事件webSocket.onerror = function(error) {console.error('WebSocket错误:', error);addMessage('系统错误', '连接发生错误', new Date(), false);};}/*** 断开WebSocket连接*/function disconnect() {if (webSocket) {webSocket.close();webSocket = null;}}/*** 发送消息*/function sendMessage() {if (!webSocket || webSocket.readyState !== WebSocket.OPEN) {alert('请先连接WebSocket');return;}let receiverId = receiverIdInput.value.trim();let content = messageContentInput.value.trim();if (!receiverId) {alert('请输入接收者ID');return;}if (!content) {alert('请输入消息内容');return;}// 发送HTTP请求创建消息fetch('/api/messages/send', {method: 'POST',headers: {'Content-Type': 'application/x-www-form-urlencoded',},body: `senderId=${userIdInput.value}&receiverId=${receiverId}&content=${encodeURIComponent(content)}&type=3`}).then(response => {if (!response.ok) {throw new Error('发送消息失败');}return response.json();}).then(data => {console.log('消息发送成功:', data);addMessage(`我发送给 ${receiverId}`, content, new Date(data.createTime), false);messageContentInput.value = '';}).catch(error => {console.error('发送消息失败:', error);alert('发送消息失败: ' + error.message);});}/*** 处理收到的消息* @param {Object} message 消息对象*/function handleReceivedMessage(message) {switch (message.type) {case 'CONNECT':addMessage('系统消息', message.data, new Date(message.timestamp), true);break;case 'MESSAGE':let msgData = message.data;let isUnread = msgData.status === 0;addMessage(`来自 ${msgData.senderId} 的消息`, msgData.content, new Date(msgData.createTime), isUnread);// 如果是未读消息,发送已读确认if (isUnread) {sendReadConfirmation(msgData.id);}break;case 'READ':addMessage('系统消息', `消息 ${message.data} 已标记为已读`, new Date(message.timestamp), true);break;case 'ERROR':addMessage('系统错误', message.data, new Date(message.timestamp), false);break;default:addMessage('系统消息', `收到未知类型的消息: ${JSON.stringify(message)}`, new Date(message.timestamp), true);}}/*** 发送已读确认* @param {number} messageId 消息ID*/function sendReadConfirmation(messageId) {if (!webSocket || webSocket.readyState !== WebSocket.OPEN) {console.log('WebSocket未连接,无法发送已读确认');return;}let readMessage = {type: 'READ',data: messageId};webSocket.send(JSON.stringify(readMessage));console.log('发送已读确认:', readMessage);}/*** 添加消息到页面* @param {string} title 消息标题* @param {string} content 消息内容* @param {Date} time 消息时间* @param {boolean} isSystem 是否系统消息*/function addMessage(title, content, time, isSystem) {let messageDiv = document.createElement('div');messageDiv.className = `message ${isSystem ? '' : 'unread'}`;let timeStr = time.toLocaleString();let infoDiv = document.createElement('div');infoDiv.className = 'info';infoDiv.textContent = `${title} [${timeStr}]`;let contentDiv = document.createElement('div');contentDiv.className = 'content';contentDiv.textContent = content;messageDiv.appendChild(infoDiv);messageDiv.appendChild(contentDiv);messagesContainer.prepend(messageDiv); // 添加到最前面}// 页面关闭时断开连接window.onbeforeunload = function() {disconnect();};</script>
</body>
</html>
七、测试与验证
为了确保系统能够正常工作,我们需要进行全面的测试验证:
7.1 测试环境准备
- 确保 MySQL 数据库已启动,并创建了
message_db
数据库 - 执行前面提供的 SQL 脚本创建数据表
- 启动 Spring Boot 应用,确保应用正常运行
- 将前端 HTML 页面部署到 Web 服务器或直接在浏览器中打开
7.2 功能测试步骤
7.2.1 连接测试
- 打开前端页面,输入用户 ID(如 1)
- 点击 "连接 WebSocket" 按钮
- 确认页面显示 "已连接" 状态
- 查看服务器日志,确认连接成功
7.2.2 消息发送与接收测试
- 使用两个浏览器窗口或标签页打开前端页面
- 窗口 1 输入用户 ID=1 并连接
- 窗口 2 输入用户 ID=2 并连接
- 在窗口 1 中,接收者 ID 输入 2,消息内容输入 "Hello, User2!",点击 "发送消息"
- 确认窗口 2 收到消息 "Hello, User2!"
- 查看窗口 2 中的消息是否标记为未读
- 确认窗口 2 自动发送已读确认
- 重复步骤 4-7,在窗口 2 向窗口 1 发送消息
7.2.3 未读消息测试
- 窗口 1 连接用户 1,窗口 2 连接用户 2
- 关闭窗口 2
- 在窗口 1 向用户 2 发送消息 "Test unread message"
- 重新打开窗口 2 并连接用户 2
- 确认窗口 2 收到刚才发送的未读消息
- 确认消息被自动标记为已读
7.2.4 批量已读测试
- 窗口 1 连接用户 1,窗口 2 连接用户 2
- 窗口 1 向用户 2 发送 3 条消息
- 在窗口 2 中,打开浏览器控制台,执行以下代码发送批量已读确认:
webSocket.send(JSON.stringify({type: 'BATCH_READ', data: Date.now()}))
- 确认服务器日志显示批量标记已读成功
- 确认窗口 2 收到标记成功的反馈消息
7.3 性能与稳定性测试
- 并发连接测试:使用工具模拟多个用户同时连接 WebSocket 服务,观察服务器性能
- 消息吞吐量测试:模拟大量消息发送,测试系统处理能力
- 断线重连测试:断开网络后重新连接,确认能正常接收重连后的消息
- 长时间运行测试:保持连接数小时,观察是否有内存泄漏或连接异常
八、生产环境注意事项
将 WebSocket 服务部署到生产环境时,需要注意以下事项:
8.1 安全性考虑
- 认证与授权:在实际项目中,应实现完善的认证机制,不能仅通过 URL 参数传递用户 ID
- 数据加密:使用 wss:// 协议(WebSocket Secure)加密传输数据
- 输入验证:严格验证所有收到的消息内容,防止注入攻击
- CSRF 防护:在握手过程中加入 CSRF 令牌验证
- 限流:限制单个 IP 或用户的连接数和消息发送频率
8.2 高可用性设计
- 集群部署:WebSocket 服务需要集群部署时,需使用消息中间件(如 RabbitMQ、Redis)实现集群内消息同步
- 会话共享:使用 Redis 等实现 WebSocket 会话的分布式存储
- 负载均衡:使用支持 WebSocket 的负载均衡器(如 Nginx),配置
proxy_set_header Upgrade $http_upgrade;
和proxy_set_header Connection "upgrade";
- 心跳检测:实现 WebSocket 心跳机制,及时检测断开的连接
8.3 性能优化
- 连接池管理:合理设置连接池大小,避免资源耗尽
- 异步处理:消息处理尽量采用异步方式,避免阻塞 WebSocket 线程
- 消息压缩:对大型消息进行压缩传输
- 批量处理:对于高频消息,考虑批量处理和发送
- 内存管理:注意 WebSocketSession 的内存占用,及时清理无效连接
8.4 监控与日志
- 连接监控:监控当前连接数、连接成功率、断开率等指标
- 消息监控:监控消息发送量、接收量、处理耗时等指标
- 错误监控:监控连接错误、消息处理错误等异常
- 详细日志:记录关键操作日志,但避免记录敏感信息
- 告警机制:当连接数过高、错误率上升时触发告警
九、总结与展望
本文详细介绍了如何将传统的站内消息查询接口改造为基于 WebSocket 的实时推送服务。我们从原理分析、方案设计、代码实现到测试验证,全面覆盖了改造过程中的各个环节。
WebSocket 技术为实时通信提供了高效的解决方案,不仅适用于站内消息,还可应用于在线聊天、实时通知、协同编辑等多种场景。希望本文能帮助你在实际项目中成功应用 WebSocket 技术,提升用户体验。