基于MQTT的实时消息推送系统设计与实现(Java后端+Vue前端)
基于MQTT的实时消息推送系统设计与实现(Java后端+Vue前端)
本文档将详细介绍如何使用MQTT协议实现一个具备实时推送、消息优先级、消息撤回、重试机制和连接稳定性保障的完整消息系统。我们将以社交应用的实时通知场景为例,提供完整的Java后端和Vue前端实现方案。
1. 系统设计概述
1.1 业务场景
我们实现一个社交应用的实时通知系统,支持:
- 实时接收好友请求、评论、点赞等通知
- 按优先级处理不同类型消息(系统通知 > 私信 > 点赞)
- 消息发送者可在5分钟内撤回消息
- 网络异常时自动重试发送
- 客户端断线后自动重连并接收离线消息
1.2 系统架构
Vue前端应用
    |
    | MQTT over WebSocket
    v
MQTT Broker (EMQX)
    |
    | 发布/订阅模式
    v
Spring Boot后端服务
    |
    v
MySQL数据库 (存储消息记录)
1.3 技术选型
- MQTT Broker: EMQX 5.0(支持MQTT 5.0,高并发)
- 后端: Spring Boot 2.7.x + Eclipse Paho MQTT客户端
- 前端: Vue 3 + MQTT.js + Element Plus
- 数据库: MySQL 8.0
- 认证: JWT令牌认证
2. 环境搭建
2.1 安装EMQX Broker
# Install and start EMQX with Docker.
# Ports used later in this document: 1883 (MQTT over TCP, backend), 8083 (MQTT over WebSocket, frontend), 18083 (dashboard).
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 18083:18083 emqx/emqx:5.0.24
访问EMQX控制台:http://localhost:18083 (默认账号密码:admin/public)
2.2 创建主题结构
设计以下MQTT主题:
- user/${userId}/notifications - 用户接收通知的主题
- user/${userId}/recall - 用户接收消息撤回指令的主题
- system/broadcast - 系统广播通知
- notifications/dlq - 死信队列
3. 后端实现(Java/Spring Boot)
3.1 项目初始化
创建Spring Boot项目,添加依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>mqtt-notification-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mqtt-notification-service</name>
    <description>MQTT Notification Service with Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Security: the controller injects
             org.springframework.security.core.Authentication and JwtConfig uses
             UserDetails, so this starter is required for the code to compile.
             NOTE: it secures every endpoint by default; a SecurityFilterChain
             with a JWT filter still has to be configured. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>

        <!-- Spring Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!-- MySQL Driver -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- MQTT Client (MQTT 3.1 / 3.1.1) -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

        <!-- JWT Authentication -->
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-api</artifactId>
            <version>0.11.5</version>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-impl</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-jackson</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Validation -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3.2 配置文件
# Server port
server.port=8080

# Database configuration
spring.datasource.url=jdbc:mysql://localhost:3306/mqtt_notification?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# JPA configuration
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect

# MQTT configuration
mqtt.broker-url=tcp://localhost:1883
# NOTE(review): the random suffix yields a new client ID on every start, so the
# persistent session requested by clean-session=false is not resumed across
# restarts - confirm this is intended.
mqtt.client-id=notification-server-${random.value}
mqtt.username=admin
mqtt.password=public
# Seconds (passed to MqttConnectOptions.setKeepAliveInterval)
mqtt.keep-alive-interval=30
# Seconds, not milliseconds (passed to MqttConnectOptions.setConnectionTimeout)
mqtt.connection-timeout=3
mqtt.clean-session=false
# Base delay in milliseconds for the exponential retry back-off
mqtt.retry-interval=1000
mqtt.max-retry-attempts=3

# JWT configuration
# The secret is decoded with Decoders.BASE64 in JwtConfig, so it MUST be valid
# Base64 (no '-' or '_') and decode to at least 256 bits for HS256.
# Placeholder only - generate a real one, e.g. `openssl rand -base64 32`.
jwt.secret=ReplaceThisWithABase64EncodedSecretOf32BytesPlus
# Token lifetime in milliseconds (24 hours)
jwt.expiration=86400000

# Business configuration
app.max-recall-minutes=5
3.3 MQTT客户端配置
package com.example.mqtt.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.keep-alive-interval}")private int keepAliveInterval;@Value("${mqtt.connection-timeout}")private int connectionTimeout;@Value("${mqtt.clean-session}")private boolean cleanSession;@Beanpublic MqttClient mqttClient() throws MqttException {// 创建连接选项MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setKeepAliveInterval(keepAliveInterval);options.setConnectionTimeout(connectionTimeout);options.setCleanSession(cleanSession);options.setAutomaticReconnect(true); // 自动重连// 创建MQTT客户端MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());client.connect(options);return client;}
}
3.4 数据模型设计
消息实体类
package com.example.mqtt.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.time.LocalDateTime;

/**
 * JPA entity for one notification stored in the {@code notifications} table.
 *
 * <p>Fields with an initializer are annotated with {@code @Builder.Default};
 * without it Lombok's builder ignores the initializer and a built instance
 * would carry {@code status == null}.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "notifications")
public class Notification {

    /** Database primary key. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Unique business identifier of the message. */
    @Column(unique = true, nullable = false)
    private String messageId;

    /** Recipient of the notification. */
    @Column(nullable = false)
    private String userId;

    /** User that sent the notification. */
    @Column(nullable = false)
    private String senderId;

    @Column(nullable = false)
    private String title;

    @Column(nullable = false, columnDefinition = "TEXT")
    private String content;

    @Column(nullable = false)
    @Enumerated(EnumType.STRING)
    private NotificationType type;

    /** 0-9, where 9 is the highest priority. */
    @Column(nullable = false)
    private int priority;

    @Builder.Default
    @Column(nullable = false)
    @Enumerated(EnumType.STRING)
    private NotificationStatus status = NotificationStatus.ACTIVE;

    @Column(nullable = false)
    private LocalDateTime createdAt;

    /** Time the message was recalled; {@code null} while it has not been recalled. */
    private LocalDateTime recallAt;

    @Builder.Default
    private int retryCount = 0;

    @Builder.Default
    private boolean isRead = false;

    /** Kind of event the notification reports. */
    public enum NotificationType {
        FRIEND_REQUEST, COMMENT, LIKE, SYSTEM, MENTION, MESSAGE
    }

    /** Lifecycle state of a notification. */
    public enum NotificationStatus {
        ACTIVE, RECALLED, EXPIRED, FAILED
    }
}
消息数据访问接口
package com.example.mqtt.repository;

import com.example.mqtt.model.Notification;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;

/**
 * Spring Data JPA repository for {@link Notification} entities.
 */
@Repository
public interface NotificationRepository extends JpaRepository<Notification, Long> {

    /**
     * Counts the notifications of a user that are not marked as read.
     *
     * @param userId recipient user ID
     * @return number of unread notifications
     */
    long countByUserIdAndIsReadFalse(String userId);

    /**
     * Returns one page of a user's notifications, newest first.
     *
     * @param userId   recipient user ID
     * @param pageable paging parameters
     * @return the requested page
     */
    Page<Notification> findByUserIdOrderByCreatedAtDesc(String userId, Pageable pageable);

    /**
     * Looks a notification up by its business message ID.
     *
     * @param messageId unique message ID
     * @return the notification, or empty when none matches
     */
    Optional<Notification> findByMessageId(String messageId);
}
3.5 MQTT消息服务实现
package com.example.mqtt.service;

import com.example.mqtt.model.Notification;
import com.example.mqtt.repository.NotificationRepository;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Publishes notifications and recall commands over MQTT, persists them and
 * retries failed publishes with exponential back-off.
 *
 * <p>The Paho {@code mqttv3} client has no message-properties API, so the
 * priority is carried only inside the JSON payload.
 */
@Service
public class MqttService {

    private final MqttClient mqttClient;
    private final NotificationRepository notificationRepository;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    @Value("${mqtt.retry-interval}")
    private int retryInterval;

    @Value("${mqtt.max-retry-attempts}")
    private int maxRetryAttempts;

    @Value("${app.max-recall-minutes}")
    private int maxRecallMinutes;

    @Autowired
    public MqttService(MqttClient mqttClient, NotificationRepository notificationRepository) {
        this.mqttClient = mqttClient;
        this.notificationRepository = notificationRepository;
    }

    /** Subscribes to every user's recall topic so recall commands can be logged. */
    @PostConstruct
    public void init() {
        try {
            mqttClient.subscribe("user/+/recall", 2, (topic, message) -> {
                String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                System.out.println("Received recall command: " + payload + " on topic: " + topic);
                // Recall bookkeeping can be added here.
            });
        } catch (MqttException e) {
            System.err.println("Failed to subscribe to recall topics: " + e.getMessage());
        }
    }

    /** Stops the retry scheduler and cancels pending retries so its threads do not outlive the application. */
    @PreDestroy
    public void shutdown() {
        scheduler.shutdownNow();
    }

    /**
     * Persists the notification and publishes it to the recipient's topic with QoS 2.
     * When the publish fails, retries are scheduled in the background.
     *
     * @param notification notification to send; messageId, createdAt and status are overwritten
     * @return the generated message ID (also returned when the first publish failed)
     */
    public String sendNotification(Notification notification) {
        // Generate the unique message ID
        String messageId = "notif-" + UUID.randomUUID().toString();
        notification.setMessageId(messageId);
        notification.setCreatedAt(LocalDateTime.now());
        notification.setStatus(Notification.NotificationStatus.ACTIVE);

        // Store the message before publishing
        notificationRepository.save(notification);

        String topic = "user/" + notification.getUserId() + "/notifications";
        String payload = buildNotificationPayload(notification, messageId);
        MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
        mqttMessage.setQos(2); // exactly-once delivery

        try {
            mqttClient.publish(topic, mqttMessage);
            System.out.println("Message sent successfully: " + messageId);
        } catch (MqttException e) {
            System.err.println("Failed to send message: " + e.getMessage());
            // Start the retry mechanism
            scheduleRetry(notification, 1);
        }
        return messageId;
    }

    /**
     * Builds the JSON payload of a notification message.
     * All free-text and caller-supplied fields are JSON-escaped.
     */
    private String buildNotificationPayload(Notification notification, String messageId) {
        return String.format(
                "{\"messageId\":\"%s\",\"userId\":\"%s\",\"senderId\":\"%s\",\"title\":\"%s\",\"content\":\"%s\","
                        + "\"type\":\"%s\",\"priority\":%d,\"createdAt\":\"%s\"}",
                messageId,
                escapeJson(notification.getUserId()),
                escapeJson(notification.getSenderId()),
                escapeJson(notification.getTitle()),
                escapeJson(notification.getContent()),
                notification.getType(),
                notification.getPriority(),
                notification.getCreatedAt());
    }

    /**
     * Escapes a value for use inside a JSON string literal.
     *
     * @param value raw text, may be {@code null}
     * @return escaped text; an empty string for {@code null}
     */
    private String escapeJson(String value) {
        if (value == null) {
            return "";
        }
        StringBuilder escaped = new StringBuilder(value.length() + 16);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\':
                    escaped.append("\\\\");
                    break;
                case '"':
                    escaped.append("\\\"");
                    break;
                case '\b':
                    escaped.append("\\b");
                    break;
                case '\f':
                    escaped.append("\\f");
                    break;
                case '\n':
                    escaped.append("\\n");
                    break;
                case '\r':
                    escaped.append("\\r");
                    break;
                case '\t':
                    escaped.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters are not allowed raw in JSON strings.
                        escaped.append(String.format("\\u%04x", (int) c));
                    } else {
                        escaped.append(c);
                    }
            }
        }
        return escaped.toString();
    }

    /**
     * Schedules retry number {@code attempt} with exponential back-off. Once
     * {@code maxRetryAttempts} retries have failed, the notification is marked
     * FAILED and a record is published to the dead-letter topic.
     */
    private void scheduleRetry(Notification notification, int attempt) {
        if (attempt > maxRetryAttempts) {
            // attempt - 1 retries were actually performed before giving up.
            int performedRetries = attempt - 1;
            notification.setStatus(Notification.NotificationStatus.FAILED);
            notification.setRetryCount(performedRetries);
            notificationRepository.save(notification);

            // Publish to the dead-letter queue
            String dlqTopic = "notifications/dlq";
            String payload = String.format(
                    "{\"messageId\":\"%s\",\"error\":\"Max retry attempts reached\",\"retryCount\":%d}",
                    notification.getMessageId(), performedRetries);
            try {
                mqttClient.publish(dlqTopic, new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)));
            } catch (MqttException e) {
                System.err.println("Failed to publish to DLQ: " + e.getMessage());
            }
            return;
        }

        // Exponential back-off: retryInterval * 2^attempt milliseconds
        long delay = (long) (Math.pow(2, attempt) * retryInterval);
        scheduler.schedule(() -> {
            try {
                String topic = "user/" + notification.getUserId() + "/notifications";
                String payload = buildNotificationPayload(notification, notification.getMessageId());
                MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
                mqttMessage.setQos(2);
                mqttClient.publish(topic, mqttMessage);
                System.out.println("Message retried successfully (attempt " + attempt + "): "
                        + notification.getMessageId());

                // Record how many retries were needed
                notification.setRetryCount(attempt);
                notificationRepository.save(notification);
            } catch (MqttException e) {
                System.err.println("Retry attempt " + attempt + " failed: " + e.getMessage());
                scheduleRetry(notification, attempt + 1);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Recalls a message: marks it RECALLED in the database and publishes a recall
     * command to the recipient's recall topic.
     *
     * @param messageId  ID of the message to recall
     * @param operatorId user requesting the recall; must be the sender or {@code "admin"}
     * @return {@code true} if the recall command was published, {@code false} if publishing failed
     * @throws IllegalArgumentException if no message has this ID
     * @throws SecurityException        if the operator may not recall the message
     * @throws IllegalStateException    if the recall window has passed
     */
    public boolean recallMessage(String messageId, String operatorId) {
        Notification notification = notificationRepository.findByMessageId(messageId)
                .orElseThrow(() -> new IllegalArgumentException("Message not found"));

        // Permission check
        if (!notification.getSenderId().equals(operatorId) && !"admin".equals(operatorId)) {
            throw new SecurityException("No permission to recall this message");
        }

        // Compare full durations: truncating to whole minutes would still accept
        // a recall at 5 min 59 s with a 5-minute limit.
        LocalDateTime now = LocalDateTime.now();
        Duration age = Duration.between(notification.getCreatedAt(), now);
        if (age.compareTo(Duration.ofMinutes(maxRecallMinutes)) > 0) {
            throw new IllegalStateException("Cannot recall message after " + maxRecallMinutes + " minutes");
        }

        // Update the stored state
        notification.setStatus(Notification.NotificationStatus.RECALLED);
        notification.setRecallAt(now);
        notificationRepository.save(notification);

        // Publish the recall command
        String topic = "user/" + notification.getUserId() + "/recall";
        String payload = String.format(
                "{\"messageId\":\"%s\",\"operatorId\":\"%s\",\"recalledAt\":\"%s\",\"isRecall\":true}",
                escapeJson(messageId), escapeJson(operatorId), now);
        try {
            MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
            mqttMessage.setQos(2);
            mqttClient.publish(topic, mqttMessage);
            return true;
        } catch (MqttException e) {
            System.err.println("Failed to send recall command: " + e.getMessage());
            return false;
        }
    }
}
3.6 业务服务和控制器
通知业务服务
package com.example.mqtt.service;import com.example.mqtt.model.Notification;
import com.example.mqtt.repository.NotificationRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;@Service
public class NotificationService {private final NotificationRepository notificationRepository;private final MqttService mqttService;@Autowiredpublic NotificationService(NotificationRepository notificationRepository, MqttService mqttService) {this.notificationRepository = notificationRepository;this.mqttService = mqttService;}/*** 创建并发送通知*/public String createAndSendNotification(String userId, String senderId, String title, String content, Notification.NotificationType type, int priority) {Notification notification = Notification.builder().userId(userId).senderId(senderId).title(title).content(content).type(type).priority(priority).build();return mqttService.sendNotification(notification);}/*** 撤回消息*/public boolean recallNotification(String messageId, String operatorId) {return mqttService.recallMessage(messageId, operatorId);}/*** 获取用户通知列表*/public Page<Notification> getUserNotifications(String userId, Pageable pageable) {return notificationRepository.findByUserIdOrderByCreatedAtDesc(userId, pageable);}/*** 标记消息为已读*/public void markAsRead(String messageId, String userId) {Notification notification = notificationRepository.findByMessageId(messageId).orElseThrow(() -> new IllegalArgumentException("Message not found"));if (!notification.getUserId().equals(userId)) {throw new SecurityException("No permission to mark this message as read");}notification.setRead(true);notificationRepository.save(notification);}/*** 获取未读消息数量*/public long getUnreadCount(String userId) {return notificationRepository.countByUserIdAndIsReadFalse(userId);}
}
通知API控制器
package com.example.mqtt.controller;import com.example.mqtt.model.Notification;
import com.example.mqtt.service.NotificationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.web.bind.annotation.*;import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;@RestController
@RequestMapping("/api/notifications")
public class NotificationController {private final NotificationService notificationService;@Autowiredpublic NotificationController(NotificationService notificationService) {this.notificationService = notificationService;}/*** 发送通知*/@PostMappingpublic ResponseEntity<String> sendNotification(Authentication authentication,@Valid @RequestBody SendNotificationRequest request) {String senderId = authentication.getName();String messageId = notificationService.createAndSendNotification(request.getUserId(),senderId,request.getTitle(),request.getContent(),request.getType(),request.getPriority());return ResponseEntity.ok(messageId);}/*** 撤回通知*/@PostMapping("/{messageId}/recall")public ResponseEntity<Boolean> recallNotification(Authentication authentication,@PathVariable String messageId) {String operatorId = authentication.getName();boolean result = notificationService.recallNotification(messageId, operatorId);return ResponseEntity.ok(result);}/*** 获取当前用户的通知列表*/@GetMappingpublic ResponseEntity<Page<Notification>> getUserNotifications(Authentication authentication,Pageable pageable) {String userId = authentication.getName();Page<Notification> notifications = notificationService.getUserNotifications(userId, pageable);return ResponseEntity.ok(notifications);}/*** 标记消息为已读*/@PutMapping("/{messageId}/read")public ResponseEntity<Void> markAsRead(Authentication authentication,@PathVariable String messageId) {String userId = authentication.getName();notificationService.markAsRead(messageId, userId);return ResponseEntity.noContent().build();}/*** 获取未读消息数量*/@GetMapping("/unread/count")public ResponseEntity<Long> getUnreadCount(Authentication authentication) {String userId = authentication.getName();long count = notificationService.getUnreadCount(userId);return ResponseEntity.ok(count);}// 请求参数模型public static class SendNotificationRequest {@NotBlank(message = "User ID is required")private String userId;@NotBlank(message = "Title is required")private String title;@NotBlank(message = 
"Content is required")private String content;@NotNull(message = "Type is required")private Notification.NotificationType type;@Min(value = 0, message = "Priority must be between 0 and 9")private int priority = 5;// Getters and setterspublic String getUserId() { return userId; }public void setUserId(String userId) { this.userId = userId; }public String getTitle() { return title; }public void setTitle(String title) { this.title = title; }public String getContent() { return content; }public void setContent(String content) { this.content = content; }public Notification.NotificationType getType() { return type; }public void setType(Notification.NotificationType type) { this.type = type; }public int getPriority() { return priority; }public void setPriority(int priority) { this.priority = priority; }}
}
3.7 JWT认证配置
package com.example.mqtt.config;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.io.Decoders;
import io.jsonwebtoken.security.Keys;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;

import java.security.Key;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Issues and validates HS256-signed JWTs using the Base64-encoded
 * {@code jwt.secret} and the {@code jwt.expiration} lifetime in milliseconds.
 */
@Component
public class JwtConfig {

    @Value("${jwt.secret}")
    private String secretKey;

    @Value("${jwt.expiration}")
    private long jwtExpiration;

    /** Returns the subject (username) of the token; throws if the token cannot be parsed. */
    public String extractUsername(String token) {
        return extractClaim(token, Claims::getSubject);
    }

    /** Parses the token and applies {@code claimsResolver} to its claims. */
    public <T> T extractClaim(String token, Function<Claims, T> claimsResolver) {
        final Claims claims = extractAllClaims(token);
        return claimsResolver.apply(claims);
    }

    /** Generates a token for the user without extra claims. */
    public String generateToken(UserDetails userDetails) {
        return generateToken(new HashMap<>(), userDetails);
    }

    /** Generates a token for the user carrying the given extra claims. */
    public String generateToken(Map<String, Object> extraClaims, UserDetails userDetails) {
        return buildToken(extraClaims, userDetails, jwtExpiration);
    }

    private String buildToken(Map<String, Object> extraClaims, UserDetails userDetails, long expiration) {
        // Read the clock once so issued-at and expiration are exactly `expiration` ms apart.
        long now = System.currentTimeMillis();
        return Jwts.builder()
                .setClaims(extraClaims)
                .setSubject(userDetails.getUsername())
                .setIssuedAt(new Date(now))
                .setExpiration(new Date(now + expiration))
                .signWith(getSignInKey(), SignatureAlgorithm.HS256)
                .compact();
    }

    /**
     * Checks that the token parses, belongs to the given user and has not expired.
     *
     * <p>Parsing an expired, malformed or wrongly-signed token throws, so those
     * cases are caught here and reported as {@code false}.
     *
     * @return {@code true} only for a well-formed, unexpired token of this user
     */
    public boolean isTokenValid(String token, UserDetails userDetails) {
        try {
            final String username = extractUsername(token);
            return username != null
                    && username.equals(userDetails.getUsername())
                    && !isTokenExpired(token);
        } catch (JwtException | IllegalArgumentException e) {
            return false;
        }
    }

    private boolean isTokenExpired(String token) {
        return extractExpiration(token).before(new Date());
    }

    private Date extractExpiration(String token) {
        return extractClaim(token, Claims::getExpiration);
    }

    private Claims extractAllClaims(String token) {
        return Jwts.parserBuilder()
                .setSigningKey(getSignInKey())
                .build()
                .parseClaimsJws(token)
                .getBody();
    }

    /** Decodes the Base64 secret into the HMAC signing key. */
    private Key getSignInKey() {
        byte[] keyBytes = Decoders.BASE64.decode(secretKey);
        return Keys.hmacShaKeyFor(keyBytes);
    }
}
4. 前端实现(Vue 3)
4.1 项目初始化
# Create the Vue project
vue create mqtt-notification-client
cd mqtt-notification-client

# Install dependencies (date-fns is imported by the notification list component)
npm install mqtt element-plus axios jwt-decode date-fns
4.2 MQTT服务封装
import mqtt from 'mqtt/dist/mqtt.min'
import { ElNotification, ElMessageBox } from 'element-plus'

/**
 * Wraps one MQTT.js client for the logged-in user: connects over WebSocket,
 * subscribes to the user's topics, dispatches incoming messages and reconnects
 * with exponential back-off.
 *
 * A single client with a stable client ID is reused for every reconnect, so the
 * broker can resume the persistent session (clean: false) and no duplicate
 * connections are created.
 */
class MqttService {
  constructor() {
    this.client = null
    this.userId = null
    this.clientId = null // generated once per user, reused across reconnects
    this.connected = false
    this.reconnectTimeout = 1000 // initial reconnect delay (ms)
    this.maxReconnectTimeout = 30000 // maximum reconnect delay (ms)
    this.reconnectTimer = null // pending reconnect timer, if any
    this.notificationCallback = null // called with each notification
    this.recallCallback = null // called with each recall command
  }

  /**
   * Initializes the MQTT connection.
   * @param {string} userId user ID
   * @param {function} notificationCallback called with each received notification
   * @param {function} recallCallback called with each received recall command
   */
  init(userId, notificationCallback, recallCallback) {
    // Disconnect first if a connection already exists
    if (this.client) {
      this.disconnect()
    }

    // A different user needs a different session, hence a new client ID
    if (this.userId !== userId) {
      this.clientId = null
    }

    this.userId = userId
    this.notificationCallback = notificationCallback
    this.recallCallback = recallCallback
    this.reconnectTimeout = 1000

    this.connect()
  }

  /**
   * Connects to the MQTT broker, replacing any existing client.
   */
  connect() {
    if (!this.userId) {
      console.error('用户ID不能为空')
      return
    }

    // Never keep two live clients: end the previous one before replacing it
    if (this.client) {
      this.disconnect()
    }

    if (!this.clientId) {
      this.clientId = `web-client-${this.userId}-${Math.random().toString(16).slice(2, 10)}`
    }

    // MQTT over WebSocket endpoint
    const brokerUrl = 'ws://localhost:8083/mqtt'

    const options = {
      clientId: this.clientId,
      clean: false, // keep the session so offline messages arrive after a reconnect
      connectTimeout: 4000,
      username: 'admin', // a JWT should be used in a real deployment
      password: 'public',
      keepalive: 30, // 30-second heartbeat
      reconnectPeriod: 0 // built-in reconnect off; scheduleReconnect() does the back-off
    }

    const client = mqtt.connect(brokerUrl, options)
    this.client = client

    // Events from a client that has been replaced or disconnected are ignored
    const isCurrent = () => this.client === client

    // Connected
    client.on('connect', (connack) => {
      if (!isCurrent()) return
      this.connected = true
      this.reconnectTimeout = 1000 // reset the back-off
      console.log('MQTT连接成功', connack)
      ElNotification({
        title: '连接成功',
        message: '已成功连接到消息服务器',
        type: 'success',
        duration: 3000
      })

      // Subscribe to the user's own topics
      const topics = [
        `user/${this.userId}/notifications`,
        `user/${this.userId}/recall`
      ]
      client.subscribe(topics, { qos: 2 }, (err) => {
        if (!err) {
          console.log(`已订阅主题: ${topics.join(', ')}`)
        } else {
          console.error('订阅主题失败:', err)
          ElNotification({
            title: '订阅失败',
            message: '无法订阅通知主题',
            type: 'error'
          })
        }
      })
    })

    // Incoming message
    client.on('message', (topic, message) => {
      if (!isCurrent()) return
      this.handleMessage(topic, message.toString())
    })

    // Connection closed
    client.on('close', () => {
      if (!isCurrent()) return
      this.connected = false
      console.log('MQTT连接已关闭')
      this.scheduleReconnect()
    })

    // Connection error
    client.on('error', (err) => {
      if (!isCurrent()) return
      this.connected = client.connected
      console.error('MQTT错误:', err)
      ElNotification({
        title: '连接错误',
        message: '消息服务连接发生错误',
        type: 'error'
      })
      // No-op when still connected or when a reconnect is already pending
      this.scheduleReconnect()
    })

    // Reconnecting
    client.on('reconnect', () => {
      console.log('正在重连到MQTT服务器...')
    })
  }

  /**
   * Handles a received message.
   * @param {string} topic topic the message arrived on
   * @param {string} payload JSON text of the message
   */
  handleMessage(topic, payload) {
    try {
      const message = JSON.parse(payload)

      if (topic.endsWith('/recall') && message.isRecall) {
        // Recall command
        console.log('收到撤回指令:', message)
        if (this.recallCallback) {
          this.recallCallback(message)
        }
        ElNotification({
          title: '消息已撤回',
          message: '一条消息已被发送者撤回',
          type: 'info'
        })
      } else if (topic.endsWith('/notifications')) {
        // Notification
        console.log('收到新通知:', message)
        if (this.notificationCallback) {
          this.notificationCallback(message)
        }
        ElNotification({
          title: message.title,
          message: message.content,
          type: this.getNotificationTypeIcon(message.type),
          duration: 5000
        })
      }
    } catch (e) {
      console.error('解析消息失败:', e)
    }
  }

  /**
   * Maps a notification type to the ElNotification type used to display it.
   */
  getNotificationTypeIcon(type) {
    switch (type) {
      case 'SYSTEM':
        return 'warning'
      case 'FRIEND_REQUEST':
        return 'success'
      case 'MESSAGE':
        return 'info'
      default:
        return 'info'
    }
  }

  /**
   * Schedules one reconnect attempt of the current client with exponential
   * back-off. Does nothing when there is no client, the client is connected,
   * or an attempt is already pending.
   */
  scheduleReconnect() {
    const client = this.client
    if (!client || client.connected || this.reconnectTimer) {
      return
    }

    const delay = this.reconnectTimeout
    ElNotification({
      title: '连接断开',
      message: `将在 ${delay / 1000} 秒后尝试重连`,
      type: 'warning'
    })

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      // The client may have been replaced, disconnected or recovered meanwhile
      if (this.client !== client || client.connected) {
        return
      }
      this.reconnectTimeout = Math.min(delay * 2, this.maxReconnectTimeout)
      client.reconnect() // same client, same client ID, same session
    }, delay)
  }

  /**
   * Disconnects and cancels any pending reconnect.
   */
  disconnect() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.client) {
      const client = this.client
      // Detach first so the late 'close' event of this client is ignored
      this.client = null
      this.connected = false
      client.end()
      console.log('MQTT连接已手动断开')
    }
  }

  /**
   * Returns whether the client is currently connected.
   */
  isConnected() {
    return this.connected
  }
}

// Export a singleton instance
export default new MqttService()
4.3 API服务封装
import axios from 'axios'
import { ElMessage } from 'element-plus'

// Shared axios instance for the backend API
const api = axios.create({
  baseURL: 'http://localhost:8080/api',
  timeout: 10000,
  headers: { 'Content-Type': 'application/json' }
})

// Request interceptor: attach the stored token as a Bearer header
function attachAuthToken(config) {
  const token = localStorage.getItem('token')
  if (token) {
    config.headers.Authorization = `Bearer ${token}`
  }
  return config
}

function rejectRequest(error) {
  return Promise.reject(error)
}

api.interceptors.request.use(attachAuthToken, rejectRequest)

// Response interceptor: unwrap the body, show an error message on failure
function unwrapData(response) {
  return response.data
}

function reportError(error) {
  const message = error.response?.data?.error || error.message || '请求失败'
  ElMessage.error(message)
  return Promise.reject(error)
}

api.interceptors.response.use(unwrapData, reportError)

// Notification endpoints
export const notificationApi = {
  /** Sends a notification. */
  sendNotification(data) {
    return api.post('/notifications', data)
  },

  /** Recalls a notification. */
  recallNotification(messageId) {
    return api.post(`/notifications/${messageId}/recall`)
  },

  /** Fetches one page of the current user's notifications. */
  getUserNotifications(page = 0, size = 20) {
    return api.get(`/notifications?page=${page}&size=${size}`)
  },

  /** Marks a message as read. */
  markAsRead(messageId) {
    return api.put(`/notifications/${messageId}/read`)
  },

  /** Fetches the unread message count. */
  getUnreadCount() {
    return api.get('/notifications/unread/count')
  }
}

// Authentication endpoints
export const authApi = {
  /** Logs a user in. */
  login(credentials) {
    return api.post('/auth/login', credentials)
  },

  /** Registers a user. */
  register(userData) {
    return api.post('/auth/register', userData)
  },

  /** Fetches the current user's profile. */
  getCurrentUser() {
    return api.get('/auth/me')
  }
}

export default api
4.4 通知列表组件
<template><div class="notification-list"><el-card><div slot="header" class="card-header"><h2>通知中心</h2><el-badge :value="unreadCount" class="notification-badge"><el-button size="small" @click="markAllAsRead">全部标为已读</el-button></el-badge></div><el-empty v-if="notifications.length === 0 && !loading"description="暂无通知"></el-empty><el-skeleton v-if="loading":rows="5"animated></el-skeleton><el-timeline v-else><el-timeline-item v-for="notification in notifications" :key="notification.messageId":timestamp="formatTime(notification.createdAt)":icon="getIcon(notification.type)":color="getColor(notification.type)"placement="top"><el-card :class="{ 'unread-notification': !notification.read, 'recalled-notification': notification.status === 'RECALLED' }"><div class="notification-header"><h3>{{ notification.title }}</h3><span class="notification-type">{{ getTypeName(notification.type) }}</span></div><div v-if="notification.status === 'RECALLED'" class="recall-message">此消息已被撤回</div><div v-else class="notification-content">{{ notification.content }}</div><div class="notification-actions"><el-button size="mini" text @click="markAsRead(notification.messageId)"v-if="!notification.read && notification.status !== 'RECALLED'">标为已读</el-button><el-button size="mini" text type="danger"@click="handleRecall(notification.messageId)"v-if="canRecall(notification)">撤回</el-button></div></el-card></el-timeline-item></el-timeline><el-paginationv-if="notifications.length > 0"class="pagination":current-page="currentPage":page-size="pageSize":total="totalNotifications"@current-change="handlePageChange"layout="prev, pager, next"></el-pagination></el-card></div>
</template><script setup>
import { ref, onMounted, computed } from 'vue'
import { notificationApi } from '../services/apiService'
import { ElMessageBox, ElMessage } from 'element-plus'
import mqttService from '../services/mqttService'
import { formatDistanceToNow } from 'date-fns'
import { zhCN } from 'date-fns/locale'// 状态变量
const notifications = ref([])
const loading = ref(true)
const currentPage = ref(0)
const pageSize = ref(20)
const totalNotifications = ref(0)
const unreadCount = ref(0)
const userId = ref(localStorage.getItem('userId'))// 初始化
onMounted(() => {fetchNotifications()fetchUnreadCount()setupMqttListeners()
})// 设置MQTT监听器
const setupMqttListeners = () => {if (userId.value) {mqttService.init(userId.value,handleNewNotification, // 处理新通知handleMessageRecall // 处理消息撤回)}
}// 获取通知列表
const fetchNotifications = async () => {try {loading.value = trueconst response = await notificationApi.getUserNotifications(currentPage.value, pageSize.value)notifications.value = response.contenttotalNotifications.value = response.totalElements} catch (error) {console.error('获取通知失败:', error)} finally {loading.value = false}
}// 获取未读消息数量
const fetchUnreadCount = async () => {try {const count = await notificationApi.getUnreadCount()unreadCount.value = count} catch (error) {console.error('获取未读数量失败:', error)}
}// 处理新通知
const handleNewNotification = (newNotification) => {// 添加到列表头部notifications.value.unshift(newNotification)// 更新未读计数fetchUnreadCount()
}// 处理消息撤回
const handleMessageRecall = (recallMessage) => {// 在本地标记消息为已撤回const index = notifications.value.findIndex(n => n.messageId === recallMessage.messageId)if (index !== -1) {notifications.value[index].status = 'RECALLED'}
}// 标记消息为已读
const markAsRead = async (messageId) => {try {await notificationApi.markAsRead(messageId)// 更新本地状态const index = notifications.value.findIndex(n => n.messageId === messageId)if (index !== -1) {notifications.value[index].read = true}fetchUnreadCount()ElMessage.success('已标记为已读')} catch (error) {console.error('标记已读失败:', error)}
}// 全部标为已读
const markAllAsRead = async () => {try {// 这里简化处理,实际应调用后端批量接口const unreadNotifications = notifications.value.filter(n => !n.read)for (const notification of unreadNotifications) {await notificationApi.markAsRead(notification.messageId)notification.read = true}fetchUnreadCount()ElMessage.success('全部已标为已读')} catch (error) {console.error('批量标记已读失败:', error)}
}// 撤回消息
const handleRecall = async (messageId) => {try {const confirm = await ElMessageBox.confirm('确定要撤回这条消息吗?','确认撤回',{confirmButtonText: '确定',cancelButtonText: '取消',type: 'warning'})if (confirm) {const result = await notificationApi.recallNotification(messageId)if (result) {// 更新本地状态const index = notifications.value.findIndex(n => n.messageId === messageId)if (index !== -1) {notifications.value[index].status = 'RECALLED'}ElMessage.success('消息已撤回')}}} catch (error) {console.error('撤回消息失败:', error)}
}// 分页变化
const handlePageChange = (page) => {currentPage.value = page - 1 // 后端页码从0开始fetchNotifications()
}// 格式化时间
const formatTime = (dateString) => {return formatDistanceToNow(new Date(dateString), { addSuffix: true,locale: zhCN})
}// 根据类型获取图标
const getIcon = (type) => {switch (type) {case 'SYSTEM': return 'el-icon-warning'case 'FRIEND_REQUEST': return 'el-icon-user-plus'case 'COMMENT': return 'el-icon-comment'case 'LIKE': return 'el-icon-heart'case 'MENTION': return 'el-icon-at'case 'MESSAGE': return 'el-icon-message'default: return 'el-icon-bell'}
}// 根据类型获取颜色
const getColor = (type) => {switch (type) {case 'SYSTEM': return '#e6a23c'case 'FRIEND_REQUEST': return '#52c41a'case 'MESSAGE': return '#1890ff'default: return '#8c8c8c'}
}// 获取类型名称
const getTypeName = (type) => {
  // Display labels keyed by notification type
  const labels = {
    SYSTEM: '系统通知',
    FRIEND_REQUEST: '好友请求',
    COMMENT: '评论',
    LIKE: '点赞',
    MENTION: '提及',
    MESSAGE: '私信'
  }
  const label = labels[type]
  // Fall back to the raw type when no label is registered
  return label ? label : type
}
// Decide whether a notification can still be recalled
const canRecall = (notification) => {
  // Only the sender may recall
  if (notification.senderId !== userId.value) {
    return false
  }
  // An already recalled message cannot be recalled again
  if (notification.status === 'RECALLED') {
    return false
  }
  // Recall is allowed within five minutes of creation
  const ageMs = new Date() - new Date(notification.createdAt)
  return ageMs < 5 * 60 * 1000
}
</script><style scoped>
/* Card header: title on the left, badge/button on the right */
.card-header {
  display: flex;
  align-items: center;
  justify-content: space-between;
}

.notification-badge {
  margin-left: 10px;
}

/* Title row of a single notification */
.notification-header {
  display: flex;
  align-items: center;
  justify-content: space-between;
  margin-bottom: 10px;
}

/* Pill showing the notification type */
.notification-type {
  padding: 2px 8px;
  border-radius: 12px;
  background-color: #f0f2f5;
  font-size: 12px;
}

.notification-content {
  margin-bottom: 10px;
  line-height: 1.6;
}

/* Placeholder shown instead of the content once a message is recalled */
.recall-message {
  margin-bottom: 10px;
  padding: 10px;
  border-radius: 4px;
  background-color: #fff1f0;
  color: #f5222d;
  font-size: 14px;
}

.notification-actions {
  display: flex;
  justify-content: flex-end;
  gap: 10px;
}

.unread-notification {
  border-left: 3px solid #1890ff;
}

.recalled-notification {
  opacity: 0.7;
}

.pagination {
  margin-top: 20px;
  text-align: center;
}
</style>
4.5 发送通知组件
<template>
  <el-card>
    <!-- Vue 3 removed the legacy `slot` attribute; named slots must use v-slot / # syntax -->
    <template #header>
      <div><h2>发送通知</h2></div>
    </template>

    <el-form
      ref="formRef"
      :model="form"
      :rules="rules"
      label-width="100px"
      class="send-notification-form"
    >
      <el-form-item label="接收用户ID" prop="userId">
        <el-input v-model="form.userId" placeholder="请输入接收用户ID"></el-input>
      </el-form-item>

      <el-form-item label="通知类型" prop="type">
        <el-select v-model="form.type" placeholder="请选择通知类型">
          <el-option label="系统通知" value="SYSTEM"></el-option>
          <el-option label="好友请求" value="FRIEND_REQUEST"></el-option>
          <el-option label="评论" value="COMMENT"></el-option>
          <el-option label="点赞" value="LIKE"></el-option>
          <el-option label="提及" value="MENTION"></el-option>
          <el-option label="私信" value="MESSAGE"></el-option>
        </el-select>
      </el-form-item>

      <el-form-item label="优先级" prop="priority">
        <el-slider v-model="form.priority" :min="0" :max="9" :step="1" show-input></el-slider>
        <div class="priority-info"><span>低</span><span class="priority-high">高</span></div>
      </el-form-item>

      <el-form-item label="标题" prop="title">
        <el-input v-model="form.title" placeholder="请输入通知标题"></el-input>
      </el-form-item>

      <el-form-item label="内容" prop="content">
        <el-input v-model="form.content" type="textarea" :rows="4" placeholder="请输入通知内容"></el-input>
      </el-form-item>

      <el-form-item>
        <el-button type="primary" @click="submitForm">发送通知</el-button>
        <el-button @click="resetForm">重置</el-button>
      </el-form-item>
    </el-form>
  </el-card>
</template><script setup>
import { ref, reactive } from 'vue'
import { notificationApi } from '../services/apiService'
import { ElMessage } from 'element-plus'// 表单引用
// Template ref to the <el-form> instance
const formRef = ref(null)

// Form model
const form = reactive({
  userId: '',
  type: '',
  priority: 5,
  title: '',
  content: ''
})

// Small builders so each rule reads as a single phrase
const requiredRule = (message, trigger = 'blur') => ({ required: true, message, trigger })
const maxLengthRule = (max, message) => ({ max, message, trigger: 'blur' })

// Validation rules
const rules = reactive({
  userId: [requiredRule('请输入接收用户ID')],
  type: [requiredRule('请选择通知类型', 'change')],
  title: [
    requiredRule('请输入通知标题'),
    maxLengthRule(50, '标题长度不能超过50个字符')
  ],
  content: [
    requiredRule('请输入通知内容'),
    maxLengthRule(500, '内容长度不能超过500个字符')
  ]
})
// Submit the form
/**
 * Validates the form and, when valid, sends the notification.
 *
 * Validation and sending are handled in separate try blocks. Element Plus rejects
 * `validate()` with the map of invalid fields rather than an Error named
 * 'ValidationError', so a single shared catch showed the "send failed" toast for
 * plain validation errors. The form already renders inline messages for those.
 */
const submitForm = async () => {
  if (!formRef.value) return

  try {
    await formRef.value.validate()
  } catch (invalidFields) {
    // Validation failed: inline field messages are already displayed by the form
    return
  }

  try {
    // Send the notification
    const messageId = await notificationApi.sendNotification(form)
    ElMessage.success(`通知发送成功,消息ID: ${messageId}`)
    resetForm()
  } catch (error) {
    console.error('发送通知失败:', error)
    ElMessage.error('发送通知失败,请稍后重试')
  }
}
// Reset the form
const resetForm = () => {
  // No-op until the form ref has been populated
  formRef.value?.resetFields()
}
</script><style scoped>
.send-notification-form {
  max-width: 600px;
}

/* "low" / "high" captions under the priority slider */
.priority-info {
  display: flex;
  justify-content: space-between;
  margin-top: -10px;
  margin-bottom: 10px;
  color: #606266;
  font-size: 12px;
}

.priority-high {
  color: #f56c6c;
}
</style>
4.6 主页面组件
<template>
  <div class="home-container">
    <el-row :gutter="20">
      <el-col :span="12">
        <SendNotification />
      </el-col>

      <el-col :span="12">
        <div class="connection-status">
          <el-card>
            <!-- Vue 3 removed the legacy `slot` attribute; named slots must use v-slot / # syntax -->
            <template #header>
              <div><h2>连接状态</h2></div>
            </template>

            <div class="status-info">
              <!-- `column` is a Number prop, so it must be bound rather than passed as the string "1" -->
              <el-descriptions :column="1">
                <el-descriptions-item label="MQTT连接状态">
                  <el-tag :type="mqttConnected ? 'success' : 'danger'">{{ mqttConnected ? '已连接' : '未连接' }}</el-tag>
                </el-descriptions-item>
                <el-descriptions-item label="当前用户">{{ currentUser || '未登录' }}</el-descriptions-item>
                <el-descriptions-item label="未读消息">
                  <el-badge :value="unreadCount" class="item"><span>{{ unreadCount }} 条</span></el-badge>
                </el-descriptions-item>
              </el-descriptions>

              <el-button
                v-if="!mqttConnected && currentUser"
                type="primary"
                @click="reconnectMqtt"
                style="margin-top: 15px;"
              >重新连接</el-button>
            </div>
          </el-card>
        </div>
      </el-col>
    </el-row>

    <el-row style="margin-top: 20px;">
      <el-col :span="24">
        <NotificationList />
      </el-col>
    </el-row>
  </div>
</template><script setup>
import { ref, onMounted, watchEffect } from 'vue'
import SendNotification from '../components/SendNotification.vue'
import NotificationList from '../components/NotificationList.vue'
import mqttService from '../services/mqttService'
import { authApi, notificationApi } from '../services/apiService'// 状态变量
// Connection flag shown in the status card; refreshed from mqttService.isConnected()
const mqttConnected = ref(false)
// ID of the current user; stays '' until the login check succeeds
const currentUser = ref('')
// Unread notification count shown in the status card
const unreadCount = ref(0)// Initialisation
onMounted(function initialiseHome() {
  // Kick off the login check, the connection-status poller and the first unread fetch
  checkLoginStatus()
  setupMqttStatusListener()
  fetchUnreadCount()
})
// Check the login status
const checkLoginStatus = async () => {
  try {
    // Remember the user ID both in component state and in localStorage
    const { id } = await authApi.getCurrentUser()
    currentUser.value = id
    localStorage.setItem('userId', id)
  } catch (error) {
    console.log('用户未登录或会话已过期')
    // A redirect to the login page could go here
  }
}
// Set up the MQTT status listener
const setupMqttStatusListener = () => {
  // Poll the service once per second and mirror its state into the ref.
  // NOTE(review): the interval handle is not kept, so it is never cleared on unmount.
  const syncConnectionState = () => {
    mqttConnected.value = mqttService.isConnected()
  }
  setInterval(syncConnectionState, 1000)
}
// Fetch the unread message count
const fetchUnreadCount = async () => {
  try {
    // The API result is stored as-is in the ref
    unreadCount.value = await notificationApi.getUnreadCount()
  } catch (error) {
    console.error('获取未读数量失败:', error)
  }
}
// Reconnect to MQTT
const reconnectMqtt = () => {
  // Nothing to reconnect for when no user is logged in
  if (!currentUser.value) {
    return
  }
  // NOTE(review): re-initialises the service with no-op callbacks; confirm this does
  // not replace handlers registered elsewhere on the shared service instance.
  mqttService.init(currentUser.value, () => {}, () => {})
}
// Watch for unread-count changes
// Refresh the unread count every 30 seconds.
// watchEffect ignores the callback's return value, so the teardown must be registered
// through the onCleanup argument; otherwise the interval is never cleared when the
// effect is stopped.
watchEffect((onCleanup) => {
  const timer = setInterval(fetchUnreadCount, 30000)
  onCleanup(() => clearInterval(timer))
})
</script><style scoped>
/* Page wrapper */
.home-container {
  padding: 20px;
}

/* Lets the status card fill its column */
.connection-status {
  height: 100%;
}

.status-info {
  padding: 10px 0;
}
</style>
5. 系统测试与验证
5.1 功能测试
- 用户A登录系统,进入通知中心
- 用户B发送通知给用户A,验证用户A能否实时收到
- 用户B撤回消息,验证用户A的界面是否显示消息已撤回
- 断开网络连接,再重新连接,验证是否能收到离线期间的消息
- 发送高优先级消息,验证是否优先展示
5.2 性能测试
- 使用工具模拟1000个并发连接,测试系统稳定性
- 连续发送10000条消息,测试消息处理能力和延迟
- 测试网络波动情况下的自动重连和消息重试机制
6. 总结
本系统基于MQTT协议实现了一个功能完整的实时消息推送系统,具有以下特点:
- 实时性:利用MQTT的发布/订阅模式和长连接特性,实现消息的实时推送
- 可靠性:通过QoS 2级别保证消息恰好一次送达,配合重试机制处理发送失败情况
- 优先级支持:MQTT协议本身(包括MQTT 5.0)并未定义消息优先级属性,系统通过通知中携带的优先级字段(0–9)实现不同重要程度消息的分级处理
- 消息撤回:通过撤回指令消息实现逻辑上的消息撤回功能
- 断线重连:客户端实现自动重连机制,确保连接稳定性
- 离线消息:利用MQTT的会话保持特性,支持客户端重连后接收离线消息
系统架构清晰,前后端分离,易于扩展和维护,可根据实际业务需求进一步扩展功能。
补充(Vue2前端)
如果前端使用的是 Vue 2,可参考以下实现。
前端实现(Vue 2)
4.1 项目初始化
# 创建Vue 2项目
vue create mqtt-notification-client-vue2
# 选择Vue 2版本
# 安装依赖
cd mqtt-notification-client-vue2
npm install mqtt element-ui axios jsonwebtoken moment
4.2 MQTT服务封装
import mqtt from 'mqtt/dist/mqtt.min'
import { Notification } from 'element-ui'

/**
 * Thin wrapper around an MQTT.js client for per-user notification topics.
 *
 * Reconnection is handled here with exponential backoff. MQTT.js's own automatic
 * reconnect is disabled (`reconnectPeriod: 0`) so the two mechanisms cannot create
 * competing connections. The client ID stays the same across reconnects for one
 * user, which is what lets the broker resume the persistent session requested by
 * `clean: false`. It is kept in memory only, so a page reload starts a new session.
 */
class MqttService {
  constructor() {
    this.client = null
    this.userId = null
    this.connected = false
    this.reconnectTimeout = 1000 // Initial reconnect delay (ms)
    this.maxReconnectTimeout = 30000 // Upper bound for the reconnect delay (ms)
    this.notificationCallback = null // Called with each incoming notification
    this.recallCallback = null // Called with each incoming recall instruction
    this.reconnectTimer = null // Handle of the pending reconnect timer, if any
    this.manualDisconnect = false // True after disconnect(); suppresses reconnects
    this.clientId = null // Client ID reused across reconnects
    this.clientIdUserId = null // User the cached client ID was generated for
  }

  /**
   * Initialise the MQTT connection.
   * @param {string} userId user ID
   * @param {function} notificationCallback called with each parsed notification
   * @param {function} recallCallback called with each parsed recall instruction
   */
  init(userId, notificationCallback, recallCallback) {
    this.userId = userId
    this.notificationCallback = notificationCallback
    this.recallCallback = recallCallback
    // Drop any existing connection first
    if (this.client) {
      this.disconnect()
    }
    this.reconnectTimeout = 1000
    this.connect()
  }

  /**
   * Returns the client ID for the current user, generating it once per user.
   * @returns {string} client ID
   */
  _getClientId() {
    if (!this.clientId || this.clientIdUserId !== this.userId) {
      const suffix = Math.random().toString(16).slice(2, 10)
      this.clientId = `web-client-${this.userId}-${suffix}`
      this.clientIdUserId = this.userId
    }
    return this.clientId
  }

  /** Cancels the pending reconnect timer, if there is one. */
  _clearReconnectTimer() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
  }

  /**
   * Connect to the MQTT broker. Any previous client is closed first, so at most
   * one client exists at a time.
   */
  connect() {
    if (!this.userId) {
      console.error('用户ID不能为空')
      return
    }

    this._clearReconnectTimer()
    this.manualDisconnect = false

    // Close a leftover client so an old socket cannot linger
    if (this.client) {
      const stale = this.client
      this.client = null
      stale.end(true)
    }

    // MQTT over WebSocket endpoint
    const brokerUrl = 'ws://localhost:8083/mqtt'

    const options = {
      clientId: this._getClientId(),
      clean: false, // Keep the session so offline messages are delivered after reconnect
      connectTimeout: 4000,
      reconnectPeriod: 0, // Disable MQTT.js auto-reconnect; scheduleReconnect() handles it
      username: 'admin', // A real deployment should authenticate with a JWT instead
      password: 'public',
      keepalive: 30 // Heartbeat every 30 seconds
    }

    const client = mqtt.connect(brokerUrl, options)
    this.client = client

    // Connected
    client.on('connect', (connack) => {
      if (this.client !== client) return // Event from a superseded client
      this.connected = true
      this.reconnectTimeout = 1000 // Reset the backoff
      console.log('MQTT连接成功', connack)
      Notification.success({
        title: '连接成功',
        message: '已成功连接到消息服务器',
        duration: 3000
      })

      // Subscribe to the user's own topics
      const topics = [
        `user/${this.userId}/notifications`,
        `user/${this.userId}/recall`
      ]
      client.subscribe(topics, { qos: 2 }, (err) => {
        if (!err) {
          console.log(`已订阅主题: ${topics.join(', ')}`)
        } else {
          console.error('订阅主题失败:', err)
          Notification.error({
            title: '订阅失败',
            message: '无法订阅通知主题'
          })
        }
      })
    })

    // Incoming message
    client.on('message', (topic, message) => {
      if (this.client !== client) return
      this.handleMessage(topic, message.toString())
    })

    // Connection closed
    client.on('close', () => {
      if (this.client !== client) return // Closed on purpose or already replaced
      this.connected = false
      console.log('MQTT连接已关闭')
      this.scheduleReconnect()
    })

    // Connection error
    client.on('error', (err) => {
      if (this.client !== client) return
      this.connected = client.connected
      console.error('MQTT错误:', err)
      Notification.error({
        title: '连接错误',
        message: '消息服务连接发生错误'
      })
      // Safe to call here as well: scheduleReconnect() keeps a single pending timer
      this.scheduleReconnect()
    })

    // Reconnecting (not emitted while reconnectPeriod is 0; kept for diagnostics)
    client.on('reconnect', () => {
      console.log('正在重连到MQTT服务器...')
    })
  }

  /**
   * Handle a received message.
   * @param {string} topic topic the message arrived on
   * @param {string} payload JSON-encoded message body
   */
  handleMessage(topic, payload) {
    try {
      const message = JSON.parse(payload)

      if (topic.endsWith('/recall') && message.isRecall) {
        // Recall instruction
        console.log('收到撤回指令:', message)
        if (this.recallCallback) {
          this.recallCallback(message)
        }
        Notification.info({
          title: '消息已撤回',
          message: '一条消息已被发送者撤回'
        })
      } else if (topic.endsWith('/notifications')) {
        // Regular notification
        console.log('收到新通知:', message)
        if (this.notificationCallback) {
          this.notificationCallback(message)
        }
        // Show a toast for it
        Notification({
          title: message.title,
          message: message.content,
          type: this.getNotificationTypeIcon(message.type),
          duration: 5000
        })
      }
    } catch (e) {
      console.error('解析消息失败:', e)
    }
  }

  /**
   * Map a notification type to the toast type used for it.
   * @param {string} type notification type
   * @returns {string} toast type
   */
  getNotificationTypeIcon(type) {
    switch (type) {
      case 'SYSTEM':
        return 'warning'
      case 'FRIEND_REQUEST':
        return 'success'
      case 'MESSAGE':
        return 'info'
      default:
        return 'info'
    }
  }

  /**
   * Schedule a reconnect with exponential backoff. Does nothing after a manual
   * disconnect, while a reconnect is already pending, or while the client is
   * still connected.
   */
  scheduleReconnect() {
    if (this.manualDisconnect || this.reconnectTimer) return
    if (this.client && this.client.connected) return

    const delay = this.reconnectTimeout
    Notification.warning({
      title: '连接断开',
      message: `将在 ${delay / 1000} 秒后尝试重连`
    })

    // Double the delay for the next attempt, capped at the maximum
    this.reconnectTimeout = Math.min(delay * 2, this.maxReconnectTimeout)

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      if (!this.connected && !this.manualDisconnect) {
        this.connect()
      }
    }, delay)
  }

  /**
   * Disconnect on purpose. Cancels any pending reconnect and prevents the
   * resulting close event from scheduling a new one.
   */
  disconnect() {
    this.manualDisconnect = true
    this._clearReconnectTimer()
    if (this.client) {
      const client = this.client
      // Detach before ending so the handlers of this client ignore later events
      this.client = null
      this.connected = false
      client.end()
      console.log('MQTT连接已手动断开')
    }
  }

  /**
   * Report the connection state.
   * @returns {boolean} true while connected
   */
  isConnected() {
    return this.connected
  }
}
// Export a singleton instance
export default new MqttService()
4.3 API服务封装
import axios from 'axios'
import { Message } from 'element-ui'// 创建axios实例
// Settings for the shared axios instance
const apiConfig = {
  baseURL: 'http://localhost:8080/api',
  timeout: 10000,
  headers: { 'Content-Type': 'application/json' }
}
const api = axios.create(apiConfig)
// Request interceptor - attach the auth token
api.interceptors.request.use(
  function attachToken(config) {
    // Add the bearer token from localStorage when one is stored
    const token = localStorage.getItem('token')
    if (!token) {
      return config
    }
    config.headers.Authorization = `Bearer ${token}`
    return config
  },
  function forwardRequestError(error) {
    return Promise.reject(error)
  }
)
// Response interceptor - handle errors
api.interceptors.response.use(
  // Unwrap the payload so callers receive response.data directly
  function unwrap(response) {
    return response.data
  },
  function reportError(error) {
    // Prefer the server-provided error text, then the error's own message
    const serverText = error.response?.data?.error
    const message = serverText || error.message || '请求失败'
    Message.error(message)
    return Promise.reject(error)
  }
)
// Notification APIs
/**
 * Notification endpoints.
 *
 * Message IDs are URL-encoded before being placed in a path, and paging values are
 * passed through axios `params`, so unusual values cannot alter the request URL.
 */
export const notificationApi = {
  /**
   * Send a notification.
   * @param {object} data request body
   */
  sendNotification: (data) => {
    return api.post('/notifications', data)
  },

  /**
   * Recall a notification.
   * @param {string|number} messageId ID of the message to recall
   */
  recallNotification: (messageId) => {
    return api.post(`/notifications/${encodeURIComponent(messageId)}/recall`)
  },

  /**
   * Get the current user's notifications.
   * @param {number} [page=0] zero-based page index
   * @param {number} [size=20] page size
   */
  getUserNotifications: (page = 0, size = 20) => {
    return api.get('/notifications', { params: { page, size } })
  },

  /**
   * Mark a message as read.
   * @param {string|number} messageId ID of the message to mark
   */
  markAsRead: (messageId) => {
    return api.put(`/notifications/${encodeURIComponent(messageId)}/read`)
  },

  /**
   * Get the number of unread messages.
   */
  getUnreadCount: () => {
    return api.get('/notifications/unread/count')
  }
}
// Authentication APIs
export const authApi = {
  /** Log in with the given credentials. */
  login(credentials) {
    return api.post('/auth/login', credentials)
  },

  /** Register a new user. */
  register(userData) {
    return api.post('/auth/register', userData)
  },

  /** Fetch the current user's information. */
  getCurrentUser() {
    return api.get('/auth/me')
  }
}

export default api
4.4 通知列表组件
<template>
  <div class="notification-list">
    <el-card>
      <!-- Header: title plus the "mark all as read" button with the unread badge -->
      <div slot="header" class="card-header">
        <h2>通知中心</h2>
        <el-badge :value="unreadCount" class="notification-badge">
          <el-button size="small" @click="markAllAsRead">全部标为已读</el-button>
        </el-badge>
      </div>

      <!-- Empty state -->
      <el-empty
        v-if="notifications.length === 0 && !loading"
        description="暂无通知"
      ></el-empty>

      <!-- Skeleton while loading, timeline afterwards -->
      <el-skeleton
        v-if="loading"
        :rows="5"
        animated
      ></el-skeleton>
      <el-timeline v-else>
        <el-timeline-item
          v-for="notification in notifications"
          :key="notification.messageId"
          :timestamp="formatTime(notification.createdAt)"
          :icon="getIcon(notification.type)"
          :color="getColor(notification.type)"
          placement="top"
        >
          <el-card :class="{ 'unread-notification': !notification.read, 'recalled-notification': notification.status === 'RECALLED' }">
            <div class="notification-header">
              <h3>{{ notification.title }}</h3>
              <span class="notification-type">{{ getTypeName(notification.type) }}</span>
            </div>

            <!-- Recalled messages show a placeholder instead of their content -->
            <div v-if="notification.status === 'RECALLED'" class="recall-message">此消息已被撤回</div>
            <div v-else class="notification-content">{{ notification.content }}</div>

            <div class="notification-actions">
              <el-button
                v-if="!notification.read && notification.status !== 'RECALLED'"
                size="mini"
                type="text"
                @click="markAsRead(notification.messageId)"
              >标为已读</el-button>
              <el-button
                v-if="canRecall(notification)"
                size="mini"
                type="text"
                style="color: #F56C6C;"
                @click="handleRecall(notification.messageId)"
              >撤回</el-button>
            </div>
          </el-card>
        </el-timeline-item>
      </el-timeline>

      <!-- The pager is 1-based; the component keeps a 0-based page index -->
      <el-pagination
        v-if="notifications.length > 0"
        class="pagination"
        :current-page="currentPage + 1"
        :page-size="pageSize"
        :total="totalNotifications"
        @current-change="handlePageChange"
        layout="prev, pager, next"
      ></el-pagination>
    </el-card>
  </div>
</template><script>
import { notificationApi } from '../services/apiService'
import { MessageBox, Message } from 'element-ui'
import mqttService from '../services/mqttService'
import moment from 'moment'

/**
 * Notification centre (Vue 2 Options API).
 *
 * Loads the user's notifications page by page, receives new notifications and
 * recall instructions through the shared MQTT service, and lets the user mark
 * notifications as read or recall messages they sent.
 */
export default {
  name: 'NotificationList',

  data() {
    return {
      notifications: [],
      loading: true,
      currentPage: 0, // Zero-based page index sent to the backend
      pageSize: 20,
      totalNotifications: 0,
      unreadCount: 0,
      // NOTE(review): read once when the component is created; if the ID is written
      // to localStorage later, this value (and the MQTT setup below) will not see it.
      userId: localStorage.getItem('userId')
    }
  },

  mounted() {
    this.fetchNotifications()
    this.fetchUnreadCount()
    this.setupMqttListeners()
  },

  methods: {
    /** Registers this component's handlers with the MQTT service. */
    setupMqttListeners() {
      if (this.userId) {
        mqttService.init(
          this.userId,
          this.handleNewNotification, // New notification
          this.handleMessageRecall // Message recall
        )
      }
    },

    /** Loads the current page of notifications. */
    fetchNotifications() {
      this.loading = true
      notificationApi.getUserNotifications(this.currentPage, this.pageSize)
        .then(response => {
          this.notifications = response.content
          this.totalNotifications = response.totalElements
        })
        .catch(error => {
          console.error('获取通知失败:', error)
        })
        .finally(() => {
          this.loading = false
        })
    },

    /** Refreshes the unread counter. */
    fetchUnreadCount() {
      notificationApi.getUnreadCount()
        .then(count => {
          this.unreadCount = count
        })
        .catch(error => {
          console.error('获取未读数量失败:', error)
        })
    },

    /**
     * Adds a pushed notification to the top of the list.
     * A message that is already listed (for example fetched over REST just before
     * the push arrived) is merged in place, because duplicate `messageId`s would
     * produce duplicate `:key`s in the timeline.
     */
    handleNewNotification(newNotification) {
      const existingIndex = this.notifications.findIndex(
        n => n.messageId === newNotification.messageId
      )
      if (existingIndex !== -1) {
        this.$set(this.notifications, existingIndex, {
          ...this.notifications[existingIndex],
          ...newNotification
        })
      } else {
        this.notifications.unshift(newNotification)
      }
      // Update the unread counter
      this.fetchUnreadCount()
    },

    /** Flags the recalled message locally. */
    handleMessageRecall(recallMessage) {
      const index = this.notifications.findIndex(n => n.messageId === recallMessage.messageId)
      if (index !== -1) {
        this.$set(this.notifications, index, {
          ...this.notifications[index],
          status: 'RECALLED'
        })
      }
    },

    /** Marks one notification as read on the server, then locally. */
    markAsRead(messageId) {
      notificationApi.markAsRead(messageId)
        .then(() => {
          // Update local state
          const index = this.notifications.findIndex(n => n.messageId === messageId)
          if (index !== -1) {
            this.$set(this.notifications[index], 'read', true)
          }
          this.fetchUnreadCount()
          Message.success('已标记为已读')
        })
        .catch(error => {
          console.error('标记已读失败:', error)
        })
    },

    /**
     * Marks every unread notification as read.
     * Simplified: a real system should call a backend batch endpoint.
     * The counter is refreshed in `finally`, because some requests may have
     * succeeded even when the batch as a whole failed.
     */
    markAllAsRead() {
      const unreadNotifications = this.notifications.filter(n => !n.read)

      Promise.all(
        unreadNotifications.map(notification =>
          notificationApi.markAsRead(notification.messageId)
        )
      )
        .then(() => {
          unreadNotifications.forEach(notification => {
            const localIndex = this.notifications.findIndex(n => n.messageId === notification.messageId)
            if (localIndex !== -1) {
              this.$set(this.notifications[localIndex], 'read', true)
            }
          })
          Message.success('全部已标为已读')
        })
        .catch(error => {
          console.error('批量标记已读失败:', error)
        })
        .finally(() => {
          this.fetchUnreadCount()
        })
    },

    /** Asks for confirmation, then recalls the message. */
    handleRecall(messageId) {
      MessageBox.confirm(
        '确定要撤回这条消息吗?',
        '确认撤回',
        {
          confirmButtonText: '确定',
          cancelButtonText: '取消',
          type: 'warning'
        }
      )
        .then(() => {
          return notificationApi.recallNotification(messageId)
        })
        .then(result => {
          if (result) {
            // Update local state
            const index = this.notifications.findIndex(n => n.messageId === messageId)
            if (index !== -1) {
              this.$set(this.notifications, index, {
                ...this.notifications[index],
                status: 'RECALLED'
              })
            }
            Message.success('消息已撤回')
          }
        })
        .catch(error => {
          if (error !== 'cancel') { // Ignore the user cancelling the dialog
            console.error('撤回消息失败:', error)
          }
        })
    },

    /** Pager callback; the backend page index starts at 0. */
    handlePageChange(page) {
      this.currentPage = page - 1
      this.fetchNotifications()
    },

    /** Formats a timestamp as a relative time. */
    formatTime(dateString) {
      return moment(dateString).fromNow()
    },

    /**
     * Icon class for a notification type.
     * Uses classes from Element UI's built-in icon font; the previous
     * user-plus / comment / heart / at names are not part of it.
     */
    getIcon(type) {
      switch (type) {
        case 'SYSTEM': return 'el-icon-warning'
        case 'FRIEND_REQUEST': return 'el-icon-user'
        case 'COMMENT': return 'el-icon-chat-dot-round'
        case 'LIKE': return 'el-icon-star-on'
        case 'MENTION': return 'el-icon-chat-line-round'
        case 'MESSAGE': return 'el-icon-message'
        default: return 'el-icon-bell'
      }
    },

    /** Timeline colour for a notification type. */
    getColor(type) {
      switch (type) {
        case 'SYSTEM': return '#e6a23c'
        case 'FRIEND_REQUEST': return '#52c41a'
        case 'MESSAGE': return '#1890ff'
        default: return '#8c8c8c'
      }
    },

    /** Display name for a notification type; falls back to the raw type. */
    getTypeName(type) {
      const typeMap = {
        'SYSTEM': '系统通知',
        'FRIEND_REQUEST': '好友请求',
        'COMMENT': '评论',
        'LIKE': '点赞',
        'MENTION': '提及',
        'MESSAGE': '私信'
      }
      return typeMap[type] || type
    },

    /**
     * Whether the current user may still recall the notification: they must be
     * the sender, it must not be recalled already, and it must be under five
     * minutes old. IDs are compared as strings because `userId` comes from
     * localStorage (always a string) while `senderId` may be numeric; a missing
     * ID on either side never matches.
     */
    canRecall(notification) {
      if (this.userId == null || notification.senderId == null) {
        return false
      }
      return String(notification.senderId) === String(this.userId) &&
        notification.status !== 'RECALLED' &&
        new Date() - new Date(notification.createdAt) < 5 * 60 * 1000
    }
  }
}
</script><style scoped>
/* Card header: title on the left, badge/button on the right */
.card-header {
  display: flex;
  align-items: center;
  justify-content: space-between;
}

.notification-badge {
  margin-left: 10px;
}

/* Title row of a single notification */
.notification-header {
  display: flex;
  align-items: center;
  justify-content: space-between;
  margin-bottom: 10px;
}

/* Pill showing the notification type */
.notification-type {
  padding: 2px 8px;
  border-radius: 12px;
  background-color: #f0f2f5;
  font-size: 12px;
}

.notification-content {
  margin-bottom: 10px;
  line-height: 1.6;
}

/* Placeholder shown instead of the content once a message is recalled */
.recall-message {
  margin-bottom: 10px;
  padding: 10px;
  border-radius: 4px;
  background-color: #fff1f0;
  color: #f5222d;
  font-size: 14px;
}

.notification-actions {
  display: flex;
  justify-content: flex-end;
  gap: 10px;
}

.unread-notification {
  border-left: 3px solid #1890ff;
}

.recalled-notification {
  opacity: 0.7;
}

.pagination {
  margin-top: 20px;
  text-align: center;
}
</style>
4.5 发送通知组件
<template>
  <el-card>
    <div slot="header"><h2>发送通知</h2></div>

    <el-form
      ref="form"
      :model="form"
      :rules="rules"
      label-width="100px"
      class="send-notification-form"
    >
      <!-- Recipient -->
      <el-form-item label="接收用户ID" prop="userId">
        <el-input v-model="form.userId" placeholder="请输入接收用户ID"></el-input>
      </el-form-item>

      <!-- Notification type -->
      <el-form-item label="通知类型" prop="type">
        <el-select v-model="form.type" placeholder="请选择通知类型">
          <el-option label="系统通知" value="SYSTEM"></el-option>
          <el-option label="好友请求" value="FRIEND_REQUEST"></el-option>
          <el-option label="评论" value="COMMENT"></el-option>
          <el-option label="点赞" value="LIKE"></el-option>
          <el-option label="提及" value="MENTION"></el-option>
          <el-option label="私信" value="MESSAGE"></el-option>
        </el-select>
      </el-form-item>

      <!-- Priority, 0 to 9 -->
      <el-form-item label="优先级" prop="priority">
        <el-slider v-model="form.priority" :min="0" :max="9" :step="1" show-input></el-slider>
        <div class="priority-info"><span>低</span><span class="priority-high">高</span></div>
      </el-form-item>

      <el-form-item label="标题" prop="title">
        <el-input v-model="form.title" placeholder="请输入通知标题"></el-input>
      </el-form-item>

      <el-form-item label="内容" prop="content">
        <el-input v-model="form.content" type="textarea" :rows="4" placeholder="请输入通知内容"></el-input>
      </el-form-item>

      <el-form-item>
        <el-button type="primary" @click="submitForm">发送通知</el-button>
        <el-button @click="resetForm">重置</el-button>
      </el-form-item>
    </el-form>
  </el-card>
</template><script>
import { notificationApi } from '../services/apiService'
import { Message } from 'element-ui'

// Form component for sending a notification (Vue 2 Options API)
export default {
  name: 'SendNotification',

  data() {
    // Form model
    const form = {
      userId: '',
      type: '',
      priority: 5,
      title: '',
      content: ''
    }

    // Validation rules
    const rules = {
      userId: [
        { required: true, message: '请输入接收用户ID', trigger: 'blur' }
      ],
      type: [
        { required: true, message: '请选择通知类型', trigger: 'change' }
      ],
      title: [
        { required: true, message: '请输入通知标题', trigger: 'blur' },
        { max: 50, message: '标题长度不能超过50个字符', trigger: 'blur' }
      ],
      content: [
        { required: true, message: '请输入通知内容', trigger: 'blur' },
        { max: 500, message: '内容长度不能超过500个字符', trigger: 'blur' }
      ]
    }

    return { form, rules }
  },

  methods: {
    // Validate, then send; invalid forms are left for the user to correct
    submitForm() {
      this.$refs.form.validate((valid) => {
        if (!valid) {
          return
        }
        notificationApi.sendNotification(this.form)
          .then((messageId) => {
            Message.success(`通知发送成功,消息ID: ${messageId}`)
            this.resetForm()
          })
          .catch((error) => {
            console.error('发送通知失败:', error)
            Message.error('发送通知失败,请稍后重试')
          })
      })
    },

    // Restore every field to its initial value
    resetForm() {
      this.$refs.form.resetFields()
    }
  }
}
</script><style scoped>
.send-notification-form {
  max-width: 600px;
}

/* "low" / "high" captions under the priority slider */
.priority-info {
  display: flex;
  justify-content: space-between;
  margin-top: -10px;
  margin-bottom: 10px;
  color: #606266;
  font-size: 12px;
}

.priority-high {
  color: #f56c6c;
}
</style>
4.6 主页面组件
<template>
  <div class="home-container">
    <el-row :gutter="20">
      <el-col :span="12">
        <send-notification />
      </el-col>

      <el-col :span="12">
        <div class="connection-status">
          <el-card>
            <div slot="header"><h2>连接状态</h2></div>

            <div class="status-info">
              <!-- `column` is a Number prop, so it must be bound rather than passed as the string "1" -->
              <el-descriptions :column="1">
                <el-descriptions-item label="MQTT连接状态">
                  <el-tag :type="mqttConnected ? 'success' : 'danger'">{{ mqttConnected ? '已连接' : '未连接' }}</el-tag>
                </el-descriptions-item>
                <el-descriptions-item label="当前用户">{{ currentUser || '未登录' }}</el-descriptions-item>
                <el-descriptions-item label="未读消息">
                  <el-badge :value="unreadCount" class="item"><span>{{ unreadCount }} 条</span></el-badge>
                </el-descriptions-item>
              </el-descriptions>

              <el-button
                v-if="!mqttConnected && currentUser"
                type="primary"
                @click="reconnectMqtt"
                style="margin-top: 15px;"
              >重新连接</el-button>
            </div>
          </el-card>
        </div>
      </el-col>
    </el-row>

    <el-row style="margin-top: 20px;">
      <el-col :span="24">
        <notification-list />
      </el-col>
    </el-row>
  </div>
</template><script>
import SendNotification from '../components/SendNotification.vue'
import NotificationList from '../components/NotificationList.vue'
import mqttService from '../services/mqttService'
import { authApi, notificationApi } from '../services/apiService'

/**
 * Home page (Vue 2 Options API): send form, connection status card and
 * notification list.
 *
 * Both polling timers are stored and cleared in `beforeDestroy`, so neither keeps
 * running after the page is left.
 */
export default {
  name: 'Home',

  components: {
    SendNotification,
    NotificationList
  },

  data() {
    return {
      mqttConnected: false,
      currentUser: '',
      unreadCount: 0,
      statusCheckInterval: null, // Handle of the 1 s connection-status poller
      unreadCountInterval: null // Handle of the 30 s unread-count poller
    }
  },

  mounted() {
    this.checkLoginStatus()
    this.setupMqttStatusListener()
    this.fetchUnreadCount()

    // Refresh the unread count every 30 seconds
    this.unreadCountInterval = setInterval(() => {
      this.fetchUnreadCount()
    }, 30000)
  },

  beforeDestroy() {
    // Clear both timers
    if (this.statusCheckInterval) {
      clearInterval(this.statusCheckInterval)
      this.statusCheckInterval = null
    }
    if (this.unreadCountInterval) {
      clearInterval(this.unreadCountInterval)
      this.unreadCountInterval = null
    }
  },

  methods: {
    /** Loads the current user and stores the ID in state and localStorage. */
    checkLoginStatus() {
      authApi.getCurrentUser()
        .then(user => {
          this.currentUser = user.id
          localStorage.setItem('userId', user.id)
        })
        .catch(error => {
          console.log('用户未登录或会话已过期')
          // A redirect to the login page could go here
        })
    },

    /** Polls the MQTT service once per second for its connection state. */
    setupMqttStatusListener() {
      this.statusCheckInterval = setInterval(() => {
        this.mqttConnected = mqttService.isConnected()
      }, 1000)
    },

    /** Refreshes the unread counter. */
    fetchUnreadCount() {
      notificationApi.getUnreadCount()
        .then(count => {
          this.unreadCount = count
        })
        .catch(error => {
          console.error('获取未读数量失败:', error)
        })
    },

    /**
     * Re-initialises the MQTT connection for the current user.
     * The callbacks already registered on the shared service are passed back in:
     * `init` overwrites them, so passing no-ops would silently stop the
     * notification list from receiving pushes after a manual reconnect.
     */
    reconnectMqtt() {
      if (!this.currentUser) {
        return
      }
      const noop = () => {}
      mqttService.init(
        this.currentUser,
        mqttService.notificationCallback || noop,
        mqttService.recallCallback || noop
      )
    }
  }
}
</script><style scoped>
/* Page wrapper */
.home-container {
  padding: 20px;
}

/* Lets the status card fill its column */
.connection-status {
  height: 100%;
}

.status-info {
  padding: 10px 0;
}
</style>
4.7 入口文件配置
import Vue from 'vue'
import App from './App.vue'
import router from './router'
import ElementUI from 'element-ui'
import 'element-ui/lib/theme-chalk/index.css'// 全局注册Element UI
Vue.use(ElementUI)

Vue.config.productionTip = false

// Root instance: renders App with the router attached, mounted on #app
const rootOptions = {
  router,
  render: (createElement) => createElement(App)
}
new Vue(rootOptions).$mount('#app')
4.8 路由配置
import Vue from 'vue'
import VueRouter from 'vue-router'
import Home from '../views/Home.vue'

Vue.use(VueRouter)

// Home is loaded eagerly
const homeRoute = {
  path: '/',
  name: 'Home',
  component: Home
}

// The login view is lazy-loaded
const loginRoute = {
  path: '/login',
  name: 'Login',
  component: () => import('../views/Login.vue')
}

const routes = [homeRoute, loginRoute]

const router = new VueRouter({
  mode: 'history',
  base: process.env.BASE_URL,
  routes
})

export default router
主要改动说明
从Vue3迁移到Vue2的主要变化包括:
1. **Composition API → Options API**:将Vue3的 `setup()` 语法转换为Vue2的选项式API,使用 `data()`、`methods`、`mounted` 等选项。
2. **响应式系统变化**:
   - 使用 `data()` 声明响应式状态(替代Vue3的 `ref` 和 `reactive`),并通过 `this.$set` 更新数组元素或对象属性,以保证变更能被Vue2侦测到
   - 移除 `watchEffect`,使用 `setInterval` 实现定时任务
3. **组件注册方式**:在Vue2中需要显式注册组件,通过 `components` 选项。
4. **Element UI替代Element Plus**:
   - 导入和使用Element UI而非Element Plus
   - 调整部分组件的属性和事件(如 `type="text"` 替代 `text` 属性)
5. **路由配置**:Vue2使用Vue Router 3.x版本,与Vue3使用的4.x版本在配置上略有差异。
6. **工具库替换**:使用 `moment.js` 替代 `date-fns` 处理日期格式化,更符合Vue2生态习惯。
7. **模板语法调整**:保持基本一致,但移除了Vue3特有的语法如 `v-model` 参数。
这个Vue2版本的实现与之前的Vue3版本功能完全一致,包括实时消息推送、消息优先级、消息撤回、重试机制和连接稳定性保障等核心功能,同时保持了相同的UI风格和用户体验。