Jaeger开源分布式追踪平台深度剖析(三)Jaeger默认存储Badger原理剖析
Badger Value Log值日志详解
概述
Value Log是Badger实现键值分离(Key-Value Separation)的核心组件。它将大值存储在独立的日志文件中,LSM树只存储键和值指针,这种设计显著减少了写放大和提高了性能。
核心设计思想
1. 键值分离机制
Small Values (< ValueThreshold): Key + Value → LSM TreeLarge Values (>= ValueThreshold):Key + ValuePointer → LSM TreeValue → Value LogValuePointer格式:[Fid(4B)] [Len(4B)] [Offset(4B)]
2. Value Log结构
type valueLog struct {dirPath string // 目录路径filesLock sync.RWMutex // 文件锁filesMap map[uint32]*logFile // 文件映射maxFid uint32 // 最大文件IDfilesToBeDeleted []uint32 // 待删除文件numActiveIterators atomic.Int32 // 活跃迭代器数量writableLogOffset atomic.Uint32 // 可写偏移numEntriesWritten uint32 // 已写入条目数discardStats *discardStats // 垃圾回收统计garbageCh chan struct{} // 垃圾回收通道
}
Value Log文件格式
1. 文件头部
+----------------+------------------+
| keyID(8 bytes) | baseIV(12 bytes)|
+----------------+------------------+
2. 记录格式
+--------+--------+--------+--------+--------+--------+
| Header | KeyLen | Key | ValLen | Value | CRC32 |
+--------+--------+--------+--------+--------+--------+
| ?B | 4B | Var | 4B | Var | 4B |
3. Header详细格式
type header struct {klen uint32 // 键长度vlen uint32 // 值长度expiresAt uint64 // 过期时间meta byte // 元数据标志userMeta byte // 用户元数据
}
写入机制
1. 值阈值判断
func (db *DB) valueThreshold() int64 {if db.threshold != nil {return db.threshold.valueThreshold.Load()}return db.opt.ValueThreshold
}
2. 写入流程
func (vlog *valueLog) write(reqs []*request) error {vlog.filesLock.RLock()maxFid := vlog.maxFidcurlf := vlog.filesMap[maxFid]vlog.filesLock.RUnlock()toDisk := func() error {if err := curlf.doneWriting(vlog.writableLogOffset.Load()); err != nil {return err}y.NumBytesWrittenToL0Add(vlog.opt.MetricsEnabled, int64(vlog.writableLogOffset.Load()))return nil}for i := range reqs {b := reqs[i]b.Ptrs = b.Ptrs[:0]for j := range b.Entries {e := b.Entries[j]var p valuePointerif shouldWriteValueToLSM(e, vlog.db.valueThreshold()) {// 小值直接存储在LSM中p.Len = uint32(len(e.Value))b.Ptrs = append(b.Ptrs, p)continue}// 大值存储在Value Log中p.Fid = curlf.fidp.Offset = vlog.woffset()p.Len = uint32(len(e.Value))b.Ptrs = append(b.Ptrs, p)// 写入Value Logbuf := vlog.encodeEntry(e, p.Offset)copy(curlf.Data[p.Offset:], buf)vlog.writableLogOffset.Add(uint32(len(buf)))}}return toDisk()
}
3. 条目编码
func (vlog *valueLog) encodeEntry(e *Entry, offset uint32) []byte {h := header{klen: uint32(len(e.Key)),vlen: uint32(len(e.Value)),expiresAt: e.ExpiresAt,meta: e.meta,userMeta: e.UserMeta,}// 计算总大小size := h.Size() + len(e.Key) + len(e.Value) + crc32.Sizebuf := make([]byte, size)written := h.EncodeTo(buf)copy(buf[written:], e.Key)written += len(e.Key)copy(buf[written:], e.Value)written += len(e.Value)// 计算并写入CRC32hash := crc32.New(y.CastagnoliCrcTable)hash.Write(buf[:written])checksum := hash.Sum32()y.U32ToBytes(buf[written:], checksum)return buf
}
读取机制
1. 值指针解码
type valuePointer struct {Fid uint32 // 文件IDLen uint32 // 值长度Offset uint32 // 文件内偏移
}func (p *valuePointer) Decode(b []byte) {p.Fid = binary.BigEndian.Uint32(b[:4])p.Len = binary.BigEndian.Uint32(b[4:8])p.Offset = binary.BigEndian.Uint32(b[8:12])
}
2. 读取流程
func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {// 获取目标文件lf, err := vlog.getFileRLocked(vp)if err != nil {return nil, nil, err}// 读取数据buf, err := lf.read(vp)if err != nil {return nil, nil, err}// 验证和解密reader := &safeRead{recordOffset: vp.Offset,lf: lf,}entry, err := reader.Entry(bytes.NewReader(buf))if err != nil {return nil, nil, err}return entry.Value, func() { /* unlock callback */ }, nil
}
3. 缓存友好读取
func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {lf, err := vlog.getFileRLocked(vp)if err != nil {return nil, nil, err}// 使用mmap直接读取offset := vp.Offsetif offset >= uint32(len(lf.Data)) {return nil, lf, fmt.Errorf("Invalid value pointer offset: %d", offset)}return lf.Data[offset:], lf, nil
}
垃圾回收机制
1. 丢弃统计
type discardStats struct {sync.MutexvaluesLock sync.RWMutexvalues map[uint32]int64 // 每个文件的丢弃字节数bytesRead atomic.Int64 // 读取字节数fileSize int64 // 文件大小
}func (ds *discardStats) Update(fid uint32, discard int64) {ds.valuesLock.Lock()ds.values[fid] += discardds.valuesLock.Unlock()
}
2. GC触发条件
func (vlog *valueLog) pickLog(discardRatio float64) *logFile {vlog.filesLock.RLock()defer vlog.filesLock.RUnlock()candidate := struct {fid uint32discardRatio float64staleDataSize int64}{}for fid, lf := range vlog.filesMap {if fid >= vlog.maxFid {continue // 跳过当前写入文件}staleDataSize := vlog.discardStats.StaleDataSize(fid)totalSize := lf.size.Load()if totalSize == 0 {continue}ratio := float64(staleDataSize) / float64(totalSize)if ratio > discardRatio && ratio > candidate.discardRatio {candidate.fid = fidcandidate.discardRatio = ratiocandidate.staleDataSize = staleDataSize}}if candidate.fid != 0 {return vlog.filesMap[candidate.fid]}return nil
}
3. GC执行流程
func (vlog *valueLog) doRunGC(lf *logFile) error {// 统计有效数据var validEntries []*Entryvar totalValidSize int64err := lf.iterate(true, 0, func(e Entry, vp valuePointer) error {// 检查entry是否仍然有效vs, err := vlog.db.get(e.Key)if err != nil {return err}if !discardEntry(e, vs, vlog.db) {validEntries = append(validEntries, &e)totalValidSize += int64(vp.Len)}return nil})if err != nil {return err}vlog.opt.Infof("GC rewriting fid: %d, valid entries: %d, valid size: %d", lf.fid, len(validEntries), totalValidSize)// 重写有效数据到新的Value Log文件if len(validEntries) > 0 {return vlog.rewrite(validEntries)}// 标记文件为删除vlog.filesLock.Lock()vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, lf.fid)vlog.filesLock.Unlock()return nil
}
4. 数据重写
func (vlog *valueLog) rewrite(entries []*Entry) error {// 创建重写请求req := &request{Entries: entries,Ptrs: make([]valuePointer, len(entries)),}req.Wg.Add(1)// 写入新的Value Logerr := vlog.write([]*request{req})if err != nil {return err}// 更新LSM中的值指针txn := vlog.db.NewTransaction(true)defer txn.Discard()for i, e := range entries {vp := req.Ptrs[i]if err := txn.SetEntry(&Entry{Key: e.Key,Value: vp.Encode(),Meta: bitValuePointer,}); err != nil {return err}}return txn.Commit()
}
文件管理
1. 文件创建
func (vlog *valueLog) createVlogFile() (*logFile, error) {fid := atomic.AddUint32(&vlog.maxFid, 1)fname := vlog.fpath(fid)mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR, vlog.opt.ValueLogFileSize)if err != nil {return nil, err}lf := &logFile{MmapFile: mf,fid: fid,path: fname,opt: vlog.opt,}// 写入文件头if err := lf.bootstrap(); err != nil {return nil, err}vlog.filesLock.Lock()vlog.filesMap[fid] = lfvlog.filesLock.Unlock()return lf, nil
}
2. 文件轮转
func (vlog *valueLog) sync() error {vlog.filesLock.RLock()curlf := vlog.filesMap[vlog.maxFid]vlog.filesLock.RUnlock()// 检查当前文件是否需要轮转if vlog.woffset() > vlog.opt.ValueLogFileSize-uint32(maxHeaderSize) {if err := curlf.doneWriting(vlog.woffset()); err != nil {return err}// 创建新文件newlf, err := vlog.createVlogFile()if err != nil {return err}vlog.writableLogOffset.Store(vlogHeaderSize)vlog.numEntriesWritten = 0}return curlf.Sync()
}
3. 文件删除
func (vlog *valueLog) deleteLogFile(lf *logFile) error {// 检查是否还有活跃迭代器if vlog.numActiveIterators.Load() > 0 {return nil // 延迟删除}vlog.filesLock.Lock()delete(vlog.filesMap, lf.fid)vlog.filesLock.Unlock()// 删除文件if err := lf.Delete(); err != nil {return err}vlog.opt.Infof("Deleted value log file: %s", lf.path)return nil
}
动态阈值调整
1. 阈值监控
type vlogThreshold struct {percentile float64 // 百分位数valueThreshold atomic.Int64 // 当前阈值valueCh chan []int64 // 值大小通道vlMetrics *z.HistogramData // 直方图数据
}func (v *vlogThreshold) update(sizes []int64) {v.vlMetrics.Update(sizes)// 根据百分位数计算新阈值newThreshold := v.vlMetrics.Percentile(v.percentile)v.valueThreshold.Store(int64(newThreshold))
}
2. 自适应调整
func (v *vlogThreshold) listenForValueThresholdUpdate() {defer v.closer.Done()ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for {select {case sizes := <-v.valueCh:v.update(sizes)case <-v.clearCh:v.vlMetrics.Clear()case <-ticker.C:// 定期调整阈值if v.vlMetrics.Count() > 1000 {v.update(nil)}case <-v.closer.HasBeenClosed():return}}
}
性能优化
1. 批量写入
func (vlog *valueLog) write(reqs []*request) error {// 合并多个请求减少系统调用totalSize := uint32(0)for _, req := range reqs {totalSize += estimateRequestSize(req)}// 预分配缓冲区buf := make([]byte, totalSize)written := 0for _, req := range reqs {for _, e := range req.Entries {entryBuf := vlog.encodeEntry(e, offset)copy(buf[written:], entryBuf)written += len(entryBuf)}}// 一次性写入return vlog.writeBuffer(buf)
}
2. 内存映射优化
func (lf *logFile) read(vp valuePointer) ([]byte, error) {// 直接从mmap内存读取,无需系统调用if vp.Offset >= uint32(len(lf.Data)) {return nil, fmt.Errorf("Invalid offset")}return lf.Data[vp.Offset:vp.Offset+vp.Len], nil
}
3. 并发控制优化
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {vlog.filesLock.RLock()defer vlog.filesLock.RUnlock()RES (lf, ok := vlog.filesMap[vp.Fid]if !ok {return nil, fmt.Errorf("File not found: %d", vp.Fid)}return lf, nil
}
配置建议
1. 值阈值设置
// 小文件多读场景
ValueThreshold: 32 // 32字节// 大文件少读场景
ValueThreshold: 2048 // 2KB// 自适应模式
VLogPercentile: 0.95 // 95%分位数
2. 文件大小配置
// SSD环境
ValueLogFileSize: 1 << 30 // 1GB// HDD环境
ValueLogFileSize: 2 << 30 // 2GB// 内存有限环境
ValueLogFileSize: 128 << 20 // 128MB
3. GC配置
// 激进GC
GCDiscardRatio: 0.3 // 30%垃圾时触发// 保守GC
GCDiscardRatio: 0.7 // 70%垃圾时触发
Value Log的设计是Badger性能优异的关键因素,通过键值分离有效减少了写放大,同时通过智能的垃圾回收机制保证了空间利用率。理解其工作原理对于优化Badger在不同场景下的性能表现非常重要。
Badger事务与并发控制详解
概述
Badger采用MVCC(Multi-Version Concurrency Control)实现事务并发控制,支持快照隔离(Snapshot Isolation)级别的事务。通过Oracle组件管理时间戳分配和冲突检测,实现了高并发的读写操作。
核心组件
1. Oracle时间戳管理器
type oracle struct {isManaged bool // 是否托管模式detectConflicts bool // 是否开启冲突检测nextTxnTs uint64 // 下一个事务时间戳writeChLock sync.Mutex // 写入通道锁txnMark *y.WaterMark // 事务水位标记readMark *y.WaterMark // 读取水位标记discardTs uint64 // 丢弃时间戳committedTxns []committedTxn // 已提交事务列表lastCleanupTs uint64 // 最后清理时间戳closer *z.Closer // 关闭器
}
2. Transaction事务结构
type Txn struct {readTs uint64 // 读取时间戳commitTs uint64 // 提交时间戳size int64 // 事务大小count int64 // 条目数量db *DB // 数据库引用reads []uint64 // 读取键指纹conflictKeys map[uint64]struct{} // 冲突键指纹readsLock sync.Mutex // 读取锁pendingWrites map[string]*Entry // 待写入条目duplicateWrites []*Entry // 重复写入条目numIterators atomic.Int32 // 迭代器数量discarded bool // 是否已丢弃doneRead bool // 是否完成读取update bool // 是否更新事务
}
MVCC机制详解
1. 时间戳分配
func (o *oracle) readTs() uint64 {if o.isManaged {panic("ReadTs should not be retrieved for managed DB")}var readTs uint64o.Lock()readTs = o.nextTxnTs - 1o.readMark.Begin(readTs) // 开始读取标记o.Unlock()// 等待所有无冲突事务完成写入y.Check(o.txnMark.WaitForMark(context.Background(), readTs))return readTs
}func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {o.Lock()defer o.Unlock()// 检查冲突if o.hasConflict(txn) {return 0, true // 返回冲突标志}var ts uint64if !o.isManaged {o.doneRead(txn)o.cleanupCommittedTransactions()// 分配新的提交时间戳ts = o.nextTxnTso.nextTxnTs++o.txnMark.Begin(ts)} else {ts = txn.commitTs // 托管模式使用预设时间戳}// 记录已提交事务用于冲突检测if o.detectConflicts {o.committedTxns = append(o.committedTxns, committedTxn{ts: ts,conflictKeys: txn.conflictKeys,})}return ts, false
}
2. 版本可见性规则
// 版本可见性判断
func isVisible(itemVersion, readTs uint64) bool {return itemVersion <= readTs
}// 在Get操作中的应用
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {if len(key) == 0 {return nil, ErrEmptyKey}if txn.discarded {return nil, ErrDiscardedTxn}// 1. 首先检查事务内的写入缓存if txn.update {if e, has := txn.pendingWrites[string(key)]; has {if isDeletedOrExpired(e.meta, e.ExpiresAt) {return nil, ErrKeyNotFound}return &Item{e: e, txn: txn}, nil}}// 2. 添加到读取集合(用于冲突检测)txn.addReadKey(key)// 3. 从数据库读取,只读取版本 <= readTs 的数据seek := y.KeyWithTs(key, txn.readTs)vs, err := txn.db.get(seek)if err != nil {return nil, y.Wrapf(err, "DB::Get key: %q", key)}if vs.Meta&bitDelete > 0 {return nil, ErrKeyNotFound}item = &Item{key: key,vptr: vs.Value,meta: vs.Meta,userMeta: vs.UserMeta,expiresAt: vs.ExpiresAt,version: y.ParseTs(vs.Value),txn: txn,db: txn.db,}return item, nil
}
冲突检测机制
1. 冲突键跟踪
func (txn *Txn) addReadKey(key []byte) {if !txn.update || !txn.db.opt.DetectConflicts {return}txn.readsLock.Lock()defer txn.readsLock.Unlock()fp := z.MemHash(key)// 添加到读取集合txn.reads = append(txn.reads, fp)
}func (txn *Txn) modify(e *Entry) error {if txn.discarded {return ErrDiscardedTxn}if !txn.update {return ErrReadOnlyTxn}// 记录写入键用于冲突检测if txn.db.opt.DetectConflicts {fp := z.MemHash(e.Key)if txn.conflictKeys == nil {txn.conflictKeys = make(map[uint64]struct{})}txn.conflictKeys[fp] = struct{}{}}txn.pendingWrites[string(e.Key)] = ereturn nil
}
2. 冲突检测算法
func (o *oracle) hasConflict(txn *Txn) bool {if len(txn.reads) == 0 {return false}for _, committedTxn := range o.committedTxns {// 如果已提交事务的时间戳 <= 当前事务的读时间戳// 说明已提交事务在当前事务开始前完成,无需检查冲突if committedTxn.ts <= txn.readTs {continue}// 检查读写冲突:当前事务读取的键是否被后续事务写入for _, readKey := range txn.reads {if _, has := committedTxn.conflictKeys[readKey]; has {return true // 发现冲突}}}return false
}
3. 冲突处理策略
func (txn *Txn) Commit() error {// 预检查if err := txn.commitPrecheck(); err != nil {return err}// 尝试获取提交时间戳commitTs, conflict := txn.db.orc.newCommitTs(txn)if conflict {// 发生冲突,回滚事务return ErrConflict}txn.commitTs = commitTs// 执行提交callback, err := txn.commitAndSend()if err != nil {return err}// 等待写入完成return callback()
}
事务生命周期
1. 事务创建
func (db *DB) NewTransaction(update bool) *Txn {return db.newTransaction(update, false)
}func (db *DB) newTransaction(update, isManaged bool) *Txn {if db.IsClosed() {panic(ErrDBClosed)}txn := &Txn{update: update,db: db,count: 1, // 自身占用一个计数size: int64(len(txnKey)), // 事务标记键的大小discarded: false,pendingWrites: make(map[string]*Entry),}if !isManaged {txn.readTs = db.orc.readTs() // 分配读时间戳}return txn
}
2. 写入操作
func (txn *Txn) Set(key, val []byte) error {e := NewEntry(key, val)return txn.SetEntry(e)
}func (txn *Txn) SetEntry(e *Entry) error {return txn.modify(e)
}func (txn *Txn) Delete(key []byte) error {e := NewEntry(key, nil).WithMeta(bitDelete)return txn.modify(e)
}
3. 提交过程
func (txn *Txn) commitAndSend() (func() error, error) {// 1. 构建写入条目var entries []*Entryfor _, e := range txn.pendingWrites {// 设置版本信息e.Key = y.KeyWithTs(e.Key, txn.commitTs)e.meta |= bitTxn // 标记为事务条目entries = append(entries, e)}// 2. 添加事务结束标记e := &Entry{Key: y.KeyWithTs(txnKey, txn.commitTs),meta: bitFinTxn,}entries = append(entries, e)// 3. 发送到写入通道req, err := txn.db.sendToWriteCh(entries)if err != nil {return nil, err}// 4. 返回等待函数ret := func() error {err := req.Wait()// 标记事务完成txn.db.orc.doneCommit(txn.commitTs)return err}return ret, nil
}
4. 事务清理
func (txn *Txn) Discard() {if txn.discarded {return}// 等待所有迭代器关闭if atomic.LoadInt32(&txn.numIterators) > 0 {panic("Unclosed iterator at time of Txn.Discard.")}txn.discarded = trueif !txn.db.orc.isManaged {txn.db.orc.doneRead(txn) // 标记读取完成}
}
WaterMark水位标记
1. WaterMark机制
// WaterMark用于跟踪进行中的操作
type WaterMark struct {Name stringmarkIdx uint64doneUntil uint64waiters map[uint64][]chan struct{}elog trace.EventLoglock sync.Mutex
}func (w *WaterMark) Begin(index uint64) {w.lock.Lock()defer w.lock.Unlock()w.markIdx = indexw.waiters[index] = make([]chan struct{}, 0)
}func (w *WaterMark) Done(index uint64) {w.lock.Lock()defer w.lock.Unlock()// 通知等待者if chs, ok := w.waiters[index]; ok {for _, ch := range chs {close(ch)}delete(w.waiters, index)}// 更新doneUntilif index == w.doneUntil+1 {w.doneUntil = index// 连续更新doneUntilfor {if _, ok := w.waiters[w.doneUntil+1]; !ok {w.doneUntil++} else {break}}}
}
2. 垃圾回收支持
func (o *oracle) discardAtOrBelow() uint64 {if o.isManaged {o.Lock()defer o.Unlock()return o.discardTs}// 返回所有读取操作都已完成的最大时间戳return o.readMark.DoneUntil()
}func (o *oracle) cleanupCommittedTransactions() {if !o.detectConflicts {return}// 清理过期的已提交事务记录discardBelow := o.discardAtOrBelow()if discardBelow > o.lastCleanupTs {// 移除时间戳 <= discardBelow 的事务var newCommittedTxns []committedTxnfor _, txn := range o.committedTxns {if txn.ts > discardBelow {newCommittedTxns = append(newCommittedTxns, txn)}}o.committedTxns = newCommittedTxnso.lastCleanupTs = discardBelow}
}
托管事务模式
1. 托管vs非托管
// 非托管模式:Badger自动分配时间戳
func (db *DB) Update(fn func(txn *Txn) error) error {txn := db.NewTransaction(true)defer txn.Discard()if err := fn(txn); err != nil {return err}return txn.Commit() // 自动分配commitTs
}// 托管模式:用户控制时间戳
func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {wb := db.NewWriteBatch()wb.commitTs = commitTsreturn wb
}
2. 托管模式优势
- 确定性重放:用户控制时间戳,便于复制和恢复
- 批量操作:可以批量提交多个事务
- 外部一致性:与外部系统保持时间戳一致性
读写隔离级别
1. 快照隔离保证
// 事务看到的是readTs时刻的快照
func (txn *Txn) Get(key []byte) (*Item, error) {// 只能看到版本 <= readTs 的数据seek := y.KeyWithTs(key, txn.readTs)vs, err := txn.db.get(seek)// ...
}// 写入时使用commitTs作为版本
func (txn *Txn) commitAndSend() {for _, e := range txn.pendingWrites {e.Key = y.KeyWithTs(e.Key, txn.commitTs)// ...}
}
2. 读一致性
- 单调读:同一事务内多次读取同一键得到相同结果
- 读已提交:只能读取到已提交的数据
- 快照隔离:事务开始时获得数据库快照
性能优化
1. 批量操作
func (db *DB) Update(fn func(txn *Txn) error) error {// 自动管理事务生命周期txn := db.NewTransaction(true)defer txn.Discard()return fn(txn)
}// 批量写入
type WriteBatch struct {txn *TxncommitTs uint64entries []*Entry
}func (wb *WriteBatch) Flush() error {// 批量提交所有操作return wb.txn.Commit()
}
2. 冲突检测优化
// 可选择性启用冲突检测
opt := DefaultOptions(path)
opt.DetectConflicts = false // 禁用以提高性能// 使用指纹而非完整键进行冲突检测
fp := z.MemHash(key) // 64位指纹,降低内存使用
3. 内存管理
// 限制事务大小
func (txn *Txn) checkSize(e *Entry) error {count := int64(len(e.Key) + len(e.Value) + 1)if txn.count+count >= txn.db.opt.maxBatchCount {return ErrTxnTooBig}if txn.size+count >= txn.db.opt.maxBatchSize {return ErrTxnTooBig}txn.count += counttxn.size += int64(e.EstimateSize(txn.db.opt.ValueThreshold))return nil
}
配置参数
1. 冲突检测配置
opt.DetectConflicts = true // 启用冲突检测
opt.NumCompactors = 2 // 压缩线程数影响清理速度
2. 事务大小限制
opt.MaxBatchCount = 100000 // 最大批次条目数
opt.MaxBatchSize = 15MB // 最大批次大小
3. 托管模式配置
opt.ManagedTxns = true // 启用托管事务模式
Badger的MVCC实现提供了高性能的并发控制,通过时间戳排序和冲突检测保证了事务的ACID特性,同时支持高并发的读写操作。理解其工作原理有助于正确使用事务功能并优化应用性能。
Badger SSTable文件格式详解
概述
SSTable(Sorted String Table)是Badger中的不可变数据文件格式,存储经过排序的键值对。每个SSTable文件都包含数据块、索引、布隆过滤器等组件,通过精心设计的格式实现高效的查找和压缩。
文件整体结构
+================+
| Data Blocks | ← 数据块区域(压缩)
| ... |
+================+
| Index Block | ← 索引块(FlatBuffer格式)
+================+
| Bloom Filter | ← 布隆过滤器(可选)
+================+
| Checksum | ← 文件校验和(8字节)
+================+
| Index Offset | ← 索引偏移量(8字节)
+================+
| Index Length | ← 索引长度(4字节)
+================+
| Footer Magic | ← 文件魔数(4字节)
+================+
数据块(Data Blocks)
1. 块内结构
Block内部格式:
+--------+--------+-----+--------+--------+-----+
| Entry1 | Entry2 | ... | EntryN | Restart| CRC |
+--------+--------+-----+--------+--------+-----+Entry格式:
+----------+----------+----------+----------+----------+
| SharedLen| UnsharedLen| ValueLen | Key | Value |
+----------+----------+----------+----------+----------+
| 4B | 4B | 4B | Var | Var |
2. 键压缩机制
// 前缀压缩减少存储空间
type blockBuilder struct {data []byterestarts []uint32 // 重启点偏移counter int // 当前条目计数prevKey []byte // 前一个键restartInterval int // 重启间隔
}func (b *blockBuilder) Add(key, value []byte, valuePointer bool) {// 计算与前一个键的共同前缀长度shared := 0if b.counter < b.restartInterval {shared = b.sharedPrefixLen(b.prevKey, key)}unshared := len(key) - shared// 编码条目b.append4Byte(uint32(shared)) // 共享前缀长度b.append4Byte(uint32(unshared)) // 非共享部分长度b.append4Byte(uint32(len(value))) // 值长度b.data = append(b.data, key[shared:]...) // 非共享键部分b.data = append(b.data, value...) // 值if b.counter == 0 || b.counter >= b.restartInterval {// 设置重启点b.restarts = append(b.restarts, uint32(len(b.data)))b.counter = 0}b.prevKey = append(b.prevKey[:0], key...)b.counter++
}
3. 块尾部信息
// 块结束时添加重启点信息
func (b *blockBuilder) Finish() []byte {// 1. 添加重启点数组for _, restart := range b.restarts {b.append4Byte(restart)}// 2. 添加重启点数量b.append4Byte(uint32(len(b.restarts)))// 3. 计算并添加CRC32校验和checksum := crc32.Checksum(b.data, y.CastagnoliCrcTable)b.append4Byte(checksum)return b.data
}
索引结构(FlatBuffer)
1. TableIndex定义
// table.fbs
namespace fb;table BlockOffset {key: [ubyte]; // 块的第一个键offset: uint64; // 块在文件中的偏移len: uint32; // 块的长度
}table TableIndex {offsets: [BlockOffset]; // 块偏移数组bloom_filter: [ubyte]; // 布隆过滤器数据max_version: uint64; // 最大版本号key_count: uint32; // 键数量uncompressed_size: uint32; // 未压缩大小on_disk_size: uint32; // 磁盘大小stale_data_size: uint32; // 过期数据大小
}
2. 索引构建
func (b *Builder) buildIndex() *fb.TableIndexT {index := &fb.TableIndexT{Offsets: make([]*fb.BlockOffsetT, 0, len(b.blockList)),BloomFilter: b.bloom.JSONMarshal(),MaxVersion: b.maxVersion,KeyCount: uint32(b.keyCount),UncompressedSize: uint32(b.uncompressedSize),OnDiskSize: uint32(len(b.buf)),}// 构建块偏移数组for _, block := range b.blockList {offset := &fb.BlockOffsetT{Key: block.firstKey,Offset: uint64(block.offset),Len: uint32(block.len),}index.Offsets = append(index.Offsets, offset)}return index
}
3. 索引序列化
func (b *Builder) finishIndex() []byte {// 使用FlatBuffer序列化索引builder := flatbuffers.NewBuilder(1024)index := b.buildIndex()indexOffset := index.Pack(builder)builder.Finish(indexOffset)return builder.FinishedBytes()
}
布隆过滤器
1. 布隆过滤器构建
type Bloom struct {bitmap []byte // 位图k uint8 // 哈希函数数量numBits uint32 // 位数numKeys uint32 // 键数量
}func NewBloomFilter(numEntries int, fp float64) *Bloom {// 计算最优参数bitsPerKey := -1.44 * math.Log2(fp) // 每个键需要的位数numBits := uint32(float64(numEntries) * bitsPerKey)numHashFuncs := uint8(bitsPerKey * 0.693) // ln(2)return &Bloom{bitmap: make([]byte, (numBits+7)/8),k: numHashFuncs,numBits: numBits,}
}func (bloom *Bloom) Add(key []byte) {h := z.MemHash(key)delta := h>>17 | h<<15 // 第二个哈希函数for i := uint8(0); i < bloom.k; i++ {bitPos := h % bloom.numBitsbloom.bitmap[bitPos/8] |= 1 << (bitPos % 8)h += delta}
}func (bloom *Bloom) MayContain(key []byte) bool {h := z.MemHash(key)delta := h>>17 | h<<15for i := uint8(0); i < bloom.k; i++ {bitPos := h % bloom.numBitsif bloom.bitmap[bitPos/8]&(1<<(bitPos%8)) == 0 {return false // 肯定不存在}h += delta}return true // 可能存在
}
2. 布隆过滤器优化
// 分块布隆过滤器,减少缓存缺失
type BlockedBloom struct {data []bytenumProbes intnumBlocks uint32blockMask uint32
}func (bf *BlockedBloom) MayContain(key []byte) bool {h := z.MemHash(key)// 选择块blockIdx := (h >> 11 | h << 21) & bf.blockMaskblock := bf.data[blockIdx*32 : (blockIdx+1)*32] // 32字节块// 在块内进行多次探测for i := 0; i < bf.numProbes; i++ {bitPos := h & 255 // 块内位置(0-255)if block[bitPos/8]&(1<<(bitPos%8)) == 0 {return false}h += h>>17 | h<<15}return true
}
压缩机制
1. 块级压缩
func (b *Builder) finishBlock() error {if b.curBlock.IsEmpty() {return nil}// 获取原始数据data := b.curBlock.Finish()var compressed []bytevar compressionType uint32switch b.opts.Compression {case options.None:compressed = datacompressionType = 0case options.Snappy:compressed = snappy.Encode(nil, data)compressionType = 1case options.ZSTD:compressed = zstd.Compress(nil, data)compressionType = 2}// 选择压缩效果更好的版本if len(compressed) < len(data) {data = compressed} else {compressionType = 0 // 使用未压缩版本}// 写入压缩类型标记data = append(data, byte(compressionType))b.writeBlock(data)return nil
}
2. 压缩策略
// 自适应压缩:根据数据特征选择压缩算法
func (b *Builder) chooseCompression(data []byte) options.CompressionType {if len(data) < 1024 {return options.None // 小块不压缩}// 计算数据熵entropy := calculateEntropy(data)if entropy > 7.5 {return options.None // 高熵数据压缩效果差}if entropy > 6.0 {return options.Snappy // 中等熵使用Snappy}return options.ZSTD // 低熵使用ZSTD
}
读取机制
1. 表打开流程
func OpenTable(mf *z.MmapFile, opts table.Options) (*Table, error) {t := &Table{mf: mf,opts: opts,}// 1. 读取并验证Footerif err := t.readFooter(); err != nil {return nil, err}// 2. 读取索引if err := t.readIndex(); err != nil {return nil, err}// 3. 验证校验和if err := t.verifyChecksum(); err != nil {return nil, err}return t, nil
}
2. 索引查找
func (t *Table) search(key []byte, maxVs *y.ValueStruct) (y.ValueStruct, error) {// 1. 布隆过滤器预检查if t.index.BloomFilter != nil {if !t.bloomFilter.MayContain(key) {return y.ValueStruct{}, ErrKeyNotFound}}// 2. 二分查找确定块blockIdx := t.findBlock(key)if blockIdx < 0 {return y.ValueStruct{}, ErrKeyNotFound}// 3. 从缓存或磁盘读取块block, err := t.getBlock(blockIdx)if err != nil {return y.ValueStruct{}, err}// 4. 在块内查找return block.search(key, maxVs)
}
3. 块级缓存
type Table struct {mf *z.MmapFileindex *fb.TableIndexbloomFilter *BloomblockCache *ristretto.Cache // 块缓存
}func (t *Table) getBlock(idx int) (*Block, error) {// 1. 尝试从缓存获取if cached, found := t.blockCache.Get(t.blockCacheKey(idx)); found {return cached.(*Block), nil}// 2. 从磁盘读取offset := t.index.Offsets[idx]data := t.mf.Data[offset.Offset:offset.Offset+uint64(offset.Len)]// 3. 解压缩block, err := t.decompressBlock(data)if err != nil {return nil, err}// 4. 缓存块t.blockCache.Set(t.blockCacheKey(idx), block, int64(len(data)))return block, nil
}
迭代器实现
1. 表迭代器
type Iterator struct {t *TableblockIdx intblockIter *blockIteratorerr errorreversed bool
}func (it *Iterator) seekToFirst() {it.blockIdx = 0it.loadBlock()if it.blockIter != nil {it.blockIter.seekToFirst()}
}func (it *Iterator) Next() {if it.blockIter != nil {it.blockIter.Next()if !it.blockIter.Valid() {// 当前块已结束,移动到下一块it.blockIdx++it.loadBlock()if it.blockIter != nil {it.blockIter.seekToFirst()}}}
}
2. 块内迭代器
type blockIterator struct {data []byte // 块数据restarts []uint32 // 重启点offset uint32 // 当前偏移key []byte // 当前键value []byte // 当前值entryOffset uint32 // 条目偏移
}func (it *blockIterator) parseNext() {if it.offset >= uint32(len(it.data)) {return}// 读取条目头部shared := binary.LittleEndian.Uint32(it.data[it.offset:])it.offset += 4unshared := binary.LittleEndian.Uint32(it.data[it.offset:])it.offset += 4valueLen := binary.LittleEndian.Uint32(it.data[it.offset:])it.offset += 4// 重构完整键it.key = it.key[:shared]it.key = append(it.key, it.data[it.offset:it.offset+unshared]...)it.offset += unshared// 读取值it.value = it.data[it.offset:it.offset+valueLen]it.offset += valueLen
}
性能优化
1. 预取优化
func (t *Table) prefetchBlocks(startIdx, count int) {for i := 0; i < count && startIdx+i < len(t.index.Offsets); i++ {idx := startIdx + igo func(blockIdx int) {t.getBlock(blockIdx) // 异步预取}(idx)}
}
2. 内存对齐
// 确保关键数据结构内存对齐
type alignedBlock struct {_ [0]uint64 // 确保8字节对齐data []bytechecksum uint32_ [4]byte // 填充到8字节边界
}
3. SIMD优化(布隆过滤器)
// 使用SIMD指令优化布隆过滤器
func (bf *Bloom) mayContainSIMD(key []byte) bool {// 在支持的平台上使用SIMD指令return bf.mayContainAVX2(key)
}
文件管理
1. 原子创建
func CreateTable(fname string, builder *Builder) (*Table, error) {// 1. 写入临时文件tmpName := fname + ".tmp"bd := builder.Done()mf, err := z.OpenMmapFile(tmpName, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)if err != nil {return nil, err}// 2. 写入数据written := bd.Copy(mf.Data)if written != len(mf.Data) {return nil, fmt.Errorf("written %d != expected %d", written, len(mf.Data))}// 3. 同步到磁盘if err := mf.Sync(); err != nil {return nil, err}// 4. 原子重命名if err := os.Rename(tmpName, fname); err != nil {return nil, err}return OpenTable(mf, builder.opts)
}
2. 错误恢复
func (t *Table) verifyIntegrity() error {// 1. 检查魔数if t.footerMagic != tableFooterMagic {return ErrInvalidMagic}// 2. 验证索引校验和if err := t.verifyIndexChecksum(); err != nil {return err}// 3. 抽样验证数据块return t.verifyDataBlocks()
}
配置参数
1. 块大小配置
BlockSize: 4096 // 4KB,适合SSD
BlockSize: 16384 // 16KB,适合HDD
2. 压缩配置
Compression: options.ZSTD // 高压缩比
Compression: options.Snappy // 平衡性能和压缩比
Compression: options.None // 无压缩,最快
3. 布隆过滤器配置
BloomFalsePositive: 0.01 // 1%假阳性率
BloomFalsePositive: 0.001 // 0.1%假阳性率(更多内存)
SSTable的设计在存储效率、查询性能和内存使用之间取得了很好的平衡,通过块级组织、前缀压缩、布隆过滤器等技术实现了高效的数据存储和访问。