【LLM LangChain】AgentExecutor 创建带工具的Agent+加入BufferMemory+支持多用户记忆 demos
文章目录
- 使用工具
- 加入记忆
- BufferMemory关键方法 / 接口
- 简单BufferMemory实现代码
- 支持多用户 session 记忆
- 用法示例:
- 清除历史
使用工具
import { NextRequest, NextResponse } from "next/server";
import { StreamingTextResponse } from "ai";// LangChain
import { ChatOpenAI } from "@langchain/openai";
import { DynamicTool } from "@langchain/core/tools";
import { initializeAgentExecutorWithOptions } from "langchain/agents";export const runtime = "edge";// 定义一个示例工具(加法器)
const addTool = new DynamicTool({name: "add_numbers",description: "Add two numbers together. 输入格式: 'a,b'",func: async (input: string) => {const [a, b] = input.split(",").map(Number);return (a + b).toString();},
});export async function POST(req: NextRequest) {try {const body = await req.json();const messages = body.messages ?? [];const currentMessageContent = messages[messages.length - 1].content;// 初始化模型const model = new ChatOpenAI({model: "qwen-plus",apiKey: process.env.MODEL_API_KEY,configuration: {baseURL: "https://dashscope.aliyuncs.com/compatible-mode/v1",},});// 初始化 AgentExecutorconst executor = await initializeAgentExecutorWithOptions([addTool], // 工具列表model,{agentType: "openai-functions", // 使用函数调用式 agentverbose: true,});// 注意:AgentExecutor 本身不返回异步 stream// 需要手动封装流式输出// const stream = await executor.stream({// input: currentMessageContent,// }); // executor.stream() → 迭代器,不是字符串流// const stream = await executor.stream({// input: currentMessageContent,// chat_history: [], // 而不是 "" 或 undefined // 必须补上// });// 把 agent 的迭代结果转成 string 流async function* streamText() {for await (const step of await executor.stream({input: currentMessageContent,chat_history: [], // 必须是数组})) {if (step.output) {yield step.output; // 只推送最终的文本}}}return new StreamingTextResponse(streamText());// return new StreamingTextResponse(stream);} catch (e: any) {return NextResponse.json({ error: e.message },{ status: e.status ?? 500 });}
}
加入记忆
- BufferMemory 是 LangChain 中最基础的记忆类型,它会把对话的完整历史记录按顺序保存在内存中,并在每次调用模型时作为上下文传入。它的特点是实现简单、语义完整,非常适合短对话场景或 Demo。但因为历史会不断累积,如果对话太长,可能会导致上下文超出模型的 token 限制。因此,BufferMemory 更适合用在 短会话应用,或者作为入门理解 LangChain 记忆机制的基础组件。
BufferMemory关键方法 / 接口
方法 / 属性 | 类型 | 作用 | 典型用法 |
---|---|---|---|
memoryKey | string | 存储对话历史的 key 名(默认 "history" ,常配合 MessagesPlaceholder ) | new BufferMemory({ memoryKey: "chat_history" }) |
inputKey | string | 指定用户输入在上下文对象中的 key | new BufferMemory({ inputKey: "input" }) |
outputKey | string | 指定模型输出在上下文对象中的 key | new BufferMemory({ outputKey: "output" }) |
returnMessages | boolean | 是否以 BaseMessage[] 的形式保存历史,而不是字符串拼接 | returnMessages: true |
chatHistory | ChatMessageHistory | 内部维护的消息记录对象(有 addUserMessage / addAIMessage / clear 等方法) | memory.chatHistory.clear() |
loadMemoryVariables(inputs) | Promise<Record<string, any>> | 读取 memory 中的历史记录,返回 { [memoryKey]: ... } | await memory.loadMemoryVariables({}) |
saveContext(inputs, outputs) | Promise<void> | 把一次对话的输入输出存进 memory | await memory.saveContext({ input: "你好" }, { output: "你好!" }) |
clear() | Promise<void> | 清空整个 memory 的历史 | await memory.clear() |
简单BufferMemory实现代码
// chat.tsx
import { NextRequest, NextResponse } from "next/server";
import { StreamingTextResponse } from "ai";// LangChain
import { ChatOpenAI } from "@langchain/openai";
import { DynamicTool } from "@langchain/core/tools";
import { initializeAgentExecutorWithOptions } from "langchain/agents";
import { BufferMemory } from "langchain/memory";export const runtime = "edge";// 示例工具:加法器
const addTool = new DynamicTool({name: "add_numbers",description: "Add two numbers together. 输入格式: 'a,b'",func: async (input: string) => {const [a, b] = input.split(",").map(Number);return (a + b).toString();},
});// 创建一个全局 memory(可以改成 RedisMemory、UpstashMemory,支持多用户)
// const memory = new BufferMemory({
// memoryKey: "chat_history", // 这里要和 MessagesPlaceholder 对应
// returnMessages: true, // 让它存 BaseMessage,而不是拼成字符串
// });
const memory = new BufferMemory({memoryKey: "chat_history",inputKey: "input", // 指定输入的 keyoutputKey: "output", // 指定输出的 keyreturnMessages: true,
});export async function POST(req: NextRequest) {try {const body = await req.json();const messages = body.messages ?? [];const currentMessageContent = messages[messages.length - 1].content;// 初始化模型const model = new ChatOpenAI({model: "qwen-plus",apiKey: process.env.MODEL_API_KEY,configuration: {baseURL: "https://dashscope.aliyuncs.com/compatible-mode/v1",},});// 初始化 AgentExecutor,并挂载 Memoryconst executor = await initializeAgentExecutorWithOptions([addTool],model,{agentType: "openai-functions",verbose: true,memory, // 注入记忆, 这样每次调用 agent 都会带上历史对话});// 把 agent 的迭代结果转成 string 流async function* streamText() {console.log("Agent 处理用户输入:", currentMessageContent);// 假如你是新闻分析师for await (const step of await executor.stream({input: currentMessageContent,chat_history: await memory.loadMemoryVariables({}).then(v => v.chat_history),})) {if (step.output) {yield step.output;}}}return new StreamingTextResponse(streamText());} catch (e: any) {return NextResponse.json({ error: e.message },{ status: e.status ?? 500 });}
}
支持多用户 session 记忆
- 你现在的
BufferMemory
是全局的,所有用户都会共享一份chat_history
。 - 我们要在请求里传一个
sessionId
,然后为每个 session 维护独立的 memory。
- 下面是完整代码:
// chat.tsx
import { NextRequest, NextResponse } from "next/server";
import { StreamingTextResponse } from "ai";// LangChain
import { ChatOpenAI } from "@langchain/openai";
import { DynamicTool } from "@langchain/core/tools";
import { initializeAgentExecutorWithOptions } from "langchain/agents";
import { BufferMemory } from "langchain/memory";export const runtime = "edge";// 示例工具:加法器
const addTool = new DynamicTool({name: "add_numbers",description: "Add two numbers together. 输入格式: 'a,b'",func: async (input: string) => {const [a, b] = input.split(",").map(Number);return (a + b).toString();},
});// 内存缓存(每个 session 独立)
const memoryStore = new Map<string, BufferMemory>();// 工厂函数:获取或创建某个 session 的 memory
function getMemory(sessionId: string) {if (!memoryStore.has(sessionId)) {memoryStore.set(sessionId,new BufferMemory({memoryKey: "chat_history",inputKey: "input",outputKey: "output",returnMessages: true,}));}return memoryStore.get(sessionId)!;
}export async function POST(req: NextRequest) {try {const body = await req.json();const { messages = [], sessionId = "default" } = body;if (!sessionId) {throw new Error("sessionId 不能为空");}const currentMessageContent = messages[messages.length - 1]?.content ?? "";// 获取当前用户的独立 memoryconst memory = getMemory(sessionId);// 初始化模型const model = new ChatOpenAI({model: "qwen-plus",apiKey: process.env.MODEL_API_KEY,configuration: {baseURL: "https://dashscope.aliyuncs.com/compatible-mode/v1",},});// 初始化 AgentExecutor,并挂载用户独立 Memoryconst executor = await initializeAgentExecutorWithOptions([addTool],model,{agentType: "openai-functions",verbose: true,memory,});// 把 agent 的迭代结果转成 string 流async function* streamText() {console.log(`[${sessionId}] 用户输入:`, currentMessageContent);for await (const step of await executor.stream({input: currentMessageContent,chat_history: await memory.loadMemoryVariables({}).then(v => v.chat_history),})) {if (step.output) {yield step.output;}}}return new StreamingTextResponse(streamText());} catch (e: any) {return NextResponse.json({ error: e.message },{ status: e.status ?? 500 });}
}
用法示例:
客户端调用时带上 sessionId
:
await fetch("/api/chat", {method: "POST",body: JSON.stringify({sessionId: "user123", // 每个用户/会话唯一messages: [{ role: "user", content: "假如你是新闻分析师" }],}),
});
清除历史
// 获取当前用户的独立 memoryconst memory = getMemory(sessionId);// 执行清理if (clearHistory) {await memory.clear();return NextResponse.json({ message: `Session ${sessionId} 历史已清理` });}