Coze源码分析-资源库-删除数据库-后端源码-基础设施/数据存储层
6. 基础设施层
基础设施层为数据库删除功能提供了核心的技术支撑,包括数据库连接、关系型数据库操作、缓存管理和搜索引擎等关键组件。这些组件通过契约层(Contract)和实现层(Implementation)的分离设计,确保了删除操作的可靠性、一致性和高性能。
6.1 数据库基础设施
数据库契约层
文件位置:backend/infra/contract/orm/database.go
package ormimport ("gorm.io/gorm"
)type DB = gorm.DB
设计作用:
- 为GORM数据库对象提供类型别名,统一数据库接口
- 作为契约层抽象,便于后续数据库实现的替换
- 为数据库相关的数据访问层提供统一的数据库连接接口
MySQL数据库实现
文件位置:backend/infra/impl/mysql/mysql.go
package mysqlimport ("fmt""os""gorm.io/driver/mysql""gorm.io/gorm"
)func New() (*gorm.DB, error) {dsn := os.Getenv("MYSQL_DSN")db, err := gorm.Open(mysql.Open(dsn))if err != nil {return nil, fmt.Errorf("mysql open, dsn: %s, err: %w", dsn, err)}return db, nil
}
在数据库删除中的作用:
- 为
DraftDatabaseInfo
和OnlineDatabaseInfo
DAO提供数据库连接,支持数据库信息的删除操作 - 通过GORM ORM框架,执行安全的数据库信息表删除操作
- 支持事务处理,确保数据库删除过程的数据一致性和原子性
- 连接池管理,提高数据库并发删除的性能和稳定性
删除操作初始化流程:
main.go → application.Init() → appinfra.Init() → mysql.New() → DatabaseDAO注入 → 执行删除
6.2 关系型数据库操作基础设施
数据库操作契约层
文件位置:backend/infra/contract/rdb/rdb.go
package rdbimport ("context"
)type Service interface {// 删除数据表DropTable(ctx context.Context, req *DropTableRequest) (*DropTableResponse, error)// 删除数据DeleteData(ctx context.Context, req *DeleteDataRequest) (*DeleteDataResponse, error)// 其他数据库操作方法...
}
MySQL实现层
文件位置:backend/infra/impl/rdb/mysql.go
// DeleteData delete data
func (m *mysqlService) DeleteData(ctx context.Context, req *rdb.DeleteDataRequest) (*rdb.DeleteDataResponse, error) {if req == nil {return nil, fmt.Errorf("invalid request")}whereClause, whereValues, err := m.buildWhereClause(req.Where)if err != nil {return nil, fmt.Errorf("failed to build where clause: %v", err)}limitClause := ""if req.Limit != nil {limitClause = fmt.Sprintf(" LIMIT %d", *req.Limit)}deleteSQL := fmt.Sprintf("DELETE FROM `%s`%s%s",req.TableName,whereClause,limitClause,)logs.CtxInfof(ctx, "[DeleteData] execute sql is %s, value is %v, req is %v", deleteSQL, whereValues, req)result := m.db.WithContext(ctx).Exec(deleteSQL, whereValues...)if result.Error != nil {return nil, fmt.Errorf("failed to delete data: %v", result.Error)}affectedRows := result.RowsAffectedreturn &rdb.DeleteDataResponse{AffectedRows: affectedRows}, nil
}
在数据库删除中的作用:
- 物理表删除:执行实际的数据库物理表删除操作,清理数据库资源
- 数据清理:提供删除数据库记录的基础操作
- 事务支持:支持在事务中执行复杂的删除操作
- SQL构建:安全构建删除SQL语句,防止SQL注入
- 日志记录:记录删除操作的SQL语句和参数,便于审计和问题排查
删除流程:
DatabaseService.DeleteDatabase → 执行事务 → 删除元数据 → DropTable → 提交事务
6.3 缓存系统基础设施
缓存契约层
文件位置:backend/infra/contract/cache/cache.go
package cachetype Cmdable interface {Pipeline() PipelinerStringCmdableHashCmdableGenericCmdableListCmdable
}type StringCmdable interface {Set(ctx context.Context, key string, value interface{}, expiration time.Duration) StatusCmdGet(ctx context.Context, key string) StringCmdIncrBy(ctx context.Context, key string, value int64) IntCmd
}
Redis缓存实现
文件位置:backend/infra/impl/cache/redis/redis.go
func New() cache.Cmdable {addr := os.Getenv("REDIS_ADDR")password := os.Getenv("REDIS_PASSWORD")return NewWithAddrAndPassword(addr, password)
}func NewWithAddrAndPassword(addr, password string) cache.Cmdable {rdb := redis.NewClient(&redis.Options{Addr: addr,Password: password,PoolSize: 100,MinIdleConns: 10,MaxIdleConns: 30,ConnMaxIdleTime: 5 * time.Minute,DialTimeout: 5 * time.Second,ReadTimeout: 3 * time.Second,WriteTimeout: 3 * time.Second,})return &redisImpl{client: rdb}
}
在数据库删除中的作用:
- 权限验证缓存:缓存用户权限信息,快速验证删除数据库权限
- 数据库信息缓存:缓存待删除数据库的基本信息,减少数据库查询
- 分布式锁:防止并发删除同一数据库,确保删除操作的原子性
- 删除状态缓存:临时存储删除操作的状态,支持删除进度查询
- 事件去重:缓存已处理的删除事件ID,避免重复处理
删除操作缓存使用场景:
1. 权限缓存:user_perm:{user_id}:{space_id}:{database_id}
2. 数据库缓存:database_info:{database_id}
3. 删除锁:lock:database_delete:{database_id}
4. 删除状态:delete_status:{database_id}:{operation_id}
5. 事件去重:event_processed:{event_id}
6.4 ElasticSearch搜索基础设施
ElasticSearch契约层
文件位置:backend/infra/contract/es/es.go
package estype Client interface {Create(ctx context.Context, index, id string, document any) errorUpdate(ctx context.Context, index, id string, document any) errorDelete(ctx context.Context, index, id string) errorSearch(ctx context.Context, index string, req *Request) (*Response, error)Exists(ctx context.Context, index string) (bool, error)CreateIndex(ctx context.Context, index string, properties map[string]any) error
}type BulkIndexer interface {Add(ctx context.Context, item BulkIndexerItem) errorClose(ctx context.Context) error
}
ElasticSearch实现层
文件位置:backend/infra/impl/es/es_impl.go
func New() (es.Client, error) {version := os.Getenv("ES_VERSION")switch version {case "7":return newES7Client()case "8":return newES8Client()default:return newES8Client() // 默认使用ES8}
}
在数据库删除中的作用:
- 索引删除:将删除的数据库从ES的
coze_resource
索引中移除 - 搜索结果更新:确保删除的数据库不再出现在搜索结果中
- 关联数据清理:清理与删除数据库相关的搜索索引和元数据
- 实时同步:数据库删除后实时从搜索引擎中移除
删除操作的索引处理:
{"operation": "delete","res_id": 123456789,"res_type": 1,"delete_time": 1703123456789,"operator_id": 987654321,"space_id": 111222333
}
删除索引执行流程:
1. 用户删除工作流 → API Gateway → WorkflowService.DeleteDraftWorkflow()
2. 执行数据库删除 → 发布删除事件 → ES删除处理器
3. 构建删除请求 → esClient.Delete(ctx, "coze_resource", workflowID)
4. 索引清理 → 验证删除结果 → 记录删除日志
6.5 基础设施层架构优势
依赖倒置原则
- 契约层抽象:业务层依赖接口而非具体实现
- 实现层解耦:可以灵活替换数据库、缓存、搜索引擎的具体实现
- 测试友好:通过Mock接口进行单元测试
配置驱动
- 环境变量配置:通过环境变量控制各组件的连接参数
- 版本兼容:支持ES7/ES8版本切换,数据库驱动切换
- 性能调优:连接池、超时时间等参数可配置
高可用设计
- 连接池管理:数据库和Redis连接池,提高并发性能
- 错误处理:完善的错误处理和重试机制
- 监控支持:提供性能指标和健康检查接口
扩展性支持
- 水平扩展:分布式ID生成支持多实例部署
- 存储扩展:支持分库分表、读写分离
- 搜索扩展:支持ES集群部署和索引分片
这种基础设施层的设计为工作流删除功能提供了稳定、高效、可扩展的技术底座,确保了删除操作在高并发场景下的安全性、一致性和可靠性。
7. 数据存储层
7.1 数据库表结构
bot_table_info 表设计
文件位置:helm/charts/opencoze/files/mysql/schema.sql
真实DDL结构:
CREATE TABLE IF NOT EXISTS `bot_table_info` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',`table_name` varchar(255) NOT NULL COMMENT '表名',`table_desc` varchar(500) DEFAULT NULL COMMENT '表描述',`creator_id` bigint NOT NULL COMMENT '创建者ID',`space_id` bigint NOT NULL COMMENT '工作空间ID',`actual_table_name` varchar(255) NOT NULL COMMENT '实际物理表名',`fields` longtext NOT NULL COMMENT '表字段定义JSON',`status` tinyint NOT NULL DEFAULT 1 COMMENT '状态:0草稿 1在线 2删除',`version` int NOT NULL DEFAULT 0 COMMENT '版本号',`draft_id` bigint NOT NULL COMMENT '对应草稿ID',`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '创建时间戳',`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '更新时间戳',PRIMARY KEY (`id`),UNIQUE KEY `idx_table_name_space` (`table_name`,`space_id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_space_id` (`space_id`),KEY `idx_draft_id` (`draft_id`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT='数据库元数据表';
表结构特点:
- 元数据存储:存储数据库的元信息,不存储实际数据
- 空间隔离:通过
space_id
实现多租户数据隔离 - JSON存储:
fields
字段使用JSON格式存储表结构定义 - 关联关系:通过
draft_id
关联草稿表和在线表 - 索引优化:在关键查询字段上建立索引,特别是表名+空间ID的唯一索引
- 版本控制:使用
version
字段支持乐观锁机制
bot_table_info字段详解:
id
:自增主键,数据库唯一标识table_name
:数据库表名,用户可见的名称table_desc
:数据库描述信息creator_id
:创建者用户ID,用于权限验证space_id
:所属工作空间ID,用于空间隔离actual_table_name
:实际物理表名,系统内部使用fields
:表字段定义JSON,存储表结构信息status
:状态标识version
:版本号,用于乐观锁draft_id
:对应草稿表的ID,关联草稿信息created_at
/updated_at
:毫秒级时间戳,记录创建和更新时间
bot_table_info_draft 表设计
真实DDL结构:
CREATE TABLE IF NOT EXISTS `bot_table_info_draft` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',`table_name` varchar(255) NOT NULL COMMENT '表名',`table_desc` varchar(500) DEFAULT NULL COMMENT '表描述',`creator_id` bigint NOT NULL COMMENT '创建者ID',`space_id` bigint NOT NULL COMMENT '工作空间ID',`actual_table_name` varchar(255) NOT NULL COMMENT '实际物理表名',`fields` longtext NOT NULL COMMENT '表字段定义JSON',`status` tinyint NOT NULL DEFAULT 0 COMMENT '状态:0草稿 1在线 2删除',`version` int NOT NULL DEFAULT 0 COMMENT '版本号',`created_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '创建时间戳',`updated_at` bigint unsigned NOT NULL DEFAULT 0 COMMENT '更新时间戳',PRIMARY KEY (`id`),KEY `idx_creator_id` (`creator_id`),KEY `idx_space_id` (`space_id`)
) ENGINE=InnoDB CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT='数据库草稿表';
表结构特点:
- 草稿存储:存储数据库的草稿版本信息
- 空间隔离:通过
space_id
实现多租户数据隔离 - 与在线表结构一致:保持与在线表相似的结构设计
- 索引设计:针对常用查询场景优化索引结构
bot_table_info_draft字段详解:
id
:草稿表主键ID,自增table_name
:数据库表名table_desc
:数据库描述信息creator_id
:创建者用户ID,用于权限验证space_id
:所属工作空间ID,用于空间隔离actual_table_name
:实际物理表名fields
:表字段定义JSON,存储表结构信息status
:状态标识,默认草稿状态version
:版本号,用于乐观锁created_at
/updated_at
:毫秒级时间戳,记录创建和更新时间
7.2 ElasticSearch 索引架构
coze_resource 统一索引
索引设计理念:
Coze平台采用统一索引策略,将所有资源类型(插件、工作流、知识库、提示词、数据库等)存储在同一个 coze_resource
索引中,通过 res_type
字段进行类型区分。
数据库在索引中的映射:
{"mappings": {"properties": {"res_id": {"type": "long","description": "资源ID,对应bot_table_info.id"},"res_type": {"type": "integer", "description": "资源类型,数据库为7"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword","ignore_above": 256}},"description": "数据库表名,支持全文搜索和精确匹配"},"owner_id": {"type": "long","description": "所有者ID"},"space_id": {"type": "long","description": "工作空间ID"},"status": {"type": "integer","description": "数据库状态"},"create_time": {"type": "long","description": "创建时间戳(毫秒)"},"update_time": {"type": "long","description": "更新时间戳(毫秒)"}}}
}
资源类型常量定义:
const (ResTypePlugin = 1 // 插件ResTypeWorkflow = 2 // 工作流ResTypeKnowledge = 4 // 知识库ResTypePrompt = 6 // 提示词ResTypeDatabase = 7 // 数据库
)
7.3 数据同步机制
事件驱动的删除同步架构
删除同步流程:
- 删除操作触发:数据库删除操作触发删除领域事件
- 事件发布:通过事件总线发布
ResourceDomainEvent
删除事件 - 事件处理:
resourceHandlerImpl
监听并处理删除事件 - 索引清理:将删除操作同步到ElasticSearch,移除相关索引
删除同步核心代码:
// 资源删除事件处理器
type resourceHandlerImpl struct {esClient es.Clientlogger logs.Logger
}// 处理数据库删除领域事件
func (r *resourceHandlerImpl) HandleDatabaseDeleteEvent(ctx context.Context, event *entity.ResourceDomainEvent) error {if event.OpType != entity.Deleted {return fmt.Errorf("invalid operation type for delete handler: %v", event.OpType)}// 记录删除操作日志r.logger.InfoCtx(ctx, "Processing database delete event", "database_id", event.ResID,"space_id", event.SpaceID,"operator_id", event.OperatorID)return r.deleteFromIndex(ctx, event.ResID)
}// 从索引中删除数据库
func (r *resourceHandlerImpl) deleteFromIndex(ctx context.Context, databaseID int64) error {indexName := "coze_resource"docID := conv.Int64ToStr(databaseID)// 执行索引删除err := r.esClient.Delete(ctx, indexName, docID)if err != nil {r.logger.ErrorCtx(ctx, "Failed to delete database from index", "database_id", databaseID, "error", err)return fmt.Errorf("delete database from ES index failed: %w", err)}// 验证删除结果exists, checkErr := r.esClient.Exists(ctx, indexName, docID)if checkErr != nil {r.logger.WarnCtx(ctx, "Failed to verify deletion", "database_id", databaseID, "error", checkErr)} else if exists {r.logger.ErrorCtx(ctx, "Database still exists in index after deletion", "database_id", databaseID)return fmt.Errorf("database deletion verification failed")}r.logger.InfoCtx(ctx, "Successfully deleted database from index", "database_id", databaseID)// 检查物理表资源状态if err := r.checkPhysicalTableResource(ctx); err != nil {return fmt.Errorf("physical table resource check failed: %w", err)}
}
7.4 数据库删除操作存储层设计原则
数据库删除数据一致性保证
- 删除一致性:采用事件驱动模式,保证MySQL删除和ElasticSearch索引清理的最终一致性
- 删除幂等性:数据库删除操作支持重试,避免重复删除导致的异常
- 删除事务边界:数据库删除操作在同一事务中完成在线库和草稿库的删除,保证原子性
- 删除验证:数据库删除完成后验证数据确实被移除,确保删除操作的完整性
- 物理表删除:除了删除元数据,还需要删除实际的物理数据表,确保资源完全释放
数据库删除性能优化策略
- 删除索引优化:基于数据库主键ID的删除操作,具有最佳性能
- 异步删除处理:数据库索引删除事件处理采用异步模式,不阻塞删除主流程
- 删除缓存清理:及时清理数据库相关缓存,避免删除后的脏数据
- 事务隔离级别:使用适当的事务隔离级别,平衡一致性和性能
数据库删除操作扩展性考虑
- 分片删除:支持按
space_id
进行分片删除,提高大规模数据库删除的效率 - 删除队列:使用消息队列处理数据库删除事件,支持高并发删除场景
- 删除监控:独立的数据库删除操作监控,及时发现删除异常
- 物理资源管理:统一管理数据库物理表资源,支持不同存储引擎的扩展
数据库删除安全保障
- 权限验证:严格的数据库删除权限验证,确保只有授权用户可以删除
- 删除审计:完整的数据库删除操作审计日志,支持删除行为追踪
- 删除确认:重要数据库删除前的二次确认机制
- 删除恢复:通过备份支持数据库数据恢复
- 引用检查:删除前检查数据库是否被其他资源引用
7.5 数据库删除操作监控和运维
数据库删除操作监控
// 数据库删除操作监控指标
type DatabaseDeleteMetrics struct {DatabaseDeleteSuccessCount int64 // 数据库删除成功次数DatabaseDeleteFailureCount int64 // 数据库删除失败次数DatabaseDeleteLatency time.Duration // 数据库删除操作延迟LastDatabaseDeleteTime time.Time // 最后数据库删除时间DatabaseIndexCleanupCount int64 // 数据库索引清理次数DatabaseDeleteEventCount int64 // 数据库删除事件处理次数DatabaseDeleteQueueSize int64 // 数据库删除队列大小PhysicalTableDropCount int64 // 物理表删除次数PhysicalTableDropLatency time.Duration // 物理表删除延迟
}// 数据库删除监控指标收集
func (r *resourceHandlerImpl) collectDatabaseDeleteMetrics(ctx context.Context, startTime time.Time, databaseID int64, err error) {latency := time.Since(startTime)if err != nil {metrics.DatabaseDeleteFailureCount++log.ErrorCtx(ctx, "database delete failed", "database_id", databaseID, "error", err, "latency", latency)} else {metrics.DatabaseDeleteSuccessCount++metrics.DatabaseDeleteLatency = latencymetrics.LastDatabaseDeleteTime = time.Now()log.InfoCtx(ctx, "database delete succeeded", "database_id", databaseID, "latency", latency)}
}// 数据库删除操作健康检查
func (r *resourceHandlerImpl) databaseDeleteHealthCheck(ctx context.Context) error {// 检查数据库连接if err := r.db.Ping(); err != nil {return fmt.Errorf("database connection failed: %w", err)}// 检查ES连接if _, err := r.esClient.Ping(ctx); err != nil {return fmt.Errorf("elasticsearch connection failed: %w", err)}// 检查数据库删除队列状态if queueSize := r.getDatabaseDeleteQueueSize(); queueSize > 1000 {return fmt.Errorf("database delete queue size too large: %d", queueSize)}
数据库删除数据质量保证
- 删除一致性检查:定期验证MySQL和ElasticSearch中数据库删除数据的一致性
- 删除完整性验证:确保数据库删除操作完全清理了相关数据、索引和物理表
- 删除异常恢复:提供数据库删除失败的重试和修复机制
- 删除性能监控:监控数据库删除操作性能,特别是物理表删除的资源消耗
- 删除审计追踪:完整记录数据库删除操作的执行过程和结果
- 物理资源验证:确认物理表已成功删除,避免资源泄漏