当前位置: 首页 > news >正文

Coze源码分析-资源库-创建数据库-后端源码-安全与错误处理

8. 数据库系统安全和权限验证机制

8.1 数据库系统身份认证

多因素认证机制

  • 数据库操作API请求需要携带有效的JWT Token进行身份验证
  • 敏感操作强制要求二次验证(如短信验证码、邮箱验证等)
  • 通过统一的认证中间件验证用户身份和权限
// 数据库系统身份认证中间件
func DatabaseAuthMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {token := ctx.GetHeader("Authorization")if token == nil {ctx.JSON(401, gin.H{"error": "访问数据库系统需要登录认证"})ctx.Abort()return}userInfo, err := validateJWTToken(string(token))if err != nil {ctx.JSON(401, gin.H{"error": "认证Token无效"})ctx.Abort()return}// 验证用户是否有数据库访问权限if !userInfo.HasDatabaseAccessPermission {ctx.JSON(403, gin.H{"error": "用户无数据库访问权限"})ctx.Abort()return}ctx.Set("user_id", userInfo.UserID)ctx.Set("database_id", userInfo.DatabaseID)ctx.Set("access_level", userInfo.AccessLevel)ctx.Next()}
}

8.2 数据库系统权限控制

细粒度权限管理

  • 基于RBAC(基于角色的访问控制)模型实现数据库操作权限控制
  • 对数据库操作进行细粒度权限划分(读、写、管理等)
  • 支持资源级别的访问控制,如特定表、字段的访问权限
// 数据库操作权限验证
func (d *DatabaseService) validateDatabaseOperationPermission(ctx context.Context, req *service.DatabaseOperationRequest) error {userID := ctx.Value("user_id").(int64)accessLevel := ctx.Value("access_level").(string)// 验证用户是否有执行特定操作的权限operationPermission := getOperationPermission(req.OperationType)if !hasPermission(accessLevel, operationPermission) {return errors.New("用户无权限执行该数据库操作")}// 检查数据库访问配额accessQuota, err := d.getUserDatabaseAccessQuota(ctx, userID)if err != nil {return fmt.Errorf("获取用户数据库访问配额失败: %w", err)}// 检查操作频率限制if err := d.checkOperationRateLimit(ctx, userID, req.OperationType); err != nil {return err}// 验证资源访问权限(如表、字段级别)if err := d.validateResourceAccessPermission(ctx, userID, req.DatabaseID, req.Resource); err != nil {return err}return nil
}

8.3 数据库创建资源级权限验证

数据库创建用户权限验证

  • 严格验证用户是否具有数据库创建权限
  • 验证用户在指定工作空间的操作权限
  • 通过存储配额和向量空间权限进行资源级控制
// 数据库系统操作权限验证
func (d *DatabaseSystemService) validateDatabaseOperationPermission(ctx context.Context, req *service.DatabaseOperationRequest) error {userID := ctx.Value("user_id").(int64)accessLevel := ctx.Value("access_level").(string)// 验证用户是否具有数据库操作权限hasPermission, err := d.userService.HasDatabaseOperationPermission(ctx, userID, req.OperationType)if err != nil {return fmt.Errorf("验证数据库操作权限失败: %w", err)}if !hasPermission {return errorx.New(errno.ErrDatabasePermissionCode, errorx.KV(errno.DatabaseMsgKey, "用户无数据库操作权限"),errorx.KV("user_id", userID),errorx.KV("operation_type", req.OperationType))}// 验证数据库访问权限级别requiredLevel := getRequiredAccessLevel(req.OperationType)if !checkAccessLevel(accessLevel, requiredLevel) {return errorx.New(errno.ErrDatabaseAccessLevelCode, errorx.KV(errno.DatabaseMsgKey, "用户访问权限级别不足"),errorx.KV("user_id", userID),errorx.KV("current_level", accessLevel),errorx.KV("required_level", requiredLevel))}// 检查操作频率限制opCount, err := d.getUserOperationCount(ctx, userID, req.OperationType, time.Now().Add(-1*time.Hour))if err != nil {return fmt.Errorf("检查数据库操作频率失败: %w", err)}if opCount >= getOperationRateLimit(req.OperationType) {return errorx.New(errno.ErrDatabaseOperationRateLimitCode, errorx.KV("user_id", userID),errorx.KV("operation_type", req.OperationType),errorx.KV("operation_count", opCount))}// 验证资源访问权限resourcePermission, err := d.checkResourceAccessPermission(ctx, userID, req.DatabaseID, req.Resource)if err != nil {return fmt.Errorf("检查资源访问权限失败: %w", err)}if !resourcePermission.CanAccess {return errorx.New(errno.ErrDatabaseResourcePermissionCode, errorx.KV(errno.DatabaseMsgKey, "用户无资源访问权限"),errorx.KV("user_id", userID),errorx.KV("resource_name", req.Resource.Name))}// 检查数据安全策略合规性if err := d.checkDataSecurityPolicy(ctx, req); err != nil {return err}// 验证数据操作审计配置if err := d.validateAuditConfig(ctx, req); err != nil {return err}return nil
}// 检查数据库名称是否存在
func (s *DatabaseApplicationService) checkDatabaseNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {// 检查同一工作空间下是否存在同名数据库databases, err := s.DomainSVC.ListDatabases(ctx, &service.ListDatabasesRequest{SpaceID: spaceID,PageInfo: entity.PageInfo{PageSize: 1},})if err != nil {return false, err}for _, database := range databases.Databases {if database.Name == name {return true, nil}}return false, nil
}// 检查存储配额
func (s *DatabaseApplicationService) checkStorageQuota(ctx context.Context, userID, spaceID int64) (*StorageQuotaInfo, error) {// 获取用户存储配额信息quota, err := s.storageService.GetUserStorageQuota(ctx, userID)if err != nil {return nil, err}// 获取当前使用量usage, err := s.storageService.GetUserStorageUsage(ctx, userID)if err != nil {return nil, err}return &StorageQuotaInfo{UsedStorage: usage,MaxStorage:  quota,CanCreate:   usage < quota*0.95, // 使用率不超过95%}, nil
}// 检查向量空间权限
func (s *DatabaseApplicationService) checkVectorSpacePermission(ctx context.Context, userID int64, embeddingModel string) (*VectorSpacePermission, error) {// 检查用户是否有权限使用指定的嵌入模型modelPermission, err := s.embeddingService.CheckModelPermission(ctx, userID, embeddingModel)if err != nil {return nil, err}// 检查向量空间创建配额spaceCount, err := s.vectorService.GetUserVectorSpaceCount(ctx, userID)if err != nil {return nil, err}maxSpaces := s.getUserMaxVectorSpaces(userID)return &VectorSpacePermission{CanCreateSpace: modelPermission && spaceCount < maxSpaces,CurrentSpaces:  spaceCount,MaxSpaces:      maxSpaces,}, nil
}

8.4 数据库创建API访问控制

创建请求频率限制

  • 实现基于用户的数据库创建频率限制
  • 防止恶意批量创建数据库
  • 支持不同用户等级的差异化创建限流策略
  • 基于文档处理能力的动态限流

创建操作安全验证

  • 严格验证创建请求的合法性
  • 防止恶意创建和资源滥用攻击
  • 使用多重安全检查机制
  • 文档内容安全扫描和验证
  • 向量空间创建安全验证
// 数据库创建参数验证
func validateDatabaseCreateRequest(req *service.CreateDatabaseRequest) error {if req.SpaceID <= 0 {return errors.New("无效的工作空间ID")}if req.CreatorID <= 0 {return errors.New("无效的创建者ID")}// 验证知识库名称if req.Name == "" {return errors.New("知识库名称不能为空")}if len(req.Name) > 100 {return errors.New("知识库名称长度不能超过100字符")}// 验证知识库描述if req.Description != "" && len(req.Description) > 1000 {return errors.New("知识库描述长度不能超过1000字符")}// 验证嵌入模型if req.EmbeddingModel == "" {return errors.New("嵌入模型不能为空")}if !isValidEmbeddingModel(req.EmbeddingModel) {return errors.New("不支持的嵌入模型")}// 验证分块策略if req.ChunkStrategy != nil {if req.ChunkStrategy.ChunkSize <= 0 || req.ChunkStrategy.ChunkSize > 8192 {return errors.New("分块大小必须在1-8192之间")}if req.ChunkStrategy.ChunkOverlap < 0 || req.ChunkStrategy.ChunkOverlap >= req.ChunkStrategy.ChunkSize {return errors.New("分块重叠大小必须小于分块大小")}}// 验证图标URIif req.IconURI != "" && !isValidIconURI(req.IconURI) {return errors.New("无效的图标URI格式")}return nil
}// 数据库创建操作安全检查
func (s *KnowledgeApplicationService) validateDatabaseCreateSafety(ctx context.Context, req *service.CreateKnowledgeRequest) error {userID := ctx.Value("user_id").(int64)// 检查用户数据库创建频率限制createCount, err := s.getUserDatabaseCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))if err != nil {return fmt.Errorf("检查数据库创建频率失败: %w", err)}if createCount >= 20 { // 24小时内最多创建20个知识库return errorx.New(errno.ErrDatabaseCreateRateLimitCode, errorx.KV("user_id", userID),errorx.KV("create_count", createCount))}// 检查嵌入模型可用性modelAvailable, err := s.checkEmbeddingModelAvailable(ctx, req.EmbeddingModel)if err != nil {return fmt.Errorf("检查嵌入模型可用性失败: %w", err)}if !modelAvailable {return errors.New("嵌入模型当前不可用")}// 检查用户存储配额storageQuota, err := s.checkUserStorageQuota(ctx, userID)if err != nil {return fmt.Errorf("检查用户存储配额失败: %w", err)}if !storageQuota.CanCreateKnowledge {return errorx.New(errno.ErrDatabaseStorageQuotaExceededCode, errorx.KV("user_id", userID),errorx.KV("used_storage", storageQuota.UsedStorage),errorx.KV("max_storage", storageQuota.MaxStorage))}// 检查向量数据库连接vectorDBHealthy, err := s.checkVectorDatabaseHealth(ctx)if err != nil {return fmt.Errorf("检查向量数据库健康状态失败: %w", err)}if !vectorDBHealthy {return errors.New("向量数据库当前不可用,无法创建知识库")}// 检查文档处理服务状态docProcessorHealthy, err := s.checkDocumentProcessorHealth(ctx)if err != nil {return fmt.Errorf("检查文档处理服务状态失败: %w", err)}if !docProcessorHealthy {return errors.New("文档处理服务当前不可用,无法创建知识库")}return nil
}// 检查嵌入模型可用性
func (s *KnowledgeApplicationService) checkEmbeddingModelAvailable(ctx context.Context, modelName string) (bool, error) {// 检查模型是否在支持列表中supportedModels := []string{"text-embedding-ada-002","text-embedding-3-small","text-embedding-3-large","bge-large-zh-v1.5","bge-base-zh-v1.5",}for _, model := range supportedModels {if model == modelName {// 检查模型服务是否可用return s.embeddingService.IsModelAvailable(ctx, modelName)}}return false, nil
}// 检查向量数据库健康状态
func (s *KnowledgeApplicationService) checkVectorDatabaseHealth(ctx context.Context) (bool, error) {// 发送健康检查请求到向量数据库healthCheck := &VectorDBHealthCheck{Timeout: 5 * time.Second,}healthy, err := s.vectorService.HealthCheck(ctx, healthCheck)if err != nil {logs.CtxWarnf(ctx, "Vector database health check failed: %v", err)return false, nil}return healthy, nil
}// 检查文档处理服务健康状态
func (s *KnowledgeApplicationService) checkDocumentProcessorHealth(ctx context.Context) (bool, error) {// 检查文档处理队列状态queueStatus, err := s.documentService.GetQueueStatus(ctx)if err != nil {logs.CtxWarnf(ctx, "Document processor queue status check failed: %v", err)return false, nil}// 如果队列积压过多,认为服务不健康if queueStatus.PendingJobs > 10000 {logs.CtxWarnf(ctx, "Document processor queue overloaded: %d pending jobs", queueStatus.PendingJobs)return false, nil}return true, nil
}// 获取用户存储使用量
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {// 查询用户所有插件的存储使用量plugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)if err != nil {return 0, fmt.Errorf("获取用户插件列表失败: %w", err)}var totalSize int64for _, plugin := range plugins {// 计算插件manifest和openapi_doc的存储大小if plugin.Manifest != nil {totalSize += int64(len(plugin.Manifest))}if plugin.OpenapiDoc != nil {totalSize += int64(len(plugin.OpenapiDoc))}}return totalSize, nil
}// 获取用户最大存储配额
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {// 根据用户等级返回不同的存储配额// 这里简化处理,实际应该从用户配置中获取return 100 * 1024 * 1024 // 100MB
}// URL格式验证
func isValidURL(urlStr string) bool {u, err := url.Parse(urlStr)return err == nil && u.Scheme != "" && u.Host != ""
}// 插件类型验证
func isValidPluginType(pluginType common.PluginType) bool {validTypes := []common.PluginType{common.PluginTypeHTTP,common.PluginTypeLocal,}for _, validType := range validTypes {if pluginType == validType {return true}}return false
}

9. 数据库创建错误处理和日志记录

9.1 数据库创建分层错误处理机制

数据库创建错误分类体系

// 数据库创建错误类型定义
type DatabaseCreateErrorType intconst (// 数据库创建业务错误ErrDatabaseCreateBusiness DatabaseCreateErrorType = iota + 1000ErrDatabaseNameExistsErrDatabasePermissionDeniedErrDatabaseCreateRateLimitErrDatabaseInvalidParametersErrDatabaseModelNotSupportedErrDatabaseStorageQuotaExceededErrDatabaseProcessingFailedErrDatabaseInvalidFileTypeErrDatabaseFileSizeExceededErrDatabaseInvalidChunkSizeErrDatabaseInvalidIconURIErrDatabaseInvalidSpaceIDErrDatabaseDuplicateNameErrDatabaseSpaceCreateFailed// 数据库创建系统错误ErrDatabaseCreateSystem DatabaseCreateErrorType = iota + 2000ErrDatabaseDatabaseConnectionErrDatabaseSearchTimeoutErrDatabaseServiceUnavailableErrDatabaseCreateEventPublishFailedErrDatabaseIndexCreateFailedErrDatabaseTransactionRollbackFailedErrDatabaseStoreTimeoutErrDatabaseIDGenerationFailedErrDatabaseModelServiceFailedErrDatabaseContentIndexFailed// 数据库创建网络错误ErrDatabaseCreateNetwork DatabaseCreateErrorType = iota + 3000ErrDatabaseCreateRequestTimeoutErrDatabaseCreateConnectionRefusedErrDatabaseCreateServiceDownErrDatabaseCreateESConnectionFailedErrDatabaseDBConnectionFailedErrDatabaseModelAPITimeout
)

数据库创建错误处理流程

  1. 捕获阶段:在数据库创建各层级捕获具体错误
  2. 包装阶段:添加数据库创建操作相关上下文信息和错误码
  3. 记录阶段:根据错误级别记录数据库创建操作日志
  4. 响应阶段:返回用户友好的数据库创建错误信息
  5. 回滚阶段:数据库创建失败时进行必要的数据回滚操作
  6. 向量处理:处理向量空间创建失败的错误
  7. 重试机制:对于可重试的创建错误提供重试建议
  8. 用户指导:为常见创建错误提供解决方案指导

9.2 数据库创建统一错误响应格式

// 数据库创建错误响应结构
type DatabaseCreateErrorResponse struct {Code         int    `json:"code"`Message      string `json:"message"`Details      string `json:"details,omitempty"`TraceID      string `json:"trace_id"`KnowledgeID  int64  `json:"knowledge_id,omitempty"`Operation    string `json:"operation"`CanRetry     bool   `json:"can_retry"`DocumentsProcessed int    `json:"documents_processed,omitempty"`DocumentsFailed    int    `json:"documents_failed,omitempty"`ValidationErrors   []string `json:"validation_errors,omitempty"`SuggestedFix       string `json:"suggested_fix,omitempty"`FieldErrors        map[string]string `json:"field_errors,omitempty"`VectorSpaceStatus  string `json:"vector_space_status,omitempty"`EmbeddingModel     string `json:"embedding_model,omitempty"`
}// 数据库创建错误处理中间件
func DatabaseCreateErrorHandlerMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {defer func() {if err := recover(); err != nil {traceID := ctx.GetString("trace_id")userID := ctx.GetInt64("user_id")spaceID := ctx.GetInt64("space_id")logs.CtxErrorf(c, "Database creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s", err, userID, spaceID, traceID)ctx.JSON(500, DatabaseCreateErrorResponse{Code:      5000,Message:   "数据库创建服务器内部错误",TraceID:   traceID,Operation: "create_database",CanRetry:  true,SuggestedFix: "请稍后重试,如果问题持续存在请联系技术支持",})}}()ctx.Next()}
}// 插件创建业务错误处理
func handlePluginCreateBusinessError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginInvalidParamCode):response.Code = 400response.Message = "插件参数无效"response.CanRetry = falseresponse.SuggestedFix = "请检查插件名称、描述、服务器URL等参数是否正确"case errors.Is(err, errno.ErrPluginPermissionCode):response.Code = 403response.Message = "无权限创建插件"response.CanRetry = falseresponse.SuggestedFix = "请确保已登录且具有插件创建权限"case errors.Is(err, errno.ErrPluginInvalidManifest):response.Code = 400response.Message = "插件清单格式无效"response.CanRetry = falseresponse.SuggestedFix = "请检查插件清单文件格式是否符合规范"case errors.Is(err, errno.ErrPluginInvalidOpenapi3Doc):response.Code = 400response.Message = "OpenAPI文档格式无效"response.CanRetry = falseresponse.SuggestedFix = "请检查OpenAPI文档格式是否符合OpenAPI 3.0规范"case errors.Is(err, errno.ErrPluginIDExist):response.Code = 409response.Message = "插件ID已存在"response.CanRetry = falseresponse.SuggestedFix = "请使用不同的插件名称或检查是否已存在同名插件"case errors.Is(err, errno.ErrPluginCreateRateLimit):response.Code = 429response.Message = "创建操作过于频繁,请稍后再试"response.CanRetry = trueresponse.SuggestedFix = "请等待一段时间后重试"case errors.Is(err, errno.ErrPluginStorageQuotaExceeded):response.Code = 413response.Message = "存储配额已满"response.CanRetry = falseresponse.SuggestedFix = "请清理不需要的插件或升级存储配额"case errors.Is(err, errno.ErrPluginServerURLNotAccessible):response.Code = 400response.Message = "插件服务器URL不可访问"response.CanRetry = trueresponse.SuggestedFix = "请检查服务器URL是否正确且可访问"default:response.Code = 500response.Message = "插件创建失败"response.CanRetry = trueresponse.SuggestedFix = "请稍后重试,如果问题持续存在请联系技术支持"}ctx.JSON(response.Code, response)
}// 插件创建系统错误处理
func handlePluginCreateSystemError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginDatabaseConnection):response.Code = 500response.Message = "插件数据库连接失败"response.CanRetry = trueresponse.SuggestedFix = "数据库连接异常,请稍后重试"case errors.Is(err, errno.ErrPluginElasticSearchTimeout):response.Code = 500response.Message = "插件索引操作超时"response.CanRetry = trueresponse.SuggestedFix = "搜索服务响应超时,请稍后重试"case errors.Is(err, errno.ErrPluginServiceUnavailable):response.Code = 503response.Message = "插件创建服务暂时不可用"response.CanRetry = trueresponse.SuggestedFix = "服务正在维护中,请稍后重试"case errors.Is(err, errno.ErrPluginCreateEventPublishFailed):response.Code = 500response.Message = "插件创建事件发布失败"response.CanRetry = trueresponse.SuggestedFix = "事件发布异常,插件已创建但可能影响搜索,请稍后重试"case errors.Is(err, errno.ErrPluginIndexCreateFailed):response.Code = 500response.Message = "插件索引创建失败"response.CanRetry = trueresponse.SuggestedFix = "搜索索引创建失败,插件已创建但可能无法搜索到"case errors.Is(err, errno.ErrPluginTransactionRollbackFailed):response.Code = 500response.Message = "插件创建事务回滚失败"response.CanRetry = falseresponse.SuggestedFix = "数据一致性异常,请联系技术支持"case errors.Is(err, errno.ErrPluginIDGenerationFailed):response.Code = 500response.Message = "插件ID生成失败"response.CanRetry = trueresponse.SuggestedFix = "ID生成服务异常,请稍后重试"default:response.Code = 5000response.Message = "插件创建失败"response.Details = "服务器内部错误,请稍后重试"response.CanRetry = trueresponse.SuggestedFix = "系统内部错误,请稍后重试或联系技术支持"}ctx.JSON(response.Code, response)
}

9.3 数据库创建日志记录策略

数据库创建日志级别定义

  • DEBUG:数据库创建详细调试信息,包括参数值、向量处理过程、文档分块详情
  • INFO:数据库创建关键业务流程信息,如创建开始、参数验证、数据插入、向量空间创建
  • WARN:数据库创建潜在问题警告,如存储配额警告、文档处理警告、向量生成警告
  • ERROR:数据库创建错误信息,包括创建失败、权限错误、向量空间创建失败
  • FATAL:数据库创建严重错误,可能导致数据不一致或向量空间损坏

数据库创建结构化日志格式

// 数据库创建日志记录示例
func (s *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateDatasetRequest) (*knowledgeAPI.CreateDatasetResponse, error) {traceID := generateTraceID()ctx = context.WithValue(ctx, "trace_id", traceID)userID := ctxutil.GetUIDFromCtx(ctx)// 记录数据库创建开始logs.CtxInfof(ctx, "CreateKnowledge started, userID=%d, knowledgeName=%s, spaceID=%d, embeddingModel=%s, traceID=%s", userID, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), traceID)startTime := time.Now()defer func() {duration := time.Since(startTime)logs.CtxInfof(ctx, "CreateKnowledge completed, duration=%dms, traceID=%s", duration.Milliseconds(), traceID)}()// 记录关键步骤logs.CtxInfof(ctx, "Validating knowledge create parameters, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, traceID=%s", req.GetName(), req.GetEmbeddingModel(), req.GetChunkSize(), traceID)// 权限验证日志logs.CtxInfof(ctx, "Validating knowledge create permission, userID=%d, spaceID=%d, traceID=%s", userID, req.GetSpaceID(), traceID)// 存储配额检查日志logs.CtxInfof(ctx, "Checking storage quota, userID=%d, traceID=%s", userID, traceID)// 向量空间创建日志logs.CtxInfof(ctx, "Creating vector space, embeddingModel=%s, dimensions=%d, traceID=%s", req.GetEmbeddingModel(), getModelDimensions(req.GetEmbeddingModel()), traceID)// 数据库创建操作日志logs.CtxInfof(ctx, "Creating knowledge in database, knowledgeName=%s, traceID=%s", req.GetName(), traceID)// ElasticSearch索引创建日志logs.CtxInfof(ctx, "Creating ElasticSearch index, knowledgeID=%d, traceID=%s", knowledgeID, traceID)// 事件发布日志logs.CtxInfof(ctx, "Publishing knowledge create event, knowledgeID=%d, traceID=%s", knowledgeID, traceID)return resp, nil
}// 数据库创建操作审计日志
func (s *KnowledgeApplicationService) logDatabaseCreateAudit(ctx context.Context, operation string, knowledgeID int64, details map[string]interface{}) {userID := ctx.Value("user_id").(int64)spaceID := ctx.Value("space_id").(int64)traceID := ctx.Value("trace_id").(string)auditLog := map[string]interface{}{"operation":       operation,"knowledge_id":    knowledgeID,"user_id":         userID,"space_id":        spaceID,"trace_id":        traceID,"timestamp":       time.Now().Unix(),"details":         details,"knowledge_name":  details["knowledge_name"],"embedding_model": details["embedding_model"],"chunk_size":      details["chunk_size"],"chunk_overlap":   details["chunk_overlap"],"vector_space_id": details["vector_space_id"],"storage_used":    details["storage_used"],}logs.CtxInfof(ctx, "Knowledge create audit log: %+v", auditLog)
}// 文档处理日志记录
func (s *KnowledgeApplicationService) logDocumentProcessing(ctx context.Context, knowledgeID int64, documentID int64, operation string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)docLog := map[string]interface{}{"operation":     operation,"knowledge_id":  knowledgeID,"document_id":   documentID,"trace_id":      traceID,"timestamp":     time.Now().Unix(),"details":       details,"file_name":     details["file_name"],"file_size":     details["file_size"],"chunk_count":   details["chunk_count"],"vector_count":  details["vector_count"],"processing_time": details["processing_time"],}logs.CtxInfof(ctx, "Document processing log: %+v", docLog)
}// 向量空间操作日志
func (s *KnowledgeApplicationService) logVectorSpaceOperation(ctx context.Context, operation string, vectorSpaceID string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)vectorLog := map[string]interface{}{"operation":        operation,"vector_space_id":  vectorSpaceID,"trace_id":         traceID,"timestamp":        time.Now().Unix(),"details":          details,"embedding_model":  details["embedding_model"],"dimensions":       details["dimensions"],"vector_count":     details["vector_count"],"index_type":       details["index_type"],}logs.CtxInfof(ctx, "Vector space operation log: %+v", vectorLog)
}

数据库创建日志内容规范

  • 请求日志:记录用户ID、工作空间ID、知识库名称、嵌入模型、分块策略、TraceID
  • 业务日志:记录数据库创建步骤、参数验证结果、权限验证结果、向量空间创建过程
  • 性能日志:记录创建接口响应时间、数据库插入时间、向量空间创建时间、文档处理时间
  • 错误日志:记录创建错误堆栈、知识库相关上下文信息、向量处理失败原因
  • 审计日志:记录知识库的创建操作、创建参数、创建结果、关联的文档和向量信息
  • 安全日志:记录创建频率、权限验证、存储配额检查、可疑创建行为
  • 文档处理日志:记录文档上传、分块处理、向量生成、索引创建等详细过程
  • 向量空间日志:记录向量空间创建、配置、索引构建、查询性能等信息

9.4 数据库创建监控和告警

数据库创建关键指标监控

  • 创建性能:数据库创建响应时间、创建成功率、创建QPS、创建吞吐量
  • 资源使用:数据库连接数、向量空间创建延迟、内存使用率、文档处理队列长度
  • 业务指标:数据库创建成功率、创建频率分布、不同嵌入模型使用比例、用户创建活跃度
  • 安全指标:权限验证通过率、恶意创建尝试次数、创建频率限制触发次数、存储配额检查失败率
  • 质量指标:向量空间创建成功率、文档处理成功率、嵌入模型响应率、索引创建成功率
  • 存储指标:存储使用量、向量数量、文档数量、索引大小、存储增长率
  • 向量处理指标:向量生成延迟、向量维度分布、嵌入模型调用次数、向量相似度计算性能

数据库创建告警策略

  • 创建失败率告警:当数据库创建失败率超过3%时触发告警
  • 性能告警:当数据库创建响应时间超过10秒时触发告警
  • 资源告警:当数据库连接数超过80%或向量数据库连接异常时触发告警
  • 安全告警:当检测到异常创建行为或存储配额滥用时立即触发告警
  • 数据一致性告警:当MySQL、ES和向量数据库创建状态不一致时触发告警
  • 配额告警:当用户存储使用量超过90%时触发告警
  • 向量服务告警:当嵌入模型服务不可用或响应超时时触发告警
  • 文档处理告警:当文档处理队列积压超过阈值时触发告警
// 数据库创建监控指标收集
type DatabaseCreateMetrics struct {CreateSuccessCount      int64         // 创建成功次数CreateFailureCount      int64         // 创建失败次数CreateLatency           time.Duration // 创建延迟PermissionDeniedCount   int64         // 权限拒绝次数RateLimitCount          int64         // 频率限制次数ParameterValidationFailCount int64    // 参数验证失败次数VectorSpaceCreateLatency time.Duration // 向量空间创建延迟VectorSpaceCreateFailCount int64      // 向量空间创建失败次数DocumentProcessingLatency time.Duration // 文档处理延迟EmbeddingGenerationLatency time.Duration // 嵌入生成延迟EmbeddingModelFailCount int64         // 嵌入模型调用失败次数StorageQuotaExceededCount int64       // 存储配额超限次数IndexCreateLatency      time.Duration // 索引创建延迟IndexCreateFailCount    int64         // 索引创建失败次数EventPublishLatency     time.Duration // 事件发布延迟DatabaseInsertLatency   time.Duration // 数据库插入延迟VectorDatabaseLatency   time.Duration // 向量数据库操作延迟TotalStorageUsed        int64         // 总存储使用量TotalVectorCount        int64         // 总向量数量TotalDocumentCount      int64         // 总文档数量
}// 数据库创建监控指标上报
func (s *DatabaseApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, databaseID int64, req *databaseAPI.CreateDatabaseRequest, err error) {latency := time.Since(startTime)if err != nil {metrics.CreateFailureCount++// 根据错误类型分类统计switch {case errors.Is(err, errno.ErrDatabasePermissionCode):metrics.PermissionDeniedCount++case errors.Is(err, errno.ErrDatabaseCreateRateLimitCode):metrics.RateLimitCount++case errors.Is(err, errno.ErrDatabaseInvalidParamCode):metrics.ParameterValidationFailCount++case errors.Is(err, errno.ErrDatabaseStorageQuotaExceededCode):metrics.StorageQuotaExceededCount++case errors.Is(err, errno.ErrDatabaseSpaceCreateFailedCode):metrics.VectorSpaceCreateFailCount++case errors.Is(err, errno.ErrDatabaseEmbeddingModelFailedCode):metrics.EmbeddingModelFailCount++case errors.Is(err, errno.ErrDatabaseIndexCreateFailedCode):metrics.IndexCreateFailCount++}logs.CtxErrorf(ctx, "Database %s failed, databaseName=%s, spaceID=%d, embeddingModel=%s, error=%v, latency=%dms", operation, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), err, latency.Milliseconds())} else {metrics.CreateSuccessCount++metrics.CreateLatency = latency// 记录数据库类型统计embeddingModel := req.GetEmbeddingModel()chunkSize := req.GetChunkSize()logs.CtxInfof(ctx, "Database %s succeeded, databaseID=%d, databaseName=%s, embeddingModel=%s, chunkSize=%d, latency=%dms", operation, databaseID, req.GetName(), embeddingModel, chunkSize, latency.Milliseconds())}// 上报到监控系统s.metricsReporter.Report(ctx, "database_create", map[string]interface{}{"operation":       operation,"database_id":     databaseID,"database_name":   req.GetName(),"embedding_model": req.GetEmbeddingModel(),"chunk_size":      req.GetChunkSize(),"chunk_overlap":   req.GetChunkOverlap(),"space_id":        req.GetSpaceID(),"success":         err == nil,"latency_ms":      latency.Milliseconds(),"error_type":      getDatabaseCreateErrorType(err),"vector_dimensions": getModelDimensions(req.GetEmbeddingModel()),"storage_used":    getStorageUsed(ctx, req.GetSpaceID()),})
}// 获取数据库创建错误类型
func getDatabaseCreateErrorType(err error) string {if err == nil {return "none"}// 基于数据库错误码定义switch {case errors.Is(err, errno.ErrDatabasePermissionCode):return "permission_denied"case errors.Is(err, errno.ErrDatabaseNameExistsCode):return "database_exists"case errors.Is(err, errno.ErrDatabaseInvalidParamCode):return "invalid_parameters"case errors.Is(err, errno.ErrDatabaseStorageQuotaExceededCode):return "storage_quota_exceeded"case errors.Is(err, errno.ErrDatabaseSpaceCreateFailedCode):return "vector_space_create_failed"case errors.Is(err, errno.ErrDatabaseEmbeddingModelFailedCode):return "embedding_model_failed"case errors.Is(err, errno.ErrDatabaseIndexCreateFailedCode):return "index_create_failed"case errors.Is(err, errno.ErrDatabaseProcessingFailedCode):return "document_processing_failed"case errors.Is(err, errno.ErrDatabaseVectorDatabaseTimeoutCode):return "vector_database_timeout"case errors.Is(err, errno.ErrDatabaseCreateRateLimitCode):return "rate_limit_exceeded"default:return "system_error"}
}// 数据库创建告警检查
func (s *DatabaseApplicationService) checkCreateAlerts(ctx context.Context, metrics *DatabaseCreateMetrics) {// 创建失败率告警totalCreates := metrics.CreateSuccessCount + metrics.CreateFailureCountif totalCreates > 100 {failureRate := float64(metrics.CreateFailureCount) / float64(totalCreates)if failureRate > 0.03 { // 3%s.alertManager.SendAlert(ctx, &Alert{Level:   "warning",Type:    "database_create_failure_rate",Message: fmt.Sprintf("数据库创建失败率过高: %.2f%%", failureRate*100),Metrics: map[string]interface{}{"failure_rate": failureRate,"total_creates": totalCreates,},})}}// 性能告警if metrics.CreateLatency > 10*time.Second {s.alertManager.SendAlert(ctx, &Alert{Level:   "warning",Type:    "database_create_latency",Message: fmt.Sprintf("数据库创建延迟过高: %dms", metrics.CreateLatency.Milliseconds()),Metrics: map[string]interface{}{"latency_ms": metrics.CreateLatency.Milliseconds(),},})}// 存储配额告警if metrics.StorageQuotaExceededCount > 10 {s.alertManager.SendAlert(ctx, &Alert{Level:   "critical",Type:    "database_storage_quota_exceeded",Message: fmt.Sprintf("存储配额超限次数过多: %d", metrics.StorageQuotaExceededCount),Metrics: map[string]interface{}{"quota_exceeded_count": metrics.StorageQuotaExceededCount,},})}// 向量空间创建失败告警if metrics.VectorSpaceCreateFailCount > 5 {s.alertManager.SendAlert(ctx, &Alert{Level:   "critical",Type:    "database_vector_space_create_failed",Message: fmt.Sprintf("向量空间创建失败次数过多: %d", metrics.VectorSpaceCreateFailCount),Metrics: map[string]interface{}{"vector_space_fail_count": metrics.VectorSpaceCreateFailCount,},})}// 嵌入模型失败告警if metrics.EmbeddingModelFailCount > 20 {s.alertManager.SendAlert(ctx, &Alert{Level:   "warning",Type:    "database_embedding_model_failed",Message: fmt.Sprintf("嵌入模型调用失败次数过多: %d", metrics.EmbeddingModelFailCount),Metrics: map[string]interface{}{"embedding_fail_count": metrics.EmbeddingModelFailCount,},})}
}
http://www.dtcms.com/a/403083.html

相关文章:

  • LeetCode:52.腐烂的橘子
  • LeetCode算法日记 - Day 52: 求根节点到叶节点数字之和、二叉树剪枝
  • 四种方法解决——力扣189.轮转数组
  • ⸢ 伍-Ⅱ⸥ ⤳ 默认安全治理实践:水平越权检测 前端安全防控
  • 力扣856
  • Leetcode94.二叉数的中序遍历练习
  • 多种解法全解析——力扣217. 存在重复元素
  • 用python做的网站多吗二手书交易网站策划书
  • phpcms网站源码ui培训班多少钱
  • Android Studio 导入 opencv
  • 在线网站做成appdede网站地图样式修改
  • 全新尚界H5凭借HUAWEI XMC数字底盘引擎技术,让湿滑路面也“稳”操胜券
  • iOS 26 性能测试实战,如何评估启动速度、CPUGPU 负载、帧率与系统资源适配(uni-app 与 iOS 原生应用性能方案)
  • 腾讯会议→微课操作
  • html原生表格,实现左侧列固定
  • Idea提高开发效率的快捷键最佳学习方式
  • 做网站一定需要icp么中国建设协会官网
  • Selenium使用教程
  • 多线程——单例模式
  • 镜头调焦的 调整的是焦距还是像距?
  • (四)React+.Net+Typescript全栈(错误处理)
  • @ant-design/icons-vue 打包成多格式库
  • 什么是营销型网站?杭州建设教育网站
  • C++开发环境(VSCode + CMake + gdb)
  • JAVA CodeX精选实用代码示例
  • 肥东网站建设南京医院网站建设
  • Qt 多线程解析
  • ZooKeeper与Kafka分布式:从基础原理到集群部署
  • 免费网站服务器安全软件下载wordpress权限设置方法
  • three.js射线拾取点击位置与屏幕坐标映射