当前位置: 首页 > news >正文

Coze源码分析-资源库-编辑工作流-后端源码-数据存储/安全/错误

7. 数据存储层

7.1 MySQL数据库表结构

工作流编辑相关的MySQL表结构设计,包含了版本控制、锁定状态等编辑相关字段。

-- 工作流草稿表
CREATE TABLE `workflow_draft` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',`workflow_id` varchar(64) NOT NULL COMMENT '工作流唯一标识',`space_id` bigint(20) NOT NULL COMMENT '所属空间ID',`name` varchar(100) NOT NULL COMMENT '工作流名称',`description` text COMMENT '工作流描述',`creator_id` bigint(20) NOT NULL COMMENT '创建者ID',`editor_id` bigint(20) NOT NULL COMMENT '最后编辑者ID',`version` varchar(64) NOT NULL COMMENT '版本号(乐观锁)',`edit_count` bigint(20) NOT NULL DEFAULT '0' COMMENT '编辑次数',`lock_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁定状态:0-未锁定,1-已锁定',`lock_owner_id` bigint(20) DEFAULT NULL COMMENT '锁定所有者ID',`lock_expire_at` bigint(20) DEFAULT NULL COMMENT '锁定过期时间戳',`status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态:0-草稿,1-已发布',`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `uk_workflow_id` (`workflow_id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_editor_id` (`editor_id`),KEY `idx_version` (`version`),KEY `idx_lock_status` (`lock_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工作流草稿表';-- 画布信息表
CREATE TABLE `canvas_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',`workflow_id` varchar(64) NOT NULL COMMENT '工作流唯一标识',`nodes` json DEFAULT NULL COMMENT '节点数据',`edges` json DEFAULT NULL COMMENT '边数据',`config` json DEFAULT NULL COMMENT '画布配置',`version` bigint(20) NOT NULL DEFAULT '1' COMMENT '版本号',`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `uk_workflow_id` (`workflow_id`),KEY `idx_version` (`version`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='画布信息表';

7.2 ElasticSearch索引架构

工作流编辑相关的ElasticSearch索引设计,支持实时搜索和过滤。

// 工作流ES索引映射
func (s *WorkflowSearchService) CreateWorkflowIndex(ctx context.Context) error {mappings := map[string]interface{}{"mappings": map[string]interface{}{"properties": map[string]interface{}{"res_id": map[string]interface{}{"type": "long",},"res_type": map[string]interface{}{"type": "integer",},"workflow_id": map[string]interface{}{"type": "keyword",},"space_id": map[string]interface{}{"type": "long",},"name": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"description": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"creator_id": map[string]interface{}{"type": "long",},"editor_id": map[string]interface{}{"type": "long",},"version": map[string]interface{}{"type": "keyword",},"edit_count": map[string]interface{}{"type": "long",},"create_time": map[string]interface{}{"type": "long",},"update_time": map[string]interface{}{"type": "long",},"status": map[string]interface{}{"type": "integer",},},},}return s.esClient.CreateIndex(ctx, "coze_resource", mappings)
}

7.3 数据同步机制

工作流编辑后的数据同步机制,确保数据库和搜索索引的一致性。

// 工作流编辑事件处理
func (h *resourceHandlerImpl) HandleWorkflowEditEvent(ctx context.Context, event *WorkflowEditEvent) error {// 1. 准备索引更新数据doc := map[string]interface{}{"res_id":      event.WorkflowID,"res_type":    2, // 工作流类型"workflow_id": event.WorkflowID,"space_id":    event.SpaceID,"name":        event.Name,"description": event.Description,"creator_id":  event.CreatorID,"editor_id":   event.EditorID,"version":     event.Version,"edit_count":  event.EditCount,"update_time": event.UpdatedAt.Unix(),"status":      event.Status,}// 2. 执行ES索引更新err := h.esClient.Update(ctx, "coze_resource", fmt.Sprintf("%d", event.WorkflowID), doc)if err != nil {// 记录失败并尝试重试return h.recordFailedSync(ctx, event, err)}// 3. 更新缓存cacheKey := fmt.Sprintf("workflow:%d", event.WorkflowID)h.cacheClient.Delete(ctx, cacheKey)// 4. 同步锁定状态if err := h.syncWorkflowLockStatus(ctx, event.WorkflowID); err != nil {logs.CtxWarnf(ctx, "同步锁定状态失败: %v", err)}return nil
}// 同步工作流锁定状态
func (h *resourceHandlerImpl) syncWorkflowLockStatus(ctx context.Context, workflowID int64) error {// 实现锁定状态同步逻辑// ...return nil
}// 记录同步失败
func (h *resourceHandlerImpl) recordFailedSync(ctx context.Context, event *WorkflowEditEvent, err error) error {// 记录失败并安排重试logs.CtxErrorf(ctx, "同步工作流失败: %v, event: %+v", err, event)// 实现重试逻辑// ...return err
}

8. 工作流编辑安全和权限验证机制

8.1 JWT身份认证

JWT Token验证中间件,确保用户身份的合法性。

// JWT验证中间件
func JWTAuthMiddleware() app.HandlerFunc {return func(c *app.RequestContext) {token := c.GetHeader("Authorization")if token == "" {c.AbortWithStatusJSON(consts.StatusUnauthorized, common.ErrorResponse{Code:    int64(consts.StatusUnauthorized),Message: "未提供认证令牌",})return}// 移除Bearer前缀if strings.HasPrefix(token, "Bearer ") {token = token[7:]}// 解析Tokenclaims, err := ParseToken(token)if err != nil {c.AbortWithStatusJSON(consts.StatusUnauthorized, common.ErrorResponse{Code:    int64(consts.StatusUnauthorized),Message: "无效的认证令牌",})return}// 将用户信息存入上下文c.Set("user_id", claims.UserID)c.Set("user_name", claims.UserName)c.Next()}
}// ParseToken 解析JWT Token
func ParseToken(tokenString string) (*Claims, error) {// 实现Token解析逻辑// ...return claims, nil
}

8.2 工作空间权限控制

工作空间级别的权限控制,确保用户只能访问有权限的工作空间。

// 工作空间访问权限验证
func (s *SpaceDomainService) HasSpaceAccess(ctx context.Context, userID, spaceID int64) (bool, error) {// 检查用户是否为空间成员var count int64err := s.db.Model(&SpaceMember{}).Where("space_id = ? AND user_id = ?", spaceID, userID).Count(&count).Errorif err != nil {return false, fmt.Errorf("检查空间访问权限失败: %w", err)}return count > 0, nil
}// 检查是否为空间管理员
func (s *SpaceDomainService) IsSpaceAdmin(ctx context.Context, userID, spaceID int64) (bool, error) {var count int64err := s.db.Model(&SpaceMember{}).Where("space_id = ? AND user_id = ? AND role = ?", spaceID, userID, model.RoleAdmin).Count(&count).Errorif err != nil {return false, fmt.Errorf("检查空间管理员权限失败: %w", err)}return count > 0, nil
}

8.3 工作流编辑权限验证

工作流级别的编辑权限验证,确保用户只能编辑有权限的工作流。

// 资源级权限验证
func (s *WorkflowDomainService) validateEditPermission(ctx context.Context, userID int64, workflow *Workflow) error {// 1. 检查是否为工作流所有者if workflow.CreatorID == userID {return nil}// 2. 检查是否为空间管理员isAdmin, err := s.spaceService.IsSpaceAdmin(ctx, userID, workflow.SpaceID)if err != nil {return fmt.Errorf("检查空间管理员权限失败: %w", err)}if isAdmin {return nil}// 3. 检查是否为工作流编辑者isEditor, err := s.workflowRepo.IsEditor(ctx, workflow.ID, userID)if err != nil {return fmt.Errorf("检查编辑者权限失败: %w", err)}if isEditor {return nil}return errorx.New(errno.ErrWorkflowPermissionCode, errorx.KV("msg", "用户无权限编辑此工作流"),errorx.KV("user_id", userID),errorx.KV("workflow_id", workflow.ID),errorx.KV("required_role", model.RoleDeveloper))
}

8.4 工作流编辑API访问控制

工作流编辑API的访问控制,包括频率限制、参数验证等安全措施。

// 编辑请求频率限制
func RateLimitMiddleware() app.HandlerFunc {return func(c *app.RequestContext) {userID := c.GetInt64("user_id")key := fmt.Sprintf("rate_limit:workflow_edit:%d", userID)// 使用Redis实现频率限制current, err := redisClient.Incr(ctx, key).Result()if err != nil {c.Next()return}if current == 1 {// 设置过期时间redisClient.Expire(ctx, key, time.Minute)}// 限制每分钟最多10次编辑请求if current > 10 {c.AbortWithStatusJSON(consts.StatusTooManyRequests, common.ErrorResponse{Code:    int64(consts.StatusTooManyRequests),Message: "编辑请求过于频繁,请稍后再试",})return}c.Next()}
}// 编辑操作安全验证
func (s *WorkflowEditValidator) ValidateWorkflowEdit(ctx context.Context, req *UpdateWorkflowRequest, userID int64) error {// 1. 身份验证if userID == 0 {return errors.New("用户未登录,无法编辑工作流")}// 2. 权限检查if !s.permissionChecker.CanEditWorkflow(ctx, userID, req.WorkflowID) {return errors.New("用户没有编辑该工作流的权限")}// 3. 锁定验证if err := s.lockValidator.ValidateLock(ctx, req.WorkflowID, req.LockID, userID); err != nil {return fmt.Errorf("锁定验证失败: %w", err)}// 4. 版本验证if err := s.versionValidator.ValidateVersion(ctx, req.WorkflowID, req.Version); err != nil {return fmt.Errorf("版本验证失败: %w", err)}// 5. 参数验证if err := s.paramValidator.ValidateEditParams(req); err != nil {return fmt.Errorf("参数验证失败: %w", err)}// 6. 画布数据安全性检查if req.CanvasData != "" {if err := s.validateCanvasDataSecurity(req.CanvasData); err != nil {return fmt.Errorf("画布数据安全检查失败: %w", err)}}return nil
}

9. 工作流编辑错误处理和日志记录

9.1 工作流编辑错误类型定义

定义了工作流编辑过程中可能遇到的各种错误类型和错误码。

// 工作流编辑错误码定义
const (ErrWorkflowNotExistCode     = 10001 // 工作流不存在ErrWorkflowPermissionCode   = 10002 // 无权限编辑工作流ErrWorkflowLockMismatch     = 10003 // 锁定信息不匹配ErrWorkflowLockNotExist     = 10004 // 锁定不存在ErrWorkflowLockExpired      = 10005 // 锁定已过期ErrWorkflowVersionConflict  = 10006 // 版本冲突ErrWorkflowInvalidParamCode = 10007 // 参数无效ErrWorkflowInvalidCanvas    = 10008 // 画布数据无效ErrWorkflowSystemError      = 10009 // 系统错误
)// 工作流编辑错误类型
const (ErrorTypeBusiness  = "business"  // 业务错误ErrorTypeSystem    = "system"    // 系统错误ErrorTypeNetwork   = "network"   // 网络错误
)// WorkflowEditError 工作流编辑错误
type WorkflowEditError struct {Code       int64            `json:"code"`       // 错误码Message    string           `json:"message"`    // 错误消息ErrorType  string           `json:"error_type"` // 错误类型Details    map[string]interface{} `json:"details"`    // 错误详情Timestamp  int64            `json:"timestamp"`  // 错误时间戳RequestID  string           `json:"request_id"` // 请求ID
}// NewWorkflowEditError 创建工作流编辑错误
func NewWorkflowEditError(code int64, message string, errorType string, details map[string]interface{}) *WorkflowEditError {return &WorkflowEditError{Code:      code,Message:   message,ErrorType: errorType,Details:   details,Timestamp: time.Now().Unix(),RequestID: generateRequestID(),}
}

9.2 工作流编辑错误处理

工作流编辑过程中的错误处理机制,包括错误捕获、包装、记录和响应。

// 统一错误响应格式
type WorkflowEditErrorResponse struct {Code       int64       `json:"code"`       // 错误码Message    string      `json:"message"`    // 错误消息ErrorType  string      `json:"error_type"` // 错误类型LockInfo   *LockInfo   `json:"lock_info,omitempty"` // 锁定信息RequestID  string      `json:"request_id"` // 请求IDTimestamp  int64       `json:"timestamp"`  // 时间戳
}type LockInfo struct {LockOwnerID int64 `json:"lock_owner_id"` // 锁定所有者IDExpireTime  int64 `json:"expire_time"`  // 锁定过期时间
}// 错误处理中间件
func WorkflowEditErrorHandlerMiddleware() app.HandlerFunc {return func(c *app.RequestContext) {c.Next()// 处理工作流编辑相关错误if len(c.Errors) > 0 {for _, err := range c.Errors {if workflowErr, ok := err.Err.(*WorkflowEditError); ok {handleWorkflowEditError(c, workflowErr)return}}}}
}// 处理工作流编辑业务错误
func handleWorkflowEditBusinessError(c *app.RequestContext, err *WorkflowEditError) {resp := WorkflowEditErrorResponse{Code:      err.Code,Message:   err.Message,ErrorType: err.ErrorType,RequestID: err.RequestID,Timestamp: err.Timestamp,}// 如果是锁定相关错误,包含锁定信息if err.Code == ErrWorkflowLockMismatch || err.Code == ErrWorkflowLockExpired {if lockOwnerID, ok := err.Details["lock_owner_id"].(int64); ok {if expireTime, ok := err.Details["expire_time"].(int64); ok {resp.LockInfo = &LockInfo{LockOwnerID: lockOwnerID,ExpireTime:  expireTime,}}}}c.JSON(consts.StatusBadRequest, resp)
}// 处理工作流编辑锁定错误
func handleWorkflowEditLockError(c *app.RequestContext, err *WorkflowEditError) {resp := WorkflowEditErrorResponse{Code:      err.Code,Message:   err.Message,ErrorType: err.ErrorType,RequestID: err.RequestID,Timestamp: err.Timestamp,}// 提取锁定信息if lockInfo, ok := err.Details["lock_info"].(*LockInfo); ok {resp.LockInfo = lockInfo}c.JSON(consts.StatusConflict, resp)
}// 处理工作流编辑系统错误
func handleWorkflowEditSystemError(c *app.RequestContext, err *WorkflowEditError) {resp := WorkflowEditErrorResponse{Code:      err.Code,Message:   "系统内部错误,请稍后重试",ErrorType: err.ErrorType,RequestID: err.RequestID,Timestamp: err.Timestamp,}// 记录详细错误日志logs.CtxErrorf(c.Request.Context(), "Workflow edit system error: %v, details: %+v", err.Message, err.Details)c.JSON(consts.StatusInternalServerError, resp)
}

9.3 工作流编辑日志记录策略

工作流编辑过程中的日志记录策略,包括日志级别、格式和内容规范。

// 日志级别定义
const (LogLevelDebug = "debug"LogLevelInfo  = "info"LogLevelWarn  = "warn"LogLevelError = "error"LogLevelFatal = "fatal"
)// 结构化日志格式
func logWorkflowEditOperation(ctx context.Context, level string, msg string, fields ...interface{}) {switch level {case LogLevelDebug:logs.CtxDebugf(ctx, msg, fields...)case LogLevelInfo:logs.CtxInfof(ctx, msg, fields...)case LogLevelWarn:logs.CtxWarnf(ctx, msg, fields...)case LogLevelError:logs.CtxErrorf(ctx, msg, fields...)case LogLevelFatal:logs.CtxFatalf(ctx, msg, fields...)}
}// 工作流编辑审计日志
func logWorkflowEditAudit(ctx context.Context, userID int64, workflowID int64, action string, success bool, errorMsg string) {logFields := []interface{}{"user_id", userID,"workflow_id", workflowID,"action", action,"success", success,"timestamp", time.Now().Unix(),"request_id", getRequestID(ctx),}if !success {logFields = append(logFields, "error_msg", errorMsg)}logs.CtxInfof(ctx, "Workflow edit audit", logFields...)
}// 记录工作流编辑详细日志
func (s *WorkflowApplicationService) UpdateWorkflowMeta(ctx context.Context, req *UpdateWorkflowMetaRequest) (*UpdateWorkflowMetaResponse, error) {// 记录开始编辑日志logs.CtxInfof(ctx, "Start updating workflow meta, userID=%d, workflowID=%d, version=%s", req.UserID, req.WorkflowID, req.Version)// 记录请求参数(敏感信息脱敏)safeReq := *reqif len(safeReq.CanvasData) > 100 {safeReq.CanvasData = safeReq.CanvasData[:100] + "...(truncated)"}logs.CtxDebugf(ctx, "Update workflow meta request: %+v", safeReq)defer func() {// 记录编辑结束日志logs.CtxInfof(ctx, "Finish updating workflow meta, userID=%d, workflowID=%d", req.UserID, req.WorkflowID)}()// 执行编辑逻辑// ...return resp, nil
}
http://www.dtcms.com/a/434299.html

相关文章:

  • 什么是Java反射机制?
  • 使用Docker安装Neo4j
  • 建立网站的步骤筝晃湖南岚鸿官网深圳专业建设网站哪个公司好
  • 20软件测试需求分析评审
  • SQL 多表查询实用技巧:ON 和 WHERE 的区别速览
  • 网站备案 内容央企八大设计院
  • 从汇编角度看C++优化:编译器真正做了什么
  • 分布式专题——25 深入理解网络通信和TCP、IP协议
  • UV python多版本管理
  • Schema是什么?
  • 许昌做网站优化wordpress 控制每页显示文章数
  • MAX31865模块和PT100实现温度测量使用配置笔记教程
  • Elasticsearch MCP 服务器:与你的 Index 聊天
  • 【ROS2学习笔记】话题通信篇:话题通信再探
  • 网络编程中“地址重用(SO_REUSEADDR)”
  • 汕头网站建设推广厂家wordpress 响应式图片
  • Rust的错误处理
  • 可视化地图
  • Rust与C接口交互
  • 【C++实战(64)】C++ 邂逅SQLite3:数据库编程实战之旅
  • 泉州网页建站模板开发网址
  • 中华建设杂志网站记者管理网站英文
  • React 18+TS中使用Cesium 1.95
  • View:new关键词干了什么事,还有原型链是什么
  • 如何在新的Spring Boot项目中关闭Spring Security?
  • 药企做网站需要哪些手续国内新闻最新消息今天在线
  • 【Flutter】GetX最佳实践与避坑指南
  • AIFoto 1.15.4 | AI图片工具,AI擦除衣服,变性感衣服
  • 数据合规与ISO标准体系
  • 在Ubuntu22.04系统下安装Jellyfin