Langchain 历史消息和会话管理
一、基础短期记忆管理
1. 对话缓冲区 (ConversationBufferMemory)
作用:完整记录所有历史消息
适用场景:对话轮次较少,需完整上下文
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory()
memory.save_context({"input": "你好"}, {"output": "你好!有什么可以帮助您?"})
memory.save_context({"input": "推荐一部科幻电影"}, {"output": "《星际穿越》怎么样?"})
print(memory.load_memory_variables({}))
# 输出: {'history': 'Human: 你好\nAI: 你好!有什么可以帮助您?\nHuman: 推荐一部科幻电影\nAI: 《星际穿越》怎么样?'}
2. 滑动窗口记忆 (ConversationBufferWindowMemory)
作用:仅保留最近的K轮对话
适用场景:避免上下文过长,符合模型token限制
from langchain.memory import ConversationBufferWindowMemory
memory = ConversationBufferWindowMemory(k=2) # 保留最近2轮
for i in range(5):
memory.save_context(
{"input": f"消息{i}"},
{"output": f"回复{i}"}
)
print(memory.load_memory_variables({}))
# 输出: {'history': 'Human: 消息3\nAI: 回复3\nHuman: 消息4\nAI: 回复4'}
二、消息总结优化
1. 摘要式记忆 (ConversationSummaryMemory)
作用:动态生成历史摘要
原理:定期调用LLM压缩历史
from langchain.memory import ConversationSummaryMemory
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo")
memory = ConversationSummaryMemory(llm=llm)
for i in range(1,6):
memory.save_context(
{"input": f"讨论要点{i}"},
{"output": f"详细内容{i}"}
)
print(memory.load_memory_variables({}))
# 输出类似: {'history': '系统总结了5个讨论要点及其内容...'}
2. 混合模式 (Buffer+Summary)
策略:最近消息完整保留 + 早期历史摘要
from langchain.memory import ConversationSummaryBufferMemory
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=100 # 总token超限时触发摘要
)
# 模拟长对话
for i in range(10):
memory.save_context(
{"input": f"长对话消息{i}"},
{"output": f"详细回复{i}"*20}
)
history = memory.load_memory_variables({})
print(history['history']) # 部分完整+部分摘要
三、进阶记忆管理
1. 向量存储长期记忆
作用:持久化存储重要信息,支持语义检索
from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
retriever = FAISS(OpenAIEmbeddings()).as_retriever()
memory = VectorStoreRetrieverMemory(retriever=retriever)
# 存储关键信息
memory.save_context(
{"input": "用户偏好:喜欢硬科幻"},
{"output": "已记录偏好"}
)
# 检索相关记忆
print(memory.load_memory_variables({"input": "推荐电影"}))
# 输出: {'history': '用户偏好:喜欢硬科幻'}
2. 结构化记忆 (Entity Memory)
作用:跟踪对话中的关键实体
from langchain.memory import ConversationEntityMemory
memory = ConversationEntityMemory(llm=llm)
memory.save_context(
{"input": "张三今年30岁,住在北京"},
{"output": "已记录个人信息"}
)
print(memory.load_memory_variables({"input": "张三多大了?"}))
# 输出: {'history': 'Human: 张三多大了?\n',
# 'entities': {'张三': '张三今年30岁,住在北京'}}
四、消息处理策略
1. 动态裁剪算法
from langchain.schema import BaseMemory
from langchain_community.llms import OpenAI
import tiktoken
class SmartTruncateMemory(BaseMemory):
def __init__(self, max_tokens=2000):
self.max_tokens = max_tokens
self.buffer = []
self.encoder = tiktoken.encoding_for_model("gpt-3.5-turbo")
def _count_tokens(self, text):
return len(self.encoder.encode(text))
def save_context(self, inputs, outputs):
self.buffer.append(f"User: {inputs['input']}")
self.buffer.append(f"AI: {outputs['output']}")
# 自动裁剪
total = sum(self._count_tokens(m) for m in self.buffer)
while total > self.max_tokens:
removed = self.buffer.pop(0)
total -= self._count_tokens(removed)
def load_memory_variables(self, inputs):
return {"history": "\n".join(self.buffer)}
2. 关键信息提取
from langchain_core.prompts import PromptTemplate
extract_prompt = PromptTemplate.from_template("""
对话历史:
{history}
请提取与当前问题相关的关键信息:
问题:{input}
""")
chain = extract_prompt | llm
result = chain.invoke({
"history": "用户之前提到喜欢诺兰的电影和物理学主题",
"input": "有什么电影推荐吗?"
})
# 输出: "关键信息:用户偏好诺兰导演和物理学相关主题"
五、生产级解决方案
1. 分层记忆架构
2. 集成示例
from langchain.chains import ConversationChain
from langchain.memory import (
ConversationBufferMemory,
VectorStoreRetrieverMemory,
CombinedMemory
)
# 组合多种记忆
buff_memory = ConversationBufferMemory()
vec_memory = VectorStoreRetrieverMemory(...)
combined_memory = CombinedMemory(memories=[buff_memory, vec_memory])
# 构建对话链
conversation = ConversationChain(
llm=llm,
memory=combined_memory,
verbose=True
)
response = conversation.predict(input="根据我的喜好推荐电影")
六、性能优化技巧
1. Token计算优化
def calculate_token_usage(messages):
encoder = tiktoken.encoding_for_model("gpt-4")
count = 0
for msg in messages:
count += len(encoder.encode(msg))
return count
# 在保存上下文前检查
if calculate_token_usage(current_history) > 3800:
trigger_summary()
2. 异步处理
async def async_summarize(history):
chain = summarize_prompt | llm
return await chain.arun(history)
# 在事件循环中执行
import asyncio
summary = asyncio.run(async_summarize(long_history))
七、评估指标
指标 | 说明 | 目标值 |
---|---|---|
响应延迟 | 消息处理到响应生成的时间 | <2s |
Token使用效率 | 有效信息/token比率 | >0.8 |
信息召回率 | 关键历史信息被利用的比例 | >90% |
内存占用 | 对话历史存储的内存消耗 | <100MB/会话 |
通过以上方案,可实现:
- 精准上下文控制:动态调整历史信息量
- 长期知识留存:关键信息向量化存储
- 高效资源利用:平衡性能与信息完整性
- 智能语义处理:基于内容的记忆管理
- 易扩展架构:灵活组合不同记忆策略
实际部署时需根据具体场景调整记忆策略组合,并通过A/B测试验证效果。
会话隔离
一、会话管理核心要素
要素 | 说明 |
---|---|
会话ID生成 | 唯一标识用户会话 (UUID/JWT/用户ID) |
存储后端 | 内存/Database/Redis 等持久化方案 |
对话历史隔离 | 保证不同用户的上下文不混淆 |
生命周期管理 | 会话超时自动清理机制 |
并发安全 | 处理多线程/协程下的数据访问 |
二、基础会话管理实现
1. 内存存储方案 (开发环境适用)
from langchain.memory import ConversationBufferMemory
from uuid import uuid4
class SessionManager:
def __init__(self):
self.sessions = {} # {session_id: memory}
def create_session(self) -> str:
session_id = str(uuid4())
self.sessions[session_id] = ConversationBufferMemory()
return session_id
def get_memory(self, session_id: str) -> ConversationBufferMemory:
return self.sessions.get(session_id)
# 使用示例
manager = SessionManager()
session_id = manager.create_session()
# 保存对话
memory = manager.get_memory(session_id)
memory.save_context({"input": "你好"}, {"output": "您好!有什么可以帮助您?"})
# 读取历史
print(memory.load_memory_variables({}))
# 输出: {'history': 'Human: 你好\nAI: 您好!有什么可以帮助您?'}
2. Redis存储方案 (生产环境推荐)
import redis
import json
from langchain.memory import ConversationBufferMemory
class RedisSessionManager:
def __init__(self, host='localhost', port=6379, db=0, ttl=3600):
self.redis = redis.Redis(host=host, port=port, db=db)
self.ttl = ttl # 会话过期时间(秒)
def create_session(self, user_id: str) -> bool:
memory = ConversationBufferMemory()
return self.redis.setex(
name=user_id,
time=self.ttl,
value=json.dumps(memory.dict())
)
def get_memory(self, user_id: str) -> ConversationBufferMemory:
data = self.redis.get(user_id)
if data:
memory = ConversationBufferMemory()
memory.__dict__.update(json.loads(data))
return memory
return None
def save_memory(self, user_id: str, memory: ConversationBufferMemory):
self.redis.setex(
name=user_id,
time=self.ttl,
value=json.dumps(memory.dict())
)
# 使用示例
r_manager = RedisSessionManager(ttl=7200)
user_id = "user_123"
r_manager.create_session(user_id)
memory = r_manager.get_memory(user_id)
memory.save_context(
{"input": "推荐一部电影"},
{"output": "《盗梦空间》如何?诺兰的经典作品"}
)
r_manager.save_memory(user_id, memory)
三、Web 服务集成示例 (FastAPI)
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
session_manager = SessionManager() # 或用RedisSessionManager
class ChatRequest(BaseModel):
session_id: str
message: str
class ChatResponse(BaseModel):
session_id: str
response: str
history: str
@app.post("/chat")
async def chat_endpoint(request: ChatRequest) -> ChatResponse:
# 获取会话内存
memory = session_manager.get_memory(request.session_id)
if not memory:
raise HTTPException(status_code=404, detail="Session not found")
# 构建对话链
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(temperature=0.7)
conversation = ConversationChain(
llm=llm,
memory=memory,
verbose=True
)
# 处理请求
response = conversation.predict(input=request.message)
return ChatResponse(
session_id=request.session_id,
response=response,
history=memory.load_memory_variables({})["history"]
)
@app.post("/create_session")
async def create_session():
session_id = session_manager.create_session()
return {"session_id": session_id}
@app.delete("/close_session/{session_id}")
async def close_session(session_id: str):
if session_manager.delete_session(session_id):
return {"status": "success"}
raise HTTPException(status_code=404, detail="Session not found")
四、高级会话管理功能
1. 会话元数据管理
from pydantic import BaseModel
from datetime import datetime
class SessionMeta(BaseModel):
created_at: datetime
last_accessed: datetime
user_agent: str
ip_address: str
class EnhancedSessionManager(SessionManager):
def __init__(self):
super().__init__()
self.metadata = {} # {session_id: SessionMeta}
def create_session(self, request) -> str:
session_id = super().create_session()
self.metadata[session_id] = SessionMeta(
created_at=datetime.now(),
last_accessed=datetime.now(),
user_agent=request.headers.get("User-Agent"),
ip_address=request.client.host
)
return session_id
2. 自动清理机制
import asyncio
from datetime import timedelta
class AutoCleanManager(RedisSessionManager):
async def auto_clean(self, interval=300):
while True:
await asyncio.sleep(interval)
# 实现清理逻辑(示例:删除7天未活跃会话)
all_keys = self.redis.keys("*")
for key in all_keys:
last_access = self.redis.object("idletime", key)
if last_access > 604800: # 7天
self.redis.delete(key)
# 在FastAPI启动时运行
@app.on_event("startup")
async def startup_event():
cleaner = AutoCleanManager()
asyncio.create_task(cleaner.auto_clean())
五、生产环境最佳实践
1. 安全措施
# 会话ID加密示例
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher = Fernet(key)
def encrypt_session_id(session_id: str) -> str:
return cipher.encrypt(session_id.encode()).decode()
def decrypt_session_id(token: str) -> str:
return cipher.decrypt(token.encode()).decode()
2. 性能优化
# 使用LRU缓存近期活跃会话
from functools import lru_cache
class CachedSessionManager(RedisSessionManager):
@lru_cache(maxsize=1000)
def get_memory(self, user_id: str):
return super().get_memory(user_id)
3. 监控指标
from prometheus_client import Counter, Gauge
SESSION_COUNTER = Counter(
'session_total',
'Total sessions',
['operation'] # create/delete
)
ACTIVE_SESSIONS = Gauge(
'active_sessions',
'Currently active sessions'
)
class MonitoredSessionManager(SessionManager):
def create_session(self):
SESSION_COUNTER.labels('create').inc()
ACTIVE_SESSIONS.inc()
return super().create_session()
def delete_session(self, session_id):
SESSION_COUNTER.labels('delete').inc()
ACTIVE_SESSIONS.dec()
return super().delete_session(session_id)
六、会话管理架构图
通过以上方案,您可以实现:
- 严格的会话隔离:确保不同用户数据完全独立
- 弹性扩展能力:支持从单机到分布式部署
- 生产级可靠性:包含持久化存储和故障恢复机制
- 完善的监控体系:实时跟踪会话状态
- 企业级安全:加密传输和存储
实际部署时可根据业务需求选择存储方案(推荐生产环境使用Redis),并通过调整TTL值和清理策略优化资源使用。