基于FastAPI与LangChain的Excel智能数据分析API开发实践
本文将详细介绍如何使用FastAPI和LangChain构建一个支持流式响应的Excel智能数据分析API,实现对结构化数据的自然语言查询与对话式分析。
内容大纲
一、项目概述与核心功能
在现代数据驱动决策的环境中,让非技术用户能够通过自然语言与数据进行交互变得越来越重要。本项目开发了一个基于RESTful API的智能数据分析服务,具有以下核心功能:
- 自然语言查询: 用户可以使用日常语言提问关于Excel数据的问题
- 多会话管理: 支持多用户同时使用,各自维护独立的对话历史
- 流式响应: 提供真正的流式输出体验,大幅提升用户体验
- Excel数据支持: 支持读取和解析多工作表的Excel文件
- 智能代理: 基于LangChain构建的Pandas数据分析代理
二、技术架构与工具选择
本项目采用了以下技术栈,每项技术都承担着特定角色:
技术组件 | 版本要求 | 职责说明 |
---|---|---|
FastAPI | >=0.68.0 | 高性能Web框架,提供API服务 |
LangChain | >=0.12.0 | 智能代理和链式处理核心 |
Pandas | >=1.3.0 | DataFrame数据处理和分析 |
Uvicorn | >=0.15.0 | ASGI服务器,用于运行FastAPI |
Tongyi Qwen | - | 大语言模型,提供自然语言理解能力 |
技术选型理由:
- FastAPI:凭借其异步支持、自动文档生成和高性能特性,成为Python API开发的首选
- LangChain:提供了与大语言模型集成的标准化方式,简化了代理创建和管理
- Pandas:是Python数据分析的事实标准库,完美处理Excel文件
- 流式响应:通过Server-Sent Events(SSE)实现真正流式输出,提升用户体验
三、核心实现细节
3.1 FastAPI应用初始化
首先创建FastAPI应用实例,并设置元数据信息:
from fastapi import FastAPIapp = FastAPI(title="Excel Data Analysis API",description="基于Pandas DataFrame的数据分析API",version="1.0.0"
)
FastAPI基于Python类型提示自动生成API文档,支持OpenAPI标准,开发者可以通过/docs
和/redoc
端点访问交互式文档。
3.2 数据加载与代理初始化
项目启动时自动加载Excel数据并初始化LangChain代理:
@app.on_event("startup")
async def startup_event():"""应用启动时初始化模型和代理"""try:initialize_agent()print("模型和代理初始化成功")except Exception as e:print(f"初始化失败: {e}")raise
数据加载过程支持多工作表Excel文件,自动合并所有工作表到一个DataFrame中:
def initialize_agent():global llm, pandas_agent# 读取Excel文件中的所有工作表excel_file = pd.ExcelFile(path)all_sheets = {}for sheet_name in excel_file.sheet_names:all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)# 合并所有工作表df = pd.concat(all_sheets.values(), ignore_index=True)
这种方法确保了无论Excel文件的结构如何,都能完整地加载所有数据。
3.3 LangChain代理创建
使用LangChain的create_pandas_dataframe_agent
创建专门用于Pandas数据处理的AI代理:
pandas_agent = create_pandas_dataframe_agent(llm, # 通义千问模型实例df, # 合并后的DataFrameverbose=True, # 启用详细日志agent_type="zero-shot-react-description", # 代理类型allow_dangerous_code=True, # 允许执行代码max_iterations=5 # 最大迭代次数
)
这种代理结合了大语言模型的语言理解能力和Pandas的数据处理能力,能够理解自然语言查询并将其转换为DataFrame操作。
3.4 流式响应实现
为实现真正的流式输出,创建了自定义回调处理器:
class StreamingCallbackHandler(BaseCallbackHandler):"""自定义回调处理器,用于实现真正的流式输出"""def __init__(self):self.tokens = []self.finished = Falsedef on_llm_new_token(self, token: str, **kwargs: Any) -> None:"""当LLM生成新token时调用"""self.tokens.append(token)def on_llm_end(self, response: Any, **kwargs: Any) -> None:"""当LLM生成结束时调用"""self.finished = True
流式响应接口使用Server-Sent Events(SSE)技术:
@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):# ... 省略其他代码 ...if request.stream:return StreamingResponse(true_stream_response(full_input, session_history, request.session_id),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive",})
这种方式相比传统响应,能够实时地将生成的token发送给客户端,大幅减少用户感知的延迟。
3.5 会话管理机制
为实现多用户支持,实现了基于session_id的会话管理:
conversation_history = {}# 在聊天接口中维护会话历史
session_history = conversation_history.get(request.session_id, [])
session_history.append(request.message)
conversation_history[request.session_id] = session_history
还提供了清除历史记录的端点:
@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):"""清除指定会话的历史记录"""if session_id in conversation_history:del conversation_history[session_id]return {"message": f"会话 {session_id} 的历史记录已清除"}
这种设计允许不同用户或不同对话线程保持独立的上下文历史。
四、API端点设计
本项目实现了以下RESTful端点:
端点 | 方法 | 描述 | 参数 |
---|---|---|---|
/chat | POST | 主聊天接口,支持流式和非流式响应 | session_id, message, history_length, stream |
/clear_history/{session_id} | DELETE | 清除指定会话的历史记录 | session_id |
/health | GET | 健康检查端点 | 无 |
/stream_test | GET | 流式接口测试端点 | 无 |
主要请求和响应模型:
class ChatRequest(BaseModel):session_id: str # 会话ID,用于区分不同用户的对话历史message: str # 用户消息history_length: Optional[int] = 5 # 历史消息长度,默认为5stream: Optional[bool] = False # 是否使用流式响应class ChatResponse(BaseModel):session_id: strresponse: strsuccess: boolerror: Optional[str] = None
五、部署与性能优化
5.1 生产环境部署
使用Uvicorn作为ASGI服务器部署应用:
uvicorn main:app --host 0.0.0.0 --port 9113 --workers 4 --timeout-keep-alive 300
参数说明:
--workers 4
:启动4个工作进程,充分利用多核CPU--timeout-keep-alive 300
:保持连接超时时间设置为300秒--host 0.0.0.0
:监听所有网络接口
5.2 性能优化建议
- 缓存机制:引入Redis缓存频繁查询的结果
- 数据库持久化:使用Redis或数据库持久化会话历史,替代内存存储
- 负载均衡:在多服务器部署时使用Nginx进行负载均衡
- 异步处理:确保所有I/O操作都使用异步方式,避免阻塞事件循环
六、应用场景与扩展方向
6.1 典型应用场景
- 企业数据分析:让业务人员通过自然语言查询销售数据、业绩指标等
- 学术研究:研究人员快速探索实验数据,发现潜在模式
- 财务报表分析:自动解析财务数据,回答关于收入、支出和利润的问题
- 客户支持:分析客户反馈数据,识别常见问题和趋势
6.2 扩展方向
- 多数据源支持:扩展支持数据库、CSV、JSON等更多数据格式
- 可视化集成:将分析结果以图表形式返回,增强数据表现力
- 权限控制:增加用户认证和授权,控制数据访问权限
- 预定义查询模板:提供常用查询模板,降低用户学习成本
- 批量处理:支持批量查询和异步处理大量分析任务
七、总结与展望
本项目展示了如何将FastAPI、LangChain和大语言模型结合,构建一个功能强大的智能数据分析API。关键优势包括:
- 自然语言交互:降低了数据查询的技术门槛
- 流式响应:大幅提升了用户体验,减少等待时间
- 多会话支持:适合多用户并发使用场景
- 易于扩展:模块化设计方便后续功能扩展
未来发展方向:
- 支持更多数据源和格式
- 增加数据可视化能力
- 实现更复杂的对话状态管理
- 提供查询建议和自动补全功能
通过这种技术组合,我们能够将先进的大语言模型能力转化为实用的企业级应用,真正实现"用自然语言与数据对话"的愿景。
参考资料
- https://fastapi.tiangolo.com/
- https://python.langchain.com/
- https://pandas.pydata.org/docs/
- https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
在这里插入代码片
```# main.py
import pandas as pd
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_experimental.agents import create_pandas_dataframe_agent
import os
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from typing import List, Optional
from fastapi.responses import StreamingResponse
import asyncio
import json
import time
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any, Dict, List
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate# 初始化 FastAPI 应用
app = FastAPI(title="Excel Data Analysis API", description="基于 Pandas DataFrame 的数据分析 API")# 全局变量存储模型和代理
llm = None
pandas_agent = None
conversation_history = {}class StreamingCallbackHandler(BaseCallbackHandler):"""自定义回调处理器,用于实现真正的流式输出"""def __init__(self):self.tokens = []self.finished = Falsedef on_llm_new_token(self, token: str, **kwargs: Any) -> None:"""当LLM生成新token时调用"""self.tokens.append(token)def on_llm_end(self, response: Any, **kwargs: Any) -> None:"""当LLM生成结束时调用"""self.finished = Truedef get_tokens(self):"""获取已生成的tokens"""return self.tokensclass ChatRequest(BaseModel):session_id: str # 会话ID,用于区分不同用户的对话历史message: str # 用户消息history_length: Optional[int] = 5 # 历史消息长度,默认为5stream: Optional[bool] = False # 是否使用流式响应class ChatResponse(BaseModel):session_id: strresponse: strsuccess: boolerror: Optional[str] = Nonedef initialize_agent():"""初始化模型和数据代理"""global llm, pandas_agentpath = r'./data/1.xlsx'# 检查文件是否存在if not os.path.exists(path):raise FileNotFoundError(f"文件未找到:{path}")# 读取 Excel 中的所有工作表excel_file = pd.ExcelFile(path)all_sheets = {}for sheet_name in excel_file.sheet_names:all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)# 合并所有工作表到一个DataFrame中df = pd.concat(all_sheets.values(), ignore_index=True)os.environ["DASHSCOPE_API_KEY"] = 'sk-50254f6d4df1ab3c9baf30093c4e'llm = ChatTongyi(model="qwen-max-latest",temperature=0.4,streaming=True # 启用流式输出)# 创建excel_agentpandas_agent = create_pandas_dataframe_agent(llm,df,verbose=True,agent_type="zero-shot-react-description",allow_dangerous_code=True,max_iterations=5)@app.on_event("startup")
async def startup_event():"""应用启动时初始化模型和代理"""try:initialize_agent()print("模型和代理初始化成功")except Exception as e:print(f"初始化失败: {e}")raise@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):"""与数据进行对话(支持流式和非流式响应)"""global pandas_agent, conversation_historyif not pandas_agent:raise HTTPException(status_code=500, detail="模型未初始化")try:# 获取或创建会话历史session_history = conversation_history.get(request.session_id, [])# 构建历史文本history_text = "\n".join(session_history[-request.history_length:])# 构造带上下文的输入if history_text:full_input = f"聊天历史:{history_text},当前问题:{request.message},请根据历史和当前问题,不要截断输出,展示全部内容,不要总结,严格按照查询内容输出,不要多余输出,如果无法确定指代对象,请询问用户澄清,并且不要编造。".strip()else:full_input = request.message# 如果请求流式响应if request.stream:return StreamingResponse(true_stream_response(full_input, session_history, request.session_id),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive",})# 非流式响应response = pandas_agent.invoke({"input": full_input})output = response['output'] if isinstance(response, dict) else str(response)# 更新会话历史session_history.append(request.message)conversation_history[request.session_id] = session_historyreturn ChatResponse(session_id=request.session_id,response=output,success=True)except Exception as e:if request.stream:# 对于流式请求,返回流式错误响应async def error_stream():yield f"data: {json.dumps({'error': str(e)})}\n\n"return StreamingResponse(error_stream(), media_type="text/event-stream")else:return ChatResponse(session_id=request.session_id,response="",success=False,error=str(e))async def true_stream_response(input_text: str, session_history: list, session_id: str):"""真正的流式响应生成器"""global pandas_agent, conversation_historytry:# 使用回调处理器捕获流式输出callback_handler = StreamingCallbackHandler()# 在后台运行代理处理async def run_agent():try:# 调用代理,传入回调处理器response = await asyncio.get_event_loop().run_in_executor(None,lambda: pandas_agent.invoke({"input": input_text},{"callbacks": [callback_handler]}))# 确保结束标记被设置callback_handler.finished = Trueexcept Exception as e:print(f"代理执行错误: {e}")callback_handler.finished = True# 启动代理任务agent_task = asyncio.create_task(run_agent())# 流式输出循环last_token_count = 0start_time = time.time()max_wait_time = 60 # 最大等待时间60秒while not callback_handler.finished and (time.time() - start_time) < max_wait_time:current_tokens = callback_handler.get_tokens()# 如果有新token,发送给客户端if len(current_tokens) > last_token_count:for i in range(last_token_count, len(current_tokens)):token_data = {"token": current_tokens[i],"type": "token"}yield f"data: {json.dumps(token_data)}\n\n"last_token_count = len(current_tokens)# 短暂等待后继续检查await asyncio.sleep(0.1)# 如果超时,发送错误信息if (time.time() - start_time) >= max_wait_time:error_data = {"error": "请求超时", "type": "error"}yield f"data: {json.dumps(error_data)}\n\n"else:# 发送完成标记done_data = {"done": True, "type": "done"}yield f"data: {json.dumps(done_data)}\n\n"# 更新会话历史current_message = input_text.split("当前问题:")[1].split(",")[0] if "当前问题:" in input_text else input_textsession_history.append(current_message)conversation_history[session_id] = session_history# 等待代理任务完成(如果还未完成)if not agent_task.done():agent_task.cancel()except Exception as e:error_data = {"error": f"流式处理错误: {str(e)}", "type": "error"}yield f"data: {json.dumps(error_data)}\n\n"@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):"""清除指定会话的历史记录"""if session_id in conversation_history:del conversation_history[session_id]return {"message": f"会话 {session_id} 的历史记录已清除"}@app.get("/health")
async def health_check():"""健康检查接口"""return {"status": "healthy","model_loaded": pandas_agent is not None,"active_sessions": len(conversation_history)}@app.get("/stream_test")
async def stream_test():"""测试流式接口"""async def generate_test_data():for i in range(10):yield f"data: 测试消息 {i}\n\n"await asyncio.sleep(1)return StreamingResponse(generate_test_data(), media_type="text/event-stream")if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=9113)