springboot+vue3+redis+sse 后端主动推送数据
# 情境
websocket,netty等工具太重,是双向推送,sse单项推送,场景太多了,消息通知,站内信,展示执行任务百分比等等# 后端代码
1、redisconfig配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;@Configuration
public class RedisConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}2、sse控制层
@RestController
@RequestMapping("/sse")
public class SSEController {@Resourceprivate StringRedisTemplate stringRedisTemplate;// 添加Redis消息监听容器@Resourceprivate RedisMessageListenerContainer redisMessageListenerContainer;private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();// 添加Redis消息频道常量private static final String CHANNEL_PREFIX = "sse:channel:";private static final String USER_CONNECTION_KEY = "sse:connections";// Redis消息监听方法@PostConstructpublic void initRedisListener() {// 使用PatternTopic来支持通配符订阅
// redisMessageListenerContainer.addMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// String channel = new String(message.getChannel());
// String userId = channel.substring(CHANNEL_PREFIX.length());
// String messageContent = new String(message.getBody());
//
// System.out.println("用户[" + userId + "]收到Redis消息: " + messageContent);
//
// SseEmitter emitter = emitters.get(userId);
// if (emitter != null) {
// try {
// emitter.send(SseEmitter.event()
// .id(String.valueOf(System.currentTimeMillis()))
// .data(messageContent)
// .reconnectTime(5000L));
// System.out.println("向用户[" + userId + "]本地发送消息成功");
// } catch (IOException e) {
// System.out.println("向用户[" + userId + "]发送消息失败: " + e.getMessage());
// emitters.remove(userId);
// stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);
// }
// } else {
// System.out.println("用户[" + userId + "]本地连接不存在,无法发送消息");
// }
// }
// }, new PatternTopic(CHANNEL_PREFIX + "*")); // 使用PatternTopic支持通配符// 创建消息监听器适配器MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(message.getChannel());String userId = channel.substring(CHANNEL_PREFIX.length());String messageContent = new String(message.getBody());// 这里需要实现本地SSE连接的管理和消息发送System.out.println("用户[" + userId + "]收到Redis消息: " + messageContent);// 关键:从本地缓存获取 emitter 并发送消息SseEmitter emitter = emitters.get(userId);if (emitter != null) {try {emitter.send(SseEmitter.event().id(String.valueOf(System.currentTimeMillis())).data(messageContent).reconnectTime(5000L));System.out.println("向用户[" + userId + "]本地发送消息成功");} catch (IOException e) {System.out.println("向用户[" + userId + "]发送消息失败: " + e.getMessage());// 处理发送异常(例如连接已断开)emitters.remove(userId);stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);}} else {System.out.println("用户[" + userId + "]本地连接不存在,无法发送消息");}}});// 订阅所有用户的消息频道redisMessageListenerContainer.addMessageListener(listenerAdapter,PatternTopic.of(CHANNEL_PREFIX + "*"));}/*** 创建SSE连接*/@GetMapping(value = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter createConnection(@PathVariable String userId) {// 设置0L表示永不超时SseEmitter emitter = new SseEmitter(0L);// 注册回调函数emitter.onCompletion(() -> {System.out.println("[" + userId + "]连接完成");// 从Redis和本地缓存移除连接信息stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);emitters.remove(userId);});emitter.onTimeout(() -> {System.out.println("[" + userId + "]连接超时");stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);emitters.remove(userId);});emitter.onError(throwable -> {System.out.println("[" + userId + "]连接异常: " + throwable.getMessage());stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);emitters.remove(userId);});// 将用户连接信息存储到Redis和本地缓存stringRedisTemplate.opsForSet().add(USER_CONNECTION_KEY, userId);emitters.put(userId, emitter); // 添加此行,将emitter存入本地缓存System.out.println("用户[" + userId + "]SSE连接创建成功");return emitter;}/*** 向指定用户发送消息*/@PostMapping("/send/{userId}")public boolean sendMessage(@PathVariable String userId, @RequestBody String message) {// 检查用户是否在线Boolean isOnline = stringRedisTemplate.opsForSet().isMember(USER_CONNECTION_KEY, userId);if (Boolean.FALSE.equals(isOnline)) {System.out.println("用户[" + userId + "]未建立连接");return false;}try {// 通过Redis发布消息,所有节点都会收到String channel = CHANNEL_PREFIX + userId;stringRedisTemplate.convertAndSend(channel, message);System.out.println("向用户[" + userId + "]发送消息成功: " + message);return true;} catch (Exception e) {System.out.println("向用户[" + userId + "]发送消息失败: " + e.getMessage());return false;}}/*** 主动关闭连接*/@GetMapping("/close/{userId}")public void closeConnection(@PathVariable String userId) {SseEmitter emitter = emitters.get(userId);if (emitter != null) {emitter.complete();emitters.remove(userId);stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);System.out.println("用户[" + userId + "]连接已关闭");}}
}3、sse.ts
import request from '@/config/axios'
import { fetchEventSource } from '@microsoft/fetch-event-source'
import { getAccessToken } from '@/utils/auth'
import { config } from '@/config/axios/config'export const SSEApi = {// 创建SSE连接createSseConnection : async (userId: number, onMessage) => {const token = getAccessToken()return fetchEventSource(`${config.base_url}/sse/connect/` + userId, {method: 'get',headers: {'Content-Type': 'application/json',Authorization: `Bearer ${token}`},onmessage: onMessage})},// 向指定用户发送消息sendMessageToUser : (userId: number, message: string) => {return request.post({ url: '/sse/send/' + userId, data: message })},// 关闭SSE连接closeSseConnection : (userId: number) => {return request.get({ url: '/sse/close/' + userId })}}4、业务调用代码
// 222给111发送消息
const sendMessage = () => {await SSEApi.sendMessageToUser(111, 'test')
}// 接收sse消息
const sseMessage = async (message) => {console.log(message.data)alert(message.data)
}onMounted(async () => {// 创建SSE连接await SSEApi.createSseConnection(222, sseMessage)
})// 组件卸载时恢复
onUnmounted(async () => {// 关闭SSE连接await SSEApi.closeSseConnection(222)
})上图示意

# 解释
前端用户管理员发布消息,通过高可用的任意一个后台服务,后台服务接收到消息后将消息发送到redis中,此时redis广播给所有订阅的后台服务,由于前端用户和后台服务存在长链接,所以当后台服务收到消息后,会被推送到前端的onmessage监听中