【AI WorkFow】n8n 源码分析-核心结构体设计思考(六)
目录
- 思考
- 一、核心数据模型结构
- 1、IWorkflowBase - 工作流基础结构
- 2、INode - 节点结构
- 2.1 节点结构
- 2.2 节点类型结构(给节点选择类型)
- 2.3 节点类型应用举例
- 2.3.1 HTTP Request 节点
- 2.3.2 Cron 节点
- 2.3.3 Webhook 节点
 
- 2.4 节点类型描述结构
- 2.4.1 HTTP Request 节点描述示例
- 2.4.2 AI 节点描述示例
 
 
- 3、IConnections - 连接结构
- 4、INodeParameters - 节点参数
- 4.1 类型关系和嵌套结构
- 4.2 应用实例
- 简单结构
- 复杂嵌套结构
 
- 4.3 特殊参数类型
 
- 5、INodeExecutionData - 节点执行数据
 
- 二、工作流执行相关数据结构
- 1、IRunExecutionData - 执行数据
- 2、ITaskData - 任务数据
 
 
思考
分析核心结构体字段设计和说明,为什么要这么设计模型?
一、核心数据模型结构
1、IWorkflowBase - 工作流基础结构
基础的工作流结构,在别的结构中有嵌套
export interface IWorkflowBase {id: string;                        // 工作流唯一标识符name: string;                      // 工作流名称active: boolean;                   // 工作流是否激活isArchived: boolean;               // 工作流是否已归档createdAt: Date;                   // 创建时间startedAt?: Date;                  // 启动时间updatedAt: Date;                   // 更新时间nodes: INode[];                    // 节点列表connections: IConnections;         // 节点连接信息,类型 IConnections 连接结构settings?: IWorkflowSettings;      // 工作流设置,类型 IWorkflowSettings 工作流设置结构staticData?: IDataObject;          // 静态数据,类型 IDataObject 数据对象结构pinData?: IPinData;                // 固定数据(用于测试),类型IPinData 固定数据结构versionId?: string;                // 版本标识符
}
相关的数据结构
- IConnections:下面有
- IWorkflowSettings:
export interface IWorkflowSettings {timezone?: 'DEFAULT' | string;errorWorkflow?: string;callerIds?: string;callerPolicy?: WorkflowSettings.CallerPolicy;saveDataErrorExecution?: WorkflowSettings.SaveDataExecution;saveDataSuccessExecution?: WorkflowSettings.SaveDataExecution;saveManualExecutions?: 'DEFAULT' | boolean;saveExecutionProgress?: 'DEFAULT' | boolean;executionTimeout?: number;executionOrder?: 'v0' | 'v1';timeSavedPerExecution?: number;availableInMCP?: boolean;
}
- IDataObject:
export interface IDataObject {[key: string]: GenericValue | IDataObject | GenericValue[] | IDataObject[];
}
- IPinData:
export interface IPinData {[nodeName: string]: INodeExecutionData[];
}
2、INode - 节点结构
2.1 节点结构
export type OnError = 'continueErrorOutput' | 'continueRegularOutput' | 'stopWorkflow';
export interface INode {id: string;                                    // 节点的唯一标识符name: string;                                  // 节点的名称,在工作流中应唯一typeVersion: number;                           // 节点类型的版本号type: string;                                  // 节点的类型,例如 "@n8n/n8n-nodes-langchain.chatTrigger"position: [number, number];                    // 节点在画布上的位置坐标 [x, y]disabled?: boolean;                            // 节点是否被禁用,禁用的节点不会被执行notes?: string;                                // 节点的备注信息notesInFlow?: boolean;                         // 是否在流程图中显示备注retryOnFail?: boolean;                         // 当节点执行失败时是否重试maxTries?: number;                             // 节点执行的最大尝试次数waitBetweenTries?: number;                     // 重试之间的等待时间(毫秒)alwaysOutputData?: boolean;                    // 是否始终输出数据,即使没有输入数据也输出空数据executeOnce?: boolean;                         // 是否只执行一次,用于处理多个输入数据时只处理第一个// 节点出错时的处理方式: // 'continueErrorOutput' - 将错误发送到错误输出// 'continueRegularOutput' - 将错误作为常规数据继续执行// 'stopWorkflow' - 停止工作流执行onError?: OnError;                             // 类型 上面已经定义continueOnFail?: boolean;                      // 节点失败时是否继续执行后续节点parameters: INodeParameters;                   // 节点的参数配置,类型 INodeParameters 参数结构credentials?: INodeCredentials;                // 节点使用的凭证信息,类型 INodeCredentials 凭证结构webhookId?: string;                            // Webhook节点的唯一标识符extendsCredential?: string;                    // 继承的凭证类型rewireOutputLogTo?: NodeConnectionType;  // 重定向输出日志到指定的连接类型,类型 NodeConnectionType 节点连接类型// 强制节点执行特定的自定义操作// 基于资源和操作,而不是调用默认的执行函数// 由评估测试运行器使用forceCustomOperation?: {resource: string;    // 资源operation: string;   // 操作};
}
2.2 节点类型结构(给节点选择类型)
INodeType 接口定义了节点类型的基本结构和功能。它是所有 n8n 节点实现的核心接口,规定了节点必须实现的方法和属性
export interface INodeType {description: INodeTypeDescription;   // 必须有一个描述对象,包含节点的基本信息,如名称、版本、输入输出配置、属性等supplyData?(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData>; // 用于向 AI 系统提供数据的方法,主要用于 AI 相关节点execute?(this: IExecuteFunctions, response?: EngineResponse): Promise<NodeOutput>;  // 节点的主要执行方法,用于处理节点的业务逻辑。当工作流执行到该节点时,会调用这个方法。/*** A function called when a node receives a chat message. Allows it to react* to the message before it gets executed.*/onMessage?(context: IExecuteFunctions, data: INodeExecutionData): Promise<NodeOutput>;poll?(this: IPollFunctions): Promise<INodeExecutionData[][] | null>; // 轮询方法,用于定期检查是否有新数据需要处理,常用于触发器节点trigger?(this: ITriggerFunctions): Promise<ITriggerResponse | undefined>;// 触发器方法,用于响应外部事件触发工作流执行webhook?(this: IWebhookFunctions): Promise<IWebhookResponseData>; // Webhook 方法,用于处理来自外部的 Webhook 请求// methods 属性定义了节点可以使用的各种辅助方法,解释如下methods?: {// 用于动态加载节点属性选项loadOptions?: {[key: string]: (this: ILoadOptionsFunctions) => Promise<INodePropertyOptions[]>;};// 用于搜索和列出资源listSearch?: {[key: string]: (this: ILoadOptionsFunctions,filter?: string,paginationToken?: string,) => Promise<INodeListSearchResult>;};// 用于测试凭证credentialTest?: {// Contains a group of functions that test credentials.[functionName: string]: ICredentialTestFunction;};// 用于资源映射resourceMapping?: {[functionName: string]: (this: ILoadOptionsFunctions) => Promise<ResourceMapperFields>;};// 用于本地资源映射localResourceMapping?: {[functionName: string]: (this: ILocalLoadOptionsFunctions) => Promise<ResourceMapperFields>;};// 用于处理特定操作actionHandler?: {[functionName: string]: (this: ILoadOptionsFunctions,payload: IDataObject | string | undefined,) => Promise<NodeParameterValueType>;};};// 用于配置 Webhook 相关的方法,包括检查 Webhook 是否存在、创建和删除 WebhookwebhookMethods?: {[name in WebhookType]?: {[method in WebhookSetupMethodNames]: (this: IHookFunctions) => Promise<boolean>;};};/*** Defines custom operations for nodes that do not implement an `execute` method, such as declarative nodes.* This function will be invoked instead of `execute` for a specific resource and operation.* Should be either `execute` or `customOperations` defined for a node, but not both.** @property customOperations - Maps specific resource and operation to a custom function*/// 为没有实现 execute 方法的声明式节点定义自定义操作。这个属性允许为特定的资源和操作定义专门的函数customOperations?: {[resource: string]: {[operation: string]: (this: IExecuteFunctions) => Promise<NodeOutput>;};};
}
节点标准化作用:
- 确保所有的节点都遵循相同的结构和规范,可以统一处理各种不同类型的节点
- 通过定义不同的执行方法(execute, poll, trigger, webhook),n8n 可以根据节点类型采用不同的执行策略
- 通过methods属性,节点可以实现动态配置,动态配置加载,资源搜索等
- 接口允许节点根据不同的需要实现不同的方法,不需要实现所有方法,扩展性强

2.3 节点类型应用举例
2.3.1 HTTP Request 节点
HTTP Request 节点主要实现 execute 方法来发送 HTTP 请求:
export class HttpRequest implements INodeType {description: INodeTypeDescription = {// 节点描述};async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {// 发送 HTTP 请求的逻辑}
}
2.3.2 Cron 节点
Cron 节点实现 trigger 方法来定期触发工作流:
export class Cron implements INodeType {description: INodeTypeDescription = {// 节点描述};async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {// 设置定时器的逻辑}
}
2.3.3 Webhook 节点
Webhook 节点实现 webhook 方法来处理外部请求:
export class Webhook implements INodeType {description: INodeTypeDescription = {// 节点描述};async webhook(this: IWebhookFunctions): Promise<IWebhookResponseData> {// 处理 Webhook 请求的逻辑}
}
INodeType 接口是 n8n 节点系统的核心,它:
- 标准化节点结构 - 确保所有节点遵循统一的规范
- 支持多种执行模式 - 通过不同的方法支持不同类型节点的需求
- 提供扩展能力 - 通过 methods 和 customOperations 支持丰富的功能
- 增强用户体验 - 通过动态配置和搜索功能提升交互体验
这个接口设计使得 n8n 能够支持各种类型的节点,从简单的数据处理节点到复杂的 AI 节点,都能够在统一的框架下工作。
2.4 节点类型描述结构
INodeTypeBaseDescription 节点类型基础描述结构(节点类型描述基类)
type NodeGroupType = 'input' | 'output' | 'organization' | 'schedule' | 'transform' | 'trigger';
export interface INodeTypeBaseDescription {displayName: string;name: string;icon?: Icon;iconColor?: ThemeIconColor;iconUrl?: Themed<string>;badgeIconUrl?: Themed<string>;group: NodeGroupType[];description: string;documentationUrl?: string;subtitle?: string;defaultVersion?: number;codex?: CodexData;parameterPane?: 'wide';/*** Whether the node must be hidden in the node creator panel,* due to deprecation or as a special case (e.g. Start node)*/hidden?: true;/*** Whether the node will be wrapped for tool-use by AI Agents,* optionally replacing provided parts of the description*/usableAsTool?: true | UsableAsToolDescription;
}
INodeTypeDescription 节点类型描述结构
export interface INodeTypeDescription extends INodeTypeBaseDescription {version: number | number[]; // 定义节点的版本号,可以是单个版本号或版本号数组defaults: NodeDefaults; // 节点的默认设置,如默认名称和颜色eventTriggerDescription?: string;activationMessage?: string;// 输入输出配置 // inputs/outputs: 定义节点的输入和输出连接类型// requiredInputs: 指定哪些输入是必需的(仅在 executionOrder 为 "v1" 时可用)// inputNames/outputNames: 为输入和输出连接点提供自定义名称inputs: Array<NodeConnectionType | INodeInputConfiguration> | ExpressionString;requiredInputs?: string | number[] | number; // Ony available with executionOrder => "v1"inputNames?: string[];outputs: Array<NodeConnectionType | INodeOutputConfiguration> | ExpressionString;outputNames?: string[];properties: INodeProperties[]; // 定义节点的配置属性,即用户在节点设置面板中看到的选项credentials?: INodeCredentialDescription[]; // 定义节点可以使用的凭证类型maxNodes?: number; // How many nodes of that type can be created in a workflowpolling?: true | undefined; // 标识是否支持轮询功能supportsCORS?: true | undefined; // 标识是否支持CORSrequestDefaults?: DeclarativeRestApiSettings.HttpRequestOptions;// 为声明式节点提供默认的 HTTP 请求设置配置requestOperations?: IN8nRequestOperations;// 为声明式节点提供默认的 HTTP 请求操作配置hooks?: {[key: string]: INodeHookDescription[] | undefined;activate?: INodeHookDescription[];deactivate?: INodeHookDescription[];};webhooks?: IWebhookDescription[]; // 定义节点支持的 webhook 配置,用于接收外部事件触发translation?: { [key: string]: object };mockManualExecution?: true; // 标识是否支持手动执行模拟triggerPanel?: TriggerPanelDefinition | boolean;extendsCredential?: string;hints?: NodeHint[];communityNodePackageVersion?: string;waitingNodeTooltip?: string;__loadOptionsMethods?: string[]; // only for validation during build
}
INodeTypeDescription 接口是 n8n 中描述节点类型信息的核心数据结构。它继承自 INodeTypeBaseDescription,扩展了更多关于节点行为和配置的详细信息。这个接口定义了节点在 UI 中如何展示以及在执行时如何工作。
 
2.4.1 HTTP Request 节点描述示例
{"name": "httpRequest","version": 1,"defaults": {"name": "HTTP Request"},"inputs": ["main"],"outputs": ["main"],"properties": [{"displayName": "HTTP Method","name": "method","type": "options","options": [{"name": "GET", "value": "GET"},{"name": "POST", "value": "POST"}],"default": "GET"}],"credentials": [{"name": "httpBasicAuth","required": false}]
}
2.4.2 AI 节点描述示例
{"name": "aiLanguageModel","version": 1,"defaults": {"name": "AI Language Model"},"inputs": [{"type": "ai_languageModel", "required": true}],"outputs": [{"type": "ai_languageModel"}],"properties": [{"displayName": "Model","name": "model","type": "string","default": "gpt-3.5-turbo"}]
}
3、IConnections - 连接结构
连接定义了节点之间的数据流关系:
// NodeInputConnections 是一个数组,表示特定类型输入的连接集合。
// 数组中的每个元素代表一个输入索引位置的连接列表。
export type NodeInputConnections = Array<IConnection[] | null>;// INodeConnections 表示某个节点的所有输出连接。
// 它是一个以连接类型为键的对象,值是 NodeInputConnections 类型。
export interface INodeConnections {// Input name 连接类型为键[key: string]: NodeInputConnections; // 值是一个类型不是对象
}// IConnections 接口表示整个工作流的所有连接关系。
// 它是一个以节点名称为键的对象,每个节点对应一个 INodeConnections 对象。
export interface IConnections {// Node name 节点名称为键[key: string]: INodeConnections;
}// 接口表示一个单一的连接,描述从一个节点的输出到另一个节点输入的连接关系
export interface IConnection {node: string;   // 连接去向的节点 - 目标节点名称type: NodeConnectionType;  // 目标节点输入类型 - 连接类型index: number;  // 目标节点的输入/输出索引(当节点具有多个同类型输入输出时使用)
}// 代码中使用相对较少,用于表示节点间的连接索引信息,特别是处理节点间数据配对时
// 跟踪源节点和目标节点的连接索引
// 在处理pairdItem配对项目时,用于标识数据的来源和目标位置【更像是一个辅助结构】
export interface INodeConnection {sourceIndex: number;  // 源节点输出的索引位置destinationIndex: number; // 目标节点输入的索引位置
}// 连接类型 NodeConnectionType
export const NodeConnectionTypes = {AiAgent: 'ai_agent',AiChain: 'ai_chain',AiDocument: 'ai_document',AiEmbedding: 'ai_embedding',AiLanguageModel: 'ai_languageModel',AiMemory: 'ai_memory',AiOutputParser: 'ai_outputParser',AiRetriever: 'ai_retriever',AiReranker: 'ai_reranker',AiTextSplitter: 'ai_textSplitter',AiTool: 'ai_tool',AiVectorStore: 'ai_vectorStore',Main: 'main',
} as const;
示例:
 工作流图:
 
对应的结构如下:
{"HTTP Request": {"main": [[{"node": "Function","type": "main","index": 0}]]},"Manual Trigger": {"main": [[{"node": "Function","type": "main","index": 1}]]}
}
- “HTTP Request” 节点有一个 main 类型的输出连接到 “Function” 节点的第 0 个 main 输入
- “Manual Trigger” 节点有一个 main 类型的输出连接到 “Function” 节点的第 1 个 main 输入
层次关系
 
解释:
- IConnections - 最外层,以节点名称为键
- INodeConnections - 中间层,以连接类型为键(如 “main”, “ai_languageModel” 等)
- NodeInputConnections - 数组,以输入索引为数组索引
- IConnection[] - 数组,包含实际的连接信息
- IConnection - 单个连接的具体信息
4、INodeParameters - 节点参数
节点参数定义了节点的具体配置,根据节点类型不同而不同:
 定义了节点参数的结构和可能的值的类型
// INodeParameters 是一个以字符串为键、NodeParameterValueType 为值的对象 
// 表示节点的所有参数
export interface INodeParameters {[key: string]: NodeParameterValueType;
}// 是一个联合类型,表示节点参数可能的值类型
export type NodeParameterValueType =// TODO: Later also has to be possible to add multiple ones with the name name. So array has to be possible| NodeParameterValue    // 基本参数值类型(string|number|boolean|undefined|null) | INodeParameters    // 嵌套的参数对象| INodeParameterResourceLocator  // 资源定位器类型| ResourceMapperValue  // 资源映射器值| FilterValue     // 过滤器值| AssignmentCollectionValue  // 赋值集合值| NodeParameterValue[]   // 数组类型 | INodeParameters[]        // 数组类型| INodeParameterResourceLocator[]   // 数组类型| ResourceMapperValue[];   // 数组类型
4.1 类型关系和嵌套结构
这两个类型是相互递归的:
- INodeParameters 包含 NodeParameterValueType 类型的值
- NodeParameterValueType 可以是 INodeParameters 类型
这种设计允许参数具有复杂的嵌套结构:
4.2 应用实例
简单结构
{"operation": "get","resource": "user","userId": 123
}
复杂嵌套结构
{"operation": "create","resource": "user","additionalFields": {"name": "John Doe","email": "john.doe@example.com","address": {"street": "123 Main St","city": "New York","zipcode": "10001"}},"tags": ["customer", "vip"]
}
4.3 特殊参数类型
- ResourceMapperValue 资源映射器用于映射字段:
type ResourceMapperValue = {mappingMode: string;value: { [key: string]: string | number | boolean | null } | null;matchingColumns: string[];schema: ResourceMapperField[];
};
- FilterValue 过滤器用于定义过滤条件:
type FilterValue = {options: FilterOptionsValue;conditions: FilterConditionValue[];combinator: 'and' | 'or';
};
5、INodeExecutionData - 节点执行数据
节点执行时处理的数据结构:
export interface INodeExecutionData {[key: string]:| IDataObject| IBinaryKeyData| IPairedItemData| IPairedItemData[]| NodeApiError| NodeOperationError| number| string| undefined;json: IDataObject;  // 核心数据对象,存储节点的主要输出数据binary?: IBinaryKeyData;  // 可选的二进制数据(如文件、图片等)error?: NodeApiError | NodeOperationError;  // 节点执行错误信pairedItem?: IPairedItemData | IPairedItemData[] | number; // 用于追踪数据来源,标识当前数据项与上游节点数据项的对应关系metadata?: {subExecution: RelatedExecution;};  // 元数据信息,例如相关的子执行信息evaluationData?: Record<string, GenericValue>;  // 评估相关数据/*** Use this key to send a message to the chat.** - Workflow has to be started by a chat node.* - Put execution to wait after sending.** See example in* packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/Chat.node.ts*/sendMessage?: string;   // 用于聊天功能的消息发送/*** @deprecated This key was added by accident and should not be used as it* will be removed in future. For more information see PR #12469.*/index?: number; // 已废弃的索引字段
}
二、工作流执行相关数据结构
1、IRunExecutionData - 执行数据
包含工作流执行过程中产生的所有数据:
该接口封装了一个工作流执行实例的所有相关数据,包括:
- 启动执行的初始条件
- 执行结果数据
- 实际执行过程中的上下文信息
- 等待执行的信息
- 父级执行引用(用于子工作流)
- 手动执行相关数据
// Contains all the data which is needed to execute a workflow and so also to
// start restart it again after it did fail.
// The RunData, ExecuteData and WaitForExecution contain often the same data.
export interface IRunExecutionData {startData?: {  // 包含工作流启动的相关信息startNodes?: StartNodeData[];  // 起始节点列表destinationNode?: string; // 目标节点(用于测试特定节点)originalDestinationNode?: string;runNodeFilter?: string[]; // 需要运行的节点过滤器};resultData: {  // 包含执行结果数据error?: ExecutionError; // 执行错误信息runData: IRunData; // 节点运行数据记录pinData?: IPinData; // 固定数据(用户手动设置的测试数据)lastNodeExecuted?: string; // 最后执行的节点名称metadata?: Record<string, string>; // 执行元数据};executionData?: { // 包含实际执行过程中使用的数据:contextData: IExecuteContextData;  // 执行上下文数据nodeExecutionStack: IExecuteData[]; // 节点执行堆栈metadata: {  // 节点任务元数据// node-name: metadata by runIndex[key: string]: ITaskMetadata[];};waitingExecution: IWaitingForExecution; // 等待执行的任务waitingExecutionSource: IWaitingForExecutionSource | null; // 等待执行的源信息};parentExecution?: RelatedExecution; // 父级执行信息(用于嵌套工作流)/*** This is used to prevent breaking change* for waiting executions started before signature validation was added*/validateSignature?: boolean; // 是否验证签名waitTill?: Date; // 等待到指定时间pushRef?: string; // 推送引用标识/** Data needed for a worker to run a manual execution. */manualData?: Pick<   // 手动执行所需数据IWorkflowExecutionDataProcess,'dirtyNodeNames' | 'triggerToStartFrom' | 'userId'>;
}
2、ITaskData - 任务数据
单个节点执行的结果数据:
 ITaskData 接口表示一个节点任务执行完成后的结果数据。它继承自 ITaskStartedData 接口,并添加了执行完成后特有的属性。
/** The data that gets returned when a node execution ends */
export interface ITaskData extends ITaskStartedData {executionTime: number; // 执行耗时(毫秒),记录节点执行所花费的时间executionStatus?: ExecutionStatus;// 执行状态(可选),表明节点执行的成功或失败状态data?: ITaskDataConnections; // 执行结果数据(可选),包含节点输出到各个连接的数据inputOverride?: ITaskDataConnections; // 输入覆盖数据(可选),可能用于测试或特殊情况下的输入替换error?: ExecutionError;  // 执行错误信息(可选),如果执行失败,则包含具体的错误详情metadata?: ITaskMetadata; // 任务元数据(可选),包含关于执行任务的额外信息
}
这个接口主要用于:
- 记录和跟踪每个节点的执行结果
- 提供执行性能指标(executionTime)
- 存储节点输出数据以便后续节点使用
- 错误处理和调试信息收集
- 工作流执行历史记录和监控
by 久违 2025.10.24
