当前位置: 首页 > news >正文

springboot+vue3+redis+sse 后端主动推送数据

# 情境
websocket,netty等工具太重,是双向推送,sse单项推送,场景太多了,消息通知,站内信,展示执行任务百分比等等# 后端代码
1、redisconfig配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;@Configuration
public class RedisConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}
}2、sse控制层
@RestController
@RequestMapping("/sse")
public class SSEController {@Resourceprivate StringRedisTemplate stringRedisTemplate;// 添加Redis消息监听容器@Resourceprivate RedisMessageListenerContainer redisMessageListenerContainer;private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();// 添加Redis消息频道常量private static final String CHANNEL_PREFIX = "sse:channel:";private static final String USER_CONNECTION_KEY = "sse:connections";// Redis消息监听方法@PostConstructpublic void initRedisListener() {// 使用PatternTopic来支持通配符订阅
//        redisMessageListenerContainer.addMessageListener(new MessageListener() {
//            @Override
//            public void onMessage(Message message, byte[] pattern) {
//                String channel = new String(message.getChannel());
//                String userId = channel.substring(CHANNEL_PREFIX.length());
//                String messageContent = new String(message.getBody());
//
//                System.out.println("用户[" + userId + "]收到Redis消息: " + messageContent);
//
//                SseEmitter emitter = emitters.get(userId);
//                if (emitter != null) {
//                    try {
//                        emitter.send(SseEmitter.event()
//                                .id(String.valueOf(System.currentTimeMillis()))
//                                .data(messageContent)
//                                .reconnectTime(5000L));
//                        System.out.println("向用户[" + userId + "]本地发送消息成功");
//                    } catch (IOException e) {
//                        System.out.println("向用户[" + userId + "]发送消息失败: " + e.getMessage());
//                        emitters.remove(userId);
//                        stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);
//                    }
//                } else {
//                    System.out.println("用户[" + userId + "]本地连接不存在,无法发送消息");
//                }
//            }
//        }, new PatternTopic(CHANNEL_PREFIX + "*")); // 使用PatternTopic支持通配符// 创建消息监听器适配器MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(message.getChannel());String userId = channel.substring(CHANNEL_PREFIX.length());String messageContent = new String(message.getBody());// 这里需要实现本地SSE连接的管理和消息发送System.out.println("用户[" + userId + "]收到Redis消息: " + messageContent);// 关键:从本地缓存获取 emitter 并发送消息SseEmitter emitter = emitters.get(userId);if (emitter != null) {try {emitter.send(SseEmitter.event().id(String.valueOf(System.currentTimeMillis())).data(messageContent).reconnectTime(5000L));System.out.println("向用户[" + userId + "]本地发送消息成功");} catch (IOException e) {System.out.println("向用户[" + userId + "]发送消息失败: " + e.getMessage());// 处理发送异常(例如连接已断开)emitters.remove(userId);stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);}} else {System.out.println("用户[" + userId + "]本地连接不存在,无法发送消息");}}});// 订阅所有用户的消息频道redisMessageListenerContainer.addMessageListener(listenerAdapter,PatternTopic.of(CHANNEL_PREFIX + "*"));}/*** 创建SSE连接*/@GetMapping(value = "/connect/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter createConnection(@PathVariable String userId) {// 设置0L表示永不超时SseEmitter emitter = new SseEmitter(0L);// 注册回调函数emitter.onCompletion(() -> {System.out.println("[" + userId + "]连接完成");// 从Redis和本地缓存移除连接信息stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);emitters.remove(userId);});emitter.onTimeout(() -> {System.out.println("[" + userId + "]连接超时");stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);emitters.remove(userId);});emitter.onError(throwable -> {System.out.println("[" + userId + "]连接异常: " + throwable.getMessage());stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);emitters.remove(userId);});// 将用户连接信息存储到Redis和本地缓存stringRedisTemplate.opsForSet().add(USER_CONNECTION_KEY, userId);emitters.put(userId, emitter);  // 添加此行,将emitter存入本地缓存System.out.println("用户[" + userId + "]SSE连接创建成功");return emitter;}/*** 向指定用户发送消息*/@PostMapping("/send/{userId}")public boolean sendMessage(@PathVariable String userId, @RequestBody String message) {// 检查用户是否在线Boolean isOnline = stringRedisTemplate.opsForSet().isMember(USER_CONNECTION_KEY, userId);if (Boolean.FALSE.equals(isOnline)) {System.out.println("用户[" + userId + "]未建立连接");return false;}try {// 通过Redis发布消息,所有节点都会收到String channel = CHANNEL_PREFIX + userId;stringRedisTemplate.convertAndSend(channel, message);System.out.println("向用户[" + userId + "]发送消息成功: " + message);return true;} catch (Exception e) {System.out.println("向用户[" + userId + "]发送消息失败: " + e.getMessage());return false;}}/*** 主动关闭连接*/@GetMapping("/close/{userId}")public void closeConnection(@PathVariable String userId) {SseEmitter emitter = emitters.get(userId);if (emitter != null) {emitter.complete();emitters.remove(userId);stringRedisTemplate.opsForSet().remove(USER_CONNECTION_KEY, userId);System.out.println("用户[" + userId + "]连接已关闭");}}
}3、sse.ts
import request from '@/config/axios'
import { fetchEventSource } from '@microsoft/fetch-event-source'
import { getAccessToken } from '@/utils/auth'
import { config } from '@/config/axios/config'export const SSEApi = {// 创建SSE连接createSseConnection : async (userId: number, onMessage) => {const token = getAccessToken()return fetchEventSource(`${config.base_url}/sse/connect/` + userId, {method: 'get',headers: {'Content-Type': 'application/json',Authorization: `Bearer ${token}`},onmessage: onMessage})},// 向指定用户发送消息sendMessageToUser : (userId: number, message: string) => {return request.post({ url: '/sse/send/' + userId, data: message })},// 关闭SSE连接closeSseConnection : (userId: number) => {return request.get({ url: '/sse/close/' + userId })}}4、业务调用代码
// 222给111发送消息
const sendMessage = () => {await SSEApi.sendMessageToUser(111, 'test')
}// 接收sse消息
const sseMessage = async (message) => {console.log(message.data)alert(message.data)
}onMounted(async () => {// 创建SSE连接await SSEApi.createSseConnection(222, sseMessage)
})// 组件卸载时恢复
onUnmounted(async () => {// 关闭SSE连接await SSEApi.closeSseConnection(222)
})

上图示意

# 解释
前端用户管理员发布消息,通过高可用的任意一个后台服务,后台服务接收到消息后将消息发送到redis中,此时redis广播给所有订阅的后台服务,由于前端用户和后台服务存在长链接,所以当后台服务收到消息后,会被推送到前端的onmessage监听中
http://www.dtcms.com/a/573450.html

相关文章:

  • 潍坊网站优化培训四川在线
  • Azure AI服务成本分析指南
  • OceanBase数据库SQL调优
  • ④使用 PPTSYNC.exe 与华为手机拍照制作 GIF 动画
  • OceanBase all-in-one 4.2.0.0 安装教程(CentOS 7/EL7 一键部署详细步骤)​
  • 环保网站建设方案wordpress前端后端
  • 如何给网站备案山东网站搭建有限公司
  • veereal将在中国推广微型led技术
  • 对话 NuttX 创始人Gregory Nutt——openvela 与 NuttX 的 “双向奔赴” 新范式
  • 河北省地图谷歌seo网站优化
  • 湘潭知名网站建设网站静态页面生成
  • 从EMS看分布式能源发展:挑战与机遇并存
  • Java接入飞书发送通知消息
  • Vue.js 基础教程:从入门到实践
  • .net实现秒杀商品(Redis高并发)
  • 解决phpstudy 8.x软件中php8.2.9没有redis扩展的问题
  • 【MCP系列】飞书MCP使用
  • 阜新网站设计淮北市矿务局工程建设公司网站
  • 攻克维吾尔语识别的技术实践(多语言智能识别系统)
  • [Windows] 漫画翻译工具Saber Translator2.5.1
  • 手术机器人智能控制系统基本课时项目化课件(2025.08.25)
  • NATS安装与配置完全指南
  • 开发网站如何选需要注意什么汉川网页设计
  • seo根据什么具体优化想做个卷帘门百度优化网站
  • Rust 练习册 7:高阶生命周期与高阶 trait 限定
  • Linux服务器通过密钥登录服务器
  • 网站开发2008家纺外发加工订单网
  • 广州地铁站路线图广告设计师证怎么考
  • 【拾遗补漏】.NET 常见术语集
  • 从零开发一个简单的Web爬虫(使用Requests和BeautifulSoup)