下流式接入ai
文章目录
- 前言
- 一、什么是下流式接入ai?
- 二、准备步骤
- 1.浏览器端如何接收“流” ---- fetch + ReadableStream
- 2.后端负责转发和流处理
- 3. Vue 前端实时显示
- 总结
前言
在我的项目中我准备接入ai来回答用户的问题以及对用户的健康状况进行基本总结等功能,而在这里我们一般使用下流式接入ai
一、什么是下流式接入ai?
- 下流&上流
在 AI 系统架构中,通常分成:
上游(Upstream):负责模型的训练、推理服务、算法优化等,是“AI 能力提供者”。
例如:OpenAI、百度文心、阿里通义、讯飞星火 等提供的模型 API。
下游(Downstream):负责把这些 AI 能力整合进具体的应用场景中,是“AI 能力使用者”。
例如:你用 GPT 接入一个健康管理系统的智能分析模块,这个系统就是“下游”。
2. 下流式接入ai
“下流式接入 AI”一般指:
不是自己训练模型,而是调用上游提供的现成大模型 API,将 AI 能力嵌入自己的应用中。
也就是我们使用别人训练好的ai我们并不去构建ai。
3. 流式和非流式
非流式就是等全部做好一次性输出,而流式就是边生成边输出,下流式可以实现超快展示,也是市面上的ai的展示方式,也就是在ai生成的过程就把它生成的部分逐渐展示出来。
二、准备步骤
1.浏览器端如何接收“流” ---- fetch + ReadableStream
fetch是在前端用来发起请求的,普通fetch拿到的是完整数据。
fetch('/api/something')
ReadableStream是流式展示数据的关键,因为ai返回的不是标准JSON而是分片数据,必须一点一点读,而ReadableStream是一个可以“边读边拿数据”的流。
他们俩组合在一起就可以实现一块一块地读取AI返回的内容
代码实例如下:
async function askAI() {const response = await fetch('/api/ai')const reader = response.body.getReader()const decoder = new TextDecoder()let result = ''while (true) {const { value, done } = await reader.read()if (done) breakconst chunk = decoder.decode(value)result += chunkconsole.log("AI 输出:", chunk)}console.log("最终结果:", result)
}
而这个过程可以大致概括为
fetch 请求 → response.body 是一个 ReadableStream → 用 reader.read() 循环读取 AI 流式返回的内容
就可以实现ai一点点渲染的效果
2.后端负责转发和流处理
前端无法直接访问ai密钥,所以后端要写一个Node服务器,这里采用Express。
后端需要做俩件事:
- 接收前端需求-prompt
- 向上游ai发起流式请求
- 将获得的数据以流的形式实时转发给ai
- 支持:取消、错误处理、降级(回退)、日志/计费统计 与 安全(不泄露 API KEY)。
而在前端与后端通信协议中通常采用SSE
代码实例:
这段代码可以实现 ---- 前端发 POST /api/ai/stream,后端向上游请求 stream:true,然后以 SSE(text/event-stream)将每个 chunk 发回前端
// server.js (Node 18+ 可以直接用 global fetch)
import express from 'express'
import rateLimit from 'express-rate-limit' // 简单限流
import bodyParser from 'body-parser'const app = express()
app.use(bodyParser.json())// 简单限流(按 IP)
app.use('/api/ai', rateLimit({ windowMs: 60_000, max: 30 }))const OPENAI_KEY = process.env.OPENAI_API_KEY
const controllers = new Map() // userId -> AbortControllerapp.post('/api/ai/stream', async (req, res) => {const { prompt, userId } = req.bodyif (!prompt) return res.status(400).json({ error: 'prompt required' })// 简单鉴权/校验(示例)if (!userId) return res.status(401).json({ error: 'unauthorized' })// 设置 SSE headersres.setHeader('Content-Type', 'text/event-stream')res.setHeader('Cache-Control', 'no-cache')res.setHeader('Connection', 'keep-alive')res.flushHeaders?.()// 创建可取消控制器const abortController = new AbortController()controllers.set(userId, abortController)try {// 调用上游(示例为 OpenAI 风格,注意不同服务细节不同)const aiRes = await fetch('https://api.openai.com/v1/chat/completions', {method: 'POST',headers: {'Authorization': `Bearer ${OPENAI_KEY}`,'Content-Type': 'application/json',},body: JSON.stringify({model: 'gpt-4o-mini',stream: true,messages: [{ role: 'user', content: prompt }],}),signal: abortController.signal,})if (!aiRes.ok) {// 读取错误响应并发送给前端,然后结束const txt = await aiRes.text()res.write(`event: error\ndata: ${JSON.stringify({ status: aiRes.status, message: txt })}\n\n`)return res.end()}// 兼容两种流格式:SSE 字符串(data: ...)或 raw text chunkconst reader = aiRes.body.getReader()const decoder = new TextDecoder()while (true) {const { done, value } = await reader.read()if (done) breakconst chunkStr = decoder.decode(value, { stream: true })// 上游可能已经是 SSE(带 data: ...),也可能是纯文本或JSON片段// 我们这里直接把原始 chunk 转发给前端作为 data// 前端可以再解析:如果是完整 JSON 或 data: 前缀则处理res.write(`data: ${chunkStr.replace(/\n/g, '\\n')}\n\n`) // 转义换行避免断行问题// optional: flush}// 通知完成res.write('data: [DONE]\n\n')res.end()} catch (err) {if (err.name === 'AbortError') {res.write('event: canceled\ndata: {}\n\n')res.end()} else {console.error('AI proxy error', err)res.write(`event: error\ndata: ${JSON.stringify({ message: 'upstream error' })}\n\n`)res.end()}} finally {controllers.delete(userId)}
})// 取消接口:前端在想要取消时调用
app.post('/api/ai/cancel', (req, res) => {const { userId } = req.bodyconst ctrl = controllers.get(userId)if (ctrl) {ctrl.abort()controllers.delete(userId)return res.json({ ok: true })}res.json({ ok: false, message: 'no running request' })
})app.listen(3000, () => console.log('Server on 3000'))
如果选用WebSocket作为前端和后端通信协议,则会实现更好的双方交互体验
3. Vue 前端实时显示
Vue在这里起到的作用是接收流式结果并展示。
<script setup>
import { ref } from "vue"const text = ref("")async function ask() {text.value = ""const response = await fetch('/api/ai', { method: 'POST' })const reader = response.body.getReader()const decoder = new TextDecoder()while (true) {const { value, done } = await reader.read()if (done) breaktext.value += decoder.decode(value)}
}
</script><template><button @click="ask">问 AI</button><pre>{{ text }}</pre>
</template>
这样就能实现常见ai的实时逐字输出的效果。
总结
在项目中接入ai会给项目增加很大的一个闪光点,这个下流式接入ai的方法我们都要掌握。
