SSE 流式响应实战:如何在 JavaScript 中处理 DeepSeek 流式 API
假设服务器发送的数据:
data: {"content": "Hello"}data: {"content": "World"}data: [DONE]
如图:

手动处理 SSE 格式,核心代码 processStream
上代码
<!DOCTYPE html>
<html lang="en"><head><meta charset="UTF-8" /><meta name="viewport" content="width=device-width, initial-scale=1.0" /><title>Document</title></head><body><divid="output"style="white-space: pre-wrap;border: 1px solid #ccc;padding: 10px;height: 200px;overflow-y: scroll;"></div><div id="status">状态: 进行中...</div><script>class DeepSeekStreamHandler {constructor(apiKey, options = {}) {this.apiKey = apiKey;this.baseURL =options.baseURL || "https://api.deepseek.com/chat/completions";this.onMessage = options.onMessage || (() => {});this.onComplete = options.onComplete || (() => {});this.onError = options.onError || (() => {});this.isStreaming = false;}// 发送消息并处理流式响应async sendMessage(messages, model = "deepseek-chat") {if (this.isStreaming) {throw new Error("已有流式请求在进行中");}this.isStreaming = true;let accumulatedContent = "";try {const response = await fetch(this.baseURL, {method: "POST",headers: {"Content-Type": "application/json",Authorization: `Bearer ${this.apiKey}`,},body: JSON.stringify({model: model,messages: messages,stream: true, // 关键:启用流式响应temperature: 0.7,max_tokens: 2048,}),});if (!response.ok) {throw new Error(`API 请求失败: ${response.status}`);}await this._processStream(response, (chunk) => {if (chunk.choices && chunk.choices[0].delta.content) {const content = chunk.choices[0].delta.content;accumulatedContent += content;this.onMessage({content: content,fullContent: accumulatedContent,chunk: chunk,});}});this.onComplete(accumulatedContent);} catch (error) {this.onError(error);} finally {this.isStreaming = false;}}// 处理流式数据async _processStream(response, onChunk) {// 1. 获取流阅读器 - 用于读取数据块// 因为响应体是流,需要阅读器来读取,Stream 不能直接读取,需要通过 getReader() 获取阅读器const reader = response.body.getReader();// 2. 创建文本解码器 - 用于二进制转文本const decoder = new TextDecoder();// 3. 缓冲区 - 存储不完整的数据,数据块可能被拆分,需要重组完整消息let buffer = "";try {while (true) {// 4. 读取数据块 - 核心步骤!const { value, done } = await reader.read(); // 读取一块数据if (done) break;// 5. 解码二进制数据为文本,value 是 Uint8Array 二进制数据buffer += decoder.decode(value, { stream: true }); // 解码这一块数据const lines = buffer.split("\n");buffer = lines.pop() || "";for (const line of lines) {const trimmed = line.trim();if (trimmed.startsWith("data: ")) {const data = trimmed.slice(6);if (data === "[DONE]") {return;}try {const parsed = JSON.parse(data);onChunk(parsed);} catch (e) {console.warn("解析失败:", data);}}}}} finally {reader.releaseLock(); // 释放资源}}// 停止流式传输stop() {this.isStreaming = false;}}</script><script>// 初始化const deepSeek = new DeepSeekStreamHandler("sk-your-api-key-here",{onMessage: (data) => {// 实时显示内容document.getElementById("output").innerHTML += data.content;console.log("实时内容:", data.content);},onComplete: (fullContent) => {console.log("完整回复:", fullContent);document.getElementById("status").textContent = "完成";},onError: (error) => {console.error("错误:", error);document.getElementById("status").textContent = "错误";},});// 发送消息const messages = [{ role: "user", content: "你好啊" }];deepSeek.sendMessage(messages);</script></body>
</html>
为什么没用 EventSource
认证限制:EventSource 不支持自定义 HTTP 头部
方法限制:只支持 GET 请求
灵活性:Fetch API + ReadableStream 更灵活
SSE基本使用参见文章
使用 EventSource 的问题
// ❌ 这样不行 - EventSource 不支持自定义 Authorization 头部
const eventSource = new EventSource('https://api.deepseek.com/chat/completions', {headers: {'Authorization': 'Bearer your-api-key' // ❌ 不支持!}
});// ❌ 也不支持 POST 请求
// EventSource 只能发起 GET 请求
如果 API 支持 GET + 查询参数认证
// 假设 DeepSeek 支持这种格式(通常不支持)
function connectDeepSeekWithEventSource(apiKey, message) {// 将消息和认证信息放在 URL 参数中const url = `https://api.deepseek.com/chat/completions?apiKey=${apiKey}&message=${encodeURIComponent(message)}&stream=true`;const eventSource = new EventSource(url);eventSource.onmessage = function(event) {try {const data = JSON.parse(event.data);if (data.choices && data.choices[0].delta.content) {const content = data.choices[0].delta.content;console.log('收到内容:', content);// 更新 UIdocument.getElementById('output').textContent += content;}} catch (error) {console.log('原始数据:', event.data);}};eventSource.addEventListener('error', function(event) {console.error('EventSource 错误:', event);});eventSource.addEventListener('done', function(event) {console.log('流结束');eventSource.close();});return eventSource;
}
实际解决方案:使用 Fetch API 的原因
// ✅ 使用 Fetch API 可以:
// 1. 设置自定义头部
// 2. 使用 POST 请求
// 3. 发送 JSON 数据
// 4. 更灵活的错误处理async function callDeepSeekAPI() {const response = await fetch('https://api.deepseek.com/chat/completions', {method: 'POST',headers: {'Content-Type': 'application/json','Authorization': 'Bearer your-api-key' // ✅ 支持自定义头部},body: JSON.stringify({model: 'deepseek-chat',messages: [{ role: 'user', content: 'Hello' }],stream: true})});// 手动处理 SSE 格式await processSSEStream(response, (chunk) => {console.log('收到数据:', chunk);});
}
// 处理 SSE 流的核心函数
async function processSSEStream(response, onChunk) {const reader = response.body.getReader();const decoder = new TextDecoder();let buffer = "";try {while (true) {const { value, done } = await reader.read();if (done) {console.log("流结束");break;}// 解码数据并添加到缓冲区buffer += decoder.decode(value, { stream: true });// 处理缓冲区中的完整事件const lines = buffer.split("\n");buffer = lines.pop() || ""; // 最后一行可能不完整,保留到下次for (const line of lines) {const trimmed = line.trim();if (trimmed.startsWith("data: ")) {const data = trimmed.slice(6);if (data === "[DONE]") {return;}try {const parsed = JSON.parse(data);onChunk(parsed);} catch (e) {console.warn("解析失败:", data);}}}}} finally {reader.releaseLock();}
}