智能体开发(2)智能数据处理Agent
目标:工具、ToolNode、agent执行器、工具调用状态、函数调用
以上是逻辑和状态的一部分
环境准备和导入
输入控制:`TypedDict`、`Literal`、`List` 和 `Dict`
数据处理:`requests` 用于处理HTTP网络请求
AI工具:`ToolNode` 和 `tools_condition` 处理工具调用逻辑,`tool` 装饰器创建可调用工具,`BaseModel` 和 `Field` 用于定义数据验证模型。
# Install the required libraries (notebook shell command, kept commented out):
# !pip install langgraph langchain-core langchain-openai langchain-community
import os
from typing import Annotated, Literal, TypedDict, List, Dict, Any

# StateGraph / START / END are exported by ``langgraph.graph``; the top-level
# ``langgraph`` package does not expose them.
from langgraph.graph import StateGraph, START, END
from langgraph.graph import MessagesState
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
# NOTE(review): ``langchain_core.pydantic_v1`` is a compatibility shim that is
# deprecated in recent langchain-core releases - confirm against the installed
# version before relying on it.
from langchain_core.pydantic_v1 import BaseModel, Field
import json
import random
import time
import requests
from datetime import datetime, timedelta

print("环境准备完成")
1. 基础工具定义和使用
首先学习如何定义和使用基本的工具
`@tool`:类似 Spring Boot 的 `@Component` 注解——把一个普通函数注册为工具,告诉 AI 这个函数可以被调用。
# Define tools with the @tool decorator.
# NOTE: @tool uses each function's docstring as the tool description exposed to
# the model, so the docstrings below are runtime text and stay in the original
# language.
@tool
def calculator(expression: str) -> str:
    """计算数学表达式的结果。

    Args:
        expression: 要计算的数学表达式,例如 "2 + 3 * 4"

    Returns:
        计算结果的字符串表示
    """
    # Whitelist: digits, the four arithmetic operators, parentheses, the
    # decimal point and whitespace. Names and attribute access are impossible.
    allowed_chars = '0123456789+-*/().'
    try:
        if not all(c in allowed_chars or c.isspace() for c in expression):
            return "错误: 表达式包含不允许的字符"
        # NOTE(security): eval() on model/user supplied text. The whitelist plus
        # the empty namespace block name lookups, but inputs such as
        # "9**9**9**9" can still burn CPU; swap in an ast-based evaluator
        # before exposing this outside a demo.
        result = eval(expression, {"__builtins__": {}}, {})
    except Exception as e:
        return f"计算错误: {str(e)}"
    return f"计算结果: {result}"


@tool
def get_current_time() -> str:
    """获取当前的日期和时间。

    Returns:
        当前日期和时间的字符串
    """
    current_time = datetime.now()
    return f"当前时间: {current_time.strftime('%Y-%m-%d %H:%M:%S')}"


@tool
def search_weather(city: str) -> str:
    """查询指定城市的天气信息(模拟)。

    Args:
        city: 要查询的城市名称

    Returns:
        天气信息字符串
    """
    # Simulated data: one of the canned forecasts is picked at random.
    weather_options = [
        "晴天,气温25°C",
        "多云,气温22°C",
        "小雨,气温18°C",
        "阴天,气温20°C",
    ]
    weather = random.choice(weather_options)
    return f"{city}的天气: {weather}"


@tool
def generate_random_number(min_val: int = 1, max_val: int = 100) -> str:
    """生成指定范围内的随机数。

    Args:
        min_val: 最小值(默认1)
        max_val: 最大值(默认100)

    Returns:
        随机数字符串
    """
    # random.randint raises ValueError when the bounds are reversed, so
    # normalise the order instead of failing the tool call.
    low, high = sorted((min_val, max_val))
    number = random.randint(low, high)
    return f"随机数: {number} (范围: {low}-{high})"


# Build the tool list
basic_tools = [
    calculator,
    get_current_time,
    search_weather,
    generate_random_number,
]

# Smoke-test each tool once by invoking it with a dict of arguments.
print("基础工具测试:")
for demo_tool, demo_args in (
    (calculator, {"expression": "2 + 3 * 4"}),
    (get_current_time, {}),
    (search_weather, {"city": "北京"}),
    (generate_random_number, {"min_val": 10, "max_val": 50}),
):
    print(demo_tool.invoke(demo_args))
2. ToolNode的使用
ToolNode是LangGraph提供的专门处理工具调用的节点类型
🆚 核心区别:
方面 | `@tool` | `ToolNode` |
---|---|---|
作用 | 定义工具函数 | 执行工具调用 |
层级 | 函数级别 | 节点级别 |
使用 | 装饰普通函数 | 在StateGraph中添加节点 |
输入 | 函数参数 | AI的tool_calls请求 |
输出 | 函数返回值 | 格式化的ToolMessage |
🎯 简单理解:
`@tool` = 制作工具(锤子、螺丝刀)
`ToolNode` = 工具包(收纳所有工具,知道什么时候用什么工具)
AI说:"我要用锤子" → `ToolNode` 听到 → 从工具包拿出锤子(`@tool` 函数) → 使用 → 把结果告诉AI
# State shared by the nodes of the tool-calling graph.
from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    # Conversation history. The ``add_messages`` reducer merges each node's
    # output into the history; without a reducer the prebuilt ToolNode's
    # partial update (only the new ToolMessages) would overwrite it.
    messages: Annotated[list, add_messages]


def _mock_tool_call(name: str, args: dict, id_prefix: str) -> dict:
    """Build a tool-call dict with a pseudo-random id such as ``calc_1234``."""
    return {
        "name": name,
        "args": args,
        "id": f"{id_prefix}_{random.randint(1000, 9999)}",
    }


# Mock LLM node (a real application would call an actual model here)
def mock_llm_node(state: AgentState) -> AgentState:
    """Imitate an LLM that decides which tool to call.

    A trailing ``HumanMessage`` is matched against a few keywords and answered
    with an ``AIMessage`` that requests the matching tool. Any other trailing
    message is treated as "a tool has run": the newest ``ToolMessage`` content
    is echoed back as the final answer.

    Args:
        state: Graph state; only ``messages`` is read.

    Returns:
        A partial state update holding the single new ``AIMessage``.
    """
    messages = state.get("messages", [])
    last_message = messages[-1] if messages else None

    if isinstance(last_message, HumanMessage):
        content = last_message.content.lower()

        if "计算" in content or "算" in content:
            # Canned expressions only - a real agent would parse the request.
            expression = "2 + 3"
            if "*" in content or "乘" in content:
                expression = "5 * 6"
            elif "/" in content or "除" in content:
                expression = "20 / 4"
            ai_message = AIMessage(
                content="我来帮你计算",
                tool_calls=[_mock_tool_call("calculator", {"expression": expression}, "calc")],
            )
        elif "时间" in content or "几点" in content:
            ai_message = AIMessage(
                content="我来查询当前时间",
                tool_calls=[_mock_tool_call("get_current_time", {}, "time")],
            )
        elif "天气" in content:
            # Naive city lookup (a real application would need proper NER).
            city = "北京"
            if "上海" in content:
                city = "上海"
            elif "深圳" in content:
                city = "深圳"
            ai_message = AIMessage(
                content=f"我来查询{city}的天气",
                tool_calls=[_mock_tool_call("search_weather", {"city": city}, "weather")],
            )
        elif "随机" in content:
            ai_message = AIMessage(
                content="我来生成一个随机数",
                tool_calls=[
                    _mock_tool_call(
                        "generate_random_number", {"min_val": 1, "max_val": 100}, "random"
                    )
                ],
            )
        else:
            ai_message = AIMessage(
                content="我没有找到合适的工具来处理您的请求。请尝试询问时间、天气、计算或随机数。"
            )
    else:
        # Summarise the most recent tool result, if there is one.
        tool_messages = [msg for msg in messages if isinstance(msg, ToolMessage)]
        if tool_messages:
            latest_tool_result = tool_messages[-1].content
            ai_message = AIMessage(content=f"根据工具调用结果:{latest_tool_result}")
        else:
            ai_message = AIMessage(content="请告诉我您需要什么帮助。")

    # Only the new message is returned; the reducer on AgentState appends it.
    return {"messages": [ai_message]}


# Create the ToolNode that executes the requested tool calls
tool_node = ToolNode(basic_tools)

# Decide whether a tool call is needed
def should_continue(state: AgentState) -> Literal["tools", "end"]:
    """Route to the tool node when the newest AI message requests tool calls.

    Args:
        state: Graph state; only ``messages`` is read.

    Returns:
        ``"tools"`` if the last message is an ``AIMessage`` with a non-empty
        ``tool_calls``; ``"end"`` otherwise (including an empty history).
    """
    messages = state.get("messages", [])
    if not messages:
        return "end"

    last_message = messages[-1]
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    return "end"


# Build the graph with tool calling
tool_workflow = StateGraph(AgentState)

# Register the nodes
tool_workflow.add_node("llm", mock_llm_node)
tool_workflow.add_node("tools", tool_node)

# Entry point
tool_workflow.set_entry_point("llm")

# Conditional edge: the mock LLM either requests a tool or finishes
tool_workflow.add_conditional_edges(
    "llm",
    should_continue,
    {
        "tools": "tools",
        "end": END,
    },
)

# After a tool has run, hand control back to the LLM node
tool_workflow.add_edge("tools", "llm")

# Compile the graph
tool_app = tool_workflow.compile()

# Exercise the tool-calling loop
test_queries = [
    "请帮我计算5乘以8",
    "现在几点了?",
    "北京今天天气怎么样?",
    "给我一个随机数",
    "你好",  # a query that needs no tool call
]

print("\n工具调用测试:")
for query in test_queries:
    print(f"\n用户: {query}")
    result = tool_app.invoke({"messages": [HumanMessage(content=query)]})

    # Print the AI and tool messages of the resulting conversation
    for msg in result["messages"]:
        if isinstance(msg, AIMessage):
            print(f"AI: {msg.content}")
            if msg.tool_calls:
                print(f" 工具调用: {msg.tool_calls}")
        elif isinstance(msg, ToolMessage):
            print(f" 工具结果: {msg.content}")
3. 复杂工具系统
构建更复杂的工具系统,包含多种类型的工具
# File tools (simulated - nothing touches the real file system).
# NOTE: the docstrings double as the tool descriptions seen by the model, so
# they stay in the original language.
@tool
def read_file(file_path: str) -> str:
    """读取文件内容(模拟)。

    Args:
        file_path: 文件路径

    Returns:
        文件内容或错误信息
    """
    # In-memory stand-ins for files on disk.
    mock_files = {
        "data.txt": "这是数据文件的内容\n包含一些重要信息",
        "config.json": '{"api_key": "xxx", "timeout": 30}',
        "log.txt": "2023-01-01 10:00:00 - 系统启动\n2023-01-01 10:01:00 - 加载配置",
    }
    if file_path not in mock_files:
        return f"错误: 找不到文件 {file_path}"
    return f"文件 {file_path} 的内容:\n{mock_files[file_path]}"


@tool
def write_file(file_path: str, content: str) -> str:
    """写入文件(模拟)。

    Args:
        file_path: 文件路径
        content: 要写入的内容

    Returns:
        操作结果
    """
    # Simulated write: only the content length is reported.
    return f"成功将内容写入文件 {file_path}\n内容长度: {len(content)} 字符"


# Data-processing tools
# NOTE: the docstring doubles as the tool description seen by the model, so it
# stays in the original language.
@tool
def analyze_data(data: str) -> str:
    """分析数据并提供统计信息。

    Args:
        data: 要分析的数据(用逗号分隔的数字)

    Returns:
        数据分析结果
    """
    try:
        # Skip empty tokens so inputs such as "1, 2," or "" do not crash on
        # float(""); an input without any number reports a clear error.
        numbers = [float(token) for token in (x.strip() for x in data.split(',')) if token]
        if not numbers:
            return "错误: 没有找到有效的数字"

        count = len(numbers)
        total = sum(numbers)
        average = total / count
        minimum = min(numbers)
        maximum = max(numbers)
        return "\n".join([
            "数据分析结果:",
            f"数量: {count}",
            f"总和: {total}",
            f"平均值: {average:.2f}",
            f"最小值: {minimum}",
            f"最大值: {maximum}",
        ])
    except Exception as e:
        return f"数据分析错误: {str(e)}"


# Network-request tools
# NOTE: the docstring doubles as the tool description seen by the model, so it
# stays in the original language.
@tool
def fetch_url(url: str) -> str:
    """获取URL内容(模拟)。

    Args:
        url: 要获取的URL

    Returns:
        URL内容或错误信息
    """
    # Simulated network: canned responses keyed by exact URL, 404 otherwise.
    mock_responses = {
        "https://api.example.com/status": '{"status": "online", "version": "1.0"}',
        "https://api.example.com/data": '{"data": [1, 2, 3, 4, 5]}',
        "https://news.example.com": "今日头条新闻内容",
    }
    if url not in mock_responses:
        return f"模拟请求 {url}\n返回: 404 - 页面未找到"
    return f"成功获取 {url}:\n{mock_responses[url]}"


# E-mail tools
# NOTE: the docstring doubles as the tool description seen by the model, so it
# stays in the original language.
@tool
def send_email(to: str, subject: str, body: str) -> str:
    """发送邮件(模拟)。

    Args:
        to: 收件人邮箱
        subject: 邮件主题
        body: 邮件内容

    Returns:
        发送结果
    """
    # Simulated send: nothing leaves the process, a receipt is formatted only.
    return "\n".join([
        "邮件发送成功!",
        f"收件人: {to}",
        f"主题: {subject}",
        f"内容长度: {len(body)} 字符",
        f"发送时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
    ])


# Database-query tools
# NOTE: the docstring doubles as the tool description seen by the model, so it
# stays in the original language.
@tool
def query_database(table: str, condition: str = "") -> str:
    """查询数据库(模拟)。

    Args:
        table: 表名
        condition: 查询条件(可选)

    Returns:
        查询结果
    """
    # Simulated tables. ``condition`` is only echoed in the output; it is not
    # applied as a filter.
    mock_data = {
        "users": [
            {"id": 1, "name": "张三", "age": 25},
            {"id": 2, "name": "李四", "age": 30},
            {"id": 3, "name": "王五", "age": 35},
        ],
        "orders": [
            {"id": 101, "user_id": 1, "amount": 100.0},
            {"id": 102, "user_id": 2, "amount": 200.0},
        ],
        "products": [
            {"id": 1, "name": "笔记本电脑", "price": 5000},
            {"id": 2, "name": "手机", "price": 3000},
        ],
    }
    if table not in mock_data:
        return f"错误: 找不到表 '{table}'"

    data = mock_data[table]
    result_count = len(data)
    return "\n".join([
        f"查询表 '{table}' 成功:",
        f"条件: {condition if condition else '无'}",
        f"结果数量: {result_count}",
        f"数据: {json.dumps(data, ensure_ascii=False, indent=2)}",
    ])


# Build the advanced tool list
advanced_tools = [
    calculator,
    get_current_time,
    search_weather,
    generate_random_number,
    read_file,
    write_file,
    analyze_data,
    fetch_url,
    send_email,
    query_database,
]

# Smoke-test a representative tool from each category.
print("复杂工具系统测试:")

print("\n1. 文件操作:")
print(read_file.invoke({"file_path": "data.txt"}))

print("\n2. 数据分析:")
print(analyze_data.invoke({"data": "10, 20, 30, 40, 50"}))

print("\n3. 网络请求:")
print(fetch_url.invoke({"url": "https://api.example.com/status"}))

print("\n4. 数据库查询:")
print(query_database.invoke({"table": "users", "condition": "age > 25"}))
4. Agent执行器构建
构建一个完整的Agent执行器,能够智能地选择和执行工具
# Enhanced agent state
from langgraph.graph.message import add_messages


class EnhancedAgentState(TypedDict):
    # Conversation history. The ``add_messages`` reducer merges node output
    # into the history by message id, so the prebuilt ToolNode's partial update
    # (only the new ToolMessages) no longer replaces the whole list.
    messages: Annotated[list, add_messages]
    # The task text the agent is currently working on.
    current_task: str
    # Results collected from tool executions.
    tool_results: Dict[str, Any]
    # Number of LLM-node steps executed so far.
    execution_step: int
    # Upper bound for ``execution_step``.
    max_steps: int
    # Number of errors recorded so far.
    error_count: int
    # Free-form planning context shared between steps (e.g. ``task_type``).
    context: Dict[str, Any]


# Intelligent LLM node (richer decision logic)
def intelligent_llm_node(state: EnhancedAgentState) -> EnhancedAgentState:
    """Rule-based stand-in for an LLM that plans and follows up on tool calls.

    * Step budget exhausted: emit a stop message (the step counter is left
      unchanged on this path).
    * Last message from the user: classify the task and plan a tool call.
    * Last message from a tool: summarise the result.
    * Anything else: ask the user what they need.

    Args:
        state: Current graph state.

    Returns:
        The state with the new ``AIMessage`` appended and, except on the
        budget-exhausted path, ``execution_step`` incremented and ``context``
        refreshed.
    """
    messages = state.get("messages", [])
    execution_step = state.get("execution_step", 0)
    max_steps = state.get("max_steps", 10)
    tool_results = state.get("tool_results", {})
    context = state.get("context", {})

    # Hard stop once the step budget is used up.
    if execution_step >= max_steps:
        ai_message = AIMessage(content=f"已达到最大执行步数({max_steps}),停止执行。")
        return {**state, "messages": messages + [ai_message]}

    last_message = messages[-1] if messages else None

    if isinstance(last_message, HumanMessage):
        content = last_message.content.lower()
        task_type = analyze_task_type(content)
        # Turn the classified task into a concrete tool call.
        ai_message, new_context = plan_execution(content, task_type, context)
    elif isinstance(last_message, ToolMessage):
        ai_message = process_tool_result(last_message, messages, tool_results, context)
        new_context = context
    else:
        ai_message = AIMessage(content="请告诉我您需要什么帮助。")
        new_context = context

    return {
        **state,
        "messages": messages + [ai_message],
        "execution_step": execution_step + 1,
        "context": new_context,
    }


# Keyword groups checked in order; the first group with a hit wins.
_TASK_KEYWORDS = (
    ("calculation", ("计算", "算", "数学")),
    ("file_operation", ("文件", "读取", "写入", "保存")),
    ("data_analysis", ("数据", "分析", "统计")),
    ("communication", ("邮件", "发送", "通知")),
    ("query", ("查询", "搜索", "数据库")),
    # "几点" ("what time") added so time questions are recognised here too.
    ("information", ("天气", "时间", "几点", "信息")),
)


def analyze_task_type(content: str) -> str:
    """Classify a (lower-cased) request by keyword.

    Returns:
        One of ``calculation``, ``file_operation``, ``data_analysis``,
        ``communication``, ``query``, ``information`` or ``general``.
    """
    for task_type, keywords in _TASK_KEYWORDS:
        if any(word in content for word in keywords):
            return task_type
    return "general"


def _planned_call(name: str, args: dict, id_prefix: str) -> dict:
    """Build a tool-call dict with a pseudo-random id such as ``calc_1234``."""
    return {"name": name, "args": args, "id": f"{id_prefix}_{random.randint(1000, 9999)}"}


def plan_execution(content: str, task_type: str, context: Dict) -> tuple:
    """Turn a classified request into an AI message carrying a tool call.

    Args:
        content: Lower-cased user request.
        task_type: Result of :func:`analyze_task_type`.
        context: Planning context carried in the state.

    Returns:
        ``(ai_message, new_context)``. Task types without a plan (``general``,
        ``communication``) produce a clarification message with no tool call
        and leave the context unchanged.
    """
    if task_type == "calculation":
        expression = extract_math_expression(content)
        ai_message = AIMessage(
            content=f"我来计算表达式: {expression}",
            tool_calls=[_planned_call("calculator", {"expression": expression}, "calc")],
        )
        new_context = {**context, "task_type": "calculation", "expression": expression}
    elif task_type == "file_operation":
        if "读取" in content or "read" in content:
            file_path = extract_file_path(content)
            ai_message = AIMessage(
                content=f"我来读取文件: {file_path}",
                tool_calls=[_planned_call("read_file", {"file_path": file_path}, "read")],
            )
        else:
            # Every non-read file request writes a fixed example file.
            ai_message = AIMessage(
                content="我来创建一个示例文件",
                tool_calls=[
                    _planned_call(
                        "write_file",
                        {"file_path": "example.txt", "content": "这是一个示例文件内容"},
                        "write",
                    )
                ],
            )
        new_context = {**context, "task_type": "file_operation"}
    elif task_type == "data_analysis":
        # Fall back to sample data when the request contains no numbers.
        data = extract_data(content) or "1,2,3,4,5,6,7,8,9,10"
        ai_message = AIMessage(
            content=f"我来分析数据: {data}",
            tool_calls=[_planned_call("analyze_data", {"data": data}, "analyze")],
        )
        new_context = {**context, "task_type": "data_analysis", "data": data}
    elif task_type == "query":
        table = extract_table_name(content)
        ai_message = AIMessage(
            content=f"我来查询数据库表: {table}",
            tool_calls=[_planned_call("query_database", {"table": table}, "query")],
        )
        new_context = {**context, "task_type": "query", "table": table}
    elif task_type == "information":
        if "时间" in content or "几点" in content:
            ai_message = AIMessage(
                content="我来获取当前时间",
                tool_calls=[_planned_call("get_current_time", {}, "time")],
            )
        else:
            # Every other information request is treated as a weather lookup.
            city = extract_city(content)
            ai_message = AIMessage(
                content=f"我来查询{city}的天气",
                tool_calls=[_planned_call("search_weather", {"city": city}, "weather")],
            )
        new_context = {**context, "task_type": "information"}
    else:
        ai_message = AIMessage(
            content="请更具体地描述您需要什么帮助。我可以帮您:计算、文件操作、数据分析、查询数据库、获取信息等。"
        )
        new_context = context

    return ai_message, new_context


# Helper functions

# Chinese operator words/symbols mapped to ASCII operators. Two-character
# forms come first inside each alternation so "乘以" is consumed as a whole.
_OPERATOR_WORDS = (
    (r"\s*(?:加上|加)\s*", " + "),
    (r"\s*(?:减去|减)\s*", " - "),
    (r"\s*(?:乘以|乘|×)\s*", " * "),
    (r"\s*(?:除以|除|÷)\s*", " / "),
)
# number (operator number)+ using ASCII operators only
_EXPRESSION_PATTERN = r"\d+(?:\.\d+)?(?:\s*[+\-*/]\s*\d+(?:\.\d+)?)+"


def extract_math_expression(content: str) -> str:
    """Extract an arithmetic expression from free text (simplified).

    Chinese operator words are first normalised to ``+ - * /`` so chained
    requests such as "15乘以23加上67" become ``15 * 23 + 67``; the first
    ``number op number ...`` run is then returned. ASCII expressions are
    returned exactly as written.

    Returns:
        The expression, or the default ``"2 + 2"`` when none is found.
    """
    import re

    normalized = content
    for pattern, replacement in _OPERATOR_WORDS:
        normalized = re.sub(pattern, replacement, normalized)

    match = re.search(_EXPRESSION_PATTERN, normalized)
    if match:
        return match.group(0)
    return "2 + 2"  # default expression


def extract_file_path(content: str) -> str:
    """Map a request to one of the known mock files (default ``data.txt``)."""
    if "data.txt" in content:
        return "data.txt"
    if "config" in content:
        return "config.json"
    if "log" in content:
        return "log.txt"
    return "data.txt"


def extract_data(content: str) -> str:
    """Extract the first comma-separated run of numbers from the text.

    The match must start with a digit, so stray spaces or commas earlier in
    the sentence can no longer shadow the real data.

    Returns:
        The matched text (e.g. ``"10, 15, 20"``), or ``None`` when the text
        contains no number. The ``str`` annotation is kept unchanged for
        interface compatibility.
    """
    import re

    match = re.search(r"\d+(?:\.\d+)?(?:\s*,\s*\d+(?:\.\d+)?)*", content)
    if match:
        return match.group(0)
    return None


def extract_table_name(content: str) -> str:
    """Map a request to a mock table name (default ``users``)."""
    if "用户" in content or "user" in content:
        return "users"
    if "订单" in content or "order" in content:
        return "orders"
    if "产品" in content or "product" in content:
        return "products"
    return "users"


def extract_city(content: str) -> str:
    """Return the first known city mentioned in the text (default 北京)."""
    cities = ["北京", "上海", "深圳", "广州", "杭州", "成都"]
    for city in cities:
        if city in content:
            return city
    return "北京"


def process_tool_result(tool_message: ToolMessage, messages: list, tool_results: Dict, context: Dict) -> AIMessage:
    """Wrap a tool result in a reply phrased for the current task type.

    Only ``tool_message.content`` and ``context["task_type"]`` are read;
    ``messages`` and ``tool_results`` are accepted but unused.
    """
    result = tool_message.content
    task_type = context.get("task_type", "general")

    # Pick the reply wording by task type.
    if task_type == "calculation":
        return AIMessage(content=f"计算完成!{result}")
    if task_type == "file_operation":
        return AIMessage(content=f"文件操作完成:{result}")
    if task_type == "data_analysis":
        return AIMessage(content=f"数据分析结果:\n{result}")
    if task_type == "query":
        return AIMessage(content=f"数据库查询结果:\n{result}")
    return AIMessage(content=f"操作完成:{result}")


# Error-handling node
def error_handler(state: EnhancedAgentState) -> EnhancedAgentState:
    """Record one more error and tell the user whether the agent gives up.

    Args:
        state: Current graph state.

    Returns:
        The state with ``error_count`` incremented and a status message
        appended: a give-up notice from the third error on, a retry notice
        before that.
    """
    messages = state.get("messages", [])
    error_count = state.get("error_count", 0) + 1

    if error_count >= 3:
        ai_message = AIMessage(content="抱歉,连续出现多次错误,请稍后重试或联系管理员。")
    else:
        ai_message = AIMessage(content=f"执行过程中出现错误,正在重试... (第{error_count}次)")

    return {
        **state,
        "messages": messages + [ai_message],
        "error_count": error_count,
    }


# Build the enhanced agent executor
def enhanced_should_continue(state: EnhancedAgentState) -> Literal["tools", "error", "end"]:
    """Choose the next hop after the LLM node.

    Args:
        state: Current graph state; ``messages`` and ``error_count`` are read.

    Returns:
        ``"error"`` once ``error_count`` has reached 3, ``"tools"`` when the
        newest message is an ``AIMessage`` requesting tool calls, otherwise
        ``"end"``.
    """
    messages = state.get("messages", [])
    error_count = state.get("error_count", 0)

    # Too many errors take priority over any pending tool call.
    if error_count >= 3:
        return "error"

    if messages and isinstance(messages[-1], AIMessage) and messages[-1].tool_calls:
        return "tools"
    return "end"


# Create the enhanced tool node
enhanced_tool_node = ToolNode(advanced_tools)

# Build the enhanced agent graph
enhanced_agent_workflow = StateGraph(EnhancedAgentState)

# Register the nodes
enhanced_agent_workflow.add_node("llm", intelligent_llm_node)
enhanced_agent_workflow.add_node("tools", enhanced_tool_node)
enhanced_agent_workflow.add_node("error_handler", error_handler)

# Entry point
enhanced_agent_workflow.set_entry_point("llm")

# Conditional edge out of the LLM node
enhanced_agent_workflow.add_conditional_edges(
    "llm",
    enhanced_should_continue,
    {
        "tools": "tools",
        "error": "error_handler",
        "end": END,
    },
)

# After a tool has run, go back to the LLM; the error handler ends the run
enhanced_agent_workflow.add_edge("tools", "llm")
enhanced_agent_workflow.add_edge("error_handler", END)

# Compile the enhanced agent
enhanced_agent_app = enhanced_agent_workflow.compile()

# Exercise the enhanced agent
complex_queries = [
    "请计算15乘以23加上67",
    "帮我读取data.txt文件的内容",
    "分析这些数据:10, 15, 20, 25, 30, 35, 40",
    "查询用户表的信息",
    "现在几点了?",
    "上海今天天气如何?",
]

print("\n增强Agent执行器测试:")
for query in complex_queries:
    print(f"\n========== 查询: {query} ==========")
    initial_state = {
        "messages": [HumanMessage(content=query)],
        "current_task": query,
        "tool_results": {},
        "execution_step": 0,
        "max_steps": 5,
        "error_count": 0,
        "context": {},
    }
    result = enhanced_agent_app.invoke(initial_state)

    # Replay the conversation
    for msg in result["messages"]:
        if isinstance(msg, HumanMessage):
            print(f"👤 用户: {msg.content}")
        elif isinstance(msg, AIMessage):
            print(f"🤖 AI: {msg.content}")
            if msg.tool_calls:
                for tool_call in msg.tool_calls:
                    print(f" 🔧 调用工具: {tool_call['name']}({tool_call['args']})")
        elif isinstance(msg, ToolMessage):
            # Long tool output is truncated to 100 characters.
            if len(msg.content) > 100:
                print(f" ⚙️ 工具结果: {msg.content[:100]}...")
            else:
                print(f" ⚙️ 工具结果: {msg.content}")

    print(f"执行步数: {result['execution_step']}, 错误次数: {result['error_count']}")
5. 错误处理和异常恢复
实现健壮的错误处理机制
# Tools used to demonstrate error handling.
# NOTE: the docstrings double as the tool descriptions seen by the model, so
# they stay in the original language.
@tool
def unreliable_service(task: str, fail_rate: float = 0.3) -> str:
    """模拟不可靠的服务调用。

    Args:
        task: 要执行的任务描述
        fail_rate: 失败率(0.0-1.0)

    Returns:
        任务结果或错误信息
    """
    # Fail at random, with probability ``fail_rate`` for values in [0, 1].
    if random.random() < fail_rate:
        error_types = [
            "网络超时",
            "服务不可用",
            "权限不足",
            "请求格式错误",
            "系统维护中",
        ]
        error = random.choice(error_types)
        raise Exception(f"服务调用失败: {error}")
    return f"任务 '{task}' 执行成功!结果:模拟数据处理完成"


@tool
def validate_input(input_data: str, data_type: str = "string") -> str:
    """验证输入数据。

    Args:
        input_data: 要验证的数据
        data_type: 期望的数据类型 (string, number, email, url)

    Returns:
        验证结果
    """
    import re

    if data_type == "number":
        try:
            float(input_data)
        except ValueError:
            raise ValueError(f"❌ 无效的数字格式: {input_data}")
        return f"✅ 数字验证通过: {input_data}"

    if data_type == "email":
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if re.match(pattern, input_data):
            return f"✅ 邮箱格式验证通过: {input_data}"
        raise ValueError(f"❌ 无效的邮箱格式: {input_data}")

    if data_type == "url":
        pattern = r'^https?://[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        if re.match(pattern, input_data):
            return f"✅ URL格式验证通过: {input_data}"
        raise ValueError(f"❌ 无效的URL格式: {input_data}")

    # Any other data_type is accepted as a plain string.
    return f"✅ 字符串验证通过: {input_data}"


# Error-recovery state
class ErrorRecoveryState(TypedDict):
    # Conversation history (the string is descriptive metadata only).
    messages: Annotated[list, "消息历史"]
    # The task text currently being handled.
    current_task: str
    # Retries used so far, keyed by tool name.
    retry_count: Dict[str, int]
    # Maximum retries allowed per tool.
    max_retries: int
    # One dict per recorded failure (tool, error, timestamp, retry_count).
    error_history: list
    # Fallback strategies by name.
    fallback_strategies: Dict[str, str]
    # One of: running, error, recovered, failed.
    execution_status: str


# Error-recovery LLM node
def _find_tool_call(messages: list, tool_call_id: str):
    """Return the tool-call dict with the given id from the newest AI message that has it, else None."""
    for message in reversed(messages):
        if isinstance(message, AIMessage) and message.tool_calls:
            for call in message.tool_calls:
                if call.get("id") == tool_call_id:
                    return call
    return None


def error_recovery_llm(state: ErrorRecoveryState) -> ErrorRecoveryState:
    """Rule-based LLM stand-in that retries failed tool calls.

    * User message: plan a validation or service call from keywords.
    * Failed tool result (content contains "错误" or "失败"): while retries
      remain, re-issue the same tool call under a fresh id so the graph really
      runs the tool again; once ``max_retries`` is used up, mark the run as
      ``failed``.
    * Successful tool result: report it; the status becomes ``recovered`` if
      errors were recorded earlier.

    Args:
        state: Current graph state.

    Returns:
        The state with the new ``AIMessage`` appended and ``retry_count``,
        ``error_history`` and ``execution_status`` updated.
    """
    messages = state.get("messages", [])
    retry_count = state.get("retry_count", {})
    max_retries = state.get("max_retries", 3)
    error_history = state.get("error_history", [])
    last_message = messages[-1] if messages else None

    # Defaults for every branch that does not change the bookkeeping.
    new_retry_count = retry_count.copy()
    new_error_history = error_history.copy()
    new_status = "running"

    if isinstance(last_message, HumanMessage):
        content = last_message.content.lower()
        if "验证" in content:
            if "邮箱" in content:
                # Deliberately invalid address so the error path is exercised.
                test_email = "invalid-email"
                ai_message = AIMessage(
                    content=f"我来验证邮箱格式: {test_email}",
                    tool_calls=[{
                        "name": "validate_input",
                        "args": {"input_data": test_email, "data_type": "email"},
                        "id": f"validate_{random.randint(1000, 9999)}",
                    }],
                )
            else:
                ai_message = AIMessage(
                    content="我来验证输入数据",
                    tool_calls=[{
                        "name": "validate_input",
                        "args": {"input_data": "test123", "data_type": "number"},
                        "id": f"validate_{random.randint(1000, 9999)}",
                    }],
                )
        elif "服务" in content or "调用" in content:
            ai_message = AIMessage(
                content="我来调用服务",
                tool_calls=[{
                    "name": "unreliable_service",
                    "args": {"task": "数据处理", "fail_rate": 0.6},
                    "id": f"service_{random.randint(1000, 9999)}",
                }],
            )
        else:
            ai_message = AIMessage(content="请告诉我您需要验证什么数据或调用什么服务。")
    elif isinstance(last_message, ToolMessage):
        failed = "错误" in last_message.content or "失败" in last_message.content
        if not failed:
            ai_message = AIMessage(content=f"操作成功完成:{last_message.content}")
            new_status = "recovered" if error_history else "running"
        else:
            tool_name = last_message.name or "unknown_tool"
            current_retries = retry_count.get(tool_name, 0)
            if current_retries < max_retries:
                # Re-issue the failed call. Without tool_calls on this message
                # the router would end the run and no retry would happen.
                failed_call = _find_tool_call(messages, last_message.tool_call_id)
                retry_calls = []
                if failed_call is not None:
                    retry_calls.append({
                        "name": failed_call["name"],
                        "args": failed_call["args"],
                        "id": f"retry_{random.randint(1000, 9999)}",
                    })
                ai_message = AIMessage(
                    content=f"工具调用失败,正在重试... (第{current_retries + 1}次)",
                    tool_calls=retry_calls,
                )
                new_retry_count = {**retry_count, tool_name: current_retries + 1}
                new_error_history = error_history + [{
                    "tool": tool_name,
                    "error": last_message.content,
                    "timestamp": datetime.now().isoformat(),
                    "retry_count": current_retries + 1,
                }]
                new_status = "error"
            else:
                # Retry budget exhausted: stop retrying and report it.
                ai_message = AIMessage(content=f"工具 {tool_name} 多次调用失败,使用备用策略。")
                new_error_history = error_history + [{
                    "tool": tool_name,
                    "error": "达到最大重试次数",
                    "timestamp": datetime.now().isoformat(),
                    "retry_count": current_retries,
                }]
                new_status = "failed"
    else:
        ai_message = AIMessage(content="请告诉我您需要什么帮助。")

    return {
        **state,
        "messages": messages + [ai_message],
        "retry_count": new_retry_count,
        "error_history": new_error_history,
        "execution_status": new_status,
    }


# Tool node with error handling
class ErrorHandlingToolNode:
    """Tool-executing node that converts tool exceptions into ToolMessages.

    Every tool call on the newest ``AIMessage`` is executed. A raised
    exception or an unknown tool name becomes a ``ToolMessage`` describing the
    problem, so the graph keeps running.
    """

    def __init__(self, tools):
        # Index the tools by name for lookup at call time.
        self.tools = {registered.name: registered for registered in tools}

    def __call__(self, state: ErrorRecoveryState) -> ErrorRecoveryState:
        """Run the pending tool calls and append one ToolMessage per call.

        Returns the state unchanged when the last message is not an
        ``AIMessage`` with tool calls.
        """
        messages = state.get("messages", [])
        last_message = messages[-1] if messages else None

        if not isinstance(last_message, AIMessage) or not last_message.tool_calls:
            return state

        new_messages = list(messages)
        for tool_call in last_message.tool_calls:
            tool_name = tool_call["name"]
            tool_args = tool_call["args"]
            call_id = tool_call["id"]

            if tool_name in self.tools:
                try:
                    content = self.tools[tool_name].invoke(tool_args)
                except Exception as e:
                    # Report the failure as a normal tool result.
                    content = f"工具 {tool_name} 执行失败: {str(e)}"
            else:
                content = f"未知工具: {tool_name}"

            new_messages.append(
                ToolMessage(content=content, tool_call_id=call_id, name=tool_name)
            )

        return {**state, "messages": new_messages}


# Create the error-handling tool node
error_tools = [unreliable_service, validate_input, calculator]
error_tool_node = ErrorHandlingToolNode(error_tools)

# Decide whether to continue
def error_recovery_continue(state: ErrorRecoveryState) -> Literal["tools", "end"]:
    """Choose the next hop after the error-recovery LLM node.

    Args:
        state: Current graph state; ``messages`` and ``execution_status`` are
            read.

    Returns:
        ``"end"`` when the run is marked ``failed``; ``"tools"`` when the
        newest message is an ``AIMessage`` requesting tool calls; otherwise
        ``"end"``.
    """
    messages = state.get("messages", [])
    execution_status = state.get("execution_status", "running")

    # A failed run stops regardless of any pending tool call.
    if execution_status == "failed":
        return "end"

    if messages and isinstance(messages[-1], AIMessage) and messages[-1].tool_calls:
        return "tools"
    return "end"


# Build the error-recovery graph
error_recovery_workflow = StateGraph(ErrorRecoveryState)

# Register the nodes
error_recovery_workflow.add_node("llm", error_recovery_llm)
error_recovery_workflow.add_node("tools", error_tool_node)

# Entry point
error_recovery_workflow.set_entry_point("llm")

# Conditional edge out of the LLM node
error_recovery_workflow.add_conditional_edges(
    "llm",
    error_recovery_continue,
    {
        "tools": "tools",
        "end": END,
    },
)

# After a tool has run, go back to the LLM
error_recovery_workflow.add_edge("tools", "llm")

# Compile the error-recovery graph
error_recovery_app = error_recovery_workflow.compile()

# Exercise error handling and recovery
error_test_cases = [
    "请调用不可靠的服务",
    "请验证邮箱格式",
    "请验证数字格式",
]

print("\n错误处理和恢复测试:")
for test_case in error_test_cases:
    print(f"\n========== 测试: {test_case} ==========")
    initial_state = {
        "messages": [HumanMessage(content=test_case)],
        "current_task": test_case,
        "retry_count": {},
        "max_retries": 2,
        "error_history": [],
        "fallback_strategies": {},
        "execution_status": "running",
    }
    result = error_recovery_app.invoke(initial_state)

    # Replay the conversation
    for msg in result["messages"]:
        if isinstance(msg, HumanMessage):
            print(f"👤 用户: {msg.content}")
        elif isinstance(msg, AIMessage):
            print(f"🤖 AI: {msg.content}")
            if msg.tool_calls:
                for tool_call in msg.tool_calls:
                    print(f" 🔧 调用工具: {tool_call['name']}")
        elif isinstance(msg, ToolMessage):
            failed = "错误" in msg.content or "失败" in msg.content
            status_icon = "❌" if failed else "✅"
            print(f" {status_icon} 工具结果: {msg.content}")

    print(f"最终状态: {result['execution_status']}")
    print(f"重试计数: {result['retry_count']}")
    if result['error_history']:
        print(f"错误历史: {len(result['error_history'])} 个错误")
        for error in result['error_history'][-2:]:  # show only the last two errors
            print(f" - {error['tool']}: {error['error'][:50]}...")
6. 实践案例:智能数据处理Agent
构建一个完整的智能数据处理Agent系统
# Data-processing tools (all simulated - no real data is read or written).
# NOTE: the docstrings double as the tool descriptions seen by the model, so
# they stay in the original language.
@tool
def load_dataset(source: str, format_type: str = "csv") -> str:
    """加载数据集(模拟)。

    Args:
        source: 数据源名称或路径
        format_type: 数据格式 (csv, json, excel)

    Returns:
        数据加载结果
    """
    mock_datasets = {
        "sales_data": {
            "rows": 1000,
            "columns": ["date", "product", "sales", "region"],
            "size": "2.5MB",
        },
        "user_data": {
            "rows": 5000,
            "columns": ["id", "name", "age", "city", "registration_date"],
            "size": "1.8MB",
        },
        "log_data": {
            "rows": 10000,
            "columns": ["timestamp", "level", "message", "source"],
            "size": "5.2MB",
        },
    }
    if source not in mock_datasets:
        return f"错误: 找不到数据集 '{source}'"

    data_info = mock_datasets[source]
    return "\n".join([
        f"数据集 '{source}' 加载成功!",
        f"格式: {format_type.upper()}",
        f"行数: {data_info['rows']:,}",
        f"列数: {len(data_info['columns'])}",
        f"列名: {', '.join(data_info['columns'])}",
        f"文件大小: {data_info['size']}",
        "状态: 已加载到内存",
    ])


@tool
def clean_data(dataset: str, operations: str) -> str:
    """清理数据(模拟)。

    Args:
        dataset: 数据集名称
        operations: 清理操作列表,用逗号分隔

    Returns:
        数据清理结果
    """
    operations_list = [op.strip() for op in operations.split(',')]
    results = []
    # Each operation is matched by keyword and answered with a canned outcome.
    for op in operations_list:
        if "空值" in op or "null" in op.lower():
            results.append("删除了125个空值记录")
        elif "重复" in op or "duplicate" in op.lower():
            results.append("删除了43个重复记录")
        elif "格式" in op or "format" in op.lower():
            results.append("标准化了日期格式")
        elif "异常" in op or "outlier" in op.lower():
            results.append("识别并标记了15个异常值")
        else:
            results.append(f"执行了操作: {op}")

    return "\n".join(
        [f"数据集 '{dataset}' 清理完成:", "执行的操作:"]
        + [f"- {result}" for result in results]
        + ["清理后数据质量得分: 94/100"]
    )


@tool
def statistical_analysis(dataset: str, analysis_type: str) -> str:
    """执行统计分析(模拟)。

    Args:
        dataset: 数据集名称
        analysis_type: 分析类型 (basic, correlation, trend, distribution)

    Returns:
        统计分析结果
    """
    if analysis_type == "basic":
        lines = [
            f"基础统计分析结果 - {dataset}:",
            "平均值: 245.67",
            "中位数: 230.00",
            "标准差: 78.23",
            "最小值: 45.00",
            "最大值: 987.00",
            "四分位数范围: [180.00, 320.00]",
        ]
    elif analysis_type == "correlation":
        # Blank lines separate the three correlation sections.
        lines = [
            f"相关性分析结果 - {dataset}:",
            "强正相关 (r > 0.7):",
            "- sales & marketing_spend: r = 0.85",
            "- age & experience: r = 0.72",
            "",
            "强负相关 (r < -0.7):",
            "- price & demand: r = -0.78",
            "",
            "弱相关 (-0.3 < r < 0.3):",
            "- region & satisfaction: r = 0.12",
        ]
    elif analysis_type == "trend":
        lines = [
            f"趋势分析结果 - {dataset}:",
            "时间序列趋势:",
            "- 整体趋势: 上升 (+12.5%)",
            "- 季节性模式: 检测到季度周期",
            "- 异常点: 发现3个异常时期",
            "- 预测准确度: 87.3%",
            "- 下一周期预测: 预计增长8.2%",
        ]
    else:
        # Any other value is treated as "distribution".
        lines = [
            f"分布分析结果 - {dataset}:",
            "数据分布特征:",
            "- 分布类型: 近似正态分布",
            "- 偏度: 0.23 (轻微右偏)",
            "- 峰度: -0.47 (较平坦)",
            "- 正态性检验: p-value = 0.082",
            "- 异常值比例: 2.1%",
        ]
    return "\n".join(lines)


@tool
def generate_visualization(dataset: str, chart_type: str, variables: str) -> str:
    """生成可视化图表(模拟)。

    Args:
        dataset: 数据集名称
        chart_type: 图表类型 (bar, line, scatter, heatmap, histogram)
        variables: 要可视化的变量,用逗号分隔

    Returns:
        可视化生成结果
    """
    chart_types = {
        "bar": "柱状图",
        "line": "折线图",
        "scatter": "散点图",
        "heatmap": "热力图",
        "histogram": "直方图",
        "pie": "饼图",
    }
    # Unknown chart types are shown under their raw name.
    chart_name = chart_types.get(chart_type, chart_type)
    variables_list = [v.strip() for v in variables.split(',')]
    return "\n".join([
        "可视化图表生成成功!",
        f"数据集: {dataset}",
        f"图表类型: {chart_name}",
        f"变量: {', '.join(variables_list)}",
        "图表尺寸: 1200x800 像素",
        f"保存路径: ./charts/{dataset}_{chart_type}_chart.png",
        f"交互式版本: ./charts/{dataset}_{chart_type}_interactive.html",
        f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
    ])


@tool
def export_report(dataset: str, format_type: str = "pdf") -> str:
    """导出分析报告(模拟)。

    Args:
        dataset: 数据集名称
        format_type: 报告格式 (pdf, html, docx)

    Returns:
        报告导出结果
    """
    report_sections = [
        "执行摘要",
        "数据概览",
        "数据质量评估",
        "统计分析结果",
        "可视化图表",
        "主要发现",
        "建议和结论",
        "附录",
    ]
    summary = [
        "分析报告导出完成!",
        f"数据集: {dataset}",
        f"报告格式: {format_type.upper()}",
        f"包含章节: {len(report_sections)} 个",
        "页数: 25 页",
        "文件大小: 3.2 MB",
        f"保存路径: ./reports/{dataset}_analysis_report.{format_type}",
        f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",  # blank line before the section listing
        "报告章节:",
    ]
    numbered = [f"{i+1}. {section}" for i, section in enumerate(report_sections)]
    return "\n".join(summary + numbered)


# Data-processing agent state
from langgraph.graph.message import add_messages


class DataProcessingAgentState(TypedDict):
    # Conversation history. The ``add_messages`` reducer merges node output
    # into the history by message id, so the prebuilt ToolNode's partial update
    # (only the new ToolMessages) no longer replaces the whole list.
    messages: Annotated[list, add_messages]
    # Name of the dataset being processed.
    current_dataset: str
    # Ordered labels of the requested pipeline steps.
    processing_pipeline: list
    # Workflow statuses that have already finished.
    completed_steps: list
    # Analysis results collected along the way.
    analysis_results: Dict[str, Any]
    # Current workflow stage (e.g. data_loading ... completed).
    workflow_status: str
    # Hint for the next action to take.
    next_action: str


# Data-processing planner
def data_processing_planner(state: DataProcessingAgentState) -> DataProcessingAgentState:
    """Plan and drive the data-processing workflow.

    * User message mentioning both "分析" and "数据": pick the dataset, build
      the pipeline and start by loading the dataset.
    * Tool result: advance the workflow via :func:`process_workflow_step`.
    * Anything else: ask the user what they need.

    Args:
        state: Current graph state.

    Returns:
        The updated state with the new ``AIMessage`` appended.
    """
    messages = state.get("messages", [])
    last_message = messages[-1] if messages else None

    if isinstance(last_message, HumanMessage):
        content = last_message.content.lower()
        if "分析" in content and "数据" in content:
            dataset = determine_dataset(content)
            pipeline = create_processing_pipeline(content)
            ai_message = AIMessage(
                content=f"我将为您分析 {dataset} 数据集。处理流程包括:{', '.join(pipeline)}",
                tool_calls=[{
                    "name": "load_dataset",
                    "args": {"source": dataset, "format_type": "csv"},
                    "id": f"load_{random.randint(1000, 9999)}",
                }],
            )
            return {
                **state,
                "messages": messages + [ai_message],
                "current_dataset": dataset,
                "processing_pipeline": pipeline,
                "completed_steps": [],
                "workflow_status": "data_loading",
                "next_action": "load_data",
            }
        ai_message = AIMessage(
            content="请告诉我您想要分析什么数据。我可以帮您分析销售数据、用户数据或日志数据。"
        )
        return {**state, "messages": messages + [ai_message]}

    if isinstance(last_message, ToolMessage):
        # A tool just finished: decide the next step from the workflow status.
        return process_workflow_step(state, last_message)

    ai_message = AIMessage(content="请告诉我您需要什么数据分析帮助。")
    return {**state, "messages": messages + [ai_message]}


def determine_dataset(content: str) -> str:
    """Pick the mock dataset named in the request (default ``sales_data``)."""
    if "销售" in content or "sale" in content:
        return "sales_data"
    if "用户" in content or "user" in content:
        return "user_data"
    if "日志" in content or "log" in content:
        return "log_data"
    return "sales_data"  # default dataset


def create_processing_pipeline(content: str) -> list:
    """Build the ordered list of pipeline step labels for a request.

    Loading is always first. Cleaning, statistics, charts and report export
    are added when their keywords appear; when none does, the default is
    cleaning, statistics and charts.
    """
    pipeline = ["加载数据"]

    if "清理" in content or "clean" in content:
        pipeline.append("数据清理")
    if any(word in content for word in ["统计", "分析", "statistical"]):
        pipeline.append("统计分析")
    if any(word in content for word in ["可视化", "图表", "visualization", "chart"]):
        pipeline.append("生成图表")
    if "报告" in content or "report" in content:
        pipeline.append("导出报告")

    # Nothing specific requested: fall back to the default flow.
    if len(pipeline) == 1:
        pipeline.extend(["数据清理", "统计分析", "生成图表"])
    return pipeline


# Workflow stages in execution order.
_STAGE_ORDER = (
    "data_loading",
    "data_cleaning",
    "statistical_analysis",
    "visualization",
    "report_generation",
)
# Pipeline label that enables each stage after loading.
_STAGE_PIPELINE_LABEL = {
    "data_cleaning": "数据清理",
    "statistical_analysis": "统计分析",
    "visualization": "生成图表",
    "report_generation": "导出报告",
}
# Wording for "<finished stage>完成".
_STAGE_DONE_LABEL = {
    "data_loading": "数据加载",
    "data_cleaning": "数据清理",
    "statistical_analysis": "统计分析",
    "visualization": "图表生成",
}
# Wording announcing each stage about to start.
_STAGE_ACTION_TEXT = {
    "data_cleaning": "现在开始清理数据",
    "statistical_analysis": "开始统计分析",
    "visualization": "生成可视化图表",
    "report_generation": "导出分析报告",
}


def _next_requested_stage(workflow_status: str, pipeline: list):
    """Return the first later stage whose label is in ``pipeline``, else None."""
    if workflow_status not in _STAGE_ORDER:
        return None
    later_stages = _STAGE_ORDER[_STAGE_ORDER.index(workflow_status) + 1:]
    for stage in later_stages:
        if _STAGE_PIPELINE_LABEL[stage] in pipeline:
            return stage
    return None


def _stage_tool_call(stage: str, dataset: str) -> dict:
    """Build the tool call that executes ``stage`` on ``dataset``."""
    if stage == "data_cleaning":
        return {
            "name": "clean_data",
            "args": {"dataset": dataset, "operations": "删除空值, 删除重复项, 格式标准化"},
            "id": f"clean_{random.randint(1000, 9999)}",
        }
    if stage == "statistical_analysis":
        return {
            "name": "statistical_analysis",
            "args": {"dataset": dataset, "analysis_type": "basic"},
            "id": f"stats_{random.randint(1000, 9999)}",
        }
    if stage == "visualization":
        return {
            "name": "generate_visualization",
            "args": {"dataset": dataset, "chart_type": "bar", "variables": "sales, region"},
            "id": f"viz_{random.randint(1000, 9999)}",
        }
    return {
        "name": "export_report",
        "args": {"dataset": dataset, "format_type": "pdf"},
        "id": f"report_{random.randint(1000, 9999)}",
    }


def process_workflow_step(state: DataProcessingAgentState, tool_result: ToolMessage) -> DataProcessingAgentState:
    """Advance the workflow after a tool has finished.

    The current ``workflow_status`` is recorded as completed, then the next
    stage *that the pipeline actually requested* is started. Stages missing
    from the pipeline are skipped instead of ending the workflow, so a request
    for "load, statistics, report" still runs statistics and the report. When
    no requested stage remains, the workflow is marked ``completed``.

    Args:
        state: Current graph state.
        tool_result: The tool message that triggered this step (not inspected).

    Returns:
        The state with the new ``AIMessage`` appended and ``completed_steps``
        and ``workflow_status`` updated.
    """
    messages = state.get("messages", [])
    current_dataset = state.get("current_dataset", "")
    pipeline = state.get("processing_pipeline", [])
    completed_steps = state.get("completed_steps", [])
    workflow_status = state.get("workflow_status", "")

    # Record the stage that just finished (once).
    if workflow_status not in completed_steps:
        completed_steps = completed_steps + [workflow_status]

    next_stage = _next_requested_stage(workflow_status, pipeline)
    if next_stage is not None:
        announcement = f"{_STAGE_DONE_LABEL[workflow_status]}完成,{_STAGE_ACTION_TEXT[next_stage]}"
        ai_message = AIMessage(
            content=announcement,
            tool_calls=[_stage_tool_call(next_stage, current_dataset)],
        )
        next_status = next_stage
    else:
        # No requested stage left: the workflow is done.
        progress = f"完成步骤: {len(completed_steps)}/{len(pipeline)}"
        ai_message = AIMessage(content=f"数据分析工作流已完成!\n{progress}\n所有分析结果已保存。")
        next_status = "completed"

    return {
        **state,
        "messages": messages + [ai_message],
        "completed_steps": completed_steps,
        "workflow_status": next_status,
    }


# Create the data-processing tool node
# Tools available to the data-processing agent, executed by a prebuilt ToolNode.
data_tools = [
    load_dataset,
    clean_data,
    statistical_analysis,
    generate_visualization,
    export_report,
]
data_tool_node = ToolNode(data_tools)

# Decide whether to continue
def data_processing_continue(state: DataProcessingAgentState) -> Literal["tools", "end"]:
    """Choose the next hop after the planner node.

    Args:
        state: Current graph state; ``messages`` and ``workflow_status`` are
            read.

    Returns:
        ``"end"`` when the workflow is ``completed``; ``"tools"`` when the
        newest message is an ``AIMessage`` requesting tool calls; otherwise
        ``"end"``.
    """
    messages = state.get("messages", [])
    workflow_status = state.get("workflow_status", "")

    if workflow_status == "completed":
        return "end"

    if messages and isinstance(messages[-1], AIMessage) and messages[-1].tool_calls:
        return "tools"
    return "end"


# Build the data-processing agent
data_processing_workflow = StateGraph(DataProcessingAgentState)

# Register the nodes
data_processing_workflow.add_node("planner", data_processing_planner)
data_processing_workflow.add_node("tools", data_tool_node)

# Entry point
data_processing_workflow.set_entry_point("planner")

# Conditional edge out of the planner
data_processing_workflow.add_conditional_edges(
    "planner",
    data_processing_continue,
    {
        "tools": "tools",
        "end": END,
    },
)

# After a tool has run, go back to the planner
data_processing_workflow.add_edge("tools", "planner")

# Compile the data-processing agent
data_processing_app = data_processing_workflow.compile()

# Exercise the data-processing agent
data_queries = [
    "请分析销售数据,包括数据清理、统计分析和可视化",
    "帮我处理用户数据,生成分析报告",
    "分析日志数据的趋势和异常",
]

print("\n智能数据处理Agent测试:")
for query in data_queries:
    print(f"\n{'='*60}")
    print(f"📊 数据分析请求: {query}")
    print('='*60)

    initial_state = {
        "messages": [HumanMessage(content=query)],
        "current_dataset": "",
        "processing_pipeline": [],
        "completed_steps": [],
        "analysis_results": {},
        "workflow_status": "planning",
        "next_action": "",
    }
    result = data_processing_app.invoke(initial_state)

    # Replay the processing steps; every AI message counts as one step.
    step_counter = 1
    for msg in result["messages"]:
        if isinstance(msg, HumanMessage):
            print(f"👤 用户请求: {msg.content}")
        elif isinstance(msg, AIMessage):
            print(f"\n🤖 步骤 {step_counter}: {msg.content}")
            if msg.tool_calls:
                for tool_call in msg.tool_calls:
                    call_args = ', '.join([f'{k}={v}' for k, v in tool_call['args'].items()])
                    print(f" 🔧 执行: {tool_call['name']}({call_args})")
            step_counter += 1
        elif isinstance(msg, ToolMessage):
            # Long tool output is truncated to 150 characters.
            if len(msg.content) > 150:
                print(f" ✅ 结果: {msg.content[:150]}...")
            else:
                print(f" ✅ 结果: {msg.content}")

    print(f"\n📈 工作流状态: {result.get('workflow_status', 'N/A')}")
    print(f"📋 处理流程: {' → '.join(result.get('processing_pipeline', []))}")
    print(f"✔️ 已完成步骤: {len(result.get('completed_steps', []))}/{len(result.get('processing_pipeline', []))}")
    if result.get('workflow_status') == 'completed':
        print("🎉 数据分析工作流全部完成!")