Enterprise-Grade Distributed Message Push with Spring Boot + WebSocket + Redis
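Table of Contents
- Enterprise-Grade Distributed Message Push with Spring Boot + WebSocket + Redis
- 1. Overview
- 2. System Architecture
- 3. Project Structure
- 4. Maven Dependencies (`pom.xml`)
- 5. Core Code Walkthrough
- 5.1 Application Entry Point
- 5.2 WebSocket Configuration
- 5.3 Redis Configuration
- 5.4 DTO & Model
- 5.5 Redis Publisher and Subscriber
- 5.6 Message Service
- 5.7 REST Controller
- 5.8 WebSocket Message Handling
- 5.9 Security Configuration (Sample)
- 5.10 JWT Utility
- 5.11 Configuration File (`application.yml`)
- 5.12 Docker Compose (`docker/docker-compose.yml`)
- 6. OpenAPI Document Sample (`apifox/openapi.yaml`)
- 7. Front-End Integration Sample
- 8. Cluster Message Flow
- 9. Automated Tests
- 10. Deployment and Operations Advice
- 11. Running the Project
- 12. Summary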
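1. Overview
- Goal: build a real-time message push back end that supports clustered deployment, pushes messages to browsers over WebSocket, and broadcasts consistently across nodes.
- Key technology stack: Spring Boot 3.x, Spring WebSocket (STOMP), Spring Messaging, Redis, Spring Data Redis, Spring Security, Docker Compose.
- Core idea: every application node is a WebSocket server and also a Redis publisher and subscriber. Nodes synchronize messages through Redis Pub/Sub, which makes cross-node push possible.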
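2. System Architecture
- Web layer: a `@RestController` exposes the message-sending API.
- WebSocket layer: `@EnableWebSocketMessageBroker` configures the STOMP endpoint, the topic paths, and the application destination prefix.
- Message distribution layer: Spring's `SimpMessagingTemplate` broadcasts within the local node; Redis Pub/Sub handles cross-node synchronization.
- Data layer: Redis Cluster / Sentinel / single node (the sample uses a single node; a highly available setup is recommended for deployment).
- Security layer: a simple JWT filter protects the REST API and also runs on the WebSocket handshake request, although the sample permits `/ws/**` without a token (a basic implementation that can be replaced with an enterprise IAM integration).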
┌────────────────────────────────────────────────────┐
│ Browser front end                                  │
│ - STOMP over WebSocket client                      │
└──────────────────────▲─────────────────────────────┘
                       │ WebSocket/STOMP data channel
┌──────────────────────┴─────────────────────────────┐
│ Spring Boot node A                                 │
│ - REST API / WebSocket endpoint                    │
│ - SimpMessagingTemplate                            │
│ - RedisMessagePublisher (publishes to Redis)       │
│ - RedisMessageSubscriber (receives and re-pushes)  │
└──────────────────────▲─────────────────────────────┘
                       │ Redis Pub/Sub
┌──────────────────────┴─────────────────────────────┐
│ Spring Boot node B                                 │
│ - Same logic as node A (horizontal scaling)        │
└──────────────────────▲─────────────────────────────┘
                       │
┌──────────────────────┴─────────────────────────────┐
│ Redis server / cluster                             │
└────────────────────────────────────────────────────┘
3. Project Structure
enterprise-ws-redis/
├── README.md
├── docker/
│   └── docker-compose.yml
├── docs/
│   └── api.md
├── pom.xml
├── src/
│   ├── main/
│   │   ├── java/com/example/enterprise/ws/
│   │   │   ├── EnterpriseWsApplication.java
│   │   │   ├── config/
│   │   │   │   ├── RedisConfig.java
│   │   │   │   ├── SecurityConfig.java
│   │   │   │   └── WebSocketConfig.java
│   │   │   ├── controller/
│   │   │   │   ├── MessageController.java
│   │   │   │   └── WebSocketMessageController.java
│   │   │   ├── dto/
│   │   │   │   └── MessageRequest.java
│   │   │   ├── model/
│   │   │   │   └── ChatMessage.java
│   │   │   ├── redis/
│   │   │   │   ├── RedisMessagePublisher.java
│   │   │   │   └── RedisMessageSubscriber.java
│   │   │   ├── service/
│   │   │   │   └── MessageService.java
│   │   │   └── util/
│   │   │       └── JwtTokenUtil.java
│   │   └── resources/
│   │       ├── application.yml
│   │       └── logback-spring.xml
│   └── test/
│       └── java/com/example/enterprise/ws/
│           └── WebSocketIntegrationTest.java
└── apifox/
    └── openapi.yaml
Note: the complete project can be imported directly into IntelliJ IDEA or built with Maven.
4. Maven Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.enterprise</groupId>
    <artifactId>enterprise-ws-redis</artifactId>
    <version>1.0.0</version>
    <name>enterprise-ws-redis</name>

    <properties>
        <java.version>17</java.version>
        <spring.boot.version>3.2.5</spring.boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-messaging</artifactId>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-api</artifactId>
            <version>0.11.5</version>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-impl</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-jackson</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- No spring-boot-starter-parent here, so the plugin version and the
                     repackage goal (needed for an executable jar) are declared explicitly. -->
                <version>${spring.boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
5. Core Code Walkthrough
5.1 Application Entry Point
package com.example.enterprise.ws;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EnterpriseWsApplication {

    public static void main(String[] args) {
        SpringApplication.run(EnterpriseWsApplication.class, args);
    }
}
5.2 WebSocket Configuration
package com.example.enterprise.ws.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Handshake endpoint with SockJS fallback. "*" accepts any origin;
        // narrow it to known origins in production.
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Messages sent to /app/** are routed to @MessageMapping methods.
        registry.setApplicationDestinationPrefixes("/app");
        // In-memory broker that serves /topic/** subscriptions on this node.
        registry.enableSimpleBroker("/topic");
    }
}
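Note: `/ws` is the handshake endpoint, the `/app` prefix is used by clients to send messages, and `/topic` is the broadcast destination. In production the simple broker can be replaced with an external message broker (RabbitMQ, for example). This design keeps the in-memory simple broker for forwarding within each node because Redis Pub/Sub already covers cross-node delivery.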
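To make the roles of `/app` and `/topic` more concrete, the sketch below builds the raw STOMP frames a client would put on the wire: a SUBSCRIBE to a `/topic` destination and a SEND to an `/app` destination. The class name `StompFrames` and the subscription id `sub-0` are illustrative and not part of the project; in practice the stompjs client generates these frames, and header escaping is left out here for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StompFrames {

    /** Serializes a STOMP frame: command line, headers, blank line, body, NUL terminator. */
    public static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        headers.forEach((k, v) -> sb.append(k).append(':').append(v).append('\n'));
        sb.append('\n').append(body).append('\0');
        return sb.toString();
    }

    /** SUBSCRIBE frame for a broadcast destination such as /topic/chat/global. */
    public static String subscribe(String id, String destination) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", id);
        headers.put("destination", destination);
        return frame("SUBSCRIBE", headers, "");
    }

    /** SEND frame for an application destination such as /app/chat/global. */
    public static String send(String destination, String jsonBody) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", destination);
        headers.put("content-type", "application/json");
        return frame("SEND", headers, jsonBody);
    }

    public static void main(String[] args) {
        // Print the NUL terminator as ^@ so the frames are readable in a terminal.
        System.out.print(subscribe("sub-0", "/topic/chat/global").replace("\0", "^@\n"));
        System.out.print(send("/app/chat/global", "{\"payload\":\"hi\"}").replace("\0", "^@\n"));
    }
}
```

The SEND frame targets `/app/...`, so it reaches a controller method first, while the SUBSCRIBE frame targets `/topic/...`, which the simple broker serves directly.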
5.3 Redis Configuration
package com.example.enterprise.ws.config;

import com.example.enterprise.ws.redis.RedisMessageSubscriber;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisConfig {

    // RedisConnectionFactory and StringRedisTemplate are provided by Spring Boot
    // auto-configuration, so connection settings come from Spring Boot's Redis properties.
    // Declaring "new LettuceConnectionFactory()" by hand would always connect to localhost:6379.

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisMessageSubscriber subscriber) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // RedisMessageSubscriber implements MessageListener, so it is registered directly.
        // The channel name is exact, hence ChannelTopic rather than PatternTopic.
        container.addMessageListener(subscriber, new ChannelTopic("cluster:ws:topic"));
        return container;
    }
}
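5.4 DTO & Model
package com.example.enterprise.ws.dto;

import jakarta.validation.constraints.NotBlank;
import lombok.Data;

@Data
public class MessageRequest {

    @NotBlank
    private String destination;

    @NotBlank
    private String payload;
}

package com.example.enterprise.ws.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor   // Jackson needs a no-args constructor to deserialize the message
@AllArgsConstructor  // required by @Builder once a no-args constructor is declared
public class ChatMessage {

    private String destination;
    private String payload;
    private String sender;
    private long timestamp;
}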
5.5 Redis Publisher and Subscriber
package com.example.enterprise.ws.redis;

import com.example.enterprise.ws.model.ChatMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class RedisMessagePublisher {

    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @SneakyThrows
    public void publish(ChatMessage chatMessage) {
        // Serialize to JSON and publish on the shared cluster channel.
        String message = objectMapper.writeValueAsString(chatMessage);
        redisTemplate.convertAndSend("cluster:ws:topic", message);
    }
}

package com.example.enterprise.ws.redis;

import com.example.enterprise.ws.model.ChatMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisMessageSubscriber implements MessageListener {

    private final SimpMessagingTemplate messagingTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    @SneakyThrows
    public void onMessage(Message message, byte[] pattern) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        ChatMessage chatMessage = objectMapper.readValue(body, ChatMessage.class);
        log.debug("Received message from Redis: {}", chatMessage);
        // Forward to the WebSocket clients connected to this node.
        messagingTemplate.convertAndSend(chatMessage.getDestination(), chatMessage);
    }
}
5.6 Message Service
package com.example.enterprise.ws.service;

import com.example.enterprise.ws.model.ChatMessage;
import com.example.enterprise.ws.redis.RedisMessagePublisher;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class MessageService {

    private final SimpMessagingTemplate messagingTemplate;
    private final RedisMessagePublisher redisMessagePublisher;

    public void sendToTopic(ChatMessage chatMessage) {
        // Push to the WebSocket clients connected to this node.
        messagingTemplate.convertAndSend(chatMessage.getDestination(), chatMessage);
        // Publish to Redis so the other nodes can push to their clients.
        // Caveat: Redis also delivers the message to this node's own RedisMessageSubscriber,
        // so local clients can receive it twice unless the subscriber filters by origin.
        redisMessagePublisher.publish(chatMessage);
    }
}
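Redis Pub/Sub delivers a published message to every subscriber of the channel, including the node that published it. Because `sendToTopic` pushes locally before publishing, clients on the originating node could see the same message twice. One possible remedy, sketched below without Spring or Redis, is to tag each message with the id of the publishing node and have the subscriber skip messages that originated locally. The names `Envelope`, `Node`, and `originNodeId` are hypothetical and do not exist in the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class OriginFilterDemo {

    /** Hypothetical envelope: message fields plus the id of the node that published it. */
    public record Envelope(String originNodeId, String destination, String payload) {}

    /** Stand-in for one application node; 'delivered' plays the role of SimpMessagingTemplate. */
    public static class Node {
        public final String nodeId = UUID.randomUUID().toString();
        public final List<String> delivered = new ArrayList<>();

        /** Mirrors MessageService.sendToTopic: deliver locally, then hand off for publishing. */
        public Envelope sendToTopic(String destination, String payload) {
            delivered.add(payload);
            return new Envelope(nodeId, destination, payload);
        }

        /** Mirrors RedisMessageSubscriber.onMessage, with an extra origin check. */
        public void onRedisMessage(Envelope envelope) {
            if (nodeId.equals(envelope.originNodeId())) {
                return; // already delivered locally by sendToTopic
            }
            delivered.add(envelope.payload());
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();
        Envelope envelope = a.sendToTopic("/topic/chat/global", "hello");
        // Redis would hand the message to every subscribed node, including the sender.
        a.onRedisMessage(envelope);
        b.onRedisMessage(envelope);
        System.out.println("node A deliveries: " + a.delivered.size());
        System.out.println("node B deliveries: " + b.delivered.size());
    }
}
```

An alternative with the same effect is to drop the local push from `sendToTopic` and let every node, the sender included, deliver only what arrives from Redis; that adds a Redis round trip for local clients but keeps a single delivery path.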
5.7 REST Controller
package com.example.enterprise.ws.controller;

import com.example.enterprise.ws.dto.MessageRequest;
import com.example.enterprise.ws.model.ChatMessage;
import com.example.enterprise.ws.service.MessageService;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Instant;

@RestController
@RequestMapping("/api/messages")
@RequiredArgsConstructor
public class MessageController {

    private final MessageService messageService;

    @PostMapping
    public ResponseEntity<Void> broadcast(@Valid @RequestBody MessageRequest request,
                                          Authentication authentication) {
        // Fall back to "system" when no authenticated principal is present.
        String sender = authentication != null ? authentication.getName() : "system";
        ChatMessage chatMessage = ChatMessage.builder()
                .destination(request.getDestination())
                .payload(request.getPayload())
                .sender(sender)
                .timestamp(Instant.now().toEpochMilli())
                .build();
        messageService.sendToTopic(chatMessage);
        // 202 Accepted: the broadcast is handed off, delivery is not confirmed.
        return ResponseEntity.accepted().build();
    }
}
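The controller forwards `MessageRequest.destination` to `convertAndSend` unchanged, so an authenticated caller can name any destination string. A possible safeguard is to accept only destinations under the broker prefix before building the `ChatMessage`. The helper below is a minimal sketch; the class `DestinationPolicy` and the allowed pattern are assumptions for illustration, not part of the project.

```java
import java.util.regex.Pattern;

public class DestinationPolicy {

    // Assumed policy: /topic followed by one or more segments of letters, digits, '-' or '_'.
    private static final Pattern ALLOWED = Pattern.compile("^/topic(/[A-Za-z0-9_-]+)+$");

    /** Returns true only for destinations that match the assumed /topic/... pattern. */
    public static boolean isAllowed(String destination) {
        return destination != null && ALLOWED.matcher(destination).matches();
    }

    public static void main(String[] args) {
        System.out.println(isAllowed("/topic/chat/global")); // true
        System.out.println(isAllowed("/app/chat/global"));   // false: not under /topic
        System.out.println(isAllowed("/topic/../queue"));    // false: '.' is not an allowed character
    }
}
```

In the controller, a request that fails such a check could be answered with 400 before `sendToTopic` is called.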
5.8 WebSocket Message Handling
package com.example.enterprise.ws.controller;

import com.example.enterprise.ws.model.ChatMessage;
import com.example.enterprise.ws.service.MessageService;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.stereotype.Controller;

@Controller
@RequiredArgsConstructor
public class WebSocketMessageController {

    private final MessageService messageService;

    // No @SendTo here: sendToTopic already broadcasts to the destination,
    // and returning the message would deliver it to local clients a second time.
    @MessageMapping("/chat/{roomId}")
    public void relay(@DestinationVariable String roomId,
                      @Payload ChatMessage incoming,
                      SimpMessageHeaderAccessor headerAccessor) {
        // The server decides the destination and sender; client-supplied values are overwritten.
        incoming.setDestination("/topic/chat/" + roomId);
        incoming.setSender(headerAccessor.getUser() != null
                ? headerAccessor.getUser().getName()
                : "anonymous");
        messageService.sendToTopic(incoming);
    }
}
5.9 Security Configuration (Sample)
package com.example.enterprise.ws.config;

import com.example.enterprise.ws.util.JwtTokenUtil;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;
import org.springframework.web.filter.OncePerRequestFilter;

import java.io.IOException;

@Configuration
@EnableMethodSecurity
public class SecurityConfig {

    @Bean
    public SecurityFilterChain securityFilterChain(HttpSecurity http,
                                                   JwtAuthenticationFilter jwtAuthenticationFilter) throws Exception {
        http
                .csrf(csrf -> csrf.disable())
                .authorizeHttpRequests(auth -> auth
                        .requestMatchers("/actuator/**").permitAll()
                        // The handshake endpoint is open in this sample.
                        .requestMatchers("/ws/**").permitAll()
                        .requestMatchers(HttpMethod.POST, "/api/messages").authenticated()
                        .anyRequest().permitAll())
                .sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
                .addFilterBefore(jwtAuthenticationFilter, UsernamePasswordAuthenticationFilter.class)
                // HTTP Basic also supplies the 401 response for unauthenticated requests.
                .httpBasic(Customizer.withDefaults());
        return http.build();
    }

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }

    @Bean
    public JwtAuthenticationFilter jwtAuthenticationFilter(JwtTokenUtil jwtTokenUtil) {
        return new JwtAuthenticationFilter(jwtTokenUtil);
    }

    static class JwtAuthenticationFilter extends OncePerRequestFilter {

        private final JwtTokenUtil jwtTokenUtil;

        JwtAuthenticationFilter(JwtTokenUtil jwtTokenUtil) {
            this.jwtTokenUtil = jwtTokenUtil;
        }

        @Override
        protected void doFilterInternal(HttpServletRequest request,
                                        HttpServletResponse response,
                                        FilterChain filterChain) throws ServletException, IOException {
            String token = jwtTokenUtil.resolveToken(request);
            if (token != null && jwtTokenUtil.validateToken(token)) {
                String username = jwtTokenUtil.getUsername(token);
                // Every valid token is mapped to a user with ROLE_USER in this sample.
                UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(
                        username,
                        null,
                        User.withUsername(username).password("").authorities("ROLE_USER").build().getAuthorities());
                SecurityContextHolder.getContext().setAuthentication(authentication);
            }
            filterChain.doFilter(request, response);
        }
    }
}
5.10 JWT Utility
package com.example.enterprise.ws.util;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.Keys;
import jakarta.annotation.PostConstruct;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.util.Date;

@Component
public class JwtTokenUtil {

    @Value("${security.jwt.secret}")
    private String secret;

    @Value("${security.jwt.expire-seconds:3600}")
    private long expirationSeconds;

    private SecretKey secretKey;

    @PostConstruct
    public void init() {
        // HS256 needs a key of at least 256 bits (32 bytes); use an explicit charset.
        this.secretKey = Keys.hmacShaKeyFor(secret.getBytes(StandardCharsets.UTF_8));
    }

    public String generateToken(String username) {
        Date now = new Date();
        Date expiry = new Date(now.getTime() + expirationSeconds * 1000);
        return Jwts.builder()
                .setSubject(username)
                .setIssuedAt(now)
                .setExpiration(expiry)
                .signWith(secretKey, SignatureAlgorithm.HS256)
                .compact();
    }

    public boolean validateToken(String token) {
        try {
            getClaims(token);
            return true;
        } catch (Exception e) {
            // Covers bad signatures, malformed tokens, and expired tokens.
            return false;
        }
    }

    public String getUsername(String token) {
        return getClaims(token).getSubject();
    }

    public String resolveToken(HttpServletRequest request) {
        String bearerToken = request.getHeader("Authorization");
        if (bearerToken != null && bearerToken.startsWith("Bearer ")) {
            return bearerToken.substring(7);
        }
        return null;
    }

    private Claims getClaims(String token) {
        return Jwts.parserBuilder()
                .setSigningKey(secretKey)
                .build()
                .parseClaimsJws(token)
                .getBody();
    }
}
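To show what an HS256-signed compact token consists of, the sketch below assembles one by hand with only the JDK: base64url(header), a dot, base64url(payload), a dot, and base64url(HMAC-SHA256 over the first two parts). The class `Hs256Sketch` is illustrative only and is not part of the project. It does not check the `alg` header or the expiry, so the project code should keep relying on jjwt for real validation.

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;

public class Hs256Sketch {

    private static final Base64.Encoder B64URL = Base64.getUrlEncoder().withoutPadding();

    private static byte[] hmacSha256(String secret, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable or key invalid", e);
        }
    }

    /** Builds header.payload.signature for the given JSON payload. */
    public static String sign(String secret, String payloadJson) {
        String header = B64URL.encodeToString("{\"alg\":\"HS256\"}".getBytes(StandardCharsets.UTF_8));
        String payload = B64URL.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
        String signingInput = header + "." + payload;
        return signingInput + "." + B64URL.encodeToString(hmacSha256(secret, signingInput));
    }

    /** Recomputes the signature and compares it in constant time. Signature check only. */
    public static boolean verify(String secret, String token) {
        String[] parts = token.split("\\.");
        if (parts.length != 3) {
            return false;
        }
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(parts[2]);
        } catch (IllegalArgumentException e) {
            return false; // signature part is not valid base64url
        }
        byte[] expected = hmacSha256(secret, parts[0] + "." + parts[1]);
        return MessageDigest.isEqual(expected, actual);
    }

    public static void main(String[] args) {
        String secret = "ChangeMeToASecretKeyForJWTSignatures123456";
        String token = sign(secret, "{\"sub\":\"alice\"}");
        System.out.println(token);
        System.out.println("valid with same secret: " + verify(secret, token));
        System.out.println("valid with other secret: " + verify("some-other-secret", token));
    }
}
```

Changing any character of the header or payload changes the expected signature, which is why a tampered token fails `validateToken`.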
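5.11 Configuration File (application.yml)
spring:
  application:
    name: enterprise-ws-redis
  data:
    # Spring Boot 3.x reads Redis settings from spring.data.redis.* (spring.redis.* was the 2.x prefix).
    redis:
      host: ${REDIS_HOST:localhost}
      port: ${REDIS_PORT:6379}
      password: ${REDIS_PASSWORD:}
      lettuce:
        # Lettuce connection pooling requires commons-pool2 on the classpath.
        pool:
          max-active: 8
          max-idle: 8
          min-idle: 1
# The STOMP prefixes (/app, /topic) are configured in WebSocketConfig, not here.
management:
  endpoints:
    web:
      exposure:
        include: health,info
security:
  jwt:
    # Replace in every environment; HS256 needs at least 32 bytes.
    secret: "ChangeMeToASecretKeyForJWTSignatures123456"
    expire-seconds: 7200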
5.12 Docker Compose (docker/docker-compose.yml)
version: "3.8"
services:
  redis:
    image: redis:7.2
    container_name: enterprise-redis
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
  app-node-1:
    build: ..
    container_name: enterprise-app-1
    environment:
      - REDIS_HOST=redis
    ports:
      - "8080:8080"
    depends_on:
      - redis
  app-node-2:
    build: ..
    container_name: enterprise-app-2
    environment:
      - REDIS_HOST=redis
    ports:
      - "8081:8080"
    depends_on:
      - redis
Note: `build: ..` assumes the Dockerfile sits in the project root; adjust as needed. Inside the Compose network the application containers reach Redis through the service name `redis`, which is how the two nodes form a cluster.
6. OpenAPI Document Sample (apifox/openapi.yaml)
openapi: 3.0.3
info:
  title: Enterprise WebSocket Distributed Messaging API
  version: 1.0.0
servers:
  - url: http://localhost:8080
paths:
  /api/messages:
    post:
      summary: Broadcast a message
      description: Broadcasts a message to the given destination; subscribed WebSocket clients receive the push.
      security:
        - BearerAuth: []
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/MessageRequest'
      responses:
        '202':
          description: Broadcast request accepted
        '400':
          description: Parameter validation failed
        '401':
          description: Unauthorized
components:
  securitySchemes:
    BearerAuth:
      type: http
      scheme: bearer
      bearerFormat: JWT
  schemas:
    MessageRequest:
      type: object
      required:
        - destination
        - payload
      properties:
        destination:
          type: string
          description: STOMP destination, for example /topic/chat/global
        payload:
          type: string
          description: Message content
7. Front-End Integration Sample
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <title>WebSocket Cluster Push Demo</title>
  <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
  <script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
</head>
<body>
<h1>Real-Time Cluster Push Messages</h1>
<div id="messages"></div>
<script>
  // Connect to one node; point this at port 8081 to test the second node.
  const socket = new SockJS("http://localhost:8080/ws");
  const stompClient = Stomp.over(socket);
  stompClient.connect({}, frame => {
    console.log("Connected: " + frame);
    stompClient.subscribe("/topic/chat/global", message => {
      const data = JSON.parse(message.body);
      const div = document.getElementById("messages");
      const p = document.createElement("p");
      // innerText keeps the payload from being interpreted as HTML.
      p.innerText = `[${new Date(data.timestamp).toLocaleTimeString()}] ${data.sender}: ${data.payload}`;
      div.appendChild(p);
    });
  });
</script>
</body>
</html>
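8. Cluster Message Flow
- A client sends a message to node A through the REST API or the STOMP destination `/app/chat/{roomId}`.
- Node A calls `MessageService.sendToTopic()`:
  - It uses `SimpMessagingTemplate` to push to `/topic` on the local node.
  - It calls `RedisMessagePublisher` to publish the message to `cluster:ws:topic`.
- Redis pushes the message to all subscribers, including node B's `RedisMessageSubscriber`.
- When node B receives it, it uses `SimpMessagingTemplate` to broadcast the message to its own WebSocket connections.
- Online users on every node receive messages produced on any node, which keeps the cluster in sync.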
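The Redis hop in this flow behaves like a live fan-out with no storage: a published message reaches whichever nodes are subscribed at that moment, and a node that subscribes later does not see earlier messages. The sketch below imitates that behavior with an in-memory stand-in, so no Redis is involved; `InMemoryChannel` is a hypothetical class used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class PubSubFanOutDemo {

    /** In-memory stand-in for the channel cluster:ws:topic: live fan-out, nothing is stored. */
    public static class InMemoryChannel {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        public void subscribe(Consumer<String> listener) {
            subscribers.add(listener);
        }

        /** Delivers to the listeners subscribed right now and returns how many received it. */
        public int publish(String message) {
            subscribers.forEach(listener -> listener.accept(message));
            return subscribers.size();
        }
    }

    public static void main(String[] args) {
        InMemoryChannel channel = new InMemoryChannel();
        List<String> nodeA = new ArrayList<>();
        List<String> nodeB = new ArrayList<>();
        List<String> nodeC = new ArrayList<>();

        channel.subscribe(nodeA::add);
        channel.subscribe(nodeB::add);
        channel.publish("m1");          // reaches A and B

        channel.subscribe(nodeC::add);  // node C joins after m1 was published
        channel.publish("m2");          // reaches A, B and C

        System.out.println("node A: " + nodeA);
        System.out.println("node B: " + nodeB);
        System.out.println("node C: " + nodeC);
    }
}
```

Node C only ever sees `m2`, which is why message persistence or replay has to be added separately when missed messages matter.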
9. Automated Tests
package com.example.enterprise.ws;

import com.example.enterprise.ws.dto.MessageRequest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class WebSocketIntegrationTest {

    @Autowired
    private TestRestTemplate restTemplate;

    @Test
    void shouldRejectBroadcastRequestWithoutToken() {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        MessageRequest request = new MessageRequest();
        request.setDestination("/topic/chat/test");
        request.setPayload("hello cluster");

        // No Authorization header is set, so the request is unauthenticated.
        HttpEntity<MessageRequest> entity = new HttpEntity<>(request, headers);
        ResponseEntity<Void> response = restTemplate.postForEntity("/api/messages", entity, Void.class);

        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED);
    }
}
Note: the sample test verifies that a request without a JWT is rejected. Real projects should add cases that carry a token, WebSocket integration tests, and so on. Because `@SpringBootTest` starts the full application context, including the Redis listener container, a reachable Redis instance may be needed for the test to run.
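10. Deployment and Operations Advice
- Configuration center: use Nacos, Apollo, or Spring Cloud Config for unified configuration, and manage secrets through Vault/KMS.
- Log monitoring: integrate ELK / EFK to trace the WebSocket message handling path, and expose metrics with Spring Boot Actuator.
- Observability: use Prometheus + Grafana to monitor Redis, the JVM, the number of WebSocket connections, and similar metrics.
- Extensibility: for point-to-point messages, enable the `/queue` prefix and use separate Redis Pub/Sub channels to distinguish the topics.
- Disaster recovery: run Redis as primary/replica with Sentinel or as Redis Cluster, and deploy the application on Kubernetes for horizontal scaling and automatic recovery.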
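11. Running the Project
- Clone or create the project, then run `mvn clean package`.
- Start the Redis service (for example with `docker-compose -f docker/docker-compose.yml up redis`).
- Start one or more application nodes:
  - First node: `java -jar target/enterprise-ws-redis-1.0.0.jar --server.port=8080`
  - Second node: `java -jar target/enterprise-ws-redis-1.0.0.jar --server.port=8081`
- Send a message through the REST API (a JWT is required):
  curl -X POST "http://localhost:8080/api/messages" \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer <TOKEN>" \
    -d '{"destination":"/topic/chat/global","payload":"Hello Cluster!"}'
- Open the front-end demo page or the actual business page and confirm that the message is pushed in real time.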
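The same REST call can also be issued from Java. The sketch below builds the request with the JDK's `java.net.http` API; the class `BroadcastRequestSketch` and its hand-written JSON quoting are illustrative assumptions, and a JSON library would be the safer choice in real code. The request is only constructed here, since sending it needs a running node and a valid token.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class BroadcastRequestSketch {

    /** Minimal JSON string quoting (backslash and double quote only); not a full JSON encoder. */
    public static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the POST /api/messages request that the curl command above sends. */
    public static HttpRequest build(String baseUrl, String token, String destination, String payload) {
        String json = "{\"destination\":" + quote(destination) + ",\"payload\":" + quote(payload) + "}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/messages"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer " + token)
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080", "<TOKEN>", "/topic/chat/global", "Hello Cluster!");
        System.out.println(request.method() + " " + request.uri());
        // To actually send it against a running node:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding());
    }
}
```

A 202 response would indicate that the node accepted the broadcast; 401 indicates a missing or invalid token.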
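12. Summary
- Elastic scaling: Redis Pub/Sub fans every message out to all running nodes, so nodes can be added horizontally. Pub/Sub delivery is fire-and-forget, so a node that is disconnected at publish time misses the message; add persistence if loss is unacceptable.
- Security: JWT authentication protects the REST API. The sample leaves the `/ws` handshake open, so it should be secured before production use.
- Maintainability: documentation, OpenAPI, and Docker Compose are all included, which simplifies delivery and operations.
- Extensibility: a message queue, distributed sessions, offline message storage, and similar enhancements can be added as needed.
For further enterprise hardening, features such as message persistence, canary releases, and message replay can be built on top of this design.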
