当前位置: 首页 > news >正文

Websocket+Redis实现微服务消息实时同步

目录

一、需求概述

二、功能设计

2.1 Vue技术栈介绍

2.1.1 STOMP 协议

2.1.2 webstomp-client插件

2.1.3 sockjs-client插件

2.2 Java技术栈介绍

2.2.1 Redis的发布/订阅模式

三、功能实现

3.1 Java实现

3.2 Vue实现

3.3 Nginx配置websocket代理

3.4 Websocket实现跨服务消息推送

3.4.1 商城端发布消息

3.4.2 供应商、运营端订阅消息


一、需求概述

商城用户订单留言或者意见反馈时,消息可以实时提醒给后台员。

二、功能设计

技术选型:

  1. 前端:Vue2 + WebSocket (websocket 插件)
  2. 后端:Spring WebSocket (基于 STOMP 协议) + Redis发布订阅模式

2.1 Vue技术栈介绍

2.1.1 STOMP 协议

STOMP (Simple Text-Oriented Messaging Protocol) 是一种简单、基于文本的通信协议,专为消息传递设计,常用于 WebSocket 之上。它提供了一种标准化的消息格式和通信模式,适合实时应用场景,如聊天、通知和实时数据推送。

1.主要特点

1、简单性:STOMP 基于文本,易于阅读和调试,消息格式类似于 HTTP 请求/响应。

2、帧结构:消息以“帧”(Frame)为单位,每帧包含命令、头部和正文。例如:

SEND
destination:/topic/messages
content-type:text/plain
Hello, this is a message!
  • 命令:如 SEND(发送消息)、SUBSCRIBE(订阅)、MESSAGE(接收消息)。
  • 头部:键值对,定义消息元数据,如 destination 指定消息通道。
  • 正文:实际消息内容。

3、发布-订阅模型:支持客户端订阅特定通道(destination),接收服务器推送的消息。

4、跨平台:STOMP 不依赖特定编程语言,支持 Java、JavaScript、Python 等多种客户端。

5、与 WebSocket 集成:STOMP 通常运行在 WebSocket 之上,利用 WebSocket 的全双工通信能力,实现低延迟实时消息传递。

6、与 Spring Boot 等框架集成良好,通过 @EnableWebSocketMessageBroker 可快速配置。

2.工作原理

1、客户端通过 WebSocket 连接到服务器的 STOMP 端点。

2、客户端发送 SUBSCRIBE 帧订阅某个通道(如 /topic/messages)。

3、服务器通过 MESSAGE 帧向订阅该通道的客户端推送消息。

4、客户端可通过 SEND 帧向特定通道发送消息,服务器处理后广播。

2.1.2 webstomp-client插件

1.介绍

webstomp-client 是一个 JavaScript 库,用于在浏览器或 Node.js 环境中通过 WebSocket 或类似协议实现 STOMP(Simple Text Oriented Messaging Protocol)协议的客户端功能。STOMP 是一种基于文本的轻量级消息协议,广泛用于消息队列和实时通信。

2.主要特点

1、STOMP 协议支持:提供 STOMP 帧的发送和接收,支持订阅(SUBSCRIBE)、发送(SEND)、连接(CONNECT)等命令。

2、WebSocket 封装:基于 WebSocket 或其他传输协议(如 SockJS),在 WebSocket 之上实现 STOMP 协议。

3、简单 API:

connect(headers, connectCallback, errorCallback):建立 STOMP 连接。
subscribe(destination, callback):订阅消息主题。
send(destination, headers, body):发送消息。
disconnect(callback):断开连接。

4、心跳检测:支持心跳机制(heartbeatIncoming 和 heartbeatOutgoing),确保连接活跃。

5、调试支持:通过 debug 属性输出详细日志,便于调试。

6、轻量级:专注于 STOMP 协议,体积小,适合前端集成。

2.1.3 sockjs-client插件

1.介绍

sockjs-client 是一个 JavaScript 库,提供 WebSocket 的浏览器端实现,并支持回退机制以兼容不支持 WebSocket 的环境。它通过 HTTP 协议建立连接,优先尝试 WebSocket,若失败则回退到其他传输方式(如 XHR 流、长轮询)。

2.主要特点

1、WebSocket 兼容性:优先使用原生 WebSocket(ws://),若不可用,自动回退到 XHR 流、JSONP 或长轮询。

2、HTTP 协议入口:通过 HTTP 端点(如 http://localhost:8081/ws)发起连接,适合与后端 SockJS 服务配合。

3、事件驱动:

onopen:连接建立时触发。
onmessage:收到消息时触发。
onerror:发生错误时触发。
onclose:连接关闭时触发。

4、跨浏览器支持:支持旧版浏览器(如 IE8+),通过回退机制确保兼容性。

5、与 STOMP 集成:常与 webstomp-client 结合,提供 STOMP 协议的传输层。

6、轻量级:专注于实时通信,API 简单。

3.Vue SockJS → Java WebSocket链路

  • webstomp-client:提供 STOMP 协议支持,用于订阅和处理消息。
  • sockjs-client:提供 WebSocket 连接和回退机制,通过 http://ip:port/ws 建立连接。
  • 如果浏览器都是支持 WebSocket 的,可以不用 .withSockJS(),这样可以直接用
  • ws://127.0.0.1:8081/ws。但如果还要支持 IE 之类的老浏览器就不适合。

2.2 Java技术栈介绍

后端需要考虑消息如何存储、消息存活问题、消息的稳定性、是否影响服务性能登问题。

2.2.1 Redis的发布/订阅模式

在 Redis 中,通过 PUBLISH 命令或 redisTemplate.convertAndSend 发布到频道的消息(Pub/Sub 机制)不会被持久化存储,因此消息的存活时间取决于以下几个因素:

1. Redis Pub/Sub 消息的特性

1、瞬时性:Redis 的 Pub/Sub 机制是基于发布/订阅模式的,消息发布到频道后,会立即发送给所有订阅该频道的客户端。如果没有订阅者在线,消息会立即被丢弃,不会存储在 Redis 中。

2、存活时间:消息的“存活时间”实际上是瞬时的,仅在发布时传递给当前订阅者。一旦传递完成,消息就不再存在于 Redis 中。

3、无持久化:Redis Pub/Sub 不提供消息的持久化存储功能,频道本身不保存历史消息。

2. 影响消息“存活”的因素

1、订阅者状态:如果有客户端订阅了频道(例如通过 SUBSCRIBE message-channel 或 RedisMessageListenerContainer),消息会实时传递给这些订阅者。如果没有订阅者,消息会立即被丢弃,不会存储。

2、网络延迟:消息传递受网络延迟影响,但通常在毫秒级别完成。

3、Redis 配置:默认情况下,Redis 不对 Pub/Sub 消息进行持久化存储,除非使用额外的机制(如自定义存储)。

3.如何延长消息存活时间

1、代替 Pub/Sub,使用 Redis 列表(LPUSH/RPOP)或 Redis Streams(XADD/XREAD)存储消息。这些数据结构支持持久化,消息会保存直到被显式删除或过期。

  • 示例(使用列表)

redisTemplate.opsForList().leftPush("message-queue", notice);

  • 接收端:

Notice notice = (Notice) redisTemplate.opsForList().rightPop("message-queue");

Redis Streams 更适合高吞吐量场景,支持消费者组和消息确认。

2、使用 Redis AOF/RDB 持久化:Redis 的 AOF(追加文件)或 RDB(快照)持久化仅对键值数据有效,对 Pub/Sub 消息无效。如果需要持久化,必须结合其他数据结构(如列表或流)。

4.总结

1、默认行为:通过 redisTemplate.convertAndSend("message-channel", notice) 发布的消息是瞬时的,仅在发布时传递给订阅者,不会存储在 Redis 中,存活时间接近于 0 秒(仅受网络延迟影响)。

2、无订阅者:如果没有客户端订阅 message-channel,消息会立即被丢弃。

3、延长存活时间:若需要消息持久化或延迟处理,建议使用 Redis 列表、流或数据库存储,并可设置 TTL过期时间。如果消息仅实时传递,当前 Pub/Sub 机制已足够,无需额外存储。

三、功能实现

3.1 Java实现

1.添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2.配置 WebSocket

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {// 启用简单消息代理,订阅路径以 /topic 开头config.enableSimpleBroker("/topic");// 客户端发送消息的路径前缀config.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注册 WebSocket 端点,客户端通过此端点连接registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();}
}

3.推送消息

@Service
public class MessageService {@Autowiredprivate SimpMessagingTemplate messagingTemplate;public void saveAndNotify(Message message) {// 保存到数据库。。。// 通过 WebSocket 推送消息到 /topic/messagesmessagingTemplate.convertAndSend("/topic/messages", message);}
}

消息体

@Schema(description = "消息通知")
@Data
public class Message implements Serializable{@Schema(name = "主键")private Integer id;@Schema(name = "发送人ID")private String senderId;@Schema(name = "发送人姓名")private String senderName;@Schema(name = "接收人ID")private String receiverId;@Schema(name = "接收人姓名")private String receiverName;@Schema(name = "消息内容")private String content;@Schema(name = "消息类型")private Integer type;}

3.2 Vue实现

1.依赖安装

npm cache clean --force 清除npm缓存并尝试重新安包
npm install sockjs-client@1.6.1 webstomp-client@1.2.6 安装sockjs与webstomp依赖
npm list sockjs-client webstomp-client 验证sockjs与webstomp依赖是否安装成功
npm uninstall sockjs-client webstomp-client 卸载sockjs与webstomp依赖

2.请求代理配置

在 vue.config.js 中配置 WebSocket请求代理:

proxy: {//访问 ip:port/ws,Vue devServer 会代理到 VUE_APP_BASE_URL/ws,绕过 CORS。'/ws': {target: getBaseUrl(),  // 使用处理后的URLws: true,changeOrigin: true,onProxyReqWs(proxyReq, req, socket) {// 添加错误处理防止代理崩溃socket.on('error', (err) => {console.warn('WebSocket 代理连接错误:', err.message); // 不要抛出错误,防止进程退出});},onError(err, req, socket) {console.warn('WebSocket 代理错误:', err.message);if (socket && socket.writable) {socket.end();}}}
}

3.订阅消息

在App.vue主组件中订阅 WebSocket 消息:

<template><div id="app"><router-view /><!--  消息提示弹框  --><notice ref="notice" /></div>
</template><script>
import Notice from './components/Messaging/Notice.vue';
import SockJS from 'sockjs-client';
import Stomp from 'webstomp-client';
import { mapGetters } from 'vuex';
export default {name: 'App',//初始版本导出只有name属性,如果禁用websocket,把以下代码全部注释data() {return {stompClient: null,connected: false,};},components: { Notice },computed: {...mapGetters(['user']),},mounted() {this.initWebSocket();},methods: {initWebSocket() {if (typeof window === 'undefined') return;try {// 清理现有连接if (this.stompClient) {this.stompClient.disconnect();this.stompClient = null;}// 建立连接const socket = new SockJS("/ws");socket.onerror = (e) => {console.warn('WebSocket 连接失败(不再重连)', e);this.connected = false;this.stompClient = null;};//用 webstomp-client 封装this.stompClient = Stomp.over(socket);this.stompClient.debug = () => {};this.stompClient.connect({}, (frame) => {console.log('WebSocket 连接成功');this.connected = true;this.subscribeToChannels();},(error) => {console.warn('STOMP 连接失败', error);this.connected = false;this.stompClient = null;});socket.onclose = () => {console.warn('WebSocket 连接关闭');this.connected = false;this.stompClient = null;};} catch (err) {console.error('WebSocket 初始化失败', err);this.connected = false;this.stompClient = null;}},subscribeToChannels() {if (!this.stompClient || !this.connected) return;this.stompClient.subscribe('/topic/messages', message => {try {const msg = JSON.parse(message.body);console.log('sellerId:'+this.user.sellerUser.sellerId)//消息类型,1-留言,2-意见反馈,3-通知公告if (String(this.user.sellerUser.sellerId) !== msg.receiverId) {return;}if (this.$refs.notice) this.$refs.notice.showMessage(msg);console.log("收到消息"+message.body)} catch (e) {console.error('消息解析错误:', e);}});}},beforeDestroy() {if (this.stompClient && this.connected) {this.stompClient.disconnect(() => {console.log('WebSocket 已断开');});}}
};
</script>

权限框架,如shiro需要把/ws/** 和 /ws/info 设置为 anon,允许匿名访问,防止 Shiro 拦截 WebSocket 连接。

4.消息弹框实现

消息提示框子组件Notice.vue:

<template><transition name="notification"><div class="notification" v-if="showNotification" @mouseenter="pauseAutoClose" @mouseleave="resumeAutoClose"><div class="notification-content"><!-- 标题:动态显示消息类型 --><p class="notification-title"><strong>消息通知 - {{ messageTypeText }}</strong></p><!-- 消息内容 -->
<!--        <p class="notification-item">订单ID: {{ message.orderSn || '无' }}</p><p class="notification-item">买家ID: {{ message.userId || '无' }}</p>--><p class="notification-item">{{ message.content || '暂无内容' }}</p><!-- 关闭按钮 --><button class="notification-button" @click="closeNotification">关闭</button></div></div></transition>
</template><script>
export default {data() {return {showNotification: false,message: {},autoCloseTimer: null, // 存储自动关闭定时器isPaused: false, // 标记是否暂停自动关闭notificationSound: new Audio('/static/sounds/notification.mp3'), // 预加载音频};},computed: {// 根据消息类型动态生成标题messageTypeText() {const typeMap = {1: '留言',2: '意见反馈',3: '通知公告',};return typeMap[this.message.type] || '未知类型';},},methods: {// 显示消息并播放声音showMessage(message) {// 防止重复显示时定时器冲突this.clearAutoClose();this.message = message;this.showNotification = true;// 播放提示音this.playNotificationSound();// 设置10秒自动关闭this.setAutoClose();},// 播放通知声音playNotificationSound() {this.notificationSound.play().catch(error => {console.error('音频播放失败:', error);// 可选:显示错误提示this.$message.error('无法播放通知音,请检查浏览器设置');});},// 设置自动关闭定时器setAutoClose() {if (!this.isPaused) {this.autoCloseTimer = setTimeout(() => {this.showNotification = false;}, 3000);}},// 清除自动关闭定时器clearAutoClose() {if (this.autoCloseTimer) {clearTimeout(this.autoCloseTimer);this.autoCloseTimer = null;}},// 鼠标悬停时暂停自动关闭pauseAutoClose() {this.isPaused = true;this.clearAutoClose();},// 鼠标离开时恢复自动关闭resumeAutoClose() {this.isPaused = false;this.setAutoClose();},// 手动关闭提示框closeNotification() {this.clearAutoClose();this.showNotification = false;},},beforeDestroy() {// 组件销毁前清理定时器this.clearAutoClose();// 停止音频播放this.notificationSound.pause();this.notificationSound.currentTime = 0;},
};
</script><style scoped>
.notification {position: fixed;bottom: 20px;right: 20px;background: linear-gradient(135deg, #1e3a8a, #3b82f6); /* 科技蓝渐变背景 */border-radius: 8px;padding: 20px;box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2);z-index: 1000;color: #ffffff; /* 白色文字 */max-width: 320px;min-width: 250px;animation: slideIn 0.3s ease-out; /* 入场动画 */cursor: default; /* 鼠标悬停时显示默认光标 */
}/* 内容容器 */
.notification-content {display: flex;flex-direction: column;gap: 10px;
}/* 标题样式 */
.notification-title {font-size: 16px;font-weight: 600;margin: 0 0 10px 0;border-bottom: 1px solid rgba(255, 255, 255, 0.2); /* 标题下划线 */padding-bottom: 8px;
}/* 消息项样式 */
.notification-item {font-size: 14px;margin: 0;line-height: 1.5;word-break: break-word; /* 防止长内容溢出 */
}/* 关闭按钮 */
.notification-button {align-self: flex-end;background-color: #ffffff;color: #1e3a8a; /* 按钮文字颜色与背景协调 */border: none;border-radius: 4px;padding: 8px 16px;cursor: pointer;font-size: 14px;transition: background-color 0.2s;
}.notification-button:hover {background-color: #e5e7eb; /* 按钮悬停效果 */
}/* 入场动画 */
@keyframes slideIn {from {transform: translateX(100%);opacity: 0;}to {transform: translateX(0);opacity: 1;}
}/* 退出动画 */
.notification-leave-active {animation: slideOut 0.3s ease-in;
}@keyframes slideOut {from {transform: translateX(0);opacity: 1;}to {transform: translateX(100%);opacity: 0;}
}
</style>

3.3 Nginx配置websocket代理

server {....
location /ws/ {....# 必需的核心配置proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}
}

3.4 Websocket实现跨服务消息推送

商城服务、供应商服务、运营服务属于三个独立的服务,前后端各自分开,怎么实现消息共享,需要借助redis的发布订阅模式实现消息传递。

  • 商城服务:买家通过前端发送消息,商城端服务发布消息到 Redis 的指定频道(message-channel)。
  • 供应商端、运营端服务:订阅 Redis 的 message-channel,接收消息后通过 WebSocket 推送到订阅了 /topic/messages 的供应商或运营前端。

3.4.1 商城端发布消息

public void sendMessage(Message message) {....// 发布消息到 Redis 频道redisTemplate.convertAndSend("message-channel", message);
}

3.4.2 供应商、运营端订阅消息

1.Redis订阅配置

@Slf4j
@Configuration
public class RedisSubscriberConfig {@Autowiredprivate MessageListener messageListener;@Beanpublic RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(messageListenerAdapter(), new ChannelTopic(MessageUtil.MSG_CHANNEL));container.setErrorHandler(t -> log.error("Redis 监听器错误", t));log.info("RedisMessageListenerContainer 已初始化并订阅了消息通道。");return container;}@Beanpublic MessageListenerAdapter messageListenerAdapter() {MessageListenerAdapter adapter = new MessageListenerAdapter(messageListener, "onMessage");adapter.setSerializer(new Jackson2JsonRedisSerializer<>(Object.class));return adapter;}
}

2.消息监听与推送

@Component
public class MessageListener {private static final Logger log = LoggerFactory.getLogger(MessageListener.class);@Autowiredprivate SimpMessagingTemplate messagingTemplate;@Autowiredprivate ObjectMapper objectMapper; // 注入 ObjectMapper 以复用配置/***因为redis的Jackson2JsonRedisSerializer 在序列化时,由于启用了 enableDefaultTyping,会将对象包装为 [type, data] 格式的数组,是接收的是一个数组而非Message对象*/public void onMessage(Object object) {try {// 检查消息是否为 ArrayListif (!(object instanceof List)) {log.error("无效的消息格式,预期为列表,但实际得到的是: {}", object != null ? object.getClass() : "null");return;}List<?> messageList = (List<?>) object;// 验证列表长度和内容if (messageList.size() < 2) {log.error("无效的消息列表大小: {}", messageList.size());return;}// 第一个元素应为类型信息(如 "com.yixiekeji.entity.messaging.Message")if (!(messageList.get(0) instanceof String)) {log.error("无效的类型信息,预期为字符串,实际为: {}", messageList.get(0) != null ? messageList.get(0).getClass() : "null");return;}// 第二个元素应为 Message 数据Object data = messageList.get(1);if (data == null) {log.error("消息数据为空");return;}// 转换为 Message 对象Message message = objectMapper.convertValue(data, Message.class);log.info("收到通知: {}", message);// 推送消息到 WebSocket 主题messagingTemplate.convertAndSend(MessageUtil.WS_TOPIC, message);} catch (Exception e) {log.error("消息处理失败: {}", object, e);}}
}

http://www.dtcms.com/a/427353.html

相关文章:

  • 仪器仪表第四节课学习笔记
  • Java 黑马程序员学习笔记(进阶篇15)
  • 【开题答辩过程】以《基于SpringBoot+Vue+uni-app的智慧校园服务系统的设计与实现》为例,不会开题答辩的可以进来看看
  • 做二手电脑的网站宣城网站建设 有限公司
  • 没有服务器 怎么做网站建设企业高端网站
  • 极简时钟APP(手机全能计时工具) 极简版
  • 华为光模块命名规则
  • 做企业网站用什么cms好易语言怎么做网页网站
  • 域名做网站北京 网站 公司
  • 深入浅出 Redis:从核心原理到运维实战指南一
  • 自定义含工具包`Ubuntu22.04.5.iso`镜像
  • Day 29 - 密码管理器开发 - Python学习笔记
  • Docker镜像结构全解析
  • ubuntu 22.04安装CUDA 13.0
  • 鸿蒙NEXT Wi-Fi扫描开发指南:从基础到实战
  • wordpress做的学校网站北京网页设计公司
  • 精读 C++20 设计模式:行为型设计模式——观察者模式
  • 广州营销型网站建设旅游网站内容规划特点
  • 【mdBook】6 在持续集成中运行 mdbook
  • Jenkins安装并与GitLab集成,实现dev、qa、uat、prod多分支持续集成的详细步骤
  • k8s部署前后分离架构微服务——跨域和缓存问题
  • 临沂网站建设推荐wordpress 评论倒序
  • MyBatis Plus注解 @TableField
  • 石家庄建设工程招标办网站成都网站推广公司
  • 【C#避坑实战系列文章16】性能优化(CPU / 内存占用过高问题解决)
  • JavaScript性能优化实战:从指标到落地的全链路方案
  • 上海网站优化加盟网站建设的公司开发
  • 网站怎么发布到服务器青岛不错的网站公司
  • linux0.11学习之启动主线要点(一)
  • Invoke-customs are only supported starting with Android O (--min-api 26)