LangChain02-Agent与Memory模块
Agent与Memory模块深度解析
1. Agent模块原理
1.1 ReAct框架的实现机制
Agent是LangChain中最具智能化的组件,其核心思想基于 ReAct框架(Reasoning + Acting),即通过 思维(Thought) 和 行动(Action) 的协同实现自主决策。ReAct框架的核心流程如下:
- 观察(Observation):接收用户输入或环境反馈。
- 推理(Reasoning):通过LLM生成决策逻辑(如调用哪个工具)。
- 行动(Action):执行工具调用或直接生成回答。
- 结果(Result):将结果反馈给LLM,进入下一轮循环。
示例:天气查询Agent
from langchain.agents import Tool, AgentExecutor, ReActAgent tools = [ Tool( name="Weather API", func=lambda location: get_weather_data(location), description="查询指定地区的实时天气" )
] agent = ReActAgent(tools=tools)
executor = AgentExecutor(agent=agent, tools=tools) result = executor.run("北京今天的天气如何?")
关键点:
- 状态机设计:Agent通过状态转换(如从“提问”到“调用API”)实现复杂流程。
- 上下文感知:每次调用工具时,LLM会结合历史对话生成更精准的指令。
1.2 工具集成与扩展
自定义工具开发指南
LangChain支持通过Tool
类封装任意外部功能。例如,开发一个 文件读取工具:
import os class FileReaderTool(Tool): def __init__(self): super().__init__( name="File Reader", func=self.read_file, description="读取指定路径的文本文件内容" ) def read_file(self, file_path): with open(file_path, "r") as f: return f.read()
工具调用的异常处理
- 参数校验:确保输入符合预期(如文件路径是否存在)。
- 错误反馈:捕获异常并返回用户友好的提示:
try: result = tool.run(input)
except Exception as e: return f"调用工具时出错:{str(e)}"
代码示例:天气查询Agent的实现
import requests def get_weather_data(location): url = f"https://api.weatherapi.com/v1/current.json?key=YOUR_API_KEY&q={location}" response = requests.get(url) return response.json()
2. Memory模块实战
2.1 会话历史存储方案
内存存储 vs 持久化存储
类型 | 优点 | 缺点 |
---|---|---|
内存存储 | 快速、无需依赖外部服务 | 会话结束后数据丢失 |
Redis存储 | 持久化、支持高并发 | 需要维护Redis服务 |
数据库存储 | 支持复杂查询、安全性高 | 实现复杂、性能较低 |
代码示例:基于Neo4j的会话图谱构建
from langchain.memory import Neo4jGraphMemory
from neo4j import GraphDatabase driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "password"))
memory = Neo4jGraphMemory(driver=driver, session_id="user123") # 存储会话历史
memory.save_context({"input": "你好"}, {"output": "您好!"}) # 查询会话历史
history = memory.load_memory_variables({})
print(history) # 输出:{"history": "Human: 你好\nAI: 您好!"}
2.2 上下文增强技术
历史对话的摘要与精简
-
摘要策略:
- 使用LLM生成对话摘要(如“用户多次询问退货政策”)。
- 通过正则表达式提取关键信息(如订单号、时间戳)。
-
代码示例:动态上下文截取算法
def truncate_context(context, max_tokens=2000): tokens = context.split() if len(tokens) > max_tokens: return " ".join(tokens[:max_tokens]) + "[...]" return context
3. 复杂场景应用
3.1 多Agent协作系统
任务分解与路由机制
多Agent系统通过 任务分解 和 路由决策 实现复杂业务逻辑。例如,一个电商客服系统可能包含以下Agent:
- 意图识别Agent:判断用户问题是退货还是售后。
- 政策查询Agent:调用Qdrant检索相关政策。
- 答案生成Agent:调用LLM生成自然语言回答。
代码示例:客服系统的多Agent路由方案
from langchain.agents import AgentExecutor, ReActAgent
from langchain.tools import Tool # 定义工具
tools = [ Tool(name="Return Policy Lookup", func=query_policy, description="查询退货政策"), Tool(name="After-sales Service", func=contact_support, description="转接人工客服")
] # 定义Agent
intent_recognition_agent = ReActAgent(tools=tools)
policy_lookup_agent = ReActAgent(tools=tools) # 路由逻辑
def route_query(query): if "退货" in query: return policy_lookup_agent else: return intent_recognition_agent
3.2 知识检索增强生成(RAG)
向量数据库与LLM的联合调用
RAG(Retrieval-Augmented Generation)通过以下步骤实现:
- 检索:从向量数据库(如Qdrant)检索相关文档。
- 生成:将检索结果与用户查询输入LLM,生成最终答案。
代码示例:基于Qdrant的RAG系统
from langchain.vectorstores import Qdrant
from langchain.embeddings import TongYiEmbeddings # 构建向量数据库
db = Qdrant.from_documents(docs, embeddings, url="http://localhost:6333") # 检索相关文档
query = "API调用频率限制是多少?"
results = db.similarity_search(query, k=3) # 生成答案
prompt = PromptTemplate(template="根据以下内容回答:{context}\n问题:{query}")
chain = LLMChain(llm=llm, prompt=prompt)
answer = chain.run({"context": results, "query": query})
4. 性能优化与调试
4.1 执行效率提升策略
并行调用与异步处理
- 并行调用:使用
ThreadPoolExecutor
并发执行多个工具调用。
from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor() as executor: futures = [executor.submit(tool.run, input) for tool in tools] results = [future.result() for future in futures]
- 异步处理:通过
asyncio
实现非阻塞调用:
import asyncio async def async_call_tool(tool, input): return await tool.arun(input) async def main(): tasks = [async_call_tool(tool, input) for tool in tools] results = await asyncio.gather(*tasks)
缓存机制的设计
- FIFO缓存:限制缓存大小,优先淘汰最久未使用的数据。
from collections import deque class FIFOCache: def __init__(self, max_size=100): self.cache = {} self.queue = deque() self.max_size = max_size def get(self, key): if key in self.cache: return self.cache[key] return None def set(self, key, value): if len(self.cache) >= self.max_size: oldest = self.queue.popleft() del self.cache[oldest] self.cache[key] = value self.queue.append(key)
4.2 调试工具与监控
日志记录与追踪
- 日志级别:通过
logging
模块记录不同级别的日志(DEBUG/INFO/WARNING/ERROR)。
import logging logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__) logger.debug("调试信息")
logger.info("常规信息")
- 追踪工具:使用
OpenTelemetry
实现分布式追踪:
from opentelemetry import trace tracer = trace.get_tracer(__name__) with tracer.start_as_current_span("Agent Execution"): result = agent.run(input)
性能指标的可视化监控
- Prometheus + Grafana:监控Agent的响应时间、调用次数等指标。
from prometheus_client import start_http_server, Counter REQUESTS = Counter('agent_requests_total', 'Total number of agent requests') start_http_server(8000) def run_agent(input): REQUESTS.inc() return agent.run(input)
5. 总结与展望
5.1 安全性考量
敏感信息保护策略
- 加密存储:对会话历史、用户偏好等敏感数据加密。
- 权限控制:通过RBAC(基于角色的访问控制)限制工具调用权限。
工具调用的权限控制
- 白名单机制:仅允许特定工具被调用。
- 审计日志:记录所有工具调用行为,便于追溯。
5.2 未来发展方向
多模态Agent的探索
- 视觉-语言融合:结合图像识别与自然语言处理能力。
- 语音交互:支持语音输入输出,提升用户体验。
与区块链技术的结合
- 数据存证:利用区块链的不可篡改性存储关键会话记录。
- 智能合约:通过智能合约自动化执行复杂业务逻辑。
参考资料:
- LangChain官方文档
- Qdrant向量数据库
- TongYiEmbeddings使用指南
版权声明:本文为CSDN博客原创内容,转载请注明出处。