当前位置: 首页 > news >正文

LLM对话框项目 EventSource封装和MessageServiceClass流式展示封装

你这段代码实现了一个对浏览器原生 EventSource 的封装,主要目的是更方便地管理服务端事件推送(Server-Sent Events,简称 SSE),如自动构建 URL、配置管理、连接控制等。


EventSourceAPI

✅ 面试时介绍建议(逻辑清晰)

面试官您好,我封装了一个 EventSourceWrapper 类,用于统一管理 SSE(Server-Sent Events)连接。这是一个单例模式实现,确保在整个应用生命周期中只有一个 SSE 实例。以下是它的几个核心设计点:
封装了一个 EventSourceWrapper 类,用于统一管理客户端与服务端之间的 SSE(Server-Sent Events)连接。这个类采用单例模式确保全局只存在一个长连接实例,避免重复连接的问题。它支持配置参数的动态更新通过构造函数进行依赖注入),自动构建带查询参数的连接 URL,并对 onopen、onmessage、onerror 和关闭事件做了统一的封装处理。考虑到原生 SSE 的自动重连机制较为有限(不能监听重连次数、不知道是否已经掉线 、不可自定义重连逻辑等),我还设计了一个可选的自定义重连机制,支持最大重试次数、指数退避等策略,使连接管理更加可控和健壮。此外,它还提供了 reconnect 接口,便于业务层在需要时手动触发重连。


1. 单例模式保证全局唯一性

  • 类中通过静态方法 getESWrapperInstance 创建或复用唯一实例。
  • 适用于聊天类应用中长连接唯一的场景,避免资源浪费或重复连接
static getESWrapperInstance(): EventSourceWrapper

2. 配置灵活、支持动态更新

  • 初始配置通过构造函数传入(依赖注入),内部通过 setConfig 实现对原配置的合并更新(特别是 queryParams 的深层合并)。
setConfig(config: Partial<EventSourceConfig>)

3. 自动构建连接 URL

  • 封装了 buildUrl 方法,根据 queryParams 自动生成合法 SSE 请求地址。

4. 事件监听器封装清晰

  • onopen, onmessage, onerror, close 全部通过统一接口回调管理。
  • 错误处理逻辑健壮,连接异常自动关闭,并调用 onError 通知业务层。
  • 提供 reconnect 方法便于手动重连。

5. 易于拓展

  • 可以按需添加 onRetry, onReconnect 等回调,拓展性强。

//utils/EventSourceWrapper.ts
import {EventSourceConfig} from "../type/EventSourceConfig";export class EventSourceWrapper {private eventSource: EventSource | null = null;private config: EventSourceConfig;private isActive: boolean = false;private static instance:EventSourceWrapper|null=null;private retryCount = 0;private reconnectTimer: ReturnType<typeof setTimeout> | null = null;constructor(options: EventSourceConfig) {this.config = {withCredentials: false,...options};}static getESWrapperInstance():EventSourceWrapper{if(!this.instance) {this.instance = new EventSourceWrapper({url: "/dev-api/chatStream",queryParams: {content: "",contentType: "",},onError: (err) => {console.log('Error occurred:', err);}});}return this.instance;}getConfig():EventSourceConfig{return this.config;}//Partial 是 TypeScript 中的一个工具类型,用于创建一个新类型,其中所有属性都是可选的。setConfig(config: Partial<EventSourceConfig>): void {this.config = {...this.config, // 保留原有配置...config,      // 覆盖新配置queryParams: {  // 针对嵌套对象特殊处理...this.config?.queryParams,...config?.queryParams},};}private scheduleReconnect(): void {const {autoReconnect = false,maxRetries = 5,retryInterval = 2000,backoffMultiplier = 2} = this.config;if (!autoReconnect) return;if (this.retryCount >= maxRetries) {console.warn('SSE max retries reached');this.config.onError?.(new Error('Max reconnect attempts reached'));return;}const delay = retryInterval * Math.pow(backoffMultiplier, this.retryCount);console.log(`Retry #${this.retryCount + 1} in ${delay}ms`);this.reconnectTimer = setTimeout(() => {this.retryCount++;this.connect();}, delay);}private buildUrl(): string {const { url, queryParams } = this.config;return queryParams? `${url}?${new URLSearchParams(queryParams)}`: url;}connect(): void {if (this.isActive) return;try {this.isActive = true;const url = this.buildUrl();this.eventSource = new EventSource(url, {withCredentials: this.config.withCredentials});this.eventSource.onopen = (event) => {console.log('SSE connection established', event);};this.eventSource.onmessage = ({ data }) => {this.config.onMessage?.(data);this.retryCount = 0;if (this.reconnectTimer) {clearTimeout(this.reconnectTimer);this.reconnectTimer = null;}};//解答完一次为什么都会报错this.eventSource.onerror = (event) => {console.error('SSE connection error:', event);this.close();this.config.onError?.(new Error('Connection failed'));this.scheduleReconnect(); // 自定义重连};this.eventSource.addEventListener('close', () => {this.close();// console.log("close")this.config.onEnd?.("SSE connection close");});} catch (error) {this.close();this.config.onError?.(error instanceof Error ? error : new Error('Connection error'));}}close(): void {if (this.eventSource) {this.eventSource.close();this.eventSource = null;this.isActive = false;}if (this.reconnectTimer) {clearTimeout(this.reconnectTimer);this.reconnectTimer = null;}}// 手动重新连接(如果需要)reconnect(): void {this.close();this.connect();}
}

✅ Mermaid 类图

EventSourceWrapper
-eventSource: EventSource
-config: EventSourceConfig
-isActive: boolean
-retryCount: number
-reconnectTimer: Timeout
-static instance: EventSourceWrapper
+constructor(config: EventSourceConfig)
+connect()
+close()
+reconnect()
+getConfig()
+setConfig(config: Partial)
+static getESWrapperInstance()
-buildUrl()
-scheduleReconnect()
EventSourceConfig
+url: string
+queryParams?: Record<string, string>
+withCredentials?: boolean
+autoReconnect?: boolean
+maxRetries?: number
+retryInterval?: number
+backoffMultiplier?: number
+onMessage?:(data)
+onError?:(error)
+onEnd?:(msg: string)

✅ 面试官可能问的问题及回应建议

面试官提问应答建议
为什么使用单例?保证全局只有一个连接实例,避免多个 SSE 并发浪费资源或产生冲突。
错误处理怎么做的?onerror 中做了关闭连接和触发回调,并可通过 reconnect() 手动重连。
如何支持动态传参?通过 setConfig 合并旧配置,支持 queryParams 动态更新。
线程安全问题?由于前端 JS 是单线程的,不涉及并发修改,设计上已满足需求。

MessageServiceClass

你这段代码是一个非常清晰、职责明确的封装,负责处理“用户发送消息 -> 通过 SSE 获取 AI 响应 -> 流式展示结果”的完整流程。


// src/services/MessageService.ts
import { v4 as uuidv4 } from 'uuid';
import {ContentType} from "../type/Info.ts";
import {EventSourceWrapper} from "./EventSourceWrapper.ts";
import {ChatMessage} from "../type/EventSourceConfig.ts";type MessageHandler = {setHistoryContent: (updater: (prev: ChatMessage[]) => ChatMessage[]) => void;setContent: (content: string) => void;setStreamId: (id: string | null) => void;esInstance: EventSourceWrapper; // SSE连接配置类型需根据实际SDK定义
};export class MessageService {private botMessageId: string | null = null;constructor(private handler: MessageHandler) {}public send = async (content: string) => {if (!this.validateInput(content)) return;this.addUserMessage(content);this.prepareBotResponse();try {await this.establishSSEConnection(content);} catch (error) {this.handleError(error as Error);} finally {this.cleanup();}};private validateInput = (content: string): boolean => {const trimmed = content.trim();if (!trimmed) {console.warn('空消息被拦截');return false;}return true;};private addUserMessage = (content: string) => {this.handler.setHistoryContent(prev => [...prev,{ id: uuidv4(), author: 'User', text: content }]);};private prepareBotResponse = () => {this.botMessageId = uuidv4();this.handler.setHistoryContent(prev => [...prev,{id: this.botMessageId,author: 'Assistance',text: 'AI正在思考中...',isStreaming: true}]);this.handler.setStreamId(this.botMessageId);};private establishSSEConnection = (content: string) => {return new Promise<void>((resolve, reject) => {try {this.handler.esInstance.setConfig({onMessage: (chunk: string) =>{this.handleChunk(chunk);}});this.handler.esInstance.setConfig({queryParams:{content,contentType: ContentType.TXT}});this.handler.esInstance.setConfig({onEnd: (arg) => {console.log(arg)resolve();}})this.handler.esInstance.connect();this.handler.setContent('');} catch (error) {reject(error);}});};private handleChunk = (chunk: string) => {if (!this.botMessageId) return;this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? {...msg,text: msg.text === 'AI正在思考中...'? chunk: msg.text + chunk.replace(/\\n/g, '\n')}: msg));};private handleError = (error: Error) => {console.error('消息服务错误:', error);if (!this.botMessageId) return;this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? { ...msg, text: '请求失败,请重试', isStreaming: false }: msg));};private cleanup = () => {this.handler.setStreamId(null);this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? { ...msg, isStreaming: false }: msg));this.botMessageId = null;};
}

✅ 逻辑梳理

整个 MessageService 类的工作流程如下:

🌟 1. 用户输入消息后,触发 send(content)

  1. validateInput(content):过滤掉空消息。

  2. addUserMessage(content):将用户消息添加到历史记录。

  3. prepareBotResponse():生成一个唯一 bot 消息 ID,先展示 "AI正在思考中..." 的占位消息,并标记为 isStreaming: true

  4. establishSSEConnection(content):启动 SSE 连接:

    • 设置 onMessage 处理流式片段(chunk)
    • 设置 onEnd 处理结束回调
    • 设置 queryParams 作为请求参数
    • 调用 connect() 启动事件源
  5. handleChunk(chunk):每次服务端发送数据片段,就更新 AI 响应内容。

  6. 若出错,handleError() 会将 bot 消息内容替换为“请求失败,请重试”。

  7. 最后调用 cleanup():关闭流式标记、重置状态。


🧠 一句话向面试官介绍这个类

我封装了一个 MessageService 类,专门用于管理用户消息的发送与 AI 响应的流式展示。它通过注入的 handler 操作 UI 层状态,并结合我封装的 EventSourceWrapper 实现稳定的 SSE 长连接,支持自动重连、状态清理与错误处理等能力。整个消息流转过程被拆分为输入校验、消息插入、连接建立、数据处理、异常兜底、连接回收六个步骤,逻辑清晰、职责单一,有利于维护和扩展。


📈 Mermaid 类图

MessageService
-botMessageId: string
-handler: MessageHandler
+constructor(handler: MessageHandler)
+send(content: string)
-validateInput(content: string)
-addUserMessage(content: string)
-prepareBotResponse()
-establishSSEConnection(content: string)
-handleChunk(chunk: string)
-handleError(error: Error)
-cleanup()
EventSourceWrapper
ChatMessage
+id: string
+author: string
+text: string
+isStreaming?: boolean
MessageHandler
+esInstance: EventSourceWrapper
+setHistoryContent(updater)
+setContent(content)
+setStreamId(id)

在这里插入图片描述

✅ 亮点总结(面试加分点)

亮点说明
状态分离MessageService 不直接操作 UI,而是通过 MessageHandler 依赖注入,解耦逻辑与视图。
渐进式流处理支持服务端分片返回(chunk)消息,实时拼接展示。
可拓展性强错误处理、连接状态、Bot 消息标记都独立封装,便于扩展如中断续传、撤回、重试等功能。
复用性好配合封装的 EventSourceWrapper,可以在多个模块中复用消息推送逻辑。

相关文章:

  • Spring Boot的Security安全控制——应用SpringSecurity!
  • 关于脉冲功率技术的认识
  • 子集筛选(Select by Data Index)组件研究
  • 贝塞尔曲线的切矢量
  • Java事务隔离问题详解:脏读、不可重复读与幻读(含解决方案)
  • 【算法 day03】LeetCode 203.移除链表元素 | 707.设计链表 | 206.反转链表
  • 【Elasticsearch】文档(一):新增 删除
  • db2主从同步 逻辑复制 APPLY_THROTTLE参数
  • 【CompletableFuture】基础Future (一)
  • 车载诊断框架 --- TCP window size设置
  • Kubernetes架构解析
  • Rviz2中,在rviz和launch文件中都需要配置urdf文件,二者作用上的区别?
  • 【工具教程】如何批量识别大量图片的文字并重命名图片,图片文件批量文件识别改名的详细操作步骤和注意事项
  • 水果商城管理系统笔记
  • [深度学习]目标检测YOLO v3
  • C语言状态机:从入门到精通
  • 英语~四级CET4考试——入栏需看
  • Chapter12-API testing
  • 「Linux文件及目录管理」文件及目录操作类命令
  • 【Zephyr 系列 26】跨平台测试框架设计:CLI + 自动脚本 + OTA 校验一体化方案
  • 公司做网站多/开发网站的流程是
  • 怎么做饲料电商网站/广州百度竞价外包
  • 福建seo网站/朝阳网站建设
  • 成都小程序系统定制开发/网站优化比较好的公司
  • 宣城网站推广/十大搜索引擎排行榜
  • 哪个网站可以做平面兼职/营销型网站建设多少钱