当前位置: 首页 > news >正文

Deepresearch的MCP实践

项目链接:https://github.com/stay-leave/enhance_llm/tree/main/deepresearch

deepresearch是目前最流行的大模型应用范式,将agent应用于调研报告上,实现了用户只需要输入自己的问题,大模型自动搜索信息完成报告的过程。

区别于rag的单次检索过程和定制化的流程,deepresearch建立在deepsearch的的基础上,由LLM自主控制整个流程。
deepsearch的核心在于搜索、阅读、推理的循环。接收到查询时,进行搜索和阅读,然后根据当前搜索结果,决定是否终止或是扩展查询继续搜索。
参考:
https://jina.ai/news/a-practical-guide-to-implementing-deepsearch-deepresearch

本项目,就是基于MCP工具的deepresearch实现。首先定义了满足MCP协议的工具,主要是本地知识库检索工具、网络检索工具、混合检索工具。然后建立MCP客户端,与工具函数进行链接,从而将不同类型的工具,以统一的方式集成到项目。

满足MCP协议的工具

无需手动定义工具的类型、参数等,直接用mcp库封装,只需要编写实际可执行的函数即可。

# 本地知识库检索
@mcp.tool()
def retrieve(query: str) -> str:
    """本地知识库检索"""
    iteration_limit = 3  # 最大迭代次数
    iteration = 0
    aggregated_contexts = []  # 聚合的检索结果
    all_search_queries = []   # 所有查询记录
    
    # 初始查询改写
    current_query = rewrite_query(query)
    all_search_queries.append(current_query)

    while iteration < iteration_limit:
        print(f"\n=== 第 {iteration + 1} 次迭代 ===")
        print(f"用户原始查询:{query} | 当前检索查询:{current_query}")
        
        # 执行本地知识库混合检索
        try:
            chunk_df, image_df, _ = chunk_hybid_search(db_object, model, current_query)
            iteration_contexts = chunk_df["restored_content"].tolist()
        except Exception as e:
            print(f"检索失败:{e}")
            break
        
        # 处理检索结果
        if iteration_contexts:
            aggregated_contexts.extend(iteration_contexts)
        else:
            print("本次迭代未找到有用信息。")
        
        # 生成新查询
        new_query = get_new_search_query(
            user_query=query,
            previous_search_queries=all_search_queries,
            all_contexts=aggregated_contexts
        )
        
        # 终止条件判断
        if not new_query:
            print("无需进一步研究。")
            break
        elif new_query in all_search_queries:
            print(f"查询 {new_query} 已执行过,停止迭代。")
            break
        
        # 更新查询和记录
        current_query = new_query
        all_search_queries.append(current_query)
        iteration += 1

    return '\n\n'.join(aggregated_contexts)

MCP客户端

链接MCP工具,并执行查询流程的循环。

# 客户端
class MCPClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = ZhipuAI(
            api_key=api_key
        )
    
    # 启动本地工具服务器(如 search_mcp.py),并通过标准输入/输出与其通信
    async def connect_to_server(self, server_script_path: str):
        server_params = StdioServerParameters(
            command="python",
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # 列出可用工具
        response = await self.session.list_tools()
        tools = response.tools
        print(f"\nConnected to server with tools: {[tool.name for tool in tools]}")
    
    # 进入查询处理流程
    async def process_query(self, query: str) -> str:
        """使用 LLM 和 MCP 服务器提供的工具处理查询"""
        # 列出所有的工具
        response = await self.session.list_tools()
        
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]
        print(f'available_tools:\n{available_tools}')
        
        # 初始规划,选择工具,返回工具名称和参数
        messages = [
            {
                "role": "system",
                "content": prompts["SYSTEM_PROMPT"] + str(available_tools)
            },
            {
                "role": "user",
                "content": query
            }
        ]
        response = self.client.chat.completions.create(
                model=model_name,
                messages=messages
            )
        
        message = response.choices[0].message
        print(f'llm_output(tool call):{message.content}') # 这一步直接给我返回大模型的结果了,无语
        
        # 确定好工具,进行循环
        results = [] # 工具返回的结果聚合
        while True:
            
            flag, json_text = get_clear_json(message.content)# 根据是否能解析出json,执行不同的方法
            
            if flag == 0: # 没有生成json格式,直接用大模型生成回复
                response = self.client.chat.completions.create(
                    model=model_name,
                    messages=[{"role": "user", "content": query}]
                )
                return response.choices[0].message.content
            
            # 成功生成json,解析出工具名和参数
            json_text = json.loads(json_text)
            tool_name = json_text['name']
            tool_args = json_text['params']
            # 执行工具函数,获得返回值
            result = await self.session.call_tool(tool_name, tool_args)
            print(f'tool name: \n{tool_name}\ntool call result: \n{result}')
            results.append(result.content[0].text)
            
            # 把返回工具回复加入历史消息列表
            messages.append({
                "role": "assistant",
                "content": message.content
            })
            messages.append({
                "role": "user",
                "content": f'工具调用结果如下:{result}'
            })
            
            # 在工具调用完成后,由大模型决定是否结束检索,还是继续检索
            messages.append({
                "role": "user",
                "content": prompts["NEXT_STEP_PROMPT"].format(query)
            })
            
            response = self.client.chat.completions.create(
                model=model_name,
                messages=messages
            )
            
            message = response.choices[0].message
            print(f'llm_output:\n{message.content}')
            # 检查是否该结束
            if 'finish' in message.content:
                break
            
            # 继续检索,就把大模型的回复加入历史消息,继续循环
            messages.append({
                "role": "assistant",
                "content": message.content
            })
        
        # 循环终止后,进入报告撰写阶段
        messages.append({
                "role": "user",
                "content": prompts["FINISH_GENETATE"].format('\n\n'.join(results), query)
                })
        
        response = self.client.chat.completions.create(
                model=model_name,
                messages=messages
            )
        # 返回报告内容
        message = response.choices[0].message.content
        return message
    

    async def chat_loop(self):
        """运行交互式聊天循环"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print(response)
            except Exception as e:
                print(f"\nError: {str(e)}")

注意事项

1.LLM一定要满足:上下文窗口比较长,人类对齐能力强。否则无法容纳长文本的检索结果,或者无法成功调用工具。

2.无论是网络检索,还是本地检索,都只是信息检索的一种具体路径。

相关文章:

  • 接上文,SpringBoot的线程池配置以及JVM监控
  • 初探:简道云平台架构及原理
  • 创建HAL版本MDK工程模板
  • 游戏引擎学习第199天
  • 如何访问和使用Sora:OpenAI视频生成模型的完整指南
  • 修改jar包里面的文件方法
  • WEB安全--内网渗透--LMNTLM基础
  • pom导包成功,但是就是无法使用相关类,同时报错:Library:Maven ‘xxx‘ has broken path
  • 【ESP32】ESP32物联网应用:MQTT控制与状态监测
  • SPSS系列1—无聊的列联表卡方检验
  • 【4】数据结构的循环链表章
  • MySQL 存储过程的实用技巧与最佳实践
  • Business English Certificates (BEC) 高频词汇背诵
  • 【C / C++】蓝桥第27场月赛
  • vue2 vue3 响应式差异
  • Android NDK C/C++交叉编译脚本
  • c++使用gstreamer录屏+声音
  • JVM中常见的垃圾回收器(Garbage Collectors)
  • Angular 项目使用 pdf.js 及批注插件Elasticpdf 教程
  • React框架的Concurrent Mode
  • 程序员自己做项目的网站/官网优化哪家专业
  • 视频网站怎样做/google关键词规划师
  • 农家乐网站建设方案/百度商城
  • 东莞做微网站建设/网站建设山东聚搜网络
  • 手机怎么做自己的网站/如何推销自己的产品
  • 国内有名的软件开发公司排名/灯塔seo