Deepresearch的MCP实践
项目链接:https://github.com/stay-leave/enhance_llm/tree/main/deepresearch
deepresearch是目前最流行的大模型应用范式,将agent应用于调研报告上,实现了用户只需要输入自己的问题,大模型自动搜索信息完成报告的过程。
区别于rag的单次检索过程和定制化的流程,deepresearch建立在deepsearch的的基础上,由LLM自主控制整个流程。
deepsearch的核心在于搜索、阅读、推理的循环。接收到查询时,进行搜索和阅读,然后根据当前搜索结果,决定是否终止或是扩展查询继续搜索。
参考:
https://jina.ai/news/a-practical-guide-to-implementing-deepsearch-deepresearch
本项目,就是基于MCP工具的deepresearch实现。首先定义了满足MCP协议的工具,主要是本地知识库检索工具、网络检索工具、混合检索工具。然后建立MCP客户端,与工具函数进行链接,从而将不同类型的工具,以统一的方式集成到项目。
满足MCP协议的工具
无需手动定义工具的类型、参数等,直接用mcp库封装,只需要编写实际可执行的函数即可。
# 本地知识库检索
@mcp.tool()
def retrieve(query: str) -> str:
"""本地知识库检索"""
iteration_limit = 3 # 最大迭代次数
iteration = 0
aggregated_contexts = [] # 聚合的检索结果
all_search_queries = [] # 所有查询记录
# 初始查询改写
current_query = rewrite_query(query)
all_search_queries.append(current_query)
while iteration < iteration_limit:
print(f"\n=== 第 {iteration + 1} 次迭代 ===")
print(f"用户原始查询:{query} | 当前检索查询:{current_query}")
# 执行本地知识库混合检索
try:
chunk_df, image_df, _ = chunk_hybid_search(db_object, model, current_query)
iteration_contexts = chunk_df["restored_content"].tolist()
except Exception as e:
print(f"检索失败:{e}")
break
# 处理检索结果
if iteration_contexts:
aggregated_contexts.extend(iteration_contexts)
else:
print("本次迭代未找到有用信息。")
# 生成新查询
new_query = get_new_search_query(
user_query=query,
previous_search_queries=all_search_queries,
all_contexts=aggregated_contexts
)
# 终止条件判断
if not new_query:
print("无需进一步研究。")
break
elif new_query in all_search_queries:
print(f"查询 {new_query} 已执行过,停止迭代。")
break
# 更新查询和记录
current_query = new_query
all_search_queries.append(current_query)
iteration += 1
return '\n\n'.join(aggregated_contexts)
MCP客户端
链接MCP工具,并执行查询流程的循环。
# 客户端
class MCPClient:
def __init__(self):
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.client = ZhipuAI(
api_key=api_key
)
# 启动本地工具服务器(如 search_mcp.py),并通过标准输入/输出与其通信
async def connect_to_server(self, server_script_path: str):
server_params = StdioServerParameters(
command="python",
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# 列出可用工具
response = await self.session.list_tools()
tools = response.tools
print(f"\nConnected to server with tools: {[tool.name for tool in tools]}")
# 进入查询处理流程
async def process_query(self, query: str) -> str:
"""使用 LLM 和 MCP 服务器提供的工具处理查询"""
# 列出所有的工具
response = await self.session.list_tools()
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]
print(f'available_tools:\n{available_tools}')
# 初始规划,选择工具,返回工具名称和参数
messages = [
{
"role": "system",
"content": prompts["SYSTEM_PROMPT"] + str(available_tools)
},
{
"role": "user",
"content": query
}
]
response = self.client.chat.completions.create(
model=model_name,
messages=messages
)
message = response.choices[0].message
print(f'llm_output(tool call):{message.content}') # 这一步直接给我返回大模型的结果了,无语
# 确定好工具,进行循环
results = [] # 工具返回的结果聚合
while True:
flag, json_text = get_clear_json(message.content)# 根据是否能解析出json,执行不同的方法
if flag == 0: # 没有生成json格式,直接用大模型生成回复
response = self.client.chat.completions.create(
model=model_name,
messages=[{"role": "user", "content": query}]
)
return response.choices[0].message.content
# 成功生成json,解析出工具名和参数
json_text = json.loads(json_text)
tool_name = json_text['name']
tool_args = json_text['params']
# 执行工具函数,获得返回值
result = await self.session.call_tool(tool_name, tool_args)
print(f'tool name: \n{tool_name}\ntool call result: \n{result}')
results.append(result.content[0].text)
# 把返回工具回复加入历史消息列表
messages.append({
"role": "assistant",
"content": message.content
})
messages.append({
"role": "user",
"content": f'工具调用结果如下:{result}'
})
# 在工具调用完成后,由大模型决定是否结束检索,还是继续检索
messages.append({
"role": "user",
"content": prompts["NEXT_STEP_PROMPT"].format(query)
})
response = self.client.chat.completions.create(
model=model_name,
messages=messages
)
message = response.choices[0].message
print(f'llm_output:\n{message.content}')
# 检查是否该结束
if 'finish' in message.content:
break
# 继续检索,就把大模型的回复加入历史消息,继续循环
messages.append({
"role": "assistant",
"content": message.content
})
# 循环终止后,进入报告撰写阶段
messages.append({
"role": "user",
"content": prompts["FINISH_GENETATE"].format('\n\n'.join(results), query)
})
response = self.client.chat.completions.create(
model=model_name,
messages=messages
)
# 返回报告内容
message = response.choices[0].message.content
return message
async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print(response)
except Exception as e:
print(f"\nError: {str(e)}")
注意事项
1.LLM一定要满足:上下文窗口比较长,人类对齐能力强。否则无法容纳长文本的检索结果,或者无法成功调用工具。
2.无论是网络检索,还是本地检索,都只是信息检索的一种具体路径。