当前位置: 首页 > news >正文

WebSocket功能完整解析

WebSocket功能完整解析

概述

使用WebSocket实现实时双向通信。整个WebSocket系统设计得非常完善,包含了连接管理、消息处理、状态同步等完整功能。

架构设计

1. 分层架构

┌─────────────────┐
│   页面组件层     │  (chat/[id].tsx)
├─────────────────┤
│   状态管理层     │  (stores/chat)
├─────────────────┤
│   Hook封装层     │  (hooks/useWebSocket)
├─────────────────┤
│   工具类层       │  (utils/websocket.ts)
├─────────────────┤
│   WebSocket API  │  (浏览器原生API)
└─────────────────┘

核心组件详解

1. WebSocket管理器 (src/utils/websocket.ts)

这是整个WebSocket系统的核心底层,负责与浏览器原生WebSocket API交互。

主要功能:
export class WebSocketManager {private ws: WebSocket | null = nullprivate options: WebSocketOptions// 连接WebSocketconnect(): Promise<void>// 断开连接disconnect(): void// 发送消息send(data: any): void// 获取连接状态get isConnected(): boolean// 设置各种回调函数setMessageCallback(callback: (message: any) => void): voidsetConnectCallback(callback: () => void): voidsetDisconnectCallback(callback: () => void): void
}
关键特性:
  • Promise化连接:使用Promise包装连接过程,便于异步处理
  • 自动JSON解析:接收消息时自动解析JSON格式
  • 错误处理:完善的错误处理和日志记录
  • 状态管理:实时跟踪连接状态

2. WebSocket Hook (src/hooks/useWebSocket.ts)

这是Vue 3 Composition API的封装层,将WebSocket管理器与Vue组件生命周期结合。

主要功能:
export function useWebSocket(options: WSOptions) {const socketManager = ref<WebSocketManager | null>(null)// 连接WebSocketconst connect = async (params?: WSOptions) => {// 检查tokenif (!useCookie('token').value) return// 创建连接管理器socketManager.value = createWebSocketManager(typeof innerParmas === 'function' ? innerParmas() : innerParmas)// 建立连接await socketManager.value?.connect()// 设置断开连接回调socketManager.value?.setDisconnectCallback(() => {const { message } = useGlobalComponent()message.error('网络连接断开,请刷新页面')})}// 发送消息const sendMessage = <T>(data: T) => {if (!socketManager.value?.isConnected) {const { message } = useGlobalComponent()message.error('网络连接断开,请刷新页面')}socketManager.value?.send(data)}// 断开连接const disconnect = () => {socketManager.value?.disconnect()}// 检查连接状态const isConnected = () => socketManager.value?.isConnected// 组件卸载时自动断开连接onUnmounted(disconnect)return { sendMessage, connect, disconnect, isConnected }
}
关键特性:
  • 自动生命周期管理:组件卸载时自动断开连接
  • Token验证:连接前检查用户token
  • 错误提示:连接断开时自动显示错误信息
  • 类型安全:完整的TypeScript类型支持

3. 聊天状态管理 (src/stores/chat/methods.ts)

这是业务逻辑层,使用Pinia管理聊天相关的状态和操作。

核心状态:
const state = reactive<ChatState>({list: [],           // 消息列表title: '',          // 会话标题isFirst: false,     // 是否首次会话agentState: {       // Agent状态content: 'switch Agent can complete the selection process',show: false,modelType: 'model',thinking: false,network: false,},
})
WebSocket集成:
const { sendMessage, connect, disconnect, isConnected } = useWebSocket(() => ({// WebSocket服务器地址url: `ws://192.168.201.49:8088/api/websocket?token=${useCookie('token').value}`,// 消息处理回调onMessage: msg => {// 处理接收到的消息state.list = onMessage(msg, state.list)// 处理会话完成逻辑if (state.isFirst && [NotificationType.FINISH, NotificationType.END].includes(msg?.data?.type)) {getChatSummary(msg.data.session_id).then(res => {state.title = resstate.isFirst = false})}},})
)
发送消息功能:
const send = (data: string, id: number) => {const lastMsg = state.list[state.list.length - 1]let callerInstanceId = ''// 获取调用者实例IDif (lastMsg && 'caller' in lastMsg) {callerInstanceId = (lastMsg?.caller as { instance_id: string })?.instance_id}// 创建用户消息和响应消息const msg = createUserMessage(data, id, callerInstanceId)const question = createResponseMessage(data, id, callerInstanceId)// 更新消息列表if (state.list.length) {if (state.list[state.list.length - 1]?.session_id === id) {state.list = [...state.list, question]} else {state.list = [question]state.isFirst = true}} else {state.list = [question]state.isFirst = true}// 发送WebSocket消息sendMessage(msg)return question
}
路由监听:
watch(() => route.path,() => {if (import.meta.server) return// 重置状态setTitle('')state.list = []// 检查tokenif (!useCookie('token').value) {return disconnect()}// 如果已连接则跳过if (isConnected()) return// 建立连接connect()},{ immediate: true }
)

4. 消息处理工具 (src/stores/chat/utils.ts)

这是消息处理的核心逻辑,负责解析、转换和更新消息。

消息创建函数:

创建用户消息:

export function createUserMessage(content: string,session_id: number,instance_id?: string
): ChatMessageWrapper {return {direction: ChatDirection.INPUT,type: ChatType.CHAT,data: {session_id,type: 'user_input',content: [{ id: 0, step: 0, type: 'text', content }],agents: [{agent_id: 'agent-commander',custom_id: 'agent-commander',published_version: '1.0.0',url: 'local',type: '',agent_provider_id: 2106919896481796,description: '',name: '',}],tools: [],caller: { type: 'user', instance_id: instance_id || '' },},}
}

创建响应消息:

export function createResponseMessage(content: string,session_id: number,instance_id?: string
): ResponseMessageData {return {id: dayjs().valueOf(),type: NotificationType.QUESTION,content: [{ id: 0, type: 'text', content, step: 0 }],timestamp: Date.now(),read: false,session_id,event_id: '',task_id: 0,agent_id: '',agent_instance_id: 0,parent_agent_instance_id: 0,step: 0,artifact: [],action: {},ext: {},call_batch_id: '',caller_type: 'user',caller_instance_id: instance_id || '',user_id: '',stream_uuid: '',tree: [],has_todo: false,}
}
核心消息处理函数:
export function onMessage(msg: ChatMessageWrapper,messages: ResponseMessageData[]
) {const data = msg.data as ResponseMessageDataif (msg.type === ChatType.CHAT && data.type) {if (data.type === NotificationType.START) {// 开始新会话return [...messages, { ...data, tree: [] }]} else {// 更新现有消息const targetIndex = messages.findIndex(i => i.event_id === data.event_id)if (targetIndex === -1) return messagesconst newMessages = [...messages]const targetData = newMessages[targetIndex]!const target = [...targetData.content]// 处理内容更新(关键:累加内容)if (data.type !== NotificationType.AGENT_START) {data.content.forEach(item => {const targetContentItemIndex = target.findIndex(i =>i.id === data.agent_instance_id &&i.type === item.type &&i.step === data.step)if (targetContentItemIndex !== -1) {// 累加内容(实现流式渲染)const currentItem = target[targetContentItemIndex]!target[targetContentItemIndex] = {...currentItem,content: currentItem.content + item.content,}} else {// 添加新内容项target.push({id: data.agent_instance_id,type: item.type,content: item.content,step: data.step,})}})}// 处理附件data.artifact.forEach(item => {// 处理附件状态更新逻辑})// 更新树形结构createTree(targetData.tree, data)const newData = { ...data, tree: [...targetData.tree], content: target }newMessages[targetIndex] = newDatareturn newMessages}}return messages
}

5. 树形结构管理 (src/stores/chat/utils.ts)

负责管理AI Agent的执行流程树形结构。

export function createTree(tree: ResponseMessageData['tree'],data: ResponseMessageData
) {const node: (typeof tree)[0] = {plan_id: data.agent_instance_id,type: 'base',loading: true,parent_plan_id: data.parent_agent_instance_id,child_plan_ids: [],content: [],status: 'start',label: data.agent_id,timestamp: data.timestamp,}// 处理不同类型的消息switch (data.type) {case NotificationType.AGENT_START:// Agent开始执行node.content = data.content.reduce((acc, item) => {if (item.type === 'text') {acc.push({id: data.agent_instance_id,type: item.type,content: item.content,step: data.step,})}return acc}, [] as (typeof tree)[0]['content'])tree.push(node)breakcase NotificationType.AGENT_END:// Agent执行结束target.loading = falsetarget.status = 'end'breakcase NotificationType.ARTIFACT_START:// 附件处理开始tree.push(node)breakcase NotificationType.ARTIFACT_END:// 附件处理结束target.loading = falsetarget.status = 'end'breakcase NotificationType.ARTIFACT_RESULT:// 附件处理结果target.content = data.content.map(item => ({id: data.agent_instance_id,type: 'artifact',content: JSON.stringify(item),step: data.step,status: 'end',}))break}// 更新父子关系tree.forEach((item, index) => {if (!index) {item.child_plan_ids = []return}if (item.parent_plan_id === -1) {tree[0]!.child_plan_ids.push(item.plan_id)}item.child_plan_ids = tree.reduce((acc, curr) => {if (curr.parent_plan_id === item.plan_id) {acc.push(curr.plan_id)}return acc}, [] as number[])})
}

消息类型系统

1. 基础枚举类型 (src/types/chat.ts)

// WebSocket消息类型
export enum ChatType {CHAT = 'chat',           // 聊天消息HEADRBEAT = 'heartbeat', // 心跳CONNECTED = 'connected', // 连接FINISH = 'finish',       // 正常结束END = 'end',             // SSE断开结束START = 'start',         // 开始
}// 通知类型(业务消息类型)
export enum NotificationType {QUESTION = 'question',           // 用户问题START = 'start',                 // 开始MODEL_START = 'model_start',     // 模型开始MODEL_RESULT = 'model_result',   // 模型结果MODEL_END = 'model_end',         // 模型结束AGENT_START = 'agent_start',     // Agent开始AGENT_RESULT = 'agent_result',   // Agent结果AGENT_END = 'agent_end',         // Agent结束ARTIFACT_START = 'artifact_start', // 附件开始ARTIFACT_RESULT = 'artifact_result', // 附件结果ARTIFACT_END = 'artifact_end',   // 附件结束FINISH = 'finish',               // 完成ERROR = 'error',                 // 错误END = 'end',                     // 结束
}

2. 消息数据结构

// 响应消息数据
export interface ResponseMessageData {has_todo: booleanid: numbertype: NotificationTypecontent: ContentItem[]           // 消息内容数组timestamp: numberread: booleansession_id: numberevent_id: stringtask_id: numberagent_id: stringagent_instance_id: numberparent_agent_instance_id: numberstep: numberartifact: Artifact[]            // 附件数组action: {}ext: {}call_batch_id: stringcaller_type: 'user' | 'agent' | 'event_source' | 'unknown'caller_instance_id: stringuser_id: stringstream_uuid: stringtree: WSTreeNode[]              // 树形结构
}// 内容项
export interface ContentItem {id: numbertype: 'text' | 'think' | 'artifact' | 'image_url' | 'image_base64'status?: 'start' | 'end'content: stringread?: booleanstep: number
}// 树节点
export interface WSTreeNode {plan_id: numbertype: 'start' | 'base'loading: booleanparent_plan_id: number | nullchild_plan_ids: number[]content: ContentItem[]status: 'start' | 'end'label: stringtimestamp: number
}

页面集成

1. 聊天页面 (src/pages/chat/[id].tsx)

// 发送消息回调
const sendCallback = (val: string) => {console.log('发送消息123123:', val)// 检查WebSocket连接状态if (!chatStore.isConnected()) {return message.error('WebSocket未连接, 无法发送消息')}if (!route.params.id) return// 发送消息并更新UIchatStore.value.list = [...chatStore.value.list,chatStore.send(val, Number(route.params.id)),]
}// 监听消息变化,更新画布
watch(() => chatStore.value.list,val => {if (val[val.length - 1]?.tree) {if (timer) returntimer = setTimeout(() => {timer = nullconst { tree } = chatStore.value.list[chatStore.value.list.length - 1]!state.data = generateVueFlowData(tree)  // 生成画布数据state.loading = false}, 300)  // 防抖处理}}
)

SSE支持

项目还支持Server-Sent Events (SSE)作为备选方案:

1. SSE连接类 (src/utils/sse.ts)

export class SSEConnection {private eventSource: EventSource | null = nullprivate messageCallback?: (data: any) => void// 连接SSEconnect(): Promise<void>// 断开连接disconnect(): void// 设置消息回调setMessageCallback(callback: (data: any) => void): void// 获取连接状态get isConnected(): boolean
}

2. SSE API (src/api/chat/method.ts)

// 创建SSE连接
export function createChatSSE(sessionId: string,options?: {onMessage?: (data: any) => void}
): SSEConnection {const baseURL = '/api'const sseUrl = `${baseURL}/events/${sessionId}`return createSSEConnection(sseUrl, options)
}

完整工作流程

1. 连接建立流程

1. 用户访问聊天页面
2. 检查用户token
3. 创建WebSocket连接管理器
4. 建立WebSocket连接
5. 设置消息处理回调
6. 设置断开连接回调

2. 消息发送流程

1. 用户在输入框输入消息
2. 点击发送按钮
3. 检查WebSocket连接状态
4. 创建用户消息对象
5. 创建响应消息对象
6. 更新本地消息列表
7. 通过WebSocket发送消息
8. 触发UI更新

3. 消息接收流程

1. WebSocket接收到服务器消息
2. 解析JSON数据
3. 调用onMessage处理函数
4. 根据消息类型进行不同处理
5. 更新消息列表状态
6. 触发Vue响应式更新
7. 更新UI显示
8. 如果是流式消息,累加内容实现打字机效果

4. 流式渲染机制

1. 服务器发送部分内容
2. 前端接收并累加到现有内容
3. 触发Vue响应式更新
4. TextChunk组件检测内容变化
5. 启动打字机动画
6. 逐字显示内容
7. 动画完成后停止
8. 等待下一批内容

关键特性总结

1. 实时双向通信

  • WebSocket提供全双工通信
  • 支持实时消息发送和接收

2. 流式渲染

  • 内容累加机制实现流式显示
  • 打字机效果提升用户体验

3. 状态管理

  • Pinia管理全局状态
  • 响应式更新确保UI同步

4. 错误处理

  • 完善的连接错误处理
  • 自动重连和用户提示

5. 生命周期管理

  • 自动连接和断开
  • 组件卸载时清理资源

6. 类型安全

  • 完整的TypeScript类型定义
  • 编译时错误检查

7. 扩展性

  • 支持多种消息类型
  • 模块化设计便于扩展

这个WebSocket系统设计得非常完善,既保证了功能的完整性,又考虑了性能和用户体验。通过分层架构和模块化设计,代码结构清晰,易于维护和扩展。

http://www.dtcms.com/a/357217.html

相关文章:

  • Linux系统——EXT2 文件系统
  • 【论文阅读】Sparse4D v2:Recurrent Temporal Fusion with Sparse Model
  • HTML 和 JavaScript 关联的基础教程
  • Emeditor 提取IP地址正则表达式
  • 音视频直播卡顿分析与优化:技术原理、实践案例与未来趋势
  • 如何使用 Graylog 连接 Easysearch
  • vue3+wangEditor实现富文本编辑器
  • 【黑客技术零基础入门】黑客入门教程(非常详细)从零基础入门到精通,看完这一篇就够了
  • Java面试现场:Spring Boot+Redis+MySQL在电商场景下的技术深度剖析
  • 机器学习复习
  • 使用 C# 复制 Word 文档内容 - 页面、节、段落、表格、页眉页脚等
  • 对接连连支付(八)-- 支付订单关闭
  • 52-容器总结与应用
  • LeetCode259~282题解
  • 使用STM32CubeMX使用CAN驱动无刷电机DJI3508
  • 多智能体框架(下)
  • 【系列03】端侧AI:构建与部署高效的本地化AI模型 第2章:端侧AI硬件入门
  • c++ 右值引用
  • 从零开始的python学习——常量与变量
  • 【STM32外设】ADC
  • OSS Nginx 反代提示 SignatureDoesNotMatch
  • 网络_协议
  • (十)ps识别:Swin Transformer-T 与 ResNet50 结合的 PS 痕迹识别模型训练过程解析
  • 链表有环找入口节点原理
  • Vue3 + TS + MapboxGL.js 三维地图开发项目
  • Marin说PCB之POC电路layout设计仿真案例---11
  • Jenkins Pipeline(二)-设置Docker Agent
  • 渲染速度由什么决定?四大关键因素深度解析
  • 【拍摄学习记录】07-影调、直方图量化、向右向左
  • Docker部署openai-edge-tts和即梦API以及应用案例