当前位置: 首页 > news >正文

网站被k文章修改.net如何建设网站

网站被k文章修改,.net如何建设网站,软件开发和网页设计哪个好,做百度推广网站排名FastGPT 工作流后端实现分析 核心架构概览 FastGPT 工作流后端采用节点调度器模式,通过统一的调度引擎执行各种类型的节点,实现复杂的 AI 工作流编排。 1. 调度引擎核心 (dispatchWorkFlow) 主要职责 节点执行调度: 按依赖关系执行节点边状态管理: 控制数…

FastGPT 工作流后端实现分析

核心架构概览

FastGPT 工作流后端采用节点调度器模式,通过统一的调度引擎执行各种类型的节点,实现复杂的 AI 工作流编排。

1. 调度引擎核心 (dispatchWorkFlow)

主要职责

  • 节点执行调度: 按依赖关系执行节点
  • 边状态管理: 控制数据流向
  • 变量传递: 节点间数据共享
  • 流式响应: 实时推送执行状态
  • 错误处理: 异常捕获和恢复

核心流程

export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {// 1. 初始化执行环境let { runtimeNodes, runtimeEdges, variables, histories } = data;// 2. 设置系统变量variables = {...getSystemVariable(data),...externalProvider.externalWorkflowVariables,...variables};// 3. 递归执行节点const entryNodes = runtimeNodes.filter(item => item.isEntry);await Promise.all(entryNodes.map(node => checkNodeCanRun(node)));// 4. 返回执行结果return {flowResponses: chatResponses,flowUsages: chatNodeUsages,assistantResponses: chatAssistantResponse,newVariables: removeSystemVariable(variables)};
}

2. 节点类型映射 (callbackMap)

节点分发器

const callbackMap: Record<FlowNodeTypeEnum, Function> = {[FlowNodeTypeEnum.chatNode]: dispatchChatCompletion,        // AI对话[FlowNodeTypeEnum.datasetSearchNode]: dispatchDatasetSearch, // 知识库搜索[FlowNodeTypeEnum.httpRequest468]: dispatchHttp468Request,   // HTTP请求[FlowNodeTypeEnum.ifElseNode]: dispatchIfElse,              // 条件判断[FlowNodeTypeEnum.code]: dispatchRunCode,                   // 代码执行[FlowNodeTypeEnum.loop]: dispatchLoop,                      // 循环控制[FlowNodeTypeEnum.tools]: dispatchRunTools,                 // 工具调用// ... 30+ 种节点类型
};

节点执行模式

  • Active 模式: 正常执行节点逻辑
  • Skip 模式: 跳过执行,传递默认值
  • Wait 模式: 等待前置条件满足

3. 节点执行机制

节点运行状态检查

async function checkNodeCanRun(node: RuntimeNodeItemType): Promise<RuntimeNodeItemType[]> {// 1. 检查节点运行状态const status = checkNodeRunStatus({ node, runtimeEdges });// 2. 根据状态执行不同逻辑if (status === 'run') {return nodeRunWithActive(node);}if (status === 'skip') {return nodeRunWithSkip(node);}// 3. 递归执行下游节点const { nextStepActiveNodes } = nodeOutput(node, result);return Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)));
}

节点参数注入

function getNodeRunParams(node: RuntimeNodeItemType) {const params: Record<string, any> = {};node.inputs.forEach(input => {// 1. 变量替换 {{$xx.xx$}} 和 {{xx}}let value = replaceEditorVariable({text: input.value,nodes: runtimeNodes,variables});// 2. 引用变量解析value = getReferenceVariableValue({value,nodes: runtimeNodes,variables});// 3. 类型格式化params[input.key] = valueTypeFormat(value, input.valueType);});return params;
}

4. 数据流管理

边状态控制

// 边的三种状态
type EdgeStatus = 'waiting' | 'active' | 'skipped';// 更新边状态
targetEdges.forEach(edge => {if (skipHandleId.includes(edge.sourceHandle)) {edge.status = 'skipped';  // 跳过分支} else {edge.status = 'active';   // 激活分支}
});

节点输出传递

function nodeOutput(node: RuntimeNodeItemType, result: Record<string, any>) {// 1. 更新节点输出值node.outputs.forEach(outputItem => {if (result[outputItem.key] !== undefined) {outputItem.value = result[outputItem.key];}});// 2. 获取下游节点const nextStepActiveNodes = runtimeNodes.filter(node => targetEdges.some(item => item.target === node.nodeId && item.status === 'active'));return { nextStepActiveNodes };
}

5. 特殊功能实现

流式响应处理

// SSE 流式响应配置
if (stream && res) {res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');res.setHeader('Access-Control-Allow-Origin', '*');res.setHeader('Cache-Control', 'no-cache, no-transform');
}// 推送执行状态
props.workflowStreamResponse?.({event: SseResponseEventEnum.flowNodeStatus,data: { status: 'running', name: node.name }
});

交互节点处理

// 处理用户交互节点(表单输入、用户选择等)
const interactiveResponse = nodeRunResult.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {nodeInteractiveResponse = {entryNodeIds: [nodeRunResult.node.nodeId],interactiveResponse};// 暂停工作流,等待用户交互return [];
}

循环控制

// 循环节点实现
[FlowNodeTypeEnum.loop]: dispatchLoop,
[FlowNodeTypeEnum.loopStart]: dispatchLoopStart,
[FlowNodeTypeEnum.loopEnd]: dispatchLoopEnd,// 循环逻辑:通过边的连接实现循环回路

6. 错误处理机制

节点级错误处理

const dispatchRes = await (async () => {try {return await callbackMap[node.flowNodeType](dispatchData);} catch (error) {// 获取所有输出边const targetEdges = runtimeEdges.filter(item => item.source === node.nodeId);const skipHandleIds = targetEdges.map(item => item.sourceHandle);// 跳过所有边并返回错误return {[DispatchNodeResponseKeyEnum.nodeResponse]: {error: formatHttpError(error)},[DispatchNodeResponseKeyEnum.skipHandleId]: skipHandleIds};}
})();

工作流级错误恢复

  • 深度限制: 防止无限递归 (最大20层)
  • 运行次数控制: 避免死循环
  • 资源清理: 连接关闭时自动清理

7. 性能优化

并发执行

// 并行执行无依赖的节点
const nextStepActiveNodesResults = (await Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)))
).flat();

内存管理

// 进程让步,避免阻塞
await surrenderProcess();// 移除系统变量,减少内存占用
newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables)

去重优化

// 确保节点只执行一次
nextStepActiveNodes = nextStepActiveNodes.filter((node, index, self) => self.findIndex(t => t.nodeId === node.nodeId) === index
);

8. 调试支持

Debug 模式

if (props.mode === 'debug') {debugNextStepRunNodes = debugNextStepRunNodes.concat([...nextStepActiveNodes,...nextStepSkipNodes]);// Debug 模式下不继续执行,返回调试信息return { nextStepActiveNodes: [], nextStepSkipNodes: [] };
}

执行追踪

return {debugResponse: {finishedNodes: runtimeNodes,      // 已完成的节点finishedEdges: runtimeEdges,      // 边的状态nextStepRunNodes: debugNextStepRunNodes  // 下一步要执行的节点}
};

总结

FastGPT 工作流后端实现了一个高度可扩展的节点调度引擎,具备以下特点:

  1. 统一调度: 通过 callbackMap 实现节点类型的统一分发
  2. 状态管理: 精确控制节点和边的执行状态
  3. 数据流控制: 支持条件分支、循环、并行等复杂逻辑
  4. 实时响应: SSE 流式推送执行状态和结果
  5. 错误恢复: 完善的异常处理和资源清理机制
  6. 性能优化: 并发执行、内存管理、去重优化
  7. 调试支持: 完整的调试模式和执行追踪

这套架构为 FastGPT 提供了强大的 AI 工作流编排能力,支持复杂的业务逻辑实现。

http://www.dtcms.com/a/488728.html

相关文章:

  • 国家精品资源在线开放课程商城网站建设用乐云seo系统
  • 手机行情网站企业seo服务
  • 外贸服饰网站建设广州网站建设系统开发
  • 做外贸采购都是用什么网站开设网站步骤
  • 深圳建网站价格wordpress 获取参数
  • python学习之可迭代对象迭代器对象
  • 辽宁网站建设价格有没有外包活的网站
  • 安阳企业建网站wordpress变身插件
  • 【C++——模板】
  • 苏州网站开发公司兴田德润简介网站关键词排名100
  • 网站开发 百度网盘女的男的做那个视频网站
  • 我的世界做图的网站福田瑞沃es3报价及图片
  • 开源 | MeiGen-MultiTalk:基于单张照片实现多人互动演绎
  • 上海专业网站建设公司企业网站模板下载尽在
  • 肥料网站建设用c 做网站
  • 苏中建设集团网站网址免费个人网站在线制作
  • 娄底网站建设网站网站开发 旅游
  • 软件免费网站大全谷歌广告
  • hi宝贝网站建设那家好东莞做网站还赚钱吗
  • 沈阳网站建设dnglzx网站建设制作模板
  • 一个网站不兼容ie怎么做网站怎么挂广告
  • 开封建网站的公司网站开发与推广就业
  • 数字域名有哪些网站绍兴网站制作计划
  • 怎么做微信领券网站企业网站设计总结
  • 配置管理的配置与拉取、热更新的配置、动态路由
  • wordpress 书站亚马逊的网络营销方式
  • 高端网站建设创新苏州seo快速优化
  • 网校网站怎么做甘肃省兰州市城乡建设厅网站
  • 陕西省住房建设厅网站贴吧怎么做网站视频
  • cms网站管理系统源码做网站卖印度药