当前位置: 首页 > news >正文

Langchain 历史消息和会话管理

一、基础短期记忆管理

1. 对话缓冲区 (ConversationBufferMemory)

作用:完整记录所有历史消息
适用场景:对话轮次较少,需完整上下文

from langchain.memory import ConversationBufferMemory

memory = ConversationBufferMemory()
memory.save_context({"input": "你好"}, {"output": "你好!有什么可以帮助您?"})
memory.save_context({"input": "推荐一部科幻电影"}, {"output": "《星际穿越》怎么样?"})

print(memory.load_memory_variables({}))
# 输出: {'history': 'Human: 你好\nAI: 你好!有什么可以帮助您?\nHuman: 推荐一部科幻电影\nAI: 《星际穿越》怎么样?'}
2. 滑动窗口记忆 (ConversationBufferWindowMemory)

作用:仅保留最近的K轮对话
适用场景:避免上下文过长,符合模型token限制

from langchain.memory import ConversationBufferWindowMemory

memory = ConversationBufferWindowMemory(k=2)  # 保留最近2轮
for i in range(5):
    memory.save_context(
        {"input": f"消息{i}"}, 
        {"output": f"回复{i}"}
    )
    
print(memory.load_memory_variables({}))
# 输出: {'history': 'Human: 消息3\nAI: 回复3\nHuman: 消息4\nAI: 回复4'}

二、消息总结优化

1. 摘要式记忆 (ConversationSummaryMemory)

作用:动态生成历史摘要
原理:定期调用LLM压缩历史

from langchain.memory import ConversationSummaryMemory
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo")
memory = ConversationSummaryMemory(llm=llm)

for i in range(1,6):
    memory.save_context(
        {"input": f"讨论要点{i}"},
        {"output": f"详细内容{i}"}
    )

print(memory.load_memory_variables({}))
# 输出类似: {'history': '系统总结了5个讨论要点及其内容...'}
2. 混合模式 (Buffer+Summary)

策略:最近消息完整保留 + 早期历史摘要

from langchain.memory import ConversationSummaryBufferMemory

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=100  # 总token超限时触发摘要
)

# 模拟长对话
for i in range(10):
    memory.save_context(
        {"input": f"长对话消息{i}"},
        {"output": f"详细回复{i}"*20}
    )

history = memory.load_memory_variables({})
print(history['history'])  # 部分完整+部分摘要

三、进阶记忆管理

1. 向量存储长期记忆

作用:持久化存储重要信息,支持语义检索

from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

retriever = FAISS(OpenAIEmbeddings()).as_retriever()
memory = VectorStoreRetrieverMemory(retriever=retriever)

# 存储关键信息
memory.save_context(
    {"input": "用户偏好:喜欢硬科幻"},
    {"output": "已记录偏好"}
)

# 检索相关记忆
print(memory.load_memory_variables({"input": "推荐电影"}))
# 输出: {'history': '用户偏好:喜欢硬科幻'}
2. 结构化记忆 (Entity Memory)

作用:跟踪对话中的关键实体

from langchain.memory import ConversationEntityMemory

memory = ConversationEntityMemory(llm=llm)
memory.save_context(
    {"input": "张三今年30岁,住在北京"},
    {"output": "已记录个人信息"}
)

print(memory.load_memory_variables({"input": "张三多大了?"}))
# 输出: {'history': 'Human: 张三多大了?\n',
#      'entities': {'张三': '张三今年30岁,住在北京'}}

四、消息处理策略

1. 动态裁剪算法
from langchain.schema import BaseMemory
from langchain_community.llms import OpenAI
import tiktoken

class SmartTruncateMemory(BaseMemory):
    def __init__(self, max_tokens=2000):
        self.max_tokens = max_tokens
        self.buffer = []
        self.encoder = tiktoken.encoding_for_model("gpt-3.5-turbo")
    
    def _count_tokens(self, text):
        return len(self.encoder.encode(text))
    
    def save_context(self, inputs, outputs):
        self.buffer.append(f"User: {inputs['input']}")
        self.buffer.append(f"AI: {outputs['output']}")
        
        # 自动裁剪
        total = sum(self._count_tokens(m) for m in self.buffer)
        while total > self.max_tokens:
            removed = self.buffer.pop(0)
            total -= self._count_tokens(removed)
    
    def load_memory_variables(self, inputs):
        return {"history": "\n".join(self.buffer)}
2. 关键信息提取
from langchain_core.prompts import PromptTemplate

extract_prompt = PromptTemplate.from_template("""
对话历史:
{history}

请提取与当前问题相关的关键信息:
问题:{input}
""")

chain = extract_prompt | llm
result = chain.invoke({
    "history": "用户之前提到喜欢诺兰的电影和物理学主题",
    "input": "有什么电影推荐吗?"
})
# 输出: "关键信息:用户偏好诺兰导演和物理学相关主题"

五、生产级解决方案

1. 分层记忆架构
未超限
超限
当前对话
Token检查
保留完整上下文
摘要核心信息
向量存储
语义检索
当前对话+相关记忆
2. 集成示例
from langchain.chains import ConversationChain
from langchain.memory import (
    ConversationBufferMemory,
    VectorStoreRetrieverMemory,
    CombinedMemory
)

# 组合多种记忆
buff_memory = ConversationBufferMemory()
vec_memory = VectorStoreRetrieverMemory(...)
combined_memory = CombinedMemory(memories=[buff_memory, vec_memory])

# 构建对话链
conversation = ConversationChain(
    llm=llm,
    memory=combined_memory,
    verbose=True
)

response = conversation.predict(input="根据我的喜好推荐电影")

六、性能优化技巧

1. Token计算优化
def calculate_token_usage(messages):
    encoder = tiktoken.encoding_for_model("gpt-4")
    count = 0
    for msg in messages:
        count += len(encoder.encode(msg))
    return count

# 在保存上下文前检查
if calculate_token_usage(current_history) > 3800:
    trigger_summary()
2. 异步处理
async def async_summarize(history):
    chain = summarize_prompt | llm
    return await chain.arun(history)

# 在事件循环中执行
import asyncio
summary = asyncio.run(async_summarize(long_history))

七、评估指标

指标说明目标值
响应延迟消息处理到响应生成的时间<2s
Token使用效率有效信息/token比率>0.8
信息召回率关键历史信息被利用的比例>90%
内存占用对话历史存储的内存消耗<100MB/会话

通过以上方案,可实现:

  1. 精准上下文控制:动态调整历史信息量
  2. 长期知识留存:关键信息向量化存储
  3. 高效资源利用:平衡性能与信息完整性
  4. 智能语义处理:基于内容的记忆管理
  5. 易扩展架构:灵活组合不同记忆策略

实际部署时需根据具体场景调整记忆策略组合,并通过A/B测试验证效果。

会话隔离

一、会话管理核心要素

要素说明
会话ID生成唯一标识用户会话 (UUID/JWT/用户ID)
存储后端内存/Database/Redis 等持久化方案
对话历史隔离保证不同用户的上下文不混淆
生命周期管理会话超时自动清理机制
并发安全处理多线程/协程下的数据访问

二、基础会话管理实现

1. 内存存储方案 (开发环境适用)
from langchain.memory import ConversationBufferMemory
from uuid import uuid4

class SessionManager:
    def __init__(self):
        self.sessions = {}  # {session_id: memory}
    
    def create_session(self) -> str:
        session_id = str(uuid4())
        self.sessions[session_id] = ConversationBufferMemory()
        return session_id
    
    def get_memory(self, session_id: str) -> ConversationBufferMemory:
        return self.sessions.get(session_id)

# 使用示例
manager = SessionManager()
session_id = manager.create_session()

# 保存对话
memory = manager.get_memory(session_id)
memory.save_context({"input": "你好"}, {"output": "您好!有什么可以帮助您?"})

# 读取历史
print(memory.load_memory_variables({}))
# 输出: {'history': 'Human: 你好\nAI: 您好!有什么可以帮助您?'}
2. Redis存储方案 (生产环境推荐)
import redis
import json
from langchain.memory import ConversationBufferMemory

class RedisSessionManager:
    def __init__(self, host='localhost', port=6379, db=0, ttl=3600):
        self.redis = redis.Redis(host=host, port=port, db=db)
        self.ttl = ttl  # 会话过期时间(秒)
    
    def create_session(self, user_id: str) -> bool:
        memory = ConversationBufferMemory()
        return self.redis.setex(
            name=user_id,
            time=self.ttl,
            value=json.dumps(memory.dict())
        )
    
    def get_memory(self, user_id: str) -> ConversationBufferMemory:
        data = self.redis.get(user_id)
        if data:
            memory = ConversationBufferMemory()
            memory.__dict__.update(json.loads(data))
            return memory
        return None
    
    def save_memory(self, user_id: str, memory: ConversationBufferMemory):
        self.redis.setex(
            name=user_id,
            time=self.ttl,
            value=json.dumps(memory.dict())
        )

# 使用示例
r_manager = RedisSessionManager(ttl=7200)
user_id = "user_123"
r_manager.create_session(user_id)

memory = r_manager.get_memory(user_id)
memory.save_context(
    {"input": "推荐一部电影"},
    {"output": "《盗梦空间》如何?诺兰的经典作品"}
)
r_manager.save_memory(user_id, memory)

三、Web 服务集成示例 (FastAPI)

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
session_manager = SessionManager()  # 或用RedisSessionManager

class ChatRequest(BaseModel):
    session_id: str
    message: str

class ChatResponse(BaseModel):
    session_id: str
    response: str
    history: str

@app.post("/chat")
async def chat_endpoint(request: ChatRequest) -> ChatResponse:
    # 获取会话内存
    memory = session_manager.get_memory(request.session_id)
    if not memory:
        raise HTTPException(status_code=404, detail="Session not found")
    
    # 构建对话链
    from langchain.chains import ConversationChain
    from langchain_openai import ChatOpenAI
    
    llm = ChatOpenAI(temperature=0.7)
    conversation = ConversationChain(
        llm=llm,
        memory=memory,
        verbose=True
    )
    
    # 处理请求
    response = conversation.predict(input=request.message)
    
    return ChatResponse(
        session_id=request.session_id,
        response=response,
        history=memory.load_memory_variables({})["history"]
    )

@app.post("/create_session")
async def create_session():
    session_id = session_manager.create_session()
    return {"session_id": session_id}

@app.delete("/close_session/{session_id}")
async def close_session(session_id: str):
    if session_manager.delete_session(session_id):
        return {"status": "success"}
    raise HTTPException(status_code=404, detail="Session not found")

四、高级会话管理功能

1. 会话元数据管理
from pydantic import BaseModel
from datetime import datetime

class SessionMeta(BaseModel):
    created_at: datetime
    last_accessed: datetime
    user_agent: str
    ip_address: str

class EnhancedSessionManager(SessionManager):
    def __init__(self):
        super().__init__()
        self.metadata = {}  # {session_id: SessionMeta}
    
    def create_session(self, request) -> str:
        session_id = super().create_session()
        self.metadata[session_id] = SessionMeta(
            created_at=datetime.now(),
            last_accessed=datetime.now(),
            user_agent=request.headers.get("User-Agent"),
            ip_address=request.client.host
        )
        return session_id
2. 自动清理机制
import asyncio
from datetime import timedelta

class AutoCleanManager(RedisSessionManager):
    async def auto_clean(self, interval=300):
        while True:
            await asyncio.sleep(interval)
            # 实现清理逻辑(示例:删除7天未活跃会话)
            all_keys = self.redis.keys("*")
            for key in all_keys:
                last_access = self.redis.object("idletime", key)
                if last_access > 604800:  # 7天
                    self.redis.delete(key)

# 在FastAPI启动时运行
@app.on_event("startup")
async def startup_event():
    cleaner = AutoCleanManager()
    asyncio.create_task(cleaner.auto_clean())

五、生产环境最佳实践

1. 安全措施
# 会话ID加密示例
from cryptography.fernet import Fernet

key = Fernet.generate_key()
cipher = Fernet(key)

def encrypt_session_id(session_id: str) -> str:
    return cipher.encrypt(session_id.encode()).decode()

def decrypt_session_id(token: str) -> str:
    return cipher.decrypt(token.encode()).decode()
2. 性能优化
# 使用LRU缓存近期活跃会话
from functools import lru_cache

class CachedSessionManager(RedisSessionManager):
    @lru_cache(maxsize=1000)
    def get_memory(self, user_id: str):
        return super().get_memory(user_id)
3. 监控指标
from prometheus_client import Counter, Gauge

SESSION_COUNTER = Counter(
    'session_total', 
    'Total sessions', 
    ['operation']  # create/delete
)

ACTIVE_SESSIONS = Gauge(
    'active_sessions',
    'Currently active sessions'
)

class MonitoredSessionManager(SessionManager):
    def create_session(self):
        SESSION_COUNTER.labels('create').inc()
        ACTIVE_SESSIONS.inc()
        return super().create_session()
    
    def delete_session(self, session_id):
        SESSION_COUNTER.labels('delete').inc()
        ACTIVE_SESSIONS.dec()
        return super().delete_session(session_id)

六、会话管理架构图

User WebServer SessionManager Redis LLM POST /create_session create_session() SETEX session_id OK session_id Return session_id POST /chat (with session_id) get_memory(session_id) GET session_id Memory data Memory object Process with memory save_memory() SETEX updated_data Return response User WebServer SessionManager Redis LLM

通过以上方案,您可以实现:

  1. 严格的会话隔离:确保不同用户数据完全独立
  2. 弹性扩展能力:支持从单机到分布式部署
  3. 生产级可靠性:包含持久化存储和故障恢复机制
  4. 完善的监控体系:实时跟踪会话状态
  5. 企业级安全:加密传输和存储

实际部署时可根据业务需求选择存储方案(推荐生产环境使用Redis),并通过调整TTL值和清理策略优化资源使用。

相关文章:

  • 简单一周日期展示及选择切换
  • 定时任务框架选型指南:Quartz、Elastic-Job 与 XXL-JOB 深度对比与场景实践
  • vue对文件进行加密,后台解密后保存
  • EFK日志分析
  • 操作系统 :Linux基础开发工具
  • LLM之Agent(十四)| 字节开源ComputerUse纯视觉驱动GUI 智能体模型 UI-TARS
  • file io(I)
  • 数据类设计_图片类设计之8_自由图形类设计_(前端架构)
  • 云资源开发学习应用场景指南,场景 1 云上编程实践平台
  • 【Linux网络(五)】传输层协议
  • 说说MyBatis一、二级缓存和Spring一二级缓存有什么关系?
  • Vue Router动态改变路由参数的两种方法
  • 装饰器模式介绍和典型实现
  • k8s常用命令
  • js的闭包
  • linux 运行脚本命令区别
  • Pinecone数据库介绍、Milvus数据库介绍
  • 基于FastAPI与Kimi AI的智能聊天应用开发实践
  • 6. 使用VUE实现前端页面的分级嵌套
  • Spring Boot集成阿里云OSS:对象存储实战指南
  • 网站建设用苹果系统与liunx/网站优化外包找谁
  • 网站建设基础百度百科/互联网怎么打广告推广
  • 发布asp.net网站到虚拟主机/爱站工具查询
  • 宣传网站开发/网站优化软件
  • 国外域名建站/保定网站制作
  • 如何在yy做电影网站/项目网站