当前位置: 首页 > news >正文

Coze源码分析-资源库-创建工作流-后端源码-IDL/API/应用/领域层

Coze源码分析-资源库-创建工作流-后端源码

前言

本文档深入分析Coze平台中用户在资源库中创建工作流的后端实现机制。通过对源码的详细解读,帮助开发者理解工作流创建的完整技术架构、数据流转过程以及核心业务逻辑。

工作流作为Coze平台的核心功能之一,为用户提供了强大的自动化流程编排能力。用户可以通过可视化界面创建复杂的业务流程,系统会将这些流程转换为可执行的工作流实例。本文将从技术实现角度,全面剖析工作流创建功能的后端架构设计。

项目架构概览

整体架构设计

┌─────────────────────────────────────────────────────────────┐
│                    IDL接口定义层                             │
│  ┌─────────────┐  ┌─────────────┐    ┌─────────────┐        │
│  │ base.thrift │  │workflow.    │    │ api.thrift  │        │
│  │             │  │thrift       │    │             │        │
│  │             │  │             │    │             │        │
│  └─────────────┘  └─────────────┘    └─────────────┘        │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│                    API网关层                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │ Workflow    │  │ Hertz       │  │ Middleware  │          │
│  │ Handler     │  │ Router      │  │ 中间件       │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│                   应用服务层                                 │
│  ┌─────────────────────────────────────────────────────┐    │
│  │         WorkflowApplicationService                  │    │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │    │
│  │  │CreateWork   │ │SaveWorkflow │ │UpdateWork   │   │    │
│  │  │flow         │ │             │ │flowMeta     │   │    │
│  │  └─────────────┘ └─────────────┘ └─────────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│                   领域服务层                                 │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              WorkflowService                        │    │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐   │    │
│  │  │Create       │ │Save         │ │ListNodeMeta │   │    │
│  │  │             │ │             │ │             │   │    │
│  │  └─────────────┘ └─────────────┘ └─────────────┘   │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│                   数据访问层                                 │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              WorkflowRepository                     │    │
│  │  ┌──── ─────── ──── ──┐  ┌─────────────────────────┐│    │
│  │  │WorkflowDAO         │  │workflow.gen.go          ││    │
│  │  │CreateMeta          │  │workflow_version.gen.go  ││    │
│  │  │CreateVersion       │  │GORM Generated Code      ││    │
│  │  └──── ─────── ─── ───┘  └─────────────────────────┘│    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│                   基础设施层                                 │
│              ┌─ ─ ─── ─ ── ── ─ ─ ─┐                        │
│              │       gorm.DB       │                        │
│              │ es.Client redisImpl │                        │
│              └── ─ ── ── ─ ── ── ─ ┘                        │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│                   存储服务层                                 │
│  ┌─────────────────────────────────────────────────────┐    │
│  │       MySQL数据库       Redis数据库                  │    │
│  │            ElasticSearch数据库                      │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

核心技术栈

  • 后端框架: Go + Hertz (字节跳动开源的高性能HTTP框架)
  • 数据库: MySQL 8.0 (主数据存储)
  • 搜索引擎: ElasticSearch (工作流搜索和索引)
  • 消息队列: Kafka (异步事件处理)
  • 缓存: Redis (高频数据缓存)
  • 工作流引擎: 自研工作流执行引擎
  • API协议: gRPC (内部服务通信) + HTTP (外部API)

工作流创建核心模块

  1. 工作流定义管理: 处理工作流模板的创建、编辑和版本控制
  2. 节点配置服务: 管理工作流中各种节点的配置和参数
  3. 流程编排引擎: 负责工作流的逻辑编排和执行路径规划
  4. 权限控制系统: 确保工作流创建和访问的安全性
  5. 版本管理服务: 支持工作流的版本控制和回滚机制

1. IDL接口定义层

1.1 工作流创建相关IDL定义

文件位置: idl/workflow/workflow.thrift

该文件定义了工作流相关的Thrift IDL接口,由thriftgo工具自动生成Go代码到 backend/api/model/workflow/workflow.go

核心数据结构定义
// WorkflowMode is used to distinguish between Workflow and chatflow.
enum WorkflowMode {Workflow  = 0   ,Imageflow = 1   ,SceneFlow = 2   ,ChatFlow  = 3   ,All       = 100 , // Use only when querying
}// 创建工作流请求结构
struct CreateWorkflowRequest {1  : required string       name         , // process name2  : required string       desc         , // Process description, not null3  : required string       icon_uri     , // Process icon uri, not nullable4  : required string       space_id     , // Space id, cannot be empty5  : optional WorkflowMode flow_mode    , // Workflow or chatflow, the default is workflow6  : optional SchemaType   schema_type  ,7  : optional string       bind_biz_id  ,8  : optional i32          bind_biz_type, // Bind the business type, do not fill in if necessary. Refer to the BindBizType structure, when the value is 3, it represents the Douyin doppelganger.9  : optional string       project_id   , // Application id, when filled in, it means that the process is the process under the project, and it needs to be released with the project.10 : optional bool         create_conversation, // Whether to create a session, only if flow_mode = chatflow255: optional base.Base    Base     ,
}// 创建工作流响应数据
struct CreateWorkflowData {1: string         workflow_id, // 工作流ID2: string         name,        // 工作流名称3: string         url,         // 工作流URL4: WorkFlowStatus status,      // 工作流状态5: SchemaType     type,        // 模式类型6: list<Node>     node_list,   // 节点列表7: optional string external_flow_info, // 外部流程信息
}// 创建工作流响应结构
struct CreateWorkflowResponse {1  : required CreateWorkflowData data,253: required i64                code,254: required string             msg,255: required base.BaseResp      BaseResp,
}
生成的Go代码结构

文件位置: backend/api/model/workflow/workflow.go

// CreateWorkflowRequest 创建工作流请求(由thriftgo生成)
type CreateWorkflowRequest struct {// process nameName string `thrift:"name,1,required" form:"name,required" json:"name,required" query:"name,required"`// Process description, not nullDesc string `thrift:"desc,2,required" form:"desc,required" json:"desc,required" query:"desc,required"`// Process icon uri, not nullableIconURI string `thrift:"icon_uri,3,required" form:"icon_uri,required" json:"icon_uri,required" query:"icon_uri,required"`// Space id, cannot be emptySpaceID string `thrift:"space_id,4,required" form:"space_id,required" json:"space_id,required" query:"space_id,required"`// Workflow or chatflow, the default is workflowFlowMode   *WorkflowMode `thrift:"flow_mode,5,optional" form:"flow_mode" json:"flow_mode,omitempty" query:"flow_mode"`SchemaType *SchemaType   `thrift:"schema_type,6,optional" form:"schema_type" json:"schema_type,omitempty" query:"schema_type"`BindBizID  *string       `thrift:"bind_biz_id,7,optional" form:"bind_biz_id" json:"bind_biz_id,omitempty" query:"bind_biz_id"`// Bind the business type, do not fill in if necessary. Refer to the BindBizType structure, when the value is 3, it represents the Douyin doppelganger.BindBizType *int32 `thrift:"bind_biz_type,8,optional" form:"bind_biz_type" json:"bind_biz_type,omitempty" query:"bind_biz_type"`// Application id, when filled in, it means that the process is the process under the project, and it needs to be released with the project.ProjectID *string `thrift:"project_id,9,optional" form:"project_id" json:"project_id,omitempty" query:"project_id"`// Whether to create a session, only if flow_mode = chatflowCreateConversation *bool      `thrift:"create_conversation,10,optional" form:"create_conversation" json:"create_conversation,omitempty" query:"create_conversation"`Base               *base.Base `thrift:"Base,255,optional" form:"Base" json:"Base,omitempty" query:"Base"`
}// WorkflowMode 工作流模式枚举
type WorkflowMode int64const (WorkflowMode_Workflow  WorkflowMode = 0WorkflowMode_Imageflow WorkflowMode = 1WorkflowMode_SceneFlow WorkflowMode = 2WorkflowMode_ChatFlow  WorkflowMode = 3WorkflowMode_All       WorkflowMode = 100
)

1.2 工作流状态常量定义

// 工作流状态常量
const (WorkflowStatusDraft     = 1  // 草稿状态WorkflowStatusPublished = 2  // 已发布WorkflowStatusArchived  = 3  // 已归档WorkflowStatusDeleted   = 4  // 已删除
)// 工作流节点类型常量
const (NodeTypeStart      = "start"       // 开始节点NodeTypeEnd        = "end"         // 结束节点NodeTypeCondition  = "condition"   // 条件节点NodeTypeAction     = "action"      // 动作节点NodeTypeLoop       = "loop"        // 循环节点NodeTypeParallel   = "parallel"    // 并行节点NodeTypeSubflow    = "subflow"     // 子流程节点
)// 工作流版本格式
const (VersionFormat = "v%d.%d.%d" // 版本号格式:v1.0.0
)

1.3 Thrift服务接口定义

文件位置: backend/api/model/workflow/workflow_svc.go

项目使用Apache Thrift作为IDL,而非gRPC。服务接口通过thriftgo工具生成。

// WorkflowService Thrift服务接口(由thriftgo生成)
type WorkflowService interface {// 创建工作流CreateWorkflow(ctx context.Context, request *CreateWorkflowRequest) (r *CreateWorkflowResponse, err error)// 获取画布信息GetCanvasInfo(ctx context.Context, request *GetCanvasInfoRequest) (r *GetCanvasInfoResponse, err error)// 保存工作流SaveWorkflow(ctx context.Context, request *SaveWorkflowRequest) (r *SaveWorkflowResponse, err error)// 更新工作流元数据UpdateWorkflowMeta(ctx context.Context, request *UpdateWorkflowMetaRequest) (r *UpdateWorkflowMetaResponse, err error)// 删除工作流DeleteWorkflow(ctx context.Context, request *DeleteWorkflowRequest) (r *DeleteWorkflowResponse, err error)// 发布工作流PublishWorkflow(ctx context.Context, request *PublishWorkflowRequest) (r *PublishWorkflowResponse, err error)// 复制工作流CopyWorkflow(ctx context.Context, request *CopyWorkflowRequest) (r *CopyWorkflowResponse, err error)// 获取工作流列表GetWorkFlowList(ctx context.Context, request *GetWorkFlowListRequest) (r *GetWorkFlowListResponse, err error)// 运行工作流(OpenAPI)OpenAPIRunFlow(ctx context.Context, request *OpenAPIRunFlowRequest) (r *OpenAPIRunFlowResponse, err error)// 流式运行工作流(OpenAPI)OpenAPIStreamRunFlow(ctx context.Context, request *OpenAPIRunFlowRequest) (r *OpenAPIStreamRunFlowResponse, err error)
}// WorkflowServiceClient Thrift客户端实现
type WorkflowServiceClient struct {c thrift.TClient
}// NewWorkflowServiceClient 创建工作流服务客户端
func NewWorkflowServiceClient(c thrift.TClient) *WorkflowServiceClient {return &WorkflowServiceClient{c: c,}
}
Thrift与HTTP的映射关系

项目通过Hertz框架将Thrift服务映射为HTTP接口,具体映射关系在处理器中通过注解定义:

// CreateWorkflow 创建工作流
// @router /api/workflow_api/create [POST]
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {// 处理HTTP请求并调用Thrift服务
}

2. API网关层

2.1 HTTP路由定义

文件位置: backend/api/handler/coze/workflow_service.go

项目使用Hertz框架,通过注解方式定义路由,而非单独的路由注册文件。每个处理器函数通过 @router 注解指定对应的HTTP路径和方法。

工作流相关路由映射
// 工作流核心操作路由
// @router /api/workflow_api/create [POST]          - 创建工作流
// @router /api/workflow_api/canvas [POST]          - 获取画布信息
// @router /api/workflow_api/save [POST]            - 保存工作流
// @router /api/workflow_api/update_meta [POST]     - 更新工作流元数据
// @router /api/workflow_api/delete [POST]          - 删除工作流
// @router /api/workflow_api/batch_delete [POST]    - 批量删除工作流
// @router /api/workflow_api/publish [POST]         - 发布工作流
// @router /api/workflow_api/copy [POST]            - 复制工作流// 工作流查询路由
// @router /api/workflow_api/workflow_list [POST]   - 获取工作流列表
// @router /api/workflow_api/released_workflows [POST] - 获取已发布工作流
// @router /api/workflow_api/get_process [GET]      - 获取工作流进程信息// OpenAPI路由(对外接口)
// @router /v1/workflow/run [POST]                  - 运行工作流
// @router /v1/workflow/stream_run [POST]           - 流式运行工作流
// @router /v1/workflow/stream_resume [POST]        - 恢复流式工作流
// @router /v1/workflows/:workflow_id [GET]         - 获取工作流信息
// @router /v1/workflows/chat [POST]                - 工作流对话
// @router /v1/workflow/conversation/create [POST]  - 创建工作流会话
路由注解说明

项目使用Hertz的代码生成工具,通过注解自动生成路由注册代码:

  1. 内部API路由: 以 /api/workflow_api/ 开头,用于前端界面调用
  2. OpenAPI路由: 以 /v1/ 开头,用于第三方开发者调用
  3. HTTP方法: 主要使用POST方法,部分查询接口使用GET方法
  4. 路径参数: 使用 :workflow_id 等占位符表示路径参数

2.2 工作流创建Handler实现

文件位置: backend/api/handler/coze/workflow_service.go

项目将所有工作流相关的处理器函数集中在一个文件中,使用包级别函数而非结构体方法。

package cozeimport ("context""github.com/cloudwego/hertz/pkg/app""github.com/cloudwego/hertz/pkg/protocol/consts""github.com/coze-dev/coze-studio/backend/api/model/workflow"appworkflow "github.com/coze-dev/coze-studio/backend/application/workflow"
)// CreateWorkflow 创建工作流
// @router /api/workflow_api/create [POST]
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.CreateWorkflowRequest// 绑定和验证请求参数err = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}// 调用应用服务层创建工作流resp, err := appworkflow.SVC.CreateWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}// 返回成功响应c.JSON(consts.StatusOK, resp)
}// GetCanvasInfo 获取画布信息
// @router /api/workflow_api/canvas [POST]
func GetCanvasInfo(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.GetCanvasInfoRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.GetCanvasInfo(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}// SaveWorkflow 保存工作流
// @router /api/workflow_api/save [POST]
func SaveWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.SaveWorkflowRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.SaveWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}// UpdateWorkflowMeta 更新工作流元数据
// @router /api/workflow_api/update_meta [POST]
func UpdateWorkflowMeta(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.UpdateWorkflowMetaRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.UpdateWorkflowMeta(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}// DeleteWorkflow 删除工作流
// @router /api/workflow_api/delete [POST]
func DeleteWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.DeleteWorkflowRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.DeleteWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}
错误处理函数

项目使用统一的错误处理函数:

// invalidParamRequestResponse 参数错误响应
func invalidParamRequestResponse(c *app.RequestContext, msg string) {// 统一的参数错误处理逻辑
}// internalServerErrorResponse 内部服务器错误响应
func internalServerErrorResponse(ctx context.Context, c *app.RequestContext, err error) {// 统一的服务器错误处理逻辑
}
应用服务调用

处理器通过 appworkflow.SVC 全局变量调用应用服务层:

// appworkflow.SVC 是应用服务层的全局实例
resp, err := appworkflow.SVC.CreateWorkflow(ctx, &req)

2.3 请求参数验证

项目使用Hertz框架内置的参数验证机制,通过结构体标签进行验证,而非单独的验证器文件。

Thrift生成的验证标签

文件位置: backend/api/model/workflow/workflow.go

// CreateWorkflowRequest 结构体中的验证标签
type CreateWorkflowRequest struct {// process nameName string `thrift:"name,1,required" form:"name,required" json:"name,required" query:"name,required"`// Process description, not nullDesc string `thrift:"desc,2,required" form:"desc,required" json:"desc,required" query:"desc,required"`// Process icon uri, not nullableIconURI string `thrift:"icon_uri,3,required" form:"icon_uri,required" json:"icon_uri,required" query:"icon_uri,required"`// Space id, cannot be emptySpaceID string `thrift:"space_id,4,required" form:"space_id,required" json:"space_id,required" query:"space_id,required"`// Workflow or chatflow, the default is workflowFlowMode   *WorkflowMode `thrift:"flow_mode,5,optional" form:"flow_mode" json:"flow_mode,omitempty" query:"flow_mode"`SchemaType *SchemaType   `thrift:"schema_type,6,optional" form:"schema_type" json:"schema_type,omitempty" query:"schema_type"`BindBizID  *string       `thrift:"bind_biz_id,7,optional" form:"bind_biz_id" json:"bind_biz_id,omitempty" query:"bind_biz_id"`// Bind the business type, do not fill in if necessary. Refer to the BindBizType structure, when the value is 3, it represents the Douyin doppelganger.BindBizType *int32 `thrift:"bind_biz_type,8,optional" form:"bind_biz_type" json:"bind_biz_type,omitempty" query:"bind_biz_type"`// Application id, when filled in, it means that the process is the process under the project, and it needs to be released with the project.ProjectID *string `thrift:"project_id,9,optional" form:"project_id" json:"project_id,omitempty" query:"project_id"`// Whether to create a session, only if flow_mode = chatflowCreateConversation *bool      `thrift:"create_conversation,10,optional" form:"create_conversation" json:"create_conversation,omitempty" query:"create_conversation"`Base               *base.Base `thrift:"Base,255,optional" form:"Base" json:"Base,omitempty" query:"Base"`
}
Hertz框架验证机制
// 在处理器中使用 c.BindAndValidate() 进行参数绑定和验证
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {var req workflow.CreateWorkflowRequest// BindAndValidate 会根据结构体标签自动进行参数验证err = c.BindAndValidate(&req)if err != nil {// 验证失败时返回错误信息invalidParamRequestResponse(c, err.Error())return}// 验证通过后继续处理业务逻辑// ...
}
业务层验证

复杂的业务逻辑验证在应用服务层进行:

文件位置: backend/application/workflow/workflow.go

func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {// 获取用户IDuID := ctxutil.MustGetUIDFromCtx(ctx)// 解析和验证SpaceIDspaceID := mustParseInt64(req.GetSpaceID())// 检查用户空间权限if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 其他业务逻辑验证...
}
验证错误处理
// invalidParamRequestResponse 统一处理参数验证错误
func invalidParamRequestResponse(c *app.RequestContext, msg string) {response := map[string]interface{}{"code": 400,"msg":  "参数错误: " + msg,}c.JSON(consts.StatusBadRequest, response)
}

项目通过这种方式实现了分层验证:

  1. 结构体标签验证: 基础的字段类型和必填验证
  2. 业务逻辑验证: 复杂的业务规则验证
  3. 权限验证: 用户权限和资源访问验证

3. 应用服务层

WorkflowApplicationService初始化

文件位置:backend/application/workflow/workflow.go

WorkflowApplicationService是工作流应用服务层的核心组件,专门负责处理工作流资源的创建等业务逻辑,是连接API层和领域层的重要桥梁。在用户点击"资源库" → 接着点击右上角的"+"号 → 最后点击弹出菜单中的"工作流"的场景中,该服务承担着核心的创建业务处理职责。

服务结构定义

文件位置:backend/application/workflow/workflow.go

type ApplicationService struct {DomainSVC   domainWorkflow.ServiceImageX      imagex.ImageX // 图片服务代理,用于获取认证令牌TosClient   storage.StorageIDGenerator idgen.IDGenerator
}var SVC = &ApplicationService{}func GetWorkflowDomainSVC() domainWorkflow.Service {return SVC.DomainSVC
}

服务结构特点

  1. 领域服务集成:通过DomainSVC字段集成工作流领域服务,实现业务逻辑的分层处理
  2. 外部服务依赖:集成图片服务(ImageX)和对象存储服务(TosClient),支持工作流资源管理
  3. ID生成器:内置ID生成器,确保工作流资源的唯一标识
  4. 全局单例:通过SVC全局变量提供服务实例,便于其他组件调用

创建工作流核心实现

CreateWorkflow方法详解

文件位置:backend/application/workflow/workflow.go

当用户在资源库中点击右上角的"+"号,然后点击弹出菜单中的"工作流"时,前端会调用CreateWorkflow方法来创建新的工作流资源。

func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {defer func() {if panicErr := recover(); panicErr != nil {err = safego.NewPanicErr(panicErr, debug.Stack())}if err != nil {err = vo.WrapIfNeeded(errno.ErrWorkflowOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))}}()// 获取用户ID并验证空间权限uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 处理ChatFlow模式的会话创建var createConversation boolif req.ProjectID != nil && req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow && req.IsSetCreateConversation() && req.GetCreateConversation() {createConversation = true_, err := GetWorkflowDomainSVC().CreateDraftConversationTemplate(ctx, &vo.CreateConversationTemplateMeta{AppID:   mustParseInt64(req.GetProjectID()),UserID:  uID,SpaceID: spaceID,Name:    req.Name,})if err != nil {return nil, err}}// 构建工作流元数据wf := &vo.MetaCreate{CreatorID:        uID,SpaceID:          spaceID,ContentType:      workflow.WorkFlowType_User,Name:             req.Name,Desc:             req.Desc,IconURI:          req.IconURI,AppID:            parseInt64(req.ProjectID),Mode:             ternary.IFElse(req.IsSetFlowMode(), req.GetFlowMode(), workflow.WorkflowMode_Workflow),InitCanvasSchema: vo.GetDefaultInitCanvasJsonSchema(i18n.GetLocale(ctx)),}// 为ChatFlow模式设置特殊的画布模式if req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow {conversationName := req.Nameif !req.IsSetProjectID() || mustParseInt64(req.GetProjectID()) == 0 || !createConversation {conversationName = "Default"}wf.InitCanvasSchema = vo.GetDefaultInitCanvasJsonSchemaChat(i18n.GetLocale(ctx), conversationName)}// 调用领域服务创建工作流id, err := GetWorkflowDomainSVC().Create(ctx, wf)if err != nil {return nil, err}// 发布工作流资源事件err = PublishWorkflowResource(ctx, id, ptr.Of(int32(wf.Mode)), search.Created, &search.ResourceDocument{Name:          &wf.Name,APPID:         wf.AppID,SpaceID:       &wf.SpaceID,OwnerID:       &wf.CreatorID,PublishStatus: ptr.Of(resource.PublishStatus_UnPublished),CreateTimeMS:  ptr.Of(time.Now().UnixMilli()),})if err != nil {return nil, vo.WrapError(errno.ErrNotifyWorkflowResourceChangeErr, err)}return &workflow.CreateWorkflowResponse{Data: &workflow.CreateWorkflowData{WorkflowID: strconv.FormatInt(id, 10),},}, nil
}

方法功能特点

  1. 异常处理:使用defer机制捕获panic和包装错误,确保系统稳定性
  2. 权限验证:验证用户身份和空间访问权限,确保安全性
  3. 多模式支持:支持标准工作流和ChatFlow两种模式,ChatFlow模式会自动创建会话模板
  4. 国际化支持:根据用户语言环境初始化不同的画布模式
  5. 事件发布:创建完成后发布资源事件,支持搜索索引和其他下游处理
  6. 错误包装:完善的错误处理和包装机制,便于问题定位

其他核心应用服务方法

SaveWorkflow - 保存工作流
func (w *ApplicationService) SaveWorkflow(ctx context.Context, req *workflow.SaveWorkflowRequest) (_ *workflow.SaveWorkflowResponse, err error,
) {// 权限验证if err := checkUserSpace(ctx, ctxutil.MustGetUIDFromCtx(ctx), mustParseInt64(req.GetSpaceID())); err != nil {return nil, err}// 调用领域服务保存工作流if err := GetWorkflowDomainSVC().Save(ctx, mustParseInt64(req.WorkflowID), req.GetSchema()); err != nil {return nil, err}return &workflow.SaveWorkflowResponse{Data: &workflow.SaveWorkflowData{},}, nil
}
UpdateWorkflowMeta - 更新工作流元数据
func (w *ApplicationService) UpdateWorkflowMeta(ctx context.Context, req *workflow.UpdateWorkflowMetaRequest) (_ *workflow.UpdateWorkflowMetaResponse, err error,
) {// 权限验证if err := checkUserSpace(ctx, ctxutil.MustGetUIDFromCtx(ctx), mustParseInt64(req.GetSpaceID())); err != nil {return nil, err}workflowID := mustParseInt64(req.GetWorkflowID())// 更新元数据err = GetWorkflowDomainSVC().UpdateMeta(ctx, workflowID, &vo.MetaUpdate{Name:         req.Name,Desc:         req.Desc,IconURI:      req.IconURI,WorkflowMode: req.FlowMode,})if err != nil {return nil, err}// 异步发布更新事件safego.Go(ctx, func() {err := PublishWorkflowResource(ctx, workflowID, nil, search.Updated, &search.ResourceDocument{Name:         req.Name,UpdateTimeMS: ptr.Of(time.Now().UnixMilli()),})if err != nil {logs.CtxErrorf(ctx, "publish update workflow resource failed, workflowID: %d, err: %v", workflowID, err)}})return &workflow.UpdateWorkflowMetaResponse{}, nil
}
创建工作流业务流程

创建工作流的完整业务流程包括以下几个关键步骤:

  1. 身份验证:验证用户登录状态和空间访问权限
  2. 参数验证:验证工作流名称、描述、模式等必要参数
  3. 模式处理:根据工作流模式(Workflow/ChatFlow)进行特殊处理
  4. 画布初始化:根据用户语言环境和工作流模式初始化默认画布
  5. 执行创建操作:调用领域服务执行实际的创建操作
  6. 事件发布:发布创建事件,通知搜索引擎等下游系统
  7. 返回响应:返回创建操作的结果,包含新创建的工作流ID

创建操作的安全性保障

  • 权限控制:严格的用户身份验证和空间权限检查
  • 数据验证:完整的参数验证机制,确保创建数据的有效性
  • 异常处理:完善的panic捕获和错误处理机制
  • 事件驱动:异步事件处理,确保相关数据的同步更新
  • 资源隔离:通过SpaceID实现多租户资源隔离

4. 领域服务层

工作流领域服务层架构

工作流领域服务层是Coze Studio中处理工作流业务逻辑的核心层,负责工作流资源的创建、管理和业务规则实现。该层采用领域驱动设计(DDD)模式,将业务逻辑与数据访问分离,确保代码的可维护性和可扩展性。

工作流领域服务接口定义

文件位置:backend/domain/workflow/interface.go

工作流领域服务接口定义了工作流管理的核心业务能力,包括工作流资源的完整生命周期管理。

type Service interface {// 核心工作流管理方法ListNodeMeta(ctx context.Context, nodeTypes map[entity.NodeType]bool) (map[string][]*entity.NodeTypeMeta, []entity.Category, error)Create(ctx context.Context, meta *vo.MetaCreate) (int64, error)Save(ctx context.Context, id int64, schema string) errorGet(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error)MGet(ctx context.Context, policy *vo.MGetPolicy) ([]*entity.Workflow, int64, error)Delete(ctx context.Context, policy *vo.DeletePolicy) (ids []int64, err error)Publish(ctx context.Context, policy *vo.PublishPolicy) (err error)UpdateMeta(ctx context.Context, id int64, metaUpdate *vo.MetaUpdate) (err error)CopyWorkflow(ctx context.Context, workflowID int64, policy vo.CopyWorkflowPolicy) (*entity.Workflow, error)WorkflowSchemaCheck(ctx context.Context, wf *entity.Workflow, checks []workflow.CheckType) ([]*workflow.CheckResult, error)// 工作流验证和查询QueryNodeProperties(ctx context.Context, id int64) (map[string]*vo.NodeProperty, error)ValidateTree(ctx context.Context, id int64, validateConfig vo.ValidateTreeConfig) ([]*workflow.ValidateTreeInfo, error)GetWorkflowReference(ctx context.Context, id int64) (map[int64]*vo.Meta, error)// 工作流执行相关ExecutableAsTool// 应用相关工作流管理ReleaseApplicationWorkflows(ctx context.Context, appID int64, config *vo.ReleaseWorkflowConfig) ([]*vo.ValidateIssue, error)CopyWorkflowFromAppToLibrary(ctx context.Context, workflowID int64, appID int64, related vo.ExternalResourceRelated) (*entity.CopyWorkflowFromAppToLibraryResult, error)DuplicateWorkflowsByAppID(ctx context.Context, sourceAPPID, targetAppID int64, related vo.ExternalResourceRelated) ([]*entity.Workflow, error)GetWorkflowDependenceResource(ctx context.Context, workflowID int64) (*vo.DependenceResource, error)SyncRelatedWorkflowResources(ctx context.Context, appID int64, relatedWorkflows map[int64]entity.IDVersionPair, related vo.ExternalResourceRelated) error// ChatFlow相关ChatFlowRoleConversation// 其他功能BindConvRelatedInfo(ctx context.Context, convID int64, info entity.ConvRelatedInfo) errorGetConvRelatedInfo(ctx context.Context, convID int64) (*entity.ConvRelatedInfo, bool, func() error, error)Suggest(ctx context.Context, input *vo.SuggestInfo) ([]string, error)
}

核心接口功能

  1. 工作流资源管理:创建、获取、更新、删除、复制工作流资源
  2. 创建操作核心:Create方法是创建工作流的核心业务接口
  3. 节点管理:ListNodeMeta方法提供工作流节点模板信息
  4. 画布管理:Save方法处理工作流画布数据的保存
  5. 验证功能:提供工作流结构验证和节点属性查询
  6. 执行支持:集成工作流执行引擎和工具化能力
  7. 应用集成:支持应用级别的工作流管理和同步
工作流领域服务实现

文件位置:backend/domain/workflow/service/service_impl.go

工作流领域服务的具体实现,负责处理工作流的核心业务逻辑。

type impl struct {repo workflow.Repository*asToolImpl*executableImpl*conversationImpl
}func NewWorkflowService(repo workflow.Repository) workflow.Service {return &impl{repo: repo,asToolImpl: &asToolImpl{repo: repo,},executableImpl: &executableImpl{repo: repo,},conversationImpl: &conversationImpl{repo: repo},}
}

核心业务方法实现

ListNodeMeta - 获取节点模板信息
func (i *impl) ListNodeMeta(_ context.Context, nodeTypes map[entity.NodeType]bool) (map[string][]*entity.NodeTypeMeta, []entity.Category, error) {// 初始化结果映射nodeMetaMap := make(map[string][]*entity.NodeTypeMeta)// 检查节点类型是否应该包含的辅助函数shouldInclude := func(meta *entity.NodeTypeMeta) bool {if meta.Disabled {return false}nodeType := meta.Keyif nodeTypes == nil || len(nodeTypes) == 0 {return true // 无过滤器,包含所有}_, ok := nodeTypes[nodeType]return ok}// 处理标准节点类型for _, meta := range entity.NodeTypeMetas {if shouldInclude(meta) {nodeMetaMap[meta.Category] = append(nodeMetaMap[meta.Category], meta)}}return nodeMetaMap, entity.Categories, nil
}
Create - 创建工作流
func (i *impl) Create(ctx context.Context, meta *vo.MetaCreate) (int64, error) {// 创建工作流元数据id, err := i.repo.CreateMeta(ctx, &vo.Meta{CreatorID:   meta.CreatorID,SpaceID:     meta.SpaceID,ContentType: meta.ContentType,Name:        meta.Name,Desc:        meta.Desc,IconURI:     meta.IconURI,AppID:       meta.AppID,Mode:        meta.Mode,})if err != nil {return 0, err}// 保存初始化的画布信息到草稿if err = i.Save(ctx, id, meta.InitCanvasSchema); err != nil {return 0, err}return id, nil
}
Save - 保存工作流画布
func (i *impl) Save(ctx context.Context, id int64, schema string) (err error) {// 解析画布数据var draft vo.Canvasif err = sonic.UnmarshalString(schema, &draft); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 提取输入输出参数var inputParams, outputParams stringinputs, outputs := extractInputsAndOutputsNamedInfoList(&draft)if inputParams, err = sonic.MarshalString(inputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}if outputParams, err = sonic.MarshalString(outputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 计算测试运行成功状态testRunSuccess, err := i.calculateTestRunSuccess(ctx, &draft, id)if err != nil {return err}// 生成新的提交IDcommitID, err := i.repo.GenID(ctx)if err != nil {return vo.WrapError(errno.ErrIDGenError, err)}// 创建或更新草稿return i.repo.CreateOrUpdateDraft(ctx, id, &vo.DraftInfo{Canvas: schema,DraftMeta: &vo.DraftMeta{TestRunSuccess: testRunSuccess,Modified:       true,},InputParamsStr:  inputParams,OutputParamsStr: outputParams,CommitID:        strconv.FormatInt(commitID, 10),})
}

工作流业务逻辑特点

  1. 组合模式:通过组合asToolImpl、executableImpl、conversationImpl等实现,提供完整的工作流功能
  2. 画布处理:专门处理工作流画布的序列化、反序列化和验证
  3. 参数提取:自动提取工作流的输入输出参数,便于后续执行
  4. 状态管理:维护工作流的测试运行状态和修改状态
  5. 版本控制:通过CommitID实现工作流的版本管理
  6. 错误处理:完善的错误包装和处理机制

文章转载自:

http://EGb3iKMI.xfdkh.cn
http://qhoJYwSh.xfdkh.cn
http://hNRulfpE.xfdkh.cn
http://HmdSVoBR.xfdkh.cn
http://pG0Ly5oK.xfdkh.cn
http://FiuQElK2.xfdkh.cn
http://VA7iVugL.xfdkh.cn
http://BWcsk8CE.xfdkh.cn
http://vbcnrUTV.xfdkh.cn
http://WxpHpFlX.xfdkh.cn
http://mpAku9br.xfdkh.cn
http://I9bLSiFq.xfdkh.cn
http://WTIfXPOX.xfdkh.cn
http://9EOvQAqD.xfdkh.cn
http://hWoUIXK2.xfdkh.cn
http://W8ynsyDW.xfdkh.cn
http://89AD4Yim.xfdkh.cn
http://DeTVyVXt.xfdkh.cn
http://AFWdgNV7.xfdkh.cn
http://f19APs4m.xfdkh.cn
http://80LF8Ghz.xfdkh.cn
http://HZikHX5S.xfdkh.cn
http://B6W6pFSY.xfdkh.cn
http://VTfR7jpl.xfdkh.cn
http://EBkpoAvB.xfdkh.cn
http://tP3JN96c.xfdkh.cn
http://xbzurU6U.xfdkh.cn
http://WAf9ZMME.xfdkh.cn
http://AQJFMvMZ.xfdkh.cn
http://pIZKjJlk.xfdkh.cn
http://www.dtcms.com/a/388508.html

相关文章:

  • 5 分钟将网站打包成 APP:高效实现方案
  • 物联网智能网关核心功能实现:解析西门子1500 PLC的MQTT通信配置全流程
  • 新国标电动自行车实施,BMS 静电浪涌风险与对策
  • 【Python】Python文件操作
  • C#如何使用ApiPost接口,将数据显示在unity面板
  • 零基础从头教学Linux(Day 36)
  • 深度学习(2)
  • 火山 17 声音回调
  • Flash芯片的VCC上电到可操作时间过长
  • CSP-S——各算法可以实现的问题
  • 第十七章 Arm C1-Premium性能监控单元(PMU)事件详解
  • vue锚点导航
  • 软件体系结构——后端三层架构
  • Nmap 端口扫描
  • 关于青春的沉浸式回忆录-《学生时代》评测
  • 深入理解虚拟 DOM(VDOM):原理、优势与应用
  • React 18笔记
  • 模块化演进史:从 IIFE / CommonJS / AMD 到 ES Modules(含 Tree Shaking 原理)
  • Python+PyQt构建自动化定时任务执行工具
  • 前端如何终止请求
  • Ubuntu 系统 MySQL 全面管理指南(认证、用户、密码、服务及安全)
  • 《UE5_C++多人TPS完整教程》学习笔记53 ——《P54 转身(Turning in Place)》
  • 【Cyansdk 插件详细介绍文档】
  • IDEA 如何打开eclipse项目
  • linux C++ opencv 绘制中文(源码编译opencv)
  • 线性回归到 Softmax 回归
  • Python实现剑龙优化算法 (Stegosaurus Optimization Algorithm, SOA)优化函数(付完整代码)
  • 微软开始在Win11上全屏打广告了,怎么关?
  • 深度学习-线性回归与 Softmax 回归
  • OpenCV:背景建模