Building a ChatBI Agent with Visual Charts Using LangChain + MCP
1. Implementation Overview
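Earlier articles in this column showed how to build ChatBI agents with LangChain + MCP and with Spring AI + MCP, but they did not present the data as visual charts. Visual charts are an important part of data analysis because they turn complex data into intuitive graphics. This article therefore uses LangChain + MCP to build an experimental agent that combines ChatBI with visual charts.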
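About the Data
The analysis uses a COVID-19 test dataset: cumulative COVID-19 figures for every US county as of 2021-01-28, covering both confirmed cases and deaths. The data format is as follows:
date, county, state, fips (county FIPS code), cases (cumulative confirmed cases), deaths (cumulative deaths)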
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27
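To sanity-check the format, the sketch below parses three of the sample rows with Python's standard csv module and sums cumulative confirmed cases per state. The `SAMPLE` string and the `cases_by_state` helper are illustrative only; note that `fips` is kept as a string so its leading zero survives.

```python
import csv
import io
from collections import defaultdict

# Three rows copied from the sample above, with the raw column names as header.
SAMPLE = """date,county,state,fips,cases,deaths
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
"""

def cases_by_state(text: str) -> dict:
    """Sum cumulative confirmed cases per state (hypothetical helper)."""
    totals = defaultdict(int)
    for row in csv.DictReader(io.StringIO(text)):
        # row["fips"] stays a string, e.g. "01109", so the leading zero is kept
        totals[row["state"]] += int(row["cases"])
    return dict(totals)

print(cases_by_state(SAMPLE))  # {'Alabama': 7884}
```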
Download link:
https://github.com/BIXUECHAO/covid-19-tes-data/blob/main/us-covid19-counties.csv
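The overall implementation works as follows.
The visual charting capability comes from mcp-server-chart, a public MCP Server published on ModelScope, available at: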
https://modelscope.cn/mcp/servers/@antvis/mcp-server-chart
Connection configuration:
{
  "mcpServers": {
    "mcp-server-chart": {
      "type": "sse",
      "url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse"
    }
  }
}
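The ModelScope snippet uses the common `mcpServers` JSON layout with a `type` key, while `MultiServerMCPClient` (used later in this article) takes a dict of connections with a `transport` key per server. A small sketch that converts one into the other; `to_langchain_connections` is a hypothetical helper that only maps the `url` and `type` fields of URL-based servers.

```python
import json

CONFIG = """{"mcpServers": {"mcp-server-chart": {"type": "sse",
  "url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse"}}}"""

def to_langchain_connections(config_text: str) -> dict:
    """Map an mcpServers JSON config to a MultiServerMCPClient connection dict.

    Hypothetical helper: copies "url" and renames "type" to "transport";
    any other keys (headers, env, ...) are ignored.
    """
    servers = json.loads(config_text)["mcpServers"]
    return {
        name: {"url": spec["url"], "transport": spec["type"]}
        for name, spec in servers.items()
    }

print(to_langchain_connections(CONFIG))
```

The resulting dict has the same shape as the `"chart"` entry passed to `MultiServerMCPClient` in the agent code.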
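The Covid19 Query MCP Server is our own implementation built with FastMCP. On the model side we use OpenAI's GPT-4.1; you can swap in any other model that supports tool calling.
The final result of the experiment looks like this:
Question: Find the top five states by confirmed cases and plot them as a pie chart for me
Question: Take the top 10 counties by confirmed cases and present them as a bar chart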
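Behind answers like these, the LLM turns query rows into arguments for a chart tool. The sketch below illustrates that mapping for a pie or bar chart, assuming (not confirmed here) that mcp-server-chart accepts a `data` list of `{"category": ..., "value": ...}` objects for these chart types; check the tool schemas the server actually exposes. `rows_to_chart_data` is hypothetical, and the rows mimic the list of dicts returned by `utils_db.query`, using counties from the sample data.

```python
from typing import Any, Dict, List

def rows_to_chart_data(rows: List[Dict[str, Any]], label: str, value: str,
                       top_n: int = 5) -> List[Dict[str, Any]]:
    """Turn query rows into [{"category", "value"}] items, largest first.

    Assumption: the chart tool expects this {"category", "value"} shape.
    """
    ranked = sorted(rows, key=lambda r: r[value], reverse=True)[:top_n]
    return [{"category": r[label], "value": r[value]} for r in ranked]

# Rows shaped like utils_db.query output (list of dicts), from the sample data.
rows = [
    {"county": "Shelby", "cases": 19878},
    {"county": "Tuscaloosa", "cases": 22083},
    {"county": "St. Clair", "cases": 8047},
]
print(rows_to_chart_data(rows, "county", "cases", top_n=2))
# [{'category': 'Tuscaloosa', 'value': 22083}, {'category': 'Shelby', 'value': 19878}]
```

In the real agent the model performs this step itself; the sketch only makes the data shape visible.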
Versions used in this experiment:
mcp==1.9.2
openai==1.75.0
langchain==0.3.25
langchain-openai==0.3.18
langgraph==0.4.7
pymysql==1.0.3
2. Building the Data Query MCP Server
First, create the table and import the data above:
CREATE TABLE `us_covid19_counties` (
  `date` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT 'date',
  `county` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT 'county',
  `state` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT 'state',
  `fips` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT 'county FIPS code',
  `cases` int DEFAULT NULL COMMENT 'cumulative confirmed cases',
  `deaths` int DEFAULT NULL COMMENT 'cumulative deaths'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='COVID-19 cases';
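The article does not show how the CSV gets into the table. One possible approach, sketched here as an assumption rather than the author's method, reads the downloaded file with the csv module and bulk-inserts it through a DB-API `executemany` with `%s` placeholders (the paramstyle pymysql uses). `parse_rows` and `load` are hypothetical helpers; the connection is passed in, so the parsing part can be tried without a database.

```python
import csv
from typing import Iterable, List, Tuple

INSERT_SQL = (
    "INSERT INTO us_covid19_counties (`date`, county, state, fips, cases, deaths) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)

def parse_rows(lines: Iterable[str]) -> List[Tuple]:
    """Parse CSV lines (header first) into tuples matching INSERT_SQL."""
    rows = []
    for r in csv.DictReader(lines):
        # Empty numeric fields become None (NULL); both columns allow NULL.
        cases = int(r["cases"]) if r["cases"] else None
        deaths = int(r["deaths"]) if r["deaths"] else None
        rows.append((r["date"], r["county"], r["state"], r["fips"], cases, deaths))
    return rows

def load(conn, rows: List[Tuple]) -> None:
    """Bulk insert using a DB-API connection, e.g. utils_db.get_conn()."""
    with conn.cursor() as cursor:
        cursor.executemany(INSERT_SQL, rows)
    conn.commit()  # harmless when the connection already uses autocommit
```

Usage, assuming the file was saved as `us-covid19-counties.csv`: `with open("us-covid19-counties.csv", newline="", encoding="utf-8") as f: load(utils_db.get_conn(), parse_rows(f))`.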
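Next, implement the database access. This only covers basic database interaction; real use would call for many additional performance optimizations: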
utils_db.py
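import pymysql


def get_conn():
    # Connection settings for the local MySQL instance used in this demo
    return pymysql.connect(
        host="127.0.0.1",
        port=3306,
        database="test3",
        user="root",
        password="root",
        autocommit=True,
    )


def query(sql):
    """Execute one SQL statement and return the rows as a list of dicts."""
    conn = get_conn()
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        # Statements without a result set (e.g. UPDATE) have no description
        if cursor.description is None:
            return []
        columns = [column[0] for column in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # Always release the cursor and connection, even if execute() fails
        cursor.close()
        conn.close()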
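`query` turns each result row into a dict by zipping the column names from `cursor.description` with the row values. The same DB-API pattern can be tried without MySQL: the sketch below swaps in Python's built-in sqlite3 as a stand-in and also passes values as query parameters instead of formatting them into the SQL string (sqlite3 uses `?` placeholders, pymysql uses `%s`).

```python
import sqlite3
from typing import Any, Dict, List, Sequence

def query(conn: sqlite3.Connection, sql: str,
          params: Sequence[Any] = ()) -> List[Dict[str, Any]]:
    """Run one statement and return rows as dicts (same idea as utils_db.query)."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()

# In-memory stand-in table with two rows from the sample data.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE us_covid19_counties (county TEXT, state TEXT, cases INT)")
conn.executemany(
    "INSERT INTO us_covid19_counties VALUES (?, ?, ?)",
    [("Pike", "Alabama", 2704), ("Shelby", "Alabama", 19878)],
)
print(query(conn, "SELECT county, cases FROM us_covid19_counties WHERE cases > ?", (5000,)))
# [{'county': 'Shelby', 'cases': 19878}]
```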
Now create the data query MCP Server. It implements two MCP Tools: one returns the table schema and the other executes SQL:
from mcp.server.fastmcp import FastMCP

import utils_db

mcp = FastMCP("COVID-19-DB-Query-Mcp-Server")

# Reads column name, type and comment from MySQL's information schema
schema_sql = """
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'test3' AND TABLE_NAME = '{table}'
"""

# Only these tables are exposed to the model
effective_tables = ["us_covid19_counties"]


@mcp.tool()
def get_table_schema() -> str:
    """Get the MySQL schema of the currently available tables. Before generating SQL, you should use this tool to obtain the complete table structure."""
    table_schema = []
    for table in effective_tables:
        schemas = utils_db.query(schema_sql.format(table=table))
        schemas = ", ".join(
            [f"{s['COLUMN_NAME']} {s['DATA_TYPE']} COMMENT {s['COLUMN_COMMENT']}" for s in schemas]
        )
        table_schema.append(f"TABLE `{table}` ({schemas})")
    return "\n\n".join(table_schema)


@mcp.tool()
def run_sql(sql: str) -> str:
    """Execute a MySQL SQL statement to query data. Only one SQL statement can be executed per call!"""
    try:
        return str(utils_db.query(sql))
    except Exception as e:
        # Return the error as text so the model can fix the SQL and retry
        return f"SQL execution error: {str(e)}. Please fix the statement and try again."


if __name__ == "__main__":
    mcp.settings.port = 6030
    # Serve over SSE; clients connect to http://127.0.0.1:6030/sse
    mcp.run("sse")
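`run_sql` executes whatever the model sends, and the docstring only asks for one statement per call. A defensive option, sketched here as a hypothetical helper that is not part of the original server, is to reject anything other than a single read-only `SELECT` (or `WITH ... SELECT`) before calling `utils_db.query`. It is a coarse string check, not a SQL parser, so it may reject some valid queries (for example a semicolon inside a string literal); a read-only MySQL account remains the stronger safeguard.

```python
import re

# Keywords that should never appear in a read-only analytics query.
_FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|create|truncate|replace|grant|revoke)\b",
    re.IGNORECASE,
)

def check_select(sql: str) -> str:
    """Return the cleaned statement or raise ValueError (hypothetical guard).

    Allows exactly one statement that starts with SELECT or WITH and
    contains none of the forbidden keywords.
    """
    stmt = sql.strip().rstrip(";").strip()
    if ";" in stmt:
        raise ValueError("Only one SQL statement is allowed per call.")
    if not re.match(r"(select|with)\b", stmt, re.IGNORECASE):
        raise ValueError("Only SELECT queries are allowed.")
    if _FORBIDDEN.search(stmt):
        raise ValueError("Statement contains a forbidden keyword.")
    return stmt
```

Inside `run_sql` this would become `return str(utils_db.query(check_select(sql)))`; the existing `except` already turns the `ValueError` into a message the model can act on.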
Start the MCP Server:
3. Building the LangChain MCP Client Agent
LangChain's official documentation on MCP integration is here:
https://langchain-ai.github.io/langgraph/agents/mcp/
For the MCP connections, MultiServerMCPClient collects all MCP Tools from the configured servers, and these tools are then passed to create_react_agent to build the agent.
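Conceptually, the agent that `create_react_agent` builds alternates between a model step and a tool step until the model stops requesting tools. The sketch below is a toy illustration of that loop in plain Python, not LangGraph's implementation: `scripted_model` is a hypothetical stand-in for the LLM that always requests the same tools in a fixed order, and the two tools are fakes named after the MCP tools in this article.

```python
from typing import Any, Callable, Dict, List

def fake_get_table_schema() -> str:
    return "TABLE `us_covid19_counties` (state varchar, cases int)"

def fake_run_sql(sql: str) -> str:
    return "[{'state': 'Alabama', 'cases': 7884}]"

TOOLS: Dict[str, Callable[..., str]] = {
    "get_table_schema": fake_get_table_schema,
    "run_sql": fake_run_sql,
}

def scripted_model(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical LLM stand-in: requests tools in a fixed order, then answers."""
    done = sum(1 for m in messages if m["role"] == "tool")
    if done == 0:
        return {"role": "assistant",
                "tool_calls": [{"name": "get_table_schema", "args": {}}]}
    if done == 1:
        sql = "SELECT state, SUM(cases) AS cases FROM us_covid19_counties GROUP BY state"
        return {"role": "assistant",
                "tool_calls": [{"name": "run_sql", "args": {"sql": sql}}]}
    return {"role": "assistant", "content": "Alabama has 7884 cases.", "tool_calls": []}

def react_loop(question: str, max_steps: int = 10) -> List[Dict[str, Any]]:
    messages: List[Dict[str, Any]] = [{"role": "user", "content": question}]
    for _ in range(max_steps):
        reply = scripted_model(messages)        # model ("agent") step
        messages.append(reply)
        if not reply["tool_calls"]:
            break                               # no tool requested: final answer
        for call in reply["tool_calls"]:        # tool ("tools") step
            result = TOOLS[call["name"]](**call["args"])
            messages.append({"role": "tool", "name": call["name"], "content": result})
    return messages

for m in react_loop("Cases by state?"):
    print(m["role"], m.get("name", ""), m.get("content", m.get("tool_calls")))
```

The `max_steps` cap plays the same role as a recursion limit: it stops a model that never stops calling tools.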
import os

os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"  # API base of any OpenAI-compatible service
os.environ["OPENAI_API_KEY"] = "sk-xxxxx"  # your API key

import asyncio

from colorama import Fore, Style
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent


async def main():
    # Connect to both MCP servers over SSE and collect all of their tools
    client = MultiServerMCPClient(
        {
            "chart": {
                "url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse",
                "transport": "sse",
            },
            "covid19-query": {
                "url": "http://127.0.0.1:6030/sse",
                "transport": "sse",
            },
        }
    )
    tools = await client.get_tools()
    # In-memory checkpointer keeps the conversation history per thread_id
    checkpointer = InMemorySaver()
    agent = create_react_agent(
        "openai:gpt-4.1",
        tools,
        checkpointer=checkpointer,
        prompt="You are an AI assistant that helps people find information.",
    )
    config = {"configurable": {"thread_id": "1"}}
    while True:
        question = input("Enter a question (q to quit): ")
        if not question:
            continue
        if question == "q":
            break
        async for chunk in agent.astream(
            {"messages": [{"role": "user", "content": question}]},
            stream_mode="updates",
            config=config,
        ):
            if "agent" in chunk:
                # Model step: either tool calls or the final answer
                content = chunk["agent"]["messages"][0].content
                tool_calls = chunk["agent"]["messages"][0].tool_calls
                if tool_calls:
                    for tool in tool_calls:
                        print(Fore.YELLOW, Style.BRIGHT, f">>> Call MCP Server: {tool['name']} , args: {tool['args']}")
                else:
                    print(Fore.BLACK, Style.BRIGHT, f"LLM: {content}")
            elif "tools" in chunk:
                # Tool step: the result returned by an MCP tool
                content = chunk["tools"]["messages"][0].content
                name = chunk["tools"]["messages"][0].name
                print(Fore.GREEN, Style.BRIGHT, f"<<< {name} : {content}")


if __name__ == "__main__":
    asyncio.run(main())
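With `stream_mode="updates"`, each chunk is a dict keyed by the graph node that just ran: `agent` for the model step and `tools` for tool execution, each holding the new `messages`. The printing logic above can be factored into a pure function and exercised without a model; `describe_chunk` is a hypothetical helper, and `SimpleNamespace` only mimics the `content`, `tool_calls` and `name` attributes of LangChain's message objects. Unlike the console code, it iterates over every tool message, since parallel tool calls can produce several in one chunk.

```python
from types import SimpleNamespace
from typing import Any, Dict, List

def describe_chunk(chunk: Dict[str, Any]) -> List[str]:
    """Turn one stream_mode="updates" chunk into printable lines."""
    lines: List[str] = []
    for msg in chunk.get("agent", {}).get("messages", []):
        if msg.tool_calls:
            lines += [f">>> Call MCP Server: {t['name']} , args: {t['args']}"
                      for t in msg.tool_calls]
        else:
            lines.append(f"LLM: {msg.content}")
    # Every tool message in the chunk, not just the first one
    for msg in chunk.get("tools", {}).get("messages", []):
        lines.append(f"<<< {msg.name} : {msg.content}")
    return lines

ai = SimpleNamespace(content="", tool_calls=[{"name": "run_sql", "args": {"sql": "SELECT 1"}}])
tool = SimpleNamespace(name="run_sql", content="[{'1': 1}]")
print(describe_chunk({"agent": {"messages": [ai]}}))
print(describe_chunk({"tools": {"messages": [tool]}}))
```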
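Run a test:
Question: Find the top ten states by confirmed cases and show them as a line chart
4. Wrapping the Agent as an OpenAI-Compatible API
The console version above already implements the complete agent interaction. Next, we adapt the logic and use FastAPI to expose it as an OpenAI-protocol API so that it can be used from chat clients.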
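Note: since the goal here is just to get the feature working, the api_key is hard-coded in the program as sk-da4b6cb4a41e4cascascasc9508deb556942 (randomly generated). You will need to enter this api_key when connecting from a client later.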
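Hard-coding the key is fine for a demo. A slightly safer variant, sketched under the assumption that you export a hypothetical `AGENT_API_KEY` environment variable, reads the key at startup and compares tokens with `hmac.compare_digest`, which avoids content-based short-circuiting during the comparison. The header parsing mirrors `verify_auth` in the service code; `extract_token` and `is_authorized` are hypothetical names.

```python
import hmac
import os
from typing import Optional

# Hypothetical variable name; falls back to the demo key from this article.
API_KEY = os.environ.get("AGENT_API_KEY", "sk-da4b6cb4a41e4cascascasc9508deb556942")

def extract_token(authorization: Optional[str]) -> Optional[str]:
    """Accept either 'Bearer <token>' or a bare token, like verify_auth."""
    if not authorization:
        return None
    if authorization.startswith("Bearer "):
        return authorization[len("Bearer "):]
    return authorization

def is_authorized(authorization: Optional[str], expected: str = API_KEY) -> bool:
    token = extract_token(authorization)
    # compare_digest avoids content-based short-circuiting (length can still leak)
    return token is not None and hmac.compare_digest(token.encode(), expected.encode())
```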
import os

os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"  # API base of any OpenAI-compatible service
os.environ["OPENAI_API_KEY"] = "sk-xxxxx"  # your API key

import time
import uuid
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, List, Optional

from fastapi import Depends, FastAPI, Header, HTTPException
from fastapi.responses import StreamingResponse
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel

app = FastAPI(title="OpenAI Compatible Chat API")
# Demo key that clients must send as "Authorization: Bearer <key>"
api_key = "sk-da4b6cb4a41e4cascascasc9508deb556942"
# The agent is created lazily on the first request
agent = None


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[Dict[str, Any]]
    temperature: Optional[float] = 1.0
    max_tokens: Optional[int] = None
    stream: Optional[bool] = False


class ChatCompletionChoice(BaseModel):
    index: int
    message: Dict[str, Any]
    finish_reason: str


class ChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[ChatCompletionChoice]
    usage: Dict[str, int]


class ChatCompletionChunk(BaseModel):
    id: str
    object: str = "chat.completion.chunk"
    created: int
    model: str
    choices: List[Dict[str, Any]]


async def create_agent():
    """Create the MCP agent."""
    client = MultiServerMCPClient(
        {
            "chart": {
                "url": "https://mcp.api-inference.modelscope.net/530c948576c248/sse",
                "transport": "sse",
            },
            "covid19-query": {
                "url": "http://127.0.0.1:6030/sse",
                "transport": "sse",
            },
        }
    )
    tools = await client.get_tools()
    return create_react_agent(
        "openai:gpt-4.1",
        tools,
        prompt=(
            "You are an AI assistant that helps people find information.\n"
            "For each user question, follow this workflow: 1. Get the table schema. "
            "2. Generate SQL to query the data. 3. Generate a visual chart. "
            "4. Summarize the results and reply to the user."
        ),
    )


async def invoke_agent(messages: List[Dict[str, Any]]) -> AsyncGenerator[str, None]:
    """Run the agent and yield text for tool calls and the final answer."""
    global agent
    if not agent:
        agent = await create_agent()
    async for chunk in agent.astream({"messages": messages}, stream_mode="updates"):
        if "agent" in chunk:
            content = chunk["agent"]["messages"][0].content
            tool_calls = chunk["agent"]["messages"][0].tool_calls
            if tool_calls:
                for tool in tool_calls:
                    # Show each tool call to the client as a quoted line
                    yield f"> ```Call MCP Server: {tool['name']}```\n\n"
            else:
                yield content


async def handle_stream_response(
    messages: List[Dict[str, Any]], model: str, request_id: str
) -> AsyncGenerator[str, None]:
    """Wrap the agent output in OpenAI-style chat.completion.chunk SSE events."""
    async for msg in invoke_agent(messages):
        chunk_data = ChatCompletionChunk(
            id=request_id,
            created=int(time.time()),
            model=model,
            choices=[{"index": 0, "delta": {"content": msg}, "finish_reason": None}],
        )
        yield f"data: {chunk_data.model_dump_json()}\n\n"
    # Final chunk carries finish_reason, then the [DONE] sentinel ends the stream
    final_chunk = ChatCompletionChunk(
        id=request_id,
        created=int(time.time()),
        model=model,
        choices=[{"index": 0, "delta": {}, "finish_reason": "stop"}],
    )
    yield f"data: {final_chunk.model_dump_json()}\n\n"
    yield "data: [DONE]\n\n"


async def verify_auth(authorization: Optional[str] = Header(None)) -> bool:
    """Validate the bearer token."""
    if not authorization:
        return False
    if authorization.startswith("Bearer "):
        token = authorization[7:]
    else:
        token = authorization
    return token == api_key


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest, auth_result: bool = Depends(verify_auth)):
    # Check the authentication result
    if not auth_result:
        raise HTTPException(
            status_code=401,
            detail={"error": {"message": "Invalid authentication credentials", "type": "invalid_request_error", "param": None, "code": "invalid_api_key"}},
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Non-streaming responses are not supported yet
    if not request.stream:
        raise HTTPException(
            status_code=400,
            detail={"error": {"message": "Non-streaming responses are not implemented in this mock API", "type": "invalid_request_error", "param": "stream", "code": "invalid_parameter"}},
        )
    try:
        # Trigger the agent and stream the result back as Server-Sent Events
        request_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
        return StreamingResponse(
            handle_stream_response(request.messages, request.model, request_id),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/v1/models")
async def list_models():
    """List available models."""
    return {
        "object": "list",
        "data": [{"id": "agent_model", "object": "model", "created": int(time.time()), "owned_by": "agent"}],
    }


@app.get("/health")
async def health_check():
    """Health check."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
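The service streams OpenAI-style Server-Sent Events: each event is a `data: <json>` line followed by a blank line, text arrives in `choices[0].delta.content`, and the stream ends with `data: [DONE]`. To show what a client does with that stream, here is a small standard-library parser that reassembles the reply text from raw event lines; `collect_content` is a hypothetical helper, and the sample events only mimic the shape of what `handle_stream_response` yields.

```python
import json
from typing import Iterable

def collect_content(lines: Iterable[str]) -> str:
    """Concatenate delta.content from OpenAI-style SSE lines until [DONE]."""
    parts = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # blank separators and other SSE fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break
        for choice in json.loads(payload).get("choices", []):
            # The final chunk has an empty delta, so content may be missing
            parts.append(choice.get("delta", {}).get("content") or "")
    return "".join(parts)

stream = (
    'data: {"choices": [{"index": 0, "delta": {"content": "Hello, "}, "finish_reason": null}]}\n\n'
    'data: {"choices": [{"index": 0, "delta": {"content": "world"}, "finish_reason": null}]}\n\n'
    'data: {"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}\n\n'
    "data: [DONE]\n\n"
)
print(collect_content(stream.splitlines()))  # Hello, world
```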
Start the service:
5. Testing with an OpenAI-Protocol Client
Here we use Cherry Studio as an example; you can also use OpenWebUI or any other client that supports the OpenAI protocol.
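Besides GUI clients, the endpoint can also be called directly. The sketch below only builds the HTTP request with the standard library (`urllib.request`) so the headers and JSON body the service expects are visible. It assumes the service runs locally on port 8000 as configured above; `build_chat_request` is a hypothetical helper, and `stream` is set to `True` because the service rejects non-streaming requests. Sending is left commented out because it needs the service running.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000/v1"  # assumption: service running locally
API_KEY = "sk-da4b6cb4a41e4cascascasc9508deb556942"

def build_chat_request(question: str, model: str = "agent_model") -> urllib.request.Request:
    """Build a streaming chat-completions request for the agent service."""
    body = {
        "model": model,
        "stream": True,
        "messages": [{"role": "user", "content": question}],
    }
    return urllib.request.Request(
        f"{BASE_URL}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Bearer {API_KEY}",
                 "Content-Type": "application/json"},
        method="POST",
    )

req = build_chat_request("Find the top five states by confirmed cases and plot them as a pie chart")
# With the service running, print the raw SSE lines as they arrive:
# with urllib.request.urlopen(req) as resp:
#     for raw in resp:
#         print(raw.decode("utf-8"), end="")
```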
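Configure the connection:
Conversation tests:
Question: Find the top ten states by confirmed cases and show them as a line chart
Question: Find the top five states by confirmed cases and plot them as a pie chart for me
Question: Take the top 10 counties by confirmed cases and present them as a bar chart
Question: Calculate what percentage of all confirmed cases the state with the most cases accounts for, and display it as a liquid chart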
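The last question asks for a share rather than a ranking: the largest state's total divided by the overall total. The sketch below computes that ratio from per-state rows such as those the SQL below would return. It assumes (not confirmed here) that the liquid chart tool takes a `percent` value between 0 and 1; `SHARE_SQL` is just one query the model might reasonably generate, and the two-state input is made-up illustrative data.

```python
from typing import Any, Dict, List, Tuple

# One query the agent could generate for this question (illustrative only).
SHARE_SQL = """
SELECT state, SUM(cases) AS cases
FROM us_covid19_counties
GROUP BY state
"""

def top_state_share(rows: List[Dict[str, Any]]) -> Tuple[str, float]:
    """Return (state, share of all cases) for the state with the most cases."""
    total = sum(r["cases"] for r in rows)
    if total == 0:
        raise ValueError("No cases to compare.")
    top = max(rows, key=lambda r: r["cases"])
    return top["state"], top["cases"] / total

# Made-up per-state totals, only to show the calculation.
state, percent = top_state_share([
    {"state": "A", "cases": 30},
    {"state": "B", "cases": 70},
])
print(state, f"{percent:.0%}")  # B 70%
```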