FastGPT源码解析 Agent知识库管理维护使用详解
FastGPT 知识库后端实现分析
核心架构概览
FastGPT 知识库后端采用“分层存储 + 向量检索”的架构,实现了完整的 RAG(Retrieval-Augmented Generation,检索增强生成)系统,支持多种数据源和多种检索模式。
1. 数据模型设计
核心实体关系
Dataset (知识库)
├── Collection (文档集合)
│   ├── DatasetData (数据块)
│   │   └── indexes (向量索引)
│   └── DatasetDataText (全文索引)
└── DatasetTraining (训练队列)
数据库 Schema
Dataset Schema (知识库)
/**
 * Mongoose schema for a dataset (knowledge base).
 *
 * Each field sits on its own line so that the trailing `//` comments no
 * longer comment out the fields that follow them.
 */
const DatasetSchema = new Schema({
  // Parent dataset; allows datasets to be nested in a hierarchy.
  parentId: { type: ObjectId, ref: 'datasets' },
  // Owning team.
  teamId: { type: ObjectId, required: true },
  // Creator.
  tmbId: { type: ObjectId, required: true },
  // Dataset kind: dataset / folder / websiteDataset.
  // The inner `type: String` is needed: without it Mongoose reads
  // `{ enum: ... }` as a nested object instead of a validated string path.
  // NOTE(review): assumes the enum values are strings — confirm.
  type: { type: String, enum: DatasetTypeEnum },
  // Status: active / syncing (same `type: String` reasoning as above).
  status: { type: String, enum: DatasetStatusEnum },
  // Display name.
  name: { type: String, required: true },
  // Embedding model.
  vectorModel: { type: String, default: 'text-embedding-3-small' },
  // Model used for processing.
  agentModel: { type: String, default: 'gpt-4o-mini' },
  // Website source configuration.
  websiteConfig: { url: String, selector: String },
  // API data-source configuration.
  apiServer: Object,
  // Whether automatic sync is enabled.
  autoSync: Boolean
});
DatasetData Schema (数据块)
/**
 * Mongoose schema for a single data chunk of a dataset collection.
 *
 * Each field sits on its own line so that the trailing `//` comments no
 * longer comment out the fields that follow them.
 */
const DatasetDataSchema = new Schema({
  teamId: { type: ObjectId, required: true },
  datasetId: { type: ObjectId, required: true },
  collectionId: { type: ObjectId, required: true },
  // Question / title.
  q: { type: String, required: true },
  // Answer / content.
  a: { type: String, default: '' },
  // Vector index entries belonging to this chunk.
  indexes: [
    {
      defaultIndex: Boolean,
      // Vector ID.
      dataId: String,
      // Text that was indexed.
      text: String
    }
  ],
  // Chunk index.
  chunkIndex: { type: Number, default: 0 },
  updateTime: { type: Date, default: Date.now }
});
2. 数据处理流程
数据导入管道
// Import pipeline: 1. file upload → 2. content parsing → 3. chunking → 4. embedding → 5. storage
/**
 * Cleans and filters raw items, then inserts the accepted ones into the
 * training queue.
 *
 * Items are sorted into buckets: `overToken` (cleaned `q + a` longer than
 * `maxToken`), `repeat` (same cleaned text already accepted in this call)
 * and `success`. Only `success` items are inserted. The `error` bucket is
 * declared but never filled here.
 *
 * Note: `item.q` and `item.a` are overwritten in place with their cleaned
 * text.
 *
 * NOTE(review): `maxToken` is not defined in this snippet; it must be in
 * scope at the call site — confirm where it comes from.
 */
export async function pushDataListToTrainingQueue({
  teamId,
  tmbId,
  datasetId,
  collectionId,
  agentModel,
  vectorModel,
  data,
  trainingMode
}) {
  // 1. Model validation.
  // NOTE(review): presumably these lookups throw for unknown models — confirm.
  getLLMModel(agentModel);
  getEmbeddingModel(vectorModel);

  // 2. Pre-processing.
  type Bucket = Array<(typeof data)[number]>;
  const filterResult: Record<'success' | 'overToken' | 'repeat' | 'error', Bucket> = {
    success: [],
    overToken: [],
    repeat: [],
    error: []
  };
  // Texts accepted so far. It must be filled as items are accepted,
  // otherwise the duplicate branch can never trigger.
  const seenTexts = new Set<string>();

  data.forEach((item) => {
    item.q = simpleText(item.q); // clean text
    item.a = simpleText(item.a);
    const text = item.q + item.a;

    if (text.length > maxToken) {
      filterResult.overToken.push(item);
    } else if (seenTexts.has(text)) {
      filterResult.repeat.push(item); // duplicate
    } else {
      seenTexts.add(text);
      filterResult.success.push(item);
    }
  });

  // 3. Bulk insert into the training queue.
  await MongoDatasetTraining.insertMany(
    filterResult.success.map((item) => ({
      teamId,
      datasetId,
      collectionId,
      mode: trainingMode,
      q: item.q,
      a: item.a,
      indexes: item.indexes,
      retryCount: 5
    }))
  );
}
训练模式
/**
 * Training modes for queued dataset items.
 *
 * One member per line: on a single line the first `//` comment would
 * comment out `qa` and `auto`.
 */
enum TrainingModeEnum {
  // Direct chunking.
  chunk = 'chunk',
  // QA splitting.
  qa = 'qa',
  // Automatic processing.
  auto = 'auto'
}
3. 检索系统实现
多模式检索引擎
/**
 * Multi-mode dataset search: recall, optional re-rank, RRF fusion, then
 * similarity and token-budget filtering.
 *
 * Statements are split onto separate lines so that the step comments no
 * longer comment out the code that follows them.
 *
 * NOTE(review): `reRankQuery` and `concatResults` are not defined in this
 * snippet, and the arguments to `countRecallLimit` / `multiQueryRecall` are
 * abbreviated — confirm against the full implementation.
 *
 * @returns The filtered results as `searchRes`, plus `tokens` from the
 *   recall step and the `searchMode` that was passed in.
 */
export async function searchDatasetData({
  teamId,
  queries,
  datasetIds,
  searchMode,
  similarity,
  usingReRank,
  maxTokens
}: SearchDatasetDataProps): Promise<SearchDatasetDataResponse> {
  // 1. Recall limits per search mode.
  const { embeddingLimit, fullTextLimit } = countRecallLimit();

  // 2. Recall for all queries.
  const { embeddingRecallResults, fullTextRecallResults, tokens } = await multiQueryRecall({
    embeddingLimit,
    fullTextLimit
  });

  // 3. Re-rank (optional).
  const reRankResults = usingReRank
    ? await datasetDataReRank({ query: reRankQuery, data: concatResults })
    : [];

  // 4. RRF fusion of the three result lists.
  const rrfConcatResults = datasetSearchResultConcat([
    { k: 60, list: embeddingRecallResults },
    { k: 60, list: fullTextRecallResults },
    { k: 58, list: reRankResults }
  ]);

  // 5. Similarity filter, then token limit.
  const scoreFilter = filterBySimilarity(rrfConcatResults, similarity);
  const finalResults = await filterDatasetDataByMaxTokens(scoreFilter, maxTokens);

  return { searchRes: finalResults, tokens, searchMode };
}
向量检索
/**
 * Vector recall: embeds the query, searches the vector store, then loads
 * the matching data rows and collections from MongoDB.
 *
 * Statements are split onto separate lines so that the step comments no
 * longer comment out the code that follows them.
 *
 * NOTE(review): `model`, `teamId`, `datasetIds`, `forbidCollectionIdList`,
 * `filterCollectionIdList`, `collectionIdList` and `formatResults` are not
 * defined in this snippet. The step that builds `formatResults` from
 * `dataList` / `collections` is omitted — confirm against the full
 * implementation.
 */
const embeddingRecall = async ({ query, limit }) => {
  // 1. Embed the query text.
  const { vectors, tokens } = await getVectorsByText({
    model: getEmbeddingModel(model),
    input: query,
    type: 'query'
  });

  // 2. Search the vector store with the first vector.
  const { results } = await recallFromVectorStore({
    teamId,
    datasetIds,
    vector: vectors[0],
    limit,
    forbidCollectionIdList,
    filterCollectionIdList
  });

  // 3. Load the related data rows and collections in parallel.
  const [dataList, collections] = await Promise.all([
    MongoDatasetData.find({
      teamId,
      datasetId: { $in: datasetIds },
      'indexes.dataId': { $in: results.map((item) => item.id) }
    }),
    MongoDatasetCollection.find({
      _id: { $in: collectionIdList }
    })
  ]);

  return { embeddingRecallResults: formatResults, tokens };
};
全文检索
/**
 * Full-text recall using a MongoDB `$text` search over the dataset text
 * collection, sorted by text score and capped at `limit`.
 *
 * Statements are split onto separate lines so that the comments no longer
 * comment out the code that follows them.
 *
 * NOTE(review): `teamId`, `datasetIds` and `formatResults` are not defined
 * in this snippet. The step that builds `formatResults` from
 * `searchResults` is omitted — confirm against the full implementation.
 */
const fullTextRecall = async ({ query, limit }) => {
  // MongoDB full-text index search.
  const searchResults = await MongoDatasetDataText.aggregate([
    {
      $match: {
        teamId: new Types.ObjectId(teamId),
        datasetId: { $in: datasetIds.map((id) => new Types.ObjectId(id)) },
        // The query is word-segmented first (Chinese tokenization).
        $text: { $search: jiebaSplit({ text: query }) }
      }
    },
    { $sort: { score: { $meta: 'textScore' } } },
    { $limit: limit }
  ]);

  return { fullTextRecallResults: formatResults };
};
4. 高级检索功能
重排序 (ReRank)
/**
 * Re-ranks recalled items against the query and attaches a re-rank score.
 *
 * Each item is sent as `"${q}\n${a}"`. Every returned ranking entry is
 * matched back to its source item by `id`. Entries with no matching source
 * item are dropped, because an item without an `id` cannot be merged by the
 * later fusion step. `index` is the entry's position in the re-rank
 * response.
 *
 * @param data  Recalled items; `id`, `q` and `a` are read.
 * @param query Query text passed to the re-rank call.
 */
export const datasetDataReRank = async ({ data, query }) => {
  const results = await reRankRecall({
    query,
    documents: data.map((item) => ({
      id: item.id,
      text: `${item.q}\n${item.a}`
    }))
  });

  // Index source items by id once. The first occurrence wins, which matches
  // a left-to-right `find`.
  const sourceById = new Map();
  for (const item of data) {
    if (!sourceById.has(item.id)) {
      sourceById.set(item.id, item);
    }
  }

  // Attach the re-rank score.
  return results.flatMap((item, index) => {
    const source = sourceById.get(item.id);
    if (!source) {
      return [];
    }
    return [
      {
        ...source,
        score: [{ type: SearchScoreTypeEnum.reRank, value: item.score, index }]
      }
    ];
  });
};
RRF 融合算法
// Reciprocal Rank Fusion - merges several ranked result lists
/**
 * Fuses ranked lists with Reciprocal Rank Fusion.
 *
 * An item at zero-based position `index` in a list with constant `k`
 * contributes `1 / (k + index + 1)`. Contributions are summed per `id`.
 * The output holds one item per distinct `id` (the first occurrence seen),
 * ordered by descending total score; ties keep first-seen order.
 *
 * @param resultsList Ranked lists, each with its own RRF constant `k`.
 * @returns The fused items, best first.
 */
const datasetSearchResultConcat = <T extends { id: string }>(
  resultsList: ReadonlyArray<{ k: number; list: ReadonlyArray<T> }>
): T[] => {
  const scoreById = new Map<string, number>();
  // Keep the items themselves so the result can be built without an
  // external lookup helper.
  const itemById = new Map<string, T>();

  for (const { k, list } of resultsList) {
    list.forEach((item, index) => {
      const score = 1 / (k + index + 1); // RRF formula
      scoreById.set(item.id, (scoreById.get(item.id) ?? 0) + score);
      if (!itemById.has(item.id)) {
        itemById.set(item.id, item);
      }
    });
  }

  return Array.from(scoreById.entries())
    .sort((a, b) => b[1] - a[1]) // highest score first
    .flatMap(([id]) => {
      const item = itemById.get(id);
      return item === undefined ? [] : [item];
    });
};
查询扩展
/**
 * Expands a search query with an optional extension model.
 *
 * Without `extensionModel` the query is returned unchanged. Otherwise the
 * model is called with `extensionBg` as the system message and the query as
 * the user message, and the parsed extension queries are appended after the
 * original query.
 *
 * Statements are split onto separate lines so that the comment no longer
 * comments out the model call.
 *
 * @returns `concatQueries` (original query first) and `rewriteQuery`. When
 *   a model is used, `extensionQueries` and the raw `aiExtensionResult` are
 *   returned as well.
 */
export const datasetSearchQueryExtension = async ({ query, extensionModel, extensionBg }) => {
  if (!extensionModel) {
    return { concatQueries: [query], rewriteQuery: query };
  }

  // AI query extension.
  const aiResult = await extensionModel.chat({
    messages: [
      { role: 'system', content: extensionBg },
      { role: 'user', content: query }
    ]
  });
  const extensionQueries = parseExtensionResult(aiResult.content);

  return {
    concatQueries: [query, ...extensionQueries],
    extensionQueries,
    // `||` is deliberate: an empty rewrite falls back to the original query.
    rewriteQuery: aiResult.rewriteQuery || query,
    aiExtensionResult: aiResult
  };
};
5. 数据源集成
多种数据源支持
// Descriptors for the supported data sources. Field types in `config` are
// written with constructors (`String`, `Object`), the same way the dataset
// schema writes `websiteConfig`. The bare type names `string` / `object`
// are not valid runtime values.

// 1. Local files
const FileLocal = { type: 'fileLocal', formats: ['txt', 'md', 'pdf', 'docx'] };

// 2. Website crawling
const WebsiteDataset = {
  type: 'websiteDataset',
  config: { url: String, selector: String }
};

// 3. API data source
const APIDataset = {
  type: 'apiDataset',
  config: { endpoint: String, headers: Object, params: Object }
};

// 4. Third-party platforms
const FeishuDataset = { type: 'feishuDataset' };
const YuqueDataset = { type: 'yuqueDataset' };
自动同步机制
// Scheduled sync task
/**
 * Re-crawls a website dataset and queues any detected changes for training.
 *
 * Does nothing when the dataset does not exist, has `autoSync` disabled, is
 * not of type `websiteDataset`, or when no changes are detected.
 *
 * NOTE(review): the queue function also takes a `collectionId`; it cannot
 * be derived from what is visible here — confirm how the target collection
 * is chosen.
 *
 * @param datasetId ID of the dataset to sync.
 */
const autoSyncDataset = async (datasetId: string) => {
  const dataset = await MongoDataset.findById(datasetId);
  // `findById` resolves to null for an unknown id, so guard before reading fields.
  if (!dataset || !dataset.autoSync || dataset.type !== 'websiteDataset') {
    return;
  }

  // Crawl the latest content.
  const newContent = await crawlWebsite(dataset.websiteConfig);

  // Diff against the stored dataset.
  const changes = await detectChanges(dataset, newContent);
  if (changes.length === 0) {
    return;
  }

  // Incremental update. Owner and model fields come from the dataset so the
  // queued rows and the model lookups receive real values.
  await pushDataListToTrainingQueue({
    teamId: dataset.teamId,
    tmbId: dataset.tmbId,
    datasetId,
    agentModel: dataset.agentModel,
    vectorModel: dataset.vectorModel,
    data: changes,
    trainingMode: TrainingModeEnum.auto
  });
};
6. 权限与安全
分层权限控制
/**
 * Permission-related fields of the dataset schema.
 *
 * Each field sits on its own line so that the trailing comment no longer
 * comments out `defaultPermission`.
 */
const DatasetSchema = new Schema({
  // Inherit permissions (enabled by default).
  inheritPermission: { type: Boolean, default: true },
  // Default permission value.
  defaultPermission: Number
});
// Permission check
/**
 * Verifies that the requesting team owns the dataset.
 *
 * Team ids are compared by string value. `dataset.teamId` is declared as an
 * ObjectId in the schema, and `!==` between an ObjectId and any other value
 * compares references, so it would reject every caller.
 *
 * NOTE(review): `per` is accepted but not checked here — confirm whether a
 * per-permission check is expected.
 *
 * @param req       Incoming request, passed to `authCert`.
 * @param datasetId ID of the dataset to authorize.
 * @param per       Requested permission (currently unused).
 * @returns The dataset and `hasPermission: true`.
 * @throws Error when the dataset does not exist or belongs to another team.
 */
export const authDataset = async ({ req, datasetId, per }) => {
  const { teamId } = await authCert({ req });
  const dataset = await MongoDataset.findById(datasetId);

  // `findById` resolves to null for an unknown id.
  if (!dataset) {
    throw new Error('Dataset not found');
  }
  if (String(dataset.teamId) !== String(teamId)) {
    throw new Error('No permission');
  }

  return { dataset, hasPermission: true };
};
数据隔离
// Every query is scoped by teamId.
// Each filter sits on its own line so that the trailing comment no longer
// comments out the `datasetId` condition.
const searchResults = await MongoDatasetData.find({
  // Mandatory team isolation.
  teamId: new Types.ObjectId(teamId),
  datasetId: { $in: datasetIds }
  // ... other conditions
});
7. 性能优化
数据库索引策略
// Compound index for the common query fields.
DatasetDataSchema.index({
  teamId: 1,
  datasetId: 1,
  collectionId: 1,
  chunkIndex: 1,
  updateTime: -1
});

// Index used to look up data rows by vector id.
DatasetDataSchema.index({
  teamId: 1,
  datasetId: 1,
  'indexes.dataId': 1
});

// Full-text search index.
DatasetDataTextSchema.index({
  teamId: 1,
  datasetId: 1,
  // MongoDB text index.
  text: 'text'
});
批量处理优化
// Batched insert
const batchSize = 200;
/**
 * Inserts `data` into the training queue in batches of `batchSize`,
 * starting at `startIndex`.
 *
 * Batches are inserted one after another. A loop replaces the recursive
 * call, and the options object is split across lines so that the comment no
 * longer comments out `session` and the continuation logic. When
 * `startIndex` is at or past the end of `data`, nothing is inserted.
 *
 * NOTE(review): `data` and `session` come from the enclosing scope, which
 * is not shown in this snippet.
 *
 * @param startIndex Index of the first item to insert.
 */
const insertData = async (startIndex: number) => {
  for (let offset = startIndex; offset < data.length; offset += batchSize) {
    const batch = data.slice(offset, offset + batchSize);
    await MongoDatasetTraining.insertMany(batch, {
      // Allow partial failure within a batch.
      ordered: false,
      session
    });
  }
};
缓存策略
// Embedding cache
const vectorCache = new Map();
/**
 * Returns the embedding result for `text`, computing it only on a cache
 * miss.
 *
 * Entries are keyed by `hashStr(text)` and are never evicted.
 *
 * @param text Text to embed.
 * @returns The cached or freshly computed result of `getVectorsByText`.
 */
const getCachedVector = async (text: string) => {
  const cacheKey = hashStr(text);

  if (!vectorCache.has(cacheKey)) {
    const computed = await getVectorsByText({ input: text });
    vectorCache.set(cacheKey, computed);
    return computed;
  }

  return vectorCache.get(cacheKey);
};
8. 监控与运维
训练队列监控
// Training queue status
/**
 * Aggregates a team's training-queue rows by `mode`.
 *
 * @param teamId Team whose queue is inspected.
 * @returns One entry per mode with the row `count` and `avgRetryCount`.
 */
export const getDatasetTrainingQueue = async (teamId: string) => {
  const matchTeam = { $match: { teamId: new Types.ObjectId(teamId) } };
  const groupByMode = {
    $group: {
      _id: '$mode',
      count: { $sum: 1 },
      avgRetryCount: { $avg: '$retryCount' }
    }
  };

  return await MongoDatasetTraining.aggregate([matchTeam, groupByMode]);
};
错误处理与重试
// Retry handling for failed training items
/**
 * Consumes one retry for a training item, or marks it as failed when no
 * retries remain.
 *
 * While `retryCount` is above zero it is decremented and `lockTime` is
 * cleared so the item can be retried. Otherwise `status` is set to
 * `'failed'`. Does nothing when the item no longer exists.
 *
 * Statements are split onto separate lines so that the comment no longer
 * comments out the unlock and save.
 *
 * @param trainingId ID of the training queue item.
 */
const retryTraining = async (trainingId: string) => {
  const training = await MongoDatasetTraining.findById(trainingId);
  // `findById` resolves to null for an unknown id; nothing to retry then.
  if (!training) {
    return;
  }

  if (training.retryCount > 0) {
    training.retryCount -= 1;
    // Unlock for retry.
    training.lockTime = null;
  } else {
    // Mark as permanently failed.
    training.status = 'failed';
  }

  await training.save();
};
总结
FastGPT 知识库后端实现了一个企业级的 RAG 系统,具备以下特点:
- 分层存储: Dataset → Collection → Data 的层级结构
- 多模式检索: 向量检索 + 全文检索 + 重排序融合
- 智能处理: 自动分块、查询扩展、相似度过滤
- 多源集成: 支持文件、网页、API、第三方平台等数据源
- 性能优化: 批量处理、索引优化、缓存策略
- 安全可靠: 权限控制、数据隔离、错误重试
这套架构为 FastGPT 提供了强大的知识管理和检索能力,支持大规模企业级应用场景。