Coze源码分析-资源库-创建知识库-基础设施/存储/安全
6. 基础设施层
基础设施层为知识库创建功能提供底层技术支撑,包括数据存储、缓存、消息队列、文档处理、向量化等核心服务。
6.1 数据存储服务
6.1.1 MySQL数据库
文件位置: backend/infra/rdb/mysql.go
// MySQLConfig MySQL配置
type MySQLConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Username string `yaml:"username"`Password string `yaml:"password"`Database string `yaml:"database"`MaxOpenConns int `yaml:"max_open_conns"`MaxIdleConns int `yaml:"max_idle_conns"`MaxLifetime int `yaml:"max_lifetime"`
}// NewMySQLConnection 创建MySQL连接
func NewMySQLConnection(config *MySQLConfig) (*gorm.DB, error) {dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",config.Username, config.Password, config.Host, config.Port, config.Database)db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: logger.Default.LogMode(logger.Info),NamingStrategy: schema.NamingStrategy{SingularTable: true,},})if err != nil {return nil, fmt.Errorf("连接MySQL失败: %w", err)}sqlDB, err := db.DB()if err != nil {return nil, fmt.Errorf("获取SQL DB失败: %w", err)}// 设置连接池参数sqlDB.SetMaxOpenConns(config.MaxOpenConns)sqlDB.SetMaxIdleConns(config.MaxIdleConns)sqlDB.SetConnMaxLifetime(time.Duration(config.MaxLifetime) * time.Second)return db, nil
}
6.1.2 Redis缓存
文件位置: backend/infra/cache/redis.go
// RedisConfig Redis配置
type RedisConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Password string `yaml:"password"`DB int `yaml:"db"`PoolSize int `yaml:"pool_size"`
}// NewRedisClient 创建Redis客户端
func NewRedisClient(config *RedisConfig) *redis.Client {rdb := redis.NewClient(&redis.Options{Addr: fmt.Sprintf("%s:%d", config.Host, config.Port),Password: config.Password,DB: config.DB,PoolSize: config.PoolSize,})return rdb
}// KnowledgeCacheManager 知识库缓存管理器
type KnowledgeCacheManager struct {redisClient *redis.ClientlocalCache *cache.Cache
}func (c *KnowledgeCacheManager) SetKnowledge(ctx context.Context, knowledge *model.Knowledge) error {// 1. 序列化知识库数据data, err := json.Marshal(knowledge)if err != nil {return fmt.Errorf("序列化知识库数据失败: %w", err)}// 2. 设置Redis缓存cacheKey := fmt.Sprintf("knowledge:%d", knowledge.ID)err = c.redisClient.Set(ctx, cacheKey, data, time.Hour).Err()if err != nil {return fmt.Errorf("设置Redis缓存失败: %w", err)}// 3. 设置本地缓存c.localCache.Set(cacheKey, knowledge, time.Hour)return nil
}
6.2 文档处理服务
6.2.1 文档解析器
文件位置: backend/infra/document/parser.go
// DocumentParser parses a document of a given file type into a ParseResult.
type DocumentParser interface {
	Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error)
	SupportedTypes() []string
}

// ParseResult is the structured output of parsing one document.
type ParseResult struct {
	Content   string            `json:"content"`
	Metadata  map[string]string `json:"metadata"`
	Sections  []*Section        `json:"sections"`
	WordCount int               `json:"word_count"`
}

// Section is one titled section of a parsed document.
type Section struct {
	Title   string `json:"title"`
	Content string `json:"content"`
	Level   int    `json:"level"`
}

// UniversalDocumentParser dispatches to a concrete parser keyed by file type.
type UniversalDocumentParser struct {
	parsers map[string]DocumentParser
}

// Parse looks up the parser registered for fileType and delegates to it.
// It returns an error when no parser is registered for fileType, and wraps
// any error returned by the delegate.
func (p *UniversalDocumentParser) Parse(ctx context.Context, file io.Reader, fileType string) (*ParseResult, error) {
	parser, exists := p.parsers[fileType]
	if !exists {
		return nil, fmt.Errorf("不支持的文件类型: %s", fileType)
	}

	result, err := parser.Parse(ctx, file, fileType)
	if err != nil {
		return nil, fmt.Errorf("解析文档失败: %w", err)
	}
	return result, nil
}
6.2.2 文档分片器
文件位置: backend/infra/document/splitter.go
// DocumentSplitter 文档分片器
type DocumentSplitter struct {maxChunkSize intoverlapSize intseparators []string
}// SplitDocument 分割文档
func (s *DocumentSplitter) SplitDocument(ctx context.Context, content string) ([]*DocumentSlice, error) {var slices []*DocumentSlice// 1. 按段落分割paragraphs := strings.Split(content, "\n\n")var currentSlice strings.Buildervar currentSize intfor _, paragraph := range paragraphs {paragraphSize := len(paragraph)// 2. 检查是否需要创建新分片if currentSize+paragraphSize > s.maxChunkSize && currentSize > 0 {// 创建当前分片slice := &DocumentSlice{Content: currentSlice.String(),WordCount: currentSize,Index: len(slices),}slices = append(slices, slice)// 重置当前分片currentSlice.Reset()currentSize = 0// 添加重叠内容if s.overlapSize > 0 {overlapContent := s.getOverlapContent(slice.Content, s.overlapSize)currentSlice.WriteString(overlapContent)currentSize = len(overlapContent)}}// 3. 添加段落到当前分片if currentSize > 0 {currentSlice.WriteString("\n\n")currentSize += 2}currentSlice.WriteString(paragraph)currentSize += paragraphSize}// 4. 处理最后一个分片if currentSize > 0 {slice := &DocumentSlice{Content: currentSlice.String(),WordCount: currentSize,Index: len(slices),}slices = append(slices, slice)}return slices, nil
}// DocumentSlice 文档分片
type DocumentSlice struct {Content string `json:"content"`WordCount int `json:"word_count"`Index int `json:"index"`Vector []float32 `json:"vector,omitempty"`
}
6.3 向量化服务
6.3.1 向量化引擎
文件位置: backend/infra/embedding/engine.go
// EmbeddingEngine 向量化引擎接口
type EmbeddingEngine interface {Embed(ctx context.Context, texts []string) ([][]float32, error)GetDimension() intGetModel() string
}// OpenAIEmbeddingEngine OpenAI向量化引擎
type OpenAIEmbeddingEngine struct {client *openai.Clientmodel stringdimension int
}func (e *OpenAIEmbeddingEngine) Embed(ctx context.Context, texts []string) ([][]float32, error) {// 1. 构建请求req := openai.EmbeddingRequest{Input: texts,Model: openai.EmbeddingModel(e.model),}// 2. 调用OpenAI APIresp, err := e.client.CreateEmbeddings(ctx, req)if err != nil {return nil, fmt.Errorf("调用OpenAI向量化API失败: %w", err)}// 3. 提取向量数据vectors := make([][]float32, len(resp.Data))for i, embedding := range resp.Data {vectors[i] = make([]float32, len(embedding.Embedding))for j, val := range embedding.Embedding {vectors[i][j] = float32(val)}}return vectors, nil
}
6.4 向量存储服务
6.4.1 Milvus向量数据库
文件位置: backend/infra/searchstore/milvus/client.go
// MilvusClient Milvus客户端
type MilvusClient struct {client milvus.Clientconfig *MilvusConfig
}// MilvusConfig Milvus配置
type MilvusConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Username string `yaml:"username"`Password string `yaml:"password"`Database string `yaml:"database"`
}// CreateCollection 创建集合
func (c *MilvusClient) CreateCollection(ctx context.Context, collectionName string, dimension int) error {// 1. 定义字段fields := []*entity.Field{{Name: "id",DataType: entity.FieldTypeInt64,PrimaryKey: true,AutoID: false,},{Name: "knowledge_id",DataType: entity.FieldTypeInt64,},{Name: "document_id",DataType: entity.FieldTypeInt64,},{Name: "slice_id",DataType: entity.FieldTypeInt64,},{Name: "content",DataType: entity.FieldTypeVarChar,TypeParams: map[string]string{"max_length": "65535",},},{Name: "vector",DataType: entity.FieldTypeFloatVector,TypeParams: map[string]string{"dim": fmt.Sprintf("%d", dimension),},},}// 2. 创建集合schema := &entity.Schema{CollectionName: collectionName,Description: "知识库向量集合",Fields: fields,}err := c.client.CreateCollection(ctx, schema, entity.DefaultShardNumber)if err != nil {return fmt.Errorf("创建Milvus集合失败: %w", err)}// 3. 创建索引indexParam := entity.NewIndexIvfFlat(entity.L2, 1024)err = c.client.CreateIndex(ctx, collectionName, "vector", indexParam, false)if err != nil {return fmt.Errorf("创建向量索引失败: %w", err)}return nil
}// InsertVectors 插入向量
func (c *MilvusClient) InsertVectors(ctx context.Context, collectionName string, data *VectorData) error {// 1. 准备数据ids := make([]int64, len(data.IDs))knowledgeIDs := make([]int64, len(data.IDs))documentIDs := make([]int64, len(data.IDs))sliceIDs := make([]int64, len(data.IDs))contents := make([]string, len(data.IDs))vectors := make([][]float32, len(data.IDs))for i, item := range data.Items {ids[i] = item.IDknowledgeIDs[i] = item.KnowledgeIDdocumentIDs[i] = item.DocumentIDsliceIDs[i] = item.SliceIDcontents[i] = item.Contentvectors[i] = item.Vector}// 2. 构建列数据columns := []entity.Column{entity.NewColumnInt64("id", ids),entity.NewColumnInt64("knowledge_id", knowledgeIDs),entity.NewColumnInt64("document_id", documentIDs),entity.NewColumnInt64("slice_id", sliceIDs),entity.NewColumnVarChar("content", contents),entity.NewColumnFloatVector("vector", dimension, vectors),}// 3. 插入数据_, err := c.client.Insert(ctx, collectionName, "", columns...)if err != nil {return fmt.Errorf("插入向量数据失败: %w", err)}return nil
}
6.5 消息队列服务
6.5.1 事件总线
文件位置: backend/infra/eventbus/eventbus.go
// EventBus 事件总线接口
type EventBus interface {Publish(ctx context.Context, topic string, event interface{}) errorSubscribe(topic string, handler EventHandler) errorStart(ctx context.Context) errorStop() error
}// EventHandler 事件处理器
type EventHandler func(ctx context.Context, event interface{}) error// KafkaEventBus Kafka事件总线
type KafkaEventBus struct {producer sarama.SyncProducerconsumer sarama.ConsumerGroupconfig *KafkaConfighandlers map[string][]EventHandler
}// KafkaConfig Kafka配置
type KafkaConfig struct {Brokers []string `yaml:"brokers"`GroupID string `yaml:"group_id"`Username string `yaml:"username"`Password string `yaml:"password"`
}// Publish 发布事件
func (k *KafkaEventBus) Publish(ctx context.Context, topic string, event interface{}) error {// 1. 序列化事件data, err := json.Marshal(event)if err != nil {return fmt.Errorf("序列化事件失败: %w", err)}// 2. 构建消息msg := &sarama.ProducerMessage{Topic: topic,Value: sarama.StringEncoder(data),Headers: []sarama.RecordHeader{{Key: []byte("event_type"),Value: []byte(reflect.TypeOf(event).Name()),},{Key: []byte("timestamp"),Value: []byte(fmt.Sprintf("%d", time.Now().Unix())),},},}// 3. 发送消息_, _, err = k.producer.SendMessage(msg)if err != nil {return fmt.Errorf("发送Kafka消息失败: %w", err)}return nil
}
6.6 搜索服务
6.6.1 ElasticSearch
文件位置: backend/infra/es/client.go
// ESClient ElasticSearch客户端
type ESClient struct {client *elasticsearch.Clientconfig *ESConfig
}// ESConfig ElasticSearch配置
type ESConfig struct {Addresses []string `yaml:"addresses"`Username string `yaml:"username"`Password string `yaml:"password"`Index string `yaml:"index"`
}// CreateKnowledgeIndex 创建知识库索引
func (c *ESClient) CreateKnowledgeIndex(ctx context.Context, indexName string) error {// 1. 定义索引映射mapping := map[string]interface{}{"mappings": map[string]interface{}{"properties": map[string]interface{}{"knowledge_id": map[string]interface{}{"type": "long",},"name": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"description": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"content": map[string]interface{}{"type": "text","analyzer": "ik_max_word",},"space_id": map[string]interface{}{"type": "long",},"creator_id": map[string]interface{}{"type": "long",},"created_at": map[string]interface{}{"type": "date",},"status": map[string]interface{}{"type": "integer",},},},"settings": map[string]interface{}{"number_of_shards": 1,"number_of_replicas": 1,"analysis": map[string]interface{}{"analyzer": map[string]interface{}{"ik_max_word": map[string]interface{}{"type": "ik_max_word","tokenizer": "ik_max_word",},},},},}// 2. 创建索引mappingJSON, _ := json.Marshal(mapping)req := esapi.IndicesCreateRequest{Index: indexName,Body: strings.NewReader(string(mappingJSON)),}res, err := req.Do(ctx, c.client)if err != nil {return fmt.Errorf("创建ES索引失败: %w", err)}defer res.Body.Close()if res.IsError() {return fmt.Errorf("创建ES索引失败: %s", res.String())}return nil
}
6.7 配置管理
6.7.1 配置中心
文件位置: backend/infra/config/config.go
// Config 应用配置
type Config struct {Server ServerConfig `yaml:"server"`Database DatabaseConfig `yaml:"database"`Redis RedisConfig `yaml:"redis"`Kafka KafkaConfig `yaml:"kafka"`Milvus MilvusConfig `yaml:"milvus"`ES ESConfig `yaml:"elasticsearch"`Embedding EmbeddingConfig `yaml:"embedding"`
}// ServerConfig 服务器配置
type ServerConfig struct {Host string `yaml:"host"`Port int `yaml:"port"`Mode string `yaml:"mode"`
}// DatabaseConfig 数据库配置
type DatabaseConfig struct {MySQL MySQLConfig `yaml:"mysql"`
}// EmbeddingConfig 向量化配置
type EmbeddingConfig struct {Provider string `yaml:"provider"`Model string `yaml:"model"`APIKey string `yaml:"api_key"`Dimension int `yaml:"dimension"`
}// LoadConfig 加载配置
func LoadConfig(configPath string) (*Config, error) {// 1. 读取配置文件data, err := ioutil.ReadFile(configPath)if err != nil {return nil, fmt.Errorf("读取配置文件失败: %w", err)}// 2. 解析YAML配置var config Configerr = yaml.Unmarshal(data, &config)if err != nil {return nil, fmt.Errorf("解析配置文件失败: %w", err)}// 3. 环境变量覆盖err = envconfig.Process("", &config)if err != nil {return nil, fmt.Errorf("处理环境变量失败: %w", err)}return &config, nil
}
6.8 基础设施层总结
基础设施层为知识库创建功能提供了完整的技术支撑:
- 数据存储: MySQL主数据库 + Redis缓存
- 文档处理: 多格式文档解析 + 智能分片
- 向量化: OpenAI/本地模型向量化
- 向量存储: Milvus向量数据库
- 搜索引擎: ElasticSearch全文搜索
- 消息队列: Kafka事件驱动
- 配置管理: 统一配置中心
这些基础设施服务通过依赖注入的方式集成到上层业务逻辑中,确保了系统的可扩展性和可维护性。
7. 数据存储层
7.1 数据库表结构
knowledge_base 表设计
文件位置:helm/charts/opencoze/files/mysql/schema.sql
真实DDL结构:
-- knowledge_base: one row per knowledge base, scoped to a workspace by space_id.
-- status is 1 (active) or 2 (deleted); created_at/updated_at are millisecond
-- timestamps stored as bigint unsigned. Secondary indexes cover creator_id,
-- space_id, status and created_at.
-- NOTE(review): id is bigint unsigned, while knowledge_document.knowledge_base_id
-- and knowledge_chunk.knowledge_base_id are signed bigint; confirm this is intended.
CREATE TABLE IF NOT EXISTS `knowledge_base` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'knowledge base id',`space_id` bigint NOT NULL COMMENT 'space id',`creator_id` bigint NOT NULL COMMENT 'creator user id',`name` varchar(255) NOT NULL COMMENT 'knowledge base name',`description` text NULL COMMENT 'knowledge base description',`icon_uri` varchar(255) NULL COMMENT 'icon uri',`status` int NOT NULL DEFAULT 1 COMMENT 'status: 1-active, 2-deleted',`embedding_model` varchar(100) NOT NULL COMMENT 'embedding model name',`chunk_size` int NOT NULL DEFAULT 1000 COMMENT 'document chunk size',`chunk_overlap` int NOT NULL DEFAULT 200 COMMENT 'chunk overlap size',`document_count` int NOT NULL DEFAULT 0 COMMENT 'total document count',`total_size` bigint NOT NULL DEFAULT 0 COMMENT 'total storage size in bytes',`settings` json NULL COMMENT 'knowledge base settings',`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',PRIMARY KEY (`id`),INDEX `idx_creator_id` (`creator_id`),INDEX `idx_space_id` (`space_id`),INDEX `idx_status` (`status`),INDEX `idx_created_at` (`created_at`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_base';
knowledge_document 表设计
真实DDL结构:
-- knowledge_document: one row per document uploaded to a knowledge base,
-- linked by knowledge_base_id (no FOREIGN KEY constraint is declared).
-- processing_status is 1 (pending), 2 (processing), 3 (completed) or 4 (failed);
-- error_message holds the failure reason. content_hash is indexed but not
-- UNIQUE, so deduplication is not enforced by the schema itself.
CREATE TABLE IF NOT EXISTS `knowledge_document` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'document id',`knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',`name` varchar(255) NOT NULL COMMENT 'document name',`file_type` varchar(50) NOT NULL COMMENT 'file type: pdf, txt, docx, etc',`file_size` bigint NOT NULL COMMENT 'file size in bytes',`file_path` varchar(500) NOT NULL COMMENT 'file storage path',`content_hash` varchar(64) NOT NULL COMMENT 'content hash for deduplication',`chunk_count` int NOT NULL DEFAULT 0 COMMENT 'total chunk count',`processing_status` int NOT NULL DEFAULT 1 COMMENT 'processing status: 1-pending, 2-processing, 3-completed, 4-failed',`error_message` text NULL COMMENT 'error message if processing failed',`metadata` json NULL COMMENT 'document metadata',`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',PRIMARY KEY (`id`),INDEX `idx_knowledge_base_id` (`knowledge_base_id`),INDEX `idx_processing_status` (`processing_status`),INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_document';
knowledge_chunk 表设计
真实DDL结构:
-- knowledge_chunk: one row per chunk of a document, linked to its knowledge
-- base and document by knowledge_base_id and document_id (no FOREIGN KEY
-- constraints are declared). chunk_index is the chunk's position within the
-- document. embedding_vector stores the embedding as JSON and is nullable.
CREATE TABLE IF NOT EXISTS `knowledge_chunk` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'chunk id',`knowledge_base_id` bigint NOT NULL COMMENT 'knowledge base id',`document_id` bigint NOT NULL COMMENT 'document id',`chunk_index` int NOT NULL COMMENT 'chunk index in document',`content` text NOT NULL COMMENT 'chunk content',`content_hash` varchar(64) NOT NULL COMMENT 'content hash',`token_count` int NOT NULL DEFAULT 0 COMMENT 'token count',`embedding_vector` json NULL COMMENT 'embedding vector data',`metadata` json NULL COMMENT 'chunk metadata',`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Create Time in Milliseconds',`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT 'Update Time in Milliseconds',PRIMARY KEY (`id`),INDEX `idx_knowledge_base_id` (`knowledge_base_id`),INDEX `idx_document_id` (`document_id`),INDEX `idx_content_hash` (`content_hash`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT 'knowledge_chunk';
表结构特点:
- 关联设计:knowledge_base、knowledge_document和knowledge_chunk通过 `knowledge_base_id`、`document_id` 字段进行逻辑关联(DDL中并未声明外键约束,关联完整性由应用层保证),支持关联查询
- 空间隔离:通过 `space_id` 实现多租户数据隔离
- JSON存储:`settings`、`metadata` 和 `embedding_vector` 使用JSON类型,支持复杂结构数据
- 状态管理:knowledge_document表包含处理状态字段,支持异步处理流程
- 索引优化:在关键查询字段上建立索引,优化查询性能
- 字符集:使用 utf8mb4 字符集和 `utf8mb4_0900_ai_ci` 排序规则,支持完整的Unicode字符集
- 向量存储:支持嵌入向量的JSON存储,便于语义搜索
- 去重机制:通过content_hash实现内容去重
knowledge_base字段详解:
- `id`:自增主键,唯一标识每个知识库
- `space_id`:工作空间ID,实现租户级别的数据隔离
- `creator_id`:创建者用户ID,用于权限控制和查询优化
- `name`:知识库名称
- `description`:知识库描述信息
- `icon_uri`:知识库图标URI
- `status`:知识库状态(1-活跃,2-已删除)
- `embedding_model`:嵌入模型名称
- `chunk_size`:文档分块大小
- `chunk_overlap`:分块重叠大小
- `document_count`:文档总数
- `total_size`:总存储大小(字节)
- `settings`:知识库设置,JSON格式
- `created_at` / `updated_at`:毫秒级时间戳,记录创建和更新时间
knowledge_document字段详解:
- `id`:自增主键,唯一标识每个文档
- `knowledge_base_id`:关联的知识库ID
- `name`:文档名称
- `file_type`:文件类型(pdf、txt、docx等)
- `file_size`:文件大小(字节)
- `file_path`:文件存储路径
- `content_hash`:内容哈希,用于去重
- `chunk_count`:分块总数
- `processing_status`:处理状态(1-待处理,2-处理中,3-已完成,4-失败)
- `error_message`:处理失败时的错误信息
- `metadata`:文档元数据,JSON格式
- `created_at` / `updated_at`:毫秒级时间戳,记录创建和更新时间
knowledge_chunk字段详解:
- `id`:自增主键,唯一标识每个分块
- `knowledge_base_id`:关联的知识库ID
- `document_id`:关联的文档ID
- `chunk_index`:在文档中的分块索引
- `content`:分块内容
- `content_hash`:内容哈希
- `token_count`:令牌数量
- `embedding_vector`:嵌入向量数据,JSON格式
- `metadata`:分块元数据,JSON格式
- `created_at` / `updated_at`:毫秒级时间戳,记录创建和更新时间
7.2 ElasticSearch 索引架构
coze_resource 统一索引
索引设计理念:
Coze平台采用统一索引策略,将所有资源类型(插件、工作流、知识库、提示词、数据库等)存储在同一个 `coze_resource` 索引中,通过 `res_type` 字段进行类型区分。
知识库在索引中的映射(示例中各字段下的 description 仅用于说明字段含义,并不是Elasticsearch支持的映射参数,实际创建索引时需要去掉,或改用 meta 参数保存):
{"mappings": {"properties": {"res_id": {"type": "long","description": "资源ID,对应knowledge_base.id"},"res_type": {"type": "integer", "description": "资源类型,知识库为4"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword","ignore_above": 256}},"description": "知识库名称,支持全文搜索和精确匹配"},"description": {"type": "text","analyzer": "standard","description": "知识库描述,支持全文搜索"},"owner_id": {"type": "long","description": "所有者ID,对应creator_id"},"space_id": {"type": "long","description": "工作空间ID"},"embedding_model": {"type": "keyword","description": "嵌入模型名称"},"document_count": {"type": "integer","description": "文档数量"},"total_size": {"type": "long","description": "总存储大小"},"status": {"type": "integer","description": "知识库状态"},"create_time": {"type": "long","description": "创建时间戳(毫秒)"},"update_time": {"type": "long","description": "更新时间戳(毫秒)"}}}
}
knowledge_content 内容索引
知识库内容专用索引(同样,各字段下的 description 仅用于说明,实际创建索引时需要去掉):
{"mappings": {"properties": {"chunk_id": {"type": "long","description": "分块ID"},"knowledge_base_id": {"type": "long","description": "知识库ID"},"document_id": {"type": "long","description": "文档ID"},"content": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart","description": "分块内容,支持中文分词"},"embedding_vector": {"type": "dense_vector","dims": 1536,"description": "嵌入向量,用于语义搜索"},"metadata": {"type": "object","description": "分块元数据"},"token_count": {"type": "integer","description": "令牌数量"}}}
}
资源类型常量定义:
// Resource type codes stored in the res_type field of the coze_resource index.
const (
	ResTypePlugin    = 1 // plugin
	ResTypeWorkflow  = 2 // workflow
	ResTypeKnowledge = 4 // knowledge base
	ResTypePrompt    = 6 // prompt
	ResTypeDatabase  = 7 // database
)
7.3 数据同步机制
事件驱动的创建同步架构
创建同步流程:
- 创建操作触发:知识库创建操作触发创建领域事件
- 事件发布:通过事件总线发布 `ResourceDomainEvent` 创建事件
- 事件处理:`resourceHandlerImpl` 监听并处理创建事件
- 索引建立:将创建操作同步到ElasticSearch,建立相关索引
- 向量存储:同时在向量数据库中创建知识库向量空间
创建同步核心代码:
// 资源创建事件处理器
type resourceHandlerImpl struct {esClient es.ClientvectorClient vector.Clientlogger logs.Logger
}// 处理知识库创建领域事件
func (r *resourceHandlerImpl) HandleKnowledgeCreateEvent(ctx context.Context, event *entity.ResourceDomainEvent) error {if event.OpType != entity.Created {return fmt.Errorf("invalid operation type for create handler: %v", event.OpType)}// 记录创建操作日志r.logger.InfoCtx(ctx, "Processing knowledge base create event", "knowledge_base_id", event.ResID,"space_id", event.SpaceID,"operator_id", event.OperatorID)// 创建ES索引if err := r.createResourceIndex(ctx, event); err != nil {return fmt.Errorf("create resource index failed: %w", err)}// 创建向量空间if err := r.createVectorSpace(ctx, event); err != nil {r.logger.WarnCtx(ctx, "Failed to create vector space", "knowledge_base_id", event.ResID, "error", err)// 向量空间创建失败不阻塞主流程}return nil
}// 在索引中创建知识库
func (r *resourceHandlerImpl) createResourceIndex(ctx context.Context, event *entity.ResourceDomainEvent) error {indexName := "coze_resource"docID := conv.Int64ToStr(event.ResID)// 构建索引文档document := map[string]interface{}{"res_id": event.ResID,"res_type": 4, // 知识库类型"name": event.Name,"description": event.Description,"owner_id": event.OperatorID,"space_id": event.SpaceID,"embedding_model": event.EmbeddingModel,"document_count": 0,"total_size": 0,"status": 1,"create_time": event.CreateTime,"update_time": event.UpdateTime,}// 执行索引创建err := r.esClient.Create(ctx, indexName, docID, document)if err != nil {r.logger.ErrorCtx(ctx, "Failed to create knowledge base index", "knowledge_base_id", event.ResID, "error", err)return fmt.Errorf("create knowledge base ES index failed: %w", err)}// 验证创建结果exists, checkErr := r.esClient.Exists(ctx, indexName, docID)if checkErr != nil {r.logger.WarnCtx(ctx, "Failed to verify creation", "knowledge_base_id", event.ResID, "error", checkErr)} else if !exists {r.logger.ErrorCtx(ctx, "Knowledge base index not found after creation", "knowledge_base_id", event.ResID)return fmt.Errorf("knowledge base creation verification failed")}r.logger.InfoCtx(ctx, "Successfully created knowledge base index", "knowledge_base_id", event.ResID)return nil
}// 创建向量空间
func (r *resourceHandlerImpl) createVectorSpace(ctx context.Context, event *entity.ResourceDomainEvent) error {spaceName := fmt.Sprintf("kb_%d", event.ResID)// 创建向量集合err := r.vectorClient.CreateCollection(ctx, &vector.CreateCollectionRequest{CollectionName: spaceName,Dimension: 1536, // OpenAI embedding维度MetricType: "COSINE",Description: fmt.Sprintf("Vector space for knowledge base %d", event.ResID),})if err != nil {return fmt.Errorf("create vector collection failed: %w", err)}r.logger.InfoCtx(ctx, "Successfully created vector space", "knowledge_base_id", event.ResID, "collection_name", spaceName)return nil
}
7.4 知识库创建操作存储层设计原则
知识库创建数据一致性保证
- 创建一致性:采用事件驱动模式,保证MySQL创建和ElasticSearch索引建立的最终一致性
- 创建幂等性:知识库创建操作支持重试,避免重复创建导致的数据冲突
- 创建事务边界:知识库数据库创建操作和创建事件的记录在同一事务中完成,保证原子性(MySQL写入与消息队列发布无法直接共用一个数据库事务,通常需要借助本地事件表/事务性发件箱等方式实现)
- 创建验证:知识库创建完成后验证数据确实被正确存储,确保创建操作的完整性
- 向量空间创建:确保知识库创建时同步创建向量存储空间,维护数据完整性
知识库创建性能优化策略
- 创建索引优化:基于知识库主键ID的创建操作,具有最佳性能
- 批量创建:支持批量创建知识库操作,减少数据库和ES的操作次数
- 异步创建处理:知识库创建事件处理采用异步模式,不阻塞创建主流程
- 创建缓存预热:创建后及时预热知识库相关缓存,提高后续访问性能
- 分批向量创建:向量空间采用分批创建策略,避免大量向量创建时的性能问题
知识库创建操作扩展性考虑
- 分片创建:支持按 `space_id` 进行分片创建,提高大规模知识库创建的效率
- 创建队列:使用消息队列处理知识库创建事件,支持高并发创建场景
- 创建监控:独立的知识库创建操作监控,及时发现创建异常
- 多存储协调:协调MySQL、ElasticSearch、向量数据库等多存储的创建操作
知识库创建安全保障
- 权限验证:严格的知识库创建权限验证,确保只有授权用户可以创建
- 创建审计:完整的知识库创建操作审计日志,支持创建行为追踪
- 创建限制:实施知识库创建频率限制,防止恶意批量创建
- 数据备份:创建操作完成后及时备份知识库数据,支持数据恢复
- 向量验证:创建知识库时验证向量空间的创建完整性
- 重复检查:创建前检查知识库名称和配置是否重复,避免冲突
7.5 知识库创建操作监控和运维
知识库创建操作监控
// 知识库创建操作监控指标
type KnowledgeCreateMetrics struct {KnowledgeCreateSuccessCount int64 // 知识库创建成功次数KnowledgeCreateFailureCount int64 // 知识库创建失败次数KnowledgeCreateLatency time.Duration // 知识库创建操作延迟LastKnowledgeCreateTime time.Time // 最后知识库创建时间KnowledgeIndexCreateCount int64 // 知识库索引创建次数KnowledgeCreateEventCount int64 // 知识库创建事件处理次数VectorSpaceCreateCount int64 // 向量空间创建次数KnowledgeCreateQueueSize int64 // 知识库创建队列大小KnowledgeCreateRateLimit int64 // 知识库创建频率限制触发次数KnowledgeDuplicateCount int64 // 知识库重复创建检测次数DocumentProcessingCount int64 // 文档处理次数EmbeddingGenerationCount int64 // 向量生成次数
}// 知识库创建监控指标收集
func (r *resourceHandlerImpl) collectKnowledgeCreateMetrics(ctx context.Context, startTime time.Time, knowledgeID int64, err error) {latency := time.Since(startTime)if err != nil {metrics.KnowledgeCreateFailureCount++log.ErrorCtx(ctx, "knowledge base create failed", "knowledge_id", knowledgeID, "error", err, "latency", latency)} else {metrics.KnowledgeCreateSuccessCount++metrics.KnowledgeCreateLatency = latencymetrics.LastKnowledgeCreateTime = time.Now()log.InfoCtx(ctx, "knowledge base create succeeded", "knowledge_id", knowledgeID, "latency", latency)}
}// 知识库创建操作健康检查
func (r *resourceHandlerImpl) knowledgeCreateHealthCheck(ctx context.Context) error {// 检查数据库连接if err := r.db.Ping(); err != nil {return fmt.Errorf("database connection failed: %w", err)}// 检查ES连接if _, err := r.esClient.Ping(ctx); err != nil {return fmt.Errorf("elasticsearch connection failed: %w", err)}// 检查向量数据库连接if err := r.vectorClient.Ping(ctx); err != nil {return fmt.Errorf("vector database connection failed: %w", err)}// 检查知识库创建队列状态if queueSize := r.getKnowledgeCreateQueueSize(); queueSize > 1000 {return fmt.Errorf("knowledge create queue size too large: %d", queueSize)}// 检查向量空间创建状态if vectorErrors := r.getVectorSpaceCreateErrors(); len(vectorErrors) > 10 {return fmt.Errorf("too many vector space create errors: %d", len(vectorErrors))}// 检查创建频率限制状态if rateLimitHits := r.getCreateRateLimitHits(); rateLimitHits > 100 {return fmt.Errorf("too many rate limit hits: %d", rateLimitHits)}// 检查文档处理队列状态if docQueueSize := r.getDocumentProcessingQueueSize(); docQueueSize > 5000 {return fmt.Errorf("document processing queue size too large: %d", docQueueSize)}return nil
}
知识库创建数据质量保证
- 创建一致性检查:定期验证MySQL、ElasticSearch和向量数据库中知识库创建数据的一致性
- 创建完整性验证:确保知识库创建操作完全建立了相关数据、索引和向量空间
- 向量空间验证:验证知识库创建时向量空间的创建完整性和配置正确性
- 创建异常恢复:提供知识库创建失败的重试和修复机制
- 创建性能监控:监控知识库创建操作性能,及时发现和解决性能问题
- 创建审计追踪:完整记录知识库创建操作的执行过程和结果
- 多存储一致性:确保MySQL、ElasticSearch、向量数据库等多存储创建的一致性
- 重复检测:检测和防止知识库重复创建,维护数据唯一性
- 创建回滚机制:创建失败时的数据回滚和清理机制
- 文档处理监控:监控知识库创建过程中的文档处理和向量化进度
- 存储配额检查:创建前检查存储配额,确保有足够空间存储知识库数据
- 嵌入模型验证:验证知识库创建时指定的嵌入模型配置正确性
8. 知识库创建安全和权限验证机制
8.1 知识库创建身份认证
JWT Token验证:
- 创建知识库的所有API请求都需要携带有效的JWT Token
- Token包含用户ID、工作空间权限等关键信息
- 通过中间件统一验证Token的有效性和完整性
// 知识库创建身份验证中间件
func KnowledgeCreateAuthMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {token := ctx.GetHeader("Authorization")if token == nil {ctx.JSON(401, gin.H{"error": "创建知识库需要登录认证"})ctx.Abort()return}userInfo, err := validateJWTToken(string(token))if err != nil {ctx.JSON(401, gin.H{"error": "Token无效,无法创建知识库"})ctx.Abort()return}// 验证用户是否有创建知识库的权限if !userInfo.HasKnowledgeCreatePermission {ctx.JSON(403, gin.H{"error": "用户无创建知识库权限"})ctx.Abort()return}ctx.Set("user_id", userInfo.UserID)ctx.Set("space_id", userInfo.SpaceID)ctx.Set("creator_id", userInfo.UserID)ctx.Next()}
}
8.2 知识库创建工作空间权限控制
空间隔离机制:
- 每个用户只能在其所属工作空间中创建知识库
- 通过 `space_id` 字段实现知识库创建权限隔离
- 在知识库创建操作中强制验证空间权限
// validateKnowledgeCreateSpacePermission checks that the request may create a
// knowledge base in req.SpaceID.
//
// It reads the caller's workspace from ctx under the key "space_id" and fails
// when that value is absent or not an int64, when it differs from
// req.SpaceID, when the workspace configuration disallows creation, or when
// the workspace has reached its knowledge-base count or storage quota.
func (s *KnowledgeApplicationService) validateKnowledgeCreateSpacePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {
	// A checked assertion: a missing or wrongly typed value must not panic.
	userSpaceID, ok := ctx.Value("space_id").(int64)
	if !ok {
		return errors.New("无法获取用户工作空间信息")
	}

	// The requested workspace must be the caller's own.
	if req.SpaceID != userSpaceID {
		return errors.New("无权限在该工作空间创建知识库")
	}

	// The workspace must allow knowledge-base creation.
	spaceConfig, err := s.spaceService.GetSpaceConfig(ctx, userSpaceID)
	if err != nil {
		return fmt.Errorf("获取工作空间配置失败: %w", err)
	}
	if !spaceConfig.AllowKnowledgeCreation {
		return errors.New("该工作空间不允许创建知识库")
	}

	// Knowledge-base count limit.
	knowledgeCount, err := s.getSpaceKnowledgeCount(ctx, userSpaceID)
	if err != nil {
		return fmt.Errorf("获取工作空间知识库数量失败: %w", err)
	}
	if knowledgeCount >= spaceConfig.MaxKnowledgeCount {
		return fmt.Errorf("工作空间知识库数量已达上限: %d", spaceConfig.MaxKnowledgeCount)
	}

	// Storage quota.
	storageUsage, err := s.getSpaceStorageUsage(ctx, userSpaceID)
	if err != nil {
		return fmt.Errorf("获取工作空间存储使用量失败: %w", err)
	}
	if storageUsage >= spaceConfig.MaxStorageQuota {
		return fmt.Errorf("工作空间存储配额已满: %d GB", spaceConfig.MaxStorageQuota/1024/1024/1024)
	}

	return nil
}
8.3 知识库创建资源级权限验证
知识库创建用户权限验证:
- 严格验证用户是否具有知识库创建权限
- 验证用户在指定工作空间的操作权限
- 通过存储配额和向量空间权限进行资源级控制
// 知识库创建权限验证
func (s *KnowledgeApplicationService) validateKnowledgeCreatePermission(ctx context.Context, req *service.CreateKnowledgeRequest) error {userID := ctx.Value("user_id").(int64)// 验证用户是否具有知识库创建权限hasPermission, err := s.userService.HasKnowledgeCreatePermission(ctx, userID)if err != nil {return fmt.Errorf("验证知识库创建权限失败: %w", err)}if !hasPermission {return errorx.New(errno.ErrKnowledgePermissionCode, errorx.KV(errno.KnowledgeMsgKey, "用户无知识库创建权限"),errorx.KV("user_id", userID))}// 验证工作空间权限spacePermission, err := s.spaceService.CheckUserSpacePermission(ctx, userID, req.SpaceID)if err != nil {return fmt.Errorf("验证工作空间权限失败: %w", err)}if !spacePermission.CanCreateKnowledge {return errorx.New(errno.ErrKnowledgeSpacePermissionCode, errorx.KV(errno.KnowledgeMsgKey, "用户在该工作空间无知识库创建权限"),errorx.KV("user_id", userID),errorx.KV("space_id", req.SpaceID))}// 检查用户创建知识库频率限制createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))if err != nil {return fmt.Errorf("检查知识库创建频率失败: %w", err)}if createCount >= 20 { // 24小时内最多创建20个知识库return errorx.New(errno.ErrKnowledgeCreateRateLimitCode, errorx.KV("user_id", userID),errorx.KV("create_count", createCount))}// 检查知识库名称是否重复exists, err := s.checkKnowledgeNameExists(ctx, req.SpaceID, req.Name)if err != nil {return fmt.Errorf("检查知识库名称重复失败: %w", err)}if exists {return errorx.New(errno.ErrKnowledgeNameExistsCode, errorx.KV("knowledge_name", req.Name),errorx.KV("space_id", req.SpaceID))}// 检查存储配额storageQuota, err := s.checkStorageQuota(ctx, userID, req.SpaceID)if err != nil {return fmt.Errorf("检查存储配额失败: %w", err)}if !storageQuota.CanCreate {return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode, errorx.KV("user_id", userID),errorx.KV("used_storage", storageQuota.UsedStorage),errorx.KV("max_storage", storageQuota.MaxStorage))}// 检查向量空间权限vectorPermission, err := s.checkVectorSpacePermission(ctx, userID, req.EmbeddingModel)if err != nil {return fmt.Errorf("检查向量空间权限失败: %w", err)}if !vectorPermission.CanCreateSpace 
{return errorx.New(errno.ErrKnowledgeVectorSpacePermissionCode, errorx.KV("user_id", userID),errorx.KV("embedding_model", req.EmbeddingModel))}return nil
}// 检查知识库名称是否存在
func (s *KnowledgeApplicationService) checkKnowledgeNameExists(ctx context.Context, spaceID int64, name string) (bool, error) {// 检查同一工作空间下是否存在同名知识库knowledges, err := s.DomainSVC.ListKnowledges(ctx, &service.ListKnowledgesRequest{SpaceID: spaceID,PageInfo: entity.PageInfo{PageSize: 1},})if err != nil {return false, err}for _, knowledge := range knowledges.Knowledges {if knowledge.Name == name {return true, nil}}return false, nil
}// 检查存储配额
func (s *KnowledgeApplicationService) checkStorageQuota(ctx context.Context, userID, spaceID int64) (*StorageQuotaInfo, error) {// 获取用户存储配额信息quota, err := s.storageService.GetUserStorageQuota(ctx, userID)if err != nil {return nil, err}// 获取当前使用量usage, err := s.storageService.GetUserStorageUsage(ctx, userID)if err != nil {return nil, err}return &StorageQuotaInfo{UsedStorage: usage,MaxStorage: quota,CanCreate: usage < quota*0.95, // 使用率不超过95%}, nil
}// 检查向量空间权限
func (s *KnowledgeApplicationService) checkVectorSpacePermission(ctx context.Context, userID int64, embeddingModel string) (*VectorSpacePermission, error) {// 检查用户是否有权限使用指定的嵌入模型modelPermission, err := s.embeddingService.CheckModelPermission(ctx, userID, embeddingModel)if err != nil {return nil, err}// 检查向量空间创建配额spaceCount, err := s.vectorService.GetUserVectorSpaceCount(ctx, userID)if err != nil {return nil, err}maxSpaces := s.getUserMaxVectorSpaces(userID)return &VectorSpacePermission{CanCreateSpace: modelPermission && spaceCount < maxSpaces,CurrentSpaces: spaceCount,MaxSpaces: maxSpaces,}, nil
}
8.4 知识库创建API访问控制
创建请求频率限制:
- 实现基于用户的知识库创建频率限制
- 防止恶意批量创建知识库
- 支持不同用户等级的差异化创建限流策略
- 基于文档处理能力的动态限流
创建操作安全验证:
- 严格验证创建请求的合法性
- 防止恶意创建和资源滥用攻击
- 使用多重安全检查机制
- 文档内容安全扫描和验证
- 向量空间创建安全验证
// 知识库创建参数验证
func validateKnowledgeCreateRequest(req *service.CreateKnowledgeRequest) error {if req.SpaceID <= 0 {return errors.New("无效的工作空间ID")}if req.CreatorID <= 0 {return errors.New("无效的创建者ID")}// 验证知识库名称if req.Name == "" {return errors.New("知识库名称不能为空")}if len(req.Name) > 100 {return errors.New("知识库名称长度不能超过100字符")}// 验证知识库描述if req.Description != "" && len(req.Description) > 1000 {return errors.New("知识库描述长度不能超过1000字符")}// 验证嵌入模型if req.EmbeddingModel == "" {return errors.New("嵌入模型不能为空")}if !isValidEmbeddingModel(req.EmbeddingModel) {return errors.New("不支持的嵌入模型")}// 验证分块策略if req.ChunkStrategy != nil {if req.ChunkStrategy.ChunkSize <= 0 || req.ChunkStrategy.ChunkSize > 8192 {return errors.New("分块大小必须在1-8192之间")}if req.ChunkStrategy.ChunkOverlap < 0 || req.ChunkStrategy.ChunkOverlap >= req.ChunkStrategy.ChunkSize {return errors.New("分块重叠大小必须小于分块大小")}}// 验证图标URIif req.IconURI != "" && !isValidIconURI(req.IconURI) {return errors.New("无效的图标URI格式")}return nil
}// 知识库创建操作安全检查
// validateKnowledgeCreateSafety runs the runtime safety checks that must pass
// before a knowledge base is created.
//
// The checks run in order and the first failure is returned: create rate limit
// (at most 20 per 24 hours), embedding model availability, user storage quota,
// vector database health, and document processor health. Errors from the
// underlying checks are wrapped with %w.
func (s *KnowledgeApplicationService) validateKnowledgeCreateSafety(ctx context.Context, req *service.CreateKnowledgeRequest) error {
	// A bare type assertion panics when "user_id" is absent or not an int64;
	// use the comma-ok form and reject the request with an error instead.
	userID, ok := ctx.Value("user_id").(int64)
	if !ok {
		return errors.New("上下文中缺少有效的用户ID")
	}

	// Enforce the create rate limit over the trailing 24 hours.
	createCount, err := s.getUserKnowledgeCreateCount(ctx, userID, time.Now().Add(-24*time.Hour))
	if err != nil {
		return fmt.Errorf("检查知识库创建频率失败: %w", err)
	}
	if createCount >= 20 { // at most 20 knowledge bases within 24 hours
		return errorx.New(errno.ErrKnowledgeCreateRateLimitCode,
			errorx.KV("user_id", userID),
			errorx.KV("create_count", createCount))
	}

	// The embedding model must currently be available.
	modelAvailable, err := s.checkEmbeddingModelAvailable(ctx, req.EmbeddingModel)
	if err != nil {
		return fmt.Errorf("检查嵌入模型可用性失败: %w", err)
	}
	if !modelAvailable {
		return errors.New("嵌入模型当前不可用")
	}

	// The user must still have storage quota left.
	storageQuota, err := s.checkUserStorageQuota(ctx, userID)
	if err != nil {
		return fmt.Errorf("检查用户存储配额失败: %w", err)
	}
	if !storageQuota.CanCreateKnowledge {
		return errorx.New(errno.ErrKnowledgeStorageQuotaExceededCode,
			errorx.KV("user_id", userID),
			errorx.KV("used_storage", storageQuota.UsedStorage),
			errorx.KV("max_storage", storageQuota.MaxStorage))
	}

	// The vector database must be reachable.
	vectorDBHealthy, err := s.checkVectorDatabaseHealth(ctx)
	if err != nil {
		return fmt.Errorf("检查向量数据库健康状态失败: %w", err)
	}
	if !vectorDBHealthy {
		return errors.New("向量数据库当前不可用,无法创建知识库")
	}

	// The document processing service must be healthy.
	docProcessorHealthy, err := s.checkDocumentProcessorHealth(ctx)
	if err != nil {
		return fmt.Errorf("检查文档处理服务状态失败: %w", err)
	}
	if !docProcessorHealthy {
		return errors.New("文档处理服务当前不可用,无法创建知识库")
	}
	return nil
} // Check embedding model availability
// checkEmbeddingModelAvailable reports whether modelName can be used right
// now. Names outside the fixed supported list yield (false, nil) without
// contacting the embedding service; supported names are delegated to
// embeddingService.IsModelAvailable.
func (s *KnowledgeApplicationService) checkEmbeddingModelAvailable(ctx context.Context, modelName string) (bool, error) {
	switch modelName {
	case "text-embedding-ada-002",
		"text-embedding-3-small",
		"text-embedding-3-large",
		"bge-large-zh-v1.5",
		"bge-base-zh-v1.5":
		// Supported model: ask the embedding service whether it is up.
		return s.embeddingService.IsModelAvailable(ctx, modelName)
	default:
		return false, nil
	}
} // Check vector database health
// checkVectorDatabaseHealth sends a health check with a 5-second timeout
// setting to the vector service and returns its verdict. A failed health check
// call is logged as a warning and reported as (false, nil), so the returned
// error is always nil.
func (s *KnowledgeApplicationService) checkVectorDatabaseHealth(ctx context.Context) (bool, error) {
	ok, err := s.vectorService.HealthCheck(ctx, &VectorDBHealthCheck{
		Timeout: 5 * time.Second,
	})
	if err == nil {
		return ok, nil
	}

	// The failure is downgraded to "unhealthy" rather than propagated.
	logs.CtxWarnf(ctx, "Vector database health check failed: %v", err)
	return false, nil
} // Check document processing service health
// checkDocumentProcessorHealth reports whether the document processing
// service looks healthy, based on its queue status. It returns false when the
// queue status cannot be fetched or when more than 10000 jobs are pending;
// both cases are logged as warnings. The returned error is always nil.
func (s *KnowledgeApplicationService) checkDocumentProcessorHealth(ctx context.Context) (bool, error) {
	status, err := s.documentService.GetQueueStatus(ctx)

	switch {
	case err != nil:
		logs.CtxWarnf(ctx, "Document processor queue status check failed: %v", err)
		return false, nil
	case status.PendingJobs > 10000:
		// A large backlog is treated as an unhealthy service.
		logs.CtxWarnf(ctx, "Document processor queue overloaded: %d pending jobs", status.PendingJobs)
		return false, nil
	}
	return true, nil
} // Get user storage usage
// getUserStorageUsage sums len(Manifest) and len(OpenapiDoc) over every
// plugin returned by ListUserPlugins for userID, skipping nil fields. A
// listing error is wrapped and returned with a zero total.
func (s *PluginApplicationService) getUserStorageUsage(ctx context.Context, userID int64) (int64, error) {
	userPlugins, err := s.DomainSVC.ListUserPlugins(ctx, userID)
	if err != nil {
		return 0, fmt.Errorf("获取用户插件列表失败: %w", err)
	}

	var used int64
	for _, p := range userPlugins {
		// Only the manifest and the OpenAPI document count towards usage.
		if p.Manifest != nil {
			used += int64(len(p.Manifest))
		}
		if p.OpenapiDoc != nil {
			used += int64(len(p.OpenapiDoc))
		}
	}
	return used, nil
} // Get user maximum storage quota
// getMaxStorageQuota returns the storage quota in bytes. The current
// implementation is a simplification: it ignores userID and always returns
// 100 MiB.
func (s *PluginApplicationService) getMaxStorageQuota(userID int64) int64 {
	// 100 MiB, i.e. 100 * 1024 * 1024 bytes.
	const fixedQuotaBytes int64 = 100 << 20
	return fixedQuotaBytes
} // URL format validation
// isValidURL reports whether urlStr parses with url.Parse and has both a
// non-empty scheme and a non-empty host.
func isValidURL(urlStr string) bool {
	parsed, err := url.Parse(urlStr)
	if err != nil {
		return false
	}
	if parsed.Scheme == "" {
		return false
	}
	return parsed.Host != ""
} // Plugin type validation
func isValidPluginType(pluginType common.PluginType) bool {validTypes := []common.PluginType{common.PluginTypeHTTP,common.PluginTypeLocal,}for _, validType := range validTypes {if pluginType == validType {return true}}return false
}
9. 知识库创建错误处理和日志记录
9.1 知识库创建分层错误处理机制
知识库创建错误分类体系:
// KnowledgeCreateErrorType classifies errors raised while creating a
// knowledge base. Values are grouped by category: business errors start at
// 1000, system errors at 2000 and network errors at 3000.
//
// Each category lives in its own const group on purpose: iota keeps counting
// across a single group, so declaring all three categories in one group would
// make the later categories start at 2000+k and 3000+k instead of at their
// intended bases.
type KnowledgeCreateErrorType int

// Business errors (1000-range).
const (
	ErrKnowledgeCreateBusiness KnowledgeCreateErrorType = iota + 1000
	ErrKnowledgeNameExists
	ErrKnowledgePermissionDenied
	ErrKnowledgeCreateRateLimit
	ErrKnowledgeInvalidParameters
	ErrKnowledgeEmbeddingModelNotSupported
	ErrKnowledgeStorageQuotaExceeded
	ErrKnowledgeDocumentProcessingFailed
	ErrKnowledgeInvalidFileType
	ErrKnowledgeFileSizeExceeded
	ErrKnowledgeInvalidChunkSize
	ErrKnowledgeInvalidIconURI
	ErrKnowledgeInvalidSpaceID
	ErrKnowledgeDuplicateName
	ErrKnowledgeVectorSpaceCreateFailed
)

// System errors (2000-range).
const (
	ErrKnowledgeCreateSystem KnowledgeCreateErrorType = iota + 2000
	ErrKnowledgeDatabaseConnection
	ErrKnowledgeElasticSearchTimeout
	ErrKnowledgeServiceUnavailable
	ErrKnowledgeCreateEventPublishFailed
	ErrKnowledgeIndexCreateFailed
	ErrKnowledgeTransactionRollbackFailed
	ErrKnowledgeVectorStoreTimeout
	ErrKnowledgeIDGenerationFailed
	ErrKnowledgeEmbeddingServiceFailed
	ErrKnowledgeContentIndexFailed
)

// Network errors (3000-range).
const (
	ErrKnowledgeCreateNetwork KnowledgeCreateErrorType = iota + 3000
	ErrKnowledgeCreateRequestTimeout
	ErrKnowledgeCreateConnectionRefused
	ErrKnowledgeCreateServiceDown
	ErrKnowledgeCreateESConnectionFailed
	ErrKnowledgeVectorDBConnectionFailed
	ErrKnowledgeEmbeddingAPITimeout
)
知识库创建错误处理流程:
- 捕获阶段:在知识库创建各层级捕获具体错误
- 包装阶段:添加知识库创建操作相关上下文信息和错误码
- 记录阶段:根据错误级别记录知识库创建操作日志
- 响应阶段:返回用户友好的知识库创建错误信息
- 回滚阶段:知识库创建失败时进行必要的数据回滚操作
- 向量处理:处理向量空间创建失败的错误
- 重试机制:对于可重试的创建错误提供重试建议
- 用户指导:为常见创建错误提供解决方案指导
9.2 知识库创建统一错误响应格式
// KnowledgeCreateErrorResponse is the JSON body returned to the client when
// creating a knowledge base fails. Fields tagged omitempty are left out of the
// encoded output when they hold their zero value.
type KnowledgeCreateErrorResponse struct {
	// Code is the error code reported to the client.
	Code int `json:"code"`
	// Message is the short, user-facing error message.
	Message string `json:"message"`
	// Details optionally carries additional error detail.
	Details string `json:"details,omitempty"`
	// TraceID identifies the request for log correlation.
	TraceID string `json:"trace_id"`
	// KnowledgeID is the affected knowledge base, when one is known.
	KnowledgeID int64 `json:"knowledge_id,omitempty"`
	// Operation names the operation that failed.
	Operation string `json:"operation"`
	// CanRetry tells the client whether retrying may succeed.
	CanRetry bool `json:"can_retry"`
	// DocumentsProcessed and DocumentsFailed are document counters.
	DocumentsProcessed int `json:"documents_processed,omitempty"`
	DocumentsFailed    int `json:"documents_failed,omitempty"`
	// ValidationErrors lists request-level validation messages.
	ValidationErrors []string `json:"validation_errors,omitempty"`
	// SuggestedFix is guidance shown to the user.
	SuggestedFix string `json:"suggested_fix,omitempty"`
	// FieldErrors holds validation messages keyed by string.
	FieldErrors map[string]string `json:"field_errors,omitempty"`
	// VectorSpaceStatus and EmbeddingModel carry vector-related context.
	VectorSpaceStatus string `json:"vector_space_status,omitempty"`
	EmbeddingModel    string `json:"embedding_model,omitempty"`
} // Knowledge base create error handling middleware
func KnowledgeCreateErrorHandlerMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {defer func() {if err := recover(); err != nil {traceID := ctx.GetString("trace_id")userID := ctx.GetInt64("user_id")spaceID := ctx.GetInt64("space_id")logs.CtxErrorf(c, "Knowledge base creation panic recovered: %v, userID=%d, spaceID=%d, traceID=%s", err, userID, spaceID, traceID)ctx.JSON(500, KnowledgeCreateErrorResponse{Code: 5000,Message: "知识库创建服务器内部错误",TraceID: traceID,Operation: "create_knowledge",CanRetry: true,SuggestedFix: "请稍后重试,如果问题持续存在请联系技术支持",})}}()ctx.Next()}
}// 插件创建业务错误处理
func handlePluginCreateBusinessError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginInvalidParamCode):response.Code = 400response.Message = "插件参数无效"response.CanRetry = falseresponse.SuggestedFix = "请检查插件名称、描述、服务器URL等参数是否正确"case errors.Is(err, errno.ErrPluginPermissionCode):response.Code = 403response.Message = "无权限创建插件"response.CanRetry = falseresponse.SuggestedFix = "请确保已登录且具有插件创建权限"case errors.Is(err, errno.ErrPluginInvalidManifest):response.Code = 400response.Message = "插件清单格式无效"response.CanRetry = falseresponse.SuggestedFix = "请检查插件清单文件格式是否符合规范"case errors.Is(err, errno.ErrPluginInvalidOpenapi3Doc):response.Code = 400response.Message = "OpenAPI文档格式无效"response.CanRetry = falseresponse.SuggestedFix = "请检查OpenAPI文档格式是否符合OpenAPI 3.0规范"case errors.Is(err, errno.ErrPluginIDExist):response.Code = 409response.Message = "插件ID已存在"response.CanRetry = falseresponse.SuggestedFix = "请使用不同的插件名称或检查是否已存在同名插件"case errors.Is(err, errno.ErrPluginCreateRateLimit):response.Code = 429response.Message = "创建操作过于频繁,请稍后再试"response.CanRetry = trueresponse.SuggestedFix = "请等待一段时间后重试"case errors.Is(err, errno.ErrPluginStorageQuotaExceeded):response.Code = 413response.Message = "存储配额已满"response.CanRetry = falseresponse.SuggestedFix = "请清理不需要的插件或升级存储配额"case errors.Is(err, errno.ErrPluginServerURLNotAccessible):response.Code = 400response.Message = "插件服务器URL不可访问"response.CanRetry = trueresponse.SuggestedFix = "请检查服务器URL是否正确且可访问"default:response.Code = 500response.Message = "插件创建失败"response.CanRetry = trueresponse.SuggestedFix = "请稍后重试,如果问题持续存在请联系技术支持"}ctx.JSON(response.Code, response)
}// 插件创建系统错误处理
func handlePluginCreateSystemError(ctx *app.RequestContext, err error) {traceID := ctx.GetString("trace_id")var response PluginCreateErrorResponseresponse.TraceID = traceIDresponse.Operation = "create_plugin"switch {case errors.Is(err, errno.ErrPluginDatabaseConnection):response.Code = 500response.Message = "插件数据库连接失败"response.CanRetry = trueresponse.SuggestedFix = "数据库连接异常,请稍后重试"case errors.Is(err, errno.ErrPluginElasticSearchTimeout):response.Code = 500response.Message = "插件索引操作超时"response.CanRetry = trueresponse.SuggestedFix = "搜索服务响应超时,请稍后重试"case errors.Is(err, errno.ErrPluginServiceUnavailable):response.Code = 503response.Message = "插件创建服务暂时不可用"response.CanRetry = trueresponse.SuggestedFix = "服务正在维护中,请稍后重试"case errors.Is(err, errno.ErrPluginCreateEventPublishFailed):response.Code = 500response.Message = "插件创建事件发布失败"response.CanRetry = trueresponse.SuggestedFix = "事件发布异常,插件已创建但可能影响搜索,请稍后重试"case errors.Is(err, errno.ErrPluginIndexCreateFailed):response.Code = 500response.Message = "插件索引创建失败"response.CanRetry = trueresponse.SuggestedFix = "搜索索引创建失败,插件已创建但可能无法搜索到"case errors.Is(err, errno.ErrPluginTransactionRollbackFailed):response.Code = 500response.Message = "插件创建事务回滚失败"response.CanRetry = falseresponse.SuggestedFix = "数据一致性异常,请联系技术支持"case errors.Is(err, errno.ErrPluginIDGenerationFailed):response.Code = 500response.Message = "插件ID生成失败"response.CanRetry = trueresponse.SuggestedFix = "ID生成服务异常,请稍后重试"default:response.Code = 5000response.Message = "插件创建失败"response.Details = "服务器内部错误,请稍后重试"response.CanRetry = trueresponse.SuggestedFix = "系统内部错误,请稍后重试或联系技术支持"}ctx.JSON(response.Code, response)
}
9.3 知识库创建日志记录策略
知识库创建日志级别定义:
- DEBUG:知识库创建详细调试信息,包括参数值、向量处理过程、文档分块详情
- INFO:知识库创建关键业务流程信息,如创建开始、参数验证、数据插入、向量空间创建
- WARN:知识库创建潜在问题警告,如存储配额警告、文档处理警告、向量生成警告
- ERROR:知识库创建错误信息,包括创建失败、权限错误、向量空间创建失败
- FATAL:知识库创建严重错误,可能导致数据不一致或向量空间损坏
知识库创建结构化日志格式:
// 知识库创建日志记录示例
func (s *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateDatasetRequest) (*knowledgeAPI.CreateDatasetResponse, error) {traceID := generateTraceID()ctx = context.WithValue(ctx, "trace_id", traceID)userID := ctxutil.GetUIDFromCtx(ctx)// 记录知识库创建开始logs.CtxInfof(ctx, "CreateKnowledge started, userID=%d, knowledgeName=%s, spaceID=%d, embeddingModel=%s, traceID=%s", userID, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), traceID)startTime := time.Now()defer func() {duration := time.Since(startTime)logs.CtxInfof(ctx, "CreateKnowledge completed, duration=%dms, traceID=%s", duration.Milliseconds(), traceID)}()// 记录关键步骤logs.CtxInfof(ctx, "Validating knowledge create parameters, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, traceID=%s", req.GetName(), req.GetEmbeddingModel(), req.GetChunkSize(), traceID)// 权限验证日志logs.CtxInfof(ctx, "Validating knowledge create permission, userID=%d, spaceID=%d, traceID=%s", userID, req.GetSpaceID(), traceID)// 存储配额检查日志logs.CtxInfof(ctx, "Checking storage quota, userID=%d, traceID=%s", userID, traceID)// 向量空间创建日志logs.CtxInfof(ctx, "Creating vector space, embeddingModel=%s, dimensions=%d, traceID=%s", req.GetEmbeddingModel(), getModelDimensions(req.GetEmbeddingModel()), traceID)// 数据库创建操作日志logs.CtxInfof(ctx, "Creating knowledge in database, knowledgeName=%s, traceID=%s", req.GetName(), traceID)// ElasticSearch索引创建日志logs.CtxInfof(ctx, "Creating ElasticSearch index, knowledgeID=%d, traceID=%s", knowledgeID, traceID)// 事件发布日志logs.CtxInfof(ctx, "Publishing knowledge create event, knowledgeID=%d, traceID=%s", knowledgeID, traceID)return resp, nil
}// 知识库创建操作审计日志
func (s *KnowledgeApplicationService) logKnowledgeCreateAudit(ctx context.Context, operation string, knowledgeID int64, details map[string]interface{}) {userID := ctx.Value("user_id").(int64)spaceID := ctx.Value("space_id").(int64)traceID := ctx.Value("trace_id").(string)auditLog := map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"user_id": userID,"space_id": spaceID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"knowledge_name": details["knowledge_name"],"embedding_model": details["embedding_model"],"chunk_size": details["chunk_size"],"chunk_overlap": details["chunk_overlap"],"vector_space_id": details["vector_space_id"],"storage_used": details["storage_used"],}logs.CtxInfof(ctx, "Knowledge create audit log: %+v", auditLog)
}// 文档处理日志记录
func (s *KnowledgeApplicationService) logDocumentProcessing(ctx context.Context, knowledgeID int64, documentID int64, operation string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)docLog := map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"document_id": documentID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"file_name": details["file_name"],"file_size": details["file_size"],"chunk_count": details["chunk_count"],"vector_count": details["vector_count"],"processing_time": details["processing_time"],}logs.CtxInfof(ctx, "Document processing log: %+v", docLog)
}// 向量空间操作日志
func (s *KnowledgeApplicationService) logVectorSpaceOperation(ctx context.Context, operation string, vectorSpaceID string, details map[string]interface{}) {traceID := ctx.Value("trace_id").(string)vectorLog := map[string]interface{}{"operation": operation,"vector_space_id": vectorSpaceID,"trace_id": traceID,"timestamp": time.Now().Unix(),"details": details,"embedding_model": details["embedding_model"],"dimensions": details["dimensions"],"vector_count": details["vector_count"],"index_type": details["index_type"],}logs.CtxInfof(ctx, "Vector space operation log: %+v", vectorLog)
}
知识库创建日志内容规范:
- 请求日志:记录用户ID、工作空间ID、知识库名称、嵌入模型、分块策略、TraceID
- 业务日志:记录知识库创建步骤、参数验证结果、权限验证结果、向量空间创建过程
- 性能日志:记录创建接口响应时间、数据库插入时间、向量空间创建时间、文档处理时间
- 错误日志:记录创建错误堆栈、知识库相关上下文信息、向量处理失败原因
- 审计日志:记录知识库的创建操作、创建参数、创建结果、关联的文档和向量信息
- 安全日志:记录创建频率、权限验证、存储配额检查、可疑创建行为
- 文档处理日志:记录文档上传、分块处理、向量生成、索引创建等详细过程
- 向量空间日志:记录向量空间创建、配置、索引构建、查询性能等信息
9.4 知识库创建监控和告警
知识库创建关键指标监控:
- 创建性能:知识库创建响应时间、创建成功率、创建QPS、创建吞吐量
- 资源使用:数据库连接数、向量空间创建延迟、内存使用率、文档处理队列长度
- 业务指标:知识库创建成功率、创建频率分布、不同嵌入模型使用比例、用户创建活跃度
- 安全指标:权限验证通过率、恶意创建尝试次数、创建频率限制触发次数、存储配额检查失败率
- 质量指标:向量空间创建成功率、文档处理成功率、嵌入模型响应率、索引创建成功率
- 存储指标:存储使用量、向量数量、文档数量、索引大小、存储增长率
- 向量处理指标:向量生成延迟、向量维度分布、嵌入模型调用次数、向量相似度计算性能
知识库创建告警策略:
- 创建失败率告警:当知识库创建失败率超过3%时触发告警
- 性能告警:当知识库创建响应时间超过10秒时触发告警
- 资源告警:当数据库连接数超过80%或向量数据库连接异常时触发告警
- 安全告警:当检测到异常创建行为或存储配额滥用时立即触发告警
- 数据一致性告警:当MySQL、ES和向量数据库创建状态不一致时触发告警
- 配额告警:当用户存储使用量超过90%时触发告警
- 向量服务告警:当嵌入模型服务不可用或响应超时时触发告警
- 文档处理告警:当文档处理队列积压超过阈值时触发告警
// KnowledgeCreateMetrics collects the monitoring metrics of the knowledge
// base creation flow: outcome counters, per-stage latencies and storage
// totals.
type KnowledgeCreateMetrics struct {
	// Outcome counters.
	CreateSuccessCount           int64 // number of successful creations
	CreateFailureCount           int64 // number of failed creations
	CreateLatency                time.Duration // creation latency
	PermissionDeniedCount        int64 // number of permission denials
	RateLimitCount               int64 // number of rate-limit rejections
	ParameterValidationFailCount int64 // number of parameter validation failures

	// Vector space, document and embedding stages.
	VectorSpaceCreateLatency   time.Duration // vector space creation latency
	VectorSpaceCreateFailCount int64         // number of vector space creation failures
	DocumentProcessingLatency  time.Duration // document processing latency
	EmbeddingGenerationLatency time.Duration // embedding generation latency
	EmbeddingModelFailCount    int64         // number of embedding model call failures
	StorageQuotaExceededCount  int64         // number of storage-quota-exceeded rejections

	// Index, event and database stages.
	IndexCreateLatency    time.Duration // index creation latency
	IndexCreateFailCount  int64         // number of index creation failures
	EventPublishLatency   time.Duration // event publish latency
	DatabaseInsertLatency time.Duration // database insert latency
	VectorDatabaseLatency time.Duration // vector database operation latency

	// Totals.
	TotalStorageUsed   int64 // total storage used
	TotalVectorCount   int64 // total number of vectors
	TotalDocumentCount int64 // total number of documents
} // Report knowledge base create monitoring metrics
func (s *KnowledgeApplicationService) reportCreateMetrics(ctx context.Context, operation string, startTime time.Time, knowledgeID int64, req *knowledgeAPI.CreateDatasetRequest, err error) {latency := time.Since(startTime)if err != nil {metrics.CreateFailureCount++// 根据错误类型分类统计switch {case errors.Is(err, errno.ErrKnowledgePermissionCode):metrics.PermissionDeniedCount++case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):metrics.RateLimitCount++case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):metrics.ParameterValidationFailCount++case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):metrics.StorageQuotaExceededCount++case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):metrics.VectorSpaceCreateFailCount++case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):metrics.EmbeddingModelFailCount++case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):metrics.IndexCreateFailCount++}logs.CtxErrorf(ctx, "Knowledge %s failed, knowledgeName=%s, spaceID=%d, embeddingModel=%s, error=%v, latency=%dms", operation, req.GetName(), req.GetSpaceID(), req.GetEmbeddingModel(), err, latency.Milliseconds())} else {metrics.CreateSuccessCount++metrics.CreateLatency = latency// 记录知识库类型统计embeddingModel := req.GetEmbeddingModel()chunkSize := req.GetChunkSize()logs.CtxInfof(ctx, "Knowledge %s succeeded, knowledgeID=%d, knowledgeName=%s, embeddingModel=%s, chunkSize=%d, latency=%dms", operation, knowledgeID, req.GetName(), embeddingModel, chunkSize, latency.Milliseconds())}// 上报到监控系统s.metricsReporter.Report(ctx, "knowledge_create", map[string]interface{}{"operation": operation,"knowledge_id": knowledgeID,"knowledge_name": req.GetName(),"embedding_model": req.GetEmbeddingModel(),"chunk_size": req.GetChunkSize(),"chunk_overlap": req.GetChunkOverlap(),"space_id": req.GetSpaceID(),"success": err == nil,"latency_ms": latency.Milliseconds(),"error_type": getKnowledgeCreateErrorType(err),"vector_dimensions": 
getModelDimensions(req.GetEmbeddingModel()),"storage_used": getStorageUsed(ctx, req.GetSpaceID()),})
}// 获取知识库创建错误类型
func getKnowledgeCreateErrorType(err error) string {if err == nil {return "none"}// 基于知识库错误码定义switch {case errors.Is(err, errno.ErrKnowledgePermissionCode):return "permission_denied"case errors.Is(err, errno.ErrKnowledgeNameExistsCode):return "knowledge_exists"case errors.Is(err, errno.ErrKnowledgeInvalidParamCode):return "invalid_parameters"case errors.Is(err, errno.ErrKnowledgeStorageQuotaExceededCode):return "storage_quota_exceeded"case errors.Is(err, errno.ErrKnowledgeVectorSpaceCreateFailedCode):return "vector_space_create_failed"case errors.Is(err, errno.ErrKnowledgeEmbeddingModelFailedCode):return "embedding_model_failed"case errors.Is(err, errno.ErrKnowledgeIndexCreateFailedCode):return "index_create_failed"case errors.Is(err, errno.ErrKnowledgeDocumentProcessingFailedCode):return "document_processing_failed"case errors.Is(err, errno.ErrKnowledgeVectorDatabaseTimeoutCode):return "vector_database_timeout"case errors.Is(err, errno.ErrKnowledgeCreateRateLimitCode):return "rate_limit_exceeded"default:return "system_error"}
}// 知识库创建告警检查
// checkCreateAlerts inspects metrics and sends an alert through
// s.alertManager for every threshold that is exceeded:
//   - failure rate above 3%, evaluated only once more than 100 creations
//     have been recorded (warning),
//   - create latency above 10 seconds (warning),
//   - more than 10 storage-quota-exceeded rejections (critical),
//   - more than 5 vector space creation failures (critical),
//   - more than 20 embedding model call failures (warning).
func (s *KnowledgeApplicationService) checkCreateAlerts(ctx context.Context, metrics *KnowledgeCreateMetrics) {
	// raise sends a single alert with the given attributes.
	raise := func(level, kind, message string, data map[string]interface{}) {
		s.alertManager.SendAlert(ctx, &Alert{
			Level:   level,
			Type:    kind,
			Message: message,
			Metrics: data,
		})
	}

	// Failure rate alert.
	if total := metrics.CreateSuccessCount + metrics.CreateFailureCount; total > 100 {
		rate := float64(metrics.CreateFailureCount) / float64(total)
		if rate > 0.03 { // 3%
			raise("warning", "knowledge_create_failure_rate",
				fmt.Sprintf("知识库创建失败率过高: %.2f%%", rate*100),
				map[string]interface{}{
					"failure_rate":  rate,
					"total_creates": total,
				})
		}
	}

	// Latency alert.
	if metrics.CreateLatency > 10*time.Second {
		raise("warning", "knowledge_create_latency",
			fmt.Sprintf("知识库创建延迟过高: %dms", metrics.CreateLatency.Milliseconds()),
			map[string]interface{}{
				"latency_ms": metrics.CreateLatency.Milliseconds(),
			})
	}

	// Storage quota alert.
	if metrics.StorageQuotaExceededCount > 10 {
		raise("critical", "knowledge_storage_quota_exceeded",
			fmt.Sprintf("存储配额超限次数过多: %d", metrics.StorageQuotaExceededCount),
			map[string]interface{}{
				"quota_exceeded_count": metrics.StorageQuotaExceededCount,
			})
	}

	// Vector space creation failure alert.
	if metrics.VectorSpaceCreateFailCount > 5 {
		raise("critical", "knowledge_vector_space_create_failed",
			fmt.Sprintf("向量空间创建失败次数过多: %d", metrics.VectorSpaceCreateFailCount),
			map[string]interface{}{
				"vector_space_fail_count": metrics.VectorSpaceCreateFailCount,
			})
	}

	// Embedding model failure alert.
	if metrics.EmbeddingModelFailCount > 20 {
		raise("warning", "knowledge_embedding_model_failed",
			fmt.Sprintf("嵌入模型调用失败次数过多: %d", metrics.EmbeddingModelFailCount),
			map[string]interface{}{
				"embedding_fail_count": metrics.EmbeddingModelFailCount,
			})
	}
}