当前位置: 首页 > news >正文

FastGPT源码解析 Agent 智能体插件实现代码分析

FastGPT插件实现逻辑和代码梳理

1. 插件系统概述

FastGPT通过插件丰富智能体生态,集成多种的方式的各种功能的工具,无限扩展智能体功能。插件模块支持多种类型的插件,提供了灵活的扩展机制:

1.1 插件类型

export enum PluginTypeEnum {folder = 'folder',    // 文件夹custom = 'custom',    // 自定义插件http = 'http'         // HTTP插件
}export enum PluginSourceEnum {personal = 'personal',      // 个人插件community = 'community',    // 社区插件commercial = 'commercial'   // 商业插件
}

1.2 插件ID规则

// 插件ID规则:
// personal: id (直接使用应用ID)
// community: community-id
// commercial: commercial-idexport async function splitCombinePluginId(id: string) {const splitRes = id.split('-');if (splitRes.length === 1) {// 个人应用IDreturn {source: PluginSourceEnum.personal,pluginId: id};}const [source, pluginId] = id.split('-') as [PluginSourceEnum, string];return { source, pluginId: id };
}

2. 插件注册机制

2.1 插件注册器

核心文件: packages/plugins/register.ts

// 主线程运行的静态插件
const staticPluginList = ['getTime','fetchUrl', 'feishu','DingTalkWebhook','WeWorkWebhook','google','bing','delay'
];// Worker线程运行的包插件(包含npm依赖)
const packagePluginList = ['mathExprVal','duckduckgo','duckduckgo/search','duckduckgo/searchImg','duckduckgo/searchNews','duckduckgo/searchVideo','drawing','drawing/baseChart','wiki','databaseConnection','Doc2X','Doc2X/PDF2text','searchXNG'
];export const getCommunityPlugins = () => {return Promise.all(list.map<Promise<SystemPluginTemplateItemType>>(async (name) => {const config = (await import(`./src/${name}/template.json`))?.default;const isFolder = list.find((item) => item.startsWith(`${name}/`));const parentIdList = name.split('/').slice(0, -1);const parentId = parentIdList.length > 0 ? `${PluginSourceEnum.community}-${parentIdList.join('/')}` : null;return {...config,id: `${PluginSourceEnum.community}-${name}`,isFolder,parentId,isActive: true,isOfficial: true};}));
};export const getCommunityCb = async () => {const result = await Promise.all(list.map(async (name) => {return {name,cb: staticPluginList.includes(name)? await loadCommunityModule(name)  // 主线程执行: (e: any) => {                    // Worker线程执行return runWorker(WorkerNameEnum.systemPluginRun, {pluginName: name,data: e});}};}));return result.reduce<Record<string, (e: any) => SystemPluginResponseType>>((acc, { name, cb }) => {acc[name] = cb;return acc;}, {});
};

2.2 插件模板结构

每个插件都包含两个核心文件:

  1. template.json - 插件配置和工作流定义
  2. index.ts - 插件执行逻辑(可选,用于静态插件)

示例: packages/plugins/src/getTime/template.json

{"author": "","version": "481","templateType": "tools","name": "获取当前时间","avatar": "core/workflow/template/getTime","intro": "获取用户当前时区的时间。","showStatus": false,"isTool": true,"weight": 10,"workflow": {"nodes": [{"nodeId": "lmpb9v2lo2lk","name": "插件开始","flowNodeType": "pluginInput","inputs": [],"outputs": []},{"nodeId": "i7uow4wj2wdp", "name": "插件输出","flowNodeType": "pluginOutput","inputs": [{"key": "time","valueType": "string","label": "time","value": ["ebLCxU43hHuZ", "rH4tMV02robs"]}],"outputs": []},{"nodeId": "ebLCxU43hHuZ","name": "HTTP 请求", "flowNodeType": "httpRequest468","inputs": [{"key": "system_httpReqUrl","value": "getTime"},{"key": "system_httpJsonBody","value": "{\n  \"time\": \"{{cTime}}\"\n}"}],"outputs": [{"id": "rH4tMV02robs","key": "time","label": "time","valueType": "string"}]}],"edges": [{"source": "lmpb9v2lo2lk","target": "ebLCxU43hHuZ"},{"source": "ebLCxU43hHuZ", "target": "i7uow4wj2wdp"}]}
}

3. 插件执行机制

3.1 插件运行调度

核心文件: packages/service/core/workflow/dispatch/plugin/run.ts

export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPluginResponse> => {const {node: { pluginId, version },runningAppInfo,query,params: { system_forbid_stream = false, ...data }} = props;// 1. 权限验证const pluginData = await authPluginByTmbId({appId: pluginId,tmbId: runningAppInfo.tmbId,per: ReadPermissionVal});// 2. 获取插件运行时数据const plugin = await getChildAppRuntimeById(pluginId, version);// 3. 构建运行时节点const runtimeNodes = storeNodes2RuntimeNodes(plugin.nodes,getWorkflowEntryNodeIds(plugin.nodes)).map((node) => {// 更新插件输入节点的值if (node.flowNodeType === FlowNodeTypeEnum.pluginInput) {return {...node,showStatus: false,inputs: node.inputs.map((input) => ({...input,value: data[input.key] ?? input.value}))};}return { ...node, showStatus: false };});// 4. 执行工作流const { flowResponses, flowUsages, assistantResponses, runTimes } = await dispatchWorkFlow({...props,runningAppInfo: {id: String(plugin.id),teamId: plugin.teamId || runningAppInfo.teamId,tmbId: pluginData?.tmbId || runningAppInfo.tmbId},variables: runtimeVariables,query: getPluginRunUserQuery({pluginInputs: getPluginInputsFromStoreNodes(plugin.nodes),variables: runtimeVariables,files}).value,runtimeNodes,runtimeEdges: initWorkflowEdgeStatus(plugin.edges)});// 5. 处理输出结果const output = flowResponses.find(item => item.moduleType === FlowNodeTypeEnum.pluginOutput);const usagePoints = await computedPluginUsage({plugin,childrenUsage: flowUsages,error: !!output?.pluginOutput?.error});return {assistantResponses: system_forbid_stream ? [] : assistantResponses,[DispatchNodeResponseKeyEnum.runTimes]: runTimes,[DispatchNodeResponseKeyEnum.nodeResponse]: {moduleLogo: plugin.avatar,totalPoints: usagePoints,pluginOutput: output?.pluginOutput},[DispatchNodeResponseKeyEnum.toolResponses]: output?.pluginOutput,...(output ? output.pluginOutput : {})};
};

3.2 插件输入处理

核心文件: packages/service/core/workflow/dispatch/plugin/runInput.ts

export const dispatchPluginInput = (props: PluginInputProps) => {const { params, query } = props;const { files } = chatValue2RuntimePrompt(query);// 处理文件类型参数for (const key in params) {const val = params[key];if (Array.isArray(val) &&val.every(item => item.type === ChatFileTypeEnum.file || item.type === ChatFileTypeEnum.image)) {params[key] = val.map(item => item.url);}}return {...params,[DispatchNodeResponseKeyEnum.nodeResponse]: {},[NodeOutputKeyEnum.userFiles]: files.map(item => item?.url ?? '').filter(Boolean)};
};

3.3 Worker线程执行

核心文件: packages/plugins/runtime/worker.ts

const loadModule = async (name: string): Promise<(e: any) => SystemPluginResponseType> => {const module = await import(`../src/${name}/index`);return module.default;
};parentPort?.on('message', async ({ pluginName, data }: { pluginName: string; data: any }) => {try {const cb = await loadModule(pluginName);parentPort?.postMessage({type: 'success',data: await cb(data)});} catch (error) {parentPort?.postMessage({type: 'error',data: error});}process.exit();
});

4. HTTP插件系统

4.1 HTTP插件创建

核心文件: projects/app/src/pageComponents/app/list/HttpPluginEditModal.tsx

HTTP插件支持通过OpenAPI Schema自动生成:

const HttpPluginEditModal = ({ defaultPlugin, onClose }) => {const [apiData, setApiData] = useState<OpenApiJsonSchema>({ pathData: [], serverPath: '' });// 从URL加载API Schemaconst { mutate: onClickUrlLoadApi } = useRequest({mutationFn: async () => {if (!schemaUrl || (!schemaUrl.startsWith('https://') && !schemaUrl.startsWith('http://'))) {return toast({title: t('common:plugin.Invalid URL'),status: 'warning'});}const schema = await getApiSchemaByUrl(schemaUrl);setValue('pluginData.apiSchemaStr', JSON.stringify(schema, null, 2));}});// 解析API SchemauseEffect(() => {(async () => {if (!apiSchemaStr) {return setApiData({ pathData: [], serverPath: '' });}try {setApiData(await str2OpenApiSchema(apiSchemaStr));} catch (err) {toast({status: 'warning',title: t('common:plugin.Invalid Schema')});setApiData({ pathData: [], serverPath: '' });}})();}, [apiSchemaStr]);return (<MyModal>{/* 基本信息设置 */}<Input {...register('name')} /><Textarea {...register('intro')} />{/* OpenAPI Schema输入 */}<Textarea {...register('pluginData.apiSchemaStr')} />{/* 自定义请求头 */}<Table>{customHeaders.map((item, index) => (<Tr key={index}><Td><HttpInput value={item.key}onBlur={(val) => updateHeaders(index, 'key', val)}/></Td><Td><HttpInput value={item.value}onBlur={(val) => updateHeaders(index, 'value', val)}/></Td></Tr>))}</Table>{/* API列表预览 */}<Table>{apiData.pathData?.map((item, index) => (<Tr key={index}><Td>{item.name}</Td><Td>{item.description}</Td><Td>{item.method}</Td><Td>{item.path}</Td></Tr>))}</Table></MyModal>);
};

4.2 cURL解析功能

核心文件: projects/app/src/web/core/app/templates.ts

支持从cURL命令自动生成插件:

export const parsePluginFromCurlString = (curl: string) => {const { url, method, headers, body, params, bodyArray } = parseCurl(curl);// 解析参数生成插件输入const allInputs = Array.from(new Map([...params, ...bodyArray].map(item => [item.key, item])).values());const formatPluginStartInputs = allInputs.map(item => {const valueType = item.value === null ? 'string' : typeof item.value;const valueTypeMap = {string: {renderTypeList: [FlowNodeInputTypeEnum.input, FlowNodeInputTypeEnum.reference],valueType: WorkflowIOValueTypeEnum.string,isToolType: true},number: {renderTypeList: [FlowNodeInputTypeEnum.numberInput, FlowNodeInputTypeEnum.reference],valueType: WorkflowIOValueTypeEnum.number,isToolType: true},boolean: {renderTypeList: [FlowNodeInputTypeEnum.switch, FlowNodeInputTypeEnum.reference],valueType: WorkflowIOValueTypeEnum.boolean,isToolType: true}};return {renderTypeList: valueTypeMap[valueType].renderTypeList,valueType: valueTypeMap[valueType].valueType,key: item.key,label: item.key,required: false,toolDescription: item.key};});// 生成HTTP请求节点配置const referenceBody = Object.entries(JSON.parse(body)).reduce((acc, [key, value]) => {acc[key] = typeof value === 'string' ? `###{{$pluginInput.${key}$}}###` : `{{$pluginInput.${key}$}}`;return acc;}, {} as Record<string, any>);return {nodes: [// 插件输入节点{nodeId: 'pluginInput',name: '插件开始',flowNodeType: FlowNodeTypeEnum.pluginInput,inputs: formatPluginStartInputs,outputs: formatPluginStartOutputs},// HTTP请求节点{nodeId: 'httpRequest',name: 'HTTP 请求',flowNodeType: FlowNodeTypeEnum.httpRequest468,inputs: [{key: 'system_httpMethod',value: method},{key: 'system_httpReqUrl', value: url},{key: 'system_httpHeader',value: referenceHeaders},{key: 'system_httpJsonBody',value: referenceBodyStr}]},// 插件输出节点{nodeId: 'pluginOutput',name: '插件输出',flowNodeType: FlowNodeTypeEnum.pluginOutput}],edges: [{ source: 'pluginInput', target: 'httpRequest' },{ source: 'httpRequest', target: 'pluginOutput' }]};
};

5. 插件管理系统

5.1 插件控制器

核心文件: packages/service/core/app/plugin/controller.ts

// 获取插件预览节点数据
export async function getChildAppPreviewNode({ id }: { id: string }): Promise<FlowNodeTemplateType> {const app = await (async () => {const { source, pluginId } = await splitCombinePluginId(id);if (source === PluginSourceEnum.personal) {// 个人插件const item = await MongoApp.findById(id).lean();const version = await getAppLatestVersion(id, item);return {id: String(item._id),teamId: String(item.teamId),name: item.name,avatar: item.avatar,workflow: {nodes: version.nodes,edges: version.edges,chatConfig: version.chatConfig},templateType: FlowNodeTemplateTypeEnum.teamApp};} else {// 系统插件return getSystemPluginTemplateById(pluginId);}})();const isPlugin = !!app.workflow.nodes.find(node => node.flowNodeType === FlowNodeTypeEnum.pluginInput);return {id: getNanoid(),pluginId: app.id,templateType: app.templateType,flowNodeType: isPlugin ? FlowNodeTypeEnum.pluginModule : FlowNodeTypeEnum.appModule,avatar: app.avatar,name: app.name,intro: app.intro,showStatus: app.showStatus,isTool: true,version: app.version,...(isPlugin? pluginData2FlowNodeIO({ nodes: app.workflow.nodes }): appData2FlowNodeIO({ chatConfig: app.workflow.chatConfig }))};
}// 获取插件运行时数据
export async function getChildAppRuntimeById(id: string,versionId?: string
): Promise<PluginRuntimeType> {const app = await (async () => {const { source, pluginId } = await splitCombinePluginId(id);if (source === PluginSourceEnum.personal) {const item = await MongoApp.findById(id).lean();const version = await getAppVersionById({ appId: id, versionId, app: item });return {id: String(item._id),teamId: String(item.teamId),name: item.name,avatar: item.avatar,workflow: {nodes: version.nodes,edges: version.edges,chatConfig: version.chatConfig}};} else {return getSystemPluginTemplateById(pluginId, versionId);}})();return {id: app.id,teamId: app.teamId,name: app.name,avatar: app.avatar,showStatus: app.showStatus,currentCost: app.currentCost,nodes: app.workflow.nodes,edges: app.workflow.edges,hasTokenFee: app.hasTokenFee};
}

5.2 插件加载机制

核心文件: projects/app/src/service/core/app/plugin.ts

export const getSystemPlugins = async (refresh = false) => {if (isProduction && global.systemPlugins && global.systemPlugins.length > 0 && !refresh)return cloneDeep(global.systemPlugins);try {if (!global.systemPlugins) {global.systemPlugins = [];}// 根据环境加载不同的插件global.systemPlugins = FastGPTProUrl? await getCommercialPlugins()  // 商业版插件: await getCommunityPlugins();  // 社区版插件addLog.info(`Load system plugin successfully: ${global.systemPlugins.length}`);return cloneDeep(global.systemPlugins);} catch (error) {global.systemPlugins = undefined;return Promise.reject(error);}
};export const getSystemPluginCb = async (refresh = false) => {if (isProduction &&global.systemPluginCb &&Object.keys(global.systemPluginCb).length > 0 &&!refresh)return global.systemPluginCb;try {global.systemPluginCb = {};await getSystemPlugins(refresh);// 根据环境获取不同的回调函数global.systemPluginCb = FastGPTProUrl ? await getCommercialCb()  // 商业版回调: await getCommunityCb();  // 社区版回调return global.systemPluginCb;} catch (error) {return Promise.reject(error);}
};

6. 插件开发示例

6.1 简单插件示例

文件: packages/plugins/src/getTime/index.ts

type Props = {time: string;
};
type Response = Promise<{time: string;
}>;const main = async ({ time }: Props): Response => {return {time};
};export default main;

6.2 复杂插件示例

文件: packages/plugins/src/mathExprVal/index.ts

import { evaluate } from 'mathjs';type Props = {expression: string;
};
type Response = Promise<{result: number | string;error?: string;
}>;const main = async ({ expression }: Props): Response => {try {if (!expression) {return {result: '',error: 'Expression is required'};}const result = evaluate(expression);return {result: typeof result === 'number' ? result : String(result)};} catch (error) {return {result: '',error: `Math evaluation error: ${error.message}`};}
};export default main;

7. 插件系统架构总结

7.1 核心组件

  1. 插件注册器 - 负责插件的发现和注册
  2. 插件控制器 - 管理插件的生命周期
  3. 插件调度器 - 执行插件工作流
  4. Worker管理器 - 处理需要隔离执行的插件
  5. HTTP插件系统 - 支持OpenAPI规范的HTTP插件

7.2 执行流程

  1. 插件注册 → 系统启动时扫描并注册所有插件
  2. 插件发现 → 用户在工作流中选择插件
  3. 参数绑定 → 将用户输入绑定到插件参数
  4. 插件执行 → 根据插件类型选择执行方式
  5. 结果返回 → 处理插件输出并返回结果

7.3 关键文件路径

核心系统文件
  • packages/plugins/register.ts - 插件注册器
  • packages/plugins/runtime/worker.ts - Worker执行器
  • packages/service/core/app/plugin/controller.ts - 插件控制器
  • packages/service/core/workflow/dispatch/plugin/run.ts - 插件调度器
插件源码目录
  • packages/plugins/src/ - 社区插件源码
  • plugins/model/ - 模型插件
  • plugins/webcrawler/ - 网络爬虫插件
前端界面文件
  • projects/app/src/pageComponents/app/list/HttpPluginEditModal.tsx - HTTP插件编辑
  • projects/app/src/service/core/app/plugin.ts - 插件服务

这套插件系统提供了完整的插件开发、注册、管理和执行能力,支持从简单的工具函数到复杂的HTTP API集成。


文章转载自:

http://Y5Kxhs8S.yqmmh.cn
http://Ch6lxZZo.yqmmh.cn
http://cpso2jym.yqmmh.cn
http://iTk072D9.yqmmh.cn
http://l7zKu5Ad.yqmmh.cn
http://LqywgIhk.yqmmh.cn
http://9gt1Jy3n.yqmmh.cn
http://pCLrwYsd.yqmmh.cn
http://EYmGU9tX.yqmmh.cn
http://WsskQhgk.yqmmh.cn
http://RziwzPFp.yqmmh.cn
http://0YpE5PBg.yqmmh.cn
http://UyIMVmUG.yqmmh.cn
http://y6D0MwuB.yqmmh.cn
http://p7QGKPJN.yqmmh.cn
http://VvPkwRui.yqmmh.cn
http://4ywqojWi.yqmmh.cn
http://J6cMvhqX.yqmmh.cn
http://Z08KwMTv.yqmmh.cn
http://FKKvD5zX.yqmmh.cn
http://UOOe21c6.yqmmh.cn
http://ey7qqted.yqmmh.cn
http://92mR1tUN.yqmmh.cn
http://gVatWBPk.yqmmh.cn
http://csF0IkPv.yqmmh.cn
http://7q0Y6et7.yqmmh.cn
http://B34YqxO6.yqmmh.cn
http://ZOTHOyS8.yqmmh.cn
http://UVbrN6jK.yqmmh.cn
http://C5sBtSZM.yqmmh.cn
http://www.dtcms.com/a/376448.html

相关文章:

  • 【Fastjson】Fastjson2 在不同 Modules 模块包下,@JSONField name映射无法反序列化的 BUG 及解决
  • [特殊字符] 从助手到引擎:基于 GPT 的战略协作系统演示
  • SSE 模仿 GPT 响应
  • ThingsKit物联网平台 v2.0.0 发布|前端UI重构、底层架构升级
  • 面向对象数据分析实战编程题:销售数据统计与可视化(Python)
  • Transformer vs. Diffusion:谁将主宰通用视频生成与世界模型的未来?
  • 存储卷配额管理针对海外VPS容器环境的实施流程
  • 前端开发中常见英文缩写及其全称
  • Linux第十五讲:Socket编程UDP
  • Electron 高级 UI:集成 React 或 Vue.js
  • CKAD-CN考试之路----10
  • Linux嵌入式自学笔记(基于野火EBF6ULL):1.配置环境
  • 2025【1460天】网络工程师经验之道
  • 图解设计模式【3】
  • java 将pdf转图片
  • ES(springcloud笔记第五期)
  • Day40 Web服务器原理与C语言实现:从HTTP协议到静态资源服务
  • 利用FFmpeg自动批量处理m4s文件
  • [iOS] ViewController 的生命周期
  • MySQL 核心文件解析:从配置到存储的 “说明书 + 记录仪” 系统
  • 一文了解大模型压缩与部署
  • Jenkins 构建 Node 项目报错解析与解决——pnpm lockfile 问题实战
  • Wazuh 研究记录 | 开源XDR | 安全基线检测
  • 配电网故障诊断与自愈控制工具的智慧能源开源了
  • [邮件服务器core] 安全通信(SSL/TLS) | OpenSSL库管理 | 服务端安全SECURITY.md
  • Workers API 实战教程:45 秒完成 CI/CD 云函数部署
  • MySQL收集processlist记录的shell工具mysql_collect_processlist
  • 计算机毕业设计 基于Hadoop的健康饮食推荐系统的设计与实现 Java 大数据毕业设计 Hadoop毕业设计选题【附源码+文档报告+安装调试】
  • 【nginx基础】Nginx安装指南:CentOS 7.9源码编译安装Nginx 1.28.0完整指南
  • ShardingJDBC实战指南