手搓一个企业级Agent智能体
在上一篇文章中,我们已经对 Agent 智能体的基本概念及相关话题进行了初步探讨。本文将从工程实践的角度,尝试一步步“手搓”一个属于自己的 Agent 智能体。
当然,市面上已经有不少优秀的开源框架可供选择,例如 LangChain、AWS 开源的 Strands-agents 等。既然如此,为什么还要“重复造轮子”?
原因在于:大模型的本质是概率模型,这决定了 Agent 的决策过程天然存在不确定性,工具调用的结果也可能不够精确。而开源框架往往像一个“黑盒子”,当问题出现时,开发者很难深入排查,更谈不上对其策略进行有针对性的干预与优化。
如果你真正想要 调优 Agent 的策略与准确性,仅仅依赖这些框架可能会显得捉襟见肘。
因此,“手搓”一个 Agent 的意义就在于:
- 透明度:能够实时展现智能体的推理路径与决策逻辑,而非被黑盒遮蔽。
- 可控性:允许在关键节点进行人为干预,从而持续优化决策质量。
- 可观测性:自由地埋点、监控与分析 Agent 的内部行为,形成更完善的调优闭环。
简而言之,自己动手并不仅仅是“重复造轮子”,而是为了获得可解释、可控、可演进的智能体实现。
好,话不多说,下面我们正式开搓。
在真正开始写代码之前,我们先来看一张“极简版”的 Agent 框架图:
这就是一个最小可用的 Agent 基础框架,虽然简化,但已经涵盖了“感知—决策—执行—反馈”的核心循环逻辑。
下面上核心逻辑代码:
一、目录结构
采用了模块化设计,核心包含四个组件:
Agent (协调器、控制器)
├── LLM (大语言模型封装)
├── ContextManager (上下文管理器)
└── MCPClient (mcp工具调用客户端)
这种设计的优势在于:
- 职责分离:每个组件专注于自己的核心功能
- 易于扩展:可以独立替换任何组件
- 便于调试:问题定位更加精确
二、核心组件实现
1. Agent 核心类
Agent 类是整个系统的"大脑",负责协调各个组件:
class Agent:def __init__(self, model: LLM, context_manager: ContextManager = None, server_script_path: str = '', ctx: dict = None):self.model = model # LLM模型self.mcp_client = MCPClient(server_script_path) # 工具调用客户端self.context_manager = context_manager # 上下文管理self.ctx = ctx # 用户信息上下文async def ask(self, message: str, use_context: bool = True):"""处理用户问题,支持上下文管理Args:message: 用户消息use_context: 是否使用上下文历史Returns:str: AI回答"""# 记录用户消息到上下文if use_context:if not self.context_manager:self.context_manager = ContextManager(str(uuid.uuid4()))self.context_manager.add_message("user", message)else:self.context_manager = Nonetry:answer = await self.mcp_client.ask(message, self.model, self.context_manager, self.ctx)# 记录AI回答到上下文if use_context:self.context_manager.add_message("assistant", answer)return answerexcept Exception as e:error_msg = f"处理查询时发生错误: {str(e)}"# 即使出错也要记录到上下文if use_context:self.context_manager.add_message("assistant", error_msg)return error_msg
设计要点:
- 依赖注入:通过构造函数注入各个组件,提高可测试性
- 上下文传递:ctx 参数确保用户身份在整个调用链中传递
- 灵活配置:可选的上下文管理器,支持有状态和无状态两种模式
2. LLM 模型封装
class LLM:async def chat(self, messages: list, **kwargs):call_params = {"model": self.model or "gpt-4","messages": messages,"temperature": kwargs.get('temperature', 0.7)}# 关键:动态工具调用支持available_tools = kwargs.get('available_tools')if available_tools:call_params["tools"] = available_toolscall_params["tool_choice"] = "auto"return await self.client.chat.completions.create(**call_params)
核心设计思想:
- 工具调用控制:根据是否有可用工具动态决定是否启用 Function Calling
- 参数透传:通过 kwargs 支持灵活的参数配置
- 异步处理:全异步设计,提升并发性能
- 支持主流大模型:继承LLM实现chat方法即可
3. 上下文管理器
上下文管理是 Agent 的"记忆系统",负责维护对话历史:
@dataclass
class ChatMessage:role: str # 角色:user/assistantcontent: str # 消息内容timestamp: str # 时间戳message_id: str # 唯一标识class ContextManager:def __init__(self, session_id: str, context_length: int = 8, max_store_context_length: int = 100):"""初始化上下文管理器Args:session_id: 会话IDmax_store_context_length: 每个会话最大保存的消息数量,默认100条context_length: 每次会话携带的历史消息数量,默认8条"""self.session_id = session_idself.context_length = context_lengthself.context_dir = settings.CONTEXT_DIRself.max_context_length = max_store_context_lengthself.session: Optional[ChatSession] = None# 确保上下文目录存在os.makedirs(self.context_dir, exist_ok=True)# 初始化时加载或创建会话self._initialize_session()def _get_session_file_path(self) -> str:"""获取会话文件路径"""return os.path.join(self.context_dir, f"{self.session_id}.json")def _initialize_session(self) -> None:"""初始化会话,尝试从文件加载,如果不存在则创建新会话"""session = self._load_session_from_file()if session is None:# 创建新会话now = datetime.now().isoformat()session = ChatSession(session_id=self.session_id,messages=[],created_at=now,updated_at=now)logger.info(f"Created new session: {self.session_id}")else:logger.info(f"Loaded existing session: {self.session_id}")self.session = sessiondef _load_session_from_file(self) -> Optional[ChatSession]:"""从文件加载会话数据"""file_path = self._get_session_file_path()if not os.path.exists(file_path):return Nonetry:with open(file_path, 'r', encoding='utf-8') as f:data = json.load(f)# 将字典数据转换为ChatMessage对象messages = [ChatMessage(**msg) for msg in data.get('messages', [])]session = ChatSession(session_id=data['session_id'],messages=messages,created_at=data['created_at'],updated_at=data['updated_at'])return sessionexcept Exception as e:logger.error(f"Error loading session {self.session_id} from file: {str(e)}")return Nonedef _save_session_to_file(self) -> bool:"""将会话数据保存到文件"""if self.session is None:logger.error(f"No session to save for {self.session_id}")return Falsefile_path = self._get_session_file_path()try:# 将ChatSession转换为字典session_data = {'session_id': self.session_id,'messages': [asdict(msg) for msg in self.session.messages],'created_at': self.session.created_at,'updated_at': self.session.updated_at}with open(file_path, 'w', encoding='utf-8') as f:json.dump(session_data, f, ensure_ascii=False, indent=2)return Trueexcept Exception as e:logger.error(f"Error saving session {self.session_id} to file: {str(e)}")return Falsedef add_message(self, role: str, content: str) -> Optional[ChatMessage]:"""添加消息到会话"""if self.session is None:logger.error(f"No session available for {self.session_id}")return Nonemessage = ChatMessage(role=role,content=content,timestamp=datetime.now().isoformat())self.session.messages.append(message)self.session.updated_at = datetime.now().isoformat()# 限制消息数量if len(self.session.messages) > self.max_context_length:removed_count = len(self.session.messages) - self.max_context_lengthself.session.messages = self.session.messages[-self.max_context_length:]logger.debug(f"Trimmed {removed_count} old messages from session {self.session_id}")# 保存到文件self._save_session_to_file()logger.debug(f"Added {role} message to session {self.session_id}: {content[:100]}...")return messagedef get_context_messages(self, last_n: Optional[int] = None) -> List[Dict[str, str]]:"""获取上下文消息,返回适合LLM API的格式Args:last_n: 获取最近n条消息,如果为None则使用context_length配置的数量Returns:包含role和content的消息列表"""if self.session is None:logger.error(f"No session available for {self.session_id}")return []messages = self.session.messagesif last_n is not None:messages = messages[-last_n:]else:messages = messages[-self.context_length:]# 转换为LLM API格式context_messages = [{"role": msg.role, "content": msg.content}for msg in messages]logger.debug(f"Retrieved {len(context_messages)} context messages for session {self.session_id}")return context_messages@staticmethoddef list_sessions(context_dir: Optional[str] = None) -> List[str]:"""列出所有会话ID"""if context_dir is None:context_dir = settings.CONTEXT_DIRsessions = set()# 从文件中获取if os.path.exists(context_dir):for filename in os.listdir(context_dir):if filename.endswith('.json'):session_id = filename[:-5] # 移除.json后缀sessions.add(session_id)return list(sessions)
设计亮点:
- 数据类:使用 @dataclass 简化消息结构定义
- 自动持久化:每次添加消息都自动保存,防止数据丢失
- 长度控制:防止上下文无限增长影响性能
- 结构化存储:JSON 格式存储,便于查看和调试
- 可以使用其他存储后端:如oss、数据库等,实现ContextManager核心add_message、get_context_messages方法即可
三、核心工作流程
让我们深入了解 Agent 处理用户查询的完整流程:
async def ask(self, message: str, use_context: bool = True):# 1. 身份验证if 'username' not in self.ctx:return "未识别出当前用户身份"# 2. 上下文管理if use_context:self.context_manager.add_message("user", message)# 3. 委托给 MCP 客户端处理try:answer = await self.mcp_client.ask(message, self.model, self.context_manager, self.ctx)# 4. 保存回答到上下文if use_context:self.context_manager.add_message("assistant", answer)return answerexcept Exception as e:# 5. 错误处理error_msg = f"处理查询时发生错误: {str(e)}"if use_context:self.context_manager.add_message("assistant", error_msg)return error_msg
流程解析:
- 身份验证:确保每个请求都有明确的用户身份
- 上下文记录:将用户消息加入历史记录
- 委托处理:交给 MCP 客户端进行具体的 LLM 调用和工具执行
- 结果保存:将 AI 回答保存到上下文中
- 错误兜底:即使出错也要记录到上下文,保持对话连续性
四、MCP 模块深度解析
MCP(Model Context Protocol)是我们系统中最核心的工具调用模块,负责管理和执行各种业务工具。让我们深入了解其设计和实现:
1. MCP 整体架构
MCP 模块
├── Server (工具注册中心)
│ ├── FastMCP 服务器实例
│ └── 工具注册管理
├── Client (连接管理器)
│ ├── 单例连接管理
│ ├── 多轮工具调用处理
│ └── 上下文传递
└── Tools (工具集合)├── billing/ (账单查询工具)├── file/ (文件操作工具)└── metric/ (指标查询工具)
2. MCP 服务器:工具注册中心
# server.py - 统一的工具注册中心
from mcp.server.fastmcp import FastMCP# 创建服务器实例
mcp_server = FastMCP("SmartAgentServer")# 模块化注册各类工具
register_billing_tools(mcp_server) # 账单查询工具
register_metric_tools(mcp_server) # 指标监控工具
register_file_tools(mcp_server) # 文件操作工具
设计亮点:
- 模块化注册:每个工具类别独立注册,便于管理和扩展
- 统一服务器:所有工具通过同一个服务器实例提供服务
- 标准化接口:基于 FastMCP 框架,确保工具接口一致性
3. MCP 客户端:智能连接管理器
MCP 客户端是系统的"神经中枢",负责管理与工具服务器的连接和工具调用:
class MCPClient:# 单例模式确保全局唯一连接_instance = Noneasync def get_connection(self) -> bool:"""智能连接管理:检测连接状态,自动重连"""if self._is_connected and self.session:try:# 连接健康检查await self.session.list_tools()return Trueexcept Exception:# 连接失效,需要重连await self._establish_connection()async def ask(self, query: str, model, context_manager, ctx) -> str:"""核心查询处理逻辑"""# 1. 确保连接可用if not await self.get_connection():return "工具服务连接失败"# 2. 构建系统消息(权限控制)system_message = self._build_system_message(ctx['username'])# 3. 整合上下文历史messages = [system_message] + context_manager.get_context_messages()# 4. 多轮工具调用处理return await self._process_query_with_messages(messages, model, ctx['username'])
4. 多轮工具调用核心逻辑
这是 MCP 客户端最精妙的设计——支持多轮智能工具调用:
async def _process_query_with_messages(self, messages, model, username):"""多轮工具调用的核心处理逻辑"""max_rounds = 10 # 防止无限循环current_round = 0while current_round < max_rounds:current_round += 1# 1. 调用 LLM,传递可用工具列表response = await model.chat(messages=messages, available_tools=self.available_tools)choice = response.choices[0]# 2. 判断 LLM 的决策if choice.finish_reason == "tool_calls":# LLM 决定使用工具logger.info(f"第{current_round}轮:使用{len(choice.message.tool_calls)}个工具")# 3. 记录 LLM 的工具调用决策messages.append(choice.message.model_dump())# 4. 执行每个工具调用for tool_call in choice.message.tool_calls:tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)tool_args['username'] = username # 注入用户身份# 执行工具并记录结果tool_result = await self.call_tool(tool_name, tool_args)messages.append({"role": "tool", "content": tool_result, "tool_call_id": tool_call.id})# 5. 继续下一轮,让 LLM 基于工具结果决定后续动作continueelse:# LLM 生成最终答案,结束循环return choice.message.content# 达到最大轮次限制return "工具调用次数过多,请简化问题"
核心设计思想:
- 智能决策:每轮都让 LLM 决定是继续使用工具还是给出最终答案
- 上下文传递:工具执行结果会加入对话历史,供下轮使用
- 循环控制:设置最大轮次防止无限循环
- 身份注入:每个工具调用都自动注入用户身份信息
5. 工具实现模式
以账单查询工具为例,展示标准的工具实现模式:
def register_billing_tools(mcp_server):"""工具注册函数"""@mcp_server.tool()def billing_query_tool(username: str,month: Optional[str] = None,resource_types: Optional[list[str]] = None,# ... 其他参数):"""工具描述:支持多维度灵活查询账单费用参数说明和使用示例..."""# 1. 权限验证permed_orgs = get_billing_permed_orgs(username)if not permed_orgs:return "权限不足"# 2. 参数验证和处理if not validate_params(month, resource_types):return "参数错误"# 3. 业务逻辑执行try:with DBSession() as db:# 数据库查询逻辑results = db.execute(query).fetchall()return format_results(results)except Exception as e:return f"查询失败: {str(e)}"
工具设计原则:
- 权限优先:每个工具都先验证用户权限
- 参数验证:严格验证输入参数的合法性
- 错误处理:提供友好的错误信息
- 结果格式化:返回易于理解的结构化结果
6. 连接管理优化策略
class MCPClient:async def get_connection(self):"""连接管理的核心策略"""async with self._connection_lock:# 1. 连接状态检查if self._is_connected and self.session:try:await self.session.list_tools() # 健康检查return Trueexcept Exception:logger.warning("连接失效,准备重连")self._is_connected = False# 2. 建立新连接return await self._establish_connection()async def _establish_connection(self):"""连接建立的标准流程"""try:# a. 启动 MCP 服务器进程server_params = StdioServerParameters(command="python", args=[self.server_script_path])# b. 建立 stdio 通信管道self.stdio, self.write = await self.exit_stack.enter_async_context(stdio_client(server_params))# c. 创建客户端会话self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))# d. 初始化会话并获取工具列表await self.session.initialize()await self._refresh_available_tools()logger.info(f"MCP连接成功,可用工具: {len(self.available_tools)}")return Trueexcept Exception as e:logger.error(f"连接失败: {e}")return False
7. MCP 工作流程图
五、关键设计模式
1. 单例模式 - MCP 客户端
class MCPClient:_instance = Nonedef __new__(cls, *args, **kwargs):if cls._instance is None:cls._instance = super().__new__(cls)return cls._instance
优势:避免重复创建连接,提升性能和资源利用率。
2. 策略模式 - LLM 接口
# 可以轻松替换不同的 LLM 实现
class CustomLLM:async def chat(self, messages: list, **kwargs):# 自定义实现(如调用其他 API)return response# 使用时只需替换实例
agent = Agent(model=CustomLLM())
3. 工厂模式 - 工具注册
# 在 MCP 服务器中动态注册工具
def register_billing_tools():return [billing_query_tool, billing_breakdown_tool, ...]def register_file_tools():return [create_file_tool, read_file_tool, ...]
六、扩展指南
想要添加新功能?按照以下步骤:
- 新增工具:在
mcp/tools/
下创建工具模块 - 注册工具:在
mcp/server.py
中注册 - 测试验证:编写测试用例确保功能正常
- 文档更新:更新 README 和使用示例
七、性能优化技巧
- 连接复用:MCP 客户端使用单例模式
- 异步处理:全链路异步,避免阻塞
- 上下文限制:控制历史消息长度
- 工具调用限制:防止无限循环调用
通过这种模块化、可扩展的设计,我们不仅实现了一个功能完整的 Agent,更重要的是获得了可解释、可控、可演进的智能体架构。每个组件都有清晰的职责边界,便于独立测试、调试和优化。
这就是"手搓" Agent 的价值所在:透明的实现逻辑 + 灵活的扩展能力 + 精确的问题定位。
八、结果展示
产品形态以钉钉机器人问答形式展示: