当前位置: 首页 > news >正文

手搓一个企业级Agent智能体


在上一篇文章中,我们已经对 Agent 智能体的基本概念及相关话题进行了初步探讨。本文将从工程实践的角度,尝试一步步“手搓”一个属于自己的 Agent 智能体。

当然,市面上已经有不少优秀的开源框架可供选择,例如 LangChain、AWS 开源的 Strands-agents 等。既然如此,为什么还要“重复造轮子”?

原因在于:大模型的本质是概率模型,这决定了 Agent 的决策过程天然存在不确定性,工具调用的结果也可能不够精确。而开源框架往往像一个“黑盒子”,当问题出现时,开发者很难深入排查,更谈不上对其策略进行有针对性的干预与优化。

如果你真正想要 调优 Agent 的策略与准确性,仅仅依赖这些框架可能会显得捉襟见肘。

因此,“手搓”一个 Agent 的意义就在于:

  • 透明度:能够实时展现智能体的推理路径与决策逻辑,而非被黑盒遮蔽。
  • 可控性:允许在关键节点进行人为干预,从而持续优化决策质量。
  • 可观测性:自由地埋点、监控与分析 Agent 的内部行为,形成更完善的调优闭环。

简而言之,自己动手并不仅仅是“重复造轮子”,而是为了获得可解释、可控、可演进的智能体实现。

好,话不多说,下面我们正式开搓。

在真正开始写代码之前,我们先来看一张“极简版”的 Agent 框架图:
在这里插入图片描述
这就是一个最小可用的 Agent 基础框架,虽然简化,但已经涵盖了“感知—决策—执行—反馈”的核心循环逻辑。

下面上核心逻辑代码:

一、目录结构

在这里插入图片描述
采用了模块化设计,核心包含四个组件:

Agent (协调器、控制器)
├── LLM (大语言模型封装) 
├── ContextManager (上下文管理器)
└── MCPClient (mcp工具调用客户端)

这种设计的优势在于:

  • 职责分离:每个组件专注于自己的核心功能
  • 易于扩展:可以独立替换任何组件
  • 便于调试:问题定位更加精确

二、核心组件实现

1. Agent 核心类

Agent 类是整个系统的"大脑",负责协调各个组件:

class Agent:def __init__(self, model: LLM, context_manager: ContextManager = None, server_script_path: str = '', ctx: dict = None):self.model = model                              # LLM模型self.mcp_client = MCPClient(server_script_path) # 工具调用客户端self.context_manager = context_manager          # 上下文管理self.ctx = ctx                                  # 用户信息上下文async def ask(self, message: str, use_context: bool = True):"""处理用户问题,支持上下文管理Args:message: 用户消息use_context: 是否使用上下文历史Returns:str: AI回答"""# 记录用户消息到上下文if use_context:if not self.context_manager:self.context_manager = ContextManager(str(uuid.uuid4()))self.context_manager.add_message("user", message)else:self.context_manager = Nonetry:answer = await self.mcp_client.ask(message, self.model, self.context_manager, self.ctx)# 记录AI回答到上下文if use_context:self.context_manager.add_message("assistant", answer)return answerexcept Exception as e:error_msg = f"处理查询时发生错误: {str(e)}"# 即使出错也要记录到上下文if use_context:self.context_manager.add_message("assistant", error_msg)return error_msg

设计要点

  • 依赖注入:通过构造函数注入各个组件,提高可测试性
  • 上下文传递:ctx 参数确保用户身份在整个调用链中传递
  • 灵活配置:可选的上下文管理器,支持有状态和无状态两种模式
2. LLM 模型封装
class LLM:async def chat(self, messages: list, **kwargs):call_params = {"model": self.model or "gpt-4","messages": messages,"temperature": kwargs.get('temperature', 0.7)}# 关键:动态工具调用支持available_tools = kwargs.get('available_tools')if available_tools:call_params["tools"] = available_toolscall_params["tool_choice"] = "auto"return await self.client.chat.completions.create(**call_params)

核心设计思想

  • 工具调用控制:根据是否有可用工具动态决定是否启用 Function Calling
  • 参数透传:通过 kwargs 支持灵活的参数配置
  • 异步处理:全异步设计,提升并发性能
  • 支持主流大模型:继承LLM实现chat方法即可
3. 上下文管理器

上下文管理是 Agent 的"记忆系统",负责维护对话历史:

@dataclass
class ChatMessage:role: str          # 角色:user/assistantcontent: str       # 消息内容timestamp: str     # 时间戳message_id: str    # 唯一标识class ContextManager:def __init__(self, session_id: str, context_length: int = 8, max_store_context_length: int = 100):"""初始化上下文管理器Args:session_id: 会话IDmax_store_context_length: 每个会话最大保存的消息数量,默认100条context_length: 每次会话携带的历史消息数量,默认8条"""self.session_id = session_idself.context_length = context_lengthself.context_dir = settings.CONTEXT_DIRself.max_context_length = max_store_context_lengthself.session: Optional[ChatSession] = None# 确保上下文目录存在os.makedirs(self.context_dir, exist_ok=True)# 初始化时加载或创建会话self._initialize_session()def _get_session_file_path(self) -> str:"""获取会话文件路径"""return os.path.join(self.context_dir, f"{self.session_id}.json")def _initialize_session(self) -> None:"""初始化会话,尝试从文件加载,如果不存在则创建新会话"""session = self._load_session_from_file()if session is None:# 创建新会话now = datetime.now().isoformat()session = ChatSession(session_id=self.session_id,messages=[],created_at=now,updated_at=now)logger.info(f"Created new session: {self.session_id}")else:logger.info(f"Loaded existing session: {self.session_id}")self.session = sessiondef _load_session_from_file(self) -> Optional[ChatSession]:"""从文件加载会话数据"""file_path = self._get_session_file_path()if not os.path.exists(file_path):return Nonetry:with open(file_path, 'r', encoding='utf-8') as f:data = json.load(f)# 将字典数据转换为ChatMessage对象messages = [ChatMessage(**msg) for msg in data.get('messages', [])]session = ChatSession(session_id=data['session_id'],messages=messages,created_at=data['created_at'],updated_at=data['updated_at'])return sessionexcept Exception as e:logger.error(f"Error loading session {self.session_id} from file: {str(e)}")return Nonedef _save_session_to_file(self) -> bool:"""将会话数据保存到文件"""if self.session is None:logger.error(f"No session to save for {self.session_id}")return Falsefile_path = self._get_session_file_path()try:# 将ChatSession转换为字典session_data = {'session_id': self.session_id,'messages': [asdict(msg) for msg in self.session.messages],'created_at': self.session.created_at,'updated_at': self.session.updated_at}with open(file_path, 'w', encoding='utf-8') as f:json.dump(session_data, f, ensure_ascii=False, indent=2)return Trueexcept Exception as e:logger.error(f"Error saving session {self.session_id} to file: {str(e)}")return Falsedef add_message(self, role: str, content: str) -> Optional[ChatMessage]:"""添加消息到会话"""if self.session is None:logger.error(f"No session available for {self.session_id}")return Nonemessage = ChatMessage(role=role,content=content,timestamp=datetime.now().isoformat())self.session.messages.append(message)self.session.updated_at = datetime.now().isoformat()# 限制消息数量if len(self.session.messages) > self.max_context_length:removed_count = len(self.session.messages) - self.max_context_lengthself.session.messages = self.session.messages[-self.max_context_length:]logger.debug(f"Trimmed {removed_count} old messages from session {self.session_id}")# 保存到文件self._save_session_to_file()logger.debug(f"Added {role} message to session {self.session_id}: {content[:100]}...")return messagedef get_context_messages(self, last_n: Optional[int] = None) -> List[Dict[str, str]]:"""获取上下文消息,返回适合LLM API的格式Args:last_n: 获取最近n条消息,如果为None则使用context_length配置的数量Returns:包含role和content的消息列表"""if self.session is None:logger.error(f"No session available for {self.session_id}")return []messages = self.session.messagesif last_n is not None:messages = messages[-last_n:]else:messages = messages[-self.context_length:]# 转换为LLM API格式context_messages = [{"role": msg.role, "content": msg.content}for msg in messages]logger.debug(f"Retrieved {len(context_messages)} context messages for session {self.session_id}")return context_messages@staticmethoddef list_sessions(context_dir: Optional[str] = None) -> List[str]:"""列出所有会话ID"""if context_dir is None:context_dir = settings.CONTEXT_DIRsessions = set()# 从文件中获取if os.path.exists(context_dir):for filename in os.listdir(context_dir):if filename.endswith('.json'):session_id = filename[:-5]  # 移除.json后缀sessions.add(session_id)return list(sessions)

设计亮点

  • 数据类:使用 @dataclass 简化消息结构定义
  • 自动持久化:每次添加消息都自动保存,防止数据丢失
  • 长度控制:防止上下文无限增长影响性能
  • 结构化存储:JSON 格式存储,便于查看和调试
  • 可以使用其他存储后端:如oss、数据库等,实现ContextManager核心add_message、get_context_messages方法即可

三、核心工作流程

让我们深入了解 Agent 处理用户查询的完整流程:

async def ask(self, message: str, use_context: bool = True):# 1. 身份验证if 'username' not in self.ctx:return "未识别出当前用户身份"# 2. 上下文管理if use_context:self.context_manager.add_message("user", message)# 3. 委托给 MCP 客户端处理try:answer = await self.mcp_client.ask(message, self.model, self.context_manager, self.ctx)# 4. 保存回答到上下文if use_context:self.context_manager.add_message("assistant", answer)return answerexcept Exception as e:# 5. 错误处理error_msg = f"处理查询时发生错误: {str(e)}"if use_context:self.context_manager.add_message("assistant", error_msg)return error_msg

流程解析

  1. 身份验证:确保每个请求都有明确的用户身份
  2. 上下文记录:将用户消息加入历史记录
  3. 委托处理:交给 MCP 客户端进行具体的 LLM 调用和工具执行
  4. 结果保存:将 AI 回答保存到上下文中
  5. 错误兜底:即使出错也要记录到上下文,保持对话连续性

四、MCP 模块深度解析

MCP(Model Context Protocol)是我们系统中最核心的工具调用模块,负责管理和执行各种业务工具。让我们深入了解其设计和实现:

1. MCP 整体架构
MCP 模块
├── Server (工具注册中心)
│   ├── FastMCP 服务器实例
│   └── 工具注册管理
├── Client (连接管理器)
│   ├── 单例连接管理
│   ├── 多轮工具调用处理
│   └── 上下文传递
└── Tools (工具集合)├── billing/ (账单查询工具)├── file/ (文件操作工具)└── metric/ (指标查询工具)
2. MCP 服务器:工具注册中心
# server.py - 统一的工具注册中心
from mcp.server.fastmcp import FastMCP# 创建服务器实例
mcp_server = FastMCP("SmartAgentServer")# 模块化注册各类工具
register_billing_tools(mcp_server)  # 账单查询工具
register_metric_tools(mcp_server)   # 指标监控工具
register_file_tools(mcp_server)     # 文件操作工具

设计亮点

  • 模块化注册:每个工具类别独立注册,便于管理和扩展
  • 统一服务器:所有工具通过同一个服务器实例提供服务
  • 标准化接口:基于 FastMCP 框架,确保工具接口一致性
3. MCP 客户端:智能连接管理器

MCP 客户端是系统的"神经中枢",负责管理与工具服务器的连接和工具调用:

class MCPClient:# 单例模式确保全局唯一连接_instance = Noneasync def get_connection(self) -> bool:"""智能连接管理:检测连接状态,自动重连"""if self._is_connected and self.session:try:# 连接健康检查await self.session.list_tools()return Trueexcept Exception:# 连接失效,需要重连await self._establish_connection()async def ask(self, query: str, model, context_manager, ctx) -> str:"""核心查询处理逻辑"""# 1. 确保连接可用if not await self.get_connection():return "工具服务连接失败"# 2. 构建系统消息(权限控制)system_message = self._build_system_message(ctx['username'])# 3. 整合上下文历史messages = [system_message] + context_manager.get_context_messages()# 4. 多轮工具调用处理return await self._process_query_with_messages(messages, model, ctx['username'])
4. 多轮工具调用核心逻辑

这是 MCP 客户端最精妙的设计——支持多轮智能工具调用:

async def _process_query_with_messages(self, messages, model, username):"""多轮工具调用的核心处理逻辑"""max_rounds = 10  # 防止无限循环current_round = 0while current_round < max_rounds:current_round += 1# 1. 调用 LLM,传递可用工具列表response = await model.chat(messages=messages, available_tools=self.available_tools)choice = response.choices[0]# 2. 判断 LLM 的决策if choice.finish_reason == "tool_calls":# LLM 决定使用工具logger.info(f"第{current_round}轮:使用{len(choice.message.tool_calls)}个工具")# 3. 记录 LLM 的工具调用决策messages.append(choice.message.model_dump())# 4. 执行每个工具调用for tool_call in choice.message.tool_calls:tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)tool_args['username'] = username  # 注入用户身份# 执行工具并记录结果tool_result = await self.call_tool(tool_name, tool_args)messages.append({"role": "tool", "content": tool_result, "tool_call_id": tool_call.id})# 5. 继续下一轮,让 LLM 基于工具结果决定后续动作continueelse:# LLM 生成最终答案,结束循环return choice.message.content# 达到最大轮次限制return "工具调用次数过多,请简化问题"

核心设计思想

  1. 智能决策:每轮都让 LLM 决定是继续使用工具还是给出最终答案
  2. 上下文传递:工具执行结果会加入对话历史,供下轮使用
  3. 循环控制:设置最大轮次防止无限循环
  4. 身份注入:每个工具调用都自动注入用户身份信息
5. 工具实现模式

以账单查询工具为例,展示标准的工具实现模式:

def register_billing_tools(mcp_server):"""工具注册函数"""@mcp_server.tool()def billing_query_tool(username: str,month: Optional[str] = None,resource_types: Optional[list[str]] = None,# ... 其他参数):"""工具描述:支持多维度灵活查询账单费用参数说明和使用示例..."""# 1. 权限验证permed_orgs = get_billing_permed_orgs(username)if not permed_orgs:return "权限不足"# 2. 参数验证和处理if not validate_params(month, resource_types):return "参数错误"# 3. 业务逻辑执行try:with DBSession() as db:# 数据库查询逻辑results = db.execute(query).fetchall()return format_results(results)except Exception as e:return f"查询失败: {str(e)}"

工具设计原则

  • 权限优先:每个工具都先验证用户权限
  • 参数验证:严格验证输入参数的合法性
  • 错误处理:提供友好的错误信息
  • 结果格式化:返回易于理解的结构化结果
6. 连接管理优化策略
class MCPClient:async def get_connection(self):"""连接管理的核心策略"""async with self._connection_lock:# 1. 连接状态检查if self._is_connected and self.session:try:await self.session.list_tools()  # 健康检查return Trueexcept Exception:logger.warning("连接失效,准备重连")self._is_connected = False# 2. 建立新连接return await self._establish_connection()async def _establish_connection(self):"""连接建立的标准流程"""try:# a. 启动 MCP 服务器进程server_params = StdioServerParameters(command="python", args=[self.server_script_path])# b. 建立 stdio 通信管道self.stdio, self.write = await self.exit_stack.enter_async_context(stdio_client(server_params))# c. 创建客户端会话self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))# d. 初始化会话并获取工具列表await self.session.initialize()await self._refresh_available_tools()logger.info(f"MCP连接成功,可用工具: {len(self.available_tools)}")return Trueexcept Exception as e:logger.error(f"连接失败: {e}")return False
7. MCP 工作流程图

在这里插入图片描述

五、关键设计模式

1. 单例模式 - MCP 客户端
class MCPClient:_instance = Nonedef __new__(cls, *args, **kwargs):if cls._instance is None:cls._instance = super().__new__(cls)return cls._instance

优势:避免重复创建连接,提升性能和资源利用率。

2. 策略模式 - LLM 接口
# 可以轻松替换不同的 LLM 实现
class CustomLLM:async def chat(self, messages: list, **kwargs):# 自定义实现(如调用其他 API)return response# 使用时只需替换实例
agent = Agent(model=CustomLLM())
3. 工厂模式 - 工具注册
# 在 MCP 服务器中动态注册工具
def register_billing_tools():return [billing_query_tool, billing_breakdown_tool, ...]def register_file_tools():return [create_file_tool, read_file_tool, ...]

六、扩展指南

想要添加新功能?按照以下步骤:

  1. 新增工具:在 mcp/tools/ 下创建工具模块
  2. 注册工具:在 mcp/server.py 中注册
  3. 测试验证:编写测试用例确保功能正常
  4. 文档更新:更新 README 和使用示例

七、性能优化技巧

  1. 连接复用:MCP 客户端使用单例模式
  2. 异步处理:全链路异步,避免阻塞
  3. 上下文限制:控制历史消息长度
  4. 工具调用限制:防止无限循环调用

通过这种模块化、可扩展的设计,我们不仅实现了一个功能完整的 Agent,更重要的是获得了可解释、可控、可演进的智能体架构。每个组件都有清晰的职责边界,便于独立测试、调试和优化。

这就是"手搓" Agent 的价值所在:透明的实现逻辑 + 灵活的扩展能力 + 精确的问题定位

八、结果展示

产品形态以钉钉机器人问答形式展示:
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


文章转载自:

http://wVJ2o0Jq.nhzxd.cn
http://kbcRJQfK.nhzxd.cn
http://HpiTQUgU.nhzxd.cn
http://tctBYCNg.nhzxd.cn
http://oVo1OwjQ.nhzxd.cn
http://h33zYbtF.nhzxd.cn
http://JuhpZ9M6.nhzxd.cn
http://ypslojAN.nhzxd.cn
http://STbWveoh.nhzxd.cn
http://m6d5iNij.nhzxd.cn
http://ya8LKDvT.nhzxd.cn
http://9kkXxtHF.nhzxd.cn
http://TVH4eXoi.nhzxd.cn
http://c50AZQxE.nhzxd.cn
http://MNYS77nm.nhzxd.cn
http://559f87RD.nhzxd.cn
http://RsLEcose.nhzxd.cn
http://GgXgTWNP.nhzxd.cn
http://4UWFiraQ.nhzxd.cn
http://vAfa8ki4.nhzxd.cn
http://jecuwLBY.nhzxd.cn
http://WWKVgOlT.nhzxd.cn
http://W9LYcOlk.nhzxd.cn
http://jHvhmwsU.nhzxd.cn
http://u2cIS8jH.nhzxd.cn
http://pjCbhyxj.nhzxd.cn
http://QhHZRosl.nhzxd.cn
http://G6oBWYP5.nhzxd.cn
http://kwJXmmuB.nhzxd.cn
http://aVMTm7O1.nhzxd.cn
http://www.dtcms.com/a/364978.html

相关文章:

  • PyTorch 面试题及详细答案120题(116-120)-- 综合应用与实践
  • 英语四级学习指南
  • 《单链表学习手册:从原理到代码实现(含头插 / 尾插 / 销毁)》
  • go-mapus为局域网地图协作而生
  • 充电枪结构-常规特征设计
  • 小程序点击之数据绑定
  • 【数学建模学习笔记】相关性分析
  • Git在idea中的实战使用经验(二)
  • Elasticsearch 数字字段随机取多值查询缓慢-原理分析与优化方案
  • 408考研——单链表代码题常见套路总结
  • [光学原理与应用-375]:ZEMAX - 分析 - 物理光学图
  • Debezium报错处理系列之第130篇:OutOfMemoryError: Java heap space
  • 复杂网络环境不用愁,声网IoT多通道传输实战经验丰富
  • 数据结构---双向链表
  • 明确用户提问的核心
  • 【计算机网络】TCP状态转移
  • AI随笔番外 · 猫猫狐狐的尾巴式技术分享
  • 醋酸铕:点亮现代生活的“隐形之光“
  • Java jar 如何防止被反编译?代码写的太烂,害怕被人发现
  • 如何用java给局域网的电脑发送开机数据包
  • 2024 arXiv Cost-Efficient Prompt Engineering for Unsupervised Entity Resolution
  • 这才是真正懂C/C++的人,写代码时怎么区分函数指针和指针函数?
  • Masonry
  • 少儿编程C++快速教程之——1. 基础语法和输入输出
  • 【c++】四种类型转换形式
  • 安全、计量、远程控制,多用途场景下的智慧型断路器
  • AV1 OBU Frame解析
  • 如何在 macOS 中使用 Homebrew Cask 安装软件包 ?
  • 机器学习从入门到精通 - 决策树完全解读:信息熵、剪枝策略与可视化实战
  • Java 合并 PDF:实用教程与解决方案