RAG知识库构建
文章目录
- RAG知识库用什么来存储?
- 一个PDF文档从上传到插入到向量数据库,中间经历了什么
- 用户在提问时,选择 知识库检索,从用户提问到获取用户所要信息,中间经历了什么?结 合上述的PDF插入向量数据库为例,进行说明
- 详细步骤解析
- 总结
- 图片在向量数据库中单独占一个文本块吗?
- 上传完PDF后,存入向量数据库后,那原始的PDF文件还有用吗?
- 元数据(metadata)添加了访问文件路径,那元数据的原始PDF文档的id有什么作用?
- 在实际的企业知识库中,会有单独的元数据数据库吗?
- 知识库增量更新的实现方式
- 增量更新,不是说的只增不修改么,为啥会有文档级别更新和块级别更新,难道不是按全新的文档生成的向量,进行插入操作吗?
- 块级增量更新的完整实现方案
- A[新版本文档上传] --> B{首次上传?}; 这段代码未提供?具体怎么判断文档是否是首次上传?
- 不是说只修改一个字,计算出来的file_hash也不相同吗?
- 提供一个使用“业务键”来区分“新文档”和“文档新版本”的代码示例。
- 用户每次提问题,知识库内部都要进行全量查询,对吗?
- 那知识库索引又是什么和向量检索的关系?
- 近似最近邻搜索 (ANN)和向量索引算法什么关系?
- 向量索引HNSW是怎么构建的?请详细解释一下
- HNSW 索引和具体的向量数据之间的关系?
- 根据地图上的节点信息,直接去仓库里把对应的几个包裹(原始向量或其元数据)取出来给你。这句话是拿到向量的ID,去仓库里去取指定的向量数据是吗?
- HNSW的索引图是如何构建的?
RAG知识库用什么来存储?
RAG(检索增强生成)知识库的存储方案,主要取决于你的检索需求和数据规模。
简单来说,有三种主流选择:
- 向量数据库:专门为 RAG 设计,检索速度最快,效果最好。
- 关系型数据库:适合已有数据库且预算有限的场景。
- 云服务:开箱即用,运维简单,适合快速部署。
一个PDF文档从上传到插入到向量数据库,中间经历了什么
一个 PDF 从上传到存入向量数据库,中间经历了一个典型的数据处理流水线。这个过程可以概括为三大步:内容提取 → 文本分割 → 向量生成与入库。
整体流程
详细步骤解析
1. 上传与接收 (Upload & Reception)
- 前端:用户通过网页或应用界面选择并上传 PDF 文件。
- 后端:服务器接收文件,通常会:
- 进行初步校验(文件类型、大小、是否损坏)。
- 将文件保存在临时存储区。
- 为这个处理任务创建一个唯一的 ID,方便后续追踪。
2. 内容提取 (Content Extraction)
3. 文本分割 (Text Chunking)
4. 生成向量 (Embedding Generation)
5. 存入向量数据库 (Storage in Vector DB)
总结
用户在提问时,选择 知识库检索,从用户提问到获取用户所要信息,中间经历了什么?结 合上述的PDF插入向量数据库为例,进行说明
用户提问到获取答案的过程,就像一次精准的 “知识寻宝”。基于你之前了解的 PDF 入库流程,现在我们来看检索端是如何工作的。
这个过程可以概括为:问题向量化 → 向量数据库检索 → 结果重排 → 生成最终答案。
整体流程图
详细步骤解析
1. 用户提问 (User Query)
2. 问题向量化 (Query Embedding)
3. 向量数据库检索 (Vector Search)
4. 结果重排 (Reranking - 可选但推荐)
5. 生成最终答案 (Answer Generation)
总结
图片在向量数据库中单独占一个文本块吗?
上传完PDF后,存入向量数据库后,那原始的PDF文件还有用吗?
架构建议
一个典型的 RAG 系统文件处理流程如下
实践建议
元数据(metadata)添加了访问文件路径,那元数据的原始PDF文档的id有什么作用?
元数据中的 PDF 文档 ID 非常重要,它主要用于唯一标识和高效关联,是连接不同系统组件的 “钥匙”。
简单来说:文件路径负责 “去哪里找”,文档 ID 负责 “找哪个” 以及 “怎么关联”。
文档 ID 的四大核心作用
在实际的企业知识库中,会有单独的元数据数据库吗?
是的,在实际的企业知识库中,使用单独的元数据数据库是非常普遍和推荐的做法。
虽然在小型项目或原型中可以混合存储,但在企业级应用中,分离存储带来的好处远超其管理成本。
知识库增量更新的实现方式
- 文档级别更新:在导入知识文档时,识别出新增或更新的文档,然后对其进行全量解析与向量化,并做索引合并更新。这种方式相对简单,适用于文档整体变化较大的情况。
- 块级别更新:更加精细化,需要识别文档中哪些块是新增的、哪些块发生了更新、哪些块需要删除,以及哪些块没有变化可以跳过。通常借助于文档或块的 “指纹” 来实现,比如计算每个块的 hash 指纹,通过与上一次保存的处理信息对比,确定需要执行的动作。
增量更新,不是说的只增不修改么,为啥会有文档级别更新和块级别更新,难道不是按全新的文档生成的向量,进行插入操作吗?
块级增量更新的完整实现方案
graph TDA[新版本文档上传] --> B{首次上传?};B -- 是 --> C[文档分割成块];B -- 否 --> D[新旧版本文档分割成块];C --> E[计算每个块的哈希值];D --> F[分别计算新旧块的哈希值];E --> G[建立块索引表<br>(文档ID+块ID+哈希+向量ID)];F --> H[对比哈希值,识别变化类型<br>(新增/删除/修改/未变)];G --> I[为所有块生成向量];H --> J[仅为新增/修改块生成向量];I --> K[批量插入向量到向量数据库];J --> L[插入新向量 + 删除旧向量];K --> M[更新元数据数据库];L --> M;M --> N[更新块索引表];
A[新版本文档上传] --> B{首次上传?}; 这段代码未提供?具体怎么判断文档是否是首次上传?
判断文档是否为首次上传,核心是通过文件唯一标识(通常是文件哈希)与系统中已存储的记录进行比对。以下是具体实现方案,包含代码示例和流程说明:
判断文档是否首次上传的实现:
import hashlib
import uuid
from database import metadata_db # 假设的元数据数据库客户端class DocumentUploadChecker:def __init__(self):# 初始化数据库连接(实际项目中可能用ORM框架)self.db = metadata_dbdef calculate_file_hash(self, file_path):"""计算文件的整体哈希值,作为唯一标识"""sha256 = hashlib.sha256()with open(file_path, "rb") as f:# 分块读取大文件,避免内存溢出while chunk := f.read(4096):sha256.update(chunk)return sha256.hexdigest()def is_first_upload(self, file_path, file_name, user_id):"""判断是否为首次上传返回值:(is_first, document_id)- is_first: True/False 是否首次上传- document_id: 已存在的文档ID(首次上传则为None)"""# 1. 计算当前文件的唯一哈希file_hash = self.calculate_file_hash(file_path)# 2. 检查数据库中是否存在相同哈希的记录# 查document表,根据file_hash匹配existing_doc = self.db.query("""SELECT document_id FROM documents WHERE file_hash = %s AND is_deleted = 0""",params=[file_hash])if existing_doc:# 存在相同哈希的文档,不是首次上传return (False, existing_doc[0]["document_id"])else:# 不存在相同哈希,是首次上传return (True, None)def handle_upload(self, file_path, file_name, user_id):"""完整处理上传流程:判断是否首次上传 + 生成文档ID"""is_first, doc_id = self.is_first_upload(file_path, file_name, user_id)if not is_first:# 非首次上传:返回已有文档ID,后续走版本更新流程print(f"文档已存在,document_id: {doc_id}")return doc_idelse:# 首次上传:生成新文档ID并创建基础记录new_doc_id = str(uuid.uuid4())# 计算文件哈希(复用之前的结果或重新计算)file_hash = self.calculate_file_hash(file_path)# 写入documents表(基础元数据)self.db.execute("""INSERT INTO documents (document_id, file_name, file_hash, uploader_id, created_at)VALUES (%s, %s, %s, %s, NOW())""",params=[new_doc_id, file_name, file_hash, user_id])print(f"首次上传,新document_id: {new_doc_id}")return new_doc_id# 示例用法
if __name__ == "__main__":checker = DocumentUploadChecker()# 模拟上传一个文件doc_id = checker.handle_upload(file_path="/tmp/report_v2.pdf",file_name="年度财务报告.pdf",user_id="user_123")# 后续可根据doc_id判断走首次处理还是增量更新流程
不是说只修改一个字,计算出来的file_hash也不相同吗?
提供一个使用“业务键”来区分“新文档”和“文档新版本”的代码示例。
上述的生成的代码:document_version_manager.py
import uuid
import hashlib
from datetime import datetime
from database import Database # 假设的数据库连接类class DocumentVersionManager:def __init__(self):self.db = Database() # 初始化数据库连接self.chunk_manager = ChunkManager() # 块管理工具(后续用于增量更新)def calculate_file_hash(self, file_content):"""计算文件内容的哈希值,用于检测内容变化"""sha256 = hashlib.sha256()sha256.update(file_content)return sha256.hexdigest()def get_latest_version(self, document_id):"""获取文档的最新版本信息"""query = """SELECT * FROM document_versions WHERE document_id = %s ORDER BY uploaded_at DESC LIMIT 1"""return self.db.fetch_one(query, (document_id,))def is_new_document(self, document_id):"""判断文档ID是否为新文档(不存在于系统中)"""query = "SELECT 1 FROM documents WHERE document_id = %s"result = self.db.fetch_one(query, (document_id,))return result is Nonedef upload_document(self, document_id, file_content, file_name, user_id):"""处理文档上传(支持新文档和版本更新):param document_id: 业务键(客户端生成,首次上传为新UUID,更新时复用旧ID):param file_content: 文件二进制内容:param file_name: 文件名:param user_id: 上传者ID:return: 处理结果"""# 1. 计算当前文件的哈希值file_hash = self.calculate_file_hash(file_content)file_size = len(file_content)# 2. 检查是否为新文档if self.is_new_document(document_id):# 2.1 处理新文档return self._handle_new_document(document_id=document_id,file_content=file_content,file_name=file_name,file_hash=file_hash,file_size=file_size,user_id=user_id)else:# 2.2 处理文档更新(版本升级)return self._handle_document_update(document_id=document_id,file_content=file_content,file_name=file_name,file_hash=file_hash,file_size=file_size,user_id=user_id)def _handle_new_document(self, document_id, file_content, file_name, file_hash, file_size, user_id):"""处理首次上传的新文档"""# 生成版本IDversion_id = str(uuid.uuid4())# 保存文件到对象存储(实际项目中会用S3/MinIO等)file_path = self._save_to_storage(document_id=document_id,version_id=version_id,file_content=file_content)# 1. 创建文档根记录self.db.execute("""INSERT INTO documents (document_id, title, owner_id, created_at, updated_at)VALUES (%s, %s, %s, %s, %s)""", (document_id,file_name, # 初始标题使用文件名user_id,datetime.now(),datetime.now()))# 2. 创建首个版本记录self.db.execute("""INSERT INTO document_versions (version_id, document_id, file_name, file_hash, file_path, size_in_bytes, uploaded_by, uploaded_at)VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""", (version_id,document_id,file_name,file_hash,file_path,file_size,user_id,datetime.now()))# 3. 首次处理:全量分割并生成向量text_content = self._extract_text(file_content) # 从文件提取文本chunks = self.chunk_manager.split(text_content) # 分割为块self.chunk_manager.process_and_store_chunks(document_id=document_id,version_id=version_id,chunks=chunks)return {"status": "success","message": "新文档创建成功","document_id": document_id,"version_id": version_id,"is_new": True}def _handle_document_update(self, document_id, file_content, file_name, file_hash, file_size, user_id):"""处理文档更新(创建新版本)"""# 检查是否与最新版本内容完全相同latest_version = self.get_latest_version(document_id)if latest_version and latest_version['file_hash'] == file_hash:return {"status": "warning","message": "上传的文件与最新版本内容完全相同,未创建新版本","document_id": document_id,"version_id": latest_version['version_id'],"is_new": False}# 生成新版本IDversion_id = str(uuid.uuid4())# 保存新版本文件file_path = self._save_to_storage(document_id=document_id,version_id=version_id,file_content=file_content)# 1. 创建新版本记录self.db.execute("""INSERT INTO document_versions (version_id, document_id, file_name, file_hash, file_path, size_in_bytes, uploaded_by, uploaded_at)VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""", (version_id,document_id,file_name,file_hash,file_path,file_size,user_id,datetime.now()))# 2. 更新文档根记录的更新时间self.db.execute("""UPDATE documents SET updated_at = %s WHERE document_id = %s""", (datetime.now(), document_id))# 3. 增量更新:只处理变化的块old_text = self._extract_text(self._load_from_storage(latest_version['file_path']))new_text = self._extract_text(file_content)# 对比新旧文本,获取变化的块并更新向量库self.chunk_manager.incremental_update(document_id=document_id,old_version_id=latest_version['version_id'],new_version_id=version_id,old_text=old_text,new_text=new_text)return {"status": "success","message": "文档版本更新成功","document_id": document_id,"version_id": version_id,"is_new": False}# 以下为辅助方法(实际实现会更复杂)def _save_to_storage(self, document_id, version_id, file_content):"""保存文件到对象存储,返回存储路径"""# 示例路径格式:/documents/{document_id}/{version_id}.pdfpath = f"/documents/{document_id}/{version_id}.pdf"# 实际项目中会调用S3/MinIO的SDK保存文件return pathdef _load_from_storage(self, file_path):"""从存储加载文件内容"""# 实际项目中会调用存储服务的SDK读取文件return b"file_content"def _extract_text(self, file_content):"""从文件内容中提取文本(PDF/Word等)"""# 实际项目中会使用PyPDF2、textract等库提取文本return "extracted text content from file"# 块管理工具类(用于文本分割和增量更新)
class ChunkManager:def split(self, text, chunk_size=1000, overlap=100):"""将文本分割为语义块"""chunks = []for i in range(0, len(text), chunk_size - overlap):chunk = text[i:i + chunk_size]chunks.append(chunk)return chunksdef process_and_store_chunks(self, document_id, version_id, chunks):"""全量处理并存储块向量"""# 实际项目中会调用Embedding模型生成向量并存储到向量数据库for i, chunk in enumerate(chunks):chunk_id = str(uuid.uuid4())# 向量生成和存储逻辑...print(f"存储块: document_id={document_id}, version_id={version_id}, chunk_id={chunk_id}")def incremental_update(self, document_id, old_version_id, new_version_id, old_text, new_text):"""增量更新:只处理变化的块"""old_chunks = self.split(old_text)new_chunks = self.split(new_text)# 计算块哈希并对比差异(实际实现会更复杂)old_chunk_hashes = {hash(chunk): chunk for chunk in old_chunks}new_chunk_hashes = {hash(chunk): chunk for chunk in new_chunks}# 新增的块added_chunks = [v for k, v in new_chunk_hashes.items() if k not in old_chunk_hashes]# 删除的块removed_chunks = [v for k, v in old_chunk_hashes.items() if k not in new_chunk_hashes]# 处理新增块(生成向量并存储)for chunk in added_chunks:chunk_id = str(uuid.uuid4())# 向量生成和存储逻辑...print(f"新增块: document_id={document_id}, version_id={new_version_id}, chunk_id={chunk_id}")# 处理删除块(从向量数据库删除)for chunk in removed_chunks:# 查找块ID并删除向量...print(f"删除块: document_id={document_id}, version_id={old_version_id}")# 示例用法
if __name__ == "__main__":manager = DocumentVersionManager()# 1. 首次上传(客户端生成新的document_id)new_doc_id = str(uuid.uuid4())result = manager.upload_document(document_id=new_doc_id,file_content=b"这是第一版文档内容",file_name="报告.pdf",user_id="user_001")print(result)# 2. 再次上传(更新文档,使用相同的document_id)result = manager.upload_document(document_id=new_doc_id, # 复用同一个业务键file_content=b"这是第二版文档内容(修改了一个字)",file_name="报告_v2.pdf",user_id="user_001")print(result)
用户每次提问题,知识库内部都要进行全量查询,对吗?
代码示例:向量检索流程
下面是一个简化的 Python 代码示例,展示了使用 langchain 库进行向量检索的核心流程:
# 1. 准备环境和依赖
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import CharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings# 2. 加载并分割文档 (这是构建知识库的步骤)
loader = TextLoader("your_knowledge_base.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)# 3. 创建向量存储 (这是构建知识库的步骤)
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
db = FAISS.from_documents(docs, embeddings)# 4. 用户提问 (这是问答的步骤)
query = "如何判断文档是否是首次上传?"# 5. 进行向量检索 (而不是全量查询)
# 这一步会快速找出最相似的2个文档片段
retriever = db.as_retriever(search_kwargs={"k": 2})
relevant_docs = retriever.get_relevant_documents(query)# 6. 输出检索结果
print(f"为问题 `{query}` 找到了以下相关文档:\n")
for i, doc in enumerate(relevant_docs):print(f"--- 相关文档 {i+1} ---")print(f"内容: {doc.page_content[:200]}...") # 打印前200个字符print(f"来源: {doc.metadata}\n")
那知识库索引又是什么和向量检索的关系?
近似最近邻搜索 (ANN)和向量索引算法什么关系?
这是一个非常好的问题,它触及了向量检索技术的核心。简单来说:
近似最近邻搜索 (ANN) 是一个目标,而向量索引算法是实现这个目标的手段。
它们是目标与手段的关系。
向量索引HNSW是怎么构建的?请详细解释一下
HNSW(Hierarchical Navigable Small Worlds,分层可导航小世界)是目前性能最优的向量索引算法之一,它通过构建多层 “导航图” 实现高效的近似最近邻搜索。其核心思想借鉴了现实世界的 “小世界网络” 特性 —— 大多数节点可以通过少量中间节点连接。