当前位置: 首页 > news >正文

吴恩达MCP课程(5):mcp_chatbot_prompt_resource.py

前提条件:
1、吴恩达MCP课程(5):research_server_prompt_resource.py
2、server_config_prompt_resource.json文件

{"mcpServers": {"filesystem": {"command": "npx","args": ["--y","@modelcontextprotocol/server-filesystem","."]},"research": {"command": "uv","args": ["run", "research_server_prompt_resource.py"]},"fetch": {"command": "uvx","args": ["mcp-server-fetch"]}}
}

代码

原课程用的anthropic的,下面改成openai,并用千问模型做测试

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
import json
import asyncio
import osload_dotenv()class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}async def connect_to_server(self, server_name, server_config):try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()try:# List available toolsresponse = await session.list_tools()for tool in response.tools:self.sessions[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})# List available promptsprompts_response = await session.list_prompts()if prompts_response and prompts_response.prompts:for prompt in prompts_response.prompts:self.sessions[prompt.name] = sessionself.available_prompts.append({"name": prompt.name,"description": prompt.description,"arguments": prompt.arguments})# List available resourcesresources_response = await session.list_resources()if resources_response and resources_response.resources:for resource in resources_response.resources:resource_uri = str(resource.uri)self.sessions[resource_uri] = sessionexcept Exception as e:print(f"Error {e}")except Exception as e:print(f"Error connecting to {server_name}: {e}")async def connect_to_servers(self):try:with open("server_config_prompt_resource.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server config: {e}")raiseasync def process_query(self, query):messages = [{"role":"user", "content":query}]while True:response = self.client.chat.completions.create(model="qwen-turbo",tools=self.available_tools,messages=messages)message = response.choices[0].message# 检查是否有普通文本内容if message.content:print(message.content)messages.append({"role": "assistant", "content": message.content})break# 检查是否有工具调用elif message.tool_calls:# 添加助手消息到历史messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 处理每个工具调用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 获取session并调用工具session = self.sessions.get(tool_name)if not session:print(f"Tool '{tool_name}' not found.")breakresult = await session.call_tool(tool_name, arguments=tool_args)# 添加工具结果到消息历史messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})else:breakasync def get_resource(self, resource_uri):session = self.sessions.get(resource_uri)# Fallback for papers URIs - try any papers resource sessionif not session and resource_uri.startswith("papers://"):for uri, sess in self.sessions.items():if uri.startswith("papers://"):session = sessbreakif not session:print(f"Resource '{resource_uri}' not found.")returntry:result = await session.read_resource(uri=resource_uri)if result and result.contents:print(f"\nResource: {resource_uri}")print("Content:")print(result.contents[0].text)else:print("No content available.")except Exception as e:print(f"Error: {e}")async def list_prompts(self):"""List all available prompts."""if not self.available_prompts:print("No prompts available.")returnprint("\nAvailable prompts:")for prompt in self.available_prompts:print(f"- {prompt['name']}: {prompt['description']}")if prompt['arguments']:print(f"  Arguments:")for arg in prompt['arguments']:arg_name = arg.name if hasattr(arg, 'name') else arg.get('name', '')print(f"    - {arg_name}")async def execute_prompt(self, prompt_name, args):"""Execute a prompt with the given arguments."""session = self.sessions.get(prompt_name)if not session:print(f"Prompt '{prompt_name}' not found.")returntry:result = await session.get_prompt(prompt_name, arguments=args)if result and result.messages:prompt_content = result.messages[0].content# Extract text from content (handles different formats)if isinstance(prompt_content, str):text = prompt_contentelif hasattr(prompt_content, 'text'):text = prompt_content.textelse:# Handle list of content itemstext = " ".join(item.text if hasattr(item, 'text') else str(item) for item in prompt_content)print(f"\nExecuting prompt '{prompt_name}'...")await self.process_query(text)except Exception as e:print(f"Error: {e}")async def chat_loop(self):print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")print("Use @folders to see available topics")print("Use @<topic> to search papers in that topic")print("Use /prompts to list available prompts")print("Use /prompt <name> <arg1=value1> to execute a prompt")while True:try:query = input("\nQuery: ").strip()if not query:continueif query.lower() == 'quit':break# Check for @resource syntax firstif query.startswith('@'):# Remove @ sign  topic = query[1:]if topic == "folders":resource_uri = "papers://folders"else:resource_uri = f"papers://{topic}"await self.get_resource(resource_uri)continue# Check for /command syntaxif query.startswith('/'):parts = query.split()command = parts[0].lower()if command == '/prompts':await self.list_prompts()elif command == '/prompt':if len(parts) < 2:print("Usage: /prompt <name> <arg1=value1> <arg2=value2>")continueprompt_name = parts[1]args = {}# Parse argumentsfor arg in parts[2:]:if '=' in arg:key, value = arg.split('=', 1)args[key] = valueawait self.execute_prompt(prompt_name, args)else:print(f"Unknown command: {command}")continueawait self.process_query(query)except Exception as e:print(f"\nError: {str(e)}")async def cleanup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()if __name__ == "__main__":asyncio.run(main())

代码解释

这段代码实现了一个基于MCP(Model Context Protocol)的聊天机器人,能够连接到多个MCP服务器,使用它们提供的工具、提示和资源。下面是对代码的详细解释:

类结构与初始化

class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}
  • AsyncExitStack:用于管理多个异步上下文管理器,确保资源正确释放
  • openai.OpenAI:创建OpenAI客户端,使用环境变量中的API密钥和基础URL
  • available_tools:存储可用工具列表,用于OpenAI API的工具调用功能
  • available_prompts:存储可用提示列表,用于快速显示
  • sessions:字典,将工具名称、提示名称或资源URI映射到对应的MCP客户端会话

服务器连接

async def connect_to_server(self, server_name, server_config):# 创建服务器参数、建立连接并初始化会话# 获取可用工具、提示和资源

这个方法负责:

  1. 使用提供的配置创建StdioServerParameters
  2. 通过stdio_client建立与服务器的连接
  3. 创建并初始化ClientSession
  4. 获取服务器提供的工具、提示和资源
  5. 将它们添加到相应的列表和字典中
async def connect_to_servers(self):# 从配置文件加载服务器信息并连接到每个服务器

这个方法从server_config_prompt_resource.json文件加载服务器配置,并为每个服务器调用connect_to_server方法。

查询处理

async def process_query(self, query):# 处理用户查询,使用OpenAI API和可用工具

这个方法是聊天机器人的核心,它:

  1. 创建包含用户查询的消息列表
  2. 调用OpenAI API,传递消息和可用工具
  3. 处理API的响应:
    • 如果有普通文本内容,打印并添加到消息历史
    • 如果有工具调用,执行每个工具调用并将结果添加到消息历史

工具调用的处理流程:

  1. 从响应中提取工具ID、名称和参数
  2. sessions字典中获取对应的会话
  3. 调用工具并获取结果
  4. 将结果添加到消息历史中

资源和提示管理

async def get_resource(self, resource_uri):# 获取并显示指定URI的资源内容

这个方法用于获取和显示资源内容,特别是论文资源。它会:

  1. sessions字典中获取对应的会话
  2. 对于以papers://开头的URI,如果找不到对应会话,会尝试使用任何处理papers的会话
  3. 调用read_resource方法获取资源内容
  4. 打印资源内容
async def list_prompts(self):# 列出所有可用的提示

这个方法列出所有可用的提示及其描述和参数。

async def execute_prompt(self, prompt_name, args):# 执行指定的提示,并处理结果

这个方法执行指定的提示,并将结果传递给process_query方法进行处理。它处理不同格式的提示内容,确保能够正确提取文本。

聊天循环

async def chat_loop(self):# 主聊天循环,处理用户输入

这个方法是聊天机器人的主循环,它:

  1. 打印欢迎信息和使用说明
  2. 循环接收用户输入
  3. 根据输入的不同格式执行不同操作:
    • 如果输入是quit,退出循环
    • 如果输入以@开头,调用get_resource方法获取资源
    • 如果输入以/开头,执行命令(如列出提示或执行提示)
    • 否则,调用process_query方法处理普通查询

资源清理

async def cleanup(self):# 清理资源

这个方法调用exit_stack.aclose()来清理所有资源。

主函数

async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()

主函数创建MCP_ChatBot实例,连接到服务器,运行聊天循环,并确保在结束时清理资源。

代码运行结果

uv run mcp_chatbot_prompt_resource.py

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

相关文章:

  • K-匿名模型
  • 面向连接的运输:TCP
  • 基于 Android 和 JBox2D 的简单小游戏
  • Android高级开发第三篇 - JNI异常处理与线程安全编程
  • 用 Whisper 打破沉默:AI 语音技术如何重塑无障碍沟通方式?
  • HTTP、WebSocket、SSE 对比
  • CNN卷积网络:让计算机拥有“火眼金睛“(superior哥AI系列第4期)
  • 打卡day43
  • 秋招Day12 - 计算机网络 - UDP
  • 05.MySQL表的约束
  • 如何区分虚拟货币诈骗与经营失败?
  • STM32G4 电机外设篇(四)DAC输出电流波形 + CAN通讯
  • Vue-3-前端框架Vue基础入门之VSCode开发环境配置和Tomcat部署Vue项目
  • paoxiaomo的XCPC算法竞赛训练经验
  • C++中实现随机数(超详细!​​​​​)
  • 黑马程序员C++核心编程笔记--4 类和对象--多态
  • 1.文件操作相关的库
  • Java Netty 中处理粘包和半包问题的解决方案 | TCP消息完整性校验(XOR )
  • 基于GPT-SoVITS-v4-TTS的音频文本推理,流式生成
  • SOC-ESP32S3部分:25-HTTP请求
  • 南充商城网站建设/个人免费网上注册公司
  • 南京专门做网站/网络广告策划的内容
  • 武汉网站建设组织/google下载安装
  • 模板网站建设教程/淘宝关键词排名查询工具
  • 苏州写信小程序开发公司/windows优化大师的作用
  • 电商网站模板下载/品牌营销策略案例