当前位置: 首页 > news >正文

基于MCP架构的LLM-Agent融合—构建AI Agent的技术体系与落地实践

随着大型语言模型(LLM)能力的不断提升,构建具备复杂任务处理能力的AI Agent系统成为研究与应用的热点。然而,赋予Agent调用外部工具的能力是其走向实用化的关键一步。本文旨在探讨一种新兴的标准化工具接口协议——Model Context Protocol (MCP),并深入分析如何将其集成到当前八种主流的LLM Agent开发框架中,包括OpenAI Agents SDK、LangGraph、LlamaIndex、AutoGen、Pydantic AI、SmolAgents、Camel和CrewAI。通过梳理各框架的核心特性、集成方法及代码示例,本文为开发者提供了一套清晰的实践指南,旨在促进Agent系统与外部世界的高效交互,推动AI Agent技术的标准化与工程化落地。

1. 什么是 MCP Server?

MCP(Model Context Protocol)Server 是一个标准化的工具接口协议,它允许 AI Agent 通过统一的方式调用各种外部工具和服务。无论是搜索引擎、数据库查询,还是API调用,MCP Server 都能提供标准化的接入方式。

MCP Server支持两种主要连接模式:

  • Stdio模式:通过命令行进程通信,适合本地开发
  • SSE模式:通过HTTP连接,适合生产环境部署

在这里插入图片描述

2. 主流LLM Agent框架与MCP集成实战

2.1 OpenAI Agents SDK - 轻量级多Agent协作

OpenAI官方推出的轻量级框架,特别适合构建简单的多Agent协作场景。

  • 集成要点 : 使用mcp-client-python库连接MCP Server,并将获取的工具传递给create_agent 函数。

  • 安装依赖: pip install openai-agents-sdk mcp-client-python

  • 优势 :API简洁,OpenAI生态无缝集成。

  • 应用场景 :创建“研究员”和“写手Agent,研究员调用MCP搜索工具获取信息,完成后将任务转交给写手生成报告。

    # 基础Agent设置
    from openai_agents import Agent, create_agent
    from mcp_client import MCPClient
    import asyncioasync def setup_openai_agent_with_mcp():# 创建MCP客户端mcp_client = MCPClient()# 连接Tavily搜索工具await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])# 获取工具列表tools = await mcp_client.get_tools()# 创建Agentagent = create_agent(name="搜索助手",instructions="你是一个专业的搜索助手,能够帮助用户获取最新信息",tools=tools,model="gpt-4")return agent, mcp_client# 运行Agent
    async def main():agent, mcp_client = await setup_openai_agent_with_mcp()# 与Agent对话response = await agent.run("帮我搜索一下2025年AI领域的重要进展")print(response.content)# 清理连接await mcp_client.disconnect()# 执行
    if __name__ == "__main__":asyncio.run(main())
    

创建和运行多Agent协作系统的异步函数,使用了类似 OpenAI Assistants API 的概念,并结合了 MCP(Model Control Protocol)客户端来集成外部工具(如 Tavily 搜索):

  • 安装依赖: pip install langgraph langchain-openai tavily-python

    import asyncio
    from typing import TypedDict, Annotated, Sequence
    from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
    from langchain_openai import ChatOpenAI
    from tavily import TavilyClient
    from langgraph.graph import StateGraph, END# 初始化模型
    llm = ChatOpenAI(model="gpt-4", temperature=0)# 初始化 Tavily 搜索工具
    tavily = TavilyClient(api_key="your_tavily_api_key")  # 替换为你的 Tavily Key# 定义状态
    class AgentState(TypedDict):messages: Annotated[Sequence[BaseMessage], ...]current_agent: strresearch_data: str# 创建研究员 Agent
    async def researcher_node(state: AgentState):query = state['messages'][-1].contentprint("🔍 研究员正在搜集信息...")search_result = tavily.search(query=query, max_results=5)result_text = "\n\n".join([r["content"] for r in search_result["results"]])# 调用 LLM 做初步分析(可选)response = llm.invoke([HumanMessage(content=f"请总结以下关于'{query}'的信息,用于写科普文章:\n{result_text}")])return {**state,"research_data": response.content,"messages": state["messages"] + [AIMessage(content=response.content)],"current_agent": "writer"}# 创建写手 Agent
    async def writer_node(state: AgentState):print("✍️ 写手正在撰写文章...")research_summary = state["research_data"]prompt = f"""请将以下研究内容整理成一篇通俗易懂的中文科普文章,风格生动,适合大众阅读:{research_summary}"""response = llm.invoke([HumanMessage(content=prompt)])return {**state,"messages": state["messages"] + [AIMessage(content=response.content)],"current_agent": "end"}# 构建图
    async def create_agent_team():workflow = StateGraph(AgentState)workflow.add_node("researcher", researcher_node)workflow.add_node("writer", writer_node)workflow.set_entry_point("researcher")# 设置流转逻辑workflow.add_conditional_edges("researcher",lambda x: "writer",{"writer": "writer"})workflow.add_edge("writer", END)graph = workflow.compile()return graph# 运行任务
    async def run_team_task():app = await create_agent_team()inputs = {"messages": [HumanMessage(content="研究量子计算的最新突破,并整理成一篇科普文章")],"current_agent": "researcher","research_data": ""}async for output in app.astream(inputs):for key, value in output.items():print(f"\n[Node: {key}]")if 'research_data' in value:print("📊 研究完成,数据已生成...")if 'messages' in value and len(value['messages']) > 0:last_msg = value['messages'][-1]if isinstance(last_msg, AIMessage):print("📝 输出:", last_msg.content[:200] + "..." if len(last_msg.content) > 200 else last_msg.content)# 最终结果final_msg = output.get("writer", {}).get("messages", [])[-1]print("\n" + "="*50)print("最终文章:\n", final_msg.content)print("="*50)# 主程序入口
    if __name__ == "__main__":asyncio.run(run_team_task())
    

    多Agent协作示例

    # 多Agent协作示例async def create_agent_team():# 创建MCP客户端mcp_client = MCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])tools = await mcp_client.get_tools()# 创建研究员Agentresearcher = create_agent(name="研究员",instructions="负责信息搜集和分析",tools=tools,model="gpt-4")# 创建写手Agentwriter = create_agent(name="写手",instructions="负责将研究结果整理成文章",model="gpt-4")# 创建协作工作流from openai_agents import Handoffs# 配置Agent间的任务转交handoffs = Handoffs()handoffs.add_handoff(from_agent=researcher,to_agent=writer,condition="完成信息收集后转交给写手")return researcher, writer, handoffs# 运行协作任务
    async def run_team_task():researcher, writer, handoffs = await create_agent_team()# 启动协作任务result = await researcher.run("研究量子计算的最新突破,并整理成一篇科普文章",handoffs=handoffs)print(result.content)
    

2.2 LangGraph - 有状态图工作流

基于图结构的框架,适用于需要状态管理和复杂决策逻辑的长周期任务。

  • MCP集成 :使用load_mcp_toolsMultiServerMCPClient加载工具。

  • 应用场景 :构建“研究-分析-报告”三阶段工作流。研究节点调用MCP搜索,分析和报告节点则基于研究结果进行纯推理。

  • 优势 :可视化工作流,状态持久化,适合处理复杂、多步骤任务。

  • 安装依赖:pip install langgraph langchain-openai mcp-client-python

    from langgraph.prebuilt import create_react_agent
    from langchain_openai import ChatOpenAI
    from mcp_client import MCPClient, load_mcp_tools
    import asyncio
    async def create_langgraph_agent():# 方法1:使用load_mcp_toolstools = await load_mcp_tools(server_config={"command": "npx","args": ["@tavily/mcp-server"]})# 初始化LLMllm = ChatOpenAI(model="gpt-4")# 创建ReactAgentagent = create_react_agent(llm=llm,tools=tools,state_modifier="你是一个专业的AI助手,能够使用各种工具帮助用户解决问题")return agent
    # 运行Agent
    async def run_langgraph_agent():agent = await create_langgraph_agent()# 执行任务config = {"configurable": {"thread_id": "1"}}result = await agent.ainvoke({"messages": [("user", "帮我搜索并总结Python 3.12的新特性")]},config=config)print(result["messages"][-1].content)
    if __name__ == "__main__":asyncio.run(run_langgraph_agent())
    

集成多个 MCP Server(如 Tavily 搜索、数据库等)作为工具,供一个 Agent 使用,并通过 LangGraph 驱动 ReAct 模式完成任务。

  • 安装依赖: pip install langchain langchain-mcp tavily-python openai

    import asyncio
    import json
    import sys
    from typing import List, Dict, Any
    from langchain_core.tools import Tool
    from langchain_openai import ChatOpenAI
    from langchain.agents import create_react_agent
    from langchain.agents.agent import AgentExecutor
    from langchain.memory import ConversationBufferMemory# =======================
    # MCP 客户端:与 MCP Server 通信
    # =======================
    class MCPClient:def __init__(self, server_id: str, command: str, args: List[str]):self.server_id = server_idself.command = commandself.args = argsself.reader = Noneself.writer = Noneasync def connect(self):print(f"🔁 启动 MCP Server: {self.server_id}")proc = await asyncio.create_subprocess_exec(self.command,*self.args,stdin=asyncio.subprocess.PIPE,stdout=asyncio.subprocess.PIPE,stderr=sys.stderr,)self.reader, self.writer = proc.stdin, proc.stdout# 读取初始化消息header = await self._read_line()if not header.startswith("MCP/"):raise RuntimeError(f"Invalid MCP server response: {header}")# 发送客户端握手self._send({"version": "0.2.0"})init = await self._read_json()if init["type"] != "init":raise RuntimeError("Expected init message")print(f"✅ MCP Server '{self.server_id}' 初始化完成,工具数: {len(init.get('tools', []))}")return init.get("tools", [])async def _read_line(self) -> str:line = await self.reader.readline()return line.decode().strip()def _send(self, data: dict):text = json.dumps(data, ensure_ascii=False)self.writer.write((text + "\n").encode())# 注意:不 await writer.drain() 可能导致问题# await self.writer.drain()async def _read_json(self) -> dict:while True:line = await self.reader.readline()if not line:raise EOFError("Connection closed")try:return json.loads(line.decode())except json.JSONDecodeError:continueasync def close(self):if self.writer:self.writer.close()await self.writer.wait_closed()# =======================
    # 创建多 Server Agent
    # =======================
    async def create_multi_server_agent():clients: List[MCPClient] = []all_tools: List[Tool] = []# 创建 MCP 客户端列表mcp_configs = [("search", "npx", ["@tavily/mcp-server"]),# 示例:其他 MCP Server(需要实际存在)# ("db", "python", ["-m", "my_mcp_db_server"]),]for server_id, cmd, args in mcp_configs:client = MCPClient(server_id, cmd, args)await client.connect()clients.append(client)# 获取工具并转换为 LangChain Tooltools = await client.get_all_tools()  # 实际上我们已经在 connect 返回了 toolsfor tool in tools:lc_tool = Tool(name=f"{server_id}.{tool['name']}",description=tool.get("description", "") or tool.get("prompt", ""),func=lambda q, t=tool, c=client: asyncio.run(c.call_tool(t["name"], {"query": q})),)all_tools.append(lc_tool)# 初始化 LLMllm = ChatOpenAI(model="gpt-4o", temperature=0)# 创建 ReAct Agentfrom langchain import hubprompt = hub.pull("hwchase17/react")agent = create_react_agent(llm, all_tools, prompt)agent_executor = AgentExecutor(agent=agent, tools=all_tools, verbose=True)return agent_executor, clients# =======================
    # 调用工具(简化版,需增强)
    # =======================
    # 注意:上面的 lambda 有问题(闭包陷阱),需改写为:
    async def call_tool(client: MCPClient, tool_name: str, input_dict: dict):# 发送调用消息client._send({"type": "call","method": tool_name,"params": input_dict})# 等待结果(简化处理,实际需处理流式/多消息)result = await client._read_json()if result["type"] == "result":return result["output"]elif result["type"] == "error":raise RuntimeError(result["error"])return str(result)# 修正:动态绑定时避免闭包问题
    def make_tool(client, tool_name, tool_info):async def func(input_val):if isinstance(input_val, dict):params = input_valelse:params = {"query": input_val}try:result = await call_tool(client, tool_name, params)return json.dumps(result, ensure_ascii=False, indent=2)except Exception as e:return f"Error calling {tool_name}: {str(e)}"return Tool(name=f"{client.server_id}.{tool_name}",description=tool_info.get("description", "") or tool_info.get("prompt", "No description"),func=func,coroutine=func,)# 重新实现 create_multi_server_agent(修正版)
    async def create_multi_server_agent():clients: List[MCPClient] = []all_tools: List[Tool] = []mcp_configs = [("search", "npx", ["@tavily/mcp-server"]),# 可扩展其他 MCP Server]for server_id, cmd, args in mcp_configs:client = MCPClient(server_id, cmd, args)await client.connect()clients.append(client)# 获取工具tools = await client.get_all_tools()  # 实际上我们已经在 connect 返回了 toolsfor tool in tools:lc_tool = make_tool(client, tool["name"], tool)all_tools.append(lc_tool)llm = ChatOpenAI(model="gpt-4o", temperature=0)from langchain import hubprompt = hub.pull("hwchase17/react")agent = create_react_agent(llm, all_tools, prompt)agent_executor = AgentExecutor(agent=agent, tools=all_tools, verbose=True)return agent_executor, clients# =======================
    # 运行任务
    # =======================
    async def run_langgraph_agent():agent_executor, clients = await create_multi_server_agent()try:result = await agent_executor.ainvoke({"input": "量子计算最近有什么突破?请搜索并总结。","chat_history": [],})print("\n\n🎯 最终结果:")print(result["output"])finally:for client in clients:await client.close()if __name__ == "__main__":asyncio.run(run_langgraph_agent())
    

    多Server集成示例

    # 多Server集成示例
    from langgraph.mcp import MultiServerMCPClient
    async def create_multi_server_agent():# 创建多Server客户端mcp_client = MultiServerMCPClient()# 添加搜索工具await mcp_client.add_server("search",command="npx",args=["@tavily/mcp-server"])# 添加数据库工具(示例)await mcp_client.add_server("database",command="npx",args=["@example/database-mcp-server"])# 获取所有工具all_tools = await mcp_client.get_all_tools()llm = ChatOpenAI(model="gpt-4")agent = create_react_agent(llm=llm, tools=all_tools)return agent, mcp_client
    

使用 LangGraph + LangChain 构建的 自动化研究工作流系统 ,让多个 AI Agent 分工协作,自动完成:“用户提问 → 搜集资料 → 分析信息 → 写成专业报告”

  • 安装依赖:pip install langgraph langchain-openai langchain-community tavily-python

    import asyncio
    from typing import TypedDict, Annotated, List
    from langgraph.graph import StateGraph, START, END
    from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
    from langchain_openai import ChatOpenAI
    from langchain.agents import create_react_agent
    from langchain_community.tools.tavily_search import TavilySearchResults
    from langchain_core.prompts import ChatPromptTemplate
    from operator import add# =======================
    # 自定义状态
    # =======================
    class AgentState(TypedDict):messages: Annotated[List[BaseMessage], add]        # 自动合并消息research_data: dict                                # 存储中间数据final_report: str                                  # 最终输出# =======================
    # 工具:使用 Tavily 搜索(替代 MCP)
    # =======================
    def load_tools():"""加载外部工具,例如 Tavily 搜索"""search_tool = TavilySearchResults(max_results=5,include_images=False,include_raw_content=True)return [search_tool]# =======================
    # 节点函数
    # =======================
    llm = ChatOpenAI(model="gpt-4o", temperature=0)async def research_node(state: AgentState):"""研究节点:调用搜索工具收集信息"""print("\n🔍 正在执行【研究节点】...")query = state["messages"][-1].contentsystem_prompt = "你是一个专业研究员,擅长使用工具收集权威信息。"# 创建 ReAct Agentagent = create_react_agent(llm=llm,tools=load_tools(),state_modifier=system_prompt  # 实际上会被包装进 prompt)# 执行调用result = await agent.ainvoke({"messages": [("system", system_prompt),("human", f"请深入研究:{query}")]})# 提取响应response_msg: AIMessage = result["messages"][-1]return {"messages": [response_msg],"research_data": {"raw_data": response_msg.content}}async def analysis_node(state: AgentState):"""分析节点:对研究结果进行深度分析"""print("\n📊 正在执行【分析节点】...")raw_data = state["research_data"]["raw_data"]prompt = f"""你是一名资深行业分析师,请基于以下研究内容进行结构化分析:{raw_data}请输出:1. 关键发现(3-5条)2. 行业趋势判断3. 潜在挑战与机遇"""response = await llm.ainvoke([("system", "你是一个专业的数据分析师,逻辑严谨,表达清晰。"),("human", prompt)])return {"messages": [response],"research_data": {**state["research_data"],"analysis": response.content}}async def report_node(state: AgentState):"""报告生成节点:输出结构化报告"""print("\n📝 正在执行【报告节点】...")raw_data = state["research_data"]["raw_data"]analysis = state["research_data"]["analysis"]prompt = f"""请根据以下内容撰写一份专业报告:【原始研究】{raw_data}【分析结论】{analysis}要求:- 标题明确- 分为“背景”、“关键发现”、“趋势预测”、“建议”四个部分- 语言正式,适合向管理层汇报"""response = await llm.ainvoke([("system", "你是一位资深咨询顾问,擅长撰写高质量行业报告。"),("human", prompt)])return {"messages": [response],"final_report": response.content}# =======================
    # 构建工作流
    # =======================
    async def create_research_workflow():workflow = StateGraph(AgentState)# 添加节点workflow.add_node("research", research_node)workflow.add_node("analysis", analysis_node)workflow.add_node("report", report_node)# 设置图结构workflow.add_edge(START, "research")workflow.add_edge("research", "analysis")workflow.add_edge("analysis", "report")workflow.add_edge("report", END)# 编译图app = workflow.compile()return app# =======================
    # 运行工作流
    # =======================
    async def run_research_workflow():app = await create_research_workflow()inputs = {"messages": [HumanMessage(content="人工智能在医疗领域的应用前景")],"research_data": {},"final_report": ""}print("🚀 开始运行复杂研究工作流...\n")result = await app.ainvoke(inputs)print("\n" + "="*60)print("✅ 最终报告")print("="*60)print(result["final_report"])print("="*60)# =======================
    # 主入口
    # =======================
    if __name__ == "__main__":asyncio.run(run_research_workflow())
    

2.3 LlamaIndex - 企业级RAG+Agent

专为检索增强生成(RAG)设计,擅长处理企业知识库。

  • MCP集成 :使用McpToolSpec包装MCP工具。

  • 应用场景 :Agent优先查询企业内部的向量知识库(如产品手册),若信息不足,则通过MCP调用外部搜索引擎获取最新资讯。

  • 优势 :完美融合内部知识与外部信息,确保回答的全面性和时效性。

  • 安装依赖:pip install llama-index llama-index-agent-openai mcp-client-python

    # 安装和基础设置
    from llama_index.core import Settings
    from llama_index.llms.openai import OpenAI
    from llama_index.agent.openai import OpenAIAgent
    from llama_index.tools.mcp import McpToolSpec
    from mcp_client import BasicMCPClient
    import asyncio
    async def create_llamaindex_agent():# 配置全局设置Settings.llm = OpenAI(model="gpt-4")# 创建MCP客户端mcp_client = BasicMCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])# 使用McpToolSpec包装MCP工具mcp_tool_spec = McpToolSpec(mcp_client)tools = mcp_tool_spec.to_tool_list()# 创建Agentagent = OpenAIAgent.from_tools(tools=tools,system_prompt="""你是一个专业的AI助手,具备以下能力:1. 使用搜索工具获取最新信息2. 分析和整理信息3. 提供专业建议请始终基于可靠的信息源进行回答。""",verbose=True)return agent, mcp_client
    # 运行Agent
    async def run_llamaindex_agent():agent, mcp_client = await create_llamaindex_agent()try:# 执行查询response = await agent.achat("帮我搜索并分析2024年大语言模型的发展趋势")print("Agent回复:")print(response.response)# 查看工具调用历史print("
    工具调用历史:")for i, source in enumerate(response.source_nodes):print(f"调用 {i+1}: {source.node.text[:100]}...")finally:await mcp_client.disconnect()
    if __name__ == "__main__":# 运行基础Agentasyncio.run(run_llamaindex_agent())
    

RAG + Agent 集成

  • 安装依赖:pip install llama-index-core llama-index-embeddings-openai llama-index-llms-openai llama-index-readers-web tavily-python

    import asyncio
    from llama_index.core import Settings, VectorStoreIndex, Document
    from llama_index.core.tools import QueryEngineTool, ToolMetadata
    from llama_index.core.agent import ReActAgent
    from llama_index.llms.openai import OpenAI
    from llama_index.embeddings.openai import OpenAIEmbedding
    from llama_index.readers.web import TavilyWebReader# =======================
    # 全局设置
    # =======================
    Settings.llm = OpenAI(model="gpt-4o", temperature=0)
    Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")# =======================
    # 创建 RAG 知识库
    # =======================
    async def create_knowledge_base():documents = [Document(text="人工智能是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。"),Document(text="机器学习是人工智能的一个子领域,专注于开发能够从数据中学习的算法。"),Document(text="深度学习是机器学习的一个分支,使用多层神经网络来模拟人脑的工作方式。"),Document(text="卷积神经网络(CNN)在图像识别中表现优异,常用于医学影像分析。"),Document(text="Transformer 模型通过自注意力机制,在自然语言处理任务中取得突破。")]index = VectorStoreIndex.from_documents(documents)return index# =======================
    # 创建 RAG 工具
    # =======================
    def create_rag_tool(index):return QueryEngineTool(query_engine=index.as_query_engine(similarity_top_k=3),metadata=ToolMetadata(name="knowledge_base",description="查询内部知识库,获取AI相关的基础概念和定义"))# =======================
    # 创建外部搜索工具(替代 MCP)
    # =======================
    def create_search_tool():# 使用 TavilyWebReader 作为外部搜索工具web_reader = TavilyWebReader(api_key="your_tavily_api_key",  # 替换为你的 Tavily Keyinclude_raw_content=True,max_results=3)async def search_fn(query: str):try:nodes = await web_reader.load_data([query])content = "\n\n".join([n.text for n in nodes])return contentexcept Exception as e:return f"搜索出错: {str(e)}"return QueryEngineTool(query_engine=None,  # 手动定义函数metadata=ToolMetadata(name="web_search",description="搜索互联网以获取关于AI、科技等领域的最新进展"),tool_fn=search_fn,async_tool_fn=search_fn,)# =======================
    # 创建 Agent
    # =======================
    async def create_rag_agent():# 1. 创建 RAG 知识库index = await create_knowledge_base()# 2. 创建工具rag_tool = create_rag_tool(index)search_tool = create_search_tool()all_tools = [rag_tool, search_tool]# 3. 创建 Agentagent = ReActAgent.from_tools(tools=all_tools,system_prompt="""你是一个专业的AI知识助手,具备以下能力:1. 使用内部知识库查询AI的基础概念和定义2. 使用网络搜索工具获取最新的技术进展和新闻3. 结合两者提供全面、准确的回答回答策略:- 对于基础概念(如“什么是深度学习”),优先使用内部知识库- 对于时效性问题(如“2025年最新进展”),必须使用搜索工具- 若问题包含两部分,请分别查询并综合回答""",verbose=True)return agent# =======================
    # 运行 Agent
    # =======================
    async def run_rag_agent():agent = await create_rag_agent()query = "什么是深度学习?它在2025年有哪些最新进展?"print(f"📌 问题:{query}\n")try:response = await agent.achat(query)print("🧠 RAG Agent 回复:")print(response.response)except Exception as e:print(f"❌ Agent 执行出错: {e}")# =======================
    # 主程序入口
    # =======================
    if __name__ == "__main__":asyncio.run(run_rag_agent())
    

2.4 AutoGen 0.4+ - 分布式多Agent系统

AutoGen是微软开发的多Agent协作框架,0.4版本开始引入了更强大的分布式能力。

  • MCP集成 :通过AutoGenMCPClient连接,支持SSE模式。应用场景 :

  • 构建分布式系统,一个本地Agent负责决策,另一个远程Agent通过SSE连接的MCP Server执行耗时的工具调用。

  • 优势 :强大的Agent通信和协作机制,适合大型、分布式的Agent系统。

  • 安装依赖:pip install autogen-agentchat mcp-client-python

    import asyncio
    from autogen_agentchat.agents import AssistantAgent
    from autogen_agentchat.teams import RoundRobinGroupChat
    from autogen_agentchat.ui import Console
    from mcp_client import StdioServerParams, SseServerParams
    from mcp_client.integrations.autogen import AutoGenMCPClient
    from autogen_agentchat.base import TaskResult
    from autogen_agentchat.messages import ChatMessageasync def create_autogen_agents():# 创建MCP客户端mcp_client = AutoGenMCPClient()# 配置Stdio Serverstdio_params = StdioServerParams(command="npx",args=["@tavily/mcp-server"])# 连接MCP Serverawait mcp_client.connect_server("search_tools", stdio_params)# 获取工具列表tools = await mcp_client.get_tools("search_tools")# 创建研究员Agentresearcher = AssistantAgent(name="研究员",model_client=OpenAI(model="gpt-4"),tools=tools,system_message="""你是一个专业的研究员,负责:1. 收集和分析信息2. 使用搜索工具获取最新数据3. 提供准确的研究结果""")# 创建分析师Agentanalyst = AssistantAgent(name="分析师",model_client=OpenAI(model="gpt-4"),system_message="""你是一个专业的数据分析师,负责:1. 分析研究员提供的数据2. 识别趋势和模式3. 提供深入见解""")# 创建报告员Agentreporter = AssistantAgent(name="报告员",model_client=OpenAI(model="gpt-4"),system_message="""你是一个专业的报告撰写专家,负责:1. 整理研究和分析结果2. 生成结构化报告3. 确保内容准确易懂""")return researcher, analyst, reporter, mcp_clientasync def run_autogen_team():researcher, analyst, reporter, mcp_client = await create_autogen_agents()try:# 创建团队team = RoundRobinGroupChat([researcher, analyst, reporter])# 执行任务result = await Console(team.run_stream(task="请研究、分析并报告区块链技术在2024年的最新发展趋势"))print("团队协作结果:")print(result.messages[-1].content)finally:await mcp_client.disconnect_all()async def create_distributed_system():# 创建多个MCP客户端(模拟分布式环境)search_client = AutoGenMCPClient()# 使用SSE连接远程MCP Serversse_params = SseServerParams(url="http://localhost:8000/mcp",headers={"Authorization": "Bearer your-token"})await search_client.connect_server("remote_search", sse_params)# 创建专门的搜索Agentsearch_agent = AssistantAgent(name="搜索专家",model_client=OpenAI(model="gpt-4"),tools=await search_client.get_tools("remote_search"),system_message="你专门负责信息搜索和数据收集")# 创建本地分析Agentlocal_analyst = AssistantAgent(name="本地分析师",model_client=OpenAI(model="gpt-4"),system_message="你负责分析从搜索Agent获得的数据")# 创建分布式团队distributed_team = RoundRobinGroupChat([search_agent, local_analyst])return distributed_team, search_clientif __name__ == "__main__":asyncio.run(run_autogen_team())
    

自定义MCP工具集成

from autogen_agentchat.base import Tool
from typing import Any, Dict
import jsonclass CustomMCPTool(Tool):"""自定义MCP工具包装器"""def __init__(self, mcp_client, tool_name: str):self.mcp_client = mcp_clientself.tool_name = tool_nameasync def run(self, **kwargs) -> Any:"""执行MCP工具"""try:result = await self.mcp_client.call_tool(self.tool_name,kwargs)return resultexcept Exception as e:return f"工具调用失败: {str(e)}"@propertydef schema(self) -> Dict[str, Any]:"""工具schema"""return {"type": "function","function": {"name": self.tool_name,"description": f"调用{self.tool_name}工具","parameters": {"type": "object","properties": {"query": {"type": "string","description": "查询参数"}},"required": ["query"]}}}async def create_custom_tools_agent():# 创建MCP客户端mcp_client = AutoGenMCPClient()# 连接多个MCP服务await mcp_client.connect_server("search",StdioServerParams(command="npx", args=["@tavily/mcp-server"]))# 创建自定义工具search_tool = CustomMCPTool(mcp_client, "tavily_search")# 创建Agentagent = AssistantAgent(name="多工具专家",model_client=OpenAI(model="gpt-4"),tools=[search_tool],system_message="你是一个能够使用多种工具的专家")return agent, mcp_client# 运行自定义工具Agent
async def run_custom_tools():agent, mcp_client = await create_custom_tools_agent()try:# 创建单Agent对话from autogen_agentchat.teams import Swarmteam = Swarm([agent])result = await team.run("使用搜索工具查找Python asyncio的最佳实践")print("自定义工具结果:")print(result.messages[-1].content)finally:await mcp_client.disconnect_all()

2.5 Pydantic AI - 结构化输出

Pydantic AI结合了Pydantic的类型验证能力,强调输出的类型安全和结构化,特别适合需要JSON或特定数据模型的场景。

  • MCP集成 :通过MCPServerStdioMCPServerHTTP连接。
  • 应用场景:要求Agent返回结构化的ResearchReport对象,包含key_findings、search_results等字段,便于下游程序直接解析。
  • 优势 :输出可预测、可验证,减少后处理成本。
  • 安装依赖:pip install pydantic-ai mcp-client-python
    from pydantic_ai import Agent, ModelRetry
    from pydantic import BaseModel, Field
    from mcp_client import MCPServerStdio, MCPServerHTTP
    from typing import List, Optional
    import asyncio# 定义结构化输出模型
    class SearchResult(BaseModel):title: str = Field(description="搜索结果标题")content: str = Field(description="搜索结果内容")url: str = Field(description="搜索结果链接")relevance_score: float = Field(description="相关度评分", ge=0.0, le=1.0)class ResearchReport(BaseModel):topic: str = Field(description="研究主题")executive_summary: str = Field(description="执行摘要")key_findings: List[str] = Field(description="关键发现")search_results: List[SearchResult] = Field(description="搜索结果")conclusion: str = Field(description="结论")confidence_level: float = Field(description="置信度", ge=0.0, le=1.0)async def create_pydantic_agent():# 创建MCP Server连接mcp_server = MCPServerStdio(command="npx",args=["@tavily/mcp-server"])# 获取工具tools = await mcp_server.get_tools()# 创建Agentagent = Agent(model="openai:gpt-4",tools=tools,system_prompt="""你是一个专业的研究助手,能够:1. 使用搜索工具获取信息2. 分析和整理搜索结果3. 生成结构化的研究报告请确保输出符合指定的数据结构。""",result_type=ResearchReport)return agent, mcp_serverasync def run_pydantic_agent():agent, mcp_server = await create_pydantic_agent()try:# 执行结构化查询result = await agent.run("研究人工智能在教育领域的应用现状,生成详细的研究报告")print("结构化研究报告:")print(f"研究主题: {result.data.topic}")print(f"执行摘要: {result.data.executive_summary}")print(f"置信度: {result.data.confidence_level}")print("\n关键发现:")for i, finding in enumerate(result.data.key_findings, 1):print(f"{i}. {finding}")print("\n搜索结果:")for i, search_result in enumerate(result.data.search_results, 1):print(f"{i}. {search_result.title} (相关度: {search_result.relevance_score})")print(f"   链接: {search_result.url}")print(f"   摘要: {search_result.content[:100]}...")print(f"\n结论: {result.data.conclusion}")finally:await mcp_server.disconnect()class AnalysisStep(BaseModel):step_name: str = Field(description="分析步骤名称")description: str = Field(description="步骤描述")tools_used: List[str] = Field(description="使用的工具")findings: str = Field(description="发现内容")confidence: float = Field(description="步骤置信度", ge=0.0, le=1.0)class MultiStepAnalysis(BaseModel):query: str = Field(description="原始查询")analysis_steps: List[AnalysisStep] = Field(description="分析步骤")final_answer: str = Field(description="最终答案")overall_confidence: float = Field(description="整体置信度", ge=0.0, le=1.0)sources_cited: int = Field(description="引用源数量")async def create_advanced_pydantic_agent():# 创建HTTP MCP Server连接mcp_server = MCPServerHTTP(base_url="http://localhost:8000/mcp",headers={"Authorization": "Bearer your-token"})tools = await mcp_server.get_tools()# 创建多步骤分析Agentagent = Agent(model="openai:gpt-4",tools=tools,system_prompt="""你是一个专业的分析师,能够进行多步骤深度分析。分析流程:1. 信息收集 - 使用搜索工具收集相关信息2. 数据验证 - 验证信息的准确性3. 深度分析 - 分析数据的含义和影响4. 结论综合 - 整合所有发现得出结论每个步骤都要记录使用的工具、发现的内容和置信度。""",result_type=MultiStepAnalysis)return agent, mcp_serverasync def run_advanced_analysis():agent, mcp_server = await create_advanced_pydantic_agent()try:result = await agent.run("分析区块链技术在供应链管理中的应用前景和挑战")print("多步骤分析结果:")print(f"查询: {result.data.query}")print(f"整体置信度: {result.data.overall_confidence}")print(f"引用源数量: {result.data.sources_cited}")print("\n分析步骤:")for i, step in enumerate(result.data.analysis_steps, 1):print(f"步骤 {i}: {step.step_name}")print(f"  描述: {step.description}")print(f"  使用工具: {', '.join(step.tools_used)}")print(f"  发现: {step.findings}")print(f"  置信度: {step.confidence}")print()print(f"最终答案: {result.data.final_answer}")finally:await mcp_server.disconnect()if __name__ == "__main__":# 运行基础结构化Agentasyncio.run(run_pydantic_agent())# 运行高级分析Agentprint("\n" + "=" * 50 + "\n")asyncio.run(run_advanced_analysis())
    

2.6 SmolAgents - 轻量级代码生成

Hugging Face推出的轻量级框架,其核心是让Agent生成代码来调用工具。

  • MCP集成 :使用ToolCollection.from_mcp导入工具。

  • 应用场景 :Agent被要求“并行搜索多个主题”,它会生成Python asyncio.gather代码来并发调用MCP搜索工具。

  • 优势 :灵活性高,Agent可以自由组合和调度工具。

  • 安装依赖:pip install smolagents mcp-client-python

    from smolagents import CodeAgent, ToolCollection
    from mcp_client import MCPClient
    from smolagents import Tool
    import asyncioasync def create_smol_agent():# 创建MCP客户端mcp_client = MCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])# 使用ToolCollection导入MCP工具tools = ToolCollection.from_mcp(mcp_client)# 创建CodeAgentagent = CodeAgent(tools=tools,model="gpt-4",system_prompt="""你是一个专业的代码生成助手,能够:1. 理解用户需求2. 生成Python代码调用工具3. 执行代码并返回结果请总是生成清晰、可执行的代码。""")return agent, mcp_clientasync def run_smol_agent():agent, mcp_client = await create_smol_agent()try:# 执行代码生成任务result = await agent.run("""请帮我搜索"Python异步编程最佳实践",然后分析搜索结果,提取关键要点。""")print("SmolAgent生成的代码:")print(result.code)print("\n执行结果:")print(result.output)finally:await mcp_client.disconnect()# 自定义工具集成示例async def create_custom_smol_tools():# 创建自定义工具class CustomSearchTool(Tool):name = "custom_search"description = "自定义搜索工具,支持高级搜索选项"def __init__(self, mcp_client):self.mcp_client = mcp_clientsuper().__init__()async def forward(self, query: str, max_results: int = 5) -> str:"""执行自定义搜索"""try:# 调用MCP工具result = await self.mcp_client.call_tool("tavily_search",{"query": query,"max_results": max_results})return resultexcept Exception as e:return f"搜索失败: {str(e)}"@propertydef inputs(self):return {"query": {"type": "string", "description": "搜索查询"},"max_results": {"type": "integer", "description": "最大结果数", "default": 5}}@propertydef output_type(self):return "string"# 创建MCP客户端mcp_client = MCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])# 创建自定义工具custom_tool = CustomSearchTool(mcp_client)# 创建工具集合tools = ToolCollection([custom_tool])# 创建Agentagent = CodeAgent(tools=tools,model="gpt-4",system_prompt="""你是一个专业的搜索分析师,能够:1. 使用自定义搜索工具2. 分析搜索结果3. 生成洞察报告""")return agent, mcp_clientasync def run_custom_smol_agent():agent, mcp_client = await create_custom_smol_tools()try:result = await agent.run("""使用自定义搜索工具搜索"机器学习模型部署"相关内容,限制结果数量为3个,然后分析这些结果的共同点。""")print("自定义SmolAgent结果:")print(result.code)print("\n输出:")print(result.output)finally:await mcp_client.disconnect()# 批量任务处理示例async def create_batch_processing_agent():# 创建MCP客户端mcp_client = MCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])tools = ToolCollection.from_mcp(mcp_client)# 创建批量处理Agentagent = CodeAgent(tools=tools,model="gpt-4",system_prompt="""你是一个批量任务处理专家,能够:1. 处理多个搜索任务2. 并行执行工具调用3. 汇总和分析结果请使用Python的asyncio来实现并行处理。""")return agent, mcp_clientasync def run_batch_processing():agent, mcp_client = await create_batch_processing_agent()try:# 批量搜索任务queries = ["人工智能在金融领域的应用","区块链技术发展趋势","云计算市场分析","物联网安全挑战"]result = await agent.run(f"""对以下查询进行批量搜索和分析:{queries}请:1. 并行执行所有搜索2. 分析每个查询的结果3. 生成综合报告""")print("批量处理结果:")print(result.code)print("\n综合报告:")print(result.output)finally:await mcp_client.disconnect()if __name__ == "__main__":# 运行基础SmolAgentasyncio.run(run_smol_agent())# 运行自定义工具Agentprint("\n" + "=" * 50 + "\n")asyncio.run(run_custom_smol_agent())# 运行批量处理Agentprint("\n" + "=" * 50 + "\n")asyncio.run(run_batch_processing())
    

2.7 Camel - 多Agent角色扮演

Camel专为多Agent角色扮演和协作任务设计,特别适合需要不同专业角色协作的场景。

  • MCP集成 :通过MCPToolkit提供工具。

  • 应用场景 :模拟“研究员”、“策略师”和“执行官”的团队协作。研究员使用MCP搜索市场数据,策略师据此制定策略,执行官规划落地步骤。

  • 优势 :生动地模拟了人类团队的协作过程。

  • 安装依赖:pip install camel-ai mcp-client-python

    from camel.agents import ChatAgent
    from camel.messages import BaseMessage
    from camel.types import RoleType, ModelType
    from camel.toolkits import MCPToolkit
    from mcp_client import MCPClient
    from camel.societies import RolePlaying
    import asyncioasync def create_camel_agents():# 创建MCP客户端mcp_client = MCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])# 创建MCP工具包mcp_toolkit = MCPToolkit(mcp_client)tools = mcp_toolkit.get_tools()# 创建研究员Agentresearcher = ChatAgent(system_message=BaseMessage.make_assistant_message(role_name="研究员",content="""你是一个专业的研究员,具备以下特点:1. 善于信息收集和分析2. 能够使用各种搜索工具3. 注重数据的准确性和可靠性4. 擅长发现趋势和模式你的任务是为团队提供准确、全面的研究数据。"""),model_type=ModelType.GPT_4,tools=tools)# 创建策略师Agentstrategist = ChatAgent(system_message=BaseMessage.make_assistant_message(role_name="策略师",content="""你是一个资深的策略师,具备以下能力:1. 基于研究数据制定策略2. 分析市场趋势和机会3. 识别风险和挑战4. 提供可行的行动建议你的任务是将研究结果转化为实际的策略建议。"""),model_type=ModelType.GPT_4)# 创建执行官Agentexecutor = ChatAgent(system_message=BaseMessage.make_assistant_message(role_name="执行官",content="""你是一个经验丰富的执行官,专长包括:1. 将策略转化为具体行动计划2. 评估资源需求和时间安排3. 识别执行风险和解决方案4. 制定关键指标和里程碑你的任务是确保策略能够有效执行。"""),model_type=ModelType.GPT_4)return researcher, strategist, executor, mcp_clientasync def run_camel_collaboration():researcher, strategist, executor, mcp_client = await create_camel_agents()try:# 定义协作任务task = "制定一个关于人工智能在零售业应用的完整商业策略"# 第一阶段:研究员收集信息research_prompt = BaseMessage.make_user_message(role_name="项目经理",content=f"""请对以下主题进行全面研究:{task}需要重点关注:1. 当前市场状况2. 技术发展趋势3. 成功案例分析4. 潜在挑战和机遇""")research_result = await researcher.step(research_prompt)print("研究员报告:")print(research_result.msg.content)print("\n" + "="*50 + "\n")# 第二阶段:策略师制定策略strategy_prompt = BaseMessage.make_user_message(role_name="项目经理",content=f"""基于研究员的报告,请制定详细的商业策略:研究报告:{research_result.msg.content}请提供:1. 市场定位策略2. 技术实施方案3. 竞争优势分析4. 风险评估和应对""")strategy_result = await strategist.step(strategy_prompt)print("策略师方案:")print(strategy_result.msg.content)print("\n" + "="*50 + "\n")# 第三阶段:执行官制定执行计划execution_prompt = BaseMessage.make_user_message(role_name="项目经理",content=f"""基于策略师的方案,请制定详细的执行计划:策略方案:{strategy_result.msg.content}请提供:1. 具体行动步骤2. 资源需求和预算3. 时间表和里程碑4. 关键指标和评估方法""")execution_result = await executor.step(execution_prompt)print("执行官计划:")print(execution_result.msg.content)finally:await mcp_client.disconnect()# 动态角色分配示例
    async def create_dynamic_role_system():# 创建MCP客户端mcp_client = MCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])mcp_toolkit = MCPToolkit(mcp_client)tools = mcp_toolkit.get_tools()# 创建角色扮演系统role_play = RolePlaying(assistant_role_name="AI专家",user_role_name="企业顾问",assistant_agent_kwargs={"model_type": ModelType.GPT_4,"tools": tools},user_agent_kwargs={"model_type": ModelType.GPT_4})return role_play, mcp_clientasync def run_dynamic_roles():role_play, mcp_client = await create_dynamic_role_system()try:# 初始化对话task_prompt = """我需要为我的制造业公司制定一个AI转型战略。请AI专家研究当前的AI技术趋势,企业顾问则基于实际业务需求提供建议。让我们开始这个协作过程。"""# 运行角色扮演对话input_msg = BaseMessage.make_user_message(role_name="项目发起人",content=task_prompt)print("开始角色扮演协作:")print(f"任务: {task_prompt}")print("\n" + "="*50 + "\n")# 运行多轮对话for i in range(3):  # 进行3轮对话assistant_msg, user_msg = await role_play.step(input_msg)print(f"第{i+1}轮对话:")print(f"AI专家: {assistant_msg.content}")print(f"企业顾问: {user_msg.content}")print("\n" + "-"*30 + "\n")# 准备下一轮输入input_msg = user_msgfinally:await mcp_client.disconnect()# 专业团队协作示例
    async def create_professional_team():# 创建MCP客户端mcp_client = MCPClient()await mcp_client.connect_stdio(command="npx",args=["@tavily/mcp-server"])mcp_toolkit = MCPToolkit(mcp_client)tools = mcp_toolkit.get_tools()# 定义专业角色roles = {"数据科学家": {"description": "专注于数据分析、机器学习模型开发和数据洞察","tools": tools},"产品经理": {"description": "负责产品规划、用户需求分析和市场策略","tools": tools},"技术架构师": {"description": "设计系统架构、技术选型和实施方案","tools": []},"业务分析师": {"description": "分析业务流程、需求分析和效益评估","tools": []}}# 创建Agent团队team = {}for role_name, role_config in roles.items():team[role_name] = ChatAgent(system_message=BaseMessage.make_assistant_message(role_name=role_name,content=f"""你是一个专业的{role_name},职责是:{role_config['description']}请以专业的角度参与团队协作,提供你领域内的专业建议和见解。"""),model_type=ModelType.GPT_4,tools=role_config.get('tools', []))return team, mcp_clientasync def run_professional_team():team, mcp_client = await create_professional_team()try:# 团队协作任务project_brief = """项目:开发一个基于AI的客户服务聊天机器人需求:1. 能够理解客户问题并提供准确回答2. 支持多语言交互3. 具备学习和优化能力4. 集成现有客服系统请各位专家从自己的角度提供专业建议。"""print("专业团队协作开始:")print(f"项目简介: {project_brief}")print("\n" + "="*50 + "\n")# 让每个角色分别提供建议for role_name, agent in team.items():prompt = BaseMessage.make_user_message(role_name="项目经理",content=f"""项目简介:{project_brief}请从{role_name}的角度,提供专业的建议和方案。""")result = await agent.step(prompt)print(f"{role_name}的建议:")print(result.msg.content)print("\n" + "-"*30 + "\n")# 生成综合方案print("团队协作完成,各专家已提供专业建议。")finally:await mcp_client.disconnect()if __name__ == "__main__":# 运行基础协作asyncio.run(run_camel_collaboration())# 运行动态角色系统print("\n" + "="*70 + "\n")asyncio.run(run_dynamic_roles())# 运行专业团队print("\n" + "="*70 + "\n")asyncio.run(run_professional_team())
    

2.8 CrewAI - 结构化Agent团队

CrewAI专注于构建结构化的Agent团队(Crew),强调明确的角色(Role)、目标(Goal)和任务(Task)的结构化团队。

  • MCP集成 :需自定义MCPSearchTool类,包装MCP调用逻辑。
  • 应用场景 :创建一个“市场研究员”Agent,其目标是“收集和分析信息”,通过MCP工具完成研究任务。
  • 优势 :任务流程清晰,易于管理和监控。
  • 安装依赖:pip install crewai mcp-client-python
# 安装第三方MCP适配器
# pip install crewai-mcp-adapterfrom crewai import Agent, Task, Crew, Process
from crewai.tools import BaseTool
import asyncio
import json
import subprocess
import sys
from typing import Dict, Any, Optional
import os# 简化的MCP客户端实现
class MCPClient:def __init__(self):self.process = Noneself.connected = Falseasync def connect_stdio(self, command: str, args: list, env: Optional[Dict[str, str]] = None):"""连接到MCP服务器"""try:# 设置环境变量full_env = os.environ.copy()if env:full_env.update(env)# 启动MCP服务器进程self.process = await asyncio.create_subprocess_exec(command, *args,stdin=asyncio.subprocess.PIPE,stdout=asyncio.subprocess.PIPE,stderr=asyncio.subprocess.PIPE,env=full_env)# 发送初始化请求init_request = {"jsonrpc": "2.0","id": 1,"method": "initialize","params": {"protocolVersion": "2024-11-05","capabilities": {"tools": {}},"clientInfo": {"name": "CrewAI-MCP-Client","version": "1.0.0"}}}await self._send_request(init_request)response = await self._receive_response()if response.get("result"):self.connected = Trueprint("MCP客户端连接成功")return Trueelse:print(f"MCP连接失败: {response}")return Falseexcept Exception as e:print(f"MCP连接错误: {e}")return Falseasync def _send_request(self, request: Dict[str, Any]):"""发送请求到MCP服务器"""if not self.process:raise Exception("MCP客户端未连接")request_str = json.dumps(request) + "\n"self.process.stdin.write(request_str.encode())await self.process.stdin.drain()async def _receive_response(self) -> Dict[str, Any]:"""接收来自MCP服务器的响应"""if not self.process:raise Exception("MCP客户端未连接")line = await self.process.stdout.readline()if line:try:return json.loads(line.decode().strip())except json.JSONDecodeError as e:print(f"JSON解析错误: {e}")return {"error": "JSON解析失败"}return {"error": "无响应"}async def call_tool(self, tool_name: str, params: Dict[str, Any]) -> str:"""调用MCP工具"""if not self.connected:return "MCP客户端未连接"try:# 发送工具调用请求request = {"jsonrpc": "2.0","id": 2,"method": "tools/call","params": {"name": tool_name,"arguments": params}}await self._send_request(request)response = await self._receive_response()if "result" in response:content = response["result"].get("content", [])if content:return content[0].get("text", "无内容")return "工具调用成功但无返回内容"else:return f"工具调用失败: {response.get('error', '未知错误')}"except Exception as e:return f"工具调用异常: {str(e)}"async def disconnect(self):"""断开MCP连接"""if self.process:self.process.terminate()await self.process.wait()self.connected = Falseprint("MCP客户端已断开连接")# 创建MCP工具适配器
class MCPSearchTool(BaseTool):name: str = "MCP搜索工具"description: str = "使用MCP协议进行网络搜索,获取最新信息"def __init__(self, mcp_client):super().__init__()self.mcp_client = mcp_clientdef _run(self, query: str) -> str:"""执行搜索"""try:# 在新的事件循环中运行异步代码import asynciotry:loop = asyncio.get_event_loop()if loop.is_running():# 如果事件循环正在运行,创建一个新的事件循环import threadingresult = [None]exception = [None]def run_in_thread():new_loop = asyncio.new_event_loop()asyncio.set_event_loop(new_loop)try:result[0] = new_loop.run_until_complete(self.mcp_client.call_tool("tavily_search", {"query": query}))except Exception as e:exception[0] = efinally:new_loop.close()thread = threading.Thread(target=run_in_thread)thread.start()thread.join()if exception[0]:raise exception[0]return result[0]else:result = loop.run_until_complete(self.mcp_client.call_tool("tavily_search", {"query": query}))return resultexcept RuntimeError:# 如果没有事件循环,创建一个新的result = asyncio.run(self.mcp_client.call_tool("tavily_search", {"query": query}))return resultexcept Exception as e:return f"搜索失败: {str(e)}"async def create_crew_with_mcp():"""创建带有MCP工具的Crew"""# 创建MCP客户端mcp_client = MCPClient()# 连接到Tavily搜索服务# 注意:需要设置TAVILY_API_KEY环境变量success = await mcp_client.connect_stdio(command="npx",args=["-y", "@tavily/mcp-server"],env={"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "your_api_key_here")})if not success:print("警告:MCP连接失败,将使用模拟数据")# 创建模拟工具class MockSearchTool(BaseTool):name: str = "模拟搜索工具"description: str = "模拟搜索工具,用于演示"def _run(self, query: str) -> str:return f"模拟搜索结果:关于'{query}'的信息 - 这是一个模拟搜索结果,包含了相关的基础信息。"search_tool = MockSearchTool()else:# 创建MCP搜索工具search_tool = MCPSearchTool(mcp_client)# 创建研究员Agentresearcher = Agent(role='资深研究员',goal='收集和分析相关信息,提供准确的研究报告',backstory="""你是一个经验丰富的研究员,擅长从各种来源收集信息,进行深入分析,并提供有价值的洞察。你总是确保信息的准确性和相关性。""",tools=[search_tool],verbose=True,allow_delegation=False)# 创建分析师Agentanalyst = Agent(role='数据分析师',goal='分析研究数据,识别趋势和模式,提供专业见解',backstory=""",你是一个专业的数据分析师,善于从复杂的数据中提取有意义的信息,识别趋势和模式,并提供基于数据的专业建议。""",verbose=True,allow_delegation=False)# 创建内容创作者Agentcontent_creator = Agent(role='内容创作专家',goal='将分析结果转化为易于理解的内容',backstory=""",你是一个专业的内容创作专家,擅长将复杂的分析结果转化为清晰、有吸引力的内容,确保读者能够轻松理解。""",verbose=True,allow_delegation=False)return researcher, analyst, content_creator, mcp_clientasync def create_crew_tasks(researcher, analyst, content_creator):"""创建任务"""# 定义研究任务research_task = Task(description="""对"人工智能在医疗保健中的应用"进行全面研究。需要研究的方面:1. 当前应用现状2. 主要技术和解决方案3. 成功案例和失败教训4. 未来发展趋势5. 面临的挑战和机遇请提供详细的研究报告。""",expected_output="一份包含当前状况、技术方案、案例分析和趋势预测的详细研究报告",agent=researcher)# 定义分析任务analysis_task = Task(description=""",基于研究报告,进行深入的数据分析。分析要点:1. 市场规模和增长趋势2. 技术成熟度分析3. 竞争格局评估4. 投资机会识别5. 风险因素评估请提供专业的分析报告。""",expected_output="一份包含市场分析、技术评估、竞争分析和投资建议的专业分析报告",agent=analyst)# 定义内容创作任务content_task = Task(description=""",基于研究和分析报告,创作一篇高质量的科普文章。内容要求:1. 通俗易懂,适合普通读者2. 结构清晰,逻辑性强3. 包含实际案例和数据4. 提供实用的建议5. 字数控制在1500-2000字请创作一篇引人入胜的科普文章。""",expected_output="一篇1500-2000字的高质量科普文章,内容准确、易懂、有趣",agent=content_creator)return research_task, analysis_task, content_taskasync def create_advanced_crew():"""创建高级Crew配置"""# 创建MCP客户端mcp_client = MCPClient()success = await mcp_client.connect_stdio(command="npx",args=["-y", "@tavily/mcp-server"],env={"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "your_api_key_here")})if success:search_tool = MCPSearchTool(mcp_client)else:# 使用模拟工具class MockSearchTool(BaseTool):name: str = "模拟搜索工具"description: str = "模拟搜索工具,用于演示"def _run(self, query: str) -> str:return f"模拟搜索结果:关于'{query}'的专业信息和最新数据。"search_tool = MockSearchTool()# 创建专业化的Agent团队market_researcher = Agent(role='市场研究专家',goal='专注于市场趋势和竞争分析',backstory='你是一个资深的市场研究专家,对各行业的市场动态有深入了解。',tools=[search_tool],verbose=True,max_iter=3,  # 最大迭代次数memory=True  # 启用记忆功能)tech_analyst = Agent(role='技术分析师',goal='专注于技术发展和创新分析',backstory='你是一个技术分析专家,对新兴技术和创新趋势有敏锐的洞察力。',tools=[search_tool],verbose=True,max_iter=3,memory=True)strategy_consultant = Agent(role='战略咨询师',goal='提供战略建议和实施方案',backstory='你是一个经验丰富的战略咨询师,擅长将研究和分析转化为可执行的战略方案。',verbose=True,max_iter=3,memory=True)return market_researcher, tech_analyst, strategy_consultant, mcp_clientasync def run_crew_ai():"""运行CrewAI任务"""try:print("开始创建CrewAI团队...")# 创建Agentsresearcher, analyst, content_creator, mcp_client = await create_crew_with_mcp()# 创建任务research_task, analysis_task, content_task = await create_crew_tasks(researcher, analyst, content_creator)# 创建Crewcrew = Crew(agents=[researcher, analyst, content_creator],tasks=[research_task, analysis_task, content_task],process=Process.sequential,  # 顺序执行verbose=2)print("📋 开始执行CrewAI任务...")# 执行任务result = crew.kickoff()print("\nCrewAI任务完成!")print("\n最终结果:")print("="*50)print(result)print("="*50)return resultexcept Exception as e:print(f"执行过程中出现错误: {str(e)}")return Nonefinally:# 清理资源if 'mcp_client' in locals():await mcp_client.disconnect()async def run_advanced_crew():"""运行高级Crew配置"""try:print("开始创建高级CrewAI团队...")# 创建高级Agentsmarket_researcher, tech_analyst, strategy_consultant, mcp_client = await create_advanced_crew()# 创建高级任务market_task = Task(description=""",对人工智能医疗保健市场进行深入的市场研究。重点关注:1. 全球市场规模和增长预测2. 主要参与者和竞争格局3. 区域市场差异4. 监管环境影响""",expected_output="详细的市场研究报告,包含数据分析和竞争格局",agent=market_researcher)tech_task = Task(description=""",分析人工智能在医疗保健中的技术发展趋势。重点关注:1. 关键技术栈和解决方案2. 技术成熟度评估3. 创新突破点4. 技术实施挑战""",expected_output="技术分析报告,包含技术路线图和实施建议",agent=tech_analyst)strategy_task = Task(description=""",基于市场研究和技术分析,制定战略建议。重点关注:1. 投资机会识别2. 风险评估和缓解3. 实施路径规划4. 成功因素分析""",expected_output="战略建议报告,包含可执行的行动方案",agent=strategy_consultant)# 创建高级Crewadvanced_crew = Crew(agents=[market_researcher, tech_analyst, strategy_consultant],tasks=[market_task, tech_task, strategy_task],process=Process.sequential,verbose=2)print("📋 开始执行高级CrewAI任务...")# 执行任务result = advanced_crew.kickoff()print("\n高级CrewAI任务完成!")print("\n最终结果:")print("="*50)print(result)print("="*50)return resultexcept Exception as e:print(f"❌ 执行过程中出现错误: {str(e)}")return Nonefinally:# 清理资源if 'mcp_client' in locals():await mcp_client.disconnect()def main():"""主函数"""print("CrewAI + MCP 集成演示")print("="*50)# 检查环境变量if not os.getenv("TAVILY_API_KEY"):print(" 警告:未设置TAVILY_API_KEY环境变量")print("   将使用模拟数据进行演示")print("   要获取真实搜索结果,请设置环境变量:")print("   export TAVILY_API_KEY=your_api_key_here")print()print("请选择运行模式:")print("1. 基础CrewAI演示")print("2. 高级CrewAI演示")choice = input("请输入选择 (1 或 2): ").strip()if choice == "1":asyncio.run(run_crew_ai())elif choice == "2":asyncio.run(run_advanced_crew())else:print("无效选择,运行基础演示...")asyncio.run(run_crew_ai())if __name__ == "__main__":main()

3. 结论与展望

本文系统性地说明了MCP协议作为AI Agent“神经中枢”的价值。通过将八大主流框架与MCP集成,我们证明了其强大的通用性和工程可行性。MCP不仅解决了工具调用的标准化问题,更促进了Agent生态的繁荣。

未来,MCP有望成为AI Agent领域的“TCP/IP协议”。我们期待看到更多企业级特性(如权限控制、审计日志、服务网格)被集成进来,推动AI Agent从实验室走向千行百业。

http://www.dtcms.com/a/294267.html

相关文章:

  • day060-zabbix监控各种客户端
  • 力扣MySQL(1)
  • python 字符串常用处理函数
  • Zookeeper学习专栏(七):集群监控与管理
  • 解决代码生成过程虚拟总线信号无法直接传递给自定义总线信号问题的方法
  • Python curl_cffi库详解:从入门到精通
  • Redis能完全保证数据不丢失吗?
  • 基于OpenOCD 的 STM32CubeIDE 开发烧录调试环境搭建 DAPLINK/STLINK
  • 《计算机网络》实验报告六 电子邮件
  • 【轨物方案】分布式光伏电站运维升级智能化系列:老电站的数智化重生
  • Zabbix 企业级分布式监控
  • Axios 响应拦截器
  • dfaews
  • vue3笔记(2)自用
  • 设备虚拟化技术
  • 筛选数据-group_concat
  • Go语言实现对象存储——下载任意图片,保存到阿里云存储,并将URL保存到数据库。
  • 【数据库】国产数据库的新机遇:电科金仓以融合技术同步全球竞争
  • Pycaita二次开发基础代码解析:图层管理、基准控制与特征解析深度剖析
  • lwIP学习记录5——裸机lwIP工程学习后的总结
  • 【C++】类和对象(中)构造函数、析构函数
  • 海信IP501H-IP502h_GK6323处理器-原机安卓9专用-优盘卡刷固件包
  • ZLMediaKit流媒体服务器WebRTC页面显示:使用docker部署
  • Android多开实现方案深度分析
  • Android13重置锁屏(2)
  • 论文略读:Knowledge is a Region in Weight Space for Finetuned Language Models
  • springboot集成LangChain4j
  • 世博会无法在Android上启动项目:无法连接到TCP端口5554:连接被拒绝
  • 2025暑期—05神经网络-BP网络
  • PyCharm配置python软件安装步骤(附安装包)PyCharm 2025 超详细下载安装教程