AIGC入门,手搓大模型客户端与MCP交互第2集
AIGC入门,手搓大模型客户端与MCP交互第2集
在上一篇文章中,我们初步实现了将大语言模型(LLM)与 Model Context Protocol(MCP)服务进行整合,使模型能够根据用户查询调用相应的工具(如获取当前时间、列出时区等)。然而,实践中我们发现,模型输出的 JSON 结构经常不规范,导致解析失败,严重影响系统可靠性。
本文在此基础上,提出并实现了一套更加鲁棒的参数提取机制,显著提升了工具调用的成功率。
一、背景与问题
最初的实现中,我们假设大模型(如 Phi-3)总是返回格式完美的 JSON,例如:
{"action": "call_tool","tool_name": "get_current_time","arguments": {"timezone": "Asia/Shanghai"}
}
但实际上,模型常常返回包含多余文本、注释、甚至格式错误的响应,例如:
我应该调用工具来获取时间。代码如下:
{"action": "call_tool","tool_name": "get_current_time", // 这是工具名"arguments": {"timezone": "New York"}
}
这种非纯 JSON 响应会导致 json.loads()
解析失败,进而导致整个工具调用流程中断。
-
有多余的中文内容
-
json字符串中间有注释内容
二、改进方案:鲁棒的 JSON 提取与验证
我们引入了两个关键函数来增强系统的容错能力:
extract_json_from_response(response_text)
该函数用于从模型响应中提取 JSON 对象,具备以下特性:
-
移除注释:使用正则表达式清除单行(//)和多行(/* */)注释;
-
多层解析策略:
-
首先尝试直接解析整个响应;
-
若失败,则使用递归正则表达式匹配最内层的完整 JSON 对象;
-
-
支持嵌套结构:使用 regex 库的 (?R) 递归模式匹配嵌套的 JSON。
validate_tool_call(tool_call_data)
对提取出的 JSON 进行结构验证,确保包含必要的字段且类型正确:
-
必须包含 “action”: “call_tool”;
-
必须包含字符串类型的 tool_name;
-
arguments 为可选,但若存在则必须为字典类型。
三、完整代码解析
以下是改进后的核心代码段(省略部分重复内容):
def extract_json_from_response(response_text):# 清除注释cleaned_text = regex.sub(r'//.*?$', '', response_text, flags=regex.MULTILINE)cleaned_text = regex.sub(r'/\*.*?\*/', '', cleaned_text, flags=regex.DOTALL)try:return json.loads(cleaned_text)except json.JSONDecodeError:# 使用递归正则匹配嵌套JSONpattern = r'\{(?:[^{}]|(?R))*\}'json_match = regex.search(pattern, cleaned_text)if json_match:try:return json.loads(json_match.group())except:passreturn Nonedef validate_tool_call(tool_call_data):if not isinstance(tool_call_data, dict):return Falseif tool_call_data.get("action") != "call_tool":return Falseif not isinstance(tool_call_data.get("tool_name"), str):return Falseif "arguments" in tool_call_data and not isinstance(tool_call_data["arguments"], dict):return Falsereturn True
完整的改进后的测试脚本如下
import asyncio
import sys
import traceback
import json
import regeximport ollama
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_clientclass MCPTimeClient:def __init__(self):self.server_params = StdioServerParameters(command=sys.executable,args=["time_server.py"],env=None)self.session = Noneself.tools_available = []async def __aenter__(self):"""进入异步上下文管理器"""self._stdio_client = stdio_client(self.server_params)self._stdio_client_context = self._stdio_client.__aenter__()read, write = await self._stdio_client_contextself.session = ClientSession(read, write)await self.session.__aenter__()# 初始化连接await self.session.initialize()# 获取可用工具tools_response = await self.session.list_tools()self.tools_available = [tool.name for tool in tools_response.tools]print(f"已连接到时间服务器,可用工具: {self.tools_available}")return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""退出异步上下文管理器"""if self.session:await self.session.__aexit__(exc_type, exc_val, exc_tb)await self._stdio_client.__aexit__(exc_type, exc_val, exc_tb)async def call_tool(self, tool_name, arguments):"""调用指定的 MCP 工具"""if not self.session:return "错误: 未连接到时间服务器"if tool_name not in self.tools_available:return f"错误: 工具 '{tool_name}' 不可用"try:result = await self.session.call_tool(tool_name, arguments=arguments)# return result.content[0].textreturn resultexcept Exception as e:# 打印错误堆栈print(f"错误堆栈:")traceback.print_exc()return f"调用工具时出错: {e}"def extract_json_from_response(response_text):"""从大模型响应中提取JSON内容,提供更强的容错能力参数:response_text (str): 大模型的原始响应文本返回:dict: 解析出的JSON对象,如果解析失败则返回None"""if not response_text:return None# 移除单行注释 (//...)cleaned_text = regex.sub(r'//.*?$', '', response_text, flags=regex.MULTILINE)# 移除多行注释 (/*...*/)cleaned_text = regex.sub(r'/\*.*?\*/', '', cleaned_text, flags=regex.DOTALL)try:# 方法1: 直接尝试解析整个响应return json.loads(cleaned_text)except json.JSONDecodeError:# 如果解析失败,尝试提取第一个完整的 JSON 对象# json_match = regex.search(r'\{.*?\}', cleaned_text)pattern = r'\{(?:[^{}]|(?R))*\}'json_match = regex.search(pattern, cleaned_text)if json_match:try:print("正则匹配到字符串: ", json_match.group())return json.loads(json_match.group())except json.JSONDecodeError:pass# 所有方法都失败return Nonedef validate_tool_call(tool_call_data):"""验证工具调用数据的有效性参数:tool_call_data (dict): 解析出的工具调用数据返回:bool: 数据是否有效"""if not isinstance(tool_call_data, dict):return Falseif tool_call_data.get("action") != "call_tool":return Falseif "tool_name" not in tool_call_data:return False# 工具名称必须是字符串if not isinstance(tool_call_data["tool_name"], str):return False# 参数字段可选,但如果存在必须是字典类型if "arguments" in tool_call_data and not isinstance(tool_call_data["arguments"], dict):return Falsereturn Trueasync def ask_llm_with_mcp(user_query):"""使用大模型分析用户查询,并决定是否需要调用 MCP 服务参数:user_query (str): 用户查询返回:str: 最终响应"""# 初始化 MCP 客户端async with MCPTimeClient() as mcp_client:# 构建系统提示,告诉大模型可用的工具和调用方式system_prompt = f"""你是一个AI助手,可以回答用户问题并决定是否需要调用时间服务。你可以使用的工具:- get_current_time: 获取指定时区的当前时间,参数: timezone (时区名称)- list_common_timezones: 获取常见时区列表,无参数调用格式:如果需要调用工具,请以以下JSON格式回复:{{"action": "call_tool","tool_name": "工具名称","arguments": {{参数键: 参数值}}}}如果不需要调用工具,请直接回复答案。当前可用工具: {mcp_client.tools_available}"""# 第一次询问大模型print(f"大模型提示词: {system_prompt}")print(f"用户查询: {user_query}")print("询问大模型是否需要调用工具...")response = ollama.chat(model="phi3:mini",messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": user_query}])model_response = response['message']['content']print(f"大模型初始响应: {model_response}")# 尝试解析大模型的响应,看是否要调用工具try:# 使用改进的解析方法tool_call = extract_json_from_response(model_response)if tool_call and validate_tool_call(tool_call):tool_name = tool_call.get("tool_name")arguments = tool_call.get("arguments", {})print(f"大模型决定调用工具: {tool_name}, 参数: {arguments}")# 调用MCP工具tool_result = await mcp_client.call_tool(tool_name, arguments)print(f"工具调用结果: {tool_result}")return tool_resultexcept Exception as e:print(f"解析大模型响应时出错: {e}")# 打印异常堆栈以便调试traceback.print_exc()# 如果解析失败,直接返回大模型的原始响应pass# 如果不需要调用工具或解析失败,直接返回大模型的响应return model_responseasync def main():"""主函数,处理多个示例查询"""examples = ["现在几点了?","纽约现在是什么时间?","给我列出一些常见的时区","Invalid/Timezone 现在的时间是多少?","讲一个关于时间旅行的故事"]for query in examples:print("\n" + "=" * 60)print(f"处理查询: {query}")response = await ask_llm_with_mcp(query)print("\n最终响应:")print(response)print("=" * 60)if __name__ == "__main__":# 运行主函数asyncio.run(main())
四、测试与效果
我们使用以下示例查询进行测试:
examples = ["现在几点了?","纽约现在是什么时间?","给我列出一些常见的时区","Invalid/Timezone 现在的时间是多少?","讲一个关于时间旅行的故事"
]
改进后,系统能够:
-
正确解析带注释的 JSON;
-
从非结构化文本中提取 JSON 对象;
-
在无法提取时降级返回模型原始响应,避免中断;
-
对无效工具调用进行验证和过滤。
五、总结与展望
本文通过引入更强的 JSON 提取和验证机制,显著提升了大模型与 MCP 服务交互的可靠性。下一步可能的优化包括:
-
支持多工具调用(multi-tool calling);
-
引入上下文记忆,支持多轮对话中的工具调用;
-
整合更多类型的 MCP 服务(如数据库、API 等)。
通过不断优化解析策略和扩展工具集,我们可以构建更强大、更可靠的 AI 代理系统,更好地服务于实际应用场景。