当前位置: 首页 > news >正文

基于Spring Boot和WebSocket的实时聊天系统

一、项目架构设计

1. 技术栈组成

组件用途版本
Spring Boot基础框架2.7.x
javax.websocketWebSocket实现JSR-356
Mybatis-Plus数据持久化3.5.x
Redis缓存/消息队列6.2.x
MongoDB聊天记录存储5.0.x
MinIO文件存储8.0.x
Kafka消息分发2.8.x
OAuth2认证授权2.5.x

2. 系统架构图

客户端
WebSocket连接
Spring Boot服务
消息处理器
MySQL: 用户/关系
MongoDB: 聊天记录
Redis: 在线状态
MinIO: 文件存储
Kafka: 消息分发

二、核心功能实现

1. WebSocket配置增强版

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(chatWebSocketHandler(), "/websocket").setAllowedOrigins("*").addInterceptors(new AuthHandshakeInterceptor()).withSockJS();}@Beanpublic WebSocketHandler chatWebSocketHandler() {return new ChatWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxTextMessageBufferSize(8192);container.setMaxBinaryMessageBufferSize(8192);container.setMaxSessionIdleTimeout(600000L);return container;}
}

2. 消息处理器实现

public class ChatWebSocketHandler extends TextWebSocketHandler {private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();private final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic void afterConnectionEstablished(WebSocketSession session) {String userId = getUserIdFromSession(session);sessions.put(userId, session);updateOnlineStatus(userId, true);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {try {ChatMessage chatMessage = objectMapper.readValue(message.getPayload(), ChatMessage.class);switch (chatMessage.getType()) {case HEARTBEAT:handleHeartbeat(session);break;case SINGLE_CHAT:handleSingleChat(chatMessage);break;case GROUP_CHAT:handleGroupChat(chatMessage);break;case READ_RECEIPT:handleReadReceipt(chatMessage);break;case FILE_UPLOAD:handleFileUpload(chatMessage);break;}} catch (Exception e) {log.error("消息处理异常", e);}}private void handleHeartbeat(WebSocketSession session) {try {session.sendMessage(new TextMessage("{\\"type\\":\\"HEARTBEAT_RESPONSE\\"}"));} catch (IOException e) {log.error("心跳响应失败", e);}}// 其他处理方法...
}

三、关键业务逻辑实现

1. 消息存储设计

MySQL表结构

CREATE TABLE `chat_message` (`id` bigint NOT NULL AUTO_INCREMENT,`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',`sender_id` varchar(64) NOT NULL,`receiver_id` varchar(64) NOT NULL,`content` text,`msg_type` tinyint NOT NULL COMMENT '1-文本 2-图片 3-视频',`status` tinyint DEFAULT '0' COMMENT '0-未读 1-已读',`created_at` datetime NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_sender_receiver` (`sender_id`,`receiver_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

MongoDB文档结构

@Document(collection = "chat_messages")
public class ChatMessageDocument {@Idprivate String id;private String msgId;private String senderId;private String receiverId;private String content;private MessageType msgType;private MessageStatus status;private Date createdAt;private List<ReadReceipt> readReceipts;// 嵌套文档public static class ReadReceipt {private String userId;private Date readAt;}
}

2. 消息分发流程

@Service
@RequiredArgsConstructor
public class MessageDispatcher {private final KafkaTemplate<String, String> kafkaTemplate;private final RedisTemplate<String, String> redisTemplate;public void dispatch(ChatMessage message) {// 存储消息storeMessage(message);// 实时推送if (isUserOnline(message.getReceiverId())) {realtimePush(message);} else {// 离线用户通过推送通知pushNotification(message);}// 发往Kafka做后续处理kafkaTemplate.send("chat-messages", message.getMsgId(), serialize(message));}private boolean isUserOnline(String userId) {return redisTemplate.opsForValue().get("user:online:" + userId) != null;}private void realtimePush(ChatMessage message) {WebSocketSession session = sessions.get(message.getReceiverId());if (session != null && session.isOpen()) {try {session.sendMessage(new TextMessage(serialize(message)));} catch (IOException e) {log.error("消息推送失败", e);}}}
}

四、高级功能实现

1. 心跳检测机制

@Scheduled(fixedRate = 30000)
public void checkHeartbeat() {long now = System.currentTimeMillis();sessions.forEach((userId, session) -> {Long lastHeartbeat = heartbeatTimestamps.get(userId);if (lastHeartbeat == null || now - lastHeartbeat > 60000) {try {session.close(CloseStatus.SESSION_NOT_RELIABLE);sessions.remove(userId);updateOnlineStatus(userId, false);} catch (IOException e) {log.error("关闭会话失败", e);}}});
}

2. 消息已读回执

public void handleReadReceipt(ChatMessage message) {// 更新MySQL中的消息状态chatMessageMapper.updateStatusByMsgId(message.getMsgId(), MessageStatus.READ);// 更新MongoDB中的阅读状态Query query = Query.query(Criteria.where("msgId").is(message.getMsgId()));Update update = new Update().push("readReceipts", new ReadReceipt(message.getSenderId(), new Date())).set("status", MessageStatus.READ);mongoTemplate.updateFirst(query, update, ChatMessageDocument.class);// 通知发送方消息已读if (isUserOnline(message.getSenderId())) {realtimePush(new ChatMessage(MessageType.READ_RECEIPT,message.getMsgId(),message.getReceiverId(),message.getSenderId()));}
}

五、性能优化方案

1. 消息批量处理

@KafkaListener(topics = "chat-messages", groupId = "message-processor")
public void processMessages(List<ConsumerRecord<String, String>> records) {List<ChatMessage> messages = records.stream().map(record -> deserialize(record.value())).collect(Collectors.toList());// 批量存储MySQLchatMessageMapper.batchInsert(messages);// 批量存储MongoDBList<ChatMessageDocument> documents = messages.stream().map(this::convertToDocument).collect(Collectors.toList());mongoTemplate.insertAll(documents);
}

2. Redis缓存优化

@Service
public class UserStatusService {private final RedisTemplate<String, String> redisTemplate;public boolean isOnline(String userId) {return redisTemplate.opsForValue().get("user:online:" + userId) != null;}public void setOnline(String userId, boolean online) {if (online) {redisTemplate.opsForValue().set("user:online:" + userId,"1",5, TimeUnit.MINUTES);} else {redisTemplate.delete("user:online:" + userId);}}public List<String> getOnlineUsers(List<String> userIds) {return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {for (String userId : userIds) {connection.exists(("user:online:" + userId).getBytes());}return null;}).stream().map(Object::toString).collect(Collectors.toList());}
}

六、安全防护措施

1. OAuth2认证集成

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {http.authorizeHttpRequests(auth -> auth.antMatchers("/websocket/**").authenticated().anyRequest().permitAll()).oauth2ResourceServer(oauth2 -> oauth2.jwt(jwt -> jwt.decoder(jwtDecoder()))).sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS));return http.build();
}@Bean
public JwtDecoder jwtDecoder() {return NimbusJwtDecoder.withJwkSetUri(jwkSetUri).build();
}

2. WebSocket安全拦截器

public class AuthHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Map<String, Object> attributes) {String token = extractToken(request);if (token == null) {return false;}try {Jwt jwt = jwtDecoder.decode(token);attributes.put("userId", jwt.getSubject());return true;} catch (JwtException e) {return false;}}private String extractToken(ServerHttpRequest request) {// 从请求头或参数中提取token}
}

七、部署与监控

1. Docker Compose配置

version: '3.8'services:app:build: .ports:- "8080:8080"depends_on:- redis- mysql- mongodb- kafkaredis:image: redis:6.2ports:- "6379:6379"mysql:image: mysql:8.0environment:MYSQL_ROOT_PASSWORD: rootMYSQL_DATABASE: chat_dbports:- "3306:3306"mongodb:image: mongo:5.0ports:- "27017:27017"kafka:image: bitnami/kafka:2.8ports:- "9092:9092"environment:KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181ALLOW_PLAINTEXT_LISTENER: "yes"zookeeper:image: bitnami/zookeeper:3.7ports:- "2181:2181"

2. Prometheus监控配置

scrape_configs:- job_name: 'chat-app'metrics_path: '/actuator/prometheus'static_configs:- targets: ['app:8080']labels:service: 'chat-service'- job_name: 'redis'static_configs:- targets: ['redis:9121']- job_name: 'mysql'static_configs:- targets: ['mysql:9104']- job_name: 'kafka'static_configs:- targets: ['kafka:7071']

通过以上实现方案,我们构建了一个功能完善、性能优越且安全可靠的实时聊天系统。该系统不仅支持基本的聊天功能,还提供了消息存储、已读回执、文件传输等高级特性,同时具备良好的扩展性和可维护性。

http://www.dtcms.com/a/324709.html

相关文章:

  • Openlayers基础教程|从前端框架到GIS开发系列课程(21)geojson实现线要素和区要素
  • git merge的原理和过程,merge conflict产生的原因、处理的逻辑
  • 【话题讨论】GPT-5 发布全解读:参数升级、长上下文与多领域能力提升
  • MCP学习与实践
  • ESP32安装于配置
  • [激光原理与应用-216]:设计 - 皮秒紫外激光器 - 热管理设计,多维策略保障高效稳定运行
  • 腾讯云EdgeOne Pages深度使用指南
  • 计算机网络:什么是AD域
  • 线程的sleep、wait、join、yield如何使用?
  • 随想记——excel报表
  • XGBoost参数evals的作用及使用方法
  • 【图像算法 - 11】基于深度学习 YOLO 与 ByteTrack 的目标检测与多目标跟踪系统(系统设计 + 算法实现 + 代码详解 + 扩展调优)
  • 什么是缓存击穿、缓存穿透、缓存雪崩及其解决方案
  • Oracle lgwr触发条件
  • Docker 容器化工具及常用操作
  • Excel版经纬度和百分度互转v1.1
  • crc32算法php版----crc32.php
  • 【Spring IoC 核心实现类详解:DefaultListableBeanFactory】
  • Leetcode 3646. Next Special Palindrome Number
  • 分发糖果(贪心算法)
  • Vue.js设计于实现 - 响应式(三)
  • Spring Boot 全局异常处理与日志监控实战
  • OneCode 3.0 可视化功能全面分析:从开发者到用户的全场景解析
  • 一周学会Matplotlib3 Python 数据可视化-绘制条形图(Bar)
  • 论文复现与分析内容关于一种实用的车对车(V2V)可见光通信(VLC)传播模型
  • Z20K118库中寄存器及其库函数封装-REGFILE库
  • Windows执行kubectl提示拒绝访问【Windows安装k8s】
  • imx6ull-驱动开发篇17——linux原子操作实验
  • PXE自动化安装部署OpenEuler24.03LTS
  • MySQL中的in和exists的区别