Coze源码分析-资源库-编辑数据库-后端源码-数据存储层
7. 数据存储层
数据存储层是资源库编辑数据库功能的基础,负责处理数据库元数据和物理表结构的持久化存储,同时确保在线数据和草稿数据的一致性。
7.1 数据库表结构设计
资源库编辑数据库功能采用了双层表结构设计,分别管理数据库元信息和物理表结构,同时支持在线版本和草稿版本的分离存储。
7.1.1 数据库元信息表设计
online_database_info表:存储已发布的数据库元信息
CREATE TABLE `online_database_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '数据库ID',`space_id` bigint(20) NOT NULL COMMENT '所属空间ID',`name` varchar(255) NOT NULL COMMENT '数据库名称',`description` text COMMENT '数据库描述',`creator_id` bigint(20) NOT NULL COMMENT '创建者ID',`updater_id` bigint(20) NOT NULL COMMENT '更新者ID',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',`version` bigint(20) NOT NULL COMMENT '版本号',PRIMARY KEY (`id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='在线数据库信息表';
draft_database_info表:存储编辑中的数据库草稿信息
CREATE TABLE `draft_database_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '草稿ID',`database_id` bigint(20) NOT NULL COMMENT '关联的数据库ID',`space_id` bigint(20) NOT NULL COMMENT '所属空间ID',`name` varchar(255) NOT NULL COMMENT '数据库名称',`description` text COMMENT '数据库描述',`editor_id` bigint(20) NOT NULL COMMENT '编辑者ID',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',`is_dirty` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否有未保存的修改',PRIMARY KEY (`id`),KEY `idx_database_id` (`database_id`),KEY `idx_space_id` (`space_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库草稿信息表';
7.1.2 物理表结构存储表设计
physical_table_definition表:存储物理表的结构定义
CREATE TABLE `physical_table_definition` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '表定义ID',`database_id` bigint(20) NOT NULL COMMENT '所属数据库ID',`table_name` varchar(255) NOT NULL COMMENT '表名称',`table_comment` varchar(1000) DEFAULT NULL COMMENT '表描述',`is_draft` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否为草稿表',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',PRIMARY KEY (`id`),UNIQUE KEY `uk_database_table` (`database_id`,`table_name`,`is_draft`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物理表定义表';
column_definition表:存储表的字段定义
CREATE TABLE `column_definition` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '字段定义ID',`table_id` bigint(20) NOT NULL COMMENT '所属表ID',`column_name` varchar(255) NOT NULL COMMENT '字段名称',`column_type` varchar(50) NOT NULL COMMENT '字段类型',`is_nullable` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否允许为空',`default_value` varchar(255) DEFAULT NULL COMMENT '默认值',`column_comment` varchar(1000) DEFAULT NULL COMMENT '字段描述',`is_primary_key` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否为主键',`position` int(11) NOT NULL COMMENT '字段顺序',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',PRIMARY KEY (`id`),UNIQUE KEY `uk_table_column` (`table_id`,`column_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='列定义表';
7.2 索引与查询优化架构
为了支持高效的数据库编辑操作,系统设计了多层次的索引结构,包括主键索引、唯一索引和复合索引,以满足不同查询场景的性能需求。
7.2.1 索引设计原则
- 唯一性保证:通过唯一索引确保数据库名称和表名在同一空间内的唯一性
- 高效查询:针对常见查询场景(如按空间查询、按创建者查询)建立合适的索引
- 更新性能:平衡查询性能和更新性能,避免过多的索引导致写入性能下降
- 外键关联:确保表之间的外键关联建立合适的索引以提高连接查询性能
7.2.2 关键索引映射
// 数据库查询索引映射示例
var databaseIndexMappings = []IndexMapping{{ // 空间内数据库查询优化Table: "online_database_info",IndexName: "idx_space_id",Columns: []string{"space_id"},UsagePattern: "空间内数据库列表查询",QueryExample: "SELECT * FROM online_database_info WHERE space_id = ? ORDER BY updated_at DESC",},{ // 表定义查询优化Table: "physical_table_definition",IndexName: "uk_database_table",Columns: []string{"database_id", "table_name", "is_draft"},IsUnique: true,UsagePattern: "快速定位特定数据库的特定表",QueryExample: "SELECT * FROM physical_table_definition WHERE database_id = ? AND table_name = ? AND is_draft = ?",},{ // 字段定义查询优化Table: "column_definition",IndexName: "idx_table_id",Columns: []string{"table_id"},UsagePattern: "获取表的所有字段定义",QueryExample: "SELECT * FROM column_definition WHERE table_id = ? ORDER BY position",},
}
7.2.3 数据库内容专用索引
对于数据库编辑功能,系统还设计了特定的索引结构来支持复杂查询场景:
// 数据库编辑操作相关索引
const (// 索引名称常量定义IndexOnlineDatabaseSpaceAndUpdatedAt = "idx_online_db_space_updated"IndexDraftDatabaseEditorAndUpdatedAt = "idx_draft_db_editor_updated"IndexPhysicalTableDraftStatus = "idx_physical_table_draft"
)// 索引使用示例
func getDatabaseEditHistory(ctx context.Context, databaseID int64, limit int) ([]DatabaseHistory, error) {var histories []DatabaseHistory// 使用索引查询数据库编辑历史err := db.WithContext(ctx).Table("database_edit_history").Where("database_id = ?", databaseID).Order("edit_time DESC").Limit(limit).Find(&histories).Errorreturn histories, err
}
7.3 物理表更新机制
数据库编辑功能的核心是支持物理表结构的动态更新,包括添加、修改和删除字段。系统采用了事务机制确保表结构更新的原子性和一致性。
7.3.1 字段更新处理流程
物理表字段更新的核心逻辑位于domain/memory/database/internal/physicaltable/physical.go
文件中:
// UpdateFieldInfo 处理字段更新和删除逻辑
func (p *PhysicalTable) UpdateFieldInfo(ctx context.Context, fieldInfos []*model.FieldInfo) error {// 1. 参数验证if len(fieldInfos) == 0 {return nil}// 2. 查找现有字段existingFields, err := p.GetFieldInfo(ctx)if err != nil {return err}// 3. 构建字段映射关系existingFieldMap := make(map[string]*model.FieldInfo)for _, field := range existingFields {existingFieldMap[field.FieldName] = field}// 4. 确定需要新增、修改和删除的字段var addFields, updateFields []*model.FieldInfovar deleteFieldNames []string// 4.1 处理新增和修改for _, newField := range fieldInfos {if existingField, exists := existingFieldMap[newField.FieldName]; exists {// 字段存在,检查是否需要更新if !isFieldEqual(existingField, newField) {updateFields = append(updateFields, newField)}// 从映射中删除,剩余的即为需要删除的字段delete(existingFieldMap, newField.FieldName)} else {// 字段不存在,需要新增addFields = append(addFields, newField)}}// 4.2 确定需要删除的字段for fieldName := range existingFieldMap {deleteFieldNames = append(deleteFieldNames, fieldName)}// 5. 执行字段更新操作return p.updatePhysicalTableFields(ctx, addFields, updateFields, deleteFieldNames)
}// 比较字段是否相等
func isFieldEqual(field1, field2 *model.FieldInfo) bool {return field1.FieldType == field2.FieldType &&field1.IsPrimaryKey == field2.IsPrimaryKey &&field1.IsNullable == field2.IsNullable &&field1.DefaultValue == field2.DefaultValue &&field1.Comment == field2.Comment
}// 执行物理表字段更新操作
func (p *PhysicalTable) updatePhysicalTableFields(ctx context.Context, addFields, updateFields []*model.FieldInfo, deleteFieldNames []string) error {// 1. 开始事务tx := p.db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()// 2. 执行字段添加if err := p.addFields(ctx, tx, addFields); err != nil {tx.Rollback()return err}// 3. 执行字段更新if err := p.updateFields(ctx, tx, updateFields); err != nil {tx.Rollback()return err}// 4. 执行字段删除if err := p.deleteFields(ctx, tx, deleteFieldNames); err != nil {tx.Rollback()return err}// 5. 提交事务return tx.Commit().Error
}
7.3.2 物理表结构更新实现
物理表结构更新的核心实现位于domain/memory/database/internal/physicaltable/physical.go
文件中的UpdatePhysicalTableWithDrops
方法:
// UpdatePhysicalTableWithDrops 物理表结构更新实现
func (p *PhysicalTable) UpdatePhysicalTableWithDrops(ctx context.Context, tableName string, fieldInfos []*model.FieldInfo) error {// 1. 检查表是否存在exists, err := p.tableExists(ctx, tableName)if err != nil {return err}// 2. 如果表不存在,创建新表if !exists {return p.createTable(ctx, tableName, fieldInfos)}// 3. 获取表的当前结构currentFields, err := p.GetFieldInfo(ctx)if err != nil {return err}// 4. 执行字段更新逻辑return p.UpdateFieldInfo(ctx, fieldInfos)
}// 获取表的字段信息
func (p *PhysicalTable) GetFieldInfo(ctx context.Context) ([]*model.FieldInfo, error) {var fields []*model.FieldInfo// 查询列定义表获取字段信息err := p.db.WithContext(ctx).Table("column_definition").Where("table_id = ?", p.tableID).Order("position").Scan(&fields).Errorreturn fields, err
}// 检查表是否存在
func (p *PhysicalTable) tableExists(ctx context.Context, tableName string) (bool, error) {var count int64err := p.db.WithContext(ctx).Table("information_schema.tables").Where("table_schema = ? AND table_name = ?", p.dbName, tableName).Count(&count).Errorreturn count > 0, err
}// 创建新表
func (p *PhysicalTable) createTable(ctx context.Context, tableName string, fieldInfos []*model.FieldInfo) error {// 1. 构建CREATE TABLE SQLsql := p.buildCreateTableSQL(tableName, fieldInfos)// 2. 执行SQL创建表return p.db.WithContext(ctx).Exec(sql).Error
}// 构建CREATE TABLE SQL
func (p *PhysicalTable) buildCreateTableSQL(tableName string, fieldInfos []*model.FieldInfo) string {var sb strings.Buildersb.WriteString(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (\n", p.quoteIdentifier(tableName)))var columns []stringvar primaryKeys []stringfor _, field := range fieldInfos {colDef := p.buildColumnDefinition(field)columns = append(columns, colDef)if field.IsPrimaryKey {primaryKeys = append(primaryKeys, p.quoteIdentifier(field.FieldName))}}sb.WriteString(strings.Join(columns, ",\n"))if len(primaryKeys) > 0 {sb.WriteString(",\n PRIMARY KEY (" + strings.Join(primaryKeys, ", ") + ")")}sb.WriteString("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")return sb.String()
}
7.4 事务管理与并发控制
数据库编辑功能采用严格的事务管理机制,确保多表操作的原子性,同时实现了并发编辑控制,防止数据冲突。
7.4.1 数据库编辑事务管理
在domain/memory/database/service/database_impl.go
文件中,UpdateDatabase
方法实现了数据库编辑的事务管理:
// UpdateDatabase 实现数据库编辑的事务管理
func (s *DatabaseDomainService) UpdateDatabase(ctx context.Context, req *UpdateDatabaseRequest) (*UpdateDatabaseResponse, error) {// 1. 开始事务tx := s.db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()var response *UpdateDatabaseResponse// 2. 执行数据库编辑操作if req.IsDraft {// 处理草稿编辑response, err = s.updateDraftDatabase(ctx, tx, req)} else {// 处理在线数据库编辑response, err = s.updateOnlineDatabase(ctx, tx, req)}if err != nil {tx.Rollback()return nil, err}// 3. 提交事务if err := tx.Commit().Error; err != nil {return nil, fmt.Errorf("commit transaction failed: %w", err)}return response, nil
}// 更新草稿数据库
func (s *DatabaseDomainService) updateDraftDatabase(ctx context.Context, tx *gorm.DB, req *UpdateDatabaseRequest) (*UpdateDatabaseResponse, error) {// 1. 检查并发编辑冲突if err := s.checkConcurrentEdit(ctx, tx, req.DatabaseID, req.LastEditVersion); err != nil {return nil, err}// 2. 更新草稿数据库信息draftInfo := &DraftDatabaseInfo{DatabaseID: req.DatabaseID,SpaceID: req.SpaceID,Name: req.Name,Description: req.Description,EditorID: req.EditorID,UpdatedAt: time.Now().UnixMilli(),IsDirty: true,}if err := tx.WithContext(ctx).Where("database_id = ?", req.DatabaseID).Assign(draftInfo).FirstOrCreate(&DraftDatabaseInfo{}).Error; err != nil {return nil, fmt.Errorf("update draft database info failed: %w", err)}// 3. 更新物理表结构if err := s.updatePhysicalTables(ctx, tx, req.DatabaseID, req.Tables, true); err != nil {return nil, fmt.Errorf("update physical tables failed: %w", err)}return &UpdateDatabaseResponse{DatabaseID: req.DatabaseID,DraftID: draftInfo.ID,}, nil
}// 检查并发编辑冲突
func (s *DatabaseDomainService) checkConcurrentEdit(ctx context.Context, tx *gorm.DB, databaseID int64, lastEditVersion int64) error {var onlineDB OnlineDatabaseInfoif err := tx.WithContext(ctx).Where("id = ?", databaseID).First(&onlineDB).Error; err != nil {return err}if onlineDB.Version > lastEditVersion {return errorx.New(errno.ErrDatabaseConcurrentEditCode, errorx.KV("msg", "数据库已被其他人修改,请重新加载后再编辑"),errorx.KV("database_id", databaseID),errorx.KV("current_version", onlineDB.Version),errorx.KV("last_edit_version", lastEditVersion))}return nil
}
7.4.2 在线/草稿表同步更新
数据库编辑功能实现了在线表和草稿表的同步更新机制,确保表结构的一致性:
// 更新物理表结构
func (s *DatabaseDomainService) updatePhysicalTables(ctx context.Context, tx *gorm.DB, databaseID int64, tables []*model.TableInfo, isDraft bool) error {for _, table := range tables {// 1. 获取或创建表定义var tableDef PhysicalTableDefinitionif err := tx.WithContext(ctx).Where("database_id = ? AND table_name = ? AND is_draft = ?", databaseID, table.TableName, isDraft,).FirstOrCreate(&tableDef, PhysicalTableDefinition{DatabaseID: databaseID,TableName: table.TableName,TableComment: table.Comment,IsDraft: isDraft,}).Error; err != nil {return err}// 2. 创建PhysicalTable实例physicalTable := &PhysicalTable{db: tx,dbName: s.getDatabaseName(databaseID),tableID: tableDef.ID,tableName: table.TableName,}// 3. 执行字段更新if err := physicalTable.UpdatePhysicalTableWithDrops(ctx, table.TableName, table.Fields); err != nil {return err}}return nil
}
7.5 数据迁移与备份
为了保障数据安全,系统实现了完善的数据迁移和备份机制,支持数据库编辑前后的数据结构变化追踪和回滚。
7.5.1 编辑历史记录
系统记录每次数据库编辑的详细历史,以便追踪变更和实现回滚功能:
// 记录数据库编辑历史
func (s *DatabaseDomainService) recordEditHistory(ctx context.Context, tx *gorm.DB, req *UpdateDatabaseRequest, changes []*model.SchemaChange) error {history := &DatabaseEditHistory{DatabaseID: req.DatabaseID,EditorID: req.EditorID,EditTime: time.Now().UnixMilli(),SchemaChanges: json.Marshal(changes),Version: req.LastEditVersion + 1,}return tx.WithContext(ctx).Create(history).Error
}// Schema变更记录结构
func generateSchemaChanges(oldTables, newTables []*model.TableInfo) []*model.SchemaChange {var changes []*model.SchemaChange// 1. 找出新增的表for _, newTable := range newTables {if !tableExistsIn(newTable.TableName, oldTables) {changes = append(changes, &model.SchemaChange{ChangeType: model.ChangeTypeCreateTable,TableName: newTable.TableName,TableComment: newTable.Comment,Fields: newTable.Fields,})}}// 2. 比较现有表的结构变化for _, newTable := range newTables {if oldTable := findTableByName(newTable.TableName, oldTables); oldTable != nil {fieldChanges := compareFieldChanges(oldTable.Fields, newTable.Fields)if len(fieldChanges) > 0 {changes = append(changes, &model.SchemaChange{ChangeType: model.ChangeTypeAlterTable,TableName: newTable.TableName,TableComment: newTable.Comment,FieldChanges: fieldChanges,})}}}// 3. 找出删除的表for _, oldTable := range oldTables {if !tableExistsIn(oldTable.TableName, newTables) {changes = append(changes, &model.SchemaChange{ChangeType: model.ChangeTypeDropTable,TableName: oldTable.TableName,})}}return changes
}
7.5.2 数据结构备份机制
在执行数据库结构变更前,系统会自动创建数据结构备份,以便在需要时进行回滚:
// 创建数据库结构备份
func (s *DatabaseDomainService) createStructureBackup(ctx context.Context, tx *gorm.DB, databaseID int64, operation string) error {// 1. 获取当前数据库结构tables, err := s.getDatabaseStructure(ctx, databaseID)if err != nil {return err}// 2. 创建备份记录backup := &DatabaseStructureBackup{DatabaseID: databaseID,Operation: operation,BackupTime: time.Now().UnixMilli(),Structure: json.Marshal(tables),Version: s.getCurrentVersion(ctx, tx, databaseID),}return tx.WithContext(ctx).Create(backup).Error
}// 从备份恢复数据库结构
func (s *DatabaseDomainService) restoreFromBackup(ctx context.Context, databaseID int64, backupID int64) error {// 1. 获取备份记录var backup DatabaseStructureBackupif err := s.db.WithContext(ctx).Where("id = ? AND database_id = ?", backupID, databaseID).First(&backup).Error; err != nil {return err}// 2. 解析备份的结构信息var tables []*model.TableInfoif err := json.Unmarshal(backup.Structure, &tables); err != nil {return err}// 3. 执行结构恢复tx := s.db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()// 3.1 恢复表结构if err := s.updatePhysicalTables(ctx, tx, databaseID, tables, false); err != nil {tx.Rollback()return err}// 3.2 更新版本号if err := tx.WithContext(ctx).Model(&OnlineDatabaseInfo{}).Where("id = ?", databaseID).Update("version", backup.Version).Error; err != nil {tx.Rollback()return err}return tx.Commit().Error
}