Coze源码分析-资源库-创建知识库-后端源码-详细流程梳理
10. 知识库创建流程图
10.1 CreateKnowledge接口完整调用流程
用户登录 Coze 平台点击"资源库" → 点击右上角"+"号 → 点击"知识库"场景的后端处理流程:
用户点击"知识库" → 前端发起请求 → API网关路由 → Handler处理 → 业务服务层 → 数据持久化层 → 索引创建层 → 响应返回
各阶段对应的处理内容(自上而下):
- 用户点击"知识库" ↓ 前端填写表单 ↓ /create-knowledge
- 前端发起请求 ↓ HTTP POST请求 ↓ /api/knowledge/create
- API网关路由 ↓ 路由匹配 ↓ Handler 函数调用 CreateKnowledge
- Handler处理 ↓ 参数验证 ↓ 请求绑定、参数校验
- 业务服务层 ↓ 权限检查 ↓ 用户身份 Session 验证
- 数据持久化层 ↓ MySQL插入 ↓ knowledge 表插入、数据存储
- 索引创建层 ↓ ES索引创建 ↓ 事件发布、异步处理、ES索引
- 响应返回 ↓ JSON响应 ↓ 创建结果、状态返回
业务服务层内部的处理顺序:
KnowledgeApplicationService
↓ 身份验证(登录检查)
↓ 权限验证(创建权限检查)
↓ 参数验证(名称、描述、类型等)
↓ 知识库类型验证
↓ 数据库创建事务
↓ 创建事件发布
↓ 返回创建结果
10.2 知识库创建详细流程说明
1. API网关层(路由处理)
文件位置:backend/api/handler/coze/knowledge_service.go
// @router /api/knowledge/create [POST]
func CreateKnowledge(ctx context.Context, c *app.RequestContext) {var err errorvar req knowledge.CreateKnowledgeRequest// 1. 请求参数绑定和验证err = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}// 2. 知识库创建参数校验if req.GetName() == "" {invalidParamRequestResponse(c, "knowledge name is invalid")return}if req.GetDescription() == "" {invalidParamRequestResponse(c, "knowledge description is invalid")return}if req.SpaceID <= 0 {invalidParamRequestResponse(c, "spaceID is invalid")return}if req.KnowledgeType == nil {invalidParamRequestResponse(c, "knowledge type is invalid")return}// 3. 调用知识库创建服务resp, err := knowledge.KnowledgeApplicationSVC.CreateKnowledge(ctx, &req)if err != nil {handleKnowledgeCreateBusinessError(c, err)return}// 4. 返回JSON响应c.JSON(consts.StatusOK, resp)
}
处理步骤:
- 路由匹配:`POST /api/knowledge/create`
- 参数绑定:将HTTP请求体绑定到 `CreateKnowledgeRequest` 结构体
- 参数验证:验证知识库名称、描述、空间ID、知识库类型等参数的有效性
- 服务调用:调用知识库服务的 `CreateKnowledge` 方法
- 响应返回:返回JSON格式的创建结果
2. 业务服务层(KnowledgeApplicationService)
文件位置:backend/application/knowledge/knowledge.go
func (k *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateKnowledgeRequest) (resp *knowledgeAPI.CreateKnowledgeResponse, err error) {// 1. 用户身份验证userID := ctxutil.GetUIDFromCtx(ctx)if userID == nil {return nil, errorx.New(errno.ErrKnowledgePermissionCode, errorx.KV(errno.KnowledgeMsgKey, "session is required"))}// 2. 验证知识库类型_knowledgeType, ok := model.ToKnowledgeType(req.GetKnowledgeType())if !ok {return nil, fmt.Errorf("invalid knowledge type '%d'", req.GetKnowledgeType())}knowledgeType := ptr.Of(_knowledgeType)// 3. 验证知识库配置var config *model.KnowledgeConfigif req.Config != nil {config = &model.KnowledgeConfig{ChunkSize: req.Config.ChunkSize,ChunkOverlap: req.Config.ChunkOverlap,Separator: req.Config.Separator,Language: req.Config.Language,}}// 4. 验证向量化配置var vectorConfig *model.VectorConfigif req.VectorConfig != nil {vectorConfig = &model.VectorConfig{Model: req.VectorConfig.Model,Dimension: req.VectorConfig.Dimension,Provider: req.VectorConfig.Provider,}}// 5. 构建创建请求r := &service.CreateKnowledgeRequest{KnowledgeType: knowledgeType,SpaceID: req.GetSpaceID(),CreatorID: *userID,Name: req.GetName(),Description: req.GetDescription(),Config: config,VectorConfig: vectorConfig,Tags: req.Tags,}// 6. 执行创建操作knowledgeID, err := k.DomainSVC.CreateKnowledge(ctx, r)if err != nil {return nil, errorx.Wrapf(err, "CreateKnowledge failed")}// 7. 发布创建事件err = k.eventbus.PublishResources(ctx, &searchEntity.ResourceDomainEvent{OpType: searchEntity.Created,Resource: &searchEntity.ResourceDocument{ResType: resCommon.ResType_Knowledge,ResSubType: ptr.Of(int32(req.GetKnowledgeType())),ResID: knowledgeID,Name: &req.Name,SpaceID: &req.SpaceID,OwnerID: userID,PublishStatus: ptr.Of(resCommon.PublishStatus_UnPublished),CreateTimeMS: ptr.Of(time.Now().UnixMilli()),},})if err != nil {return nil, fmt.Errorf("publish resource '%d' failed, err=%v", knowledgeID, err)}// 8. 
返回创建结果resp = &knowledgeAPI.CreateKnowledgeResponse{KnowledgeID: knowledgeID,}return resp, nil
}
核心功能:
- 身份验证:从上下文中提取用户ID,验证用户登录状态
- 参数验证:验证知识库类型、配置参数、向量化配置等关键参数
- 权限检查:验证用户是否具有知识库创建权限
- 数据构建:构建完整的知识库创建请求数据结构
- 数据创建:调用领域服务在数据库中创建知识库记录
- 事件发布:发布知识库创建事件用于异步索引建立
- 响应组装:构建标准化的创建响应数据结构
3. 领域服务层(知识库创建领域服务)
核心功能:
- 参数验证:验证知识库创建参数的完整性和有效性
- 权限检查:验证用户是否具有知识库创建权限
- 配额验证:检查用户的知识库创建配额限制
- 数据持久化:将知识库数据保存到数据库中
// validateCreatePermission verifies that userID may create a knowledge base
// in spaceID.
//
// It first asks spaceRepo whether the user can access the space, then loads
// the user's role in that space and requires it to be at least
// model.RoleEditor. Repository failures are returned wrapped; a failed check
// is returned as an errno.ErrKnowledgePermissionCode error.
func (s *KnowledgeDomainService) validateCreatePermission(ctx context.Context, userID int64, spaceID int64) error {
	// 1. Check whether the user has access to the space.
	hasAccess, err := s.spaceRepo.CheckUserAccess(ctx, userID, spaceID)
	if err != nil {
		return fmt.Errorf("检查空间访问权限失败: %w", err)
	}
	if !hasAccess {
		return errorx.New(errno.ErrKnowledgePermissionCode,
			errorx.KV("msg", "用户无权限在此空间创建知识库"),
			errorx.KV("user_id", userID),
			errorx.KV("space_id", spaceID))
	}

	// 2. Check the user's role in the space.
	userRole, err := s.userRepo.GetUserRole(ctx, userID, spaceID)
	if err != nil {
		return fmt.Errorf("获取用户角色失败: %w", err)
	}
	if userRole < model.RoleEditor {
		return errorx.New(errno.ErrKnowledgePermissionCode,
			errorx.KV("msg", "用户角色不足,无法创建知识库"),
			errorx.KV("user_id", userID),
			errorx.KV("required_role", model.RoleEditor),
			errorx.KV("current_role", userRole))
	}

	return nil
}

// Validate the knowledge base creation quota.
// validateCreateQuota enforces the creation limits for userID in spaceID.
//
// It rejects the request with errno.ErrKnowledgeCreateRateLimitCode when the
// user's recent creation count has reached the limit, and with
// errno.ErrKnowledgeQuotaExceededCode when the space already holds the
// maximum number of knowledge bases. Lookup failures are returned wrapped.
func (s *KnowledgeDomainService) validateCreateQuota(ctx context.Context, userID int64, spaceID int64) error {
	// Per the original note: at most 10 knowledge bases per hour. The actual
	// time window is whatever getRecentCreateCount counts over.
	const maxRecentCreates = 10

	// 1. Check the creation rate limit.
	createCount, err := s.getRecentCreateCount(ctx, userID)
	if err != nil {
		return fmt.Errorf("获取创建频率失败: %w", err)
	}
	if createCount >= maxRecentCreates {
		return errorx.New(errno.ErrKnowledgeCreateRateLimitCode,
			errorx.KV("msg", "创建操作过于频繁,请稍后再试"),
			errorx.KV("user_id", userID),
			errorx.KV("create_count", createCount))
	}

	// 2. Check the per-space knowledge base count limit.
	knowledgeCount, err := s.getSpaceKnowledgeCount(ctx, spaceID)
	if err != nil {
		return fmt.Errorf("获取空间知识库数量失败: %w", err)
	}
	maxKnowledges := s.getMaxKnowledgesPerSpace(ctx, spaceID)
	if knowledgeCount >= maxKnowledges {
		return errorx.New(errno.ErrKnowledgeQuotaExceededCode,
			errorx.KV("msg", "空间知识库数量已达上限"),
			errorx.KV("space_id", spaceID),
			errorx.KV("current_count", knowledgeCount),
			errorx.KV("max_count", maxKnowledges))
	}

	return nil
}

// Validate the knowledge base parameters.
// validateKnowledgeParams validates the fields of a creation request.
//
// The name must be non-empty and at most 100 characters, the description at
// most 1000 characters, the knowledge type must be set, and the name must not
// already exist in the target space. Lengths are counted in runes rather than
// bytes so that the limits match the "characters" wording of the error
// messages for multi-byte text such as Chinese.
//
// Validation failures are returned as errno.ErrKnowledgeInvalidParamCode or
// errno.ErrKnowledgeNameExistsCode errors; a failed uniqueness lookup is
// returned wrapped.
func (s *KnowledgeDomainService) validateKnowledgeParams(ctx context.Context, req *CreateKnowledgeRequest) error {
	// 1. Validate the knowledge base name.
	if len(req.Name) == 0 {
		return errorx.New(errno.ErrKnowledgeInvalidParamCode,
			errorx.KV("msg", "知识库名称不能为空"),
			errorx.KV("field", "name"))
	}
	if len([]rune(req.Name)) > 100 {
		return errorx.New(errno.ErrKnowledgeInvalidParamCode,
			errorx.KV("msg", "知识库名称长度不能超过100个字符"),
			errorx.KV("field", "name"),
			errorx.KV("max_length", 100))
	}

	// 2. Validate the knowledge base description.
	if len([]rune(req.Description)) > 1000 {
		return errorx.New(errno.ErrKnowledgeInvalidParamCode,
			errorx.KV("msg", "知识库描述长度不能超过1000个字符"),
			errorx.KV("field", "description"),
			errorx.KV("max_length", 1000))
	}

	// 3. Validate the knowledge base type.
	if req.KnowledgeType == nil {
		return errorx.New(errno.ErrKnowledgeInvalidParamCode,
			errorx.KV("msg", "知识库类型不能为空"),
			errorx.KV("field", "knowledge_type"))
	}

	// 4. Check that the name is unique within the space.
	exists, err := s.checkKnowledgeNameExists(ctx, req.Name, req.SpaceID)
	if err != nil {
		return fmt.Errorf("检查知识库名称唯一性失败: %w", err)
	}
	if exists {
		return errorx.New(errno.ErrKnowledgeNameExistsCode,
			errorx.KV("msg", "知识库名称已存在"),
			errorx.KV("name", req.Name),
			errorx.KV("space_id", req.SpaceID))
	}

	return nil
}
4. 数据持久化层
MySQL数据库操作:
- 表名:`plugin_resource`
- 操作类型:INSERT操作
- 事务处理:确保数据一致性
- 创建策略:插入新记录并建立关联关系
- 索引建立:为新创建的插件建立搜索索引
// Create inserts the plugin's main record into plugin_resource.
//
// On success it writes the generated ID and the creation/update timestamps
// back into plugin. Both timestamps are the same time.Now() value captured
// before the insert. Database failures are returned wrapped.
func (r *PluginRepository) Create(ctx context.Context, plugin *model.Plugin) error {
	// Insert the plugin main record.
	query := `INSERT INTO plugin_resource (plugin_type, space_id, developer_id, icon_uri, project_id,name, description, server_url, common_params, status,created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

	now := time.Now()
	result, err := r.db.ExecContext(ctx, query,
		plugin.PluginType, plugin.SpaceID, plugin.DeveloperID,
		plugin.IconURI, plugin.ProjectID, plugin.Name, plugin.Desc,
		plugin.ServerURL, plugin.CommonParams, plugin.Status,
		now, now)
	if err != nil {
		return fmt.Errorf("创建插件失败: %w", err)
	}

	pluginID, err := result.LastInsertId()
	if err != nil {
		return fmt.Errorf("获取插件ID失败: %w", err)
	}

	plugin.ID = pluginID
	plugin.CreatedAt = now
	plugin.UpdatedAt = now
	return nil
}

// Create the plugin authentication record.
func (r *PluginRepository) CreateAuth(ctx context.Context, auth *model.PluginAuth) error {query := `INSERT INTO plugin_auth (plugin_id, authz_type, location, auth_key, service_token,authz_sub_type, authz_payload, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`now := time.Now()result, err := r.db.ExecContext(ctx, query,auth.PluginID, auth.AuthzType, auth.Location, auth.Key,auth.ServiceToken, auth.AuthzSubType, auth.AuthzPayload,now, now)if err != nil {return fmt.Errorf("创建插件认证信息失败: %w", err)}authID, err := result.LastInsertId()if err != nil {return fmt.Errorf("获取认证ID失败: %w", err)}auth.ID = authIDauth.CreatedAt = nowauth.UpdatedAt = nowreturn nil
}
5. 事件发布层(EventPublisher)
创建事件发布流程:
// PublishPluginCreatedEvent serializes event as a message.PluginCreatedMessage
// and publishes it to the "plugin_create_events" topic of the message queue.
//
// The message's CreatedAt is the event time as Unix seconds and its EventType
// is always "plugin_created". Serialization and publish failures are returned
// wrapped; a successful publish is logged at info level.
func (p *EventPublisher) PublishPluginCreatedEvent(ctx context.Context, event *entity.PluginCreatedEvent) error {
	// 1. Build the creation event message.
	eventMsg := &message.PluginCreatedMessage{
		PluginID:    event.PluginID,
		SpaceID:     event.SpaceID,
		CreatorID:   event.CreatorID,
		Name:        event.Name,
		PluginType:  event.PluginType,
		CreatedAt:   event.CreatedAt.Unix(),
		EventType:   "plugin_created",
		ServerURL:   event.ServerURL,
		Description: event.Description,
	}

	// 2. Serialize the event data.
	data, err := json.Marshal(eventMsg)
	if err != nil {
		return fmt.Errorf("序列化创建事件失败: %w", err)
	}

	// 3. Publish to the message queue.
	err = p.messageQueue.Publish(ctx, "plugin_create_events", data)
	if err != nil {
		return fmt.Errorf("发布创建事件失败: %w", err)
	}

	logs.CtxInfof(ctx, "Published plugin created event, pluginID=%d, creator=%d", event.PluginID, event.CreatorID)
	return nil
}

// Asynchronous creation-event handler.
// HandlePluginCreatedEvent runs the follow-up work for a plugin-created event.
//
// Indexing into ElasticSearch is mandatory: if it fails, the error is logged
// and returned and no later step runs. The remaining steps (cache update,
// plugin config initialization, notification, statistics) are best effort:
// their failures are only logged as warnings and the handler still returns
// nil.
func (h *PluginEventHandler) HandlePluginCreatedEvent(ctx context.Context, event *entity.PluginCreatedEvent) error {
	// 1. Build the ElasticSearch index entry.
	err := h.addToESIndex(ctx, event)
	if err != nil {
		logs.CtxErrorf(ctx, "Failed to add to ES index: %v", err)
		return err
	}

	// 2. Update cached data.
	err = h.updateCache(ctx, event)
	if err != nil {
		logs.CtxWarnf(ctx, "Failed to update cache: %v", err)
	}

	// 3. Initialize the plugin configuration.
	err = h.initializePluginConfig(ctx, event)
	if err != nil {
		logs.CtxWarnf(ctx, "Failed to initialize plugin config: %v", err)
	}

	// 4. Send the creation notification.
	err = h.sendCreateNotification(ctx, event)
	if err != nil {
		logs.CtxWarnf(ctx, "Failed to send create notification: %v", err)
	}

	// 5. Update the statistics.
	err = h.updateCreateStatistics(ctx, event)
	if err != nil {
		logs.CtxWarnf(ctx, "Failed to update create statistics: %v", err)
	}

	return nil
}

// ElasticSearch index creation.
// addToESIndex indexes the created plugin into the "coze_resource" index,
// using the decimal plugin ID as the document ID.
//
// The document's "status" field is always "draft". An indexing failure is
// returned wrapped; success is logged at info level.
func (h *PluginEventHandler) addToESIndex(ctx context.Context, event *entity.PluginCreatedEvent) error {
	// Build the ES document.
	doc := map[string]interface{}{
		"id":          event.PluginID,
		"space_id":    event.SpaceID,
		"creator_id":  event.CreatorID,
		"name":        event.Name,
		"description": event.Description,
		"plugin_type": event.PluginType,
		"server_url":  event.ServerURL,
		"created_at":  event.CreatedAt,
		"status":      "draft",
	}

	// Add the document to the ES index.
	err := h.esClient.Index(ctx, "coze_resource", fmt.Sprintf("%d", event.PluginID), doc)
	if err != nil {
		return fmt.Errorf("创建ES索引失败: %w", err)
	}

	logs.CtxInfof(ctx, "Added plugin to ES index, pluginID=%d", event.PluginID)
	return nil
}
创建事件处理内容:
- 索引建立:在ElasticSearch中创建插件文档索引
- 缓存更新:更新相关的缓存数据
- 配置初始化:初始化插件相关的配置文件
- 通知发送:向相关用户发送创建成功通知
- 统计更新:更新插件创建相关的统计数据
6. 响应数据结构
RegisterPluginMetaResponse:
type RegisterPluginMetaResponse struct {Code int64 `json:"code"` // 响应码Msg string `json:"msg"` // 响应消息Success bool `json:"success"` // 创建是否成功PluginID int64 `json:"plugin_id"` // 新创建的插件IDCreatedAt int64 `json:"created_at"` // 创建时间戳BaseResp *base.BaseResp `json:"base_resp"` // 基础响应信息
}
RegisterPluginMetaRequest请求结构:
type RegisterPluginMetaRequest struct {PluginType int32 `json:"plugin_type" binding:"required"` // 插件类型SpaceID int64 `json:"space_id" binding:"required"` // 工作空间IDName string `json:"name" binding:"required"` // 插件名称Desc string `json:"desc,omitempty"` // 插件描述URL string `json:"url,omitempty"` // 服务器URLAuthType int32 `json:"auth_type" binding:"required"` // 认证类型SubAuthType *int32 `json:"sub_auth_type,omitempty"` // 认证子类型Location string `json:"location,omitempty"` // 参数位置Key string `json:"key,omitempty"` // 认证密钥ServiceToken string `json:"service_token,omitempty"` // 服务令牌ProjectID *int64 `json:"project_id,omitempty"` // 项目IDIcon *Icon `json:"icon,omitempty"` // 插件图标CommonParams string `json:"common_params,omitempty"` // 通用参数OauthInfo *OAuthInfo `json:"oauth_info,omitempty"` // OAuth信息AuthPayload string `json:"auth_payload,omitempty"` // 认证载荷
}
PluginCreatedEvent事件结构:
// PluginCreatedEvent describes a plugin that has just been created.
type PluginCreatedEvent struct {
	PluginID    int64     `json:"plugin_id"`   // Plugin ID.
	SpaceID     int64     `json:"space_id"`    // Workspace ID.
	CreatorID   int64     `json:"creator_id"`  // Creator ID.
	Name        string    `json:"name"`        // Plugin name.
	Description string    `json:"description"` // Plugin description.
	PluginType  int32     `json:"plugin_type"` // Plugin type.
	ServerURL   string    `json:"server_url"`  // Server URL.
	CreatedAt   time.Time `json:"created_at"`  // Creation time.
	EventType   string    `json:"event_type"`  // Event type.
}
响应内容说明:
- 成功响应:返回创建成功状态和新创建的插件ID
- 错误响应:返回具体的错误码和错误信息(如权限不足、参数无效等)
- 参数验证:确保所有必需参数都已提供且格式正确
- 时间戳:记录创建操作的具体时间