[手写系列]Go手写db — — 第七版(实现Disk存储引擎、Docker化支持)
[手写系列]Go手写db — — 第七版(实现Disk存储引擎、Docker化支持)
第一版文章:[手写系列]Go手写db — — 完整教程_go手写数据库-CSDN博客
第二版文章:[手写系列]Go手写db — — 第二版-CSDN博客
第三版文章:[手写系列]Go手写db — — 第三版(实现分组、排序、聚合函数等)-CSDN博客
第四版文章:[手写系列]Go手写db — — 第四版(实现事务、网络模块)
第五版文章:[手写系列]Go手写db — — 第五版(实现数据库操作模块)
第六版文章:[手写系列]Go手写db — — 第六版(实现表连接)
整体项目Github地址:https://github.com/ziyifast/ZiyiDB- 请大家多多支持,也欢迎大家star⭐️和共同维护这个项目~
本文主要介绍如何在 ZiyiDB 第六版的基础上,实现持久化存储引擎,将数据保存到磁盘,使数据库在重启后依然能够保留数据,同时支持Docker方式部署。
🚀
同时ZiyiDB的核心功能,至此也基本完成,感谢大家一直以来的支持~
前置
本文会涉及存储引擎与本地磁盘文件的关系,Disk存储引擎底层参考MySQL8.0版本,大家感兴趣的也可以去了解MySQL的存储系统发展。
MySQL底层存储系统发展之路-CSDN博客
一、功能列表
- 实现其他存储引擎,与本地磁盘交互,实现数据持久化。
- 实现配置文件,配置程序默认server端端口、默认选择的存储引擎等
- 打包到docker
实现Disk存储引擎,与本地文件进行交互。
-- 通过下面SQL可查看MySQL支持的存储引擎
show engines;

二、实现细节
功能点一:实现磁盘存储引擎
实现思路
1.新增持久化存储引擎结构
- 新增internal/storage/engine.go文件,抽取Engine接口,定义存储引擎必要接口

- 新增internal/storage/disk.go文件,创建 DiskBackend 结构体,实现 Engine 接口,与现有的 MemoryBackend 并行存在,保证ZiyiDB支持多个引擎,实现类MySQL多引擎能力。

- 新增internal/storage/base.go文件,抽象baseEngine,因为MemoryBackend和DiskBackend解析SQL、调用函数、拼装结果等步骤是类似的,只是对结果的保存方式不同。因此我们可以抽取一个baseEngine,复用通用的逻辑。
2.数据组织结构设计
参考MySQL最新的InnoDB引擎设计,采用数据字典存储表元数据相关信息,每个表的数据单独存储为.ibd文件。这里的数据字典为了简单方便,我直接采取.json方式统一存储表的名称、表的列名等元数据信息,实际MySQL将其分散在多张表中存储。
详情可参考:MySQL底层存储系统发展之路-CSDN博客
- 使用 data_dictionary.json 文件存储所有数据库和表的元数据
- 每个数据库对应一个目录
- 每个表对应一个 .ibd 文件,存储实际数据行
首先internal/storage/disk.go文件新增数据字典相关结构:

然后在disk.go中实现对数据字典的维护,比如加载和保存等,底层逻辑就是本地磁盘创建文件、读取、更新文件。

最后针对每个数据库下的表创建对应的.ibd文件,用于存储每个表中的数据,同时表数据更新时也对应更新.ibd文件。
MySQL底层.ibd实际是采用二进制方式存储,且做过压缩。ZiyiDB的.ibd文件这里为了简单也采用json方式存储。

3. 实现核心操作
- CreateDatabase: 创建数据库目录并更新数据字典
每个数据库会在本地磁盘创建一个目录

- CreateTable: 创建表结构并保存到数据字典和.ibd文件

- Insert/Update/Delete: 修改数据后保存到.ibd文件
以更新数据为例:加载表 - 更新数据到内存 - 内存数据保存到.ibd文件中(更新.ibd文件)


- Select: 从.ibd文件加载数据后进行查询
读取.ibd文件中的表数据,加载到内存,然后select逻辑与之前一样。


- loadTable/saveTable: 核心的加载和保存表数据的方法
loadTable:从磁盘加载表数据到内存

saveTable:将内存中的数据保存更新到本地磁盘中

4. 事务支持
与Memory存储引擎相比,Disk引擎主要新增了Cache层,存储事务内提交的变动,因为Disk引擎的数据主要来源于磁盘,但如果事务未提交我们先不写磁盘,先写入缓存,提交后再刷新缓存表数据到磁盘中。



代码实现
因为代码行数比较多,本次主要展示核心代码
1. 存储引擎接口定义
internal/storage/engine.go:
// internal/storage/engine.go
package storageimport ("ziyi.db.com/internal/ast""ziyi.db.com/internal/context"
)// Engine 定义存储引擎接口
type Engine interface {// 数据库操作CreateDatabase(stmt *ast.CreateDatabaseStatement) errorDropDatabase(stmt *ast.DropDatabaseStatement) errorUseDatabase(stmt *ast.UseDatabaseStatement, connCtx context.DBContext) errorShowDatabases() *ResultsShowTables(connCtx context.DBContext) *Results// 表操作CreateTable(databaseName string, stmt *ast.CreateTableStatement) errorDropTable(databaseName string, stmt *ast.DropTableStatement) error// 数据操作Insert(databaseName string, stmt *ast.InsertStatement, txn *Transaction) errorSelect(databaseName string, stmt *ast.SelectStatement, txn *Transaction) (*Results, error)Update(databaseName string, stmt *ast.UpdateStatement, txn *Transaction) errorDelete(databaseName string, stmt *ast.DeleteStatement, txn *Transaction) error// 事务支持BeginTransaction() *Transaction// 提交事务CommitTransaction(txn *Transaction) error// 回滚事务RollbackTransaction(txn *Transaction) error
}
2. disk存储引擎实现
internal/storage/disk.go:
// internal/storage/disk.go
package storageimport ("encoding/json""fmt""os""path/filepath""sync""time""ziyi.db.com/internal/ast""ziyi.db.com/internal/context"
)type DiskBackend struct {dataPath stringmu sync.RWMutextxnMgr *TransactionManagerbase *BaseEngine// 数据字典,存储所有表的元数据。参考MySQL8.0设计dataDictionary map[string]map[string]*TableMetadatadictMutex sync.RWMutex// 事务表缓存,存储事务期间的表修改// key: txnID, value: map[tableName]*TabletxnTableCache map[uint64]map[string]*TablecacheMutex sync.RWMutex
}// TableMetadata 表示表的元数据(替代.frm文件)
type TableMetadata struct {Name stringColumns []ast.ColumnDefinitionIndexes map[string]*IndexMetadata
}// IndexMetadata 索引元数据
type IndexMetadata struct {Column stringType string // "PRIMARY", "UNIQUE", "INDEX"
}func NewDiskBackend(dataPath string) *DiskBackend {// 确保数据目录存在if err := os.MkdirAll(dataPath, 0755); err != nil {panic(fmt.Sprintf("无法创建数据目录: %v", err))}backend := &DiskBackend{dataPath: dataPath,txnMgr: NewTransactionManager(),base: &BaseEngine{},dataDictionary: make(map[string]map[string]*TableMetadata),txnTableCache: make(map[uint64]map[string]*Table),}// 初始化数据字典backend.initDataDictionary()return backend
}// initDataDictionary 初始化数据字典
func (d *DiskBackend) initDataDictionary() {dictPath := filepath.Join(d.dataPath, "data_dictionary.json")// 尝试加载现有的数据字典if file, err := os.Open(dictPath); err == nil {defer file.Close()var dict map[string]map[string]*TableMetadataif err := json.NewDecoder(file).Decode(&dict); err == nil {d.dataDictionary = dictreturn}}// 如果没有现有字典,创建新的d.dataDictionary = make(map[string]map[string]*TableMetadata)
}// saveDataDictionary 保存数据字典到磁盘
func (d *DiskBackend) saveDataDictionary() error {dictPath := filepath.Join(d.dataPath, "data_dictionary.json")tempPath := dictPath + ".tmp"file, err := os.Create(tempPath)if err != nil {return err}if err := json.NewEncoder(file).Encode(d.dataDictionary); err != nil {file.Close()os.Remove(tempPath)return err}if err := file.Close(); err != nil {os.Remove(tempPath)return err}return os.Rename(tempPath, dictPath)
}// getDatabasePath 获取数据库路径
func (d *DiskBackend) getDatabasePath(dbName string) string {return filepath.Join(d.dataPath, dbName)
}// getTableDataPath 获取表数据文件路径 (.ibd)
func (d *DiskBackend) getTableDataPath(dbName, tableName string) string {return filepath.Join(d.getDatabasePath(dbName), tableName+".ibd")
}// loadTable 加载表(从.ibd文件和数据字典)
func (d *DiskBackend) loadTable(dbName, tableName string, txn *Transaction) (*Table, error) {// 如果在事务中,首先检查事务缓存if txn != nil {d.cacheMutex.RLock()if tables, exists := d.txnTableCache[txn.ID]; exists {if table, exists := tables[tableName]; exists {d.cacheMutex.RUnlock()// 直接返回缓存中的表引用,而不是创建副本// 这样可以确保在事务中看到最新的修改return table, nil}}d.cacheMutex.RUnlock()}d.dictMutex.RLock()// 从数据字典获取表结构dbTables, dbExists := d.dataDictionary[dbName]if !dbExists {d.dictMutex.RUnlock()return nil, fmt.Errorf("database '%s' not exist", dbName)}metadata, tableExists := dbTables[tableName]if !tableExists {d.dictMutex.RUnlock()return nil, fmt.Errorf("table '%s' not exist", tableName)}d.dictMutex.RUnlock()// 创建表结构table := &Table{Name: metadata.Name,Columns: metadata.Columns,Indexes: make(map[string]*Index),RowLocks: make(map[int]*sync.RWMutex),Rows: make([][]VersionedCell, 0),}// 恢复索引结构for name, idxMeta := range metadata.Indexes {table.Indexes[name] = &Index{Column: idxMeta.Column,Values: make(map[string][]int),}}// 加载表数据dataPath := d.getTableDataPath(dbName, tableName)if dataFile, err := os.Open(dataPath); err == nil {defer dataFile.Close()var tableData struct {Rows [][]VersionedCell `json:"rows"`}if err := json.NewDecoder(dataFile).Decode(&tableData); err == nil {table.Rows = tableData.Rows}}// 重建索引数据d.rebuildIndexes(table)// 如果在事务中,将加载的表放入缓存if txn != nil {d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}// 存储表的引用到缓存中d.txnTableCache[txn.ID][tableName] = tabled.cacheMutex.Unlock()}return table, nil
}// saveTable 保存表(保存到.ibd文件,元数据保存在数据字典中)
func (d *DiskBackend) saveTable(dbName, tableName string, table *Table) error {dbPath := d.getDatabasePath(dbName)// 确保数据库目录存在if err := os.MkdirAll(dbPath, 0755); err != nil {return err}// 更新数据字典中的表元数据d.dictMutex.Lock()if d.dataDictionary[dbName] == nil {d.dataDictionary[dbName] = make(map[string]*TableMetadata)}// 创建表元数据metadata := &TableMetadata{Name: table.Name,Columns: table.Columns,Indexes: make(map[string]*IndexMetadata),}// 保存索引元数据for name, idx := range table.Indexes {indexType := "INDEX"for _, col := range table.Columns {if col.Name == idx.Column && col.Primary {indexType = "PRIMARY"break}}metadata.Indexes[name] = &IndexMetadata{Column: idx.Column,Type: indexType,}}d.dataDictionary[dbName][tableName] = metadata// 保存数据字典if err := d.saveDataDictionary(); err != nil {d.dictMutex.Unlock()return err}d.dictMutex.Unlock()// 保存表数据到.ibd文件dataPath := d.getTableDataPath(dbName, tableName)tempPath := dataPath + ".tmp"dataFile, err := os.Create(tempPath)if err != nil {return err}// 只保存数据行tableData := struct {Rows [][]VersionedCell `json:"rows"`}{Rows: table.Rows,}if err := json.NewEncoder(dataFile).Encode(tableData); err != nil {dataFile.Close()os.Remove(tempPath)return err}if err := dataFile.Close(); err != nil {os.Remove(tempPath)return err}// 原子性替换表数据文件return os.Rename(tempPath, dataPath)
}// rebuildIndexes 重建索引
func (d *DiskBackend) rebuildIndexes(table *Table) {// 清空现有索引值for _, index := range table.Indexes {index.Values = make(map[string][]int)}// 重建索引数据for i, row := range table.Rows {for j, col := range table.Columns {if col.Primary && j < len(row) {key := row[j].Data.String()if index, exists := table.Indexes[col.Name]; exists {index.Values[key] = append(index.Values[key], i)}}}}
}// 实现 Engine 接口的方法
func (d *DiskBackend) CreateDatabase(stmt *ast.CreateDatabaseStatement) error {d.mu.Lock()defer d.mu.Unlock()dbPath := d.getDatabasePath(stmt.Name)// 检查数据库是否已存在if _, err := os.Stat(dbPath); !os.IsNotExist(err) {return fmt.Errorf("数据库 '%s' 已存在", stmt.Name)}// 创建数据库目录if err := os.MkdirAll(dbPath, 0755); err != nil {return err}// 更新数据字典d.dictMutex.Lock()d.dataDictionary[stmt.Name] = make(map[string]*TableMetadata)err := d.saveDataDictionary()d.dictMutex.Unlock()return err
}func (d *DiskBackend) DropDatabase(stmt *ast.DropDatabaseStatement) error {d.mu.Lock()defer d.mu.Unlock()dbPath := d.getDatabasePath(stmt.Name)// 检查数据库是否存在if _, err := os.Stat(dbPath); os.IsNotExist(err) {return fmt.Errorf("database '%s' does not exist", stmt.Name)}// 删除数据库目录if err := os.RemoveAll(dbPath); err != nil {return err}// 更新数据字典d.dictMutex.Lock()delete(d.dataDictionary, stmt.Name)err := d.saveDataDictionary()d.dictMutex.Unlock()return err
}func (d *DiskBackend) UseDatabase(stmt *ast.UseDatabaseStatement, connCtx context.DBContext) error {d.mu.RLock()defer d.mu.RUnlock()// 检查数据库是否存在(通过数据字典)d.dictMutex.RLock()_, exists := d.dataDictionary[stmt.Name]d.dictMutex.RUnlock()if !exists {return fmt.Errorf("database '%s' does not exist", stmt.Name)}// 更新连接上下文中的当前数据库connCtx.SetDBName(stmt.Name)return nil
}func (d *DiskBackend) ShowDatabases() *Results {d.mu.RLock()defer d.mu.RUnlock()results := &Results{Columns: []ResultColumn{{Name: "Database", Type: "TEXT"},},Rows: make([][]Cell, 0),}d.dictMutex.RLock()for dbName := range d.dataDictionary {results.Rows = append(results.Rows, []Cell{{Type: CellTypeText, TextValue: dbName},})}d.dictMutex.RUnlock()return results
}func (d *DiskBackend) ShowTables(connCtx context.DBContext) *Results {d.mu.RLock()defer d.mu.RUnlock()results := &Results{Columns: []ResultColumn{{Name: "Tables", Type: "TEXT"},},Rows: make([][]Cell, 0),}dbName := connCtx.GetDBName()if dbName == "" {return results}d.dictMutex.RLock()if dbTables, exists := d.dataDictionary[dbName]; exists {for tableName := range dbTables {results.Rows = append(results.Rows, []Cell{{Type: CellTypeText, TextValue: tableName},})}}d.dictMutex.RUnlock()return results
}// CreateTable 创建表
func (d *DiskBackend) CreateTable(databaseName string, stmt *ast.CreateTableStatement) error {d.mu.Lock()defer d.mu.Unlock()// 检查数据库是否存在d.dictMutex.RLock()_, dbExists := d.dataDictionary[databaseName]d.dictMutex.RUnlock()if !dbExists {return fmt.Errorf("database '%s' does not exist", databaseName)}// 检查表是否已存在d.dictMutex.RLock()_, tableExists := d.dataDictionary[databaseName][stmt.TableName]d.dictMutex.RUnlock()if tableExists {return fmt.Errorf("table '%s' 已存在", stmt.TableName)}// 创建新表table := &Table{Name: stmt.TableName,Columns: stmt.Columns,Rows: make([][]VersionedCell, 0),Indexes: make(map[string]*Index),RowLocks: make(map[int]*sync.RWMutex),}// 为主键创建索引for _, col := range stmt.Columns {if col.Primary {table.Indexes[col.Name] = &Index{Column: col.Name,Values: make(map[string][]int),}}}// 保存表到磁盘return d.saveTable(databaseName, stmt.TableName, table)
}func (d *DiskBackend) DropTable(databaseName string, stmt *ast.DropTableStatement) error {d.mu.Lock()defer d.mu.Unlock()// 检查数据库是否存在d.dictMutex.RLock()_, dbExists := d.dataDictionary[databaseName]d.dictMutex.RUnlock()if !dbExists {return fmt.Errorf("database '%s' does not exist", databaseName)}// 检查表是否存在d.dictMutex.RLock()_, tableExists := d.dataDictionary[databaseName][stmt.TableName]d.dictMutex.RUnlock()if !tableExists {return fmt.Errorf("table '%s' does not exist", stmt.TableName)}// 删除表数据文件dataPath := d.getTableDataPath(databaseName, stmt.TableName)if err := os.Remove(dataPath); err != nil && !os.IsNotExist(err) {return fmt.Errorf("can not delete table data '%s': %v", stmt.TableName, err)}// 从数据字典中移除表d.dictMutex.Lock()delete(d.dataDictionary[databaseName], stmt.TableName)err := d.saveDataDictionary()d.dictMutex.Unlock()return err
}// Select 查询数据
func (d *DiskBackend) Select(databaseName string, stmt *ast.SelectStatement, txn *Transaction) (*Results, error) {d.mu.RLock()defer d.mu.RUnlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return nil, err}// 使用通用逻辑处理查询return d.base.Select(table, stmt, txn, d.getVisibleRow, d.base.evaluateWhereCondition)
}// Insert 插入数据
func (d *DiskBackend) Insert(databaseName string, stmt *ast.InsertStatement, txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return err}// 使用通用逻辑处理插入err = d.base.Insert(table, stmt, txn, d.getVisibleRow, d.base.convertToCell, d.base.evaluateExpression)if err != nil {return err}// 在非事务模式下直接保存表,否则更新事务缓存if txn == nil {return d.saveTable(databaseName, stmt.TableName, table)} else {// 更新事务缓存中的表d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}d.txnTableCache[txn.ID][stmt.TableName] = tabled.cacheMutex.Unlock()}return nil
}// Update 更新数据
func (d *DiskBackend) Update(databaseName string, stmt *ast.UpdateStatement, txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return err}// 使用通用逻辑处理更新err = d.base.Update(table, stmt, txn, d.getVisibleRow, d.base.convertToCell,d.base.evaluateExpression, d.base.evaluateWhereCondition)if err != nil {return err}// 在非事务模式下直接保存表,否则更新事务缓存if txn == nil {return d.saveTable(databaseName, stmt.TableName, table)} else {// 更新事务缓存中的表d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}d.txnTableCache[txn.ID][stmt.TableName] = tabled.cacheMutex.Unlock()}return nil
}// Delete 删除数据
func (d *DiskBackend) Delete(databaseName string, stmt *ast.DeleteStatement, txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 加载表(支持事务缓存)table, err := d.loadTable(databaseName, stmt.TableName, txn)if err != nil {return err}// 使用通用逻辑处理删除err = d.base.Delete(table, stmt, txn, d.getVisibleRow, d.base.evaluateWhereCondition)if err != nil {return err}// 在非事务模式下直接保存表,否则更新事务缓存if txn == nil {return d.saveTable(databaseName, stmt.TableName, table)} else {// 更新事务缓存中的表d.cacheMutex.Lock()if d.txnTableCache[txn.ID] == nil {d.txnTableCache[txn.ID] = make(map[string]*Table)}d.txnTableCache[txn.ID][stmt.TableName] = tabled.cacheMutex.Unlock()}return nil
}// updateIndexesAfterInsert 在插入后更新索引
func (d *DiskBackend) updateIndexesAfterInsert(table *Table, oldRowCount int) {// 只需要更新新插入行的索引for i := oldRowCount; i < len(table.Rows); i++ {row := table.Rows[i]for _, col := range table.Columns {if col.Primary && i < len(row) {key := row[i].Data.String()if index, exists := table.Indexes[col.Name]; exists {index.Values[key] = append(index.Values[key], i)}}}}
}// BeginTransaction 开启事务
func (d *DiskBackend) BeginTransaction() *Transaction {return d.txnMgr.BeginTransaction(d)
}// CommitTransaction 提交事务中的更改
func (d *DiskBackend) CommitTransaction(txn *Transaction) error {d.mu.Lock()defer d.mu.Unlock()// 首先调用事务管理器的提交方法if err := d.txnMgr.CommitTransaction(txn); err != nil {return err}// 获取事务缓存中的表d.cacheMutex.RLock()tables, cacheExists := d.txnTableCache[txn.ID]d.cacheMutex.RUnlock()if cacheExists {// 遍历事务缓存中的所有表,将修改持久化到磁盘for tableName, table := range tables {// 查找表所在的数据库var dbName stringvar found boold.dictMutex.RLock()for db, dbTables := range d.dataDictionary {if _, exists := dbTables[tableName]; exists {dbName = dbfound = truebreak}}d.dictMutex.RUnlock()if !found {continue}// 更新表中所有版本化单元格的状态for i := range table.Rows {for j := range table.Rows[i] {if table.Rows[i][j].TxnID == txn.ID {table.Rows[i][j].Committed = truetable.Rows[i][j].Timestamp = time.Now()}}}// 重建索引以确保一致性d.rebuildIndexes(table)// 保存表到磁盘if err := d.saveTable(dbName, tableName, table); err != nil {return err}}}// 清理事务缓存d.cacheMutex.Lock()delete(d.txnTableCache, txn.ID)d.cacheMutex.Unlock()// 保存数据字典return d.saveDataDictionary()
}// RollbackTransaction 回滚事务
func (d *DiskBackend) RollbackTransaction(txn *Transaction) error {// 先执行事务管理器的回滚逻辑(更新事务状态等)if err := d.txnMgr.RollbackTransaction(txn); err != nil {return err}// 清理事务缓存(回滚时直接丢弃所有未提交的修改)d.cacheMutex.Lock()delete(d.txnTableCache, txn.ID)d.cacheMutex.Unlock()// 保存数据字典(确保元数据一致性)return d.saveDataDictionary()
}// getVisibleRow 获取可见行版本
func (d *DiskBackend) getVisibleRow(versionedRow []VersionedCell, txn *Transaction) []Cell {if len(versionedRow) == 0 {return nil}if txn == nil {// 非事务查询,返回最新提交的版本for i := len(versionedRow) - 1; i >= 0; i-- {if versionedRow[i].Committed {row := make([]Cell, len(versionedRow))for j, v := range versionedRow {row[j] = v.Data}return row}}return nil}// 事务查询,根据读已提交隔离级别规则// 查找对当前事务可见的最新版本for i := len(versionedRow) - 1; i >= 0; i-- {version := versionedRow[i]// 如果是当前事务自己的修改,可见(即使未提交)if version.TxnID == txn.ID {row := make([]Cell, len(versionedRow))for j, v := range versionedRow {row[j] = v.Data}return row}// 对于读已提交隔离级别,只能看到已提交的数据if version.Committed {row := make([]Cell, len(versionedRow))for j, v := range versionedRow {row[j] = v.Data}return row}}return nil
}
测试
测试命令:
-- 创建数据库test
create database test;
use test;
create table users (id INT PRIMARY KEY,name text,age INT);
insert into users values (1, 'Tom', 20);
select * from users;
-- 重启数据库,然后观察数据库及表中数据是否存在
use test;
select * from users;
效果:
可以看到我们插入数据,重启数据库之后依然可以看到之前的数据,并且本地生成了对应的持久化文件,整体符合预期。


功能点二:配置文件支持
MySQL中可通过my.cnf等文件进行系统配置,我们也可参考MySQL,提供一个配置文件用于配置服务默认端口、默认存储引擎等信息。

实现思路
只需在程序启动时,读取本地的配置文件即可。然后判断程序监听的端口和默认初始化的存储引擎
1.新建config/config.go文件:

2. main.go中读取配置文件:

代码实现
config/config.go:
// config/config.go
package configimport ("encoding/json""os"
)type Config struct {Storage struct {Type string `json:"type"` // "memory" 或 "disk"DataPath string `json:"dataPath"` // 磁盘存储的数据路径} `json:"storage"`Server struct {Port string `json:"port"`} `json:"server"`
}func LoadConfig(configPath string) (*Config, error) {file, err := os.Open(configPath)if err != nil {return nil, err}defer file.Close()var config Configif err := json.NewDecoder(file).Decode(&config); err != nil {return nil, err}return &config, nil
}
cmd/main.go:
// cmd/main.go
package mainimport ("flag""fmt""github.com/c-bata/go-prompt""log""os""strings""ziyi.db.com/config""ziyi.db.com/internal/ast""ziyi.db.com/internal/lexer""ziyi.db.com/internal/parser""ziyi.db.com/internal/storage""ziyi.db.com/network"
)var history []string // 存储命令历史
var backend storage.Engine // 存储引擎实例
var currentTxn *storage.Transaction // 当前事务
var historyIndex int // 当前历史记录索引
var currentDatabase string // 当前用户选择的数据库type dbContextAdapter struct {dbName *string
}func (d *dbContextAdapter) GetDBName() string {return *d.dbName
}func (d *dbContextAdapter) SetDBName(dbName string) {*d.dbName = dbName
}func executor(t string) {// 分割多个SQL语句(用分号分隔)statements := strings.Split(t, ";")for _, stmt := range statements {stmt = strings.TrimSpace(stmt)if stmt == "" {continue}// 添加到历史记录history = append(history, stmt)historyIndex = len(history) // 重置历史记录索引// 处理退出命令if strings.ToLower(stmt) == "exit" {fmt.Println("Bye!")os.Exit(0)}// 处理事务相关命令if strings.HasPrefix(strings.ToLower(stmt), "begin") {currentTxn = backend.BeginTransaction()fmt.Printf("Transaction %d started\n", currentTxn.ID)continue}if strings.HasPrefix(strings.ToLower(stmt), "commit") {if currentTxn == nil {fmt.Println("Error: No active transaction")continue}if err := backend.CommitTransaction(currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Printf("Transaction %d committed\n", currentTxn.ID)}currentTxn = nilcontinue}if strings.HasPrefix(strings.ToLower(stmt), "rollback") {if currentTxn == nil {fmt.Println("Error: No active transaction")continue}if err := backend.RollbackTransaction(currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Printf("Transaction %d rolled back\n", currentTxn.ID)}currentTxn = nilcontinue}// 创建词法分析器l := lexer.NewLexer(strings.NewReader(stmt))// 创建语法分析器p := parser.NewParser(l)// 解析SQL语句parsedStmt, err := p.ParseProgram()if err != nil {fmt.Printf("Parse error: %v\n", err)continue}// 执行SQL语句for _, statement := range parsedStmt.Statements {if currentDatabase == "" {// 检查是否是非数据库操作语句_, isCreateDB := statement.(*ast.CreateDatabaseStatement)_, isShowDBs := statement.(*ast.ShowDatabasesStatement)_, isDropDB := statement.(*ast.DropDatabaseStatement)_, isUseDB := statement.(*ast.UseDatabaseStatement)_, isShowTables := statement.(*ast.ShowTablesStatement)// 如果不是允许的语句类型,则提示需要选择数据库if !isCreateDB && !isShowDBs && !isDropDB && !isUseDB && !isShowTables {fmt.Println("No database selected. Use 'USE database_name' to select a database.")continue}}switch s := statement.(type) {case *ast.CreateDatabaseStatement:if err := backend.CreateDatabase(s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Database created successfully")}case *ast.DropDatabaseStatement:if err := backend.DropDatabase(s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Database dropped successfully")}case *ast.ShowDatabasesStatement:result := backend.ShowDatabases()printResults(result)case *ast.ShowTablesStatement:result := backend.ShowTables(&dbContextAdapter{¤tDatabase})printResults(result)case *ast.UseDatabaseStatement:if err := backend.UseDatabase(s, &dbContextAdapter{¤tDatabase}); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Printf("Database changed to '%s'\n", currentDatabase)}case *ast.CreateTableStatement:if err := backend.CreateTable(currentDatabase, s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Table created successfully")}case *ast.InsertStatement:if err := backend.Insert(currentDatabase, s, currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("1 row inserted")}case *ast.SelectStatement:results, err := backend.Select(currentDatabase, s, currentTxn)if err != nil {fmt.Printf("Error: %v\n", err)} else {printResults(results)}case *ast.UpdateStatement:if err := backend.Update(currentDatabase, s, currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Query OK")}case *ast.DeleteStatement:if err := backend.Delete(currentDatabase, s, currentTxn); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Query OK")}case *ast.DropTableStatement:if err := backend.DropTable(currentDatabase, s); err != nil {fmt.Printf("Error: %v\n", err)} else {fmt.Println("Table dropped successfully")}default:fmt.Printf("Unsupported statement type: %T\n", s)}}}
}func printResults(results *storage.Results) {// 计算每列的最大宽度colWidths := make([]int, len(results.Columns))for i, col := range results.Columns {colWidths[i] = len(col.Name)}for _, row := range results.Rows {for i, cell := range row {cellLen := len(cell.String())if cellLen > colWidths[i] {colWidths[i] = cellLen}}}// 打印表头fmt.Print("+")for _, width := range colWidths {fmt.Print(strings.Repeat("-", width+2))fmt.Print("+")}fmt.Println()// 打印列名fmt.Print("|")for i, col := range results.Columns {fmt.Printf(" %-*s |", colWidths[i], col.Name)}fmt.Println()// 打印分隔线fmt.Print("+")for _, width := range colWidths {fmt.Print(strings.Repeat("-", width+2))fmt.Print("+")}fmt.Println()// 打印数据行for _, row := range results.Rows {fmt.Print("|")for i, cell := range row {fmt.Printf(" %-*s |", colWidths[i], cell.String())}fmt.Println()}// 打印底部边框fmt.Print("+")for _, width := range colWidths {fmt.Print(strings.Repeat("-", width+2))fmt.Print("+")}fmt.Println()// 打印行数统计fmt.Printf("%d rows in set\n", len(results.Rows))
}// 提供命令补全功能
func completer(d prompt.Document) []prompt.Suggest {s := []prompt.Suggest{}return prompt.FilterHasPrefix(s, d.GetWordBeforeCursor(), true)
}func main() {// 添加配置文件参数configPath := flag.String("config", "config.json", "Path to config file")port := flag.String("port", "3118", "Port to listen on")flag.Parse()// 加载配置config, err := config.LoadConfig(*configPath)if err != nil {fmt.Printf("无法加载配置文件: %v\n", err)os.Exit(1)}// 初始化存储引擎switch config.Storage.Type {case "memory":backend = storage.NewMemoryBackend()case "disk":backend = storage.NewDiskBackend(config.Storage.DataPath)default:fmt.Printf("未知的存储引擎类型: %s\n", config.Storage.Type)os.Exit(1)}// 检查是否以服务器模式运行args := flag.Args()if len(args) > 0 && args[0] == "server" {// 启动服务器模式portEnv := os.Getenv("ZIYIDB_PORT")if portEnv != "" {*port = portEnv} else if config.Server.Port != "" {*port = config.Server.Port}// 类型断言获取 MemoryBackend(如果使用的是内存引擎)if memoryBackend, ok := backend.(*storage.MemoryBackend); ok {server := network.NewServer(memoryBackend, *port)fmt.Printf("Starting ZiyiDB server on port %s with %s storage...\n", *port, config.Storage.Type)log.Fatal(server.Start())} else {fmt.Println("Server mode only supports memory storage engine")os.Exit(1)}}fmt.Println("Welcome to ZiyiDB!")fmt.Println("Type your SQL commands (type 'exit' to quit)")p := prompt.New(executor,completer,prompt.OptionTitle("ZiyiDB: A Simple SQL Database"),prompt.OptionPrefix("ziyidb> "),prompt.OptionHistory(history),prompt.OptionLivePrefix(func() (string, bool) {return "ziyidb> ", true}),//实现方向键上下翻阅历史命令// 上键绑定prompt.OptionAddKeyBind(prompt.KeyBind{Key: prompt.Up,Fn: func(buf *prompt.Buffer) {if historyIndex > 0 {historyIndex--buf.DeleteBeforeCursor(len(buf.Text()))buf.InsertText(history[historyIndex], false, true)}},}),// 下键绑定prompt.OptionAddKeyBind(prompt.KeyBind{Key: prompt.Down,Fn: func(buf *prompt.Buffer) {if historyIndex < len(history)-1 {historyIndex++buf.DeleteBeforeCursor(len(buf.Text()))buf.InsertText(history[historyIndex], false, true)} else if historyIndex == len(history)-1 {historyIndex++buf.DeleteBeforeCursor(len(buf.Text()))}},}),)p.Run()
}
拓展:docker化支持
为了适应现在互联网开发的要求,ZiyiDB也需要提供docker部署的方式。
- 项目根目录下,新建Dockerfile文件
Dockerfile:
# Dockerfile
FROM golang:1.23.8-alpine AS builder# 设置ZiyiDB工作目录
WORKDIR /app# 复制go mod和sum文件
COPY go.mod go.sum ./# 下载依赖
RUN go mod download# 复制源代码
COPY . .# 编译服务端程序
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o ziyidb cmd/main.go# 编译客户端程序
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o ziyidb-cli cmd/client.go# 使用基础镜像
FROM alpine:latest# 安装ca-certificates
RUN apk --no-cache add ca-certificates# 设置工作目录
WORKDIR /root/# 从builder阶段复制编译好的程序
COPY --from=builder /app/ziyidb .
COPY --from=builder /app/ziyidb-cli .# 复制配置文件
COPY --from=builder /app/config.json .# 暴露默认端口
EXPOSE 3118# 启动服务器模式
ENTRYPOINT ["./ziyidb", "server", "-port=3118"]
- 执行docker命令打包镜像
# 在Dockerfile同一目录,构建镜像
docker build -t ziyidb .

查看是否构建成功:
docker images | grep ziyidb

3. 创建容器
# 创建容器并挂载本地目录
docker run -d \--name ziyidb \-p 3118:3118 \--restart unless-stopped \-v /Users/ziyi/GolandProjects/ZiyiDB/config.json:/root/config.json \-v /Users/ziyi/GolandProjects/ZiyiDB/data:/root/data \ziyidb# 查看容器运行状态
docker ps | grep ziyidb

PS:同样的,除了docker方式搭建,也可以编写docker-compose.yml启动。
docker-compose.yml:
services:ziyidb:build: .container_name: ziyidbports:- "3118:3118"restart: unless-stoppedenvironment:- ZIYIDB_PORT=3118
# 通过docker-compose方式启动
docker-compose up -d
效果:

- 测试功能
- 验证memory存储引擎
# 进入容器
docker exec -it ziyidb /bin/sh# 执行客户端,连接服务端(后续可优化:默认链接本地的服务端)
./ziyidb-cli localhost:3118# 测试memory存储引擎
CREATE DATABASE test_memory;
USE test_memory;
CREATE TABLE users (id INT PRIMARY KEY, name TEXT, age INT);
INSERT INTO users VALUES (1, 'Alice', 25);
SELECT * FROM users;
exit
exit# 重启容器,观察之前创建的数据是否还存在
docker restart ziyidb
docker exec -it ziyidb /bin/sh
./ziyidb-cli localhost:3118
show databases;


- 验证disk存储引擎
调整本地挂载的config.json配置文件,然后重启容器

重启容器,并插入数据:
# 重启容器
docker restart ziyidb# 进入ziyidb,连接到客户端
docker exec -it ziyidb /bin/sh
./ziyidb-cli localhost:3118# 创建数据库、表,并插入数据
CREATE DATABASE test_disk;
USE test_disk;
CREATE TABLE users (id INT PRIMARY KEY, name TEXT, age INT);
INSERT INTO users VALUES (1, 'Alice', 25);
SELECT * FROM users;

观察挂载的本地磁盘是否有对应目录生成:

重启ziyidb,验证之前插入的数据是否还能查询:
# 进入ziyidb
docker exec -it ziyidb /bin/sh -c "./ziyidb-cli localhost:3118"
# 查询之前数据
show databases;
use test_disk;
select * from users;

5. 打包镜像到docker-hub
在Dockerfile同级目录执行下面命令:
# ziyigun是我dockerhub的用户名
# ziyidb是镜像名,latest是版本号,代表最新
docker build -t ziyigun/ziyidb:latest .# 查询是否打包成功
docker images | grep ziyigun


登录dockerhub:
# 登录dockerhub
docker login


推送镜像到远程仓库:
# 推送到dockerhub
docker push ziyigun/ziyidb:latest

在仓库搜索ziyidb,查看是否能搜索到镜像:https://hub.docker.com/search?q=ziyidb

参考文章:
https://blog.csdn.net/weixin_45565886/article/details/154035863
https://dev.mysql.com/doc/refman/8.0/en/system-schema.html
https://dev.mysql.com/doc/refman/8.0/en/mysql-nutshell.html#mysql-nutshell-ddl
