
MongoDB Source Code Analysis of delete and the Oplog: From the Delete Call Chain to the Core Function Implementations

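The MongoDB oplog (operations log) is the core component that keeps data consistent in replica sets and sharded clusters. It is stored in the local database as a capped collection, which has a fixed size and is overwritten circularly. Starting from the path a delete operation takes to reach the oplog, this article walks through the core source code and breaks down how oplog entries are generated, validated and persisted, to help developers understand how the oplog works under the hood.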

I. Oplog Basics: Role and Core Characteristics

Before diving into the code, let's pin down the oplog's core properties as groundwork for the analysis that follows:

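  • Core role: records all MongoDB write operations (inserts, updates, deletes and commands); secondaries in a replica set stay in sync with the primary by copying and applying the oplog.
  • Storage: stored by default in the local.oplog.rs collection, which is a capped collection. For WiredTiger on Unix and Windows the default size is 5% of free disk space, with a lower bound of 990 MB and an upper bound of 50 GB; once the space is full, the oldest entries are overwritten automatically.
  • Key interfaces: createOplog() initializes the oplog collection; logInsertOps() is dedicated to logging batched inserts; logOp() is the general entry point for logging write operations and is the function this article focuses on.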
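The default-size rule from the storage bullet can be expressed as a simple clamp. The following is a sketch under stated assumptions: it follows the documented default for disk-based WiredTiger on Unix and Windows (5% of free disk space, at least 990 MB, at most 50 GB), the function name `defaultOplogSizeBytes` is hypothetical, and it ignores an explicitly configured oplog size as well as in-memory engines and other platforms.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

constexpr std::int64_t kMiB = 1024LL * 1024;
constexpr std::int64_t kGiB = 1024LL * kMiB;

// Hypothetical helper modeled on the documented default for disk-based
// WiredTiger on Unix/Windows: 5% of free disk space, clamped to [990 MiB, 50 GiB].
// Does not model an explicit oplog size setting or other engines/platforms.
std::int64_t defaultOplogSizeBytes(std::int64_t freeDiskBytes) {
    assert(freeDiskBytes >= 0);
    const std::int64_t fivePercent = freeDiskBytes / 20;  // 5% of free space
    const std::int64_t lowerBound = 990 * kMiB;
    const std::int64_t upperBound = 50 * kGiB;
    return std::clamp(fivePercent, lowerBound, upperBound);
}
```

With 100 GiB free it yields 5 GiB, with 1 GiB free it falls back to the 990 MiB floor, and with 2 TiB free it stops at the 50 GiB ceiling.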

In short, the MongoDB oplog is a capped collection in the local database: its size is fixed and its space is reused in a circular fashion.
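To make the capped behavior concrete, here is a small illustrative model, not MongoDB code: a byte-bounded, insertion-ordered log that drops its oldest entries once the budget is exceeded. The class `CappedLog` is hypothetical; in MongoDB the storage engine truncates the oldest oplog entries itself.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Illustrative model of a capped collection: a size-bounded, insertion-ordered
// log that evicts its oldest entries once the byte budget is exceeded.
class CappedLog {
public:
    explicit CappedLog(std::size_t maxBytes) : maxBytes_(maxBytes) {}

    void append(std::string entry) {
        assert(entry.size() <= maxBytes_);  // a single entry must fit
        usedBytes_ += entry.size();
        entries_.push_back(std::move(entry));
        // Drop the oldest entries until the log is back under budget.
        while (usedBytes_ > maxBytes_) {
            usedBytes_ -= entries_.front().size();
            entries_.pop_front();
        }
    }

    const std::deque<std::string>& entries() const { return entries_; }
    std::size_t usedBytes() const { return usedBytes_; }

private:
    std::size_t maxBytes_;
    std::size_t usedBytes_ = 0;
    std::deque<std::string> entries_;
};
```

Appending three 4-byte entries to a 10-byte log keeps only the last two. One caveat the model omits: since MongoDB 4.0 the real oplog may grow past its configured size rather than delete entries beyond the majority commit point.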

II. The Oplog Logging Path for Delete Operations

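When a document is deleted, MongoDB triggers oplog logging through the observer pattern. The full call chain (reconstructed from the call relationships in the source) is: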

mongo/db/catalog/collection_impl.cpp: deleteDocument
mongo/db/catalog/collection_impl.h: docFor → mongo/db/storage/record_store.h: dataFor → mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp: findRecord, which returns the RecordData
mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp: deleteRecord
mongo/db/op_observer_registry.h: onDelete
mongo/db/auth_op_observer.h: onDelete
mongo/db/op_observer_impl.cpp: onDelete
mongo/db/op_observer_impl.cpp: replLogDelete
mongo/db/op_observer_impl.cpp: logOperation
mongo/db/repl/oplog.cpp: logOp
mongo/db/repl/oplog.cpp: _logOpsInner
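  1. Delete entry point: deleteDocument() in mongo/db/catalog/collection_impl.cpp first removes the index entries (_indexCatalog->unindexRecord) and the physical record (_recordStore->deleteRecord), then triggers logging through the observer interface.
  2. Observer trigger: it calls getGlobalServiceContext()->getOpObserver()->onDelete(), handing the delete event to the oplog module (outside a transaction the delete is logged directly; inside a multi-document transaction it has to be tied to the session's transaction record).
  3. Forwarding: OpObserverRegistry (op_observer_registry.h) passes the event to every registered observer, such as AuthOpObserver (auth_op_observer.h); the observer that writes the oplog is OpObserverImpl::onDelete() in mongo/db/op_observer_impl.cpp, which calls replLogDelete() and then logOperation().
  4. Core logging function: execution finally reaches logOp() in mongo/db/repl/oplog.cpp, which generates the oplog entry and persists it.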
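The four steps above follow the observer pattern: the collection code only raises an event, and a registry fans it out to observers, one of which writes the oplog. The following is a heavily simplified, hypothetical model of that chain; the class names echo MongoDB's (`OpObserver`, `OpObserverRegistry`), but every signature is reduced for illustration and the "oplog" is just a vector of strings.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified, hypothetical model of the delete -> OpObserver -> oplog chain.
struct OpObserver {
    virtual ~OpObserver() = default;
    virtual void onDelete(const std::string& ns, const std::string& deletedId) = 0;
};

// Fans every event out to all registered observers, like OpObserverRegistry.
class OpObserverRegistry : public OpObserver {
public:
    void addObserver(std::unique_ptr<OpObserver> obs) { observers_.push_back(std::move(obs)); }
    void onDelete(const std::string& ns, const std::string& deletedId) override {
        for (auto& obs : observers_) obs->onDelete(ns, deletedId);
    }
private:
    std::vector<std::unique_ptr<OpObserver>> observers_;
};

// Stand-in for OpObserverImpl: turns the event into a delete ("d") oplog entry.
class OplogWritingObserver : public OpObserver {
public:
    explicit OplogWritingObserver(std::vector<std::string>* oplog) : oplog_(oplog) {}
    void onDelete(const std::string& ns, const std::string& deletedId) override {
        oplog_->push_back("{op: 'd', ns: '" + ns + "', o: {_id: " + deletedId + "}}");
    }
private:
    std::vector<std::string>* oplog_;
};

// Stand-in for CollectionImpl::deleteDocument: unindex, delete the record, then notify.
class Collection {
public:
    Collection(std::string ns, OpObserver* observer) : ns_(std::move(ns)), observer_(observer) {}
    void insert(const std::string& id, const std::string& doc) {
        records_[id] = doc;
        index_[id] = id;
    }
    void deleteDocument(const std::string& id) {
        assert(records_.count(id) == 1);
        index_.erase(id);              // ~ _indexCatalog->unindexRecord(...)
        records_.erase(id);            // ~ _recordStore->deleteRecord(...)
        observer_->onDelete(ns_, id);  // ~ getOpObserver()->onDelete(...)
    }
    std::size_t size() const { return records_.size(); }
private:
    std::string ns_;
    OpObserver* observer_;
    std::map<std::string, std::string> records_;
    std::map<std::string, std::string> index_;
};
```

The design point is decoupling: the collection never knows whether anyone is logging, so auth, sharding and replication hooks can all subscribe to the same event.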

deleteDocument in mongo/db/catalog/collection_impl.cpp:

void CollectionImpl::deleteDocument(OperationContext* opCtx,
                                    StmtId stmtId,
                                    RecordId loc,
                                    OpDebug* opDebug,
                                    bool fromMigrate,
                                    bool noWarn,
                                    Collection::StoreDeletedDoc storeDeletedDoc) {
    ...
    int64_t keysDeleted;
    // Remove the document's index keys, then the record itself.
    _indexCatalog->unindexRecord(opCtx, doc.value(), loc, noWarn, &keysDeleted);
    _recordStore->deleteRecord(opCtx, loc);

    // Notify the OpObservers; OpObserverImpl writes the delete to the oplog.
    getGlobalServiceContext()->getOpObserver()->onDelete(
        opCtx, ns(), uuid(), stmtId, fromMigrate, deletedDoc);

    if (opDebug) {
        opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
    }
}

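The call getGlobalServiceContext()->getOpObserver()->onDelete(opCtx, ns(), uuid(), stmtId, fromMigrate, deletedDoc) writes the delete straight to the oplog when the operation is not part of a multi-document transaction, and updates the session's transaction record.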
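The transactional branch can be sketched as follows, assuming the 4.x behavior in which operations inside a multi-document transaction are buffered and written at commit time as `applyOps` entries (the sketch writes a single one for simplicity). `SessionAwareDeleteLogger` and the string entry format are hypothetical, and the session-record update for retryable writes is not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch of the branch inside an onDelete handler: outside a
// multi-document transaction the delete is logged immediately; inside one it
// is buffered and logged at commit as a single applyOps-style entry.
class SessionAwareDeleteLogger {
public:
    explicit SessionAwareDeleteLogger(std::vector<std::string>* oplog) : oplog_(oplog) {}

    void beginTransaction() { inTxn_ = true; }

    void onDelete(const std::string& ns, const std::string& id) {
        std::string op = "d " + ns + " " + id;
        if (inTxn_) {
            txnOps_.push_back(op);  // buffered until commit
        } else {
            oplog_->push_back(op);  // logged right away
        }
    }

    void commitTransaction() {
        assert(inTxn_);
        std::string applyOps = "applyOps[";
        for (std::size_t i = 0; i < txnOps_.size(); ++i) {
            if (i > 0) applyOps += ", ";
            applyOps += txnOps_[i];
        }
        applyOps += "]";
        oplog_->push_back(applyOps);
        txnOps_.clear();
        inTxn_ = false;
    }

private:
    std::vector<std::string>* oplog_;
    std::vector<std::string> txnOps_;
    bool inTxn_ = false;
};
```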

Overview of the declarations in mongo/db/repl/oplog.h:

void appendRetryableWriteInfo(OperationContext* opCtx,
                              MutableOplogEntry* oplogEntry,
                              OplogLink* oplogLink,
                              StmtId stmtId);

void createOplog(OperationContext* opCtx,
                 const NamespaceString& oplogCollectionName,
                 bool isReplSet);
void createOplog(OperationContext* opCtx);

std::vector<OpTime> logInsertOps(OperationContext* opCtx,
                                 MutableOplogEntry* oplogEntryTemplate,
                                 std::vector<InsertStatement>::const_iterator begin,
                                 std::vector<InsertStatement>::const_iterator end);

OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry);

void clearLocalOplogPtr();

void acquireOplogCollectionForLogging(OperationContext* opCtx);

mongo/db/repl/oplog.h is the part of the MongoDB source that manages the operations log (oplog) and its replication, a core component for data consistency in replica sets and sharded clusters.

III. Deep Dive into the Core Function logOp()

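logOp() is the core entry point for writing to the oplog. It validates the operation, assigns the OpTime, handles concurrency control and triggers persistence; its logic can be broken into four key steps.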

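mongo/db/repl/oplog.cpp implements mongo/db/repl/oplog.h. When the OpObserver's onDelete records the delete, it ends up calling logOp in mongo/db/repl/oplog.cpp, the replication module's core function for writing operations to the oplog. Its job is to validate the operation, assign a unique timestamp (OpTime), handle concurrency control and persist the entry, which keeps data consistent across the distributed system. The code is: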

OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
    // All collections should have UUIDs now, so all insert, update, and delete oplog entries should
    // also have uuids. Some no-op (n) and command (c) entries may still elide the uuid field.
    invariant(oplogEntry->getUuid() || oplogEntry->getOpType() == OpTypeEnum::kNoop ||
                  oplogEntry->getOpType() == OpTypeEnum::kCommand,
              str::stream() << "Expected uuid for logOp with oplog entry: "
                            << redact(oplogEntry->toBSON()));

    auto replCoord = ReplicationCoordinator::get(opCtx);
    // For commands, the test below is on the command ns and therefore does not check for
    // specific namespaces such as system.profile. This is the caller's responsibility.
    if (replCoord->isOplogDisabledFor(opCtx, oplogEntry->getNss())) {
        uassert(ErrorCodes::IllegalOperation,
                str::stream() << "retryable writes is not supported for unreplicated ns: "
                              << oplogEntry->getNss().ns(),
                !oplogEntry->getStatementId());
        return {};
    }

    auto oplogInfo = LocalOplogInfo::get(opCtx);
    // Obtain Collection exclusive intent write lock for non-document-locking storage engines.
    boost::optional<Lock::DBLock> dbWriteLock;
    boost::optional<Lock::CollectionLock> collWriteLock;
    if (!opCtx->getServiceContext()->getStorageEngine()->supportsDocLocking()) {
        dbWriteLock.emplace(opCtx, NamespaceString::kLocalDb, MODE_IX);
        collWriteLock.emplace(opCtx, oplogInfo->getOplogCollectionName(), MODE_IX);
    }

    // If an OpTime is not specified (i.e. isNull), a new OpTime will be assigned to the oplog entry
    // within the WUOW. If a new OpTime is assigned, it needs to be reset back to a null OpTime
    // before exiting this function so that the same oplog entry instance can be reused for logOp()
    // again. For example, if the WUOW gets aborted within a writeConflictRetry loop, we need to
    // reset the OpTime to null so a new OpTime will be assigned on retry.
    OplogSlot slot = oplogEntry->getOpTime();
    auto resetOpTimeGuard = makeGuard([&, resetOpTimeOnExit = bool(slot.isNull())] {
        if (resetOpTimeOnExit)
            oplogEntry->setOpTime(OplogSlot());
    });

    WriteUnitOfWork wuow(opCtx);
    if (slot.isNull()) {
        slot = oplogInfo->getNextOpTimes(opCtx, 1U)[0];
        // It would be better to make the oplogEntry a const reference. But because in some cases, a
        // new OpTime needs to be assigned within the WUOW as explained earlier, we instead pass
        // oplogEntry by pointer and reset the OpTime to null using a ScopeGuard.
        oplogEntry->setOpTime(slot);
    }

    // Debug output added by the article's author; not part of upstream MongoDB.
    std::cout << "conca logOp oplogInfo->getOplogCollectionName(): "
              << oplogInfo->getOplogCollectionName() << std::endl;
    auto oplog = oplogInfo->getCollection();
    auto wallClockTime = oplogEntry->getWallClockTime();

    auto bsonOplogEntry = oplogEntry->toBSON();
    // The storage engine will assign the RecordId based on the "ts" field of the oplog entry, see
    // oploghack::extractKey.
    std::vector<Record> records{
        {RecordId(), RecordData(bsonOplogEntry.objdata(), bsonOplogEntry.objsize())}};
    std::vector<Timestamp> timestamps{slot.getTimestamp()};
    _logOpsInner(opCtx, oplogEntry->getNss(), &records, timestamps, oplog, slot, wallClockTime);
    wuow.commit();
    return slot;
}
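The OpTime handling in logOp() (assign a slot only if the entry has none, and reset it on exit through a guard) can be reproduced in plain C++. The sketch below is an assumption-based model: `ScopeGuard`, `Entry`, `WriteConflict` and `logOpSketch` are hypothetical names, the OpTime is reduced to a counter, and a thrown exception stands in for an aborted WriteUnitOfWork inside a `writeConflictRetry` loop.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <stdexcept>
#include <utility>

// Minimal scope guard: runs a callback when it goes out of scope
// (normal return or exception), in the spirit of MongoDB's makeGuard.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
    ~ScopeGuard() { if (fn_) fn_(); }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;
private:
    std::function<void()> fn_;
};

struct Entry {
    std::optional<std::uint64_t> opTime;  // empty == "null OpTime"
};

struct WriteConflict : std::runtime_error {
    WriteConflict() : std::runtime_error("write conflict") {}
};

// Hypothetical stand-in for logOp(): assigns an OpTime only if the entry has none,
// and clears it on exit if it was assigned here, so a retry gets a fresh one.
std::uint64_t logOpSketch(Entry& entry, std::uint64_t& nextOpTime, bool failWrite) {
    const bool resetOnExit = !entry.opTime.has_value();
    ScopeGuard resetGuard([&] { if (resetOnExit) entry.opTime.reset(); });
    if (!entry.opTime) entry.opTime = nextOpTime++;
    const std::uint64_t slot = *entry.opTime;
    if (failWrite) throw WriteConflict();  // simulates an aborted unit of work
    return slot;
}
```

After a simulated write conflict the entry's OpTime is null again, so the retry receives a new slot (101 rather than the abandoned 100); an entry that arrived with a preassigned OpTime keeps it.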

Input validation

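Apart from no-op and command entries, every oplog entry must carry a UUID. Collection UUIDs were introduced in MongoDB 3.6, and all collections are now expected to have one, which strengthens data consistency and manageability.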

// All collections should have UUIDs now, so all insert, update, and delete oplog entries should
// also have uuids. Some no-op (n) and command (c) entries may still elide the uuid field.
invariant(oplogEntry->getUuid() || oplogEntry->getOpType() == OpTypeEnum::kNoop ||
              oplogEntry->getOpType() == OpTypeEnum::kCommand,
          str::stream() << "Expected uuid for logOp with oplog entry: "
                        << redact(oplogEntry->toBSON()));
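The invariant reduces to a small predicate. This sketch uses hypothetical types (`OpType`, `OplogEntrySketch`) in place of MongoDB's `MutableOplogEntry`; upstream, a violation is a fatal invariant failure rather than a `false` return.

```cpp
#include <cassert>
#include <optional>
#include <string>

enum class OpType { kInsert, kUpdate, kDelete, kCommand, kNoop };

struct OplogEntrySketch {
    OpType op;
    std::optional<std::string> uuid;
};

// Mirrors the invariant at the top of logOp(): CRUD entries must carry a
// collection UUID; only no-op ('n') and command ('c') entries may omit it.
bool hasRequiredUuid(const OplogEntrySketch& e) {
    return e.uuid.has_value() || e.op == OpType::kNoop || e.op == OpType::kCommand;
}
```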

Oplog-disabled check

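Writes to some namespaces are never replicated: collections in the local database, system.profile, and drop-pending namespaces. The oplog is also skipped when the node is not part of a replica set (modeNone) or when the operation context has replicated writes turned off. In these cases logOp() returns a null OpTime, and the uassert rejects retryable writes (entries carrying a statement ID) against such namespaces.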

if (replCoord->isOplogDisabledFor(opCtx, oplogEntry->getNss())) {
    uassert(ErrorCodes::IllegalOperation,
            str::stream() << "retryable writes is not supported for unreplicated ns: "
                          << oplogEntry->getNss().ns(),
            !oplogEntry->getStatementId());
    return {};
}

isOplogDisabledFor in mongo/db/repl/replication_coordinator.cpp:

bool ReplicationCoordinator::isOplogDisabledFor(OperationContext* opCtx,
                                                const NamespaceString& nss) const {
    if (getReplicationMode() == ReplicationCoordinator::modeNone) {
        return true;
    }

    if (!opCtx->writesAreReplicated()) {
        return true;
    }

    if (ReplicationCoordinator::isOplogDisabledForNS(nss)) {
        return true;
    }

    fassert(28626, opCtx->recoveryUnit());

    return false;
}

bool ReplicationCoordinator::isOplogDisabledForNS(const NamespaceString& nss) {
    if (nss.isLocal()) {
        return true;
    }

    if (nss.isSystemDotProfile()) {
        return true;
    }

    if (nss.isDropPendingNamespace()) {
        return true;
    }

    return false;
}

isLocal in mongo/db/namespace_string.h checks whether a namespace belongs to the local database:

// Namespace for the local database
static constexpr StringData kLocalDb = "local"_sd;

bool isLocal() const {
    return db() == kLocalDb;
}
bool isSystemDotProfile() const {
    return coll() == "system.profile";
}
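Taken together, the checks in isOplogDisabledFor() and isOplogDisabledForNS() form a short decision chain, sketched below with simplified, hypothetical types. One assumption to flag: the drop-pending test is modeled as a `system.drop.` prefix on the collection name, which matches how two-phase drops rename collections in 4.x but is not shown in the source above.

```cpp
#include <cassert>
#include <string>

enum class ReplicationMode { kNone, kReplSet };

// Simplified namespace split into database and collection parts.
struct Nss {
    std::string db;
    std::string coll;
    bool isLocal() const { return db == "local"; }
    bool isSystemDotProfile() const { return coll == "system.profile"; }
    // Assumption: drop-pending collections are named "system.drop.<optime>.<name>".
    bool isDropPending() const { return coll.rfind("system.drop.", 0) == 0; }
};

bool isOplogDisabledForNS(const Nss& nss) {
    return nss.isLocal() || nss.isSystemDotProfile() || nss.isDropPending();
}

// Mirrors the order of checks in ReplicationCoordinator::isOplogDisabledFor().
bool isOplogDisabledFor(ReplicationMode mode, bool writesAreReplicated, const Nss& nss) {
    if (mode == ReplicationMode::kNone) return true;  // standalone: no oplog at all
    if (!writesAreReplicated) return true;            // e.g. inside an UnreplicatedWritesBlock
    return isOplogDisabledForNS(nss);
}
```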

Writing and persisting the oplog entry

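logOp() serializes the entry to BSON, wraps it in a Record the storage engine understands, and calls _logOpsInner() to perform the low-level write together with the entry's timestamp and collection information. Note that oplogEntry->getNss() is the namespace the operation was applied to (for a delete, the collection the document was removed from), not the oplog itself; the oplog collection, local.oplog.rs by default in the local database, comes from oplogInfo->getOplogCollectionName() and oplogInfo->getCollection().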

WriteUnitOfWork wuow(opCtx);
auto bsonOplogEntry = oplogEntry->toBSON();
std::vector<Record> records{ {RecordId(), RecordData(bsonOplogEntry.objdata(), bsonOplogEntry.objsize())} };
std::vector<Timestamp> timestamps{slot.getTimestamp()};
_logOpsInner(opCtx, oplogEntry->getNss(), &records, timestamps, oplog, slot, wallClockTime);
wuow.commit();
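The comment in logOp() notes that the storage engine derives the RecordId from the entry's `ts` field. A minimal model of that idea, under the assumption that the key is the 64-bit packing of a BSON Timestamp (seconds in the high 32 bits, increment in the low 32 bits), shows why oplog records come back in timestamp order regardless of insertion order. `Timestamp` and `OplogStore` here are simplified, hypothetical types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Simplified BSON Timestamp: seconds since epoch plus a per-second increment.
struct Timestamp {
    std::uint32_t secs;
    std::uint32_t inc;
    // Pack into one 64-bit value that orders like the pair (secs, inc).
    std::uint64_t asULL() const { return (std::uint64_t(secs) << 32) | inc; }
};

// Hypothetical oplog store: the record key is derived from each entry's "ts",
// so a scan returns entries in timestamp order.
class OplogStore {
public:
    void insertRecords(const std::vector<std::string>& docs, const std::vector<Timestamp>& tss) {
        assert(docs.size() == tss.size());  // one timestamp per record
        for (std::size_t i = 0; i < docs.size(); ++i) {
            const std::uint64_t key = tss[i].asULL();
            assert(records_.count(key) == 0);  // timestamps must be unique
            records_.emplace(key, docs[i]);
        }
    }
    std::vector<std::string> scanInOrder() const {
        std::vector<std::string> out;
        for (const auto& kv : records_) out.push_back(kv.second);
        return out;
    }
private:
    std::map<std::uint64_t, std::string> records_;
};
```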

IV. _logOpsInner: Physically Writing the Oplog and Updating Replication State

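logOp() only handles the higher-level logic; the physical write is done by _logOpsInner(), whose core responsibilities are checking write permission, calling into the storage engine, and updating replication state.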

_logOpsInner in mongo/db/repl/oplog.cpp:

/**
 * records - a vector of oplog records to be written.
 * timestamps - a vector of respective Timestamp objects for each oplog record.
 * oplogCollection - collection to be written to.
 * finalOpTime - the OpTime of the last oplog record.
 * wallTime - the wall clock time of the last oplog record.
 */
void _logOpsInner(OperationContext* opCtx,
                  const NamespaceString& nss,
                  std::vector<Record>* records,
                  const std::vector<Timestamp>& timestamps,
                  Collection* oplogCollection,
                  OpTime finalOpTime,
                  Date_t wallTime) {
    auto replCoord = ReplicationCoordinator::get(opCtx);
    if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
        !replCoord->canAcceptWritesFor(opCtx, nss)) {
        str::stream ss;
        ss << "logOp() but can't accept write to collection " << nss;
        ss << ": entries: " << records->size() << ": [ ";
        for (const auto& record : *records) {
            ss << "(" << record.id << ", " << redact(record.data.toBson()) << ") ";
        }
        ss << "]";
        uasserted(ErrorCodes::NotMaster, ss);
    }

    Status result = oplogCollection->insertDocumentsForOplog(opCtx, records, timestamps);
    if (!result.isOK()) {
        severe() << "write to oplog failed: " << result.toString();
        fassertFailed(17322);
    }

    // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
    opCtx->recoveryUnit()->onCommit(
        [opCtx, replCoord, finalOpTime, wallTime](boost::optional<Timestamp> commitTime) {
            if (commitTime) {
                // The `finalOpTime` may be less than the `commitTime` if multiple oplog entries
                // are logging within one WriteUnitOfWork.
                invariant(finalOpTime.getTimestamp() <= *commitTime,
                          str::stream() << "Final OpTime: " << finalOpTime.toString()
                                        << ". Commit Time: " << commitTime->toString());
            }

            // Optionally hang before advancing lastApplied.
            if (MONGO_unlikely(hangBeforeLogOpAdvancesLastApplied.shouldFail())) {
                log() << "hangBeforeLogOpAdvancesLastApplied fail point enabled.";
                hangBeforeLogOpAdvancesLastApplied.pauseWhileSet(opCtx);
            }

            // Optimes on the primary should always represent consistent database states.
            replCoord->setMyLastAppliedOpTimeAndWallTimeForward(
                {finalOpTime, wallTime}, ReplicationCoordinator::DataConsistency::Consistent);

            // We set the last op on the client to 'finalOpTime', because that contains the
            // timestamp of the operation that the client actually performed.
            ReplClientInfo::forClient(opCtx->getClient()).setLastOp(opCtx, finalOpTime);
        });
}
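The onCommit() registration at the end of _logOpsInner() is what ties the replication state to the storage transaction. The sketch below models that under simplifying assumptions: `UnitOfWork` is a hypothetical single-threaded stand-in for WriteUnitOfWork plus the recovery unit, and a rollback is simply "destroyed without commit()".

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical, single-threaded model of a unit of work with onCommit() hooks.
// Handlers registered during the unit of work run only if commit() is called;
// destroying it without commit() behaves like a rollback and drops them.
class UnitOfWork {
public:
    void onCommit(std::function<void()> fn) { commitHandlers_.push_back(std::move(fn)); }
    void commit() {
        assert(!committed_);
        committed_ = true;
        for (auto& fn : commitHandlers_) fn();
        commitHandlers_.clear();
    }
private:
    std::vector<std::function<void()>> commitHandlers_;
    bool committed_ = false;
};

struct ReplStateSketch {
    std::uint64_t lastApplied = 0;
};

// Stand-in for the tail of _logOpsInner(): after the oplog write is issued,
// register a hook that advances lastApplied only once the unit of work commits.
void logOpsInnerSketch(UnitOfWork& uow, ReplStateSketch& repl, std::uint64_t finalOpTime) {
    uow.onCommit([&repl, finalOpTime] {
        // "Forward" semantics: lastApplied never moves backwards.
        if (finalOpTime > repl.lastApplied) repl.lastApplied = finalOpTime;
    });
}
```

Keeping the state change in a commit hook means an aborted write never leaves lastApplied pointing at an OpTime that does not exist in the oplog.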

Write-permission check

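In replica-set mode, and when a target namespace is given, this verifies that the node can accept writes for that namespace (in practice, that it is the primary); otherwise it fails with NotMaster and lists the rejected entries in the error message.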

if (nss.size() && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet &&
    !replCoord->canAcceptWritesFor(opCtx, nss)) {
    str::stream ss;
    ss << "logOp() but can't accept write to collection " << nss;
    ss << ": entries: " << records->size() << ": [ ";
    for (const auto& record : *records) {
        ss << "(" << record.id << ", " << redact(record.data.toBson()) << ") ";
    }
    ss << "]";
    uasserted(ErrorCodes::NotMaster, ss);
}
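A stripped-down version of this guard is sketched below. It is an assumption-laden simplification: `canAcceptWritesFor()` is reduced to "the node is primary" (the real check handles more cases), and `MemberState`, `NotMasterError` and `checkCanLogOps` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

enum class MemberState { kPrimary, kSecondary };

struct NotMasterError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Hypothetical check modeled on the guard at the top of _logOpsInner():
// in a replica set, only a node that can accept writes for nss may log ops.
void checkCanLogOps(bool isReplSet, MemberState state, const std::string& nss,
                    std::size_t entries) {
    if (!nss.empty() && isReplSet && state != MemberState::kPrimary) {
        throw NotMasterError("logOp() but can't accept write to collection " + nss +
                             ": entries: " + std::to_string(entries));
    }
}
```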

Physically writing to the oplog collection

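The records and their timestamps are written to the oplog collection through the storage interface. A failed oplog write is treated as fatal: fassertFailed(17322) terminates the process.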

Status result = oplogCollection->insertDocumentsForOplog(opCtx, records, timestamps);
if (!result.isOK()) {
    severe() << "write to oplog failed: " << result.toString();
    fassertFailed(17322);
}

insertDocumentsForOplog in mongo/db/catalog/collection_impl.cpp:

Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx,
                                               std::vector<Record>* records,
                                               const std::vector<Timestamp>& timestamps) {
    dassert(opCtx->lockState()->isWriteLocked());

    // Since this is only for the OpLog, we can assume these for simplicity.
    invariant(!_validator);
    invariant(!_indexCatalog->haveAnyIndexes());

    Status status = _recordStore->insertRecords(opCtx, records, timestamps);
    if (!status.isOK())
        return status;

    opCtx->recoveryUnit()->onCommit(
        [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); });

    return status;
}

This inserts the entries into local.oplog.rs. MongoDB's default storage engine is WiredTiger, so _recordStore is implemented in mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp, which performs the actual insert.
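insertDocumentsForOplog() also registers an onCommit hook that calls notifyCappedWaitersIfNeeded(), which wakes readers blocked on the oplog, such as the tailable awaitData cursors secondaries use to fetch new entries. The version-counter notifier below is a hypothetical sketch of that wake-up pattern using `std::condition_variable`; it is not MongoDB's CappedInsertNotifier implementation.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch of capped-insert notification: readers tailing the oplog
// block until a new insert commits, then re-scan from where they left off.
class CappedInsertNotifierSketch {
public:
    // Called from the commit hook once new oplog records are visible.
    void notifyAll() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            ++version_;
        }
        cv_.notify_all();
    }

    std::uint64_t getVersion() const {
        std::lock_guard<std::mutex> lk(mu_);
        return version_;
    }

    // Waits until the version moves past `seen` or the timeout expires.
    // Returns true if new data was signalled; the predicate guards against spurious wakeups.
    bool waitUntilNewer(std::uint64_t seen, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(mu_);
        return cv_.wait_for(lk, timeout, [&] { return version_ > seen; });
    }

private:
    mutable std::mutex mu_;
    std::condition_variable cv_;
    std::uint64_t version_ = 0;
};
```

A reader records the version before scanning, then waits for it to advance, so a notification that arrives between the scan and the wait is not lost.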

V. Summary: The Core Value of the Oplog and Key Takeaways

As the lifeline of MongoDB's replication, the oplog's design reflects three core ideas:

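  1. Atomicity: OpTime assignment and the oplog write happen inside a WriteUnitOfWork (WUOW) nested within the caller's unit of work, so a data change and its oplog entry commit or roll back together, and replication state advances only in the onCommit callback; this prevents inconsistent logs in a distributed setting.
  2. Performance first: the oplog collection has no indexes and no validator, so the write path is minimal and logging adds little latency to write operations.
  3. Consistency checks: the UUID invariant, the primary write-permission check and similar mechanisms stop invalid operations at the source before they can affect the replica set.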

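For developers, understanding the oplog's internals makes it easier to troubleshoot replica-set sync problems, and it also helps when designing highly available MongoDB deployments: sizing the oplog sensibly and monitoring how quickly it rolls over keeps the cluster stable.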

http://www.dtcms.com/a/465068.html
