LangGraph 集成 MCP Server
简介:
LangGraph 与 MCP(模型上下文协议)的结合使用,实现了大语言模型与外部工具、数据源的标准化交互,核心功能包括多轮对话记忆、动态工作流分支及工具调用链管理。
下面例子有两个python文件,一个用作服务器端,一个用作客户端。
服务器端:
FastMCP 也可以非常方便创建 MCP Server
from mcp.server.fastmcp import FastMCP
创建名为 “Logistics” 的MCP服务器
mcp = FastMCP("Logistics")
工具1:查询包裹状态
@mcp.tool()
def get_package_status(tracking_id: str) -> str:"""Get the current status of a package by tracking ID."""statuses = {"LGT123456": "已发货,在途中","LGT789012": "本地分拣中心,待派送","LGT345678": "已签收","LGT000000": "无效单号,请检查"}return statuses.get(tracking_id, "未找到该包裹信息")
工具2:计算运算费用
@mcp.tool()
def estimate_delivery_time(distance_km: float) -> str:"""Estimate delivery time in hours based on distance."""avg_speed_km_per_hour = 40time_hours = distance_km / avg_speed_km_per_hourdays = int(time_hours // 24)hours = round(time_hours % 24)return f"预计送达时间:{days} 天 {hours} 小时"
工具3:预计到达时间
@mcp.tool()
def estimate_delivery_time(distance_km: float) -> str:"""Estimate delivery time in hours based on distance."""avg_speed_km_per_hour = 40time_hours = distance_km / avg_speed_km_per_hourdays = int(time_hours // 24)hours = round(time_hours % 24)return f"预计送达时间:{days} 天 {hours} 小时"
完整代码:
from mcp.server.fastmcp import FastMCPmcp = FastMCP("Logistics")@mcp.tool()
def get_package_status(tracking_id: str) -> str:"""Get the current status of a package by tracking ID."""statuses = {"LGT123456": "已发货,在途中","LGT789012": "本地分拣中心,待派送","LGT345678": "已签收","LGT000000": "无效单号,请检查"}return statuses.get(tracking_id, "未找到该包裹信息")@mcp.tool()
def calculate_shipping_cost(weight_kg: float, distance_km: float) -> str:"""Calculate shipping cost based on weight and distance."""base_rate = 5.0per_kg_rate = 2.0per_km_rate = 0.01cost = base_rate + (weight_kg * per_kg_rate) + (distance_km * per_km_rate)return f"运费估算:{round(cost, 2)} 元"@mcp.tool()
def estimate_delivery_time(distance_km: float) -> str:"""Estimate delivery time in hours based on distance."""avg_speed_km_per_hour = 40time_hours = distance_km / avg_speed_km_per_hourdays = int(time_hours // 24)hours = round(time_hours % 24)return f"预计送达时间:{days} 天 {hours} 小时"if __name__ == "__main__":mcp.run(transport="streamable-http")
客戶端:
完整代码:
# pip install langchain-mcp-adapters
import os
from typing import List
from typing_extensions import TypedDict
from typing import Annotated
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.prebuilt import tools_condition, ToolNode
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_mcp_adapters.prompts import load_mcp_prompt
from langchain_openai import ChatOpenAI
import asyncio# 初始化多服务器客户端
client = MultiServerMCPClient({# "math": {# "command": "python",# "args": ["math_mcp_server.py"], # 确保这个文件存在且可运行# "transport": "stdio",# },"logistics": {"url": "http://localhost:8000/mcp", # 物流MCP服务地址"transport": "streamable_http",}}
)async def create_graph(logistics_session):# 使用 OpenAI 模型llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, api_key="sk-淘宝35块apikey")# 加载工具logistics_tools = await load_mcp_tools(logistics_session)tools = logistics_tools# 绑定工具到 LLMllm_with_tool = llm.bind_tools(tools)# 可选:从 MCP 加载 system prompt - 静默处理system_prompt = "你是一个智能助手,可以调用工具来回答用户问题。"try:system_prompt_msg = await load_mcp_prompt(logistics_session, "system_prompt")if system_prompt_msg and len(system_prompt_msg) > 0:system_prompt = system_prompt_msg[0].contentexcept Exception:# 静默失败,使用默认提示,不打印错误信息passprompt_template = ChatPromptTemplate.from_messages([("system", system_prompt),MessagesPlaceholder(variable_name="messages")])chat_llm = prompt_template | llm_with_tool# 状态定义class State(TypedDict):messages: Annotated[List[AnyMessage], add_messages]# 节点函数def chat_node(state: State) -> State:response = chat_llm.invoke({"messages": state["messages"]})return {"messages": [response]}tool_node = ToolNode(tools=tools)# 构建图graph_builder = StateGraph(State)graph_builder.add_node("chat_node", chat_node)graph_builder.add_node("tool_node", tool_node)graph_builder.add_edge(START, "chat_node")graph_builder.add_conditional_edges("chat_node",tools_condition,{"tools": "tool_node", "__end__": END})graph_builder.add_edge("tool_node", "chat_node")graph = graph_builder.compile(checkpointer=MemorySaver())return graphasync def main():config = {"configurable": {"thread_id": "logistics_thread_001"}}async with client.session("logistics") as logistics_session:print("MCP 客户端已连接:Logistics 服务")agent = await create_graph(logistics_session)print("\n欢迎使用智能物流客服!你可以询问:")print("包裹状态、运费、送达时间等")print("输入 'quit' 退出\n")while True:try:user_input = input("User: ").strip()if user_input.lower() in ["quit", "exit", "退出"]:print("再见!")break# 调用代理response = await agent.ainvoke({"messages": [{"role": "user", "content": user_input}]},config=config)ai_message = response["messages"][-1].contentprint(f"AI: {ai_message}\n")except KeyboardInterrupt:print("\n已退出。")breakexcept Exception as e:print(f"出错: {e}")if __name__ == "__main__":asyncio.run(main())
测试案例:
启动服务器端:
启动客户器端:
测试案例:
1.我的包裹 LGT123456 到哪了?
2.从北京到上海寄一个5公斤的包裹要多少钱?距离大约是1200公里。
3.那大概多久能到?
因为LangGraph 自带记忆功能,所以可以直接问下一个问题。