当前位置: 首页 > news >正文

Coze源码分析-资源库-创建知识库-后端源码-详细流程梳理

10. 知识库创建流程图

10.1 CreateKnowledge接口完整调用流程

用户登录 Coze 平台点击"资源库" → 点击右上角"+"号 → 点击"知识库"场景的后端处理流程:

用户点击"知识库" → 前端发起请求 → API网关路由 → Handler处理 → 业务服务层 → 数据持久化层 → 索引创建层 → 响应返回↓                    ↓           ↓          ↓         ↓          ↓            ↓          ↓
前端填写表单        HTTP POST请求   路由匹配     参数验证    权限检查     MySQL插入    ES索引创建   JSON响应↓                    ↓           ↓          ↓         ↓          ↓            ↓          ↓
/create-knowledge   /api/knowledge/  Handler    请求绑定   用户身份    knowledge    事件发布     创建结果create           函数调用   参数校验   Session    表插入       异步处理     状态返回CreateKnowledge      验证       数据存储     ES索引       ↓KnowledgeApplicationService↓身份验证(登录检查)↓权限验证(创建权限检查)↓参数验证(名称、描述、类型等)↓知识库类型验证↓数据库创建事务↓创建事件发布↓返回创建结果

10.2 知识库创建详细流程说明

1. API网关层(路由处理)

文件位置backend/api/handler/coze/knowledge_service.go

// @router /api/knowledge/create [POST]
func CreateKnowledge(ctx context.Context, c *app.RequestContext) {var err errorvar req knowledge.CreateKnowledgeRequest// 1. 请求参数绑定和验证err = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}// 2. 知识库创建参数校验if req.GetName() == "" {invalidParamRequestResponse(c, "knowledge name is invalid")return}if req.GetDescription() == "" {invalidParamRequestResponse(c, "knowledge description is invalid")return}if req.SpaceID <= 0 {invalidParamRequestResponse(c, "spaceID is invalid")return}if req.KnowledgeType == nil {invalidParamRequestResponse(c, "knowledge type is invalid")return}// 3. 调用知识库创建服务resp, err := knowledge.KnowledgeApplicationSVC.CreateKnowledge(ctx, &req)if err != nil {handleKnowledgeCreateBusinessError(c, err)return}// 4. 返回JSON响应c.JSON(consts.StatusOK, resp)
}

处理步骤

  • 路由匹配POST /api/knowledge/create
  • 参数绑定:将HTTP请求体绑定到 CreateKnowledgeRequest 结构体
  • 参数验证:验证知识库名称、描述、空间ID、知识库类型等参数的有效性
  • 服务调用:调用知识库服务的 CreateKnowledge 方法
  • 响应返回:返回JSON格式的创建结果
2. 业务服务层(KnowledgeApplicationService)

文件位置backend/application/knowledge/knowledge.go

func (k *KnowledgeApplicationService) CreateKnowledge(ctx context.Context, req *knowledgeAPI.CreateKnowledgeRequest) (resp *knowledgeAPI.CreateKnowledgeResponse, err error) {// 1. 用户身份验证userID := ctxutil.GetUIDFromCtx(ctx)if userID == nil {return nil, errorx.New(errno.ErrKnowledgePermissionCode, errorx.KV(errno.KnowledgeMsgKey, "session is required"))}// 2. 验证知识库类型_knowledgeType, ok := model.ToKnowledgeType(req.GetKnowledgeType())if !ok {return nil, fmt.Errorf("invalid knowledge type '%d'", req.GetKnowledgeType())}knowledgeType := ptr.Of(_knowledgeType)// 3. 验证知识库配置var config *model.KnowledgeConfigif req.Config != nil {config = &model.KnowledgeConfig{ChunkSize:    req.Config.ChunkSize,ChunkOverlap: req.Config.ChunkOverlap,Separator:    req.Config.Separator,Language:     req.Config.Language,}}// 4. 验证向量化配置var vectorConfig *model.VectorConfigif req.VectorConfig != nil {vectorConfig = &model.VectorConfig{Model:      req.VectorConfig.Model,Dimension:  req.VectorConfig.Dimension,Provider:   req.VectorConfig.Provider,}}// 5. 构建创建请求r := &service.CreateKnowledgeRequest{KnowledgeType: knowledgeType,SpaceID:       req.GetSpaceID(),CreatorID:     *userID,Name:          req.GetName(),Description:   req.GetDescription(),Config:        config,VectorConfig:  vectorConfig,Tags:          req.Tags,}// 6. 执行创建操作knowledgeID, err := k.DomainSVC.CreateKnowledge(ctx, r)if err != nil {return nil, errorx.Wrapf(err, "CreateKnowledge failed")}// 7. 发布创建事件err = k.eventbus.PublishResources(ctx, &searchEntity.ResourceDomainEvent{OpType: searchEntity.Created,Resource: &searchEntity.ResourceDocument{ResType:       resCommon.ResType_Knowledge,ResSubType:    ptr.Of(int32(req.GetKnowledgeType())),ResID:         knowledgeID,Name:          &req.Name,SpaceID:       &req.SpaceID,OwnerID:       userID,PublishStatus: ptr.Of(resCommon.PublishStatus_UnPublished),CreateTimeMS:  ptr.Of(time.Now().UnixMilli()),},})if err != nil {return nil, fmt.Errorf("publish resource '%d' failed, err=%v", knowledgeID, err)}// 8. 返回创建结果resp = &knowledgeAPI.CreateKnowledgeResponse{KnowledgeID: knowledgeID,}return resp, nil
}

核心功能

  • 身份验证:从上下文中提取用户ID,验证用户登录状态
  • 参数验证:验证知识库类型、配置参数、向量化配置等关键参数
  • 权限检查:验证用户是否具有知识库创建权限
  • 数据构建:构建完整的知识库创建请求数据结构
  • 数据创建:调用领域服务在数据库中创建知识库记录
  • 事件发布:发布知识库创建事件用于异步索引建立
  • 响应组装:构建标准化的创建响应数据结构
3. 领域服务层(知识库创建领域服务)

核心功能

  • 参数验证:验证知识库创建参数的完整性和有效性
  • 权限检查:验证用户是否具有知识库创建权限
  • 配额验证:检查用户的知识库创建配额限制
  • 数据持久化:将知识库数据保存到数据库中
// 验证知识库创建权限
func (s *KnowledgeDomainService) validateCreatePermission(ctx context.Context, userID int64, spaceID int64) error {// 1. 检查用户是否有空间访问权限hasAccess, err := s.spaceRepo.CheckUserAccess(ctx, userID, spaceID)if err != nil {return fmt.Errorf("检查空间访问权限失败: %w", err)}if !hasAccess {return errorx.New(errno.ErrKnowledgePermissionCode, errorx.KV("msg", "用户无权限在此空间创建知识库"),errorx.KV("user_id", userID),errorx.KV("space_id", spaceID))}// 2. 检查用户角色权限userRole, err := s.userRepo.GetUserRole(ctx, userID, spaceID)if err != nil {return fmt.Errorf("获取用户角色失败: %w", err)}if userRole < model.RoleEditor {return errorx.New(errno.ErrKnowledgePermissionCode,errorx.KV("msg", "用户角色不足,无法创建知识库"),errorx.KV("user_id", userID),errorx.KV("required_role", model.RoleEditor),errorx.KV("current_role", userRole))}return nil
}// 验证知识库创建配额
func (s *KnowledgeDomainService) validateCreateQuota(ctx context.Context, userID int64, spaceID int64) error {// 1. 检查创建频率限制createCount, err := s.getRecentCreateCount(ctx, userID)if err != nil {return fmt.Errorf("获取创建频率失败: %w", err)}if createCount >= 10 { // 每小时最多创建10个知识库return errorx.New(errno.ErrKnowledgeCreateRateLimitCode, errorx.KV("msg", "创建操作过于频繁,请稍后再试"),errorx.KV("user_id", userID),errorx.KV("create_count", createCount))}// 2. 检查空间知识库数量限制knowledgeCount, err := s.getSpaceKnowledgeCount(ctx, spaceID)if err != nil {return fmt.Errorf("获取空间知识库数量失败: %w", err)}maxKnowledges := s.getMaxKnowledgesPerSpace(ctx, spaceID)if knowledgeCount >= maxKnowledges {return errorx.New(errno.ErrKnowledgeQuotaExceededCode,errorx.KV("msg", "空间知识库数量已达上限"),errorx.KV("space_id", spaceID),errorx.KV("current_count", knowledgeCount),errorx.KV("max_count", maxKnowledges))}return nil
}// 验证知识库参数
func (s *KnowledgeDomainService) validateKnowledgeParams(ctx context.Context, req *CreateKnowledgeRequest) error {// 1. 验证知识库名称if len(req.Name) == 0 {return errorx.New(errno.ErrKnowledgeInvalidParamCode,errorx.KV("msg", "知识库名称不能为空"),errorx.KV("field", "name"))}if len(req.Name) > 100 {return errorx.New(errno.ErrKnowledgeInvalidParamCode,errorx.KV("msg", "知识库名称长度不能超过100个字符"),errorx.KV("field", "name"),errorx.KV("max_length", 100))}// 2. 验证知识库描述if len(req.Description) > 1000 {return errorx.New(errno.ErrKnowledgeInvalidParamCode,errorx.KV("msg", "知识库描述长度不能超过1000个字符"),errorx.KV("field", "description"),errorx.KV("max_length", 1000))}// 3. 验证知识库类型if req.KnowledgeType == nil {return errorx.New(errno.ErrKnowledgeInvalidParamCode,errorx.KV("msg", "知识库类型不能为空"),errorx.KV("field", "knowledge_type"))}// 4. 检查知识库名称唯一性exists, err := s.checkKnowledgeNameExists(ctx, req.Name, req.SpaceID)if err != nil {return fmt.Errorf("检查知识库名称唯一性失败: %w", err)}if exists {return errorx.New(errno.ErrKnowledgeNameExistsCode,errorx.KV("msg", "知识库名称已存在"),errorx.KV("name", req.Name),errorx.KV("space_id", req.SpaceID))}return nil
}
4. 数据持久化层

MySQL数据库操作

  • 表名plugin_resource
  • 操作类型:INSERT操作
  • 事务处理:确保数据一致性
  • 创建策略:插入新记录并建立关联关系
  • 索引建立:为新创建的插件建立搜索索引
// 插件创建数据库操作
func (r *PluginRepository) Create(ctx context.Context, plugin *model.Plugin) error {// 插入插件主记录query := `INSERT INTO plugin_resource (plugin_type, space_id, developer_id, icon_uri, project_id,name, description, server_url, common_params, status,created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`now := time.Now()result, err := r.db.ExecContext(ctx, query, plugin.PluginType, plugin.SpaceID, plugin.DeveloperID, plugin.IconURI, plugin.ProjectID, plugin.Name, plugin.Desc,plugin.ServerURL, plugin.CommonParams, plugin.Status,now, now)if err != nil {return fmt.Errorf("创建插件失败: %w", err)}pluginID, err := result.LastInsertId()if err != nil {return fmt.Errorf("获取插件ID失败: %w", err)}plugin.ID = pluginIDplugin.CreatedAt = nowplugin.UpdatedAt = nowreturn nil
}// 创建插件认证信息
func (r *PluginRepository) CreateAuth(ctx context.Context, auth *model.PluginAuth) error {query := `INSERT INTO plugin_auth (plugin_id, authz_type, location, auth_key, service_token,authz_sub_type, authz_payload, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`now := time.Now()result, err := r.db.ExecContext(ctx, query,auth.PluginID, auth.AuthzType, auth.Location, auth.Key,auth.ServiceToken, auth.AuthzSubType, auth.AuthzPayload,now, now)if err != nil {return fmt.Errorf("创建插件认证信息失败: %w", err)}authID, err := result.LastInsertId()if err != nil {return fmt.Errorf("获取认证ID失败: %w", err)}auth.ID = authIDauth.CreatedAt = nowauth.UpdatedAt = nowreturn nil
}
5. 事件发布层(EventPublisher)

创建事件发布流程

// 插件创建事件发布
func (p *EventPublisher) PublishPluginCreatedEvent(ctx context.Context, event *entity.PluginCreatedEvent) error {// 1. 构建创建事件消息eventMsg := &message.PluginCreatedMessage{PluginID:    event.PluginID,SpaceID:     event.SpaceID,CreatorID:   event.CreatorID,Name:        event.Name,PluginType:  event.PluginType,CreatedAt:   event.CreatedAt.Unix(),EventType:   "plugin_created",ServerURL:   event.ServerURL,Description: event.Description,}// 2. 序列化事件数据data, err := json.Marshal(eventMsg)if err != nil {return fmt.Errorf("序列化创建事件失败: %w", err)}// 3. 发布到消息队列err = p.messageQueue.Publish(ctx, "plugin_create_events", data)if err != nil {return fmt.Errorf("发布创建事件失败: %w", err)}logs.CtxInfof(ctx, "Published plugin created event, pluginID=%d, creator=%d", event.PluginID, event.CreatorID)return nil
}// 异步创建事件处理器
func (h *PluginEventHandler) HandlePluginCreatedEvent(ctx context.Context, event *entity.PluginCreatedEvent) error {// 1. 建立ElasticSearch索引err := h.addToESIndex(ctx, event)if err != nil {logs.CtxErrorf(ctx, "Failed to add to ES index: %v", err)return err}// 2. 更新缓存数据err = h.updateCache(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to update cache: %v", err)}// 3. 初始化插件配置err = h.initializePluginConfig(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to initialize plugin config: %v", err)}// 4. 发送创建通知err = h.sendCreateNotification(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to send create notification: %v", err)}// 5. 更新统计数据err = h.updateCreateStatistics(ctx, event)if err != nil {logs.CtxWarnf(ctx, "Failed to update create statistics: %v", err)}return nil
}// ElasticSearch索引创建
func (h *PluginEventHandler) addToESIndex(ctx context.Context, event *entity.PluginCreatedEvent) error {// 构建ES文档doc := map[string]interface{}{"id":          event.PluginID,"space_id":    event.SpaceID,"creator_id":  event.CreatorID,"name":        event.Name,"description": event.Description,"plugin_type": event.PluginType,"server_url":  event.ServerURL,"created_at":  event.CreatedAt,"status":      "draft",}// 添加到ES索引err := h.esClient.Index(ctx, "coze_resource", fmt.Sprintf("%d", event.PluginID), doc)if err != nil {return fmt.Errorf("创建ES索引失败: %w", err)}logs.CtxInfof(ctx, "Added plugin to ES index, pluginID=%d", event.PluginID)return nil
}

创建事件处理内容

  • 索引建立:在ElasticSearch中创建插件文档索引
  • 缓存更新:更新相关的缓存数据
  • 配置初始化:初始化插件相关的配置文件
  • 通知发送:向相关用户发送创建成功通知
  • 统计更新:更新插件创建相关的统计数据
6. 响应数据结构

RegisterPluginMetaResponse

type RegisterPluginMetaResponse struct {Code      int64  `json:"code"`       // 响应码Msg       string `json:"msg"`        // 响应消息Success   bool   `json:"success"`    // 创建是否成功PluginID  int64  `json:"plugin_id"`  // 新创建的插件IDCreatedAt int64  `json:"created_at"` // 创建时间戳BaseResp  *base.BaseResp `json:"base_resp"` // 基础响应信息
}

RegisterPluginMetaRequest请求结构

type RegisterPluginMetaRequest struct {PluginType   int32  `json:"plugin_type" binding:"required"`   // 插件类型SpaceID      int64  `json:"space_id" binding:"required"`      // 工作空间IDName         string `json:"name" binding:"required"`          // 插件名称Desc         string `json:"desc,omitempty"`                   // 插件描述URL          string `json:"url,omitempty"`                    // 服务器URLAuthType     int32  `json:"auth_type" binding:"required"`     // 认证类型SubAuthType  *int32 `json:"sub_auth_type,omitempty"`          // 认证子类型Location     string `json:"location,omitempty"`               // 参数位置Key          string `json:"key,omitempty"`                    // 认证密钥ServiceToken string `json:"service_token,omitempty"`          // 服务令牌ProjectID    *int64 `json:"project_id,omitempty"`             // 项目IDIcon         *Icon  `json:"icon,omitempty"`                   // 插件图标CommonParams string `json:"common_params,omitempty"`          // 通用参数OauthInfo    *OAuthInfo `json:"oauth_info,omitempty"`         // OAuth信息AuthPayload  string `json:"auth_payload,omitempty"`           // 认证载荷
}

PluginCreatedEvent事件结构

type PluginCreatedEvent struct {PluginID    int64     `json:"plugin_id"`    // 插件IDSpaceID     int64     `json:"space_id"`     // 工作空间IDCreatorID   int64     `json:"creator_id"`   // 创建者IDName        string    `json:"name"`         // 插件名称Description string    `json:"description"`  // 插件描述PluginType  int32     `json:"plugin_type"`  // 插件类型ServerURL   string    `json:"server_url"`   // 服务器URLCreatedAt   time.Time `json:"created_at"`   // 创建时间EventType   string    `json:"event_type"`   // 事件类型
}

响应内容说明

  • 成功响应:返回创建成功状态和新创建的插件ID
  • 错误响应:返回具体的错误码和错误信息(如权限不足、参数无效等)
  • 参数验证:确保所有必需参数都已提供且格式正确
  • 时间戳:记录创建操作的具体时间

文章转载自:

http://xEKp7Iiu.pbbzn.cn
http://1FBl0OFy.pbbzn.cn
http://BGgUQEzm.pbbzn.cn
http://lmgIjRZi.pbbzn.cn
http://R4R5MG6x.pbbzn.cn
http://gMvq7Atu.pbbzn.cn
http://fPfyG0Yf.pbbzn.cn
http://QpIIYwQQ.pbbzn.cn
http://IGXfyQMp.pbbzn.cn
http://KroaCIQt.pbbzn.cn
http://ByBZJSz9.pbbzn.cn
http://SEDCWRB9.pbbzn.cn
http://eVMSiz92.pbbzn.cn
http://ECQhHsSu.pbbzn.cn
http://Zsnaz1Dd.pbbzn.cn
http://rhflXffv.pbbzn.cn
http://SJKFdgIN.pbbzn.cn
http://00yOuua3.pbbzn.cn
http://FiYKQGOK.pbbzn.cn
http://BJpYq01i.pbbzn.cn
http://i209LQFu.pbbzn.cn
http://o4VN2y3O.pbbzn.cn
http://bKCLsNRv.pbbzn.cn
http://bP7Panju.pbbzn.cn
http://mMA7xo4a.pbbzn.cn
http://bjh7pvNy.pbbzn.cn
http://XwZyYbFx.pbbzn.cn
http://ZA3PijV1.pbbzn.cn
http://nLjQbY39.pbbzn.cn
http://kGq5DrZx.pbbzn.cn
http://www.dtcms.com/a/385446.html

相关文章:

  • 极简版 Nginx 反向代理实验步骤
  • python-86-基于Graphviz或Mermaid绘制流程图
  • 智能农机无人驾驶作业套圈路径规划
  • Rayon Rust中的数据并行库入门教程
  • NumPy数组与Python列表的赋值行为解析
  • 基于 AI 的大前端智能家居控制应用开发
  • RAGFlow集成SGLang部署的大模型:实现OpenAI API兼容的自定义LLM调用
  • sqlsever 内存配置错误无法连接,后面恢复连接
  • 51c大模型~合集182
  • 2025.9.15总结
  • 深入理解 Roo Code 的 Code Actions 功能
  • Java---线程池讲解
  • PEFT QLora Deepspeed Zero Stage 3 Offload Trainning
  • 线程概念,控制
  • 扫描仪常见样式:平板与馈纸的特性与适用场景
  • Python进程和线程——多线程
  • 2025年AIOCR审核革命!七大智能费控报销系统终结手工录入
  • 从循环到矩阵运算:矢量化加速机器学习的秘诀
  • R 语言入门实战|第七章 程序:从“老虎机”项目学透流程控制与代码优化
  • clickhouse 中SUM(CASE WHEN ...) 返回什么类型?
  • NR帧结构
  • 【联合查询】
  • 常见IC封装详解:从DIP到BGA的演进与应用
  • DockerComposeUI+cpolar:容器管理的远程可视化方案
  • tcp的三次握手与四次挥手简介
  • 2025算法八股——深度学习——MHA MQA GQA
  • 常见岩性分类与油气勘探意义笔记
  • 贪心算法应用:内存分配(First Fit)问题详解
  • RTK基站模块技术要点与作用解析
  • Istio与系统软中断:深度解析与问题排查全指南