Jaeger开源分布式追踪平台深度剖析(三)Jaeger默认存储Badger原理剖析

发布于:2025-06-11 ⋅ 阅读:(24) ⋅ 点赞:(0)

Badger Value Log值日志详解

概述

Value Log是Badger实现键值分离(Key-Value Separation)的核心组件。它将大值存储在独立的日志文件中,LSM树只存储键和值指针,这种设计显著减少了写放大和提高了性能。

核心设计思想

1. 键值分离机制

Small Values (< ValueThreshold): 
    Key + Value → LSM Tree

Large Values (>= ValueThreshold):
    Key + ValuePointer → LSM Tree
    Value → Value Log

ValuePointer格式:
    [Fid(4B)] [Len(4B)] [Offset(4B)]

2. Value Log结构

type valueLog struct {
    dirPath              string                  // 目录路径
    filesLock            sync.RWMutex           // 文件锁
    filesMap             map[uint32]*logFile    // 文件映射
    maxFid               uint32                 // 最大文件ID
    filesToBeDeleted     []uint32               // 待删除文件
    numActiveIterators   atomic.Int32           // 活跃迭代器数量
    writableLogOffset    atomic.Uint32          // 可写偏移
    numEntriesWritten    uint32                 // 已写入条目数
    discardStats         *discardStats          // 垃圾回收统计
    garbageCh            chan struct{}          // 垃圾回收通道
}

Value Log文件格式

1. 文件头部

+----------------+------------------+
| keyID(8 bytes) |  baseIV(12 bytes)|
+----------------+------------------+

2. 记录格式

+--------+--------+--------+--------+--------+--------+
| Header | KeyLen | Key    | ValLen | Value  | CRC32  |
+--------+--------+--------+--------+--------+--------+
|   ?B   |   4B   |  Var   |   4B   |  Var   |   4B   |

3. Header详细格式

type header struct {
    klen      uint32    // 键长度
    vlen      uint32    // 值长度
    expiresAt uint64    // 过期时间
    meta      byte      // 元数据标志
    userMeta  byte      // 用户元数据
}

写入机制

1. 值阈值判断

func (db *DB) valueThreshold() int64 {
    if db.threshold != nil {
        return db.threshold.valueThreshold.Load()
    }
    return db.opt.ValueThreshold
}

2. 写入流程

func (vlog *valueLog) write(reqs []*request) error {
    vlog.filesLock.RLock()
    maxFid := vlog.maxFid
    curlf := vlog.filesMap[maxFid]
    vlog.filesLock.RUnlock()

    toDisk := func() error {
        if err := curlf.doneWriting(vlog.writableLogOffset.Load()); err != nil {
            return err
        }
        
        y.NumBytesWrittenToL0Add(vlog.opt.MetricsEnabled, 
            int64(vlog.writableLogOffset.Load()))
        return nil
    }

    for i := range reqs {
        b := reqs[i]
        b.Ptrs = b.Ptrs[:0]
        for j := range b.Entries {
            e := b.Entries[j]
            
            var p valuePointer
            if shouldWriteValueToLSM(e, vlog.db.valueThreshold()) {
                // 小值直接存储在LSM中
                p.Len = uint32(len(e.Value))
                b.Ptrs = append(b.Ptrs, p)
                continue
            }

            // 大值存储在Value Log中
            p.Fid = curlf.fid
            p.Offset = vlog.woffset()
            p.Len = uint32(len(e.Value))
            b.Ptrs = append(b.Ptrs, p)

            // 写入Value Log
            buf := vlog.encodeEntry(e, p.Offset)
            copy(curlf.Data[p.Offset:], buf)
            vlog.writableLogOffset.Add(uint32(len(buf)))
        }
    }

    return toDisk()
}

3. 条目编码

func (vlog *valueLog) encodeEntry(e *Entry, offset uint32) []byte {
    h := header{
        klen:      uint32(len(e.Key)),
        vlen:      uint32(len(e.Value)),
        expiresAt: e.ExpiresAt,
        meta:      e.meta,
        userMeta:  e.UserMeta,
    }

    // 计算总大小
    size := h.Size() + len(e.Key) + len(e.Value) + crc32.Size
    buf := make([]byte, size)
    
    written := h.EncodeTo(buf)
    copy(buf[written:], e.Key)
    written += len(e.Key)
    copy(buf[written:], e.Value)
    written += len(e.Value)

    // 计算并写入CRC32
    hash := crc32.New(y.CastagnoliCrcTable)
    hash.Write(buf[:written])
    checksum := hash.Sum32()
    y.U32ToBytes(buf[written:], checksum)

    return buf
}

读取机制

1. 值指针解码

type valuePointer struct {
    Fid    uint32  // 文件ID
    Len    uint32  // 值长度
    Offset uint32  // 文件内偏移
}

func (p *valuePointer) Decode(b []byte) {
    p.Fid = binary.BigEndian.Uint32(b[:4])
    p.Len = binary.BigEndian.Uint32(b[4:8])
    p.Offset = binary.BigEndian.Uint32(b[8:12])
}

2. 读取流程

func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
    // 获取目标文件
    lf, err := vlog.getFileRLocked(vp)
    if err != nil {
        return nil, nil, err
    }

    // 读取数据
    buf, err := lf.read(vp)
    if err != nil {
        return nil, nil, err
    }

    // 验证和解密
    reader := &safeRead{
        recordOffset: vp.Offset,
        lf:          lf,
    }
    
    entry, err := reader.Entry(bytes.NewReader(buf))
    if err != nil {
        return nil, nil, err
    }

    return entry.Value, func() { /* unlock callback */ }, nil
}

3. 缓存友好读取

func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
    lf, err := vlog.getFileRLocked(vp)
    if err != nil {
        return nil, nil, err
    }

    // 使用mmap直接读取
    offset := vp.Offset
    if offset >= uint32(len(lf.Data)) {
        return nil, lf, fmt.Errorf("Invalid value pointer offset: %d", offset)
    }

    return lf.Data[offset:], lf, nil
}

垃圾回收机制

1. 丢弃统计

type discardStats struct {
    sync.Mutex
    valuesLock  sync.RWMutex
    values      map[uint32]int64  // 每个文件的丢弃字节数
    bytesRead   atomic.Int64      // 读取字节数
    fileSize    int64             // 文件大小
}

func (ds *discardStats) Update(fid uint32, discard int64) {
    ds.valuesLock.Lock()
    ds.values[fid] += discard
    ds.valuesLock.Unlock()
}

2. GC触发条件

func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
    vlog.filesLock.RLock()
    defer vlog.filesLock.RUnlock()

    candidate := struct {
        fid            uint32
        discardRatio   float64
        staleDataSize  int64
    }{}

    for fid, lf := range vlog.filesMap {
        if fid >= vlog.maxFid {
            continue  // 跳过当前写入文件
        }

        staleDataSize := vlog.discardStats.StaleDataSize(fid)
        totalSize := lf.size.Load()
        
        if totalSize == 0 {
            continue
        }

        ratio := float64(staleDataSize) / float64(totalSize)
        if ratio > discardRatio && ratio > candidate.discardRatio {
            candidate.fid = fid
            candidate.discardRatio = ratio
            candidate.staleDataSize = staleDataSize
        }
    }

    if candidate.fid != 0 {
        return vlog.filesMap[candidate.fid]
    }
    return nil
}

3. GC执行流程

func (vlog *valueLog) doRunGC(lf *logFile) error {
    // 统计有效数据
    var validEntries []*Entry
    var totalValidSize int64

    err := lf.iterate(true, 0, func(e Entry, vp valuePointer) error {
        // 检查entry是否仍然有效
        vs, err := vlog.db.get(e.Key)
        if err != nil {
            return err
        }

        if !discardEntry(e, vs, vlog.db) {
            validEntries = append(validEntries, &e)
            totalValidSize += int64(vp.Len)
        }
        return nil
    })

    if err != nil {
        return err
    }

    vlog.opt.Infof("GC rewriting fid: %d, valid entries: %d, valid size: %d", 
        lf.fid, len(validEntries), totalValidSize)

    // 重写有效数据到新的Value Log文件
    if len(validEntries) > 0 {
        return vlog.rewrite(validEntries)
    }

    // 标记文件为删除
    vlog.filesLock.Lock()
    vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, lf.fid)
    vlog.filesLock.Unlock()

    return nil
}

4. 数据重写

func (vlog *valueLog) rewrite(entries []*Entry) error {
    // 创建重写请求
    req := &request{
        Entries: entries,
        Ptrs:    make([]valuePointer, len(entries)),
    }
    req.Wg.Add(1)

    // 写入新的Value Log
    err := vlog.write([]*request{req})
    if err != nil {
        return err
    }

    // 更新LSM中的值指针
    txn := vlog.db.NewTransaction(true)
    defer txn.Discard()

    for i, e := range entries {
        vp := req.Ptrs[i]
        if err := txn.SetEntry(&Entry{
            Key:   e.Key,
            Value: vp.Encode(),
            Meta:  bitValuePointer,
        }); err != nil {
            return err
        }
    }

    return txn.Commit()
}

文件管理

1. 文件创建

func (vlog *valueLog) createVlogFile() (*logFile, error) {
    fid := atomic.AddUint32(&vlog.maxFid, 1)
    fname := vlog.fpath(fid)
    
    mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR, vlog.opt.ValueLogFileSize)
    if err != nil {
        return nil, err
    }

    lf := &logFile{
        MmapFile: mf,
        fid:      fid,
        path:     fname,
        opt:      vlog.opt,
    }

    // 写入文件头
    if err := lf.bootstrap(); err != nil {
        return nil, err
    }

    vlog.filesLock.Lock()
    vlog.filesMap[fid] = lf
    vlog.filesLock.Unlock()

    return lf, nil
}

2. 文件轮转

func (vlog *valueLog) sync() error {
    vlog.filesLock.RLock()
    curlf := vlog.filesMap[vlog.maxFid]
    vlog.filesLock.RUnlock()

    // 检查当前文件是否需要轮转
    if vlog.woffset() > vlog.opt.ValueLogFileSize-uint32(maxHeaderSize) {
        if err := curlf.doneWriting(vlog.woffset()); err != nil {
            return err
        }

        // 创建新文件
        newlf, err := vlog.createVlogFile()
        if err != nil {
            return err
        }

        vlog.writableLogOffset.Store(vlogHeaderSize)
        vlog.numEntriesWritten = 0
    }

    return curlf.Sync()
}

3. 文件删除

func (vlog *valueLog) deleteLogFile(lf *logFile) error {
    // 检查是否还有活跃迭代器
    if vlog.numActiveIterators.Load() > 0 {
        return nil  // 延迟删除
    }

    vlog.filesLock.Lock()
    delete(vlog.filesMap, lf.fid)
    vlog.filesLock.Unlock()

    // 删除文件
    if err := lf.Delete(); err != nil {
        return err
    }

    vlog.opt.Infof("Deleted value log file: %s", lf.path)
    return nil
}

动态阈值调整

1. 阈值监控

type vlogThreshold struct {
    percentile     float64           // 百分位数
    valueThreshold atomic.Int64      // 当前阈值
    valueCh        chan []int64      // 值大小通道
    vlMetrics      *z.HistogramData  // 直方图数据
}

func (v *vlogThreshold) update(sizes []int64) {
    v.vlMetrics.Update(sizes)
    
    // 根据百分位数计算新阈值
    newThreshold := v.vlMetrics.Percentile(v.percentile)
    v.valueThreshold.Store(int64(newThreshold))
}

2. 自适应调整

func (v *vlogThreshold) listenForValueThresholdUpdate() {
    defer v.closer.Done()
    
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case sizes := <-v.valueCh:
            v.update(sizes)
            
        case <-v.clearCh:
            v.vlMetrics.Clear()
            
        case <-ticker.C:
            // 定期调整阈值
            if v.vlMetrics.Count() > 1000 {
                v.update(nil)
            }
            
        case <-v.closer.HasBeenClosed():
            return
        }
    }
}

性能优化

1. 批量写入

func (vlog *valueLog) write(reqs []*request) error {
    // 合并多个请求减少系统调用
    totalSize := uint32(0)
    for _, req := range reqs {
        totalSize += estimateRequestSize(req)
    }

    // 预分配缓冲区
    buf := make([]byte, totalSize)
    written := 0

    for _, req := range reqs {
        for _, e := range req.Entries {
            entryBuf := vlog.encodeEntry(e, offset)
            copy(buf[written:], entryBuf)
            written += len(entryBuf)
        }
    }

    // 一次性写入
    return vlog.writeBuffer(buf)
}

2. 内存映射优化

func (lf *logFile) read(vp valuePointer) ([]byte, error) {
    // 直接从mmap内存读取,无需系统调用
    if vp.Offset >= uint32(len(lf.Data)) {
        return nil, fmt.Errorf("Invalid offset")
    }
    
    return lf.Data[vp.Offset:vp.Offset+vp.Len], nil
}

3. 并发控制优化

func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
    vlog.filesLock.RLock()
    defer vlog.filesLock.RUnlock()
    RES (
    lf, ok := vlog.filesMap[vp.Fid]
    if !ok {
        return nil, fmt.Errorf("File not found: %d", vp.Fid)
    }
    
    return lf, nil
}

配置建议

1. 值阈值设置

// 小文件多读场景
ValueThreshold: 32      // 32字节

// 大文件少读场景  
ValueThreshold: 2048    // 2KB

// 自适应模式
VLogPercentile: 0.95    // 95%分位数

2. 文件大小配置

// SSD环境
ValueLogFileSize: 1 << 30   // 1GB

// HDD环境
ValueLogFileSize: 2 << 30   // 2GB

// 内存有限环境
ValueLogFileSize: 128 << 20  // 128MB

3. GC配置

// 激进GC
GCDiscardRatio: 0.3     // 30%垃圾时触发

// 保守GC  
GCDiscardRatio: 0.7     // 70%垃圾时触发

Value Log的设计是Badger性能优异的关键因素,通过键值分离有效减少了写放大,同时通过智能的垃圾回收机制保证了空间利用率。理解其工作原理对于优化Badger在不同场景下的性能表现非常重要。

Badger事务与并发控制详解

概述

Badger采用MVCC(Multi-Version Concurrency Control)实现事务并发控制,支持快照隔离(Snapshot Isolation)级别的事务。通过Oracle组件管理时间戳分配和冲突检测,实现了高并发的读写操作。

核心组件

1. Oracle时间戳管理器

type oracle struct {
    isManaged       bool              // 是否托管模式
    detectConflicts bool              // 是否开启冲突检测
    nextTxnTs       uint64            // 下一个事务时间戳
    writeChLock     sync.Mutex        // 写入通道锁
    txnMark         *y.WaterMark      // 事务水位标记
    readMark        *y.WaterMark      // 读取水位标记
    discardTs       uint64            // 丢弃时间戳
    committedTxns   []committedTxn    // 已提交事务列表
    lastCleanupTs   uint64            // 最后清理时间戳
    closer          *z.Closer         // 关闭器
}

2. Transaction事务结构

type Txn struct {
    readTs          uint64                 // 读取时间戳
    commitTs        uint64                 // 提交时间戳
    size            int64                  // 事务大小
    count           int64                  // 条目数量
    db              *DB                    // 数据库引用
    reads           []uint64               // 读取键指纹
    conflictKeys    map[uint64]struct{}    // 冲突键指纹
    readsLock       sync.Mutex             // 读取锁
    pendingWrites   map[string]*Entry      // 待写入条目
    duplicateWrites []*Entry               // 重复写入条目
    numIterators    atomic.Int32           // 迭代器数量
    discarded       bool                   // 是否已丢弃
    doneRead        bool                   // 是否完成读取
    update          bool                   // 是否更新事务
}

MVCC机制详解

1. 时间戳分配

func (o *oracle) readTs() uint64 {
    if o.isManaged {
        panic("ReadTs should not be retrieved for managed DB")
    }

    var readTs uint64
    o.Lock()
    readTs = o.nextTxnTs - 1
    o.readMark.Begin(readTs)  // 开始读取标记
    o.Unlock()

    // 等待所有无冲突事务完成写入
    y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
    return readTs
}

func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
    o.Lock()
    defer o.Unlock()

    // 检查冲突
    if o.hasConflict(txn) {
        return 0, true  // 返回冲突标志
    }

    var ts uint64
    if !o.isManaged {
        o.doneRead(txn)
        o.cleanupCommittedTransactions()

        // 分配新的提交时间戳
        ts = o.nextTxnTs
        o.nextTxnTs++
        o.txnMark.Begin(ts)
    } else {
        ts = txn.commitTs  // 托管模式使用预设时间戳
    }

    // 记录已提交事务用于冲突检测
    if o.detectConflicts {
        o.committedTxns = append(o.committedTxns, committedTxn{
            ts:           ts,
            conflictKeys: txn.conflictKeys,
        })
    }

    return ts, false
}

2. 版本可见性规则

// 版本可见性判断
func isVisible(itemVersion, readTs uint64) bool {
    return itemVersion <= readTs
}

// 在Get操作中的应用
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
    if len(key) == 0 {
        return nil, ErrEmptyKey
    }
    if txn.discarded {
        return nil, ErrDiscardedTxn
    }

    // 1. 首先检查事务内的写入缓存
    if txn.update {
        if e, has := txn.pendingWrites[string(key)]; has {
            if isDeletedOrExpired(e.meta, e.ExpiresAt) {
                return nil, ErrKeyNotFound
            }
            return &Item{e: e, txn: txn}, nil
        }
    }

    // 2. 添加到读取集合(用于冲突检测)
    txn.addReadKey(key)

    // 3. 从数据库读取,只读取版本 <= readTs 的数据
    seek := y.KeyWithTs(key, txn.readTs)
    vs, err := txn.db.get(seek)
    if err != nil {
        return nil, y.Wrapf(err, "DB::Get key: %q", key)
    }

    if vs.Meta&bitDelete > 0 {
        return nil, ErrKeyNotFound
    }

    item = &Item{
        key:   key,
        vptr:  vs.Value,
        meta:  vs.Meta,
        userMeta: vs.UserMeta,
        expiresAt: vs.ExpiresAt,
        version: y.ParseTs(vs.Value),
        txn:   txn,
        db:    txn.db,
    }

    return item, nil
}

冲突检测机制

1. 冲突键跟踪

func (txn *Txn) addReadKey(key []byte) {
    if !txn.update || !txn.db.opt.DetectConflicts {
        return
    }

    txn.readsLock.Lock()
    defer txn.readsLock.Unlock()

    fp := z.MemHash(key)
    // 添加到读取集合
    txn.reads = append(txn.reads, fp)
}

func (txn *Txn) modify(e *Entry) error {
    if txn.discarded {
        return ErrDiscardedTxn
    }

    if !txn.update {
        return ErrReadOnlyTxn
    }

    // 记录写入键用于冲突检测
    if txn.db.opt.DetectConflicts {
        fp := z.MemHash(e.Key)
        if txn.conflictKeys == nil {
            txn.conflictKeys = make(map[uint64]struct{})
        }
        txn.conflictKeys[fp] = struct{}{}
    }

    txn.pendingWrites[string(e.Key)] = e
    return nil
}

2. 冲突检测算法

func (o *oracle) hasConflict(txn *Txn) bool {
    if len(txn.reads) == 0 {
        return false
    }

    for _, committedTxn := range o.committedTxns {
        // 如果已提交事务的时间戳 <= 当前事务的读时间戳
        // 说明已提交事务在当前事务开始前完成,无需检查冲突
        if committedTxn.ts <= txn.readTs {
            continue
        }

        // 检查读写冲突:当前事务读取的键是否被后续事务写入
        for _, readKey := range txn.reads {
            if _, has := committedTxn.conflictKeys[readKey]; has {
                return true  // 发现冲突
            }
        }
    }

    return false
}

3. 冲突处理策略

func (txn *Txn) Commit() error {
    // 预检查
    if err := txn.commitPrecheck(); err != nil {
        return err
    }

    // 尝试获取提交时间戳
    commitTs, conflict := txn.db.orc.newCommitTs(txn)
    if conflict {
        // 发生冲突,回滚事务
        return ErrConflict
    }

    txn.commitTs = commitTs

    // 执行提交
    callback, err := txn.commitAndSend()
    if err != nil {
        return err
    }

    // 等待写入完成
    return callback()
}

事务生命周期

1. 事务创建

func (db *DB) NewTransaction(update bool) *Txn {
    return db.newTransaction(update, false)
}

func (db *DB) newTransaction(update, isManaged bool) *Txn {
    if db.IsClosed() {
        panic(ErrDBClosed)
    }

    txn := &Txn{
        update:        update,
        db:            db,
        count:         1,                    // 自身占用一个计数
        size:          int64(len(txnKey)),   // 事务标记键的大小
        discarded:     false,
        pendingWrites: make(map[string]*Entry),
    }

    if !isManaged {
        txn.readTs = db.orc.readTs()  // 分配读时间戳
    }

    return txn
}

2. 写入操作

func (txn *Txn) Set(key, val []byte) error {
    e := NewEntry(key, val)
    return txn.SetEntry(e)
}

func (txn *Txn) SetEntry(e *Entry) error {
    return txn.modify(e)
}

func (txn *Txn) Delete(key []byte) error {
    e := NewEntry(key, nil).WithMeta(bitDelete)
    return txn.modify(e)
}

3. 提交过程

func (txn *Txn) commitAndSend() (func() error, error) {
    // 1. 构建写入条目
    var entries []*Entry
    for _, e := range txn.pendingWrites {
        // 设置版本信息
        e.Key = y.KeyWithTs(e.Key, txn.commitTs)
        e.meta |= bitTxn  // 标记为事务条目
        entries = append(entries, e)
    }

    // 2. 添加事务结束标记
    e := &Entry{
        Key:   y.KeyWithTs(txnKey, txn.commitTs),
        meta:  bitFinTxn,
    }
    entries = append(entries, e)

    // 3. 发送到写入通道
    req, err := txn.db.sendToWriteCh(entries)
    if err != nil {
        return nil, err
    }

    // 4. 返回等待函数
    ret := func() error {
        err := req.Wait()
        // 标记事务完成
        txn.db.orc.doneCommit(txn.commitTs)
        return err
    }

    return ret, nil
}

4. 事务清理

func (txn *Txn) Discard() {
    if txn.discarded {
        return
    }

    // 等待所有迭代器关闭
    if atomic.LoadInt32(&txn.numIterators) > 0 {
        panic("Unclosed iterator at time of Txn.Discard.")
    }

    txn.discarded = true
    if !txn.db.orc.isManaged {
        txn.db.orc.doneRead(txn)  // 标记读取完成
    }
}

WaterMark水位标记

1. WaterMark机制

// WaterMark用于跟踪进行中的操作
type WaterMark struct {
    Name     string
    markIdx  uint64
    doneUntil uint64
    waiters   map[uint64][]chan struct{}
    elog      trace.EventLog
    lock      sync.Mutex
}

func (w *WaterMark) Begin(index uint64) {
    w.lock.Lock()
    defer w.lock.Unlock()
    
    w.markIdx = index
    w.waiters[index] = make([]chan struct{}, 0)
}

func (w *WaterMark) Done(index uint64) {
    w.lock.Lock()
    defer w.lock.Unlock()
    
    // 通知等待者
    if chs, ok := w.waiters[index]; ok {
        for _, ch := range chs {
            close(ch)
        }
        delete(w.waiters, index)
    }
    
    // 更新doneUntil
    if index == w.doneUntil+1 {
        w.doneUntil = index
        // 连续更新doneUntil
        for {
            if _, ok := w.waiters[w.doneUntil+1]; !ok {
                w.doneUntil++
            } else {
                break
            }
        }
    }
}

2. 垃圾回收支持

func (o *oracle) discardAtOrBelow() uint64 {
    if o.isManaged {
        o.Lock()
        defer o.Unlock()
        return o.discardTs
    }
    // 返回所有读取操作都已完成的最大时间戳
    return o.readMark.DoneUntil()
}

func (o *oracle) cleanupCommittedTransactions() {
    if !o.detectConflicts {
        return
    }

    // 清理过期的已提交事务记录
    discardBelow := o.discardAtOrBelow()
    if discardBelow > o.lastCleanupTs {
        // 移除时间戳 <= discardBelow 的事务
        var newCommittedTxns []committedTxn
        for _, txn := range o.committedTxns {
            if txn.ts > discardBelow {
                newCommittedTxns = append(newCommittedTxns, txn)
            }
        }
        o.committedTxns = newCommittedTxns
        o.lastCleanupTs = discardBelow
    }
}

托管事务模式

1. 托管vs非托管

// 非托管模式:Badger自动分配时间戳
func (db *DB) Update(fn func(txn *Txn) error) error {
    txn := db.NewTransaction(true)
    defer txn.Discard()
    
    if err := fn(txn); err != nil {
        return err
    }
    
    return txn.Commit()  // 自动分配commitTs
}

// 托管模式:用户控制时间戳
func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
    wb := db.NewWriteBatch()
    wb.commitTs = commitTs
    return wb
}

2. 托管模式优势

  • 确定性重放:用户控制时间戳,便于复制和恢复
  • 批量操作:可以批量提交多个事务
  • 外部一致性:与外部系统保持时间戳一致性

读写隔离级别

1. 快照隔离保证

// 事务看到的是readTs时刻的快照
func (txn *Txn) Get(key []byte) (*Item, error) {
    // 只能看到版本 <= readTs 的数据
    seek := y.KeyWithTs(key, txn.readTs)
    vs, err := txn.db.get(seek)
    // ...
}

// 写入时使用commitTs作为版本
func (txn *Txn) commitAndSend() {
    for _, e := range txn.pendingWrites {
        e.Key = y.KeyWithTs(e.Key, txn.commitTs)
        // ...
    }
}

2. 读一致性

  • 单调读:同一事务内多次读取同一键得到相同结果
  • 读已提交:只能读取到已提交的数据
  • 快照隔离:事务开始时获得数据库快照

性能优化

1. 批量操作

func (db *DB) Update(fn func(txn *Txn) error) error {
    // 自动管理事务生命周期
    txn := db.NewTransaction(true)
    defer txn.Discard()
    
    return fn(txn)
}

// 批量写入
type WriteBatch struct {
    txn       *Txn
    commitTs  uint64
    entries   []*Entry
}

func (wb *WriteBatch) Flush() error {
    // 批量提交所有操作
    return wb.txn.Commit()
}

2. 冲突检测优化

// 可选择性启用冲突检测
opt := DefaultOptions(path)
opt.DetectConflicts = false  // 禁用以提高性能

// 使用指纹而非完整键进行冲突检测
fp := z.MemHash(key)  // 64位指纹,降低内存使用

3. 内存管理

// 限制事务大小
func (txn *Txn) checkSize(e *Entry) error {
    count := int64(len(e.Key) + len(e.Value) + 1)
    if txn.count+count >= txn.db.opt.maxBatchCount {
        return ErrTxnTooBig
    }
    if txn.size+count >= txn.db.opt.maxBatchSize {
        return ErrTxnTooBig
    }
    
    txn.count += count
    txn.size += int64(e.EstimateSize(txn.db.opt.ValueThreshold))
    return nil
}

配置参数

1. 冲突检测配置

opt.DetectConflicts = true   // 启用冲突检测
opt.NumCompactors = 2        // 压缩线程数影响清理速度

2. 事务大小限制

opt.MaxBatchCount = 100000   // 最大批次条目数
opt.MaxBatchSize = 15MB      // 最大批次大小

3. 托管模式配置

opt.ManagedTxns = true       // 启用托管事务模式

Badger的MVCC实现提供了高性能的并发控制,通过时间戳排序和冲突检测保证了事务的ACID特性,同时支持高并发的读写操作。理解其工作原理有助于正确使用事务功能并优化应用性能。

Badger SSTable文件格式详解

概述

SSTable(Sorted String Table)是Badger中的不可变数据文件格式,存储经过排序的键值对。每个SSTable文件都包含数据块、索引、布隆过滤器等组件,通过精心设计的格式实现高效的查找和压缩。

文件整体结构

+================+
|   Data Blocks  |  ← 数据块区域(压缩)
|      ...       |
+================+
|   Index Block  |  ← 索引块(FlatBuffer格式)
+================+
| Bloom Filter   |  ← 布隆过滤器(可选)
+================+
|   Checksum     |  ← 文件校验和(8字节)
+================+
|  Index Offset  |  ← 索引偏移量(8字节)
+================+
|  Index Length  |  ← 索引长度(4字节)
+================+
|  Footer Magic  |  ← 文件魔数(4字节)
+================+

数据块(Data Blocks)

1. 块内结构

Block内部格式:
+--------+--------+-----+--------+--------+-----+
| Entry1 | Entry2 | ... | EntryN | Restart| CRC |
+--------+--------+-----+--------+--------+-----+

Entry格式:
+----------+----------+----------+----------+----------+
| SharedLen| UnsharedLen| ValueLen | Key     | Value   |
+----------+----------+----------+----------+----------+
|    4B    |     4B    |    4B    |   Var   |   Var   |

2. 键压缩机制

// 前缀压缩减少存储空间
type blockBuilder struct {
    data            []byte
    restarts        []uint32  // 重启点偏移
    counter         int       // 当前条目计数
    prevKey         []byte    // 前一个键
    restartInterval int       // 重启间隔
}

func (b *blockBuilder) Add(key, value []byte, valuePointer bool) {
    // 计算与前一个键的共同前缀长度
    shared := 0
    if b.counter < b.restartInterval {
        shared = b.sharedPrefixLen(b.prevKey, key)
    }

    unshared := len(key) - shared
    
    // 编码条目
    b.append4Byte(uint32(shared))     // 共享前缀长度
    b.append4Byte(uint32(unshared))   // 非共享部分长度
    b.append4Byte(uint32(len(value))) // 值长度
    
    b.data = append(b.data, key[shared:]...)  // 非共享键部分
    b.data = append(b.data, value...)         // 值

    if b.counter == 0 || b.counter >= b.restartInterval {
        // 设置重启点
        b.restarts = append(b.restarts, uint32(len(b.data)))
        b.counter = 0
    }
    
    b.prevKey = append(b.prevKey[:0], key...)
    b.counter++
}

3. 块尾部信息

// 块结束时添加重启点信息
func (b *blockBuilder) Finish() []byte {
    // 1. 添加重启点数组
    for _, restart := range b.restarts {
        b.append4Byte(restart)
    }
    
    // 2. 添加重启点数量
    b.append4Byte(uint32(len(b.restarts)))
    
    // 3. 计算并添加CRC32校验和
    checksum := crc32.Checksum(b.data, y.CastagnoliCrcTable)
    b.append4Byte(checksum)
    
    return b.data
}

索引结构(FlatBuffer)

1. TableIndex定义

// table.fbs
namespace fb;

table BlockOffset {
    key: [ubyte];     // 块的第一个键
    offset: uint64;   // 块在文件中的偏移
    len: uint32;      // 块的长度
}

table TableIndex {
    offsets: [BlockOffset];     // 块偏移数组
    bloom_filter: [ubyte];      // 布隆过滤器数据
    max_version: uint64;        // 最大版本号
    key_count: uint32;          // 键数量
    uncompressed_size: uint32;  // 未压缩大小
    on_disk_size: uint32;       // 磁盘大小
    stale_data_size: uint32;    // 过期数据大小
}

2. 索引构建

func (b *Builder) buildIndex() *fb.TableIndexT {
    index := &fb.TableIndexT{
        Offsets:          make([]*fb.BlockOffsetT, 0, len(b.blockList)),
        BloomFilter:      b.bloom.JSONMarshal(),
        MaxVersion:       b.maxVersion,
        KeyCount:         uint32(b.keyCount),
        UncompressedSize: uint32(b.uncompressedSize),
        OnDiskSize:       uint32(len(b.buf)),
    }

    // 构建块偏移数组
    for _, block := range b.blockList {
        offset := &fb.BlockOffsetT{
            Key:    block.firstKey,
            Offset: uint64(block.offset),
            Len:    uint32(block.len),
        }
        index.Offsets = append(index.Offsets, offset)
    }

    return index
}

3. 索引序列化

func (b *Builder) finishIndex() []byte {
    // 使用FlatBuffer序列化索引
    builder := flatbuffers.NewBuilder(1024)
    
    index := b.buildIndex()
    indexOffset := index.Pack(builder)
    
    builder.Finish(indexOffset)
    return builder.FinishedBytes()
}

布隆过滤器

1. 布隆过滤器构建

type Bloom struct {
    bitmap   []byte    // 位图
    k        uint8     // 哈希函数数量
    numBits  uint32    // 位数
    numKeys  uint32    // 键数量
}

func NewBloomFilter(numEntries int, fp float64) *Bloom {
    // 计算最优参数
    bitsPerKey := -1.44 * math.Log2(fp)  // 每个键需要的位数
    numBits := uint32(float64(numEntries) * bitsPerKey)
    numHashFuncs := uint8(bitsPerKey * 0.693)  // ln(2)

    return &Bloom{
        bitmap:  make([]byte, (numBits+7)/8),
        k:       numHashFuncs,
        numBits: numBits,
    }
}

func (bloom *Bloom) Add(key []byte) {
    h := z.MemHash(key)
    delta := h>>17 | h<<15  // 第二个哈希函数
    
    for i := uint8(0); i < bloom.k; i++ {
        bitPos := h % bloom.numBits
        bloom.bitmap[bitPos/8] |= 1 << (bitPos % 8)
        h += delta
    }
}

func (bloom *Bloom) MayContain(key []byte) bool {
    h := z.MemHash(key)
    delta := h>>17 | h<<15
    
    for i := uint8(0); i < bloom.k; i++ {
        bitPos := h % bloom.numBits
        if bloom.bitmap[bitPos/8]&(1<<(bitPos%8)) == 0 {
            return false  // 肯定不存在
        }
        h += delta
    }
    return true  // 可能存在
}

2. 布隆过滤器优化

// 分块布隆过滤器,减少缓存缺失
type BlockedBloom struct {
    data       []byte
    numProbes  int
    numBlocks  uint32
    blockMask  uint32
}

func (bf *BlockedBloom) MayContain(key []byte) bool {
    h := z.MemHash(key)
    
    // 选择块
    blockIdx := (h >> 11 | h << 21) & bf.blockMask
    block := bf.data[blockIdx*32 : (blockIdx+1)*32]  // 32字节块
    
    // 在块内进行多次探测
    for i := 0; i < bf.numProbes; i++ {
        bitPos := h & 255  // 块内位置(0-255)
        if block[bitPos/8]&(1<<(bitPos%8)) == 0 {
            return false
        }
        h += h>>17 | h<<15
    }
    return true
}

压缩机制

1. 块级压缩

func (b *Builder) finishBlock() error {
    if b.curBlock.IsEmpty() {
        return nil
    }

    // 获取原始数据
    data := b.curBlock.Finish()
    
    var compressed []byte
    var compressionType uint32
    
    switch b.opts.Compression {
    case options.None:
        compressed = data
        compressionType = 0
        
    case options.Snappy:
        compressed = snappy.Encode(nil, data)
        compressionType = 1
        
    case options.ZSTD:
        compressed = zstd.Compress(nil, data)
        compressionType = 2
    }

    // 选择压缩效果更好的版本
    if len(compressed) < len(data) {
        data = compressed
    } else {
        compressionType = 0  // 使用未压缩版本
    }

    // 写入压缩类型标记
    data = append(data, byte(compressionType))
    
    b.writeBlock(data)
    return nil
}

2. 压缩策略

// 自适应压缩:根据数据特征选择压缩算法
func (b *Builder) chooseCompression(data []byte) options.CompressionType {
    if len(data) < 1024 {
        return options.None  // 小块不压缩
    }
    
    // 计算数据熵
    entropy := calculateEntropy(data)
    if entropy > 7.5 {
        return options.None  // 高熵数据压缩效果差
    }
    
    if entropy > 6.0 {
        return options.Snappy  // 中等熵使用Snappy
    }
    
    return options.ZSTD  // 低熵使用ZSTD
}

读取机制

1. 表打开流程

func OpenTable(mf *z.MmapFile, opts table.Options) (*Table, error) {
    t := &Table{
        mf:   mf,
        opts: opts,
    }

    // 1. 读取并验证Footer
    if err := t.readFooter(); err != nil {
        return nil, err
    }

    // 2. 读取索引
    if err := t.readIndex(); err != nil {
        return nil, err
    }

    // 3. 验证校验和
    if err := t.verifyChecksum(); err != nil {
        return nil, err
    }

    return t, nil
}

2. 索引查找

func (t *Table) search(key []byte, maxVs *y.ValueStruct) (y.ValueStruct, error) {
    // 1. 布隆过滤器预检查
    if t.index.BloomFilter != nil {
        if !t.bloomFilter.MayContain(key) {
            return y.ValueStruct{}, ErrKeyNotFound
        }
    }

    // 2. 二分查找确定块
    blockIdx := t.findBlock(key)
    if blockIdx < 0 {
        return y.ValueStruct{}, ErrKeyNotFound
    }

    // 3. 从缓存或磁盘读取块
    block, err := t.getBlock(blockIdx)
    if err != nil {
        return y.ValueStruct{}, err
    }

    // 4. 在块内查找
    return block.search(key, maxVs)
}

3. 块级缓存

type Table struct {
    mf          *z.MmapFile
    index       *fb.TableIndex
    bloomFilter *Bloom
    blockCache  *ristretto.Cache  // 块缓存
}

func (t *Table) getBlock(idx int) (*Block, error) {
    // 1. 尝试从缓存获取
    if cached, found := t.blockCache.Get(t.blockCacheKey(idx)); found {
        return cached.(*Block), nil
    }

    // 2. 从磁盘读取
    offset := t.index.Offsets[idx]
    data := t.mf.Data[offset.Offset:offset.Offset+uint64(offset.Len)]
    
    // 3. 解压缩
    block, err := t.decompressBlock(data)
    if err != nil {
        return nil, err
    }

    // 4. 缓存块
    t.blockCache.Set(t.blockCacheKey(idx), block, int64(len(data)))
    
    return block, nil
}

迭代器实现

1. 表迭代器

type Iterator struct {
    t           *Table
    blockIdx    int
    blockIter   *blockIterator
    err         error
    reversed    bool
}

func (it *Iterator) seekToFirst() {
    it.blockIdx = 0
    it.loadBlock()
    if it.blockIter != nil {
        it.blockIter.seekToFirst()
    }
}

func (it *Iterator) Next() {
    if it.blockIter != nil {
        it.blockIter.Next()
        if !it.blockIter.Valid() {
            // 当前块已结束,移动到下一块
            it.blockIdx++
            it.loadBlock()
            if it.blockIter != nil {
                it.blockIter.seekToFirst()
            }
        }
    }
}

2. 块内迭代器

type blockIterator struct {
    data         []byte    // 块数据
    restarts     []uint32  // 重启点
    offset       uint32    // 当前偏移
    key          []byte    // 当前键
    value        []byte    // 当前值
    entryOffset  uint32    // 条目偏移
}

func (it *blockIterator) parseNext() {
    if it.offset >= uint32(len(it.data)) {
        return
    }

    // 读取条目头部
    shared := binary.LittleEndian.Uint32(it.data[it.offset:])
    it.offset += 4
    
    unshared := binary.LittleEndian.Uint32(it.data[it.offset:])
    it.offset += 4
    
    valueLen := binary.LittleEndian.Uint32(it.data[it.offset:])
    it.offset += 4

    // 重构完整键
    it.key = it.key[:shared]
    it.key = append(it.key, it.data[it.offset:it.offset+unshared]...)
    it.offset += unshared

    // 读取值
    it.value = it.data[it.offset:it.offset+valueLen]
    it.offset += valueLen
}

性能优化

1. 预取优化

func (t *Table) prefetchBlocks(startIdx, count int) {
    for i := 0; i < count && startIdx+i < len(t.index.Offsets); i++ {
        idx := startIdx + i
        go func(blockIdx int) {
            t.getBlock(blockIdx)  // 异步预取
        }(idx)
    }
}

2. 内存对齐

// 确保关键数据结构内存对齐
type alignedBlock struct {
    _        [0]uint64  // 确保8字节对齐
    data     []byte
    checksum uint32
    _        [4]byte    // 填充到8字节边界
}

3. SIMD优化(布隆过滤器)

// 使用SIMD指令优化布隆过滤器
func (bf *Bloom) mayContainSIMD(key []byte) bool {
    // 在支持的平台上使用SIMD指令
    return bf.mayContainAVX2(key)
}

文件管理

1. 原子创建

func CreateTable(fname string, builder *Builder) (*Table, error) {
    // 1. 写入临时文件
    tmpName := fname + ".tmp"
    bd := builder.Done()
    
    mf, err := z.OpenMmapFile(tmpName, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)
    if err != nil {
        return nil, err
    }

    // 2. 写入数据
    written := bd.Copy(mf.Data)
    if written != len(mf.Data) {
        return nil, fmt.Errorf("written %d != expected %d", written, len(mf.Data))
    }

    // 3. 同步到磁盘
    if err := mf.Sync(); err != nil {
        return nil, err
    }

    // 4. 原子重命名
    if err := os.Rename(tmpName, fname); err != nil {
        return nil, err
    }

    return OpenTable(mf, builder.opts)
}

2. 错误恢复

func (t *Table) verifyIntegrity() error {
    // 1. 检查魔数
    if t.footerMagic != tableFooterMagic {
        return ErrInvalidMagic
    }

    // 2. 验证索引校验和
    if err := t.verifyIndexChecksum(); err != nil {
        return err
    }

    // 3. 抽样验证数据块
    return t.verifyDataBlocks()
}

配置参数

1. 块大小配置

BlockSize: 4096        // 4KB,适合SSD
BlockSize: 16384       // 16KB,适合HDD

2. 压缩配置

Compression: options.ZSTD     // 高压缩比
Compression: options.Snappy   // 平衡性能和压缩比
Compression: options.None     // 无压缩,最快

3. 布隆过滤器配置

BloomFalsePositive: 0.01      // 1%假阳性率
BloomFalsePositive: 0.001     // 0.1%假阳性率(更多内存)

SSTable的设计在存储效率、查询性能和内存使用之间取得了很好的平衡,通过块级组织、前缀压缩、布隆过滤器等技术实现了高效的数据存储和访问。