LLM对话框项目 EventSource封装和MessageServiceClass流式展示封装
你这段代码实现了一个对浏览器原生 EventSource
的封装,主要目的是更方便地管理服务端事件推送(Server-Sent Events,简称 SSE),如自动构建 URL、配置管理、连接控制
等。
EventSourceAPI
✅ 面试时介绍建议(逻辑清晰)
面试官您好,我封装了一个
EventSourceWrapper
类,用于统一管理 SSE(Server-Sent Events)连接。这是一个单例模式实现,确保在整个应用生命周期中只有一个 SSE 实例。以下是它的几个核心设计点:
封装了一个EventSourceWrapper 类
,用于统一管理客户端与服务端之间的 SSE(Server-Sent Events)连接。这个类采用单例模式
,确保全局只存在一个长连接实例,避免重复连接的问题
。它支持配置参数的动态更新
(通过构造函数进行依赖注入
),自动构建带查询参数的连接 URL,并对 onopen、onmessage、onerror 和关闭事件做了统一的封装处理
。考虑到原生 SSE 的自动重连机制较为有限(不能监听重连次数
、不知道是否已经掉线 、不可自定义重连逻辑
等),我还设计了一个可选的自定义重连机制,支持最大重试次数、指数退避等策略
,使连接管理更加可控和健壮。此外,它还提供了 reconnect 接口,便于业务层在需要时手动触发重连。
1. 单例模式保证全局唯一性
- 类中通过静态方法
getESWrapperInstance
创建或复用唯一实例。 - 适用于聊天类应用中长连接唯一的场景,
避免资源浪费或重复连接
。
static getESWrapperInstance(): EventSourceWrapper
2. 配置灵活、支持动态更新
- 初始配置通过
构造函数
传入(依赖注入
),内部通过setConfig
实现对原配置的合并更新
(特别是queryParams
的深层合并)。
setConfig(config: Partial<EventSourceConfig>)
3. 自动构建连接 URL
- 封装了
buildUrl
方法,根据queryParams
自动生成合法 SSE 请求地址。
4. 事件监听器封装清晰
onopen
,onmessage
,onerror
,close
全部通过统一接口回调管理。- 错误处理逻辑健壮,
连接异常自动关闭
,并调用onError
通知业务层。 - 提供
reconnect
方法便于手动重连。
5. 易于拓展
- 可以按需添加
onRetry
,onReconnect
等回调,拓展性强。
//utils/EventSourceWrapper.ts
import {EventSourceConfig} from "../type/EventSourceConfig";export class EventSourceWrapper {private eventSource: EventSource | null = null;private config: EventSourceConfig;private isActive: boolean = false;private static instance:EventSourceWrapper|null=null;private retryCount = 0;private reconnectTimer: ReturnType<typeof setTimeout> | null = null;constructor(options: EventSourceConfig) {this.config = {withCredentials: false,...options};}static getESWrapperInstance():EventSourceWrapper{if(!this.instance) {this.instance = new EventSourceWrapper({url: "/dev-api/chatStream",queryParams: {content: "",contentType: "",},onError: (err) => {console.log('Error occurred:', err);}});}return this.instance;}getConfig():EventSourceConfig{return this.config;}//Partial 是 TypeScript 中的一个工具类型,用于创建一个新类型,其中所有属性都是可选的。setConfig(config: Partial<EventSourceConfig>): void {this.config = {...this.config, // 保留原有配置...config, // 覆盖新配置queryParams: { // 针对嵌套对象特殊处理...this.config?.queryParams,...config?.queryParams},};}private scheduleReconnect(): void {const {autoReconnect = false,maxRetries = 5,retryInterval = 2000,backoffMultiplier = 2} = this.config;if (!autoReconnect) return;if (this.retryCount >= maxRetries) {console.warn('SSE max retries reached');this.config.onError?.(new Error('Max reconnect attempts reached'));return;}const delay = retryInterval * Math.pow(backoffMultiplier, this.retryCount);console.log(`Retry #${this.retryCount + 1} in ${delay}ms`);this.reconnectTimer = setTimeout(() => {this.retryCount++;this.connect();}, delay);}private buildUrl(): string {const { url, queryParams } = this.config;return queryParams? `${url}?${new URLSearchParams(queryParams)}`: url;}connect(): void {if (this.isActive) return;try {this.isActive = true;const url = this.buildUrl();this.eventSource = new EventSource(url, {withCredentials: this.config.withCredentials});this.eventSource.onopen = (event) => {console.log('SSE connection established', event);};this.eventSource.onmessage = ({ data }) => {this.config.onMessage?.(data);this.retryCount = 0;if (this.reconnectTimer) {clearTimeout(this.reconnectTimer);this.reconnectTimer = null;}};//解答完一次为什么都会报错this.eventSource.onerror = (event) => {console.error('SSE connection error:', event);this.close();this.config.onError?.(new Error('Connection failed'));this.scheduleReconnect(); // 自定义重连};this.eventSource.addEventListener('close', () => {this.close();// console.log("close")this.config.onEnd?.("SSE connection close");});} catch (error) {this.close();this.config.onError?.(error instanceof Error ? error : new Error('Connection error'));}}close(): void {if (this.eventSource) {this.eventSource.close();this.eventSource = null;this.isActive = false;}if (this.reconnectTimer) {clearTimeout(this.reconnectTimer);this.reconnectTimer = null;}}// 手动重新连接(如果需要)reconnect(): void {this.close();this.connect();}
}
✅ Mermaid 类图
✅ 面试官可能问的问题及回应建议
面试官提问 | 应答建议 |
---|---|
为什么使用单例? | 保证全局只有一个连接实例,避免多个 SSE 并发浪费资源或产生冲突。 |
错误处理怎么做的? | onerror 中做了关闭连接和触发回调,并可通过 reconnect() 手动重连。 |
如何支持动态传参? | 通过 setConfig 合并旧配置,支持 queryParams 动态更新。 |
线程安全问题? | 由于前端 JS 是单线程的,不涉及并发修改,设计上已满足需求。 |
MessageServiceClass
你这段代码是一个非常清晰、职责明确的封装,负责处理“用户发送消息 -> 通过 SSE 获取 AI 响应 -> 流式展示结果”的完整流程。
// src/services/MessageService.ts
import { v4 as uuidv4 } from 'uuid';
import {ContentType} from "../type/Info.ts";
import {EventSourceWrapper} from "./EventSourceWrapper.ts";
import {ChatMessage} from "../type/EventSourceConfig.ts";type MessageHandler = {setHistoryContent: (updater: (prev: ChatMessage[]) => ChatMessage[]) => void;setContent: (content: string) => void;setStreamId: (id: string | null) => void;esInstance: EventSourceWrapper; // SSE连接配置类型需根据实际SDK定义
};export class MessageService {private botMessageId: string | null = null;constructor(private handler: MessageHandler) {}public send = async (content: string) => {if (!this.validateInput(content)) return;this.addUserMessage(content);this.prepareBotResponse();try {await this.establishSSEConnection(content);} catch (error) {this.handleError(error as Error);} finally {this.cleanup();}};private validateInput = (content: string): boolean => {const trimmed = content.trim();if (!trimmed) {console.warn('空消息被拦截');return false;}return true;};private addUserMessage = (content: string) => {this.handler.setHistoryContent(prev => [...prev,{ id: uuidv4(), author: 'User', text: content }]);};private prepareBotResponse = () => {this.botMessageId = uuidv4();this.handler.setHistoryContent(prev => [...prev,{id: this.botMessageId,author: 'Assistance',text: 'AI正在思考中...',isStreaming: true}]);this.handler.setStreamId(this.botMessageId);};private establishSSEConnection = (content: string) => {return new Promise<void>((resolve, reject) => {try {this.handler.esInstance.setConfig({onMessage: (chunk: string) =>{this.handleChunk(chunk);}});this.handler.esInstance.setConfig({queryParams:{content,contentType: ContentType.TXT}});this.handler.esInstance.setConfig({onEnd: (arg) => {console.log(arg)resolve();}})this.handler.esInstance.connect();this.handler.setContent('');} catch (error) {reject(error);}});};private handleChunk = (chunk: string) => {if (!this.botMessageId) return;this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? {...msg,text: msg.text === 'AI正在思考中...'? chunk: msg.text + chunk.replace(/\\n/g, '\n')}: msg));};private handleError = (error: Error) => {console.error('消息服务错误:', error);if (!this.botMessageId) return;this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? { ...msg, text: '请求失败,请重试', isStreaming: false }: msg));};private cleanup = () => {this.handler.setStreamId(null);this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? { ...msg, isStreaming: false }: msg));this.botMessageId = null;};
}
✅ 逻辑梳理
整个 MessageService
类的工作流程如下:
🌟 1. 用户输入消息后,触发 send(content)
-
validateInput(content)
:过滤掉空消息。 -
addUserMessage(content)
:将用户消息添加到历史记录。 -
prepareBotResponse()
:生成一个唯一 bot 消息 ID,先展示"AI正在思考中..."
的占位消息,并标记为isStreaming: true
。 -
establishSSEConnection(content)
:启动 SSE 连接:- 设置
onMessage
处理流式片段(chunk) - 设置
onEnd
处理结束回调 - 设置
queryParams
作为请求参数 - 调用
connect()
启动事件源
- 设置
-
handleChunk(chunk)
:每次服务端发送数据片段,就更新 AI 响应内容。 -
若出错,
handleError()
会将 bot 消息内容替换为“请求失败,请重试”。 -
最后调用
cleanup()
:关闭流式标记、重置状态。
🧠 一句话向面试官介绍这个类
我封装了一个
MessageService
类,专门用于管理用户消息的发送与 AI 响应的流式展示。它通过注入的handler
操作 UI 层状态,并结合我封装的EventSourceWrapper
实现稳定的 SSE 长连接,支持自动重连、状态清理与错误处理等能力。整个消息流转过程被拆分为输入校验、消息插入、连接建立、数据处理、异常兜底、连接回收六个步骤,逻辑清晰、职责单一,有利于维护和扩展。
📈 Mermaid 类图
✅ 亮点总结(面试加分点)
亮点 | 说明 |
---|---|
状态分离 | MessageService 不直接操作 UI,而是通过 MessageHandler 依赖注入,解耦逻辑与视图。 |
渐进式流处理 | 支持服务端分片返回(chunk)消息,实时拼接展示。 |
可拓展性强 | 错误处理、连接状态、Bot 消息标记都独立封装,便于扩展如中断续传、撤回、重试等功能。 |
复用性好 | 配合封装的 EventSourceWrapper ,可以在多个模块中复用消息推送逻辑。 |