当前位置: 首页 > news >正文

Spring Boot WebSocket:使用 Java 构建多频道聊天系统

这是一个使用 WebFlux 和 MongoDB 构建响应式 Spring Boot WebSocket 聊天的分步指南,包括配置、处理程序和手动测试。


正如您可能已经从标题中猜到的,今天的主题将是 Spring Boot WebSockets。不久前,我提供了一个基于 Akka 工具包库的 WebSocket 聊天示例。然而,这个聊天将拥有更多一些功能,以及一个相当不同的设计。

我将跳过某些部分,以避免与上一篇文章的内容有太多重复。在这里您可以找到关于 WebSockets 更深入的介绍。请注意,本文中使用的所有代码也可以在 GitHub 仓库中找到。

Spring Boot WebSocket:使用的工具

让我们从描述将用于实现整个应用程序的工具开始本文的技术部分。由于我无法完全掌握如何使用经典的 Spring STOMP 覆盖来构建真正的 WebSocket API,我决定选择 Spring WebFlux 并使一切具有响应式特性。

  • Spring Boot – 基于 Spring 的现代 Java 应用程序离不开 Spring Boot;所有的自动配置都是无价的。
  • Spring WebFlux – 经典 Spring 的响应式版本,为处理 WebSocket 和 REST 提供了相当不错且描述性的工具集。我敢说,这是在 Spring 中实际获得 WebSocket 支持的唯一方法。
  • Mongo – 最流行的 NoSQL 数据库之一,我使用它来存储消息历史记录。
  • Spring Reactive Mongo – 用于以响应式方式处理 Mongo 访问的 Spring Boot 启动器。在一个地方使用响应式而在另一个地方不使用并不是最好的主意。因此,我决定也让数据库访问具有响应式特性。

让我们开始实现吧!

Spring Boot WebSocket:实现

依赖项与配置

pom.xml

<dependencies><!--编译时依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb-reactive</artifactId></dependency>
</dependencies>

application.properties

spring.data.mongodb.uri=mongodb://chats-admin:admin@localhost:27017/chats

我更喜欢 .properties 而不是 .yml——依我拙见,YAML 在较大规模上不可读且难以维护。

WebSocketConfig

@Configuration
class WebSocketConfig {@BeanChatStore chatStore(MessagesStore messagesStore) {return new DefaultChatStore(Clock.systemUTC(), messagesStore);}@BeanWebSocketHandler chatsHandler(ChatStore chatStore) {return new ChatsHandler(chatStore);}@BeanSimpleUrlHandlerMapping handlerMapping(WebSocketHandler wsh) {Map<String, WebSocketHandler> paths = Map.of("/chats/{id}", wsh);return new SimpleUrlHandlerMapping(paths, 1);}@BeanWebSocketHandlerAdapter webSocketHandlerAdapter() {return new WebSocketHandlerAdapter();}
}

出乎意料的是,这里定义的四个 Bean 都非常重要。

  • ChatStore – 用于操作聊天的自定义 Bean,我将在后续步骤中详细介绍这个 Bean。
  • WebSocketHandler – 将存储所有与处理 WebSocket 会话相关逻辑的 Bean。
  • SimpleUrlHandlerMapping – 负责将 URL 映射到正确的处理器,此处理的完整 URL 看起来大致像这样:ws://localhost:8080/chats/{id}
  • WebSocketHandlerAdapter – 一种功能性的 Bean,它为 Spring Dispatcher Servlet 添加了 WebSocket 处理支持。

ChatsHandler

class ChatsHandler implements WebSocketHandler {private final Logger log = LoggerFactory.getLogger(ChatsHandler.class);private final ChatStore store;ChatsHandler(ChatStore store) {this.store = store;}@Overridepublic Mono<Void> handle(WebSocketSession session) {String[] split = session.getHandshakeInfo().getUri().getPath().split("/");String chatIdStr = split[split.length - 1];int chatId = Integer.parseInt(chatIdStr);ChatMeta chatMeta = store.get(chatId);if (chatMeta == null) {return session.close(CloseStatus.GOING_AWAY);}if (!chatMeta.canAddUser()) {return session.close(CloseStatus.NOT_ACCEPTABLE);}String sessionId = session.getId();store.addNewUser(chatId, session);log.info("New User {} join the chat {}", sessionId, chatId);return session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(message -> store.addNewMessage(chatId, sessionId, message)).flatMap(message -> broadcastToSessions(sessionId, message, store.get(chatId).sessions())).doFinally(sig -> store.removeSession(chatId, session.getId())).then();}private Mono<Void> broadcastToSessions(String sessionId, String message, List<WebSocketSession> sessions) {return Flux.fromStream(sessions.stream().filter(session -> !session.getId().equals(sessionId)).map(session -> session.send(Mono.just(session.textMessage(message))))).then();}
}

正如我上面提到的,在这里您可以找到所有与处理 WebSocket 会话相关的逻辑。首先,我们从 URL 解析聊天的 ID 以获取目标聊天。根据特定聊天的上下文,响应不同的状态。

此外,我还将消息广播到与特定聊天相关的所有会话——以便用户实际交换消息。我还添加了 doFinally 触发器,它将从 chatStore 中清除已关闭的会话,以减少冗余通信。总的来说,这段代码是响应式的;我需要遵循一些限制。我试图使其尽可能简单和可读,如果您有任何改进的想法,我持开放态度。

ChatsRouter

@Configuration(proxyBeanMethods = false)
class ChatRouter {private final ChatStore chatStore;ChatRouter(ChatStore chatStore) {this.chatStore = chatStore;}@BeanRouterFunction<ServerResponse> routes() {return RouterFunctions.route(POST("api/v1/chats/create"), e -> create(false)).andRoute(POST("api/v1/chats/create-f2f"), e -> create(true)).andRoute(GET("api/v1/chats/{id}"), this::get).andRoute(DELETE("api/v1/chats/{id}"), this::delete);}
}

WebFlux 定义 REST 端点的方法与经典 Spring 有很大不同。上面,您可以看到用于管理聊天的 4 个端点的定义。与 Akka 实现中的情况类似,我希望有一个用于管理聊天的 REST API 和一个用于实际处理聊天的 WebSocket API。我将跳过函数实现,因为它们非常简单;您可以在 GitHub 上查看它们。

ChatStore

首先,接口:

public interface ChatStore {int create(boolean isF2F);void addNewUser(int id, WebSocketSession session);Mono<String> addNewMessage(int id, String userId, String message);void removeSession(int id, String session);ChatMeta get(int id);ChatMeta delete(int id);
}

然后是实现:

public class DefaultChatStore implements ChatStore {private final Map<Integer, ChatMeta> chats;private final AtomicInteger idGen;private final MessagesStore messagesStore;private final Clock clock;public DefaultChatStore(Clock clock, MessagesStore store) {this.chats = new ConcurrentHashMap<>();this.idGen = new AtomicInteger(0);this.clock = clock;this.messagesStore = store;}@Overridepublic int create(boolean isF2F) {int newId = idGen.incrementAndGet();ChatMeta chatMeta = chats.computeIfAbsent(newId, id -> {if (isF2F) {return ChatMeta.ofId(id);}return ChatMeta.ofIdF2F(id);});return chatMeta.id;}@Overridepublic void addNewUser(int id, WebSocketSession session) {chats.computeIfPresent(id, (k, v) -> v.addUser(session));}@Overridepublic void removeSession(int id, String sessionId) {chats.computeIfPresent(id, (k, v) -> v.removeUser(sessionId));}@Overridepublic Mono<String> addNewMessage(int id, String userId, String message) {ChatMeta meta = chats.getOrDefault(id, null);if (meta != null) {Message messageDoc = new Message(id, userId, meta.offset.getAndIncrement(), clock.instant(), message);return messagesStore.save(messageDoc).map(Message::getContent);}return Mono.empty();}// 省略部分
}

ChatStore 的基础是 ConcurrentHashMap,它保存所有开放聊天的元数据。接口中的大多数方法都不言自明,背后没有什么特别之处。

  • create – 创建一个新聊天,带有一个布尔属性,指示聊天是 f2f 还是群聊。
  • addNewUser – 向现有聊天添加新用户。
  • removeUser – 从现有聊天中移除用户。
  • get – 获取具有 ID 的聊天的元数据。
  • delete – 从 CMH 中删除聊天。

这里唯一复杂的方法是 addNewMessages。它增加聊天内的消息计数器,并将消息内容持久化到 MongoDB 中,以实现持久性。

MongoDB

消息实体

public class Message {@Idprivate String id;private int chatId;private String owner;private long offset;private Instant timestamp;private String content;
}

存储在数据库中的消息内容模型,这里有三个重要的字段:

  1. chatId – 表示发送特定消息的聊天。
  2. ownerId – 消息发送者的用户 ID。
  3. offset – 消息在聊天中的序号,用于检索排序。

MessageStore

public interface MessagesStore extends ReactiveMongoRepository<Message, String> {}

没什么特别的,经典的 Spring 仓库,但是以响应式方式实现,提供了与 JpaRepository 相同的功能集。它直接在 ChatStore 中使用。此外,在主应用程序类 WebsocketsChatApplication 中,我通过使用 @EnableReactiveMongoRepositories 来激活响应式仓库。没有这个注解,上面的 messageStore 将无法工作。好了,我们完成了整个聊天的实现。让我们测试一下!

Spring Boot WebSocket:测试

对于测试,我使用 Postman 和 Simple WebSocket Client。

  1. 我正在使用 Postman 创建一个新聊天。在响应体中,我得到了最近创建的聊天的 WebSocket URL。

图片:Postman 创建聊天请求的屏幕截图

  1. 现在是使用它们并检查用户是否可以相互通信的时候了。Simple Web Socket Client 在这里派上用场。因此,我在这里连接到新创建的聊天。

图片:Simple Web Socket Client 连接界面的屏幕截图

  1. 好了,一切正常,用户可以相互通信了。

图片:两个 WebSocket 客户端交换消息的屏幕截图
图片:两个 WebSocket 客户端交换消息的屏幕截图
图片:两个 WebSocket 客户端交换消息的屏幕截图

还有最后一件事要做。让我们花点时间看看哪些地方可以做得更好。

可以改进的地方

由于我刚刚构建的是最基础的聊天应用程序,有一些(或者实际上相当多)地方可以做得更好。下面,我列出了一些我认为值得改进的方面:

  • 身份验证和重新加入支持 – 目前,一切都基于 sessionId。这不是一个最优的方法。最好能有一些身份验证机制,并基于用户数据实现实际的重新加入。
  • 发送附件 – 目前,聊天仅支持简单的文本消息。虽然发消息是聊天的基本功能,但用户也喜欢交换图片和音频文件。
  • 测试 – 目前没有测试,但为什么要保持这样呢?测试总是一个好主意。
  • offset 溢出 – 目前,它只是一个简单的 int。如果我们要在非常长的时间内跟踪 offset,它迟早会溢出。

总结

好了!Spring Boot WebSocket 聊天已经实现,主要任务已完成。您对下一步要开发什么有了一些想法。

请记住,这个聊天案例非常简单,对于任何类型的商业项目,都需要大量的修改和开发。

无论如何,我希望您在阅读本文时学到了一些新东西。

感谢您的时间。


【注】本文译自:Spring Boot WebSocket: Building a Multichannel Chat in Java

http://www.dtcms.com/a/410041.html

相关文章:

  • 中堂镇仿做网站软文网站有哪些
  • Android 应用配置跳转微信小程序
  • Word和WPS文字中的自动编号和文字间距过大怎么办?
  • 京东零售张泽华:从营销意图到购买转化,AI重塑广告增长
  • Casey‘s EDI 需求分析
  • 网站美工和平面设计师手机网站域名开头
  • 从垂直钻到水平钻:如何用陀螺精准掌控钻井轨迹?
  • yield在Python中的应用
  • Linux配置Java/JDK(解决Kali启动ysoserial.jar JRMPListener报错)暨 Kali安装JAVA8和切换JDK版本的详细过程
  • springboot用jar启动能访问,但是打成war,部署到tomcat却访问不到
  • 免费企业网站建设流程华为公司电子商务网站建设策划书
  • 中国网站备案查询系统东莞seo外包公司哪家好
  • STM32H743-ARM例程6-RS422
  • 倾角传感器厂家为物联网应用提供高效双轴监测解决方案
  • 住宅IP vs 数据中心IP 2025实战性能对决:IPIPGO、天启HTTP、光络云深度横评
  • 成都网站seo公司wordpress页面视频播放
  • EasyClick JavaScript正则表达式匹配规则
  • 盟接之桥说制造:“首件手板”商业模式:制造业的下一个黄金赛道,你的公司准备好了吗?
  • mysql怎么安装,新手安装MySQL后如何安全备份不踩坑?
  • 【Git】分⽀管理
  • 半导体制造常见分析仪器之高分辨率 3D X 射线显微镜
  • PCB 半固化片:多层板制造的技术基石,猎板的场景化适配与质控逻辑
  • 番禺网站设计与制作广州海佳网络网站建设公司怎么样
  • Spring Cloud 负载均衡(LoadBalancer)与服务调用(OpenFeign)详解
  • 基于HTTP构建局域网内YUM网络源:详细操作指南(太细)
  • Java核心 之JVM
  • 通过 GAC Code 在国内使用ClaudeCode,Windows 用户配置指南!
  • iOS App 上架流程详解,苹果应用发布步骤、App Store 审核规则、ipa 文件上传与测试分发实战经验
  • 线程安全之《Sychronized的八锁案例》
  • 用户态的epoll实现思路?