当前位置: 首页 > news >正文

[手写系列]Go手写db — — 第七版(实现Disk存储引擎、Docker化支持)

[手写系列]Go手写db — — 第七版(实现Disk存储引擎、Docker化支持)

第一版文章:[手写系列]Go手写db — — 完整教程_go手写数据库-CSDN博客
第二版文章:[手写系列]Go手写db — — 第二版-CSDN博客
第三版文章:[手写系列]Go手写db — — 第三版(实现分组、排序、聚合函数等)-CSDN博客
第四版文章:[手写系列]Go手写db — — 第四版(实现事务、网络模块)
第五版文章:[手写系列]Go手写db — — 第五版(实现数据库操作模块)
第六版文章:[手写系列]Go手写db — — 第六版(实现表连接)

  • 整体项目Github地址:https://github.com/ziyifast/ZiyiDB
  • 请大家多多支持,也欢迎大家star⭐️和共同维护这个项目~

本文主要介绍如何在 ZiyiDB 第六版的基础上,实现持久化存储引擎,将数据保存到磁盘,使数据库在重启后依然能够保留数据,同时支持Docker方式部署。

🚀 同时ZiyiDB的核心功能,至此也基本完成,感谢大家一直以来的支持~

前置

本文会涉及存储引擎与本地磁盘文件的关系,Disk存储引擎底层参考MySQL8.0版本,大家感兴趣的也可以去了解MySQL的存储系统发展。

MySQL底层存储系统发展之路-CSDN博客

一、功能列表

  1. 实现其他存储引擎,与本地磁盘交互,实现数据持久化。
  2. 实现配置文件,配置程序默认server端端口、默认选择的存储引擎等
  3. 打包到docker

实现Disk存储引擎,与本地文件进行交互。

-- 通过下面SQL可查看MySQL支持的存储引擎
show engines;

在这里插入图片描述

二、实现细节

功能点一:实现磁盘存储引擎

实现思路

1.新增持久化存储引擎结构

  • 新增internal/storage/engine.go文件,抽取Engine接口,定义存储引擎必要接口
    在这里插入图片描述
  • 新增internal/storage/disk.go文件,创建 DiskBackend 结构体,实现 Engine 接口,与现有的 MemoryBackend 并行存在,保证ZiyiDB支持多个引擎,实现类MySQL多引擎能力。
    在这里插入图片描述
  • 新增internal/storage/base.go文件,抽象baseEngine,因为MemoryBackend和DiskBackend解析SQL、调用函数、拼装结果等步骤是类似的,只是对结果的保存方式不同。因此我们可以抽取一个baseEngine,复用通用的逻辑。
    在这里插入图片描述2.数据组织结构设计

参考MySQL最新的InnoDB引擎设计,采用数据字典存储表元数据相关信息,每个表的数据单独存储为.ibd文件。这里的数据字典为了简单方便,我直接采取.json方式统一存储表的名称、表的列名等元数据信息,实际MySQL将其分散在多张表中存储。
详情可参考:MySQL底层存储系统发展之路-CSDN博客

  • 使用 data_dictionary.json 文件存储所有数据库和表的元数据
  • 每个数据库对应一个目录
  • 每个表对应一个 .ibd 文件,存储实际数据行

首先internal/storage/disk.go文件新增数据字典相关结构:
在这里插入图片描述
然后在disk.go中实现对数据字典的维护,比如加载和保存等,底层逻辑就是本地磁盘创建文件、读取、更新文件。
在这里插入图片描述
最后针对每个数据库下的表创建对应的.ibd文件,用于存储每个表中的数据,同时表数据更新时也对应更新.ibd文件。

MySQL底层.ibd实际是采用二进制方式存储,且做过压缩。ZiyiDB的.ibd文件这里为了简单也采用json方式存储。

在这里插入图片描述
3. 实现核心操作

  • CreateDatabase: 创建数据库目录并更新数据字典

每个数据库会在本地磁盘创建一个目录

在这里插入图片描述

  • CreateTable: 创建表结构并保存到数据字典和.ibd文件
    在这里插入图片描述
  • Insert/Update/Delete: 修改数据后保存到.ibd文件

以更新数据为例:加载表 - 更新数据到内存 - 内存数据保存到.ibd文件中(更新.ibd文件)

在这里插入图片描述
在这里插入图片描述

  • Select: 从.ibd文件加载数据后进行查询

读取.ibd文件中的表数据,加载到内存,然后select逻辑与之前一样。

在这里插入图片描述
在这里插入图片描述

  • loadTable/saveTable: 核心的加载和保存表数据的方法

loadTable:从磁盘加载表数据到内存

在这里插入图片描述

saveTable:将内存中的数据保存更新到本地磁盘中
在这里插入图片描述
4. 事务支持

与Memory存储引擎相比,Disk引擎主要新增了Cache层,存储事务内提交的变动,因为Disk引擎的数据主要来源于磁盘,但如果事务未提交我们先不写磁盘,先写入缓存,提交后再刷新缓存表数据到磁盘中。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

代码实现

因为代码行数比较多,本次主要展示核心代码

1. 存储引擎接口定义

internal/storage/engine.go:

// internal/storage/engine.go
package storageimport ("ziyi.db.com/internal/ast""ziyi.db.com/internal/context"
)// Engine 定义存储引擎接口
type Engine interface {// 数据库操作CreateDatabase(stmt *ast.CreateDatabaseStatement) errorDropDatabase(stmt *ast.DropDatabaseStatement) errorUseDatabase(stmt *ast.UseDatabaseStatement, connCtx context.DBContext) errorShowDatabases() *ResultsShowTables(connCtx context.DBContext) *Results// 表操作CreateTable(databaseName string, stmt *ast.CreateTableStatement) errorDropTable(databaseName string, stmt *ast.DropTableStatement) error// 数据操作Insert(databaseName string, stmt *ast.InsertStatement, txn *Transaction) errorSelect(databaseName string, stmt *ast.SelectStatement, txn *Transaction) (*Results, error)Update(databaseName string, stmt *ast.UpdateStatement, txn *Transaction) errorDelete(databaseName string, stmt *ast.DeleteStatement, txn *Transaction) error// 事务支持BeginTransaction() *Transaction// 提交事务CommitTransaction(txn *Transaction) error// 回滚事务RollbackTransaction(txn *Transaction) error
}
2. disk存储引擎实现

internal/storage/disk.go:

// internal/storage/disk.go
package storageimport ("encoding/json""fmt""os""path/filepath""sync""time""ziyi.db.com/internal/ast""ziyi.db.com/internal/context"
)type DiskBackend struct {dataPath stringmu       sync.RWMutextxnMgr   *TransactionManagerbase     *BaseEngine// 数据字典,存储所有表的元数据。参考MySQL8.0设计dataDictionary map[string]map[string]*TableMetadatadictMutex      sync.RWMutex// 事务表缓存,存储事务期间的表修改// key: txnID, value: map[tableName]*TabletxnTableCache map[uint64]map[string]*TablecacheMutex    sync.RWMutex
}// TableMetadata 表示表的元数据(替代.frm文件)
type TableMetadata struct {Name    stringColumns []ast.ColumnDefinitionIndexes map[string]*IndexMetadata
}// IndexMetadata 索引元数据
type IndexMetadata struct {Column stringType   string // "PRIMARY", "UNIQUE", "INDEX"
}func NewDiskBackend(dataPath string) *DiskBackend {// 确保数据目录存在if err := os.MkdirAll(dataPath, 0755); err != nil {panic(fmt.Sprintf("无法创建数据目录: %v", err))}backend := &DiskBackend{dataPath:       dataPath,txnMgr:         NewTransactionManager(),base:           &BaseEngine{},dataDictionary: make(map[string]map[string]*TableMetadata),txnTableCache:  make(map[uint64]map[string]*Table),}// 初始化数据字典backend.initDataDictionary()return backend
}// initDataDictionary 初始化数据字典
func (d *DiskBackend) initDataDictionary() {dictPath := filepath.Join(d.dataPath, "data_dictionary.json")// 尝试加载现有的数据字典if file, err := os.Open(dictPath); err == nil {defer file.Close()var dict map[string]map[string]*TableMetadataif err := json.NewDecoder(file).Decode(&dict); err == nil {d.dataDictionary = dictreturn}}// 如果没有现有字典,创建新的d.dataDictionary = make(map[string]map[string]*TableMetadata)
}// saveDataDictionary 保存数据字典到磁盘
func (d *DiskBackend) saveDataDictionary() error {dictPath := filepath.Join(d.dataPath, "data_dictionary.json")tempPath := dictPath + ".tmp"file, err := os.Create(tempPath)if err != nil {return err}if err := json.NewEncoder(file).Encode(d.dataDictionary); err != nil {file.Close()os.Remove(tempPath)return err}if err := file.Close(); err != nil {os.Remove(tempPath)return err}return os.Rename(tempPath, dictPath)
}// getDatabasePath 获取数据库路径
func (d *DiskBackend) getDatabasePath(dbName string) string {return filepath.Join(d.dataPath, dbName)
}// getTableDataPath 获取表数据文件路径 (.ibd)
func (d *DiskBackend) getTableDataPath(dbName, tableName string) string {return filepath.Join(d.getDatabasePath(dbName), tableName+".ibd")
}// loadTable 加载表(从.ibd文件和数据字典)
func (d *DiskBackend) loadTable(dbName, tableName string, txn *Transaction) (*Table, error) {// 如果在事务中,首先检查事务缓存if txn != nil {d.cacheMutex.RLock()if tables, exists := d.txnTableCache[txn.ID]; exists {if table, exists := tables[tableName]; exists {d.cacheMutex.RUnlock()// 直接返回缓存中的表引用,而不是创建副本// 这样可以确保在事务中看到最新的修改return table, nil}}d.cacheMutex.RUnlock()}d.dictMutex.RLock()// 从数据字典获取表结构dbTables, dbExists := d.dataDictionary[dbName]if !dbExists {d.dictMutex.RUnlock()return nil, fmt.Errorf("database '%s' not exist", dbName)}metadata, tableExists := dbTables[tableName]if !tableExists {d.dictMutex.RUnlock()return nil, fmt.Errorf("table '%s' not exist", tableName)}d.dictMutex.RUnlock()// 创建表结构table := &Table{Name:     metadata.Name,Columns:  metadata.Columns,Indexes:  make(map[string]*Index),RowLocks: make(map[int]*sync.RWMutex),Rows:     make([][]VersionedCell, 0),}// 恢复索引结构for name, idxMeta := range metadata.Indexes {table.Indexes[name] = &Index{Column: idxMeta.Column,Values: make(map[string][]int),}}// 加载表数据dataPath := d.getTableDataPath(dbName, tableName)if dataFile, err := os.Open(dataPath); err == nil {defer dataFile.Close()var tableData struct {Rows [][]VersionedCell `json:"rows"`}if err := json.NewDecoder(dataFile).Decode(&tableData); err == nil {table.Rows = tableData.Rows}}// 重建索引数据d.rebuildIndexes(table)// 如果在事务中,将加载的表放入缓存if txn != nil {d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}// 存储表的引用到缓存中d.txnTableCache[txn.ID][tableName] = tabled.cacheMutex.Unlock()}return table, nil
}// saveTable 保存表(保存到.ibd文件,元数据保存在数据字典中)
func (d *DiskBackend) saveTable(dbName, tableName string, table *Table) error {dbPath := d.getDatabasePath(dbName)// 确保数据库目录存在if err := os.MkdirAll(dbPath, 0755); err != nil {return err}// 更新数据字典中的表元数据d.dictMutex.Lock()if d.dataDictionary[dbName] == nil {d.dataDictionary[dbName] = make(map[string]*TableMetadata)}// 创建表元数据metadata := &TableMetadata{Name:    table.Name,Columns: table.Columns,Indexes: make(map[string]*IndexMetadata),}// 保存索引元数据for name, idx := range table.Indexes {indexType := "INDEX"for _, col := range table.Columns {if col.Name == idx.Column && col.Primary {indexType = "PRIMARY"break}}metadata.Indexes[name] = &IndexMetadata{Column: idx.Column,Type:   indexType,}}d.dataDictionary[dbName][tableName] = metadata// 保存数据字典if err := d.saveDataDictionary(); err != nil {d.dictMutex.Unlock()return err}d.dictMutex.Unlock()// 保存表数据到.ibd文件dataPath := d.getTableDataPath(dbName, tableName)tempPath := dataPath + ".tmp"dataFile, err := os.Create(tempPath)if err != nil {return err}// 只保存数据行tableData := struct {Rows [][]VersionedCell `json:"rows"`}{Rows: table.Rows,}if err := json.NewEncoder(dataFile).Encode(tableData); err != nil {dataFile.Close()os.Remove(tempPath)return err}if err := dataFile.Close(); err != nil {os.Remove(tempPath)return err}// 原子性替换表数据文件return os.Rename(tempPath, dataPath)
}// rebuildIndexes 重建索引
func (d *DiskBackend) rebuildIndexes(table *Table) {// 清空现有索引值for _, index := range table.Indexes {index.Values = make(map[string][]int)}// 重建索引数据for i, row := range table.Rows {for j, col := range table.Columns {if col.Primary && j < len(row) {key := row[j].Data.String()if index, exists := table.Indexes[col.Name]; exists {index.Values[key] = append(index.Values[key], i)}}}}
}// 实现 Engine 接口的方法
func (d *DiskBackend) CreateDatabase(stmt *ast.CreateDatabaseStatement) error {d.mu.Lock()defer d.mu.Unlock()dbPath := d.getDatabasePath(stmt.Name)// 检查数据库是否已存在if _, err := os.Stat(dbPath); !os.IsNotExist(err) {return fmt.Errorf("数据库 '%s' 已存在", stmt.Name)}// 创建数据库目录if err := os.MkdirAll(dbPath, 0755); err != nil {return err}// 更新数据字典d.dictMutex.Lock()d.dataDictionary[stmt.Name] = make(map[string]*TableMetadata)err := d.saveDataDictionary()d.dictMutex.Unlock()return err
}func (d *DiskBackend) DropDatabase(stmt *ast.DropDatabaseStatement) error {d.mu.Lock()defer d.mu.Unlock()dbPath := d.getDatabasePath(stmt.Name)// 检查数据库是否存在if _, err := os.Stat(dbPath); os.IsNotExist(err) {return fmt.Errorf("database '%s' does not exist", stmt.Name)}// 删除数据库目录if err := os.RemoveAll(dbPath); err != nil {return err}// 更新数据字典d.dictMutex.Lock()delete(d.dataDictionary, stmt.Name)err := d.saveDataDictionary()d.dictMutex.Unlock()return err
}func (d *DiskBackend) UseDatabase(stmt *ast.UseDatabaseStatement, connCtx context.DBContext) error {d.mu.RLock()defer d.mu.RUnlock()// 检查数据库是否存在(通过数据字典)d.dictMutex.RLock()_, exists := d.dataDictionary[stmt.Name]d.dictMutex.RUnlock()if !exists {return fmt.Errorf("database '%s' does not exist", stmt.Name)}// 更新连接上下文中的当前数据库connCtx.SetDBName(stmt.Name)return nil
}func (d *DiskBackend) ShowDatabases() *Results {d.mu.RLock()defer d.mu.RUnlock()results := &Results{Columns: []ResultColumn{{Name: "Database", Type: "TEXT"},},Rows: make([][]Cell, 0),}d.dictMutex.RLock()for dbName := range d.dataDictionary {results.Rows = append(results.Rows, []Cell{{Type: CellTypeText, TextValue: dbName},})}d.dictMutex.RUnlock()return results
}func (d *DiskBackend) ShowTables(connCtx context.DBContext) *Results {d.mu.RLock()defer d.mu.RUnlock()results := &Results{Columns: []ResultColumn{{Name: "Tables", Type: "TEXT"},},Rows: make([][]Cell, 0),}dbName := connCtx.GetDBName()if dbName == "" {return results}d.dictMutex.RLock()if dbTables, exists := d.dataDictionary[dbName]; exists {for tableName := range dbTables {results.Rows = append(results.Rows, []Cell{{Type: CellTypeText, TextValue: tableName},})}}d.dictMutex.RUnlock()return results
}// CreateTable 创建表
func (d *DiskBackend) CreateTable(databaseName string, stmt *ast.CreateTableStatement) error {d.mu.Lock()defer d.mu.Unlock()// 检查数据库是否存在d.dictMutex.RLock()_, dbExists := d.dataDictionary[databaseName]d.dictMutex.RUnlock()if !dbExists {return fmt.Errorf("database '%s' does not exist", databaseName)}// 检查表是否已存在d.dictMutex.RLock()_, tableExists := d.dataDictionary[databaseName][stmt.TableName]d.dictMutex.RUnlock()if tableExists {return fmt.Errorf("table '%s' 已存在", stmt.TableName)}// 创建新表table := &Table{Name:     stmt.TableName,Columns:  stmt.Columns,Rows:     make([][]VersionedCell, 0),Indexes:  make(map[string]*Index),RowLocks: make(map[int]*sync.RWMutex),}// 为主键创建索引for _, col := range stmt.Columns {if col.Primary {table.Indexes[col.Name] = &Index{Column: col.Name,Values: make(map[string][]int),}}}// 保存表到磁盘return d.saveTable(databaseName, stmt.TableName, table)
}func (d *DiskBackend) DropTable(databaseName string, stmt *ast.DropTableStatement) error {d.mu.Lock()defer d.mu.Unlock()// 检查数据库是否存在d.dictMutex.RLock()_, dbExists := d.dataDictionary[databaseName]d.dictMutex.RUnlock()if !dbExists {return fmt.Errorf("database '%s' does not exist", databaseName)}// 检查表是否存在d.dictMutex.RLock()_, tableExists := d.dataDictionary[databaseName][stmt.TableName]d.dictMutex.RUnlock()if !tableExists {return fmt.Errorf("table '%s' does not exist", stmt.TableName)}// 删除表数据文件dataPath := d.getTableDataPath(databaseName, stmt.TableName)if err := os.Remove(dataPath); err != nil && !os.IsNotExist(err) {return fmt.Errorf("can not delete table data '%s': %v", stmt.TableName, err)}// 从数据字典中移除表d.dictMutex.Lock()delete(d.dataDictionary[databaseName], stmt.TableName)err := d.saveDataDictionary()d.dictMutex.Unlock()return err
}// Select 查询数据
func (d *DiskBackend) Select(databaseName string, stmt *ast.SelectStatement, txn *Transaction) (*Results, error) {d.mu.RLock()defer d.mu.RUnlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return nil, err}// 使用通用逻辑处理查询return d.base.Select(table, stmt, txn, d.getVisibleRow, d.base.evaluateWhereCondition)
}// Insert 插入数据
func (d *DiskBackend) Insert(databaseName string, stmt *ast.InsertStatement, txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return err}// 使用通用逻辑处理插入err = d.base.Insert(table, stmt, txn, d.getVisibleRow, d.base.convertToCell, d.base.evaluateExpression)if err != nil {return err}// 在非事务模式下直接保存表,否则更新事务缓存if txn == nil {return d.saveTable(databaseName, stmt.TableName, table)} else {// 更新事务缓存中的表d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}d.txnTableCache[txn.ID][stmt.TableName] = tabled.cacheMutex.Unlock()}return nil
}// Update 更新数据
func (d *DiskBackend) Update(databaseName string, stmt *ast.UpdateStatement, txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return err}// 使用通用逻辑处理更新err = d.base.Update(table, stmt, txn, d.getVisibleRow, d.base.convertToCell,d.base.evaluateExpression, d.base.evaluateWhereCondition)if err != nil {return err}// 在非事务模式下直接保存表,否则更新事务缓存if txn == nil {return d.saveTable(databaseName, stmt.TableName, table)} else {// 更新事务缓存中的表d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}d.txnTableCache[txn.ID][stmt.TableName] = tabled.cacheMutex.Unlock()}return nil
}// Delete 删除数据
func (d *DiskBackend) Delete(databaseName string, stmt *ast.DeleteStatement, txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return err}// 使用通用逻辑处理删除err = d.base.Delete(table, stmt, txn, d.getVisibleRow, d.base.evaluateWhereCondition)if err != nil {return err}// 在非事务模式下直接保存表,否则更新事务缓存if txn == nil {return d.saveTable(databaseName, stmt.TableName, table)} else {// 更新事务缓存中的表d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}d.txnTableCache[txn.ID][stmt.TableName] = tabled.cacheMutex.Unlock()}return nil
}// updateIndexesAfterInsert 在插入后更新索引
func (d *DiskBackend) updateIndexesAfterInsert(table *Table, oldRowCount int) {// 只需要更新新插入行的索引for i := oldRowCount; i < len(table.Rows); i++ {row := table.Rows[i]for _, col := range table.Columns {if col.Primary && i < len(row) {key := row[i].Data.String()if index, exists := table.Indexes[col.Name]; exists {index.Values[key] = append(index.Values[key], i)}}}}
}// BeginTransaction 开启事务
func (d *DiskBackend) BeginTransaction() *Transaction {return d.txnMgr.BeginTransaction(d)
}// CommitTransaction 提交事务中的更改
func (d *DiskBackend) CommitTransaction(txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 首先调用事务管理器的提交方法if err := d.txnMgr.CommitTransaction(txn); err != nil {return err}// 获取事务缓存中的表d.cacheMutex.RLock()tables, cacheExists := d.txnTableCache[txn.ID]d.cacheMutex.RUnlock()if cacheExists {// 遍历事务缓存中的所有表,将修改持久化到磁盘for tableName, table := range tables {// 查找表所在的数据库var dbName stringvar found boold.dictMutex.RLock()for db, dbTables := range d.dataDictionary {if _, exists := dbTables[tableName]; exists {dbName = dbfound = truebreak}}d.dictMutex.RUnlock()if !found {continue}// 更新表中所有版本化单元格的状态for i := range table.Rows {for j := range table.Rows[i] {if table.Rows[i][j].TxnID == txn.ID {table.Rows[i][j].Committed = truetable.Rows[i][j].Timestamp = time.Now()}}}// 重建索引以确保一致性d.rebuildIndexes(table)// 保存表到磁盘if err := d.saveTable(dbName, tableName, table); err != nil {return err}}}// 清理事务缓存d.cacheMutex.Lock()delete(d.txnTableCache, txn.ID)d.cacheMutex.Unlock()// 保存数据字典return d.saveDataDictionary()
}// RollbackTransaction 回滚事务
func (d *DiskBackend) RollbackTransaction(txn *Transaction) error {// 先执行事务管理器的回滚逻辑(更新事务状态等)if err := d.txnMgr.RollbackTransaction(txn); err != nil {return err}// 清理事务缓存(回滚时直接丢弃所有未提交的修改)d.cacheMutex.Lock()delete(d.txnTableCache, txn.ID)d.cacheMutex.Unlock()// 保存数据字典(确保元数据一致性)return d.saveDataDictionary()
}// getVisibleRow 获取可见行版本
func (d *DiskBackend) getVisibleRow(versionedRow []VersionedCell, txn *Transaction) []Cell {if len(versionedRow) == 0 {return nil}if txn == nil {// 非事务查询,返回最新提交的版本for i := len(versionedRow) - 1; i >= 0; i-- {if versionedRow[i].Committed {row := make([]Cell, len(versionedRow))for j, v := range versionedRow {row[j] = v.Data}return row}}return nil}// 事务查询,根据读已提交隔离级别规则// 查找对当前事务可见的最新版本for i := len(versionedRow) - 1; i >= 0; i-- {version := versionedRow[i]// 如果是当前事务自己的修改,可见(即使未提交)if version.TxnID == txn.ID {row := make([]Cell, len(versionedRow))for j, v := range versionedRow {row[j] = v.Data}return row}// 对于读已提交隔离级别,只能看到已提交的数据if version.Committed {row := make([]Cell, len(versionedRow))for j, v := range versionedRow {row[j] = v.Data}return row}}return nil
}

测试

测试命令:

-- 创建数据库test
create database test;
use test;
create table users (id INT PRIMARY KEY,name text,age INT);
insert into users values (1, 'Tom', 20);
select * from users;
-- 重启数据库,然后观察数据库及表中数据是否存在
use test;
select * from users;

效果:

可以看到我们插入数据,重启数据库之后依然可以看到之前的数据,并且本地生成了对应的持久化文件,整体符合预期。

在这里插入图片描述

在这里插入图片描述

功能点二:配置文件支持

MySQL中可通过my.cnf等文件进行系统配置,我们也可参考MySQL,提供一个配置文件用于配置服务默认端口、默认存储引擎等信息。

在这里插入图片描述

实现思路

只需在程序启动时,读取本地的配置文件即可。然后判断程序监听的端口和默认初始化的存储引擎

1.新建config/config.go文件:
在这里插入图片描述
2. main.go中读取配置文件:
在这里插入图片描述

代码实现

config/config.go:

// config/config.go
package configimport ("encoding/json""os"
)type Config struct {Storage struct {Type     string `json:"type"`     // "memory" 或 "disk"DataPath string `json:"dataPath"` // 磁盘存储的数据路径} `json:"storage"`Server struct {Port string `json:"port"`} `json:"server"`
}func LoadConfig(configPath string) (*Config, error) {file, err := os.Open(configPath)if err != nil {return nil, err}defer file.Close()var config Configif err := json.NewDecoder(file).Decode(&config); err != nil {return nil, err}return &config, nil
}

cmd/main.go:

// cmd/main.go
package mainimport ("flag""fmt""github.com/c-bata/go-prompt""log""os""strings""ziyi.db.com/config""ziyi.db.com/internal/ast""ziyi.db.com/internal/lexer""ziyi.db.com/internal/parser""ziyi.db.com/internal/storage""ziyi.db.com/network"
)var history []string                // 存储命令历史
var backend storage.Engine          // 存储引擎实例
var currentTxn *storage.Transaction // 当前事务
var historyIndex int                // 当前历史记录索引
var currentDatabase string          // 当前用户选择的数据库type dbContextAdapter struct {dbName *string
}func (d *dbContextAdapter) GetDBName() string {return *d.dbName
}func (d *dbContextAdapter) SetDBName(dbName string) {*d.dbName = dbName
}func executor(t string) {// 分割多个SQL语句(用分号分隔)statements := strings.Split(t, ";")for _, stmt := range statements {stmt = strings.TrimSpace(stmt)if stmt == "" {continue}// 添加到历史记录history = append(history, stmt)historyIndex = len(history) // 重置历史记录索引// 处理退出命令if strings.ToLower(stmt) == "exit" {fmt.Println("Bye!")os.Exit(0)}// 处理事务相关命令if strings.HasPrefix(strings.ToLower(stmt), "begin") {currentTxn = backend.BeginTransaction()fmt.Printf("Transaction %d started\n", currentTxn.ID)continue}if strings.HasPrefix(strings.ToLower(stmt), "commit") {if currentTxn == nil {fmt.Println("Error: No active transaction")continue}if err := backend.CommitTransaction(currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Printf("Transaction %d committed\n", currentTxn.ID)}currentTxn = nilcontinue}if strings.HasPrefix(strings.ToLower(stmt), "rollback") {if currentTxn == nil {fmt.Println("Error: No active transaction")continue}if err := backend.RollbackTransaction(currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Printf("Transaction %d rolled back\n", currentTxn.ID)}currentTxn = nilcontinue}// 创建词法分析器l := lexer.NewLexer(strings.NewReader(stmt))// 创建语法分析器p := parser.NewParser(l)// 解析SQL语句parsedStmt, err := p.ParseProgram()if err != nil {fmt.Printf("Parse error: %v\n", err)continue}// 执行SQL语句for _, statement := range parsedStmt.Statements {if currentDatabase == "" {// 检查是否是非数据库操作语句_, isCreateDB := statement.(*ast.CreateDatabaseStatement)_, isShowDBs := statement.(*ast.ShowDatabasesStatement)_, isDropDB := statement.(*ast.DropDatabaseStatement)_, isUseDB := statement.(*ast.UseDatabaseStatement)_, isShowTables := statement.(*ast.ShowTablesStatement)// 如果不是允许的语句类型,则提示需要选择数据库if !isCreateDB && !isShowDBs && !isDropDB && !isUseDB && !isShowTables {fmt.Println("No database selected. Use 'USE database_name' to select a database.")continue}}switch s := statement.(type) {case *ast.CreateDatabaseStatement:if err := backend.CreateDatabase(s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Database created successfully")}case *ast.DropDatabaseStatement:if err := backend.DropDatabase(s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Database dropped successfully")}case *ast.ShowDatabasesStatement:result := backend.ShowDatabases()printResults(result)case *ast.ShowTablesStatement:result := backend.ShowTables(&dbContextAdapter{&currentDatabase})printResults(result)case *ast.UseDatabaseStatement:if err := backend.UseDatabase(s, &dbContextAdapter{&currentDatabase}); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Printf("Database changed to '%s'\n", currentDatabase)}case *ast.CreateTableStatement:if err := backend.CreateTable(currentDatabase, s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Table created successfully")}case *ast.InsertStatement:if err := backend.Insert(currentDatabase, s, currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("1 row inserted")}case *ast.SelectStatement:results, err := backend.Select(currentDatabase, s, currentTxn)if err != nil {fmt.Printf("Error: %v\n", err)} else {printResults(results)}case *ast.UpdateStatement:if err := backend.Update(currentDatabase, s, currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Query OK")}case *ast.DeleteStatement:if err := backend.Delete(currentDatabase, s, currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Query OK")}case *ast.DropTableStatement:if err := backend.DropTable(currentDatabase, s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Table dropped successfully")}default:fmt.Printf("Unsupported statement type: %T\n", s)}}}
}func printResults(results *storage.Results) {// 计算每列的最大宽度colWidths := make([]int, len(results.Columns))for i, col := range results.Columns {colWidths[i] = len(col.Name)}for _, row := range results.Rows {for i, cell := range row {cellLen := len(cell.String())if cellLen > colWidths[i] {colWidths[i] = cellLen}}}// 打印表头fmt.Print("+")for _, width := range colWidths {fmt.Print(strings.Repeat("-", width+2))fmt.Print("+")}fmt.Println()// 打印列名fmt.Print("|")for i, col := range results.Columns {fmt.Printf(" %-*s |", colWidths[i], col.Name)}fmt.Println()// 打印分隔线fmt.Print("+")for _, width := range colWidths {fmt.Print(strings.Repeat("-", width+2))fmt.Print("+")}fmt.Println()// 打印数据行for _, row := range results.Rows {fmt.Print("|")for i, cell := range row {fmt.Printf(" %-*s |", colWidths[i], cell.String())}fmt.Println()}// 打印底部边框fmt.Print("+")for _, width := range colWidths {fmt.Print(strings.Repeat("-", width+2))fmt.Print("+")}fmt.Println()// 打印行数统计fmt.Printf("%d rows in set\n", len(results.Rows))
}// 提供命令补全功能
func completer(d prompt.Document) []prompt.Suggest {s := []prompt.Suggest{}return prompt.FilterHasPrefix(s, d.GetWordBeforeCursor(), true)
}func main() {// 添加配置文件参数configPath := flag.String("config", "config.json", "Path to config file")port := flag.String("port", "3118", "Port to listen on")flag.Parse()// 加载配置config, err := config.LoadConfig(*configPath)if err != nil {fmt.Printf("无法加载配置文件: %v\n", err)os.Exit(1)}// 初始化存储引擎switch config.Storage.Type {case "memory":backend = storage.NewMemoryBackend()case "disk":backend = storage.NewDiskBackend(config.Storage.DataPath)default:fmt.Printf("未知的存储引擎类型: %s\n", config.Storage.Type)os.Exit(1)}// 检查是否以服务器模式运行args := flag.Args()if len(args) > 0 && args[0] == "server" {// 启动服务器模式portEnv := os.Getenv("ZIYIDB_PORT")if portEnv != "" {*port = portEnv} else if config.Server.Port != "" {*port = config.Server.Port}// 类型断言获取 MemoryBackend(如果使用的是内存引擎)if memoryBackend, ok := backend.(*storage.MemoryBackend); ok {server := network.NewServer(memoryBackend, *port)fmt.Printf("Starting ZiyiDB server on port %s with %s storage...\n", *port, config.Storage.Type)log.Fatal(server.Start())} else {fmt.Println("Server mode only supports memory storage engine")os.Exit(1)}}fmt.Println("Welcome to ZiyiDB!")fmt.Println("Type your SQL commands (type 'exit' to quit)")p := prompt.New(executor,completer,prompt.OptionTitle("ZiyiDB: A Simple SQL Database"),prompt.OptionPrefix("ziyidb> "),prompt.OptionHistory(history),prompt.OptionLivePrefix(func() (string, bool) {return "ziyidb> ", true}),//实现方向键上下翻阅历史命令// 上键绑定prompt.OptionAddKeyBind(prompt.KeyBind{Key: prompt.Up,Fn: func(buf *prompt.Buffer) {if historyIndex > 0 {historyIndex--buf.DeleteBeforeCursor(len(buf.Text()))buf.InsertText(history[historyIndex], false, true)}},}),// 下键绑定prompt.OptionAddKeyBind(prompt.KeyBind{Key: prompt.Down,Fn: func(buf *prompt.Buffer) {if historyIndex < len(history)-1 {historyIndex++buf.DeleteBeforeCursor(len(buf.Text()))buf.InsertText(history[historyIndex], false, true)} else if historyIndex == len(history)-1 {historyIndex++buf.DeleteBeforeCursor(len(buf.Text()))}},}),)p.Run()
}

拓展:docker化支持

为了适应现在互联网开发的要求,ZiyiDB也需要提供docker部署的方式。

  1. 项目根目录下,新建Dockerfile文件

Dockerfile:

# Dockerfile
FROM golang:1.23.8-alpine AS builder# 设置ZiyiDB工作目录
WORKDIR /app# 复制go mod和sum文件
COPY go.mod go.sum ./# 下载依赖
RUN go mod download# 复制源代码
COPY . .# 编译服务端程序
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o ziyidb cmd/main.go# 编译客户端程序
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o ziyidb-cli cmd/client.go# 使用基础镜像
FROM alpine:latest# 安装ca-certificates
RUN apk --no-cache add ca-certificates# 设置工作目录
WORKDIR /root/# 从builder阶段复制编译好的程序
COPY --from=builder /app/ziyidb .
COPY --from=builder /app/ziyidb-cli .# 复制配置文件
COPY --from=builder /app/config.json .# 暴露默认端口
EXPOSE 3118# 启动服务器模式
ENTRYPOINT ["./ziyidb", "server", "-port=3118"]
  1. 执行docker命令打包镜像
# 在Dockerfile同一目录,构建镜像
docker build -t ziyidb .

在这里插入图片描述

查看是否构建成功:

docker images | grep ziyidb

在这里插入图片描述
3. 创建容器

# 创建容器并挂载本地目录
docker run -d \--name ziyidb \-p 3118:3118 \--restart unless-stopped \-v /Users/ziyi/GolandProjects/ZiyiDB/config.json:/root/config.json \-v /Users/ziyi/GolandProjects/ZiyiDB/data:/root/data \ziyidb# 查看容器运行状态
docker ps | grep ziyidb

在这里插入图片描述
PS:同样的,除了docker方式搭建,也可以编写docker-compose.yml启动。

docker-compose.yml:

services:ziyidb:build: .container_name: ziyidbports:- "3118:3118"restart: unless-stoppedenvironment:- ZIYIDB_PORT=3118
# 通过docker-compose方式启动
docker-compose up -d 

效果:
在这里插入图片描述

  1. 测试功能
  • 验证memory存储引擎
# 进入容器
docker exec -it ziyidb /bin/sh# 执行客户端,连接服务端(后续可优化:默认链接本地的服务端)
./ziyidb-cli localhost:3118# 测试memory存储引擎
CREATE DATABASE test_memory;
USE test_memory;
CREATE TABLE users (id INT PRIMARY KEY, name TEXT, age INT);
INSERT INTO users VALUES (1, 'Alice', 25);
SELECT * FROM users;
exit
exit# 重启容器,观察之前创建的数据是否还存在
docker restart ziyidb
docker exec -it ziyidb /bin/sh
./ziyidb-cli localhost:3118
show databases;

在这里插入图片描述
在这里插入图片描述

  • 验证disk存储引擎

调整本地挂载的config.json配置文件,然后重启容器
在这里插入图片描述
重启容器,并插入数据:

# 重启容器
docker restart ziyidb# 进入ziyidb,连接到客户端
docker exec -it ziyidb /bin/sh
./ziyidb-cli localhost:3118# 创建数据库、表,并插入数据
CREATE DATABASE test_disk;
USE test_disk;
CREATE TABLE users (id INT PRIMARY KEY, name TEXT, age INT);
INSERT INTO users VALUES (1, 'Alice', 25);
SELECT * FROM users;

在这里插入图片描述
观察挂载的本地磁盘是否有对应目录生成:
在这里插入图片描述
重启ziyidb,验证之前插入的数据是否还能查询:

# 进入ziyidb
docker exec -it ziyidb /bin/sh -c "./ziyidb-cli localhost:3118"
# 查询之前数据
show databases;
use test_disk;
select * from users;

在这里插入图片描述
5. 打包镜像到docker-hub

在Dockerfile同级目录执行下面命令:

# ziyigun是我dockerhub的用户名
# ziyidb是镜像名,latest是版本号,代表最新
docker build -t ziyigun/ziyidb:latest .# 查询是否打包成功
docker images | grep ziyigun

在这里插入图片描述
在这里插入图片描述
登录dockerhub:

# 登录dockerhub
docker login

在这里插入图片描述
在这里插入图片描述

推送镜像到远程仓库:

# 推送到dockerhub
docker push ziyigun/ziyidb:latest 

在这里插入图片描述

在仓库搜索ziyidb,查看是否能搜索到镜像:https://hub.docker.com/search?q=ziyidb

在这里插入图片描述

参考文章:
https://blog.csdn.net/weixin_45565886/article/details/154035863
https://dev.mysql.com/doc/refman/8.0/en/system-schema.html
https://dev.mysql.com/doc/refman/8.0/en/mysql-nutshell.html#mysql-nutshell-ddl

http://www.dtcms.com/a/577965.html

相关文章:

  • win11系统 Android Studio AVD 模拟器创建【记录】
  • 架构论文《论UP(统一过程)在开发中的设计和应用》
  • 中文域名网站好不好优化公司网站建设整体架构
  • 【LeetCode】102. 二叉树的层序遍历
  • 梅州市做试块网站wordpress mo
  • wps excel中把特定几列除以某一列,然后以百分比显示
  • 如何预览常见格式word、excel、ppt、图片等格式的文档
  • 免费网络短剧网站小学生编程软件
  • linux服务器常用组件巡检脚本
  • MySQL----case的用法
  • 硅云网站建设视频软件开发交易平台
  • 贵阳建站模板搭建wordpress相册滑动
  • 山东外贸建站工作简历
  • Qt/C++编写GB28181服务/前后端分离/定义一套交互协议/视频点播/录像回放和控制/警情通知
  • langchain基础教程(3)---langchain一些高级用法
  • Palantir Foundry本体层次与数据存储
  • 开源版coreshop微信商城显示产品列表显示的修正(2)
  • 昆山建设局图审中心网站温州做网站老师
  • 网站判断手机跳转代码用wordpress搭建娱乐网
  • 追剧喵 v3.2.0 手机影视播放器工具
  • 三角洲行动-java游戏程序
  • 清晰地说明 NVM、NPM 和 NRM 在 Node.js 开发过程中的作用
  • Java IDEA学习之路:第七、八周课程笔记归纳
  • Vue2 首屏加载慢打包优化的详细记录
  • 【AI应用探索】-LLaMA-Factory微调模型
  • 最有效的网站推广方案企业网站管理系统怎么用
  • linux系统如何做网站小程序会员系统怎么做
  • 网站建设费用福州建设工程招投标信息网
  • 使用 llama.cpp 在本地高效运行大语言模型,支持 Docker 一键启动,兼容CPU与GPU
  • MTPA-最大转矩电流比控制解析