当前位置: 首页 > news >正文

RocksDB 核心入口:DB类源码解析

rocksdb::DB 

class DB 是所有 RocksDB 数据库操作的统一入口核心抽象基类。可以把它理解为与数据库实例进行交互的“遥控器”。它定义了所有用户可以调用的接口(API),但它本身不包含具体的实现逻辑。

从代码中可以看到,它是一个抽象类:

// ... existing code ...
// A DB is a persistent, versioned ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
// DB is an abstract base class with one primary implementation (DBImpl)
// and a number of wrapper implementations.
class DB {public:
// ...// Abstract class ctorDB() {}// No copying allowedDB(const DB&) = delete;void operator=(const DB&) = delete;
// ...// 很多纯虚函数,例如:virtual Status Put(const WriteOptions& options,ColumnFamilyHandle* column_family, const Slice& key,const Slice& value) = 0;
// ...
};

关键点:

  1. 抽象接口 (Interface)DB 类通过大量纯虚函数(virtual ... = 0;)定义了一套完整的操作规范。任何想要成为一个“RocksDB数据库”的类,都必须继承自 DB 并实现这些接口。
  2. 线程安全: 如注释所说,DB 的实现被设计为线程安全的。多个线程可以同时对同一个 DB 实例进行读写操作,无需外部加锁。
  3. 主要实现 DBImpl: RocksDB 官方提供了 DB 的主要实现类 DBImpl(定义在 db/db_impl/db_impl.h)。我们通常使用的就是 DBImpl 的实例。
  4. 可堆叠的装饰器: RocksDB 还有很多基于 DB 的包装类(Wrapper),比如 TransactionDBBlobDB 等。它们也继承自 DB,内部持有一个 DB 指针(通常是 DBImpl),在实现自己的特性(如事务、大 Value 存储)之外,将其他通用操作转发给内部的 DB 实例。这是一种典型的装饰器(Decorator)设计模式

核心用法分析 (API 导览)

DB 类的公共接口(public:部分)可以分为以下几大类:

1. 打开和关闭数据库 (Lifecycle)

用户不能直接 new DB(),而是必须通过静态工厂方法 Open 系列来创建和加载一个数据库实例。

  • static Status Open(...): 这是最常用的方法,用于以读写模式打开数据库。

    • 它有多个重载版本,支持只打开默认列族,或同时打开多个列族(ColumnFamily)。
    • 参数 const Options& options 或 const DBOptions& db_options / const ColumnFamilyDescriptor& column_families 用于传入丰富的配置项。
    • 返回的数据库指针通过出参 std::unique_ptr<DB>* dbptr (推荐) 或 DB** dbptr (已废弃) 获得。
  • static Status OpenForReadOnly(...): 以只读模式打开数据库。在此模式下,任何写操作都会失败,且后台的自动 Flush 和 Compaction 也不会进行。适用于数据分析、备份等场景。

  • static Status OpenAsSecondary(...): 以从实例(Secondary Instance)模式打开。这是一种特殊的只读模式,从实例可以与主实例共享 SST 文件,并能通过 TryCatchUpWithPrimary() 方法追赶主实例的数据更新。常用于构建读写分离架构中的只读副本。

  • static Status ListColumnFamilies(...): 在打开数据库之前,可以通过这个静态方法查询一个数据库路径下存在哪些列族。

  • virtual Status Close(): 关闭数据库,释放资源。这是一个实例方法,在销毁 DB 对象前调用可以获取关闭过程中的状态。

2. 写操作 (Write Operations)
  • virtual Status Put(...): 写入或覆盖一个键值对。
  • virtual Status Delete(...): 删除一个键。
  • virtual Status SingleDelete(...): 一种优化的删除操作,用于确定 key 不会再被写入或 Merge 的场景。
  • virtual Status Merge(...): 执行“读-修改-写”操作。需要用户预先定义一个 MergeOperator
  • virtual Status Write(const WriteOptions&, WriteBatch* updates)最重要、最强大的写操作接口。它通过 WriteBatch 实现原子性的批量写入。可以将多个 PutDeleteMerge 操作打包在一起,一次性提交,保证它们要么全部成功,要么全部失败。这是保证数据一致性和提升写入性能的关键。
3. 读操作 (Read Operations)
  • virtual Status Get(...): 根据 Key 读取 Value。
  • virtual void MultiGet(...): 一次性读取多个 Key 的 Value,比循环调用 Get 更高效。
  • virtual bool KeyMayExist(...): 一个性能优化接口。它利用 Bloom Filter 等内存结构快速判断一个 Key 是否可能存在。如果返回 false,则 Key 一定不存在;如果返回 true,则 Key 可能存在,需要调用 Get 进一步确认。这可以避免对不存在的 Key 进行昂贵的磁盘 I/O。
  • virtual Iterator* NewIterator(...): 创建一个迭代器,用于按序扫描(正序或逆序)一个 Key 区间内的数据。这是范围查询的基础。
4. 列族管理 (Column Family Management)
  • virtual Status CreateColumnFamily(...): 在数据库运行时动态创建一个新的列族。
  • virtual Status DropColumnFamilies(...): 动态删除一个或多个列族。
  • virtual Status DestroyColumnFamilyHandle(...): 销毁列族句柄,释放资源。
5. 数据库管理与监控
  • virtual const Snapshot* GetSnapshot(): 获取数据库在当前时间点的一个一致性视图(快照)。后续的读操作可以使用这个快照,读取到的数据不受之后写操作的影响。
  • virtual Status Flush(...): 手动触发将内存中的 MemTable 数据刷写到磁盘(SST 文件)。
  • virtual Status CompactRange(...): 手动触发对指定 Key 范围的数据进行 Compaction。
  • virtual bool GetProperty(...) / GetIntProperty(...): 获取数据库的各种内部状态和统计信息。db.h 中定义了大量的属性字符串常量(如 kNumFilesAtLevelPrefixkCurSizeAllMemTables 等),用于查询不同维度的信息。

实现关联

如前所述,DB 是一个接口,它的行为最终由具体的实现类完成,主要是 DBImpl

  • 当你调用 DB::Open(...) 时,其内部会创建一个 DBImpl 对象,并执行复杂的初始化流程:读取 MANIFEST 文件,恢复元数据,回放 WAL 日志等。成功后,将 DBImpl 对象的指针向上转型为 DB* 返回给你。
  • 当你调用 db->Put(...) 时,实际上是调用了 DBImpl::Put(...)DBImpl 会协调整个写入流程:
    1. 将操作写入 WAL (Write-Ahead Log) 以保证持久性。
    2. 将数据写入内存中的 MemTable
    3. 当 MemTable 写满后,调度后台线程将其 Flush 成磁盘上的 SST 文件。
  • 当调用 db->Get(...) 时,DBImpl::Get(...) 会构建一个查找流程:
    1. 先查找当前 active 的 MemTable
    2. 再依次查找 immutable 的 MemTable 列表。
    3. 最后在磁盘上的各层级 SST 文件中查找。
  • 所有其他的 DB 接口调用,都会最终路由到 DBImpl 中对应的实现函数,由 DBImpl 来调度和管理 VersionSetMemTableListWalManager, 后台线程池等内部组件来完成任务。

 

写组(Write Group)机制

这是一种为了提高写入吞吐量而设计的并发优化策略。它的核心思想是:将多个并发的写入请求合并成一个大的请求,然后一次性地执行底层 I/O 操作(主要是写 WAL 日志),从而摊销 I/O 开销。

写组机制就像是让一辆大巴车(Leader)载着一车乘客(Follower)一起通过收费站,只缴一次费,效率就大大提高了。

在 WriteThread 类中,定义了 Writer 结构体和几个关键的状态,它们是理解写组的基础。

  • WriteThread::Writer: 每个发起写入请求的线程都会在自己的栈上创建一个 Writer 对象。这个对象封装了本次写入的所有信息,包括 WriteBatchWriteOptions、回调函数、状态等。它就像是每个乘客的车票和行李。

  • Writer 的状态 (w.state):

    • STATE_INIT: 初始状态。
    • STATE_GROUP_LEADER: 表示当前 Writer 对应的线程成为了写组的 Leader
    • STATE_COMPLETED: 表示写入已完成。对于 Follower 来说,这是它等待结束后的状态;对于 Leader 来说,这是它完成所有工作后的最终状态。
    • (还有其他用于 Pipelined Write 的状态,我们暂时不关注)。

下面我们以最经典的标准写入路径 DBImpl::WriteImpl 为例,一步步拆解 Leader 和 Follower 的协作过程。

进入写组 (JoinBatchGroup)

当一个线程调用 DBImpl::Write -> DBImpl::WriteImpl 时,它会执行以下关键代码:

// ... existing code ...WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, batch_cnt,pre_release_callback, post_memtable_callback, wbwi);write_thread_.JoinBatchGroup(&w);
// ... existing code ...

write_thread_.JoinBatchGroup(&w) 是整个协作的起点。WriteThread 内部维护了一个队列。

  • 场景 A:队列为空

    • 当前线程发现队列是空的,它就成为了 Leader
    • 它会将自己的 Writer 对象(w)设置为 Leader,并将 w.state 标记为 STATE_GROUP_LEADER
    • 然后,它不会立即开始工作,而是会短暂地等待一小段时间(通过 cv_.TimedWait()),看看有没有其他线程(Follower)紧随其后加入。这给了其他并发写请求一个“上车”的机会。
  • 场景 B:队列已有 Leader

    • 当前线程发现队列中已经有一个 Leader 了。
    • 它就成为了 Follower
    • 它会将自己的 Writer 对象(w)链接到 Leader 的 Writer 链表中。
    • 然后,它会进入等待状态 (cv_.Wait()),直到 Leader 完成所有工作并唤醒它。

Leader 的工作

当 Leader 的短暂等待时间结束,或者写组达到一定规模后,Leader 就开始工作了。

  1. 构建写组 (EnterAsBatchGroupLeader): Leader 会遍历链表,将所有 Follower 的 Writer 对象收集起来,形成一个 WriteGroup。这个 WriteGroup 就是一个 Writer 的集合。

  2. 合并批处理 (MergeBatch): Leader 会创建一个大的 WriteBatch,然后遍历 WriteGroup,将每个 Writer 的 WriteBatch 内容合并到这个大的 WriteBatch 中。

  3. 分配序列号: Leader 会为整个合并后的大 WriteBatch 一次性地从版本管理器(versions_)中申请一块连续的序列号(Sequence Number)。

  4. 写入 WAL (WriteToWAL): 这是最关键的 I/O 操作。Leader 将合并后的大 WriteBatch 一次性写入到 WAL 文件中。如果任何一个 Writer 的 WriteOptions 要求 sync,Leader 就会在写完 WAL 后执行一次 fsync,确保数据落盘。这是写组机制性能提升的核心所在:N 次写入被合并为 1 次 WAL 写入和最多 1 次 fsync

  5. 写入 MemTable (InsertInto): WAL 写入成功后,Leader 会将大 WriteBatch 中的数据应用到对应的 MemTable 中。

  6. 发布序列号 (SetLastSequence): 所有数据都写入 MemTable 后,Leader 会更新数据库的全局最新序列号。此时,这批写入的数据才对读请求完全可见。

结束与唤醒 (ExitAsBatchGroupLeader)

Leader 完成所有工作后,会执行收尾工作:

  1. 设置最终状态: Leader 将最终的执行结果(成功或失败的 Status)设置到 WriteGroup 中。
  2. 唤醒 Follower: Leader 会遍历 WriteGroup,将最终状态赋给每一个 Follower 的 Writer 对象,并将它们的状态设置为 STATE_COMPLETED。然后,它会广播信号 (cv_.SignalAll()),唤醒所有正在等待的 Follower 线程。

Follower 的苏醒

被唤醒的 Follower 线程从 JoinBatchGroup 的等待中返回。它会发现自己的 w.state 已经变成了 STATE_COMPLETED。它从自己的 w 对象中获取最终的执行状态 w.FinalStatus(),然后从 WriteImpl 函数返回,就像是自己独立完成了写入一样。

总结

角色职责关键行为
Leader成为写组的负责人,执行实际的合并、写入和同步操作。JoinBatchGroup (成为 Leader) -> EnterAsBatchGroupLeader -> MergeBatch -> WriteToWAL -> InsertInto -> ExitAsBatchGroupLeader (唤醒 Follower)
Follower将自己的写入请求(Writer)交给 Leader,然后等待 Leader 完成工作。JoinBatchGroup (成为 Follower) -> cv_.Wait() (等待) -> 被唤醒 -> 获取最终状态 -> 返回

DB::Put

这里考虑带有时间戳(timestamp)的版本,会以此为起点,并结合整个写入链路,详细解析 RocksDB 的一次写入决策过程。

整个写入过程可以概括为:API 调用 -> 构造 WriteBatch -> 核心写入逻辑(进入写队列、写 WAL、写 MemTable)-> 可能触发 Flush

第 1 步: DBImpl::Put - API 入口与参数校验

首先,应用层调用的 DB::Put 是一个虚函数,其具体实现位于 DBImpl 类中。对于带时间戳的 Put 操作,入口是 DBImpl::Put

db_impl_write.cc

// ... existing code ...
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& val) {const Status s = FailIfTsMismatchCf(column_family, ts);if (!s.ok()) {return s;}return DB::Put(o, column_family, key, ts, val);
}// ... existing code ...
  • 功能: 这是 Put 操作的第一个实现层。
  • 核心逻辑:
    1. FailIfTsMismatchCf(column_family, ts): 在执行任何操作之前,它会先进行一个关键校验。它会检查你传入的时间戳 ts 的大小是否与目标列族(Column Family)定义的时间戳大小一致。如果一个列族没有启用时间戳,或者时间戳大小不匹配,这里会直接返回错误,防止数据写入格式错误。
    2. return DB::Put(...): 校验通过后,它会调用基类 DB 中的同名便利方法。

第 2 步: DB::Put - 构造写入批处理 (WriteBatch)

接下来,执行流进入了一个定义在基类 DB 中的静态便利方法,它的主要作用是为单次 Put 操作创建一个 WriteBatch

db_impl_write.cc

// ... existing code ...
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& value) {ColumnFamilyHandle* default_cf = DefaultColumnFamily();assert(default_cf);const Comparator* const default_cf_ucmp = default_cf->GetComparator();assert(default_cf_ucmp);WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key,default_cf_ucmp->timestamp_size());Status s = batch.Put(column_family, key, ts, value);if (!s.ok()) {return s;}return Write(opt, &batch);
}
// ... existing code ...
  • 功能: 将单次的 Put 操作封装成一个 WriteBatch 对象。RocksDB 中所有的写操作(Put, Delete, Merge)最终都会被统一成 WriteBatch 来处理。
  • 核心逻辑:
    1. 获取默认列族信息: 它获取了默认列族的比较器(Comparator),并从中得到时间戳的大小 timestamp_size()。这个信息用来初始化 WriteBatch,使其知晓如何处理时间戳。
    2. 创建 WriteBatch: 创建一个 WriteBatch 实例。WriteBatch 本质上是一个内存缓冲区,用于暂存一个或多个写操作。
    3. batch.Put(...): 调用 WriteBatch 的 Put 方法,将 (column_family, key, ts, value) 这组操作指令序列化并追加到 WriteBatch 的内部缓冲区 rep_ 中。在内部,key 和 timestamp 会被拼接在一起,形成 RocksDB 的内部 key(Internal Key)。这是 RocksDB 实现时间戳功能的关键。
    4. return Write(opt, &batch): 最后,调用核心的 DB::Write 方法,将这个只包含一个 Put 操作的 WriteBatch 提交给数据库进行处理。

第 3 步: DBImpl::Write - 写入操作的门户

// ... existing code ...
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {Status s;if (write_options.protection_bytes_per_key > 0) {s = WriteBatchInternal::UpdateProtectionInfo(my_batch, write_options.protection_bytes_per_key);}if (s.ok()) {s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,/*user_write_cb=*/nullptr,/*wal_used=*/nullptr);}return s;
}
// ... existing code ...
  • 功能: 这是 WriteBatch 写入的公共入口。
  • 核心逻辑:
    1. 数据保护 (UpdateProtectionInfo): 如果用户在 WriteOptions 中设置了 protection_bytes_per_key,这里会为 WriteBatch 中的每个 key 计算并更新校验和(checksum),用于防止数据损坏。
    2. 调用 WriteImpl: 将 WriteBatch 和 WriteOptions 传递给核心实现函数 WriteImpl。其他参数(如 callback)在这里都传入 nullptr,因为这个 Write 接口是最基础的版本。

第 4 步: DBImpl::WriteImpl - 核心写入实现

这是整个写入流程的心脏。它非常复杂,包含了大量的配置检查、写入路径选择和核心的写组(Write Group)逻辑。

前置检查 (Pre-flight Checks)

在执行任何写入操作之前,WriteImpl 会进行一系列严格的检查,以确保配置的正确性和一致性。

db_impl_write.cc

// ... existing code ...
Status DBImpl::WriteImpl(const WriteOptions& write_options,WriteBatch* my_batch, WriteCallback* callback,UserWriteCallback* user_write_cb, uint64_t* wal_used,uint64_t log_ref, bool disable_memtable,uint64_t* seq_used, size_t batch_cnt,PreReleaseCallback* pre_release_callback,PostMemTableCallback* post_memtable_callback,std::shared_ptr<WriteBatchWithIndex> wbwi) {// ...if (my_batch == nullptr) {return Status::InvalidArgument("Batch is nullptr!");} else if (!disable_memtable &&WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) {// 如果要写入 MemTable,batch 必须包含时间戳return Status::InvalidArgument("write batch must have timestamp(s) set");} else if (write_options.rate_limiter_priority != Env::IO_TOTAL &&write_options.rate_limiter_priority != Env::IO_USER) {// 校验 rate_limiter_priority 的合法性return Status::InvalidArgument("WriteOptions::rate_limiter_priority only allows ""Env::IO_TOTAL and Env::IO_USER due to implementation constraints");// ... 还有很多其他检查 ...} else if (write_options.disableWAL &&immutable_db_options_.recycle_log_file_num > 0 &&!(two_write_queues_ && disable_memtable)) {// disableWAL 和 recycle_log_file_num > 0 不兼容return Status::InvalidArgument("WriteOptions::disableWAL option is not supported if ""DBOptions::recycle_log_file_num > 0");}// ...if (immutable_db_options_.enable_pipelined_write &&post_memtable_callback != nullptr) {// pipelined write 不支持 post_memtable_callbackreturn Status::NotSupported("pipelined write currently does not honor post_memtable_callback");}// ...

这里列举了一些关键检查:

  • my_batch 不能为空。
  • 如果要写入 MemTable (!disable_memtable),且列族开启了时间戳,那么 WriteBatch 中的数据必须带有时间戳。
  • 对 WriteOptions 中的各种选项进行合法性和兼容性校验,例如 rate_limiter_prioritydisableWAL 与 recycle_log_file_num 的冲突、enable_pipelined_write 与其他功能的冲突等。

写入路径选择

通过所有检查后,RocksDB 会根据配置选择不同的写入模式。

// ... existing code ...if (two_write_queues_ && disable_memtable) {// 路径 A: 双写队列,且只写 WAL (disable_memtable)// ...Status status = WriteImplWALOnly(/* ... */);// ...return status;}if (immutable_db_options_.enable_pipelined_write) {// 路径 B: 流水线写入 (Pipelined Write)return PipelinedWriteImpl(write_options, my_batch, callback, user_write_cb,wal_used, log_ref, disable_memtable, seq_used);}// 路径 C: 标准写入 (Standard Write)PERF_TIMER_GUARD(write_pre_and_post_process_time);WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, batch_cnt,pre_release_callback, post_memtable_callback, wbwi);// ... 后续进入写组逻辑 ...
// ... existing code ...
  • 路径 A (WriteImplWALOnly): 当启用了双写队列 (two_write_queues_) 并且当前写操作只写 WAL (disable_memtable,常见于 2PC 事务的 prepare 阶段) 时,会走这个专门的路径。
  • 路径 B (PipelinedWriteImpl): 当启用流水线写入 (enable_pipelined_write) 时,会进入此路径。这是一种性能优化,它将写 WAL 和写 MemTable 解耦,允许它们并行执行,从而降低写入延迟。
  • 路径 C (Standard Write): 这是最经典、最通用的写入路径。它使用了写组 (Write Group) 机制来摊销 I/O 开销。我们重点分析这个路径。

WriteImplWALOnly

在 RocksDB 中,并非所有写操作都需要同时写入 WAL 和 MemTable。在以下关键场景中,只需要先将数据写入 WAL:

  1. 两阶段提交 (2PC - Two-Phase Commit): 在事务的 Prepare 阶段,事务内容(WriteBatch)必须先持久化到 WAL 中。如果此时系统崩溃,重启后可以通过 WAL 恢复这个 "prepared" 状态的事务。只有在 Commit 阶段,一个简单的提交标记才会被写入,并更新 MemTable。WriteImplWALOnly 正是为执行这个 Prepare 操作而生。

  2. 无序写入 (unordered_write): 这是一种性能优化选项。它将一次写入拆分为两个阶段:首先同步写入 WAL,然后异步写入 MemTable。WriteImplWALOnly 执行第一阶段的同步写 WAL,允许客户端在数据持久化到日志后立即获得返回,从而降低延迟。

  3. 双写队列 (two_write_queues_): 当启用此选项时,RocksDB 会为 WAL-only 的写入操作启用一个独立的写队列 (nonmem_write_thread_)。这可以防止大的事务 Prepare 操作阻塞常规的、需要写 MemTable 的小操作,从而提高系统的并发性和响应能力。

我们先看一下函数的参数,这有助于理解它的功能:

db_impl_write.cc

Status DBImpl::WriteImplWALOnly(WriteThread* write_thread, const WriteOptions& write_options,WriteBatch* my_batch, WriteCallback* callback,UserWriteCallback* user_write_cb, uint64_t* wal_used,const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,const PublishLastSeq publish_last_seq, const bool disable_memtable)
  • WriteThread* write_thread: 要加入的写线程队列。
  • WriteBatch* my_batch: 要写入的批处理数据。
  • PreReleaseCallback* pre_release_callback: 一个非常关键的回调。它在 WAL 写入完成之后、函数返回之前被调用。在 2PC 场景中,Prepare 的调用者通过这个回调来获取分配给这批数据的序列号(Sequence Number)。
  • const AssignOrder assign_order: 枚举,决定是否需要为这批写入分配序列号。
  • const PublishLastSeq publish_last_seq: 枚举,决定是否需要更新全局的 LastSequence。在 unordered_write 模式下,写 WAL 后就需要更新,让数据可见;而在 2PC 的 Prepare 阶段则不需要,因为数据还未提交。
  • const bool disable_memtable: 是否禁止写入 MemTable。在这个函数的大多数场景下,它都为 true

WriteImplWALOnly 的内部逻辑与标准写入路径一样,也采用了写组 (Write Group) 机制。

第 1 步: 加入写组
// ... existing code ...WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, sub_batch_cnt,pre_release_callback);
// ... existing code ...write_thread->JoinBatchGroup(&w);
// ... existing code ...if (w.state == WriteThread::STATE_COMPLETED) {if (wal_used != nullptr) {*wal_used = w.wal_used;}if (seq_used != nullptr) {*seq_used = w.sequence;}return w.FinalStatus();}// else we are the leader of the write batch groupassert(w.state == WriteThread::STATE_GROUP_LEADER);
// ... existing code ...
  • 和标准写入一样,它首先创建一个 Writer 对象来封装请求。
  • 然后调用 JoinBatchGroup 加入写组。
  • 如果当前线程成为了 Follower,它会在此等待,直到 Leader 完成工作。w.state == WriteThread::STATE_COMPLETED 这个判断就是处理 Follower 的情况,它直接获取结果并返回。
  • 如果成为了 Leader,则继续执行后续逻辑。
第 2 步: Leader 的准备工作

Leader 在正式写入前,会根据 publish_last_seq 的值选择不同的准备路径。

// ... existing code ...if (publish_last_seq == kDoPublishLastSeq) {// ...// 为 unordered_write 准备,不锁 DB 主锁Status status =PreprocessWrite(write_options, &wal_context, &write_context);// ...} else {// ...// 为 2PC Prepare 准备,需要锁 DB 主锁并检查写延迟InstrumentedMutexLock lock(&mutex_);Status status =DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options);// ...}
// ... existing code ...
  • kDoPublishLastSeq 路径 (用于 unordered_write): Leader 直接调用 PreprocessWrite,这个函数会处理 WAL 切换等逻辑,但它不会锁住整个数据库的 mutex_
  • kDontPublishLastSeq 路径 (用于 2PC Prepare): Leader 会先获取数据库的全局锁 mutex_,然后调用 DelayWrite 进行写流控,确保不会因为大量事务 Prepare 写入而压垮系统。
第 3 步: Leader 执行写入

这是 Leader 的核心工作,负责将整个写组的数据写入 WAL。

// ... existing code ...WriteThread::WriteGroup write_group;uint64_t last_sequence;write_thread->EnterAsBatchGroupLeader(&w, &write_group);// 1. 聚合信息并更新统计数据for (auto* writer : write_group) {// ... 计算总大小 ...}// ... 更新各种统计信息,如 BYTES_WRITTEN ...// 2. 写入 WALPERF_TIMER_GUARD(write_wal_time);size_t seq_inc = 0;if (assign_order == kDoAssignOrder) {// ... 计算需要分配的序列号数量 ...seq_inc = total_batch_cnt;}Status status;if (!write_options.disableWAL) {IOStatus io_s = ConcurrentWriteGroupToWAL(write_group, wal_used,&last_sequence, seq_inc);status = io_s;// ... 错误检查 ...} else {// 如果禁用了 WAL,只分配序列号last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);}
// ... existing code ...
  1. 聚合与统计: Leader 遍历写组中的所有 Writer,计算总的写入字节数,并更新相关的性能统计数据。
  2. 写入 WAL:
    • 计算需要分配的序列号总数 seq_inc
    • 调用 ConcurrentWriteGroupToWAL 函数。这个函数是专门为并发写入 WAL 设计的,它会将整个 write_group 的 WriteBatch 合并成一个大块,一次性写入 WAL 文件,并原子地分配 last_sequence。这是性能优化的关键。
    • 如果 disableWAL 为 true,则跳过写日志,仅通过 FetchAddLastAllocatedSequence 来预留出序列号。
第 4 步: Leader 的收尾工作

WAL 写入成功后,Leader 还需要完成一些重要的收尾工作。

// ... existing code ...// 3. 为每个 writer 分配具体的序列号auto curr_seq = last_sequence + 1;for (auto* writer : write_group) {// ...writer->sequence = curr_seq;// ...}// 4. 如果需要,同步 WALif (status.ok() && write_options.sync) {// ... 调用 SyncWAL() 或 FlushWAL(true) ...}// 5. 执行 PreReleaseCallbackif (status.ok()) {for (auto* writer : write_group) {if (!writer->CallbackFailed() && writer->pre_release_callback) {// ...Status ws = writer->pre_release_callback->Callback(writer->sequence, ...);// ...}}}// 6. 如果需要,发布 LastSequenceif (publish_last_seq == kDoPublishLastSeq) {versions_->SetLastSequence(last_sequence + seq_inc);}// 7. 退出写组,唤醒 Followerwrite_thread->ExitAsBatchGroupLeader(write_group, status);
// ... existing code ...
  1. 分配序列号: Leader 遍历写组,将连续的序列号分配给每个 Writer
  2. 同步 WAL: 如果 WriteOptions 要求同步 (sync=true),则执行 fsync 操作,确保日志落盘。
  3. 执行回调: 这是非常重要的一步。Leader 调用每个 Writer 的 pre_release_callback,并将分配到的序列号作为参数传入。这使得 2PC 的调用者可以得知 Prepare 操作成功,并记录下对应的序列号,以便后续 Commit
  4. 发布序列号: 如果是 unordered_write 场景 (publish_last_seq == kDoPublishLastSeq),则更新全局的 LastSequence,使得这批数据对读操作可见。
  5. 退出: Leader 调用 ExitAsBatchGroupLeader,将最终状态赋予组内所有 Writer,并唤醒所有等待的 Follower 线程。

总结

DBImpl::WriteImplWALOnly 是一个为特定场景(2PC、unordered_write)设计的高效写入路径。它通过以下方式实现其目的:

  • 重用写组机制: 与标准写入一样,通过批处理来摊销 I/O 成本。
  • 专用写队列: 可选地使用独立线程池,避免与常规写入互相阻塞。
  • 精细的流程控制: 通过 AssignOrder 和 PublishLastSeq 参数,精确控制序列号的分配和可见性,以适应不同场景的需求。
  • 关键的回调机制PreReleaseCallback 是实现 2PC Prepare 逻辑的核心,它在 WAL 持久化后、锁释放前提供了获取序列号的机会。

PipelinedWriteImpl

PipelinedWriteImpl 的核心思想是将一个写操作分解为两个主要阶段:

  1. WAL 写入阶段:由一个 leader 线程将一个批次(group)的所有写请求一次性写入 WAL 文件。
  2. MemTable 写入阶段:在 WAL 写入完成后,该批次中的各个写请求再被写入到 MemTable。

关键在于,当第 N 个批次正在进行 MemTable 写入时,第 N+1 个批次可以同时进行 WAL 写入。这种重叠执行(流水线)减少了线程等待,从而提高了整体的写入性能。

下面我们结合源代码,分步解析该函数的具体实现逻辑。

Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,WriteBatch* my_batch, WriteCallback* callback,UserWriteCallback* user_write_cb,uint64_t* wal_used, uint64_t log_ref,bool disable_memtable, uint64_t* seq_used)
  • write_options: 本次写入操作的选项,如是否同步(sync)、是否禁用 WAL 等。
  • my_batch: 需要写入的数据,封装在 WriteBatch 对象中。
  • callbackuser_write_cb: 用于在写入过程的不同阶段进行回调的接口。
  • wal_usedlog_refdisable_memtableseq_used: 其他控制参数和用于返回结果的指针。

入口与排队

所有写请求线程开始时都会调用 JoinBatchGroup

// ... existing code ...WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, /*_batch_cnt=*/0,/*_pre_release_callback=*/nullptr);write_thread_.JoinBatchGroup(&w);TEST_SYNC_POINT("DBImplWrite::PipeldinedWriteImpl:AfterJoinBatchGroup");
// ... existing code ...

JoinBatchGroup 会将当前 Writer 放入一个队列中。如果队列是空的,它就成为 Leader (STATE_GROUP_LEADER);否则,它就是 Follower,并在此等待。

WAL 写入阶段 (组 N)

成为 Leader 的线程会执行以下代码块。假设这是 组 N 的 Leader。

这里N只是方便称呼,实际上只是 分了两个阶段。

// ... existing code ...if (w.state == WriteThread::STATE_GROUP_LEADER) {// ...// 1. 收集 Follower 形成一个写组 (wal_write_group)last_batch_group_size_ =write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);// 2. 乐观地更新全局序列号。这很重要,因为它解耦了序列号分配//    和 MemTable 写入,使得下一个组可以提前知道该用哪个序列号。const SequenceNumber current_sequence =write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;// ... (为组内所有 writer 分配序列号) ...write_thread_.UpdateLastSequence(current_sequence + total_count - 1);// ...// 3. 将整个组的数据写入 WALio_s = WriteGroupToWAL(wal_write_group, /* ... */);w.status = io_s;// ... (处理 WAL 同步等) ...// 4. *** 关键的流水线切换点 ***write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);}
// ... existing code ...

关键点在最后一步 ExitAsBatchGroupLeader。这个函数做了两件核心的事情:

  1. 唤醒组 N 的成员:它将 wal_write_group 中所有 Follower 的状态改变,唤醒它们,使其可以进入接下来的 MemTable 写入阶段。
  2. 释放 Leader 角色:它释放了 WAL 阶段的 Leader 身份。这意味着在 JoinBatchGroup 中等待的下一个线程(也就是 组 N+1 的 Leader)现在可以获得 Leader 身份,并开始执行 if (w.state == WriteThread::STATE_GROUP_LEADER) 这个代码块。
MemTable 写入阶段 (组 N) 与 WAL 写入阶段 (组 N+1) 并发执行

当组 N 的 Leader 调用 ExitAsBatchGroupLeader 之后:

  • 组 N 的所有线程(包括前 Leader)被唤醒,它们的状态变为 STATE_MEMTABLE_WRITER_LEADER 或 STATE_PARALLEL_MEMTABLE_WRITER。它们会继续执行 PipelinedWriteImpl 函数中后面的逻辑,进入 MemTable 写入阶段。

    // ... existing code ...// 组 N 的线程会执行这里的代码if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {// ...// 串行或并行地将数据写入 MemTable// ...write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);}// ...if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {// ...// 并行地将自己的数据写入 MemTable// ...}
    // ... existing code ...
    
  • 与此同时组 N+1 的 Leader 线程 已经通过了 JoinBatchGroup 的等待,它的状态是 STATE_GROUP_LEADER,因此它会进入同一个函数开头的 if 块,开始为组 N+1 执行收集成员、分配序列号、写入 WAL 的操作。

正是 ExitAsBatchGroupLeader 这个“交接棒”的动作,实现了流水线。

我们可以这样想象两个写组(Group N, Group N+1)的时间线:

时间点Group N (线程 A, B, C)Group N+1 (线程 D, E, F)
T1线程 A 成为 Leader,开始收集组员,准备写 WAL。线程 D, E, F 在 JoinBatchGroup 中排队等待。
T2线程 A 将整个 Group N 的数据写入 WAL。线程 D, E, F 仍在等待。
T3线程 A 调用 ExitAsBatchGroupLeader。线程 D 立即成为新 Leader,开始为 Group N+1 写 WAL。
T4线程 A, B, C 开始并发地将各自数据写入 MemTable。线程 D 正在将整个 Group N+1 的数据写入 WAL。

在 T4 这个时间点,两个组的不同阶段就在同时执行了,这就是流水线效应。这种设计极大地减少了线程的等待时间,提高了系统的整体写入吞吐量。


完成与返回
// ... existing code ...if (seq_used != nullptr) {*seq_used = w.sequence;}assert(w.state == WriteThread::STATE_COMPLETED);return w.FinalStatus();
}

最后,函数会断言 Writer 的状态为 STATE_COMPLETED,表示整个流程已走完,然后返回最终的执行状态 w.FinalStatus() 给调用者。

总结

PipelinedWriteImpl 通过精巧的 WriteThread 状态机设计,将写操作流水化:

  1. 写组聚合:多个并发写请求被聚合成一个组。
  2. WAL 阶段:一个 Leader 线程负责整个组的 WAL 写入,这是一个批处理 IO。
  3. MemTable 阶段:WAL 写入后,组内成员可以并发(或串行)地将数据写入 MemTable。
  4. 流水线效应:当一个组正在写 MemTable 时,下一个组已经可以开始写 WAL。这种重叠执行隐藏了 IO 延迟,是提升写入密集型场景性能的关键。

这个函数是 RocksDB 高性能写入能力的核心体现之一,展示了其在系统工程上的深度优化。

深入标准写入路径 (Write Group)

这是 RocksDB 提高写入吞吐量的核心机制。

  1. 创建 Writer 对象:

    • WriteThread::Writer w(...): 一个 Writer 对象被创建在当前线程的栈上。它像一个包裹,封装了本次写入的所有信息:WriteBatchWriteOptions、各种回调函数等。
  2. 加入写组:

    • write_thread_.JoinBatchGroup(&w): 当前线程带着它的 Writer 包裹尝试加入写组。
    • Leader-Follower 模式:
      • 如果写组是空的,当前线程成为 Leader
      • 如果写组已经有 Leader 了,当前线程成为 Follower,并将自己的 Writer 对象链接到 Leader 的队列中,然后进入等待状态。
  3. Leader 的工作流程:

    • 合并批处理 (MergeBatch): Leader 会遍历队列,将所有 Follower 的 WriteBatch 合并到自己的 WriteBatch 中,形成一个大的 WriteBatch
    • 分配序列号 (Sequence Number): Leader 会为整个合并后的 WriteBatch 从 versions_ 中一次性申请一块连续的序列号。
    • 写入 WAL (WriteToWAL): Leader 将合并后的 WriteBatch 作为一个单独的记录写入到 WAL 文件。这是保证数据持久性的关键。如果 WriteOptions 要求 sync,这里会触发一次 fsync
    • 写入 MemTable (WriteBatchInternal::InsertInto): WAL 写入成功后,Leader 会将 WriteBatch 中的每个操作(Put/Delete/Merge)应用到对应的 MemTable 中。如果配置了 allow_concurrent_memtable_write,Leader 会启动多个线程并行地将数据写入不同的 MemTable,进一步提升性能。
    • 更新最终序列号: 所有数据都写入 MemTable 后,Leader 会调用 versions_->SetLastSequence(),更新数据库的最新序列号。此时,这批写入的数据才对读操作可见。
    • 唤醒 Follower: Leader 完成所有工作后,会唤醒所有等待的 Follower 线程。
  4. Follower 的工作流程:

    • Follower 线程在加入写组后就一直等待,直到被 Leader 唤醒。
    • 唤醒后,它会从 Leader 那里获取整个写组的最终执行状态 (w.FinalStatus())。
    • 然后从 WriteImpl 函数返回,完成自己的写入请求。

通过这种方式,多个并发的写入请求被组合成一次大的写入,大大减少了 WAL 的 I/O 次数(特别是 fsync),从而显著提高了整体的写入吞吐量。

DBImpl::Write 的执行流可以概括为:

  1. 入口 (DBImpl::Write): 简单的封装和校验,调用 WriteImpl
  2. 核心 (DBImpl::WriteImpl):
    • 前置检查: 进行大量的参数和配置校验。
    • 路径分发: 根据配置选择标准写入、流水线写入或 WAL-only 写入。
    • 标准写入(写组):
      • 线程加入写组,选举出 Leader。
      • Leader 负责合并所有 WriteBatch,分配序列号,统一写入 WAL 和 MemTable。
      • Follower 等待 Leader 完成后被唤醒。
  3. 返回: 所有线程获取最终状态并返回。

这个流程清晰地展示了 RocksDB 如何通过批处理和精巧的并发控制来平衡数据一致性、持久性和高性能写入。

写入的连锁反应 - Flush(将 MemTable 的内容刷写到 SST 文件)

在 PipelinedWriteImpl 或 WriteImpl 的代码中,不会看到直接执行 flush 的代码,因为 flush 是一个被写入操作间接触发的、在后台异步执行的过程

这个触发和调度的逻辑主要体现在 PreprocessWrite 函数中,它在数据被真正写入 MemTable 之前被调用。

下面是详细的流程:

无论是 PipelinedWriteImpl 还是 WriteImpl,在执行写入 WAL 和 MemTable 的核心逻辑之前,都会调用 PreprocessWrite 来做准备工作。

// ... existing code ...if (w.state == WriteThread::STATE_GROUP_LEADER) {// ...WalContext wal_context(!write_options.disableWAL && write_options.sync);// PreprocessWrite does its own perf timing.PERF_TIMER_STOP(write_pre_and_post_process_time);// 这是关键的调用点w.status = PreprocessWrite(write_options, &wal_context, &write_context);PERF_TIMER_START(write_pre_and_post_process_time);
// ... existing code ...

PreprocessWrite 的核心职责之一是调用 MakeRoomForWrite,确保当前的 active MemTable 有足够的空间来容纳即将写入的数据。

MakeRoomForWrite 的逻辑如下:

  1. 检查空间:判断当前 active MemTable 是否已满,或者是否因为其他原因(如 write_buffer_manager 限制了总内存)需要强制切换。
  2. 切换 MemTable:如果需要,它会调用 SwitchMemtable。这个函数是 flush 机制的“扳机”:
    • 将当前的 active MemTable 变成 immutable MemTable(不可变内存表)。
    • 将这个不可变的 MemTable 添加到 imm_ 列表中。
    • 创建一个全新的、空的 MemTable 作为新的 active MemTable。
  3. 调度 Flush:在切换完成后,系统会调用 MaybeScheduleFlushOrCompaction。这个函数会检查 imm_ 列表中是否有待刷写的 MemTable。

 调度并执行 Flush

  1. MaybeScheduleFlushOrCompaction:当此函数发现 imm_ 列表中有新的不可变 MemTable 时,它会创建一个 FlushJob
  2. FlushScheduler:这个 FlushJob 会被提交给 flush_scheduler_。可以在 WriteBatchInternal::InsertInto 的参数中看到它的身影,这表明它与写入流程紧密相关。
    // ... existing code ...w.status = WriteBatchInternal::InsertInto(write_group, current_sequence, column_family_memtables_.get(),&flush_scheduler_, &trim_history_scheduler_, // flush_scheduler 在这里
    // ... existing code ...
    
  3. 后台执行:RocksDB 的后台 flush 线程池会从 flush_scheduler_ 中获取这个 FlushJob 并执行它。这个 Job 的工作就是读取 immutable MemTable 的内容,并将其写入一个新的 SST 文件中。

所以,整个流程是这样的:

Write 请求 -> PipelinedWriteImpl -> PreprocessWrite -> MakeRoomForWrite -> 发现 MemTable 满了 -> SwitchMemtable -> 将旧 MemTable 变为 Immutable -> MaybeScheduleFlushOrCompaction -> 创建 FlushJob 并通过 flush_scheduler_ 调度 -> 后台线程执行 Flush

因此,flush 操作与 write 操作是解耦的。write 操作负责触发,而真正的 flush I/O 操作由后台线程完成,这样可以避免阻塞前台的写请求,从而保证高写入性能。

总结

一次 DB::Put 的旅程如下:

  1. 封装 (DB::Put):

    • 将单个 Put(key, value) 操作封装成一个只包含一条记录的 WriteBatch 对象。
    • 调用 DB::Write(WriteOptions, WriteBatch*)
  2. 分发写入策略 (DBImpl::Write):

    • 根据数据库配置(unordered_write 和 enable_pipelined_write),决定是走流水线写入路径 (PipelinedWriteImpl) 还是传统写入路径 (WriteImpl)
  3. 核心写入路径 (以 PipelinedWriteImpl 为例):

    • 加入写组:
      • 每个写请求(WriteBatch)被包装成一个 Writer 对象。
      • 调用 write_thread_.JoinBatchGroup(&w),尝试加入一个写组。如果队列为空,当前 Writer 成为 WAL Leader;否则成为 Follower 并等待。
    • WAL 阶段 (由 WAL Leader 执行):
      • 预处理 (PreprocessWrite): 检查 MemTable 空间。如果空间不足,会触发 SwitchMemtable,将当前 active MemTable 变为 immutable,并调度后台 Flush
      • 聚合: Leader 收集队列中所有的 Follower,形成一个 WriteGroup
      • 分配序列号: Leader 为 WriteGroup 中的所有操作分配连续的 SequenceNumber
      • 写入 WAL: Leader 将整个 WriteGroup 的数据一次性写入 WAL 文件,实现 I/O 聚合。
      • 释放 Leader: Leader 调用 ExitAsBatchGroupLeader唤醒组内成员进入 MemTable 写入阶段,并释放 WAL Leader 角色。这是实现流水线的关键,下一个写组的 Leader 可以立即开始它的 WAL 阶段。
    • MemTable 阶段 (由组内所有成员执行):
      • 被唤醒的 Writer 们会选出一个 MemTable Leader
      • 根据配置(allow_concurrent_memtable_write),由 MemTable Leader 串行写入,或由所有 Writer 并发地将各自的 WriteBatch 插入到 MemTable 中。
      • 更新版本: 最后一个完成 MemTable 写入的 Writer 负责更新 versions_->SetLastSequence(),使本次写入对读操作可见。
    • 完成: 所有阶段执行完毕,返回状态。
  4. 后台异步任务:

    • Flush: 如果在预处理阶段触发了 MemTable 切换,后台的 Flush 线程会获取到这个 immutable MemTable,并将其内容刷写(Flush)到磁盘,生成一个新的 SST 文件。
    • Compaction: Flush 产生新的 SST 文件可能会触发后台的 Compaction 任务,以合并和优化数据存储结构。

DB::PutEntity

简单来说,PutEntity 是 Put 的一个更通用、更强大的版本,主要用于支持 Wide Columns(宽列)特性。

DB::Put 是 RocksDB 传统的写入接口,它处理的是一个 key 对应一个 value 的简单模型。

// ... existing code ...
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& value) {// ...WriteBatch batch(/*...*/);Status s = batch.Put(column_family, key, value);// ...return Write(opt, &batch);
}
// ... existing code ...

它的作用非常直接:将一个 (key, value) 对包装成一个 WriteBatch,然后调用核心的 Write 方法。

DB::PutEntity 是为了支持“宽列”模型而引入的。在宽列模型中,一个 key 可以对应多个 "attribute"(属性)或 "column"(列),每个属性/列都有自己的名字和值。这在逻辑上类似于一个 key 对应一个小的 key-value map。

PutEntity 有两个主要的重载版本:

版本一:使用 WideColumns

它接受一个 key 和一个 WideColumns 对象。WideColumns 本质上是一个 (column_name, column_value) 对的集合。

// ... existing code ...
Status DB::PutEntity(const WriteOptions& options,ColumnFamilyHandle* column_family, const Slice& key,const WideColumns& columns) {
// ... existing code ...WriteBatch batch(/* ... */);const Status s = batch.PutEntity(column_family, key, columns);if (!s.ok()) {return s;}return Write(options, &batch);
}
// ... existing code ...

这个接口允许你一次性为一个 key 写入多个属性。例如,你可以为一个用户 ID(作为 key)同时写入他的姓名、年龄、地址等多个属性(作为 WideColumns)。

版本二:使用 AttributeGroups

这个版本更为灵活,它允许你为一个 key 写入跨越多个列族(Column Family)的属性。AttributeGroups 是 (ColumnFamilyHandle, WideColumns) 对的集合。

// ... existing code ...
Status DB::PutEntity(const WriteOptions& options, const Slice& key,const AttributeGroups& attribute_groups) {
// ... existing code ...WriteBatch batch(/* ... */);const Status s = batch.PutEntity(key, attribute_groups);if (!s.ok()) {return s;}return Write(options, &batch);
}
// ... existing code ...

关系总结

  1. 功能上的扩展PutEntity 是 Put 的超集。Put 可以看作是 PutEntity 的一个特例,即只写入一个默认名称的属性(value)。在 RocksDB 内部,一个简单的 Put(key, value) 最终可能会被当作写入一个 (key, {"default_column", value}) 的实体来处理。

  2. 实现上的相似性:从实现上可以看出,它们都遵循相同的模式:

    • 创建一个 WriteBatch
    • 调用 WriteBatch 对应的方法(batch.Put 或 batch.PutEntity)来填充 WriteBatch
    • 调用核心的 DB::Write 方法来执行这个 WriteBatch
  3. 使用场景

    • 当你只需要存储简单的 key-value 数据时,使用 DB::Put 更直观、更简单。
    • 当你的数据模型更复杂,一个主键(key)需要关联多个命名的属性值时,DB::PutEntity 提供了更强大和结构化的方式来组织数据,避免了将多个属性手动序列化到一个 value 中的麻烦。

总而言之,PutEntity 是为了适应更复杂的数据模型而对 Put 进行的现代化扩展。两者共享底层的写入路径和机制。

DB::Merge

Merge 是 RocksDB 提供的一种非常有特色的写操作。与 Put(覆盖写)和 Delete(删除)不同,Merge 是一种**“读取-修改-写入”**(Read-Modify-Write)的原子操作。它允许用户定义一个合并函数(MergeOperator),当对一个 key 执行 Merge 操作时,RocksDB 会将新的值(称为 operand,操作数)与已存在的旧值进行合并,生成一个新值并写回。

DB::Merge 有两个主要的重载版本,一个带时间戳,一个不带。它们是暴露给用户的最顶层接口。

db_impl_write.cc

// ... existing code ...
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& value) {WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);Status s = batch.Merge(column_family, key, value);if (!s.ok()) {return s;}return Write(opt, &batch);
}Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& value) {ColumnFamilyHandle* default_cf = DefaultColumnFamily();assert(default_cf);const Comparator* const default_cf_ucmp = default_cf->GetComparator();assert(default_cf_ucmp);WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key,default_cf_ucmp->timestamp_size());Status s = batch.Merge(column_family, key, ts, value);if (!s.ok()) {return s;}return Write(opt, &batch);
}
// ... existing code ...

分析:

  • 统一的模式: 和 PutDelete 一样,Merge 的顶层实现遵循一个标准模式:
    1. 创建一个 WriteBatch 对象。
    2. 调用 WriteBatch 相应的 Merge 方法,将 Merge 操作记录到 WriteBatch 中。
    3. 调用核心的 DB::Write 方法,将这个 WriteBatch 提交给写入流水线。
  • 原子性保证: 将 Merge 操作封装进 WriteBatch 是保证其原子性的第一步。后续的写入流程会确保 WriteBatch 中的所有操作要么全部成功,要么全部失败。

实现层检查 (DBImpl::Merge)

在 DBImpl 中,对 DB::Merge 进行了进一步的封装和检查,确保调用 Merge 的前提条件是满足的。

// ... existing code ...
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,const Slice& key, const Slice& val) {const Status s = FailIfCfHasTs(column_family);if (!s.ok()) {return s;}auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);if (!cfh->cfd()->ioptions().merge_operator) {return Status::NotSupported("Provide a merge_operator when opening DB");} else {return DB::Merge(o, column_family, key, val);}
}Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& val) {const Status s = FailIfTsMismatchCf(column_family, ts);if (!s.ok()) {return s;}return DB::Merge(o, column_family, key, ts, val);
}
// ... existing code ...

分析:

  • 前置条件检查: 这是 DBImpl 层的主要职责。
    • FailIfCfHasTs: 检查不带时间戳的 Merge 是否用在了启用了时间戳的列族上,这是不允许的。
    • FailIfTsMismatchCf: 检查带时间戳的 Merge 提供的时间戳大小是否和列族定义的一致。
    • merge_operator 检查: 这是最核心的检查。Merge 操作必须依赖用户在打开数据库时提供的 merge_operator。如果没有提供,调用 Merge 会直接返回 NotSupported 错误。
  • 调用基类: 检查通过后,它会调用基类 DB::Merge 的实现,也就是我们第一部分分析的逻辑,进入 WriteBatch 流程。

Merge 操作的本质:延迟计算

Merge 的一个关键特性是延迟计算。当调用 DB::Merge 时,RocksDB 并不会立即执行合并操作。它只是将一个类型为 kTypeMerge 的记录写入 WAL 和 MemTable。这条记录包含了 key 和新的操作数(operand)。

真正的合并计算发生在以下两个时机:

  1. 读取时 (Read Path): 当你调用 DB::Get 读取一个 key 时,如果 RocksDB 发现这个 key 的最新版本是一系列的 Merge 操作数,它会:

    1. 首先找到这个 key 的一个基础值(一个 Put 操作或者不存在)。

    2. 然后按顺序应用所有堆叠在基础值之上的 Merge 操作数,通过调用 merge_operator->FullMergeV2(...) 来计算出最终结果。

    3. 这个过程被称为 "Merge on Read"

  2. Compaction 时 (Compaction Path): 为了避免读取时合并过多的操作数导致性能下降,Compaction 过程也会执行合并。 a. 当 Compaction 处理到一个 key 时,它会收集这个 key 在不同 SST 文件和层级中的所有 Put 和 Merge 记录。 b. 它会调用 merge_operator 将这些记录合并成一个最终的 Put 记录。 c. 这个最终的 Put 记录会被写入到新的 SST 文件中。这样,未来的读取就不再需要进行合并计算了。

MergeOperator

这是用户实现 Merge 逻辑的地方。用户需要继承 MergeOperator 类并实现其核心方法,主要是 FullMergeV2 或 PartialMergeMulti

  • FullMergeV2: 定义了如何将一个基础值和一系列操作数合并成一个新值的逻辑。这是最常用的合并方法。
  • PartialMergeMulti: 定义了如何将两个操作数合并成一个操作数的逻辑。这是一种优化,可以在 Compaction 期间提前合并部分操作数,减少 FullMergeV2 的负担。

例如,一个用于实现 "计数器" 的 MergeOperator 可能会将所有操作数(字符串形式的数字)解析成整数,然后求和,最后将结果转回字符串作为新的 value。

使用场景

基础场景:计数器 (Counter)

这是最经典、最直观的 Merge 应用场景。

  • 场景描述: 需要对某个 key 关联的数值进行频繁的加/减操作。例如,统计网页的点击次数、用户收到的点赞数、商品的库存变化等。
  • 不使用 Merge 的做法:
    1. value = DB::Get(key)
    2. new_value = value + 1
    3. DB::Put(key, new_value)
  • 问题: 这个过程需要一次读和一次写,并且需要客户端加锁来保证并发修改的原子性,非常低效且复杂。
  • 使用 Merge 的做法:
    1. 定义一个 MergeOperator,其逻辑是将旧值(字符串)和操作数(operand,比如 "1")都解析为数字,相加后再转为字符串。
    2. 客户端只需要调用 DB::Merge(key, "1")
  • 优势: 客户端操作被简化为单次、快速的写入。数据库利用 WAL 保证了原子性,并将合并计算推迟,使得写入性能极高。

数据结构场景:集合/列表的构建

Merge 可以用来高效地构建一个与 key 关联的集合或列表类型的数据。

  • 场景描述: 需要为一个 key 关联一个列表或集合,并频繁地向其中添加新元素。例如,记录一个用户关注的所有人的 ID 列表、一个商品的所有标签集合。
  • 不使用 Merge 的做法:
    1. list_str = DB::Get(key)
    2. list = deserialize(list_str)
    3. list.add(new_element)
    4. new_list_str = serialize(list)
    5. DB::Put(key, new_list_str)
  • 问题: 每次添加都需要读取、反序列化、修改、序列化、写入整个列表,当列表很长时,开销巨大。
  • 使用 Merge 的做法:
    1. 定义一个 MergeOperator,其逻辑是将旧值(序列化的列表)和操作数(新元素)合并。例如,简单地将新元素追加到旧值字符串的末尾,用特殊分隔符隔开。
    2. 客户端只需调用 DB::Merge(key, new_element_str)
  • 优势: 写入非常快。真正的列表合并发生在读取时或后台 Compaction 时。这对于写多读少的场景(例如,日志记录)尤其高效。

JSON 文档的部分更新

Merge 可以模拟对 JSON 或类似半结构化文档的局部更新(JSON Patch)。

  • 场景描述: 存储一个大的 JSON 对象作为 value,但经常只需要修改其中的一两个字段。
  • 不使用 Merge 的做法: 读取整个 JSON,在内存中解析并修改,然后写回整个新的 JSON。
  • 使用 Merge 的做法:
    1. 定义一个 MergeOperator,其合并逻辑遵循 JSON Patch 规范(RFC 6902)。
    2. 操作数(operand)是一个描述如何修改的 JSON Patch 对象,例如 {"op": "replace", "path": "/baz", "value": "boo"}
    3. 客户端调用 DB::Merge(key, patch_str)
  • 优势: 避免了读取和重写整个大 value 的开销,写入操作数非常小,速度很快。

时间序列数据的聚合

Merge 可以用于时间序列数据的预聚合。

  • 场景描述: 收集大量的时序数据点(例如,某台机器每秒的 CPU 使用率),希望在存储时就能进行一些聚合,比如计算一分钟内的最大/最小/平均值。
  • 不使用 Merge 的做法: 客户端需要维护状态,攒够一分钟的数据,计算完再 Put。这会增加客户端的复杂性和状态丢失的风险。
  • 使用 Merge 的做法:
    1. Key 可以是 machine_id + minute_timestamp
    2. 定义一个 MergeOperator,其逻辑是解析旧值(例如,一个包含 min,max,sum,count 的结构体)和操作数(新的 CPU 使用率),然后更新这个结构体。
    3. 每一秒都直接调用 DB::Merge(key, new_cpu_usage)
  • 优势: 客户端无状态,写入延迟低。数据库在后台 Compaction 时会自动完成聚合计算,最终将一分钟内的所有操作数合并成一个聚合后的结果。这极大地简化了客户端逻辑,并利用了数据库的持久化和并发能力。

Merge 的本质与局限

Merge 的本质是将本应在客户端完成的逻辑下沉到存储层,利用存储层的原子性和并发控制来简化应用逻辑并优化性能。

局限性:

  1. 逻辑限制MergeOperator 的逻辑应该是无状态的,并且最好满足交换律和结合律,这样 Compaction 时的合并顺序才不会影响最终结果。
  2. 读取性能: 如果一个 key 有大量的 Merge 操作数堆积而没有被 Compaction 及时合并,那么读取这个 key 时(Merge on Read)的延迟会很高。因此,它不适合读性能要求极高的场景,除非能保证 Compaction 足够频繁。
  3. 复杂性: 实现一个正确且高效的 MergeOperator 比简单的 Put/Get 要复杂,需要仔细考虑各种边界情况。

综上所述,Merge 是一个强大的高级特性,适用于各种需要原子性“读取-修改-写入”的场景,特别是当写入性能是主要瓶颈时。它通过牺牲一定的读取性能和增加实现复杂性,换来了极高的写入吞吐量和简化的客户端逻辑。

总结

DB::Merge 的完整旅程可以概括为:

  1. API 调用: 用户调用 DB::Merge(key, operand)
  2. 实现层检查DBImpl::Merge 检查 merge_operator 是否存在以及时间戳是否匹配。
  3. 封装DB::Merge 将操作封装进 WriteBatch
  4. 写入流水线DB::Write 将 WriteBatch 提交,最终一条 kTypeMerge 记录被写入 WAL 和 MemTable。此时没有发生真正的合并计算
  5. 延迟合并:
    • 读取时DB::Get 触发 "Merge on Read",实时计算出结果返回给用户。
    • Compaction 时: 后台 Compaction 任务会将多个 Merge 操作数和一个基础值合并成一个最终的 Put 值,并写入新的 SST 文件,以优化未来的读取性能。

这种设计将昂贵的“读取-修改-写入”操作的计算部分推迟到后台(Compaction)或必要时(读取)执行,从而极大地优化了写入路径的性能,使得 Merge 操作的写入速度几乎和 Put 一样快。

DB::Write

这个函数最主要的作用是将一个 WriteBatch 对象中包含的所有写操作(Put, Delete, Merge 等)作为一个原子单元提交给数据库

  • WriteBatch: 这是一个“写入批次”的容器。你可以先创建一WriteBatch对象,然后向其中添加多个 PutDelete 或 Merge 操作。
  • 原子性 (Atomicity): 当你调用 DB::Write 并传入这个 WriteBatch 时,RocksDB 会保证这个批次里的所有操作要么全部成功写入,要么在发生错误时全部失败(不会出现只写入了一部分的情况)。这对于维护数据一致性至关重要。

在 RocksDB 中,所有独立的写操作接口,如 PutDeleteMergeSingleDelete 等,实际上都是 DB::Write 的“语法糖”。它们内部的实现逻辑都是:

  1. 创建一个只包含一个操作的 WriteBatch
  2. 调用 DB::Write 来执行这个 WriteBatch

我们可以从源码中清晰地看到这一点,例如 DB::Merge 的实现:

// ... existing code ...
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& value) {// 1. 创建一个 WriteBatchWriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);// 2. 向 Batch 中添加一个 Merge 操作Status s = batch.Merge(column_family, key, value);if (!s.ok()) {return s;}// 3. 调用核心的 Write 方法return Write(opt, &batch);
}
// ... existing code ...

这表明 DB::Write 是整个写入体系的基础。

实现探究 (DBImpl::Write)

DB 是一个抽象基类,真正的实现在 DBImpl 中。

// ... existing code ...
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {Status s;if (write_options.protection_bytes_per_key > 0) {s = WriteBatchInternal::UpdateProtectionInfo(my_batch, write_options.protection_bytes_per_key);}if (s.ok()) {// 调用更底层的 WriteImpls = WriteImpl(write_options, my_batch, /*callback=*/nullptr,/*user_write_cb=*/nullptr,/*wal_used=*/nullptr);}return s;
}
// ... existing code ...

从 DBImpl::Write 的实现可以看出:

  • 它是一个相对薄的封装。
  • 它会做一些预处理,比如根据 WriteOptions 更新 WriteBatch 的校验信息。
  • 最终,它会调用一个更为通用和强大的内部函数 WriteImpl(或者根据配置走 PipelinedWriteImpl 流程),这个 WriteImpl 才是我们之前详细分析过的、包含进入写组、写 WAL、写 MemTable等复杂逻辑的真正核心。

使用 DB::Write 进行批量写入主要有两个巨大的优势:

  1. 性能 (Performance): 写入1000条数据,调用1000次 Put 和调用1次 Write(传入包含1000个操作的 WriteBatch),后者的性能会高出几个数量级。这是因为批量写入可以:

    • 减少 WAL 写入次数: 整个批次只需要写一次预写日志(WAL)。
    • 减少锁竞争: 整个批次只需要获取一次锁,进入一次写组。
    • 摊销开销: 将线程调度、内存分配等固定开销摊销到批次中的每一个操作上。
  2. 一致性 (Consistency): 保证了多个相互关联的写操作的原子性。例如,在银行转账场景中,“A账户减钱”和“B账户加钱”这两个操作必须被打包在一个 WriteBatch 中,通过一次 DB::Write 调用来完成,以确保它们同时成功或同时失败。

总结

Status DB::Write(const WriteOptions& options, WriteBatch* updates) 不仅仅是提供了批量写的能力,它更是 RocksDB 写入操作的基石。它通过 WriteBatch 提供了原子性高性能的保证,并且是所有其他上层写入API(如PutDelete)最终依赖的核心入口。

RocksDB 中的三种删除操作

DeleteSingleDelete 和 DeleteRange。这三者虽然都用于删除数据,但它们的机制、性能特点和适用场景有很大不同。

特性DeleteSingleDeleteDeleteRange
操作粒度单个 Key单个 Key一个 Key 的范围 [begin, end)
内部机制写入一个标准的“墓碑”(Tombstone)标记写入一个带优化提示的特殊“墓碑”写入一个“范围墓碑”(Range Tombstone)
写入开销极低(删除大量数据时)
读取影响几乎无影响几乎无影响有一定影响(迭代器需要检查是否被范围覆盖)
核心优势通用、安全地删除单个键在特定条件下,Compaction 效率更高删除大量连续数据时效率最高
使用约束用户需保证同一批次中没有对该 Key 的 Put 操作WriteBatchWithIndex 等场景不支持
最佳场景删除单个、已存在的 Key“盲删”场景,即不确定 Key 是否存在就删除删除大量连续数据,如按时间清理旧数据

Delete:通用的单点删除

Delete 是最标准、最常用的删除单个 key 的方法。

当调用 DB::Delete(key) 时,RocksDB 并不会立即去磁盘上找到并删除这个数据。相反,它会在 MemTable 和后续的 WAL 中写入一个针对这个 key 的特殊标记,我们称之为**“墓碑(Tombstone)”。

  • 写入路径: 和 Put 类似,Delete 操作也是通过创建一个 WriteBatch 并调用 DB::Write 来完成的。
    // ... existing code ...
    Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key) {WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);Status s = batch.Delete(column_family, key);if (!s.ok()) {return s;}return Write(opt, &batch);
    }
    // ... existing code ...
    
  • 读取路径: 当用户查询这个 key 时,RocksDB 会看到这个墓碑标记,并向用户返回 Status::NotFound,即使在旧的 SST 文件中仍然存在这个 key 的旧值。
  • Compaction: 这个墓碑会像普通数据一样,随着 Compaction 过程在 LSM 树中下沉。直到这个墓碑到达最底层(Bottommost Level),并且可以确定更低层(不存在)没有这个 key 的更旧版本时,这个墓碑和它覆盖的所有旧值才会被真正地物理删除。

适用于绝大多数需要删除单个 key 的情况。


SingleDelete:优化的单点删除

SingleDelete 的使用有一个非常严格的契约(contract)它假定你要删除的 key,在被删除之前只被写入(Put)过一次

如果你违反了这个前提,比如对一个 key 多次 Put 后再使用 SingleDelete,或者将 SingleDelete 和 DeleteMerge 混用在同一个 key 上,其行为是未定义的(undefined behavior),可能会导致数据不一致。

这一点在多个代码文件中都有体现,尤其是 Java 的接口文档中解释得非常清楚:

WriteBatchInterface.java

// ... existing code .../*** Remove the database entry for {@code key}. Requires that the key exists* and was not overwritten. It is not an error if the key did not exist* in the database.* <p>* If a key is overwritten (by calling {@link #put(byte[], byte[])} multiple* times), then the result of calling SingleDelete() on this key is undefined.* SingleDelete() only behaves correctly if there has been only one Put()* for this key since the previous call to SingleDelete() for this key.* <p>* This feature is currently an experimental performance optimization* for a very specific workload. It is up to the caller to ensure that* SingleDelete is only used for a key that is not deleted using Delete() or* written using Merge(). Mixing SingleDelete operations with Deletes and* Merges can result in undefined behavior.*
// ... existing code ...

HISTORY.md 文件也强调了这一点:

// ... existing code ...
# Rocksdb Change Log
## 7.3.0 (05/20/2022)
### Behavior changes
* Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass.
// ... existing code ...

SingleDelete 的性能优势体现在 Compaction 过程中。由于它假定只有一个 Put 操作与之对应,Compaction 时就可以进行一种称为“操作数抵消”(operand cancellation)的优化。当 Compaction 过程同时遇到一个 Put 操作和其对应的 SingleDelete 操作时,可以直接将这两个操作同时安全地移除,而不需要像 Delete 那样去查找并清理所有可能的旧版本。这减少了 Compaction 的工作量,从而提升了性能。

从 DBImpl 的实现来看,Delete 和 SingleDelete 最终都会创建一个 WriteBatch 并写入。

而 DB::SingleDelete 的默认实现会调用 WriteBatch::SingleDelete

// ... existing code ...
Status DB::SingleDelete(const WriteOptions& opt,ColumnFamilyHandle* column_family, const Slice& key) {WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);Status s = batch.SingleDelete(column_family, key);if (!s.ok()) {return s;}return Write(opt, &batch);
}
// ... existing code ...

WriteBatch 内部会将 Delete 和 SingleDelete 标记为不同的操作类型(kTypeDeletion vs kTypeSingleDeletion),以便在后续处理(如 Compaction)中应用不同的逻辑。


DeleteRange:高效的范围删除

DeleteRange 是为了解决删除大量连续 key 的性能问题而设计的。

在 DeleteRange 出现之前,要删除一个范围的数据,主要有两种方式:

  1. 遍历并删除 (Scan-and-Delete):创建一个迭代器,遍历整个范围,并对每个 key 调用 Delete。这种方式非常慢,因为它涉及到大量的读 I/O 和写操作,会产生海量的独立墓碑,严重拖慢后续的迭代器性能。
  2. 自定义 Compaction Filter:通过过滤器在后台 Compaction 时删除数据。这种方式是异步的,无法保证读者能立即看不到被删除的数据。

DeleteRange 提供了一个原生的、原子性的范围删除操作,解决了以上痛点。

2018-11-21-delete-range.markdown

// ... existing code ...
## Motivation
### Existing mechanisms: challenges and opportunitiesThe most common pattern we see is scan-and-delete, i.e., advance an iterator
through the to-be-deleted range, and issue a `Delete` for each key. This is
slow (involves read I/O) so cannot be done in any critical path. Additionally,
it creates many tombstones, which slows down iterators and doesn't offer a deadline
for space reclamation.
// ... existing code ...

DeleteRange 会在数据库中创建“范围墓碑”(range tombstones)。当读取数据时,无论是点查询(Get)还是范围扫描(Iterator),都必须检查一个 key 是否被某个范围墓碑所覆盖。

这里的核心挑战是:范围墓碑之间可能会重叠。例如,先删除 [a, z],再删除 [f, m]。为了高效判断一个 key 是否被删除,就需要一种方法来处理这些重叠的范围。这就是 "Skyline" 算法的用武之地。

这个概念源于一个经典的计算几何问题:给定一组矩形,计算它们顶部轮廓线。在 RocksDB 中,这被引申为:将所有重叠的、有效的范围墓碑合并成一个不重叠的、连续的“天际线”

DeleteRange 的实现演进 (v1 vs v2)

根据官方博客,DeleteRange 的内部实现经历了重要的演进,这直接影响了 Skyline 的构建方式和性能表现。

v1 设计 (早期版本):

  • 实现方式: 当创建一个迭代器并进行 Seek 时,RocksDB 会收集所有相关的范围墓碑,并一次性构建一个全局的 "Skyline"
  • 性能:
    • 优点: 一旦 Skyline 构建完成,后续的 Next() / Prev() 操作就很快,因为只需要对照这个已经计算好的 Skyline 即可。
    • 缺点迭代器创建和首次定位的开销巨大。构建 Skyline 的过程需要对所有墓碑进行排序和合并,复杂度为 O(T*log(T))(T 为墓碑数量),这会导致长范围扫描的启动延迟非常高。点查询性能也很差,需要线性扫描所有相关的墓碑。

2018-11-21-delete-range.markdown

// ... existing code ...
## v1: Getting it to work
### Read pathIn iterators, we aggregate range tombstones into a skyline as we visit live
memtable, immutable memtables, and SSTs. The skyline is expensive to construct but fast to determine whether a key is covered. The skyline keeps track of the most recent range tombstone found to optimize `Next` and `Prev`.
// ... existing code ...
### Performance characteristics
// ... existing code ...
When an iterator is first created and seeked, we construct a skyline over its
tombstones. This operation is O(T*log(T)) where T is the number of tombstones
// ... existing code ...

v2 设计 (当前主流版本):

为了解决 v1 的性能问题,特别是读性能,v2 设计被提出。

  • 实现方式: v2 的核心思想是放弃构建全局 Skyline,转而进行局部处理和缓存

    • 在 SST 文件级别,范围墓碑被处理成已排序且不重叠的“片段” (fragments)。这使得在单个 SST 文件内可以通过二分查找快速定位覆盖某个 key 的墓碑。
    • 当迭代器进行范围扫描时,它会为每个相关的 SST 文件(和 Memtable)创建一个范围墓碑的子迭代器,并将它们组合起来。在迭代过程中,动态地、增量地判断当前 key 是否被任何一个子迭代器中的墓碑覆盖。这可以看作是一种**“隐式的”或“懒加载的”Skyline**。
  • 性能:

    • 优点:
      • 迭代器创建和首次定位非常快,因为它避免了 v1 中昂贵的全局构建过程。
      • 点查询和短距离扫描性能大幅提升,因为可以利用二分查找。
    • 缺点:
      • 对于长距离扫描,由于需要同时管理多个子迭代器并在每一步都进行比较,其开销可能会比 v1 构建好全局 Skyline 后的情况要高。
      • HISTORY.md 中也提到了针对 v2 设计的持续性能优化,例如让迭代器在可能的情况下直接跳过整个被删除的范围,而不是逐一检查其中的 key。

2018-11-21-delete-range.markdown

// ... existing code ...
## v2: Making it fast
### RepresentationsThe key idea of the redesign is that, instead of globally collapsing range tombstones,we can locally “fragment” them for each SST file and memtable to guarantee that:* no range tombstones overlap; and
* range tombstones are ordered by start key.Combined, these properties make range tombstones binary searchable.
// ... existing code ...

Skyline 算法和范围墓碑处理的复杂逻辑 深植于 RocksDB 的内部,主要涉及:

  • 迭代器实现 (db/db_iter.cctable/iterator_wrapper.h 等): 在 Seek() 和 Next() 等操作中融合范围墓碑的检查逻辑。
  • SST 文件格式和读取 (table/block_based/block_based_table_reader.cc): 如何存储、缓存和查询范围墓碑的“片段”。
  • Compaction 过程: 如何在合并数据时处理和最终清理掉范围墓碑。

这些部分的源码非常复杂,但其设计思想与上述博客文章中描述的 v2 版本一致。

这是删除大量连续数据的最佳方式。典型的例子包括:

  • 清理时序数据: 删除所有时间戳早于某个日期的日志或指标。
  • 多租户数据隔离: 当一个租户被删除时,如果该租户的所有 key 都有一个共同的前缀,可以使用 DeleteRange 高效地删除其全部数据。
  • TTL (Time-to-Live)DeleteRange 是实现数据过期功能的基础。

正如文档 delete-range.markdown 所述,DeleteRange 避免了“先扫描再删除(scan-and-delete)”模式的低效(大量读IO和写放大)和使用 CompactionFilter 的复杂性与异步性。

Get

Get 是 RocksDB 中最核心的单点读取操作。它的目标是高效地从可能分布在内存和多层磁盘文件中的数据里,找到指定 key 对应的最新版本的值。其执行路径完美地体现了 LSM-Tree(日志结构合并树)的读操作精髓。

整个 Get 流程可以分为四个主要阶段:API 入口与准备 -> 获取一致性视图 (SuperVersion) -> 分层查找 -> 结果处理与返回

阶段一:API 入口与准备 (DBImpl::Get)

DBImpl::Get 函数是 Get 操作的公共 API 实现。它更像一个“门面”,负责一些初步处理和参数校验,然后调用内部核心实现。

db_impl.cc

// ... existing code ...
Status DBImpl::Get(const ReadOptions& _read_options,ColumnFamilyHandle* column_family, const Slice& key,PinnableSlice* value, std::string* timestamp) {assert(value != nullptr);value->Reset();// 1. 校验 I/O 活动类型,这是为了内部监控和统计if (_read_options.io_activity != Env::IOActivity::kUnknown &&_read_options.io_activity != Env::IOActivity::kGet) {return Status::InvalidArgument("Can only call Get with `ReadOptions::io_activity` is ""`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");}// 2. 创建一个 ReadOptions 的副本,并设置默认的 io_activityReadOptions read_options(_read_options);if (read_options.io_activity == Env::IOActivity::kUnknown) {read_options.io_activity = Env::IOActivity::kGet;}// 3. 调用真正的内部核心实现Status s = GetImpl(read_options, column_family, key, value, timestamp);return s;
}
// ... existing code ...

这个入口函数主要做了三件事:

  1. 断言 value 指针非空并重置它。
  2. 检查并设置 ReadOptions 中的 io_activity,用于 I/O 相关的统计和追踪。
  3. 将所有参数传递给内部的 GetImpl 函数,开始真正的查找之旅。

阶段二:获取一致性视图 (DBImpl::GetImpl 的前半部分)

GetImpl 是 Get 的核心。它首先要做的不是立即去查找数据,而是建立一个进行查找所必需的“上下文环境”,其中最关键的就是 SuperVersion 和 SequenceNumber

// ... existing code ...
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,GetImplOptions& get_impl_options) {
// ... existing code ...// 1. 时间戳(Timestamp)相关校验if (read_options.timestamp) {const Status s = FailIfTsMismatchCf(get_impl_options.column_family,*(read_options.timestamp));if (!s.ok()) {return s;}} else {const Status s = FailIfCfHasTs(get_impl_options.column_family);if (!s.ok()) {return s;}}
// ... existing code ...// 2. 启动性能计时器和追踪PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
// ... existing code ...auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(get_impl_options.column_family);auto cfd = cfh->cfd();
// ... existing code ...// 3. 获取并引用 SuperVersionSuperVersion* sv = GetAndRefSuperVersion(cfd);
// ... existing code ...// 4. 确定查询的序列号(快照版本)SequenceNumber snapshot;if (read_options.snapshot != nullptr) {// 如果用户提供了快照,就使用快照的序列号snapshot =static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;} else {// 否则,获取当前最新的“已发布”序列号snapshot = GetLastPublishedSequence();
// ... existing code ...}
// ... existing code ...// 5. 构造 LookupKeyLookupKey lkey(key, snapshot, read_options.timestamp);
// ... existing code ...

这个阶段的步骤分解如下:

  1. 参数校验:特别是对时间戳(Timestamp)的校验,确保列族(Column Family)的配置和读请求的参数是一致的。
  2. 性能追踪:启动各种性能计数器,用于监控 Get 操作的耗时。
  3. 获取 SuperVersion:这是至关重要的一步。SuperVersion 是 RocksDB 内部的一个核心数据结构,它封装了一个列族在某个时间点的一致性视图。它包含了:
    • 一个指向当前可写 MemTable (mem) 的指针。
    • 一个指向不可变 MemTable 列表 (imm) 的指针。
    • 一个指向当前版本信息 (Version) 的指针,Version 描述了磁盘上所有 SST 文件的布局。 通过对 SuperVersion 进行引用计数 (Ref()),可以保证在本次 Get 操作期间,它所指向的这些 MemTable 和 SST 文件不会被后台的 Flush 或 Compaction 进程清理掉,从而保证了读取的一致性。
  4. 确定序列号 (Snapshot):RocksDB 通过序列号(SequenceNumber)来实现多版本并发控制(MVCC)。Get 操作必须在一个明确的序列号下进行读取。
    • 如果用户在 ReadOptions 中指定了 snapshot,就使用该快照对应的序列号。
    • 否则,获取当前数据库最新的序列号。这一步必须在获取 SuperVersion 之后做,以防止在间隙中发生 Compaction 导致数据丢失。
  5. 构造 LookupKey:将用户提供的 key、上一步确定的 snapshot 序列号以及可选的 timestamp 组合成一个内部查找键 LookupKey。后续的所有查找都将使用这个 LookupKey

阶段三:分层查找 (DBImpl::GetImpl 的核心)

这是 Get 操作的“重头戏”。LSM-Tree 的读取遵循一个基本原则:从新到旧,从快到慢。一旦找到,立即返回。查找顺序是:Mutable MemTable -> Immutable MemTable -> SST Files (L0 -> L1 -> ... -> Lmax)

// ... existing code ...// 查找路径开始...bool done = false;
// ... existing code ...if (!skip_memtable) {// 步骤 1: 在可写 MemTable (mem) 中查找if (sv->mem->Get(lkey, get_impl_options.value, &s, &merge_context,&max_covering_tombstone_seq, get_impl_options.timestamp,get_impl_options.columns, get_impl_options.callback)) {done = true;} else if (s.IsMergeInProgress()) {// 找到一个 Merge 操作,需要继续往下找基础值} else if (!s.ok()) {done = true;}if (!done && !s.IsMergeInProgress()) {// 步骤 2: 在不可变 MemTable 列表 (imm) 中查找sv->imm->Get(lkey, get_impl_options.value, &s, &merge_context,&max_covering_tombstone_seq, get_impl_options.timestamp,get_impl_options.columns, get_impl_options.callback);if (!s.ok()) {done = true;}}}if (!done) {// 步骤 3: 在 SST 文件中查找PERF_TIMER_GUARD(get_from_output_files_time);sv->current->Get(read_options, lkey, get_impl_options, &s, &merge_context,&max_covering_tombstone_seq, &read_stats,/*version_read_tier=*/kReadAllTier,/*single_read_bytes=*/nullptr,/*allow_unprepared_value=*/true);}
// ... existing code ...
  1. 查找可写 MemTable (mem):

    • 这是数据最新的地方,也是全内存操作,速度最快。
    • 如果找到一个普通值 (kTypeValue),则 done 设为 true,查找结束。
    • 如果找到一个删除标记 (kTypeDeletion 或 kTypeSingleDeletion),也意味着查找结束,结果为 NotFound
    • 如果找到一个合并操作 (kTypeMerge),说明这是一个需要合并的 key,Get 操作会记录下这个 merge 操作,然后继续向更老的数据层查找它的基础值(base value)。
  2. 查找不可变 MemTable 列表 (imm):

    • 如果在上一步没找到确切结果(done 仍为 false),则按时间从新到旧的顺序遍历这个列表。
    • 查找逻辑与可写 MemTable 完全相同。
  3. 查找 SST 文件 (sv->current->Get):

    • 如果内存中(MemTable)一无所获,就必须进入磁盘上的 SST 文件进行查找。这是 Get 操作中最复杂、也可能是最耗时的部分。
    • 布隆过滤器 (Bloom Filter): 在读取任何 SST 文件之前,会先检查它的布隆过滤器。如果过滤器判断 key 一定不存在,则可以完全跳过对该文件的读取,极大地减少了不必要的 I/O。
    • 分层查找:
      • Level 0 (L0): L0 的 SST 文件之间 key 的范围是可能重叠的。因此,需要按照从新到旧的顺序,依次检查每一个可能包含该 key 的 L0 文件。
      • Level 1+ (L1及以上): L1 及以上层级的 SST 文件,其 key 的范围是互不重叠的。因此,在每一层,通过二分查找就可以快速定位到唯一一个可能包含该 key 的文件。这比 L0 的查找效率高得多。
    • SST 文件内部查找:
      • 块缓存 (Block Cache): RocksDB 会将 SST 的数据块(Data Block)和索引块(Index Block)缓存在内存中。如果需要的数据块在缓存中,就可以避免一次磁盘 I/O。
      • 索引块 -> 数据块: 首先读取(或从缓存获取)SST 的索引块,通过索引块找到 key 可能所在的具体数据块的位置。
      • 读取数据块: 读取(或从缓存获取)对应的数据块,然后在数据块内部通过二分查找最终定位到 key

阶段四:结果处理与返回 (DBImpl::GetImpl 的后半部分)

查找到结果后,需要进行最后的处理并释放资源。

// ... existing code ...// 1. 释放 SuperVersion 的引用ReturnAndCleanupSuperVersion(cfd, sv);// 2. 处理 Merge 结果 (如果需要)// ... (复杂的 Merge 逻辑)// 3. 更新统计信息if (stats_ != nullptr) {// ...}return s;
}
// ... existing code ...
  1. 资源释放: 调用 ReturnAndCleanupSuperVersion,减少 SuperVersion 的引用计数。当计数归零时,其占用的资源会被回收。这是保证无锁读取和后台清理能正确协作的关键。
  2. 结果封装:
    • 如果找到了值,会通过 PinnableSlice 返回。PinnableSlice 是一个重要的优化,如果数据块在块缓存中,它可以直接指向缓存中的内存,并通过“钉住”(pin)该缓存块来防止它被淘汰,从而避免了不必要的数据拷贝
    • 如果最终状态是 NotFound 或其他错误,则返回对应的 Status 对象。

总结

DBImpl::Get 的执行流是一个精心设计的多层查找过程:

  • 一致性保证:通过 SuperVersion 和 SequenceNumber 确保了在并发环境下读取到一致的数据快照。
  • 分层查找:严格遵循“内存优先,磁盘其次;L0遍历,L1+二分”的策略,最大化利用高速缓存,减少慢速I/O。
  • 多重优化
    • 布隆过滤器:快速排除不含目标 key 的文件。
    • 块缓存:复用读过的索引和数据块,减少磁盘读取。
    • PinnableSlice:避免从块缓存到用户缓冲区的内存拷贝。
    • L1+文件不重叠:将多文件查找优化为单文件查找。

整个过程体现了 LSM-Tree 架构为优化写性能而在读路径上所做的权衡与优化,使其在大多数情况下依然能提供非常高效的读取性能。

MultiGet 和 GetEntity

总的来说,它们是为不同场景设计的读取接口:

  • MultiGet 是 Get 的 批量性能优化版,用于一次性高效读取多个标准的“键-值”对。
  • GetEntity (及其批量版 MultiGetEntity) 是为了一种不同的数据模型——宽列(Wide Columns)——设计的读取接口。

下面我们来逐一深入分析。


MultiGet: 高效的批量读取

MultiGet 的核心目标是解决“循环调用 Get” 效率低下的问题。当需要一次性查询多个 key 时,使用 MultiGet 会比自己写 for 循环调用 Get 快得多。

从 db/db_impl/db_impl.cc 中可以看到其实现,其高效性主要源于以下几个关键优化点:

1. 批量化与排序 (Batching & Sorting)

这是最核心的优化。MultiGet 并不是简单地在内部循环调用 Get

// ... existing code ...
void DBImpl::MultiGetCommon(const ReadOptions& read_options,const size_t num_keys,ColumnFamilyHandle** column_families,
// ... existing code ...const bool sorted_input) {
// ... existing code ...// 将所有待查询的 key 封装成 KeyContextfor (size_t i = 0; i < num_keys; ++i) {sorted_keys[i] = &key_context[i];}// 除非用户保证输入已排序,否则进行排序PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);// 按列族(Column Family)对排好序的 key 进行分组autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>key_range_per_cf;
// ... existing code ...for (size_t i = 0; i < num_keys; ++i) {KeyContext* key_ctx = sorted_keys[i];if (key_ctx->column_family != cf) {key_range_per_cf.emplace_back(cf_start, i - cf_start);
// ... existing code ...}}
// ... existing code ...
}

如代码所示,MultiGet 会:

  1. 排序:调用 PrepareMultiGetKeys 对所有待查询的 key 进行排序。排序规则是先按列族(Column Family)ID,再按 key 的内容
  2. 分组:将排好序的 key 按列族进行分组。

为什么这么做? 排序和分组带来了巨大的好处:数据局部性(Data Locality)。当处理同一个列族的一批排好序的 key 时,这些 key 在 SST 文件中物理存储的位置也很可能是相邻的。这使得后续的磁盘读取可以更连续、更集中,从而大幅提升效率。

2. 一致性快照 (Consistent Snapshot)

MultiGet 需要保证一次调用中读取的所有 key 都来自同一个时间点的数据库快照,以确保一致性。它通过 MultiCFSnapshot 函数实现这一点,该函数可以原子性地获取所有相关列族的 SuperVersion 和一个统一的序列号(SequenceNumber),避免了在多次 Get 调用之间数据库状态发生变化的问题。

3. 并行与批量 I/O (Parallel & Batched I/O)

这是性能提升的另一个关键。根据 RocksDB 的官方文档,MultiGet 在底层实现了更智能的 I/O 模式。

参考资料docs/_posts/2022-10-07-asynchronous-io-in-rocksdb.markdown The MultiGet API accepts a batch of keys as input. Its a more efficient way of looking up multiple keys compared to a loop of Gets. One way MultiGet is more efficient is by reading multiple data blocks from an SST file in a batch, for keys in the same file. This greatly reduces the latency of the request, compared to a loop of Gets. The MultiRead FileSystem API is used to read a batch of data blocks.

文档中提到:

  • 批量读取数据块:对于落在同一个 SST 文件中的多个 key,MultiGet 可以将它们的读取请求合并,通过底层的 MultiRead 接口一次性从磁盘读取多个数据块,而不是一个一个地读。
  • 并行读取文件:更进一步,MultiGet 内部利用协程(Coroutines)等异步技术,可以并行地从不同的 SST 文件中读取数据。

这两种 I/O 优化极大地降低了由磁盘寻道和多次系统调用带来的延迟。

RocksDB 如何使用协程(Coroutines)

RocksDB 并非自己从零实现协程,而是依赖于第三方库,主要是 Facebook 开源的 folly 库。

docs/_posts/2022-10-07-asynchronous-io-in-rocksdb.markdown 明确指出了引入协程的动机:为了在 MultiGet 中实现更高效的异步并行 I/O

传统的异步编程通常使用回调(Callback)方式,但这会导致代码逻辑被分割,形成所谓的“回调地狱”,状态管理非常复杂。文章中提到:

In order to simplify it, we used async IO with C++ coroutines. The TableReader::MultiGet is implemented as a coroutine, and the coroutine is suspended after issuing async reads for missing data blocks. This allows the top-level MultiGet to iterate through the TableReaders for all the keys, before waiting for the reads to finish and resuming the coroutines.

简单来说,协程可以将异步代码写成看似同步的逻辑:

  1. TableReader::MultiGet 作为一个协程开始执行。
  2. 当它需要从磁盘读取一个数据块时,它会发起一个异步读请求。
  3. 然后,它会**挂起(suspend)**自己,将CPU控制权交还给调用者(顶层的 MultiGet 逻辑)。
  4. 顶层的 MultiGet 可以继续去处理其他 key,并发起更多的异步读请求。
  5. 当某个异步读操作完成后,对应的协程会被恢复(resume),从它上次挂起的地方继续执行。

这种方式极大地简化了在多个文件中并行读取数据块的复杂逻辑。 

a. 依赖 folly 库

CMakeLists.txt 文件清楚地表明了对 folly 的依赖关系,这是使用协程的前提。

CMakeLists.txt

// ... existing code ...
if(USE_COROUTINES)if(USE_FOLLY OR USE_FOLLY_LITE)message(FATAL_ERROR "Please specify exactly one of USE_COROUTINES,"" USE_FOLLY, and USE_FOLLY_LITE")endif()set(CMAKE_CXX_STANDARD 20)set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines -Wno-maybe-uninitialized")
// ... existing code ...add_compile_definitions(USE_COROUTINES)set(USE_FOLLY 1)
endif()if(USE_FOLLY)
// ... existing code ...find_package(folly)# If cmake could not find the folly-config.cmake file, fall back# to looking in third-party/folly for folly and its dependenciesif(NOT FOLLY_LIBRARIES)
// ... existing code ...

这里的关键点:

  • USE_COROUTINES 宏是启用协程功能的开关。
  • 启用协程需要 C++20 标准 (set(CMAKE_CXX_STANDARD 20)) 和特定的编译器标志 (-fcoroutines)。
  • 它会设置 USE_FOLLY 为 1,并调用 find_package(folly) 来寻找和链接 folly 库。
b. 头文件包含

util/coro_utils.h 文件直接包含了 folly 的协程头文件。

coro_utils.h

//  Copyright (c) Meta Platforms, Inc. and affiliates.
//
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).#if defined(USE_COROUTINES)
#include "folly/coro/Coroutine.h"
#include "folly/coro/Task.h"
#endif
#include "rocksdb/rocksdb_namespace.h"// This file has two sctions. The first section applies to all instances of
// header file inclusion and has an include guard. The second section is
// meant for multiple inclusions in the same source file, and is idempotent.

folly::coro::Task 是 folly 库中用于定义协程返回值的核心类型。

c. 协程函数的实现方式

table/block_based/block_based_table_reader.cc 文件展示了协程在 RocksDB 中是如何被巧妙实现的。它通过宏定义,将同一份逻辑代码编译成同步和**异步(协程)**两个版本。

block_based_table_reader.cc

// ... existing code ...
// Generate the regular and coroutine versions of some methods by
// including block_based_table_reader_sync_and_async.h twice
// Macros in the header will expand differently based on whether
// WITH_COROUTINES or WITHOUT_COROUTINES is defined
// clang-format off
#define WITHOUT_COROUTINES
#include "table/block_based/block_based_table_reader_sync_and_async.h"
#undef WITHOUT_COROUTINES
#define WITH_COROUTINES
#include "table/block_based/block_based_table_reader_sync_and_async.h"
#undef WITH_COROUTINES
// clang-format on

这个文件两次包含了 block_based_table_reader_sync_and_async.h

  1. 第一次定义 WITHOUT_COROUTINES,生成同步版本的函数。
  2. 第二次定义 WITH_COROUTINES,生成使用 co_awaitco_return 等协程关键字的异步版本函数。

这种技巧避免了维护两套功能相同但实现方式(同步/异步)不同的代码,降低了维护成本。

博客文章也提到了使用协程的一些代价和限制:

  • 依赖 Folly: 引入了额外的编译步骤和依赖。
  • CPU开销: 协程本身有不可忽略的CPU开销,在某些场景下(如100%缓存命中)可能会比同步版本更慢。
  • 并非所有读操作都并行化: 元数据(如索引块)的读取仍然是同步阻塞的。
总结
  1. RocksDB 使用协程主要用于优化 MultiGet 和 Iterator 的 I/O 性能。

  2. 依赖于 Facebook 的 folly 库提供的协程实现,并结合 C++20 的原生协程支持。

  3. docs/_posts/2022-10-07-asynchronous-io-in-rocksdb.markdown:详细阐述了引入协程的设计思想、实现原理和性能收益。

GetEntity: 读取宽列数据

GetEntity 和它的批量版本 MultiGetEntity 服务于一个完全不同的数据模型:宽列(Wide Columns)

在标准的 RocksDB 模型中,一个 key 对应一个 value(一个 Slice)。而宽列模型允许一个 key 对应多个命名的列(属性-值对)

  • 标准模型key -> value
  • 宽列模型key -> { "column_name_1": "value_1", "column_name_2": "value_2", ... }

例如,可以用一个用户 ID 作为 key,然后将用户的姓名、城市、年龄等作为不同的列存储,而不是将它们序列化成一个 JSON 字符串作为单个 value。

GetEntity 的作用就是读取一个 key 对应的所有列。

include/rocksdb/utilities/transaction.h

virtual void MultiGetEntity(const ReadOptions& options,ColumnFamilyHandle* column_family,size_t num_keys, const Slice* keys,PinnableWideColumns* results, Status* statuses,bool sorted_input = false) = 0;

它的接口和 MultiGet 类似,但最关键的区别在于返回值类型:

  • MultiGet 返回 PinnableSlice* values,每个 key 对应一个 PinnableSlice
  • MultiGetEntity 返回 PinnableWideColumns* results,每个 key 对应一个 PinnableWideColumns 对象,该对象内部包含了这个 key 的所有列。

GetEntity 则是 MultiGetEntity 的单 key 版本。


总结与对比

特性MultiGetGetEntity / MultiGetEntity
数据模型标准键值对 (key -> value)宽列 (key -> {col1:val1, col2:val2, ...})
核心目的性能优化:高效批量读取标准键值对功能接口:读取宽列数据模型的特定接口
输入一组 Slice 类型的 key一组 Slice 类型的 key
输出一组 PinnableSlice(每个 key 一个 value)一组 PinnableWideColumns(每个 key 一组列)
关键优化排序、分组、批量I/O、并行I/O继承了 MultiGet 的批量优化思想

结论: 应该根据 数据存储方式来选择使用哪个函数。

  • 如果 应用使用的是标准的 key-value 存储,当需要一次读取多个 key 时,请务必使用 MultiGet 以获得最佳性能。
  • 如果 使用了 RocksDB 的宽列特性(例如通过 PutEntity 写入数据),那么 必须使用 GetEntity 或 MultiGetEntity 来读取这些数据。

GetMergeOperands

GetMergeOperands 是一个诊断和调试性质的 API。它的作用不是获取最终合并后的值,而是获取一个 key 对应的所有尚未合并的 Merge 操作数(operands)

当对一个 key 多次调用 Merge 操作时,RocksDB 并不会立即将这些操作合并(这称为“懒合并” Lazy Merge)。它会把这些 Merge 操作(比如一连串的字符串追加、JSON 字段更新等)像日志一样堆叠起来。只有在 Compaction(合并)或者 Get 操作真正需要读取最终值时,才会执行“完全合并”(Full Merge)计算出结果。

GetMergeOperands 这个函数就是让你能窥探这个中间状态,它会返回:

  1. 这个 key 的原始基础值(如果存在的话)。
  2. 所有堆叠在基础值之上的、尚未被合并的 Merge 操作数列表

从头文件 db/db_impl/db_impl.h 中可以看到它的实现非常直接:

// ... existing code ...using DB::GetMergeOperands;Status GetMergeOperands(const ReadOptions& options,ColumnFamilyHandle* column_family, const Slice& key,PinnableSlice* merge_operands,GetMergeOperandsOptions* get_merge_operands_options,int* number_of_operands) override {GetImplOptions get_impl_options;get_impl_options.column_family = column_family;get_impl_options.merge_operands = merge_operands;get_impl_options.get_merge_operands_options = get_merge_operands_options;get_impl_options.number_of_operands = number_of_operands;get_impl_options.get_value = false; // <-- 关键点return GetImpl(options, key, get_impl_options);}
// ... existing code ...

它内部调用了万能的 GetImpl 函数,但传递了一个关键参数 get_impl_options.get_value = false。这个标志告诉 GetImpl:“不要执行完全合并,只需要把找到的所有 Merge 操作数收集起来然后返回”。

原来的 Get 完全支持 Merge 操作,并且是 Merge 操作的最终消费者。Get 和 GetMergeOperands  的关系可以理解为:

  • Get(key, &value): 这是面向最终用户的标准读取接口。当 Get 发现一个 key 关联了一系列 Merge 操作时,它会:

    1. 找到这个 key 的基础值(base value)。
    2. 收集所有比基础值更新的 Merge 操作数。
    3. 调用用户定义的 MergeOperator 的 FullMerge 方法,将基础值和所有操作数进行计算。
    4. 返回最终计算出的单一结果给用户。 这个过程对用户是透明的,用户只关心最终结果,不关心中间有多少次 Merge
  • GetMergeOperands(key, &operands): 这是面向开发者或高级用户的诊断接口。它会跳过上面 Get 的第 3 和第 4 步。它不进行计算,而是直接返回第 1 和第 2 步收集到的原始数据

为什么需要 GetMergeOperands

  1. 调试与问题排查:当 Merge 的结果不符合预期时,可以用这个函数查看是哪些原始操作数参与了合并,从而定位问题。
  2. 性能分析:一个 key 堆积了过多的 Merge 操作数会严重影响 Get 的性能(因为每次 Get 都要做一次昂贵的 FullMerge)。通过 GetMergeOperands 可以监控操作数的数量,判断是否需要手动触发 Compaction 来提前合并它们。
  3. 高级应用场景:在某些特殊场景下,应用可能需要自己处理合并逻辑,或者只想获取增量更新的部分。例如,HISTORY.md 中提到的修复 GetMergeOperands() 的 bug,就说明了它在一些复杂场景(如带有回调函数)下的重要性。

Get 不仅支持 Merge,还是 Merge 机制能够工作的核心环节之一。GetMergeOperands 则是为了提供更强的可观察性而分离出来的一个辅助工具。

DBImpl::Flush

Flush 是 RocksDB 中一个至关重要的操作。它的核心作用是将内存中的数据(存在于 MemTable 中)持久化到磁盘上的 SST 文件中。这个过程不仅能释放内存,也是数据从内存进入 LSM-Tree 磁盘结构的必经之路。

这里分析Flush 的一个重载版本,它允许用户一次性地对多个列族(Column Family)发起 Flush 请求。

db_impl_compaction_flush.cc

// ... existing code ...
Status DBImpl::Flush(const FlushOptions& flush_options,const std::vector<ColumnFamilyHandle*>& column_families) {Status s;if (!immutable_db_options_.atomic_flush) {for (auto cfh : column_families) {s = Flush(flush_options, cfh);if (!s.ok()) {break;}}} else {ROCKS_LOG_INFO(immutable_db_options_.info_log,"Manual atomic flush start.\n""=====Column families:=====");for (auto cfh : column_families) {auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",cfhi->GetName().c_str());}ROCKS_LOG_INFO(immutable_db_options_.info_log,"=====End of column families list=====");autovector<ColumnFamilyData*> cfds;std::for_each(column_families.begin(), column_families.end(),[&cfds](ColumnFamilyHandle* elem) {auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);cfds.emplace_back(cfh->cfd());});s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush, cfds);ROCKS_LOG_INFO(immutable_db_options_.info_log,"Manual atomic flush finished, status: %s\n""=====Column families:=====",s.ToString().c_str());for (auto cfh : column_families) {auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",cfhi->GetName().c_str());}ROCKS_LOG_INFO(immutable_db_options_.info_log,"=====End of column families list=====");}return s;
}
// ... existing code ...

这个函数的执行逻辑可以清晰地分为两个分支,由一个关键的数据库选项 atomic_flush 控制。

阶段一:API 入口与分支选择

DBImpl::Flush 函数首先会检查 immutable_db_options_.atomic_flush 选项的值,这个选项决定了当刷新多个列族时,其行为是原子性的还是非原子性的

1. 非原子性 Flush (atomic_flush == false)

这是默认的行为。

// ...if (!immutable_db_options_.atomic_flush) {for (auto cfh : column_families) {s = Flush(flush_options, cfh);if (!s.ok()) {break;}}} 
// ...
  • 执行方式:代码会简单地遍历用户传入的 column_families 列表。
  • 逐一调用:在循环中,它会逐个调用处理单个列族的 Flush 函数 Flush(flush_options, cfh)
  • 非原子性:这意味着每个列族的 Flush 操作是独立的。如果中途某个列族的 Flush 失败,循环会中断并返回错误,但之前已经成功 Flush 的列族将保持完成状态。它不保证“要么全部成功,要么全部失败”的原子性。
2. 原子性 Flush (atomic_flush == true)

当用户启用原子 Flush 时,代码会走 else 分支。

// ...} else {// ... 日志记录 ...autovector<ColumnFamilyData*> cfds;std::for_each(column_families.begin(), column_families.end(),[&cfds](ColumnFamilyHandle* elem) {auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);cfds.emplace_back(cfh->cfd());});s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush, cfds);// ... 日志记录 ...}
// ...
  • 准备工作:它首先会记录一些日志,然后将用户传入的 ColumnFamilyHandle* 列表转换成内部使用的 ColumnFamilyData* 列表。
  • 核心调用:接着,它会调用一个关键的内部函数 AtomicFlushMemTables。所有列族的 Flush 逻辑都被委托给了这个函数。
  • 原子性保证AtomicFlushMemTables 的核心职责是确保所有指定列族的 MemTable 要么全部成功刷到磁盘,要么全部失败。这是通过将所有新生成的 SST 文件的元数据信息通过一次原子的 MANIFEST 文件写入来实现的。如果中途发生故障,恢复时数据库会处于 Flush 操作之前的状态,保证了跨列族的一致性。

阶段二:深入核心实现

为了完全理解 Flush,我们需要进一步分析它所调用的核心内部函数。

FlushMemTable (非原子 Flush 的核心)

在非原子模式下,最终会调用到 FlushMemTable。这个函数负责单个列族的 Flush 流程。

// ... existing code ...// Force current memtable contents to be flushed.Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,FlushReason flush_reason,bool entered_write_thread = false);
// ... existing code ...

FlushMemTable 及其后续流程:

  1. 前台线程(调用 FlushMemTable 的线程)

    • 1.1 前置检查:检查是否会造成写停顿(Stall),并根据 FlushOptions 决定是否等待。
    • 1.2 加锁与排队:获取全局互斥锁 mutex_,并进入 write_thread_ 写队列,确保与普通写操作的顺序一致性。
    • 1.3 切换 MemTable (核心动作之一):调用 SwitchMemtable()
      • 将当前的可写 MemTable (Mutable MemTable) 变成只读的 (Immutable MemTable)。
      • 将这个新的 Immutable MemTable 添加到该列族的 imm() 列表(一个 MemTableList 对象)的末尾。
      • 创建一个全新的、空的 Mutable MemTable 供后续的写操作使用。
    • 1.4 创建并调度后台任务
      • 创建一个 FlushRequest 对象。
      • 调用 EnqueuePendingFlush() 将请求放入一个待处理队列
      • 调用 MaybeScheduleFlushOrCompaction() 唤醒后台线程池中的一个线程。
    • 1.5 解锁与返回:退出写队列,释放 mutex_。如果 flush_options.wait 为 false,函数到此返回,前台操作继续。
  2. 后台线程(被唤醒的 BGWorkFlush 线程)

    • 2.1 挑选任务:后台线程被唤醒后,会从待处理队列中取出 FlushRequest
    • 2.2 创建 FlushJob 对象

      db_impl_compaction_flush.cc

      // ... existing code ...
      // To address this, we make sure NotifyOnFlushBegin() executes after memtable
      // picking so that no new snapshot can be taken between the two functions.FlushJob flush_job(dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,job_context, flush_reason, log_buffer, directories_.GetDbDir(),GetDataDir(cfd, 0U),GetCompressionFlush(cfd->ioptions(), mutable_cf_options), stats_,&event_logger_, mutable_cf_options.report_bg_io_stats,true /* sync_output_directory */, true /* write_manifest */, thread_pri,io_tracer_, cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_);
      // ... existing code ...
      
      创建一个 FlushJob 实例,它包含了执行 Flush 所需的所有上下文信息。
    • 2.3 FlushJob::PickMemTable():在 mutex_ 保护下,从列族的 imm() 列表中挑选出需要被本次 FlushJob 处理的一个或多个 Immutable MemTable。
    • 2.4 FlushJob::Run() (核心动作之二)
      • 释放 mutex_,开始执行耗时的 I/O 操作。
      • 遍历挑选出的 MemTable 中的数据,并将其写入一个新的 SST 文件。这是将数据从内存持久化到磁盘的关键步骤。
      • I/O 操作完成后,重新获取 mutex_
    • 2.5 安装结果 (Install Results)
      • 在 mutex_ 保护下,调用 MemTableList::TryInstallMemtableFlushResults()
      • 生成一个 VersionEdit,记录这次变更(例如:在 L0 层增加了一个新的 SST 文件,并删除了对应的 MemTable)。
      • 将 VersionEdit 写入 MANIFEST 文件,并应用到内存中的 VersionSet,使新的 SST 文件对读操作可见。
      • 将已成功刷盘的 MemTable 从 imm() 列表中移除,并准备释放其内存。
    • 2.6 清理与唤醒:释放 mutex_。如果前台有线程在等待 (flush_options.wait == true),则唤醒它们。FlushJob 结束。
AtomicFlushMemTables (原子 Flush 的核心)

这个函数是实现原子 Flush 的关键。

db_impl.h

// ... existing code ...// Atomic-flush memtables from quanlified CFs among `provided_candidate_cfds`// (if non-empty) or amomg all column families and atomically record the// result to the MANIFEST.Status AtomicFlushMemTables(const FlushOptions& options, FlushReason flush_reason,const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},bool entered_write_thread = false);
// ... existing code ...

它的流程比 FlushMemTable 更复杂:

  1. 统一准备:它会一次性为所有需要 Flush 的列族准备好 Flush 任务。
  2. 并行刷盘:它可能会并行地将多个列族的 Immutable MemTable 写入各自的 SST 文件。
  3. 统一提交:最关键的一步是,它会将所有列族的变更(比如 CF1 增加文件 A,CF2 增加文件 B)合并成一个单一的 VersionEdit
  4. 原子写入 MANIFEST:最后,将这个包含了所有变更的 VersionEdit 一次性地写入 MANIFEST 文件。这确保了从数据库元数据的角度看,所有列族的 Flush 是一个不可分割的操作。

Flush 到底是什么语义?

Flush 在 RocksDB 中是一个过程,而不是一个单一的原子动作。它包含两个紧密相连但又可以区分的阶段:

  1. 将 MemTable 从“可写”变为“只读” (MemTable Rotation / Switching)

    • 触发者DBImpl::FlushMemTable -> DBImpl::SwitchMemtable
    • 语义: 这是 Flush 过程的开始。它将当前活跃的、正在接收写入的 MemTable(Mutable MemTable)“固化”,使其成为一个内容不再改变的 Immutable MemTable,并放入一个待处理列表(imm())。同时,一个新的空 MemTable 会被创建出来,接替成为新的 Mutable MemTable,保证写操作可以不间断地进行。
    • Flush 包含了“把 mem 变为 read only”这一步。这是整个流程的第一阶段。
  2. 将“只读”的 MemTable 数据写入磁盘 SST 文件 (MemTable Persisting)

    • 触发者: 后台 FlushJob 线程。
    • 语义: 这是 Flush 过程的核心实体工作。后台线程会处理 imm() 列表中的一个或多个 Immutable MemTable,将其中的数据迭代出来,编码、压缩,并最终写入一个全新的 SST 文件中。写完之后,会更新数据库的元数据(MANIFEST),正式将这个 SST 文件纳入 LSM-Tree 的管理体系。
    • Flush 也包含了“把 read only 变为磁盘 SST”这一步。这是整个流程的第二阶段,也是最终目的。

Flush 的完整语义是一个从内存到磁盘的持久化过程,它起始于将可写 MemTable 切换为只读,最终完成于将该只读 MemTable 的内容写入磁盘上的 SST 文件。这两个阶段共同构成了 Flush 的完整含义。

可以将 DBImpl::FlushMemTable 函数看作是这个过程的发起者,它完成了第一阶段(切换),并启动了第二阶段(调度后台任务)。而 FlushJob 则是第二阶段的执行者

总结

DBImpl::Flush(const FlushOptions&, const std::vector<ColumnFamilyHandle*>&) 是一个功能强大的 API,它为用户提供了批量刷新多个列族的能力。

  • 它的行为由 atomic_flush 选项决定,可以在**逐个刷新(非原子)一体刷新(原子)**之间选择。
  • 非原子模式实现简单,通过循环调用单个列族的 Flush 来完成,容错性较低。
  • 原子模式实现复杂,通过 AtomicFlushMemTables 函数,确保所有列族的 Flush 作为一个整体成功或失败,为需要跨列族数据一致性的应用场景提供了重要保障。
  • 无论是哪种模式,Flush 操作的本质都是将内存中的 MemTable 数据持久化为磁盘上的 SST 文件,这是 LSM-Tree 架构的核心机制之一。

DBImpl::FlushMemTable

FlushMemTable 是 RocksDB 中执行非原子性 Flush 的核心内部函数。当用户调用 DB::Flush(options, column_family) 且 atomic_flush 选项为 false 时,最终会由这个函数来处理。它的职责是发起一个指定列族(Column Family)的 Flush 过程,即将该列族内存中的 MemTable 数据转存为磁盘上的 SST 文件。

这个函数本身并不执行 I/O 操作,而是扮演一个协调者的角色:它在正确的同步保护下,准备好待刷盘的数据,然后将实际的刷盘任务调度给后台线程去完成。

下面是该函数的详细逻辑分析,我们将按照其执行顺序逐一拆解。

db_impl_compaction_flush.cc

// ... existing code ...
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,const FlushOptions& flush_options,FlushReason flush_reason,bool entered_write_thread) {
// ... existing code ...

函数签名解析

  • ColumnFamilyData* cfd: 指向需要被 Flush 的列族的内部数据结构。
  • const FlushOptions& flush_options: Flush 操作的选项。其中两个最关键的标志是:
    • waittrue 表示调用线程需要阻塞等待,直到 Flush 操作完全结束。false 表示函数会立即返回,Flush 在后台异步进行。
    • allow_write_stalltrue 表示允许 Flush 操作导致写操作停顿。false 则会先检查是否会造成停顿,如果会,则可能提前返回。
  • FlushReason flush_reason: 发起此次 Flush 的原因(如:手动、自动、后台恢复等)。
  • bool entered_write_thread: 一个内部优化标志,如果调用者已经处于写线程上下文中,则为 true,避免重复进入。

详细执行流程分析

阶段一:前置检查与写停顿处理

这是函数的第一道防线,确保 Flush 操作可以在合适的状态下进行。

db_impl_compaction_flush.cc

// ... existing code ...
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,const FlushOptions& flush_options,FlushReason flush_reason,bool entered_write_thread) {// 该函数不应在 atomic_flush 模式下被调用assert(!immutable_db_options_.atomic_flush);// 如果是异步调用,且写控制器已停止,则无法执行,直接返回 TryAgainif (!flush_options.wait && write_controller_.IsStopped()) {std::ostringstream oss;oss << "Writes have been stopped, thus unable to perform manual flush. ""Please try again later after writes are resumed";return Status::TryAgain(oss.str());}Status s;// 如果用户不允许写停顿if (!flush_options.allow_write_stall) {bool flush_needed = true;// 等待直到 Flush 不会造成写停顿s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");// 如果等待出错,或者检查后发现不再需要 Flush,则直接返回if (!s.ok() || !flush_needed) {return s;}}
// ... existing code ...
  1. 断言检查: 首先断言 atomic_flush 为 false,确保此函数只用于非原子 Flush 场景。
  2. 写控制器检查: 如果是异步 Flush (wait == false),且写操作已被暂停,那么无法保证 Flush 能被调度,因此直接返回错误。
  3. 写停顿处理: 如果 allow_write_stall 为 false,会调用 WaitUntilFlushWouldNotStallWrites。RocksDB 有机制防止后台积压过多导致前台写操作停顿(Stall)。此调用会检查当前 Flush 是否会触发停顿,如果会,则等待;如果等待后发现由于其他后台任务的推进,当前列族已经不再需要 Flush,则直接返回成功。
阶段二:获取锁,切换 MemTable

这是 Flush 操作的核心准备阶段,在数据库全局锁的保护下进行。

// ... existing code ...const bool needs_to_join_write_thread = !entered_write_thread;autovector<FlushRequest> flush_reqs;autovector<uint64_t> memtable_ids_to_wait;{WriteContext context;// 获取数据库全局互斥锁InstrumentedMutexLock guard_lock(&mutex_);WriteThread::Writer w;WriteThread::Writer nonmem_w;// 进入写线程队列,确保与所有写操作串行化if (needs_to_join_write_thread) {write_thread_.EnterUnbatched(&w, &mutex_);if (two_write_queues_) {nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);}}// 等待任何已进入队列但在我们之前的写操作完成WaitForPendingWrites();// 如果当前的可写 MemTable 不为空,则进行切换if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load() ||IsRecoveryFlush(flush_reason)) {s = SwitchMemtable(cfd, &context);}
// ... existing code ...
  1. 加锁与排队:
    • 获取全局锁 mutex_,保证线程安全。
    • 调用 write_thread_.EnterUnbatched() 进入写线程队列。这保证了 Flush 操作与普通写操作(Put/Delete)的顺序一致性。
    • 调用 WaitForPendingWrites() 确保在自己之前的写操作都已经完成。
  2. 切换 MemTable:
    • 检查当前列族的 Mutable MemTable 是否有数据 (!cfd->mem()->IsEmpty())。
    • 如果需要 Flush,则调用 SwitchMemtable(cfd, &context)。这个关键函数会: a. 将当前正在写入的 Mutable MemTable 变成一个只读的 Immutable MemTable。 b. 将这个新的 Immutable MemTable 添加到该列族的 Immutable MemTable 列表中。 c. 创建一个全新的、空的 Mutable MemTable 供后续的写操作使用。
阶段三:创建并提交 Flush 请求

MemTable 切换完成后,并不会立即刷盘,而是创建一个请求,交给后台调度器。

// ... existing code ...const uint64_t flush_memtable_id = std::numeric_limits<uint64_t>::max();if (s.ok()) {// 如果切换成功,且确实有待刷盘的 Immutable MemTableif (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||!cached_recoverable_state_empty_.load() ||IsRecoveryFlush(flush_reason)) {// 创建一个 Flush 请求FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};flush_reqs.emplace_back(std::move(req));memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID(false /* for_atomic_flush */));}// 特殊处理:如果开启了统计信息持久化,可能会触发统计CF的Flushif (immutable_db_options_.persist_stats_to_disk) {// ... (此处省略了触发统计CF Flush的复杂逻辑)}}if (s.ok() && !flush_reqs.empty()) {for (const auto& req : flush_reqs) {// ...// 标记该列族的 Immutable MemTable 列表需要被 Flushloop_cfd->imm()->FlushRequested();}// 如果调用者要求等待,增加 cfd 的引用计数,防止在等待期间被其他线程删除if (flush_options.wait) {for (const auto& req : flush_reqs) {// ...loop_cfd->Ref();}}for (const auto& req : flush_reqs) {// ...// 将 Flush 请求放入待处理队列bool flush_req_enqueued = EnqueuePendingFlush(req);// ...}// 唤醒后台线程,检查是否有待处理的 Flush 或 Compaction 任务MaybeScheduleFlushOrCompaction();}// 退出写线程队列,释放锁if (needs_to_join_write_thread) {write_thread_.ExitUnbatched(&w);if (two_write_queues_) {nonmem_write_thread_.ExitUnbatched(&nonmem_w);}}} // 锁的作用域结束
// ... existing code ...
  1. 创建请求: 为需要 Flush 的列族创建一个 FlushRequest 对象,并记录下需要等待的 MemTable ID。
  2. 处理统计列族 (Stats CF): 这是一个有趣的细节。如果开启了 persist_stats_to_disk,为了防止统计列族持有过旧的 WAL 日志文件,当其他列族 Flush 时,可能会联动触发统计列族的 Flush。
  3. 标记与入队:
    • 调用 imm()->FlushRequested() 在列族元数据中打上一个“需要Flush”的标记。
    • 如果 flush_options.wait 为 true,则增加 cfd 的引用计数,防止在等待期间列族被删除。
    • 调用 EnqueuePendingFlush(req) 将请求放入一个全局的待处理队列。
  4. 调度: 调用 MaybeScheduleFlushOrCompaction(),此函数会检查待处理队列,并在需要时唤醒后台的 Flush/Compaction 线程池来执行任务。
  5. 释放锁: 退出写线程队列,并释放全局锁 mutex_。至此,前台线程的准备工作完成。
阶段四:等待与收尾

最后,根据 flush_options.wait 的值决定是立即返回还是等待后台任务完成。

// ... existing code ...// 通知监听器,一个手动 Flush 已被调度NotifyOnManualFlushScheduled({cfd}, flush_reason);TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");// 如果调用者要求等待if (s.ok() && flush_options.wait) {// ... (准备需要等待的cfd列表和memtable_id列表) ...// 阻塞等待,直到后台线程完成了指定的 Flush 任务s = WaitForFlushMemTables(cfds, flush_memtable_ids,flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,flush_reason);InstrumentedMutexLock lock_guard(&mutex_);// 等待结束后,释放之前增加的引用计数for (auto* tmp_cfd : cfds) {tmp_cfd->UnrefAndTryDelete();}}TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");return s;
}
// ... existing code ...
  1. 通知: 调用 NotifyOnManualFlushScheduled 通知所有注册的监听器(Listener),一个 Flush 任务已经被调度。
  2. 等待: 如果 flush_options.wait 为 true,则调用 WaitForFlushMemTables。该函数会使用条件变量等同步原语,阻塞当前线程,直到后台线程完成了对指定 MemTable 的刷盘工作并通知唤醒。
  3. 清理: 等待结束后,释放之前为防止列族被删除而增加的引用计数。
  4. 返回状态: 返回最终的操作状态。

总结

DBImpl::FlushMemTable 是一个设计精良的协调函数,它完美地体现了 RocksDB 将前台操作与后台 I/O 分离的设计哲学。其完整流程可以概括为:

  1. 前置检查:确保操作的合法性和时机(避免写停顿)。
  2. 同步与准备:在全局锁的保护下,与写操作串行化,并将可写 MemTable 切换为只读状态。
  3. 调度:将具体的刷盘任务封装成请求,放入队列,并唤醒后台线程。
  4. 等待(可选):根据用户需求,提供同步等待或异步返回的能力。

通过这个流程,它高效地将内存数据移交给了后台处理,最大限度地减少了对前台写操作的影响。

http://www.dtcms.com/a/314813.html

相关文章:

  • 《C++多态详解:从虚函数到运行时动态绑定》
  • 强反射场景识别误差↓78%!陌讯多模态融合算法在水位监测的落地优化
  • Shell操作git,上传更新文档
  • Redshift 渲染器:GPU 加速渲染的高效之选
  • TGD第十一篇:卷积神经网络中的TGD特征
  • MS-DOS 常用指令集
  • OCR 精准识别验讫章:让登记与校验更智能
  • ssh连接VirtualBox中的Ubuntu24.04(win11、putty、NAT 模式)
  • 西门子PLC S7-1200单轴步进控制电动机
  • Exporters | 安装process_exporter
  • C语言:构造类型学习
  • 深入剖析Java Stream API性能优化实践指南
  • 【Django】-11- 后台管理界面定制
  • [机器学习]02-基于贝叶斯决策的鸢尾花数据集分类
  • 云原生攻防6(Kubernetes扩展知识)
  • 并发编程常用工具类(下):CyclicBarrier 与 Phaser 的协同应用
  • 政府财政行业云原生转型之路
  • 关于解决WinRiver项目动态XmlElement的序列化与反序列化的问题
  • 基于Java的AI工具和框架
  • PyTorch生成式人工智能(25)——基于Transformer实现机器翻译
  • spring boot开发中的资源处理等问题
  • RTOS如何保证实时性
  • 深圳南柯电子|电驱动系统EMC测试整改:“诊断-治疗-预防”方案
  • HTML5的新特性
  • 上位机知识篇---令牌
  • 如何选择合适的政务文本检测工具?
  • go 语言常见问题(2)
  • 宝塔面板安装WordPress教程:10分钟一键部署搭建个人博客 (2025)
  • 聊聊web前端的缓存问题
  • 金融专业高分简历撰写指南