当前位置: 首页 > news >正文

RAG知识库构建

文章目录

  • RAG知识库用什么来存储?
  • 一个PDF文档从上传到插入到向量数据库,中间经历了什么
  • 用户在提问时,选择 知识库检索,从用户提问到获取用户所要信息,中间经历了什么?结 合上述的PDF插入向量数据库为例,进行说明
  • 详细步骤解析
  • 总结
  • 图片在向量数据库中单独占一个文本块吗?
  • 上传完PDF后,存入向量数据库后,那原始的PDF文件还有用吗?
  • 元数据(metadata)添加了访问文件路径,那元数据的原始PDF文档的id有什么作用?
  • 在实际的企业知识库中,会有单独的元数据数据库吗?
  • 知识库增量更新的实现方式
  • 增量更新,不是说的只增不修改么,为啥会有文档级别更新和块级别更新,难道不是按全新的文档生成的向量,进行插入操作吗?
  • 块级增量更新的完整实现方案
  • A[新版本文档上传] --> B{首次上传?}; 这段代码未提供?具体怎么判断文档是否是首次上传?
  • 不是说只修改一个字,计算出来的file_hash也不相同吗?
  • 提供一个使用“业务键”来区分“新文档”和“文档新版本”的代码示例。
  • 用户每次提问题,知识库内部都要进行全量查询,对吗?
  • 那知识库索引又是什么和向量检索的关系?
  • 近似最近邻搜索 (ANN)和向量索引算法什么关系?
  • 向量索引HNSW是怎么构建的?请详细解释一下
  • HNSW 索引和具体的向量数据之间的关系?
  • 根据地图上的节点信息,直接去仓库里把对应的几个包裹(原始向量或其元数据)取出来给你。这句话是拿到向量的ID,去仓库里去取指定的向量数据是吗?
  • HNSW的索引图是如何构建的?

RAG知识库用什么来存储?

RAG(检索增强生成)知识库的存储方案,主要取决于你的检索需求数据规模
简单来说,有三种主流选择:

  1. 向量数据库:专门为 RAG 设计,检索速度最快,效果最好。
  2. 关系型数据库:适合已有数据库且预算有限的场景。
  3. 云服务:开箱即用,运维简单,适合快速部署。
    在这里插入图片描述

一个PDF文档从上传到插入到向量数据库,中间经历了什么

一个 PDF 从上传到存入向量数据库,中间经历了一个典型的数据处理流水线。这个过程可以概括为三大步:内容提取 → 文本分割 → 向量生成与入库
整体流程
在这里插入图片描述

详细步骤解析
1. 上传与接收 (Upload & Reception)

  • 前端:用户通过网页或应用界面选择并上传 PDF 文件。
  • 后端:服务器接收文件,通常会:
    • 进行初步校验(文件类型、大小、是否损坏)。
    • 将文件保存在临时存储区。
    • 为这个处理任务创建一个唯一的 ID,方便后续追踪。

2. 内容提取 (Content Extraction)
在这里插入图片描述
3. 文本分割 (Text Chunking)
在这里插入图片描述
4. 生成向量 (Embedding Generation)
在这里插入图片描述
5. 存入向量数据库 (Storage in Vector DB)
在这里插入图片描述
总结
在这里插入图片描述

用户在提问时,选择 知识库检索,从用户提问到获取用户所要信息,中间经历了什么?结 合上述的PDF插入向量数据库为例,进行说明

用户提问到获取答案的过程,就像一次精准的 “知识寻宝”。基于你之前了解的 PDF 入库流程,现在我们来看检索端是如何工作的。
这个过程可以概括为:问题向量化 → 向量数据库检索 → 结果重排 → 生成最终答案
整体流程图
在这里插入图片描述

详细步骤解析

1. 用户提问 (User Query)
在这里插入图片描述
2. 问题向量化 (Query Embedding)
在这里插入图片描述
3. 向量数据库检索 (Vector Search)

在这里插入图片描述
4. 结果重排 (Reranking - 可选但推荐)
在这里插入图片描述
5. 生成最终答案 (Answer Generation)
在这里插入图片描述

总结

在这里插入图片描述

图片在向量数据库中单独占一个文本块吗?

在这里插入图片描述

上传完PDF后,存入向量数据库后,那原始的PDF文件还有用吗?

在这里插入图片描述
架构建议
一个典型的 RAG 系统文件处理流程如下
在这里插入图片描述
实践建议
在这里插入图片描述

元数据(metadata)添加了访问文件路径,那元数据的原始PDF文档的id有什么作用?

元数据中的 PDF 文档 ID 非常重要,它主要用于唯一标识高效关联,是连接不同系统组件的 “钥匙”。
简单来说:文件路径负责 “去哪里找”,文档 ID 负责 “找哪个” 以及 “怎么关联”
文档 ID 的四大核心作用
在这里插入图片描述

在实际的企业知识库中,会有单独的元数据数据库吗?

是的,在实际的企业知识库中,使用单独的元数据数据库是非常普遍和推荐的做法
虽然在小型项目或原型中可以混合存储,但在企业级应用中,分离存储带来的好处远超其管理成本。
在这里插入图片描述

知识库增量更新的实现方式

  • 文档级别更新:在导入知识文档时,识别出新增或更新的文档,然后对其进行全量解析与向量化,并做索引合并更新。这种方式相对简单,适用于文档整体变化较大的情况。
  • 块级别更新:更加精细化,需要识别文档中哪些块是新增的、哪些块发生了更新、哪些块需要删除,以及哪些块没有变化可以跳过。通常借助于文档或块的 “指纹” 来实现,比如计算每个块的 hash 指纹,通过与上一次保存的处理信息对比,确定需要执行的动作。

增量更新,不是说的只增不修改么,为啥会有文档级别更新和块级别更新,难道不是按全新的文档生成的向量,进行插入操作吗?

在这里插入图片描述

块级增量更新的完整实现方案

graph TDA[新版本文档上传] --> B{首次上传?};B -- 是 --> C[文档分割成块];B -- 否 --> D[新旧版本文档分割成块];C --> E[计算每个块的哈希值];D --> F[分别计算新旧块的哈希值];E --> G[建立块索引表<br>(文档ID+块ID+哈希+向量ID)];F --> H[对比哈希值,识别变化类型<br>(新增/删除/修改/未变)];G --> I[为所有块生成向量];H --> J[仅为新增/修改块生成向量];I --> K[批量插入向量到向量数据库];J --> L[插入新向量 + 删除旧向量];K --> M[更新元数据数据库];L --> M;M --> N[更新块索引表];

在这里插入图片描述

A[新版本文档上传] --> B{首次上传?}; 这段代码未提供?具体怎么判断文档是否是首次上传?

判断文档是否为首次上传,核心是通过文件唯一标识(通常是文件哈希)与系统中已存储的记录进行比对。以下是具体实现方案,包含代码示例和流程说明:
判断文档是否首次上传的实现:

import hashlib
import uuid
from database import metadata_db  # 假设的元数据数据库客户端class DocumentUploadChecker:def __init__(self):# 初始化数据库连接(实际项目中可能用ORM框架)self.db = metadata_dbdef calculate_file_hash(self, file_path):"""计算文件的整体哈希值,作为唯一标识"""sha256 = hashlib.sha256()with open(file_path, "rb") as f:# 分块读取大文件,避免内存溢出while chunk := f.read(4096):sha256.update(chunk)return sha256.hexdigest()def is_first_upload(self, file_path, file_name, user_id):"""判断是否为首次上传返回值:(is_first, document_id)- is_first: True/False 是否首次上传- document_id: 已存在的文档ID(首次上传则为None)"""# 1. 计算当前文件的唯一哈希file_hash = self.calculate_file_hash(file_path)# 2. 检查数据库中是否存在相同哈希的记录# 查document表,根据file_hash匹配existing_doc = self.db.query("""SELECT document_id FROM documents WHERE file_hash = %s AND is_deleted = 0""",params=[file_hash])if existing_doc:# 存在相同哈希的文档,不是首次上传return (False, existing_doc[0]["document_id"])else:# 不存在相同哈希,是首次上传return (True, None)def handle_upload(self, file_path, file_name, user_id):"""完整处理上传流程:判断是否首次上传 + 生成文档ID"""is_first, doc_id = self.is_first_upload(file_path, file_name, user_id)if not is_first:# 非首次上传:返回已有文档ID,后续走版本更新流程print(f"文档已存在,document_id: {doc_id}")return doc_idelse:# 首次上传:生成新文档ID并创建基础记录new_doc_id = str(uuid.uuid4())# 计算文件哈希(复用之前的结果或重新计算)file_hash = self.calculate_file_hash(file_path)# 写入documents表(基础元数据)self.db.execute("""INSERT INTO documents (document_id, file_name, file_hash, uploader_id, created_at)VALUES (%s, %s, %s, %s, NOW())""",params=[new_doc_id, file_name, file_hash, user_id])print(f"首次上传,新document_id: {new_doc_id}")return new_doc_id# 示例用法
if __name__ == "__main__":checker = DocumentUploadChecker()# 模拟上传一个文件doc_id = checker.handle_upload(file_path="/tmp/report_v2.pdf",file_name="年度财务报告.pdf",user_id="user_123")# 后续可根据doc_id判断走首次处理还是增量更新流程

不是说只修改一个字,计算出来的file_hash也不相同吗?

在这里插入图片描述

提供一个使用“业务键”来区分“新文档”和“文档新版本”的代码示例。

在这里插入图片描述
上述的生成的代码:document_version_manager.py

import uuid
import hashlib
from datetime import datetime
from database import Database  # 假设的数据库连接类class DocumentVersionManager:def __init__(self):self.db = Database()  # 初始化数据库连接self.chunk_manager = ChunkManager()  # 块管理工具(后续用于增量更新)def calculate_file_hash(self, file_content):"""计算文件内容的哈希值,用于检测内容变化"""sha256 = hashlib.sha256()sha256.update(file_content)return sha256.hexdigest()def get_latest_version(self, document_id):"""获取文档的最新版本信息"""query = """SELECT * FROM document_versions WHERE document_id = %s ORDER BY uploaded_at DESC LIMIT 1"""return self.db.fetch_one(query, (document_id,))def is_new_document(self, document_id):"""判断文档ID是否为新文档(不存在于系统中)"""query = "SELECT 1 FROM documents WHERE document_id = %s"result = self.db.fetch_one(query, (document_id,))return result is Nonedef upload_document(self, document_id, file_content, file_name, user_id):"""处理文档上传(支持新文档和版本更新):param document_id: 业务键(客户端生成,首次上传为新UUID,更新时复用旧ID):param file_content: 文件二进制内容:param file_name: 文件名:param user_id: 上传者ID:return: 处理结果"""# 1. 计算当前文件的哈希值file_hash = self.calculate_file_hash(file_content)file_size = len(file_content)# 2. 检查是否为新文档if self.is_new_document(document_id):# 2.1 处理新文档return self._handle_new_document(document_id=document_id,file_content=file_content,file_name=file_name,file_hash=file_hash,file_size=file_size,user_id=user_id)else:# 2.2 处理文档更新(版本升级)return self._handle_document_update(document_id=document_id,file_content=file_content,file_name=file_name,file_hash=file_hash,file_size=file_size,user_id=user_id)def _handle_new_document(self, document_id, file_content, file_name, file_hash, file_size, user_id):"""处理首次上传的新文档"""# 生成版本IDversion_id = str(uuid.uuid4())# 保存文件到对象存储(实际项目中会用S3/MinIO等)file_path = self._save_to_storage(document_id=document_id,version_id=version_id,file_content=file_content)# 1. 创建文档根记录self.db.execute("""INSERT INTO documents (document_id, title, owner_id, created_at, updated_at)VALUES (%s, %s, %s, %s, %s)""", (document_id,file_name,  # 初始标题使用文件名user_id,datetime.now(),datetime.now()))# 2. 创建首个版本记录self.db.execute("""INSERT INTO document_versions (version_id, document_id, file_name, file_hash, file_path, size_in_bytes, uploaded_by, uploaded_at)VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""", (version_id,document_id,file_name,file_hash,file_path,file_size,user_id,datetime.now()))# 3. 首次处理:全量分割并生成向量text_content = self._extract_text(file_content)  # 从文件提取文本chunks = self.chunk_manager.split(text_content)  # 分割为块self.chunk_manager.process_and_store_chunks(document_id=document_id,version_id=version_id,chunks=chunks)return {"status": "success","message": "新文档创建成功","document_id": document_id,"version_id": version_id,"is_new": True}def _handle_document_update(self, document_id, file_content, file_name, file_hash, file_size, user_id):"""处理文档更新(创建新版本)"""# 检查是否与最新版本内容完全相同latest_version = self.get_latest_version(document_id)if latest_version and latest_version['file_hash'] == file_hash:return {"status": "warning","message": "上传的文件与最新版本内容完全相同,未创建新版本","document_id": document_id,"version_id": latest_version['version_id'],"is_new": False}# 生成新版本IDversion_id = str(uuid.uuid4())# 保存新版本文件file_path = self._save_to_storage(document_id=document_id,version_id=version_id,file_content=file_content)# 1. 创建新版本记录self.db.execute("""INSERT INTO document_versions (version_id, document_id, file_name, file_hash, file_path, size_in_bytes, uploaded_by, uploaded_at)VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""", (version_id,document_id,file_name,file_hash,file_path,file_size,user_id,datetime.now()))# 2. 更新文档根记录的更新时间self.db.execute("""UPDATE documents SET updated_at = %s WHERE document_id = %s""", (datetime.now(), document_id))# 3. 增量更新:只处理变化的块old_text = self._extract_text(self._load_from_storage(latest_version['file_path']))new_text = self._extract_text(file_content)# 对比新旧文本,获取变化的块并更新向量库self.chunk_manager.incremental_update(document_id=document_id,old_version_id=latest_version['version_id'],new_version_id=version_id,old_text=old_text,new_text=new_text)return {"status": "success","message": "文档版本更新成功","document_id": document_id,"version_id": version_id,"is_new": False}# 以下为辅助方法(实际实现会更复杂)def _save_to_storage(self, document_id, version_id, file_content):"""保存文件到对象存储,返回存储路径"""# 示例路径格式:/documents/{document_id}/{version_id}.pdfpath = f"/documents/{document_id}/{version_id}.pdf"# 实际项目中会调用S3/MinIO的SDK保存文件return pathdef _load_from_storage(self, file_path):"""从存储加载文件内容"""# 实际项目中会调用存储服务的SDK读取文件return b"file_content"def _extract_text(self, file_content):"""从文件内容中提取文本(PDF/Word等)"""# 实际项目中会使用PyPDF2、textract等库提取文本return "extracted text content from file"# 块管理工具类(用于文本分割和增量更新)
class ChunkManager:def split(self, text, chunk_size=1000, overlap=100):"""将文本分割为语义块"""chunks = []for i in range(0, len(text), chunk_size - overlap):chunk = text[i:i + chunk_size]chunks.append(chunk)return chunksdef process_and_store_chunks(self, document_id, version_id, chunks):"""全量处理并存储块向量"""# 实际项目中会调用Embedding模型生成向量并存储到向量数据库for i, chunk in enumerate(chunks):chunk_id = str(uuid.uuid4())# 向量生成和存储逻辑...print(f"存储块: document_id={document_id}, version_id={version_id}, chunk_id={chunk_id}")def incremental_update(self, document_id, old_version_id, new_version_id, old_text, new_text):"""增量更新:只处理变化的块"""old_chunks = self.split(old_text)new_chunks = self.split(new_text)# 计算块哈希并对比差异(实际实现会更复杂)old_chunk_hashes = {hash(chunk): chunk for chunk in old_chunks}new_chunk_hashes = {hash(chunk): chunk for chunk in new_chunks}# 新增的块added_chunks = [v for k, v in new_chunk_hashes.items() if k not in old_chunk_hashes]# 删除的块removed_chunks = [v for k, v in old_chunk_hashes.items() if k not in new_chunk_hashes]# 处理新增块(生成向量并存储)for chunk in added_chunks:chunk_id = str(uuid.uuid4())# 向量生成和存储逻辑...print(f"新增块: document_id={document_id}, version_id={new_version_id}, chunk_id={chunk_id}")# 处理删除块(从向量数据库删除)for chunk in removed_chunks:# 查找块ID并删除向量...print(f"删除块: document_id={document_id}, version_id={old_version_id}")# 示例用法
if __name__ == "__main__":manager = DocumentVersionManager()# 1. 首次上传(客户端生成新的document_id)new_doc_id = str(uuid.uuid4())result = manager.upload_document(document_id=new_doc_id,file_content=b"这是第一版文档内容",file_name="报告.pdf",user_id="user_001")print(result)# 2. 再次上传(更新文档,使用相同的document_id)result = manager.upload_document(document_id=new_doc_id,  # 复用同一个业务键file_content=b"这是第二版文档内容(修改了一个字)",file_name="报告_v2.pdf",user_id="user_001")print(result)  

用户每次提问题,知识库内部都要进行全量查询,对吗?

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
代码示例:向量检索流程
下面是一个简化的 Python 代码示例,展示了使用 langchain 库进行向量检索的核心流程:

# 1. 准备环境和依赖
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import CharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings# 2. 加载并分割文档 (这是构建知识库的步骤)
loader = TextLoader("your_knowledge_base.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)# 3. 创建向量存储 (这是构建知识库的步骤)
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
db = FAISS.from_documents(docs, embeddings)# 4. 用户提问 (这是问答的步骤)
query = "如何判断文档是否是首次上传?"# 5. 进行向量检索 (而不是全量查询)
# 这一步会快速找出最相似的2个文档片段
retriever = db.as_retriever(search_kwargs={"k": 2})
relevant_docs = retriever.get_relevant_documents(query)# 6. 输出检索结果
print(f"为问题 `{query}` 找到了以下相关文档:\n")
for i, doc in enumerate(relevant_docs):print(f"--- 相关文档 {i+1} ---")print(f"内容: {doc.page_content[:200]}...") # 打印前200个字符print(f"来源: {doc.metadata}\n")

在这里插入图片描述

那知识库索引又是什么和向量检索的关系?

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

近似最近邻搜索 (ANN)和向量索引算法什么关系?

这是一个非常好的问题,它触及了向量检索技术的核心。简单来说:
近似最近邻搜索 (ANN) 是一个目标,而向量索引算法是实现这个目标的手段。
它们是目标与手段的关系。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

向量索引HNSW是怎么构建的?请详细解释一下

HNSW(Hierarchical Navigable Small Worlds,分层可导航小世界)是目前性能最优的向量索引算法之一,它通过构建多层 “导航图” 实现高效的近似最近邻搜索。其核心思想借鉴了现实世界的 “小世界网络” 特性 —— 大多数节点可以通过少量中间节点连接。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

HNSW 索引和具体的向量数据之间的关系?

在这里插入图片描述

根据地图上的节点信息,直接去仓库里把对应的几个包裹(原始向量或其元数据)取出来给你。这句话是拿到向量的ID,去仓库里去取指定的向量数据是吗?

在这里插入图片描述

HNSW的索引图是如何构建的?

在这里插入图片描述

http://www.dtcms.com/a/398027.html

相关文章:

  • MinMaxScaler Scikit-learn sparkml 稀疏向量
  • 基于WPF实现打印机连接与打印功能
  • 储能材料:弹性势能储能及材料技术突破
  • RapidJSON 自定义内存分配器详解与实战
  • 深度学习-PyTorch 模型
  • WPF依赖属性学习
  • 云原生-高级阶段-利用rsync备份全网服务器数据
  • wordpress建购物网站抚顺营销型网站建设
  • Pythoner 的Flask项目实践-添加Shapefile面数据并展示功能Mapboxgl底图
  • Flutter混合Android开发Release 打包失败GeneratedPluginRegistrant.java,Plugin不存在
  • docker 安装TDengine 并创建新用户
  • 网站推广实施方案珠海网站制作软件
  • 为世界添彩 - WebGL 中的颜色与着色器变量
  • 初识MYSQL —— mysql的安装
  • c回顾 01
  • 【LeetCode 每日一题】3484. 设计电子表格——(解法一)二维数组
  • python+django/flask+springboot实践性教学系统 实训任务发布 学生作业提交 教师评阅管理系统
  • 洞悉未来,智驭不确定性:蒙特卡洛模拟决策模型实践
  • 长宁哪里有做网站优化比较好利润在100万到300万之间税率2021
  • 沈阳网站设计外包广西建设网官网桂建云
  • vscode 插件怎么实现编辑器行号处添加图标标记
  • Git 从零到一:以 Gitee 为例的实战与可视化指南
  • React 标准 SPA 项目 入门学习记录
  • HAProxy 完整指南:简介、负载均衡原理与安装配置
  • 领码课堂 | React 核心组件与高级性能优化全实战指南
  • 涡轮丝杆升降机的丝杆材质有哪些?
  • 前端笔记:vue中 Map、Set之间的使用和区别
  • 中美关系最新消息视频重庆seo优化公司
  • 【Cesium 开发实战教程】第六篇:三维模型高级交互:点击查询、材质修改与动画控制
  • 英雄联盟视频网站源码做产品设计之前怎么查资料国外网站