Coze源码分析-资源库-创建数据库-后端源码-安全与错误处理
8. 数据库系统安全和权限验证机制
8.1 数据库系统身份认证
多因素认证机制:
- 数据库操作API请求需要携带有效的JWT Token进行身份验证
- 敏感操作强制要求二次验证(如短信验证码、邮箱验证等)
- 通过统一的认证中间件验证用户身份和权限
// 数据库系统身份认证中间件
func DatabaseAuthMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {token := ctx.GetHeader("Authorization")if token == nil {ctx.JSON(401, gin.H{"error": "访问数据库系统需要登录认证"})ctx.Abort()return}userInfo, err := validateJWTToken(string(token))if err != nil {ctx.JSON(401, gin.H{"error": "认证Token无效"})ctx.Abort()return}// 验证用户是否有数据库访问权限if !userInfo.HasDatabaseAccessPermission {ctx.JSON(403, gin.H{"error": "用户无数据库访问权限"})ctx.Abort()return}ctx.Set("user_id", userInfo.UserID)ctx.Set("database_id", userInfo.DatabaseID)ctx.Set("access_level", userInfo.AccessLevel)ctx.Next()}
}
8.2 数据库系统权限控制
细粒度权限管理:
- 基于RBAC(基于角色的访问控制)模型实现数据库操作权限控制
- 对数据库操作进行细粒度权限划分(读、写、管理等)
- 支持资源级别的访问控制,如特定表、字段的访问权限
// 数据库操作权限验证
func (d *DatabaseService) validateDatabaseOperationPermission(ctx context.Context, req *service.DatabaseOperationRequest) error {userID := ctx.Value("user_id").(int64)accessLevel := ctx.Value("access_level").(string)// 验证用户是否有执行特定操作的权限operationPermission := getOperationPermission(req.OperationType)if !hasPermission(accessLevel, operationPermission) {return errors.New("用户无权限执行该数据库操作")}// 检查数据库访问配额accessQuota, err := d.getUserDatabaseAccessQuota(ctx, userID)if err != nil {return fmt.Errorf("获取用户数据库访问配额失败: %w", err)}// 检查操作频率限制if err := d.checkOperationRateLimit(ctx, userID, req.OperationType); err != nil {return err}// 验证资源访问权限(如表、字段级别)if err := d.validateResourceAccessPermission(ctx, userID, req.DatabaseID, req.Resource); err != nil {return err}return nil
}
8.3 数据库创建资源级权限验证
数据库创建用户权限验证:
- 严格验证用户是否具有数据库创建权限
- 验证用户在指定工作空间的操作权限
- 通过存储配额和向量空间权限进行资源级控制
// 数据库系统操作权限验证
func (d *DatabaseSystemService) validateDatabaseOperationPermission(ctx context.Context, req *service.DatabaseOperationRequest) error {userID := ctx.Value("user_id").(int64)accessLevel := ctx.Value("access_level").(string)// 验证用户是否具有数据库操作权限hasPermission, err := d.userService.HasDatabaseOperationPermission(ctx, userID, req.OperationType)if err != nil {return fmt.Errorf("验证数据库操作权限失败: %w", err)}if !hasPermission {return errorx.New(errno.ErrDatabasePermissionCode, errorx.KV(errno.DatabaseMsgKey, "用户无数据库操作权限"),errorx.KV("user_id", userID),errorx.KV("operation_type", req.OperationType))}// 验证数据库访问权限级别requiredLevel := getRequiredAccessLevel(req.OperationType)if !checkAccessLevel(accessLevel, requiredLevel) {return errorx.New(errno.ErrDatabaseAccessLevelCode, errorx.KV(errno.DatabaseMsgKey, "用户访问权限级别不足"),errorx.KV("user_id", userID),errorx.KV("current_level", accessLevel),errorx.KV("required_level", requiredLevel))}// 检查操作频率限制opCount, err := d.getUserOperationCount(ctx, userID, req.OperationType, time.Now().Add(-1*time.Hour))if err != nil {return fmt.Errorf("检查数据库操作频率失败: %w", err)}if opCount >= getOperationRateLimit(req.OperationType) {return errorx.New(errno.ErrDatabaseOperationRateLimitCode, errorx.KV("user_id", userID),errorx.KV("operation_type", req.OperationType),errorx.KV("operation_count", opCount))}// 验证资源访问权限resourcePermission, err := d.checkResourceAccessPermission(ctx, userID, req.DatabaseID, req.Resource)if err != nil {return fmt.Errorf("检查资源访问权限失败: %w", err)}if !resourcePermission.CanAccess {return errorx.New(errno.ErrDatabaseResourcePermissionCode, errorx.KV(errno.DatabaseMsgKey, "用户无资源访问权限"),errorx.KV("user_id", userID),errorx.KV("resource_name", req.Resource.Name))}// 检查数据安全策略合规性if err := d.checkDataSecurityPolicy(ctx, req); err != nil {return err}// 验证数据操作审计配置if err := d.validateAuditConfig(ctx, req); err != nil {return err}return nil
}// 检查数据库名称是否存在
// checkDatabaseNameExists reports whether a database named name already exists
// in workspace spaceID. Names are compared exactly (case-sensitive).
//
// The previous version requested a single page of size 1, so only the first
// database in the space was ever compared. This version pages through the
// whole listing and stops at the first match or at a short page.
func (s *DatabaseApplicationService) checkDatabaseNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {
	const (
		pageSize = 100
		// maxPages bounds the scan in case the backend ignores Page and keeps
		// returning the same full page.
		maxPages = 1000
	)

	// NOTE(review): assumes entity.PageInfo has an integer Page field. Page
	// starts at its zero value (what the original request sent) and is
	// incremented per request. If paging is 1-based and 0 is treated as 1, the
	// first page is merely fetched twice. Confirm against the domain service.
	pageInfo := entity.PageInfo{PageSize: pageSize}
	for i := 0; i < maxPages; i++ {
		databases, err := s.DomainSVC.ListDatabases(ctx, &service.ListDatabasesRequest{
			SpaceID:  spaceID,
			PageInfo: pageInfo,
		})
		if err != nil {
			return false, err
		}
		for _, database := range databases.Databases {
			if database.Name == name {
				return true, nil
			}
		}
		// A short page means there is nothing left to scan.
		if len(databases.Databases) < pageSize {
			return false, nil
		}
		pageInfo.Page++
	}
	return false, fmt.Errorf("检查数据库名称时分页超过上限(%d页)", maxPages)
}

// Check the storage quota.
// checkStorageQuota reports the user's storage usage and quota, and whether a
// new database may still be created.
//
// Creation is allowed while usage stays strictly below 95% of the quota.
// spaceID is currently unused because both lookups are per user. Errors from
// the storage service are returned unchanged.
func (s *DatabaseApplicationService) checkStorageQuota(ctx context.Context, userID, spaceID int64) (*StorageQuotaInfo, error) {
	quota, err := s.storageService.GetUserStorageQuota(ctx, userID)
	if err != nil {
		return nil, err
	}
	usage, err := s.storageService.GetUserStorageUsage(ctx, userID)
	if err != nil {
		return nil, err
	}

	// Compare in float64. With integer byte counts (as used for quotas
	// elsewhere in this file), `quota*0.95` does not compile: the untyped
	// constant 0.95 cannot be represented as an integer.
	// NOTE(review): assumes both values are numeric byte counts. Confirm the
	// storageService return types.
	canCreate := float64(usage) < float64(quota)*0.95

	return &StorageQuotaInfo{
		UsedStorage: usage,
		MaxStorage:  quota,
		CanCreate:   canCreate,
	}, nil
}

// Check vector-space permission.
// checkVectorSpacePermission reports whether userID may create another vector
// space backed by embeddingModel.
//
// Creation is permitted only when the embedding service grants the model to
// the user and the user's current vector-space count is below
// getUserMaxVectorSpaces. Lookup errors are returned unchanged.
func (s *DatabaseApplicationService) checkVectorSpacePermission(ctx context.Context, userID int64, embeddingModel string) (*VectorSpacePermission, error) {
	modelAllowed, err := s.embeddingService.CheckModelPermission(ctx, userID, embeddingModel)
	if err != nil {
		return nil, err
	}

	used, err := s.vectorService.GetUserVectorSpaceCount(ctx, userID)
	if err != nil {
		return nil, err
	}

	limit := s.getUserMaxVectorSpaces(userID)
	perm := &VectorSpacePermission{
		CurrentSpaces: used,
		MaxSpaces:     limit,
	}
	perm.CanCreateSpace = modelAllowed && used < limit
	return perm, nil
}
8.4 数据库创建API访问控制
创建请求频率限制:
- 实现基于用户的数据库创建频率限制
- 防止恶意批量创建数据库
- 支持不同用户等级的差异化创建限流策略
- 基于文档处理能力的动态限流
创建操作安全验证:
- 严格验证创建请求的合法性
- 防止恶意创建和资源滥用攻击
- 使用多重安全检查机制
- 文档内容安全扫描和验证
- 向量空间创建安全验证
// 数据库创建参数验证
func validateDatabaseCreateRequest(req *service.CreateDatabaseRequest) error {if req.SpaceID <= 0 {return errors.New("无效的工作空间ID")}if req.CreatorID <= 0 {return errors.New("无效的创建者ID")}// 验证知识库名称if req.Name == "" {return errors.New("知识库名称不能为空")}if len(req.Name) > 100 {return errors.New("知识库名称长度不能超过100字符")}// 验证知识库描述if req.Description != "" && len(req.Description) > 1000 {return errors.New("知识库描述长度不能超过1000字符")}// 验证嵌入模型if req.EmbeddingModel == "" {return errors.New("嵌入模型不能为空")}if !isValidEmbeddingModel(req.EmbeddingModel) {return errors.New("不支持的嵌入模型")}// 验证分块策略if req.ChunkStrategy != nil {if req.ChunkStrategy.ChunkSize <= 0 || req.ChunkStrategy.ChunkSize > 8192 {return errors.New("分块大小必须在1-8192之间")}if req.ChunkStrategy.ChunkOverlap < 0 || req.ChunkStrategy.ChunkOverlap >= req.ChunkStrategy.ChunkSize {return errors.New("分块重叠大小必须小于分块大小")}}// 验证图标URIif req.IconURI != "" && !isValidIconURI(req.IconURI) {return errors.New("无效的图标URI格式")}return nil
}// 数据库创建操作安全检查
// validateDatabaseCreateSafety runs the stateful pre-creation checks and
// returns the first failure, or nil.
//
// Checks, in order:
//   - the user's creations in the last 24h are below maxCreatesPer24h;
//   - the embedding model is supported and available;
//   - the user's storage quota still permits creation;
//   - the vector database is healthy;
//   - the document-processing service is healthy.
//
// The caller is read from ctx.Value("user_id") (int64). A missing or mistyped
// value yields an error instead of panicking the request goroutine.
func (s *KnowledgeApplicationService) validateDatabaseCreateSafety(ctx context.Context, req *service.CreateKnowledgeRequest) error {
	userID, ok := ctx.Value("user_id").(int64)
	if !ok {
		return errors.New("上下文缺少有效的用户ID")
	}

	// Maximum number of knowledge bases a user may create in a rolling 24h window.
	const maxCreatesPer24h = 20
	createCount, err := s.getUserDatabaseCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))
	if err != nil {
		return fmt.Errorf("检查数据库创建频率失败: %w", err)
	}
	if createCount >= maxCreatesPer24h {
		return errorx.New(errno.ErrDatabaseCreateRateLimitCode,
			errorx.KV("user_id", userID),
			errorx.KV("create_count", createCount))
	}

	modelAvailable, err := s.checkEmbeddingModelAvailable(ctx, req.EmbeddingModel)
	if err != nil {
		return fmt.Errorf("检查嵌入模型可用性失败: %w", err)
	}
	if !modelAvailable {
		return errors.New("嵌入模型当前不可用")
	}

	storageQuota, err := s.checkUserStorageQuota(ctx, userID)
	if err != nil {
		return fmt.Errorf("检查用户存储配额失败: %w", err)
	}
	if !storageQuota.CanCreateKnowledge {
		return errorx.New(errno.ErrDatabaseStorageQuotaExceededCode,
			errorx.KV("user_id", userID),
			errorx.KV("used_storage", storageQuota.UsedStorage),
			errorx.KV("max_storage", storageQuota.MaxStorage))
	}

	vectorDBHealthy, err := s.checkVectorDatabaseHealth(ctx)
	if err != nil {
		return fmt.Errorf("检查向量数据库健康状态失败: %w", err)
	}
	if !vectorDBHealthy {
		return errors.New("向量数据库当前不可用,无法创建知识库")
	}

	docProcessorHealthy, err := s.checkDocumentProcessorHealth(ctx)
	if err != nil {
		return fmt.Errorf("检查文档处理服务状态失败: %w", err)
	}
	if !docProcessorHealthy {
		return errors.New("文档处理服务当前不可用,无法创建知识库")
	}
	return nil
}

// Check embedding-model availability.
// checkEmbeddingModelAvailable reports whether modelName is both on the
// built-in allow-list and currently served by the embedding service.
//
// Unknown models return (false, nil) without contacting the service. For
// allow-listed models, the result of embeddingService.IsModelAvailable is
// returned as-is.
func (s *KnowledgeApplicationService) checkEmbeddingModelAvailable(ctx context.Context, modelName string) (bool, error) {
	switch modelName {
	case "text-embedding-ada-002",
		"text-embedding-3-small",
		"text-embedding-3-large",
		"bge-large-zh-v1.5",
		"bge-base-zh-v1.5":
		return s.embeddingService.IsModelAvailable(ctx, modelName)
	default:
		return false, nil
	}
}

// Check vector-database health.
// checkVectorDatabaseHealth probes the vector database with a 5-second timeout.
//
// A probe error is logged as a warning and reported as "unhealthy"
// (false, nil) rather than returned, so the error result is always nil.
func (s *KnowledgeApplicationService) checkVectorDatabaseHealth(ctx context.Context) (bool, error) {
	probe := &VectorDBHealthCheck{Timeout: 5 * time.Second}

	ok, err := s.vectorService.HealthCheck(ctx, probe)
	if err == nil {
		return ok, nil
	}
	logs.CtxWarnf(ctx, "Vector database health check failed: %v", err)
	return false, nil
}

// Check document-processing service health.
// checkDocumentProcessorHealth reports whether the document-processing queue is
// reachable and has no more than 10000 pending jobs.
//
// Both a status-lookup failure and an overloaded queue are logged as warnings
// and reported as (false, nil). The error result is always nil.
func (s *KnowledgeApplicationService) checkDocumentProcessorHealth(ctx context.Context) (bool, error) {
	const maxPendingJobs = 10000

	status, err := s.documentService.GetQueueStatus(ctx)
	switch {
	case err != nil:
		logs.CtxWarnf(ctx, "Document processor queue status check failed: %v", err)
		return false, nil
	case status.PendingJobs > maxPendingJobs:
		logs.CtxWarnf(ctx, "Document processor queue overloaded: %d pending jobs", status.PendingJobs)
		return false, nil
	default:
		return true, nil
	}
}

// Get a user's storage usage.
// getUserStorageUsage returns the bytes occupied by a user's plugins. This is
// the summed length of each plugin's Manifest and OpenapiDoc (nil fields count
// as zero).
//
// A plugin-listing error is wrapped and returned with a zero total.
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {
	userPlugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)
	if err != nil {
		return 0, fmt.Errorf("获取用户插件列表失败: %w", err)
	}

	var used int64
	for _, p := range userPlugins {
		if p.Manifest != nil {
			used += int64(len(p.Manifest))
		}
		if p.OpenapiDoc != nil {
			used += int64(len(p.OpenapiDoc))
		}
	}
	return used, nil
}

// Get a user's maximum storage quota.
// getMaxStorageQuota returns the plugin storage quota, in bytes, for userID.
//
// Every user currently receives the same fixed 100 MiB. userID is accepted so
// per-tier quotas can be introduced later without changing callers.
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {
	const defaultQuotaBytes int64 = 100 << 20 // 100 MiB
	return defaultQuotaBytes
}

// URL format validation.
// isValidURL reports whether urlStr parses as an absolute URL with both a
// scheme and a host (for example "https://example.com"). Any scheme is
// accepted; relative paths and opaque forms such as "mailto:x@y" are not.
func isValidURL(urlStr string) bool {
	parsed, err := url.Parse(urlStr)
	if err != nil {
		return false
	}
	return parsed.Scheme != "" && parsed.Host != ""
}

// Plugin type validation.
func isValidPluginType(pluginType common.PluginType) bool {validTypes := []common.PluginType{common.PluginTypeHTTP,common.PluginTypeLocal,}for _, validType := range validTypes {if pluginType == validType {return true}}return false
}
9. 数据库创建错误处理和日志记录
9.1 数据库创建分层错误处理机制
数据库创建错误分类体系:
// DatabaseCreateErrorType classifies failures in the database-creation flow.
//
// Codes are grouped by category: business errors start at 1000, system errors
// at 2000, and network errors at 3000. Each group is its own const block
// because iota restarts at 0 only in a new const declaration. In the former
// single block, iota kept counting, which placed the system group at 2015 and
// the network group at 3026.
type DatabaseCreateErrorType int

// Business errors (from 1000).
const (
	ErrDatabaseCreateBusiness DatabaseCreateErrorType = iota + 1000
	ErrDatabaseNameExists
	ErrDatabasePermissionDenied
	ErrDatabaseCreateRateLimit
	ErrDatabaseInvalidParameters
	ErrDatabaseModelNotSupported
	ErrDatabaseStorageQuotaExceeded
	ErrDatabaseProcessingFailed
	ErrDatabaseInvalidFileType
	ErrDatabaseFileSizeExceeded
	ErrDatabaseInvalidChunkSize
	ErrDatabaseInvalidIconURI
	ErrDatabaseInvalidSpaceID
	ErrDatabaseDuplicateName
	ErrDatabaseSpaceCreateFailed
)

// System errors (from 2000).
const (
	ErrDatabaseCreateSystem DatabaseCreateErrorType = iota + 2000
	ErrDatabaseDatabaseConnection
	ErrDatabaseSearchTimeout
	ErrDatabaseServiceUnavailable
	ErrDatabaseCreateEventPublishFailed
	ErrDatabaseIndexCreateFailed
	ErrDatabaseTransactionRollbackFailed
	ErrDatabaseStoreTimeout
	ErrDatabaseIDGenerationFailed
	ErrDatabaseModelServiceFailed
	ErrDatabaseContentIndexFailed
)

// Network errors (from 3000).
const (
	ErrDatabaseCreateNetwork DatabaseCreateErrorType = iota + 3000
	ErrDatabaseCreateRequestTimeout
	ErrDatabaseCreateConnectionRefused
	ErrDatabaseCreateServiceDown
	ErrDatabaseCreateESConnectionFailed
	ErrDatabaseDBConnectionFailed
	ErrDatabaseModelAPITimeout
)
数据库创建错误处理流程:
- 捕获阶段:在数据库创建各层级捕获具体错误
- 包装阶段:添加数据库创建操作相关上下文信息和错误码
- 记录阶段:根据错误级别记录数据库创建操作日志
- 响应阶段:返回用户友好的数据库创建错误信息
- 回滚阶段:数据库创建失败时进行必要的数据回滚操作
- 向量处理:处理向量空间创建失败的错误
- 重试机制:对于可重试的创建错误提供重试建议
- 用户指导:为常见创建错误提供解决方案指导
9.2 数据库创建统一错误响应格式
// DatabaseCreateErrorResponse is the JSON body returned when a database
// creation request fails. Fields tagged omitempty are left out when they hold
// their zero value.
type DatabaseCreateErrorResponse struct {
	// Error identity.
	Code    int    `json:"code"`
	Message string `json:"message"`
	Details string `json:"details,omitempty"`
	TraceID string `json:"trace_id"`

	// Operation context. The KnowledgeID JSON key is "knowledge_id".
	KnowledgeID int64  `json:"knowledge_id,omitempty"`
	Operation   string `json:"operation"`
	CanRetry    bool   `json:"can_retry"`

	// Document-processing progress.
	DocumentsProcessed int `json:"documents_processed,omitempty"`
	DocumentsFailed    int `json:"documents_failed,omitempty"`

	// Validation feedback and remediation hints.
	ValidationErrors []string          `json:"validation_errors,omitempty"`
	SuggestedFix     string            `json:"suggested_fix,omitempty"`
	FieldErrors      map[string]string `json:"field_errors,omitempty"`

	// Vector / embedding details.
	VectorSpaceStatus string `json:"vector_space_status,omitempty"`
	EmbeddingModel    string `json:"embedding_model,omitempty"`
}

// Error-handling middleware for database creation.
func DatabaseCreateErrorHandlerMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {defer func() {if err := recover(); err != nil {traceID := ctx.GetString("trace_id")userID := ctx.GetInt64("user_id")spaceID := ctx.GetInt64("space_id")logs.CtxErrorf(c, "Database creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s", err, userID, spaceID, traceID)ctx.JSON(500, DatabaseCreateErrorResponse{Code: 5000,Message: "数据库创建服务器内部错误",TraceID: traceID,Operation: "create_database",CanRetry: true,SuggestedFix: "请稍后重试,如果问题持续存在请联系技术支持",})}}()ctx.Next()}
}// 插件创建业务错误处理
func handlePluginCreateBusinessError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginInvalidParamCode):response.Code = 400response.Message = "插件参数无效"response.CanRetry = falseresponse.SuggestedFix = "请检查插件名称、描述、服务器URL等参数是否正确"case errors.Is(err, errno.ErrPluginPermissionCode):response.Code = 403response.Message = "无权限创建插件"response.CanRetry = falseresponse.SuggestedFix = "请确保已登录且具有插件创建权限"case errors.Is(err, errno.ErrPluginInvalidManifest):response.Code = 400response.Message = "插件清单格式无效"response.CanRetry = falseresponse.SuggestedFix = "请检查插件清单文件格式是否符合规范"case errors.Is(err, errno.ErrPluginInvalidOpenapi3Doc):response.Code = 400response.Message = "OpenAPI文档格式无效"response.CanRetry = falseresponse.SuggestedFix = "请检查OpenAPI文档格式是否符合OpenAPI 3.0规范"case errors.Is(err, errno.ErrPluginIDExist):response.Code = 409response.Message = "插件ID已存在"response.CanRetry = falseresponse.SuggestedFix = "请使用不同的插件名称或检查是否已存在同名插件"case errors.Is(err, errno.ErrPluginCreateRateLimit):response.Code = 429response.Message = "创建操作过于频繁,请稍后再试"response.CanRetry = trueresponse.SuggestedFix = "请等待一段时间后重试"case errors.Is(err, errno.ErrPluginStorageQuotaExceeded):response.Code = 413response.Message = "存储配额已满"response.CanRetry = falseresponse.SuggestedFix = "请清理不需要的插件或升级存储配额"case errors.Is(err, errno.ErrPluginServerURLNotAccessible):response.Code = 400response.Message = "插件服务器URL不可访问"response.CanRetry = trueresponse.SuggestedFix = "请检查服务器URL是否正确且可访问"default:response.Code = 500response.Message = "插件创建失败"response.CanRetry = trueresponse.SuggestedFix = "请稍后重试,如果问题持续存在请联系技术支持"}ctx.JSON(response.Code, response)
}// 插件创建系统错误处理
func handlePluginCreateSystemError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginDatabaseConnection):response.Code = 500response.Message = "插件数据库连接失败"response.CanRetry = trueresponse.SuggestedFix = "数据库连接异常,请稍后重试"case errors.Is(err, errno.ErrPluginElasticSearchTimeout):response.Code = 500response.Message = "插件索引操作超时"response.CanRetry = trueresponse.SuggestedFix = "搜索服务响应超时,请稍后重试"case errors.Is(err, errno.ErrPluginServiceUnavailable):response.Code = 503response.Message = "插件创建服务暂时不可用"response.CanRetry = trueresponse.SuggestedFix = "服务正在维护中,请稍后重试"case errors.Is(err, errno.ErrPluginCreateEventPublishFailed):response.Code = 500response.Message = "插件创建事件发布失败"response.CanRetry = trueresponse.SuggestedFix = "事件发布异常,插件已创建但可能影响搜索,请稍后重试"case errors.Is(err, errno.ErrPluginIndexCreateFailed):response.Code = 500response.Message = "插件索引创建失败"response.CanRetry = trueresponse.SuggestedFix = "搜索索引创建失败,插件已创建但可能无法搜索到"case errors.Is(err, errno.ErrPluginTransactionRollbackFailed):response.Code = 500response.Message = "插件创建事务回滚失败"response.CanRetry = falseresponse.SuggestedFix = "数据一致性异常,请联系技术支持"case errors.Is(err, errno.ErrPluginIDGenerationFailed):response.Code = 500response.Message = "插件ID生成失败"response.CanRetry = trueresponse.SuggestedFix = "ID生成服务异常,请稍后重试"default:response.Code = 5000response.Message = "插件创建失败"response.Details = "服务器内部错误,请稍后重试"response.CanRetry = trueresponse.SuggestedFix = "系统内部错误,请稍后重试或联系技术支持"}ctx.JSON(response.Code, response)
}
9.3 数据库创建日志记录策略
数据库创建日志级别定义:
- DEBUG:数据库创建详细调试信息,包括参数值、向量处理过程、文档分块详情
- INFO:数据库创建关键业务流程信息,如创建开始、参数验证、数据插入、向量空间创建
- WARN:数据库创建潜在问题警告,如存储配额警告、文档处理警告、向量生成警告
- ERROR:数据库创建错误信息,包括创建失败、权限错误、向量空间创建失败
- FATAL:数据库创建严重错误,可能导致数据不一致或向量空间损坏
数据库创建结构化日志格式:
// CreateKnowledge is a logging example for the knowledge/database creation flow.
//
// It generates a trace ID, stores it in ctx under the "trace_id" key, and emits
// an info log for each major step: parameter validation, permission check,
// storage-quota check, vector-space creation, database insert, ElasticSearch
// indexing and event publishing. A deferred log records the total duration in
// milliseconds.
//
// NOTE(review): this is an excerpt. knowledgeID and resp are not declared in
// the visible code; the actual creation logic between the log lines is elided.
// NOTE(review): context.WithValue is called with a plain string key; a
// dedicated unexported key type avoids collisions (staticcheck SA1029).
// NOTE(review): if ctxutil.GetUIDFromCtx returns a pointer, the userID=%d
// verbs will print an address. Confirm its return type.
func (s *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateDatasetRequest) (*knowledgeAPI.CreateDatasetResponse, error) {
	traceID := generateTraceID()
	ctx = context.WithValue(ctx, "trace_id", traceID)
	userID := ctxutil.GetUIDFromCtx(ctx)
	// Log the start of creation.
	logs.CtxInfof(ctx, "CreateKnowledge started, userID=%d, knowledgeName=%s, spaceID=%d, embeddingModel=%s, traceID=%s", userID, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), traceID)
	startTime := time.Now()
	// Runs on every return path, so "completed" is logged for failures too.
	defer func() {
		duration := time.Since(startTime)
		logs.CtxInfof(ctx, "CreateKnowledge completed, duration=%dms, traceID=%s", duration.Milliseconds(), traceID)
	}()
	// Key step: parameter validation.
	logs.CtxInfof(ctx, "Validating knowledge create parameters, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, traceID=%s", req.GetName(), req.GetEmbeddingModel(), req.GetChunkSize(), traceID)
	// Permission check.
	logs.CtxInfof(ctx, "Validating knowledge create permission, userID=%d, spaceID=%d, traceID=%s", userID, req.GetSpaceID(), traceID)
	// Storage-quota check.
	logs.CtxInfof(ctx, "Checking storage quota, userID=%d, traceID=%s", userID, traceID)
	// Vector-space creation.
	logs.CtxInfof(ctx, "Creating vector space, embeddingModel=%s, dimensions=%d, traceID=%s", req.GetEmbeddingModel(), getModelDimensions(req.GetEmbeddingModel()), traceID)
	// Database insert.
	logs.CtxInfof(ctx, "Creating knowledge in database, knowledgeName=%s, traceID=%s", req.GetName(), traceID)
	// ElasticSearch index creation.
	logs.CtxInfof(ctx, "Creating ElasticSearch index, knowledgeID=%d, traceID=%s", knowledgeID, traceID)
	// Event publishing.
	logs.CtxInfof(ctx, "Publishing knowledge create event, knowledgeID=%d, traceID=%s", knowledgeID, traceID)
	return resp, nil
}

// Audit logging for database creation.
func (s *KnowledgeApplicationService) logDatabaseCreateAudit(ctx context.Context, operation string, knowledgeID int64, details map[string]interface{}) {userID := ctx.Value("user_id").(int64)spaceID := ctx.Value("space_id").(int64)traceID := ctx.Value("trace_id").(string)auditLog := map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"user_id": userID,"space_id": spaceID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"knowledge_name": details["knowledge_name"],"embedding_model": details["embedding_model"],"chunk_size": details["chunk_size"],"chunk_overlap": details["chunk_overlap"],"vector_space_id": details["vector_space_id"],"storage_used": details["storage_used"],}logs.CtxInfof(ctx, "Knowledge create audit log: %+v", auditLog)
}// 文档处理日志记录
func (s *KnowledgeApplicationService) logDocumentProcessing(ctx context.Context, knowledgeID int64, documentID int64, operation string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)docLog := map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"document_id": documentID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"file_name": details["file_name"],"file_size": details["file_size"],"chunk_count": details["chunk_count"],"vector_count": details["vector_count"],"processing_time": details["processing_time"],}logs.CtxInfof(ctx, "Document processing log: %+v", docLog)
}// 向量空间操作日志
func (s *KnowledgeApplicationService) logVectorSpaceOperation(ctx context.Context, operation string, vectorSpaceID string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)vectorLog := map[string]interface{}{"operation": operation,"vector_space_id": vectorSpaceID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"embedding_model": details["embedding_model"],"dimensions": details["dimensions"],"vector_count": details["vector_count"],"index_type": details["index_type"],}logs.CtxInfof(ctx, "Vector space operation log: %+v", vectorLog)
}
数据库创建日志内容规范:
- 请求日志:记录用户ID、工作空间ID、知识库名称、嵌入模型、分块策略、TraceID
- 业务日志:记录数据库创建步骤、参数验证结果、权限验证结果、向量空间创建过程
- 性能日志:记录创建接口响应时间、数据库插入时间、向量空间创建时间、文档处理时间
- 错误日志:记录创建错误堆栈、知识库相关上下文信息、向量处理失败原因
- 审计日志:记录知识库的创建操作、创建参数、创建结果、关联的文档和向量信息
- 安全日志:记录创建频率、权限验证、存储配额检查、可疑创建行为
- 文档处理日志:记录文档上传、分块处理、向量生成、索引创建等详细过程
- 向量空间日志:记录向量空间创建、配置、索引构建、查询性能等信息
9.4 数据库创建监控和告警
数据库创建关键指标监控:
- 创建性能:数据库创建响应时间、创建成功率、创建QPS、创建吞吐量
- 资源使用:数据库连接数、向量空间创建延迟、内存使用率、文档处理队列长度
- 业务指标:数据库创建成功率、创建频率分布、不同嵌入模型使用比例、用户创建活跃度
- 安全指标:权限验证通过率、恶意创建尝试次数、创建频率限制触发次数、存储配额检查失败率
- 质量指标:向量空间创建成功率、文档处理成功率、嵌入模型响应率、索引创建成功率
- 存储指标:存储使用量、向量数量、文档数量、索引大小、存储增长率
- 向量处理指标:向量生成延迟、向量维度分布、嵌入模型调用次数、向量相似度计算性能
数据库创建告警策略:
- 创建失败率告警:当数据库创建失败率超过3%时触发告警
- 性能告警:当数据库创建响应时间超过10秒时触发告警
- 资源告警:当数据库连接数超过80%或向量数据库连接异常时触发告警
- 安全告警:当检测到异常创建行为或存储配额滥用时立即触发告警
- 数据一致性告警:当MySQL、ES和向量数据库创建状态不一致时触发告警
- 配额告警:当用户存储使用量超过90%时触发告警
- 向量服务告警:当嵌入模型服务不可用或响应超时时触发告警
- 文档处理告警:当文档处理队列积压超过阈值时触发告警
// DatabaseCreateMetrics aggregates counters, latencies and totals for the
// database-creation flow.
//
// All fields are plain int64 / time.Duration values. No synchronization is
// declared on the type, so concurrent updaters must coordinate externally.
type DatabaseCreateMetrics struct {
	// Overall outcome.
	CreateSuccessCount int64         // successful creations
	CreateFailureCount int64         // failed creations
	CreateLatency      time.Duration // creation latency

	// Failure causes.
	PermissionDeniedCount        int64 // rejected for missing permission
	RateLimitCount               int64 // rejected by the rate limiter
	ParameterValidationFailCount int64 // rejected by parameter validation

	// Vector space and documents.
	VectorSpaceCreateLatency   time.Duration // vector-space creation latency
	VectorSpaceCreateFailCount int64         // vector-space creation failures
	DocumentProcessingLatency  time.Duration // document-processing latency
	EmbeddingGenerationLatency time.Duration // embedding-generation latency
	EmbeddingModelFailCount    int64         // failed embedding-model calls
	StorageQuotaExceededCount  int64         // storage-quota rejections

	// Indexing, events and storage back ends.
	IndexCreateLatency    time.Duration // index-creation latency
	IndexCreateFailCount  int64         // index-creation failures
	EventPublishLatency   time.Duration // event-publish latency
	DatabaseInsertLatency time.Duration // database insert latency
	VectorDatabaseLatency time.Duration // vector-database operation latency

	// Totals.
	TotalStorageUsed   int64 // total storage used
	TotalVectorCount   int64 // total number of vectors
	TotalDocumentCount int64 // total number of documents
}

// Report database-creation metrics.
func (s *DatabaseApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, databaseID int64, req *databaseAPI.CreateDatabaseRequest, err error) {latency := time.Since(startTime)if err != nil {metrics.CreateFailureCount++// 根据错误类型分类统计switch {case errors.Is(err, errno.ErrDatabasePermissionCode):metrics.PermissionDeniedCount++case errors.Is(err, errno.ErrDatabaseCreateRateLimitCode):metrics.RateLimitCount++case errors.Is(err, errno.ErrDatabaseInvalidParamCode):metrics.ParameterValidationFailCount++case errors.Is(err, errno.ErrDatabaseStorageQuotaExceededCode):metrics.StorageQuotaExceededCount++case errors.Is(err, errno.ErrDatabaseSpaceCreateFailedCode):metrics.VectorSpaceCreateFailCount++case errors.Is(err, errno.ErrDatabaseEmbeddingModelFailedCode):metrics.EmbeddingModelFailCount++case errors.Is(err, errno.ErrDatabaseIndexCreateFailedCode):metrics.IndexCreateFailCount++}logs.CtxErrorf(ctx, "Database %s failed, databaseName=%s, spaceID=%d, embeddingModel=%s, error=%v, latency=%dms", operation, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), err, latency.Milliseconds())} else {metrics.CreateSuccessCount++metrics.CreateLatency = latency// 记录数据库类型统计embeddingModel := req.GetEmbeddingModel()chunkSize := req.GetChunkSize()logs.CtxInfof(ctx, "Database %s succeeded, databaseID=%d, databaseName=%s, embeddingModel=%s, chunkSize=%d, latency=%dms", operation, databaseID, req.GetName(), embeddingModel, chunkSize, latency.Milliseconds())}// 上报到监控系统s.metricsReporter.Report(ctx, "database_create", map[string]interface{}{"operation": operation,"database_id": databaseID,"database_name": req.GetName(),"embedding_model": req.GetEmbeddingModel(),"chunk_size": req.GetChunkSize(),"chunk_overlap": req.GetChunkOverlap(),"space_id": req.GetSpaceID(),"success": err == nil,"latency_ms": latency.Milliseconds(),"error_type": getDatabaseCreateErrorType(err),"vector_dimensions": getModelDimensions(req.GetEmbeddingModel()),"storage_used": 
getStorageUsed(ctx, req.GetSpaceID()),})
}// 获取数据库创建错误类型
func getDatabaseCreateErrorType(err error) string {if err == nil {return "none"}// 基于数据库错误码定义switch {case errors.Is(err, errno.ErrDatabasePermissionCode):return "permission_denied"case errors.Is(err, errno.ErrDatabaseNameExistsCode):return "database_exists"case errors.Is(err, errno.ErrDatabaseInvalidParamCode):return "invalid_parameters"case errors.Is(err, errno.ErrDatabaseStorageQuotaExceededCode):return "storage_quota_exceeded"case errors.Is(err, errno.ErrDatabaseSpaceCreateFailedCode):return "vector_space_create_failed"case errors.Is(err, errno.ErrDatabaseEmbeddingModelFailedCode):return "embedding_model_failed"case errors.Is(err, errno.ErrDatabaseIndexCreateFailedCode):return "index_create_failed"case errors.Is(err, errno.ErrDatabaseProcessingFailedCode):return "document_processing_failed"case errors.Is(err, errno.ErrDatabaseVectorDatabaseTimeoutCode):return "vector_database_timeout"case errors.Is(err, errno.ErrDatabaseCreateRateLimitCode):return "rate_limit_exceeded"default:return "system_error"}
}// 数据库创建告警检查
// checkCreateAlerts evaluates the aggregated creation metrics against fixed
// thresholds. It sends one alert through s.alertManager for each rule that is
// breached:
//   - failure rate above 3%, once more than 100 creations are recorded (warning);
//   - CreateLatency above 10s (warning);
//   - more than 10 storage-quota rejections (critical);
//   - more than 5 vector-space creation failures (critical);
//   - more than 20 embedding-model failures (warning).
//
// Rules are independent, so one call may send several alerts. Nothing here
// remembers earlier calls, so a persistent breach alerts on every call.
func (s *DatabaseApplicationService) checkCreateAlerts(ctx context.Context, metrics *DatabaseCreateMetrics) {
	const (
		minSampleSize         = 100
		maxFailureRate        = 0.03
		maxCreateLatency      = 10 * time.Second
		maxQuotaExceeded      = 10
		maxVectorSpaceFailure = 5
		maxEmbeddingFailure   = 20
	)

	if total := metrics.CreateSuccessCount + metrics.CreateFailureCount; total > minSampleSize {
		rate := float64(metrics.CreateFailureCount) / float64(total)
		if rate > maxFailureRate {
			s.alertManager.SendAlert(ctx, &Alert{
				Level:   "warning",
				Type:    "database_create_failure_rate",
				Message: fmt.Sprintf("数据库创建失败率过高: %.2f%%", rate*100),
				Metrics: map[string]interface{}{
					"failure_rate":  rate,
					"total_creates": total,
				},
			})
		}
	}

	if metrics.CreateLatency > maxCreateLatency {
		latencyMs := metrics.CreateLatency.Milliseconds()
		s.alertManager.SendAlert(ctx, &Alert{
			Level:   "warning",
			Type:    "database_create_latency",
			Message: fmt.Sprintf("数据库创建延迟过高: %dms", latencyMs),
			Metrics: map[string]interface{}{
				"latency_ms": latencyMs,
			},
		})
	}

	if n := metrics.StorageQuotaExceededCount; n > maxQuotaExceeded {
		s.alertManager.SendAlert(ctx, &Alert{
			Level:   "critical",
			Type:    "database_storage_quota_exceeded",
			Message: fmt.Sprintf("存储配额超限次数过多: %d", n),
			Metrics: map[string]interface{}{
				"quota_exceeded_count": n,
			},
		})
	}

	if n := metrics.VectorSpaceCreateFailCount; n > maxVectorSpaceFailure {
		s.alertManager.SendAlert(ctx, &Alert{
			Level:   "critical",
			Type:    "database_vector_space_create_failed",
			Message: fmt.Sprintf("向量空间创建失败次数过多: %d", n),
			Metrics: map[string]interface{}{
				"vector_space_fail_count": n,
			},
		})
	}

	if n := metrics.EmbeddingModelFailCount; n > maxEmbeddingFailure {
		s.alertManager.SendAlert(ctx, &Alert{
			Level:   "warning",
			Type:    "database_embedding_model_failed",
			Message: fmt.Sprintf("嵌入模型调用失败次数过多: %d", n),
			Metrics: map[string]interface{}{
				"embedding_fail_count": n,
			},
		})
	}
}