芋道源码 - RabbitMQ + WebSocket 实现分布式消息推送
一、 背景与架构目标
在单机应用中,WebSocket 连接和消息发送都在同一台服务器内完成,实现简单。但在集群环境下,用户可能连接到不同的服务器节点。当需要向某个用户或全体用户广播消息时,问题就出现了:如何让消息准确地送达所有可能连接着目标用户的服务器节点?
解决方案:引入消息队列(如 RabbitMQ)进行解耦。
核心架构图:
[消息发送者] --(1) 发送消息至 RabbitMQ Exchange--> [RabbitMQ]|| (2) 消息路由到所有节点v[WS 节点 A] <--(3) 消费消息,推送至本地 Session-- [Queue A][WS 节点 B] <--(3) 消费消息,推送至本地 Session-- [Queue B][WS 节点 ...] ...
工作流程:
-
某台服务器收到一个需要广播的 WebSocket 消息。
-
该服务器不直接发送,而是将消息发送到 RabbitMQ 的一个 主题交换机(Topic Exchange)。
-
RabbitMQ 将消息广播到所有监听该交换机的 消息队列(Queue)(通常每个WS服务器节点一个队列)。
-
集群中的 每一台 WebSocket 服务器都会消费到这条消息。
-
每台服务器在消费消息后,在本地查询是否存在目标 Session,如果存在,则通过本地连接将消息推送给前端。
这样,无论用户连接在哪台服务器,消息都能准确送达。
二、 核心代码与执行流程分析
以下代码基于 芋道源码 的项目结构,清晰地展示了这一流程。
1. 建立连接与认证
当前端(例如 ws://localhost:48080/infra/ws?token=xxx
)发起 WebSocket 连接时:
1.1 Token 认证 (TokenAuthenticationFilter
)
/*** Token 过滤器,验证 token 的有效性*/
public class TokenAuthenticationFilter extends OncePerRequestFilter {@Overrideprotected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws ServletException, IOException {String token = SecurityFrameworkUtils.obtainAuthorization(request, ...);if (StrUtil.isNotEmpty(token)) {Integer userType = WebFrameworkUtils.getLoginUserType(request);try {// 1.1 基于 token 构建登录用户LoginUser loginUser = buildLoginUserByToken(token, userType);// ... (模拟登录等逻辑)// 2. 设置当前用户到 Security 上下文if (loginUser != null) {SecurityFrameworkUtils.setLoginUser(loginUser, request);}} catch (Throwable ex) {// ... 异常处理}}chain.doFilter(request, response);}
}
作用:在 HTTP 握手前,拦截请求,解析 token
参数,并将认证成功的用户信息存入 Security 上下文,为后续 WebSocket 握手拦截器提供用户信息。
1.2 握手拦截器 (LoginUserHandshakeInterceptor
)
/*** 登录用户的握手拦截器*/
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {// 从 Security 上下文中获取登录用户LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();if (loginUser != null) {// 将用户信息存入 WebSocket Session 的属性中WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);}return true; // 返回 true 允许握手}// ... afterHandshake 方法
}
作用:在 WebSocket 握手阶段,将 TokenAuthenticationFilter
中设置的 LoginUser
信息,从 HTTP 请求“传递”到 WebSocket Session 的属性中,方便后续业务处理时获取用户身份。
2. 接收前端消息并路由到监听器
连接建立后,前端发送消息,例如:
{"type": "demo-message-send","content": "{\"toUserId\":100,\"text\":\"你好,单聊消息\"}"
}
2.1 消息处理器 (JsonWebSocketMessageHandler
)
/*** JSON 格式 WebSocketHandler 实现类*/
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {// 监听器映射:type -> WebSocketMessageListenerprivate final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();// 构造函数:项目启动时,将所有监听器注册到 Map 中public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {listenersList.forEach(listener -> listeners.put(listener.getType(), listener));}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {// ... 解析消息,处理 ping/pong ...// 2.1 解析消息JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);// 2.2 根据 type 获得对应的 WebSocketMessageListenerWebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());// 2.3 处理消息:反序列化 content,并调用监听器的 onMessage 方法Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);messageListener.onMessage(session, messageObj);}
}
作用:
-
启动时注册:通过构造函数,将所有实现了
WebSocketMessageListener
的 Bean 收集起来,以其getType()
返回值作为 key,构建一个监听器映射。这就是为什么项目启动时会调用getType
方法。 -
消息路由:当收到前端消息时,根据消息体中的
type
字段,从 Map 中找到对应的监听器进行处理。
2.2 业务监听器 (DemoWebSocketMessageListener
)
/*** WebSocket 示例:单发消息*/
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {@Resourceprivate WebSocketMessageSender webSocketMessageSender;@Overridepublic void onMessage(WebSocketSession session, DemoSendMessage message) {Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);// 情况一:单发if (message.getToUserId() != null) {DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId).setText(message.getText()).setSingle(true);// 关键:不直接发送,而是调用 SenderwebSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), "demo-message-receive", toMessage);return;}// 情况二:群发// ... 类似,调用 sendObject 进行群发}@Overridepublic String getType() {return "demo-message-send"; // 声明处理 type 为 ‘demo-message-send’ 的消息}
}
作用:执行业务逻辑(如组装消息体),并决定消息的接收者(单个用户或全体用户)。它不负责真正的网络发送,而是委托给 WebSocketMessageSender
。
3. 通过 RabbitMQ 广播发送任务
3.1 RabbitMQ 发送者 (RabbitMQWebSocketMessageSender
)
/*** 基于 RabbitMQ 的 WebSocketMessageSender 实现类*/
@Slf4j
public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender {private final RabbitTemplate rabbitTemplate;private final TopicExchange topicExchange;@Overridepublic void send(Integer userType, Long userId, String messageType, String messageContent) {sendRabbitMQMessage(null, userId, userType, messageType, messageContent);}private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType, String messageType, String messageContent) {// 构建 MQ 消息体,包含目标信息和消息内容RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage().setSessionId(sessionId).setUserId(userId).setUserType(userType).setMessageType(messageType).setMessageContent(messageContent);// 将消息发送到 RabbitMQ 交换机rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage);}
}
作用:将发送消息的请求(包含接收者信息和消息内容)封装成一个 MQ 消息,发送到 RabbitMQ 的交换机。至此,第一台服务器的任务就完成了,剩下的由 RabbitMQ 和集群中的其他服务器共同完成。
4. 消费 MQ 消息并最终推送
(此部分需要有一个 MQ 消费者 代码,你的示例中未包含,但逻辑如下:)
消费者会监听对应的队列,收到消息后,会调用 AbstractWebSocketMessageSender
的 doSend
方法。
4.1 最终发送者 (AbstractWebSocketMessageSender
)
/*** WebSocketMessageSender 实现类*/
@Slf4j
public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender {private final WebSocketSessionManager sessionManager;public void doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent) {// 1. 将消息类型和内容封装成前端约定的格式JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent);String payload = JsonUtils.toJsonString(message);// 2. 遍历本机上的目标 Session,进行发送sessions.forEach(session -> {try {if (session.isOpen()) {session.sendMessage(new TextMessage(payload));}} catch (IOException ex) {log.error("[doSend] 发送消息失败", ex);}});}
}
作用:这是整个流程的终点。它在每一台消费到 MQ 消息的服务器上执行。它根据 MQ 消息中的目标信息(如 userType
和 userId
),查询本机的 WebSocketSessionManager
,找到具体的 WebSocket 连接,然后将消息通过网络推送给前端。
三、 聊天示例
用户1给用户100发送消息:
ws://localhost:48080/infra/ws?token=09b79f5236dc4548843848e23a897e55
{"type": "demo-message-send","content": "{\"toUserId\":100,\"text\":\"你好,单聊消息\"}"
}
用户100给用户1回复消息:
ws://localhost:48080/infra/ws?token=d9734aef242d496ea404051279470fae
{"type": "demo-message-send","content": "{\"toUserId\":1,\"text\":\"你好,已经收到你的消息\"}"
}
四、 总结
这套架构的精妙之处在于 解耦 和 广播:
-
解耦业务与通信:业务处理器 (
DemoWebSocketMessageListener
) 只关心处理什么消息、发给谁,完全不知道也不关心集群、网络推送等复杂问题。 -
解决集群难题:通过 RabbitMQ 的发布/订阅模式,一条发送指令可以轻松地扩散到集群中的所有节点,由每个节点在本地进行匹配和发送,完美解决了用户连接在不同服务器的问题。
-
组件职责清晰:
-
JsonWebSocketMessageHandler
: 消息路由。 -
XxxWebSocketMessageListener
: 业务处理。 -
RabbitMQWebSocketMessageSender
: 消息分发(发往 MQ)。 -
AbstractWebSocketMessageSender
: 最终投递(本机发送)。
-
当你需要发送一条 WebSocket 消息时,完整的流程正如你所总结的:
前端发送消息 → JsonWebSocketMessageHandler
→ DemoWebSocketMessageListener
→ RabbitMQWebSocketMessageSender
(发往MQ) → (MQ广播) → 所有节点的消费者 → AbstractWebSocketMessageSender.doSend
→ 前端接收。