当前位置: 首页 > news >正文

从零构建RAG知识库管理系统(三)

第三篇:《RAG核心模块深度解析》

引言

在前两篇文章中,我们介绍了RAG知识库管理系统的整体架构和后端服务实现。本文将深入探讨系统的核心模块——RAG(Retrieval-Augmented Generation)技术的实现细节。我们将从传统RAG与多模态RAG的区别开始,逐步分析文档处理流程、OCR技术集成、向量数据库Milvus集成以及嵌入模型的使用。

传统RAG与多模态RAG的区别与实现

RAG基类设计 (base_rag.py)

RAG类代码总结

1. 核心依赖

导入llama-index系列核心模块(向量索引、存储上下文、节点解析器等)、Milvus向量存储模块,以及自定义RagConfig配置类,同时引入abstractmethod实现抽象方法。

2. 类结构与核心功能

(1)初始化

  • 接收files(文件路径列表)作为参数,存储为实例属性,为后续数据加载提供来源。

(2)抽象方法

  • async def load_data(self):无具体实现,需子类继承后重写,用于定义数据加载逻辑(如读取特定格式文件)。

(3)本地索引操作(2个方法)

方法

功能

关键细节

async create_index_local

生成并持久化本地向量索引

1. 调用load_data加载数据,为空时设为空列表;
2. 用SentenceSplitterchunk_size=2048chunk_overlap=200)解析文档为节点;
3. 构建VectorStoreIndex,并将索引持久化到默认/指定目录

static async load_index_local

从本地持久化目录加载索引

1. 静态方法,无需实例化即可调用;
2. 通过StorageContext读取指定目录,加载索引

(4)远程索引操作(2个方法)

方法

功能

关键细节

async create_index_remote

生成并存储索引到远程Milvus

1. 调用load_data加载数据,为空时设为空列表;
2. 用默认SentenceSplitter解析文档为节点;
3. 基于RagConfig.Milvus_uri创建Milvus向量存储(指定集合名、维度512、是否覆盖等);
4. 结合存储上下文构建索引并关联远程Milvus

static async load_index_remote

从远程Milvus加载索引

1. 静态方法,无需实例化即可调用;
2. 基于RagConfig配置创建Milvus向量存储连接;
3. 通过VectorStoreIndex.from_vector_store直接从远程存储加载索引

3. 核心设计特点

  • 抽象与继承:通过load_data抽象方法,强制子类实现数据加载逻辑,保证扩展性;
  • 本地/远程双支持:分别提供本地文件持久化、远程Milvus存储两种索引管理方案;
  • 静态方法优化:本地/远程索引加载用静态方法,支持直接调用,无需创建RAG实例。
from abc import abstractmethodfrom llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage
from llama_index.core.indices.base import BaseIndex
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.storage.storage_context import DEFAULT_PERSIST_DIRfrom llama_index.vector_stores.milvus import MilvusVectorStorefrom .config import RagConfigclass RAG:def __init__(self,files:list[str]):self.files = files@abstractmethodasync  def load_data(self):"""抽象方法,子类继承重写:return:"""# 创建本地索引方法async def create_index_local(self, persist_dir=DEFAULT_PERSIST_DIR) -> BaseIndex:data = await self.load_data()  # 加载数据# 确保data不为Noneif data is None:data = []node_parser = SentenceSplitter(chunk_size=2048, chunk_overlap=200)  # 增加 chunk_sizenodeList = node_parser.get_nodes_from_documents(data)  # 获取节点index = VectorStoreIndex(nodeList)  # 创建索引index.storage_context.persist(persist_dir=persist_dir)  # 保存索引数据到索引目录中return index# 创建远程索引方法async  def create_index_remote(self, collection_name="default", overwrite=False)->BaseIndex:data = await self.load_data()     # 加载数据# 确保data不为Noneif data is None:data = []node_parser = SentenceSplitter()     # 创建节点解析器nodeList = node_parser.get_nodes_from_documents(data)    # 获取节点# 创建向量数据索引vector_store =  MilvusVectorStore(uri = RagConfig.Milvus_uri,   # Milvus服务地址collection_name = collection_name,   # 向量集合名称dim = 512,   # 向量维度overwrite= overwrite,  # 是否覆盖enable_dynamic_field=True  # 启用动态字段以支持metadata等额外字段)# 创建储存上下文,使用默认配置并指定向量存储storage_context = StorageContext.from_defaults(vector_store=vector_store)# 构建向量存储索引,基于给定的节点和存储上下⽂index = VectorStoreIndex(nodeList,storage_context=storage_context)# 返回构建好的向量存储索引return index# 从本地储存中加载索引@staticmethodasync def load_index_local(persist_dir=DEFAULT_PERSIST_DIR) -> BaseIndex:# 从储存中加载索引return load_index_from_storage(# 创建储存上下文,并指定索引目录StorageContext.from_defaults(persist_dir=persist_dir))# 从远程储存中加载索引@staticmethodasync def load_index_remote(collection_name="default")->BaseIndex:# 于从远程Milvus数据库加载索引数据源vector_store = MilvusVectorStore(uri=RagConfig.Milvus_uri,  # Milvus服务地址collection_name=collection_name,  # 向量集合名称dim=RagConfig.Milvus_dim,  # 向量维度overwrite=False,  # 是否覆盖enable_dynamic_field=True  # ✅ 新增:启用动态字段支持)# 用于从已有的向量存储中构建索引return VectorStoreIndex.from_vector_store(vector_store = vector_store)

传统RAG实现 (traditional_rag.py)

传统RAG模型(TraditionalRAG)代码简短总结
  1. 核心功能:继承自RAG类,实现传统RAG模型的数据加载逻辑,将输入文件解析为可用于后续处理的Document文档列表。
  2. 数据加载流程(load_data方法)
    • 初始化空的文档列表(docs),用于存储最终处理后的Document对象。
    • 遍历输入的文件列表(self.files),逐个处理文件:
      1. 调用ocr_file_to_text_llm(借助kimi)对文件进行OCR解析,提取文本内容。
      2. 生成含当前时间戳的临时txt文件,避免文件名冲突。
      3. 将OCR提取的文本写入临时文件。
      4. 通过SimpleDirectoryReader读取临时文件内容,加载数据。
      5. 将加载的数据转换为Document对象(含文本内容与文件路径元数据),添加到docs列表。
      6. 删除临时文件,避免残留。
    • 处理完所有文件后,返回最终的文档列表(docs)。
  1. 依赖模块:依赖asyncio(异步)、os(文件操作)、datetime(时间戳生成)、llama_index.core(文档读取与Document类),以及自定义的base_rag、utils、ocr模块中的工具函数。
# 传统RAG模型
import asyncio
import os
from datetime import datetimefrom llama_index.core import SimpleDirectoryReader, Documentfrom .base_rag import RAG
from .utils import ocr_file_to_text_llm, is_image
from .ocr import ocr_file_to_text, ocr_image_to_textfrom .utils import ocr_file_to_text_llmclass TraditionalRAG(RAG):# 遍历文件列表:检查是否有文件需要处理。async def load_data(self):'''传统RAG模型加载数据'''# 初始化文档列表docs = []# 开始:程序启动。for file in self.files:# 使用kimi实现文件的ORC解析contents = ocr_file_to_text_llm(file)# 生成临时文件名,避免文件名冲突temp_file = datetime.now().strftime("%Y%m%d%H%M%S") + ".txt"# 将OCR识别的内容写入临时文件with open(temp_file, "w", encoding="utf-8") as f:f.write(contents)# 使⽤SimpleDirectoryReader读取临时⽂件内容并加载数据data = SimpleDirectoryReader(input_files=[temp_file]).load_data()# 将读取的数据转换为Document对象,并添加到文档列表中doc = Document(text="\n\n".join([d.text for d in data[0:]]),metadata={"path": file})docs.append(doc)# 删除 临时文件os.remove(temp_file)# 返回⽂档列表return docs

多模态RAG实现 (multimodal_rag.py)

多模态RAG能够处理包含图像、表格和文本的复杂文档,例如PDF文件。它不仅能提取文本内容,还能识别和处理文档中的图像和表格。

代码简短总结

该代码实现了一个MultiModalRAG类(继承自RAG类),核心功能是加载并处理多种格式文件(图片、PDF、PPT/PPTX、文本),提取其中的文本、表格、图像信息并转换为Document对象,用于后续的检索增强生成(RAG)任务。

1. 核心依赖与工具函数
  • 依赖库os(系统交互)、asyncio/concurrent.futures(异步任务)、fitz(PyMuPDF)(PDF处理)、llama_index.coreDocument类、异步任务执行)等。
  • 工具函数(来自utils):图像描述(describe_image)、文本块处理(process_text_blocks)、表格处理(process_table)、格式转换(convert_ppt_to_pdf等)、文件保存(save_uploaded_file)。
2. 关键静态方法

(1)parse_all_tables:提取PDF中的表格

  • 功能:从PDF指定页面查找表格,过滤无外部标题的表格;
  • 流程:
    1. 将表格转为pandas DataFrame,保存为Excel文件;
    2. 提取表格边界框、周围文本,保存表格图像;
    3. 生成表格描述,构建包含表格标题、内容、列名、元数据(来源、路径、类型等)的Document对象;
  • 输出:表格Document列表、表格边界框列表。

(2)parse_all_images:提取PDF中的图像

  • 功能:从PDF指定页面提取有效图像(过滤过小图像);
  • 流程:
    1. 获取图像信息(XREF编号、边界框),提取图像二进制数据并保存;
    2. 提取图像周围文本,调用describe_image生成图像描述;
    3. 构建包含图像标题、描述、元数据的Document对象;
  • 输出:图像Document列表。
3. 核心异步方法 load_data:多格式文件处理
  • 功能:遍历输入文件列表,根据文件格式(图片、PDF、PPT/PPTX、文本)分别处理,生成Document列表;
  • 各格式处理逻辑:
    • 图片(.png/.jpg/.jpeg):异步调用describe_image生成图像描述,构建Document
    • PDF(.pdf):同步调用process_pdf_file(未展示代码,推测整合表格/图像/文本提取),获取Document并添加;
    • PPT/PPTX(.ppt/.pptx):异步调用process_ppt_file(未展示代码,推测先转PDF或直接提取文本/备注/图像),获取Document并添加;
    • 文本文件:读取文本内容,直接构建Document
  • 异步任务管理:用asyncio.gather执行多任务,捕获异常并记录日志,最终返回所有有效Document
4. 核心目标

将多模态数据(文本、表格、图像)统一转换为标准化的Document对象,为后续RAG系统的向量存储、检索匹配提供结构化数据支持。

# 导入os模块,用于与操作系统进行交互,执行如文件操作、获取环境变量等任务
import os
from typing import List
import asyncio
import concurrent.futures
from functools import partial
import fitz # PyMuPDF库,用于处理PDF文件
# 导入Document类,用于表示和处理文本文档
from llama_index.core import Document
# 导入run_jobs函数,用于异步执行任务
from llama_index.core.async_utils import run_jobs
from .base_rag import RAG
import loggingfrom .utils import (describe_image, # 描述图像内容process_text_blocks, # 处理文本块extract_text_around_item, # 提取项目周围的文本process_table, # 处理表格convert_ppt_to_pdf, # 将PPT转换为PDFconvert_pdf_to_images, # 将PDF转换为图像extract_text_and_notes_from_ppt, # 从PPT中提取文本和备注save_uploaded_file # 保存上传的文件
)import logging
logger = logging.getLogger(__name__)class MultiModalRAG(RAG):@staticmethoddef parse_all_tables(filename, page, pagenum, text_blocks, ongoing_tables):"""从PDF页面中提取表格并处理成文档。"""# 初始化表格文档和边界框列表table_docs = []table_bboxes = []try:# 在页面上查找表格tables = page.find_tables(horizontal_strategy="lines_strict",vertical_strategy="lines_strict")for tab in tables:# 检查表格是否没有外部标题if not tab.header.external:# 将表格转换为pandas DataFramepandas_df = tab.to_pandas()# 创建存储表格引用的目录tablerefdir = os.path.join(os.getcwd(), "vectorstore/table_references")os.makedirs(tablerefdir, exist_ok=True)# 保存表格为Excel文件df_xlsx_path = os.path.join(tablerefdir, f"table{len(table_docs) + 1}-page{pagenum}.xlsx")pandas_df.to_excel(df_xlsx_path)# 获取表格的边界框bbox = fitz.Rect(tab.bbox)table_bboxes.append(bbox)# 提取表格周围的文本before_text, after_text = extract_text_around_item(text_blocks, bbox, page.rect.height)# 获取表格的图像并保存table_img = page.get_pixmap(clip=bbox)table_img_path = os.path.join(tablerefdir, f"table{len(table_docs) + 1}-page{pagenum}.jpg")table_img.save(table_img_path)# 获取表格内容及描述content, description = process_table(table_img_path)# 构建表格的标题caption = before_text.replace("\n", " ") + " ".join(tab.header.names) + after_text.replace("\n", " ")# 构建表格的元数据table_metadata = {"source": f"{filename[:-4]}-page{pagenum}-table{len(table_docs) + 1}","dataframe": df_xlsx_path,"image": table_img_path,"caption": caption,"type": "table","page_num": pagenum}# 获取所有列名all_cols = ", ".join(list(pandas_df.columns.values))# 构建文档对象doc = Document(text=f"这是一个表格,标题是: {caption}\n表格的内容是: {content}\n表格的列名是: {all_cols}\n表格的解释是:{description}",metadata=table_metadata)table_docs.append(doc)except Exception as e:# 处理表格提取过程中出现的异常print(f"Error during table extraction: {e}")# 返回表格文档、边界框和正在进行的表格return table_docs, table_bboxes, ongoing_tables@staticmethoddef parse_all_images(filename, page, pagenum, text_blocks):"""从PDF页面中提取所有图像,并生成包含图像及其元数据的文档列表。"""image_docs = [] # 初始化存储图像文档的列表image_info_list = page.get_image_info(xrefs=True) # 获取页面中所有 图像的信息page_rect = page.rect # 获取页面的矩形区域# 遍历页面中的所有图像信息for image_info in image_info_list:xref = image_info['xref'] # 获取图像的XREF编号if xref == 0:continue # 跳过无效的XREF编号img_bbox = fitz.Rect(image_info['bbox']) # 获取图像的边界框# 过滤掉尺寸过小的图像if img_bbox.width < page_rect.width / 20 or img_bbox.height < page_rect.height / 20:continueextracted_image = page.parent.extract_image(xref) # 提取图像数据image_data = extracted_image["image"] # 获取图像的二进制数据imgrefpath = os.path.join(os.getcwd(), "vectorstore/image_references") # 图像保存路径os.makedirs(imgrefpath, exist_ok=True) # 创建保存路径目录image_path = os.path.join(imgrefpath, f"image{xref}-page{pagenum}.png") # 图像文件名# 图片上传到minio 文件服务器with open(image_path, "wb") as img_file:img_file.write(image_data) # 将图像数据写入文件before_text, after_text = extract_text_around_item(text_blocks, img_bbox, page.rect.height) # 获取图像周围的文本# 1、可以借助多模态模型进行图像描述,2、借助orc识别描述,3、尝试多维度描 述该图片image_description = describe_image(image_path)caption = before_text.replace("\n", " ")image_metadata = {"source": f"{filename[:-4]}-page{pagenum}-image{xref}", # 图像来源"image": image_path, # 图像路径"caption": caption, # 图像标题"type": "image", # 图像类型"page_num": pagenum # 图像所在页码}image_docs.append(Document(text="这是一张图像,标题是: " + caption + f"\n图像的描述是:{before_text}\n" + image_description + f"\n{after_text}",metadata=image_metadata)) # 添加图像文档到列表return image_docs # 返回包含图像及其元数据的文档列表async def load_data(self) -> list[Document]:"""Load and process multiple file types."""documents = []tasks = []for file_path in self.files:file_name = os.path.basename(file_path)file_extension = os.path.splitext(file_name.lower())[1]if file_extension in ('.png', '.jpg', '.jpeg'):async def process_image(image_path):image_text = await asyncio.get_event_loop().run_in_executor(None,lambda: describe_image(image_path))return Document(text=image_text,metadata={"source": file_name, "type": "image", "image": image_path})tasks.append(process_image(file_path))elif file_extension == '.pdf':try:loop = asyncio.get_event_loop()pdf_documents = await loop.run_in_executor(None, MultiModalRAG.process_pdf_file, file_path)documents.extend(pdf_documents)except Exception as e:logger.error(f"Error processing PDF {file_name}: {e}")elif file_extension in ('.ppt', '.pptx'):try:ppt_documents = await MultiModalRAG.process_ppt_file(file_path)documents.extend(ppt_documents)except Exception as e:logger.error(f"Error processing PPT {file_name}: {e}")else:async def process_text(text_path):with open(text_path, "rb") as file:text = file.read().decode("utf-8")return Document(text=text, metadata={"source": file_name, "type": "text"})tasks.append(process_text(file_path))if tasks:results = await asyncio.gather(*tasks, return_exceptions=True)for result in results:if isinstance(result, Document):documents.append(result)elif isinstance(result, list):documents.extend(result)elif isinstance(result, Exception):logger.error(f"处理文件时发生错误: {result}")return documents

文档处理流程(PDF、PPT、图片等)

PDF文档处理

PDF文档处理是多模态RAG的核心功能之一。系统使用PyMuPDF库来处理PDF文件,能够提取文本、图像和表格。

process_pdf_file 方法简短总结

该方法为静态方法,核心功能是处理PDF文件并提取关键内容,具体要点如下:

  1. 初始化操作:创建列表存储提取的Document对象,创建字典跟踪跨页的持续表格;
  2. 文件打开:尝试以PDF格式打开目标文件,若失败则打印错误信息并返回空列表;
  3. 基础信息获取:提取文件名,获取PDF总页数并打印处理起始提示;
  4. 逐页处理:遍历每一页PDF,打印当前处理页码,具体操作含:
    • 优化文本块:按“属于文本层、不在页面边缘、尺寸达标”条件过滤提取文本块;
    • 文本块组织:调用process_text_blocks对过滤后的文本块分组;
    • 表格提取:调用MultiModalRAG.parse_all_tables解析表格,更新持续表格,将表格文档加入存储列表;
    • 图像提取:调用MultiModalRAG.parse_all_images解析图像,将图像文档加入存储列表;
    • 文本块处理:遍历分组后的文本块,排除与表格位置重叠的标题块,将符合条件的文本块封装为Document对象并加入存储列表;
  1. 收尾操作:关闭PDF文件,打印处理完成提示(含生成的文档块数量),返回存储所有提取内容的列表。
@staticmethod
def process_pdf_file(pdf_file):"""处理 PDF 文件并提取文本、表格和图像。"""# 初始化一个列表来存储所有提取的 Document 对象all_pdf_documents = []# 初始化一个字典来跟踪跨页的持续表格ongoing_tables = {}# 尝试打开 PDF 文件try:f = fitz.open(filename=pdf_file, filetype="pdf")except Exception as e:print(f"pdf文件打开发生错误: {e}")return []file_name = os.path.basename(pdf_file)# 获取总页数,用于进度显示total_pages = len(f)print(f"开始处理PDF文件: {file_name},共 {total_pages} 页")# 遍历 PDF 的每一页for i in range(total_pages):print(f"正在处理第 {i + 1}/{total_pages} 页")page = f[i]# 优化文本块提取,增加过滤条件text_blocks = [block for block in page.get_text("blocks", sort=True)if block[-1] == 0 and not (block[1] < page.rect.height * 0.1 orblock[3] > page.rect.height * 0.9 orblock[3] - block[1] < 10 or  # 过滤过小的文本块block[2] - block[0] < 10)]  # 过滤过窄的文本块# 组织文本块以更好地分类grouped_text_blocks = process_text_blocks(text_blocks)# 从页面中解析表格,必要时更新持续表格table_docs, table_bboxes, ongoing_tables = MultiModalRAG.parse_all_tables(file_name, page, i, text_blocks, ongoing_tables)all_pdf_documents.extend(table_docs)# 从页面中解析图像image_docs = MultiModalRAG.parse_all_images(file_name, page, i, text_blocks)all_pdf_documents.extend(image_docs)# 遍历组织后的文本块for text_block_ctr, (heading_block, content) in enumerate(grouped_text_blocks, 1):heading_bbox = fitz.Rect(heading_block[:4])# 检查标题框是否与任何表格框相交if not any(heading_bbox.intersects(table_bbox) for table_bbox in table_bboxes):bbox = {"x1": heading_block[0], "y1": heading_block[1],"x2": heading_block[2], "y2": heading_block[3]}# 创建一个 Document 对象来存储文本块text_doc = Document(text=f"{heading_block[4]}\n{content}",metadata={**bbox,"type": "text","page_num": i,"source": f"{file_name[:-4]}-page{i}-block{text_block_ctr}"},id_=f"{file_name[:-4]}-page{i}-block{text_block_ctr}")all_pdf_documents.append(text_doc)# 关闭 PDF 文件f.close()print(f"PDF文件 {file_name} 处理完成,共生成 {len(all_pdf_documents)} 个文档块")return all_pdf_documents

PPT文档处理

PPT文档处理使用python-pptx库来提取文本和备注,并使用PyMuPDF将PPT转换为PDF进行图像处理。

process_ppt_file 函数总结
核心功能
  • 异步静态方法,处理.ppt.pptx格式文件,提取内容生成Document对象列表(每个对象对应一张幻灯片)
  • 文本提取优先级:优先提取原生文本(含备注、表格、SmartArt、占位符等),仅必要时对图像做OCR补充文本;图像描述为可选降级功能
关键处理步骤
  1. 文件格式兼容
    • .ppt文件,通过pywin32(需提前安装)转换为.pptx;无pywin32则报错返回空列表
    • 直接打开.pptx文件,打开失败则报错返回空列表
  1. 文本提取与缓存
    • 提取单张幻灯片所有原生文本(备注+普通文本+表格内容+分组/SmartArt文本+占位符文本)
    • 用文本哈希建立缓存(text_cache.json),避免重复处理
  1. 图像处理与OCR
    • 过滤过小图像(小于MIN_IMAGE_SIZE像素),保存有效图像到指定路径(vectorstore/ppt_references
    • 图像预处理(放大2倍、增强对比度)提升OCR准确率
    • OCR优先级:优先调用ocr_file_to_text_llm(LLM OCR),失败则 fallback 到pytesseract(支持中英)
    • 用图像哈希建立OCR缓存(ocr_cache.json),避免重复OCR
  1. 并行处理与结果组装
    • 异步并行处理所有幻灯片(asyncio.gather
    • 合并“原生文本+OCR文本”,生成含元数据(来源文件、图像路径、页码等)的Document对象
    • 过滤异常结果,收集有效Document返回
依赖与配置
  • 必需依赖:python-pptx(处理.pptx)、Pillow(图像处理)、pytesseract(OCR备用)、llama_index.coreDocument类)
  • 可选依赖:pywin32(处理.ppt转换)
  • 配置项:最小图像尺寸(MIN_IMAGE_SIZE)、最大摘要长度(MAX_CAPTION_LENGTH)、缓存/图像存储路径
返回值
  • 成功:list,元素为Document对象(含幻灯片文本内容及元数据)
  • 失败(如格式不支持、文件打开错误):空list(伴随日志报错)
@staticmethod
async def process_ppt_file(ppt_file):"""处理 PowerPoint 文件,支持 .ppt 和 .pptx 格式。优先提取所有原生文本(包括表格、SmartArt、备注),仅在必要时对图像进行OCR提取文本。图像描述降级为可选。参数:- ppt_file (str): PowerPoint 文件的路径。返回:- list: 包含处理后的数据列表,每个元素是一个 Document 对象。"""from pptx import Presentationfrom PIL import Image, ImageEnhanceimport ioimport osimport asyncioimport loggingimport jsonimport hashlibfrom llama_index.core import Documentfrom .utils import describe_image, ocr_file_to_text_llm  # 假设ocr_file_to_text_llm是您的LLM OCR函数try:import win32com.clientimport pythoncomexcept ImportError:win32com = Noneimport pytesseract  # 确保已安装# pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'  # 根据您的环境设置logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)processed_data = []file_name = os.path.basename(ppt_file)imgrefpath = os.path.join(os.getcwd(), "vectorstore/ppt_references")os.makedirs(imgrefpath, exist_ok=True)MAX_CAPTION_LENGTH = 500MIN_IMAGE_SIZE = 10  # 最小图像尺寸(像素)# 缓存文件text_cache_file = os.path.join(imgrefpath, "text_cache.json")  # 新增文本缓存ocr_cache_file = os.path.join(imgrefpath, "ocr_cache.json")text_cache = {}ocr_cache = {}if os.path.exists(text_cache_file):with open(text_cache_file, "r") as f:text_cache = json.load(f)if os.path.exists(ocr_cache_file):with open(ocr_cache_file, "r") as f:ocr_cache = json.load(f)file_ext = os.path.splitext(file_name.lower())[1]# 处理 .ppt 文件:转换为 .pptx 或直接提取文本if file_ext == '.ppt':if not win32com:logger.error(f"未安装 pywin32,无法处理 .ppt 文件 {file_name}。请安装或转换为 .pptx")return []logger.info(f"检测到 .ppt 文件 {file_name},使用 win32com 处理并转换为 .pptx")try:pythoncom.CoInitialize()powerpoint = win32com.client.Dispatch("PowerPoint.Application")pres = powerpoint.Presentations.Open(ppt_file, WithWindow=False)pptx_path = ppt_file + 'x'  # 转换为 .pptxpres.SaveAs(pptx_path, 24)  # 24 = ppSaveAsOpenXMLPresentationpres.Close()powerpoint.Quit()pythoncom.CoUninitialize()ppt_file = pptx_path  # 后续使用 .pptx 处理file_ext = '.pptx'except Exception as e:logger.error(f"转换 .ppt 文件 {file_name} 失败: {e}")return []# 打开 .pptx 文件try:prs = Presentation(ppt_file)except Exception as e:logger.error(f"打开 PPT 文件 {file_name} 失败: {e}")return []# 增强文本提取函数:覆盖所有可能文本源def extract_all_text_from_slide(slide):text = ""# 提取备注if slide.has_notes_slide:notes = slide.notes_slide.notes_text_frame.text.strip()if notes:text += f"\n\nSpeaker Notes: {notes}"# 遍历所有形状for shape in slide.shapes:# 普通文本if hasattr(shape, "text") and shape.text.strip():text += shape.text.strip() + "\n"# 表格if shape.has_table:for row in shape.table.rows:for cell in row.cells:text += cell.text.strip() + " | "text += "\n"# 分组/SmartArtif shape.shape_type == 6:  # Groupfor sub_shape in shape.shapes:if hasattr(sub_shape, "text") and sub_shape.text.strip():text += sub_shape.text.strip() + "\n"# 占位符/其他if shape.is_placeholder:text += shape.text.strip() + "\n"return text.strip()# 图像预处理(提高OCR准确率)def preprocess_image(image_path):try:img = Image.open(image_path)img = img.resize((img.width * 2, img.height * 2), Image.Resampling.LANCZOS)enhancer = ImageEnhance.Contrast(img)img = enhancer.enhance(2.0)  # 增强对比度temp_path = image_path.replace(".png", "_processed.png")img.save(temp_path, format="PNG")return temp_pathexcept Exception as e:logger.error(f"预处理图像 {image_path} 失败: {e}")return image_path# 异步OCR:优先LLM OCR,fallback到Tesseractasync def async_ocr_image(image_path):image_hash = hashlib.md5(open(image_path, "rb").read()).hexdigest()if image_hash in ocr_cache:return ocr_cache[image_hash]try:processed_image = preprocess_image(image_path)ocr_text = await asyncio.get_event_loop().run_in_executor(None,lambda: ocr_file_to_text_llm(processed_image))if not ocr_text or "没有解析出内容" in ocr_text:  # 如果LLM失败,fallbackraise ValueError("LLM OCR failed")except Exception:# Fallback to Tesseracttry:ocr_text = pytesseract.image_to_string(Image.open(image_path), lang='chi_sim+eng')  # 支持中英except Exception as e:logger.error(f"OCR 提取图像 {image_path} 的文本失败: {e}")ocr_text = ""if processed_image != image_path and os.path.exists(processed_image):os.remove(processed_image)ocr_cache[image_hash] = ocr_textwith open(ocr_cache_file, "w") as f:json.dump(ocr_cache, f)return ocr_text# 异步处理单张幻灯片async def process_slide(page_num, slide):logger.info(f"处理幻灯片 {page_num} / {len(prs.slides)}")# 优先提取所有原生文本(核心)slide_text = extract_all_text_from_slide(slide)file_hash = hashlib.md5(slide_text.encode()).hexdigest()  # 文本哈希用于缓存if file_hash in text_cache:slide_text = text_cache[file_hash]  # 使用缓存文本# 保存文本缓存text_cache[file_hash] = slide_textwith open(text_cache_file, "w") as f:json.dump(text_cache, f)# 提取图像(可选OCR)image_paths = []ocr_texts = []for shape_num, shape in enumerate(slide.shapes, 1):if hasattr(shape, "image") and shape.image:img_data = shape.image.blobimg = Image.open(io.BytesIO(img_data))if img.width < MIN_IMAGE_SIZE or img.height < MIN_IMAGE_SIZE:continueimage_path = os.path.join(imgrefpath, f"{file_name[:-5]}_slide_{page_num}_img_{shape_num}.png")img.save(image_path, format="PNG")image_paths.append(image_path)# 只对图像进行OCR(提取文本),不强制描述图像ocr_text = await async_ocr_image(image_path)ocr_texts.append(ocr_text)# 合并文本:原生文本 + OCR文本(如果有)combined_text = f"{slide_text}\n{' '.join(ocr_texts)}".strip()# 可选:如果需要图像描述,添加(但根据您的要求,降级)# image_descriptions = [await async_describe_image(img) for img in image_paths]  # 注释掉以减少依赖# 构建 caption 和 Documentcaption = (combined_text[:MAX_CAPTION_LENGTH] + "...") if len(combined_text) > MAX_CAPTION_LENGTH else combined_textimage_metadata = {"source": file_name,"image": image_paths[0] if image_paths else "","images": image_paths,"caption": caption,"type": "ppt_slide","page_num": page_num}doc_text = f"PPT 幻灯片 {page_num}: {combined_text}"  # 重点文本return Document(text=doc_text, metadata=image_metadata)# 并行处理所有幻灯片tasks = [process_slide(page_num, slide) for page_num, slide in enumerate(prs.slides, start=1)]results = await asyncio.gather(*tasks, return_exceptions=True)for result in results:if isinstance(result, Document):processed_data.append(result)elif isinstance(result, Exception):logger.error(f"处理幻灯片时发生错误: {result}")logger.info(f"完成处理 PPT 文件 {file_name},生成 {len(processed_data)} 个文档块")return processed_data

OCR技术集成与文本提取

OCR(Optical Character Recognition)技术用于从图像中提取文本信息。系统集成了多种OCR技术,包括基于Kimi API的OCR和Tesseract OCR。

OCR实现 (ocr.py)

代码功能简短总结

该代码是一套OCR(光学字符识别)处理工具,核心功能为将文件/图像通过调用外部OCR服务转换为文本,要点如下:

  1. 核心依赖:依赖base64(图像编码)、requests(HTTP请求)、zipfile(文件解压)等库,及项目内部RagConfig(配置)、utils(工具函数)模块。
  2. 关键配置:从RagConfig获取OCR服务基础URL、文件下载目录,请求头固定为JSON格式。
  3. 核心函数及功能
    • _upload_file:上传文件到OCR服务器,含3次重试+指数退避机制,支持原始文件名/临时文件名两种上传方式,返回文件ID。
    • ocr_file_to_text:处理文件OCR全流程(上传→OCR处理→生成结果文件→下载→清理临时文件→解压重命名),含进度打印和错误捕获,返回最终处理后文件路径。
    • ocr_image_to_text:处理图像OCR,将图像转Base64编码后调用OCR API,直接返回识别出的文本数据。
  1. 核心特性:含超时重试、错误处理、进度可视化、临时文件清理等机制,确保OCR流程稳定可靠。
# 导入必要的库
import base64
import os
import json
import time
import zipfile# 导入第三方库requests用于发送HTTP请求
import requests
# 从项目的配置模块中导入RagConfig类,用于获取OCR相关的配置
from .config import RagConfig
# 从 utils 模块中导入 get_b64_image_from_path 函数
from .utils import get_b64_image_from_path# 初始化基础URL和下载目录,以及请求头
base_url = RagConfig.ocr_base_url  # OCR服务的基础URL
download_dir = RagConfig.ocr_download_dir  # OCR下载文件的保存目录
headers = {"Content-Type": "application/json"}  # 请求头,指定发送的数据类型为JSONdef _upload_file(file_path):"""上传文件到服务器,增加超时和重试机制。"""# 构建文件上传的URLurl = "{}/api/doc/upload".format(base_url)# 配置上传选项options_json = json.dumps({"doc.extractionMode": "mixed",})# 增加重试机制max_retries = 3for attempt in range(max_retries):try:# 尝试使用文件的原始名称进行上传,增加超时时间with open(file_path, "rb") as file:response = requests.post(url,files={"file": file},data={"json": options_json},timeout=(10, 30)  # 连接超时10秒,读取超时30秒)response.raise_for_status()res_data = json.loads(response.text)# 如果上传失败且返回代码为101,则使用临时文件名重新上传if res_data["code"] == 101:file_name = os.path.basename(file_path)file_prefix, file_suffix = os.path.splitext(file_name)temp_name = "temp" + file_suffix# 使用临时文件名重新上传文件with open(file_path, "rb") as file:response = requests.post(url,files={"file": (temp_name, file)},data={"json": options_json},timeout=(10, 30))response.raise_for_status()res_data = json.loads(response.text)# 从响应数据中提取文件IDfile_id = res_data["data"]return file_idexcept requests.exceptions.Timeout:if attempt < max_retries - 1:print(f"上传超时,正在进行第 {attempt + 1} 次重试...")time.sleep(2 ** attempt)  # 指数退避else:raise Exception("文件上传超时,已达到最大重试次数")except Exception as e:if attempt < max_retries - 1:print(f"上传失败: {e},正在进行第 {attempt + 1} 次重试...")time.sleep(2 ** attempt)else:raise edef ocr_file_to_text(file_path):"""将给定的文件通过OCR转换为文本,增加进度显示和错误处理。"""try:print(f"开始处理文件: {file_path}")# 上传文件并获取文件IDprint("1. 正在上传文件...")file_id = _upload_file(file_path)print(f"文件上传成功,ID: {file_id}")# 处理上传的文件print("2. 正在进行OCR处理...")_process_file(file_id)# 生成目标文件并获取文件名和URLprint("3. 正在生成结果文件...")name, url = _generate_target_file(file_id)# 下载目标文件print("4. 正在下载结果...")_download_file(url, name)# 清理上传的文件和生成的临时文件print("5. 正在清理临时文件...")_clean_up(file_id)# 获取文件名和扩展名file_name, file_extension = os.path.splitext(file_path)# 获取不带扩展名的文件名file_name_without_extension = os.path.basename(file_name)# 重组文件名f_name = file_name_without_extension + file_extension# 解压下载的文件print("6. 正在解压文件...")with zipfile.ZipFile(f"{download_dir}/{name}", 'r') as zip_ref:# 提取特定命名规则的文件temp_name = f'[OCR]_{file_name_without_extension}.layered{file_extension}'zip_ref.extract(temp_name, download_dir)# 重命名提取的文件os.rename(os.path.join(download_dir, temp_name),os.path.join(download_dir, f_name))print(f"OCR处理完成: {os.path.join(download_dir, f_name)}")# 返回重命名后的文件路径return os.path.join(download_dir, f_name)except Exception as e:print(f"OCR处理过程中发生错误: {e}")raise edef ocr_image_to_text(file_path):"""使用OCR技术将图像文件转换为文本。"""import requestsimport json# 从指定路径获取图像并转换为Base64编码的字符串data_base64 = get_b64_image_from_path(file_path)# 构建API的URLurl = f"{base_url}/api/ocr"# 构建要发送的数据,包括图像的Base64编码和指定的选项data = {"base64": data_base64,# 可选参数示例"options": {"data.format": "text",}}# 将数据转换为JSON格式data_str = json.dumps(data)# 发送POST请求到OCR APIresponse = requests.post(url, data=data_str, headers=headers)# 确保请求成功,否则抛出异常response.raise_for_status()# 解析响应数据为字典res_dict = json.loads(response.text)# 返回转换后的文本数据return res_dict.get("data")

向量数据库Milvus集成

Milvus是一个开源的向量数据库,专门用于存储和查询向量数据。在RAG系统中,Milvus用于存储文档的向量表示,以便快速检索相关文档。

Milvus工具 (milvus.py)

代码功能简短总结
  1. 依赖导入与初始化:导入os、Milvus相关库(pymilvus)、配置类RagConfig及日志模块,初始化日志记录器;优先从环境变量获取Milvus连接地址(MILVUS_URI),未获取到时使用RagConfig中的默认地址,同时创建MilvusClient实例。
  2. 核心函数功能
    • list_collections():通过RagConfig的Milvus地址建立连接,获取数据库中所有集合名称列表并打印,异常时记录错误日志并返回空列表。
    • drop_collection(collection_name):接收集合名参数,先通过RagConfig地址连接Milvus,检查集合是否存在;存在则删除,不存在则记录警告日志;异常时记录错误日志,返回包含错误码和信息的字典。
import os
from pymilvus import MilvusClientfrom rag_back.rag.config import RagConfig
import logging
logger = logging.getLogger(__name__)# 获取Milvus URI,如果环境变量未设置则使用配置文件中的默认值
milvus_uri = os.getenv("MILVUS_URI") or RagConfig.Milvus_uri
# 创建MilvusClient实例
client = MilvusClient(uri=milvus_uri)def list_collections():"""获取 Milvus 数据库中的所有集合。返回:- list: 集合名称列表。"""from pymilvus import connections, utilitytry:# 使用RagConfig中的Milvus_uri连接connections.connect(uri=RagConfig.Milvus_uri)collections = utility.list_collections()print(f"从Milvus获取到的集合列表: {collections}")return collectionsexcept Exception as e:logger.error(f"获取 Milvus 集合列表失败: {e}")return []def drop_collection(collection_name):"""从数据库中删除指定的集合。collection_name:表示要删除的集合名。"""from pymilvus import connectionstry:# 使用RagConfig中的Milvus_uri连接connections.connect(uri=RagConfig.Milvus_uri)from pymilvus import utility# 检查集合是否存在if utility.has_collection(collection_name):return client.drop_collection(collection_name = collection_name)else:logger.warning(f"集合 {collection_name} 不存在,无需删除")return {"code": 0, "message": f"集合 {collection_name} 不存在"}except Exception as e:logger.error(f"删除 Milvus 集合 {collection_name} 失败: {e}")return {"code": -1, "message": f"删除集合失败: {str(e)}"}

在RAG中的Milvus集成

create_index_remote方法总结
  1. 方法功能:异步创建远程向量索引(BaseIndex类型),用于基于数据构建可远程访问的向量存储索引,支持自定义集合名与覆盖旧索引。
  2. 核心步骤
    • 加载数据:调用load_data()异步加载数据,若数据为None则初始化为空列表;
    • 解析节点:通过SentenceSplitter(句子分割器)将加载的数据转换为节点列表(nodeList);
    • 配置向量存储:初始化MilvusVectorStore,指定Milvus服务地址(RagConfig.Milvus_uri)、集合名、向量维度(512)、是否覆盖(overwrite)及启用动态字段(支持metadata等);
    • 构建存储上下文:基于上述向量存储,用StorageContext.from_defaults()创建默认配置的存储上下文;
    • 生成索引:以节点列表和存储上下文为参数,通过VectorStoreIndex构建向量存储索引并返回。
  1. 参数说明
    • collection_name:向量集合名称,默认值为"default";
    • overwrite:是否覆盖已有集合,默认值为False。
# 创建远程索引方法
async  def create_index_remote(self, collection_name="default", overwrite=False)->BaseIndex:data = await self.load_data()     # 加载数据# 确保data不为Noneif data is None:data = []node_parser = SentenceSplitter()     # 创建节点解析器nodeList = node_parser.get_nodes_from_documents(data)    # 获取节点# 创建向量数据索引vector_store =  MilvusVectorStore(uri = RagConfig.Milvus_uri,   # Milvus服务地址collection_name = collection_name,   # 向量集合名称dim = 512,   # 向量维度overwrite= overwrite,  # 是否覆盖enable_dynamic_field=True  # 启用动态字段以支持metadata等额外字段)# 创建储存上下文,使用默认配置并指定向量存储storage_context = StorageContext.from_defaults(vector_store=vector_store)# 构建向量存储索引,基于给定的节点和存储上下⽂index = VectorStoreIndex(nodeList,storage_context=storage_context)# 返回构建好的向量存储索引return index

嵌入模型(BGE-small)的使用

系统使用BGE-small-zh-v1.5作为嵌入模型,将文本转换为向量表示。该模型针对中文进行了优化,能够生成高质量的文本向量。

嵌入模型配置 (embeddings.py)

代码简短总结
  1. 核心功能:定义函数将文本转换为向量表示(即创建本地嵌入模型),依赖llama_index框架的HuggingFaceEmbedding模块。
  2. 关键函数embed_model_local_bge_small(**kwargs),支持传入额外参数(**kwargs)。
  3. 模型配置
    • 使用的嵌入模型为BAAI/bge-small-zh-v1.5(适用于中文文本的轻量模型);
    • 模型缓存路径设为../embed_cache,避免重复下载。
  1. 输出结果:函数执行后返回配置好的本地嵌入模型实例。
# 本地嵌入模型的创建函数,用于将文本转换为向量表示。
from llama_index.embeddings.huggingface import HuggingFaceEmbedding# 创建本地/在线嵌⼊模型库
def embed_model_local_bge_small(**kwargs):embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-zh-v1.5",cache_folder = r"../embed_cache",**kwargs)return embed_model

在系统中的使用

# 设置本地嵌入模型,避免使用OpenAI
Settings.embed_model = embed_model_local_bge_small()

总结

本文深入分析了RAG知识库管理系统的核心模块实现,包括:

  1. 传统RAG与多模态RAG的区别与实现:通过继承RAG基类,分别实现了处理纯文本和多模态文档的功能
  2. 文档处理流程:详细介绍了PDF和PPT文档的处理过程,包括文本提取、图像识别和表格处理
  3. OCR技术集成:通过集成多种OCR技术,实现了从图像中提取文本的功能
  4. 向量数据库Milvus集成:使用Milvus存储文档向量,支持高效的相似性检索
  5. 嵌入模型的使用:采用BGE-small-zh-v1.5模型生成文本向量,为检索提供基础

通过这些技术的综合应用,系统能够处理多种类型的文档,并基于文档内容进行智能问答,为用户提供高质量的知识服务。在下一篇文章中,我们将探讨前端实现的相关内容。

http://www.dtcms.com/a/500232.html

相关文章:

  • 网站建设制作小程序开发wordpress 标点排版
  • 【单调栈 离散化】P10798 「CZOI-R1」消除威胁|普及+
  • 邵武建设局网站wordpress多用户博客
  • (Kotlin高级特性三)Kotlin密封类(Sealed Class)在何时比枚举更适用?
  • kalibr进行相机内参以及相机imu的融合标定
  • 最简单的做网站工具网站发外链的好处
  • 北京神州网站建设xxx网站策划书
  • linux开启bbr网络优化
  • 前后端路径处理完整指南:从零开始理解Web开发中的路径问题
  • 为什么网站要备案头条新闻 免费下载
  • 汇通网做期货的网站做期货的网站软件开发平台 devcloud
  • 专门做橱柜衣柜效果图的网站青海网站建设怎么建设
  • 算法沉淀第六天(牛客小白月赛122 和 Codeforces Round 1059 (Div. 3))
  • 网站建设与维护蒋勇从前端开发培训机构有哪些
  • 网站建设后的心得浙江省建设通网站
  • Git的多人协作
  • 成都开发网站建设怎么下载应用商店
  • 14-哈希SHA1案例:宝钢
  • Python数据分析:小实例,数人头
  • 单页面网站怎么做软件项目开发文档模板
  • 松岗营销型网站建设软文范例大全
  • 本地网站建设方案信息大全网站数据迁移教程
  • 麦肯锡:从「AI价值悖论」到代理式 AI 的产业化落地
  • 金华市建设技工学校教育培训网站什么是网站建设整体策划方案
  • C++动态规划入门指南——助力CSP竞赛夺冠(加强版)
  • 【前端高级特效】使用 CSS 实现毛玻璃模糊背景效果(含完整源码讲解)
  • 网站备案花钱么培训学校网站
  • 【人工智能系列:机器学习学习和进阶01】机器学习初学者指南:理解核心算法与应用
  • 利用舵机实现机器人行走
  • 做网站时需要FTP工具吗济南市工程造价信息网