当前位置: 首页 > news >正文

HBase Compaction HFile 可见性和并发安全性分析

HStore

HStore 本身不直接记录一个“目录”的概念,但它管理着与特定列族相关的所有 StoreFiles。这些 StoreFiles 存储在 HDFS 上的特定目录中,通常位于 Region 目录下的列族目录中。HStore 通过 StoreFileTracker (如 StoreFileTrackerFactory.create) 来跟踪和管理这些文件。StoreFileTracker 负责记录和更新 StoreFiles 的元数据,包括它们在文件系统中的位置。

  • 在 HStore 的构造函数中,会初始化 StoreFileManager,它负责管理 StoreFiles 的生命周期,包括添加、删除和获取 StoreFiles。StoreFileManager 会与 StoreFileTracker 交互,以确保文件列表是最新的。
  • HStore 还通过 archiveLock 和相关方法(如 closeAndArchiveCompactedFiles 和 removeCompactedfiles)来管理已压缩文件的归档,确保这些文件被正确地移动到归档目录并从活动文件列表中移除。

Compaction 如何与 HStore 交互?

  • Compaction 触发HStore 通过 requestCompaction 方法来触发 compaction。该方法会选择合适的 StoreFiles 进行 compaction,并将这些文件添加到 filesCompacting 列表中,以防止它们被其他操作干扰。
  • Compaction 执行:实际的 compaction 逻辑由 CompactionPipeline 和 Compactor 实现。HStore 通过 compact 方法来协调整个 compaction 过程。该方法会创建一个临时文件来存储 compaction 结果,然后将输入文件移动到归档目录,并将临时文件移动到 Store 的目录中。
  • Compaction 完成:在 compaction 完成后,HStore 会调用 replaceStoreFiles 方法来更新 StoreFiles 列表,将旧的输入文件替换为新的输出文件。同时,它会更新相关的统计信息和指标,如 compactedCellsCount 和 compactedOutputFileSize
  • Compaction 失败处理:如果 compaction 过程中发生错误,HStore 会尝试回滚操作,确保数据一致性。例如,在 compact 方法中,如果写入 WAL 记录失败,它会删除临时文件并恢复原始文件。
  • Compaction 监控HStore 提供了 getCompactionProgress 方法来获取当前 compaction 的进度,并通过 finishCompactionRequest 和 cancelRequestedCompaction 方法来管理 compaction 请求的完成和取消。

总结来说,HStore 通过 StoreFileManager 和 StoreFileTracker 来管理 StoreFiles 的目录和元数据,并通过一系列方法(如 requestCompactioncompactreplaceStoreFiles 等)来协调和执行 compaction 过程。

HStore 如何利用 storeEngine 的锁机制保证可见性和安全性

1. 读操作方面

在 HStore 类中,读操作(如 getScanners 方法)通过获取 storeEngine 的读锁来保证可见性和安全性:

public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)throws IOException {Collection<HStoreFile> storeFilesToScan;List<KeyValueScanner> memStoreScanners;this.storeEngine.readLock();try {storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,includeStartRow, stopRow, includeStopRow, onlyLatestVersion);memStoreScanners = this.memstore.getScanners(readPt);// NOTE: here we must increase the refCount for storeFiles because we would open the// storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here,// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the// storeFiles after a concurrent compaction.Because HStore.requestCompaction is under// storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484// for more details.HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);} finally {this.storeEngine.readUnlock();}try {// First the store file scanners// TODO this used to get the store files in descending order,// but now we get them in ascending order, which I think is// actually more correct, since memstore get put at the end.List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);scanners.addAll(sfScanners);// Then the memstore scannersscanners.addAll(memStoreScanners);return scanners;} catch (Throwable t) {clearAndClose(memStoreScanners);throw t instanceof IOException ? (IOException) t : new IOException(t);} finally {HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);}
}

在读操作中,storeEngine.readLock() 被调用以获取读锁,这确保了在读取 StoreFile 列表时不会发生并发修改。读锁允许多个读操作同时进行,但会阻止写操作。在读取完成后,通过 storeEngine.readUnlock() 释放读锁。

2. 写操作方面

在 HStore 类中,写操作(如 replaceStoreFiles 方法)通过获取 storeEngine 的写锁来保证可见性和安全性:

@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",allowedOnPath = ".*/(HStore|TestHStore).java")
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,boolean writeCompactionMarker) throws IOException {storeEngine.replaceStoreFiles(compactedFiles, result, () -> {if (writeCompactionMarker) {writeCompactionWalRecord(compactedFiles, result);}}, () -> {synchronized (filesCompacting) {filesCompacting.removeAll(compactedFiles);}});// These may be null when the RS is shutting down. The space quota Chores will fix the Region// sizes later so it's not super-critical if we miss these.RegionServerServices rsServices = region.getRegionServerServices();if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {updateSpaceQuotaAfterFileReplacement(rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),compactedFiles, result);}
}

replaceStoreFiles 方法调用了 storeEngine.replaceStoreFiles,其实现如下:

public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)throws IOException {storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),StoreUtils.toStoreFileInfo(newFiles));walMarkerWriter.run();writeLock();try {storeFileManager.addCompactionResults(compactedFiles, newFiles);actionUnderLock.run();} finally {writeUnlock();}
}

在写操作中,storeEngine.writeLock() 被调用以获取写锁,这确保了在修改 StoreFile 列表时不会有其他读或写操作同时进行。写锁是独占的,它会阻止所有其他读和写操作。在修改完成后,通过 storeEngine.writeUnlock() 释放写锁。

StoreEngine 增加文件也是使用写锁

  /*** Add the store files to store file manager, and also record it in the store file tracker.* <p/>* The {@code actionAfterAdding} will be executed after the insertion to store file manager, under* the lock protection. Usually this is for clear the memstore snapshot.*/public void addStoreFiles(Collection<HStoreFile> storeFiles,IOExceptionRunnable actionAfterAdding) throws IOException {storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));writeLock();try {storeFileManager.insertNewFiles(storeFiles);actionAfterAdding.run();} finally {// We need the lock, as long as we are updating the storeFiles// or changing the memstore. Let us release it before calling// notifyChangeReadersObservers. See HBASE-4485 for a possible// deadlock scenario that could have happened if continue to hold// the lock.writeUnlock();}}

总结

HStore 通过 storeEngine 的读写锁机制来保证并发安全和可见性:

  1. 读操作:使用读锁 (readLock),允许多个读操作并发执行,但阻止写操作。
  2. 写操作:使用写锁 (writeLock),确保写操作的独占性,阻止所有其他读和写操作。

这种机制确保了在并发环境下,StoreFile 列表的读取和修改操作是安全的,并且能够保证数据的一致性和可见性。


​Compaction 执行层级​

​1. HStore 级别触发​
  • ​核心机制​​:Compaction 主要在 HStore 级别进行触发和管理

  • ​存储关联​​:每个列族(Column Family)对应一个 HStore 实例

  • ​触发判断​​:通过 HStore.needsCompaction()方法决定是否需要执行 compaction

​2. Region 级别协调​
  • ​多 Store 管理​​:单个 HRegion 包含多个 HStore,由 Region 层级协调各 Store 的 compaction

  • ​启动检查​​:在 HRegionServer 中,region 打开时自动检查 compaction 需求:

    if (!r.isReadOnly()) {for (HStore s : r.stores.values()) {if (s.hasReferences() || s.needsCompaction()) {this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");}}
    }
​3. Region Server 级别调度​
  • ​定期检测​​:CompactionChecker定期扫描所有 region 的 stores,触发必要 compaction

  • ​任务执行​​:通过 CompactSplit线程池异步执行 compaction 任务


​Compaction 协调机制​

​核心检测逻辑​

// CompactionChecker 定期检查实现
private static class CompactionChecker extends ScheduledChore {private final HRegionServer instance;@Overrideprotected void chore() {for (HRegion hr : this.instance.onlineRegions.values()) {if (hr == null || hr.isReadOnly() || !hr.getTableDescriptor().isCompactionEnabled()) {continue;}for (HStore s : hr.stores.values()) {if (s.needsCompaction()) {  // 检查 compaction 必要性this.instance.compactSplitThread.requestSystemCompaction(hr, s, "Periodic compaction"  // 提交系统级 compaction 请求);}}}}
}

​可能存在的问题及解决方案​

​问题类型​

​具体表现​

​解决方案​

​并发控制​

Compaction 与 region 关闭操作冲突

使用 writestate.compacting计数器同步状态;region 关闭时等待所有 compaction 完成

​资源竞争​

多 compaction 任务争抢系统资源

通过 CompactSplit线程池限制并发数;支持优先级调度确保关键任务优先执行

​跨 Region 协调​

各 Region 独立管理导致资源分配不均

设计上保持 Region 独立性(简化实现),但可能引发局部资源竞争


​总结​

HBase Compaction 采用 ​​多层级协同设计​​:

  1. ​触发检测​​:Store 级别实时监测 compaction 需求

  2. ​执行调度​​:Region Server 级别统一协调任务分配与执行

  3. ​资源管理​​:通过线程池并发控制和优先级机制优化资源利用率

该设计通过分层解耦实现了 ​​灵活性​​ 与 ​​可靠性​​ 的平衡,避免了单点故障风险,是经过验证的高效架构方案。

http://www.dtcms.com/a/355077.html

相关文章:

  • Docker-compose离线安装
  • 【Canvas与盾牌】“靡不有初,鲜克有终”黄竖条盾牌
  • [ICCV25]TRACE:用3D高斯直接学习物理参数,让AI“推演”未来场景
  • 微硕WINSOK高性能MOS管WSF80P04,助力充电宝效能与安全升级
  • 在IAR Embedded Workbench for Arm中实现Infineon TRAVEO™ T2G安全调试
  • 舆情监测系统有哪些功能
  • 省市区三级联动选择器-组件
  • C++ 方向 Web 自动化测试实战博客系统思路
  • mac系统本地部署Dify步骤梳理
  • 资产与设备管理数字化转型实践:企业降本增效的新引擎
  • 图书管理系统练习项目源码-前后端分离-【Java版】
  • Linux /proc/pid 探索
  • 【全开源】云贝餐饮V3独立版系统 v1.8.7+API接口开发文档+搭建教程
  • 从 Dockerfile 到 Kubernetes:现代化 PHP 应用配置管理进阶指南
  • 打造旅游实训新场景:旅游管理虚拟仿真实训室的运营与教学落地
  • 旅游管理新阵地:虚拟仿真实训室的功能设计与教学应用
  • Python 实现冒泡排序:从原理到代码
  • java去图片水印的方法
  • Redis 连接数爆炸:连接池配置错误踩坑记录
  • Runway Gen-2 深度技术解析:AI视频生成的范式变革
  • Bscan Bonding Chain
  • 使用llamafactory对模型进行微调
  • 软考-系统架构设计师 决策支持系统(DSS)详细讲解
  • 滤波算法作用
  • Redis高性能数据库讲解与实战指南
  • 文件系统挂载详细分析(《图解Linux内核》虚拟文件系统篇笔记三)
  • [机械结构设计-48]:机械工程师的岗位要求
  • ArkUI框架之promptAction弹窗
  • 安卓开发---BLE通信
  • 基于STM32单片机的车牌识别设计