RocksDB 核心入口:DB类源码解析
rocksdb::DB
class DB 是所有 RocksDB 数据库操作的统一入口和核心抽象基类。可以把它理解为与数据库实例进行交互的“遥控器”。它定义了所有用户可以调用的接口(API),但它本身不包含具体的实现逻辑。
从代码中可以看到,它是一个抽象类:
// ... existing code ...
// A DB is a persistent, versioned ordered map from keys to values.
// A DB is safe for concurrent access from multiple threads without
// any external synchronization.
// DB is an abstract base class with one primary implementation (DBImpl)
// and a number of wrapper implementations.
class DB {public:
// ...// Abstract class ctorDB() {}// No copying allowedDB(const DB&) = delete;void operator=(const DB&) = delete;
// ...// 很多纯虚函数,例如:virtual Status Put(const WriteOptions& options,ColumnFamilyHandle* column_family, const Slice& key,const Slice& value) = 0;
// ...
};
关键点:
- 抽象接口 (Interface):
DB类通过大量纯虚函数(virtual ... = 0;)定义了一套完整的操作规范。任何想要成为一个“RocksDB数据库”的类,都必须继承自DB并实现这些接口。 - 线程安全: 如注释所说,
DB的实现被设计为线程安全的。多个线程可以同时对同一个DB实例进行读写操作,无需外部加锁。 - 主要实现
DBImpl: RocksDB 官方提供了DB的主要实现类DBImpl(定义在db/db_impl/db_impl.h)。我们通常使用的就是DBImpl的实例。 - 可堆叠的装饰器: RocksDB 还有很多基于
DB的包装类(Wrapper),比如TransactionDB、BlobDB等。它们也继承自DB,内部持有一个DB指针(通常是DBImpl),在实现自己的特性(如事务、大 Value 存储)之外,将其他通用操作转发给内部的DB实例。这是一种典型的装饰器(Decorator)设计模式。
核心用法分析 (API 导览)
DB 类的公共接口(public:部分)可以分为以下几大类:
1. 打开和关闭数据库 (Lifecycle)
用户不能直接 new DB(),而是必须通过静态工厂方法 Open 系列来创建和加载一个数据库实例。
static Status Open(...): 这是最常用的方法,用于以读写模式打开数据库。- 它有多个重载版本,支持只打开默认列族,或同时打开多个列族(
ColumnFamily)。 - 参数
const Options& options或const DBOptions& db_options/const ColumnFamilyDescriptor& column_families用于传入丰富的配置项。 - 返回的数据库指针通过出参
std::unique_ptr<DB>* dbptr(推荐) 或DB** dbptr(已废弃) 获得。
- 它有多个重载版本,支持只打开默认列族,或同时打开多个列族(
static Status OpenForReadOnly(...): 以只读模式打开数据库。在此模式下,任何写操作都会失败,且后台的自动 Flush 和 Compaction 也不会进行。适用于数据分析、备份等场景。static Status OpenAsSecondary(...): 以从实例(Secondary Instance)模式打开。这是一种特殊的只读模式,从实例可以与主实例共享 SST 文件,并能通过TryCatchUpWithPrimary()方法追赶主实例的数据更新。常用于构建读写分离架构中的只读副本。static Status ListColumnFamilies(...): 在打开数据库之前,可以通过这个静态方法查询一个数据库路径下存在哪些列族。virtual Status Close(): 关闭数据库,释放资源。这是一个实例方法,在销毁DB对象前调用可以获取关闭过程中的状态。
2. 写操作 (Write Operations)
virtual Status Put(...): 写入或覆盖一个键值对。virtual Status Delete(...): 删除一个键。virtual Status SingleDelete(...): 一种优化的删除操作,用于确定 key 不会再被写入或Merge的场景。virtual Status Merge(...): 执行“读-修改-写”操作。需要用户预先定义一个MergeOperator。virtual Status Write(const WriteOptions&, WriteBatch* updates): 最重要、最强大的写操作接口。它通过WriteBatch实现原子性的批量写入。可以将多个Put,Delete,Merge操作打包在一起,一次性提交,保证它们要么全部成功,要么全部失败。这是保证数据一致性和提升写入性能的关键。
3. 读操作 (Read Operations)
virtual Status Get(...): 根据 Key 读取 Value。virtual void MultiGet(...): 一次性读取多个 Key 的 Value,比循环调用Get更高效。virtual bool KeyMayExist(...): 一个性能优化接口。它利用 Bloom Filter 等内存结构快速判断一个 Key 是否可能存在。如果返回false,则 Key 一定不存在;如果返回true,则 Key 可能存在,需要调用Get进一步确认。这可以避免对不存在的 Key 进行昂贵的磁盘 I/O。virtual Iterator* NewIterator(...): 创建一个迭代器,用于按序扫描(正序或逆序)一个 Key 区间内的数据。这是范围查询的基础。
4. 列族管理 (Column Family Management)
virtual Status CreateColumnFamily(...): 在数据库运行时动态创建一个新的列族。virtual Status DropColumnFamilies(...): 动态删除一个或多个列族。virtual Status DestroyColumnFamilyHandle(...): 销毁列族句柄,释放资源。
5. 数据库管理与监控
virtual const Snapshot* GetSnapshot(): 获取数据库在当前时间点的一个一致性视图(快照)。后续的读操作可以使用这个快照,读取到的数据不受之后写操作的影响。virtual Status Flush(...): 手动触发将内存中的 MemTable 数据刷写到磁盘(SST 文件)。virtual Status CompactRange(...): 手动触发对指定 Key 范围的数据进行 Compaction。virtual bool GetProperty(...)/GetIntProperty(...): 获取数据库的各种内部状态和统计信息。db.h中定义了大量的属性字符串常量(如kNumFilesAtLevelPrefix,kCurSizeAllMemTables等),用于查询不同维度的信息。
实现关联
如前所述,DB 是一个接口,它的行为最终由具体的实现类完成,主要是 DBImpl。
- 当你调用
DB::Open(...)时,其内部会创建一个DBImpl对象,并执行复杂的初始化流程:读取MANIFEST文件,恢复元数据,回放 WAL 日志等。成功后,将DBImpl对象的指针向上转型为DB*返回给你。 - 当你调用
db->Put(...)时,实际上是调用了DBImpl::Put(...)。DBImpl会协调整个写入流程:- 将操作写入 WAL (Write-Ahead Log) 以保证持久性。
- 将数据写入内存中的
MemTable。 - 当
MemTable写满后,调度后台线程将其 Flush 成磁盘上的 SST 文件。
- 当调用
db->Get(...)时,DBImpl::Get(...)会构建一个查找流程:- 先查找当前 active 的
MemTable。 - 再依次查找 immutable 的
MemTable列表。 - 最后在磁盘上的各层级 SST 文件中查找。
- 先查找当前 active 的
- 所有其他的
DB接口调用,都会最终路由到DBImpl中对应的实现函数,由DBImpl来调度和管理VersionSet,MemTableList,WalManager, 后台线程池等内部组件来完成任务。
写组(Write Group)机制
这是一种为了提高写入吞吐量而设计的并发优化策略。它的核心思想是:将多个并发的写入请求合并成一个大的请求,然后一次性地执行底层 I/O 操作(主要是写 WAL 日志),从而摊销 I/O 开销。
写组机制就像是让一辆大巴车(Leader)载着一车乘客(Follower)一起通过收费站,只缴一次费,效率就大大提高了。
在 WriteThread 类中,定义了 Writer 结构体和几个关键的状态,它们是理解写组的基础。
WriteThread::Writer: 每个发起写入请求的线程都会在自己的栈上创建一个Writer对象。这个对象封装了本次写入的所有信息,包括WriteBatch、WriteOptions、回调函数、状态等。它就像是每个乘客的车票和行李。Writer的状态 (w.state):STATE_INIT: 初始状态。STATE_GROUP_LEADER: 表示当前Writer对应的线程成为了写组的 Leader。STATE_COMPLETED: 表示写入已完成。对于 Follower 来说,这是它等待结束后的状态;对于 Leader 来说,这是它完成所有工作后的最终状态。- (还有其他用于 Pipelined Write 的状态,我们暂时不关注)。
下面我们以最经典的标准写入路径 DBImpl::WriteImpl 为例,一步步拆解 Leader 和 Follower 的协作过程。
进入写组 (JoinBatchGroup)
当一个线程调用 DBImpl::Write -> DBImpl::WriteImpl 时,它会执行以下关键代码:
// ... existing code ...WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, batch_cnt,pre_release_callback, post_memtable_callback, wbwi);write_thread_.JoinBatchGroup(&w);
// ... existing code ...
write_thread_.JoinBatchGroup(&w) 是整个协作的起点。WriteThread 内部维护了一个队列。
场景 A:队列为空
- 当前线程发现队列是空的,它就成为了 Leader。
- 它会将自己的
Writer对象(w)设置为 Leader,并将w.state标记为STATE_GROUP_LEADER。 - 然后,它不会立即开始工作,而是会短暂地等待一小段时间(通过
cv_.TimedWait()),看看有没有其他线程(Follower)紧随其后加入。这给了其他并发写请求一个“上车”的机会。
场景 B:队列已有 Leader
- 当前线程发现队列中已经有一个 Leader 了。
- 它就成为了 Follower。
- 它会将自己的
Writer对象(w)链接到 Leader 的Writer链表中。 - 然后,它会进入等待状态 (
cv_.Wait()),直到 Leader 完成所有工作并唤醒它。
Leader 的工作
当 Leader 的短暂等待时间结束,或者写组达到一定规模后,Leader 就开始工作了。
构建写组 (
EnterAsBatchGroupLeader): Leader 会遍历链表,将所有 Follower 的Writer对象收集起来,形成一个WriteGroup。这个WriteGroup就是一个Writer的集合。合并批处理 (
MergeBatch): Leader 会创建一个大的WriteBatch,然后遍历WriteGroup,将每个Writer的WriteBatch内容合并到这个大的WriteBatch中。分配序列号: Leader 会为整个合并后的大
WriteBatch一次性地从版本管理器(versions_)中申请一块连续的序列号(Sequence Number)。写入 WAL (
WriteToWAL): 这是最关键的 I/O 操作。Leader 将合并后的大WriteBatch一次性写入到 WAL 文件中。如果任何一个Writer的WriteOptions要求sync,Leader 就会在写完 WAL 后执行一次fsync,确保数据落盘。这是写组机制性能提升的核心所在:N 次写入被合并为 1 次 WAL 写入和最多 1 次fsync。写入 MemTable (
InsertInto): WAL 写入成功后,Leader 会将大WriteBatch中的数据应用到对应的 MemTable 中。发布序列号 (
SetLastSequence): 所有数据都写入 MemTable 后,Leader 会更新数据库的全局最新序列号。此时,这批写入的数据才对读请求完全可见。
结束与唤醒 (ExitAsBatchGroupLeader)
Leader 完成所有工作后,会执行收尾工作:
- 设置最终状态: Leader 将最终的执行结果(成功或失败的
Status)设置到WriteGroup中。 - 唤醒 Follower: Leader 会遍历
WriteGroup,将最终状态赋给每一个 Follower 的Writer对象,并将它们的状态设置为STATE_COMPLETED。然后,它会广播信号 (cv_.SignalAll()),唤醒所有正在等待的 Follower 线程。
Follower 的苏醒
被唤醒的 Follower 线程从 JoinBatchGroup 的等待中返回。它会发现自己的 w.state 已经变成了 STATE_COMPLETED。它从自己的 w 对象中获取最终的执行状态 w.FinalStatus(),然后从 WriteImpl 函数返回,就像是自己独立完成了写入一样。
总结
| 角色 | 职责 | 关键行为 |
|---|---|---|
| Leader | 成为写组的负责人,执行实际的合并、写入和同步操作。 | JoinBatchGroup (成为 Leader) -> EnterAsBatchGroupLeader -> MergeBatch -> WriteToWAL -> InsertInto -> ExitAsBatchGroupLeader (唤醒 Follower) |
| Follower | 将自己的写入请求(Writer)交给 Leader,然后等待 Leader 完成工作。 | JoinBatchGroup (成为 Follower) -> cv_.Wait() (等待) -> 被唤醒 -> 获取最终状态 -> 返回 |
DB::Put
这里考虑带有时间戳(timestamp)的版本,会以此为起点,并结合整个写入链路,详细解析 RocksDB 的一次写入决策过程。
整个写入过程可以概括为:API 调用 -> 构造 WriteBatch -> 核心写入逻辑(进入写队列、写 WAL、写 MemTable)-> 可能触发 Flush。
第 1 步: DBImpl::Put - API 入口与参数校验
首先,应用层调用的 DB::Put 是一个虚函数,其具体实现位于 DBImpl 类中。对于带时间戳的 Put 操作,入口是 DBImpl::Put。
db_impl_write.cc
// ... existing code ...
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& val) {const Status s = FailIfTsMismatchCf(column_family, ts);if (!s.ok()) {return s;}return DB::Put(o, column_family, key, ts, val);
}// ... existing code ...
- 功能: 这是
Put操作的第一个实现层。 - 核心逻辑:
FailIfTsMismatchCf(column_family, ts): 在执行任何操作之前,它会先进行一个关键校验。它会检查你传入的时间戳ts的大小是否与目标列族(Column Family)定义的时间戳大小一致。如果一个列族没有启用时间戳,或者时间戳大小不匹配,这里会直接返回错误,防止数据写入格式错误。return DB::Put(...): 校验通过后,它会调用基类DB中的同名便利方法。
第 2 步: DB::Put - 构造写入批处理 (WriteBatch)
接下来,执行流进入了一个定义在基类 DB 中的静态便利方法,它的主要作用是为单次 Put 操作创建一个 WriteBatch。
db_impl_write.cc
// ... existing code ...
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& value) {ColumnFamilyHandle* default_cf = DefaultColumnFamily();assert(default_cf);const Comparator* const default_cf_ucmp = default_cf->GetComparator();assert(default_cf_ucmp);WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key,default_cf_ucmp->timestamp_size());Status s = batch.Put(column_family, key, ts, value);if (!s.ok()) {return s;}return Write(opt, &batch);
}
// ... existing code ...
- 功能: 将单次的
Put操作封装成一个WriteBatch对象。RocksDB 中所有的写操作(Put, Delete, Merge)最终都会被统一成WriteBatch来处理。 - 核心逻辑:
- 获取默认列族信息: 它获取了默认列族的比较器(Comparator),并从中得到时间戳的大小
timestamp_size()。这个信息用来初始化WriteBatch,使其知晓如何处理时间戳。 - 创建
WriteBatch: 创建一个WriteBatch实例。WriteBatch本质上是一个内存缓冲区,用于暂存一个或多个写操作。 batch.Put(...): 调用WriteBatch的Put方法,将(column_family, key, ts, value)这组操作指令序列化并追加到WriteBatch的内部缓冲区rep_中。在内部,key 和 timestamp 会被拼接在一起,形成 RocksDB 的内部 key(Internal Key)。这是 RocksDB 实现时间戳功能的关键。return Write(opt, &batch): 最后,调用核心的DB::Write方法,将这个只包含一个Put操作的WriteBatch提交给数据库进行处理。
- 获取默认列族信息: 它获取了默认列族的比较器(Comparator),并从中得到时间戳的大小
第 3 步: DBImpl::Write - 写入操作的门户
// ... existing code ...
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {Status s;if (write_options.protection_bytes_per_key > 0) {s = WriteBatchInternal::UpdateProtectionInfo(my_batch, write_options.protection_bytes_per_key);}if (s.ok()) {s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,/*user_write_cb=*/nullptr,/*wal_used=*/nullptr);}return s;
}
// ... existing code ...
- 功能: 这是
WriteBatch写入的公共入口。 - 核心逻辑:
- 数据保护 (
UpdateProtectionInfo): 如果用户在WriteOptions中设置了protection_bytes_per_key,这里会为WriteBatch中的每个 key 计算并更新校验和(checksum),用于防止数据损坏。 - 调用
WriteImpl: 将WriteBatch和WriteOptions传递给核心实现函数WriteImpl。其他参数(如 callback)在这里都传入nullptr,因为这个Write接口是最基础的版本。
- 数据保护 (
第 4 步: DBImpl::WriteImpl - 核心写入实现
这是整个写入流程的心脏。它非常复杂,包含了大量的配置检查、写入路径选择和核心的写组(Write Group)逻辑。
前置检查 (Pre-flight Checks)
在执行任何写入操作之前,WriteImpl 会进行一系列严格的检查,以确保配置的正确性和一致性。
db_impl_write.cc
// ... existing code ...
Status DBImpl::WriteImpl(const WriteOptions& write_options,WriteBatch* my_batch, WriteCallback* callback,UserWriteCallback* user_write_cb, uint64_t* wal_used,uint64_t log_ref, bool disable_memtable,uint64_t* seq_used, size_t batch_cnt,PreReleaseCallback* pre_release_callback,PostMemTableCallback* post_memtable_callback,std::shared_ptr<WriteBatchWithIndex> wbwi) {// ...if (my_batch == nullptr) {return Status::InvalidArgument("Batch is nullptr!");} else if (!disable_memtable &&WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) {// 如果要写入 MemTable,batch 必须包含时间戳return Status::InvalidArgument("write batch must have timestamp(s) set");} else if (write_options.rate_limiter_priority != Env::IO_TOTAL &&write_options.rate_limiter_priority != Env::IO_USER) {// 校验 rate_limiter_priority 的合法性return Status::InvalidArgument("WriteOptions::rate_limiter_priority only allows ""Env::IO_TOTAL and Env::IO_USER due to implementation constraints");// ... 还有很多其他检查 ...} else if (write_options.disableWAL &&immutable_db_options_.recycle_log_file_num > 0 &&!(two_write_queues_ && disable_memtable)) {// disableWAL 和 recycle_log_file_num > 0 不兼容return Status::InvalidArgument("WriteOptions::disableWAL option is not supported if ""DBOptions::recycle_log_file_num > 0");}// ...if (immutable_db_options_.enable_pipelined_write &&post_memtable_callback != nullptr) {// pipelined write 不支持 post_memtable_callbackreturn Status::NotSupported("pipelined write currently does not honor post_memtable_callback");}// ...
这里列举了一些关键检查:
my_batch不能为空。- 如果要写入 MemTable (
!disable_memtable),且列族开启了时间戳,那么WriteBatch中的数据必须带有时间戳。 - 对
WriteOptions中的各种选项进行合法性和兼容性校验,例如rate_limiter_priority、disableWAL与recycle_log_file_num的冲突、enable_pipelined_write与其他功能的冲突等。
写入路径选择
通过所有检查后,RocksDB 会根据配置选择不同的写入模式。
// ... existing code ...if (two_write_queues_ && disable_memtable) {// 路径 A: 双写队列,且只写 WAL (disable_memtable)// ...Status status = WriteImplWALOnly(/* ... */);// ...return status;}if (immutable_db_options_.enable_pipelined_write) {// 路径 B: 流水线写入 (Pipelined Write)return PipelinedWriteImpl(write_options, my_batch, callback, user_write_cb,wal_used, log_ref, disable_memtable, seq_used);}// 路径 C: 标准写入 (Standard Write)PERF_TIMER_GUARD(write_pre_and_post_process_time);WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, batch_cnt,pre_release_callback, post_memtable_callback, wbwi);// ... 后续进入写组逻辑 ...
// ... existing code ...
- 路径 A (
WriteImplWALOnly): 当启用了双写队列 (two_write_queues_) 并且当前写操作只写 WAL (disable_memtable,常见于 2PC 事务的prepare阶段) 时,会走这个专门的路径。 - 路径 B (
PipelinedWriteImpl): 当启用流水线写入 (enable_pipelined_write) 时,会进入此路径。这是一种性能优化,它将写 WAL 和写 MemTable 解耦,允许它们并行执行,从而降低写入延迟。 - 路径 C (Standard Write): 这是最经典、最通用的写入路径。它使用了写组 (Write Group) 机制来摊销 I/O 开销。我们重点分析这个路径。
WriteImplWALOnly
在 RocksDB 中,并非所有写操作都需要同时写入 WAL 和 MemTable。在以下关键场景中,只需要先将数据写入 WAL:
两阶段提交 (2PC - Two-Phase Commit): 在事务的
Prepare阶段,事务内容(WriteBatch)必须先持久化到 WAL 中。如果此时系统崩溃,重启后可以通过 WAL 恢复这个 "prepared" 状态的事务。只有在Commit阶段,一个简单的提交标记才会被写入,并更新 MemTable。WriteImplWALOnly正是为执行这个Prepare操作而生。无序写入 (
unordered_write): 这是一种性能优化选项。它将一次写入拆分为两个阶段:首先同步写入 WAL,然后异步写入 MemTable。WriteImplWALOnly执行第一阶段的同步写 WAL,允许客户端在数据持久化到日志后立即获得返回,从而降低延迟。双写队列 (
two_write_queues_): 当启用此选项时,RocksDB 会为 WAL-only 的写入操作启用一个独立的写队列 (nonmem_write_thread_)。这可以防止大的事务Prepare操作阻塞常规的、需要写 MemTable 的小操作,从而提高系统的并发性和响应能力。
我们先看一下函数的参数,这有助于理解它的功能:
db_impl_write.cc
Status DBImpl::WriteImplWALOnly(WriteThread* write_thread, const WriteOptions& write_options,WriteBatch* my_batch, WriteCallback* callback,UserWriteCallback* user_write_cb, uint64_t* wal_used,const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,const PublishLastSeq publish_last_seq, const bool disable_memtable)
WriteThread* write_thread: 要加入的写线程队列。WriteBatch* my_batch: 要写入的批处理数据。PreReleaseCallback* pre_release_callback: 一个非常关键的回调。它在 WAL 写入完成之后、函数返回之前被调用。在 2PC 场景中,Prepare的调用者通过这个回调来获取分配给这批数据的序列号(Sequence Number)。const AssignOrder assign_order: 枚举,决定是否需要为这批写入分配序列号。const PublishLastSeq publish_last_seq: 枚举,决定是否需要更新全局的LastSequence。在unordered_write模式下,写 WAL 后就需要更新,让数据可见;而在 2PC 的Prepare阶段则不需要,因为数据还未提交。const bool disable_memtable: 是否禁止写入 MemTable。在这个函数的大多数场景下,它都为true。
WriteImplWALOnly 的内部逻辑与标准写入路径一样,也采用了写组 (Write Group) 机制。
第 1 步: 加入写组
// ... existing code ...WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, sub_batch_cnt,pre_release_callback);
// ... existing code ...write_thread->JoinBatchGroup(&w);
// ... existing code ...if (w.state == WriteThread::STATE_COMPLETED) {if (wal_used != nullptr) {*wal_used = w.wal_used;}if (seq_used != nullptr) {*seq_used = w.sequence;}return w.FinalStatus();}// else we are the leader of the write batch groupassert(w.state == WriteThread::STATE_GROUP_LEADER);
// ... existing code ...
- 和标准写入一样,它首先创建一个
Writer对象来封装请求。 - 然后调用
JoinBatchGroup加入写组。 - 如果当前线程成为了 Follower,它会在此等待,直到 Leader 完成工作。
w.state == WriteThread::STATE_COMPLETED这个判断就是处理 Follower 的情况,它直接获取结果并返回。 - 如果成为了 Leader,则继续执行后续逻辑。
第 2 步: Leader 的准备工作
Leader 在正式写入前,会根据 publish_last_seq 的值选择不同的准备路径。
// ... existing code ...if (publish_last_seq == kDoPublishLastSeq) {// ...// 为 unordered_write 准备,不锁 DB 主锁Status status =PreprocessWrite(write_options, &wal_context, &write_context);// ...} else {// ...// 为 2PC Prepare 准备,需要锁 DB 主锁并检查写延迟InstrumentedMutexLock lock(&mutex_);Status status =DelayWrite(/*num_bytes=*/0ull, *write_thread, write_options);// ...}
// ... existing code ...
kDoPublishLastSeq路径 (用于unordered_write): Leader 直接调用PreprocessWrite,这个函数会处理 WAL 切换等逻辑,但它不会锁住整个数据库的mutex_。kDontPublishLastSeq路径 (用于 2PCPrepare): Leader 会先获取数据库的全局锁mutex_,然后调用DelayWrite进行写流控,确保不会因为大量事务Prepare写入而压垮系统。
第 3 步: Leader 执行写入
这是 Leader 的核心工作,负责将整个写组的数据写入 WAL。
// ... existing code ...WriteThread::WriteGroup write_group;uint64_t last_sequence;write_thread->EnterAsBatchGroupLeader(&w, &write_group);// 1. 聚合信息并更新统计数据for (auto* writer : write_group) {// ... 计算总大小 ...}// ... 更新各种统计信息,如 BYTES_WRITTEN ...// 2. 写入 WALPERF_TIMER_GUARD(write_wal_time);size_t seq_inc = 0;if (assign_order == kDoAssignOrder) {// ... 计算需要分配的序列号数量 ...seq_inc = total_batch_cnt;}Status status;if (!write_options.disableWAL) {IOStatus io_s = ConcurrentWriteGroupToWAL(write_group, wal_used,&last_sequence, seq_inc);status = io_s;// ... 错误检查 ...} else {// 如果禁用了 WAL,只分配序列号last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);}
// ... existing code ...
- 聚合与统计: Leader 遍历写组中的所有
Writer,计算总的写入字节数,并更新相关的性能统计数据。 - 写入 WAL:
- 计算需要分配的序列号总数
seq_inc。 - 调用
ConcurrentWriteGroupToWAL函数。这个函数是专门为并发写入 WAL 设计的,它会将整个write_group的WriteBatch合并成一个大块,一次性写入 WAL 文件,并原子地分配last_sequence。这是性能优化的关键。 - 如果
disableWAL为true,则跳过写日志,仅通过FetchAddLastAllocatedSequence来预留出序列号。
- 计算需要分配的序列号总数
第 4 步: Leader 的收尾工作
WAL 写入成功后,Leader 还需要完成一些重要的收尾工作。
// ... existing code ...// 3. 为每个 writer 分配具体的序列号auto curr_seq = last_sequence + 1;for (auto* writer : write_group) {// ...writer->sequence = curr_seq;// ...}// 4. 如果需要,同步 WALif (status.ok() && write_options.sync) {// ... 调用 SyncWAL() 或 FlushWAL(true) ...}// 5. 执行 PreReleaseCallbackif (status.ok()) {for (auto* writer : write_group) {if (!writer->CallbackFailed() && writer->pre_release_callback) {// ...Status ws = writer->pre_release_callback->Callback(writer->sequence, ...);// ...}}}// 6. 如果需要,发布 LastSequenceif (publish_last_seq == kDoPublishLastSeq) {versions_->SetLastSequence(last_sequence + seq_inc);}// 7. 退出写组,唤醒 Followerwrite_thread->ExitAsBatchGroupLeader(write_group, status);
// ... existing code ...
- 分配序列号: Leader 遍历写组,将连续的序列号分配给每个
Writer。 - 同步 WAL: 如果
WriteOptions要求同步 (sync=true),则执行fsync操作,确保日志落盘。 - 执行回调: 这是非常重要的一步。Leader 调用每个
Writer的pre_release_callback,并将分配到的序列号作为参数传入。这使得 2PC 的调用者可以得知Prepare操作成功,并记录下对应的序列号,以便后续Commit。 - 发布序列号: 如果是
unordered_write场景 (publish_last_seq == kDoPublishLastSeq),则更新全局的LastSequence,使得这批数据对读操作可见。 - 退出: Leader 调用
ExitAsBatchGroupLeader,将最终状态赋予组内所有Writer,并唤醒所有等待的 Follower 线程。
总结
DBImpl::WriteImplWALOnly 是一个为特定场景(2PC、unordered_write)设计的高效写入路径。它通过以下方式实现其目的:
- 重用写组机制: 与标准写入一样,通过批处理来摊销 I/O 成本。
- 专用写队列: 可选地使用独立线程池,避免与常规写入互相阻塞。
- 精细的流程控制: 通过
AssignOrder和PublishLastSeq参数,精确控制序列号的分配和可见性,以适应不同场景的需求。 - 关键的回调机制:
PreReleaseCallback是实现 2PCPrepare逻辑的核心,它在 WAL 持久化后、锁释放前提供了获取序列号的机会。
PipelinedWriteImpl
PipelinedWriteImpl 的核心思想是将一个写操作分解为两个主要阶段:
- WAL 写入阶段:由一个 leader 线程将一个批次(group)的所有写请求一次性写入 WAL 文件。
- MemTable 写入阶段:在 WAL 写入完成后,该批次中的各个写请求再被写入到 MemTable。
关键在于,当第 N 个批次正在进行 MemTable 写入时,第 N+1 个批次可以同时进行 WAL 写入。这种重叠执行(流水线)减少了线程等待,从而提高了整体的写入性能。
下面我们结合源代码,分步解析该函数的具体实现逻辑。
Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,WriteBatch* my_batch, WriteCallback* callback,UserWriteCallback* user_write_cb,uint64_t* wal_used, uint64_t log_ref,bool disable_memtable, uint64_t* seq_used)
write_options: 本次写入操作的选项,如是否同步(sync)、是否禁用 WAL 等。my_batch: 需要写入的数据,封装在WriteBatch对象中。callback,user_write_cb: 用于在写入过程的不同阶段进行回调的接口。wal_used,log_ref,disable_memtable,seq_used: 其他控制参数和用于返回结果的指针。
入口与排队
所有写请求线程开始时都会调用 JoinBatchGroup。
// ... existing code ...WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,log_ref, disable_memtable, /*_batch_cnt=*/0,/*_pre_release_callback=*/nullptr);write_thread_.JoinBatchGroup(&w);TEST_SYNC_POINT("DBImplWrite::PipeldinedWriteImpl:AfterJoinBatchGroup");
// ... existing code ...
JoinBatchGroup 会将当前 Writer 放入一个队列中。如果队列是空的,它就成为 Leader (STATE_GROUP_LEADER);否则,它就是 Follower,并在此等待。
WAL 写入阶段 (组 N)
成为 Leader 的线程会执行以下代码块。假设这是 组 N 的 Leader。
这里N只是方便称呼,实际上只是 分了两个阶段。
// ... existing code ...if (w.state == WriteThread::STATE_GROUP_LEADER) {// ...// 1. 收集 Follower 形成一个写组 (wal_write_group)last_batch_group_size_ =write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);// 2. 乐观地更新全局序列号。这很重要,因为它解耦了序列号分配// 和 MemTable 写入,使得下一个组可以提前知道该用哪个序列号。const SequenceNumber current_sequence =write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;// ... (为组内所有 writer 分配序列号) ...write_thread_.UpdateLastSequence(current_sequence + total_count - 1);// ...// 3. 将整个组的数据写入 WALio_s = WriteGroupToWAL(wal_write_group, /* ... */);w.status = io_s;// ... (处理 WAL 同步等) ...// 4. *** 关键的流水线切换点 ***write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);}
// ... existing code ...
关键点在最后一步 ExitAsBatchGroupLeader。这个函数做了两件核心的事情:
- 唤醒组 N 的成员:它将
wal_write_group中所有 Follower 的状态改变,唤醒它们,使其可以进入接下来的 MemTable 写入阶段。 - 释放 Leader 角色:它释放了 WAL 阶段的 Leader 身份。这意味着在
JoinBatchGroup中等待的下一个线程(也就是 组 N+1 的 Leader)现在可以获得 Leader 身份,并开始执行if (w.state == WriteThread::STATE_GROUP_LEADER)这个代码块。
MemTable 写入阶段 (组 N) 与 WAL 写入阶段 (组 N+1) 并发执行
当组 N 的 Leader 调用 ExitAsBatchGroupLeader 之后:
组 N 的所有线程(包括前 Leader)被唤醒,它们的状态变为
STATE_MEMTABLE_WRITER_LEADER或STATE_PARALLEL_MEMTABLE_WRITER。它们会继续执行PipelinedWriteImpl函数中后面的逻辑,进入 MemTable 写入阶段。// ... existing code ...// 组 N 的线程会执行这里的代码if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {// ...// 串行或并行地将数据写入 MemTable// ...write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);}// ...if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {// ...// 并行地将自己的数据写入 MemTable// ...} // ... existing code ...与此同时,组 N+1 的 Leader 线程 已经通过了
JoinBatchGroup的等待,它的状态是STATE_GROUP_LEADER,因此它会进入同一个函数开头的if块,开始为组 N+1 执行收集成员、分配序列号、写入 WAL 的操作。
正是 ExitAsBatchGroupLeader 这个“交接棒”的动作,实现了流水线。
我们可以这样想象两个写组(Group N, Group N+1)的时间线:
| 时间点 | Group N (线程 A, B, C) | Group N+1 (线程 D, E, F) |
|---|---|---|
| T1 | 线程 A 成为 Leader,开始收集组员,准备写 WAL。 | 线程 D, E, F 在 JoinBatchGroup 中排队等待。 |
| T2 | 线程 A 将整个 Group N 的数据写入 WAL。 | 线程 D, E, F 仍在等待。 |
| T3 | 线程 A 调用 ExitAsBatchGroupLeader。 | 线程 D 立即成为新 Leader,开始为 Group N+1 写 WAL。 |
| T4 | 线程 A, B, C 开始并发地将各自数据写入 MemTable。 | 线程 D 正在将整个 Group N+1 的数据写入 WAL。 |
在 T4 这个时间点,两个组的不同阶段就在同时执行了,这就是流水线效应。这种设计极大地减少了线程的等待时间,提高了系统的整体写入吞吐量。
完成与返回
// ... existing code ...if (seq_used != nullptr) {*seq_used = w.sequence;}assert(w.state == WriteThread::STATE_COMPLETED);return w.FinalStatus();
}
最后,函数会断言 Writer 的状态为 STATE_COMPLETED,表示整个流程已走完,然后返回最终的执行状态 w.FinalStatus() 给调用者。
总结
PipelinedWriteImpl 通过精巧的 WriteThread 状态机设计,将写操作流水化:
- 写组聚合:多个并发写请求被聚合成一个组。
- WAL 阶段:一个 Leader 线程负责整个组的 WAL 写入,这是一个批处理 IO。
- MemTable 阶段:WAL 写入后,组内成员可以并发(或串行)地将数据写入 MemTable。
- 流水线效应:当一个组正在写 MemTable 时,下一个组已经可以开始写 WAL。这种重叠执行隐藏了 IO 延迟,是提升写入密集型场景性能的关键。
这个函数是 RocksDB 高性能写入能力的核心体现之一,展示了其在系统工程上的深度优化。
深入标准写入路径 (Write Group)
这是 RocksDB 提高写入吞吐量的核心机制。
创建 Writer 对象:
WriteThread::Writer w(...): 一个Writer对象被创建在当前线程的栈上。它像一个包裹,封装了本次写入的所有信息:WriteBatch、WriteOptions、各种回调函数等。
加入写组:
write_thread_.JoinBatchGroup(&w): 当前线程带着它的Writer包裹尝试加入写组。- Leader-Follower 模式:
- 如果写组是空的,当前线程成为 Leader。
- 如果写组已经有 Leader 了,当前线程成为 Follower,并将自己的
Writer对象链接到 Leader 的队列中,然后进入等待状态。
Leader 的工作流程:
- 合并批处理 (
MergeBatch): Leader 会遍历队列,将所有 Follower 的WriteBatch合并到自己的WriteBatch中,形成一个大的WriteBatch。 - 分配序列号 (Sequence Number): Leader 会为整个合并后的
WriteBatch从versions_中一次性申请一块连续的序列号。 - 写入 WAL (
WriteToWAL): Leader 将合并后的WriteBatch作为一个单独的记录写入到 WAL 文件。这是保证数据持久性的关键。如果WriteOptions要求sync,这里会触发一次fsync。 - 写入 MemTable (
WriteBatchInternal::InsertInto): WAL 写入成功后,Leader 会将WriteBatch中的每个操作(Put/Delete/Merge)应用到对应的 MemTable 中。如果配置了allow_concurrent_memtable_write,Leader 会启动多个线程并行地将数据写入不同的 MemTable,进一步提升性能。 - 更新最终序列号: 所有数据都写入 MemTable 后,Leader 会调用
versions_->SetLastSequence(),更新数据库的最新序列号。此时,这批写入的数据才对读操作可见。 - 唤醒 Follower: Leader 完成所有工作后,会唤醒所有等待的 Follower 线程。
- 合并批处理 (
Follower 的工作流程:
- Follower 线程在加入写组后就一直等待,直到被 Leader 唤醒。
- 唤醒后,它会从 Leader 那里获取整个写组的最终执行状态 (
w.FinalStatus())。 - 然后从
WriteImpl函数返回,完成自己的写入请求。
通过这种方式,多个并发的写入请求被组合成一次大的写入,大大减少了 WAL 的 I/O 次数(特别是 fsync),从而显著提高了整体的写入吞吐量。
DBImpl::Write 的执行流可以概括为:
- 入口 (
DBImpl::Write): 简单的封装和校验,调用WriteImpl。 - 核心 (
DBImpl::WriteImpl):- 前置检查: 进行大量的参数和配置校验。
- 路径分发: 根据配置选择标准写入、流水线写入或 WAL-only 写入。
- 标准写入(写组):
- 线程加入写组,选举出 Leader。
- Leader 负责合并所有
WriteBatch,分配序列号,统一写入 WAL 和 MemTable。 - Follower 等待 Leader 完成后被唤醒。
- 返回: 所有线程获取最终状态并返回。
这个流程清晰地展示了 RocksDB 如何通过批处理和精巧的并发控制来平衡数据一致性、持久性和高性能写入。
写入的连锁反应 - Flush(将 MemTable 的内容刷写到 SST 文件)
在 PipelinedWriteImpl 或 WriteImpl 的代码中,不会看到直接执行 flush 的代码,因为 flush 是一个被写入操作间接触发的、在后台异步执行的过程。
这个触发和调度的逻辑主要体现在 PreprocessWrite 函数中,它在数据被真正写入 MemTable 之前被调用。
下面是详细的流程:
无论是 PipelinedWriteImpl 还是 WriteImpl,在执行写入 WAL 和 MemTable 的核心逻辑之前,都会调用 PreprocessWrite 来做准备工作。
// ... existing code ...if (w.state == WriteThread::STATE_GROUP_LEADER) {// ...WalContext wal_context(!write_options.disableWAL && write_options.sync);// PreprocessWrite does its own perf timing.PERF_TIMER_STOP(write_pre_and_post_process_time);// 这是关键的调用点w.status = PreprocessWrite(write_options, &wal_context, &write_context);PERF_TIMER_START(write_pre_and_post_process_time);
// ... existing code ...
PreprocessWrite 的核心职责之一是调用 MakeRoomForWrite,确保当前的 active MemTable 有足够的空间来容纳即将写入的数据。
MakeRoomForWrite 的逻辑如下:
- 检查空间:判断当前 active MemTable 是否已满,或者是否因为其他原因(如
write_buffer_manager限制了总内存)需要强制切换。 - 切换 MemTable:如果需要,它会调用
SwitchMemtable。这个函数是flush机制的“扳机”:- 将当前的 active MemTable 变成 immutable MemTable(不可变内存表)。
- 将这个不可变的 MemTable 添加到
imm_列表中。 - 创建一个全新的、空的 MemTable 作为新的 active MemTable。
- 调度 Flush:在切换完成后,系统会调用
MaybeScheduleFlushOrCompaction。这个函数会检查imm_列表中是否有待刷写的 MemTable。
调度并执行 Flush
MaybeScheduleFlushOrCompaction:当此函数发现imm_列表中有新的不可变 MemTable 时,它会创建一个FlushJob。FlushScheduler:这个FlushJob会被提交给flush_scheduler_。可以在WriteBatchInternal::InsertInto的参数中看到它的身影,这表明它与写入流程紧密相关。// ... existing code ...w.status = WriteBatchInternal::InsertInto(write_group, current_sequence, column_family_memtables_.get(),&flush_scheduler_, &trim_history_scheduler_, // flush_scheduler 在这里 // ... existing code ...- 后台执行:RocksDB 的后台 flush 线程池会从
flush_scheduler_中获取这个FlushJob并执行它。这个 Job 的工作就是读取 immutable MemTable 的内容,并将其写入一个新的 SST 文件中。
所以,整个流程是这样的:
Write 请求 -> PipelinedWriteImpl -> PreprocessWrite -> MakeRoomForWrite -> 发现 MemTable 满了 -> SwitchMemtable -> 将旧 MemTable 变为 Immutable -> MaybeScheduleFlushOrCompaction -> 创建 FlushJob 并通过 flush_scheduler_ 调度 -> 后台线程执行 Flush。
因此,flush 操作与 write 操作是解耦的。write 操作负责触发,而真正的 flush I/O 操作由后台线程完成,这样可以避免阻塞前台的写请求,从而保证高写入性能。
总结
一次 DB::Put 的旅程如下:
封装 (DB::Put):
- 将单个
Put(key, value)操作封装成一个只包含一条记录的WriteBatch对象。 - 调用
DB::Write(WriteOptions, WriteBatch*)。
- 将单个
分发写入策略 (DBImpl::Write):
- 根据数据库配置(
unordered_write和enable_pipelined_write),决定是走流水线写入路径 (PipelinedWriteImpl) 还是传统写入路径 (WriteImpl)。
- 根据数据库配置(
核心写入路径 (以
PipelinedWriteImpl为例):- 加入写组:
- 每个写请求(
WriteBatch)被包装成一个Writer对象。 - 调用
write_thread_.JoinBatchGroup(&w),尝试加入一个写组。如果队列为空,当前Writer成为 WAL Leader;否则成为 Follower 并等待。
- 每个写请求(
- WAL 阶段 (由 WAL Leader 执行):
- 预处理 (
PreprocessWrite): 检查 MemTable 空间。如果空间不足,会触发SwitchMemtable,将当前 active MemTable 变为 immutable,并调度后台 Flush。 - 聚合: Leader 收集队列中所有的 Follower,形成一个
WriteGroup。 - 分配序列号: Leader 为
WriteGroup中的所有操作分配连续的SequenceNumber。 - 写入 WAL: Leader 将整个
WriteGroup的数据一次性写入 WAL 文件,实现 I/O 聚合。 - 释放 Leader: Leader 调用
ExitAsBatchGroupLeader,唤醒组内成员进入 MemTable 写入阶段,并释放 WAL Leader 角色。这是实现流水线的关键,下一个写组的 Leader 可以立即开始它的 WAL 阶段。
- 预处理 (
- MemTable 阶段 (由组内所有成员执行):
- 被唤醒的
Writer们会选出一个 MemTable Leader。 - 根据配置(
allow_concurrent_memtable_write),由 MemTable Leader 串行写入,或由所有Writer并发地将各自的WriteBatch插入到 MemTable 中。 - 更新版本: 最后一个完成 MemTable 写入的
Writer负责更新versions_->SetLastSequence(),使本次写入对读操作可见。
- 被唤醒的
- 完成: 所有阶段执行完毕,返回状态。
- 加入写组:
后台异步任务:
- Flush: 如果在预处理阶段触发了 MemTable 切换,后台的 Flush 线程会获取到这个 immutable MemTable,并将其内容刷写(Flush)到磁盘,生成一个新的 SST 文件。
- Compaction: Flush 产生新的 SST 文件可能会触发后台的 Compaction 任务,以合并和优化数据存储结构。
DB::PutEntity
简单来说,PutEntity 是 Put 的一个更通用、更强大的版本,主要用于支持 Wide Columns(宽列)特性。
DB::Put 是 RocksDB 传统的写入接口,它处理的是一个 key 对应一个 value 的简单模型。
// ... existing code ...
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& value) {// ...WriteBatch batch(/*...*/);Status s = batch.Put(column_family, key, value);// ...return Write(opt, &batch);
}
// ... existing code ...
它的作用非常直接:将一个 (key, value) 对包装成一个 WriteBatch,然后调用核心的 Write 方法。
DB::PutEntity 是为了支持“宽列”模型而引入的。在宽列模型中,一个 key 可以对应多个 "attribute"(属性)或 "column"(列),每个属性/列都有自己的名字和值。这在逻辑上类似于一个 key 对应一个小的 key-value map。
PutEntity 有两个主要的重载版本:
版本一:使用 WideColumns
它接受一个 key 和一个 WideColumns 对象。WideColumns 本质上是一个 (column_name, column_value) 对的集合。
// ... existing code ...
Status DB::PutEntity(const WriteOptions& options,ColumnFamilyHandle* column_family, const Slice& key,const WideColumns& columns) {
// ... existing code ...WriteBatch batch(/* ... */);const Status s = batch.PutEntity(column_family, key, columns);if (!s.ok()) {return s;}return Write(options, &batch);
}
// ... existing code ...
这个接口允许你一次性为一个 key 写入多个属性。例如,你可以为一个用户 ID(作为 key)同时写入他的姓名、年龄、地址等多个属性(作为 WideColumns)。
版本二:使用 AttributeGroups
这个版本更为灵活,它允许你为一个 key 写入跨越多个列族(Column Family)的属性。AttributeGroups 是 (ColumnFamilyHandle, WideColumns) 对的集合。
// ... existing code ...
Status DB::PutEntity(const WriteOptions& options, const Slice& key,const AttributeGroups& attribute_groups) {
// ... existing code ...WriteBatch batch(/* ... */);const Status s = batch.PutEntity(key, attribute_groups);if (!s.ok()) {return s;}return Write(options, &batch);
}
// ... existing code ...
关系总结
功能上的扩展:
PutEntity是Put的超集。Put可以看作是PutEntity的一个特例,即只写入一个默认名称的属性(value)。在 RocksDB 内部,一个简单的Put(key, value)最终可能会被当作写入一个(key, {"default_column", value})的实体来处理。实现上的相似性:从实现上可以看出,它们都遵循相同的模式:
- 创建一个
WriteBatch。 - 调用
WriteBatch对应的方法(batch.Put或batch.PutEntity)来填充WriteBatch。 - 调用核心的
DB::Write方法来执行这个WriteBatch。
- 创建一个
使用场景:
- 当你只需要存储简单的 key-value 数据时,使用
DB::Put更直观、更简单。 - 当你的数据模型更复杂,一个主键(key)需要关联多个命名的属性值时,
DB::PutEntity提供了更强大和结构化的方式来组织数据,避免了将多个属性手动序列化到一个 value 中的麻烦。
- 当你只需要存储简单的 key-value 数据时,使用
总而言之,PutEntity 是为了适应更复杂的数据模型而对 Put 进行的现代化扩展。两者共享底层的写入路径和机制。
DB::Merge
Merge 是 RocksDB 提供的一种非常有特色的写操作。与 Put(覆盖写)和 Delete(删除)不同,Merge 是一种**“读取-修改-写入”**(Read-Modify-Write)的原子操作。它允许用户定义一个合并函数(MergeOperator),当对一个 key 执行 Merge 操作时,RocksDB 会将新的值(称为 operand,操作数)与已存在的旧值进行合并,生成一个新值并写回。
DB::Merge 有两个主要的重载版本,一个带时间戳,一个不带。它们是暴露给用户的最顶层接口。
db_impl_write.cc
// ... existing code ...
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& value) {WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);Status s = batch.Merge(column_family, key, value);if (!s.ok()) {return s;}return Write(opt, &batch);
}Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& value) {ColumnFamilyHandle* default_cf = DefaultColumnFamily();assert(default_cf);const Comparator* const default_cf_ucmp = default_cf->GetComparator();assert(default_cf_ucmp);WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key,default_cf_ucmp->timestamp_size());Status s = batch.Merge(column_family, key, ts, value);if (!s.ok()) {return s;}return Write(opt, &batch);
}
// ... existing code ...
分析:
- 统一的模式: 和
Put、Delete一样,Merge的顶层实现遵循一个标准模式:- 创建一个
WriteBatch对象。 - 调用
WriteBatch相应的Merge方法,将Merge操作记录到WriteBatch中。 - 调用核心的
DB::Write方法,将这个WriteBatch提交给写入流水线。
- 创建一个
- 原子性保证: 将
Merge操作封装进WriteBatch是保证其原子性的第一步。后续的写入流程会确保WriteBatch中的所有操作要么全部成功,要么全部失败。
实现层检查 (DBImpl::Merge)
在 DBImpl 中,对 DB::Merge 进行了进一步的封装和检查,确保调用 Merge 的前提条件是满足的。
// ... existing code ...
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,const Slice& key, const Slice& val) {const Status s = FailIfCfHasTs(column_family);if (!s.ok()) {return s;}auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);if (!cfh->cfd()->ioptions().merge_operator) {return Status::NotSupported("Provide a merge_operator when opening DB");} else {return DB::Merge(o, column_family, key, val);}
}Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,const Slice& key, const Slice& ts, const Slice& val) {const Status s = FailIfTsMismatchCf(column_family, ts);if (!s.ok()) {return s;}return DB::Merge(o, column_family, key, ts, val);
}
// ... existing code ...
分析:
- 前置条件检查: 这是
DBImpl层的主要职责。FailIfCfHasTs: 检查不带时间戳的Merge是否用在了启用了时间戳的列族上,这是不允许的。FailIfTsMismatchCf: 检查带时间戳的Merge提供的时间戳大小是否和列族定义的一致。merge_operator检查: 这是最核心的检查。Merge操作必须依赖用户在打开数据库时提供的merge_operator。如果没有提供,调用Merge会直接返回NotSupported错误。
- 调用基类: 检查通过后,它会调用基类
DB::Merge的实现,也就是我们第一部分分析的逻辑,进入WriteBatch流程。
Merge 操作的本质:延迟计算
Merge 的一个关键特性是延迟计算。当调用 DB::Merge 时,RocksDB 并不会立即执行合并操作。它只是将一个类型为 kTypeMerge 的记录写入 WAL 和 MemTable。这条记录包含了 key 和新的操作数(operand)。
真正的合并计算发生在以下两个时机:
读取时 (Read Path): 当你调用
DB::Get读取一个 key 时,如果 RocksDB 发现这个 key 的最新版本是一系列的Merge操作数,它会:首先找到这个 key 的一个基础值(一个
Put操作或者不存在)。然后按顺序应用所有堆叠在基础值之上的
Merge操作数,通过调用merge_operator->FullMergeV2(...)来计算出最终结果。这个过程被称为 "Merge on Read"。
Compaction 时 (Compaction Path): 为了避免读取时合并过多的操作数导致性能下降,Compaction 过程也会执行合并。 a. 当 Compaction 处理到一个 key 时,它会收集这个 key 在不同 SST 文件和层级中的所有
Put和Merge记录。 b. 它会调用merge_operator将这些记录合并成一个最终的Put记录。 c. 这个最终的Put记录会被写入到新的 SST 文件中。这样,未来的读取就不再需要进行合并计算了。
MergeOperator
这是用户实现 Merge 逻辑的地方。用户需要继承 MergeOperator 类并实现其核心方法,主要是 FullMergeV2 或 PartialMergeMulti。
FullMergeV2: 定义了如何将一个基础值和一系列操作数合并成一个新值的逻辑。这是最常用的合并方法。PartialMergeMulti: 定义了如何将两个操作数合并成一个操作数的逻辑。这是一种优化,可以在 Compaction 期间提前合并部分操作数,减少FullMergeV2的负担。
例如,一个用于实现 "计数器" 的 MergeOperator 可能会将所有操作数(字符串形式的数字)解析成整数,然后求和,最后将结果转回字符串作为新的 value。
使用场景
基础场景:计数器 (Counter)
这是最经典、最直观的 Merge 应用场景。
- 场景描述: 需要对某个 key 关联的数值进行频繁的加/减操作。例如,统计网页的点击次数、用户收到的点赞数、商品的库存变化等。
- 不使用 Merge 的做法:
value = DB::Get(key)new_value = value + 1DB::Put(key, new_value)
- 问题: 这个过程需要一次读和一次写,并且需要客户端加锁来保证并发修改的原子性,非常低效且复杂。
- 使用 Merge 的做法:
- 定义一个
MergeOperator,其逻辑是将旧值(字符串)和操作数(operand,比如 "1")都解析为数字,相加后再转为字符串。 - 客户端只需要调用
DB::Merge(key, "1")。
- 定义一个
- 优势: 客户端操作被简化为单次、快速的写入。数据库利用 WAL 保证了原子性,并将合并计算推迟,使得写入性能极高。
数据结构场景:集合/列表的构建
Merge 可以用来高效地构建一个与 key 关联的集合或列表类型的数据。
- 场景描述: 需要为一个 key 关联一个列表或集合,并频繁地向其中添加新元素。例如,记录一个用户关注的所有人的 ID 列表、一个商品的所有标签集合。
- 不使用 Merge 的做法:
list_str = DB::Get(key)list = deserialize(list_str)list.add(new_element)new_list_str = serialize(list)DB::Put(key, new_list_str)
- 问题: 每次添加都需要读取、反序列化、修改、序列化、写入整个列表,当列表很长时,开销巨大。
- 使用 Merge 的做法:
- 定义一个
MergeOperator,其逻辑是将旧值(序列化的列表)和操作数(新元素)合并。例如,简单地将新元素追加到旧值字符串的末尾,用特殊分隔符隔开。 - 客户端只需调用
DB::Merge(key, new_element_str)。
- 定义一个
- 优势: 写入非常快。真正的列表合并发生在读取时或后台 Compaction 时。这对于写多读少的场景(例如,日志记录)尤其高效。
JSON 文档的部分更新
Merge 可以模拟对 JSON 或类似半结构化文档的局部更新(JSON Patch)。
- 场景描述: 存储一个大的 JSON 对象作为 value,但经常只需要修改其中的一两个字段。
- 不使用 Merge 的做法: 读取整个 JSON,在内存中解析并修改,然后写回整个新的 JSON。
- 使用 Merge 的做法:
- 定义一个
MergeOperator,其合并逻辑遵循 JSON Patch 规范(RFC 6902)。 - 操作数(operand)是一个描述如何修改的 JSON Patch 对象,例如
{"op": "replace", "path": "/baz", "value": "boo"}。 - 客户端调用
DB::Merge(key, patch_str)。
- 定义一个
- 优势: 避免了读取和重写整个大 value 的开销,写入操作数非常小,速度很快。
时间序列数据的聚合
Merge 可以用于时间序列数据的预聚合。
- 场景描述: 收集大量的时序数据点(例如,某台机器每秒的 CPU 使用率),希望在存储时就能进行一些聚合,比如计算一分钟内的最大/最小/平均值。
- 不使用 Merge 的做法: 客户端需要维护状态,攒够一分钟的数据,计算完再
Put。这会增加客户端的复杂性和状态丢失的风险。 - 使用 Merge 的做法:
- Key 可以是
machine_id + minute_timestamp。 - 定义一个
MergeOperator,其逻辑是解析旧值(例如,一个包含min,max,sum,count的结构体)和操作数(新的 CPU 使用率),然后更新这个结构体。 - 每一秒都直接调用
DB::Merge(key, new_cpu_usage)。
- Key 可以是
- 优势: 客户端无状态,写入延迟低。数据库在后台 Compaction 时会自动完成聚合计算,最终将一分钟内的所有操作数合并成一个聚合后的结果。这极大地简化了客户端逻辑,并利用了数据库的持久化和并发能力。
Merge 的本质与局限
Merge 的本质是将本应在客户端完成的逻辑下沉到存储层,利用存储层的原子性和并发控制来简化应用逻辑并优化性能。
局限性:
- 逻辑限制:
MergeOperator的逻辑应该是无状态的,并且最好满足交换律和结合律,这样 Compaction 时的合并顺序才不会影响最终结果。 - 读取性能: 如果一个 key 有大量的
Merge操作数堆积而没有被 Compaction 及时合并,那么读取这个 key 时(Merge on Read)的延迟会很高。因此,它不适合读性能要求极高的场景,除非能保证 Compaction 足够频繁。 - 复杂性: 实现一个正确且高效的
MergeOperator比简单的Put/Get要复杂,需要仔细考虑各种边界情况。
综上所述,Merge 是一个强大的高级特性,适用于各种需要原子性“读取-修改-写入”的场景,特别是当写入性能是主要瓶颈时。它通过牺牲一定的读取性能和增加实现复杂性,换来了极高的写入吞吐量和简化的客户端逻辑。
总结
DB::Merge 的完整旅程可以概括为:
- API 调用: 用户调用
DB::Merge(key, operand)。 - 实现层检查:
DBImpl::Merge检查merge_operator是否存在以及时间戳是否匹配。 - 封装:
DB::Merge将操作封装进WriteBatch。 - 写入流水线:
DB::Write将WriteBatch提交,最终一条kTypeMerge记录被写入 WAL 和 MemTable。此时没有发生真正的合并计算。 - 延迟合并:
- 读取时:
DB::Get触发 "Merge on Read",实时计算出结果返回给用户。 - Compaction 时: 后台 Compaction 任务会将多个
Merge操作数和一个基础值合并成一个最终的Put值,并写入新的 SST 文件,以优化未来的读取性能。
- 读取时:
这种设计将昂贵的“读取-修改-写入”操作的计算部分推迟到后台(Compaction)或必要时(读取)执行,从而极大地优化了写入路径的性能,使得 Merge 操作的写入速度几乎和 Put 一样快。
DB::Write
这个函数最主要的作用是将一个 WriteBatch 对象中包含的所有写操作(Put, Delete, Merge 等)作为一个原子单元提交给数据库。
WriteBatch: 这是一个“写入批次”的容器。你可以先创建一WriteBatch对象,然后向其中添加多个Put、Delete或Merge操作。- 原子性 (Atomicity): 当你调用
DB::Write并传入这个WriteBatch时,RocksDB 会保证这个批次里的所有操作要么全部成功写入,要么在发生错误时全部失败(不会出现只写入了一部分的情况)。这对于维护数据一致性至关重要。
在 RocksDB 中,所有独立的写操作接口,如 Put, Delete, Merge, SingleDelete 等,实际上都是 DB::Write 的“语法糖”。它们内部的实现逻辑都是:
- 创建一个只包含一个操作的
WriteBatch。 - 调用
DB::Write来执行这个WriteBatch。
我们可以从源码中清晰地看到这一点,例如 DB::Merge 的实现:
// ... existing code ...
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key, const Slice& value) {// 1. 创建一个 WriteBatchWriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);// 2. 向 Batch 中添加一个 Merge 操作Status s = batch.Merge(column_family, key, value);if (!s.ok()) {return s;}// 3. 调用核心的 Write 方法return Write(opt, &batch);
}
// ... existing code ...
这表明 DB::Write 是整个写入体系的基础。
实现探究 (DBImpl::Write)
DB 是一个抽象基类,真正的实现在 DBImpl 中。
// ... existing code ...
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {Status s;if (write_options.protection_bytes_per_key > 0) {s = WriteBatchInternal::UpdateProtectionInfo(my_batch, write_options.protection_bytes_per_key);}if (s.ok()) {// 调用更底层的 WriteImpls = WriteImpl(write_options, my_batch, /*callback=*/nullptr,/*user_write_cb=*/nullptr,/*wal_used=*/nullptr);}return s;
}
// ... existing code ...
从 DBImpl::Write 的实现可以看出:
- 它是一个相对薄的封装。
- 它会做一些预处理,比如根据
WriteOptions更新WriteBatch的校验信息。 - 最终,它会调用一个更为通用和强大的内部函数
WriteImpl(或者根据配置走PipelinedWriteImpl流程),这个WriteImpl才是我们之前详细分析过的、包含进入写组、写 WAL、写 MemTable等复杂逻辑的真正核心。
使用 DB::Write 进行批量写入主要有两个巨大的优势:
性能 (Performance): 写入1000条数据,调用1000次
Put和调用1次Write(传入包含1000个操作的WriteBatch),后者的性能会高出几个数量级。这是因为批量写入可以:- 减少 WAL 写入次数: 整个批次只需要写一次预写日志(WAL)。
- 减少锁竞争: 整个批次只需要获取一次锁,进入一次写组。
- 摊销开销: 将线程调度、内存分配等固定开销摊销到批次中的每一个操作上。
一致性 (Consistency): 保证了多个相互关联的写操作的原子性。例如,在银行转账场景中,“A账户减钱”和“B账户加钱”这两个操作必须被打包在一个
WriteBatch中,通过一次DB::Write调用来完成,以确保它们同时成功或同时失败。
总结
Status DB::Write(const WriteOptions& options, WriteBatch* updates) 不仅仅是提供了批量写的能力,它更是 RocksDB 写入操作的基石。它通过 WriteBatch 提供了原子性和高性能的保证,并且是所有其他上层写入API(如Put, Delete)最终依赖的核心入口。
RocksDB 中的三种删除操作
Delete、SingleDelete 和 DeleteRange。这三者虽然都用于删除数据,但它们的机制、性能特点和适用场景有很大不同。
| 特性 | Delete | SingleDelete | DeleteRange |
|---|---|---|---|
| 操作粒度 | 单个 Key | 单个 Key | 一个 Key 的范围 [begin, end) |
| 内部机制 | 写入一个标准的“墓碑”(Tombstone)标记 | 写入一个带优化提示的特殊“墓碑” | 写入一个“范围墓碑”(Range Tombstone) |
| 写入开销 | 低 | 低 | 极低(删除大量数据时) |
| 读取影响 | 几乎无影响 | 几乎无影响 | 有一定影响(迭代器需要检查是否被范围覆盖) |
| 核心优势 | 通用、安全地删除单个键 | 在特定条件下,Compaction 效率更高 | 删除大量连续数据时效率最高 |
| 使用约束 | 无 | 用户需保证同一批次中没有对该 Key 的 Put 操作 | WriteBatchWithIndex 等场景不支持 |
| 最佳场景 | 删除单个、已存在的 Key | “盲删”场景,即不确定 Key 是否存在就删除 | 删除大量连续数据,如按时间清理旧数据 |
Delete:通用的单点删除
Delete 是最标准、最常用的删除单个 key 的方法。
当调用 DB::Delete(key) 时,RocksDB 并不会立即去磁盘上找到并删除这个数据。相反,它会在 MemTable 和后续的 WAL 中写入一个针对这个 key 的特殊标记,我们称之为**“墓碑(Tombstone)”。
- 写入路径: 和
Put类似,Delete操作也是通过创建一个WriteBatch并调用DB::Write来完成的。// ... existing code ... Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,const Slice& key) {WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);Status s = batch.Delete(column_family, key);if (!s.ok()) {return s;}return Write(opt, &batch); } // ... existing code ... - 读取路径: 当用户查询这个
key时,RocksDB 会看到这个墓碑标记,并向用户返回Status::NotFound,即使在旧的 SST 文件中仍然存在这个key的旧值。 - Compaction: 这个墓碑会像普通数据一样,随着 Compaction 过程在 LSM 树中下沉。直到这个墓碑到达最底层(Bottommost Level),并且可以确定更低层(不存在)没有这个 key 的更旧版本时,这个墓碑和它覆盖的所有旧值才会被真正地物理删除。
适用于绝大多数需要删除单个 key 的情况。
SingleDelete:优化的单点删除
SingleDelete 的使用有一个非常严格的契约(contract):它假定你要删除的 key,在被删除之前只被写入(Put)过一次。
如果你违反了这个前提,比如对一个 key 多次 Put 后再使用 SingleDelete,或者将 SingleDelete 和 Delete、Merge 混用在同一个 key 上,其行为是未定义的(undefined behavior),可能会导致数据不一致。
这一点在多个代码文件中都有体现,尤其是 Java 的接口文档中解释得非常清楚:
WriteBatchInterface.java
// ... existing code .../*** Remove the database entry for {@code key}. Requires that the key exists* and was not overwritten. It is not an error if the key did not exist* in the database.* <p>* If a key is overwritten (by calling {@link #put(byte[], byte[])} multiple* times), then the result of calling SingleDelete() on this key is undefined.* SingleDelete() only behaves correctly if there has been only one Put()* for this key since the previous call to SingleDelete() for this key.* <p>* This feature is currently an experimental performance optimization* for a very specific workload. It is up to the caller to ensure that* SingleDelete is only used for a key that is not deleted using Delete() or* written using Merge(). Mixing SingleDelete operations with Deletes and* Merges can result in undefined behavior.*
// ... existing code ...
HISTORY.md 文件也强调了这一点:
// ... existing code ...
# Rocksdb Change Log
## 7.3.0 (05/20/2022)
### Behavior changes
* Enforce the existing contract of SingleDelete so that SingleDelete cannot be mixed with Delete because it leads to undefined behavior. Fix a number of unit tests that violate the contract but happen to pass.
// ... existing code ...
SingleDelete 的性能优势体现在 Compaction 过程中。由于它假定只有一个 Put 操作与之对应,Compaction 时就可以进行一种称为“操作数抵消”(operand cancellation)的优化。当 Compaction 过程同时遇到一个 Put 操作和其对应的 SingleDelete 操作时,可以直接将这两个操作同时安全地移除,而不需要像 Delete 那样去查找并清理所有可能的旧版本。这减少了 Compaction 的工作量,从而提升了性能。
从 DBImpl 的实现来看,Delete 和 SingleDelete 最终都会创建一个 WriteBatch 并写入。
而 DB::SingleDelete 的默认实现会调用 WriteBatch::SingleDelete:
// ... existing code ...
Status DB::SingleDelete(const WriteOptions& opt,ColumnFamilyHandle* column_family, const Slice& key) {WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);Status s = batch.SingleDelete(column_family, key);if (!s.ok()) {return s;}return Write(opt, &batch);
}
// ... existing code ...
WriteBatch 内部会将 Delete 和 SingleDelete 标记为不同的操作类型(kTypeDeletion vs kTypeSingleDeletion),以便在后续处理(如 Compaction)中应用不同的逻辑。
DeleteRange:高效的范围删除
DeleteRange 是为了解决删除大量连续 key 的性能问题而设计的。
在 DeleteRange 出现之前,要删除一个范围的数据,主要有两种方式:
- 遍历并删除 (Scan-and-Delete):创建一个迭代器,遍历整个范围,并对每个 key 调用
Delete。这种方式非常慢,因为它涉及到大量的读 I/O 和写操作,会产生海量的独立墓碑,严重拖慢后续的迭代器性能。 - 自定义 Compaction Filter:通过过滤器在后台 Compaction 时删除数据。这种方式是异步的,无法保证读者能立即看不到被删除的数据。
DeleteRange 提供了一个原生的、原子性的范围删除操作,解决了以上痛点。
2018-11-21-delete-range.markdown
// ... existing code ...
## Motivation
### Existing mechanisms: challenges and opportunitiesThe most common pattern we see is scan-and-delete, i.e., advance an iterator
through the to-be-deleted range, and issue a `Delete` for each key. This is
slow (involves read I/O) so cannot be done in any critical path. Additionally,
it creates many tombstones, which slows down iterators and doesn't offer a deadline
for space reclamation.
// ... existing code ...
DeleteRange 会在数据库中创建“范围墓碑”(range tombstones)。当读取数据时,无论是点查询(Get)还是范围扫描(Iterator),都必须检查一个 key 是否被某个范围墓碑所覆盖。
这里的核心挑战是:范围墓碑之间可能会重叠。例如,先删除 [a, z],再删除 [f, m]。为了高效判断一个 key 是否被删除,就需要一种方法来处理这些重叠的范围。这就是 "Skyline" 算法的用武之地。
这个概念源于一个经典的计算几何问题:给定一组矩形,计算它们顶部轮廓线。在 RocksDB 中,这被引申为:将所有重叠的、有效的范围墓碑合并成一个不重叠的、连续的“天际线”。
DeleteRange 的实现演进 (v1 vs v2)
根据官方博客,DeleteRange 的内部实现经历了重要的演进,这直接影响了 Skyline 的构建方式和性能表现。
v1 设计 (早期版本):
- 实现方式: 当创建一个迭代器并进行
Seek时,RocksDB 会收集所有相关的范围墓碑,并一次性构建一个全局的 "Skyline"。 - 性能:
- 优点: 一旦 Skyline 构建完成,后续的
Next()/Prev()操作就很快,因为只需要对照这个已经计算好的 Skyline 即可。 - 缺点: 迭代器创建和首次定位的开销巨大。构建 Skyline 的过程需要对所有墓碑进行排序和合并,复杂度为
O(T*log(T))(T 为墓碑数量),这会导致长范围扫描的启动延迟非常高。点查询性能也很差,需要线性扫描所有相关的墓碑。
- 优点: 一旦 Skyline 构建完成,后续的
2018-11-21-delete-range.markdown
// ... existing code ...
## v1: Getting it to work
### Read pathIn iterators, we aggregate range tombstones into a skyline as we visit live
memtable, immutable memtables, and SSTs. The skyline is expensive to construct but fast to determine whether a key is covered. The skyline keeps track of the most recent range tombstone found to optimize `Next` and `Prev`.
// ... existing code ...
### Performance characteristics
// ... existing code ...
When an iterator is first created and seeked, we construct a skyline over its
tombstones. This operation is O(T*log(T)) where T is the number of tombstones
// ... existing code ...
v2 设计 (当前主流版本):
为了解决 v1 的性能问题,特别是读性能,v2 设计被提出。
实现方式: v2 的核心思想是放弃构建全局 Skyline,转而进行局部处理和缓存。
- 在 SST 文件级别,范围墓碑被处理成已排序且不重叠的“片段” (fragments)。这使得在单个 SST 文件内可以通过二分查找快速定位覆盖某个 key 的墓碑。
- 当迭代器进行范围扫描时,它会为每个相关的 SST 文件(和 Memtable)创建一个范围墓碑的子迭代器,并将它们组合起来。在迭代过程中,动态地、增量地判断当前 key 是否被任何一个子迭代器中的墓碑覆盖。这可以看作是一种**“隐式的”或“懒加载的”Skyline**。
性能:
- 优点:
- 迭代器创建和首次定位非常快,因为它避免了 v1 中昂贵的全局构建过程。
- 点查询和短距离扫描性能大幅提升,因为可以利用二分查找。
- 缺点:
- 对于长距离扫描,由于需要同时管理多个子迭代器并在每一步都进行比较,其开销可能会比 v1 构建好全局 Skyline 后的情况要高。
HISTORY.md中也提到了针对 v2 设计的持续性能优化,例如让迭代器在可能的情况下直接跳过整个被删除的范围,而不是逐一检查其中的 key。
- 优点:
2018-11-21-delete-range.markdown
// ... existing code ...
## v2: Making it fast
### RepresentationsThe key idea of the redesign is that, instead of globally collapsing range tombstones,we can locally “fragment” them for each SST file and memtable to guarantee that:* no range tombstones overlap; and
* range tombstones are ordered by start key.Combined, these properties make range tombstones binary searchable.
// ... existing code ...
Skyline 算法和范围墓碑处理的复杂逻辑 深植于 RocksDB 的内部,主要涉及:
- 迭代器实现 (
db/db_iter.cc,table/iterator_wrapper.h等): 在Seek()和Next()等操作中融合范围墓碑的检查逻辑。 - SST 文件格式和读取 (
table/block_based/block_based_table_reader.cc): 如何存储、缓存和查询范围墓碑的“片段”。 - Compaction 过程: 如何在合并数据时处理和最终清理掉范围墓碑。
这些部分的源码非常复杂,但其设计思想与上述博客文章中描述的 v2 版本一致。
这是删除大量连续数据的最佳方式。典型的例子包括:
- 清理时序数据: 删除所有时间戳早于某个日期的日志或指标。
- 多租户数据隔离: 当一个租户被删除时,如果该租户的所有 key 都有一个共同的前缀,可以使用
DeleteRange高效地删除其全部数据。 - TTL (Time-to-Live):
DeleteRange是实现数据过期功能的基础。
正如文档 delete-range.markdown 所述,DeleteRange 避免了“先扫描再删除(scan-and-delete)”模式的低效(大量读IO和写放大)和使用 CompactionFilter 的复杂性与异步性。
Get
Get 是 RocksDB 中最核心的单点读取操作。它的目标是高效地从可能分布在内存和多层磁盘文件中的数据里,找到指定 key 对应的最新版本的值。其执行路径完美地体现了 LSM-Tree(日志结构合并树)的读操作精髓。
整个 Get 流程可以分为四个主要阶段:API 入口与准备 -> 获取一致性视图 (SuperVersion) -> 分层查找 -> 结果处理与返回。
阶段一:API 入口与准备 (DBImpl::Get)
DBImpl::Get 函数是 Get 操作的公共 API 实现。它更像一个“门面”,负责一些初步处理和参数校验,然后调用内部核心实现。
db_impl.cc
// ... existing code ...
Status DBImpl::Get(const ReadOptions& _read_options,ColumnFamilyHandle* column_family, const Slice& key,PinnableSlice* value, std::string* timestamp) {assert(value != nullptr);value->Reset();// 1. 校验 I/O 活动类型,这是为了内部监控和统计if (_read_options.io_activity != Env::IOActivity::kUnknown &&_read_options.io_activity != Env::IOActivity::kGet) {return Status::InvalidArgument("Can only call Get with `ReadOptions::io_activity` is ""`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");}// 2. 创建一个 ReadOptions 的副本,并设置默认的 io_activityReadOptions read_options(_read_options);if (read_options.io_activity == Env::IOActivity::kUnknown) {read_options.io_activity = Env::IOActivity::kGet;}// 3. 调用真正的内部核心实现Status s = GetImpl(read_options, column_family, key, value, timestamp);return s;
}
// ... existing code ...
这个入口函数主要做了三件事:
- 断言
value指针非空并重置它。 - 检查并设置
ReadOptions中的io_activity,用于 I/O 相关的统计和追踪。 - 将所有参数传递给内部的
GetImpl函数,开始真正的查找之旅。
阶段二:获取一致性视图 (DBImpl::GetImpl 的前半部分)
GetImpl 是 Get 的核心。它首先要做的不是立即去查找数据,而是建立一个进行查找所必需的“上下文环境”,其中最关键的就是 SuperVersion 和 SequenceNumber。
// ... existing code ...
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,GetImplOptions& get_impl_options) {
// ... existing code ...// 1. 时间戳(Timestamp)相关校验if (read_options.timestamp) {const Status s = FailIfTsMismatchCf(get_impl_options.column_family,*(read_options.timestamp));if (!s.ok()) {return s;}} else {const Status s = FailIfCfHasTs(get_impl_options.column_family);if (!s.ok()) {return s;}}
// ... existing code ...// 2. 启动性能计时器和追踪PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
// ... existing code ...auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(get_impl_options.column_family);auto cfd = cfh->cfd();
// ... existing code ...// 3. 获取并引用 SuperVersionSuperVersion* sv = GetAndRefSuperVersion(cfd);
// ... existing code ...// 4. 确定查询的序列号(快照版本)SequenceNumber snapshot;if (read_options.snapshot != nullptr) {// 如果用户提供了快照,就使用快照的序列号snapshot =static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;} else {// 否则,获取当前最新的“已发布”序列号snapshot = GetLastPublishedSequence();
// ... existing code ...}
// ... existing code ...// 5. 构造 LookupKeyLookupKey lkey(key, snapshot, read_options.timestamp);
// ... existing code ...
这个阶段的步骤分解如下:
- 参数校验:特别是对时间戳(Timestamp)的校验,确保列族(Column Family)的配置和读请求的参数是一致的。
- 性能追踪:启动各种性能计数器,用于监控
Get操作的耗时。 - 获取 SuperVersion:这是至关重要的一步。
SuperVersion是 RocksDB 内部的一个核心数据结构,它封装了一个列族在某个时间点的一致性视图。它包含了:- 一个指向当前可写 MemTable (
mem) 的指针。 - 一个指向不可变 MemTable 列表 (
imm) 的指针。 - 一个指向当前版本信息 (
Version) 的指针,Version描述了磁盘上所有 SST 文件的布局。 通过对SuperVersion进行引用计数 (Ref()),可以保证在本次Get操作期间,它所指向的这些 MemTable 和 SST 文件不会被后台的 Flush 或 Compaction 进程清理掉,从而保证了读取的一致性。
- 一个指向当前可写 MemTable (
- 确定序列号 (Snapshot):RocksDB 通过序列号(SequenceNumber)来实现多版本并发控制(MVCC)。
Get操作必须在一个明确的序列号下进行读取。- 如果用户在
ReadOptions中指定了snapshot,就使用该快照对应的序列号。 - 否则,获取当前数据库最新的序列号。这一步必须在获取
SuperVersion之后做,以防止在间隙中发生 Compaction 导致数据丢失。
- 如果用户在
- 构造 LookupKey:将用户提供的
key、上一步确定的snapshot序列号以及可选的timestamp组合成一个内部查找键LookupKey。后续的所有查找都将使用这个LookupKey。
阶段三:分层查找 (DBImpl::GetImpl 的核心)
这是 Get 操作的“重头戏”。LSM-Tree 的读取遵循一个基本原则:从新到旧,从快到慢。一旦找到,立即返回。查找顺序是:Mutable MemTable -> Immutable MemTable -> SST Files (L0 -> L1 -> ... -> Lmax)。
// ... existing code ...// 查找路径开始...bool done = false;
// ... existing code ...if (!skip_memtable) {// 步骤 1: 在可写 MemTable (mem) 中查找if (sv->mem->Get(lkey, get_impl_options.value, &s, &merge_context,&max_covering_tombstone_seq, get_impl_options.timestamp,get_impl_options.columns, get_impl_options.callback)) {done = true;} else if (s.IsMergeInProgress()) {// 找到一个 Merge 操作,需要继续往下找基础值} else if (!s.ok()) {done = true;}if (!done && !s.IsMergeInProgress()) {// 步骤 2: 在不可变 MemTable 列表 (imm) 中查找sv->imm->Get(lkey, get_impl_options.value, &s, &merge_context,&max_covering_tombstone_seq, get_impl_options.timestamp,get_impl_options.columns, get_impl_options.callback);if (!s.ok()) {done = true;}}}if (!done) {// 步骤 3: 在 SST 文件中查找PERF_TIMER_GUARD(get_from_output_files_time);sv->current->Get(read_options, lkey, get_impl_options, &s, &merge_context,&max_covering_tombstone_seq, &read_stats,/*version_read_tier=*/kReadAllTier,/*single_read_bytes=*/nullptr,/*allow_unprepared_value=*/true);}
// ... existing code ...
查找可写 MemTable (
mem):- 这是数据最新的地方,也是全内存操作,速度最快。
- 如果找到一个普通值 (
kTypeValue),则done设为true,查找结束。 - 如果找到一个删除标记 (
kTypeDeletion或kTypeSingleDeletion),也意味着查找结束,结果为NotFound。 - 如果找到一个合并操作 (
kTypeMerge),说明这是一个需要合并的 key,Get操作会记录下这个 merge 操作,然后继续向更老的数据层查找它的基础值(base value)。
查找不可变 MemTable 列表 (
imm):- 如果在上一步没找到确切结果(
done仍为false),则按时间从新到旧的顺序遍历这个列表。 - 查找逻辑与可写 MemTable 完全相同。
- 如果在上一步没找到确切结果(
查找 SST 文件 (
sv->current->Get):- 如果内存中(MemTable)一无所获,就必须进入磁盘上的 SST 文件进行查找。这是
Get操作中最复杂、也可能是最耗时的部分。 - 布隆过滤器 (Bloom Filter): 在读取任何 SST 文件之前,会先检查它的布隆过滤器。如果过滤器判断
key一定不存在,则可以完全跳过对该文件的读取,极大地减少了不必要的 I/O。 - 分层查找:
- Level 0 (L0): L0 的 SST 文件之间 key 的范围是可能重叠的。因此,需要按照从新到旧的顺序,依次检查每一个可能包含该
key的 L0 文件。 - Level 1+ (L1及以上): L1 及以上层级的 SST 文件,其 key 的范围是互不重叠的。因此,在每一层,通过二分查找就可以快速定位到唯一一个可能包含该
key的文件。这比 L0 的查找效率高得多。
- Level 0 (L0): L0 的 SST 文件之间 key 的范围是可能重叠的。因此,需要按照从新到旧的顺序,依次检查每一个可能包含该
- SST 文件内部查找:
- 块缓存 (Block Cache): RocksDB 会将 SST 的数据块(Data Block)和索引块(Index Block)缓存在内存中。如果需要的数据块在缓存中,就可以避免一次磁盘 I/O。
- 索引块 -> 数据块: 首先读取(或从缓存获取)SST 的索引块,通过索引块找到
key可能所在的具体数据块的位置。 - 读取数据块: 读取(或从缓存获取)对应的数据块,然后在数据块内部通过二分查找最终定位到
key。
- 如果内存中(MemTable)一无所获,就必须进入磁盘上的 SST 文件进行查找。这是
阶段四:结果处理与返回 (DBImpl::GetImpl 的后半部分)
查找到结果后,需要进行最后的处理并释放资源。
// ... existing code ...// 1. 释放 SuperVersion 的引用ReturnAndCleanupSuperVersion(cfd, sv);// 2. 处理 Merge 结果 (如果需要)// ... (复杂的 Merge 逻辑)// 3. 更新统计信息if (stats_ != nullptr) {// ...}return s;
}
// ... existing code ...
- 资源释放: 调用
ReturnAndCleanupSuperVersion,减少SuperVersion的引用计数。当计数归零时,其占用的资源会被回收。这是保证无锁读取和后台清理能正确协作的关键。 - 结果封装:
- 如果找到了值,会通过
PinnableSlice返回。PinnableSlice是一个重要的优化,如果数据块在块缓存中,它可以直接指向缓存中的内存,并通过“钉住”(pin)该缓存块来防止它被淘汰,从而避免了不必要的数据拷贝。 - 如果最终状态是
NotFound或其他错误,则返回对应的Status对象。
- 如果找到了值,会通过
总结
DBImpl::Get 的执行流是一个精心设计的多层查找过程:
- 一致性保证:通过
SuperVersion和SequenceNumber确保了在并发环境下读取到一致的数据快照。 - 分层查找:严格遵循“内存优先,磁盘其次;L0遍历,L1+二分”的策略,最大化利用高速缓存,减少慢速I/O。
- 多重优化:
- 布隆过滤器:快速排除不含目标 key 的文件。
- 块缓存:复用读过的索引和数据块,减少磁盘读取。
- PinnableSlice:避免从块缓存到用户缓冲区的内存拷贝。
- L1+文件不重叠:将多文件查找优化为单文件查找。
整个过程体现了 LSM-Tree 架构为优化写性能而在读路径上所做的权衡与优化,使其在大多数情况下依然能提供非常高效的读取性能。
MultiGet 和 GetEntity
总的来说,它们是为不同场景设计的读取接口:
MultiGet是Get的 批量性能优化版,用于一次性高效读取多个标准的“键-值”对。GetEntity(及其批量版MultiGetEntity) 是为了一种不同的数据模型——宽列(Wide Columns)——设计的读取接口。
下面我们来逐一深入分析。
MultiGet: 高效的批量读取
MultiGet 的核心目标是解决“循环调用 Get” 效率低下的问题。当需要一次性查询多个 key 时,使用 MultiGet 会比自己写 for 循环调用 Get 快得多。
从 db/db_impl/db_impl.cc 中可以看到其实现,其高效性主要源于以下几个关键优化点:
1. 批量化与排序 (Batching & Sorting)
这是最核心的优化。MultiGet 并不是简单地在内部循环调用 Get。
// ... existing code ...
void DBImpl::MultiGetCommon(const ReadOptions& read_options,const size_t num_keys,ColumnFamilyHandle** column_families,
// ... existing code ...const bool sorted_input) {
// ... existing code ...// 将所有待查询的 key 封装成 KeyContextfor (size_t i = 0; i < num_keys; ++i) {sorted_keys[i] = &key_context[i];}// 除非用户保证输入已排序,否则进行排序PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);// 按列族(Column Family)对排好序的 key 进行分组autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>key_range_per_cf;
// ... existing code ...for (size_t i = 0; i < num_keys; ++i) {KeyContext* key_ctx = sorted_keys[i];if (key_ctx->column_family != cf) {key_range_per_cf.emplace_back(cf_start, i - cf_start);
// ... existing code ...}}
// ... existing code ...
}
如代码所示,MultiGet 会:
- 排序:调用
PrepareMultiGetKeys对所有待查询的 key 进行排序。排序规则是先按列族(Column Family)ID,再按 key 的内容。 - 分组:将排好序的 key 按列族进行分组。
为什么这么做? 排序和分组带来了巨大的好处:数据局部性(Data Locality)。当处理同一个列族的一批排好序的 key 时,这些 key 在 SST 文件中物理存储的位置也很可能是相邻的。这使得后续的磁盘读取可以更连续、更集中,从而大幅提升效率。
2. 一致性快照 (Consistent Snapshot)
MultiGet 需要保证一次调用中读取的所有 key 都来自同一个时间点的数据库快照,以确保一致性。它通过 MultiCFSnapshot 函数实现这一点,该函数可以原子性地获取所有相关列族的 SuperVersion 和一个统一的序列号(SequenceNumber),避免了在多次 Get 调用之间数据库状态发生变化的问题。
3. 并行与批量 I/O (Parallel & Batched I/O)
这是性能提升的另一个关键。根据 RocksDB 的官方文档,MultiGet 在底层实现了更智能的 I/O 模式。
参考资料:
docs/_posts/2022-10-07-asynchronous-io-in-rocksdb.markdownThe MultiGet API accepts a batch of keys as input. Its a more efficient way of looking up multiple keys compared to a loop of Gets. One way MultiGet is more efficient is by reading multiple data blocks from an SST file in a batch, for keys in the same file. This greatly reduces the latency of the request, compared to a loop of Gets. The MultiRead FileSystem API is used to read a batch of data blocks.
文档中提到:
- 批量读取数据块:对于落在同一个 SST 文件中的多个 key,
MultiGet可以将它们的读取请求合并,通过底层的MultiRead接口一次性从磁盘读取多个数据块,而不是一个一个地读。 - 并行读取文件:更进一步,
MultiGet内部利用协程(Coroutines)等异步技术,可以并行地从不同的 SST 文件中读取数据。
这两种 I/O 优化极大地降低了由磁盘寻道和多次系统调用带来的延迟。
RocksDB 如何使用协程(Coroutines)
RocksDB 并非自己从零实现协程,而是依赖于第三方库,主要是 Facebook 开源的 folly 库。
docs/_posts/2022-10-07-asynchronous-io-in-rocksdb.markdown 明确指出了引入协程的动机:为了在 MultiGet 中实现更高效的异步并行 I/O。
传统的异步编程通常使用回调(Callback)方式,但这会导致代码逻辑被分割,形成所谓的“回调地狱”,状态管理非常复杂。文章中提到:
In order to simplify it, we used async IO with C++ coroutines. The TableReader::MultiGet is implemented as a coroutine, and the coroutine is suspended after issuing async reads for missing data blocks. This allows the top-level MultiGet to iterate through the TableReaders for all the keys, before waiting for the reads to finish and resuming the coroutines.
简单来说,协程可以将异步代码写成看似同步的逻辑:
TableReader::MultiGet作为一个协程开始执行。- 当它需要从磁盘读取一个数据块时,它会发起一个异步读请求。
- 然后,它会**挂起(suspend)**自己,将CPU控制权交还给调用者(顶层的
MultiGet逻辑)。 - 顶层的
MultiGet可以继续去处理其他 key,并发起更多的异步读请求。 - 当某个异步读操作完成后,对应的协程会被恢复(resume),从它上次挂起的地方继续执行。
这种方式极大地简化了在多个文件中并行读取数据块的复杂逻辑。
a. 依赖 folly 库
CMakeLists.txt 文件清楚地表明了对 folly 的依赖关系,这是使用协程的前提。
CMakeLists.txt
// ... existing code ...
if(USE_COROUTINES)if(USE_FOLLY OR USE_FOLLY_LITE)message(FATAL_ERROR "Please specify exactly one of USE_COROUTINES,"" USE_FOLLY, and USE_FOLLY_LITE")endif()set(CMAKE_CXX_STANDARD 20)set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines -Wno-maybe-uninitialized")
// ... existing code ...add_compile_definitions(USE_COROUTINES)set(USE_FOLLY 1)
endif()if(USE_FOLLY)
// ... existing code ...find_package(folly)# If cmake could not find the folly-config.cmake file, fall back# to looking in third-party/folly for folly and its dependenciesif(NOT FOLLY_LIBRARIES)
// ... existing code ...
这里的关键点:
USE_COROUTINES宏是启用协程功能的开关。- 启用协程需要 C++20 标准 (
set(CMAKE_CXX_STANDARD 20)) 和特定的编译器标志 (-fcoroutines)。 - 它会设置
USE_FOLLY为 1,并调用find_package(folly)来寻找和链接folly库。
b. 头文件包含
util/coro_utils.h 文件直接包含了 folly 的协程头文件。
coro_utils.h
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).#if defined(USE_COROUTINES)
#include "folly/coro/Coroutine.h"
#include "folly/coro/Task.h"
#endif
#include "rocksdb/rocksdb_namespace.h"// This file has two sctions. The first section applies to all instances of
// header file inclusion and has an include guard. The second section is
// meant for multiple inclusions in the same source file, and is idempotent.
folly::coro::Task 是 folly 库中用于定义协程返回值的核心类型。
c. 协程函数的实现方式
table/block_based/block_based_table_reader.cc 文件展示了协程在 RocksDB 中是如何被巧妙实现的。它通过宏定义,将同一份逻辑代码编译成同步和**异步(协程)**两个版本。
block_based_table_reader.cc
// ... existing code ...
// Generate the regular and coroutine versions of some methods by
// including block_based_table_reader_sync_and_async.h twice
// Macros in the header will expand differently based on whether
// WITH_COROUTINES or WITHOUT_COROUTINES is defined
// clang-format off
#define WITHOUT_COROUTINES
#include "table/block_based/block_based_table_reader_sync_and_async.h"
#undef WITHOUT_COROUTINES
#define WITH_COROUTINES
#include "table/block_based/block_based_table_reader_sync_and_async.h"
#undef WITH_COROUTINES
// clang-format on
这个文件两次包含了 block_based_table_reader_sync_and_async.h:
- 第一次定义
WITHOUT_COROUTINES,生成同步版本的函数。 - 第二次定义
WITH_COROUTINES,生成使用co_await,co_return等协程关键字的异步版本函数。
这种技巧避免了维护两套功能相同但实现方式(同步/异步)不同的代码,降低了维护成本。
博客文章也提到了使用协程的一些代价和限制:
- 依赖 Folly: 引入了额外的编译步骤和依赖。
- CPU开销: 协程本身有不可忽略的CPU开销,在某些场景下(如100%缓存命中)可能会比同步版本更慢。
- 并非所有读操作都并行化: 元数据(如索引块)的读取仍然是同步阻塞的。
总结
RocksDB 使用协程主要用于优化
MultiGet和Iterator的 I/O 性能。依赖于 Facebook 的
folly库提供的协程实现,并结合 C++20 的原生协程支持。docs/_posts/2022-10-07-asynchronous-io-in-rocksdb.markdown:详细阐述了引入协程的设计思想、实现原理和性能收益。
GetEntity: 读取宽列数据
GetEntity 和它的批量版本 MultiGetEntity 服务于一个完全不同的数据模型:宽列(Wide Columns)。
在标准的 RocksDB 模型中,一个 key 对应一个 value(一个 Slice)。而宽列模型允许一个 key 对应多个命名的列(属性-值对)。
- 标准模型:
key -> value - 宽列模型:
key -> { "column_name_1": "value_1", "column_name_2": "value_2", ... }
例如,可以用一个用户 ID 作为 key,然后将用户的姓名、城市、年龄等作为不同的列存储,而不是将它们序列化成一个 JSON 字符串作为单个 value。
GetEntity 的作用就是读取一个 key 对应的所有列。
从include/rocksdb/utilities/transaction.h
virtual void MultiGetEntity(const ReadOptions& options,ColumnFamilyHandle* column_family,size_t num_keys, const Slice* keys,PinnableWideColumns* results, Status* statuses,bool sorted_input = false) = 0;
它的接口和 MultiGet 类似,但最关键的区别在于返回值类型:
MultiGet返回PinnableSlice* values,每个 key 对应一个PinnableSlice。MultiGetEntity返回PinnableWideColumns* results,每个 key 对应一个PinnableWideColumns对象,该对象内部包含了这个 key 的所有列。
GetEntity 则是 MultiGetEntity 的单 key 版本。
总结与对比
| 特性 | MultiGet | GetEntity / MultiGetEntity |
|---|---|---|
| 数据模型 | 标准键值对 (key -> value) | 宽列 (key -> {col1:val1, col2:val2, ...}) |
| 核心目的 | 性能优化:高效批量读取标准键值对 | 功能接口:读取宽列数据模型的特定接口 |
| 输入 | 一组 Slice 类型的 key | 一组 Slice 类型的 key |
| 输出 | 一组 PinnableSlice(每个 key 一个 value) | 一组 PinnableWideColumns(每个 key 一组列) |
| 关键优化 | 排序、分组、批量I/O、并行I/O | 继承了 MultiGet 的批量优化思想 |
结论: 应该根据 数据存储方式来选择使用哪个函数。
- 如果 应用使用的是标准的 key-value 存储,当需要一次读取多个 key 时,请务必使用
MultiGet以获得最佳性能。 - 如果 使用了 RocksDB 的宽列特性(例如通过
PutEntity写入数据),那么 必须使用GetEntity或MultiGetEntity来读取这些数据。
GetMergeOperands
GetMergeOperands 是一个诊断和调试性质的 API。它的作用不是获取最终合并后的值,而是获取一个 key 对应的所有尚未合并的 Merge 操作数(operands)。
当对一个 key 多次调用 Merge 操作时,RocksDB 并不会立即将这些操作合并(这称为“懒合并” Lazy Merge)。它会把这些 Merge 操作(比如一连串的字符串追加、JSON 字段更新等)像日志一样堆叠起来。只有在 Compaction(合并)或者 Get 操作真正需要读取最终值时,才会执行“完全合并”(Full Merge)计算出结果。
GetMergeOperands 这个函数就是让你能窥探这个中间状态,它会返回:
- 这个 key 的原始基础值(如果存在的话)。
- 所有堆叠在基础值之上的、尚未被合并的
Merge操作数列表。
从头文件 db/db_impl/db_impl.h 中可以看到它的实现非常直接:
// ... existing code ...using DB::GetMergeOperands;Status GetMergeOperands(const ReadOptions& options,ColumnFamilyHandle* column_family, const Slice& key,PinnableSlice* merge_operands,GetMergeOperandsOptions* get_merge_operands_options,int* number_of_operands) override {GetImplOptions get_impl_options;get_impl_options.column_family = column_family;get_impl_options.merge_operands = merge_operands;get_impl_options.get_merge_operands_options = get_merge_operands_options;get_impl_options.number_of_operands = number_of_operands;get_impl_options.get_value = false; // <-- 关键点return GetImpl(options, key, get_impl_options);}
// ... existing code ...
它内部调用了万能的 GetImpl 函数,但传递了一个关键参数 get_impl_options.get_value = false。这个标志告诉 GetImpl:“不要执行完全合并,只需要把找到的所有 Merge 操作数收集起来然后返回”。
原来的 Get 完全支持 Merge 操作,并且是 Merge 操作的最终消费者。Get 和 GetMergeOperands 的关系可以理解为:
Get(key, &value): 这是面向最终用户的标准读取接口。当Get发现一个 key 关联了一系列Merge操作时,它会:- 找到这个 key 的基础值(base value)。
- 收集所有比基础值更新的
Merge操作数。 - 调用用户定义的
MergeOperator的FullMerge方法,将基础值和所有操作数进行计算。 - 返回最终计算出的单一结果给用户。 这个过程对用户是透明的,用户只关心最终结果,不关心中间有多少次
Merge。
GetMergeOperands(key, &operands): 这是面向开发者或高级用户的诊断接口。它会跳过上面Get的第 3 和第 4 步。它不进行计算,而是直接返回第 1 和第 2 步收集到的原始数据。
为什么需要 GetMergeOperands?
- 调试与问题排查:当
Merge的结果不符合预期时,可以用这个函数查看是哪些原始操作数参与了合并,从而定位问题。 - 性能分析:一个 key 堆积了过多的
Merge操作数会严重影响Get的性能(因为每次Get都要做一次昂贵的FullMerge)。通过GetMergeOperands可以监控操作数的数量,判断是否需要手动触发 Compaction 来提前合并它们。 - 高级应用场景:在某些特殊场景下,应用可能需要自己处理合并逻辑,或者只想获取增量更新的部分。例如,
HISTORY.md中提到的修复GetMergeOperands()的 bug,就说明了它在一些复杂场景(如带有回调函数)下的重要性。
Get 不仅支持 Merge,还是 Merge 机制能够工作的核心环节之一。GetMergeOperands 则是为了提供更强的可观察性而分离出来的一个辅助工具。
DBImpl::Flush
Flush 是 RocksDB 中一个至关重要的操作。它的核心作用是将内存中的数据(存在于 MemTable 中)持久化到磁盘上的 SST 文件中。这个过程不仅能释放内存,也是数据从内存进入 LSM-Tree 磁盘结构的必经之路。
这里分析Flush 的一个重载版本,它允许用户一次性地对多个列族(Column Family)发起 Flush 请求。
db_impl_compaction_flush.cc
// ... existing code ...
Status DBImpl::Flush(const FlushOptions& flush_options,const std::vector<ColumnFamilyHandle*>& column_families) {Status s;if (!immutable_db_options_.atomic_flush) {for (auto cfh : column_families) {s = Flush(flush_options, cfh);if (!s.ok()) {break;}}} else {ROCKS_LOG_INFO(immutable_db_options_.info_log,"Manual atomic flush start.\n""=====Column families:=====");for (auto cfh : column_families) {auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",cfhi->GetName().c_str());}ROCKS_LOG_INFO(immutable_db_options_.info_log,"=====End of column families list=====");autovector<ColumnFamilyData*> cfds;std::for_each(column_families.begin(), column_families.end(),[&cfds](ColumnFamilyHandle* elem) {auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);cfds.emplace_back(cfh->cfd());});s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush, cfds);ROCKS_LOG_INFO(immutable_db_options_.info_log,"Manual atomic flush finished, status: %s\n""=====Column families:=====",s.ToString().c_str());for (auto cfh : column_families) {auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",cfhi->GetName().c_str());}ROCKS_LOG_INFO(immutable_db_options_.info_log,"=====End of column families list=====");}return s;
}
// ... existing code ...
这个函数的执行逻辑可以清晰地分为两个分支,由一个关键的数据库选项 atomic_flush 控制。
阶段一:API 入口与分支选择
DBImpl::Flush 函数首先会检查 immutable_db_options_.atomic_flush 选项的值,这个选项决定了当刷新多个列族时,其行为是原子性的还是非原子性的。
1. 非原子性 Flush (atomic_flush == false)
这是默认的行为。
// ...if (!immutable_db_options_.atomic_flush) {for (auto cfh : column_families) {s = Flush(flush_options, cfh);if (!s.ok()) {break;}}}
// ...
- 执行方式:代码会简单地遍历用户传入的
column_families列表。 - 逐一调用:在循环中,它会逐个调用处理单个列族的
Flush函数Flush(flush_options, cfh)。 - 非原子性:这意味着每个列族的
Flush操作是独立的。如果中途某个列族的Flush失败,循环会中断并返回错误,但之前已经成功Flush的列族将保持完成状态。它不保证“要么全部成功,要么全部失败”的原子性。
2. 原子性 Flush (atomic_flush == true)
当用户启用原子 Flush 时,代码会走 else 分支。
// ...} else {// ... 日志记录 ...autovector<ColumnFamilyData*> cfds;std::for_each(column_families.begin(), column_families.end(),[&cfds](ColumnFamilyHandle* elem) {auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);cfds.emplace_back(cfh->cfd());});s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush, cfds);// ... 日志记录 ...}
// ...
- 准备工作:它首先会记录一些日志,然后将用户传入的
ColumnFamilyHandle*列表转换成内部使用的ColumnFamilyData*列表。 - 核心调用:接着,它会调用一个关键的内部函数
AtomicFlushMemTables。所有列族的Flush逻辑都被委托给了这个函数。 - 原子性保证:
AtomicFlushMemTables的核心职责是确保所有指定列族的 MemTable 要么全部成功刷到磁盘,要么全部失败。这是通过将所有新生成的 SST 文件的元数据信息通过一次原子的 MANIFEST 文件写入来实现的。如果中途发生故障,恢复时数据库会处于Flush操作之前的状态,保证了跨列族的一致性。
阶段二:深入核心实现
为了完全理解 Flush,我们需要进一步分析它所调用的核心内部函数。
FlushMemTable (非原子 Flush 的核心)
在非原子模式下,最终会调用到 FlushMemTable。这个函数负责单个列族的 Flush 流程。
// ... existing code ...// Force current memtable contents to be flushed.Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,FlushReason flush_reason,bool entered_write_thread = false);
// ... existing code ...
FlushMemTable 及其后续流程:
前台线程(调用
FlushMemTable的线程)- 1.1 前置检查:检查是否会造成写停顿(Stall),并根据
FlushOptions决定是否等待。 - 1.2 加锁与排队:获取全局互斥锁
mutex_,并进入write_thread_写队列,确保与普通写操作的顺序一致性。 - 1.3 切换 MemTable (核心动作之一):调用
SwitchMemtable()。- 将当前的可写 MemTable (Mutable MemTable) 变成只读的 (Immutable MemTable)。
- 将这个新的 Immutable MemTable 添加到该列族的
imm()列表(一个MemTableList对象)的末尾。 - 创建一个全新的、空的 Mutable MemTable 供后续的写操作使用。
- 1.4 创建并调度后台任务:
- 创建一个
FlushRequest对象。 - 调用
EnqueuePendingFlush()将请求放入一个待处理队列。 - 调用
MaybeScheduleFlushOrCompaction()唤醒后台线程池中的一个线程。
- 创建一个
- 1.5 解锁与返回:退出写队列,释放
mutex_。如果flush_options.wait为false,函数到此返回,前台操作继续。
- 1.1 前置检查:检查是否会造成写停顿(Stall),并根据
后台线程(被唤醒的
BGWorkFlush线程)- 2.1 挑选任务:后台线程被唤醒后,会从待处理队列中取出
FlushRequest。 - 2.2 创建
FlushJob对象:db_impl_compaction_flush.cc
创建一个// ... existing code ... // To address this, we make sure NotifyOnFlushBegin() executes after memtable // picking so that no new snapshot can be taken between the two functions.FlushJob flush_job(dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id,file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,job_context, flush_reason, log_buffer, directories_.GetDbDir(),GetDataDir(cfd, 0U),GetCompressionFlush(cfd->ioptions(), mutable_cf_options), stats_,&event_logger_, mutable_cf_options.report_bg_io_stats,true /* sync_output_directory */, true /* write_manifest */, thread_pri,io_tracer_, cfd->GetSuperVersion()->ShareSeqnoToTimeMapping(), db_id_,db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_); // ... existing code ...FlushJob实例,它包含了执行 Flush 所需的所有上下文信息。 - 2.3
FlushJob::PickMemTable():在mutex_保护下,从列族的imm()列表中挑选出需要被本次FlushJob处理的一个或多个 Immutable MemTable。 - 2.4
FlushJob::Run()(核心动作之二):- 释放
mutex_,开始执行耗时的 I/O 操作。 - 遍历挑选出的 MemTable 中的数据,并将其写入一个新的 SST 文件。这是将数据从内存持久化到磁盘的关键步骤。
- I/O 操作完成后,重新获取
mutex_。
- 释放
- 2.5 安装结果 (Install Results):
- 在
mutex_保护下,调用MemTableList::TryInstallMemtableFlushResults()。 - 生成一个
VersionEdit,记录这次变更(例如:在 L0 层增加了一个新的 SST 文件,并删除了对应的 MemTable)。 - 将
VersionEdit写入 MANIFEST 文件,并应用到内存中的VersionSet,使新的 SST 文件对读操作可见。 - 将已成功刷盘的 MemTable 从
imm()列表中移除,并准备释放其内存。
- 在
- 2.6 清理与唤醒:释放
mutex_。如果前台有线程在等待 (flush_options.wait == true),则唤醒它们。FlushJob结束。
- 2.1 挑选任务:后台线程被唤醒后,会从待处理队列中取出
AtomicFlushMemTables (原子 Flush 的核心)
这个函数是实现原子 Flush 的关键。
db_impl.h
// ... existing code ...// Atomic-flush memtables from quanlified CFs among `provided_candidate_cfds`// (if non-empty) or amomg all column families and atomically record the// result to the MANIFEST.Status AtomicFlushMemTables(const FlushOptions& options, FlushReason flush_reason,const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},bool entered_write_thread = false);
// ... existing code ...
它的流程比 FlushMemTable 更复杂:
- 统一准备:它会一次性为所有需要
Flush的列族准备好Flush任务。 - 并行刷盘:它可能会并行地将多个列族的 Immutable MemTable 写入各自的 SST 文件。
- 统一提交:最关键的一步是,它会将所有列族的变更(比如 CF1 增加文件 A,CF2 增加文件 B)合并成一个单一的
VersionEdit。 - 原子写入 MANIFEST:最后,将这个包含了所有变更的
VersionEdit一次性地写入 MANIFEST 文件。这确保了从数据库元数据的角度看,所有列族的Flush是一个不可分割的操作。
Flush 到底是什么语义?
Flush 在 RocksDB 中是一个过程,而不是一个单一的原子动作。它包含两个紧密相连但又可以区分的阶段:
将 MemTable 从“可写”变为“只读” (MemTable Rotation / Switching)
- 触发者:
DBImpl::FlushMemTable->DBImpl::SwitchMemtable。 - 语义: 这是
Flush过程的开始。它将当前活跃的、正在接收写入的 MemTable(Mutable MemTable)“固化”,使其成为一个内容不再改变的 Immutable MemTable,并放入一个待处理列表(imm())。同时,一个新的空 MemTable 会被创建出来,接替成为新的 Mutable MemTable,保证写操作可以不间断地进行。 Flush包含了“把 mem 变为 read only”这一步。这是整个流程的第一阶段。
- 触发者:
将“只读”的 MemTable 数据写入磁盘 SST 文件 (MemTable Persisting)
- 触发者: 后台
FlushJob线程。 - 语义: 这是
Flush过程的核心实体工作。后台线程会处理imm()列表中的一个或多个 Immutable MemTable,将其中的数据迭代出来,编码、压缩,并最终写入一个全新的 SST 文件中。写完之后,会更新数据库的元数据(MANIFEST),正式将这个 SST 文件纳入 LSM-Tree 的管理体系。 Flush也包含了“把 read only 变为磁盘 SST”这一步。这是整个流程的第二阶段,也是最终目的。
- 触发者: 后台
Flush 的完整语义是一个从内存到磁盘的持久化过程,它起始于将可写 MemTable 切换为只读,最终完成于将该只读 MemTable 的内容写入磁盘上的 SST 文件。这两个阶段共同构成了 Flush 的完整含义。
可以将 DBImpl::FlushMemTable 函数看作是这个过程的发起者,它完成了第一阶段(切换),并启动了第二阶段(调度后台任务)。而 FlushJob 则是第二阶段的执行者。
总结
DBImpl::Flush(const FlushOptions&, const std::vector<ColumnFamilyHandle*>&) 是一个功能强大的 API,它为用户提供了批量刷新多个列族的能力。
- 它的行为由
atomic_flush选项决定,可以在**逐个刷新(非原子)和一体刷新(原子)**之间选择。 - 非原子模式实现简单,通过循环调用单个列族的
Flush来完成,容错性较低。 - 原子模式实现复杂,通过
AtomicFlushMemTables函数,确保所有列族的Flush作为一个整体成功或失败,为需要跨列族数据一致性的应用场景提供了重要保障。 - 无论是哪种模式,
Flush操作的本质都是将内存中的 MemTable 数据持久化为磁盘上的 SST 文件,这是 LSM-Tree 架构的核心机制之一。
DBImpl::FlushMemTable
FlushMemTable 是 RocksDB 中执行非原子性 Flush 的核心内部函数。当用户调用 DB::Flush(options, column_family) 且 atomic_flush 选项为 false 时,最终会由这个函数来处理。它的职责是发起一个指定列族(Column Family)的 Flush 过程,即将该列族内存中的 MemTable 数据转存为磁盘上的 SST 文件。
这个函数本身并不执行 I/O 操作,而是扮演一个协调者的角色:它在正确的同步保护下,准备好待刷盘的数据,然后将实际的刷盘任务调度给后台线程去完成。
下面是该函数的详细逻辑分析,我们将按照其执行顺序逐一拆解。
db_impl_compaction_flush.cc
// ... existing code ...
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,const FlushOptions& flush_options,FlushReason flush_reason,bool entered_write_thread) {
// ... existing code ...
函数签名解析
ColumnFamilyData* cfd: 指向需要被 Flush 的列族的内部数据结构。const FlushOptions& flush_options: Flush 操作的选项。其中两个最关键的标志是:wait:true表示调用线程需要阻塞等待,直到 Flush 操作完全结束。false表示函数会立即返回,Flush 在后台异步进行。allow_write_stall:true表示允许 Flush 操作导致写操作停顿。false则会先检查是否会造成停顿,如果会,则可能提前返回。
FlushReason flush_reason: 发起此次 Flush 的原因(如:手动、自动、后台恢复等)。bool entered_write_thread: 一个内部优化标志,如果调用者已经处于写线程上下文中,则为true,避免重复进入。
详细执行流程分析
阶段一:前置检查与写停顿处理
这是函数的第一道防线,确保 Flush 操作可以在合适的状态下进行。
db_impl_compaction_flush.cc
// ... existing code ...
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,const FlushOptions& flush_options,FlushReason flush_reason,bool entered_write_thread) {// 该函数不应在 atomic_flush 模式下被调用assert(!immutable_db_options_.atomic_flush);// 如果是异步调用,且写控制器已停止,则无法执行,直接返回 TryAgainif (!flush_options.wait && write_controller_.IsStopped()) {std::ostringstream oss;oss << "Writes have been stopped, thus unable to perform manual flush. ""Please try again later after writes are resumed";return Status::TryAgain(oss.str());}Status s;// 如果用户不允许写停顿if (!flush_options.allow_write_stall) {bool flush_needed = true;// 等待直到 Flush 不会造成写停顿s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");// 如果等待出错,或者检查后发现不再需要 Flush,则直接返回if (!s.ok() || !flush_needed) {return s;}}
// ... existing code ...
- 断言检查: 首先断言
atomic_flush为false,确保此函数只用于非原子 Flush 场景。 - 写控制器检查: 如果是异步 Flush (
wait == false),且写操作已被暂停,那么无法保证 Flush 能被调度,因此直接返回错误。 - 写停顿处理: 如果
allow_write_stall为false,会调用WaitUntilFlushWouldNotStallWrites。RocksDB 有机制防止后台积压过多导致前台写操作停顿(Stall)。此调用会检查当前 Flush 是否会触发停顿,如果会,则等待;如果等待后发现由于其他后台任务的推进,当前列族已经不再需要 Flush,则直接返回成功。
阶段二:获取锁,切换 MemTable
这是 Flush 操作的核心准备阶段,在数据库全局锁的保护下进行。
// ... existing code ...const bool needs_to_join_write_thread = !entered_write_thread;autovector<FlushRequest> flush_reqs;autovector<uint64_t> memtable_ids_to_wait;{WriteContext context;// 获取数据库全局互斥锁InstrumentedMutexLock guard_lock(&mutex_);WriteThread::Writer w;WriteThread::Writer nonmem_w;// 进入写线程队列,确保与所有写操作串行化if (needs_to_join_write_thread) {write_thread_.EnterUnbatched(&w, &mutex_);if (two_write_queues_) {nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);}}// 等待任何已进入队列但在我们之前的写操作完成WaitForPendingWrites();// 如果当前的可写 MemTable 不为空,则进行切换if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load() ||IsRecoveryFlush(flush_reason)) {s = SwitchMemtable(cfd, &context);}
// ... existing code ...
- 加锁与排队:
- 获取全局锁
mutex_,保证线程安全。 - 调用
write_thread_.EnterUnbatched()进入写线程队列。这保证了 Flush 操作与普通写操作(Put/Delete)的顺序一致性。 - 调用
WaitForPendingWrites()确保在自己之前的写操作都已经完成。
- 获取全局锁
- 切换 MemTable:
- 检查当前列族的 Mutable MemTable 是否有数据 (
!cfd->mem()->IsEmpty())。 - 如果需要 Flush,则调用
SwitchMemtable(cfd, &context)。这个关键函数会: a. 将当前正在写入的 Mutable MemTable 变成一个只读的 Immutable MemTable。 b. 将这个新的 Immutable MemTable 添加到该列族的 Immutable MemTable 列表中。 c. 创建一个全新的、空的 Mutable MemTable 供后续的写操作使用。
- 检查当前列族的 Mutable MemTable 是否有数据 (
阶段三:创建并提交 Flush 请求
MemTable 切换完成后,并不会立即刷盘,而是创建一个请求,交给后台调度器。
// ... existing code ...const uint64_t flush_memtable_id = std::numeric_limits<uint64_t>::max();if (s.ok()) {// 如果切换成功,且确实有待刷盘的 Immutable MemTableif (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||!cached_recoverable_state_empty_.load() ||IsRecoveryFlush(flush_reason)) {// 创建一个 Flush 请求FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};flush_reqs.emplace_back(std::move(req));memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID(false /* for_atomic_flush */));}// 特殊处理:如果开启了统计信息持久化,可能会触发统计CF的Flushif (immutable_db_options_.persist_stats_to_disk) {// ... (此处省略了触发统计CF Flush的复杂逻辑)}}if (s.ok() && !flush_reqs.empty()) {for (const auto& req : flush_reqs) {// ...// 标记该列族的 Immutable MemTable 列表需要被 Flushloop_cfd->imm()->FlushRequested();}// 如果调用者要求等待,增加 cfd 的引用计数,防止在等待期间被其他线程删除if (flush_options.wait) {for (const auto& req : flush_reqs) {// ...loop_cfd->Ref();}}for (const auto& req : flush_reqs) {// ...// 将 Flush 请求放入待处理队列bool flush_req_enqueued = EnqueuePendingFlush(req);// ...}// 唤醒后台线程,检查是否有待处理的 Flush 或 Compaction 任务MaybeScheduleFlushOrCompaction();}// 退出写线程队列,释放锁if (needs_to_join_write_thread) {write_thread_.ExitUnbatched(&w);if (two_write_queues_) {nonmem_write_thread_.ExitUnbatched(&nonmem_w);}}} // 锁的作用域结束
// ... existing code ...
- 创建请求: 为需要 Flush 的列族创建一个
FlushRequest对象,并记录下需要等待的 MemTable ID。 - 处理统计列族 (Stats CF): 这是一个有趣的细节。如果开启了
persist_stats_to_disk,为了防止统计列族持有过旧的 WAL 日志文件,当其他列族 Flush 时,可能会联动触发统计列族的 Flush。 - 标记与入队:
- 调用
imm()->FlushRequested()在列族元数据中打上一个“需要Flush”的标记。 - 如果
flush_options.wait为true,则增加cfd的引用计数,防止在等待期间列族被删除。 - 调用
EnqueuePendingFlush(req)将请求放入一个全局的待处理队列。
- 调用
- 调度: 调用
MaybeScheduleFlushOrCompaction(),此函数会检查待处理队列,并在需要时唤醒后台的 Flush/Compaction 线程池来执行任务。 - 释放锁: 退出写线程队列,并释放全局锁
mutex_。至此,前台线程的准备工作完成。
阶段四:等待与收尾
最后,根据 flush_options.wait 的值决定是立即返回还是等待后台任务完成。
// ... existing code ...// 通知监听器,一个手动 Flush 已被调度NotifyOnManualFlushScheduled({cfd}, flush_reason);TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");// 如果调用者要求等待if (s.ok() && flush_options.wait) {// ... (准备需要等待的cfd列表和memtable_id列表) ...// 阻塞等待,直到后台线程完成了指定的 Flush 任务s = WaitForFlushMemTables(cfds, flush_memtable_ids,flush_reason == FlushReason::kErrorRecovery /* resuming_from_bg_err */,flush_reason);InstrumentedMutexLock lock_guard(&mutex_);// 等待结束后,释放之前增加的引用计数for (auto* tmp_cfd : cfds) {tmp_cfd->UnrefAndTryDelete();}}TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");return s;
}
// ... existing code ...
- 通知: 调用
NotifyOnManualFlushScheduled通知所有注册的监听器(Listener),一个 Flush 任务已经被调度。 - 等待: 如果
flush_options.wait为true,则调用WaitForFlushMemTables。该函数会使用条件变量等同步原语,阻塞当前线程,直到后台线程完成了对指定 MemTable 的刷盘工作并通知唤醒。 - 清理: 等待结束后,释放之前为防止列族被删除而增加的引用计数。
- 返回状态: 返回最终的操作状态。
总结
DBImpl::FlushMemTable 是一个设计精良的协调函数,它完美地体现了 RocksDB 将前台操作与后台 I/O 分离的设计哲学。其完整流程可以概括为:
- 前置检查:确保操作的合法性和时机(避免写停顿)。
- 同步与准备:在全局锁的保护下,与写操作串行化,并将可写 MemTable 切换为只读状态。
- 调度:将具体的刷盘任务封装成请求,放入队列,并唤醒后台线程。
- 等待(可选):根据用户需求,提供同步等待或异步返回的能力。
通过这个流程,它高效地将内存数据移交给了后台处理,最大限度地减少了对前台写操作的影响。
