WebSocket实时通信系统——js技能提升
2. WebSocket实时通信系统
功能概述
实现完整的WebSocket通信系统,支持实时消息推送、连接管理、心跳检测和自动重连。
技术难点
- WebSocket连接生命周期管理
- 消息序列化和反序列化
- 心跳机制和连接保活
- 错误处理和重连策略
- 多组件状态同步
实现思路
2.1 WebSocket管理器
// src/utils/websocket.ts
export interface WebSocketOptions {url: stringonConnect?: () => voidonDisconnect?: () => voidonMessage?: (message: any) => voidonError?: (error: any) => void
}export class WebSocketManager {private ws: WebSocket | null = nullprivate options: WebSocketOptionsprivate reconnectAttempts = 0private maxReconnectAttempts = 5private reconnectInterval = 1000private heartbeatInterval: NodeJS.Timeout | null = nullconstructor(options: WebSocketOptions) {this.options = options}// 连接WebSocketconnect(): Promise<void> {return new Promise((resolve, reject) => {try {this.ws = new WebSocket(this.options.url)this.ws.onopen = () => {console.log('✅ WebSocket 连接成功')this.reconnectAttempts = 0this.startHeartbeat()this.options.onConnect?.()resolve()}this.ws.onclose = (event) => {console.log('🔌 WebSocket 连接断开:', event.code, event.reason)this.stopHeartbeat()this.options.onDisconnect?.()// 自动重连逻辑if (this.reconnectAttempts < this.maxReconnectAttempts) {this.reconnectAttempts++setTimeout(() => {this.connect().catch(console.error)}, this.reconnectInterval * this.reconnectAttempts)}}this.ws.onerror = error => {console.error('❌ WebSocket 错误:', error)this.options.onError?.(error)reject(error)}this.ws.onmessage = event => {try {const data = JSON.parse(event.data)this.options.onMessage?.(data)} catch (err) {console.warn('⚠ 消息解析失败:', event.data)this.options.onMessage?.(event.data)}}} catch (error) {console.error('❌ WebSocket 连接失败:', error)reject(error)}})}// 发送消息send(data: any): void {if (this.ws && this.ws.readyState === WebSocket.OPEN) {const message = typeof data === 'string' ? data : JSON.stringify(data)this.ws.send(message)} else {throw new Error('WebSocket未连接')}}// 心跳机制private startHeartbeat(): void {this.heartbeatInterval = setInterval(() => {if (this.ws && this.ws.readyState === WebSocket.OPEN) {this.send({ type: 'heartbeat', timestamp: Date.now() })}}, 30000) // 30秒发送一次心跳}private stopHeartbeat(): void {if (this.heartbeatInterval) {clearInterval(this.heartbeatInterval)this.heartbeatInterval = null}}// 断开连接disconnect(): void {this.stopHeartbeat()if (this.ws) {this.ws.close()this.ws = null}}// 获取连接状态get isConnected(): boolean {return this.ws?.readyState === WebSocket.OPEN}
}
2.2 Vue组合式API封装
// src/hooks/useWebSocket.ts
export function useWebSocket(options: WSOptions) {const socketManager = ref<WebSocketManager | null>(null)const isConnected = ref(false)const connect = async (params?: WSOptions) => {if (!useCookie('token').value) returnconst innerParams = params || optionssocketManager.value = createWebSocketManager(typeof innerParams === 'function' ? innerParams() : innerParams)try {await socketManager.value?.connect()isConnected.value = true// 设置断开连接回调socketManager.value?.setDisconnectCallback(() => {isConnected.value = falseconst { message } = useGlobalComponent()message.error('网络连接断开,请刷新页面')})} catch (error) {console.error('WebSocket连接失败:', error)isConnected.value = false}}// 发送消息const sendMessage = <T>(data: T) => {if (!socketManager.value?.isConnected) {const { message } = useGlobalComponent()message.error('网络连接断开,请刷新页面')return}socketManager.value?.send(data)}const disconnect = () => {socketManager.value?.disconnect()isConnected.value = false}onUnmounted(disconnect)return {sendMessage,connect,disconnect,isConnected: () => isConnected.value,}
}
2.3 聊天状态管理集成
// src/stores/chat/methods.ts
export const useChatStore = defineStore('chat', () => {const { sendMessage, connect, disconnect, isConnected } = useWebSocket(() => ({url: `ws://192.168.201.201:8088/api/websocket?token=${useCookie('token').value}`,onMessage: msg => {if (msg.event_id !== state.list[state.list.length - 1]?.event_id) returnstate.list = onMessage(msg, state.list)if (state.isFirst &&[NotificationType.FINISH, NotificationType.END].includes(msg?.data?.type)) {getChatSummary(msg.data.session_id).then(res => {state.title = resstate.isFirst = false})}},}))const send = (data: string, id: number) => {const lastMsg = state.list[state.list.length - 1]let callerInstanceId = ''if (lastMsg && 'caller' in lastMsg) {callerInstanceId = (lastMsg?.caller as { instance_id: string })?.instance_id}const msg = createUserMessage(data, id, callerInstanceId)const question = createResponseMessage(data, id, callerInstanceId)if (state.list.length) {if (state.list[state.list.length - 1]?.session_id === id) {state.list = [...state.list, question]} else {state.list = [question]state.isFirst = true}} else {state.list = [question]state.isFirst = true}sendMessage(msg)return question}return {send,isConnected,}
})
关键技术点
- 连接管理: 完整的连接生命周期管理
- 自动重连: 指数退避重连策略
- 心跳机制: 保持连接活跃状态
- 错误处理: 完善的错误捕获和用户提示
- 状态同步: 多组件间的连接状态同步