当前位置: 首页 > news >正文

AI开发 | 基于FastAPI+React的流式对话

文章目录

  • 应用背景
  • 技术选型
    • 后端技术栈
    • 前端技术栈
    • 数据库
    • 开发工具
  • 工程结构
  • 关键技术:SSE流式对话实现
    • 后端实现代码
    • 前端实现代码
  • 快速开始
    • 环境准备
    • 安装步骤
    • 数据库初始化
    • 启动服务
    • 访问应用
  • 总结


应用背景

近期公司持续收到垂直领域的AI对话系统开发需求,典型场景包括法律咨询助手外贸流程顾问等应用。为验证技术可行性并建立实施框架,本文核心目标在于实现:

  1. 大模型统一接入

    • 标准化接口对接OpenAI、DeepSeek等主流大模型
    • 实时模型热切换能力演示(运行时无中断切换)
  2. 高效流式对话

    • 基于SSE(Server-Sent Events)的流式传输协议
    • 前端动态渲染优化(逐字输出的打字机效果)
  3. 全周期对话管理

    • 对话历史自动持久化存储(MySQL事务保障)
    • 完整上下文追溯能力(历史消息即时调取)

技术选型

后端技术栈

​​FastAPI​​:高性能异步Web框架,提供自动API文档生成和高效IO处理能力
​​LangChain​​:LLM应用开发框架,提供模型抽象层和对话链管理
​​SQLAlchemy​​:Python ORM工具,支持多种数据库后端
​​PyMySQL​​:MySQL数据库驱动
​​Python-Jose​​:JWT认证实现,保障API安全
​​Pydantic​​:数据模型验证和序列化工具

前端技术栈

​​React​​:构建用户界面的主流库,支持组件化开发
​​Redux Toolkit​​:状态管理工具,简化全局状态管理
​​Ant Design​​:企业级UI组件库,提供丰富的预制组件
​​Axios​​:HTTP客户端,处理API请求
​​React Router​​:前端路由管理

数据库

​​MySQL 5.7+​​:关系型数据库,存储用户信息、对话记录等结构化数据

开发工具

​​Node.js 16+​​:前端运行环境
​​Python 3.8+​​:后端运行环境
​​nvm​​:Node.js版本管理工具
​​uvicorn​​:ASGI服务器,用于运行FastAPI应用

工程结构

llm-chat/
├── backend/                # 后端服务
│   ├── api/                # API路由
│   │   ├── admin.py        # 后台管理相关接口
│   │   ├── auth.py         # 认证相关接口
│   │   └── chat.py         # 对话相关接口
│   ├── config/             # 配置管理
│   │   ├── config.py       # 应用配置
│   │   └── database.py     # 数据库配置
│   ├── models/             # 数据库模型定义
│   ├── schemas/            # Pydantic数据模型
│   ├── utils/              # 工具函数
│   │   ├── auth.py         # 认证工具
│   │   └── llm.py          # LLM管理工具
│   └── main.py             # 应用入口
│
├── frontend/               # 前端应用
│   ├── src/
│   │   ├── components/     # 可复用UI组件
│   │   ├── pages/          # 页面组件
│   │   ├── services/       # API服务封装
│   │   ├── store/          # Redux状态管理
│   │   ├── styles/         # 样式文件
│   │   ├── App.js          # 应用根组件
│   │   └── index.js        # 入口文件
│   └── package.json        # 依赖管理

关键技术:SSE流式对话实现

(1) 代码对接的是DeepSeek模型,刚开始按照LangChain的方式走非流式对接没有太大问题,但是直接换成流式输出时问题就比较多了,所以参考DeepSeek官方的示例(采用的是OpenAI的SDK)就调通了。
(2)注意流式SSE走的API请求应该是GET请求
(3) MVVM框架如React或者Vue使用代理时,要注意是否支持SSE。(默认是不支持的)

后端实现代码

@router.get("/stream")
async def generate(conversation_id: str = None,model_name: str=None,message: str=None,current_user: User = Depends(get_current_user),db: Session = Depends(get_db),llm_manager: LLMManager=Depends(get_llm_manager)
) -> StreamingResponse:"""流式生成回复"""try:# 获取对话历史history = []# === 1. 处理对话创建 ===#注意:如果conversation_id不存在或者为空,则创建一个新的对话if not conversation_id:# 创建新对话title = message[:50] + "..." if message else "新对话"new_conversation = Conversation(user_id=current_user.id,title=title,model_name=model_name)db.add(new_conversation)db.flush()  # 立即刷新获取IDdb.commit()conversation_id = new_conversation.idelse:# 从数据库获取历史消息history_messages = (db.query(Message).filter(Message.conversation_id == conversation_id).order_by(Message.created_at).all())history = [{"role": msg.role, "content": msg.content}for msg in history_messages]#保存用户消息user_message = Message(conversation_id=conversation_id,role="user",content=message)db.add(user_message)db.commit()# 创建生成器函数async def generate_stream():async for chunk in llm_manager.generate_response(model_name=model_name,prompt=message,history=history):yield f"data: {chunk}\n\n"yield "data: [DONE]\n\n"# === 4. 通过中间件捕获流结束后保存助手消息 ===async def response_generator():"""带后处理功能的响应生成器"""# 创建临时容器存储完整回复full_response = []# 迭代流式输出async for chunk in generate_stream():full_response.append(chunk)yield chunk# 流结束后保存助手消息到数据库assistant_content = "".join(full_response).replace("data: [DONE]\n\n", "").replace("data: ", "").replace("\n\n", "")assistant_content = assistant_content.strip()  # 清理多余空格# 排除结束标志后保存有效内容if assistant_content:assistant_msg = Message(conversation_id=conversation_id,role="assistant",content=assistant_content)db.add(assistant_msg)db.commit()print(f"助手消息已保存,长度: {len(assistant_content)}")return StreamingResponse(response_generator(),media_type="text/event-stream",headers={"Cache-Control": "no-cache","X-Conversation-ID": str(conversation_id)  # 辅助头部})except Exception as e:raise HTTPException(status_code=500, detail=str(e))# llm_manager工具类实现的大模型对话async def generate_response(self,model_name: str,prompt: str,history: Optional[List[Dict[str, str]]] = None,system_prompt: Optional[str] = None) -> AsyncGenerator[str, None]:"""生成回复"""# 构建消息列表messages = []# 添加系统提示if system_prompt:messages.append({"role": "system", "content": system_prompt})# 添加历史消息if history:messages.extend(history)# 添加当前问题messages.append({"role": "user", "content": prompt})# 根据模型名称获取模型信息model= self.get_model(model_name)if not model:raise ValueError(f"未找到模型: {model_name}")#创建 OpenAI 客户端实例client = AsyncOpenAI(api_key=model.configuration["api_key"], base_url=model.configuration["base_url"])try:# 创建流式响应# 这里假设使用 OpenAI 的流式接口response = await client.chat.completions.create(model=model_name,messages=messages,stream=True)buffer = ""# 使用 OpenAI 的流式接口async for chunk in response:if content := chunk.choices[0].delta.content:yield content  # 直接返回整个文本片段except Exception as e:logger.error(f"Error generating response: {e}")yield f"Error: {str(e)}"    finally:# 确保资源清理await client.close()

前端实现代码

 // 发送消息const handleSendMessage = async () => {if (!messageInput.trim()) return;// 清除之前的响应setCurrentResponse('');try {dispatch(setLoading(true));let fullResponse = '';const messageId = Date.now(); // 使用时间戳作为临时ID// 记录用户消息const userMessage = {id: messageId,role: 'user',content: messageInput,conversation_id: currentConversation?.id};dispatch(addMessage(userMessage));setMessageInput(''); // 立即清空输入框// 创建暂存的响应消息const assistantMessage = {id: messageId + 1,role: 'assistant',content: '',conversation_id: currentConversation?.id};dispatch(addMessage(assistantMessage));// 开始流式请求const stream = await chatAPI.sendMessage({//要防止undefinedconversation_id: currentConversation?.id || '',message: messageInput,model_name: selectedModel},{onStream: (content) => {// 更新助手消息的内容dispatch(addMessage({...assistantMessage,content: content}));},onHeadersReceived: (headers) => {console.log("收到响应头:", headers);const newConversationId = headers.get('X-Conversation-ID');if (newConversationId) {console.log("收到新对话ID:", newConversationId);// 1. 更新当前对话状态dispatch(setCurrentConversation({id: newConversationId,isNew:true }));}}});// 保存stream引用以便清理eventSourceRef.current = stream;} catch (err) {message.error('发送消息失败');// 清理事件源eventSourceRef.current?.close();} finally {dispatch(setLoading(false));}};

快速开始

环境准备

安装Python 3.8+
安装Node.js 16+(推荐使用nvm管理)
安装MySQL 5.7+
获取DeepSeek API密钥

安装步骤

# 克隆项目
git clone https://gitee.com/cenho/icen.git
cd icen# 后端设置
cd backend
python -m venv .venv
source .venv/bin/activate  # Linux/Mac
# .\.venv\Scripts\activate  # Windows
pip install -r requirements.txt
cp .env.example .env
# 编辑.env文件配置数据库和API密钥# 前端设置
cd ../frontend
npm install

数据库初始化

创建MySQL数据库
配置backend/.env中的数据库连接信息

启动服务

# 启动后端
cd backend
python run.py# 启动前端
cd ../frontend
npm start

访问应用

前端界面:http://localhost:3000
API文档:http://localhost:8000/docs

总结

本文介绍了一个基于大语言模型的通用AI对话系统的设计与实现。该系统采用现代化的技术栈(FastAPI + React),支持用户认证和实时对话功能。通过清晰的工程结构和详细的快速开始指南,开发者可以轻松部署和扩展系统。未来可扩展方向包括知识库集成等功能,进一步提升系统的智能化水平和应用场景广度。

http://www.dtcms.com/a/289272.html

相关文章:

  • ChatIM项目语音识别安装与使用
  • 论文笔记: Holistic Semantic Representation for Navigational Trajectory Generation
  • 《计算机网络》实验报告四 TCP协议分析
  • 基于FPGA的多级流水线加法器verilog实现,包含testbench测试文件
  • Haproxy算法精简化理解及企业级高功能实战
  • Uniapp 纯前端台球计分器开发指南:能否上架微信小程序 打包成APP?
  • 专题 解空间的一种遍历方式:深度优先(Depth First)
  • 【unitrix】 6.9 减一操作(sub_one.rs)
  • Go语言的函数
  • qcow2磁盘虚拟机的使用
  • Spring Cloud Gateway 电商系统实战指南:架构设计与深度优化
  • Work SSD基础知识
  • 数列-冒泡排序,鸡尾酒排序
  • LINUX(三)文件I/O、对文件打开、读、写、偏移量
  • 什么是 ELK/Grafana
  • Cosmos:构建下一代互联网的“区块链互联网
  • roboflow使用教程
  • GaussDB 数据库架构师修炼(七) 安全规划
  • C51单片机学习笔记——定时器与中断
  • Image Processing 【Normlize和DeNormlize】
  • 【Linux】3. Shell语言
  • Oracle触发器:数据世界的“隐形守护者“
  • EXPLAIN 用法详解(表格)
  • 数据结构-线性表顺序表示
  • 【Linux内核模块】导出符号详解:模块间的“资源共享”机制
  • 子查询转连接查询
  • 30天打牢数模基础-模糊综合评价讲解
  • Vue基础(21)_Vue生命周期
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 用户注册实现
  • 《拆解WebRTC:NAT穿透的探测逻辑与中继方案》