当前位置: 首页 > news >正文

基于FastAPI与LangChain的Excel智能数据分析API开发实践

本文将详细介绍如何使用FastAPI和LangChain构建一个支持流式响应的Excel智能数据分析API,实现对结构化数据的自然语言查询与对话式分析。

内容大纲

一、项目概述与核心功能

在现代数据驱动决策的环境中,让非技术用户能够通过自然语言与数据进行交互变得越来越重要。本项目开发了一个基于RESTful API的智能数据分析服务,具有以下核心功能:

  • 自然语言查询: 用户可以使用日常语言提问关于Excel数据的问题
  • 多会话管理: 支持多用户同时使用,各自维护独立的对话历史
  • 流式响应: 提供真正的流式输出体验,大幅提升用户体验
  • Excel数据支持: 支持读取和解析多工作表的Excel文件
  • 智能代理: 基于LangChain构建的Pandas数据分析代理

二、技术架构与工具选择

本项目采用了以下技术栈,每项技术都承担着特定角色:

技术组件版本要求职责说明
FastAPI>=0.68.0高性能Web框架,提供API服务
LangChain>=0.12.0智能代理和链式处理核心
Pandas>=1.3.0DataFrame数据处理和分析
Uvicorn>=0.15.0ASGI服务器,用于运行FastAPI
Tongyi Qwen-大语言模型,提供自然语言理解能力

技术选型理由

  • FastAPI:凭借其异步支持、自动文档生成和高性能特性,成为Python API开发的首选
  • LangChain:提供了与大语言模型集成的标准化方式,简化了代理创建和管理
  • Pandas:是Python数据分析的事实标准库,完美处理Excel文件
  • 流式响应:通过Server-Sent Events(SSE)实现真正流式输出,提升用户体验

三、核心实现细节

3.1 FastAPI应用初始化

首先创建FastAPI应用实例,并设置元数据信息:

from fastapi import FastAPIapp = FastAPI(title="Excel Data Analysis API",description="基于Pandas DataFrame的数据分析API",version="1.0.0"
)

FastAPI基于Python类型提示自动生成API文档,支持OpenAPI标准,开发者可以通过/docs/redoc端点访问交互式文档。

3.2 数据加载与代理初始化

项目启动时自动加载Excel数据并初始化LangChain代理:

@app.on_event("startup")
async def startup_event():"""应用启动时初始化模型和代理"""try:initialize_agent()print("模型和代理初始化成功")except Exception as e:print(f"初始化失败: {e}")raise

数据加载过程支持多工作表Excel文件,自动合并所有工作表到一个DataFrame中:

def initialize_agent():global llm, pandas_agent# 读取Excel文件中的所有工作表excel_file = pd.ExcelFile(path)all_sheets = {}for sheet_name in excel_file.sheet_names:all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)# 合并所有工作表df = pd.concat(all_sheets.values(), ignore_index=True)

这种方法确保了无论Excel文件的结构如何,都能完整地加载所有数据。

3.3 LangChain代理创建

使用LangChain的create_pandas_dataframe_agent创建专门用于Pandas数据处理的AI代理:

pandas_agent = create_pandas_dataframe_agent(llm,  # 通义千问模型实例df,   # 合并后的DataFrameverbose=True,  # 启用详细日志agent_type="zero-shot-react-description",  # 代理类型allow_dangerous_code=True,  # 允许执行代码max_iterations=5  # 最大迭代次数
)

这种代理结合了大语言模型的语言理解能力和Pandas的数据处理能力,能够理解自然语言查询并将其转换为DataFrame操作。

3.4 流式响应实现

为实现真正的流式输出,创建了自定义回调处理器:

class StreamingCallbackHandler(BaseCallbackHandler):"""自定义回调处理器,用于实现真正的流式输出"""def __init__(self):self.tokens = []self.finished = Falsedef on_llm_new_token(self, token: str, **kwargs: Any) -> None:"""当LLM生成新token时调用"""self.tokens.append(token)def on_llm_end(self, response: Any, **kwargs: Any) -> None:"""当LLM生成结束时调用"""self.finished = True

流式响应接口使用Server-Sent Events(SSE)技术:

@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):# ... 省略其他代码 ...if request.stream:return StreamingResponse(true_stream_response(full_input, session_history, request.session_id),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive",})

这种方式相比传统响应,能够实时地将生成的token发送给客户端,大幅减少用户感知的延迟。

3.5 会话管理机制

为实现多用户支持,实现了基于session_id的会话管理:

conversation_history = {}# 在聊天接口中维护会话历史
session_history = conversation_history.get(request.session_id, [])
session_history.append(request.message)
conversation_history[request.session_id] = session_history

还提供了清除历史记录的端点:

@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):"""清除指定会话的历史记录"""if session_id in conversation_history:del conversation_history[session_id]return {"message": f"会话 {session_id} 的历史记录已清除"}

这种设计允许不同用户或不同对话线程保持独立的上下文历史。

四、API端点设计

本项目实现了以下RESTful端点:

端点方法描述参数
/chatPOST主聊天接口,支持流式和非流式响应session_id, message, history_length, stream
/clear_history/{session_id}DELETE清除指定会话的历史记录session_id
/healthGET健康检查端点
/stream_testGET流式接口测试端点

主要请求和响应模型

class ChatRequest(BaseModel):session_id: str  # 会话ID,用于区分不同用户的对话历史message: str  # 用户消息history_length: Optional[int] = 5  # 历史消息长度,默认为5stream: Optional[bool] = False  # 是否使用流式响应class ChatResponse(BaseModel):session_id: strresponse: strsuccess: boolerror: Optional[str] = None

五、部署与性能优化

5.1 生产环境部署

使用Uvicorn作为ASGI服务器部署应用:

uvicorn main:app --host 0.0.0.0 --port 9113 --workers 4 --timeout-keep-alive 300

参数说明

  • --workers 4:启动4个工作进程,充分利用多核CPU
  • --timeout-keep-alive 300:保持连接超时时间设置为300秒
  • --host 0.0.0.0:监听所有网络接口

5.2 性能优化建议

  1. 缓存机制:引入Redis缓存频繁查询的结果
  2. 数据库持久化:使用Redis或数据库持久化会话历史,替代内存存储
  3. 负载均衡:在多服务器部署时使用Nginx进行负载均衡
  4. 异步处理:确保所有I/O操作都使用异步方式,避免阻塞事件循环

六、应用场景与扩展方向

6.1 典型应用场景

  • 企业数据分析:让业务人员通过自然语言查询销售数据、业绩指标等
  • 学术研究:研究人员快速探索实验数据,发现潜在模式
  • 财务报表分析:自动解析财务数据,回答关于收入、支出和利润的问题
  • 客户支持:分析客户反馈数据,识别常见问题和趋势

6.2 扩展方向

  1. 多数据源支持:扩展支持数据库、CSV、JSON等更多数据格式
  2. 可视化集成:将分析结果以图表形式返回,增强数据表现力
  3. 权限控制:增加用户认证和授权,控制数据访问权限
  4. 预定义查询模板:提供常用查询模板,降低用户学习成本
  5. 批量处理:支持批量查询和异步处理大量分析任务

七、总结与展望

本项目展示了如何将FastAPI、LangChain和大语言模型结合,构建一个功能强大的智能数据分析API。关键优势包括:

  1. 自然语言交互:降低了数据查询的技术门槛
  2. 流式响应:大幅提升了用户体验,减少等待时间
  3. 多会话支持:适合多用户并发使用场景
  4. 易于扩展:模块化设计方便后续功能扩展

未来发展方向

  • 支持更多数据源和格式
  • 增加数据可视化能力
  • 实现更复杂的对话状态管理
  • 提供查询建议和自动补全功能

通过这种技术组合,我们能够将先进的大语言模型能力转化为实用的企业级应用,真正实现"用自然语言与数据对话"的愿景。

参考资料

  1. https://fastapi.tiangolo.com/
  2. https://python.langchain.com/
  3. https://pandas.pydata.org/docs/
  4. https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
在这里插入代码片
```# main.py
import pandas as pd
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_experimental.agents import create_pandas_dataframe_agent
import os
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from typing import List, Optional
from fastapi.responses import StreamingResponse
import asyncio
import json
import time
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any, Dict, List
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate# 初始化 FastAPI 应用
app = FastAPI(title="Excel Data Analysis API", description="基于 Pandas DataFrame 的数据分析 API")# 全局变量存储模型和代理
llm = None
pandas_agent = None
conversation_history = {}class StreamingCallbackHandler(BaseCallbackHandler):"""自定义回调处理器,用于实现真正的流式输出"""def __init__(self):self.tokens = []self.finished = Falsedef on_llm_new_token(self, token: str, **kwargs: Any) -> None:"""当LLM生成新token时调用"""self.tokens.append(token)def on_llm_end(self, response: Any, **kwargs: Any) -> None:"""当LLM生成结束时调用"""self.finished = Truedef get_tokens(self):"""获取已生成的tokens"""return self.tokensclass ChatRequest(BaseModel):session_id: str  # 会话ID,用于区分不同用户的对话历史message: str  # 用户消息history_length: Optional[int] = 5  # 历史消息长度,默认为5stream: Optional[bool] = False  # 是否使用流式响应class ChatResponse(BaseModel):session_id: strresponse: strsuccess: boolerror: Optional[str] = Nonedef initialize_agent():"""初始化模型和数据代理"""global llm, pandas_agentpath = r'./data/1.xlsx'# 检查文件是否存在if not os.path.exists(path):raise FileNotFoundError(f"文件未找到:{path}")# 读取 Excel 中的所有工作表excel_file = pd.ExcelFile(path)all_sheets = {}for sheet_name in excel_file.sheet_names:all_sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)# 合并所有工作表到一个DataFrame中df = pd.concat(all_sheets.values(), ignore_index=True)os.environ["DASHSCOPE_API_KEY"] = 'sk-50254f6d4df1ab3c9baf30093c4e'llm = ChatTongyi(model="qwen-max-latest",temperature=0.4,streaming=True  # 启用流式输出)# 创建excel_agentpandas_agent = create_pandas_dataframe_agent(llm,df,verbose=True,agent_type="zero-shot-react-description",allow_dangerous_code=True,max_iterations=5)@app.on_event("startup")
async def startup_event():"""应用启动时初始化模型和代理"""try:initialize_agent()print("模型和代理初始化成功")except Exception as e:print(f"初始化失败: {e}")raise@app.post("/chat", response_model=ChatResponse)
async def chat_with_data(request: ChatRequest):"""与数据进行对话(支持流式和非流式响应)"""global pandas_agent, conversation_historyif not pandas_agent:raise HTTPException(status_code=500, detail="模型未初始化")try:# 获取或创建会话历史session_history = conversation_history.get(request.session_id, [])# 构建历史文本history_text = "\n".join(session_history[-request.history_length:])# 构造带上下文的输入if history_text:full_input = f"聊天历史:{history_text},当前问题:{request.message},请根据历史和当前问题,不要截断输出,展示全部内容,不要总结,严格按照查询内容输出,不要多余输出,如果无法确定指代对象,请询问用户澄清,并且不要编造。".strip()else:full_input = request.message# 如果请求流式响应if request.stream:return StreamingResponse(true_stream_response(full_input, session_history, request.session_id),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive",})# 非流式响应response = pandas_agent.invoke({"input": full_input})output = response['output'] if isinstance(response, dict) else str(response)# 更新会话历史session_history.append(request.message)conversation_history[request.session_id] = session_historyreturn ChatResponse(session_id=request.session_id,response=output,success=True)except Exception as e:if request.stream:# 对于流式请求,返回流式错误响应async def error_stream():yield f"data: {json.dumps({'error': str(e)})}\n\n"return StreamingResponse(error_stream(), media_type="text/event-stream")else:return ChatResponse(session_id=request.session_id,response="",success=False,error=str(e))async def true_stream_response(input_text: str, session_history: list, session_id: str):"""真正的流式响应生成器"""global pandas_agent, conversation_historytry:# 使用回调处理器捕获流式输出callback_handler = StreamingCallbackHandler()# 在后台运行代理处理async def run_agent():try:# 调用代理,传入回调处理器response = await asyncio.get_event_loop().run_in_executor(None,lambda: pandas_agent.invoke({"input": input_text},{"callbacks": [callback_handler]}))# 确保结束标记被设置callback_handler.finished = Trueexcept Exception as e:print(f"代理执行错误: {e}")callback_handler.finished = True# 启动代理任务agent_task = asyncio.create_task(run_agent())# 流式输出循环last_token_count = 0start_time = time.time()max_wait_time = 60  # 最大等待时间60秒while not callback_handler.finished and (time.time() - start_time) < max_wait_time:current_tokens = callback_handler.get_tokens()# 如果有新token,发送给客户端if len(current_tokens) > last_token_count:for i in range(last_token_count, len(current_tokens)):token_data = {"token": current_tokens[i],"type": "token"}yield f"data: {json.dumps(token_data)}\n\n"last_token_count = len(current_tokens)# 短暂等待后继续检查await asyncio.sleep(0.1)# 如果超时,发送错误信息if (time.time() - start_time) >= max_wait_time:error_data = {"error": "请求超时", "type": "error"}yield f"data: {json.dumps(error_data)}\n\n"else:# 发送完成标记done_data = {"done": True, "type": "done"}yield f"data: {json.dumps(done_data)}\n\n"# 更新会话历史current_message = input_text.split("当前问题:")[1].split(",")[0] if "当前问题:" in input_text else input_textsession_history.append(current_message)conversation_history[session_id] = session_history# 等待代理任务完成(如果还未完成)if not agent_task.done():agent_task.cancel()except Exception as e:error_data = {"error": f"流式处理错误: {str(e)}", "type": "error"}yield f"data: {json.dumps(error_data)}\n\n"@app.delete("/clear_history/{session_id}")
async def clear_history(session_id: str):"""清除指定会话的历史记录"""if session_id in conversation_history:del conversation_history[session_id]return {"message": f"会话 {session_id} 的历史记录已清除"}@app.get("/health")
async def health_check():"""健康检查接口"""return {"status": "healthy","model_loaded": pandas_agent is not None,"active_sessions": len(conversation_history)}@app.get("/stream_test")
async def stream_test():"""测试流式接口"""async def generate_test_data():for i in range(10):yield f"data: 测试消息 {i}\n\n"await asyncio.sleep(1)return StreamingResponse(generate_test_data(), media_type="text/event-stream")if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=9113)  
http://www.dtcms.com/a/477724.html

相关文章:

  • 【四级】全国大学英语四级历年真题及答案解析PDF电子版(2015-2025年6月)
  • 专业制造双轴倾角传感器与水平监测传感器的优质厂家分析
  • QtitanNavigation赋能工业制造:提升生产效率的界面导航利器
  • 网站不备案做优化网站建设 中软
  • 成都市建设厅网站查询建设部举报网站
  • 优秘智能深度学习应用场景实战提升效率指南
  • 【01】首页建立-vue+vite开发实战-做一个非常漂亮的APP下载落地页-支持PC和H5自适应提供安卓苹果鸿蒙下载和网页端访问-优雅草卓伊凡
  • 做网站建议农业网站建设模板下载
  • xdma IP使用教程1-xdma ip核配置
  • Pytest参数化实战:高效测试API接口
  • 关于力扣第167场双周赛的第一二题赛后反思
  • Post-training of LLMs
  • 【学习总结】AI接口测试-零基础从接口概念到客达天下系统Apifox+DeepSeek接口测试实战全流程
  • 【苍穹外卖笔记】Day04--套餐管理模块
  • 初识redis(分布式系统, redis的特性, 基本命令)
  • [特殊字符] Avalonia + Silk.NET 加载 3D 模型时 GenBuffer 返回 0?这是个底层兼容性陷阱!
  • 学习threejs,打造交互式花卉生成器
  • Redis 学习笔记(二)
  • 北京展览馆网站建设wordpress插件排列
  • 北京做网站优化多少钱最基本最重要的网站推广工具是
  • 每日算法刷题Day70:10.13:leetcode 二叉树10道题,用时2h
  • MySQL 设置远程 IP 连接方式(含自动检测授权脚本)
  • flash型网站网址高校思政课网站建设
  • 网站建设费做什么会计科目硬件开发外包平台
  • 【SpringBoot从初学者到专家的成长15】MVC、Spring MVC与Spring Boot:理解其差异与联系
  • Docker 存储与数据共享
  • k8s storageclasses nfs-provisioner 部署
  • Linux(Samba服务)
  • 电商智能客服进化论:多轮对话+意图识别+知识推荐系统开发
  • 算法198. 打家劫舍