当前位置: 首页 > news >正文

Coze源码分析-资源库-编辑数据库-后端源码-数据存储层

7. 数据存储层

数据存储层是资源库编辑数据库功能的基础,负责处理数据库元数据和物理表结构的持久化存储,同时确保在线数据和草稿数据的一致性。

7.1 数据库表结构设计

资源库编辑数据库功能采用了双层表结构设计,分别管理数据库元信息和物理表结构,同时支持在线版本和草稿版本的分离存储。

7.1.1 数据库元信息表设计

online_database_info表:存储已发布的数据库元信息

CREATE TABLE `online_database_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '数据库ID',`space_id` bigint(20) NOT NULL COMMENT '所属空间ID',`name` varchar(255) NOT NULL COMMENT '数据库名称',`description` text COMMENT '数据库描述',`creator_id` bigint(20) NOT NULL COMMENT '创建者ID',`updater_id` bigint(20) NOT NULL COMMENT '更新者ID',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',`version` bigint(20) NOT NULL COMMENT '版本号',PRIMARY KEY (`id`),KEY `idx_space_id` (`space_id`),KEY `idx_creator_id` (`creator_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='在线数据库信息表';

draft_database_info表:存储编辑中的数据库草稿信息

CREATE TABLE `draft_database_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '草稿ID',`database_id` bigint(20) NOT NULL COMMENT '关联的数据库ID',`space_id` bigint(20) NOT NULL COMMENT '所属空间ID',`name` varchar(255) NOT NULL COMMENT '数据库名称',`description` text COMMENT '数据库描述',`editor_id` bigint(20) NOT NULL COMMENT '编辑者ID',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',`is_dirty` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否有未保存的修改',PRIMARY KEY (`id`),KEY `idx_database_id` (`database_id`),KEY `idx_space_id` (`space_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库草稿信息表';
7.1.2 物理表结构存储表设计

physical_table_definition表:存储物理表的结构定义

CREATE TABLE `physical_table_definition` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '表定义ID',`database_id` bigint(20) NOT NULL COMMENT '所属数据库ID',`table_name` varchar(255) NOT NULL COMMENT '表名称',`table_comment` varchar(1000) DEFAULT NULL COMMENT '表描述',`is_draft` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否为草稿表',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',PRIMARY KEY (`id`),UNIQUE KEY `uk_database_table` (`database_id`,`table_name`,`is_draft`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物理表定义表';

column_definition表:存储表的字段定义

CREATE TABLE `column_definition` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '字段定义ID',`table_id` bigint(20) NOT NULL COMMENT '所属表ID',`column_name` varchar(255) NOT NULL COMMENT '字段名称',`column_type` varchar(50) NOT NULL COMMENT '字段类型',`is_nullable` tinyint(4) NOT NULL DEFAULT '1' COMMENT '是否允许为空',`default_value` varchar(255) DEFAULT NULL COMMENT '默认值',`column_comment` varchar(1000) DEFAULT NULL COMMENT '字段描述',`is_primary_key` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否为主键',`position` int(11) NOT NULL COMMENT '字段顺序',`created_at` bigint(20) NOT NULL COMMENT '创建时间戳(毫秒)',`updated_at` bigint(20) NOT NULL COMMENT '更新时间戳(毫秒)',PRIMARY KEY (`id`),UNIQUE KEY `uk_table_column` (`table_id`,`column_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='列定义表';

7.2 索引与查询优化架构

为了支持高效的数据库编辑操作,系统设计了多层次的索引结构,包括主键索引、唯一索引和复合索引,以满足不同查询场景的性能需求。

7.2.1 索引设计原则
  • 唯一性保证:通过唯一索引确保数据库名称和表名在同一空间内的唯一性
  • 高效查询:针对常见查询场景(如按空间查询、按创建者查询)建立合适的索引
  • 更新性能:平衡查询性能和更新性能,避免过多的索引导致写入性能下降
  • 外键关联:确保表之间的外键关联建立合适的索引以提高连接查询性能
7.2.2 关键索引映射
// 数据库查询索引映射示例
var databaseIndexMappings = []IndexMapping{{ // 空间内数据库查询优化Table:        "online_database_info",IndexName:    "idx_space_id",Columns:      []string{"space_id"},UsagePattern: "空间内数据库列表查询",QueryExample: "SELECT * FROM online_database_info WHERE space_id = ? ORDER BY updated_at DESC",},{ // 表定义查询优化Table:        "physical_table_definition",IndexName:    "uk_database_table",Columns:      []string{"database_id", "table_name", "is_draft"},IsUnique:     true,UsagePattern: "快速定位特定数据库的特定表",QueryExample: "SELECT * FROM physical_table_definition WHERE database_id = ? AND table_name = ? AND is_draft = ?",},{ // 字段定义查询优化Table:        "column_definition",IndexName:    "idx_table_id",Columns:      []string{"table_id"},UsagePattern: "获取表的所有字段定义",QueryExample: "SELECT * FROM column_definition WHERE table_id = ? ORDER BY position",},
}
7.2.3 数据库内容专用索引

对于数据库编辑功能,系统还设计了特定的索引结构来支持复杂查询场景:

// 数据库编辑操作相关索引
const (// 索引名称常量定义IndexOnlineDatabaseSpaceAndUpdatedAt = "idx_online_db_space_updated"IndexDraftDatabaseEditorAndUpdatedAt = "idx_draft_db_editor_updated"IndexPhysicalTableDraftStatus        = "idx_physical_table_draft"
)// 索引使用示例
func getDatabaseEditHistory(ctx context.Context, databaseID int64, limit int) ([]DatabaseHistory, error) {var histories []DatabaseHistory// 使用索引查询数据库编辑历史err := db.WithContext(ctx).Table("database_edit_history").Where("database_id = ?", databaseID).Order("edit_time DESC").Limit(limit).Find(&histories).Errorreturn histories, err
}

7.3 物理表更新机制

数据库编辑功能的核心是支持物理表结构的动态更新,包括添加、修改和删除字段。系统采用了事务机制确保表结构更新的原子性和一致性。

7.3.1 字段更新处理流程

物理表字段更新的核心逻辑位于domain/memory/database/internal/physicaltable/physical.go文件中:

// UpdateFieldInfo 处理字段更新和删除逻辑
func (p *PhysicalTable) UpdateFieldInfo(ctx context.Context, fieldInfos []*model.FieldInfo) error {// 1. 参数验证if len(fieldInfos) == 0 {return nil}// 2. 查找现有字段existingFields, err := p.GetFieldInfo(ctx)if err != nil {return err}// 3. 构建字段映射关系existingFieldMap := make(map[string]*model.FieldInfo)for _, field := range existingFields {existingFieldMap[field.FieldName] = field}// 4. 确定需要新增、修改和删除的字段var addFields, updateFields []*model.FieldInfovar deleteFieldNames []string// 4.1 处理新增和修改for _, newField := range fieldInfos {if existingField, exists := existingFieldMap[newField.FieldName]; exists {// 字段存在,检查是否需要更新if !isFieldEqual(existingField, newField) {updateFields = append(updateFields, newField)}// 从映射中删除,剩余的即为需要删除的字段delete(existingFieldMap, newField.FieldName)} else {// 字段不存在,需要新增addFields = append(addFields, newField)}}// 4.2 确定需要删除的字段for fieldName := range existingFieldMap {deleteFieldNames = append(deleteFieldNames, fieldName)}// 5. 执行字段更新操作return p.updatePhysicalTableFields(ctx, addFields, updateFields, deleteFieldNames)
}// 比较字段是否相等
func isFieldEqual(field1, field2 *model.FieldInfo) bool {return field1.FieldType == field2.FieldType &&field1.IsPrimaryKey == field2.IsPrimaryKey &&field1.IsNullable == field2.IsNullable &&field1.DefaultValue == field2.DefaultValue &&field1.Comment == field2.Comment
}// 执行物理表字段更新操作
func (p *PhysicalTable) updatePhysicalTableFields(ctx context.Context, addFields, updateFields []*model.FieldInfo, deleteFieldNames []string) error {// 1. 开始事务tx := p.db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()// 2. 执行字段添加if err := p.addFields(ctx, tx, addFields); err != nil {tx.Rollback()return err}// 3. 执行字段更新if err := p.updateFields(ctx, tx, updateFields); err != nil {tx.Rollback()return err}// 4. 执行字段删除if err := p.deleteFields(ctx, tx, deleteFieldNames); err != nil {tx.Rollback()return err}// 5. 提交事务return tx.Commit().Error
}
7.3.2 物理表结构更新实现

物理表结构更新的核心实现位于domain/memory/database/internal/physicaltable/physical.go文件中的UpdatePhysicalTableWithDrops方法:

// UpdatePhysicalTableWithDrops 物理表结构更新实现
func (p *PhysicalTable) UpdatePhysicalTableWithDrops(ctx context.Context, tableName string, fieldInfos []*model.FieldInfo) error {// 1. 检查表是否存在exists, err := p.tableExists(ctx, tableName)if err != nil {return err}// 2. 如果表不存在,创建新表if !exists {return p.createTable(ctx, tableName, fieldInfos)}// 3. 获取表的当前结构currentFields, err := p.GetFieldInfo(ctx)if err != nil {return err}// 4. 执行字段更新逻辑return p.UpdateFieldInfo(ctx, fieldInfos)
}// 获取表的字段信息
func (p *PhysicalTable) GetFieldInfo(ctx context.Context) ([]*model.FieldInfo, error) {var fields []*model.FieldInfo// 查询列定义表获取字段信息err := p.db.WithContext(ctx).Table("column_definition").Where("table_id = ?", p.tableID).Order("position").Scan(&fields).Errorreturn fields, err
}// 检查表是否存在
func (p *PhysicalTable) tableExists(ctx context.Context, tableName string) (bool, error) {var count int64err := p.db.WithContext(ctx).Table("information_schema.tables").Where("table_schema = ? AND table_name = ?", p.dbName, tableName).Count(&count).Errorreturn count > 0, err
}// 创建新表
func (p *PhysicalTable) createTable(ctx context.Context, tableName string, fieldInfos []*model.FieldInfo) error {// 1. 构建CREATE TABLE SQLsql := p.buildCreateTableSQL(tableName, fieldInfos)// 2. 执行SQL创建表return p.db.WithContext(ctx).Exec(sql).Error
}// 构建CREATE TABLE SQL
func (p *PhysicalTable) buildCreateTableSQL(tableName string, fieldInfos []*model.FieldInfo) string {var sb strings.Buildersb.WriteString(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (\n", p.quoteIdentifier(tableName)))var columns []stringvar primaryKeys []stringfor _, field := range fieldInfos {colDef := p.buildColumnDefinition(field)columns = append(columns, colDef)if field.IsPrimaryKey {primaryKeys = append(primaryKeys, p.quoteIdentifier(field.FieldName))}}sb.WriteString(strings.Join(columns, ",\n"))if len(primaryKeys) > 0 {sb.WriteString(",\n  PRIMARY KEY (" + strings.Join(primaryKeys, ", ") + ")")}sb.WriteString("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")return sb.String()
}

7.4 事务管理与并发控制

数据库编辑功能采用严格的事务管理机制,确保多表操作的原子性,同时实现了并发编辑控制,防止数据冲突。

7.4.1 数据库编辑事务管理

domain/memory/database/service/database_impl.go文件中,UpdateDatabase方法实现了数据库编辑的事务管理:

// UpdateDatabase 实现数据库编辑的事务管理
func (s *DatabaseDomainService) UpdateDatabase(ctx context.Context, req *UpdateDatabaseRequest) (*UpdateDatabaseResponse, error) {// 1. 开始事务tx := s.db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()var response *UpdateDatabaseResponse// 2. 执行数据库编辑操作if req.IsDraft {// 处理草稿编辑response, err = s.updateDraftDatabase(ctx, tx, req)} else {// 处理在线数据库编辑response, err = s.updateOnlineDatabase(ctx, tx, req)}if err != nil {tx.Rollback()return nil, err}// 3. 提交事务if err := tx.Commit().Error; err != nil {return nil, fmt.Errorf("commit transaction failed: %w", err)}return response, nil
}// 更新草稿数据库
func (s *DatabaseDomainService) updateDraftDatabase(ctx context.Context, tx *gorm.DB, req *UpdateDatabaseRequest) (*UpdateDatabaseResponse, error) {// 1. 检查并发编辑冲突if err := s.checkConcurrentEdit(ctx, tx, req.DatabaseID, req.LastEditVersion); err != nil {return nil, err}// 2. 更新草稿数据库信息draftInfo := &DraftDatabaseInfo{DatabaseID: req.DatabaseID,SpaceID:    req.SpaceID,Name:       req.Name,Description: req.Description,EditorID:   req.EditorID,UpdatedAt:  time.Now().UnixMilli(),IsDirty:    true,}if err := tx.WithContext(ctx).Where("database_id = ?", req.DatabaseID).Assign(draftInfo).FirstOrCreate(&DraftDatabaseInfo{}).Error; err != nil {return nil, fmt.Errorf("update draft database info failed: %w", err)}// 3. 更新物理表结构if err := s.updatePhysicalTables(ctx, tx, req.DatabaseID, req.Tables, true); err != nil {return nil, fmt.Errorf("update physical tables failed: %w", err)}return &UpdateDatabaseResponse{DatabaseID: req.DatabaseID,DraftID:    draftInfo.ID,}, nil
}// 检查并发编辑冲突
func (s *DatabaseDomainService) checkConcurrentEdit(ctx context.Context, tx *gorm.DB, databaseID int64, lastEditVersion int64) error {var onlineDB OnlineDatabaseInfoif err := tx.WithContext(ctx).Where("id = ?", databaseID).First(&onlineDB).Error; err != nil {return err}if onlineDB.Version > lastEditVersion {return errorx.New(errno.ErrDatabaseConcurrentEditCode, errorx.KV("msg", "数据库已被其他人修改,请重新加载后再编辑"),errorx.KV("database_id", databaseID),errorx.KV("current_version", onlineDB.Version),errorx.KV("last_edit_version", lastEditVersion))}return nil
}
7.4.2 在线/草稿表同步更新

数据库编辑功能实现了在线表和草稿表的同步更新机制,确保表结构的一致性:

// 更新物理表结构
func (s *DatabaseDomainService) updatePhysicalTables(ctx context.Context, tx *gorm.DB, databaseID int64, tables []*model.TableInfo, isDraft bool) error {for _, table := range tables {// 1. 获取或创建表定义var tableDef PhysicalTableDefinitionif err := tx.WithContext(ctx).Where("database_id = ? AND table_name = ? AND is_draft = ?", databaseID, table.TableName, isDraft,).FirstOrCreate(&tableDef, PhysicalTableDefinition{DatabaseID:  databaseID,TableName:   table.TableName,TableComment: table.Comment,IsDraft:     isDraft,}).Error; err != nil {return err}// 2. 创建PhysicalTable实例physicalTable := &PhysicalTable{db:        tx,dbName:    s.getDatabaseName(databaseID),tableID:   tableDef.ID,tableName: table.TableName,}// 3. 执行字段更新if err := physicalTable.UpdatePhysicalTableWithDrops(ctx, table.TableName, table.Fields); err != nil {return err}}return nil
}

7.5 数据迁移与备份

为了保障数据安全,系统实现了完善的数据迁移和备份机制,支持数据库编辑前后的数据结构变化追踪和回滚。

7.5.1 编辑历史记录

系统记录每次数据库编辑的详细历史,以便追踪变更和实现回滚功能:

// 记录数据库编辑历史
func (s *DatabaseDomainService) recordEditHistory(ctx context.Context, tx *gorm.DB, req *UpdateDatabaseRequest, changes []*model.SchemaChange) error {history := &DatabaseEditHistory{DatabaseID:   req.DatabaseID,EditorID:     req.EditorID,EditTime:     time.Now().UnixMilli(),SchemaChanges: json.Marshal(changes),Version:      req.LastEditVersion + 1,}return tx.WithContext(ctx).Create(history).Error
}// Schema变更记录结构
func generateSchemaChanges(oldTables, newTables []*model.TableInfo) []*model.SchemaChange {var changes []*model.SchemaChange// 1. 找出新增的表for _, newTable := range newTables {if !tableExistsIn(newTable.TableName, oldTables) {changes = append(changes, &model.SchemaChange{ChangeType:   model.ChangeTypeCreateTable,TableName:    newTable.TableName,TableComment: newTable.Comment,Fields:       newTable.Fields,})}}// 2. 比较现有表的结构变化for _, newTable := range newTables {if oldTable := findTableByName(newTable.TableName, oldTables); oldTable != nil {fieldChanges := compareFieldChanges(oldTable.Fields, newTable.Fields)if len(fieldChanges) > 0 {changes = append(changes, &model.SchemaChange{ChangeType:   model.ChangeTypeAlterTable,TableName:    newTable.TableName,TableComment: newTable.Comment,FieldChanges: fieldChanges,})}}}// 3. 找出删除的表for _, oldTable := range oldTables {if !tableExistsIn(oldTable.TableName, newTables) {changes = append(changes, &model.SchemaChange{ChangeType: model.ChangeTypeDropTable,TableName:  oldTable.TableName,})}}return changes
}
7.5.2 数据结构备份机制

在执行数据库结构变更前,系统会自动创建数据结构备份,以便在需要时进行回滚:

// 创建数据库结构备份
func (s *DatabaseDomainService) createStructureBackup(ctx context.Context, tx *gorm.DB, databaseID int64, operation string) error {// 1. 获取当前数据库结构tables, err := s.getDatabaseStructure(ctx, databaseID)if err != nil {return err}// 2. 创建备份记录backup := &DatabaseStructureBackup{DatabaseID:   databaseID,Operation:    operation,BackupTime:   time.Now().UnixMilli(),Structure:    json.Marshal(tables),Version:      s.getCurrentVersion(ctx, tx, databaseID),}return tx.WithContext(ctx).Create(backup).Error
}// 从备份恢复数据库结构
func (s *DatabaseDomainService) restoreFromBackup(ctx context.Context, databaseID int64, backupID int64) error {// 1. 获取备份记录var backup DatabaseStructureBackupif err := s.db.WithContext(ctx).Where("id = ? AND database_id = ?", backupID, databaseID).First(&backup).Error; err != nil {return err}// 2. 解析备份的结构信息var tables []*model.TableInfoif err := json.Unmarshal(backup.Structure, &tables); err != nil {return err}// 3. 执行结构恢复tx := s.db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()// 3.1 恢复表结构if err := s.updatePhysicalTables(ctx, tx, databaseID, tables, false); err != nil {tx.Rollback()return err}// 3.2 更新版本号if err := tx.WithContext(ctx).Model(&OnlineDatabaseInfo{}).Where("id = ?", databaseID).Update("version", backup.Version).Error; err != nil {tx.Rollback()return err}return tx.Commit().Error
}
http://www.dtcms.com/a/457520.html

相关文章:

  • Python学习之Day07-08学习(Django网页Web开发)
  • STM32之IWDG-独立看门狗
  • Linux 系统编程:(一)从历史演进到 XShell 远程登录实操
  • 基于cherryusb自制daplink,并对stm32u575进行烧录过程,daplink的执行流进行trace分析
  • 洛阳瀍河建设局网站2021年10月新闻摘抄
  • 学习Java第三十四天——黑马点评48~60
  • 全功能按键非阻塞式实现
  • 学做网站的视频南京谷歌推广
  • iptables
  • STM32+8266+小程序智能家居【小白实战项目】
  • 如何部署一个Java项目
  • 联想乐享赋能笔记本选购新体验:智能解析五大系列,精准匹配用户需求
  • 西安网站设计报价怎样创建网站和网页
  • Go中使用反射的动态方法调用
  • 泰安市住房和城乡建设部网站哪个网站diy做宝宝衣服
  • springboot+vue心理咨询服务小程序(源码+文档+调试+基础修改+答疑)
  • 优秀电商网站设计上海网站建设管理系统
  • 速通JavaWeb1
  • 【开题答辩全过程】以 vue基于SSM框架的高考志愿填报辅助系统设计与实现为例,包含答辩的问题和答案
  • linux网站建设论文logo免费设计图案
  • 怎么把网站源码扒下来wordpress缓存方案
  • 整体设计 逻辑系统程序 之17 Source 容器(Docker)承载 C/P/D 三式的完整设计与双闭环验证 之1
  • 汕头网站设计哪家好网页设计图片与文字对齐左对齐
  • GIS PAD平板手机移动端地理信息系统
  • kafka解决了什么问题?mmap 和sendfile
  • 做网站一定要买免费ppt制作
  • c++ 是静态编译语言
  • 寻找哈尔滨网站建设淘宝客网站的建设
  • 打造机器人行业的「安卓」,Meta的野心能否照进现实?
  • GW级智算中心:开启中国AI算力新纪元