Ragflow 源码:task_executor.py
目录
- 介绍
- 主要功能
- 核心组件
- 流程图
- 核心代码解释
- 1. 系统架构与核心组件
- 2. 核心处理流程
- 3. 高级处理能力
- 4. 关键创新点
- 5. 容错与监控机制
- 6. 性能优化技巧
介绍
task_executor.py
是RAGFlow系统中的任务执行器(Task Executor)核心部分,主要负责文档的解析、分块(chunking)、向量化(embedding)和索引(indexing)处理流程。
主要功能
-
文档处理流水线:
- 从存储系统(如MinIO)获取文档
- 根据文档类型选择相应的解析器(parser)
- 将文档分块处理
- 生成向量表示(embeddings)
- 存储到向量数据库中
-
高级处理能力:
- 支持RAPTOR(递归抽象处理)算法
- 支持知识图谱(GraphRAG)构建
- 自动关键词提取和问题生成
- 内容标签自动标注
-
任务管理:
- 从Redis队列获取任务
- 任务状态跟踪和报告
- 任务取消处理
- 失败任务恢复
核心组件
-
文档解析器工厂(FACTORY):
- 针对不同类型的文档(论文、书籍、演示文稿、法律文件等)有不同的解析器
- 使用注册模式动态选择解析器
-
并发控制:
- 使用Trio库实现异步并发
- 通过CapacityLimiter控制并发任务数
- 分块构建、MinIO操作等都有独立的并发限制
-
错误处理和监控:
- 详细的任务状态跟踪
- 心跳报告机制
- 内存监控和快照功能
- 任务取消和超时处理
流程图
核心代码解释
1. 系统架构与核心组件
-
并发控制体系:
- 使用
trio
异步框架实现高效I/O操作 - 四级并发限制器:
task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) # 任务级并发(默认5) chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) # 分块处理并发(默认1) minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO) # 存储操作并发(默认10) kg_limiter = trio.CapacityLimiter(2) # 知识图谱处理并发
- 使用
-
文档处理工厂模式:
FACTORY = {"general": naive,ParserType.PAPER.value: paper, # 学术论文处理器ParserType.BOOK.value: book, # 书籍处理器ParserType.TABLE.value: table, # 表格处理器# ...其他15+种文档类型处理器 }
2. 核心处理流程
-
任务处理主循环 (
handle_task
函数):async def handle_task():redis_msg, task = await collect() # 从Redis获取任务CURRENT_TASKS[task["id"]] = task # 登记当前任务try:await do_handle_task(task) # 执行实际处理redis_msg.ack() # 确认任务完成except Exception as e:FAILED_TASKS += 1set_progress(task["id"], prog=-1, msg=f"[Exception]: {str(e)}")
-
文档处理三阶段 (
do_handle_task
函数):# 阶段1:文档解析与分块 chunks = await build_chunks(task, progress_callback)# 阶段2:向量化处理 token_count, vector_size = await embedding(chunks, embedding_model)# 阶段3:存储索引 await settings.docStoreConn.insert(chunks, index_name, kb_id)
3. 高级处理能力
-
RAPTOR算法实现:
raptor = Raptor(max_cluster=64, # 最大聚类数chat_model=chat_mdl, # LLM模型embd_model=embd_mdl, # 嵌入模型prompt=config["prompt"], # 聚类提示词max_token=config["max_token"], # 最大token数threshold=config["threshold"] # 相似度阈值 ) chunks = await raptor.process(original_chunks)
-
知识图谱构建:
await run_graphrag(task, language=task_language,with_resolution=True, # 是否解析关系with_community=True, # 是否构建社区chat_model=chat_mdl,embd_model=embd_mdl )
4. 关键创新点
-
智能分块增强:
# 自动关键词提取 d["important_kwd"] = keyword_extraction(chat_mdl, content)# 自动问题生成 d["question_kwd"] = question_proposal(chat_mdl, content)# 智能标签系统 d[TAG_FLD] = content_tagging(chat_mdl, content, all_tags)
-
混合向量生成:
# 标题向量权重调整 title_w = parser_config.get("filename_embd_weight", 0.1) vects = (title_w * title_vectors + (1-title_w) * content_vectors)
5. 容错与监控机制
-
分布式锁管理:
with RedisDistributedLock("clean_task_executor"):# 清理超时workerREDIS_CONN.srem("TASKEXE", expired_workers)
-
内存监控系统:
def start_tracemalloc_and_snapshot():tracemalloc.start()snapshot = tracemalloc.take_snapshot()snapshot.dump(f"snapshot_{timestamp}.trace")logging.info(f"Peak memory: {peak / 10**6:.2f} MB")
-
心跳监测系统:
REDIS_CONN.zadd(CONSUMER_NAME, json.dumps({"pending": PENDING_TASKS,"current": CURRENT_TASKS,# ...其他状态指标}), timestamp )
6. 性能优化技巧
-
批量处理策略:
# 向量化批量处理 for i in range(0, len(texts), batch_size=16):vectors = await mdl.encode(texts[i:i+16])
-
缓存机制:
# LLM结果缓存 cached = get_llm_cache(llm_name, text, "keywords") if not cached:cached = await keyword_extraction(llm, text)set_llm_cache(llm_name, text, cached)