RAG系统向量化存储技术深度解析:双索引架构与批量处理实践
代码在最后
前言
在构建企业级RAG(检索增强生成)系统中,向量化存储是将非结构化文本转换为可检索知识的关键环节。本文基于RAG Challenge竞赛获奖方案中的ingestion.py
模块,深入分析双索引架构(FAISS向量索引 + BM25关键词索引)、批量embedding生成、错误处理机制等核心技术,为企业知识库构建提供完整的向量化存储实践指南。
1. 向量化存储技术概述
1.1 为什么需要向量化存储?
在RAG系统中,向量化存储是连接文本分块和智能检索的桥梁:
语义检索:将文本转换为向量,支持语义相似度计算
快速检索:通过向量索引实现毫秒级检索
可扩展性:支持大规模文档的存储和检索
混合检索:结合向量检索和关键词检索的优势
1.2 双索引架构设计
现代RAG系统通常采用双索引架构:
向量索引(FAISS):基于embedding的语义检索
关键词索引(BM25):基于关键词的精确匹配
这种设计结合了两种检索方式的优势,提升整体检索效果。
2. 模块架构分析
2.1 核心依赖
import os import json import pickle from typing import List, Union from pathlib import Path from tqdm import tqdm from dotenv import load_dotenv from openai import OpenAI from rank_bm25 import BM25Okapi import faiss import numpy as np from tenacity import retry, wait_fixed, stop_after_attempt import dashscope from dashscope import TextEmbedding
技术栈特点:
FAISS:Facebook开源的向量相似度搜索库
BM25:经典的关键词检索算法
DashScope:阿里云的embedding服务
Tenacity:重试机制库
tqdm:进度条显示
2.2 双索引架构
# BM25Ingestor:BM25索引构建与保存工具 class BM25Ingestor:"""传统关键词检索索引构建器""" # VectorDBIngestor:向量库构建与保存工具 class VectorDBIngestor:"""现代语义向量检索索引构建器"""
架构优势:
互补性:向量检索和关键词检索相互补充
灵活性:可根据场景选择不同的检索策略
可扩展性:支持大规模文档处理
容错性:单一索引失败不影响整体系统
3. BM25Ingestor类深度解析
3.1 类设计理念
class BM25Ingestor:def __init__(self):pass
设计特点:
轻量级:无需复杂初始化
专注性:专门处理BM25索引构建
独立性:可独立使用,不依赖其他组件
3.2 BM25索引创建
def create_bm25_index(self, chunks: List[str]) -> BM25Okapi:"""从文本块列表创建BM25索引"""tokenized_chunks = [chunk.split() for chunk in chunks]return BM25Okapi(tokenized_chunks)
技术原理:
分词处理:将文本块分割为单词列表
BM25算法:计算词频和文档频率
索引构建:创建可快速检索的索引结构
BM25优势:
精确匹配:对关键词匹配效果优秀
计算效率:检索速度快
内存友好:索引占用内存相对较小
3.3 批量报告处理
def process_reports(self, all_reports_dir: Path, output_dir: Path):"""批量处理所有报告,生成并保存BM25索引。参数:all_reports_dir (Path): 存放JSON报告的目录output_dir (Path): 保存BM25索引的目录"""output_dir.mkdir(parents=True, exist_ok=True)all_report_paths = list(all_reports_dir.glob("*.json")) for report_path in tqdm(all_report_paths, desc="Processing reports for BM25"):# 加载报告with open(report_path, 'r', encoding='utf-8') as f:report_data = json.load(f)# 提取文本块并创建BM25索引text_chunks = [chunk['text'] for chunk in report_data['content']['chunks']]bm25_index = self.create_bm25_index(text_chunks)# 保存BM25索引,文件名用sha1_namesha1_name = report_data["metainfo"]["sha1_name"]output_file = output_dir / f"{sha1_name}.pkl"with open(output_file, 'wb') as f:pickle.dump(bm25_index, f)print(f"Processed {len(all_report_paths)} reports")
处理流程:
文件发现:扫描目录中的所有JSON报告文件
内容提取:从报告中提取文本块
索引构建:为每个报告创建BM25索引
索引保存:使用pickle保存索引到文件
进度跟踪:使用tqdm显示处理进度
设计亮点:
按报告分索引:每个报告独立的BM25索引
SHA1命名:使用文档的SHA1值作为文件名
进度可视化:实时显示处理进度
错误隔离:单个报告失败不影响其他报告
4. VectorDBIngestor类深度解析
4.1 初始化与配置
class VectorDBIngestor:def __init__(self):# 初始化DashScope API Keydashscope.api_key = os.getenv("DASHSCOPE_API_KEY")
配置特点:
环境变量:从环境变量读取API密钥
服务集成:集成阿里云DashScope服务
安全性:API密钥不硬编码在代码中
4.2 Embedding生成机制
@retry(wait=wait_fixed(20), stop=stop_after_attempt(2)) def _get_embeddings(self, text: Union[str, List[str]], model: str = "text-embedding-v1") -> List[float]:"""获取文本或文本块的嵌入向量,支持重试(使用阿里云DashScope,分批处理)"""if isinstance(text, str) and not text.strip():raise ValueError("Input text cannot be an empty string.")# 保证 input 为一维字符串列表或单个字符串if isinstance(text, list):text_chunks = textelse:text_chunks = [text] # 类型检查,确保每一项都是字符串if not all(isinstance(x, str) for x in text_chunks):raise ValueError("所有待嵌入文本必须为字符串类型!实际类型: {}".format([type(x) for x in text_chunks])) # 过滤空字符串text_chunks = [x for x in text_chunks if x.strip()]if not text_chunks:raise ValueError("所有待嵌入文本均为空字符串!")
输入验证机制:
空字符串检查:防止处理空文本
类型检查:确保所有输入都是字符串
数据清理:过滤空字符串
格式统一:统一处理单个字符串和字符串列表
4.3 批量处理与错误处理
embeddings = [] MAX_BATCH_SIZE = 25 LOG_FILE = 'embedding_error.log' for i in range(0, len(text_chunks), MAX_BATCH_SIZE):batch = text_chunks[i:i+MAX_BATCH_SIZE]resp = TextEmbedding.call(model=TextEmbedding.Models.text_embedding_v1,input=batch)
批量处理策略:
分批处理:每批处理25个文本块
API优化:减少API调用次数
内存控制:避免一次性处理过多文本
错误隔离:单批失败不影响其他批次
4.4 响应处理与错误处理
# 兼容单条和多条输入 if 'output' in resp and 'embeddings' in resp['output']:print('11111111')for emb in resp['output']['embeddings']:if emb['embedding'] is None or len(emb['embedding']) == 0:error_text = batch[emb.text_index] if hasattr(emb, 'text_index') else Nonewith open(LOG_FILE, 'a', encoding='utf-8') as f:f.write(f"DashScope返回的embedding为空,text_index={getattr(emb, 'text_index', None)},文本内容如下:\n{error_text}\n{'-'*60}\n")raise RuntimeError(f"DashScope返回的embedding为空,text_index={getattr(emb, 'text_index', None)},文本内容已写入 {LOG_FILE}")embeddings.append(emb['embedding']) elif 'output' in resp and 'embedding' in resp['output']:if resp['output']['embedding'] is None or len(resp['output']['embedding']) == 0:print('22222222')with open(LOG_FILE, 'a', encoding='utf-8') as f:f.write("DashScope返回的embedding为空,文本内容如下:\n{}\n{}\n".format(batch[0] if batch else None, '-'*60))raise RuntimeError("DashScope返回的embedding为空,文本内容已写入 {}".format(LOG_FILE))embeddings.append(resp.output.embedding) else:print('33333333')raise RuntimeError(f"DashScope embedding API返回格式异常: {resp}")
错误处理机制:
格式兼容:处理不同的API响应格式
空值检查:检查embedding是否为空
详细日志:记录错误信息和相关文本
异常抛出:及时抛出异常避免数据污染
4.5 FAISS向量索引构建
def _create_vector_db(self, embeddings: List[float]):"""用faiss构建向量库,采用内积(余弦距离)"""embeddings_array = np.array(embeddings, dtype=np.float32)dimension = len(embeddings[0])index = faiss.IndexFlatIP(dimension) # Cosine distanceindex.add(embeddings_array)return index
FAISS配置:
数据类型:使用float32提升性能
索引类型:IndexFlatIP支持内积相似度
余弦距离:适合文本相似度计算
内存索引:适合中小规模数据集
索引类型选择:
IndexFlatIP:精确搜索,适合小数据集
IndexIVFFlat:近似搜索,适合大数据集
IndexHNSW:图索引,平衡精度和速度
4.6 单报告处理
def _process_report(self, report: dict):"""针对单份报告,提取文本块并生成向量库"""text_chunks = [chunk['text'] for chunk in report['content']['chunks']]embeddings = self._get_embeddings(text_chunks)index = self._create_vector_db(embeddings)return index
处理流程:
文本提取:从报告中提取所有文本块
向量生成:调用embedding API生成向量
索引构建:使用FAISS构建向量索引
索引返回:返回构建好的索引对象
4.7 批量报告处理
def process_reports(self, all_reports_dir: Path, output_dir: Path):"""批量处理所有报告,生成并保存faiss向量库"""all_report_paths = list(all_reports_dir.glob("*.json"))output_dir.mkdir(parents=True, exist_ok=True)for report_path in tqdm(all_report_paths, desc="Processing reports"):with open(report_path, 'r', encoding='utf-8') as file:report_data = json.load(file)index = self._process_report(report_data)sha1_name = report_data["metainfo"]["sha1_name"]faiss_file_path = output_dir / f"{sha1_name}.faiss"faiss.write_index(index, str(faiss_file_path))print(f"Processed {len(all_report_paths)} reports")
批量处理特点:
文件发现:自动发现所有JSON报告文件
独立处理:每个报告独立处理,互不影响
索引保存:使用FAISS原生格式保存索引
进度跟踪:实时显示处理进度
错误隔离:单个报告失败不影响整体处理
5. 重试机制与容错设计
5.1 Tenacity重试机制
@retry(wait=wait_fixed(20), stop=stop_after_attempt(2)) def _get_embeddings(self, text: Union[str, List[str]], model: str = "text-embedding-v1") -> List[float]:
重试策略:
等待时间:固定等待20秒
重试次数:最多重试2次
适用场景:网络波动、API限流等临时性错误
5.2 错误日志记录
LOG_FILE = 'embedding_error.log' # 记录错误信息到日志文件 with open(LOG_FILE, 'a', encoding='utf-8') as f:f.write(f"DashScope返回的embedding为空,text_index={getattr(emb, 'text_index', None)},文本内容如下:\n{error_text}\n{'-'*60}\n")
日志记录特点:
详细信息:记录错误类型、索引、文本内容
追加模式:不覆盖历史日志
UTF-8编码:支持中文字符
结构化格式:便于后续分析
6. 性能优化策略
6.1 批量处理优化
MAX_BATCH_SIZE = 25 for i in range(0, len(text_chunks), MAX_BATCH_SIZE):batch = text_chunks[i:i+MAX_BATCH_SIZE]
优化策略:
批次大小:25个文本块为一批,平衡效率和内存
API调用:减少API调用次数
内存控制:避免一次性加载过多数据
并发控制:避免API限流
6.2 数据类型优化
embeddings_array = np.array(embeddings, dtype=np.float32)
优化效果:
内存节省:float32比float64节省一半内存
计算加速:现代GPU对float32优化更好
精度保持:float32精度足够embedding使用
6.3 索引类型选择
index = faiss.IndexFlatIP(dimension) # Cosine distance
选择理由:
精确搜索:保证检索精度
简单可靠:无需训练,直接可用
内存友好:适合中小规模数据
7. 企业应用实践
7.1 大规模数据处理
挑战:
海量文档的向量化处理
API调用成本和限流
内存和存储资源管理
解决方案:
class ScalableVectorDBIngestor(VectorDBIngestor):def __init__(self, max_batch_size=50, max_concurrent_requests=5):super().__init__()self.max_batch_size = max_batch_sizeself.max_concurrent_requests = max_concurrent_requestsdef process_large_dataset(self, all_reports_dir: Path, output_dir: Path):"""处理大规模数据集的优化版本"""# 实现分片处理、并发控制等优化策略pass
7.2 多模型支持
class MultiModelVectorDBIngestor(VectorDBIngestor):def __init__(self, embedding_provider="dashscope"):super().__init__()self.embedding_provider = embedding_providerdef _get_embeddings(self, text: Union[str, List[str]], model: str = None):"""支持多种embedding提供商"""if self.embedding_provider == "dashscope":return self._get_dashscope_embeddings(text, model)elif self.embedding_provider == "openai":return self._get_openai_embeddings(text, model)else:raise ValueError(f"不支持的embedding提供商: {self.embedding_provider}")
7.3 增量更新机制
class IncrementalVectorDBIngestor(VectorDBIngestor):def update_index(self, new_documents: List[dict], existing_index_path: Path):"""增量更新向量索引"""# 加载现有索引existing_index = faiss.read_index(str(existing_index_path))# 处理新文档new_embeddings = self._get_embeddings([doc['text'] for doc in new_documents])# 更新索引new_embeddings_array = np.array(new_embeddings, dtype=np.float32)existing_index.add(new_embeddings_array)return existing_index
8. 使用示例与配置
8.1 基本使用
# 创建BM25索引 bm25_ingestor = BM25Ingestor() bm25_ingestor.process_reports(all_reports_dir=Path('./chunked_reports'),output_dir=Path('./bm25_dbs') )# 创建向量索引 vector_ingestor = VectorDBIngestor() vector_ingestor.process_reports(all_reports_dir=Path('./chunked_reports'),output_dir=Path('./vector_dbs') )
8.2 环境配置
# 设置DashScope API密钥 export DASHSCOPE_API_KEY="your_api_key_here"# 或者创建.env文件 echo "DASHSCOPE_API_KEY=your_api_key_here" > .env
8.3 高级配置
# 自定义embedding模型 vector_ingestor = VectorDBIngestor() embeddings = vector_ingestor._get_embeddings(text_chunks, model="text-embedding-v2" # 使用v2模型 )# 自定义FAISS索引类型 def create_optimized_index(self, embeddings: List[float]):"""创建优化的FAISS索引"""embeddings_array = np.array(embeddings, dtype=np.float32)dimension = len(embeddings[0])# 使用IVF索引提升检索速度quantizer = faiss.IndexFlatIP(dimension)index = faiss.IndexIVFFlat(quantizer, dimension, 100) # 100个聚类中心# 训练索引index.train(embeddings_array)index.add(embeddings_array)return index
9. 技术优势与创新点
9.1 核心技术优势
双索引架构:结合向量检索和关键词检索的优势
批量处理:高效的批量embedding生成
错误处理:完善的错误处理和重试机制
可扩展性:支持大规模文档处理
多格式支持:支持多种索引格式
9.2 创新设计
混合检索:向量检索 + BM25关键词检索
按报告分索引:每个报告独立的索引文件
智能重试:基于Tenacity的重试机制
详细日志:完整的错误日志记录
进度可视化:实时处理进度显示
9.3 企业应用价值
知识库构建:为企业知识库提供高效的检索能力
检索优化:通过双索引提升检索精度和召回率
成本控制:通过批量处理优化API调用成本
可维护性:清晰的错误日志便于问题排查
10. 性能监控与优化
10.1 性能指标
import time from typing import Dict, Anyclass PerformanceMonitor:def __init__(self):self.metrics = {}def start_timer(self, operation: str):self.metrics[operation] = {'start_time': time.time()}def end_timer(self, operation: str):if operation in self.metrics:self.metrics[operation]['duration'] = time.time() - self.metrics[operation]['start_time']def get_metrics(self) -> Dict[str, Any]:return self.metrics
10.2 资源监控
import psutil import osdef monitor_resources():"""监控系统资源使用情况"""cpu_percent = psutil.cpu_percent()memory = psutil.virtual_memory()disk = psutil.disk_usage('/')return {'cpu_percent': cpu_percent,'memory_percent': memory.percent,'disk_percent': disk.percent}
11. 总结
ingestion.py
模块展示了现代RAG系统中向量化存储的最佳实践:
技术亮点
双索引架构:FAISS向量索引 + BM25关键词索引
批量处理优化:高效的批量embedding生成
完善错误处理:重试机制和详细日志记录
可扩展设计:支持大规模文档处理
多格式支持:灵活的索引格式选择
企业应用价值
检索能力提升:通过双索引架构提升检索效果
成本优化:通过批量处理降低API调用成本
可维护性:完善的错误处理和日志记录
可扩展性:支持企业级大规模数据处理
最佳实践建议
索引选择:根据数据规模选择合适的FAISS索引类型
批量优化:合理设置批量大小平衡效率和资源使用
错误处理:建立完善的错误处理和恢复机制
性能监控:监控处理性能和资源使用情况
通过深入理解这个模块的设计和实现,我们可以为企业级RAG系统构建高效的向量化存储能力,为后续的智能检索和生成奠定坚实的基础。
作者简介:专注于AI技术在企业应用中的实践,擅长RAG系统设计和向量化存储技术。
技术交流:欢迎在评论区分享您的向量化存储实践经验和遇到的问题,共同探讨向量检索技术的最佳实践。
import os
import json
import pickle
from typing import List, Union
from pathlib import Path
from tqdm import tqdmfrom dotenv import load_dotenv
from openai import OpenAI
from rank_bm25 import BM25Okapi
import faiss
import numpy as np
from tenacity import retry, wait_fixed, stop_after_attempt
import dashscope
from dashscope import TextEmbedding# BM25Ingestor:BM25索引构建与保存工具
class BM25Ingestor:def __init__(self):passdef create_bm25_index(self, chunks: List[str]) -> BM25Okapi:"""从文本块列表创建BM25索引"""tokenized_chunks = [chunk.split() for chunk in chunks]return BM25Okapi(tokenized_chunks)def process_reports(self, all_reports_dir: Path, output_dir: Path):"""批量处理所有报告,生成并保存BM25索引。参数:all_reports_dir (Path): 存放JSON报告的目录output_dir (Path): 保存BM25索引的目录"""output_dir.mkdir(parents=True, exist_ok=True)all_report_paths = list(all_reports_dir.glob("*.json"))for report_path in tqdm(all_report_paths, desc="Processing reports for BM25"):# 加载报告with open(report_path, 'r', encoding='utf-8') as f:report_data = json.load(f)# 提取文本块并创建BM25索引text_chunks = [chunk['text'] for chunk in report_data['content']['chunks']]bm25_index = self.create_bm25_index(text_chunks)# 保存BM25索引,文件名用sha1_namesha1_name = report_data["metainfo"]["sha1_name"]output_file = output_dir / f"{sha1_name}.pkl"with open(output_file, 'wb') as f:pickle.dump(bm25_index, f)print(f"Processed {len(all_report_paths)} reports")# VectorDBIngestor:向量库构建与保存工具
class VectorDBIngestor:def __init__(self):# 初始化DashScope API Keydashscope.api_key = os.getenv("DASHSCOPE_API_KEY")@retry(wait=wait_fixed(20), stop=stop_after_attempt(2))def _get_embeddings(self, text: Union[str, List[str]], model: str = "text-embedding-v1") -> List[float]:# 获取文本或文本块的嵌入向量,支持重试(使用阿里云DashScope,分批处理)if isinstance(text, str) and not text.strip():raise ValueError("Input text cannot be an empty string.")# 保证 input 为一维字符串列表或单个字符串if isinstance(text, list):text_chunks = textelse:text_chunks = [text]# 类型检查,确保每一项都是字符串if not all(isinstance(x, str) for x in text_chunks):raise ValueError("所有待嵌入文本必须为字符串类型!实际类型: {}".format([type(x) for x in text_chunks]))# 过滤空字符串text_chunks = [x for x in text_chunks if x.strip()]if not text_chunks:raise ValueError("所有待嵌入文本均为空字符串!")# print('start embedding ================================')embeddings = []MAX_BATCH_SIZE = 25LOG_FILE = 'embedding_error.log'for i in range(0, len(text_chunks), MAX_BATCH_SIZE):batch = text_chunks[i:i+MAX_BATCH_SIZE]resp = TextEmbedding.call(model=TextEmbedding.Models.text_embedding_v1,input=batch)# print('i=',i)# print('resp=',resp)# with open(LOG_FILE, 'a', encoding='utf-8') as f:# f.write('i='+str(i)+'\n')# f.write('resp='+str(resp)+'\n')# 兼容单条和多条输入if 'output' in resp and 'embeddings' in resp['output']:print('11111111')for emb in resp['output']['embeddings']:if emb['embedding'] is None or len(emb['embedding']) == 0:error_text = batch[emb.text_index] if hasattr(emb, 'text_index') else Nonewith open(LOG_FILE, 'a', encoding='utf-8') as f:f.write(f"DashScope返回的embedding为空,text_index={getattr(emb, 'text_index', None)},文本内容如下:\n{error_text}\n{'-'*60}\n")raise RuntimeError(f"DashScope返回的embedding为空,text_index={getattr(emb, 'text_index', None)},文本内容已写入 {LOG_FILE}")embeddings.append(emb['embedding'])elif 'output' in resp and 'embedding' in resp['output']:if resp['output']['embedding'] is None or len(resp['output']['embedding']) == 0:print('22222222')with open(LOG_FILE, 'a', encoding='utf-8') as f:f.write("DashScope返回的embedding为空,文本内容如下:\n{}\n{}\n".format(batch[0] if batch else None, '-'*60))raise RuntimeError("DashScope返回的embedding为空,文本内容已写入 {}".format(LOG_FILE))embeddings.append(resp.output.embedding)else:print('33333333')raise RuntimeError(f"DashScope embedding API返回格式异常: {resp}")return embeddingsdef _create_vector_db(self, embeddings: List[float]):# 用faiss构建向量库,采用内积(余弦距离)embeddings_array = np.array(embeddings, dtype=np.float32)dimension = len(embeddings[0])index = faiss.IndexFlatIP(dimension) # Cosine distanceindex.add(embeddings_array)return indexdef _process_report(self, report: dict):# 针对单份报告,提取文本块并生成向量库text_chunks = [chunk['text'] for chunk in report['content']['chunks']]embeddings = self._get_embeddings(text_chunks)index = self._create_vector_db(embeddings)return indexdef process_reports(self, all_reports_dir: Path, output_dir: Path):# 批量处理所有报告,生成并保存faiss向量库all_report_paths = list(all_reports_dir.glob("*.json"))output_dir.mkdir(parents=True, exist_ok=True)for report_path in tqdm(all_report_paths, desc="Processing reports"):with open(report_path, 'r', encoding='utf-8') as file:report_data = json.load(file)index = self._process_report(report_data)sha1_name = report_data["metainfo"]["sha1_name"]faiss_file_path = output_dir / f"{sha1_name}.faiss"faiss.write_index(index, str(faiss_file_path))print(f"Processed {len(all_report_paths)} reports")