
Demystifying MCP: The Future of Standardized AI Context Management (Part 2)

Introduction

MCP has been getting a lot of attention lately, and this article tries to demystify it. The article is long, so it is split into two parts; this is the second part.

MCP in Practice

An MCP server can be implemented in two ways: with the high-level API or with the low-level API. Let's first look at the low-level API.

Low-level API implementation

Server side:

server.py:

import anyio  # async concurrency library that runs on top of asyncio or trio
import click  # package for building composable command line interfaces
import httpx
import mcp.types as types
from mcp.server.lowlevel import Server

from datetime import datetime
from tavily import TavilyClient
import os
import json

from dotenv import load_dotenv

load_dotenv()

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))


def get_now() -> list[types.TextContent]:
    return_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    return [types.TextContent(type="text", text=return_str)]


def web_search(query: str) -> list[types.TextContent]:
    response = tavily_client.search(query)

    results = response.get("results")

    return [
        types.TextContent(type="text", text=result.get("content")) for result in results
    ]


@click.command()
@click.option("--port", default=8000, help="Port to listen on for SSE")
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse"]),
    default="stdio",
    help="Transport type",
)
def main(port: int, transport: str) -> int:
    app = Server("mcp-test-server")

    @app.call_tool()
    async def fetch_tool(name: str, arguments: dict) -> list[types.TextContent]:
        if name == "get_now":
            return get_now()
        elif name == "web_search":
            return web_search(arguments["query"])
        else:
            raise ValueError(f"Unknown tool: {name}")

    @app.list_tools()
    async def list_tools() -> list[types.Tool]:
        return [
            types.Tool(
                name="web_search",
                description="Perform a web search; useful for recent events, weather, etc.",
                inputSchema={
                    "type": "object",
                    "required": ["query"],
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The query to search the web for",
                        }
                    },
                },
            ),
            types.Tool(
                name="get_now",
                description="获取当前时间",
                inputSchema={"type": "object", "properties": {}},
            ),
        ]

    if transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette  # lightweight ASGI framework for async web services
        from starlette.routing import Mount, Route

        sse = SseServerTransport("/messages/")

        async def handle_sse(request):
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )

        import uvicorn

        uvicorn.run(starlette_app, host="0.0.0.0", port=port)

    else:
        from mcp.server.stdio import stdio_server

        async def arun():
            async with stdio_server() as streams:
                await app.run(
                    streams[0], streams[1], app.create_initialization_options()
                )

        anyio.run(arun)

    return 0


if __name__ == "__main__":
    import sys

    sys.exit(main())
    # python -m server --transport sse

The server supports both the sse and stdio transports. To start it with SSE: python -m server --transport sse
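Under the stdio transport, client and server exchange newline-delimited JSON-RPC 2.0 messages over stdin/stdout. A minimal sketch of that framing (the message shape follows the MCP spec; the helper names here are my own):

```python
import json


def frame(message: dict) -> bytes:
    # stdio transport: one JSON-RPC message per line, UTF-8 encoded
    return (json.dumps(message) + "\n").encode("utf-8")


def unframe(line: bytes) -> dict:
    # the receiving side parses each line back into a message
    return json.loads(line.decode("utf-8"))


# A tools/list request as it would travel over the server's stdin
request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
wire = frame(request)
assert unframe(wire) == request
```

The SDK's stdio_server/stdio_client pair handles this framing for you; the sketch only shows what moves across the pipe.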

Client side:

client.py:

import asyncio
import json
import os
import sys
import logging
from typing import Optional
from contextlib import AsyncExitStack
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import AsyncOpenAI
from dotenv import load_dotenv

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MCPChatClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Initialize the OpenAI client
        self.openai = AsyncOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_BASE_URL"),
        )

    async def __aenter__(self):
        await self.exit_stack.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.exit_stack.__aexit__(exc_type, exc_val, exc_tb)

    async def connect(self, server_url: str):
        logger.info(f"Connecting to SSE server at {server_url}...")
        # Connect to the SSE server and create an MCP session
        streams = await self.exit_stack.enter_async_context(sse_client(url=server_url))
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(*streams)
        )
        await self.session.initialize()

        # Fetch and print the list of available tools
        response = await self.session.list_tools()
        tools = response.tools
        logger.info(f"Connected to server with tools: {[tool.name for tool in tools]}")

    async def handle_query(self, user_input: str) -> str:
        messages = [{"role": "user", "content": user_input}]
        response = await self.session.list_tools()

        # Convert the MCP tool list into OpenAI function-calling format
        tools_payload = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema,
                },
            }
            for tool in response.tools
        ]

        logger.debug(f"Available tools: {json.dumps(tools_payload, indent=2)}")

        # First OpenAI chat completion call
        chat_response = await self.openai.chat.completions.create(
            model=os.getenv("OPENAI_MODEL"),
            max_tokens=1000,
            messages=messages,
            tools=tools_payload,
        )

        output_texts = []
        assistant_msg = chat_response.choices[0].message

        # Check whether the model requested any tool calls
        if assistant_msg.tool_calls:
            for tool_call in assistant_msg.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)

                try:
                    # Execute the tool call
                    result = await self.session.call_tool(tool_name, tool_args)
                    output_texts.append(f"[Called {tool_name} with args {tool_args}]")

                    # Append the tool call and its result to the conversation
                    messages.extend(
                        [
                            {
                                "role": "assistant",
                                "content": None,
                                "tool_calls": [tool_call],
                            },
                            {
                                "role": "tool",
                                "tool_call_id": tool_call.id,
                                "content": result.content[0].text,
                            },
                        ]
                    )

                    logger.info(f"Tool {tool_name} returned: {result.content[0].text}")

                    # Call OpenAI again with the tool result to continue the conversation
                    chat_response = await self.openai.chat.completions.create(
                        model=os.getenv("OPENAI_MODEL"),
                        max_tokens=1000,
                        messages=messages,
                    )
                    content = chat_response.choices[0].message.content
                    output_texts.append(str(content))
                except Exception:
                    logger.exception(
                        f"Error calling tool {tool_name} with args {tool_args}."
                    )
        else:
            # No tool call was made; return the assistant's response directly
            content = assistant_msg.content
            output_texts.append(str(content))

        return "\n".join(output_texts)

    async def interactive_chat(self):
        print("\nMCP Chat Client Started!")
        print("Type your queries or 'q' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == "q":
                    break
                response = await self.handle_query(query)
                print("\n" + response)
            except Exception:
                logger.exception("Unexpected error during chat interaction.")


async def main():
    if len(sys.argv) < 2:
        print("Usage: python -m client <SSE MCP server URL>")
        sys.exit(1)

    try:
        async with MCPChatClient() as client:
            await client.connect(server_url=sys.argv[1])
            await client.interactive_chat()
    except Exception:
        logger.exception("Failed to start MCPChatClient.")


if __name__ == "__main__":
    asyncio.run(main())

Assuming the server was started with SSE, launch the client with python -m client http://localhost:8000/sse. This example drives MCP tool calls through OpenAI-style function calling; alternatives such as ReAct-style prompting would also work.
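As a contrast to function calling, a ReAct-style client would prompt the model to emit action lines such as `Action: get_now[]` and parse them itself before calling session.call_tool. A minimal sketch of such a parser (the line format and helper name are illustrative, not from this article):

```python
import re

# Matches lines like "Action: tool_name[tool input]"
ACTION_RE = re.compile(r"^Action:\s*(\w+)\[(.*)\]\s*$", re.MULTILINE)


def parse_action(llm_output: str):
    """Extract (tool_name, tool_input) from a ReAct-style model response,
    or None if the model produced a final answer instead of an action."""
    match = ACTION_RE.search(llm_output)
    if match is None:
        return None
    return match.group(1), match.group(2)


# The parsed name/argument would then be passed to session.call_tool(...)
print(parse_action("Thought: I need the time.\nAction: get_now[]"))  # ('get_now', '')
```

The trade-off versus function calling is that the schema lives in the prompt and parsing is the client's responsibility, so it works with models that lack native tool-call support.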

FastMCP implementation

The MCP SDK also provides a high-level API for writing servers quickly:

fast_server.py:

from mcp.server.fastmcp import FastMCP

from datetime import datetime
from tavily import TavilyClient
import os

from dotenv import load_dotenv

load_dotenv()

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

mcp = FastMCP("test-demo", port=8088)


@mcp.tool()
def get_now() -> str:
    """Get the current time.

    Returns:
        str: the time formatted as %Y-%m-%d %H:%M:%S
    """
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


@mcp.tool()
def web_search(query: str) -> list[str]:
    """Perform a web search; useful for recent events, weather, etc.

    Args:
        query (str): the query to search the web for

    Returns:
        list[str]: list of result snippets
    """
    response = tavily_client.search(query)

    results = response.get("results")

    return [result.get("content") for result in results]


if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport="sse")

As you can see, with FastMCP the main thing we have to care about is defining the tools well.
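FastMCP can build each tool's inputSchema from the function signature and docstring, which is why a plain annotated Python function is enough. A simplified illustration of that idea (my own sketch, not FastMCP's actual implementation):

```python
import inspect

# Rough mapping from Python annotations to JSON Schema types
TYPE_MAP = {str: "string", int: "integer", float: "number", bool: "boolean"}


def infer_input_schema(func) -> dict:
    """Derive a JSON Schema object from a function's type hints,
    roughly the way a decorator like @mcp.tool() can generate inputSchema."""
    properties, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": TYPE_MAP.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value => required argument
    return {"type": "object", "properties": properties, "required": required}


def web_search(query: str) -> list[str]: ...


print(infer_input_schema(web_search))
# {'type': 'object', 'properties': {'query': {'type': 'string'}}, 'required': ['query']}
```

Compare this with the hand-written inputSchema in the low-level server.py above: the high-level API just derives the same structure for you.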

First start the server with python fast_server.py, then start the client with python -m client http://localhost:8088/sse.

Client log:

> python -m client http://localhost:8088/sse
INFO:__main__:Connecting to SSE server at http://localhost:8088/sse...
INFO:mcp.client.sse:Connecting to SSE endpoint: http://localhost:8088/sse
INFO:httpx:HTTP Request: GET http://localhost:8088/sse "HTTP/1.1 200 OK"
INFO:mcp.client.sse:Received endpoint URL: http://localhost:8088/messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7
INFO:mcp.client.sse:Starting post writer with endpoint URL: http://localhost:8088/messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7
INFO:httpx:HTTP Request: POST http://localhost:8088/messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7 "HTTP/1.1 202 Accepted"
INFO:httpx:HTTP Request: POST http://localhost:8088/messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7 "HTTP/1.1 202 Accepted"
INFO:httpx:HTTP Request: POST http://localhost:8088/messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7 "HTTP/1.1 202 Accepted"
INFO:__main__:Connected to server with tools: ['get_now', 'web_search']

Server log:

> python fast_server.py
INFO:     Started server process [97933]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8088 (Press CTRL+C to quit)
INFO:     127.0.0.1:50789 - "GET /sse HTTP/1.1" 200 OK
INFO:     127.0.0.1:50791 - "POST /messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7 HTTP/1.1" 202 Accepted
INFO:     127.0.0.1:50793 - "POST /messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7 HTTP/1.1" 202 Accepted
INFO:     127.0.0.1:50795 - "POST /messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7 HTTP/1.1" 202 Accepted
[04/08/25 15:32:04] INFO     Processing request of type ListToolsRequest   

From the logs above we can see:

  1. The client establishes a connection via http://localhost:8088/sse
  2. The server returns a URL carrying a session_id: http://localhost:8088/messages/?session_id=6f7720204b1b4e6ab53ccd65d7a4c3a7
  3. The client sends POST requests to this endpoint and goes through the initialization phase (capability negotiation, etc.)
  4. It then sends a ListToolsRequest to fetch the tool list
    • Here the server returns the two tools it defines
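On the wire, the initialization phase in step 3 is a JSON-RPC initialize request carrying the protocol version and the client's capabilities. A sketch of its shape following the MCP specification (the field values here are illustrative):

```python
import json

# What the client POSTs to /messages/?session_id=... during initialization
initialize_request = {
    "jsonrpc": "2.0",
    "id": 0,
    "method": "initialize",
    "params": {
        "protocolVersion": "2025-03-26",  # spec revision the client speaks
        "capabilities": {},               # features the client supports
        "clientInfo": {"name": "mcp-chat-client", "version": "0.1"},
    },
}
print(json.dumps(initialize_request, indent=2))
```

The server answers with its own capabilities, which is how both sides agree on what the session can do before any tools are listed or called.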

Now suppose the user enters a question (client log):

MCP Chat Client Started!
Type your queries or 'q' to exit.

Query: 现在几点了
INFO:httpx:HTTP Request: POST http://localhost:8088/messages/?session_id=5fd44bdc7c564f4c9feea03e59b6fc88 "HTTP/1.1 202 Accepted"
INFO:httpx:HTTP Request: POST http://***/v1/chat/completions "HTTP/1.1 200 "
INFO:httpx:HTTP Request: POST http://localhost:8088/messages/?session_id=5fd44bdc7c564f4c9feea03e59b6fc88 "HTTP/1.1 202 Accepted"
INFO:__main__:Tool get_now returned: 2025-04-08 16:11:54
INFO:httpx:HTTP Request: POST http://***/v1/chat/completions "HTTP/1.1 200 "

[Called get_now with args {}]
现在的时间是2025年4月8日16点11分54秒。 

Query: 

Server log:

INFO:     127.0.0.1:60904 - "POST /messages/?session_id=5fd44bdc7c564f4c9feea03e59b6fc88 HTTP/1.1" 202 Accepted
[04/08/25 16:11:51] INFO     Processing request of type ListToolsRequest                                                                                                       server.py:534
INFO:     127.0.0.1:60925 - "POST /messages/?session_id=5fd44bdc7c564f4c9feea03e59b6fc88 HTTP/1.1" 202 Accepted
[04/08/25 16:11:54] INFO     Processing request of type CallToolRequest         

When the user enters a question, the actual execution flow is:

  1. Send a POST request (ListToolsRequest) to the session_id URL to fetch the tool list
  2. The server returns the tool list
  3. Call the LLM to decide whether a tool call is needed
  4. (A tool is needed here) send a CallToolRequest
  5. The server handles the CallToolRequest, executes the tool, and returns the result
  6. The client renders the tool result and returns it to the user
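The six steps above form a generic agent loop that is independent of the transport. In this sketch the LLM and the MCP session are stand-in stubs so only the control flow is visible:

```python
def agent_loop(user_input, list_tools, ask_llm, call_tool):
    """One chat turn: list tools, let the LLM decide, optionally call a
    tool, then ask the LLM to phrase the final answer."""
    tools = list_tools()                                    # steps 1-2
    decision = ask_llm(user_input, tools)                   # step 3
    if decision.get("tool") is None:
        return decision["answer"]                           # no tool needed
    result = call_tool(decision["tool"], decision["args"])  # steps 4-5
    followup = f"{user_input}\nTool result: {result}"
    return ask_llm(followup, tools)["answer"]               # step 6


# Stubs standing in for the real LLM and MCP session
answer = agent_loop(
    "what time is it",
    list_tools=lambda: ["get_now", "web_search"],
    ask_llm=lambda prompt, tools: (
        {"answer": f"It is {prompt.split(': ')[-1]}"}
        if "Tool result" in prompt
        else {"tool": "get_now", "args": {}}
    ),
    call_tool=lambda name, args: "2025-04-08 16:11:54",
)
print(answer)  # It is 2025-04-08 16:11:54
```

In client.py above, ask_llm corresponds to self.openai.chat.completions.create and call_tool to self.session.call_tool; the loop itself is the same.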

References

  1. https://modelcontextprotocol.io/
  2. https://spec.modelcontextprotocol.io/specification/2025-03-26/
  3. https://github.com/sidharthrajaram/mcp-sse
