Ragflow 文档处理深度解析:从解析到存储的完整流程
1. 总体架构概览
Ragflow 的文档处理流程可以分为以下几个核心阶段:
文档上传 → 解析文档构建为Chunk → 数据增强阶段 → Embedding计算 → ES存储
- 文档上传:用户通过前端界面上传原始文档,并配置解析参数,如:切片方法、自动关键词提取、自动问题提取数量等。
- 解析文档构建为Chunk:后端根据设定的切片方法选择合适的解析器,将文档内容切分为结构化的知识片段(Chunk)
- 数据增强阶段:对切分后的 Chunk 进行数据增强,比如对每个 Chunk 通过AI提取一些关键词、检索问题等
- Embedding计算:对每个 Chunk 计算向量表示(Embedding)。
- ES存储:将完整的 Chunk 信息写入 Elasticsearch,在后续知识检索时被使用。信息包括每个 Chunk 的文本、AI提取的关键词、AI提取的问题、Embedding等
源码主要对应于:
- Task Executor (
rag/svr/task_executor.py
):解析任务执行 - Parser Factory (
rag/app/
):各种文档解析器
2. 解析文档构建Chunk阶段 (build_chunks
)
这个阶段负责将原始文档转换为结构化的知识片段。
不同类型的文档(如PDF、Word、Excel等)需要采用不同的解析策略来保证内容提取的准确性。
Ragflow 前端界面提供了多种切片方法选项,每种方法都对应特定的Python解析器类。下面详细说明前端切片方法与后端源码的对应关系:
前端切片方法与源码映射表
前端显示名称 | 对应源码文件 | 读取文档进行分开的函数名 | 适用场景 |
---|---|---|---|
General | rag/app/naive.py | naive.chunk() | 通用文档解析,支持PDF、Word、HTML等常规格式 |
Resume | rag/app/resume.py | resume.chunk() | 简历文档专用,能识别姓名、经历、技能等结构化信息 |
Manual | rag/app/manual.py | manual.chunk() | 手册类文档,支持层次化章节结构解析 |
Table | rag/app/table.py | table.chunk() | 表格数据专用,保持表格结构和关系 |
Paper | rag/app/paper.py | paper.chunk() | 学术论文专用,能识别摘要、作者、章节等学术结构 |
Book | rag/app/book.py | book.chunk() | 长篇书籍文档,支持目录识别和智能章节分割 |
Laws | rag/app/laws.py | laws.chunk() | 法律法规文档,按条款和章节进行结构化分割 |
Presentation | rag/app/presentation.py | presentation.chunk() | PPT演示文档,按页面分割并保留图像信息 |
One | rag/app/one.py | one.chunk() | 整文档模式,将整个文档作为单一chunk保持完整性 |
Picture | rag/app/picture.py | picture.chunk() | 图片文档解析,提取图像信息和OCR文本 |
Audio | rag/app/audio.py | audio.chunk() | 音频文档解析,语音转文字处理 |
rag/app/email.py | email.chunk() | 邮件文档解析,提取邮件头、正文等结构化信息 | |
Q&A | rag/app/qa.py | qa.chunk() | 问答对文档解析,识别问题-答案结构 |
Tag | rag/app/tag.py | tag.chunk() | 标签文档解析,处理标签化的结构数据 |
Knowledge Graph | rag/app/naive.py | naive.chunk() | 知识图谱模式,复用通用解析器但支持图谱构建 |
工厂模式实现
在 rag/svr/task_executor.py
中,系统通过工厂模式将前端选择映射到具体的解析器:
from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture, naive, one, audio, email, tagFACTORY = {"general": naive,ParserType.NAIVE.value: naive, // 通用文档解析器,适用于PDF、Word、HTML等常规格式ParserType.PAPER.value: paper, // 学术论文解析器,能识别摘要、作者、章节等学术结构ParserType.BOOK.value: book, // 书籍解析器,支持目录识别和章节分割ParserType.PRESENTATION.value: presentation, // 演示文稿(PPT)解析器,按页面分割并保留图像信息ParserType.MANUAL.value: manual, // 手册类文档解析器,支持层次化章节结构ParserType.LAWS.value: laws, // 法律法规解析器,按条款和章节结构化分割ParserType.QA.value: qa, // 问答类文档解析器ParserType.TABLE.value: table, // 表格数据解析器,保持表格结构和关系ParserType.RESUME.value: resume, // 简历解析器,能识别姓名、经历、技能等结构化信息ParserType.PICTURE.value: picture, // 图片文档解析器ParserType.ONE.value: one, // 整文档模式,将整个文档作为单一chunkParserType.AUDIO.value: audio, // 音频文档解析器ParserType.EMAIL.value: email, // 邮件文档解析器ParserType.KG.value: naive, // 知识图谱模式,复用naive解析器ParserType.TAG.value: tag // 标签文档解析器
}
当用户在前端选择某种切片方法时,系统会根据选择调用对应的解析器:
# 根据用户选择获取解析器
chunker = FACTORY[task["parser_id"].lower()] // task["parser_id"].lower() 示例:table
# 调用对应解析器的chunk方法
chunks = chunker.chunk(filename, binary=binary, **params)
Chunk数据格式详解
每个解析器的 chunk()
方法都会返回一个chunk列表,每个chunk是一个包含标准化字段的字典对象。不同类型的解析器会生成一些特定的字段,但所有chunk都包含以下核心字段:
基础字段(所有chunk共有):
字段名 | 数据类型 | 说明 | 示例 |
---|---|---|---|
docnm_kwd | string | 源文档文件名(关键词化) | "技术文档.pdf" |
title_tks | list | 文档标题的分词结果 | ["技术", "文档"] |
title_sm_tks | list | 文档标题的细粒度分词 | ["技", "术", "文", "档"] |
content_with_weight | string | 带权重的内容文本 | "这是文档的主要内容..." |
content_ltks | list | 内容的标准分词结果 | ["这是", "文档", "的", "主要", "内容"] |
content_sm_ltks | list | 内容的细粒度分词 | ["这", "是", "文", "档", "的", "主", "要", "内", "容"] |
位置字段(PDF等结构化文档):
字段名 | 数据类型 | 说明 | 示例 |
---|---|---|---|
page_num_int | list[int] | chunk所在页面编号列表 | [1, 2] |
position_int | list[tuple] | 精确位置坐标(页面, 左, 右, 顶, 底) | [(1, 100, 500, 200, 300)] |
top_int | list[int] | chunk在页面中的顶部位置 | [200] |
图像字段(包含图片的chunk):
字段名 | 数据类型 | 说明 | 示例 |
---|---|---|---|
image | PIL.Image | 关联的图像对象 | <PIL.Image.Image> |
doc_type_kwd | string | 文档类型标识 | "image" |
其它解析器不再一一展开介绍,需要用到的时候可以读对应源码。
3. 数据增强阶段(通过设定的AI Promt提示词生成新的数据)
如果切片方法选择 General、Book、One 等,可以在前端界面上指定 自动关键词提取、自动问题提取 数量。如果指定数量不为 0,那么在基础的文档解析完成后,Ragflow 还会调用AI服务来增强chunk的语义信息。
3.1 自动关键词提取
当配置 自动关键词提取(auto_keywords) 数量大于 0 时,系统会为每个chunk自动生成指定数量的关键词。
关键词提取的prompt (rag/prompts/keyword_prompt.md
) 如下:
## Role
You are a text analyzer.## Task
Extract the most important keywords/phrases of a given piece of text content.## Requirements
- Summarize the text content, and give the top {{ topn }} important keywords/phrases.
- The keywords MUST be in the same language as the given piece of text content.
- The keywords are delimited by ENGLISH COMMA.
- Output keywords ONLY.---## Text Content
{{ content }}
其中 content 是 Chunk 的文本,topn 是用户配置的关键词数量。
3.2 自动问题提取
当配置 自动问题提取(auto_questions) 数量大于 0 时,系统会为每个chunk自动生成指定数量的问题。
问题提取的prompt (rag/prompts/question_prompt.md
) 如下:
## Role
You are a text analyzer.## Task
Propose {{ topn }} questions about a given piece of text content.## Requirements
- Understand and summarize the text content, and propose the top {{ topn }} important questions.
- The questions SHOULD NOT have overlapping meanings.
- The questions SHOULD cover the main content of the text as much as possible.
- The questions MUST be in the same language as the given piece of text content.
- One question per line.
- Output questions ONLY.---## Text Content
{{ content }}
其中 content 也是 Chunk 的文本,topn 是用户配置的问题数量。
4. Embedding计算阶段
Embedding计算是将文本转换为向量表示的关键步骤,这些向量将用于后续的语义检索。Ragflow 采用了标题和内容加权融合的embedding策略,具体实现位于 rag/svr/task_executor.py
的 embedding
函数中。
Ragflow 计算 Chunk 的 embedding 时,计算策略的伪代码如下:
content = AI提取的问题 if exists(AI提取的问题) else 原始Chunk文本
final_embeding = encode_as_embeding(Chunk所在文档的标题文本) * 0.1 + encode_as_embeding(content) * 0.9
说明:每个 Chunk 的 embedding 向量为 文档标题文本的 embedding 向量乘以 0.1,和 Chunk 的内容文本的 embedding 向量乘以 0.9 的和。其中 Chunk 的内容文本优先设置为对该Chunk的AI提取出的问题,如果没有指定自动问题提取数量,则问题为空,此时才会使用原始的 Chunk 文本。
具体源码的注释如下:
async def embedding(docs, mdl, parser_config=None, callback=None):"""为文档chunks计算embedding向量Args:docs: chunk列表,每个chunk是一个字典mdl: embedding模型对象parser_config: 解析器配置,包含embedding权重等参数callback: 进度回调函数Returns:tk_count: 总token消耗数量vector_size: embedding向量的维度"""if parser_config is None:parser_config = {}# 第一步:准备每个Chunk的标题文本和内容文本tts, cnts = [], [] # tts=titles标题列表, cnts=contents内容列表for d in docs:# 标题文本:使用文档名称,所有chunk共享同一个文档标题tts.append(d.get("docnm_kwd", "Title"))# 内容文本:优先使用AI生成的问题,如果没有则使用原始chunk内容c = "\n".join(d.get("question_kwd", [])) # 将AI生成的问题列表合并为一个字符串if not c: # 如果没有AI问题,则使用原始内容c = d["content_with_weight"]c = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>", " ", c)if not c: c = "None"cnts.append(c)# 第二步:计算标题embedding向量(只计算一次,然后复制给相同文档的所有chunk)tk_count = 0 if len(tts) == len(cnts): # 只对第一个标题进行embedding编码vts, c = await trio.to_thread.run_sync(lambda: mdl.encode(tts[0: 1]))# 将标题向量复制给所有chunk(因为同一文档的所有chunk共享标题)tts = np.concatenate([vts for _ in range(len(tts))], axis=0)tk_count += c # 第三步:分批计算内容embedding向量(避免内存溢出)cnts_ = np.array([]) # 存储所有内容向量for i in range(0, len(cnts), EMBEDDING_BATCH_SIZE):async with embed_limiter: # 对当前批次的内容进行embeddingvts, c = await trio.to_thread.run_sync(lambda: mdl.encode([truncate(c, mdl.max_length-10) for c in cnts[i: i + EMBEDDING_BATCH_SIZE]]))# 拼接当前批次的向量到总向量数组if len(cnts_) == 0:cnts_ = vtselse:cnts_ = np.concatenate((cnts_, vts), axis=0)tk_count += c callback(prog=0.7 + 0.2 * (i + 1) / len(cnts), msg="")cnts = cnts_ # 将批量计算出的 content embeding 结果赋值给cnts# 第四步:加权融合标题向量和内容向量# 获取标题权重配置,默认为0.1(即10%)filename_embd_weight = parser_config.get("filename_embd_weight", 0.1)if not filename_embd_weight: # 处理None值的情况filename_embd_weight = 0.1title_w = float(filename_embd_weight) # 标题权重logging.info("标题权重 title_w: %s, 内容权重 1-title_w: %s", title_w, 1-title_w)# 加权融合公式:final_vector = title_weight * title_vector + content_weight * content_vector# 如果标题和内容向量数量一致,则进行融合;否则只使用内容向量vects = (title_w * tts + (1 - title_w) * cnts) if len(tts) == len(cnts) else cnts# 第五步:将最终embedding向量存储到每个chunk中assert len(vects) == len(docs) # 确保向量数量与chunk数量一致vector_size = 0for i, d in enumerate(docs):v = vects[i].tolist() # 将numpy数组转换为Python列表,便于JSON序列化vector_size = len(v) # 记录向量维度# 存储向量到chunk中,字段名格式为 q_{维度}_vec(如q_768_vec、q_1024_vec)d["q_%d_vec" % len(v)] = vreturn tk_count, vector_size # 返回token消耗和向量维度
5. ES存储阶段
最终处理后的每个Chunk数据会被存入 Doc Store 中,默认是 ES (rag\utils\es_conn.py)。
最终 ES Chunk 结构示例
{"id": "abc123...","doc_id": "doc_001","kb_id": "kb_001","docnm_kwd": "技术文档.pdf","title_tks": "技术 文档","title_sm_tks": "技 术 文 档","content_with_weight": "这是一段技术内容...","content_ltks": "这是 一段 技术 内容","content_sm_ltks": "这 是 一 段 技 术 内 容","important_kwd": ["技术", "内容", "文档"],"important_tks": "技术 内容 文档","question_kwd": ["什么是技术?", "如何使用?"],"question_tks": "什么 是 技术 如何 使用","q_768_vec": [0.1, 0.2, ..., 0.8], // 768维embedding向量"page_num_int": [1],"position_int": [(1, 100, 500, 200, 300)],"create_time": "2024-01-01 12:00:00","create_timestamp_flt": 1704067200.0
}