当前位置: 首页 > news >正文

49、发起流式请求获取回答

1、 该方法用于发起一个流式的 POST 请求,并处理服务器响应。下面逐行解释这段代码:

  async postEventStream(url: string, body: Record<string, any>, 
methods: StreamMethods, config: RequestConfig) {//new Headers() 创建一个新的 Headers 对象,用于存储请求头信息。const headers = new Headers();let undheaders.set('X-CSRF-TOKEN', und)headers.set('Content-Type', 'application/json')headers.set('Accept', 'application/json, text/plain, */*')headers.set('Cookie', 'SESSION=603feaca-fa9f-4b61-9444-b96620fb3eaf;JSESSIONID=AF3489505A1AAE85600A593A85F1A0CE')// 如何输出查看 headers 的值//headers.entries() 返回一个迭代器,用于遍历 Headers 对象中的所有键值对。for (const [key, value] of headers.entries()) {console.log(`%c${key}:`, 'color: blue;', value); // 高亮键名[3](@ref)}await fetch(this.baseURL + url, {method: 'POST',headers: {'X-CSRF-TOKEN': und,'Content-Type': 'application/json','Accept': 'application/json, text/plain, */*'},body: JSON.stringify(body),...config,}).then(async (res) => {if (res.ok) {await this.handleStream(res, methods)}else {throw new Error('服务器响应异常')}}).catch((err) => {methods.onError?.(err)})}

 2、这部分代码定义了 handleStream 方法,其主要功能是处理流式响应体。

流式响应指的是服务器不会一次性把所有数据返回,而是分多次、持续地发送数据,这种方式常用于实时数据传输场景,比如聊天机器人的实时回复。下面为你简单总结这个方法的核心工作流程:

- 准备工作

- 获取响应体的读取器,用于逐块读取二进制数据。
- 创建解码器,把读取到的二进制数据转换为字符串。
- 初始化一个缓冲区,用来存储可能不完整的数据块。
- 从传入的参数中解构出 onAdd 、 onEvent 和 onFinish 这三个回调函数,后续会在不同情况下调用它们。
- 循环读取数据 :

- 进入一个递归函数,不断从响应体读取数据块,直到数据读取完毕。
- 每读取一个数据块,就用解码器将其转为字符串并添加到缓冲区。
- 数据拆分与处理 :

- 按换行符把缓冲区中的字符串拆分成多行。
- 保留最后一行,因为它可能不完整,等待下一次读取数据时补充完整。
- 遍历拆分后的每一行数据,只处理以 data: 开头且不是 data: [DONE] 的行。
- 数据解析与格式化 :

- 去掉每行数据前面的 data: 前缀和多余空格,得到 JSON 格式的字符串。
- 尝试把 JSON 字符串解析成 JavaScript 对象。
- 如果解析成功且对象的 event 字段为 message ,就提取其中的 answer 内容,判断数据是新增数据( add 事件)还是结束数据( finish 事件)。
- 把处理后的数据格式化成特定结构,存储到一个数组里。
- 调用回调函数 :

- 把格式化后的数据里的文本内容合并起来,调用 onAdd 回调函数,通知外部有新数据到来。
- 调用 onEvent 回调函数,传递最后一条处理后的数据和所有格式化数据,让外部可以处理事件相关逻辑。
- 如果遇到结束事件,调用 onFinish 回调函数,通知外部流式数据传输结束。

 async handleStream(response: Response, methods: StreamMethods) {//获取响应体的读取器,用于逐块读取数据,用于从响应体里逐块读取二进制数据。const reader = response.body!.getReader();//将二进制数据解码为字符串。const decoder = new TextDecoder();//用于存储未完整的文本数据,因为数据可能是分块传输的let buffer = ''const { onAdd, onEvent, onFinish } = methods;//定义递归读取数据的函数async function readData() {//异步读取一块数据,返回一个包含 done 和 value 的对象。 done 表示是否读取完所有数据, //value 是读取到的二进制数据。const { done, value } = await reader.read()if (done)return// 将二进制数据解码为字符串,并将其添加到 buffer 中buffer += decoder.decode(value, { stream: true });// 按换行符拆分,最后一行可能是不完整的,保留到 buffer 中const lines = buffer.split('\n');//将最后一行数据从 lines 数组中取出并赋值给 buffer ,因为最后一行可能不完整。buffer = lines.pop() || '';const formattedResults = [];//遍历每行数据并解析for (const line of lines) {// 只处理以 "data:" 开头且不为 "[DONE]" 的行if (line.startsWith('data:') && line.trim() !== 'data: [DONE]') {// 去除所有 "data:" 前缀及空白字符const jsonStr = line.replace(/^(?:data:\s*)+/, '').trim();// 如果 jsonStr 为空,则跳过当前循环if (jsonStr === '') {continue}try {const parsed = JSON.parse(jsonStr)console.log(parsed);if(parsed.event === 'message'){// 处理 parsed 数据,计算出描述内容及事件类型let desc = ''let type = 'text' // 默认为文本let computedEvent = 'add'if (parsed && parsed.answer) {desc += parsed.answer}// 如果存在 finish_reason,则认为是结束事件if (!parsed.answer || parsed.answer === '') {computedEvent = 'finish'}// 构造传给 onEvent 的数据格式:data 为 JSON 字符串,解析后应包含 desc 字段const formatted = {event: computedEvent,data: JSON.stringify({ desc, type, text: desc }),...parsed}formattedResults.push(formatted)// 如果是 finish 事件,可立即调用 onFinishif (computedEvent === 'finish') {onFinish?.(JSON.stringify({ desc, type, text: desc }))}}}catch (error) {console.error('JSON 解析错误:', error, jsonStr)}}}// 将本次所有解析的内容合并用于 onAdd 回调const accumulatedContent = formattedResults.reduce((acc, item) => {const parsedData = JSON.parse(item.data)return acc + (parsedData.desc || '')}, '')if (accumulatedContent) {onAdd?.(accumulatedContent, formattedResults)}// 调用 onEvent,传入最后一条事件以及所有格式化后的结果if (formattedResults.length) {onEvent?.(formattedResults[formattedResults.length - 1], formattedResults)}await readData()}await readData()}

3、 

export function fetchStreamText(data: Record<string, any> | undefined,onAdd: (str: string) => void,onFinish: (str: string) => void,onError: (err: Error) => void,signal?: AbortSignal,
) {if (!data?.question) {throw new Error('Question is required')}return request.postEventStream('/reModuleHomePage/chat-messages-stream', {inputs: {},query: data.question,response_mode: "streaming",conversation_id: data.conversation_id,user: data.user,agentId: data.agentId,files: data.files}, { onAdd, onFinish, onError }, { signal })
}

4、getAnswer 方法主要用于处理获取流式回答的逻辑。它会发起一个流式请求,对服务器返回的流式数据进行解析和处理,同时支持中途停止请求。 

 async getAnswer(msgAnswer: MsgAnswer) {console.log(msgAnswer,'msgAnswer')// 省略不必要的变量//创建 AbortController 实例,用于后续取消请求const abortController = new AbortController();//为 currentSession 的 stopReplyFn 赋值,以便在外部调用时能取消请求。this.currentSession!.stopReplyFn = () => {abortController.abort()}// 初始化 msgAnswer 的相关属性,表示开始获取回答。msgAnswer.result = { text: '', type: 'text', id: '', message_id: '', task_id: '', conversation_id: '' }msgAnswer.time = getTime()msgAnswer.generating = truemsgAnswer.loading = true// 也可以在 msgAnswer 上存一个已解析好的 chunks 数组,或者// 让前端组件使用 msgAnswer.result!.text 的方式展示也行// 示例:在 msgAnswer 上挂一个 parsedChunks//若 msgAnswer 没有 parsedChunks 属性,则初始化该属性为一个空数组,//用于存储解析后的块数据。if (!(msgAnswer as any).parsedChunks) {(msgAnswer as any).parsedChunks = []}// 判断文本中是否包含###,若包含就当 markdown 处理,否则当普通textconst convertToTextOrMarkdownChunk = (text: string) => {const isMarkdown = text.includes('###')return {type: isMarkdown ? 'markdown' : 'text',content: text,}}// parseOneChunk:只解析开头是否能构成一个完整 <data> / <chart> 块,或纯文本/markdown 块// 若能返回 {parsed, remaining},否则返回 nullconst parseOneChunk = (text: string) => {// 正则同时捕捉 <data>...</data>、<chart>...</chart>、<sql>...</sql>const tagRegex = /<(data|chart|sql)>([\s\S]*?)<\/\1>/const match = tagRegex.exec(text)if (!match) {// 如果没匹配到,可能是纯文本或未完整标签const potentialTagIndex= text.includes('<data>')|| text.includes('<chart>')|| text.includes('<sql>')// 如果已经出现任意标签开头,但没有完整闭合,说明还没接收完,等下次if (potentialTagIndex) {return null}// 否则全部作为文本if (!text)return nullreturn {parsed: convertToTextOrMarkdownChunk(text),remaining: '',}}// 如果标签前有文本,就先拆出来当文本/markdown 块if (match.index > 0) {const textBeforeTag = text.substring(0, match.index)if (textBeforeTag.trim()) {return {parsed: convertToTextOrMarkdownChunk(textBeforeTag),remaining: text.substring(match.index),}}}// 匹配到的整个标签,如 <data>{"x":1}</data>const wholeTag = match[0]// "data" / "chart" / "sql"const tagType = match[1]// 标签内的内容const contentString = match[2]let chunkif (tagType === 'data' || tagType === 'chart') {// 对 data/chart 尝试 JSON.parsetry {const content = JSON.parse(contentString)chunk = { type: tagType, content }}catch {// JSON 不完整,等待下一波数据return null}}else if (tagType === 'sql') {// 假设 sqlContent 是 match[2] 的原文const codeBlock = `\`\`\`sql\n${contentString}\n\`\`\``chunk = { type: 'markdown', content: codeBlock }}// 计算标签后的剩余字符串const nextPos = match.index + wholeTag.lengthconst remaining = text.substring(nextPos)return { parsed: chunk, remaining }}// handleStreamData:每次收到后端的一行文本,就调这里做增量解析const handleStreamData = (newText: string) => {// 1. 先把 newText 累加到 buffer(msgAnswer as any)._unparsedBuffer += newText// 2. 尝试不断从 buffer 里解析块let chunkdo {chunk = parseOneChunk((msgAnswer as any)._unparsedBuffer)if (chunk) {// 2.1 解析成功,则更新 buffer、并把解析出的 chunk 加入 msgAnswer(msgAnswer as any)._unparsedBuffer = chunk.remaining;(msgAnswer as any).parsedChunks.push(chunk.parsed)}} while (chunk)}// ---------------------------------------------------------------------// 开始发起流式请求const streamAbortController = new AbortController()let uploadFiles =  this.questionInputFiles.length > 0 ? this.questionInputFiles.map((item) => ({// ...item,type: item.type,transfer_method: 'local_file',upload_file_id: item.fileId})) : []fetchStreamText({question: msgAnswer.question,sessionId: this.currentSessionId,files: uploadFiles,agentId: ChatInfoObject.agentId,user: ChatInfoObject.userId,conversation_id: this.currentChatId},// 第三个参数: 每次后端返回一行新数据就会调用(str, formattedResults) => {// 调用 handleStreamData 做增量解析handleStreamData(str)// 如果你仍然需要在 msgAnswer.result!.text 里拼接全部文本// (比如回退兼容旧逻辑),可以保留这一句msgAnswer.result!.text += strmsgAnswer.result!.id = formattedResults[0].idmsgAnswer.result!.message_id = formattedResults[0].message_idmsgAnswer.result!.task_id = formattedResults[0].task_idmsgAnswer.result!.conversation_id = formattedResults[0].conversation_idthis.currentChatId = formattedResults[0].conversation_id},// 第四个参数: 流式结束(包括自然结束或报错)时执行() => {msgAnswer.loading = falsemsgAnswer.generating = falsethis.scrolledBottom = truethis.currentSession!.stopReplyFn = undefined},// 第五个参数: 流式错误(err) => {msgAnswer.loading = falsemsgAnswer.generating = falsethis.scrolledBottom = truethis.currentSession!.stopReplyFn = undefined// message.error('流式请求失败')},// 传递 AbortSignal,用于中途取消流streamAbortController.signal,).then(() => {}).catch((err) => {})// 设置stopReplyFn,用于在外部想中断请求时调用this.currentSession!.stopReplyFn = () => {streamAbortController.abort()msgAnswer.generating = falsemsgAnswer.loading = false}this.scroll2Bottom?.(true)},

相关文章:

  • Jarpress 开源项目重构公告
  • Java设计模式之观察者模式:从入门到架构级实践
  • 安卓基础(SQLite)
  • 设计模式之状态模式:优雅管理对象行为变化
  • 最简单的使用SDL2 播放原始音频数据程序
  • KrillinAI:视频跨语言传播的一站式AI解决方案
  • js原型链污染
  • 使用 LLaMA-Factory 对 DeepSeek R1进行微调教程
  • Docker--Docker镜像原理
  • PHP4 Session定制与使用指南
  • 部署LLaMA Factory,及快速使用
  • 3DGS之渲染管线
  • 天元证券|8家汽车零部件上市公司一季度业绩预喜
  • [特殊字符] LoRA微调大模型实践:从MAC到Web的全流程指南
  • SGFormer:卫星-地面融合 3D 语义场景补全
  • 大模型预标注和自动化标注在OCR标注场景的应用
  • http、https、TLS、证书原理理解,对称加密到非对称加密问题,以及对应的大致流程
  • Linux驱动开发进阶(八)- GPIO子系统BSP驱动
  • 【Qt】初识Qt
  • 使用Python+xml+shutil修改目标检测图片和对应xml标注文件
  • 苹果Safari浏览器上的搜索量首次下降
  • 技术派|伊朗展示新型弹道导弹,美“萨德”系统真的拦不住?
  • 常州市委原常委、组织部部长陈翔调任江苏省民宗委副主任
  • 保利发展前4个月销售额约876亿元,单月斥资128亿元获4个项目
  • 首家股份行旗下AIC来了,兴银金融资产投资有限公司获批筹建
  • 中俄领导人将讨论从俄罗斯经蒙古至中国天然气管道项目?外交部回应