MCP Client适配DeepSeek
本文是通过MCP
官方的client例子进行修改,适配DeepSeek
API.
MCP client
先解析一下什么是MCP client。
MCP Client 是 Model Context Protocol(模型上下文协议)架构中的客户端组件,主要负责与 MCP 服务器建立和管理连接。它是一个轻量级的中间层,通常是一个 Python 或 Node.js 编写的库,用于处理工具调用的通信逻辑。
主要功能
- 协议版本协商:确保与服务器的兼容性。
- 能力协商:确定可用功能。
- 消息传输和通信:通过 JSON-RPC 2.0 进行消息交换。
- 工具发现和执行:客户端可以发现并执行服务器端提供的工具。
- 资源访问和管理:通过 URI 模板访问服务器端数据源。
- 提示系统交互:与服务器端的提示模板进行交互。
- 安全措施:例如,需要人工批准才能执行工具。
应用场景
- 与大型语言模型(LLM)集成:MCP Client 充当 LLM 和外部资源之间的桥梁,允许 LLM 调用外部工具和服务。
- 跨模型兼容:支持多种 LLM,如 Claude、OpenAI 等。
- 灵活部署:可以在本地或云端运行。
工作原理
- 构建请求:将用户的自然语言指令转换为 MCP 服务器能理解的请求格式。
- 通信与响应:通过网络与服务器通信,并将服务器的响应反馈给用户或 LLM。
MCP Client 在 MCP 生态系统中扮演着“快递员”的角色,负责将 LLM 的指令传递给 MCP 服务器,并将结果返回给 LLM。
适配过程
官方文档: https://modelcontextprotocol.io/quickstart/client
文档中的示例用的是Claude
, API和DeepSeek有区别(DeepSeek用的是OpenAI兼容接口,SDK使用OpenAI即可).
API Client
首先是要替换掉原来的claude client
self.deepseek = OpenAI(api_key=api_key, base_url=base_url)
Chat
然后是交互过程中,要用OpenAI的chat completion
response: ChatCompletion = self.deepseek.chat.completions.create(model="deepseek-chat",messages=messages,max_tokens=1000,tools=available_tools,stream=False,)
注意这里stream一定是False,model笔者用的是deepseek-chat
,没有测试reasoner
.
Message
因为两个模型的message有差异,所以message需要进行一些对应的改动,否则会出现序列化问题
messages.append({"role": "assistant","content": "\n".join(assistant_message_content),})messages.append({"role": "user","content": result.content[0].text,})
完整代码
import asyncio
import json
import traceback
from contextlib import AsyncExitStack
from typing import Optionalfrom dotenv import load_dotenv
from loguru import logger
from mcp import ClientSession, StdioServerParameters, ListToolsResult
from mcp.client.stdio import stdio_client
from mcp.types import CallToolResult
from openai import OpenAI
from openai.types.chat import ChatCompletionload_dotenv()api_key = "sk-123456"
base_url = "https://api.deepseek.com"
print(f"base_url: {base_url}")class MCPClient:def __init__(self):self.session: Optional[ClientSession] = Noneself.exit_stack = AsyncExitStack()self.deepseek = OpenAI(api_key=api_key, base_url=base_url)async def connect_to_server(self, server_script_path: str):is_python = server_script_path.endswith(".py")is_js = server_script_path.endswith(".js")if not is_python and not is_js:raise ValueError("Invalid server script path")command = "python" if is_python else "node"server_params = StdioServerParameters(command=command, args=[server_script_path], env=None)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))self.stdio, self.write = stdio_transportself.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))await self.session.initialize()response = await self.session.list_tools()tools = response.toolslogger.info("Connected to server with tools: {}", [tool.name for tool in tools])async def process_query(self, query: str) -> str:messages = [{"role": "system", "content": "You are a helpful assistant."},{"role": "user", "content": query},]response: ListToolsResult = await self.session.list_tools()available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema,},}for tool in response.tools]# noinspection PyTypeCheckerresponse: ChatCompletion = self.deepseek.chat.completions.create(model="deepseek-chat",messages=messages,max_tokens=1000,tools=available_tools,stream=False,)final_text = []assistant_message_content = []logger.info(f"{response}")for choice in response.choices:if choice.finish_reason != "tool_calls":# 确保 content 是可序列化的if hasattr(choice.message.content, "to_json"):content = choice.message.content.to_json()else:content = str(choice.message.content)final_text.append(content)assistant_message_content.append(content)else:for tool_call in choice.message.tool_calls:tool_name = tool_call.function.nametool_args = tool_call.function.arguments# invoke toolresult: CallToolResult = await self.session.call_tool(tool_name, json.loads(tool_args))logger.info(f"tool call result:{result}")final_text.append(f"Calling tool {tool_name} with args {tool_args} returned: {result}")# 确保 content 是可序列化的if hasattr(choice.message.content, "to_json"):content = choice.message.content.to_json()else:content = str(choice.message.content)assistant_message_content.append(content)# 修改此处,将列表转换为字符串messages.append({"role": "assistant","content": "\n".join(assistant_message_content),})messages.append({"role": "user","content": result.content[0].text,})# 确保 content 是可序列化的if hasattr(response.choices[0].message.content, "to_json"):content = response.choices[0].message.content.to_json()else:content = str(response.choices[0].message.content)final_text.append(content)# Call the model again with the tool result# noinspection PyTypeCheckerlogger.debug(f"messages: {messages}")# noinspection PyTypeCheckerresponse = self.deepseek.chat.completions.create(model="deepseek-chat",messages=messages,max_tokens=1000,tools=available_tools,stream=False,)final_text.append(response.choices[0].message.content)logger.info(f"{response}")return "\n".join(final_text)async def chat_loop(self):print("\nMCP Client Started!")print("Type your queries or 'quit' to exit.")while True:try:query = input("\nYour Query: ").strip()if query.lower() == "quit":print("Exiting...")breakresponse = await self.process_query(query)print("\n" + response)except Exception as e:print(f"Error: {e}")traceback.print_exc()async def cleanup(self):await self.exit_stack.aclose()print("MCP Client Cleaned Up!")async def main():if len(sys.argv) < 2:print("Usage: python client.py <path_to_server_script>")sys.exit(1)client = MCPClient()try:await client.connect_to_server(sys.argv[1])await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":import sysasyncio.run(main())