当前位置: 首页 > news >正文

FastGPT源码解析 Agent知识库管理维护使用详解

FastGPT 知识库后端实现分析

核心架构概览

FastGPT 知识库后端采用分层存储 + 向量检索架构,实现了完整的 RAG (Retrieval-Augmented Generation) 系统,支持多种数据源和检索模式。

1. 数据模型设计

核心实体关系

Dataset (知识库)
├── Collection (文档集合)
│   ├── DatasetData (数据块)
│   │   └── indexes (向量索引)
│   └── DatasetDataText (全文索引)
└── DatasetTraining (训练队列)

数据库 Schema

Dataset Schema (知识库)
const DatasetSchema = new Schema({parentId: { type: ObjectId, ref: 'datasets' },     // 父级知识库(支持层级)teamId: { type: ObjectId, required: true },        // 团队IDtmbId: { type: ObjectId, required: true },         // 创建者IDtype: { enum: DatasetTypeEnum },                    // 类型: dataset/folder/websiteDatasetstatus: { enum: DatasetStatusEnum },               // 状态: active/syncingname: { type: String, required: true },            // 名称vectorModel: { type: String, default: 'text-embedding-3-small' }, // 向量模型agentModel: { type: String, default: 'gpt-4o-mini' },            // 处理模型websiteConfig: { url: String, selector: String },  // 网站配置apiServer: Object,                                  // API数据源配置autoSync: Boolean                                   // 自动同步
});
DatasetData Schema (数据块)
const DatasetDataSchema = new Schema({teamId: { type: ObjectId, required: true },datasetId: { type: ObjectId, required: true },collectionId: { type: ObjectId, required: true },q: { type: String, required: true },               // 问题/标题a: { type: String, default: '' },                  // 答案/内容indexes: [{                                         // 向量索引数组defaultIndex: Boolean,dataId: String,                                   // 向量IDtext: String                                      // 索引文本}],chunkIndex: { type: Number, default: 0 },          // 分块索引updateTime: { type: Date, default: Date.now }
});

2. 数据处理流程

数据导入管道

// 1. 文件上传 → 2. 内容解析 → 3. 分块处理 → 4. 向量化 → 5. 存储
export async function pushDataListToTrainingQueue({teamId, tmbId, datasetId, collectionId,agentModel, vectorModel, data, trainingMode
}) {// 1. 模型验证const agentModelData = getLLMModel(agentModel);const vectorModelData = getEmbeddingModel(vectorModel);// 2. 数据预处理const filterResult = {success: [], overToken: [], repeat: [], error: []};data.forEach(item => {item.q = simpleText(item.q);  // 清理文本item.a = simpleText(item.a);const text = item.q + item.a;if (text.length > maxToken) {filterResult.overToken.push(item);} else if (set.has(text)) {filterResult.repeat.push(item);  // 去重} else {filterResult.success.push(item);}});// 3. 批量插入训练队列await MongoDatasetTraining.insertMany(filterResult.success.map(item => ({teamId, datasetId, collectionId,mode: trainingMode,q: item.q, a: item.a,indexes: item.indexes,retryCount: 5})));
}

训练模式

enum TrainingModeEnum {chunk = 'chunk',    // 直接分块qa = 'qa',         // QA拆分auto = 'auto'      // 自动处理
}

3. 检索系统实现

多模式检索引擎

export async function searchDatasetData({teamId, queries, datasetIds, searchMode, similarity, usingReRank, maxTokens
}: SearchDatasetDataProps): Promise<SearchDatasetDataResponse> {// 1. 检索模式配置const { embeddingLimit, fullTextLimit } = countRecallLimit();// 2. 并行检索const { embeddingRecallResults, fullTextRecallResults, tokens } = await multiQueryRecall({ embeddingLimit, fullTextLimit });// 3. 重排序 (可选)const reRankResults = usingReRank ? await datasetDataReRank({ query: reRankQuery, data: concatResults }) : [];// 4. RRF 融合const rrfConcatResults = datasetSearchResultConcat([{ k: 60, list: embeddingRecallResults },{ k: 60, list: fullTextRecallResults },{ k: 58, list: reRankResults }]);// 5. 相似度过滤 + Token限制const scoreFilter = filterBySimilarity(rrfConcatResults, similarity);const finalResults = await filterDatasetDataByMaxTokens(scoreFilter, maxTokens);return { searchRes: finalResults, tokens, searchMode };
}

向量检索

const embeddingRecall = async ({ query, limit }) => {// 1. 文本向量化const { vectors, tokens } = await getVectorsByText({model: getEmbeddingModel(model),input: query,type: 'query'});// 2. 向量数据库检索const { results } = await recallFromVectorStore({teamId, datasetIds,vector: vectors[0],limit,forbidCollectionIdList,filterCollectionIdList});// 3. 关联数据查询const [dataList, collections] = await Promise.all([MongoDatasetData.find({teamId,datasetId: { $in: datasetIds },'indexes.dataId': { $in: results.map(item => item.id) }}),MongoDatasetCollection.find({_id: { $in: collectionIdList }})]);return { embeddingRecallResults: formatResults, tokens };
};

全文检索

const fullTextRecall = async ({ query, limit }) => {// MongoDB 全文索引检索const searchResults = await MongoDatasetDataText.aggregate([{$match: {teamId: new Types.ObjectId(teamId),datasetId: { $in: datasetIds.map(id => new Types.ObjectId(id)) },$text: { $search: jiebaSplit({ text: query }) }  // 中文分词}},{ $sort: { score: { $meta: 'textScore' } } },{ $limit: limit }]);return { fullTextRecallResults: formatResults };
};

4. 高级检索功能

重排序 (ReRank)

export const datasetDataReRank = async ({ data, query }) => {const results = await reRankRecall({query,documents: data.map(item => ({id: item.id,text: `${item.q}\n${item.a}`}))});// 添加重排序分数return results.map((item, index) => ({...data.find(d => d.id === item.id),score: [{ type: SearchScoreTypeEnum.reRank, value: item.score, index }]}));
};

RRF 融合算法

// Reciprocal Rank Fusion - 多路检索结果融合
const datasetSearchResultConcat = (resultsList) => {const scoreMap = new Map();resultsList.forEach(({ k, list }) => {list.forEach((item, index) => {const score = 1 / (k + index + 1);  // RRF 公式scoreMap.set(item.id, (scoreMap.get(item.id) || 0) + score);});});return Array.from(scoreMap.entries()).sort((a, b) => b[1] - a[1])  // 按分数排序.map(([id]) => findItemById(id));
};

查询扩展

export const datasetSearchQueryExtension = async ({query, extensionModel, extensionBg
}) => {if (!extensionModel) {return { concatQueries: [query], rewriteQuery: query };}// AI 查询扩展const aiResult = await extensionModel.chat({messages: [{ role: 'system', content: extensionBg },{ role: 'user', content: query }]});const extensionQueries = parseExtensionResult(aiResult.content);return {concatQueries: [query, ...extensionQueries],extensionQueries,rewriteQuery: aiResult.rewriteQuery || query,aiExtensionResult: aiResult};
};

5. 数据源集成

多种数据源支持

// 1. 本地文件
const FileLocal = { type: 'fileLocal', formats: ['txt', 'md', 'pdf', 'docx'] };// 2. 网页爬取
const WebsiteDataset = { type: 'websiteDataset',config: { url: string, selector: string }
};// 3. API 数据源
const APIDataset = {type: 'apiDataset',config: { endpoint: string, headers: object, params: object }
};// 4. 第三方平台
const FeishuDataset = { type: 'feishuDataset' };
const YuqueDataset = { type: 'yuqueDataset' };

自动同步机制

// 定时同步任务
const autoSyncDataset = async (datasetId: string) => {const dataset = await MongoDataset.findById(datasetId);if (dataset.autoSync && dataset.type === 'websiteDataset') {// 爬取最新内容const newContent = await crawlWebsite(dataset.websiteConfig);// 对比变更const changes = await detectChanges(dataset, newContent);// 增量更新if (changes.length > 0) {await pushDataListToTrainingQueue({datasetId,data: changes,trainingMode: TrainingModeEnum.auto});}}
};

6. 权限与安全

分层权限控制

const DatasetSchema = new Schema({inheritPermission: { type: Boolean, default: true },  // 继承权限defaultPermission: Number                              // 默认权限
});// 权限验证
export const authDataset = async ({ req, datasetId, per }) => {const { teamId, tmbId } = await authCert({ req });const dataset = await MongoDataset.findById(datasetId);if (dataset.teamId !== teamId) {throw new Error('No permission');}return { dataset, hasPermission: true };
};

数据隔离

// 所有查询都基于 teamId 隔离
const searchResults = await MongoDatasetData.find({teamId: new Types.ObjectId(teamId),  // 强制团队隔离datasetId: { $in: datasetIds },// ... 其他条件
});

7. 性能优化

数据库索引策略

// 复合索引优化查询性能
DatasetDataSchema.index({teamId: 1,datasetId: 1,collectionId: 1,chunkIndex: 1,updateTime: -1
});// 向量检索索引
DatasetDataSchema.index({teamId: 1,datasetId: 1,'indexes.dataId': 1
});// 全文检索索引
DatasetDataTextSchema.index({teamId: 1,datasetId: 1,text: 'text'  // MongoDB 全文索引
});

批量处理优化

// 批量插入优化
const batchSize = 200;
const insertData = async (startIndex: number) => {const batch = data.slice(startIndex, startIndex + batchSize);await MongoDatasetTraining.insertMany(batch, {ordered: false,  // 允许部分失败session});if (startIndex + batchSize < data.length) {return insertData(startIndex + batchSize);}
};

缓存策略

// 向量缓存
const vectorCache = new Map();
const getCachedVector = async (text: string) => {const key = hashStr(text);if (vectorCache.has(key)) {return vectorCache.get(key);}const vector = await getVectorsByText({ input: text });vectorCache.set(key, vector);return vector;
};

8. 监控与运维

训练队列监控

// 获取训练队列状态
export const getDatasetTrainingQueue = async (teamId: string) => {const stats = await MongoDatasetTraining.aggregate([{ $match: { teamId: new Types.ObjectId(teamId) } },{$group: {_id: '$mode',count: { $sum: 1 },avgRetryCount: { $avg: '$retryCount' }}}]);return stats;
};

错误处理与重试

// 训练失败重试机制
const retryTraining = async (trainingId: string) => {const training = await MongoDatasetTraining.findById(trainingId);if (training.retryCount > 0) {training.retryCount -= 1;training.lockTime = null;  // 解锁重试await training.save();} else {// 标记为永久失败training.status = 'failed';await training.save();}
};

总结

FastGPT 知识库后端实现了一个企业级的 RAG 系统,具备以下特点:

  1. 分层存储: Dataset → Collection → Data 的层级结构
  2. 多模式检索: 向量检索 + 全文检索 + 重排序融合
  3. 智能处理: 自动分块、查询扩展、相似度过滤
  4. 多源集成: 支持文件、网页、API、第三方平台等数据源
  5. 性能优化: 批量处理、索引优化、缓存策略
  6. 安全可靠: 权限控制、数据隔离、错误重试

这套架构为 FastGPT 提供了强大的知识管理和检索能力,支持大规模企业级应用场景。


文章转载自:

http://4J0Yp0fe.yLLjn.cn
http://JpBM8qiL.yLLjn.cn
http://uwF20yHi.yLLjn.cn
http://kUbDabzN.yLLjn.cn
http://NKDqp6uU.yLLjn.cn
http://A6Bo9BMS.yLLjn.cn
http://xHej4Pdn.yLLjn.cn
http://MYsy20xM.yLLjn.cn
http://pM2nU2lv.yLLjn.cn
http://Yd9gq9K6.yLLjn.cn
http://rWS8RzcT.yLLjn.cn
http://n6ZMj3dq.yLLjn.cn
http://aDLJ6Jfj.yLLjn.cn
http://m1hjQvbx.yLLjn.cn
http://qjErtgxd.yLLjn.cn
http://u3Za4qgo.yLLjn.cn
http://nXCDmdqF.yLLjn.cn
http://ilknAlbA.yLLjn.cn
http://wjQwq9sQ.yLLjn.cn
http://tGwljakB.yLLjn.cn
http://v2oCQ0ci.yLLjn.cn
http://V9mJppqW.yLLjn.cn
http://etEocBy2.yLLjn.cn
http://jF1Zrq6G.yLLjn.cn
http://ciMggydU.yLLjn.cn
http://dfE2ocKy.yLLjn.cn
http://nWxbqOyv.yLLjn.cn
http://47idtss9.yLLjn.cn
http://0pyjn5YO.yLLjn.cn
http://yrPANZF4.yLLjn.cn
http://www.dtcms.com/a/368889.html

相关文章:

  • MATLAB 2023a深度学习工具箱全面解析:从CNN、RNN、GAN到YOLO与U-Net,涵盖模型解释、迁移学习、时间序列预测与图像生成的完整实战指南
  • 均匀圆形阵抗干扰MATLAB仿真实录与特点解读
  • 《深入理解双向链表:增删改查及销毁操作》
  • 属性关键字
  • 【Linux基础】Linux系统管理:MBR分区实践详细操作指南
  • 国产化FPGA开发板:2050-基于JFMK50T4(XC7A50T)的核心板
  • 时隔4年麒麟重新登场!华为这8.8英寸新「手机」给我看麻了
  • 敏感词过滤这么玩?自定义注解 + DFA 算法,优雅又高效!
  • RPC内核细节(转载)
  • 如何将 Android 设备的系统底层日志(如内核日志、系统服务日志等)拷贝到 Windows 本地
  • Vue美化文字链接(超链接)
  • 中囯移动电视盒子(魔百和)B860AV2.1-A2和CM311-5-zg刷机手记
  • TCP/IP函数——sendmsg
  • Linux网络自定义协议与序列化
  • 人工智能机器学习——聚类
  • docker-compose跨节点部署Elasticsearch 9.X集群
  • Qt控件:Item Views/Widgets
  • 轻量高效:Miniserve文件共享神器
  • Netty从0到1系列之JDK零拷贝技术
  • 从无图到轻图,大模型时代,图商的新角逐
  • 【物种分布模型】R语言物种气候生态位动态量化与分布特征模拟——气候生态位动态检验、质心转移可视化、适生区预测等
  • 盟接之桥说制造:在安全、确定与及时之间,构建品质、交期与反应速度的动态平衡
  • 【Android】SQLite使用——增删查改
  • DJANGO后端服务启动报错及解决
  • Hive使用Tez引擎出现OOM的解决方法
  • 前端三件套+springboot后端连通尝试
  • AI大模型如何重塑日常?从智能办公到生活服务的5个核心改变
  • 158-EEMD-HHT算法
  • 人机信智协同新范式:信的双重性与序位统合
  • RT-Thread源码分析字节实现socket源码