当前位置: 首页 > news >正文

22_AI智能体开发架构搭建之基于Redis构建高性能AI对话记忆存储服务实践

前言

在AI应用开发中,对话记忆管理是提升用户体验的关键技术。今天分享如何通过Redis构建高可用、高性能的对话记忆存储服务,实现智能的多轮对话上下文管理。

AI智能体系统设计相关文章:
👉《01_AI智能体系统设计之系统架构设计》
👉《02_AI智能体系统设计之钉钉消息处理流程设计》
👉《03_AI智能体系统设计之Agent决策流程设计》
👉《04_AI智能体系统设计之工具调用人工干预机制深度解析》

AI智能体开发环境搭建相关文章:
👉《05_AI智能体开发环境搭建之获取相关资源指南》
👉《06_AI智能体开发环境搭建之Miniconda零基础安装配置指南》
👉《07_AI智能体开发环境搭建之Poetry安装适用指南,Python开发者告别依赖管理烦恼》
👉《08_AI智能体开发环境搭建之Conda与Poetry的完美整合创建虚拟环境》
👉《09_AI智能体开发环境搭建之Redis安装配置完整指南》
👉《10_AI智能体开发环境搭建之Qdrant向量搜索引擎安装配置全攻略》
👉《11_AI智能体开发环境搭建之VSCode安装配置与效率提升完整指南》
👉《12_AI智能体开发环境搭建之PyCharm社区版安装配置全攻略,打造高效的Python开发环境》

AI智能体开发架构搭建相关文章:
👉《13_AI智能体开发架构搭建之资深开发者的初始化项目实践》
👉《14_AI智能体开发架构搭建之资深开发者的项目依赖管理实践》
👉《15_AI智能体开发架构搭建之生产级架构全局配置管理最佳实践》
👉《16_AI智能体开发架构搭建之全局日志配置实践》
👉《17_AI智能体开发架构搭建之Flask集成swagger在线文档实践》
👉《18_AI智能体开发架构搭建之集成DeepSeek-V3与BGE-M3的最佳实践指南》
👉《19_AI智能体开发架构搭建之基于Qdrant构建知识库最佳实践指南》
👉《20_AI智能体开发架构搭建之构建高可用网络爬虫工具最佳实践指南》

更多相关文章内容: 👉《AI智能体从0到企业级项目落地》专栏

配套视频教程👉《AI智能体实战开发教程(从0到企业级项目落地)》共62节(已完结),从零开始,到企业级项目落地,这套课程将为你提供最完整的学习路径。不管你是初学者还是有一定经验的开发者,都能在这里获得实实在在的成长和提升。

二、为什么选择Redis?

传统内存存储的痛点:

  • 应用重启导致对话历史丢失
  • 多实例部署时状态不一致
  • 内存压力随用户量增长

Redis的核心优势:

  • 持久化保证数据安全
  • 原子操作确保一致性
  • 丰富的数据结构支持
  • 卓越的读写性能

三、配置驱动的存储设计

3.1 环境配置策略

# .env 配置文件
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=123456
MEMORY_KEY=flyoss_memory
MEMORY_MAX_LENGTH=-1    # -1表示不限制,支持动态扩容
MEMORY_QUERY_LENGTH=10  # 偶数保证对话回合完整性

在这里插入图片描述

3.2 配置文件加载器

在 app/utils/config_loader.py 文件添加Redis存储配置项

# Redis存储配置
REDIS_HOST: str = Field("localhost", env="REDIS_HOST")
REDIS_PORT: int = Field(6379, env="REDIS_PORT")
REDIS_DB: int = Field(0, env="REDIS_DB")
REDIS_PASSWORD: str = Field("", env="REDIS_PASSWORD")
MEMORY_KEY: str = Field("flyoss_memory", env="MEMORY_KEY")
MEMORY_MAX_LENGTH: int = Field(-1, env="MEMORY_MAX_LENGTH")
MEMORY_QUERY_LENGTH: int = Field(5, env="MEMORY_QUERY_LENGTH")

在这里插入图片描述

三、集成 Redis 构建对话记忆存储服务

在 app 目录下 core 包创建 memory.py 文件,编写集成 Redis 构建对话记忆服务的代码实现

import redis.asyncio as redis
import json
from typing import List, Dict, Any, Optional
from app.utils import config, setup_logging# 初始化日志配置
logger = setup_logging()class RedisMemory:"""基于Redis的记忆系统"""def __init__(self):logger.info("初始化RedisMemory...")self.client = redis.Redis(  # 使用异步Redis客户端host=config.REDIS_HOST,port=config.REDIS_PORT,db=config.REDIS_DB,password=config.REDIS_PASSWORD or None,decode_responses=True)# 验证配置值if config.MEMORY_MAX_LENGTH < -1:raise ValueError("MEMORY_MAX_LENGTH 不能小于 -1")if config.MEMORY_QUERY_LENGTH <= 0:raise ValueError("MEMORY_QUERY_LENGTH 必须大于 0")# 新增验证:配置值必须为大于0的偶数(MEMORY_MAX_LENGTH 可以为-1)if config.MEMORY_MAX_LENGTH != -1:if config.MEMORY_MAX_LENGTH <= 0:raise ValueError("MEMORY_MAX_LENGTH 必须为-1或大于0的偶数")if config.MEMORY_MAX_LENGTH % 2 != 0:raise ValueError("MEMORY_MAX_LENGTH 必须是偶数(除-1外)")if config.MEMORY_QUERY_LENGTH <= 0:raise ValueError("MEMORY_QUERY_LENGTH 必须大于0")if config.MEMORY_QUERY_LENGTH % 2 != 0:raise ValueError("MEMORY_QUERY_LENGTH 必须是偶数")self.memory_key = config.MEMORY_KEYself.max_length = config.MEMORY_MAX_LENGTHself.query_length = config.MEMORY_QUERY_LENGTHasync def get_history(self, user_id: str) -> List[Dict[str, Any]]:  # 改为异步方法"""获取用户的对话历史,使用MEMORY_QUERY_LENGTH限制查询条数"""key = f"{self.memory_key}:{user_id}"try:# 获取列表总长度total_len = await self.client.llen(key)  # 添加awaitif total_len == 0:return []# 获取最新的历史记录(从头开始取 query_length 条)history_json = await self.client.lrange(  # 添加awaitkey, 0, self.query_length - 1)# 反转列表使记录按时间顺序排列(从旧到新)return [json.loads(item) for item in reversed(history_json)]except redis.RedisError as e:logger.error(f"获取记忆失败:{str(e)}", exc_info=True)return []except json.JSONDecodeError as e:logger.error(f"记忆数据解析失败:{str(e)}", exc_info=True)return []async def add_record(self, user_id: str, record: Dict[str, Any]):  # 改为异步方法"""添加新的对话记录"""key = f"{self.memory_key}:{user_id}"try:# 添加新记录await self.client.lpush(key, json.dumps(record, ensure_ascii=False))  # 添加await# 当max_length不为-1时进行修剪if self.max_length != -1:# 修剪列表,保留最近的记录await self.client.ltrim(key, 0, self.max_length - 1)  # 添加awaitexcept redis.RedisError as e:logger.error(f"保存记忆失败: {str(e)}", exc_info=True)except TypeError as e:logger.error(f"记忆数据序列化失败: {str(e)}", exc_info=True)async def clear_memory(self, user_id: str):  # 改为异步方法"""清除用户的所有记忆"""key = f"{self.memory_key}:{user_id}"try:await self.client.delete(key)  # 添加awaitlogger.info(f"用户记忆已清除: {user_id}")except redis.RedisError as e:logger.error(f"清除记忆失败: {str(e)}", exc_info=True)async def set_state(self, conversation_id: str, state_data: Dict[str, Any], expire_seconds: int = 0):"""保存状态数据到Redis"""try:# 序列化状态数据state_json = json.dumps(state_data, ensure_ascii=False)# 保存到Rediskey = f"{self.memory_key}:{conversation_id}"if expire_seconds > 0:await self.client.setex(key, expire_seconds, state_json)  # 添加awaitelse:await self.client.set(key, state_json)  # 添加awaitlogger.info(f"已保存状态: {key},过期时间: {expire_seconds}秒")except Exception as e:logger.error(f"保存状态失败: {str(e)}", exc_info=True)async def get_state(self, conversation_id: str) -> Optional[Dict[str, Any]]:"""从Redis获取状态数据"""try:key = f"{self.memory_key}:{conversation_id}"state_json = await self.client.get(key)  # 添加awaitif state_json:return json.loads(state_json)return Noneexcept Exception as e:logger.error(f"获取状态失败: {str(e)}", exc_info=True)return Noneasync def delete_state(self, conversation_id: str):"""从Redis删除状态数据"""try:key = f"{self.memory_key}:{conversation_id}"await self.client.delete(key)  # 添加awaitlogger.info(f"已删除状态: {key}")except Exception as e:logger.error(f"删除状态失败: {str(e)}", exc_info=True)# 全局记忆系统实例
redis_memory = RedisMemory()

在这里插入图片描述
在 app 目录下 core 包的 init.py 文件导入集成 Redis 对话记忆存储服务

from .knowledge_base import knowledge_base
from .memory import redis_memory__all__ = ["knowledge_base","redis_memory",
]

在这里插入图片描述

四、测试集成情况

在 tests 下创建 simple_test_memory.py 测试类

import asyncio
from app.core.memory import redis_memory
from app.utils import setup_logging# 初始化日志配置
logger = setup_logging()async def test_add_and_get_record():"""测试添加和获取记录"""logger.info("开始测试 add_record 和 get_history 方法...")try:# 使用一个测试用户IDtest_user_id = "test_user_123"# 清除之前可能存在的测试数据await redis_memory.clear_memory(test_user_id)# 添加测试记录test_records = [{"role": "user", "content": "你好,AI!"},{"role": "assistant", "content": "你好!我是一个AI助手。"},{"role": "user", "content": "能帮我回答一个问题吗?"},{"role": "assistant", "content": "当然可以,请说!"}]for record in test_records:await redis_memory.add_record(test_user_id, record)logger.info(f"已添加记录: {record}")# 获取历史记录history = await redis_memory.get_history(test_user_id)logger.info(f"获取到的历史记录数量: {len(history)}")logger.info(f"历史记录: {history}")# 验证记录数量(应该等于或小于MEMORY_QUERY_LENGTH)assert len(history) > 0, "未获取到历史记录"logger.info("add_record 和 get_history 方法测试通过!")except Exception as e:logger.error(f"add_record 或 get_history 方法测试失败: {str(e)}")async def test_clear_memory():"""测试清除记忆"""logger.info("\n开始测试 clear_memory 方法...")try:# 使用一个测试用户IDtest_user_id = "test_user_456"# 先添加一些测试记录await redis_memory.add_record(test_user_id, {"role": "user", "content": "这是一条测试消息"})# 清除记忆await redis_memory.clear_memory(test_user_id)# 验证记忆是否被清除history = await redis_memory.get_history(test_user_id)assert len(history) == 0, "记忆未被完全清除"logger.info("clear_memory 方法测试通过!")except Exception as e:logger.error(f"clear_memory 方法测试失败: {str(e)}")async def test_state_management():"""测试状态管理功能"""logger.info("\n开始测试状态管理功能(set_state, get_state, delete_state)...")try:# 使用一个测试对话IDtest_conversation_id = "test_conversation_789"# 测试数据test_state = {"step": 2,"context": "正在进行多轮对话","options": ["继续", "返回", "退出"]}# 设置状态await redis_memory.set_state(test_conversation_id, test_state)logger.info(f"已设置状态: {test_state}")# 获取状态retrieved_state = await redis_memory.get_state(test_conversation_id)logger.info(f"获取到的状态: {retrieved_state}")# 验证状态assert retrieved_state is not None, "未能获取到状态"assert retrieved_state["step"] == test_state["step"], "状态数据不匹配"logger.info("set_state 和 get_state 方法测试通过!")# 删除状态await redis_memory.delete_state(test_conversation_id)# 验证状态是否被删除deleted_state = await redis_memory.get_state(test_conversation_id)assert deleted_state is None, "状态未被完全删除"logger.info("delete_state 方法测试通过!")except Exception as e:logger.error(f"状态管理功能测试失败: {str(e)}")async def main():"""主测试函数"""await test_add_and_get_record()await test_clear_memory()await test_state_management()if __name__ == "__main__":# 运行测试asyncio.run(main())

在这里插入图片描述
运行 simple_test_memory.py 测试类

python -m tests.simple_test_memory

在这里插入图片描述

总结

通过Redis构建的对话记忆存储系统,我们实现了:

  • 高性能读写:异步操作支撑高并发场景
  • 数据持久化:应用重启不丢失对话上下文
  • 智能内存管理:配置驱动的存储策略
  • 完善容错:多层次错误处理保障服务可用

**深度思考:**Redis不是简单的键值存储,而是通过精心设计的数据结构和操作组合,构建出符合业务特性的存储方案。列表结构的LTRIM操作、键命名空间设计、配置验证机制,这些细节共同构成了生产级的解决方案。

http://www.dtcms.com/a/515932.html

相关文章:

  • SIMPLE
  • 企业专业网站建设wordpress炫酷背景
  • MTPA算法原理及仿真验证
  • 【记录62】网站输入框搜索内容页面定位
  • 2025年新版ADB工具箱下载+驱动+ADB指令集+fastboot刷机ROOT工具
  • 上海网站建设平台站霸网络快速提升关键词排名软件
  • 【Android】从源码角度理解Handler机制
  • docker技术之部署docker
  • node框架做网站国外浏览器推荐
  • 悬赏平台 wordpress免费网站优化怎么做
  • java数据结构--LinkedList与链表
  • 【笔记--如何安装python环境】
  • 汇川H5U 威纶通HMI双仿真编程
  • 平均指数移动(EMA)
  • 可灵AI邀请码
  • 做外贸的网站怎么建立矿大师德建设网站
  • C语言需要掌握的基础知识点之前缀和
  • Java Optional orElse orElseGet orElseThrow()
  • windows10 wordpress十堰seo优化
  • 优选算法:01 双指针巧解移动零问题
  • 消息队列Kafka
  • 做游戏陪玩网站连锁销售网站制作
  • 【数字逻辑】数字逻辑实验实战:74HC151实现逻辑函数+74HC138搭全加器(附接线步骤+避坑指南)
  • Ubuntu上vue3 vite使用MBTiles搭建地图服务器
  • CClink转EtherCAT协议转换落地——汇川PLC管控球磨机CClink伺服案例
  • wordpress handsome长沙seo免费诊断
  • ChatGPT Atlas 发布:把 AI 直插进浏览器的一次重构
  • 第1章:初识Linux系统——第9节:安装服务软件、维护文件系统安全与文件权限配置实例
  • openAI发布的AI浏览器:什么是Atlas?(含 ChatGPT 浏览功能)macOS 离线下载安装Atlas完整教程
  • 西安市高陵区建设局网站聊城网站制作信息