Coze源码分析-资源库-创建工作流-后端源码-安全/错误/流程
8. 工作流创建安全和权限验证机制
8.1 工作流创建身份认证
JWT Token验证:
- 创建工作流的所有API请求都需要携带有效的JWT Token
- Token包含用户ID、工作空间权限等关键信息
- 通过中间件统一验证Token的有效性和完整性
// 工作流创建身份验证中间件
func WorkflowCreateAuthMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {token := ctx.GetHeader("Authorization")if token == nil {ctx.JSON(401, gin.H{"error": "创建工作流需要登录认证"})ctx.Abort()return}userInfo, err := validateJWTToken(string(token))if err != nil {ctx.JSON(401, gin.H{"error": "Token无效,无法创建工作流"})ctx.Abort()return}// 验证用户是否有创建工作流的权限if !userInfo.HasWorkflowCreatePermission {ctx.JSON(403, gin.H{"error": "用户无创建工作流权限"})ctx.Abort()return}ctx.Set("user_id", userInfo.UserID)ctx.Set("space_id", userInfo.SpaceID)ctx.Next()}
}
8.2 工作流创建工作空间权限控制
空间隔离机制:
- 每个用户只能在其所属工作空间中创建工作流
- 通过
space_id
字段实现工作流创建权限隔离 - 在工作流创建操作中强制验证空间权限
// 工作流创建工作空间权限验证 - 基于实际代码 backend/application/workflow/workflow.go
func checkUserSpace(ctx context.Context, uid int64, spaceID int64) error {spaces, err := crossuser.DefaultSVC().GetUserSpaceList(ctx, uid)if err != nil {return err}var match boolfor _, s := range spaces {if s.ID == spaceID {match = truebreak}}if !match {return fmt.Errorf("user %d does not have access to space %d", uid, spaceID)}return nil
}// 工作流创建权限验证应用
func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 验证用户工作空间权限if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 继续工作流创建逻辑...
}
8.3 工作流创建资源级权限验证
工作流创建用户权限验证:
- 验证用户是否具有工作流创建权限
- 检查用户在指定工作空间的操作权限
- 通过用户ID和工作空间ID进行权限验证
// 工作流创建权限验证 - 基于实际代码结构
func (w *ApplicationService) validateWorkflowCreatePermission(ctx context.Context, req *workflow.CreateWorkflowRequest) error {userID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 验证用户工作空间权限if err := checkUserSpace(ctx, userID, spaceID); err != nil {return vo.WrapError(errno.ErrAuthorizationRequired, err)}// 检查工作流名称是否重复if req.Name != "" {exists, err := w.checkWorkflowNameExists(ctx, spaceID, req.Name)if err != nil {return fmt.Errorf("检查工作流名称重复失败: %w", err)}if exists {return vo.WrapError(errno.ErrConversationNameIsDuplicated, fmt.Errorf("工作流名称 %s 已存在", req.Name))}}// 验证工作流模式参数if req.IsSetFlowMode() {if req.GetFlowMode() != workflow.WorkflowMode_Workflow && req.GetFlowMode() != workflow.WorkflowMode_ChatFlow {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的工作流模式"))}}return nil
}// 检查工作流名称是否存在
func (w *ApplicationService) checkWorkflowNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {// 查询同一工作空间下是否存在同名工作流workflows, err := GetWorkflowDomainSVC().List(ctx, &vo.ListPolicy{SpaceID: spaceID,Name: &name,Limit: 1,})if err != nil {return false, err}return len(workflows) > 0, nil
}
8.4 工作流创建API访问控制
创建请求频率限制:
- 实现基于用户的工作流创建频率限制
- 防止恶意批量创建工作流
- 支持不同用户等级的差异化创建限流策略
创建操作安全验证:
- 严格验证创建请求的合法性
- 防止恶意创建和资源滥用攻击
- 使用多重安全检查机制
- 确保工作流数据的完整性和一致性
// 工作流创建参数验证
func validateWorkflowCreateRequest(req *workflow.CreateWorkflowRequest) error {if req.GetSpaceID() == "" {return vo.WrapError(errno.ErrInvalidParameter, errors.New("工作空间ID不能为空"))}spaceID, err := strconv.ParseInt(req.GetSpaceID(), 10, 64)if err != nil || spaceID <= 0 {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的工作空间ID"))}// 验证工作流名称if req.Name == "" {return vo.WrapError(errno.ErrMissingRequiredParam, errors.New("工作流名称不能为空"))}if len(req.Name) > 100 {return vo.WrapError(errno.ErrInvalidParameter, errors.New("工作流名称长度不能超过100字符"))}// 验证工作流描述if len(req.Desc) > 2000 {return vo.WrapError(errno.ErrInvalidParameter, errors.New("工作流描述长度不能超过2000字符"))}// 验证图标URIif req.IconURI != "" {if !isValidIconURI(req.IconURI) {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的图标URI格式"))}}// 验证工作流模式if req.IsSetFlowMode() {if !isValidWorkflowMode(req.GetFlowMode()) {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的工作流模式"))}}return nil
}// 插件创建操作安全检查
func (s *PluginApplicationService) validatePluginCreateSafety(ctx context.Context, req *service.CreateDraftPluginRequest) error {userID := ctx.Value("user_id").(int64)// 检查用户插件创建频率限制createCount, err := s.getUserPluginCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))if err != nil {return fmt.Errorf("检查插件创建频率失败: %w", err)}if createCount >= 10 { // 24小时内最多创建10个插件return errorx.New(errno.ErrPluginCreateRateLimitCode, errorx.KV("user_id", userID),errorx.KV("create_count", createCount))}// 检查服务器URL是否可访问accessible, err := s.checkServerURLAccessible(ctx, req.ServerURL)if err != nil {return fmt.Errorf("检查服务器URL可访问性失败: %w", err)}if !accessible {return errors.New("服务器URL不可访问")}// 检查用户存储配额storageUsed, err := s.getUserStorageUsage(ctx, userID)if err != nil {return fmt.Errorf("检查用户存储配额失败: %w", err)}if storageUsed >= s.getMaxStorageQuota(userID) {return errors.New("用户存储配额已满,无法创建新插件")}return nil
}// 检查服务器URL可访问性
func (s *PluginApplicationService) checkServerURLAccessible(ctx context.Context, serverURL string) (bool, error) {// 创建HTTP客户端,设置超时时间client := &http.Client{Timeout: 10 * time.Second,}// 发送HEAD请求检查URL可访问性resp, err := client.Head(serverURL)if err != nil {logs.CtxWarnf(ctx, "Server URL not accessible: %s, error: %v", serverURL, err)return false, nil}defer resp.Body.Close()// 检查响应状态码if resp.StatusCode >= 200 && resp.StatusCode < 400 {return true, nil}logs.CtxWarnf(ctx, "Server URL returned non-success status: %s, status: %d", serverURL, resp.StatusCode)return false, nil
}// 获取用户存储使用量
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {// 查询用户所有插件的存储使用量plugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)if err != nil {return 0, fmt.Errorf("获取用户插件列表失败: %w", err)}var totalSize int64for _, plugin := range plugins {// 计算插件manifest和openapi_doc的存储大小if plugin.Manifest != nil {totalSize += int64(len(plugin.Manifest))}if plugin.OpenapiDoc != nil {totalSize += int64(len(plugin.OpenapiDoc))}}return totalSize, nil
}// 获取用户最大存储配额
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {// 根据用户等级返回不同的存储配额// 这里简化处理,实际应该从用户配置中获取return 100 * 1024 * 1024 // 100MB
}// URL格式验证
func isValidURL(urlStr string) bool {u, err := url.Parse(urlStr)return err == nil && u.Scheme != "" && u.Host != ""
}// 插件类型验证
func isValidPluginType(pluginType common.PluginType) bool {validTypes := []common.PluginType{common.PluginTypeHTTP,common.PluginTypeLocal,}for _, validType := range validTypes {if pluginType == validType {return true}}return false
}
9. 工作流创建错误处理和日志记录
9.1 工作流创建分层错误处理机制
工作流创建错误分类体系 - 基于实际错误码定义 backend/types/errno/workflow.go:
// 工作流创建错误类型定义
const (// 工作流创建业务错误ErrWorkflowNotPublished = 720702011ErrMissingRequiredParam = 720702002ErrInvalidParameter = 720702001ErrWorkflowNotFound = 720702004ErrConversationNameIsDuplicated = 720702200ErrConversationOfAppNotFound = 720702201ErrConversationNodeInvalidOperation = 720702250ErrOnlyDefaultConversationAllowInAgentScenario = 720712033ErrConversationNodesNotAvailable = 702093204ErrInvalidVersionName = 777777769ErrAuthorizationRequired = 777777765ErrInputFieldMissing = 777777763ErrConversationNotFoundForOperation = 777777762// 工作流创建系统错误ErrDatabaseError = 720700801ErrRedisError = 720700803ErrIDGenError = 720700808ErrWorkflowExecuteFail = 720701013ErrSerializationDeserializationFail = 720701011ErrInternalBadRequest = 720701007ErrSchemaConversionFail = 720702089ErrWorkflowCompileFail = 720701003ErrNotifyWorkflowResourceChangeErr = 777777770ErrWorkflowSnapshotNotFound = 777777771ErrCreateNodeFail = 777777772// 工作流创建执行错误ErrWorkflowTimeout = 720702085ErrInterruptNotSupported = 720702078ErrWorkflowCanceledByUser = 777777777ErrNodeTimeout = 777777776ErrWorkflowOperationFail = 777777775ErrArrIndexOutOfRange = 720712014ErrIndexingNilArray = 777777774ErrLLMStructuredOutputParseFail = 777777773
)
工作流创建错误处理流程:
- 捕获阶段:在工作流创建各层级捕获具体错误
- 包装阶段:使用vo.WrapError添加工作流创建操作相关上下文信息和错误码
- 记录阶段:根据错误级别记录工作流创建操作日志
- 响应阶段:返回用户友好的工作流创建错误信息
- 回滚阶段:工作流创建失败时进行必要的数据回滚操作
- 恢复机制:使用defer和recover处理panic异常
- 重试机制:对于可重试的创建错误提供重试建议
- 用户指导:为常见创建错误提供解决方案指导
9.2 工作流创建统一错误响应格式
// 工作流创建错误响应结构
type WorkflowCreateErrorResponse struct {Code int `json:"code"`Message string `json:"message"`Details string `json:"details,omitempty"`TraceID string `json:"trace_id"`WorkflowID int64 `json:"workflow_id,omitempty"`Operation string `json:"operation"`CanRetry bool `json:"can_retry"`ValidationErrors []string `json:"validation_errors,omitempty"`SuggestedFix string `json:"suggested_fix,omitempty"`FieldErrors map[string]string `json:"field_errors,omitempty"`SpaceID int64 `json:"space_id,omitempty"`UserID int64 `json:"user_id,omitempty"`
}// 工作流创建错误处理中间件 - 基于实际代码结构
func WorkflowCreateErrorHandlerMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {defer func() {if panicErr := recover(); panicErr != nil {traceID := ctx.GetString("trace_id")userID := ctx.GetInt64("user_id")spaceID := ctx.GetInt64("space_id")logs.CtxErrorf(c, "Workflow creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s", panicErr, userID, spaceID, traceID)// 使用safego.NewPanicErr包装panic错误err := safego.NewPanicErr(panicErr, debug.Stack())wrappedErr := vo.WrapIfNeeded(errno.ErrWorkflowOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))ctx.JSON(500, WorkflowCreateErrorResponse{Code: 5000,Message: "工作流创建服务器内部错误",TraceID: traceID,Operation: "create_workflow",CanRetry: true,SuggestedFix: "请稍后重试,如果问题持续存在请联系技术支持",UserID: userID,SpaceID: spaceID,})}}()ctx.Next()}
}// 工作流创建业务错误处理 - 基于实际错误码
func handleWorkflowCreateBusinessError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response WorkflowCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_workflow"switch {case errors.Is(err, errno.ErrInvalidParameter):response.Code = 400response.Message = "工作流参数无效"response.CanRetry = falseresponse.SuggestedFix = "请检查工作流名称、描述、工作空间ID等参数是否正确"case errors.Is(err, errno.ErrAuthorizationRequired):response.Code = 403response.Message = "无权限创建工作流"response.CanRetry = falseresponse.SuggestedFix = "请确保已登录且具有工作流创建权限"case errors.Is(err, errno.ErrConversationNameIsDuplicated):response.Code = 409response.Message = "工作流名称已存在"response.CanRetry = falseresponse.SuggestedFix = "请使用不同的工作流名称或检查是否已存在同名工作流"case errors.Is(err, errno.ErrWorkflowNotFound):response.Code = 404response.Message = "工作流不存在"response.CanRetry = falseresponse.SuggestedFix = "请检查工作流ID是否正确"case errors.Is(err, errno.ErrMissingRequiredParam):response.Code = 400response.Message = "缺少必需参数"response.CanRetry = falseresponse.SuggestedFix = "请检查所有必需参数是否已提供"case errors.Is(err, errno.ErrInvalidVersionName):response.Code = 400response.Message = "工作流版本名称无效"response.CanRetry = falseresponse.SuggestedFix = "请使用有效的版本名称格式"case errors.Is(err, errno.ErrInputFieldMissing):response.Code = 400response.Message = "输入字段缺失"response.CanRetry = falseresponse.SuggestedFix = "请检查所有必需的输入字段是否已提供"default:response.Code = 500response.Message = "工作流创建失败"response.CanRetry = trueresponse.SuggestedFix = "请稍后重试,如果问题持续存在请联系技术支持"}ctx.JSON(response.Code, response)
}// 工作流创建系统错误处理 - 基于实际错误码
func handleWorkflowCreateSystemError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response WorkflowCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_workflow"switch {case errors.Is(err, errno.ErrDatabaseError):response.Code = 500response.Message = "工作流数据库操作失败"response.CanRetry = trueresponse.SuggestedFix = "数据库连接异常,请稍后重试"case errors.Is(err, errno.ErrRedisError):response.Code = 500response.Message = "工作流缓存操作失败"response.CanRetry = trueresponse.SuggestedFix = "缓存服务异常,请稍后重试"case errors.Is(err, errno.ErrIDGenError):response.Code = 500response.Message = "工作流ID生成失败"response.CanRetry = trueresponse.SuggestedFix = "ID生成服务异常,请稍后重试"case errors.Is(err, errno.ErrNotifyWorkflowResourceChangeErr):response.Code = 500response.Message = "工作流资源变更通知失败"response.CanRetry = trueresponse.SuggestedFix = "资源变更通知异常,工作流已创建但可能影响搜索"case errors.Is(err, errno.ErrWorkflowCompileFail):response.Code = 500response.Message = "工作流编译失败"response.CanRetry = falseresponse.SuggestedFix = "工作流编译异常,请联系技术支持"case errors.Is(err, errno.ErrSerializationDeserializationFail):response.Code = 500response.Message = "工作流数据序列化失败"response.CanRetry = falseresponse.SuggestedFix = "数据序列化异常,请联系技术支持"case errors.Is(err, errno.ErrSchemaConversionFail):response.Code = 500response.Message = "工作流模式转换失败"response.CanRetry = falseresponse.SuggestedFix = "模式转换异常,请联系技术支持"case errors.Is(err, errno.ErrCreateNodeFail):response.Code = 500response.Message = "工作流节点创建失败"response.CanRetry = trueresponse.SuggestedFix = "节点创建异常,请稍后重试"default:response.Code = 5000response.Message = "工作流创建失败"response.Details = "服务器内部错误,请稍后重试"response.CanRetry = trueresponse.SuggestedFix = "系统内部错误,请稍后重试或联系技术支持"}ctx.JSON(response.Code, response)
}
9.3 工作流创建日志记录策略
工作流创建日志级别定义:
- DEBUG:工作流创建详细调试信息,包括参数值、中间结果、数据转换过程
- INFO:工作流创建关键业务流程信息,如创建开始、参数验证、数据插入、资源发布
- WARN:工作流创建潜在问题警告,如参数格式警告、性能警告、资源使用警告
- ERROR:工作流创建错误信息,包括创建失败、权限错误、数据验证失败
- FATAL:工作流创建严重错误,可能导致数据不一致或服务不可用
工作流创建结构化日志格式:
// 工作流创建日志记录示例 - 基于实际代码结构
func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (*workflow.CreateWorkflowResponse, error) {defer func() {if panicErr := recover(); panicErr != nil {err := safego.NewPanicErr(panicErr, debug.Stack())logs.CtxErrorf(ctx, "CreateWorkflow panic recovered: %v", err)}}()uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 记录工作流创建开始logs.CtxInfof(ctx, "CreateWorkflow started, userID=%d, workflowName=%s, spaceID=%d", uID, req.Name, spaceID)startTime := time.Now()defer func() {duration := time.Since(startTime)logs.CtxInfof(ctx, "CreateWorkflow completed, duration=%dms, userID=%d", duration.Milliseconds(), uID)}()// 权限验证日志logs.CtxInfof(ctx, "Validating workflow create permission, userID=%d, spaceID=%d", uID, spaceID)if err := checkUserSpace(ctx, uID, spaceID); err != nil {logs.CtxErrorf(ctx, "Workflow create permission denied, userID=%d, spaceID=%d, error=%v", uID, spaceID, err)return nil, err}// 参数验证日志logs.CtxInfof(ctx, "Validating workflow parameters, name=%s, desc=%s, mode=%v", req.Name, ptr.SafeDeref(req.Desc), req.GetFlowMode())// 数据库创建操作日志logs.CtxInfof(ctx, "Creating workflow in database, workflowName=%s, userID=%d", req.Name, uID)id, err := GetWorkflowDomainSVC().Create(ctx, &vo.MetaCreate{CreatorID: uID,SpaceID: spaceID,Name: req.Name,Desc: req.Desc,IconURI: req.IconURI,// ... 其他字段})if err != nil {logs.CtxErrorf(ctx, "Failed to create workflow in database, error=%v", err)return nil, err}// 资源发布日志logs.CtxInfof(ctx, "Publishing workflow resource event, workflowID=%d", id)return &workflow.CreateWorkflowResponse{Data: &workflow.CreateWorkflowData{WorkflowID: strconv.FormatInt(id, 10),},}, nil
}// 工作流创建操作审计日志
func (w *ApplicationService) logWorkflowCreateAudit(ctx context.Context, operation string, workflowID int64, details map[string]interface{}) {userID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := details["space_id"].(int64)auditLog := map[string]interface{}{"operation": operation,"workflow_id": workflowID,"user_id": userID,"space_id": spaceID,"timestamp": time.Now().Unix(),"details": details,"workflow_name": details["workflow_name"],"workflow_mode": details["workflow_mode"],"icon_uri": details["icon_uri"],"app_id": details["app_id"],}logs.CtxInfof(ctx, "Workflow create audit log: %+v", auditLog)
}
工作流创建日志内容规范:
- 请求日志:记录用户ID、工作空间ID、工作流名称、工作流模式、应用ID
- 业务日志:记录工作流创建步骤、参数验证结果、权限验证结果、数据构建过程
- 性能日志:记录创建接口响应时间、数据库插入时间、资源发布时间、ID生成时间
- 错误日志:记录创建错误堆栈、工作流相关上下文信息、失败原因分析
- 审计日志:记录工作流的创建操作、创建参数、创建结果、关联的应用信息
- 安全日志:记录创建频率、权限验证、参数验证、可疑创建行为
9.4 工作流创建监控和告警
工作流创建关键指标监控:
- 创建性能:工作流创建响应时间、创建成功率、创建QPS、创建吞吐量
- 资源使用:工作流数据库连接数、缓存操作延迟、内存使用率、ID生成延迟
- 业务指标:工作流创建成功率、创建频率分布、不同模式工作流创建比例、用户创建活跃度
- 安全指标:权限验证通过率、恶意创建尝试次数、参数验证失败率、工作空间访问控制
- 质量指标:工作流参数验证通过率、模式转换成功率、资源发布成功率
工作流创建告警策略:
- 创建失败率告警:当工作流创建失败率超过5%时触发告警
- 性能告警:当工作流创建响应时间超过3秒时触发告警
- 资源告警:当工作流数据库连接数超过80%时触发告警
- 安全告警:当检测到异常创建行为时立即触发告警
- 数据一致性告警:当数据库和缓存创建状态不一致时触发告警
- 系统告警:当ID生成服务或资源发布服务异常时触发告警
// 工作流创建监控指标收集
type WorkflowCreateMetrics struct {CreateSuccessCount int64 // 创建成功次数CreateFailureCount int64 // 创建失败次数CreateLatency time.Duration // 创建延迟PermissionDeniedCount int64 // 权限拒绝次数ParameterValidationFailCount int64 // 参数验证失败次数IDGenerationLatency time.Duration // ID生成延迟DatabaseInsertLatency time.Duration // 数据库插入延迟ResourcePublishLatency time.Duration // 资源发布延迟CacheOperationLatency time.Duration // 缓存操作延迟WorkflowModeDistribution map[string]int64 // 工作流模式分布SpaceAccessDeniedCount int64 // 工作空间访问拒绝次数SchemaConversionFailCount int64 // 模式转换失败次数NodeCreateFailCount int64 // 节点创建失败次数CompileFailCount int64 // 编译失败次数
}// 工作流创建监控指标上报
func (w *ApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, workflowID int64, req *workflow.CreateWorkflowRequest, err error) {latency := time.Since(startTime)if err != nil {metrics.CreateFailureCount++// 根据错误类型分类统计switch {case errors.Is(err, errno.ErrAuthorizationRequired):metrics.PermissionDeniedCount++case errors.Is(err, errno.ErrInvalidParameter):metrics.ParameterValidationFailCount++case errors.Is(err, errno.ErrSchemaConversionFail):metrics.SchemaConversionFailCount++case errors.Is(err, errno.ErrCreateNodeFail):metrics.NodeCreateFailCount++case errors.Is(err, errno.ErrWorkflowCompileFail):metrics.CompileFailCount++}logs.CtxErrorf(ctx, "Workflow %s failed, workflowName=%s, spaceID=%s, error=%v, latency=%dms", operation, req.Name, req.GetSpaceID(), err, latency.Milliseconds())} else {metrics.CreateSuccessCount++metrics.CreateLatency = latency// 记录工作流模式统计workflowMode := "workflow"if req.IsSetFlowMode() {workflowMode = req.GetFlowMode().String()}metrics.WorkflowModeDistribution[workflowMode]++logs.CtxInfof(ctx, "Workflow %s succeeded, workflowID=%d, workflowName=%s, workflowMode=%s, latency=%dms", operation, workflowID, req.Name, workflowMode, latency.Milliseconds())}// 上报到监控系统w.metricsReporter.Report(ctx, "workflow_create", map[string]interface{}{"operation": operation,"workflow_id": workflowID,"workflow_name": req.Name,"workflow_mode": req.GetFlowMode().String(),"space_id": req.GetSpaceID(),"success": err == nil,"latency_ms": latency.Milliseconds(),"error_type": getWorkflowCreateErrorType(err),})
}// 获取工作流创建错误类型
func getWorkflowCreateErrorType(err error) string {if err == nil {return "none"}// 基于真实错误码定义:backend/types/errno/workflow.goswitch {case errors.Is(err, errno.ErrAuthorizationRequired):return "authorization_required"case errors.Is(err, errno.ErrInvalidParameter):return "invalid_parameter"case errors.Is(err, errno.ErrConversationNameIsDuplicated):return "workflow_name_duplicated"case errors.Is(err, errno.ErrWorkflowNotFound):return "workflow_not_found"case errors.Is(err, errno.ErrMissingRequiredParam):return "missing_required_param"case errors.Is(err, errno.ErrInvalidVersionName):return "invalid_version_name"case errors.Is(err, errno.ErrInputFieldMissing):return "input_field_missing"case errors.Is(err, errno.ErrDatabaseError):return "database_error"case errors.Is(err, errno.ErrRedisError):return "redis_error"case errors.Is(err, errno.ErrIDGenError):return "id_generation_error"case errors.Is(err, errno.ErrWorkflowCompileFail):return "workflow_compile_fail"case errors.Is(err, errno.ErrSchemaConversionFail):return "schema_conversion_fail"case errors.Is(err, errno.ErrCreateNodeFail):return "create_node_fail"case errors.Is(err, errno.ErrNotifyWorkflowResourceChangeErr):return "resource_change_notify_fail"default:return "system_error"}
}
10. 工作流创建流程图
10.1 CreateWorkflow接口完整调用流程
用户登录 Coze 平台点击"资源库" → 点击右上角"+"号 → 点击"工作流"场景的后端处理流程:
用户点击"工作流" → 前端发起请求 → API网关路由 → Handler处理 → 业务服务层 → 数据持久化层 → 索引创建层 → 响应返回↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓
前端填写表单 HTTP POST请求 路由匹配 参数验证 权限检查 MySQL插入 ES索引创建 JSON响应↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓
/create-workflow /api/workflow_api/ Handler 请求绑定 用户身份 workflow_ 事件发布 创建结果create 函数调用 参数校验 Session meta/draft 异步处理 状态返回CreateWorkflow 验证 表插入 ES索引 ↓WorkflowApplicationService↓身份验证(登录检查)↓权限验证(空间权限检查)↓参数验证(名称、描述、模式等)↓工作流模式验证↓数据库创建事务↓创建事件发布↓返回创建结果
10.2 工作流创建详细流程说明
1. API网关层(路由处理)
文件位置:backend/api/handler/coze/workflow_service.go
// @router /api/workflow_api/create [POST]
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.CreateWorkflowRequest// 1. 请求参数绑定和验证err = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}// 2. 工作流创建参数校验if req.GetName() == "" {invalidParamRequestResponse(c, "workflow name is invalid")return}if req.GetSpaceID() == "" {invalidParamRequestResponse(c, "spaceID is invalid")return}// 3. 调用工作流创建服务resp, err := appworkflow.SVC.CreateWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}// 4. 返回JSON响应c.JSON(consts.StatusOK, resp)
}
处理步骤:
- 路由匹配:
POST /api/workflow_api/create
- 参数绑定:将HTTP请求体绑定到
CreateWorkflowRequest
结构体 - 参数验证:验证工作流名称、空间ID、工作流模式等参数的有效性
- 服务调用:调用工作流服务的
CreateWorkflow
方法 - 响应返回:返回JSON格式的创建结果
2. 业务服务层(WorkflowApplicationService)
文件位置:backend/application/workflow/workflow.go
func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {defer func() {if panicErr := recover(); panicErr != nil {err = safego.NewPanicErr(panicErr, debug.Stack())}if err != nil {err = vo.WrapIfNeeded(errno.ErrWorkflowOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))}}()// 1. 用户身份验证uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 2. 验证用户空间权限if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 3. 处理会话流模式的特殊逻辑var createConversation boolif req.ProjectID != nil && req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow && req.IsSetCreateConversation() && req.GetCreateConversation() {createConversation = true_, err := GetWorkflowDomainSVC().CreateDraftConversationTemplate(ctx, &vo.CreateConversationTemplateMeta{AppID: mustParseInt64(req.GetProjectID()),UserID: uID,SpaceID: spaceID,Name: req.Name,})if err != nil {return nil, err}}// 4. 构建工作流创建请求wf := &vo.MetaCreate{CreatorID: uID,SpaceID: spaceID,ContentType: workflow.WorkFlowType_User,Name: req.Name,Desc: req.Desc,IconURI: req.IconURI,AppID: parseInt64(req.ProjectID),Mode: ternary.IFElse(req.IsSetFlowMode(), req.GetFlowMode(), workflow.WorkflowMode_Workflow),InitCanvasSchema: vo.GetDefaultInitCanvasJsonSchema(i18n.GetLocale(ctx)),}// 5. 设置会话流的初始画布if req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow {conversationName := req.Nameif !req.IsSetProjectID() || mustParseInt64(req.GetProjectID()) == 0 || !createConversation {conversationName = "Default"}wf.InitCanvasSchema = vo.GetDefaultInitCanvasJsonSchemaChat(i18n.GetLocale(ctx), conversationName)}// 6. 执行创建操作id, err := GetWorkflowDomainSVC().Create(ctx, wf)if err != nil {return nil, err}// 7. 发布创建事件err = PublishWorkflowResource(ctx, id, ptr.Of(int32(wf.Mode)), search.Created, &search.ResourceDocument{Name: &wf.Name,APPID: wf.AppID,SpaceID: &wf.SpaceID,OwnerID: &wf.CreatorID,PublishStatus: ptr.Of(resource.PublishStatus_UnPublished),CreateTimeMS: ptr.Of(time.Now().UnixMilli()),})if err != nil {return nil, vo.WrapError(errno.ErrNotifyWorkflowResourceChangeErr, err)}// 8. 返回创建结果return &workflow.CreateWorkflowResponse{Data: &workflow.CreateWorkflowData{WorkflowID: strconv.FormatInt(id, 10),},}, nil
}
核心功能:
- 身份验证:从上下文中提取用户ID,验证用户登录状态
- 权限验证:验证用户是否具有指定空间的工作流创建权限
- 模式处理:处理不同工作流模式(标准工作流、会话流)的特殊逻辑
- 数据构建:构建完整的工作流创建请求数据结构
- 画布初始化:根据工作流模式初始化默认画布结构
- 数据创建:调用领域服务在数据库中创建工作流记录
- 事件发布:发布工作流创建事件用于异步索引建立
- 响应组装:构建标准化的创建响应数据结构
3. 领域服务层(工作流创建领域服务)
文件位置:backend/domain/workflow/service/service_impl.go
核心功能:
- 元数据创建:创建工作流的基础元数据信息
- 画布保存:保存工作流的初始画布结构
- 参数提取:提取工作流的输入输出参数信息
- 数据持久化:将工作流数据保存到数据库中
// 工作流创建核心方法
func (i *impl) Create(ctx context.Context, meta *vo.MetaCreate) (int64, error) {// 1. 创建工作流元数据id, err := i.repo.CreateMeta(ctx, &vo.Meta{CreatorID: meta.CreatorID,SpaceID: meta.SpaceID,ContentType: meta.ContentType,Name: meta.Name,Desc: meta.Desc,IconURI: meta.IconURI,AppID: meta.AppID,Mode: meta.Mode,})if err != nil {return 0, err}// 2. 保存初始化画布信息到草稿if err = i.Save(ctx, id, meta.InitCanvasSchema); err != nil {return 0, err}return id, nil
}// 保存工作流画布
func (i *impl) Save(ctx context.Context, id int64, schema string) (err error) {var draft vo.Canvasif err = sonic.UnmarshalString(schema, &draft); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 1. 提取输入输出参数var inputParams, outputParams stringinputs, outputs := extractInputsAndOutputsNamedInfoList(&draft)if inputParams, err = sonic.MarshalString(inputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}if outputParams, err = sonic.MarshalString(outputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 2. 计算测试运行状态testRunSuccess, err := i.calculateTestRunSuccess(ctx, &draft, id)if err != nil {return err}// 3. 生成提交IDcommitID, err := i.repo.GenID(ctx)if err != nil {return vo.WrapError(errno.ErrIDGenError, err)}// 4. 创建或更新草稿return i.repo.CreateOrUpdateDraft(ctx, id, &vo.DraftInfo{Canvas: schema,DraftMeta: &vo.DraftMeta{TestRunSuccess: testRunSuccess,Modified: true,},InputParamsStr: inputParams,OutputParamsStr: outputParams,CommitID: strconv.FormatInt(commitID, 10),})
}// 用户空间权限验证
func checkUserSpace(ctx context.Context, uid int64, spaceID int64) error {spaces, err := crossuser.DefaultSVC().GetUserSpaceList(ctx, uid)if err != nil {return err}var match boolfor _, s := range spaces {if s.ID == spaceID {match = truebreak}}if !match {return fmt.Errorf("user %d does not have access to space %d", uid, spaceID)}return nil}
4. 数据持久化层
MySQL数据库操作:
- 表名:
workflow_meta
、workflow_draft
- 操作类型:INSERT操作
- 事务处理:确保数据一致性
- 创建策略:先创建元数据,再保存草稿内容
- 索引建立:为新创建的工作流建立搜索索引
// 工作流元数据创建
func (r *WorkflowRepository) CreateMeta(ctx context.Context, meta *vo.Meta) (int64, error) {// 生成工作流IDid, err := r.idgen.GenID(ctx)if err != nil {return 0, fmt.Errorf("生成工作流ID失败: %w", err)}// 插入工作流元数据记录query := `INSERT INTO workflow_meta (id, creator_id, space_id, content_type, name, description,icon_uri, app_id, mode, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`now := time.Now()_, err = r.db.ExecContext(ctx, query,id, meta.CreatorID, meta.SpaceID, meta.ContentType,meta.Name, meta.Desc, meta.IconURI, meta.AppID, meta.Mode,now.UnixMilli(), now.UnixMilli())if err != nil {return 0, fmt.Errorf("创建工作流元数据失败: %w", err)}return id, nil
}// 创建或更新工作流草稿
func (r *WorkflowRepository) CreateOrUpdateDraft(ctx context.Context, workflowID int64, draft *vo.DraftInfo) error {query := `INSERT INTO workflow_draft (workflow_id, canvas, test_run_success, modified,input_params, output_params, commit_id, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)ON DUPLICATE KEY UPDATEcanvas = VALUES(canvas),test_run_success = VALUES(test_run_success),modified = VALUES(modified),input_params = VALUES(input_params),output_params = VALUES(output_params),commit_id = VALUES(commit_id),updated_at = VALUES(updated_at)`now := time.Now()_, err := r.db.ExecContext(ctx, query,workflowID, draft.Canvas, draft.DraftMeta.TestRunSuccess,draft.DraftMeta.Modified, draft.InputParamsStr,draft.OutputParamsStr, draft.CommitID,now.UnixMilli(), now.UnixMilli())if err != nil {return fmt.Errorf("创建或更新工作流草稿失败: %w", err)}return nil
}
5. 事件发布层(EventPublisher)
文件位置:backend/application/workflow/eventbus.go
创建事件发布流程:
// 工作流资源事件发布
func PublishWorkflowResource(ctx context.Context, workflowID int64, mode *int32, op search.OpType, r *search.ResourceDocument) error {if r == nil {r = &search.ResourceDocument{}}// 1. 设置资源基本信息r.ResType = common.ResType_Workflowr.ResID = workflowIDr.ResSubType = mode// 2. 构建资源领域事件event := &entity.ResourceDomainEvent{OpType: entity.OpType(op),Resource: r,}// 3. 设置时间戳信息if op == search.Created {event.Resource.CreateTimeMS = r.CreateTimeMSevent.Resource.UpdateTimeMS = r.UpdateTimeMS} else if op == search.Updated {event.Resource.UpdateTimeMS = r.UpdateTimeMS}// 4. 发布资源事件err := eventBus.PublishResources(ctx, event)if err != nil {return err}return nil
}// 异步创建事件处理器
func (h *PluginEventHandler) HandlePluginCreatedEvent(ctx context.Context, event *entity.PluginCreatedEvent) error {// 1. 建立ElasticSearch索引err := h.addToESIndex(ctx, event)if err != nil {logs.CtxErrorf(ctx, "Failed to add to ES index: %v", err)return err}// 2. 更新缓存数据err = h.updateCache(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to update cache: %v", err)}// 3. 初始化插件配置err = h.initializePluginConfig(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to initialize plugin config: %v", err)}// 4. 发送创建通知err = h.sendCreateNotification(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to send create notification: %v", err)}// 5. 更新统计数据err = h.updateCreateStatistics(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to update create statistics: %v", err)}return nil
}// ElasticSearch索引创建
func (h *PluginEventHandler) addToESIndex(ctx context.Context, event *entity.PluginCreatedEvent) error {// 构建ES文档doc := map[string]interface{}{"id": event.PluginID,"space_id": event.SpaceID,"creator_id": event.CreatorID,"name": event.Name,"description": event.Description,"plugin_type": event.PluginType,"server_url": event.ServerURL,"created_at": event.CreatedAt,"status": "draft",}// 添加到ES索引err := h.esClient.Index(ctx, "coze_resource", fmt.Sprintf("%d", event.PluginID), doc)if err != nil {return fmt.Errorf("创建ES索引失败: %w", err)}logs.CtxInfof(ctx, "Added plugin to ES index, pluginID=%d", event.PluginID)return nil
}
创建事件处理内容:
- 索引建立:在ElasticSearch中创建插件文档索引
- 缓存更新:更新相关的缓存数据
- 配置初始化:初始化插件相关的配置文件
- 通知发送:向相关用户发送创建成功通知
- 统计更新:更新插件创建相关的统计数据
6. 响应数据结构
RegisterPluginMetaResponse:
type RegisterPluginMetaResponse struct {Code int64 `json:"code"` // 响应码Msg string `json:"msg"` // 响应消息Success bool `json:"success"` // 创建是否成功PluginID int64 `json:"plugin_id"` // 新创建的插件IDCreatedAt int64 `json:"created_at"` // 创建时间戳BaseResp *base.BaseResp `json:"base_resp"` // 基础响应信息
}
RegisterPluginMetaRequest请求结构:
type RegisterPluginMetaRequest struct {PluginType int32 `json:"plugin_type" binding:"required"` // 插件类型SpaceID int64 `json:"space_id" binding:"required"` // 工作空间IDName string `json:"name" binding:"required"` // 插件名称Desc string `json:"desc,omitempty"` // 插件描述URL string `json:"url,omitempty"` // 服务器URLAuthType int32 `json:"auth_type" binding:"required"` // 认证类型SubAuthType *int32 `json:"sub_auth_type,omitempty"` // 认证子类型Location string `json:"location,omitempty"` // 参数位置Key string `json:"key,omitempty"` // 认证密钥ServiceToken string `json:"service_token,omitempty"` // 服务令牌ProjectID *int64 `json:"project_id,omitempty"` // 项目IDIcon *Icon `json:"icon,omitempty"` // 插件图标CommonParams string `json:"common_params,omitempty"` // 通用参数OauthInfo *OAuthInfo `json:"oauth_info,omitempty"` // OAuth信息AuthPayload string `json:"auth_payload,omitempty"` // 认证载荷
}
PluginCreatedEvent事件结构:
type PluginCreatedEvent struct {PluginID int64 `json:"plugin_id"` // 插件IDSpaceID int64 `json:"space_id"` // 工作空间IDCreatorID int64 `json:"creator_id"` // 创建者IDName string `json:"name"` // 插件名称Description string `json:"description"` // 插件描述PluginType int32 `json:"plugin_type"` // 插件类型ServerURL string `json:"server_url"` // 服务器URLCreatedAt time.Time `json:"created_at"` // 创建时间EventType string `json:"event_type"` // 事件类型
}
响应内容说明:
- 成功响应:返回创建成功状态和新创建的插件ID
- 错误响应:返回具体的错误码和错误信息(如权限不足、参数无效等)
- 参数验证:确保所有必需参数都已提供且格式正确
- 时间戳:记录创建操作的具体时间