当前位置: 首页 > news >正文

MCP项目开发-一个简单的RAG示例

MCP项目开发-一个简单的RAG示例

前言

前言

  • 客户端是基于官网的例子改的,模型改成了openai库连接
  • 仅仅使用基础的RAG流程作为一个演示,包含了以下步骤
    1. query改写
    2. 搜索:使用google serper
    3. 重排序:使用硅基流动的api
  • 大模型api也使用的硅基流动
  • 调用速度还可以

官方文档

  • mcp官方文档

    https://modelcontextprotocol.io/introduction

使用到的api注册

  • 检索:google serper

    https://serper.dev

    有2500 Credits,具体也没搞明白,好像是每次检索返回的文本数量超过10则花费2Credits,不过这个不重要,反正自己用肯定够,api在这

    image-20250406165845194

    连接示例

    curl --location 'https://google.serper.dev/search' \
    --header 'X-API-KEY: 你的api-key' \
    --header 'Content-Type: application/json' \
    --data '{
        "q": "你的问题",
        "num": 2
    }'
    

    其中q表示要检索的问题,然后num为返回的文本数,返回示例如下

    {
        "searchParameters": {
            "q": "中国首都是哪",
            "type": "search",
            "num": 2,
            "engine": "google"
        },
        "organic": [
            {
                "title": "中华人民共和国首都 - 维基百科",
                "link": "https://zh.wikipedia.org/zh-hans/%E4%B8%AD%E5%8D%8E%E4%BA%BA%E6%B0%91%E5%85%B1%E5%92%8C%E5%9B%BD%E9%A6%96%E9%83%BD",
                "snippet": "中华人民共和国首都位于北京市,新中国成立前夕的旧称为北平,是中共中央及中央人民政府所在地,中央四个直辖市之一,全国政治、文化、国际交往和科技创新中心,中国古都、 ...",
                "position": 1
            },
            {
                "title": "中华人民共和国首都_中华人民共和国中央人民政府门户网站",
                "link": "https://www.gov.cn/guoqing/2005-05/24/content_2615214.htm",
                "snippet": "1949年9月27日,中国人民政治协商会议第一届全体会议一致通过中华人民共和国的国都定于北平,即日起北平改名北京。 北京,简称京,是中国共产党中央委员会、中华人民共和国 ...",
                "position": 2
            }
        ],
        "relatedSearches": [
            {
                "query": "中国首都是北京还是上海"
            },
            {
                "query": "中国首都为什么是北京"
            },
            {
                "query": "中国首都上海"
            },
            {
                "query": "中国以前的首都"
            },
            {
                "query": "新中国首都选址"
            },
            {
                "query": "中国历代首都"
            },
            {
                "query": "中国首都候选"
            },
            {
                "query": "中国首都英文"
            }
        ],
        "credits": 1
    }
    

    主要的就是在organic中,如果我们的num为2,则organic中返回两个dict,我们取的是没个dict中的snippet字段

  • 聊天大模型与重排序模型:硅基流动

    https://cloud.siliconflow.cn/

    注册可以看我的另一篇

    https://blog.csdn.net/hbkybkzw/article/details/145558234

    api-key如图

    image-20250406165928270


服务器

环境搭建

  • 官方推荐使用uv来管理环境,先下载uv

    pip install uv
    
    #或者macos linux
    curl -LsSf https://astral.sh/uv/install.sh | sh
    
    #或者windows
    powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
    

    之后重启终端

  • 初始化项目。名称为rag_sample

    uv init rag_sample
    
  • 创建虚拟环境并激活

    cd rag_sample
    uv venv
    
    # windows
    .venv\Scripts\activate
    
    # linux
    source .venv/bin/activate
    
  • 安装依赖

    openai库是客户端用的,为了方便,客户端就不初始化了,直接和服务器用同一个环境

    # windows
    uv add mcp[cli] aiohttp dotenv openai rich
    
    # linux
    uv add "mcp[cli]" aiohttp dotenv openai rich
    

构建服务器

  • 在rag_sample目录下,创建 .env 文件保存为环境变量

    GOOGLE_SERPER_URL=https://google.serper.dev/search
    GOOGLE_SERPER_API=
    SILICONFLOW_RERANKING_BASE_URL=https://api.siliconflow.cn/v1/rerank
    SILICONFLOW_CHAT_BASE_URL=https://api.siliconflow.cn/v1/chat/completions
    SILICONFLOW_RERANKING_API_KEY=
    RERANKING_MODEL=BAAI/bge-reranker-v2-m3
    OPENAI_CHAT_MODEL=Qwen/QwQ-32B
    OPENAI_BASE_URL=https://api.siliconflow.cn/v1
    

    GOOGLE_SERPER_APISILICONFLOW_RERANKING_API_KEY填为自己的

  • rag_client.py

    import mcp
    import aiohttp
    import asyncio
    from mcp.server.fastmcp import FastMCP
    from utils import query_rewrite,search_with_serper,reranking_by_bge
    
    mcp = FastMCP("rag")
    
    
    @mcp.tool()
    async def rag_sample(query:str,doc_num:int=10,top_n:int=4) -> list:
        """rag_sample:   输入query,输出检索重排序后的文档
        
        Parameters:
        -------------------------------------------------------------------------------------------------------------------
        query:               type: str              desc: 用户问题               
        doc_num:             type: int              desc: 检索文档数              
        top_n:               type: top_n            desc: 重排序后返回top_n              
        
        Returns:
        -------------------------------------------------------------------------------------------------------------------
        reranked_doc:        type: list             desc: top_n检索文档
        
        """
        rewrite_query = await query_rewrite(query=query)
        documents = await search_with_serper(query=rewrite_query,document_num=doc_num)
        reranked_doc = await reranking_by_bge(query=query,documents=documents,top_n=top_n)
        return reranked_doc
    
    
    async def test():
        query = "MCP协议介绍"
        reranked_doc = await rag_sample(query=query)
        import
        rich.print(reranked_doc)
    
    
    if __name__ == "__main__":
        # asyncio.run(test())
        mcp.run(transport="stdio")
    

    test函数是用来测试的

    image-20250406201419091

    我们将主要的代码放在rag_client.py,辅助函数都放在utils.py

    主要的流程就是

    1. query_rewrite: 传入用户问题,返回改写后的问题
    2. search_with_serper: 传入改写后的问题和检索数量,返回检索到的文档信息
    3. reranking_by_bge:传入用户问题和检索到的信息,返回相似度高的top_n文档信息

    代码见下面的utils.py

  • utils.py

    import asyncio
    import aiohttp
    from dotenv import load_dotenv
    import os
    
    #从.env文件加载环境变量
    load_dotenv()
    
    # google serper
    google_serper_url = os.getenv("GOOGLE_SERPER_URL")
    google_serper_api = os.getenv("GOOGLE_SERPER_API")
    
    # siliconflow api
    chat_base_url = os.getenv("SILICONFLOW_CHAT_BASE_URL")
    reranking_base_url = os.getenv("SILICONFLOW_RERANKING_BASE_URL")
    reranking_api_key = os.getenv("SILICONFLOW_RERANKING_API_KEY")
    reranking_model = os.getenv("RERANKING_MODEL")
    
    # query rewrite prompt
    query_rewrite_prompt = """
    请根据以下规则优化用户的问题:
    1. 首先判断用户query是否为问候或打招呼(例如:'你好'、'在吗'、'您好'等)。如果是问候,直接返回原文query。
    2. 如果不是,对query进行优化改写,请保持原意并使改写后的query更精准更简洁,限制在15个字以内
    示例:
        - 输入:'你好,能帮个忙吗?'
        输出:'你好,能帮个忙吗?'
        
        - 输入:'怎么重写句子结构?'
        输出:'句子结构优化步骤示例'
    
    输出时无需解释,仅返回最终结果。
    """
    
    
    async def fetch_search_results(url, headers, payload):
        """发动请求并获取响应"""
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, json=payload) as response:
                assert response.status == 200
                response_json = await response.json()
                return response_json  # 返回响应json
    
    async def query_rewrite(query:str):
        """一个简单的query改写,直接用aiohttp了,没有使用openai"""
        url = chat_base_url
        headers = {
                    'Authorization': f'Bearer {reranking_api_key}',
                    'Content-Type': 'application/json'
                    }
        payload={
                    "model": "Qwen/Qwen2.5-72B-Instruct",
                    "messages": [
                                    {
                                    "role": "system",
                                    "content": query_rewrite_prompt
                                    },
                                    {
                                    "role": "user",
                                        "content": query
                                    }
                                ],
                    "max_tokens": 512,
                    "stream":False
        }
        response_data = await fetch_search_results( url=url,
                                                    headers=headers,
                                                    payload=payload
                                                    )
        rewrite_query = response_data['choices'][0]['message']['content']
        return rewrite_query
    
    
    async def search_with_serper(query:str,document_num:int=10):
        """google serper 搜索并返回documents,形式如下
    
        [{"title1":"","link1":"","snippet1":"","position1":""},
         {"title2":"","link2":"","snippet2":"","position2":""},
         ...
        ]
        
        """
        url = google_serper_url
        headers = {
            'X-API-KEY': google_serper_api,
            'Content-Type': 'application/json'
        }
        payload = {
            "q": query, # 要搜索的问题
            "num": document_num # 要所搜的文档数量
        }
    
        
        response_data = await fetch_search_results( url=url,
                                                    headers=headers,
                                                    payload=payload
                                                    )
    
        documents = response_data['organic']
        return documents 
    
    
    async def reranking_by_bge(query:str,documents:list[dict],top_n:int=4):
        """调用排序模型,返回排序后的文档"""
        doc_to_rerank = [doc["snippet"] for doc in documents]
    
        url = reranking_base_url
        headers = {
                    'Authorization': f'Bearer {reranking_api_key}',
                    'Content-Type': 'application/json'
                  }
        payload = {
                    "model": reranking_model,
                    "query": query,
                    "documents": doc_to_rerank,
                    "top_n": top_n,
                    "return_documents": True, # 如果是false则只返回索引,麻烦
                    "max_chunks_per_doc": 1024,
                    "overlap_tokens": 80
                }
    
        reranked_info = await fetch_search_results( url=url,
                                                  headers=headers,
                                                  payload=payload
                                                )
        reranked_doc =[item['document']['text'] for item in reranked_info['results']]
        
        return reranked_doc
    
    
    async def test():
        import rich
        query = "流水不争先"
        rewrite_query = await query_rewrite(query=query)
        rich.print('rewrite_query',rewrite_query)
        documents = await search_with_serper(query=rewrite_query,document_num=4)
        rich.print('documents',documents)
        reranked_doc = await reranking_by_bge(query=query,documents=documents,top_n=2)
        rich.print("reranked_doc",reranked_doc)
    
    
    if __name__ == "__main__":
        asyncio.run(test())
    
    

    test函数是用来测试的

    image-20250406202012371

  • 运行 python rag_client.py以确认一切正常

app端使用(cherrystudio为例)

  • 安装cherrystudio,并配置好模型

    https://cherry-ai.com/

  • 在app端添加单个weather服务器,下面的json文件需要复制到cherrystudio中

    {
        "mcpServers": {
            "rag": {
                "command": "uv",
                "args": [
                    "--directory",
                    "C:\\Users\\孟智超\\Desktop\\mac_backups\\MCP\\rag_sample",
                    "run",
                    "rag_client.py"
                ]
            }
        }
    }
    

    意思是:

    1. 有一个名为“天气”的 MCP 服务器
    2. 通过运行 uv --directory C:\\Users\\孟智超\\Desktop\\mac_backups\\MCP\\rag_sample run rag_client.py 来启动它(项目的地址)
  • 将上面的json文本复制到cherrystudio

    image-20250406202414304

  • 选择模型,因为mcp也借用了functioncalling,所以需要选用的模型必须支持functioncalling(tools)

    在cherrystudio中的表现就是,有绿色的小扳手

    image-20250406202448479

  • 开始聊天之前开启mcp

    image-20250406202556713

  • 看看使用前后的对比

    问题:什么是MCP?

    未开启前,列举了很多,但是都不是我们想要的

    image-20250406202722555

    开启后,虽然在思考过程中也提到了其他无关的,但是最后还是决定调用工具rag_sample,最后得到的是我们想要的正确答案。

    image-20250406202908565

客户端

代码

  • 和服务器共用一个环境(不推荐,一般重新开一个环境)

  • 代码rag_client.py

    import os
    import sys
    import json
    import asyncio
    from typing import Optional
    from contextlib import AsyncExitStack
    
    from mcp import ClientSession,StdioServerParameters
    from mcp.client.stdio import stdio_client
    
    import openai
    from openai import OpenAI
    from dotenv import load_dotenv
    import traceback
    
    # 从.env文件加载环境变量
    load_dotenv() 
    
    # siliconflow api
    openai.base_url = os.getenv("OPENAI_BASE_URL")
    openai.api_key = os.getenv("SILICONFLOW_RERANKING_API_KEY")
    openai_chat_model = os.getenv("OPENAI_CHAT_MODEL")
    
    class MCPClient:
        def __init__(self):
            # 初始化会话和客户端对象
            self.session: Optional[ClientSession] = None # 保存mcp客户端会话
            self.exit_stack = AsyncExitStack()
            self.model = openai_chat_model
            self.openai = OpenAI(
                base_url=openai.base_url,
                api_key=openai.api_key
            )
    
        async def connect_to_server(self, server_script_path:str):
            """connect_to_server:   连接到mcp服务器 
            
            
            Parameters:
            -------------------------------------------------------------------------------------------------------------------
            server_script_path:         type:str             desc:  server脚本地址(.py or .js)            
            """
            is_python = server_script_path.endswith('.py') # python脚本?
            is_js = server_script_path.endswith('.js') # node 脚本?
    
            if not (is_python or is_js):
                raise ValueError("Server script must be a .py or .js file")
            
            command = "python" if is_python else "node"
            server_params = StdioServerParameters(
                command=command,
                args=[server_script_path],
                env=None
            ) # 告诉mcp客户端如何启动服务器,比如当前的 python rag_server.py
    
            stdio_transport = await self.exit_stack.enter_async_context(
                stdio_client(server_params) # 启动服务器进程,建立标准I/O通信管道
                )
            self.stdio,self.write = stdio_transport # 拿到读写流
            self.session = await self.exit_stack.enter_async_context(
                ClientSession(self.stdio,self.write) # 创建MCP客户端会话,与服务器交互
                )
            
            await self.session.initialize() # 发送初始化消息给服务器,等待服务器就绪。
    
            # 列出 mcp 服务器上可用的工具
            response = await self.session.list_tools() # 向 mcp 服务器请求所有已注册的工具(即@mcp.tool)
            tools = response.tools
            print("\nConnected to server with tools:", [tool.name for tool in tools])
        
        async def process_query(self,query:str) -> str:
            """process_query:   使用大模型处理查询,调用已经注册的MCP工具 
            
            
            Parameters:
            -------------------------------------------------------------------------------------------------------------------
            query:               type:str               desc:用户问题                 
            """
            messages = [
                {
                    "role":"user",
                    "content": query
                }
            ]
    
            response = await self.session.list_tools() # 向 mcp 服务器请求所有已注册的工具(即@mcp.tool)
            # 转换为functional calling 的available_tools格式
            available_tools = [{
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "input_schema": tool.inputSchema
                }} for tool in response.tools]
            # print(f"available_tools:\n{available_tools}")
    
            # 初始化 大模型连接(OpenAI)
            response = self.openai.chat.completions.create(
                model=self.model,
                max_tokens=1000,
                messages=messages,
                tools=available_tools
            )
            content = response.choices[0]
            print(f"content.finish_reason : {content.finish_reason}")
            if content.finish_reason == "tool_calls": 
                print(f"进入functional calling")
    
                # content.finish_reason == "tool_calls"表示要调用工具
                # 会在content.message.tool_calls 列表中声明要用哪个函数、参数是什么
                tool_call = content.message.tool_calls[0]
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)
                
                # 取出工具名 tool_name 和参数 tool_args,执行mcp工具
                result = await self.session.call_tool(tool_name, tool_args)
                print(f"\n[Calling tool {tool_name} with args {tool_args}]\n\n")
                # print(f"\n\ncall_tool_result: {result}")
                # print(f"\n\ncall_tool_result.content[0].text:{result.content[0].text}")
                # print(f"i.text for i in result.content:{[i.text for i in result.content]}")
    
                # 将模型返回的调用哪个工具数据和工具执行完成后的数据都存入messages中
                messages.append(content.message.model_dump())
                messages.append({
                    "role": "tool",
                    "content": f"{[i.text for i in result.content]}",
                    "tool_call_id": tool_call.id,
                })
                
                # 将上面的结果再次返回给大模型并产生最终结果
                response = self.openai.chat.completions.create(
                    model=self.model,
                    max_tokens=1000,
                    messages=messages,
                )
                return response.choices[0].message.content
            
            # 如果没有调用工具则直接返回当前结果
            return content.message.content
        
        async def chat_loop(self):
            """运行一个交互式聊天循环"""
            print("MCP 服务器已经启动!")
            print("输入你的问题(输入'quit'退出).")
    
            while True:
                try:
                    query = input("\n你的问题:\n").strip()
                    if query.lower() == "quit":
                        break
    
                    response = await self.process_query(query)
                    print(f"模型回答:\n{response}")
    
                except Exception as e:
                    tb = traceback.extract_tb(e.__traceback__)
                    for frame in tb:
                        print("File:", frame.filename)
                        print("Line:", frame.lineno)
                        print("Function:", frame.name)
                        print("Code:", frame.line)
                    
        async def cleanup(self):
            """清理资源"""
            # 异步地关闭所有在 exit_stack 中注册的资源(包括 MCP 会话)。
            await self.exit_stack.aclose()
    
    async def main():
        """main:   主执行逻辑,以现在的rag为例子
        标准IO启动需要服务器的代码作为客户端的子进程启动
        服务器代码:rag_server.py
        客户端代码:rag_client.py
        因为都在同一个项目下,所以启动直接为
        python rag_client.py rag_server.py
    
        """
        if len(sys.argv) < 2: # 
            print("Usage: python client.py <path_to_server_script>")
            sys.exit(1)
        
        client = MCPClient()
        try:
            print(sys.argv[1])
            # python rag_client.py rag_server.py ,则argv[1] = rag_client.py 为客户端代码
            await client.connect_to_server(sys.argv[1]) 
            await client.chat_loop()
        finally:
            await client.cleanup()
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    1. connect_to_server:连接到MCP服务器

    2. process_query: 使用大模型处理查询,将MCP工具转换为OpenAI兼容的格式,允许模型决定是否调用工具,如果是,则调用已经注册的MCP工具

    3. chat_loop: 交互式聊天(只有单轮)

    4. main:主函数

      因为在客户端脚本中,启动方式是"stdio"

      if __name__ == "__main__":
          mcp.run(transport="stdio")
      

      这种适合在客户端和服务器在同一个环境中的情况下,是将服务器作为客户端的子进程

      所以其中这个客户端的命令如下

      python rag_client.py rag_server.py
      

测试运行

  • 启动

    python rag_client.py rag_server.py
    
  • 测试下

    (模型不太好触发这个tool,所以我直接加了“请调用rag_sample”)

    问题:

    请调用rag_sample,解释下最近很火的mcp
    

    image-20250406204328355


相关文章:

  • 第15届蓝桥杯java-c组省赛真题
  • 其他 vector 操作详解(四十)
  • 如何做到一个项目的高可用保障
  • 美国mlb与韩国mlb的关系·棒球9号位
  • 第五章 定积分 第二节 微积分基本公式
  • k8s1.24升级1.28
  • OCC Shape 操作
  • 【CSS基础】- 02(emmet语法、复合选择器、显示模式、背景标签)
  • 基于大模型的脑梗死全流程诊疗技术方案
  • Ubuntu 下 无界面环境 多进程/多线程 使用DrissionPage
  • 【最新版】啦啦外卖v64系统独立版源码+全部小程序APP端+安装教程
  • 【论文精读】Copy or Not? Reference-Based Face Image Restoration with Fine Details
  • mysql中my.cnf权限不能过大。否则无法生效
  • SOMEIP通信矩阵解读
  • 探索深度学习模型:技术演进、应用与挑战
  • 【C语言】container_of 宏定义
  • 数据集 handpose_x_plus 3D RGB 三维手势多场景
  • Nginx 配置文件解析
  • 内存池项目(1)——前置知识
  • CF2074F Counting Necessary Nodes
  • 做卖挖掘机的网站/网站提交工具
  • 天河网站建设哪个好/上海正规seo公司
  • 产品服务展示型网站有哪些/营销方案案例范文
  • 第三方做公司网站/宁波网站推广优化哪家正规
  • 购物网站开发参考文献/搜索引擎优化的方法和技巧
  • 东华大学网络教育网页设计作业/推推蛙seo顾问