当前位置: 首页 > news >正文

芋道源码 - RabbitMQ + WebSocket 实现分布式消息推送

一、 背景与架构目标

在单机应用中,WebSocket 连接和消息发送都在同一台服务器内完成,实现简单。但在集群环境下,用户可能连接到不同的服务器节点。当需要向某个用户或全体用户广播消息时,问题就出现了:如何让消息准确地送达所有可能连接着目标用户的服务器节点?

解决方案:引入消息队列(如 RabbitMQ)进行解耦。

核心架构图:

[消息发送者] --(1) 发送消息至 RabbitMQ Exchange--> [RabbitMQ]|| (2) 消息路由到所有节点v[WS 节点 A] <--(3) 消费消息,推送至本地 Session-- [Queue A][WS 节点 B] <--(3) 消费消息,推送至本地 Session-- [Queue B][WS 节点 ...]                                      ...

工作流程:

  1. 某台服务器收到一个需要广播的 WebSocket 消息。

  2. 该服务器不直接发送,而是将消息发送到 RabbitMQ 的一个 主题交换机(Topic Exchange)

  3. RabbitMQ 将消息广播到所有监听该交换机的 消息队列(Queue)(通常每个WS服务器节点一个队列)。

  4. 集群中的 每一台 WebSocket 服务器都会消费到这条消息。

  5. 每台服务器在消费消息后,在本地查询是否存在目标 Session,如果存在,则通过本地连接将消息推送给前端。

这样,无论用户连接在哪台服务器,消息都能准确送达。

二、 核心代码与执行流程分析

以下代码基于 芋道源码 的项目结构,清晰地展示了这一流程。

 1. 建立连接与认证

当前端(例如 ws://localhost:48080/infra/ws?token=xxx)发起 WebSocket 连接时:

1.1 Token 认证 (TokenAuthenticationFilter)
/*** Token 过滤器,验证 token 的有效性*/
public class TokenAuthenticationFilter extends OncePerRequestFilter {@Overrideprotected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws ServletException, IOException {String token = SecurityFrameworkUtils.obtainAuthorization(request, ...);if (StrUtil.isNotEmpty(token)) {Integer userType = WebFrameworkUtils.getLoginUserType(request);try {// 1.1 基于 token 构建登录用户LoginUser loginUser = buildLoginUserByToken(token, userType);// ... (模拟登录等逻辑)// 2. 设置当前用户到 Security 上下文if (loginUser != null) {SecurityFrameworkUtils.setLoginUser(loginUser, request);}} catch (Throwable ex) {// ... 异常处理}}chain.doFilter(request, response);}
}

作用:在 HTTP 握手前,拦截请求,解析 token 参数,并将认证成功的用户信息存入 Security 上下文,为后续 WebSocket 握手拦截器提供用户信息。

1.2 握手拦截器 (LoginUserHandshakeInterceptor)
/*** 登录用户的握手拦截器*/
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {// 从 Security 上下文中获取登录用户LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();if (loginUser != null) {// 将用户信息存入 WebSocket Session 的属性中WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);}return true; // 返回 true 允许握手}// ... afterHandshake 方法
}

作用:在 WebSocket 握手阶段,将 TokenAuthenticationFilter 中设置的 LoginUser 信息,从 HTTP 请求“传递”到 WebSocket Session 的属性中,方便后续业务处理时获取用户身份。

2. 接收前端消息并路由到监听器

连接建立后,前端发送消息,例如:

{"type": "demo-message-send","content": "{\"toUserId\":100,\"text\":\"你好,单聊消息\"}"
}

2.1 消息处理器 (JsonWebSocketMessageHandler)
/*** JSON 格式 WebSocketHandler 实现类*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {// 监听器映射:type -> WebSocketMessageListenerprivate final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();// 构造函数:项目启动时,将所有监听器注册到 Map 中public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {listenersList.forEach(listener -> listeners.put(listener.getType(), listener));}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// ... 解析消息,处理 ping/pong ...// 2.1 解析消息JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);// 2.2 根据 type 获得对应的 WebSocketMessageListenerWebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());// 2.3 处理消息:反序列化 content,并调用监听器的 onMessage 方法Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);messageListener.onMessage(session, messageObj);}
}

作用

  • 启动时注册:通过构造函数,将所有实现了 WebSocketMessageListener 的 Bean 收集起来,以其 getType() 返回值作为 key,构建一个监听器映射。这就是为什么项目启动时会调用 getType 方法。

  • 消息路由:当收到前端消息时,根据消息体中的 type 字段,从 Map 中找到对应的监听器进行处理。

2.2 业务监听器 (DemoWebSocketMessageListener)
/*** WebSocket 示例:单发消息*/
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {@Resourceprivate WebSocketMessageSender webSocketMessageSender;@Overridepublic void onMessage(WebSocketSession session, DemoSendMessage message) {Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);// 情况一:单发if (message.getToUserId() != null) {DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true);// 关键:不直接发送,而是调用 SenderwebSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), "demo-message-receive", toMessage);return;}// 情况二:群发// ... 类似,调用 sendObject 进行群发}@Overridepublic String getType() {return "demo-message-send"; // 声明处理 type 为 ‘demo-message-send’ 的消息}
}

作用:执行业务逻辑(如组装消息体),并决定消息的接收者(单个用户或全体用户)。它不负责真正的网络发送,而是委托给 WebSocketMessageSender

3. 通过 RabbitMQ 广播发送任务

3.1 RabbitMQ 发送者 (RabbitMQWebSocketMessageSender)

/*** 基于 RabbitMQ 的 WebSocketMessageSender 实现类*/
@Slf4j
public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender {private final RabbitTemplate rabbitTemplate;private final TopicExchange topicExchange;@Overridepublic void send(Integer userType, Long userId, String messageType, String messageContent) {sendRabbitMQMessage(null, userId, userType, messageType, messageContent);}private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType, String messageType, String messageContent) {// 构建 MQ 消息体,包含目标信息和消息内容RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage().setSessionId(sessionId).setUserId(userId).setUserType(userType).setMessageType(messageType).setMessageContent(messageContent);// 将消息发送到 RabbitMQ 交换机rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);}
}

作用:将发送消息的请求(包含接收者信息和消息内容)封装成一个 MQ 消息,发送到 RabbitMQ 的交换机。至此,第一台服务器的任务就完成了,剩下的由 RabbitMQ 和集群中的其他服务器共同完成。

4. 消费 MQ 消息并最终推送

(此部分需要有一个 MQ 消费者 代码,你的示例中未包含,但逻辑如下:)
消费者会监听对应的队列,收到消息后,会调用 AbstractWebSocketMessageSender 的 doSend 方法。

4.1 最终发送者 (AbstractWebSocketMessageSender)
/*** WebSocketMessageSender 实现类*/
@Slf4j
public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender {private final WebSocketSessionManager sessionManager;public void doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent) {// 1. 将消息类型和内容封装成前端约定的格式JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent);String payload = JsonUtils.toJsonString(message);// 2. 遍历本机上的目标 Session,进行发送sessions.forEach(session -> {try {if (session.isOpen()) {session.sendMessage(new TextMessage(payload));}} catch (IOException ex) {log.error("[doSend] 发送消息失败", ex);}});}
}

作用:这是整个流程的终点。它在每一台消费到 MQ 消息的服务器上执行。它根据 MQ 消息中的目标信息(如 userType 和 userId),查询本机的 WebSocketSessionManager,找到具体的 WebSocket 连接,然后将消息通过网络推送给前端。

三、 聊天示例

用户1给用户100发送消息:

ws://localhost:48080/infra/ws?token=09b79f5236dc4548843848e23a897e55

{"type": "demo-message-send","content": "{\"toUserId\":100,\"text\":\"你好,单聊消息\"}"
}

用户100给用户1回复消息:

ws://localhost:48080/infra/ws?token=d9734aef242d496ea404051279470fae

{"type": "demo-message-send","content": "{\"toUserId\":1,\"text\":\"你好,已经收到你的消息\"}"
}

四、 总结

这套架构的精妙之处在于 解耦 和 广播

  1. 解耦业务与通信:业务处理器 (DemoWebSocketMessageListener) 只关心处理什么消息、发给谁,完全不知道也不关心集群、网络推送等复杂问题。

  2. 解决集群难题:通过 RabbitMQ 的发布/订阅模式,一条发送指令可以轻松地扩散到集群中的所有节点,由每个节点在本地进行匹配和发送,完美解决了用户连接在不同服务器的问题。

  3. 组件职责清晰

    • JsonWebSocketMessageHandler: 消息路由。

    • XxxWebSocketMessageListener: 业务处理。

    • RabbitMQWebSocketMessageSender: 消息分发(发往 MQ)。

    • AbstractWebSocketMessageSender: 最终投递(本机发送)。

当你需要发送一条 WebSocket 消息时,完整的流程正如你所总结的:
前端发送消息 → JsonWebSocketMessageHandler → DemoWebSocketMessageListener → RabbitMQWebSocketMessageSender (发往MQ) → (MQ广播) → 所有节点的消费者 → AbstractWebSocketMessageSender.doSend → 前端接收。

http://www.dtcms.com/a/403282.html

相关文章:

  • Spring Data JPA 语法详解与使用案例
  • 网站开发面试题天津公司建设网站
  • 个人怎么做音乐网站网页设计与制作教程刘瑞新课后答案
  • noi-9月23日作业
  • 购物网站每个模块主要功能怎么免费搭建网站
  • 如何高效使用Xshell和finalshell连接服务器,简单高效
  • Nestjs框架: 策略的权限控制(ACL)与数据权限实战
  • mmap 虚拟地址映射
  • 网站做好第二年要多少钱wordpress添加文件2m
  • Linux编程笔记2-控制数组指针函数动态内存构造类型Makefile
  • 【数据结构】冒泡、选择、插入、希尔排序的实现
  • npm镜像源配置指南
  • 【QT常用技术讲解】QTreeWidget实现树形筛选框(包含源码)
  • 站长工具ip查询外贸平台实训总结
  • 在JavaScript / HTML中,让<audio>元素中的多个<source>标签连续播放
  • 【Web前端|第二篇】JavaScript对象和事件
  • Linux配置网络————设置虚拟机为静态ip的网络配置详细教程
  • EPGF 架构为什么能保持长效和稳定?
  • reader should realize that all properties of real numbers that are to句子分析
  • ubuntu安装失败:Sorry, there was a problem completing the installation.原因分析及解决办法
  • 深圳商城网站设计价格网站轮播广告
  • AR技术:轨道交通运维与安全保障的革新力量
  • 友元类和友元函数bug
  • Zabbix7.4.8(二):通过http监控Nginx相关指标
  • 厦门 外贸网站访问数据库的网站开发语言
  • Gerkin+Pytest(python)实现自动化(BDD)
  • 动态住宅IP vs. 静态数据中心IP:未来趋势与当前选择
  • 子域名做微信开放平台网站应用芜湖企业100强
  • 很那网站建设做网站哪个公司好
  • Kafka的核心概念