DeerFlow多智能体项目分析-向量数据库实现知识检索的源码解析
概述
DeerFlow 项目采用模块化设计,支持多种向量数据库和RAG(检索增强生成)提供商,为AI研究提供强大的知识检索能力。
项目地址:https://github.com/bytedance/deer-flow
LangGraph中文在线文档:https://github.langchain.ac.cn/langgraph/agents/agents/
支持的向量数据库
1. 核心抽象接口
项目定义了统一的检索器抽象基类:
文件路径: src/rag/retriever.py
class Retriever(abc.ABC):@abc.abstractmethoddef list_resources(self, query: str | None = None) -> list[Resource]:"""列出RAG提供商的资源"""pass@abc.abstractmethoddef query_relevant_documents(self, query: str, resources: list[Resource] = []) -> list[Document]:"""从资源中查询相关文档"""pass
2. 数据模型定义
文件路径: src/rag/retriever.py
class Chunk:"""文本块"""content: strsimilarity: floatclass Document:"""文档对象"""id: strurl: str | None = Nonetitle: str | None = Nonechunks: list[Chunk] = []class Resource(BaseModel):"""资源描述"""uri: str = Field(..., description="资源URI")title: str = Field(..., description="资源标题")description: str | None = Field("", description="资源描述")
向量数据库实现
1. Milvus 实现
文件路径: src/rag/milvus.py
嵌入模型支持
class DashscopeEmbeddings:"""OpenAI兼容的嵌入包装器"""def embed_query(self, text: str) -> List[float]:"""返回给定文本的嵌入向量"""def embed_documents(self, texts: List[str]) -> List[List[float]]:"""返回多个文档的嵌入向量(LangChain接口)"""
Milvus检索器配置
class MilvusRetriever(Retriever):"""基于Milvus向量存储的检索器实现"""def __init__(self) -> None:# 连接配置self.uri: str = get_str_env("MILVUS_URI", "http://localhost:19530")self.collection_name: str = get_str_env("MILVUS_COLLECTION", "documents")# 嵌入配置self.embedding_provider = get_str_env("MILVUS_EMBEDDING_PROVIDER", "openai")self.embedding_dim: int = self._get_embedding_dimension(self.embedding_model)
集合Schema定义
def _create_collection_schema(self) -> CollectionSchema:"""构建Milvus集合Schema"""fields = [FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True),FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.embedding_dim),FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),FieldSchema(name="url", dtype=DataType.VARCHAR, max_length=1024),]
2. RAGFlow 实现
文件路径: src/rag/ragflow.py
class RAGFlowProvider(Retriever):"""使用RAGFlow检索文档的提供商"""def query_relevant_documents(self, query: str, resources: list[Resource] = []) -> list[Document]:"""查询相关文档"""payload = {"question": query,"dataset_ids": dataset_ids,"document_ids": document_ids,"page_size": self.page_size,}response = requests.post(f"{self.api_url}/api/v1/retrieval", headers=headers, json=payload)
3. VikingDB 实现
文件路径: src/rag/vikingdb_knowledge_base.py
class VikingDBKnowledgeBaseProvider(Retriever):"""使用VikingDB知识库API检索文档的提供商"""def _create_signature(self, method: str, path: str, query_params: dict, headers: dict, payload: bytes) -> str:"""创建API签名"""# HMAC-SHA256签名实现def _make_signed_request(self, method: str, path: str, params: dict = None, data: dict = None):"""发起签名请求"""
配置管理
1. RAG提供商配置
文件路径: src/config/tools.py
class RAGProvider(enum.Enum):DIFY = "dify"RAGFLOW = "ragflow"VIKINGDB_KNOWLEDGE_BASE = "vikingdb_knowledge_base"MOI = "moi"MILVUS = "milvus"SELECTED_RAG_PROVIDER = os.getenv("RAG_PROVIDER")
2. 构建器模式
文件路径: src/rag/builder.py
def build_retriever() -> Retriever | None:if SELECTED_RAG_PROVIDER == RAGProvider.DIFY.value:return DifyProvider()elif SELECTED_RAG_PROVIDER == RAGProvider.RAGFLOW.value:return RAGFlowProvider()elif SELECTED_RAG_PROVIDER == RAGProvider.MILVUS.value:return MilvusProvider()# ... 其他提供商
工具集成
检索工具
文件路径: src/tools/retriever.py
class RetrieverTool(BaseTool):name: str = "local_search_tool"description: str = "从带有`rag://`前缀的文件中检索信息"def _run(self, keywords: str, run_manager: Optional[CallbackManagerForToolRun] = None) -> list[Document]:documents = self.retriever.query_relevant_documents(keywords, self.resources)return [doc.to_dict() for doc in documents]def get_retriever_tool(resources: List[Resource]) -> RetrieverTool | None:"""获取检索工具实例"""retriever = build_retriever()return RetrieverTool(retriever=retriever, resources=resources)
环境配置
Milvus 配置示例
文件路径: .env.example
# Milvus向量数据库配置
RAG_PROVIDER=milvus
MILVUS_URI=http://localhost:19530 # 或 ./milvus_demo.db (Lite版)
MILVUS_COLLECTION=documents
MILVUS_EMBEDDING_PROVIDER=openai
MILVUS_EMBEDDING_MODEL=text-embedding-ada-002
MILVUS_EMBEDDING_API_KEY=your_api_key
MILVUS_TOP_K=10
MILVUS_AUTO_LOAD_EXAMPLES=true
RAGFlow 配置示例
# RAGFlow配置
RAG_PROVIDER=ragflow
RAGFLOW_API_URL="http://localhost:9388"
RAGFLOW_API_KEY="ragflow-xxx"
RAGFLOW_RETRIEVAL_SIZE=10
RAGFLOW_CROSS_LANGUAGES=English,Chinese,Spanish,French,German,Japanese,Korean
VikingDB 配置示例
# VikingDB知识库配置
RAG_PROVIDER=vikingdb_knowledge_base
VIKINGDB_KNOWLEDGE_BASE_API_URL="api-knowledgebase.mlp.cn-beijing.volces.com"
VIKINGDB_KNOWLEDGE_BASE_API_AK="AKxxx"
VIKINGDB_KNOWLEDGE_BASE_API_SK="your_secret_key"
VIKINGDB_KNOWLEDGE_BASE_RETRIEVAL_SIZE=15
技术特性
1. 模块化设计
- 统一的抽象接口,易于扩展新的向量数据库
- 工厂模式动态创建检索器实例
- 配置驱动的提供商选择
2. 多模态支持
- 支持文本、图片等多种数据类型
- 灵活的嵌入模型配置
- 跨语言检索能力
3. 高性能检索
- 向量相似度计算
- 可配置的检索结果数量
- 智能文档分块和聚合
4. 企业级特性
- API签名认证(VikingDB)
- 连接池和错误处理
- 详细的日志记录
使用场景
- 私有知识库检索: 从用户上传的文档中检索相关信息
- 上下文增强: 为AI研究提供领域特定的背景知识
- 多源数据融合: 整合来自不同数据源的信息
- 实时查询: 在研究过程中动态检索相关文档
总结
DeerFlow的向量数据库架构体现了现代AI应用的最佳实践:
- 抽象化设计提供了良好的扩展性
- 多提供商支持满足不同部署需求
- 统一的工具接口简化了集成复杂度
- 丰富的配置选项支持灵活的定制化
这种设计使得DeerFlow能够适应各种企业环境和技术栈,为AI研究提供强大而灵活的知识检索能力。
