MCP Notes: Introduction and How It Works
1. Introduction
1.1 Background
Early models (such as the early GPT series) relied mainly on static training data; their capabilities were bounded by the knowledge available at training time, and they could not access real-time data or interact with external systems. This "island" nature exposed a series of problems in practice: the model could not understand a user's historical context, could not call external tools to perform tasks, and could not dynamically update its knowledge base.
As AI application scenarios grew more complex, for example multi-turn dialogue systems, code generation tools, and enterprise data analytics, developers began connecting models to external data sources through custom APIs or plugins. This approach, however, created significant integration challenges. Every data source (such as Google Drive, Slack, or an internal database) required its own interface to be developed, leading to duplicated effort and rapidly growing maintenance costs. This point-to-point integration produces an N*M problem (N models connecting to M data sources) that limits scalability, hurts development efficiency, and makes security and consistency harder to manage.
The AI field lacked a universal, standardized mechanism for passing context that could break down data silos and simplify integration.
1.2 What Is MCP
In late November 2024, Anthropic released the Model Context Protocol (MCP). MCP is an open protocol that enables seamless integration between LLM applications and external data sources and tools. Whether you are building an AI-powered IDE, enhancing a chat interface, or creating custom AI workflows, MCP provides a standardized way to connect LLMs with the context they need.
2. The Difference Between Function Calling and MCP
2.1 Function Calling
- Function calling is the mechanism by which an AI model automatically invokes functions based on the conversation context.
- Function calling acts as a bridge between the AI model and external tools. Each model platform defines and implements function calling in its own way, so the code integration differs from provider to provider.
To use function calling, we provide the LLM with a set of functions in code, along with clear descriptions of each function and of its inputs and outputs. The LLM can then reason over this structured information and decide which function to call.
The drawback of function calling is that it does not handle multi-turn dialogue or complex requirements well; it is best suited to well-bounded, clearly described tasks. When many tasks have to be supported, the function calling code becomes hard to maintain.
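As a minimal sketch of what this looks like in practice, the snippet below registers a single function with an OpenAI-style chat completions API; the get_weather function, its schema, and the model name are illustrative assumptions, not part of any specific project:

from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

# Describe the function in a structured schema so the model can decide when to call it.
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",            # illustrative function name
        "description": "Get the current weather for a city.",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "City name, e.g. Beijing"}
            },
            "required": ["city"],
        },
    },
}]

response = client.chat.completions.create(
    model="gpt-4o-mini",                  # any function-calling-capable model
    messages=[{"role": "user", "content": "What's the weather in Beijing?"}],
    tools=tools,
)

# If the model decides a function is needed, it returns the name and JSON arguments;
# executing the function and sending the result back is still our code's job.
tool_calls = response.choices[0].message.tool_calls
if tool_calls:
    print(tool_calls[0].function.name, tool_calls[0].function.arguments)

The model only decides that a function should be called and with which arguments; executing it and feeding the result back into the conversation remains the application's responsibility, and that glue code is exactly what becomes hard to maintain as the number of functions grows.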
2.2 Model Context Protocol (MCP)
- MCP is a standard protocol, much like USB Type-C for electronic devices (one connector for both charging and data transfer), that lets AI models interact seamlessly with different APIs and data sources.
- MCP aims to replace fragmented, ad-hoc agent integrations, making AI systems more reliable and effective. With a common standard in place, service providers can expose their services through the protocol, allowing developers to build more powerful AI applications faster. Developers no longer need to reinvent the wheel, and open-source projects can grow a strong AI agent ecosystem.
- MCP preserves context across different applications and services, which strengthens an agent's ability to carry out tasks autonomously end to end.
One way to think about it: MCP splits work into layers, with each layer exposing specific capabilities, descriptions, and constraints. The MCP client decides, per task, whether a given capability needs to be invoked, and by chaining the inputs and outputs of each layer it can build an agent that handles complex, multi-step conversations with a unified context.
Summary: function calling is the mechanism by which an AI model invokes functions; MCP is a standard protocol that lets large models interact with APIs seamlessly. An AI agent uses both function calling and MCP to analyze and execute tasks toward a given goal.
3. MCP Core Architecture
The MCP protocol uses a client-server architecture with the following core concepts:
- MCP Hosts: applications from which the LLM initiates connections, such as Cursor, Claude Desktop, and Cline.
- MCP Clients: components inside the host application, each maintaining a 1:1 connection with a server.
- MCP Servers: provide context, tools, and prompts to clients through the standardized protocol.
- Local Resources: local files, databases, and APIs.
- Remote Services: external files, databases, and APIs.
4. Communication Mechanisms
MCP supports two main communication mechanisms: local communication over standard input/output (stdio), and remote communication based on SSE (Server-Sent Events).
Both mechanisms use JSON-RPC 2.0 as the message format, which keeps communication standardized and extensible (a minimal sketch follows the list below).
- Local communication: data is exchanged over stdio, suitable when the client and server run on the same machine.
- Remote communication: SSE combined with HTTP enables real-time data transfer across the network, suitable for accessing remote resources or distributed deployments.
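As a rough sketch of the stdio transport, the snippet below launches an MCP server as a child process and sends it one JSON-RPC 2.0 request as a line of JSON (the server command is a placeholder; it assumes the weather server built in section 5.1 is saved as weather.py):

import json
import subprocess

# Launch an MCP server as a child process and talk to it over stdin/stdout.
proc = subprocess.Popen(
    ["python", "weather.py"],               # placeholder server command
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)

# Each JSON-RPC 2.0 message is a single line of JSON terminated by a newline.
request = {
    "jsonrpc": "2.0",
    "id": 0,
    "method": "initialize",
    "params": {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "demo-client", "version": "0.0.1"},
    },
}
proc.stdin.write(json.dumps(request) + "\n")
proc.stdin.flush()

# The server replies with a JSON-RPC response carrying the same id.
print(proc.stdout.readline())

Section 5.2 captures exactly this kind of traffic between Cline and a real server.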
5. How It Works
5.1 Writing an MCP Server
For environment setup and project creation, see the video: MCP终极指南 - 带你深入掌握MCP(进阶篇) (Bilibili).
MCP server code:
from typing import Any

import httpx
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("weather", log_level="ERROR")

# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json",
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None


def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string."""
    props = feature["properties"]
    return f"""
Event:{props.get('event','Unknown')}
Area:{props.get('areaDesc','Unknown')}
Severity:{props.get('severity','Unknown')}
Description:{props.get('description','No description available')}
Instructions:{props.get('instruction','No specific instructions provided')}
"""


@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.
    Args:
    state:Two-letter US state code (eg: CA,NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)
    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found"
    if not data["features"]:
        return "No active alerts for this state."
    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n----\n".join(alerts)


@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
    latitude:Latitude of the location
    longitude:Longitude of the location
    """
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)
    if not points_data:
        return "Unable to fetch forecast data for this location."
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)
    if not forecast_data:
        return "Unable to fetch detailed forecast"
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:
        forecast = f"""
{period['name']}:
Temperature:{period['temperature']}{period['temperatureUnit']}
Wind:{period['windSpeed']}{period['windDirection']}
Forecast:{period['detailedForecast']}
"""
        forecasts.append(forecast)
    return "\n---\n".join(forecasts)


if __name__ == "__main__":
    mcp.run(transport="stdio")
Code walkthrough:
- The @mcp.tool() decorator registers the function as a tool. It extracts the function's purpose and the meaning of each parameter from the docstring so the model can decide when it is best to call the function. The extracted information includes the function name, its parameters and their types, what the function does, and what each parameter means; all of this is turned into the tool's metadata and handed to the model when the tool list is requested.
- transport='stdio' means the MCP server communicates with Cline over local standard input/output; a sample host configuration follows below.
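To have the host launch this server over stdio, you register it in the host's MCP settings (in Cline this is typically a cline_mcp_settings.json file; Claude Desktop uses a similar claude_desktop_config.json). A minimal sketch of the common mcpServers shape, with the path as a placeholder:

{
  "mcpServers": {
    "weather": {
      "command": "python",
      "args": ["/path/to/your/project/weather.py"]
    }
  }
}

The host spawns the command as a child process and exchanges JSON-RPC messages with it over stdin/stdout; that traffic is exactly what we log and analyze in section 5.2.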
5.2 Analyzing the Underlying MCP Protocol
5.2.1 Creating an Intermediate Proxy
The idea is to insert a small logging proxy between Cline and the MCP server: it forwards standard input and output verbatim while appending every line to mcp_io.log, so the raw JSON-RPC traffic can be inspected afterwards.

#!/usr/bin/env python3
"""Logging proxy: sits between the MCP host (e.g. Cline) and the MCP server,
forwarding stdin/stdout/stderr verbatim while recording every line to a log file."""
import argparse
import os
import subprocess
import sys
import threading

# --- Configuration ---
LOG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), "mcp_io.log")

# --- Argument parsing ---
parser = argparse.ArgumentParser(
    description="Wrap a command, passing STDIN/STDOUT verbatim while logging them.",
    usage="%(prog)s <command> [args...]",
)
# Capture the target command and all of its arguments
parser.add_argument("command", nargs=argparse.REMAINDER,
                    help="The command and its arguments to execute.")

open(LOG_FILE, "w", encoding="utf-8")  # truncate the log file on each run

if len(sys.argv) == 1:
    parser.print_help(sys.stderr)
    sys.exit(1)

args = parser.parse_args()
if not args.command:
    print("Error: No command provided.", file=sys.stderr)
    parser.print_help(sys.stderr)
    sys.exit(1)

target_command = args.command


def forward_and_log(source, sink, log_file, prefix, close_sink=False):
    """Read lines (bytes) from `source`, log them with `prefix`, and forward them to `sink`.

    The same function serves all three streams: host stdin -> server stdin ("输入"),
    server stdout -> host stdout ("输出"), and server stderr -> host stderr ("STDERR").
    """
    try:
        while True:
            line_bytes = source.readline()
            if not line_bytes:  # EOF: the other end closed the stream
                break
            try:
                line_str = line_bytes.decode("utf-8")
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"
            log_file.write(f"{prefix}: {line_str}")
            log_file.flush()                 # make sure the log is written promptly
            sink.write(line_bytes)           # forward the original bytes untouched
            sink.flush()
    except Exception as e:
        try:
            log_file.write(f"!!! {prefix} forwarding error: {e}\n")
            log_file.flush()
        except Exception:
            pass
    finally:
        if close_sink:
            # Closing the server's stdin signals EOF so the child process can exit.
            try:
                sink.close()
            except Exception:
                pass


process = None
log_f = None
exit_code = 1  # default exit code if the proxy itself fails
try:
    log_f = open(LOG_FILE, "a", encoding="utf-8")  # all threads append to one log

    # Start the target MCP server with pipes for all three standard streams
    # (bufsize=0 gives unbuffered binary I/O; readline() still works on it).
    process = subprocess.Popen(
        target_command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=0,
    )

    threads = [
        threading.Thread(target=forward_and_log,
                         args=(sys.stdin.buffer, process.stdin, log_f, "输入", True),
                         daemon=True),
        threading.Thread(target=forward_and_log,
                         args=(process.stdout, sys.stdout.buffer, log_f, "输出"),
                         daemon=True),
        threading.Thread(target=forward_and_log,
                         args=(process.stderr, sys.stderr.buffer, log_f, "STDERR"),
                         daemon=True),
    ]
    for t in threads:
        t.start()

    process.wait()                  # wait for the MCP server to exit
    exit_code = process.returncode
    for t in threads:
        t.join(timeout=1.0)         # give the I/O threads a moment to flush
except Exception as e:
    print(f"MCP Logger Error: {e}", file=sys.stderr)
    exit_code = 1
finally:
    # Make sure the child process does not outlive the proxy.
    if process and process.poll() is None:
        try:
            process.terminate()
            process.wait(timeout=1.0)
        except Exception:
            pass
        if process.poll() is None:
            try:
                process.kill()
            except Exception:
                pass
    if log_f and not log_f.closed:
        log_f.close()
    sys.exit(exit_code)
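To capture the traffic, point the host at the proxy instead of the server itself and pass the real server command as arguments. A sketch, assuming the proxy is saved as mcp_logger.py (all file paths are placeholders):

{
  "mcpServers": {
    "weather": {
      "command": "python",
      "args": ["/path/to/mcp_logger.py", "python", "/path/to/weather.py"]
    }
  }
}

Every JSON-RPC line that Cline sends to or receives from the server then also lands in mcp_io.log, which is where the transcripts in the next two subsections come from.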
5.2.2 Log Analysis: Establishing the Connection
In the log below, lines prefixed 输入 (input) are traffic from Cline to the MCP server, and lines prefixed 输出 (output) are traffic from the MCP server back to Cline.
For convenience, Cline is referred to as the client and the MCP server as the server.
输入: {"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"Cline","version":"3.17.11"}},"jsonrpc":"2.0","id":0}
输出: {"jsonrpc":"2.0","id":0,"result":{"protocolVersion":"2025-03-26","capabilities":{"experimental":{},"prompts":{"listChanged":false},"resources":{"subscribe":false,"listChanged":false},"tools":{"listChanged":false}},"serverInfo":{"name":"weather","version":"1.9.3"}}}输入: {"method":"notifications/initialized","jsonrpc":"2.0"}
输入: {"method":"tools/list","jsonrpc":"2.0","id":1}
输出: {"jsonrpc":"2.0","id":1,"result":{"tools":[{"name":"get_alerts","description":"\n Get weather alerts for a US state.\n Args:\n state:Two-letter US state code (eg: CA,NY)\n ","inputSchema":{"properties":{"state":{"title":"State","type":"string"}},"required":["state"],"title":"get_alertsArguments","type":"object"}},{"name":"get_forecast","description":"Get weather forecast for a location.\n\n Args:\n latitude:Latitude of the location\n longitude:Longitude of the location\n ","inputSchema":{"properties":{"latitude":{"title":"Latitude","type":"number"},"longitude":{"title":"Longitude","type":"number"}},"required":["latitude","longitude"],"title":"get_forecastArguments","type":"object"}}]}}输入: {"method":"resources/list","jsonrpc":"2.0","id":2}
输出: {"jsonrpc":"2.0","id":2,"result":{"resources":[]}}输入: {"method":"resources/templates/list","jsonrpc":"2.0","id":3}
输出: {"jsonrpc":"2.0","id":3,"result":{"resourceTemplates":[]}}
1. Connection initialization:
- The client sends an initialize request to the server, announcing its own information and capabilities.
- The server responds to the client's initialization request.
2. Initialized notification:
- The client notifies the server that initialization is complete. This is a one-way notification that does not require a response; it marks the client as ready for subsequent interactions.
3. Listing tools:
- The client asks the server to list the tools it provides.
- The server responds with its tool list.
Pretty-printing the tools/list response (the JSON returned for id 1) gives:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "tools": [
      {
        "name": "get_alerts",
        "description": "\n Get weather alerts for a US state.\n Args:\n state:Two-letter US state code (eg: CA,NY)\n ",
        "inputSchema": {
          "properties": {
            "state": {
              "title": "State",
              "type": "string"
            }
          },
          "required": [
            "state"
          ],
          "title": "get_alertsArguments",
          "type": "object"
        }
      },
      {
        "name": "get_forecast",
        "description": "Get weather forecast for a location.\n\n Args:\n latitude:Latitude of the location\n longitude:Longitude of the location\n ",
        "inputSchema": {
          "properties": {
            "latitude": {
              "title": "Latitude",
              "type": "number"
            },
            "longitude": {
              "title": "Longitude",
              "type": "number"
            }
          },
          "required": [
            "latitude",
            "longitude"
          ],
          "title": "get_forecastArguments",
          "type": "object"
        }
      }
    ]
  }
}
The tools array contains two tools. Each tool carries a name, a description of what it does, and an inputSchema that defines the structure of its input parameters.
4. Listing resources:
- The client asks the server to list its resources.
- The server responds to the request (an empty list here).
5. Listing resource templates:
- The client asks the server to list its resource templates.
- The server responds to the request (also empty).
5.2.3 Log Analysis: Executing a Task
输入: {"method":"tools/call","params":{"name":"get_forecast","arguments":{"latitude":40.7128,"longitude":-74.006}},"jsonrpc":"2.0","id":4}
输出: {"jsonrpc":"2.0","id":4,"result":{"content":[{"type":"text","text":"\n Tonight:\n Temperature:66\n Wind:7 to 14 mphN\n Forecast:Mostly cloudy, with a low around 66. North wind 7 to 14 mph.\n \n---\n\n Sunday:\n Temperature:76\n Wind:8 to 12 mphNE\n Forecast:A slight chance of rain after 2pm. Mostly cloudy, with a high near 76. Northeast wind 8 to 12 mph. Chance of precipitation is 20%.\n \n---\n\n Sunday Night:\n Temperature:64\n Wind:12 mphE\n Forecast:A chance of rain before 2am, then a chance of rain showers. Mostly cloudy, with a low around 64. East wind around 12 mph. Chance of precipitation is 30%. New rainfall amounts between a tenth and quarter of an inch possible.\n \n---\n\n Monday:\n Temperature:72\n Wind:10 mphE\n Forecast:A chance of rain showers. Mostly cloudy, with a high near 72. East wind around 10 mph. Chance of precipitation is 30%. New rainfall amounts less than a tenth of an inch possible.\n \n---\n\n Monday Night:\n Temperature:65\n Wind:7 to 10 mphSE\n Forecast:A chance of rain showers. Mostly cloudy, with a low around 65. Southeast wind 7 to 10 mph. Chance of precipitation is 40%.\n "}],"isError":false}}
1. The client requests a call to the get_forecast tool, i.e. it asks for a weather forecast, passing the arguments the tool needs in the shape defined by its inputSchema.
2. The server runs the get_forecast function, wraps the resulting forecast in the text field of the response, and returns it to the client, where it will later be passed to the large model as additional context (a client-side sketch reproducing this exchange follows).
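The same exchange can be reproduced programmatically. A minimal sketch using the MCP Python SDK's stdio client (assuming the mcp package is installed; the server path is a placeholder and the coordinates are the ones from the log):

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch the weather server from section 5.1 as a child process over stdio.
    server = StdioServerParameters(command="python", args=["/path/to/weather.py"])
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()                       # initialize / initialized
            tools = await session.list_tools()               # tools/list
            print([t.name for t in tools.tools])
            result = await session.call_tool(                # tools/call
                "get_forecast", {"latitude": 40.7128, "longitude": -74.006}
            )
            print(result.content[0].text)


asyncio.run(main())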
With that, the flow chart below should now be easy to follow.
5.3 Summary
The MCP protocol essentially specifies two things:
1. Which functions (tools) each MCP server exposes
2. How those functions are called
MCP itself does not prescribe how the host talks to the model, and different MCP hosts differ considerably here: Cline, for example, communicates with the model using XML, whereas Cherry Studio uses the function calling format.
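As an illustration of the first approach, Cline lists the available MCP tools in its system prompt and expects the model to request a tool call as an XML block roughly of the following shape (an approximation for illustration; the exact tags depend on the Cline version):

<use_mcp_tool>
  <server_name>weather</server_name>
  <tool_name>get_forecast</tool_name>
  <arguments>{"latitude": 40.7128, "longitude": -74.006}</arguments>
</use_mcp_tool>

Cline parses this block, issues the corresponding tools/call request over MCP, and returns the result to the model as context.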