当前位置: 首页 > news >正文

Jaeger开源分布式追踪平台深度剖析(三)Jaeger默认存储Badger原理剖析

Badger Value Log值日志详解

概述

Value Log是Badger实现键值分离(Key-Value Separation)的核心组件。它将大值存储在独立的日志文件中,LSM树只存储键和值指针,这种设计显著减少了写放大和提高了性能。

核心设计思想

1. 键值分离机制

Small Values (< ValueThreshold): Key + Value → LSM TreeLarge Values (>= ValueThreshold):Key + ValuePointer → LSM TreeValue → Value LogValuePointer格式:[Fid(4B)] [Len(4B)] [Offset(4B)]

2. Value Log结构

type valueLog struct {dirPath              string                  // 目录路径filesLock            sync.RWMutex           // 文件锁filesMap             map[uint32]*logFile    // 文件映射maxFid               uint32                 // 最大文件IDfilesToBeDeleted     []uint32               // 待删除文件numActiveIterators   atomic.Int32           // 活跃迭代器数量writableLogOffset    atomic.Uint32          // 可写偏移numEntriesWritten    uint32                 // 已写入条目数discardStats         *discardStats          // 垃圾回收统计garbageCh            chan struct{}          // 垃圾回收通道
}

Value Log文件格式

1. 文件头部

+----------------+------------------+
| keyID(8 bytes) |  baseIV(12 bytes)|
+----------------+------------------+

2. 记录格式

+--------+--------+--------+--------+--------+--------+
| Header | KeyLen | Key    | ValLen | Value  | CRC32  |
+--------+--------+--------+--------+--------+--------+
|   ?B   |   4B   |  Var   |   4B   |  Var   |   4B   |

3. Header详细格式

type header struct {klen      uint32    // 键长度vlen      uint32    // 值长度expiresAt uint64    // 过期时间meta      byte      // 元数据标志userMeta  byte      // 用户元数据
}

写入机制

1. 值阈值判断

func (db *DB) valueThreshold() int64 {if db.threshold != nil {return db.threshold.valueThreshold.Load()}return db.opt.ValueThreshold
}

2. 写入流程

func (vlog *valueLog) write(reqs []*request) error {vlog.filesLock.RLock()maxFid := vlog.maxFidcurlf := vlog.filesMap[maxFid]vlog.filesLock.RUnlock()toDisk := func() error {if err := curlf.doneWriting(vlog.writableLogOffset.Load()); err != nil {return err}y.NumBytesWrittenToL0Add(vlog.opt.MetricsEnabled, int64(vlog.writableLogOffset.Load()))return nil}for i := range reqs {b := reqs[i]b.Ptrs = b.Ptrs[:0]for j := range b.Entries {e := b.Entries[j]var p valuePointerif shouldWriteValueToLSM(e, vlog.db.valueThreshold()) {// 小值直接存储在LSM中p.Len = uint32(len(e.Value))b.Ptrs = append(b.Ptrs, p)continue}// 大值存储在Value Log中p.Fid = curlf.fidp.Offset = vlog.woffset()p.Len = uint32(len(e.Value))b.Ptrs = append(b.Ptrs, p)// 写入Value Logbuf := vlog.encodeEntry(e, p.Offset)copy(curlf.Data[p.Offset:], buf)vlog.writableLogOffset.Add(uint32(len(buf)))}}return toDisk()
}

3. 条目编码

func (vlog *valueLog) encodeEntry(e *Entry, offset uint32) []byte {h := header{klen:      uint32(len(e.Key)),vlen:      uint32(len(e.Value)),expiresAt: e.ExpiresAt,meta:      e.meta,userMeta:  e.UserMeta,}// 计算总大小size := h.Size() + len(e.Key) + len(e.Value) + crc32.Sizebuf := make([]byte, size)written := h.EncodeTo(buf)copy(buf[written:], e.Key)written += len(e.Key)copy(buf[written:], e.Value)written += len(e.Value)// 计算并写入CRC32hash := crc32.New(y.CastagnoliCrcTable)hash.Write(buf[:written])checksum := hash.Sum32()y.U32ToBytes(buf[written:], checksum)return buf
}

读取机制

1. 值指针解码

type valuePointer struct {Fid    uint32  // 文件IDLen    uint32  // 值长度Offset uint32  // 文件内偏移
}func (p *valuePointer) Decode(b []byte) {p.Fid = binary.BigEndian.Uint32(b[:4])p.Len = binary.BigEndian.Uint32(b[4:8])p.Offset = binary.BigEndian.Uint32(b[8:12])
}

2. 读取流程

func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {// 获取目标文件lf, err := vlog.getFileRLocked(vp)if err != nil {return nil, nil, err}// 读取数据buf, err := lf.read(vp)if err != nil {return nil, nil, err}// 验证和解密reader := &safeRead{recordOffset: vp.Offset,lf:          lf,}entry, err := reader.Entry(bytes.NewReader(buf))if err != nil {return nil, nil, err}return entry.Value, func() { /* unlock callback */ }, nil
}

3. 缓存友好读取

func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {lf, err := vlog.getFileRLocked(vp)if err != nil {return nil, nil, err}// 使用mmap直接读取offset := vp.Offsetif offset >= uint32(len(lf.Data)) {return nil, lf, fmt.Errorf("Invalid value pointer offset: %d", offset)}return lf.Data[offset:], lf, nil
}

垃圾回收机制

1. 丢弃统计

type discardStats struct {sync.MutexvaluesLock  sync.RWMutexvalues      map[uint32]int64  // 每个文件的丢弃字节数bytesRead   atomic.Int64      // 读取字节数fileSize    int64             // 文件大小
}func (ds *discardStats) Update(fid uint32, discard int64) {ds.valuesLock.Lock()ds.values[fid] += discardds.valuesLock.Unlock()
}

2. GC触发条件

func (vlog *valueLog) pickLog(discardRatio float64) *logFile {vlog.filesLock.RLock()defer vlog.filesLock.RUnlock()candidate := struct {fid            uint32discardRatio   float64staleDataSize  int64}{}for fid, lf := range vlog.filesMap {if fid >= vlog.maxFid {continue  // 跳过当前写入文件}staleDataSize := vlog.discardStats.StaleDataSize(fid)totalSize := lf.size.Load()if totalSize == 0 {continue}ratio := float64(staleDataSize) / float64(totalSize)if ratio > discardRatio && ratio > candidate.discardRatio {candidate.fid = fidcandidate.discardRatio = ratiocandidate.staleDataSize = staleDataSize}}if candidate.fid != 0 {return vlog.filesMap[candidate.fid]}return nil
}

3. GC执行流程

func (vlog *valueLog) doRunGC(lf *logFile) error {// 统计有效数据var validEntries []*Entryvar totalValidSize int64err := lf.iterate(true, 0, func(e Entry, vp valuePointer) error {// 检查entry是否仍然有效vs, err := vlog.db.get(e.Key)if err != nil {return err}if !discardEntry(e, vs, vlog.db) {validEntries = append(validEntries, &e)totalValidSize += int64(vp.Len)}return nil})if err != nil {return err}vlog.opt.Infof("GC rewriting fid: %d, valid entries: %d, valid size: %d", lf.fid, len(validEntries), totalValidSize)// 重写有效数据到新的Value Log文件if len(validEntries) > 0 {return vlog.rewrite(validEntries)}// 标记文件为删除vlog.filesLock.Lock()vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, lf.fid)vlog.filesLock.Unlock()return nil
}

4. 数据重写

func (vlog *valueLog) rewrite(entries []*Entry) error {// 创建重写请求req := &request{Entries: entries,Ptrs:    make([]valuePointer, len(entries)),}req.Wg.Add(1)// 写入新的Value Logerr := vlog.write([]*request{req})if err != nil {return err}// 更新LSM中的值指针txn := vlog.db.NewTransaction(true)defer txn.Discard()for i, e := range entries {vp := req.Ptrs[i]if err := txn.SetEntry(&Entry{Key:   e.Key,Value: vp.Encode(),Meta:  bitValuePointer,}); err != nil {return err}}return txn.Commit()
}

文件管理

1. 文件创建

func (vlog *valueLog) createVlogFile() (*logFile, error) {fid := atomic.AddUint32(&vlog.maxFid, 1)fname := vlog.fpath(fid)mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR, vlog.opt.ValueLogFileSize)if err != nil {return nil, err}lf := &logFile{MmapFile: mf,fid:      fid,path:     fname,opt:      vlog.opt,}// 写入文件头if err := lf.bootstrap(); err != nil {return nil, err}vlog.filesLock.Lock()vlog.filesMap[fid] = lfvlog.filesLock.Unlock()return lf, nil
}

2. 文件轮转

func (vlog *valueLog) sync() error {vlog.filesLock.RLock()curlf := vlog.filesMap[vlog.maxFid]vlog.filesLock.RUnlock()// 检查当前文件是否需要轮转if vlog.woffset() > vlog.opt.ValueLogFileSize-uint32(maxHeaderSize) {if err := curlf.doneWriting(vlog.woffset()); err != nil {return err}// 创建新文件newlf, err := vlog.createVlogFile()if err != nil {return err}vlog.writableLogOffset.Store(vlogHeaderSize)vlog.numEntriesWritten = 0}return curlf.Sync()
}

3. 文件删除

func (vlog *valueLog) deleteLogFile(lf *logFile) error {// 检查是否还有活跃迭代器if vlog.numActiveIterators.Load() > 0 {return nil  // 延迟删除}vlog.filesLock.Lock()delete(vlog.filesMap, lf.fid)vlog.filesLock.Unlock()// 删除文件if err := lf.Delete(); err != nil {return err}vlog.opt.Infof("Deleted value log file: %s", lf.path)return nil
}

动态阈值调整

1. 阈值监控

type vlogThreshold struct {percentile     float64           // 百分位数valueThreshold atomic.Int64      // 当前阈值valueCh        chan []int64      // 值大小通道vlMetrics      *z.HistogramData  // 直方图数据
}func (v *vlogThreshold) update(sizes []int64) {v.vlMetrics.Update(sizes)// 根据百分位数计算新阈值newThreshold := v.vlMetrics.Percentile(v.percentile)v.valueThreshold.Store(int64(newThreshold))
}

2. 自适应调整

func (v *vlogThreshold) listenForValueThresholdUpdate() {defer v.closer.Done()ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for {select {case sizes := <-v.valueCh:v.update(sizes)case <-v.clearCh:v.vlMetrics.Clear()case <-ticker.C:// 定期调整阈值if v.vlMetrics.Count() > 1000 {v.update(nil)}case <-v.closer.HasBeenClosed():return}}
}

性能优化

1. 批量写入

func (vlog *valueLog) write(reqs []*request) error {// 合并多个请求减少系统调用totalSize := uint32(0)for _, req := range reqs {totalSize += estimateRequestSize(req)}// 预分配缓冲区buf := make([]byte, totalSize)written := 0for _, req := range reqs {for _, e := range req.Entries {entryBuf := vlog.encodeEntry(e, offset)copy(buf[written:], entryBuf)written += len(entryBuf)}}// 一次性写入return vlog.writeBuffer(buf)
}

2. 内存映射优化

func (lf *logFile) read(vp valuePointer) ([]byte, error) {// 直接从mmap内存读取,无需系统调用if vp.Offset >= uint32(len(lf.Data)) {return nil, fmt.Errorf("Invalid offset")}return lf.Data[vp.Offset:vp.Offset+vp.Len], nil
}

3. 并发控制优化

func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {vlog.filesLock.RLock()defer vlog.filesLock.RUnlock()RES (lf, ok := vlog.filesMap[vp.Fid]if !ok {return nil, fmt.Errorf("File not found: %d", vp.Fid)}return lf, nil
}

配置建议

1. 值阈值设置

// 小文件多读场景
ValueThreshold: 32      // 32字节// 大文件少读场景  
ValueThreshold: 2048    // 2KB// 自适应模式
VLogPercentile: 0.95    // 95%分位数

2. 文件大小配置

// SSD环境
ValueLogFileSize: 1 << 30   // 1GB// HDD环境
ValueLogFileSize: 2 << 30   // 2GB// 内存有限环境
ValueLogFileSize: 128 << 20  // 128MB

3. GC配置

// 激进GC
GCDiscardRatio: 0.3     // 30%垃圾时触发// 保守GC  
GCDiscardRatio: 0.7     // 70%垃圾时触发

Value Log的设计是Badger性能优异的关键因素,通过键值分离有效减少了写放大,同时通过智能的垃圾回收机制保证了空间利用率。理解其工作原理对于优化Badger在不同场景下的性能表现非常重要。

Badger事务与并发控制详解

概述

Badger采用MVCC(Multi-Version Concurrency Control)实现事务并发控制,支持快照隔离(Snapshot Isolation)级别的事务。通过Oracle组件管理时间戳分配和冲突检测,实现了高并发的读写操作。

核心组件

1. Oracle时间戳管理器

type oracle struct {isManaged       bool              // 是否托管模式detectConflicts bool              // 是否开启冲突检测nextTxnTs       uint64            // 下一个事务时间戳writeChLock     sync.Mutex        // 写入通道锁txnMark         *y.WaterMark      // 事务水位标记readMark        *y.WaterMark      // 读取水位标记discardTs       uint64            // 丢弃时间戳committedTxns   []committedTxn    // 已提交事务列表lastCleanupTs   uint64            // 最后清理时间戳closer          *z.Closer         // 关闭器
}

2. Transaction事务结构

type Txn struct {readTs          uint64                 // 读取时间戳commitTs        uint64                 // 提交时间戳size            int64                  // 事务大小count           int64                  // 条目数量db              *DB                    // 数据库引用reads           []uint64               // 读取键指纹conflictKeys    map[uint64]struct{}    // 冲突键指纹readsLock       sync.Mutex             // 读取锁pendingWrites   map[string]*Entry      // 待写入条目duplicateWrites []*Entry               // 重复写入条目numIterators    atomic.Int32           // 迭代器数量discarded       bool                   // 是否已丢弃doneRead        bool                   // 是否完成读取update          bool                   // 是否更新事务
}

MVCC机制详解

1. 时间戳分配

func (o *oracle) readTs() uint64 {if o.isManaged {panic("ReadTs should not be retrieved for managed DB")}var readTs uint64o.Lock()readTs = o.nextTxnTs - 1o.readMark.Begin(readTs)  // 开始读取标记o.Unlock()// 等待所有无冲突事务完成写入y.Check(o.txnMark.WaitForMark(context.Background(), readTs))return readTs
}func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {o.Lock()defer o.Unlock()// 检查冲突if o.hasConflict(txn) {return 0, true  // 返回冲突标志}var ts uint64if !o.isManaged {o.doneRead(txn)o.cleanupCommittedTransactions()// 分配新的提交时间戳ts = o.nextTxnTso.nextTxnTs++o.txnMark.Begin(ts)} else {ts = txn.commitTs  // 托管模式使用预设时间戳}// 记录已提交事务用于冲突检测if o.detectConflicts {o.committedTxns = append(o.committedTxns, committedTxn{ts:           ts,conflictKeys: txn.conflictKeys,})}return ts, false
}

2. 版本可见性规则

// 版本可见性判断
func isVisible(itemVersion, readTs uint64) bool {return itemVersion <= readTs
}// 在Get操作中的应用
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {if len(key) == 0 {return nil, ErrEmptyKey}if txn.discarded {return nil, ErrDiscardedTxn}// 1. 首先检查事务内的写入缓存if txn.update {if e, has := txn.pendingWrites[string(key)]; has {if isDeletedOrExpired(e.meta, e.ExpiresAt) {return nil, ErrKeyNotFound}return &Item{e: e, txn: txn}, nil}}// 2. 添加到读取集合(用于冲突检测)txn.addReadKey(key)// 3. 从数据库读取,只读取版本 <= readTs 的数据seek := y.KeyWithTs(key, txn.readTs)vs, err := txn.db.get(seek)if err != nil {return nil, y.Wrapf(err, "DB::Get key: %q", key)}if vs.Meta&bitDelete > 0 {return nil, ErrKeyNotFound}item = &Item{key:   key,vptr:  vs.Value,meta:  vs.Meta,userMeta: vs.UserMeta,expiresAt: vs.ExpiresAt,version: y.ParseTs(vs.Value),txn:   txn,db:    txn.db,}return item, nil
}

冲突检测机制

1. 冲突键跟踪

func (txn *Txn) addReadKey(key []byte) {if !txn.update || !txn.db.opt.DetectConflicts {return}txn.readsLock.Lock()defer txn.readsLock.Unlock()fp := z.MemHash(key)// 添加到读取集合txn.reads = append(txn.reads, fp)
}func (txn *Txn) modify(e *Entry) error {if txn.discarded {return ErrDiscardedTxn}if !txn.update {return ErrReadOnlyTxn}// 记录写入键用于冲突检测if txn.db.opt.DetectConflicts {fp := z.MemHash(e.Key)if txn.conflictKeys == nil {txn.conflictKeys = make(map[uint64]struct{})}txn.conflictKeys[fp] = struct{}{}}txn.pendingWrites[string(e.Key)] = ereturn nil
}

2. 冲突检测算法

func (o *oracle) hasConflict(txn *Txn) bool {if len(txn.reads) == 0 {return false}for _, committedTxn := range o.committedTxns {// 如果已提交事务的时间戳 <= 当前事务的读时间戳// 说明已提交事务在当前事务开始前完成,无需检查冲突if committedTxn.ts <= txn.readTs {continue}// 检查读写冲突:当前事务读取的键是否被后续事务写入for _, readKey := range txn.reads {if _, has := committedTxn.conflictKeys[readKey]; has {return true  // 发现冲突}}}return false
}

3. 冲突处理策略

func (txn *Txn) Commit() error {// 预检查if err := txn.commitPrecheck(); err != nil {return err}// 尝试获取提交时间戳commitTs, conflict := txn.db.orc.newCommitTs(txn)if conflict {// 发生冲突,回滚事务return ErrConflict}txn.commitTs = commitTs// 执行提交callback, err := txn.commitAndSend()if err != nil {return err}// 等待写入完成return callback()
}

事务生命周期

1. 事务创建

func (db *DB) NewTransaction(update bool) *Txn {return db.newTransaction(update, false)
}func (db *DB) newTransaction(update, isManaged bool) *Txn {if db.IsClosed() {panic(ErrDBClosed)}txn := &Txn{update:        update,db:            db,count:         1,                    // 自身占用一个计数size:          int64(len(txnKey)),   // 事务标记键的大小discarded:     false,pendingWrites: make(map[string]*Entry),}if !isManaged {txn.readTs = db.orc.readTs()  // 分配读时间戳}return txn
}

2. 写入操作

func (txn *Txn) Set(key, val []byte) error {e := NewEntry(key, val)return txn.SetEntry(e)
}func (txn *Txn) SetEntry(e *Entry) error {return txn.modify(e)
}func (txn *Txn) Delete(key []byte) error {e := NewEntry(key, nil).WithMeta(bitDelete)return txn.modify(e)
}

3. 提交过程

func (txn *Txn) commitAndSend() (func() error, error) {// 1. 构建写入条目var entries []*Entryfor _, e := range txn.pendingWrites {// 设置版本信息e.Key = y.KeyWithTs(e.Key, txn.commitTs)e.meta |= bitTxn  // 标记为事务条目entries = append(entries, e)}// 2. 添加事务结束标记e := &Entry{Key:   y.KeyWithTs(txnKey, txn.commitTs),meta:  bitFinTxn,}entries = append(entries, e)// 3. 发送到写入通道req, err := txn.db.sendToWriteCh(entries)if err != nil {return nil, err}// 4. 返回等待函数ret := func() error {err := req.Wait()// 标记事务完成txn.db.orc.doneCommit(txn.commitTs)return err}return ret, nil
}

4. 事务清理

func (txn *Txn) Discard() {if txn.discarded {return}// 等待所有迭代器关闭if atomic.LoadInt32(&txn.numIterators) > 0 {panic("Unclosed iterator at time of Txn.Discard.")}txn.discarded = trueif !txn.db.orc.isManaged {txn.db.orc.doneRead(txn)  // 标记读取完成}
}

WaterMark水位标记

1. WaterMark机制

// WaterMark用于跟踪进行中的操作
type WaterMark struct {Name     stringmarkIdx  uint64doneUntil uint64waiters   map[uint64][]chan struct{}elog      trace.EventLoglock      sync.Mutex
}func (w *WaterMark) Begin(index uint64) {w.lock.Lock()defer w.lock.Unlock()w.markIdx = indexw.waiters[index] = make([]chan struct{}, 0)
}func (w *WaterMark) Done(index uint64) {w.lock.Lock()defer w.lock.Unlock()// 通知等待者if chs, ok := w.waiters[index]; ok {for _, ch := range chs {close(ch)}delete(w.waiters, index)}// 更新doneUntilif index == w.doneUntil+1 {w.doneUntil = index// 连续更新doneUntilfor {if _, ok := w.waiters[w.doneUntil+1]; !ok {w.doneUntil++} else {break}}}
}

2. 垃圾回收支持

func (o *oracle) discardAtOrBelow() uint64 {if o.isManaged {o.Lock()defer o.Unlock()return o.discardTs}// 返回所有读取操作都已完成的最大时间戳return o.readMark.DoneUntil()
}func (o *oracle) cleanupCommittedTransactions() {if !o.detectConflicts {return}// 清理过期的已提交事务记录discardBelow := o.discardAtOrBelow()if discardBelow > o.lastCleanupTs {// 移除时间戳 <= discardBelow 的事务var newCommittedTxns []committedTxnfor _, txn := range o.committedTxns {if txn.ts > discardBelow {newCommittedTxns = append(newCommittedTxns, txn)}}o.committedTxns = newCommittedTxnso.lastCleanupTs = discardBelow}
}

托管事务模式

1. 托管vs非托管

// 非托管模式:Badger自动分配时间戳
func (db *DB) Update(fn func(txn *Txn) error) error {txn := db.NewTransaction(true)defer txn.Discard()if err := fn(txn); err != nil {return err}return txn.Commit()  // 自动分配commitTs
}// 托管模式:用户控制时间戳
func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {wb := db.NewWriteBatch()wb.commitTs = commitTsreturn wb
}

2. 托管模式优势

  • 确定性重放:用户控制时间戳,便于复制和恢复
  • 批量操作:可以批量提交多个事务
  • 外部一致性:与外部系统保持时间戳一致性

读写隔离级别

1. 快照隔离保证

// 事务看到的是readTs时刻的快照
func (txn *Txn) Get(key []byte) (*Item, error) {// 只能看到版本 <= readTs 的数据seek := y.KeyWithTs(key, txn.readTs)vs, err := txn.db.get(seek)// ...
}// 写入时使用commitTs作为版本
func (txn *Txn) commitAndSend() {for _, e := range txn.pendingWrites {e.Key = y.KeyWithTs(e.Key, txn.commitTs)// ...}
}

2. 读一致性

  • 单调读:同一事务内多次读取同一键得到相同结果
  • 读已提交:只能读取到已提交的数据
  • 快照隔离:事务开始时获得数据库快照

性能优化

1. 批量操作

func (db *DB) Update(fn func(txn *Txn) error) error {// 自动管理事务生命周期txn := db.NewTransaction(true)defer txn.Discard()return fn(txn)
}// 批量写入
type WriteBatch struct {txn       *TxncommitTs  uint64entries   []*Entry
}func (wb *WriteBatch) Flush() error {// 批量提交所有操作return wb.txn.Commit()
}

2. 冲突检测优化

// 可选择性启用冲突检测
opt := DefaultOptions(path)
opt.DetectConflicts = false  // 禁用以提高性能// 使用指纹而非完整键进行冲突检测
fp := z.MemHash(key)  // 64位指纹,降低内存使用

3. 内存管理

// 限制事务大小
func (txn *Txn) checkSize(e *Entry) error {count := int64(len(e.Key) + len(e.Value) + 1)if txn.count+count >= txn.db.opt.maxBatchCount {return ErrTxnTooBig}if txn.size+count >= txn.db.opt.maxBatchSize {return ErrTxnTooBig}txn.count += counttxn.size += int64(e.EstimateSize(txn.db.opt.ValueThreshold))return nil
}

配置参数

1. 冲突检测配置

opt.DetectConflicts = true   // 启用冲突检测
opt.NumCompactors = 2        // 压缩线程数影响清理速度

2. 事务大小限制

opt.MaxBatchCount = 100000   // 最大批次条目数
opt.MaxBatchSize = 15MB      // 最大批次大小

3. 托管模式配置

opt.ManagedTxns = true       // 启用托管事务模式

Badger的MVCC实现提供了高性能的并发控制,通过时间戳排序和冲突检测保证了事务的ACID特性,同时支持高并发的读写操作。理解其工作原理有助于正确使用事务功能并优化应用性能。

Badger SSTable文件格式详解

概述

SSTable(Sorted String Table)是Badger中的不可变数据文件格式,存储经过排序的键值对。每个SSTable文件都包含数据块、索引、布隆过滤器等组件,通过精心设计的格式实现高效的查找和压缩。

文件整体结构

+================+
|   Data Blocks  |  ← 数据块区域(压缩)
|      ...       |
+================+
|   Index Block  |  ← 索引块(FlatBuffer格式)
+================+
| Bloom Filter   |  ← 布隆过滤器(可选)
+================+
|   Checksum     |  ← 文件校验和(8字节)
+================+
|  Index Offset  |  ← 索引偏移量(8字节)
+================+
|  Index Length  |  ← 索引长度(4字节)
+================+
|  Footer Magic  |  ← 文件魔数(4字节)
+================+

数据块(Data Blocks)

1. 块内结构

Block内部格式:
+--------+--------+-----+--------+--------+-----+
| Entry1 | Entry2 | ... | EntryN | Restart| CRC |
+--------+--------+-----+--------+--------+-----+Entry格式:
+----------+----------+----------+----------+----------+
| SharedLen| UnsharedLen| ValueLen | Key     | Value   |
+----------+----------+----------+----------+----------+
|    4B    |     4B    |    4B    |   Var   |   Var   |

2. 键压缩机制

// 前缀压缩减少存储空间
type blockBuilder struct {data            []byterestarts        []uint32  // 重启点偏移counter         int       // 当前条目计数prevKey         []byte    // 前一个键restartInterval int       // 重启间隔
}func (b *blockBuilder) Add(key, value []byte, valuePointer bool) {// 计算与前一个键的共同前缀长度shared := 0if b.counter < b.restartInterval {shared = b.sharedPrefixLen(b.prevKey, key)}unshared := len(key) - shared// 编码条目b.append4Byte(uint32(shared))     // 共享前缀长度b.append4Byte(uint32(unshared))   // 非共享部分长度b.append4Byte(uint32(len(value))) // 值长度b.data = append(b.data, key[shared:]...)  // 非共享键部分b.data = append(b.data, value...)         // 值if b.counter == 0 || b.counter >= b.restartInterval {// 设置重启点b.restarts = append(b.restarts, uint32(len(b.data)))b.counter = 0}b.prevKey = append(b.prevKey[:0], key...)b.counter++
}

3. 块尾部信息

// 块结束时添加重启点信息
func (b *blockBuilder) Finish() []byte {// 1. 添加重启点数组for _, restart := range b.restarts {b.append4Byte(restart)}// 2. 添加重启点数量b.append4Byte(uint32(len(b.restarts)))// 3. 计算并添加CRC32校验和checksum := crc32.Checksum(b.data, y.CastagnoliCrcTable)b.append4Byte(checksum)return b.data
}

索引结构(FlatBuffer)

1. TableIndex定义

// table.fbs
namespace fb;table BlockOffset {key: [ubyte];     // 块的第一个键offset: uint64;   // 块在文件中的偏移len: uint32;      // 块的长度
}table TableIndex {offsets: [BlockOffset];     // 块偏移数组bloom_filter: [ubyte];      // 布隆过滤器数据max_version: uint64;        // 最大版本号key_count: uint32;          // 键数量uncompressed_size: uint32;  // 未压缩大小on_disk_size: uint32;       // 磁盘大小stale_data_size: uint32;    // 过期数据大小
}

2. 索引构建

func (b *Builder) buildIndex() *fb.TableIndexT {index := &fb.TableIndexT{Offsets:          make([]*fb.BlockOffsetT, 0, len(b.blockList)),BloomFilter:      b.bloom.JSONMarshal(),MaxVersion:       b.maxVersion,KeyCount:         uint32(b.keyCount),UncompressedSize: uint32(b.uncompressedSize),OnDiskSize:       uint32(len(b.buf)),}// 构建块偏移数组for _, block := range b.blockList {offset := &fb.BlockOffsetT{Key:    block.firstKey,Offset: uint64(block.offset),Len:    uint32(block.len),}index.Offsets = append(index.Offsets, offset)}return index
}

3. 索引序列化

func (b *Builder) finishIndex() []byte {// 使用FlatBuffer序列化索引builder := flatbuffers.NewBuilder(1024)index := b.buildIndex()indexOffset := index.Pack(builder)builder.Finish(indexOffset)return builder.FinishedBytes()
}

布隆过滤器

1. 布隆过滤器构建

type Bloom struct {bitmap   []byte    // 位图k        uint8     // 哈希函数数量numBits  uint32    // 位数numKeys  uint32    // 键数量
}func NewBloomFilter(numEntries int, fp float64) *Bloom {// 计算最优参数bitsPerKey := -1.44 * math.Log2(fp)  // 每个键需要的位数numBits := uint32(float64(numEntries) * bitsPerKey)numHashFuncs := uint8(bitsPerKey * 0.693)  // ln(2)return &Bloom{bitmap:  make([]byte, (numBits+7)/8),k:       numHashFuncs,numBits: numBits,}
}func (bloom *Bloom) Add(key []byte) {h := z.MemHash(key)delta := h>>17 | h<<15  // 第二个哈希函数for i := uint8(0); i < bloom.k; i++ {bitPos := h % bloom.numBitsbloom.bitmap[bitPos/8] |= 1 << (bitPos % 8)h += delta}
}func (bloom *Bloom) MayContain(key []byte) bool {h := z.MemHash(key)delta := h>>17 | h<<15for i := uint8(0); i < bloom.k; i++ {bitPos := h % bloom.numBitsif bloom.bitmap[bitPos/8]&(1<<(bitPos%8)) == 0 {return false  // 肯定不存在}h += delta}return true  // 可能存在
}

2. 布隆过滤器优化

// 分块布隆过滤器,减少缓存缺失
type BlockedBloom struct {data       []bytenumProbes  intnumBlocks  uint32blockMask  uint32
}func (bf *BlockedBloom) MayContain(key []byte) bool {h := z.MemHash(key)// 选择块blockIdx := (h >> 11 | h << 21) & bf.blockMaskblock := bf.data[blockIdx*32 : (blockIdx+1)*32]  // 32字节块// 在块内进行多次探测for i := 0; i < bf.numProbes; i++ {bitPos := h & 255  // 块内位置(0-255)if block[bitPos/8]&(1<<(bitPos%8)) == 0 {return false}h += h>>17 | h<<15}return true
}

压缩机制

1. 块级压缩

func (b *Builder) finishBlock() error {if b.curBlock.IsEmpty() {return nil}// 获取原始数据data := b.curBlock.Finish()var compressed []bytevar compressionType uint32switch b.opts.Compression {case options.None:compressed = datacompressionType = 0case options.Snappy:compressed = snappy.Encode(nil, data)compressionType = 1case options.ZSTD:compressed = zstd.Compress(nil, data)compressionType = 2}// 选择压缩效果更好的版本if len(compressed) < len(data) {data = compressed} else {compressionType = 0  // 使用未压缩版本}// 写入压缩类型标记data = append(data, byte(compressionType))b.writeBlock(data)return nil
}

2. 压缩策略

// 自适应压缩:根据数据特征选择压缩算法
func (b *Builder) chooseCompression(data []byte) options.CompressionType {if len(data) < 1024 {return options.None  // 小块不压缩}// 计算数据熵entropy := calculateEntropy(data)if entropy > 7.5 {return options.None  // 高熵数据压缩效果差}if entropy > 6.0 {return options.Snappy  // 中等熵使用Snappy}return options.ZSTD  // 低熵使用ZSTD
}

读取机制

1. 表打开流程

func OpenTable(mf *z.MmapFile, opts table.Options) (*Table, error) {t := &Table{mf:   mf,opts: opts,}// 1. 读取并验证Footerif err := t.readFooter(); err != nil {return nil, err}// 2. 读取索引if err := t.readIndex(); err != nil {return nil, err}// 3. 验证校验和if err := t.verifyChecksum(); err != nil {return nil, err}return t, nil
}

2. 索引查找

func (t *Table) search(key []byte, maxVs *y.ValueStruct) (y.ValueStruct, error) {// 1. 布隆过滤器预检查if t.index.BloomFilter != nil {if !t.bloomFilter.MayContain(key) {return y.ValueStruct{}, ErrKeyNotFound}}// 2. 二分查找确定块blockIdx := t.findBlock(key)if blockIdx < 0 {return y.ValueStruct{}, ErrKeyNotFound}// 3. 从缓存或磁盘读取块block, err := t.getBlock(blockIdx)if err != nil {return y.ValueStruct{}, err}// 4. 在块内查找return block.search(key, maxVs)
}

3. 块级缓存

type Table struct {mf          *z.MmapFileindex       *fb.TableIndexbloomFilter *BloomblockCache  *ristretto.Cache  // 块缓存
}func (t *Table) getBlock(idx int) (*Block, error) {// 1. 尝试从缓存获取if cached, found := t.blockCache.Get(t.blockCacheKey(idx)); found {return cached.(*Block), nil}// 2. 从磁盘读取offset := t.index.Offsets[idx]data := t.mf.Data[offset.Offset:offset.Offset+uint64(offset.Len)]// 3. 解压缩block, err := t.decompressBlock(data)if err != nil {return nil, err}// 4. 缓存块t.blockCache.Set(t.blockCacheKey(idx), block, int64(len(data)))return block, nil
}

迭代器实现

1. 表迭代器

type Iterator struct {t           *TableblockIdx    intblockIter   *blockIteratorerr         errorreversed    bool
}func (it *Iterator) seekToFirst() {it.blockIdx = 0it.loadBlock()if it.blockIter != nil {it.blockIter.seekToFirst()}
}func (it *Iterator) Next() {if it.blockIter != nil {it.blockIter.Next()if !it.blockIter.Valid() {// 当前块已结束,移动到下一块it.blockIdx++it.loadBlock()if it.blockIter != nil {it.blockIter.seekToFirst()}}}
}

2. 块内迭代器

type blockIterator struct {data         []byte    // 块数据restarts     []uint32  // 重启点offset       uint32    // 当前偏移key          []byte    // 当前键value        []byte    // 当前值entryOffset  uint32    // 条目偏移
}func (it *blockIterator) parseNext() {if it.offset >= uint32(len(it.data)) {return}// 读取条目头部shared := binary.LittleEndian.Uint32(it.data[it.offset:])it.offset += 4unshared := binary.LittleEndian.Uint32(it.data[it.offset:])it.offset += 4valueLen := binary.LittleEndian.Uint32(it.data[it.offset:])it.offset += 4// 重构完整键it.key = it.key[:shared]it.key = append(it.key, it.data[it.offset:it.offset+unshared]...)it.offset += unshared// 读取值it.value = it.data[it.offset:it.offset+valueLen]it.offset += valueLen
}

性能优化

1. 预取优化

func (t *Table) prefetchBlocks(startIdx, count int) {for i := 0; i < count && startIdx+i < len(t.index.Offsets); i++ {idx := startIdx + igo func(blockIdx int) {t.getBlock(blockIdx)  // 异步预取}(idx)}
}

2. 内存对齐

// 确保关键数据结构内存对齐
type alignedBlock struct {_        [0]uint64  // 确保8字节对齐data     []bytechecksum uint32_        [4]byte    // 填充到8字节边界
}

3. SIMD优化(布隆过滤器)

// 使用SIMD指令优化布隆过滤器
func (bf *Bloom) mayContainSIMD(key []byte) bool {// 在支持的平台上使用SIMD指令return bf.mayContainAVX2(key)
}

文件管理

1. 原子创建

func CreateTable(fname string, builder *Builder) (*Table, error) {// 1. 写入临时文件tmpName := fname + ".tmp"bd := builder.Done()mf, err := z.OpenMmapFile(tmpName, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)if err != nil {return nil, err}// 2. 写入数据written := bd.Copy(mf.Data)if written != len(mf.Data) {return nil, fmt.Errorf("written %d != expected %d", written, len(mf.Data))}// 3. 同步到磁盘if err := mf.Sync(); err != nil {return nil, err}// 4. 原子重命名if err := os.Rename(tmpName, fname); err != nil {return nil, err}return OpenTable(mf, builder.opts)
}

2. 错误恢复

func (t *Table) verifyIntegrity() error {// 1. 检查魔数if t.footerMagic != tableFooterMagic {return ErrInvalidMagic}// 2. 验证索引校验和if err := t.verifyIndexChecksum(); err != nil {return err}// 3. 抽样验证数据块return t.verifyDataBlocks()
}

配置参数

1. 块大小配置

BlockSize: 4096        // 4KB,适合SSD
BlockSize: 16384       // 16KB,适合HDD

2. 压缩配置

Compression: options.ZSTD     // 高压缩比
Compression: options.Snappy   // 平衡性能和压缩比
Compression: options.None     // 无压缩,最快

3. 布隆过滤器配置

BloomFalsePositive: 0.01      // 1%假阳性率
BloomFalsePositive: 0.001     // 0.1%假阳性率(更多内存)

SSTable的设计在存储效率、查询性能和内存使用之间取得了很好的平衡,通过块级组织、前缀压缩、布隆过滤器等技术实现了高效的数据存储和访问。

相关文章:

  • Docker安装openGauss
  • pont拉取代码
  • git管理github上的repository(二)
  • 2025 Java 面试大全
  • A 找倍数 (线段树)
  • 嵌入式学习笔记DAY35(数据库)
  • 龙虎榜——20250610
  • 算法题(166):占卜DIY
  • Zustand 状态管理库:极简而强大的解决方案
  • 城市照明深夜全亮太浪费?智能分时调光方案落地贵州某市
  • 精读 2025 《可信数据空间标准体系建设指南》【附全文阅读】
  • ​​扩散模型调度器(Scheduler)
  • MySQL事务——博主总结
  • pycharm最近遇到的一些问题
  • 理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
  • rapidocr v3.1.0发布
  • HDFS 3.4.1 集成Kerberos 实现账户认证
  • 6月10日星期二今日早报简报微语报早读
  • 用纯.NET开发并制作一个智能桌面机器人(五):使用.NET为树莓派开发Wifi配网功能
  • Unit 2 训练你的第一个深度强化学习智能体 LunarLander-v3
  • 我国政府门户网站的建设情况/武汉网站优化公司
  • 找事做网站/google官网登录入口
  • 做单位网站的公司吗/搜索引擎优化的核心本质
  • 陕西住房建设部网站/南宁seo主管
  • 苏州市城市建设局网站/公司网站建设公司
  • 赣榆哪里有做网站的/百度风云榜官网