从零搭建 RAG 智能问答系统3:聊天信息持久化和登录注册
在前两篇博客中,我们已经完成了 RAG 智能问答系统的基础架构搭建(包括本地 LLM 与嵌入模型集成)和 Chainlit 聊天界面的初步开发,实现了 “上传文件 + 基于知识库对话” 的核心功能。但此时系统仍有明显短板:聊天记录和上传的文件会随会话结束丢失,缺乏用户身份验证机制,且文件交互仅支持 “上传 - 解析” 不支持预览。
本篇作为系列第三篇,将重点解决这些问题,完整实现聊天信息持久化(对话数据 + 文件数据)和登录权限控制,让系统从 “临时试用版” 升级为 “可稳定使用的应用”。
一、聊天信息持久化:解决数据丢失问题
持久化的核心是将 “临时存在内存中的聊天记录” 和 “用户上传的文件” 存储到可靠的存储介质中。我们采用 “关系型数据库存对话元数据 + 对象存储存文件” 的方案:
- 对话数据(用户信息、聊天线程、消息步骤):使用 PostgreSQL(支持异步操作和复杂查询);
- 文件数据(用户上传的 PDF、图片等):使用 MinIO(轻量级对象存储,兼容 S3 协议,适合中小规模文件管理)。
1.1 第一步:搭建 PostgreSQL 数据库(对话数据存储)
要实现聊天记录的持久化存储,那么肯定就需要有数据库,这里我们选用的是PostgerSQL,但是单纯的有数据库还是不行,我们还要先创建表,用来存储用户的聊天记录。做了这些之后我们还是不够,我们还需要创建数据库的操作函数,实现对聊天数据的增删改查,后面我在github上获取的相关的数据库表和操作函数,希望能够给大家一个参考。
PostgreSQL 是开源的企业级关系型数据库,支持 JSONB 类型(适合存储聊天元数据),且有成熟的 Python 异步驱动(asyncpg),非常适合作为聊天数据的存储后端。
1.1.1 安装 PostgreSQL
- 访问 PostgreSQL 官网,下载对应系统的安装包(以 Windows 为例);
- 安装过程中设置数据库超级用户密码并记住,记住端口号(默认 5432);
- 验证安装:打开命令行输入
psql -U postgres
,输入密码能进入数据库交互界面即成功。
官网和参考教程:
官⽹:https://www.postgresql.org/download/windows/https://www.enterprisedb.com/downloads/postgres-postgresql-downloads参考教程:https://blog.csdn.net/weixin_51484460/article/details/140162425
官⽹:https://www.postgresql.org/download/windows/https://www.enterprisedb.com/downloads/postgres-postgresql-downloads参考教程:https://blog.csdn.net/weixin_51484460/article/details/140162425
1.1.2 使用 pgAdmin4 管理数据库
PostgreSQL 自带 pgAdmin4 可视化工具,方便创建数据库和执行 SQL 脚本:
1、启动 pgAdmin4,首次打开需设置主密码;
2、连接数据库服务器:在左侧「Servers」右键 →「Register」→「Server」,输入名称(如 PostgreSQL17)、主机(localhost)、端口(5432)、用户名和密码(注意:这里的账号和密码一点要记住);
3、创建业务数据库:右键「Databases」→「Create」→「Database」,数据库名设为 chainlit_db
(后续代码会用到);
4、执行表结构 SQL:右键 chainlit_db
→「Query Tool」,复制以下 SQL 脚本执行,创建存储聊天数据的 5 张核心表。
-- 用户表:存储用户身份信息
CREATE TABLE users ("id" UUID PRIMARY KEY,"identifier" TEXT NOT NULL UNIQUE, "metadata" JSONB NOT NULL,"createdAt" TEXT
);-- 线程表:每个聊天会话对应一个线程
CREATE TABLE IF NOT EXISTS threads ( "id" UUID PRIMARY KEY,"createdAt" TEXT, "name" TEXT,"userId" UUID,"userIdentifier" TEXT,"tags" TEXT[],"metadata" JSONB, FOREIGN KEY ("userId") REFERENCES users("id") ON DELETE CASCADE
);-- 步骤表:存储单条消息(用户/助手)
CREATE TABLE IF NOT EXISTS steps ( "id" UUID PRIMARY KEY,"name" TEXT NOT NULL,"type" TEXT NOT NULL,"threadId" UUID NOT NULL,"parentId" UUID,"isError" BOOLEAN, "input" TEXT, "disableFeedback" BOOLEAN NOT NULL DEFAULT true, "streaming" BOOLEAN NOT NULL, "waitForAnswer" BOOLEAN, "metadata" JSONB, "tags" TEXT[], "output" TEXT,"createdAt" TEXT,"start" TEXT,"end" TEXT,"generation" JSONB, "showInput" TEXT,"language" TEXT,"indent" INT,FOREIGN KEY ("threadId") REFERENCES threads("id") ON DELETE CASCADE
);-- 元素表:存储文件、图片等附加元素
CREATE TABLE IF NOT EXISTS elements ( "id" UUID PRIMARY KEY, "threadId" UUID,"type" TEXT,"url" TEXT,"objectKey" TEXT, "chainlitKey" TEXT, "name" TEXT NOT NULL, "display" TEXT,"size" TEXT,"page" INT,"language" TEXT, "forId" UUID,"mime" TEXT,FOREIGN KEY ("threadId") REFERENCES threads("id") ON DELETE CASCADE
);-- 反馈表:存储用户对助手回复的评价
CREATE TABLE IF NOT EXISTS feedbacks ( "id" UUID PRIMARY KEY, "forId" UUID NOT NULL, "threadId" UUID NOT NULL,"value" INT NOT NULL,"comment" TEXT,FOREIGN KEY ("forId") REFERENCES steps("id") ON DELETE CASCADE,FOREIGN KEY ("threadId") REFERENCES threads("id") ON DELETE CASCADE
);
表结构说明(用表格更清晰):
表名 | 核心作用 | 关键字段 |
---|---|---|
users | 存储用户身份(登录后关联) | id (用户 UUID)、identifier (唯一标识) |
threads | 管理聊天会话(每个会话一个线程) | userId (关联用户)、tags (会话标签) |
steps | 存储单条消息(用户输入 / 助手回复) | threadId (关联线程)、input/output (消息内容) |
elements | 存储文件 / 图片元数据 | threadId (关联线程)、objectKey (MinIO 文件键) |
feedbacks | 存储用户反馈 | forId (关联消息)、value (反馈分值) |
1.1.3 封装 PostgreSQL 数据层
为了让 Chainlit 能与 PostgreSQL 交互,我们需要封装一个数据层类(postgresql_data_layer.py
),实现用户、线程、消息的增删改查。核心逻辑基于 SQLAlchemy 异步操作,避免阻塞聊天界面。
核心代码片段(关键函数说明):
import json
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text
from chainlit.data.base import BaseDataLayer
from chainlit.user import PersistedUser, User
from chainlit.types import ThreadDictclass PostgreSQLDataLayer(BaseDataLayer):def __init__(self, conninfo: str, storage_provider=None):# 初始化异步数据库引擎self.engine: AsyncEngine = create_async_engine(conninfo)# 创建异步会话工厂self.async_session = sessionmaker(bind=self.engine, expire_on_commit=False, class_=AsyncSession)self.storage_provider = storage_provider # 关联MinIO存储# 异步执行SQL(防注入,支持事务)async def execute_sql(self, query: str, parameters: dict) -> Optional[List[Dict[str, Any]]]:async with self.async_session() as session:try:await session.begin()result = await session.execute(text(query), parameters)await session.commit()if result.returns_rows:return [dict(row._mapping) for row in result.fetchall()]return result.rowcountexcept Exception as e:await session.rollback()print(f"SQL执行错误: {e}")return None# 创建/更新用户async def create_user(self, user: User) -> Optional[PersistedUser]:existing_user = await self.get_user(user.identifier)if not existing_user:# 新用户:生成UUID和时间戳user_dict = {"id": str(uuid.uuid4()),"identifier": user.identifier,"metadata": json.dumps(user.metadata),"createdAt": datetime.now().isoformat() + "Z"}await self.execute_sql(query="INSERT INTO users (id, identifier, createdAt, metadata) VALUES (:id, :identifier, :createdAt, :metadata)",parameters=user_dict)else:# 老用户:更新元数据await self.execute_sql(query="UPDATE users SET metadata = :metadata WHERE identifier = :identifier",parameters={"metadata": json.dumps(user.metadata), "identifier": user.identifier})return await self.get_user(user.identifier)# 其他核心函数:get_user(获取用户)、update_thread(更新会话)、delete_thread(删除会话)等# 完整代码可参考文档中的postgresql_data_layer.py
1.2 第二步:搭建 MinIO 文件存储(文件数据持久化)
用户上传的 PDF、图片等文件不能存在本地(多用户环境下会冲突),MinIO 是轻量级的对象存储服务,支持通过 API 管理文件,非常适合存储这类非结构化数据。
1.2.1 安装 MinIO(两种方式)
- 方式 1:直接下载安装(Windows/Mac)访问 MinIO 官网,下载对应系统的安装包,解压后双击
minio.exe
,执行命令启动:minio server ./data # 数据存储在当前目录的data文件夹
- 方式 2:Docker 安装(推荐,跨系统兼容)已安装 Docker 的情况下,执行以下命令:
# 拉取镜像 docker pull minio/minio # 启动容器(9000端口为API端口,9001为控制台端口) docker run -p 9000:9000 -p 9001:9001 minio/minio server /data --address ":9000" --console-address ":9001"
启动后访问 MinIO 控制台:http://localhost:9001
,默认账号密码为 minioadmin/minioadmin
。
1.2.2 MinIO 基础配置(创建 Bucket 和访问秘钥)
1、创建 Bucket(存储桶)登录控制台后,点击左侧「Buckets」→「Create Bucket」,输入桶名(如 rag-qa-files
),关闭版本控制和对象锁定,点击「Create Bucket」。
2、创建访问秘钥点击左侧「Access Keys」→「Create access key」,输入描述(如 rag-system-key
),创建后保存「Access Key」和「Secret Key」(仅显示一次,后续配置需要)。
1.2.3 封装 MinIO 存储客户端
同样,我们需要封装一个 MinIO 客户端类(minio_storage_client.py
),实现文件的上传、下载和获取访问链接。
核心代码片段:
import os
import io
from typing import Union, Dict, Any
from minio import Minio
from chainlit.data.storage_clients.base import BaseStorageClientclass MinioStorageClient(BaseStorageClient):def __init__(self):# 从环境变量读取MinIO配置(避免硬编码)self.client = Minio(endpoint=os.environ["MINIO_ENDPOINT"], # 如 "127.0.0.1:9000"access_key=os.environ["MINIO_ACCESS_KEY"], # 刚才保存的Access Keysecret_key=os.environ["MINIO_SECRET_KEY"], # 刚才保存的Secret Keysecure=False # 本地测试关闭HTTPS)self.bucket_name = os.environ.get("MINIO_BUCKET_NAME", "rag-qa-files") # 默认桶名# 上传文件到MinIOasync def upload_file(self, object_key: str, data: Union[bytes, str], mime: str = "application/octet-stream") -> Dict[str, Any]:try:# 转换数据为BytesIOif isinstance(data, str):data = io.BytesIO(data.encode("utf-8"))else:data = io.BytesIO(data)# 上传文件self.client.put_object(bucket_name=self.bucket_name,object_name=object_key,data=data,length=len(data.getvalue()),content_type=mime)# 获取文件预览链接(临时链接,有效期默认7天)url = self.client.get_presigned_url("GET", self.bucket_name, object_key)return {"objectKey": object_key, "url": url}except Exception as e:print(f"MinIO上传错误: {e}")return {}
1.3 第三步:集成持久化到 Chainlit 主应用
将 PostgreSQL 数据层和 MinIO 存储客户端集成到 Chainlit 主程序(app_chat_ui.py
),让聊天数据和文件自动持久化。
核心集成代码:
import os
import chainlit as cl
from dotenv import load_dotenv
from llama_index.core import Settings
from embeddings import embed_model_local_bge_small
from llms import deepseek_llm
# 导入自定义数据层和存储客户端
from persistent.postgresql_data_layer import PostgreSQLDataLayer
from persistent.minio_storage_client import MinioStorageClient
import chainlit.data as cl_data# 1. 加载环境变量(避免硬编码配置)
load_dotenv() # 读取.env文件# 2. 初始化LLM和嵌入模型(与前两篇一致)
Settings.llm = deepseek_llm()
Settings.embed_model = embed_model_local_bge_small()# 3. 初始化MinIO存储客户端
minio_client = MinioStorageClient()# 4. 初始化PostgreSQL数据层,并关联MinIO
pg_conn_str = os.environ["PG_CONNECTION_STRING"] # 如 "postgresql+asyncpg://postgres:root123@localhost/chainlit_db"
cl_data._data_layer = PostgreSQLDataLayer(conninfo=pg_conn_str,storage_provider=minio_client
)# 5. 聊天会话启动(与前两篇一致,新增持久化自动生效)
@cl.on_chat_start
async def start():from llama_index.core.chat_engine import SimpleChatEnginechat_engine = SimpleChatEngine.from_defaults()cl.user_session.set("chat_engine", chat_engine)await cl.Message(author="Assistant", content="你好!我是AI助手,可基于文件回答问题~").send()# 6. 消息处理(文件上传后自动存储到MinIO,消息存储到PostgreSQL)
@cl.on_message
async def main(message: cl.Message):chat_engine = cl.user_session.get("chat_engine")msg = cl.Message(content="", author="Assistant")files = []# 处理用户上传的文件(自动存储到MinIO)for element in message.elements:if isinstance(element, cl.File) or isinstance(element, cl.Image):files.append(element.path)if files:# 读取文件并创建RAG索引(与前两篇一致)from llama_index.core import SimpleDirectoryReaderfrom rag.base_rag import RAGfrom llama_index.core.chat_engine.types import ChatModedata = SimpleDirectoryReader(input_files=files).load_data()index = await RAG.create_index_local(data)chat_engine = index.as_chat_engine(chat_mode=ChatMode.CONTEXT)cl.user_session.set("chat_engine", chat_engine)# 流式返回结果(消息自动存储到PostgreSQL)res = await cl.make_async(chat_engine.stream_chat)(message.content)for token in res.response_gen:await msg.stream_token(token)await msg.send()
1.4 环境变量配置(.env 文件)
为了避免硬编码敏感信息(如数据库密码、MinIO 秘钥),我们在项目根目录创建 .env
文件,统一管理配置:
# Chainlit认证秘钥(登录用)
CHAINLIT_AUTH_SECRET="hj%J?i*=po7SA_/XW_H^6X@4_hVeGzjct0rM5xbfmkJ^$$,K.*u=.wq,o9L%9Rc6"# LLM API Key(如DeepSeek、Moonshot)
DEEPSEEK_API_KEY="57ed57fa0eb24be09f005d2ec83324"
MOONSHOT_API_KEY="sk-AgqF096M2qCam8RmWeYnhsvAP6bXJ889sQ6d7wjWxan3"# MinIO配置
MINIO_ENDPOINT="127.0.0.1:9000"
MINIO_ACCESS_KEY="c0ubcoDw1W72Y8xrkJPF" # 你创建的MinIO Access Key
MINIO_SECRET_KEY="lfW9xm1TwjG2c7gijGCe69eNisU56KyEWdGSDTzE" # 你创建的MinIO Secret Key
MINIO_BUCKET_NAME="rag-qa-files" # 你创建的MinIO Bucket名# PostgreSQL配置(asyncpg驱动)
PG_CONNECTION_STRING="postgresql+asyncpg://postgres:root123@localhost/chainlit_db"
1.5 测试持久化功能
- 安装依赖:确保安装了异步 PostgreSQL 驱动和 MinIO 客户端
pip install asyncpg minio python-dotenv
- 启动 Chainlit:
chainlit run app_chat_ui.py --port 8080 -w # -w表示自动重载
- 测试操作:
- 上传一个 PDF 文件,发送消息询问文件内容;
- 关闭浏览器,重新打开
http://localhost:8080
,查看聊天历史是否保留; - 查看 MinIO 控制台的 Bucket,确认文件已上传;
- 查看 pgAdmin4 的
elements
表,确认文件元数据已存储。
二、登录权限实现:保障系统安全
默认情况下,Chainlit 应用允许任何人访问,添加登录功能可以限制未授权用户使用,保护知识库数据安全。我们使用 Chainlit 自带的password_auth_callback
实现简单的密码登录(后续可扩展为手机号 / 邮箱登录)。
2.1 实现登录回调函数
在 app_chat_ui.py
中添加登录认证逻辑,目前采用 “硬编码账号密码”(适合测试,生产环境需改为从数据库读取):
@cl.password_auth_callback
def auth_callback(username: str, password: str) -> Optional[cl.User]:"""密码认证回调:验证用户名密码,返回用户对象(认证成功)或None(失败)"""# 测试用账号密码(生产环境需替换为数据库查询)if (username, password) == ("admin", "admin123"):return cl.User(identifier="admin", # 用户唯一标识(需与PostgreSQL的users表对应)metadata={"role": "admin", # 角色(后续可基于角色控制权限)"provider": "credentials" # 认证方式})# 认证失败return None
2.2 测试登录功能
- 重启 Chainlit 应用,访问
http://localhost:8080
,会自动跳转到登录页(/login
); - 输入用户名
admin
、密码admin123
,点击 “继续” 即可登录; - 登录后右上角会显示 “admin” 和 “登出” 按钮,点击 “登出” 可退出登录。
2.3 后续扩展方向
- 支持用户注册:添加注册页面,将新用户信息存入 PostgreSQL 的
users
表; - 多角色权限:区分 “管理员”“普通用户”,限制普通用户删除会话 / 文件的权限;
- 第三方登录:集成 OAuth2(如 GitHub、企业微信登录)。
三、总结与下一章预告
本章成果
通过本章开发,我们的 RAG 系统实现了两个关键突破:
- 数据持久化:对话记录从 “临时存储” 升级为 “数据库持久化”,支持历史会话管理。
- 用户隔离:通过登录注册和 JWT 认证,实现了不同用户的对话数据隔离,保障数据安全。
下一章预告
当前系统仍有优化空间,下一章我们将聚焦「系统可用性提升」:
- 会话管理优化:支持会话重命名、删除会话。
- 消息加载优化:实现消息分页(避免大量消息加载缓慢)。
- 权限控制:添加管理员角色,支持文档权限分配(如某些文档仅特定用户可访问)。
如果你在开发过程中遇到任何问题,欢迎在评论区留言讨论!