Coze源码分析-资源库-创建工作流-后端源码-IDL/API/应用/领域层
Coze源码分析-资源库-创建工作流-后端源码
前言
本文档深入分析Coze平台中用户在资源库中创建工作流的后端实现机制。通过对源码的详细解读,帮助开发者理解工作流创建的完整技术架构、数据流转过程以及核心业务逻辑。
工作流作为Coze平台的核心功能之一,为用户提供了强大的自动化流程编排能力。用户可以通过可视化界面创建复杂的业务流程,系统会将这些流程转换为可执行的工作流实例。本文将从技术实现角度,全面剖析工作流创建功能的后端架构设计。
项目架构概览
整体架构设计
┌─────────────────────────────────────────────────────────────┐
│ IDL接口定义层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ base.thrift │ │workflow. │ │ api.thrift │ │
│ │ │ │thrift │ │ │ │
│ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ API网关层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Workflow │ │ Hertz │ │ Middleware │ │
│ │ Handler │ │ Router │ │ 中间件 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 应用服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ WorkflowApplicationService │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │CreateWork │ │SaveWorkflow │ │UpdateWork │ │ │
│ │ │flow │ │ │ │flowMeta │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 领域服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ WorkflowService │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │Create │ │Save │ │ListNodeMeta │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 数据访问层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ WorkflowRepository │ │
│ │ ┌──── ─────── ──── ──┐ ┌─────────────────────────┐│ │
│ │ │WorkflowDAO │ │workflow.gen.go ││ │
│ │ │CreateMeta │ │workflow_version.gen.go ││ │
│ │ │CreateVersion │ │GORM Generated Code ││ │
│ │ └──── ─────── ─── ───┘ └─────────────────────────┘│ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 基础设施层 │
│ ┌─ ─ ─── ─ ── ── ─ ─ ─┐ │
│ │ gorm.DB │ │
│ │ es.Client redisImpl │ │
│ └── ─ ── ── ─ ── ── ─ ┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 存储服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ MySQL数据库 Redis数据库 │ │
│ │ ElasticSearch数据库 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
核心技术栈
- 后端框架: Go + Hertz (字节跳动开源的高性能HTTP框架)
- 数据库: MySQL 8.0 (主数据存储)
- 搜索引擎: ElasticSearch (工作流搜索和索引)
- 消息队列: Kafka (异步事件处理)
- 缓存: Redis (高频数据缓存)
- 工作流引擎: 自研工作流执行引擎
- API协议: gRPC (内部服务通信) + HTTP (外部API)
工作流创建核心模块
- 工作流定义管理: 处理工作流模板的创建、编辑和版本控制
- 节点配置服务: 管理工作流中各种节点的配置和参数
- 流程编排引擎: 负责工作流的逻辑编排和执行路径规划
- 权限控制系统: 确保工作流创建和访问的安全性
- 版本管理服务: 支持工作流的版本控制和回滚机制
1. IDL接口定义层
1.1 工作流创建相关IDL定义
文件位置: idl/workflow/workflow.thrift
该文件定义了工作流相关的Thrift IDL接口,由thriftgo工具自动生成Go代码到 backend/api/model/workflow/workflow.go
。
核心数据结构定义
// WorkflowMode is used to distinguish between Workflow and chatflow.
enum WorkflowMode {Workflow = 0 ,Imageflow = 1 ,SceneFlow = 2 ,ChatFlow = 3 ,All = 100 , // Use only when querying
}// 创建工作流请求结构
struct CreateWorkflowRequest {1 : required string name , // process name2 : required string desc , // Process description, not null3 : required string icon_uri , // Process icon uri, not nullable4 : required string space_id , // Space id, cannot be empty5 : optional WorkflowMode flow_mode , // Workflow or chatflow, the default is workflow6 : optional SchemaType schema_type ,7 : optional string bind_biz_id ,8 : optional i32 bind_biz_type, // Bind the business type, do not fill in if necessary. Refer to the BindBizType structure, when the value is 3, it represents the Douyin doppelganger.9 : optional string project_id , // Application id, when filled in, it means that the process is the process under the project, and it needs to be released with the project.10 : optional bool create_conversation, // Whether to create a session, only if flow_mode = chatflow255: optional base.Base Base ,
}// 创建工作流响应数据
struct CreateWorkflowData {1: string workflow_id, // 工作流ID2: string name, // 工作流名称3: string url, // 工作流URL4: WorkFlowStatus status, // 工作流状态5: SchemaType type, // 模式类型6: list<Node> node_list, // 节点列表7: optional string external_flow_info, // 外部流程信息
}// 创建工作流响应结构
struct CreateWorkflowResponse {1 : required CreateWorkflowData data,253: required i64 code,254: required string msg,255: required base.BaseResp BaseResp,
}
生成的Go代码结构
文件位置: backend/api/model/workflow/workflow.go
// CreateWorkflowRequest 创建工作流请求(由thriftgo生成)
type CreateWorkflowRequest struct {// process nameName string `thrift:"name,1,required" form:"name,required" json:"name,required" query:"name,required"`// Process description, not nullDesc string `thrift:"desc,2,required" form:"desc,required" json:"desc,required" query:"desc,required"`// Process icon uri, not nullableIconURI string `thrift:"icon_uri,3,required" form:"icon_uri,required" json:"icon_uri,required" query:"icon_uri,required"`// Space id, cannot be emptySpaceID string `thrift:"space_id,4,required" form:"space_id,required" json:"space_id,required" query:"space_id,required"`// Workflow or chatflow, the default is workflowFlowMode *WorkflowMode `thrift:"flow_mode,5,optional" form:"flow_mode" json:"flow_mode,omitempty" query:"flow_mode"`SchemaType *SchemaType `thrift:"schema_type,6,optional" form:"schema_type" json:"schema_type,omitempty" query:"schema_type"`BindBizID *string `thrift:"bind_biz_id,7,optional" form:"bind_biz_id" json:"bind_biz_id,omitempty" query:"bind_biz_id"`// Bind the business type, do not fill in if necessary. Refer to the BindBizType structure, when the value is 3, it represents the Douyin doppelganger.BindBizType *int32 `thrift:"bind_biz_type,8,optional" form:"bind_biz_type" json:"bind_biz_type,omitempty" query:"bind_biz_type"`// Application id, when filled in, it means that the process is the process under the project, and it needs to be released with the project.ProjectID *string `thrift:"project_id,9,optional" form:"project_id" json:"project_id,omitempty" query:"project_id"`// Whether to create a session, only if flow_mode = chatflowCreateConversation *bool `thrift:"create_conversation,10,optional" form:"create_conversation" json:"create_conversation,omitempty" query:"create_conversation"`Base *base.Base `thrift:"Base,255,optional" form:"Base" json:"Base,omitempty" query:"Base"`
}// WorkflowMode 工作流模式枚举
type WorkflowMode int64const (WorkflowMode_Workflow WorkflowMode = 0WorkflowMode_Imageflow WorkflowMode = 1WorkflowMode_SceneFlow WorkflowMode = 2WorkflowMode_ChatFlow WorkflowMode = 3WorkflowMode_All WorkflowMode = 100
)
1.2 工作流状态常量定义
// 工作流状态常量
const (WorkflowStatusDraft = 1 // 草稿状态WorkflowStatusPublished = 2 // 已发布WorkflowStatusArchived = 3 // 已归档WorkflowStatusDeleted = 4 // 已删除
)// 工作流节点类型常量
const (NodeTypeStart = "start" // 开始节点NodeTypeEnd = "end" // 结束节点NodeTypeCondition = "condition" // 条件节点NodeTypeAction = "action" // 动作节点NodeTypeLoop = "loop" // 循环节点NodeTypeParallel = "parallel" // 并行节点NodeTypeSubflow = "subflow" // 子流程节点
)// 工作流版本格式
const (VersionFormat = "v%d.%d.%d" // 版本号格式:v1.0.0
)
1.3 Thrift服务接口定义
文件位置: backend/api/model/workflow/workflow_svc.go
项目使用Apache Thrift作为IDL,而非gRPC。服务接口通过thriftgo工具生成。
// WorkflowService Thrift服务接口(由thriftgo生成)
type WorkflowService interface {// 创建工作流CreateWorkflow(ctx context.Context, request *CreateWorkflowRequest) (r *CreateWorkflowResponse, err error)// 获取画布信息GetCanvasInfo(ctx context.Context, request *GetCanvasInfoRequest) (r *GetCanvasInfoResponse, err error)// 保存工作流SaveWorkflow(ctx context.Context, request *SaveWorkflowRequest) (r *SaveWorkflowResponse, err error)// 更新工作流元数据UpdateWorkflowMeta(ctx context.Context, request *UpdateWorkflowMetaRequest) (r *UpdateWorkflowMetaResponse, err error)// 删除工作流DeleteWorkflow(ctx context.Context, request *DeleteWorkflowRequest) (r *DeleteWorkflowResponse, err error)// 发布工作流PublishWorkflow(ctx context.Context, request *PublishWorkflowRequest) (r *PublishWorkflowResponse, err error)// 复制工作流CopyWorkflow(ctx context.Context, request *CopyWorkflowRequest) (r *CopyWorkflowResponse, err error)// 获取工作流列表GetWorkFlowList(ctx context.Context, request *GetWorkFlowListRequest) (r *GetWorkFlowListResponse, err error)// 运行工作流(OpenAPI)OpenAPIRunFlow(ctx context.Context, request *OpenAPIRunFlowRequest) (r *OpenAPIRunFlowResponse, err error)// 流式运行工作流(OpenAPI)OpenAPIStreamRunFlow(ctx context.Context, request *OpenAPIRunFlowRequest) (r *OpenAPIStreamRunFlowResponse, err error)
}// WorkflowServiceClient Thrift客户端实现
type WorkflowServiceClient struct {c thrift.TClient
}// NewWorkflowServiceClient 创建工作流服务客户端
func NewWorkflowServiceClient(c thrift.TClient) *WorkflowServiceClient {return &WorkflowServiceClient{c: c,}
}
Thrift与HTTP的映射关系
项目通过Hertz框架将Thrift服务映射为HTTP接口,具体映射关系在处理器中通过注解定义:
// CreateWorkflow 创建工作流
// @router /api/workflow_api/create [POST]
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {// 处理HTTP请求并调用Thrift服务
}
2. API网关层
2.1 HTTP路由定义
文件位置: backend/api/handler/coze/workflow_service.go
项目使用Hertz框架,通过注解方式定义路由,而非单独的路由注册文件。每个处理器函数通过 @router
注解指定对应的HTTP路径和方法。
工作流相关路由映射
// 工作流核心操作路由
// @router /api/workflow_api/create [POST] - 创建工作流
// @router /api/workflow_api/canvas [POST] - 获取画布信息
// @router /api/workflow_api/save [POST] - 保存工作流
// @router /api/workflow_api/update_meta [POST] - 更新工作流元数据
// @router /api/workflow_api/delete [POST] - 删除工作流
// @router /api/workflow_api/batch_delete [POST] - 批量删除工作流
// @router /api/workflow_api/publish [POST] - 发布工作流
// @router /api/workflow_api/copy [POST] - 复制工作流// 工作流查询路由
// @router /api/workflow_api/workflow_list [POST] - 获取工作流列表
// @router /api/workflow_api/released_workflows [POST] - 获取已发布工作流
// @router /api/workflow_api/get_process [GET] - 获取工作流进程信息// OpenAPI路由(对外接口)
// @router /v1/workflow/run [POST] - 运行工作流
// @router /v1/workflow/stream_run [POST] - 流式运行工作流
// @router /v1/workflow/stream_resume [POST] - 恢复流式工作流
// @router /v1/workflows/:workflow_id [GET] - 获取工作流信息
// @router /v1/workflows/chat [POST] - 工作流对话
// @router /v1/workflow/conversation/create [POST] - 创建工作流会话
路由注解说明
项目使用Hertz的代码生成工具,通过注解自动生成路由注册代码:
- 内部API路由: 以
/api/workflow_api/
开头,用于前端界面调用 - OpenAPI路由: 以
/v1/
开头,用于第三方开发者调用 - HTTP方法: 主要使用POST方法,部分查询接口使用GET方法
- 路径参数: 使用
:workflow_id
等占位符表示路径参数
2.2 工作流创建Handler实现
文件位置: backend/api/handler/coze/workflow_service.go
项目将所有工作流相关的处理器函数集中在一个文件中,使用包级别函数而非结构体方法。
package cozeimport ("context""github.com/cloudwego/hertz/pkg/app""github.com/cloudwego/hertz/pkg/protocol/consts""github.com/coze-dev/coze-studio/backend/api/model/workflow"appworkflow "github.com/coze-dev/coze-studio/backend/application/workflow"
)// CreateWorkflow 创建工作流
// @router /api/workflow_api/create [POST]
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.CreateWorkflowRequest// 绑定和验证请求参数err = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}// 调用应用服务层创建工作流resp, err := appworkflow.SVC.CreateWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}// 返回成功响应c.JSON(consts.StatusOK, resp)
}// GetCanvasInfo 获取画布信息
// @router /api/workflow_api/canvas [POST]
func GetCanvasInfo(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.GetCanvasInfoRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.GetCanvasInfo(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}// SaveWorkflow 保存工作流
// @router /api/workflow_api/save [POST]
func SaveWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.SaveWorkflowRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.SaveWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}// UpdateWorkflowMeta 更新工作流元数据
// @router /api/workflow_api/update_meta [POST]
func UpdateWorkflowMeta(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.UpdateWorkflowMetaRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.UpdateWorkflowMeta(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}// DeleteWorkflow 删除工作流
// @router /api/workflow_api/delete [POST]
func DeleteWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.DeleteWorkflowRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}resp, err := appworkflow.SVC.DeleteWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}
错误处理函数
项目使用统一的错误处理函数:
// invalidParamRequestResponse 参数错误响应
func invalidParamRequestResponse(c *app.RequestContext, msg string) {// 统一的参数错误处理逻辑
}// internalServerErrorResponse 内部服务器错误响应
func internalServerErrorResponse(ctx context.Context, c *app.RequestContext, err error) {// 统一的服务器错误处理逻辑
}
应用服务调用
处理器通过 appworkflow.SVC
全局变量调用应用服务层:
// appworkflow.SVC 是应用服务层的全局实例
resp, err := appworkflow.SVC.CreateWorkflow(ctx, &req)
2.3 请求参数验证
项目使用Hertz框架内置的参数验证机制,通过结构体标签进行验证,而非单独的验证器文件。
Thrift生成的验证标签
文件位置: backend/api/model/workflow/workflow.go
// CreateWorkflowRequest 结构体中的验证标签
type CreateWorkflowRequest struct {// process nameName string `thrift:"name,1,required" form:"name,required" json:"name,required" query:"name,required"`// Process description, not nullDesc string `thrift:"desc,2,required" form:"desc,required" json:"desc,required" query:"desc,required"`// Process icon uri, not nullableIconURI string `thrift:"icon_uri,3,required" form:"icon_uri,required" json:"icon_uri,required" query:"icon_uri,required"`// Space id, cannot be emptySpaceID string `thrift:"space_id,4,required" form:"space_id,required" json:"space_id,required" query:"space_id,required"`// Workflow or chatflow, the default is workflowFlowMode *WorkflowMode `thrift:"flow_mode,5,optional" form:"flow_mode" json:"flow_mode,omitempty" query:"flow_mode"`SchemaType *SchemaType `thrift:"schema_type,6,optional" form:"schema_type" json:"schema_type,omitempty" query:"schema_type"`BindBizID *string `thrift:"bind_biz_id,7,optional" form:"bind_biz_id" json:"bind_biz_id,omitempty" query:"bind_biz_id"`// Bind the business type, do not fill in if necessary. Refer to the BindBizType structure, when the value is 3, it represents the Douyin doppelganger.BindBizType *int32 `thrift:"bind_biz_type,8,optional" form:"bind_biz_type" json:"bind_biz_type,omitempty" query:"bind_biz_type"`// Application id, when filled in, it means that the process is the process under the project, and it needs to be released with the project.ProjectID *string `thrift:"project_id,9,optional" form:"project_id" json:"project_id,omitempty" query:"project_id"`// Whether to create a session, only if flow_mode = chatflowCreateConversation *bool `thrift:"create_conversation,10,optional" form:"create_conversation" json:"create_conversation,omitempty" query:"create_conversation"`Base *base.Base `thrift:"Base,255,optional" form:"Base" json:"Base,omitempty" query:"Base"`
}
Hertz框架验证机制
// 在处理器中使用 c.BindAndValidate() 进行参数绑定和验证
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {var req workflow.CreateWorkflowRequest// BindAndValidate 会根据结构体标签自动进行参数验证err = c.BindAndValidate(&req)if err != nil {// 验证失败时返回错误信息invalidParamRequestResponse(c, err.Error())return}// 验证通过后继续处理业务逻辑// ...
}
业务层验证
复杂的业务逻辑验证在应用服务层进行:
文件位置: backend/application/workflow/workflow.go
func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {// 获取用户IDuID := ctxutil.MustGetUIDFromCtx(ctx)// 解析和验证SpaceIDspaceID := mustParseInt64(req.GetSpaceID())// 检查用户空间权限if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 其他业务逻辑验证...
}
验证错误处理
// invalidParamRequestResponse 统一处理参数验证错误
func invalidParamRequestResponse(c *app.RequestContext, msg string) {response := map[string]interface{}{"code": 400,"msg": "参数错误: " + msg,}c.JSON(consts.StatusBadRequest, response)
}
项目通过这种方式实现了分层验证:
- 结构体标签验证: 基础的字段类型和必填验证
- 业务逻辑验证: 复杂的业务规则验证
- 权限验证: 用户权限和资源访问验证
3. 应用服务层
WorkflowApplicationService初始化
文件位置:backend/application/workflow/workflow.go
WorkflowApplicationService是工作流应用服务层的核心组件,专门负责处理工作流资源的创建等业务逻辑,是连接API层和领域层的重要桥梁。在用户点击"资源库" → 接着点击右上角的"+"号 → 最后点击弹出菜单中的"工作流"的场景中,该服务承担着核心的创建业务处理职责。
服务结构定义
文件位置:backend/application/workflow/workflow.go
type ApplicationService struct {DomainSVC domainWorkflow.ServiceImageX imagex.ImageX // 图片服务代理,用于获取认证令牌TosClient storage.StorageIDGenerator idgen.IDGenerator
}var SVC = &ApplicationService{}func GetWorkflowDomainSVC() domainWorkflow.Service {return SVC.DomainSVC
}
服务结构特点:
- 领域服务集成:通过DomainSVC字段集成工作流领域服务,实现业务逻辑的分层处理
- 外部服务依赖:集成图片服务(ImageX)和对象存储服务(TosClient),支持工作流资源管理
- ID生成器:内置ID生成器,确保工作流资源的唯一标识
- 全局单例:通过SVC全局变量提供服务实例,便于其他组件调用
创建工作流核心实现
CreateWorkflow方法详解
文件位置:backend/application/workflow/workflow.go
当用户在资源库中点击右上角的"+"号,然后点击弹出菜单中的"工作流"时,前端会调用CreateWorkflow
方法来创建新的工作流资源。
func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {defer func() {if panicErr := recover(); panicErr != nil {err = safego.NewPanicErr(panicErr, debug.Stack())}if err != nil {err = vo.WrapIfNeeded(errno.ErrWorkflowOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))}}()// 获取用户ID并验证空间权限uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 处理ChatFlow模式的会话创建var createConversation boolif req.ProjectID != nil && req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow && req.IsSetCreateConversation() && req.GetCreateConversation() {createConversation = true_, err := GetWorkflowDomainSVC().CreateDraftConversationTemplate(ctx, &vo.CreateConversationTemplateMeta{AppID: mustParseInt64(req.GetProjectID()),UserID: uID,SpaceID: spaceID,Name: req.Name,})if err != nil {return nil, err}}// 构建工作流元数据wf := &vo.MetaCreate{CreatorID: uID,SpaceID: spaceID,ContentType: workflow.WorkFlowType_User,Name: req.Name,Desc: req.Desc,IconURI: req.IconURI,AppID: parseInt64(req.ProjectID),Mode: ternary.IFElse(req.IsSetFlowMode(), req.GetFlowMode(), workflow.WorkflowMode_Workflow),InitCanvasSchema: vo.GetDefaultInitCanvasJsonSchema(i18n.GetLocale(ctx)),}// 为ChatFlow模式设置特殊的画布模式if req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow {conversationName := req.Nameif !req.IsSetProjectID() || mustParseInt64(req.GetProjectID()) == 0 || !createConversation {conversationName = "Default"}wf.InitCanvasSchema = vo.GetDefaultInitCanvasJsonSchemaChat(i18n.GetLocale(ctx), conversationName)}// 调用领域服务创建工作流id, err := GetWorkflowDomainSVC().Create(ctx, wf)if err != nil {return nil, err}// 发布工作流资源事件err = PublishWorkflowResource(ctx, id, ptr.Of(int32(wf.Mode)), search.Created, &search.ResourceDocument{Name: &wf.Name,APPID: wf.AppID,SpaceID: &wf.SpaceID,OwnerID: &wf.CreatorID,PublishStatus: ptr.Of(resource.PublishStatus_UnPublished),CreateTimeMS: ptr.Of(time.Now().UnixMilli()),})if err != nil {return nil, vo.WrapError(errno.ErrNotifyWorkflowResourceChangeErr, err)}return &workflow.CreateWorkflowResponse{Data: &workflow.CreateWorkflowData{WorkflowID: strconv.FormatInt(id, 10),},}, nil
}
方法功能特点:
- 异常处理:使用defer机制捕获panic和包装错误,确保系统稳定性
- 权限验证:验证用户身份和空间访问权限,确保安全性
- 多模式支持:支持标准工作流和ChatFlow两种模式,ChatFlow模式会自动创建会话模板
- 国际化支持:根据用户语言环境初始化不同的画布模式
- 事件发布:创建完成后发布资源事件,支持搜索索引和其他下游处理
- 错误包装:完善的错误处理和包装机制,便于问题定位
其他核心应用服务方法
SaveWorkflow - 保存工作流
func (w *ApplicationService) SaveWorkflow(ctx context.Context, req *workflow.SaveWorkflowRequest) (_ *workflow.SaveWorkflowResponse, err error,
) {// 权限验证if err := checkUserSpace(ctx, ctxutil.MustGetUIDFromCtx(ctx), mustParseInt64(req.GetSpaceID())); err != nil {return nil, err}// 调用领域服务保存工作流if err := GetWorkflowDomainSVC().Save(ctx, mustParseInt64(req.WorkflowID), req.GetSchema()); err != nil {return nil, err}return &workflow.SaveWorkflowResponse{Data: &workflow.SaveWorkflowData{},}, nil
}
UpdateWorkflowMeta - 更新工作流元数据
func (w *ApplicationService) UpdateWorkflowMeta(ctx context.Context, req *workflow.UpdateWorkflowMetaRequest) (_ *workflow.UpdateWorkflowMetaResponse, err error,
) {// 权限验证if err := checkUserSpace(ctx, ctxutil.MustGetUIDFromCtx(ctx), mustParseInt64(req.GetSpaceID())); err != nil {return nil, err}workflowID := mustParseInt64(req.GetWorkflowID())// 更新元数据err = GetWorkflowDomainSVC().UpdateMeta(ctx, workflowID, &vo.MetaUpdate{Name: req.Name,Desc: req.Desc,IconURI: req.IconURI,WorkflowMode: req.FlowMode,})if err != nil {return nil, err}// 异步发布更新事件safego.Go(ctx, func() {err := PublishWorkflowResource(ctx, workflowID, nil, search.Updated, &search.ResourceDocument{Name: req.Name,UpdateTimeMS: ptr.Of(time.Now().UnixMilli()),})if err != nil {logs.CtxErrorf(ctx, "publish update workflow resource failed, workflowID: %d, err: %v", workflowID, err)}})return &workflow.UpdateWorkflowMetaResponse{}, nil
}
创建工作流业务流程
创建工作流的完整业务流程包括以下几个关键步骤:
- 身份验证:验证用户登录状态和空间访问权限
- 参数验证:验证工作流名称、描述、模式等必要参数
- 模式处理:根据工作流模式(Workflow/ChatFlow)进行特殊处理
- 画布初始化:根据用户语言环境和工作流模式初始化默认画布
- 执行创建操作:调用领域服务执行实际的创建操作
- 事件发布:发布创建事件,通知搜索引擎等下游系统
- 返回响应:返回创建操作的结果,包含新创建的工作流ID
创建操作的安全性保障:
- 权限控制:严格的用户身份验证和空间权限检查
- 数据验证:完整的参数验证机制,确保创建数据的有效性
- 异常处理:完善的panic捕获和错误处理机制
- 事件驱动:异步事件处理,确保相关数据的同步更新
- 资源隔离:通过SpaceID实现多租户资源隔离
4. 领域服务层
工作流领域服务层架构
工作流领域服务层是Coze Studio中处理工作流业务逻辑的核心层,负责工作流资源的创建、管理和业务规则实现。该层采用领域驱动设计(DDD)模式,将业务逻辑与数据访问分离,确保代码的可维护性和可扩展性。
工作流领域服务接口定义
文件位置:backend/domain/workflow/interface.go
工作流领域服务接口定义了工作流管理的核心业务能力,包括工作流资源的完整生命周期管理。
type Service interface {// 核心工作流管理方法ListNodeMeta(ctx context.Context, nodeTypes map[entity.NodeType]bool) (map[string][]*entity.NodeTypeMeta, []entity.Category, error)Create(ctx context.Context, meta *vo.MetaCreate) (int64, error)Save(ctx context.Context, id int64, schema string) errorGet(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error)MGet(ctx context.Context, policy *vo.MGetPolicy) ([]*entity.Workflow, int64, error)Delete(ctx context.Context, policy *vo.DeletePolicy) (ids []int64, err error)Publish(ctx context.Context, policy *vo.PublishPolicy) (err error)UpdateMeta(ctx context.Context, id int64, metaUpdate *vo.MetaUpdate) (err error)CopyWorkflow(ctx context.Context, workflowID int64, policy vo.CopyWorkflowPolicy) (*entity.Workflow, error)WorkflowSchemaCheck(ctx context.Context, wf *entity.Workflow, checks []workflow.CheckType) ([]*workflow.CheckResult, error)// 工作流验证和查询QueryNodeProperties(ctx context.Context, id int64) (map[string]*vo.NodeProperty, error)ValidateTree(ctx context.Context, id int64, validateConfig vo.ValidateTreeConfig) ([]*workflow.ValidateTreeInfo, error)GetWorkflowReference(ctx context.Context, id int64) (map[int64]*vo.Meta, error)// 工作流执行相关ExecutableAsTool// 应用相关工作流管理ReleaseApplicationWorkflows(ctx context.Context, appID int64, config *vo.ReleaseWorkflowConfig) ([]*vo.ValidateIssue, error)CopyWorkflowFromAppToLibrary(ctx context.Context, workflowID int64, appID int64, related vo.ExternalResourceRelated) (*entity.CopyWorkflowFromAppToLibraryResult, error)DuplicateWorkflowsByAppID(ctx context.Context, sourceAPPID, targetAppID int64, related vo.ExternalResourceRelated) ([]*entity.Workflow, error)GetWorkflowDependenceResource(ctx context.Context, workflowID int64) (*vo.DependenceResource, error)SyncRelatedWorkflowResources(ctx context.Context, appID int64, relatedWorkflows map[int64]entity.IDVersionPair, related vo.ExternalResourceRelated) error// ChatFlow相关ChatFlowRoleConversation// 其他功能BindConvRelatedInfo(ctx context.Context, convID int64, info entity.ConvRelatedInfo) errorGetConvRelatedInfo(ctx context.Context, convID int64) (*entity.ConvRelatedInfo, bool, func() error, error)Suggest(ctx context.Context, input *vo.SuggestInfo) ([]string, error)
}
核心接口功能:
- 工作流资源管理:创建、获取、更新、删除、复制工作流资源
- 创建操作核心:Create方法是创建工作流的核心业务接口
- 节点管理:ListNodeMeta方法提供工作流节点模板信息
- 画布管理:Save方法处理工作流画布数据的保存
- 验证功能:提供工作流结构验证和节点属性查询
- 执行支持:集成工作流执行引擎和工具化能力
- 应用集成:支持应用级别的工作流管理和同步
工作流领域服务实现
文件位置:backend/domain/workflow/service/service_impl.go
工作流领域服务的具体实现,负责处理工作流的核心业务逻辑。
type impl struct {repo workflow.Repository*asToolImpl*executableImpl*conversationImpl
}func NewWorkflowService(repo workflow.Repository) workflow.Service {return &impl{repo: repo,asToolImpl: &asToolImpl{repo: repo,},executableImpl: &executableImpl{repo: repo,},conversationImpl: &conversationImpl{repo: repo},}
}
核心业务方法实现
ListNodeMeta - 获取节点模板信息
func (i *impl) ListNodeMeta(_ context.Context, nodeTypes map[entity.NodeType]bool) (map[string][]*entity.NodeTypeMeta, []entity.Category, error) {// 初始化结果映射nodeMetaMap := make(map[string][]*entity.NodeTypeMeta)// 检查节点类型是否应该包含的辅助函数shouldInclude := func(meta *entity.NodeTypeMeta) bool {if meta.Disabled {return false}nodeType := meta.Keyif nodeTypes == nil || len(nodeTypes) == 0 {return true // 无过滤器,包含所有}_, ok := nodeTypes[nodeType]return ok}// 处理标准节点类型for _, meta := range entity.NodeTypeMetas {if shouldInclude(meta) {nodeMetaMap[meta.Category] = append(nodeMetaMap[meta.Category], meta)}}return nodeMetaMap, entity.Categories, nil
}
Create - 创建工作流
func (i *impl) Create(ctx context.Context, meta *vo.MetaCreate) (int64, error) {// 创建工作流元数据id, err := i.repo.CreateMeta(ctx, &vo.Meta{CreatorID: meta.CreatorID,SpaceID: meta.SpaceID,ContentType: meta.ContentType,Name: meta.Name,Desc: meta.Desc,IconURI: meta.IconURI,AppID: meta.AppID,Mode: meta.Mode,})if err != nil {return 0, err}// 保存初始化的画布信息到草稿if err = i.Save(ctx, id, meta.InitCanvasSchema); err != nil {return 0, err}return id, nil
}
Save - 保存工作流画布
func (i *impl) Save(ctx context.Context, id int64, schema string) (err error) {// 解析画布数据var draft vo.Canvasif err = sonic.UnmarshalString(schema, &draft); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 提取输入输出参数var inputParams, outputParams stringinputs, outputs := extractInputsAndOutputsNamedInfoList(&draft)if inputParams, err = sonic.MarshalString(inputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}if outputParams, err = sonic.MarshalString(outputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 计算测试运行成功状态testRunSuccess, err := i.calculateTestRunSuccess(ctx, &draft, id)if err != nil {return err}// 生成新的提交IDcommitID, err := i.repo.GenID(ctx)if err != nil {return vo.WrapError(errno.ErrIDGenError, err)}// 创建或更新草稿return i.repo.CreateOrUpdateDraft(ctx, id, &vo.DraftInfo{Canvas: schema,DraftMeta: &vo.DraftMeta{TestRunSuccess: testRunSuccess,Modified: true,},InputParamsStr: inputParams,OutputParamsStr: outputParams,CommitID: strconv.FormatInt(commitID, 10),})
}
工作流业务逻辑特点
- 组合模式:通过组合asToolImpl、executableImpl、conversationImpl等实现,提供完整的工作流功能
- 画布处理:专门处理工作流画布的序列化、反序列化和验证
- 参数提取:自动提取工作流的输入输出参数,便于后续执行
- 状态管理:维护工作流的测试运行状态和修改状态
- 版本控制:通过CommitID实现工作流的版本管理
- 错误处理:完善的错误包装和处理机制