Coze源码分析-工作空间-资源查询-后端源码
前言
本文将深入分析Coze Studio项目中用户登录后点击"资源库"功能的后端实现,通过源码解读来理解整个资源库管理系统的架构设计和技术实现。
项目架构概览
整体架构设计
Coze Studio后端采用了经典的分层架构模式,将资源库功能划分为以下几个核心层次:
┌─────────────────────────────────────────────────────────────┐
│ IDL接口定义层 │
│ ┌─────────────┐ ┌───────────────── ┐ ┌─────────────┐ │
│ │ base.thrift │ │openapiauth.thrift│ │ api.thrift │ │
│ └─────────────┘ └───────────────── ┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ API网关层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Handler │ │ Router │ │ Middleware │ │
│ │ 处理器 │ │ 路由 │ │ 中间件 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 应用服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ SearchApplicationService │ │
│ │ LibraryResourceList │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 领域服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Search Plugin workflow │ │
│ │ Knowledge prompt database │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 数据访问层 │
│ ┌─────────────────────────────┐ ┌─────────────────────────┐
│ │ pluginDraftDAO │ │ RepositoryImpl │
│ │ KnowledgeDAO │ │ WorkflowDAO │
│ │ PromptDAO │ │ OnlineImpl │
│ └─────────────────────────────┘ └─────────────────────────┘
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 基础设施层 │
│ ┌─ ─ ─── ─── ── ─ ─ ─┐ │
│ │ gorm.DB │ │
│ │ es.Client │ │
│ └── ─ ── ─── ── ── ─ ┘ │
└─────────────────────────────────────────────────────────────┘↓
┌─────────────────────────────────────────────────────────────┐
│ 存储服务层 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ MySQL数据库 │ │
│ │ ElasticSearch数据库 │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
1. IDL接口定义层
IDL基础类型定义(base.thrift)
文件位置:idl/base.thrift
核心代码:
namespace py base
namespace go base
namespace java com.bytedance.thrift.basestruct TrafficEnv {1: bool Open = false,2: string Env = "" ,
}struct Base {1: string LogID = "",2: string Caller = "",3: string Addr = "",4: string Client = "",5: optional TrafficEnv TrafficEnv ,6: optional map<string,string> Extra ,
}struct BaseResp {1: string StatusMessage = "",2: i32 StatusCode = 0 ,3: optional map<string,string> Extra ,
}
文件作用:
定义了Coze Studio项目中所有接口的基础数据结构,作为其他IDL文件的依赖基础。
资源库查询接口定义(resource.thrift)
文件位置:idl/resource/resource.thrift
当Coze用户登录平台后点击"资源库"时,前端会调用LibraryResourceList
接口来获取资源库中的资源列表。该接口支持多种筛选条件,包括资源类型、发布状态、创建者等。
LibraryResourceList接口定义
LibraryResourceListResponse LibraryResourceList(1: LibraryResourceListRequest request)
(api.post='/api/plugin_api/library_resource_list', api.category="resource", api.gen_path="resource", agw.preserve_base="true")
请求结构体定义
struct LibraryResourceListRequest {1 : optional i32 user_filter , // 是否为当前用户创建,0-不过滤,1-当前用户2 : optional list<resource_common.ResType> res_type_filter , // 资源类型过滤 [4,1] 0表示不过滤3 : optional string name , // 资源名称4 : optional resource_common.PublishStatus publish_status_filter, // 发布状态,0-不过滤,1-未发布,2-已发布5 : required i64 space_id (agw.js_conv="str", api.js_conv="true"), // 用户空间ID7 : optional i32 size , // 单次读取数据条数,默认10,最大1009 : optional string cursor , // 游标,用于分页,默认0,首次请求可不传,后续请求需要带上次返回的cursor10 : optional list<string> search_keys , // 指定自定义搜索使用的字段,不填默认只name匹配11 : optional bool is_get_imageflow , // res_type_filter为[2 workflow]时是否需要返回图像审核255: base.Base Base ,
}
响应结构体定义
struct LibraryResourceListResponse {1 : i64 code ,2 : string msg ,3 : list<resource_common.ResourceInfo> resource_list, // 资源列表5 : optional string cursor , // 下次请求的游标6 : bool has_more , // 是否还有数据可拉取255: required base.BaseResp BaseResp ,
}
核心数据结构 - ResourceInfo
struct ResourceInfo{1 : optional i64 ResID , // 资源ID2 : optional ResType ResType , // 资源类型3 : optional i32 ResSubType , // 资源子类型4 : optional string Name , // 资源名称5 : optional string Desc , // 资源描述6 : optional string Icon , // 资源图标,完整url7 : optional i64 CreatorID , // 资源创建者ID8 : optional string CreatorAvatar , // 创建者头像9 : optional string CreatorName , // 创建者名称10: optional string UserName , // 创建者用户名11: optional PublishStatus PublishStatus , // 资源发布状态12: optional i32 BizResStatus , // 业务资源状态13: optional bool CollaborationEnable , // 是否启用多人编辑14: optional i64 EditTime , // 最后编辑时间15: optional i64 SpaceID , // 资源所属空间ID16: optional map<string,string> BizExtend , // 业务扩展信息17: optional list<ResourceAction> Actions , // 可执行的操作按钮18: optional bool DetailDisable , // 是否禁止进入详情页19: optional bool DelFlag , // 删除标识
}
接口作用说明:
- 主要功能: 获取用户空间下的资源库资源列表
- 支持的资源类型: Plugin(插件)、Workflow(工作流)、Knowledge(知识库)、Database(数据库)、UI组件、Prompt、Variable(变量)、Voice(语音)、Imageflow(图像流)
- 筛选能力: 支持按资源类型、发布状态、创建者、名称等多维度筛选
- 分页支持: 使用cursor游标方式实现高效分页
- 操作权限: 返回每个资源可执行的操作列表(复制、删除、编辑等)
IDL主API服务聚合文件(api.thrift)
文件位置:idl/api.thrift
该文件是整个Coze项目的API服务聚合入口点,负责将所有业务模块的IDL服务定义统一聚合,为代码生成工具提供完整的服务接口定义。
核心代码:
include "./resource/resource.thrift"namespace go coze// 资源库核心服务聚合
service ResourceService extends resource.ResourceService {}
// 其他业务服务聚合
资源库接口聚合说明:
通过 service ResourceService extends resource.ResourceService {}
聚合定义,api.thrift将resource.thrift中定义的所有资源库相关接口统一暴露,包括:
- LibraryResourceList:获取资源库资源列表
- LibraryResourceDetail:获取资源库资源详情
- LibraryResourceCreate:创建资源库资源
- LibraryResourceUpdate:更新资源库资源
- LibraryResourceDelete:删除资源库资源
2. API网关层
接口定义-resource.go文件详细分析
文件位置:backend\api\model\resource\resource.go
核心代码:
type ResourceService interface {LibraryResourceList(ctx context.Context, request *LibraryResourceListRequest) (r *LibraryResourceListResponse, err error)}
LibraryResourceList接口模型定义
当Coze用户登录平台后点击"资源库"时,前端会调用LibraryResourceList
接口来获取资源库中的资源列表。该接口支持多种筛选条件,包括资源类型、发布状态、创建者等。
LibraryResourceListRequest 请求结构体:
type LibraryResourceListRequest struct {// Whether created by the current user, 0 - unfiltered, 1 - current userUserFilter *int32 `thrift:"user_filter,1,optional" form:"user_filter" json:"user_filter,omitempty" query:"user_filter"`// [4,1] 0 means do not filterResTypeFilter []common.ResType `thrift:"res_type_filter,2,optional" form:"res_type_filter" json:"res_type_filter,omitempty" query:"res_type_filter"`// nameName *string `thrift:"name,3,optional" form:"name" json:"name,omitempty" query:"name"`// Published status, 0 - unfiltered, 1 - unpublished, 2 - publishedPublishStatusFilter *common.PublishStatus `thrift:"publish_status_filter,4,optional" form:"publish_status_filter" json:"publish_status_filter,omitempty" query:"publish_status_filter"`// User's space IDSpaceID int64 `thrift:"space_id,5,required" form:"space_id,required" json:"space_id,string,required" query:"space_id,required"`// The number of data bars read at one time, the default is 10, and the maximum is 100.Size *int32 `thrift:"size,7,optional" form:"size" json:"size,omitempty" query:"size"`// Cursor, used for paging, default 0, the first request can not be passed, subsequent requests need to bring the last returned cursorCursor *string `thrift:"cursor,9,optional" form:"cursor" json:"cursor,omitempty" query:"cursor"`// The field used to specify the custom search, do not fill in the default only name matches, eg [] string {name, custom} matches the name and custom fields full_textSearchKeys []string `thrift:"search_keys,10,optional" form:"search_keys" json:"search_keys,omitempty" query:"search_keys"`// Do you need to return image review when the res_type_filter is [2 workflow]IsGetImageflow *bool `thrift:"is_get_imageflow,11,optional" form:"is_get_imageflow" json:"is_get_imageflow,omitempty" query:"is_get_imageflow"`Base *base.Base `thrift:"Base,255" form:"Base" json:"Base" query:"Base"`
}
LibraryResourceListResponse 响应结构体:
type LibraryResourceListResponse struct {Code int64 `thrift:"code,1" form:"code" json:"code" query:"code"`Msg string `thrift:"msg,2" form:"msg" json:"msg" query:"msg"`ResourceList []*common.ResourceInfo `thrift:"resource_list,3" form:"resource_list" json:"resource_list" query:"resource_list"`// Cursor, the cursor for the next requestCursor *string `thrift:"cursor,5,optional" form:"cursor" json:"cursor,omitempty" query:"cursor"`// Is there still data to be pulled?HasMore bool `thrift:"has_more,6" form:"has_more" json:"has_more" query:"has_more"`BaseResp *base.BaseResp `thrift:"BaseResp,255,required" form:"BaseResp,required" json:"BaseResp,required" query:"BaseResp,required"`
}
接口功能说明
业务功能:
- 资源库列表获取:获取用户空间内的所有资源库资源,支持分页查询
- 多维度筛选:支持按资源类型、发布状态、创建者、名称等条件筛选
- 搜索功能:支持按名称和自定义字段进行全文搜索
- 分页支持:使用游标分页机制,支持高效的大数据量分页
- 权限控制:基于用户身份和空间权限进行访问控制
技术特性:
- 类型安全:使用强类型定义确保数据一致性
- 多格式支持:支持thrift、form、json、query等多种序列化格式
- 可选字段:使用optional标记支持向后兼容
- 统一响应:遵循统一的响应格式规范
- 游标分页:使用cursor机制实现高效分页,避免深度分页性能问题
文件作用:
由thriftgo自动生成的Go代码文件,基于IDL定义生成对应的Go结构体和接口,提供类型安全的API模型。该文件实现了完整的Thrift RPC通信机制,包括客户端调用、服务端处理、序列化/反序列化等功能,确保了分布式服务间的可靠通信。
资源库接口处理器实现
文件位置:backend/api/handler/coze/resource_service.go
该文件包含了用户登录后点击资源库功能的所有核心API接口处理器,主要负责处理资源库资源的查询、复制、管理等功能。
核心代码:
// LibraryResourceList .
// @router /api/plugin_api/library_resource_list [POST]
func LibraryResourceList(ctx context.Context, c *app.RequestContext) {var err errorvar req resource.LibraryResourceListRequesterr = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}if req.SpaceID <= 0 {invalidParamRequestResponse(c, "space_id is invalid")return}if req.GetSize() > 100 {invalidParamRequestResponse(c, "size is too large")return}resp, err := search.SearchSVC.LibraryResourceList(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}c.JSON(consts.StatusOK, resp)
}
实现功能:
- 参数验证:验证请求参数的有效性,包括SpaceID和Size的范围检查
- 请求绑定:使用Hertz框架的BindAndValidate方法自动绑定和验证请求参数
- 业务调用:调用搜索服务层的LibraryResourceList方法获取资源列表
- 错误处理:统一的错误处理机制,包括参数错误和内部服务错误
- 响应返回:以JSON格式返回标准化的响应结果
参数校验逻辑:
- SpaceID校验:必须大于0,确保是有效的空间ID
- Size限制:最大不超过100,防止单次查询数据量过大影响性能
- 自动绑定:利用struct tag自动完成参数绑定和基础验证
路由注册实现-api.go文件详细分析
文件位置:backend/api/router/coze/api.go
核心代码:
// Code generated by hertz generator. DO NOT EDIT.func Register(r *server.Hertz) {root := r.Group("/", rootMw()...){_api := root.Group("/api", _apiMw()...){_plugin_api := _api.Group("/plugin_api", _plugin_apiMw()...)_plugin_api.POST("/library_resource_list", append(_libraryresourcelistMw(), coze.LibraryResourceList)...)// ... 其他资源相关路由}}
}
文件作用:
此文件是Coze Studio后端的核心路由注册文件,由hertz generator自动生成,负责将所有HTTP API接口路由与对应的处理函数进行绑定和注册。该文件构建了完整的RESTful API路由树结构。对于资源库模块,构建了层次化的路由结构:
/api/plugin_api/library_resource_list [POST]
├── rootMw() # 根级中间件
├── _apiMw() # API组中间件
├── _plugin_apiMw() # 插件API组中间件
├── _libraryresourcelistMw() # 资源库列表接口中间件
└── coze.LibraryResourceList # 处理函数
中间件系统-middleware.go文件详细分析
文件位置:backend/api/router/coze/middleware.go
核心代码:
func _plugin_apiMw() []app.HandlerFunc {// 插件API模块中间件return nil
}func _libraryresourcelistMw() []app.HandlerFunc {// 资源库列表查询接口专用中间件return nil
}func _projectresourcelistMw() []app.HandlerFunc {// 项目资源列表查询接口专用中间件return nil
}func _resourcecopydispatchMw() []app.HandlerFunc {// 资源复制分发接口专用中间件return nil
}
文件作用:
- 中间件函数定义:为资源库模块的每个路由组和特定路由提供中间件挂载点
- 路由层级管理:按照路由的层级结构组织中间件函数,支持三层中间件架构
- 开发者扩展接口:提供统一的接口供开发者添加自定义中间件逻辑,如认证、鉴权、限流、日志记录等
- 粒度化控制:支持从模块级别到接口级别的细粒度中间件控制
- 功能扩展:可在此处添加资源访问权限检查、请求日志记录、性能监控等功能
API网关层Restful接口路由-Coze+Hertz
Hertz为每个HTTP方法维护独立的路由树,通过分组路由的方式构建层次化的API结构。对于资源库列表查询接口的完整路由链路:
/api/plugin_api/library_resource_list [POST]
├── rootMw() # 根级中间件(全局认证、CORS等)
├── _apiMw() # API组中间件(API版本控制、通用验证)
├── _plugin_apiMw() # 插件API组中间件(插件相关权限检查)
├── _libraryresourcelistMw() # 接口级中间件(资源库特定逻辑)
└── coze.LibraryResourceList # 处理函数
这种设计的优势:
- 层次化管理:不同层级的中间件处理不同的关注点,职责清晰
- 可扩展性:每个层级都可以独立添加中间件,不影响其他层级
- 性能优化:中间件按需执行,避免不必要的开销
- POST请求支持:专门处理POST请求的JSON数据绑定和验证
- 资源库管理:专门为资源库功能设计的路由结构,支持复杂的资源操作
- 统一错误处理:在中间件层面实现统一的错误处理和响应格式化
- 安全控制:多层级的安全检查,确保资源访问的安全性
3. 应用服务层
SearchApplicationService初始化
文件位置:backend/application/search/resource_search.go
和 backend/application/search/init.go
SearchApplicationService是搜索应用服务层的核心组件,负责处理项目和资源的搜索、获取、收藏等业务逻辑,是连接API层和领域层的重要桥梁。
服务结构定义
文件位置:backend/application/search/resource_search.go
// SearchApplicationService 搜索应用服务,处理项目和资源搜索的核心业务逻辑
var SearchSVC = &SearchApplicationService{}type SearchApplicationService struct {*ServiceComponents // 嵌入服务组件依赖DomainSVC search.Search // 搜索领域服务
}// 资源类型到默认图标的映射
var resType2iconURI = map[common.ResType]string{common.ResType_Plugin: consts.DefaultPluginIcon,common.ResType_Workflow: consts.DefaultWorkflowIcon,common.ResType_Knowledge: consts.DefaultDatasetIcon,common.ResType_Prompt: consts.DefaultPromptIcon,common.ResType_Database: consts.DefaultDatabaseIcon,
}
服务组件依赖
文件位置:backend/application/search/init.go
// ServiceComponents 定义搜索服务所需的所有依赖组件
type ServiceComponents struct {DB *gorm.DB // 数据库连接Cache cache.Cmdable // 缓存服务TOS storage.Storage // 对象存储服务ESClient es.Client // Elasticsearch客户端ProjectEventBus ProjectEventBus // 项目事件总线ResourceEventBus ResourceEventBus // 资源事件总线SingleAgentDomainSVC singleagent.SingleAgent // 单智能体领域服务APPDomainSVC app.AppService // APP领域服务KnowledgeDomainSVC knowledge.Knowledge // 知识库领域服务PluginDomainSVC service.PluginService // 插件领域服务WorkflowDomainSVC workflow.Service // 工作流领域服务UserDomainSVC user.User // 用户领域服务ConnectorDomainSVC connector.Connector // 连接器领域服务PromptDomainSVC prompt.Prompt // 提示词领域服务DatabaseDomainSVC database.Database // 数据库领域服务
}
服务初始化实现
文件位置:backend/application/search/init.go
// InitService 初始化搜索应用服务,注入所有依赖并设置消息队列消费者
func InitService(ctx context.Context, s *ServiceComponents) (*SearchApplicationService, error) {// 创建搜索领域服务searchDomainSVC := search.NewDomainService(ctx, s.ESClient)// 注入依赖到全局服务实例SearchSVC.DomainSVC = searchDomainSVCSearchSVC.ServiceComponents = s// 设置项目搜索消费者searchConsumer := search.NewProjectHandler(ctx, s.ESClient)logs.Infof("start search domain consumer...")nameServer := os.Getenv(consts.MQServer)// 注册项目事件消费者err := eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicApp, consts.RMQConsumeGroupApp, searchConsumer)if err != nil {return nil, fmt.Errorf("register search consumer failed, err=%w", err)}// 设置资源搜索消费者searchResourceConsumer := search.NewResourceHandler(ctx, s.ESClient)// 注册资源事件消费者err = eventbus.DefaultSVC().RegisterConsumer(nameServer, consts.RMQTopicResource, consts.RMQConsumeGroupResource, searchResourceConsumer)if err != nil {return nil, fmt.Errorf("register search consumer failed, err=%w", err)}return SearchSVC, nil
}// 事件总线类型别名
type (ResourceEventBus = search.ResourceEventBusProjectEventBus = search.ProjectEventBus
)// NewResourceEventBus 创建资源事件总线
func NewResourceEventBus(p eventbus.Producer) search.ResourceEventBus {return search.NewResourceEventBus(p)
}// NewProjectEventBus 创建项目事件总线
func NewProjectEventBus(p eventbus.Producer) search.ProjectEventBus {return search.NewProjectEventBus(p)
}
服务初始化特点:
- 依赖注入:通过ServiceComponents结构体注入15个不同的领域服务,实现完整的业务功能支持
- Elasticsearch集成:使用ES客户端提供强大的全文搜索和索引功能
- 事件驱动架构:集成项目和资源事件总线,支持异步事件处理和数据同步
- 消息队列消费者:自动注册项目和资源的MQ消费者,实现实时数据更新
- 多领域服务协调:整合智能体、APP、知识库、插件、工作流等多个领域服务
- 存储服务集成:支持数据库持久化、缓存加速和对象存储
资源库查询服务核心实现
资源库列表获取功能
文件位置:backend/application/search/resource_search.go
// LibraryResourceList 获取资源库列表
func (s *SearchApplicationService) LibraryResourceList(ctx context.Context, req *resource.LibraryResourceListRequest) (resp *resource.LibraryResourceListResponse, err error) {// 权限验证userID := ctxutil.GetUIDFromCtx(ctx)if userID == nil {return nil, errorx.New(errno.ErrSearchPermissionCode, errorx.KV("msg", "session required"))}// 构建资源搜索请求searchReq := &entity.SearchResourcesRequest{SpaceID: req.GetSpaceID(),OwnerID: 0,Name: req.GetName(),ResTypeFilter: req.GetResTypeFilter(),PublishStatusFilter: req.GetPublishStatusFilter(),SearchKeys: req.GetSearchKeys(),Cursor: req.GetCursor(),Limit: req.GetSize(),}// 设置用户过滤条件if req.IsSetUserFilter() && req.GetUserFilter() > 0 {searchReq.OwnerID = ptr.From(userID)}// 调用领域服务搜索资源searchResp, err := s.DomainSVC.SearchResources(ctx, searchReq)if err != nil {return nil, err}// 并发处理资源数据封装lock := sync.Mutex{}tasks := taskgroup.NewUninterruptibleTaskGroup(ctx, 10)resources := make([]*common.ResourceInfo, len(searchResp.Data))// 并发处理除第一个资源外的所有资源if len(searchResp.Data) > 1 {for idx := range searchResp.Data[1:] {index := idx + 1v := searchResp.Data[index]tasks.Go(func() error {ri, err := s.packResource(ctx, v)if err != nil {logs.CtxErrorf(ctx, "[LibraryResourceList] packResource failed, will ignore resID: %d, Name : %s, resType: %d, err: %v",v.ResID, v.GetName(), v.ResType, err)return err}lock.Lock()defer lock.Unlock()resources[index] = rireturn nil})}}// 同步处理第一个资源if len(searchResp.Data) != 0 {ri, err := s.packResource(ctx, searchResp.Data[0])if err != nil {return nil, err}lock.Lock()resources[0] = rilock.Unlock()}// 等待并发任务完成err = tasks.Wait()if err != nil {return nil, err}// 过滤空资源filterResource := make([]*common.ResourceInfo, 0)for _, res := range resources {if res == nil {continue}filterResource = append(filterResource, res)}return &resource.LibraryResourceListResponse{Code: 0,ResourceList: filterResource,Cursor: ptr.Of(searchResp.NextCursor),HasMore: searchResp.HasMore,}, nil
}
代码作用:
- 1.权限验证 :检查用户是否已登录,未登录则返回权限错误
- 2.资源搜索 :根据请求参数(空间ID、资源名称、类型、发布状态)搜索资源
- 3.用户过滤 :支持按当前用户创建的资源进行过滤
- 4.并发处理 :使用协程池并发处理资源数据封装,提高性能
- 5.分页支持 :通过游标机制实现分页查询
代码功能:
在用户登录后点击“资源库”的场景中会调用(LibraryResourceList 方法),系统会:
- 首先通过搜索服务获取资源列表: searchResp, err := s.DomainSVC.SearchResources(ctx, searchReq)
- 然后 循环遍历 每个资源,为每个资源调用 s.packResource(ctx, v)
- 在 packResource 中,为每个资源创建对应的 packer 并调用 GetDataInfo获取具体资源的信息。
func (s *SearchApplicationService) packResource(ctx context.Context, doc *entity.ResourceDocument) (*common.ResourceInfo, error) {ri := &common.ResourceInfo{ResID: ptr.Of(doc.ResID),ResType: ptr.Of(doc.ResType),Name: doc.Name,SpaceID: doc.SpaceID,CreatorID: doc.OwnerID,ResSubType: doc.ResSubType,PublishStatus: doc.PublishStatus,EditTime: ptr.Of(doc.GetUpdateTime() / 1000),}if doc.BizStatus != nil {ri.BizResStatus = ptr.Of(int32(*doc.BizStatus))}packer, err := NewResourcePacker(doc.ResID, doc.ResType, s.ServiceComponents)if err != nil {return nil, errorx.Wrapf(err, "NewResourcePacker failed")}ri = s.packUserInfo(ctx, ri, doc.GetOwnerID())ri.Actions = packer.GetActions(ctx)data, err := packer.GetDataInfo(ctx)if err != nil {logs.CtxWarnf(ctx, "[packResource] GetDataInfo failed, resID: %d, Name : %s, resType: %d, err: %v",doc.ResID, doc.GetName(), doc.ResType, err)ri.Icon = ptr.Of(s.getResourceDefaultIconURL(ctx, doc.ResType))return ri, nil // Warn : weak dependency data}ri.BizResStatus = data.statusri.Desc = data.descri.Icon = ternary.IFElse(len(data.iconURL) > 0,&data.iconURL, ptr.Of(s.getResourceIconURL(ctx, data.iconURI, doc.ResType)))ri.BizExtend = map[string]string{"url": ptr.From(ri.Icon),}return ri, nil
}
代码功能:
packResource 方法是 资源数据封装器 ,负责将原始的资源文档数据转换为前端可用的资源信息结构。具体功能包括:
- 基础信息封装
- 将 entity.ResourceDocument 转换为 common.ResourceInfo
- 设置资源ID、类型、名称、空间ID、创建者ID等基础字段
- 处理编辑时间(转换为秒级时间戳)
- 资源打包器创建
- 调用 NewResourcePacker 根据资源类型创建对应的打包器:
- Plugin打包器 :处理插件资源
- Workflow打包器 :处理工作流资源
- Knowledge打包器 :处理知识库资源
- Prompt打包器 :处理提示词资源
- Database打包器 :处理数据库资源
- 用户信息封装
- 调用 packUserInfo 获取创建者的用户信息(姓名、头像)
- 设置默认用户头像(如果用户头像为空)
- 操作权限设置
- 通过 packer.GetActions(ctx) 获取资源可执行的操作列表
- 不同资源类型有不同的操作权限(编辑、删除、复制等)
- 详细信息获取
- 调用 packer.GetDataInfo(ctx) 获取资源的详细信息:
- 图标信息 :iconURI、iconURL
- 描述信息 :资源描述
- 状态信息 :业务状态
- 如果获取失败,使用默认图标并记录警告日志
- 图标处理
- 优先使用资源的自定义图标URL
- 如果没有自定义图标,则使用默认图标
- 将图标URL添加到扩展信息中
- 设计特点
- 容错性强 :即使获取详细信息失败,也会返回基础信息
- 类型安全 :通过不同的打包器处理不同类型的资源
- 可扩展性 :新增资源类型只需添加对应的打包器
- 性能优化 :在并发场景中被调用,支持高并发处理
这个方法是资源库系统中的核心数据转换组件,确保了前端能够获得完整、格式化的资源信息。
GetDataInfo详解
文件位置:backend\application\search\resource_pack.go
接口定义核心代码:
type ResourcePacker interface {GetDataInfo(ctx context.Context) (*dataInfo, error)}
数据结构定义:
type dataInfo struct {iconURI *string // 资源图标URIiconURL string // 资源图标完整URLdesc *string // 资源描述status *int32 // 资源状态(仅知识库使用)
}
插件资源GetDataInfo实现:
func (p *pluginPacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {plugin, err := p.appContext.PluginDomainSVC.GetDraftPlugin(ctx, p.resID)if err != nil {return nil, err}iconURL, err := p.appContext.TOS.GetObjectUrl(ctx, plugin.GetIconURI())if err != nil {logs.CtxWarnf(ctx, "get icon url failed with '%s', err=%v", plugin.GetIconURI(), err)}return &dataInfo{iconURI: ptr.Of(plugin.GetIconURI()),iconURL: iconURL,desc: ptr.Of(plugin.GetDesc()),}, nil
}
工作流资源GetDataInfo实现:
func (w *workflowPacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {info, err := w.appContext.WorkflowDomainSVC.Get(ctx, &vo.GetPolicy{ID: w.resID,MetaOnly: true,})if err != nil {return nil, err}return &dataInfo{iconURI: &info.IconURI,iconURL: info.IconURL,desc: &info.Desc,}, nil
}
知识库资源GetDataInfo实现:
func (k *knowledgePacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {res, err := k.appContext.KnowledgeDomainSVC.GetKnowledgeByID(ctx, &service.GetKnowledgeByIDRequest{KnowledgeID: k.resID,})if err != nil {return nil, err}kn := res.Knowledgereturn &dataInfo{iconURI: ptr.Of(kn.IconURI),iconURL: kn.IconURL,desc: ptr.Of(kn.Description),status: ptr.Of(int32(kn.Status)),}, nil
}
提示词资源GetDataInfo实现:
func (p *promptPacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {pInfo, err := p.appContext.PromptDomainSVC.GetPromptResource(ctx, p.resID)if err != nil {return nil, err}return &dataInfo{iconURI: nil, // prompt don't have custom iconiconURL: "",desc: &pInfo.Description,}, nil
}
数据库资源GetDataInfo实现:
func (d *databasePacker) GetDataInfo(ctx context.Context) (*dataInfo, error) {listResp, err := d.appContext.DatabaseDomainSVC.MGetDatabase(ctx, &dbservice.MGetDatabaseRequest{Basics: []*database.DatabaseBasic{{ID: d.resID,TableType: table.TableType_OnlineTable,},}})if err != nil {return nil, err}if len(listResp.Databases) == 0 {return nil, fmt.Errorf("online database not found, id: %d", d.resID)}return &dataInfo{iconURI: ptr.Of(listResp.Databases[0].IconURI),iconURL: listResp.Databases[0].IconURL,desc: ptr.Of(listResp.Databases[0].TableDesc),}, nil
}
功能说明:
- GetDataInfo方法作用:获取不同类型资源的详细信息,包括图标、描述和状态等
- 多态实现:通过ResourcePacker接口实现多态,支持Plugin、Workflow、Knowledge、Prompt、Database五种资源类型
- 数据封装:将各种资源的详细信息统一封装到dataInfo结构体中
- 错误处理:每个实现都包含完善的错误处理机制,确保系统稳定性
- 图标处理:支持自定义图标URI和完整URL的获取,提示词资源不支持自定义图标
- 状态管理:知识库资源额外支持状态信息的获取和管理
- 工厂模式:通过NewResourcePacker工厂函数根据资源类型创建相应的实现
4. 领域服务层
搜索领域
搜索领域服务接口
文件位置:backend\domain\search\service\service.go
核心代码:
package serviceimport ("context""github.com/coze-dev/coze-studio/backend/domain/search/entity"
)type ResourceEventBus interface {PublishResources(ctx context.Context, event *entity.ResourceDomainEvent) error
}type Search interface {SearchResources(ctx context.Context, req *entity.SearchResourcesRequest) (resp *entity.SearchResourcesResponse, err error)
}
搜索领域服务实现-业务接口
文件位置:backend\domain\search\service\search.go
核心代码:
package serviceimport ("context""strconv""github.com/bytedance/sonic"model "github.com/coze-dev/coze-studio/backend/api/model/crossdomain/search"searchEntity "github.com/coze-dev/coze-studio/backend/domain/search/entity""github.com/coze-dev/coze-studio/backend/infra/contract/es""github.com/coze-dev/coze-studio/backend/pkg/lang/conv""github.com/coze-dev/coze-studio/backend/pkg/lang/ptr""github.com/coze-dev/coze-studio/backend/pkg/logs"
)var searchInstance *searchImplfunc NewDomainService(ctx context.Context, e es.Client) Search {return &searchImpl{esClient: e,}
}type searchImpl struct {esClient es.Client
}func (s *searchImpl) SearchResources(ctx context.Context, req *searchEntity.SearchResourcesRequest) (resp *searchEntity.SearchResourcesResponse, err error) {logs.CtxDebugf(ctx, "[SearchResources] search : %s", conv.DebugJsonToStr(req))searchReq := &es.Request{Query: &es.Query{Bool: &es.BoolQuery{},},}result, err := s.esClient.Search(ctx, resourceIndexName, searchReq)if err != nil {return nil, err}hits := result.Hits.HitshasMore := func() bool {if len(hits) > reqLimit {return true}return false}()if hasMore {hits = hits[:reqLimit]}docs := make([]*searchEntity.ResourceDocument, 0, len(hits))for _, hit := range hits {doc := &searchEntity.ResourceDocument{}if err = sonic.Unmarshal(hit.Source_, doc); err != nil {return nil, err}docs = append(docs, doc)}nextCursor := ""if len(docs) > 0 {nextCursor = formatResourceNextCursor(req.OrderFiledName, docs[len(docs)-1])}if nextCursor == "" {hasMore = false}var total *int64if result.Hits.Total != nil {total = ptr.Of(result.Hits.Total.Value)}resp = &searchEntity.SearchResourcesResponse{Data: docs,TotalHits: total,HasMore: hasMore,NextCursor: nextCursor,}return resp, nil
}
代码功能:
该方法是Coze Studio资源搜索的核心实现,主要功能包括:
- 多维度筛选 :支持按应用ID、空间ID、资源名称、所有者ID、资源类型、发布状态等条件筛选
- ElasticSearch查询 :构建复杂的ES Bool查询,支持精确匹配和模糊搜索
- 分页支持 :支持基于页码和游标的两种分页方式
- 排序功能 :支持按指定字段排序,默认按更新时间排序
- 结果处理 :将ES查询结果反序列化为ResourceDocument对象,并计算分页信息
该方法位于领域服务层,是资源库功能的核心搜索引擎。
插件领域
插件领域服务接口
文件位置:backend/domain/plugin/service/service.go
插件领域服务接口定义了插件管理的核心业务能力,包括草稿插件管理、在线插件发布、工具管理、Agent工具绑定等功能。
type PluginService interface {// Draft PluginGetDraftPlugin(ctx context.Context, pluginID int64) (plugin *entity.PluginInfo, err error)}
核心接口功能:
- 草稿插件管理:创建、获取、更新、删除草稿状态的插件
- 在线插件管理:发布插件到线上环境,获取在线插件信息
- 工具管理:管理插件中的具体工具,支持OpenAPI文档转换
- Agent工具绑定:将插件工具绑定到Agent,支持工具执行
- OAuth认证:处理插件的OAuth授权流程
插件领域服务实现-业务接口
文件位置:backend/domain/plugin/service/service_impl.go
和backend\domain\plugin\service\plugin_draft.go
插件服务实现类包含了所有插件相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
type Components struct {IDGen idgen.IDGeneratorDB *gorm.DBOSS storage.StoragePluginRepo repository.PluginRepositoryToolRepo repository.ToolRepositoryOAuthRepo repository.OAuthRepository
}func NewService(components *Components) PluginService {impl := &pluginServiceImpl{db: components.DB,oss: components.OSS,pluginRepo: components.PluginRepo,toolRepo: components.ToolRepo,oauthRepo: components.OAuthRepo,httpCli: resty.New(),}initOnce.Do(func() {ctx := context.Background()safego.Go(ctx, func() {impl.processOAuthAccessToken(ctx)})})return impl
}type pluginServiceImpl struct {db *gorm.DBoss storage.StoragepluginRepo repository.PluginRepositorytoolRepo repository.ToolRepositoryoauthRepo repository.OAuthRepositoryhttpCli *resty.Client
}func (p *pluginServiceImpl) GetDraftPlugin(ctx context.Context, pluginID int64) (plugin *entity.PluginInfo, err error) {pl, exist, err := p.pluginRepo.GetDraftPlugin(ctx, pluginID)if err != nil {return nil, errorx.Wrapf(err, "GetDraftPlugin failed, pluginID=%d", pluginID)}if !exist {return nil, errorx.New(errno.ErrPluginRecordNotFound)}return pl, nil
}
实现特点:
- 依赖注入:通过Components结构体注入所需的基础设施组件
- 仓储模式:使用Repository模式进行数据访问抽象
- HTTP客户端:集成Resty HTTP客户端用于外部API调用
- 异步处理:使用safego进行OAuth令牌的异步处理
- 单例初始化:使用sync.Once确保初始化逻辑只执行一次
工作流领域
工作流领域服务接口
文件位置:backend/domain/workflow/interface.go
type Service interface {Get(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error)}
接口功能分析:
- 工作流管理:Create、Save、Get、MGet、Delete、UpdateMeta等基础CRUD操作
工作流领域服务实现-业务接口
文件位置:backend/domain/workflow/service/service_impl.go
type impl struct {repo workflow.Repository*asToolImpl*executableImpl*conversationImpl
}func NewWorkflowService(repo workflow.Repository) workflow.Service {return &impl{repo: repo,asToolImpl: &asToolImpl{repo: repo,},executableImpl: &executableImpl{repo: repo,},conversationImpl: &conversationImpl{repo: repo},}
}func (i *impl) Get(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error) {return i.repo.GetEntity(ctx, policy)
}
工作流领域服务实现-业务实体
文件位置:backend/domain/workflow/entity/workflow.go
type Workflow struct {ID int64CommitID string*vo.Meta*vo.CanvasInfo*vo.DraftMeta*vo.VersionMeta
}func (w *Workflow) GetBasic() *WorkflowBasic {var version stringif w.VersionMeta != nil {version = w.VersionMeta.Version}return &WorkflowBasic{ID: w.ID,Version: version,SpaceID: w.SpaceID,AppID: w.AppID,CommitID: w.CommitID,}
}func (w *Workflow) GetLatestVersion() string {if w.LatestPublishedVersion == nil {return ""}return *w.LatestPublishedVersion
}type WorkflowBasic struct {ID int64Version stringSpaceID int64AppID *int64CommitID string
}type CopyWorkflowFromAppToLibraryResult struct {WorkflowIDVersionMap map[int64]IDVersionPairValidateIssues []*vo.ValidateIssueCopiedWorkflows []*Workflow
}
实体设计特点:
- 组合结构:Workflow实体组合了Meta、CanvasInfo、DraftMeta、VersionMeta
- 版本管理:支持草稿版本和发布版本的管理
- 基础信息:WorkflowBasic提供工作流的核心标识信息
- 复制结果:CopyWorkflowFromAppToLibraryResult封装复制操作的完整结果
- 方法封装:提供GetBasic、GetLatestVersion等便捷方法
- 空值处理:对可能为空的字段进行安全处理
知识库领域
知识库领域服务接口
文件位置:backend/domain/knowledge/service/interface.go
知识库领域服务接口定义了知识库管理的核心业务能力,包括知识库管理、文档管理、切片管理、检索功能等。
type Knowledge interface {GetKnowledgeByID(ctx context.Context, request *GetKnowledgeByIDRequest) (response *GetKnowledgeByIDResponse, err error)}
核心接口功能:
- 知识库管理:创建、获取、更新、删除、复制知识库,支持知识库在应用和资源库间移动
知识库领域服务实现-业务接口
文件位置:backend/domain/knowledge/service/knowledge.go
知识库服务实现类包含了所有知识库相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
type KnowledgeSVCConfig struct {DB *gorm.DB // requiredIDGen idgen.IDGenerator // requiredRDB rdb.RDB // Required: Form storageProducer eventbus.Producer // Required: Document indexing process goes through mq asynchronous processingSearchStoreManagers []searchstore.Manager // Required: Vector/Full TextParseManager parser.Manager // Optional: document segmentation and processing capability, default builtin parserStorage storage.Storage // required: ossModelFactory chatmodel.Factory // Required: Model factoryRewriter messages2query.MessagesToQuery // Optional: Do not overwrite when not configuredReranker rerank.Reranker // Optional: default rrf when not configuredNL2Sql nl2sql.NL2SQL // Optional: Not supported by default when not configuredEnableCompactTable *bool // Optional: Table data compression, default trueOCR ocr.OCR // Optional: ocr, ocr function is not available when not providedCacheCli cache.Cmdable // Optional: cache implementation
}type knowledgeSVC struct {knowledgeRepo repository.KnowledgeRepodocumentRepo repository.KnowledgeDocumentReposliceRepo repository.KnowledgeDocumentSliceReporeviewRepo repository.KnowledgeDocumentReviewRepomodelFactory chatmodel.Factoryidgen idgen.IDGeneratorrdb rdb.RDBproducer eventbus.ProducersearchStoreManagers []searchstore.ManagerparseManager parser.Managerrewriter messages2query.MessagesToQueryreranker rerank.Rerankerstorage storage.Storagenl2Sql nl2sql.NL2SQLcacheCli cache.CmdableenableCompactTable bool // Table data compression
}func (k *knowledgeSVC) GetKnowledgeByID(ctx context.Context, request *GetKnowledgeByIDRequest) (response *GetKnowledgeByIDResponse, err error) {if request == nil || request.KnowledgeID == 0 {return nil, errorx.New(errno.ErrKnowledgeInvalidParamCode, errorx.KV("msg", "request is empty"))}kn, err := k.knowledgeRepo.GetByID(ctx, request.KnowledgeID)if err != nil {return nil, errorx.New(errno.ErrKnowledgeDBCode, errorx.KV("msg", err.Error()))}if kn == nil || kn.ID == 0 {return nil, errorx.New(errno.ErrKnowledgeNotExistCode, errorx.KV("msg", "knowledge not found"))}knEntity, err := k.fromModelKnowledge(ctx, kn)if err != nil {return nil, err}return &GetKnowledgeByIDResponse{Knowledge: knEntity,}, nil
}
知识库领域服务实现-业务实体
文件位置:backend/domain/knowledge/entity/knowledge.go
type Knowledge struct {*knowledge.Knowledge
}type WhereKnowledgeOption struct {KnowledgeIDs []int64AppID *int64SpaceID *int64Name *string // Exact matchStatus []int32UserID *int64Query *string // fuzzy matchPage *intPageSize *intOrder *OrderOrderType *OrderTypeFormatType *int64
}type OrderType int32const (OrderTypeAsc OrderType = 1OrderTypeDesc OrderType = 2
)type Order int32const (OrderCreatedAt Order = 1OrderUpdatedAt Order = 2
)
实体设计特点:
- 组合结构:Knowledge实体组合了crossdomain中的Knowledge结构
- 查询选项:WhereKnowledgeOption提供了丰富的查询条件支持
- 排序支持:支持按创建时间、更新时间等字段排序
- 分页查询:内置分页查询支持
- 多维筛选:支持按应用ID、空间ID、状态、格式类型等多维度筛选
- 模糊搜索:支持按名称精确匹配和模糊搜索
提示词领域
提示词领域服务接口
文件位置:backend/domain/prompt/service/prompt.go
提示词领域服务接口定义了提示词管理的核心业务能力,包括提示词资源的创建、获取、更新、删除以及官方提示词模板的管理。
type Prompt interface {GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error)}
核心接口功能:
- 提示词资源管理:创建、获取、更新、删除用户自定义的提示词资源
提示词领域服务实现-业务接口
文件位置:backend/domain/prompt/service/prompt_impl.go
提示词服务实现类包含了所有提示词相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
type promptService struct {Repo repository.PromptRepository
}func NewService(repo repository.PromptRepository) Prompt {return &promptService{Repo: repo,}
}func (s *promptService) GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error) {return s.Repo.GetPromptResource(ctx, promptID)
}
实现特点:
- 依赖注入:通过Repository接口注入数据访问能力
- 仓储模式:使用Repository模式进行数据访问抽象
- 官方模板:集成官方提示词模板管理功能
- 搜索过滤:支持按关键词搜索和过滤提示词
- 数据复制:使用深拷贝确保数据安全性
- 错误处理:统一的错误处理和传播机制
提示词领域服务实现-业务实体
文件位置:backend/domain/prompt/entity/promot_resource.go
type PromptResource struct {ID int64SpaceID int64Name stringDescription stringPromptText stringStatus int32CreatorID int64CreatedAt int64UpdatedAt int64
}
实体设计特点:
- 基础信息:包含ID、名称、描述等基本属性
- 内容存储:PromptText字段存储完整的提示词内容
- 权限管理:SpaceID和CreatorID支持多租户和权限控制
- 状态管理:Status字段支持提示词的状态管理
- 时间追踪:CreatedAt和UpdatedAt支持创建和更新时间追踪
- 简洁设计:实体结构简洁明了,专注于提示词核心属性
数据库领域
数据库领域负责管理Coze Studio中的数据库资源,包括草稿数据库和在线数据库的创建、更新、删除、查询等核心功能。该领域支持结构化数据存储,为Agent提供数据查询和操作能力。
数据库领域服务接口
文件位置:backend/domain/memory/database/service/database.go
数据库领域服务接口定义了数据库管理的核心业务能力,包括数据库CRUD操作、记录管理、Agent绑定、SQL执行等功能。
type Database interface {// 数据库基础操作MGetDatabase(ctx context.Context, req *MGetDatabaseRequest) (*MGetDatabaseResponse, error)}
核心接口功能:
- 数据库管理:创建、更新、删除、批量获取数据库
- 记录操作:增删改查数据库记录,支持分页查询
- 模板和导入:支持Excel模板生成和数据导入
- SQL执行:支持自定义SQL查询和操作
- Agent集成:将数据库绑定到Agent,支持智能查询
- 发布管理:将草稿数据库发布到线上环境
数据库领域服务实现-业务接口
文件位置:backend/domain/memory/database/service/database_impl.go
数据库服务实现类包含了所有数据库相关业务逻辑的具体实现,依赖于仓储层进行数据持久化。
MGetDatabase方法实现:
func (d databaseService) MGetDatabase(ctx context.Context, req *MGetDatabaseRequest) (*MGetDatabaseResponse, error) {if len(req.Basics) == 0 {return &MGetDatabaseResponse{Databases: []*entity2.Database{},}, nil}onlineID2NeedSysFields := make(map[int64]bool)draftID2NeedSysFields := make(map[int64]bool)uniqueOnlineIDs := make([]int64, 0)uniqueDraftIDs := make([]int64, 0)idMap := make(map[int64]bool)for _, basic := range req.Basics {if !idMap[basic.ID] {idMap[basic.ID] = trueif basic.TableType == table.TableType_OnlineTable {uniqueOnlineIDs = append(uniqueOnlineIDs, basic.ID)onlineID2NeedSysFields[basic.ID] = basic.NeedSysFields} else {uniqueDraftIDs = append(uniqueDraftIDs, basic.ID)draftID2NeedSysFields[basic.ID] = basic.NeedSysFields}}}// 批量获取在线数据库和草稿数据库onlineDatabases, err := d.onlineDAO.MGet(ctx, uniqueOnlineIDs)if err != nil {return nil, fmt.Errorf("batch get database info failed: %v", err)}draftDatabases, err := d.draftDAO.MGet(ctx, uniqueDraftIDs)if err != nil {return nil, fmt.Errorf("batch get database info failed: %v", err)}// 处理系统字段和图标URLfor _, onlineDatabase := range onlineDatabases {if needSys, ok := onlineID2NeedSysFields[onlineDatabase.ID]; ok && needSys {if onlineDatabase.FieldList == nil {onlineDatabase.FieldList = make([]*database.FieldItem, 0, 3)}onlineDatabase.FieldList = append(onlineDatabase.FieldList, physicaltable.GetDisplayCreateTimeField(), physicaltable.GetDisplayUidField(), physicaltable.GetDisplayIDField())}if onlineDatabase.IconURI != "" {objURL, uRrr := d.storage.GetObjectUrl(ctx, onlineDatabase.IconURI)if uRrr == nil {onlineDatabase.IconURL = objURL}}}databases := make([]*entity2.Database, 0)databases = append(databases, onlineDatabases...)databases = append(databases, draftDatabases...)return &MGetDatabaseResponse{Databases: databases,}, nil
}
业务逻辑特点:
- 双表管理:同时维护草稿表和在线表,支持开发和生产环境分离
- 物理表创建:自动创建对应的物理数据库表结构
- 事务保证:使用数据库事务确保数据一致性
- 字段管理:支持自定义字段和系统字段的管理
- 图标处理:自动处理图标URI到URL的转换
- 批量操作:支持批量获取数据库信息,提高查询效率
数据库领域服务实现-业务实体
文件位置:backend/domain/memory/database/entity/database.go
数据库领域实体定义了数据库相关的核心数据结构和业务对象。
type Database = database.Database// DatabaseFilter Database filter criteria
type DatabaseFilter struct {CreatorID *int64SpaceID *int64TableName *stringAppID *int64
}// Pagination pagination
type Pagination struct {Total int64Limit intOffset int
}type TableSheet struct {SheetID int64HeaderLineIdx int64StartLineIdx int64
}type TableReaderMeta struct {TosMaxLine int64SheetId int64HeaderLineIdx int64StartLineIdx int64ReaderMethod database.TableReadDataMethodReadLineCnt int64Schema []*knowledge.DocTableColumn
}type ExcelExtraInfo struct {Sheets []*knowledge.DocTableSheetExtensionName string // extensionFileSize int64 // file sizeSourceFileID int64TosURI string
}type ColumnInfo struct {ColumnType knowledge.ColumnTypeContainsEmptyValue bool
}
实体设计特点:
- 数据库实体:复用跨域模型中的Database定义,保持一致性
- 过滤条件:DatabaseFilter支持多维度的数据库查询过滤
- 分页支持:Pagination实体支持分页查询功能
- 表格处理:TableSheet和TableReaderMeta支持Excel/CSV文件导入
- 元数据管理:ExcelExtraInfo包含文件的详细元数据信息
- 列信息:ColumnInfo描述数据库列的类型和约束信息
数据库领域核心功能:
- 结构化存储:为Agent提供结构化数据存储能力
- 数据导入:支持从Excel/CSV文件批量导入数据
- SQL查询:支持自定义SQL查询和数据分析
- Agent集成:与Agent深度集成,支持智能数据查询
- 版本管理:通过草稿和在线环境实现数据版本管理
- 权限控制:支持基于用户和应用的数据访问控制
5. 数据访问层
插件仓储
PluginRepo仓储接口定义
文件位置:backend\domain\plugin\repository\plugin_repository.go
type PluginRepository interface {GetDraftPlugin(ctx context.Context, pluginID int64, opts ...PluginSelectedOptions) (plugin *entity.PluginInfo, exist bool, err error)}
PluginRepo仓储实现
文件位置:backend\domain\plugin\repository\plugin_impl.go
func (p *pluginRepoImpl) GetDraftPlugin(ctx context.Context, pluginID int64, opts ...PluginSelectedOptions) (plugin *entity.PluginInfo, exist bool, err error) {var opt *dal.PluginSelectedOptionif len(opts) > 0 {opt = &dal.PluginSelectedOption{}for _, o := range opts {o(opt)}}return p.pluginDraftDAO.Get(ctx, pluginID, opt)
}
数据模型与查询
文件位置:backend\domain\plugin\internal\dal\plugin_draft.go
func (a *PluginDraftDAO) Get(ctx context.Context, pluginID int64) (plugin *entity.PluginInfo, exist bool, err error) {table := a.query.PluginDraftres, err := table.WithContext(ctx).Where(table.ID.Eq(pluginID)).First()if err != nil {if errors.Is(err, gorm.ErrRecordNotFound) {return nil, false, nil}return nil, false, err}plugin = pluginDraftPO(*res).ToDO()return plugin, true, nil
}
代码功能:
- 数据库查询 - 使用GORM查询框架,从 PluginDraft 表中根据ID查找记录
- 错误处理 - 区分处理两种情况:
- 记录不存在:返回 (nil, false, nil)
- 其他数据库错误:返回 (nil, false, err)
- 数据转换 - 将数据库持久化对象(PO)转换为领域实体(DO)
- pluginDraftPO(*res).ToDO() 执行PO到DO的转换
- 成功返回 - 找到记录时返回 (plugin, true, nil)
- 失败返回 - 未找到记录时返回 (nil, false, nil)
文件依赖关系:
数据库表结构 (schema.sql)(plugin_draft表)↓ gen_orm_query.go
模型文件 (model/plugin_draft.gen.go) - 生成模型↓
查询文件 (query/plugin_draft.gen.go) - 依赖对应模型↓
统一入口 (query/gen.go) - 依赖所有查询文件
工作流仓储
workflowRepo仓储接口定义
文件位置:backend\domain\workflow\interface.go
type Repository interface {GetEntity(ctx context.Context, policy *vo.GetPolicy) (*entity.Workflow, error)}
workflowRepo仓储实现
文件位置:backend\domain\workflow\internal\repo\repository.go
type RepositoryImpl struct {idgen.IDGeneratorquery *query.Queryredis cache.Cmdabletos storage.StorageeinoCompose.CheckPointStoreworkflow.InterruptEventStoreworkflow.CancelSignalStoreworkflow.ExecuteHistoryStorebuiltinModel cm.BaseChatModelworkflow.WorkflowConfigworkflow.Suggester
}func (r *RepositoryImpl) GetEntity(ctx context.Context, policy *vo.GetPolicy) (_ *entity.Workflow, err error) {defer func() {if err != nil {err = vo.WrapIfNeeded(errno.ErrDatabaseError, err)}}()meta, err := r.GetMeta(ctx, policy.ID)if err != nil {return nil, err}if policy.MetaOnly {return &entity.Workflow{ID: policy.ID,Meta: meta,}, nil}}func (r *RepositoryImpl) GetMeta(ctx context.Context, id int64) (_ *vo.Meta, err error) {defer func() {if err != nil {err = vo.WrapIfNeeded(errno.ErrDatabaseError, err)}}()meta, err := r.query.WorkflowMeta.WithContext(ctx).Debug().Where(r.query.WorkflowMeta.ID.Eq(id)).First()if err != nil {if errors.Is(err, gorm.ErrRecordNotFound) {return nil, vo.WrapError(errno.ErrWorkflowNotFound, fmt.Errorf("workflow meta not found for ID %d: %w", id, err),errorx.KV("id", strconv.FormatInt(id, 10)))}return nil, fmt.Errorf("failed to get workflow meta for ID %d: %w", id, err)}return r.convertMeta(ctx, meta)
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(workflow_meta表)↓ gen_orm_query.go
模型文件 (model/workflow_meta.gen.go) - 生成模型↓
查询文件 (query/workflow_meta.gen.go) - 依赖对应模型↓
统一入口 (query/gen.go) - 依赖所有查询文件
知识库仓储
knowledgeRepo仓储接口定义
文件位置:backend\domain\knowledge\repository\repository.go
type KnowledgeRepo interface {GetByID(ctx context.Context, id int64) (*model.Knowledge, error)}
knowledgeRepo仓储实现
文件位置:backend\domain\knowledge\internal\dal\dao\knowledge.go
func (dao *KnowledgeDAO) GetByID(ctx context.Context, id int64) (*model.Knowledge, error) {k := dao.Query.Knowledgeknowledge, err := k.WithContext(ctx).Where(k.ID.Eq(id)).First()if err != nil {if errors.Is(err, gorm.ErrRecordNotFound) {return nil, nil}return nil, err}return knowledge, nil
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(knowledge表)↓ gen_orm_query.go
模型文件 (model/knowledge.gen.go) - 生成模型↓
查询文件 (query/knowledge.gen.go) - 依赖对应模型↓
统一入口 (query/gen.go) - 依赖所有查询文件
提示词仓储
promptRepo仓储接口定义
文件位置:backend\domain\prompt\repository\repository.go
type PromptRepository interface {GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error)}
promptRepo仓储实现
文件位置:backend\domain\prompt\internal\dal\prompt_resource.go
func (d *PromptDAO) GetPromptResource(ctx context.Context, promptID int64) (*entity.PromptResource, error) {promptModel := query.PromptResourcepromptWhere := []gen.Condition{promptModel.ID.Eq(promptID),}promptResource, err := promptModel.WithContext(ctx).Where(promptWhere...).First()if errors.Is(err, gorm.ErrRecordNotFound) {return nil, errorx.WrapByCode(err, errno.ErrPromptDataNotFoundCode)}if err != nil {return nil, errorx.WrapByCode(err, errno.ErrPromptGetCode)}do := d.promptResourcePO2DO(promptResource)return do, nil
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(prompt_resource表)↓ gen_orm_query.go
模型文件 (model/prompt_resource.gen.go) - 生成模型↓
查询文件 (query/prompt_resource.gen.go) - 依赖对应模型↓
统一入口 (query/gen.go) - 依赖所有查询文件
数据库仓储
databaseRepo仓储接口定义
文件位置:backend\domain\memory\database\repository\repository.go
type OnlineDAO interface {MGet(ctx context.Context, ids []int64) ([]*entity.Database, error)
}
databaseRepo仓储实现
文件位置:backend\domain\memory\database\repository\repository.go
func (o *OlineImpl) MGet(ctx context.Context, ids []int64) ([]*entity.Database, error) {if len(ids) == 0 {return []*entity.Database{}, nil}res := o.query.OnlineDatabaseInfo// Query undeleted records with IDs in the given listrecords, err := res.WithContext(ctx).Where(res.ID.In(ids...)).Find()if err != nil {return nil, fmt.Errorf("batch query online database failed: %v", err)}// Build returns resultsdatabases := make([]*entity.Database, 0, len(records))for _, info := range records {db := &entity.Database{ID: info.ID,SpaceID: info.SpaceID,CreatorID: info.CreatorID,IconURI: info.IconURI,AppID: info.AppID,DraftID: &info.RelatedDraftID,OnlineID: &info.ID,IsVisible: info.IsVisible == 1,PromptDisabled: info.PromptDisabled == 1,TableName: info.TableName_,TableDesc: info.TableDesc,FieldList: info.TableField,Status: table.BotTableStatus_Online,ActualTableName: info.PhysicalTableName,RwMode: table.BotTableRWMode(info.RwMode),CreatedAtMs: info.CreatedAt,UpdatedAtMs: info.UpdatedAt,}databases = append(databases, db)}return databases, nil
}
数据模型与查询
文件依赖关系:
数据库表结构 (schema.sql)(online_database_info表)↓ gen_orm_query.go
模型文件 (model/online_database_info.gen.go) - 生成模型↓
查询文件 (query/online_database_info.gen.go) - 依赖对应模型↓
统一入口 (query/gen.go) - 依赖所有查询文件
6. 基础设施层
database.go文件详解
文件位置:backend/infra/contract/orm/database.go
核心代码:
package ormimport ("gorm.io/gorm"
)type DB = gorm.DB
文件作用:
- 定义了 type DB = gorm.DB ,为 GORM 数据库对象提供类型别名
- 作为契约层(Contract),为上层提供统一的数据库接口抽象
- 便于后续可能的数据库实现替换(如从 MySQL 切换到 PostgreSQL)
mysql.go文件详解
文件位置:backend/infra/impl/mysql/mysql.go
核心代码:
package mysqlimport ("fmt""os""gorm.io/driver/mysql""gorm.io/gorm"
)func New() (*gorm.DB, error) {dsn := os.Getenv("MYSQL_DSN")db, err := gorm.Open(mysql.Open(dsn))if err != nil {return nil, fmt.Errorf("mysql open, dsn: %s, err: %w", dsn, err)}return db, nil
}
文件作用:
- 定义了 New() 函数,负责建立 GORM MySQL 数据库连接
- 使用环境变量 MYSQL_DSN 配置数据库连接字符串
- 返回 *gorm.DB 实例,作为整个应用的数据库连接对象
- 后端服务启动时,调用 mysql.New() 初始化数据库连接
main.go → application.Init() → appinfra.Init() → mysql.New()
ElasticSearch架构设计
Contract 层(接口定义)
backend/infra/contract/es/
目录定义了 ElasticSearch 的抽象接口:
-
es.go
: 定义了核心接口Client
接口:包含Search
、Create
、Update
、Delete
、CreateIndex
等方法Types
接口:定义属性类型创建方法BulkIndexer
接口:批量操作接口
-
model.go
: 定义数据模型Request
:搜索请求结构体,包含查询条件、分页、排序等Response
:搜索响应结构体,包含命中结果和元数据Hit
:单个搜索结果BulkIndexerItem
:批量操作项
-
query.go
: 定义查询相关结构Query
:查询结构体,支持多种查询类型QueryType
常量:equal
、match
、multi_match
、not_exists
、contains
、in
BoolQuery
:布尔查询,支持must
、should
、filter
、must_not
- 各种查询构造函数:
NewEqualQuery
、NewMatchQuery
等
Implementation 层(具体实现)
backend/infra/impl/es/
目录提供了具体实现:
-
es_impl.go
: 工厂方法New()
函数根据环境变量ES_VERSION
选择 ES7 或 ES8 实现- 类型别名导出,统一接口
-
es7.go
: ElasticSearch 7.x 实现es7Client
结构体实现Client
接口- 使用
github.com/elastic/go-elasticsearch/v7
官方客户端 Search
方法将抽象查询转换为 ES7 格式的 JSON 查询query2ESQuery
方法处理查询类型转换
-
es8.go
: ElasticSearch 8.x 实现es8Client
结构体实现Client
接口- 使用
github.com/elastic/go-elasticsearch/v8
官方客户端 - 使用类型化 API,更加类型安全
Search
方法使用 ES8 的 typed API
查询执行流程
- 业务层调用:
backend/domain/search/service/search.go
中的SearchProjects
方法 - 构建查询:创建
es.Request
对象,设置查询条件、排序、分页等 - 执行查询:调用
s.esClient.Search(ctx, projectIndexName, searchReq)
- 版本适配:根据
ES_VERSION
环境变量,自动选择 ES7 或 ES8 实现 - 查询转换:
- ES7:将抽象查询转换为 JSON 格式
- ES8:将抽象查询转换为类型化结构体
- 结果处理:将 ES 响应转换为统一的
Response
结构体
索引使用:
- 项目索引:
projectIndexName = "project_draft"
存储项目草稿信息 - 资源索引:
resourceIndexName = "coze_resource"
存储各类资源信息
设计优势
- 版本兼容:同时支持 ES7 和 ES8,通过环境变量切换
- 接口统一:业务代码无需关心具体 ES 版本
- 类型安全:ES8 使用类型化 API,减少运行时错误
- 查询抽象:提供统一的查询构建方式,支持复杂的布尔查询
- 易于扩展:新增查询类型只需在 contract 层定义,impl 层实现
这种设计模式体现了依赖倒置原则,业务层依赖抽象接口而非具体实现,使得系统更加灵活和可维护。
7. 数据存储层
数据库表结构
文件位置:docker/volumes/mysql/schema.sql
-- 插件草稿表
CREATE TABLE IF NOT EXISTS `plugin_draft` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',`creator_id` bigint(20) NOT NULL COMMENT '插件创建者ID',`name` varchar(255) NOT NULL COMMENT '插件名称',`description` text COMMENT '插件描述',`icon_uri` varchar(255) DEFAULT NULL COMMENT '插件图标URI',`plugin_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '插件类型',`config` json DEFAULT NULL COMMENT '插件配置信息',`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',PRIMARY KEY (`id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='插件草稿表';-- 工作流元数据表
CREATE TABLE IF NOT EXISTS `workflow_meta` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',`creator_id` bigint(20) NOT NULL COMMENT '工作流创建者ID',`name` varchar(255) NOT NULL COMMENT '工作流名称',`description` text COMMENT '工作流描述',`icon_uri` varchar(255) DEFAULT NULL COMMENT '工作流图标URI',`workflow_config` json DEFAULT NULL COMMENT '工作流配置信息',`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '工作流状态',`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',PRIMARY KEY (`id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工作流元数据表';-- 知识库表
CREATE TABLE IF NOT EXISTS `knowledge` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',`creator_id` bigint(20) NOT NULL COMMENT '知识库创建者ID',`name` varchar(255) NOT NULL COMMENT '知识库名称',`description` text COMMENT '知识库描述',`icon_uri` varchar(255) DEFAULT NULL COMMENT '知识库图标URI',`knowledge_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '知识库类型',`config` json DEFAULT NULL COMMENT '知识库配置信息',`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '知识库状态',`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',PRIMARY KEY (`id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='知识库表';-- 提示词资源表
CREATE TABLE IF NOT EXISTS `prompt_resource` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',`creator_id` bigint(20) NOT NULL COMMENT '提示词创建者ID',`name` varchar(255) NOT NULL COMMENT '提示词名称',`description` text COMMENT '提示词描述',`content` text NOT NULL COMMENT '提示词内容',`category` varchar(100) DEFAULT NULL COMMENT '提示词分类',`tags` json DEFAULT NULL COMMENT '提示词标签',`version` varchar(50) DEFAULT '1.0' COMMENT '提示词版本',`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '提示词状态',`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',PRIMARY KEY (`id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_category` (`category`),KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='提示词资源表';-- 在线数据库信息表
CREATE TABLE IF NOT EXISTS `online_database_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`space_id` bigint(20) NOT NULL COMMENT '工作空间ID',`creator_id` bigint(20) NOT NULL COMMENT '数据库创建者ID',`name` varchar(255) NOT NULL COMMENT '数据库名称',`description` text COMMENT '数据库描述',`database_type` varchar(50) NOT NULL COMMENT '数据库类型',`connection_config` json DEFAULT NULL COMMENT '数据库连接配置',`schema_info` json DEFAULT NULL COMMENT '数据库模式信息',`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '数据库状态',`created_at` bigint(20) NOT NULL COMMENT '创建时间(毫秒级)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间(毫秒级)',`deleted_at` bigint(20) DEFAULT NULL COMMENT '删除时间(毫秒级)',PRIMARY KEY (`id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_database_type` (`database_type`),KEY `idx_deleted_at` (`deleted_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='在线数据库信息表';
ElasticSearch 索引结构
索引名称:coze_resource
用途:统一存储所有类型的资源信息,包括插件(plugin_draft)、工作流(workflow_meta)、知识库(knowledge)、提示词(prompt_resource)和数据库(online_database_info)等资源的元数据。
索引结构:
{"mappings": {"properties": {"res_id": {"type": "long","description": "资源ID,唯一标识"},"res_type": {"type": "integer","description": "资源类型:1=Plugin, 2=Workflow, 3=Imageflow, 4=Knowledge, 5=UI, 6=Prompt, 7=Database, 8=Variable, 9=Voice"},"res_sub_type": {"type": "integer","description": "资源子类型,可选"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword","ignore_above": 256}},"description": "资源名称,支持全文搜索"},"owner_id": {"type": "long","description": "资源所有者ID"},"space_id": {"type": "long","description": "工作空间ID"},"app_id": {"type": "long","description": "关联的应用ID,可选"},"biz_status": {"type": "long","description": "业务状态"},"publish_status": {"type": "integer","description": "发布状态"},"create_time": {"type": "long","description": "创建时间戳(毫秒)"},"update_time": {"type": "long","description": "更新时间戳(毫秒)"},"publish_time": {"type": "long","description": "发布时间戳(毫秒)"}}}
}
资源类型映射:
数据库表 | 资源类型 | ResType值 | 说明 |
---|---|---|---|
plugin_draft | Plugin | 1 | 插件资源 |
workflow_meta | Workflow | 2 | 工作流资源 |
knowledge | Knowledge | 4 | 知识库资源 |
prompt_resource | Prompt | 6 | 提示词资源 |
online_database_info | Database | 7 | 数据库资源 |
索引特点:
- 统一存储:所有类型的资源都存储在同一个索引中,通过
res_type
字段区分 - 高效搜索:支持按资源类型、名称、所有者、工作空间等多维度搜索
- 实时同步:通过事件总线机制,数据库变更会实时同步到ElasticSearch
- 权限隔离:通过
space_id
和owner_id
实现工作空间和用户级别的权限隔离 - 状态管理:支持资源的业务状态和发布状态管理
搜索功能:
- 全文搜索:支持按资源名称进行全文搜索
- 精确匹配:支持按资源ID、类型、所有者等进行精确匹配
- 范围查询:支持按时间范围、状态范围等进行查询
- 排序功能:支持按创建时间、更新时间、发布时间等排序
- 分页查询:支持高效的分页查询功能
数据同步机制:
通过 ResourceDomainEvent
事件和 resourceHandlerImpl
处理器实现数据库到ElasticSearch的实时同步:
// 资源索引名称
const resourceIndexName = "coze_resource"// 资源事件处理
func (s *resourceHandlerImpl) indexResource(ctx context.Context, opType entity.OpType, r *entity.ResourceDocument) error {switch opType {case entity.Created:return s.esClient.Create(ctx, resourceIndexName, conv.Int64ToStr(r.ResID), r)case entity.Updated:return s.esClient.Update(ctx, resourceIndexName, conv.Int64ToStr(r.ResID), r)case entity.Deleted:return s.esClient.Delete(ctx, resourceIndexName, conv.Int64ToStr(r.ResID))}return fmt.Errorf("unexpected op type: %v", opType)
}
使用场景:
- 资源搜索:在工作空间中搜索各类资源
- 资源列表:获取用户或工作空间的资源列表
- 资源推荐:基于使用频率和相关性推荐资源
- 权限控制:基于用户权限过滤可访问的资源
- 统计分析:资源使用情况统计和分析
8. 安全和权限验证机制
8.1 用户身份认证
JWT Token验证:
- 所有API请求都需要携带有效的JWT Token
- Token包含用户ID、工作空间权限等关键信息
- 通过中间件统一验证Token的有效性和完整性
// 用户身份验证中间件
func AuthMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {token := ctx.GetHeader("Authorization")if token == nil {ctx.JSON(401, gin.H{"error": "未授权访问"})ctx.Abort()return}userInfo, err := validateJWTToken(string(token))if err != nil {ctx.JSON(401, gin.H{"error": "Token无效"})ctx.Abort()return}ctx.Set("user_id", userInfo.UserID)ctx.Set("space_id", userInfo.SpaceID)ctx.Next()}
}
8.2 工作空间权限控制
空间隔离机制:
- 每个用户只能访问其所属工作空间的资源
- 通过
space_id
字段实现数据隔离 - 在数据库查询和ElasticSearch搜索中都强制添加空间过滤条件
// 工作空间权限验证
func (s *SearchApplicationService) validateSpacePermission(ctx context.Context, spaceID int64) error {userSpaceID := ctx.Value("space_id").(int64)if userSpaceID != spaceID {return errors.New("无权限访问该工作空间资源")}return nil
}
8.3 资源级权限验证
资源所有权验证:
- 验证用户是否为资源的创建者或有权限访问者
- 支持资源的读取、编辑、删除等不同级别权限
- 通过
owner_id
和权限表进行权限判断
权限矩阵:
资源类型 | 创建者权限 | 工作空间成员权限 | 访客权限 |
---|---|---|---|
插件 | 读写删除 | 读取 | 无 |
工作流 | 读写删除 | 读取、复制 | 无 |
知识库 | 读写删除 | 读取 | 无 |
提示词 | 读写删除 | 读取、使用 | 无 |
数据库 | 读写删除 | 读取 | 无 |
8.4 API访问控制
请求频率限制:
- 实现基于用户和IP的请求频率限制
- 防止恶意请求和资源滥用
- 支持不同API端点的差异化限流策略
参数验证:
- 严格验证所有输入参数的格式和范围
- 防止SQL注入、XSS等安全攻击
- 使用白名单机制验证资源类型等枚举值
// 参数验证示例
func validateSearchRequest(req *resource.SearchResourceRequest) error {if req.SpaceID <= 0 {return errors.New("无效的工作空间ID")}if len(req.Query) > 100 {return errors.New("搜索关键词过长")}for _, resType := range req.ResTypeFilter {if !isValidResourceType(resType) {return errors.New("无效的资源类型")}}return nil
}
9. 错误处理和日志记录
9.1 分层错误处理机制
错误分类体系:
// 错误类型定义
type ErrorType intconst (// 业务错误ErrBusiness ErrorType = iota + 1000ErrResourceNotFoundErrPermissionDeniedErrInvalidParameter// 系统错误ErrSystem ErrorType = iota + 2000ErrDatabaseConnectionErrElasticSearchTimeoutErrServiceUnavailable// 网络错误ErrNetwork ErrorType = iota + 3000ErrRequestTimeoutErrConnectionRefused
)
错误处理流程:
- 捕获阶段:在各层级捕获具体错误
- 包装阶段:添加上下文信息和错误码
- 记录阶段:根据错误级别记录日志
- 响应阶段:返回用户友好的错误信息
9.2 统一错误响应格式
// 统一错误响应结构
type ErrorResponse struct {Code int `json:"code"`Message string `json:"message"`Details string `json:"details,omitempty"`TraceID string `json:"trace_id"`
}// 错误处理中间件
func ErrorHandlerMiddleware() app.HandlerFunc {return func(c context.Context, ctx *app.RequestContext) {defer func() {if err := recover(); err != nil {traceID := ctx.GetString("trace_id")logs.CtxErrorf(c, "Panic recovered: %v, traceID: %s", err, traceID)ctx.JSON(500, ErrorResponse{Code: 5000,Message: "服务器内部错误",TraceID: traceID,})}}()ctx.Next()}
}
9.3 日志记录策略
日志级别定义:
- DEBUG:详细的调试信息,包括参数值、中间结果
- INFO:关键业务流程信息,如用户操作、资源访问
- WARN:潜在问题警告,如性能异常、参数异常
- ERROR:错误信息,包括业务错误和系统错误
- FATAL:严重错误,可能导致服务不可用
结构化日志格式:
// 日志记录示例
func (s *SearchApplicationService) SearchResources(ctx context.Context, req *resource.SearchResourceRequest) {traceID := generateTraceID()ctx = context.WithValue(ctx, "trace_id", traceID)// 记录请求开始logs.CtxInfof(ctx, "SearchResources started, userID=%d, spaceID=%d, query=%s, traceID=%s", req.UserID, req.SpaceID, req.Query, traceID)startTime := time.Now()defer func() {duration := time.Since(startTime)logs.CtxInfof(ctx, "SearchResources completed, duration=%dms, traceID=%s", duration.Milliseconds(), traceID)}()// 业务逻辑处理...
}
日志内容规范:
- 请求日志:记录用户ID、工作空间ID、请求参数、TraceID
- 业务日志:记录关键业务操作、数据变更、权限验证结果
- 性能日志:记录接口响应时间、数据库查询时间、ES查询时间
- 错误日志:记录错误堆栈、上下文信息、影响范围
9.4 监控和告警
关键指标监控:
- 接口性能:响应时间、QPS、错误率
- 资源使用:数据库连接数、ES查询延迟、内存使用率
- 业务指标:资源搜索成功率、用户活跃度、资源访问频次
告警策略:
- 错误率告警:当错误率超过5%时触发告警
- 性能告警:当接口响应时间超过2秒时触发告警
- 资源告警:当数据库连接数超过80%时触发告警
10. 资源列表查询流程图
10.1 LibraryResourceList接口完整调用流程
用户登录 Coze 平台点击"资源库"场景的后端处理流程:
用户点击"资源库" → 前端发起请求 → API网关路由 → Handler处理 → 业务服务层 → 数据查询层 → 响应返回↓ ↓ ↓ ↓ ↓ ↓ ↓
前端路由跳转 HTTP POST请求 路由匹配 参数验证 权限检查 ES查询 JSON响应↓ ↓ ↓ ↓ ↓ ↓ ↓
/space/:id/library /api/plugin_api/ Handler 请求绑定 用户身份 coze_ 资源列表library_resource 函数调用 参数校验 Session resource 数据返回_list LibraryResourceList 验证 索引查询↓SearchApplicationService↓构建ES查询条件↓执行资源搜索↓资源数据打包↓返回分页结果
10.2 详细流程说明
1. API网关层(路由处理)
文件位置:backend/api/handler/coze/resource_service.go
// @router /api/plugin_api/library_resource_list [POST]
func LibraryResourceList(ctx context.Context, c *app.RequestContext) {var err errorvar req resource.LibraryResourceListRequest// 1. 请求参数绑定和验证err = c.BindAndValidate(&req)if err != nil {invalidParamRequestResponse(c, err.Error())return}// 2. 业务参数校验if req.SpaceID <= 0 {invalidParamRequestResponse(c, "space_id is invalid")return}if req.GetSize() > 100 {invalidParamRequestResponse(c, "size is too large")return}// 3. 调用业务服务resp, err := search.SearchSVC.LibraryResourceList(ctx, &req)if err != nil {internalServerErrorResponse(ctx, c, err)return}// 4. 返回JSON响应c.JSON(consts.StatusOK, resp)
}
处理步骤:
- 路由匹配:
POST /api/plugin_api/library_resource_list
- 参数绑定:将HTTP请求体绑定到
LibraryResourceListRequest
结构体 - 参数验证:验证
space_id
有效性,限制size
不超过100 - 服务调用:调用搜索服务的
LibraryResourceList
方法 - 响应返回:返回JSON格式的响应数据
2. 业务服务层(SearchApplicationService)
文件位置:backend/application/search/resource_search.go
func (s *SearchApplicationService) LibraryResourceList(ctx context.Context, req *resource.LibraryResourceListRequest) (resp *resource.LibraryResourceListResponse, err error) {// 1. 用户身份验证userID := ctxutil.GetUIDFromCtx(ctx)if userID == nil {return nil, errorx.New(errno.ErrSearchPermissionCode, errorx.KV("msg", "session required"))}// 2. 构建搜索请求searchReq := &entity.SearchResourcesRequest{SpaceID: req.GetSpaceID(),OwnerID: 0, // 资源库查询不限制所有者Name: req.GetName(),ResTypeFilter: req.GetResTypeFilter(),PublishStatusFilter: req.GetPublishStatusFilter(),SearchKeys: req.GetSearchKeys(),Cursor: req.GetCursor(),Limit: req.GetSize(),}// 3. 执行资源搜索searchResp, err := s.searchDomainService.SearchResources(ctx, searchReq)if err != nil {return nil, err}// 4. 资源数据打包resourceList, err := s.packResources(ctx, searchResp.ResourceDocuments, req.GetIsGetImageflow())if err != nil {return nil, err}// 5. 构建响应resp = &resource.LibraryResourceListResponse{Code: 0,Msg: "success",ResourceList: resourceList,Cursor: searchResp.NextCursor,HasMore: searchResp.HasMore,}return resp, nil
}
核心功能:
- 身份验证:从上下文中提取用户ID,验证用户登录状态
- 权限检查:验证用户对指定工作空间的访问权限
- 查询构建:构建ElasticSearch查询条件
- 数据打包:调用资源打包器处理查询结果
- 响应组装:构建标准化的响应数据结构
3. 领域服务层(资源搜索)
核心功能:
- ES查询构建:根据过滤条件构建ElasticSearch查询DSL
- 索引查询:在
coze_resource
索引中执行搜索 - 结果处理:处理搜索结果,提取资源文档
查询条件包括:
space_id
:工作空间ID(必需)res_type_filter
:资源类型过滤(Plugin、Workflow、Knowledge、Prompt、Database)publish_status_filter
:发布状态过滤name
:资源名称模糊匹配search_keys
:自定义搜索字段
4. 数据库操作层
ElasticSearch查询:
- 索引:
coze_resource
- 查询类型:复合查询(bool query)
- 过滤条件:
term
:精确匹配(space_id、res_type等)match
:模糊匹配(name字段)range
:范围查询(时间等)
- 排序:按
update_time_ms
降序 - 分页:使用
search_after
游标分页
5. 资源打包器(ProjectPackager)
打包流程:
func (s *SearchApplicationService) packResources(ctx context.Context, docs []*entity.ResourceDocument, isGetImageflow bool) ([]*common.ResourceInfo, error) {var resourceList []*common.ResourceInfofor _, doc := range docs {// 1. 基础信息转换resourceInfo := &common.ResourceInfo{ResID: doc.GetResID(),ResType: doc.GetResType(),Name: doc.GetName(),CreateTimeMS: doc.GetCreateTimeMS(),UpdateTimeMS: doc.GetUpdateTimeMS(),PublishStatus: doc.GetPublishStatus(),}// 2. 根据资源类型调用对应打包器switch doc.GetResType() {case common.ResType_Plugin:err := s.packPluginResource(ctx, resourceInfo, doc)case common.ResType_Workflow:err := s.packWorkflowResource(ctx, resourceInfo, doc, isGetImageflow)case common.ResType_Knowledge:err := s.packKnowledgeResource(ctx, resourceInfo, doc)case common.ResType_Prompt:err := s.packPromptResource(ctx, resourceInfo, doc)case common.ResType_Database:err := s.packDatabaseResource(ctx, resourceInfo, doc)}// 3. 设置资源图标resourceInfo.IconURL = s.getResourceIconURL(doc.GetResType())resourceList = append(resourceList, resourceInfo)}return resourceList, nil
}
打包内容:
- 基础信息:资源ID、名称、类型、状态、时间等
- 类型特定信息:根据资源类型补充专有字段
- 扩展信息:图标URL、描述、标签等
- 权限信息:用户对资源的操作权限
6. 响应数据结构
LibraryResourceListResponse:
type LibraryResourceListResponse struct {Code int64 `json:"code"` // 响应码Msg string `json:"msg"` // 响应消息ResourceList []*common.ResourceInfo `json:"resource_list"` // 资源列表Cursor string `json:"cursor"` // 下一页游标HasMore bool `json:"has_more"` // 是否有更多数据BaseResp *base.BaseResp `json:"base_resp"` // 基础响应信息
}
ResourceInfo结构:
- 基础信息:ID、名称、类型、状态、时间等
- 扩展信息:图标URL、描述、标签等
- 类型特定信息:根据资源类型补充的专有字段
10.3 关键时间节点
阶段 | 预期耗时 | 超时阈值 | 说明 |
---|---|---|---|
参数验证 | <5ms | 50ms | 请求参数校验 |
身份验证 | <10ms | 100ms | Session验证 |
权限验证 | <20ms | 200ms | 工作空间权限查询 |
ES查询 | <100ms | 2000ms | ElasticSearch搜索 |
资源打包 | <50ms | 500ms | 数据转换和补充 |
总耗时 | <200ms | 3000ms | 端到端响应时间 |
10.4 技术特点
- 统一索引管理:所有资源类型统一存储在
coze_resource
索引中 - 灵活查询条件:支持多维度过滤和搜索
- 游标分页:使用ElasticSearch的
search_after
实现高效分页 - 资源类型打包:针对不同资源类型提供专门的数据打包逻辑
- 权限控制:基于用户Session和工作空间权限进行访问控制
11. 核心技术特点
11.1 分层架构设计
清晰的职责分离:
- API层:负责HTTP请求处理、参数验证、响应格式化
- 应用层:负责业务逻辑编排、服务组合、事务管理
- 领域层:负责核心业务逻辑、实体定义、业务规则
- 基础设施层:负责数据访问、外部服务集成、技术实现
依赖倒置原则:
- 高层模块不依赖低层模块,都依赖于抽象
- 通过接口定义实现解耦,便于测试和扩展
- 支持不同存储引擎和搜索引擎的灵活切换
11.2 ElasticSearch搜索引擎
统一索引设计:
- 所有资源类型统一存储在
coze_resource
索引中 - 通过
res_type
字段区分不同资源类型 - 支持全文搜索、精确匹配、范围查询等多种搜索模式
实时数据同步:
- 基于事件驱动架构实现数据库到ES的实时同步
- 支持增量更新,避免全量重建索引
- 异步处理机制确保主业务流程不受影响
高性能查询优化:
- 合理的字段映射和分析器配置
- 支持分页查询和游标查询
- 查询结果缓存和预加载机制
11.3 事件驱动架构
异步事件处理:
- 使用消息队列实现事件的异步处理
- 支持事件的重试和死信队列机制
- 确保数据最终一致性
事件类型定义:
type OpType stringconst (Created OpType = "created" // 资源创建事件Updated OpType = "updated" // 资源更新事件Deleted OpType = "deleted" // 资源删除事件
)
事件处理流程:
- 数据库操作完成后发布领域事件
- 事件总线接收并路由事件到相应处理器
- 处理器更新ElasticSearch索引
- 记录处理结果和异常信息
11.4 多数据源集成
MySQL关系型数据库:
- 存储结构化的资源元数据
- 支持ACID事务保证数据一致性
- 通过索引优化查询性能
ElasticSearch搜索引擎:
- 提供强大的全文搜索能力
- 支持复杂的聚合查询和统计分析
- 水平扩展能力强,支持大数据量
数据一致性保证:
- 以MySQL为主数据源,ES为搜索引擎
- 通过事件机制保证数据最终一致性
- 支持数据校验和修复机制
11.5 安全和权限控制
多层次权限验证:
- 用户身份认证(JWT Token)
- 工作空间权限控制(Space级别隔离)
- 资源级权限验证(Owner权限)
数据隔离机制:
- 通过
space_id
实现工作空间级别的数据隔离 - 在所有查询中强制添加空间过滤条件
- 防止跨空间的数据泄露
11.6 可观测性设计
全链路追踪:
- 为每个请求生成唯一的TraceID
- 在日志中记录TraceID便于问题排查
- 支持分布式链路追踪
结构化日志:
- 统一的日志格式和字段定义
- 支持日志聚合和分析
- 不同级别的日志记录策略
性能监控:
- 接口响应时间监控
- 数据库和ES查询性能监控
- 系统资源使用率监控
12. 总结
12.1 架构优势
Coze资源库后端采用了现代化的微服务架构设计,具有以下显著优势:
1. 高可扩展性
- 分层架构设计使得各层职责清晰,便于独立扩展
- ElasticSearch的水平扩展能力支持大规模数据处理
- 事件驱动架构支持异步处理,提高系统吞吐量
2. 高可用性
- 多数据源设计提供数据冗余和故障转移能力
- 异步事件处理确保主业务流程的稳定性
- 完善的错误处理和重试机制
3. 高性能
- ElasticSearch提供毫秒级的搜索响应
- 合理的索引设计和查询优化
- 缓存机制减少重复查询开销
4. 高安全性
- 多层次的权限验证机制
- 工作空间级别的数据隔离
- 完善的参数验证和安全防护
12.2 技术亮点
1. 统一资源索引设计
- 将不同类型的资源统一存储在一个ES索引中
- 通过资源类型字段实现逻辑分离
- 简化了索引管理和跨资源类型搜索
2. 事件驱动的数据同步
- 基于领域事件实现数据库到ES的实时同步
- 保证了数据的最终一致性
- 支持事件重放和数据修复
3. 分层的错误处理机制
- 统一的错误分类和处理流程
- 用户友好的错误信息返回
- 完善的日志记录和监控告警
通过以上的架构设计和技术实现,Coze资源库后端为用户提供了高效、安全、可靠的资源管理和搜索服务,为AI应用开发提供了强有力的基础设施支撑。