fetch post请求SSE「eventsource-parser/stream」
随着AI越来越多的进入人们的生活,程序员难免会开发一些AI助手的工作 ,这里记录开发中遇到的一个坑
const response = await fetch('/sse', {method: 'POST',headers: {'Content-Type': 'text/event-stream'},body: {"user_id": 123}})
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
while (true) {const {value, done} = await reader.read();if (done) break;console.log('Received', value);
}
这样的代码,就可以完整的使用fetch POST请求到SSE的数据,reader.read() 会一直的循环读取
但是 这里需要自己解析:
得到的格式是:
data: {"event":"message","task_id":"544396bb-b898-467f-833a-11b8393b1064","id":"793f44db-a07d-4be7-96ec-73c08e131ce0","message":"","type":"","url":"","audio":"","answer":"**\n","created_at":1747021789,"conversation_id":"5549a44c-8087-4ac7-9084-af4bee7f481e","workflow_run_id":"","metadata":{"usage":{"prompt_tokens":0,"prompt_unit_price":"","prompt_price_unit":"","prompt_price":"","completion_tokens":0,"completion_unit_price":"","completion_price_unit":"","completion_price":"","total_tokens":0,"total_price":"","currency":"","latency":0},"retriever_resources":null},"from_variable_selector":["17424585111000","text"],"code":"","data":{"id":"","workflow_id":"","sequence_number":0,"node_id":"","node_type":"","title":"","index":0,"created_at":0,"finished_at":0,"status":"","error":"","elapsed_time":0,"predecessor_node_id":""}}
这不是一个对象,是一个字符串,并且如果没有读取完就处理,会出现,JSON截断的情况。
response.then(resp => {resp.data.on('data', data => {const lines = data.toString().split('\n').filter(line => line.trim() !== '')for (const line of lines) {const message = line.replace(/^data: /, '')if (message === '[DONE]') {res.write('data: DONE\n\n')res.end()return}const parsed = JSON.parse(message) // 此处会报错res.write(`data: ${parsed.choices[0].text}\n\n`)}})}).catch(err => {console.log(err)})
尝试了在 JSON.parse处 加上try catch 一旦报错 记录下当前行的值,拼接到下一行的头部。
然后在处理,这种情况理论上也是可行的,但是实际调试非常复杂,字符串分割总有看不见的问题。
eventsource-parser/stream作用是将sse接口返回的字符串转为对象且避免了debug断点时接口不间断返回的数据被塞到一个字符串的问题
是的 只需要读取的时候 改造一下代码
const reader = response.body.pipeThrough(new TextDecoderStream()).pipeThrough(new EventSourceParserStream()).getReader()
就可以在循环的时候直接读取到 每一行的对象数据