Dify笔记 知识库
一、知识库操作
1、创建知识库
dify支持文档级别的元数据,在知识库添加元数据key,在每个文档上配置对应的key-value元数据。在知识检索节点开启元数据过滤功能,可以实现自定义文档文档过滤
2、添加文件
1、分段设置
选择本地文件上传,设置文档分块方式。在这一步之前,需要对文档进行预处理:
PDF用pdfplumber和PyMuPDF,docx使用python-docx。
根据需求制定分块规则,将原始文档解析、分块,在块之间插入自定义分隔符,比如
$$$$
结果保存为txt或者markdown。
dify上传文本时分段标识符选择自定义分隔符
$$$$
分段最大长度=4000
2、索引方式
选择合适的embedding模型,一般选择bge系列
3、知识库使用
在工作流中设置知识检索节点,并将输出结果作为LLM节点的入参
二、源码分析
1、文件上传
接口:http://127.0.0.1/console/api/files/upload?source=datasets
# api\controllers\console\__init__.py
bp = Blueprint("console", __name__, url_prefix="/console/api")
api = ExternalApi(bp)# File
api.add_resource(FileApi, "/files/upload")
经过条件判断之后,最终执行FileService.upload_file
if source == "datasets" and extension not in DOCUMENT_EXTENSIONS:raise UnsupportedFileTypeError()
可知:dify只支持在DOCUMENT_EXTENSIONS列表的文件拓展名
经过文件名、文件后缀名、文件大小判断之后,调用storage.save将该文件以对应存储方式保存
# save file to storagestorage.save(file_key, content)
最后,创建UploadFile对象实例,将其保存到数据库。
2、文件处理
根据用户选择的分块、索引方式完成分档分块和向量化
# api\controllers\console\datasets\datasets_document.py
api.add_resource(DatasetDocumentListApi, "/datasets/<uuid:dataset_id>/documents")
验证用户权限, 解析请求参数
KnowledgeConfig -> DocumentService.document_create_args_validate(knowledge_config)
调用 DocumentService.save_document_with_dataset_id,进行文档保存
FeatureService.get_features(...)
专门用于处理与“功能开关”或“特性配置”相关的业务逻辑。
用于进行文档个数校验,默认最大20个
if knowledge_config.original_document_id:document = DocumentService.update_document_with_dataset_id(dataset, knowledge_config, account)documents.append(document)batch = document.batch
检查是否提供了原始文档 ID;如果有,则调用服务更新该文档的内容与配置
lock_name = "add_document_lock_dataset_id_{}".format(dataset.id)
with redis_client.lock(lock_name, timeout=600):
获取Redis 分布式锁,开启
position = DocumentService.get_documents_position(dataset.id)
为新添加的文档分配一个递增的顺序编号(position),保证文档在知识库中的顺序性。
遍历上传的文件,创建Document对象,document_ids保存文档id。
document_indexing_task.delay(dataset.id, document_ids)
当所有的规则处理完成以后,会把任务通过document_indexing_task.delay(dataset.id, document_ids)
推到队列中。
在tasks目录下,有一个document_indexing_task.py
进行异步解析。异步celery task,执行完成后将结果保存到数据库和向量数据库
核心逻辑在IndexingRunner.run
中
# api\tasks\document_indexing_task.pyindexing_runner = IndexingRunner()indexing_runner.run(documents)
这里用了模板方法+工厂+策略模式实现
-
首先对数据集进行校验,不存在,直接终止
-
获取处理规则,没有处理规则也直接终止
-
根据索引类型
IndexProcessorFactory
,获取对应的索引处理器:ParagraphIndexProcessor(通用)
、QAIndexProcessor(问答)
、ParentChildIndexProcessor(父子分段)
-
获取文档内容
_extract
,解决了不同文档的差异性支持三种数据源类型
统一返回Document对象列表
自动更新处理进度
-
转换阶段
_transform
执行文本清洗(根据处理规则)
分块处理(自定义或自动分块)
语言处理(国际化支持)
向量化准备(高质量索引模式)
-
分段处理
_load_segments
使用DatasetDocumentStore管理分段
支持父子文档结构
原子化状态更新
-
索引构建
_load
高质量模式:使用嵌入模型并行处理
经济模式:构建关键词倒排索引
状态一致性管理