当前位置: 首页 > news >正文

Coze源码分析-资源库-创建工作流-后端源码-安全/错误/流程

8. 工作流创建安全和权限验证机制

8.1 工作流创建身份认证

JWT Token验证

  • 创建工作流的所有API请求都需要携带有效的JWT Token
  • Token包含用户ID、工作空间权限等关键信息
  • 通过中间件统一验证Token的有效性和完整性
// 工作流创建身份验证中间件
func WorkflowCreateAuthMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {token := ctx.GetHeader("Authorization")if token == nil {ctx.JSON(401, gin.H{"error": "创建工作流需要登录认证"})ctx.Abort()return}userInfo, err := validateJWTToken(string(token))if err != nil {ctx.JSON(401, gin.H{"error": "Token无效,无法创建工作流"})ctx.Abort()return}// 验证用户是否有创建工作流的权限if !userInfo.HasWorkflowCreatePermission {ctx.JSON(403, gin.H{"error": "用户无创建工作流权限"})ctx.Abort()return}ctx.Set("user_id", userInfo.UserID)ctx.Set("space_id", userInfo.SpaceID)ctx.Next()}
}

8.2 工作流创建工作空间权限控制

空间隔离机制

  • 每个用户只能在其所属工作空间中创建工作流
  • 通过 space_id 字段实现工作流创建权限隔离
  • 在工作流创建操作中强制验证空间权限
// 工作流创建工作空间权限验证 - 基于实际代码 backend/application/workflow/workflow.go
func checkUserSpace(ctx context.Context, uid int64, spaceID int64) error {spaces, err := crossuser.DefaultSVC().GetUserSpaceList(ctx, uid)if err != nil {return err}var match boolfor _, s := range spaces {if s.ID == spaceID {match = truebreak}}if !match {return fmt.Errorf("user %d does not have access to space %d", uid, spaceID)}return nil
}// 工作流创建权限验证应用
func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 验证用户工作空间权限if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 继续工作流创建逻辑...
}

8.3 工作流创建资源级权限验证

工作流创建用户权限验证

  • 验证用户是否具有工作流创建权限
  • 检查用户在指定工作空间的操作权限
  • 通过用户ID和工作空间ID进行权限验证
// 工作流创建权限验证 - 基于实际代码结构
func (w *ApplicationService) validateWorkflowCreatePermission(ctx context.Context, req *workflow.CreateWorkflowRequest) error {userID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 验证用户工作空间权限if err := checkUserSpace(ctx, userID, spaceID); err != nil {return vo.WrapError(errno.ErrAuthorizationRequired, err)}// 检查工作流名称是否重复if req.Name != "" {exists, err := w.checkWorkflowNameExists(ctx, spaceID, req.Name)if err != nil {return fmt.Errorf("检查工作流名称重复失败: %w", err)}if exists {return vo.WrapError(errno.ErrConversationNameIsDuplicated, fmt.Errorf("工作流名称 %s 已存在", req.Name))}}// 验证工作流模式参数if req.IsSetFlowMode() {if req.GetFlowMode() != workflow.WorkflowMode_Workflow && req.GetFlowMode() != workflow.WorkflowMode_ChatFlow {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的工作流模式"))}}return nil
}// 检查工作流名称是否存在
func (w *ApplicationService) checkWorkflowNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {// 查询同一工作空间下是否存在同名工作流workflows, err := GetWorkflowDomainSVC().List(ctx, &vo.ListPolicy{SpaceID: spaceID,Name:    &name,Limit:   1,})if err != nil {return false, err}return len(workflows) > 0, nil
}

8.4 工作流创建API访问控制

创建请求频率限制

  • 实现基于用户的工作流创建频率限制
  • 防止恶意批量创建工作流
  • 支持不同用户等级的差异化创建限流策略

创建操作安全验证

  • 严格验证创建请求的合法性
  • 防止恶意创建和资源滥用攻击
  • 使用多重安全检查机制
  • 确保工作流数据的完整性和一致性
// 工作流创建参数验证
func validateWorkflowCreateRequest(req *workflow.CreateWorkflowRequest) error {if req.GetSpaceID() == "" {return vo.WrapError(errno.ErrInvalidParameter, errors.New("工作空间ID不能为空"))}spaceID, err := strconv.ParseInt(req.GetSpaceID(), 10, 64)if err != nil || spaceID <= 0 {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的工作空间ID"))}// 验证工作流名称if req.Name == "" {return vo.WrapError(errno.ErrMissingRequiredParam, errors.New("工作流名称不能为空"))}if len(req.Name) > 100 {return vo.WrapError(errno.ErrInvalidParameter, errors.New("工作流名称长度不能超过100字符"))}// 验证工作流描述if len(req.Desc) > 2000 {return vo.WrapError(errno.ErrInvalidParameter, errors.New("工作流描述长度不能超过2000字符"))}// 验证图标URIif req.IconURI != "" {if !isValidIconURI(req.IconURI) {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的图标URI格式"))}}// 验证工作流模式if req.IsSetFlowMode() {if !isValidWorkflowMode(req.GetFlowMode()) {return vo.WrapError(errno.ErrInvalidParameter, errors.New("无效的工作流模式"))}}return nil
}// 插件创建操作安全检查
func (s *PluginApplicationService) validatePluginCreateSafety(ctx context.Context, req *service.CreateDraftPluginRequest) error {userID := ctx.Value("user_id").(int64)// 检查用户插件创建频率限制createCount, err := s.getUserPluginCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))if err != nil {return fmt.Errorf("检查插件创建频率失败: %w", err)}if createCount >= 10 { // 24小时内最多创建10个插件return errorx.New(errno.ErrPluginCreateRateLimitCode, errorx.KV("user_id", userID),errorx.KV("create_count", createCount))}// 检查服务器URL是否可访问accessible, err := s.checkServerURLAccessible(ctx, req.ServerURL)if err != nil {return fmt.Errorf("检查服务器URL可访问性失败: %w", err)}if !accessible {return errors.New("服务器URL不可访问")}// 检查用户存储配额storageUsed, err := s.getUserStorageUsage(ctx, userID)if err != nil {return fmt.Errorf("检查用户存储配额失败: %w", err)}if storageUsed >= s.getMaxStorageQuota(userID) {return errors.New("用户存储配额已满,无法创建新插件")}return nil
}// 检查服务器URL可访问性
func (s *PluginApplicationService) checkServerURLAccessible(ctx context.Context, serverURL string) (bool, error) {// 创建HTTP客户端,设置超时时间client := &http.Client{Timeout: 10 * time.Second,}// 发送HEAD请求检查URL可访问性resp, err := client.Head(serverURL)if err != nil {logs.CtxWarnf(ctx, "Server URL not accessible: %s, error: %v", serverURL, err)return false, nil}defer resp.Body.Close()// 检查响应状态码if resp.StatusCode >= 200 && resp.StatusCode < 400 {return true, nil}logs.CtxWarnf(ctx, "Server URL returned non-success status: %s, status: %d", serverURL, resp.StatusCode)return false, nil
}// 获取用户存储使用量
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {// 查询用户所有插件的存储使用量plugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)if err != nil {return 0, fmt.Errorf("获取用户插件列表失败: %w", err)}var totalSize int64for _, plugin := range plugins {// 计算插件manifest和openapi_doc的存储大小if plugin.Manifest != nil {totalSize += int64(len(plugin.Manifest))}if plugin.OpenapiDoc != nil {totalSize += int64(len(plugin.OpenapiDoc))}}return totalSize, nil
}// 获取用户最大存储配额
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {// 根据用户等级返回不同的存储配额// 这里简化处理,实际应该从用户配置中获取return 100 * 1024 * 1024 // 100MB
}// URL格式验证
func isValidURL(urlStr string) bool {u, err := url.Parse(urlStr)return err == nil && u.Scheme != "" && u.Host != ""
}// 插件类型验证
func isValidPluginType(pluginType common.PluginType) bool {validTypes := []common.PluginType{common.PluginTypeHTTP,common.PluginTypeLocal,}for _, validType := range validTypes {if pluginType == validType {return true}}return false
}

9. 工作流创建错误处理和日志记录

9.1 工作流创建分层错误处理机制

工作流创建错误分类体系 - 基于实际错误码定义 backend/types/errno/workflow.go:

// 工作流创建错误类型定义
const (// 工作流创建业务错误ErrWorkflowNotPublished                        = 720702011ErrMissingRequiredParam                        = 720702002ErrInvalidParameter                            = 720702001ErrWorkflowNotFound                            = 720702004ErrConversationNameIsDuplicated                = 720702200ErrConversationOfAppNotFound                   = 720702201ErrConversationNodeInvalidOperation            = 720702250ErrOnlyDefaultConversationAllowInAgentScenario = 720712033ErrConversationNodesNotAvailable               = 702093204ErrInvalidVersionName                          = 777777769ErrAuthorizationRequired                       = 777777765ErrInputFieldMissing                           = 777777763ErrConversationNotFoundForOperation            = 777777762// 工作流创建系统错误ErrDatabaseError                               = 720700801ErrRedisError                                  = 720700803ErrIDGenError                                  = 720700808ErrWorkflowExecuteFail                         = 720701013ErrSerializationDeserializationFail            = 720701011ErrInternalBadRequest                          = 720701007ErrSchemaConversionFail                        = 720702089ErrWorkflowCompileFail                         = 720701003ErrNotifyWorkflowResourceChangeErr             = 777777770ErrWorkflowSnapshotNotFound                    = 777777771ErrCreateNodeFail                              = 777777772// 工作流创建执行错误ErrWorkflowTimeout                             = 720702085ErrInterruptNotSupported                       = 720702078ErrWorkflowCanceledByUser                      = 777777777ErrNodeTimeout                                 = 777777776ErrWorkflowOperationFail                       = 777777775ErrArrIndexOutOfRange                          = 720712014ErrIndexingNilArray                            = 777777774ErrLLMStructuredOutputParseFail                = 777777773
)

工作流创建错误处理流程

  1. 捕获阶段:在工作流创建各层级捕获具体错误
  2. 包装阶段:使用vo.WrapError添加工作流创建操作相关上下文信息和错误码
  3. 记录阶段:根据错误级别记录工作流创建操作日志
  4. 响应阶段:返回用户友好的工作流创建错误信息
  5. 回滚阶段:工作流创建失败时进行必要的数据回滚操作
  6. 恢复机制:使用defer和recover处理panic异常
  7. 重试机制:对于可重试的创建错误提供重试建议
  8. 用户指导:为常见创建错误提供解决方案指导

9.2 工作流创建统一错误响应格式

// 工作流创建错误响应结构
type WorkflowCreateErrorResponse struct {Code         int    `json:"code"`Message      string `json:"message"`Details      string `json:"details,omitempty"`TraceID      string `json:"trace_id"`WorkflowID   int64  `json:"workflow_id,omitempty"`Operation    string `json:"operation"`CanRetry     bool   `json:"can_retry"`ValidationErrors []string `json:"validation_errors,omitempty"`SuggestedFix string `json:"suggested_fix,omitempty"`FieldErrors  map[string]string `json:"field_errors,omitempty"`SpaceID      int64  `json:"space_id,omitempty"`UserID       int64  `json:"user_id,omitempty"`
}// 工作流创建错误处理中间件 - 基于实际代码结构
func WorkflowCreateErrorHandlerMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {defer func() {if panicErr := recover(); panicErr != nil {traceID := ctx.GetString("trace_id")userID := ctx.GetInt64("user_id")spaceID := ctx.GetInt64("space_id")logs.CtxErrorf(c, "Workflow creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s", panicErr, userID, spaceID, traceID)// 使用safego.NewPanicErr包装panic错误err := safego.NewPanicErr(panicErr, debug.Stack())wrappedErr := vo.WrapIfNeeded(errno.ErrWorkflowOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))ctx.JSON(500, WorkflowCreateErrorResponse{Code:      5000,Message:   "工作流创建服务器内部错误",TraceID:   traceID,Operation: "create_workflow",CanRetry:  true,SuggestedFix: "请稍后重试,如果问题持续存在请联系技术支持",UserID:    userID,SpaceID:   spaceID,})}}()ctx.Next()}
}// 工作流创建业务错误处理 - 基于实际错误码
func handleWorkflowCreateBusinessError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response WorkflowCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_workflow"switch {case errors.Is(err, errno.ErrInvalidParameter):response.Code = 400response.Message = "工作流参数无效"response.CanRetry = falseresponse.SuggestedFix = "请检查工作流名称、描述、工作空间ID等参数是否正确"case errors.Is(err, errno.ErrAuthorizationRequired):response.Code = 403response.Message = "无权限创建工作流"response.CanRetry = falseresponse.SuggestedFix = "请确保已登录且具有工作流创建权限"case errors.Is(err, errno.ErrConversationNameIsDuplicated):response.Code = 409response.Message = "工作流名称已存在"response.CanRetry = falseresponse.SuggestedFix = "请使用不同的工作流名称或检查是否已存在同名工作流"case errors.Is(err, errno.ErrWorkflowNotFound):response.Code = 404response.Message = "工作流不存在"response.CanRetry = falseresponse.SuggestedFix = "请检查工作流ID是否正确"case errors.Is(err, errno.ErrMissingRequiredParam):response.Code = 400response.Message = "缺少必需参数"response.CanRetry = falseresponse.SuggestedFix = "请检查所有必需参数是否已提供"case errors.Is(err, errno.ErrInvalidVersionName):response.Code = 400response.Message = "工作流版本名称无效"response.CanRetry = falseresponse.SuggestedFix = "请使用有效的版本名称格式"case errors.Is(err, errno.ErrInputFieldMissing):response.Code = 400response.Message = "输入字段缺失"response.CanRetry = falseresponse.SuggestedFix = "请检查所有必需的输入字段是否已提供"default:response.Code = 500response.Message = "工作流创建失败"response.CanRetry = trueresponse.SuggestedFix = "请稍后重试,如果问题持续存在请联系技术支持"}ctx.JSON(response.Code, response)
}// 工作流创建系统错误处理 - 基于实际错误码
func handleWorkflowCreateSystemError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response WorkflowCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_workflow"switch {case errors.Is(err, errno.ErrDatabaseError):response.Code = 500response.Message = "工作流数据库操作失败"response.CanRetry = trueresponse.SuggestedFix = "数据库连接异常,请稍后重试"case errors.Is(err, errno.ErrRedisError):response.Code = 500response.Message = "工作流缓存操作失败"response.CanRetry = trueresponse.SuggestedFix = "缓存服务异常,请稍后重试"case errors.Is(err, errno.ErrIDGenError):response.Code = 500response.Message = "工作流ID生成失败"response.CanRetry = trueresponse.SuggestedFix = "ID生成服务异常,请稍后重试"case errors.Is(err, errno.ErrNotifyWorkflowResourceChangeErr):response.Code = 500response.Message = "工作流资源变更通知失败"response.CanRetry = trueresponse.SuggestedFix = "资源变更通知异常,工作流已创建但可能影响搜索"case errors.Is(err, errno.ErrWorkflowCompileFail):response.Code = 500response.Message = "工作流编译失败"response.CanRetry = falseresponse.SuggestedFix = "工作流编译异常,请联系技术支持"case errors.Is(err, errno.ErrSerializationDeserializationFail):response.Code = 500response.Message = "工作流数据序列化失败"response.CanRetry = falseresponse.SuggestedFix = "数据序列化异常,请联系技术支持"case errors.Is(err, errno.ErrSchemaConversionFail):response.Code = 500response.Message = "工作流模式转换失败"response.CanRetry = falseresponse.SuggestedFix = "模式转换异常,请联系技术支持"case errors.Is(err, errno.ErrCreateNodeFail):response.Code = 500response.Message = "工作流节点创建失败"response.CanRetry = trueresponse.SuggestedFix = "节点创建异常,请稍后重试"default:response.Code = 5000response.Message = "工作流创建失败"response.Details = "服务器内部错误,请稍后重试"response.CanRetry = trueresponse.SuggestedFix = "系统内部错误,请稍后重试或联系技术支持"}ctx.JSON(response.Code, response)
}

9.3 工作流创建日志记录策略

工作流创建日志级别定义

  • DEBUG:工作流创建详细调试信息,包括参数值、中间结果、数据转换过程
  • INFO:工作流创建关键业务流程信息,如创建开始、参数验证、数据插入、资源发布
  • WARN:工作流创建潜在问题警告,如参数格式警告、性能警告、资源使用警告
  • ERROR:工作流创建错误信息,包括创建失败、权限错误、数据验证失败
  • FATAL:工作流创建严重错误,可能导致数据不一致或服务不可用

工作流创建结构化日志格式

// 工作流创建日志记录示例 - 基于实际代码结构
func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (*workflow.CreateWorkflowResponse, error) {defer func() {if panicErr := recover(); panicErr != nil {err := safego.NewPanicErr(panicErr, debug.Stack())logs.CtxErrorf(ctx, "CreateWorkflow panic recovered: %v", err)}}()uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 记录工作流创建开始logs.CtxInfof(ctx, "CreateWorkflow started, userID=%d, workflowName=%s, spaceID=%d", uID, req.Name, spaceID)startTime := time.Now()defer func() {duration := time.Since(startTime)logs.CtxInfof(ctx, "CreateWorkflow completed, duration=%dms, userID=%d", duration.Milliseconds(), uID)}()// 权限验证日志logs.CtxInfof(ctx, "Validating workflow create permission, userID=%d, spaceID=%d", uID, spaceID)if err := checkUserSpace(ctx, uID, spaceID); err != nil {logs.CtxErrorf(ctx, "Workflow create permission denied, userID=%d, spaceID=%d, error=%v", uID, spaceID, err)return nil, err}// 参数验证日志logs.CtxInfof(ctx, "Validating workflow parameters, name=%s, desc=%s, mode=%v", req.Name, ptr.SafeDeref(req.Desc), req.GetFlowMode())// 数据库创建操作日志logs.CtxInfof(ctx, "Creating workflow in database, workflowName=%s, userID=%d", req.Name, uID)id, err := GetWorkflowDomainSVC().Create(ctx, &vo.MetaCreate{CreatorID:   uID,SpaceID:     spaceID,Name:        req.Name,Desc:        req.Desc,IconURI:     req.IconURI,// ... 其他字段})if err != nil {logs.CtxErrorf(ctx, "Failed to create workflow in database, error=%v", err)return nil, err}// 资源发布日志logs.CtxInfof(ctx, "Publishing workflow resource event, workflowID=%d", id)return &workflow.CreateWorkflowResponse{Data: &workflow.CreateWorkflowData{WorkflowID: strconv.FormatInt(id, 10),},}, nil
}// 工作流创建操作审计日志
func (w *ApplicationService) logWorkflowCreateAudit(ctx context.Context, operation string, workflowID int64, details map[string]interface{}) {userID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := details["space_id"].(int64)auditLog := map[string]interface{}{"operation":     operation,"workflow_id":   workflowID,"user_id":       userID,"space_id":      spaceID,"timestamp":     time.Now().Unix(),"details":       details,"workflow_name": details["workflow_name"],"workflow_mode": details["workflow_mode"],"icon_uri":      details["icon_uri"],"app_id":        details["app_id"],}logs.CtxInfof(ctx, "Workflow create audit log: %+v", auditLog)
}

工作流创建日志内容规范

  • 请求日志:记录用户ID、工作空间ID、工作流名称、工作流模式、应用ID
  • 业务日志:记录工作流创建步骤、参数验证结果、权限验证结果、数据构建过程
  • 性能日志:记录创建接口响应时间、数据库插入时间、资源发布时间、ID生成时间
  • 错误日志:记录创建错误堆栈、工作流相关上下文信息、失败原因分析
  • 审计日志:记录工作流的创建操作、创建参数、创建结果、关联的应用信息
  • 安全日志:记录创建频率、权限验证、参数验证、可疑创建行为

9.4 工作流创建监控和告警

工作流创建关键指标监控

  • 创建性能:工作流创建响应时间、创建成功率、创建QPS、创建吞吐量
  • 资源使用:工作流数据库连接数、缓存操作延迟、内存使用率、ID生成延迟
  • 业务指标:工作流创建成功率、创建频率分布、不同模式工作流创建比例、用户创建活跃度
  • 安全指标:权限验证通过率、恶意创建尝试次数、参数验证失败率、工作空间访问控制
  • 质量指标:工作流参数验证通过率、模式转换成功率、资源发布成功率

工作流创建告警策略

  • 创建失败率告警:当工作流创建失败率超过5%时触发告警
  • 性能告警:当工作流创建响应时间超过3秒时触发告警
  • 资源告警:当工作流数据库连接数超过80%时触发告警
  • 安全告警:当检测到异常创建行为时立即触发告警
  • 数据一致性告警:当数据库和缓存创建状态不一致时触发告警
  • 系统告警:当ID生成服务或资源发布服务异常时触发告警
// 工作流创建监控指标收集
type WorkflowCreateMetrics struct {CreateSuccessCount      int64         // 创建成功次数CreateFailureCount      int64         // 创建失败次数CreateLatency           time.Duration // 创建延迟PermissionDeniedCount   int64         // 权限拒绝次数ParameterValidationFailCount int64    // 参数验证失败次数IDGenerationLatency     time.Duration // ID生成延迟DatabaseInsertLatency   time.Duration // 数据库插入延迟ResourcePublishLatency  time.Duration // 资源发布延迟CacheOperationLatency   time.Duration // 缓存操作延迟WorkflowModeDistribution map[string]int64 // 工作流模式分布SpaceAccessDeniedCount  int64         // 工作空间访问拒绝次数SchemaConversionFailCount int64       // 模式转换失败次数NodeCreateFailCount     int64         // 节点创建失败次数CompileFailCount        int64         // 编译失败次数
}// 工作流创建监控指标上报
func (w *ApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, workflowID int64, req *workflow.CreateWorkflowRequest, err error) {latency := time.Since(startTime)if err != nil {metrics.CreateFailureCount++// 根据错误类型分类统计switch {case errors.Is(err, errno.ErrAuthorizationRequired):metrics.PermissionDeniedCount++case errors.Is(err, errno.ErrInvalidParameter):metrics.ParameterValidationFailCount++case errors.Is(err, errno.ErrSchemaConversionFail):metrics.SchemaConversionFailCount++case errors.Is(err, errno.ErrCreateNodeFail):metrics.NodeCreateFailCount++case errors.Is(err, errno.ErrWorkflowCompileFail):metrics.CompileFailCount++}logs.CtxErrorf(ctx, "Workflow %s failed, workflowName=%s, spaceID=%s, error=%v, latency=%dms", operation, req.Name, req.GetSpaceID(), err, latency.Milliseconds())} else {metrics.CreateSuccessCount++metrics.CreateLatency = latency// 记录工作流模式统计workflowMode := "workflow"if req.IsSetFlowMode() {workflowMode = req.GetFlowMode().String()}metrics.WorkflowModeDistribution[workflowMode]++logs.CtxInfof(ctx, "Workflow %s succeeded, workflowID=%d, workflowName=%s, workflowMode=%s, latency=%dms", operation, workflowID, req.Name, workflowMode, latency.Milliseconds())}// 上报到监控系统w.metricsReporter.Report(ctx, "workflow_create", map[string]interface{}{"operation":     operation,"workflow_id":   workflowID,"workflow_name": req.Name,"workflow_mode": req.GetFlowMode().String(),"space_id":      req.GetSpaceID(),"success":       err == nil,"latency_ms":    latency.Milliseconds(),"error_type":    getWorkflowCreateErrorType(err),})
}// 获取工作流创建错误类型
func getWorkflowCreateErrorType(err error) string {if err == nil {return "none"}// 基于真实错误码定义:backend/types/errno/workflow.goswitch {case errors.Is(err, errno.ErrAuthorizationRequired):return "authorization_required"case errors.Is(err, errno.ErrInvalidParameter):return "invalid_parameter"case errors.Is(err, errno.ErrConversationNameIsDuplicated):return "workflow_name_duplicated"case errors.Is(err, errno.ErrWorkflowNotFound):return "workflow_not_found"case errors.Is(err, errno.ErrMissingRequiredParam):return "missing_required_param"case errors.Is(err, errno.ErrInvalidVersionName):return "invalid_version_name"case errors.Is(err, errno.ErrInputFieldMissing):return "input_field_missing"case errors.Is(err, errno.ErrDatabaseError):return "database_error"case errors.Is(err, errno.ErrRedisError):return "redis_error"case errors.Is(err, errno.ErrIDGenError):return "id_generation_error"case errors.Is(err, errno.ErrWorkflowCompileFail):return "workflow_compile_fail"case errors.Is(err, errno.ErrSchemaConversionFail):return "schema_conversion_fail"case errors.Is(err, errno.ErrCreateNodeFail):return "create_node_fail"case errors.Is(err, errno.ErrNotifyWorkflowResourceChangeErr):return "resource_change_notify_fail"default:return "system_error"}
}

10. 工作流创建流程图

10.1 CreateWorkflow接口完整调用流程

用户登录 Coze 平台点击"资源库" → 点击右上角"+"号 → 点击"工作流"场景的后端处理流程:

用户点击"工作流" → 前端发起请求 → API网关路由 → Handler处理 → 业务服务层 → 数据持久化层 → 索引创建层 → 响应返回↓                    ↓           ↓          ↓         ↓          ↓            ↓          ↓
前端填写表单        HTTP POST请求   路由匹配     参数验证    权限检查     MySQL插入    ES索引创建   JSON响应↓                    ↓           ↓          ↓         ↓          ↓            ↓          ↓
/create-workflow    /api/workflow_api/ Handler   请求绑定   用户身份    workflow_    事件发布     创建结果create           函数调用   参数校验   Session    meta/draft   异步处理     状态返回CreateWorkflow       验证       表插入       ES索引       ↓WorkflowApplicationService↓身份验证(登录检查)↓权限验证(空间权限检查)↓参数验证(名称、描述、模式等)↓工作流模式验证↓数据库创建事务↓创建事件发布↓返回创建结果

10.2 工作流创建详细流程说明

1. API网关层(路由处理)

文件位置backend/api/handler/coze/workflow_service.go

// @router /api/workflow_api/create [POST]
func CreateWorkflow(ctx context.Context, c *app.RequestContext) {var err errorvar req workflow.CreateWorkflowRequest// 1. 请求参数绑定和验证err = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}// 2. 工作流创建参数校验if req.GetName() == "" {invalidParamRequestResponse(c, "workflow name is invalid")return}if req.GetSpaceID() == "" {invalidParamRequestResponse(c, "spaceID is invalid")return}// 3. 调用工作流创建服务resp, err := appworkflow.SVC.CreateWorkflow(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}// 4. 返回JSON响应c.JSON(consts.StatusOK, resp)
}

处理步骤

  • 路由匹配POST /api/workflow_api/create
  • 参数绑定:将HTTP请求体绑定到 CreateWorkflowRequest 结构体
  • 参数验证:验证工作流名称、空间ID、工作流模式等参数的有效性
  • 服务调用:调用工作流服务的 CreateWorkflow 方法
  • 响应返回:返回JSON格式的创建结果
2. 业务服务层(WorkflowApplicationService)

文件位置backend/application/workflow/workflow.go

func (w *ApplicationService) CreateWorkflow(ctx context.Context, req *workflow.CreateWorkflowRequest) (_ *workflow.CreateWorkflowResponse, err error,
) {defer func() {if panicErr := recover(); panicErr != nil {err = safego.NewPanicErr(panicErr, debug.Stack())}if err != nil {err = vo.WrapIfNeeded(errno.ErrWorkflowOperationFail, err, errorx.KV("cause", vo.UnwrapRootErr(err).Error()))}}()// 1. 用户身份验证uID := ctxutil.MustGetUIDFromCtx(ctx)spaceID := mustParseInt64(req.GetSpaceID())// 2. 验证用户空间权限if err := checkUserSpace(ctx, uID, spaceID); err != nil {return nil, err}// 3. 处理会话流模式的特殊逻辑var createConversation boolif req.ProjectID != nil && req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow && req.IsSetCreateConversation() && req.GetCreateConversation() {createConversation = true_, err := GetWorkflowDomainSVC().CreateDraftConversationTemplate(ctx, &vo.CreateConversationTemplateMeta{AppID:   mustParseInt64(req.GetProjectID()),UserID:  uID,SpaceID: spaceID,Name:    req.Name,})if err != nil {return nil, err}}// 4. 构建工作流创建请求wf := &vo.MetaCreate{CreatorID:        uID,SpaceID:          spaceID,ContentType:      workflow.WorkFlowType_User,Name:             req.Name,Desc:             req.Desc,IconURI:          req.IconURI,AppID:            parseInt64(req.ProjectID),Mode:             ternary.IFElse(req.IsSetFlowMode(), req.GetFlowMode(), workflow.WorkflowMode_Workflow),InitCanvasSchema: vo.GetDefaultInitCanvasJsonSchema(i18n.GetLocale(ctx)),}// 5. 设置会话流的初始画布if req.IsSetFlowMode() && req.GetFlowMode() == workflow.WorkflowMode_ChatFlow {conversationName := req.Nameif !req.IsSetProjectID() || mustParseInt64(req.GetProjectID()) == 0 || !createConversation {conversationName = "Default"}wf.InitCanvasSchema = vo.GetDefaultInitCanvasJsonSchemaChat(i18n.GetLocale(ctx), conversationName)}// 6. 执行创建操作id, err := GetWorkflowDomainSVC().Create(ctx, wf)if err != nil {return nil, err}// 7. 发布创建事件err = PublishWorkflowResource(ctx, id, ptr.Of(int32(wf.Mode)), search.Created, &search.ResourceDocument{Name:          &wf.Name,APPID:         wf.AppID,SpaceID:       &wf.SpaceID,OwnerID:       &wf.CreatorID,PublishStatus: ptr.Of(resource.PublishStatus_UnPublished),CreateTimeMS:  ptr.Of(time.Now().UnixMilli()),})if err != nil {return nil, vo.WrapError(errno.ErrNotifyWorkflowResourceChangeErr, err)}// 8. 返回创建结果return &workflow.CreateWorkflowResponse{Data: &workflow.CreateWorkflowData{WorkflowID: strconv.FormatInt(id, 10),},}, nil
}

核心功能

  • 身份验证:从上下文中提取用户ID,验证用户登录状态
  • 权限验证:验证用户是否具有指定空间的工作流创建权限
  • 模式处理:处理不同工作流模式(标准工作流、会话流)的特殊逻辑
  • 数据构建:构建完整的工作流创建请求数据结构
  • 画布初始化:根据工作流模式初始化默认画布结构
  • 数据创建:调用领域服务在数据库中创建工作流记录
  • 事件发布:发布工作流创建事件用于异步索引建立
  • 响应组装:构建标准化的创建响应数据结构
3. 领域服务层(工作流创建领域服务)

文件位置backend/domain/workflow/service/service_impl.go

核心功能

  • 元数据创建:创建工作流的基础元数据信息
  • 画布保存:保存工作流的初始画布结构
  • 参数提取:提取工作流的输入输出参数信息
  • 数据持久化:将工作流数据保存到数据库中
// 工作流创建核心方法
func (i *impl) Create(ctx context.Context, meta *vo.MetaCreate) (int64, error) {// 1. 创建工作流元数据id, err := i.repo.CreateMeta(ctx, &vo.Meta{CreatorID:   meta.CreatorID,SpaceID:     meta.SpaceID,ContentType: meta.ContentType,Name:        meta.Name,Desc:        meta.Desc,IconURI:     meta.IconURI,AppID:       meta.AppID,Mode:        meta.Mode,})if err != nil {return 0, err}// 2. 保存初始化画布信息到草稿if err = i.Save(ctx, id, meta.InitCanvasSchema); err != nil {return 0, err}return id, nil
}// 保存工作流画布
func (i *impl) Save(ctx context.Context, id int64, schema string) (err error) {var draft vo.Canvasif err = sonic.UnmarshalString(schema, &draft); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 1. 提取输入输出参数var inputParams, outputParams stringinputs, outputs := extractInputsAndOutputsNamedInfoList(&draft)if inputParams, err = sonic.MarshalString(inputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}if outputParams, err = sonic.MarshalString(outputs); err != nil {return vo.WrapError(errno.ErrSerializationDeserializationFail, err)}// 2. 计算测试运行状态testRunSuccess, err := i.calculateTestRunSuccess(ctx, &draft, id)if err != nil {return err}// 3. 生成提交IDcommitID, err := i.repo.GenID(ctx)if err != nil {return vo.WrapError(errno.ErrIDGenError, err)}// 4. 创建或更新草稿return i.repo.CreateOrUpdateDraft(ctx, id, &vo.DraftInfo{Canvas: schema,DraftMeta: &vo.DraftMeta{TestRunSuccess: testRunSuccess,Modified:       true,},InputParamsStr:  inputParams,OutputParamsStr: outputParams,CommitID:        strconv.FormatInt(commitID, 10),})
}// 用户空间权限验证
func checkUserSpace(ctx context.Context, uid int64, spaceID int64) error {spaces, err := crossuser.DefaultSVC().GetUserSpaceList(ctx, uid)if err != nil {return err}var match boolfor _, s := range spaces {if s.ID == spaceID {match = truebreak}}if !match {return fmt.Errorf("user %d does not have access to space %d", uid, spaceID)}return nil}
4. 数据持久化层

MySQL数据库操作

  • 表名workflow_metaworkflow_draft
  • 操作类型:INSERT操作
  • 事务处理:确保数据一致性
  • 创建策略:先创建元数据,再保存草稿内容
  • 索引建立:为新创建的工作流建立搜索索引
// 工作流元数据创建
func (r *WorkflowRepository) CreateMeta(ctx context.Context, meta *vo.Meta) (int64, error) {// 生成工作流IDid, err := r.idgen.GenID(ctx)if err != nil {return 0, fmt.Errorf("生成工作流ID失败: %w", err)}// 插入工作流元数据记录query := `INSERT INTO workflow_meta (id, creator_id, space_id, content_type, name, description,icon_uri, app_id, mode, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`now := time.Now()_, err = r.db.ExecContext(ctx, query,id, meta.CreatorID, meta.SpaceID, meta.ContentType,meta.Name, meta.Desc, meta.IconURI, meta.AppID, meta.Mode,now.UnixMilli(), now.UnixMilli())if err != nil {return 0, fmt.Errorf("创建工作流元数据失败: %w", err)}return id, nil
}// 创建或更新工作流草稿
func (r *WorkflowRepository) CreateOrUpdateDraft(ctx context.Context, workflowID int64, draft *vo.DraftInfo) error {query := `INSERT INTO workflow_draft (workflow_id, canvas, test_run_success, modified,input_params, output_params, commit_id, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)ON DUPLICATE KEY UPDATEcanvas = VALUES(canvas),test_run_success = VALUES(test_run_success),modified = VALUES(modified),input_params = VALUES(input_params),output_params = VALUES(output_params),commit_id = VALUES(commit_id),updated_at = VALUES(updated_at)`now := time.Now()_, err := r.db.ExecContext(ctx, query,workflowID, draft.Canvas, draft.DraftMeta.TestRunSuccess,draft.DraftMeta.Modified, draft.InputParamsStr,draft.OutputParamsStr, draft.CommitID,now.UnixMilli(), now.UnixMilli())if err != nil {return fmt.Errorf("创建或更新工作流草稿失败: %w", err)}return nil
}
5. 事件发布层(EventPublisher)

文件位置backend/application/workflow/eventbus.go

创建事件发布流程

// 工作流资源事件发布
func PublishWorkflowResource(ctx context.Context, workflowID int64, mode *int32, op search.OpType, r *search.ResourceDocument) error {if r == nil {r = &search.ResourceDocument{}}// 1. 设置资源基本信息r.ResType = common.ResType_Workflowr.ResID = workflowIDr.ResSubType = mode// 2. 构建资源领域事件event := &entity.ResourceDomainEvent{OpType:   entity.OpType(op),Resource: r,}// 3. 设置时间戳信息if op == search.Created {event.Resource.CreateTimeMS = r.CreateTimeMSevent.Resource.UpdateTimeMS = r.UpdateTimeMS} else if op == search.Updated {event.Resource.UpdateTimeMS = r.UpdateTimeMS}// 4. 发布资源事件err := eventBus.PublishResources(ctx, event)if err != nil {return err}return nil
}// 异步创建事件处理器
func (h *PluginEventHandler) HandlePluginCreatedEvent(ctx context.Context, event *entity.PluginCreatedEvent) error {// 1. 建立ElasticSearch索引err := h.addToESIndex(ctx, event)if err != nil {logs.CtxErrorf(ctx, "Failed to add to ES index: %v", err)return err}// 2. 更新缓存数据err = h.updateCache(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to update cache: %v", err)}// 3. 初始化插件配置err = h.initializePluginConfig(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to initialize plugin config: %v", err)}// 4. 发送创建通知err = h.sendCreateNotification(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to send create notification: %v", err)}// 5. 更新统计数据err = h.updateCreateStatistics(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to update create statistics: %v", err)}return nil
}// ElasticSearch索引创建
func (h *PluginEventHandler) addToESIndex(ctx context.Context, event *entity.PluginCreatedEvent) error {// 构建ES文档doc := map[string]interface{}{"id":          event.PluginID,"space_id":    event.SpaceID,"creator_id":  event.CreatorID,"name":        event.Name,"description": event.Description,"plugin_type": event.PluginType,"server_url":  event.ServerURL,"created_at":  event.CreatedAt,"status":      "draft",}// 添加到ES索引err := h.esClient.Index(ctx, "coze_resource", fmt.Sprintf("%d", event.PluginID), doc)if err != nil {return fmt.Errorf("创建ES索引失败: %w", err)}logs.CtxInfof(ctx, "Added plugin to ES index, pluginID=%d", event.PluginID)return nil
}

创建事件处理内容

  • 索引建立:在ElasticSearch中创建插件文档索引
  • 缓存更新:更新相关的缓存数据
  • 配置初始化:初始化插件相关的配置文件
  • 通知发送:向相关用户发送创建成功通知
  • 统计更新:更新插件创建相关的统计数据
6. 响应数据结构

RegisterPluginMetaResponse

type RegisterPluginMetaResponse struct {Code      int64  `json:"code"`       // 响应码Msg       string `json:"msg"`        // 响应消息Success   bool   `json:"success"`    // 创建是否成功PluginID  int64  `json:"plugin_id"`  // 新创建的插件IDCreatedAt int64  `json:"created_at"` // 创建时间戳BaseResp  *base.BaseResp `json:"base_resp"` // 基础响应信息
}

RegisterPluginMetaRequest请求结构

type RegisterPluginMetaRequest struct {PluginType   int32  `json:"plugin_type" binding:"required"`   // 插件类型SpaceID      int64  `json:"space_id" binding:"required"`      // 工作空间IDName         string `json:"name" binding:"required"`          // 插件名称Desc         string `json:"desc,omitempty"`                   // 插件描述URL          string `json:"url,omitempty"`                    // 服务器URLAuthType     int32  `json:"auth_type" binding:"required"`     // 认证类型SubAuthType  *int32 `json:"sub_auth_type,omitempty"`          // 认证子类型Location     string `json:"location,omitempty"`               // 参数位置Key          string `json:"key,omitempty"`                    // 认证密钥ServiceToken string `json:"service_token,omitempty"`          // 服务令牌ProjectID    *int64 `json:"project_id,omitempty"`             // 项目IDIcon         *Icon  `json:"icon,omitempty"`                   // 插件图标CommonParams string `json:"common_params,omitempty"`          // 通用参数OauthInfo    *OAuthInfo `json:"oauth_info,omitempty"`         // OAuth信息AuthPayload  string `json:"auth_payload,omitempty"`           // 认证载荷
}

PluginCreatedEvent事件结构

type PluginCreatedEvent struct {PluginID    int64     `json:"plugin_id"`    // 插件IDSpaceID     int64     `json:"space_id"`     // 工作空间IDCreatorID   int64     `json:"creator_id"`   // 创建者IDName        string    `json:"name"`         // 插件名称Description string    `json:"description"`  // 插件描述PluginType  int32     `json:"plugin_type"`  // 插件类型ServerURL   string    `json:"server_url"`   // 服务器URLCreatedAt   time.Time `json:"created_at"`   // 创建时间EventType   string    `json:"event_type"`   // 事件类型
}

响应内容说明

  • 成功响应:返回创建成功状态和新创建的插件ID
  • 错误响应:返回具体的错误码和错误信息(如权限不足、参数无效等)
  • 参数验证:确保所有必需参数都已提供且格式正确
  • 时间戳:记录创建操作的具体时间
http://www.dtcms.com/a/389756.html

相关文章:

  • OneTerm开源堡垒机实战(四):访问授权与安全管控
  • 【赵渝强老师】基于PostgreSQL的MPP集群:Greenplum
  • leetCode算法题记录:27.移除元素
  • 自动化运维工具ansible
  • Roo Code 设置导入、导出与重置
  • 视觉检测技术讲解
  • LibreCAD-2.2+QT5.12+RTKLIB2.4.3
  • Pydantic Schemas 及其在 FastAPI 中的作用
  • SMS05 TVS二极管阵列的ESD和闭锁保护SOT23-6封装
  • Stream的常用API应用场景
  • 【DMA】DMA实战:用DMA操控外设
  • 深入理解传输层协议:UDP 与 TCP 的核心原理与应用
  • 教育行业数字化资料管理:构建安全合规、高效协同的一体化知识共享平台
  • Smart Launcher安卓版(安卓桌面启动器):安卓设备的智能启动器
  • Ansible如何写Callback 插件
  • 自动化测试框架需要具备哪些功能?
  • Pix2Pix中的对抗损失与L1损失:高频细节与低频结构的平衡艺术
  • mkcert生成证书本地或内网使用https
  • 【Python】关于移除Conda中已搭建环境的相关问题
  • 基于SpringBoot+Vue的校园兼职管理系统(WebSocket及时通讯、地图API、Echarts图形化分析)
  • 【K8S默认容器运行时】
  • Makefile学习(二)- 语法(变量、伪目标)
  • Winform自定义无边框窗体
  • 文献综述是什么?怎么写好一篇综述?
  • CLIP:开启多模态AI新时代的密钥(上)
  • @[TOC](位运算) # 常见位运算总结
  • 【Block总结】sMLP,全新的“稀疏MLP”模块|即插即用|原模型改进
  • TDengine IDMP 基本功能——数据可视化(4. 仪表盘)
  • 亚信安全与中国联通共同打造的联通联信一体化安全检测与响应平台亮相网安周
  • 短脉冲计数