
Spark Core Storage in Depth

1. Master-Slave Architecture

1. The Master Role

1. BlockManagerMasterEndpoint

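It tracks the block information of every slave node and maintains the block metadata for the whole application. This covers where each block is located, how much storage it occupies (in memory and on disk), and where its replicas live. The key fields are:

  • blockManagerInfo: maps each BlockManagerId to its BlockManagerInfo, which records the executor's memory usage and the status of each of its blocks
  • blockManagerIdByExecutor: maps an executor ID to its BlockManagerId, for fast lookup
  • blockLocations: maps a BlockId to the set of BlockManagerIds that hold it, so every replica of a block can be located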
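The three maps above work together, and the sketch below shows one way to keep them consistent. It is a minimal model, not Spark's code. `BlockManagerId` is reduced to an executor ID plus host, and `BlockManagerInfo` is reduced to a map of block sizes. `MasterMetadata` and its methods are hypothetical names that only loosely follow the messages handled by BlockManagerMasterEndpoint. The key point is that removing an executor must also drop it from every replica set in `blockLocations`.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified stand-ins for Spark's types.
final case class BlockManagerId(executorId: String, host: String)

final class BlockManagerInfo(val id: BlockManagerId, val maxMem: Long) {
  val blocks = mutable.Map.empty[String, Long] // blockId -> size in bytes
  def remainingMem: Long = maxMem - blocks.values.sum
}

final class MasterMetadata {
  val blockManagerInfo = mutable.Map.empty[BlockManagerId, BlockManagerInfo]
  val blockManagerIdByExecutor = mutable.Map.empty[String, BlockManagerId]
  // blockId -> every BlockManager currently holding a replica
  val blockLocations = mutable.Map.empty[String, mutable.Set[BlockManagerId]]

  def register(id: BlockManagerId, maxMem: Long): Unit = {
    blockManagerIdByExecutor(id.executorId) = id
    blockManagerInfo(id) = new BlockManagerInfo(id, maxMem)
  }

  // In this sketch, size == 0 means "the block was dropped from this BlockManager".
  def updateBlockInfo(id: BlockManagerId, blockId: String, size: Long): Boolean =
    blockManagerInfo.get(id) match {
      case None => false // unknown BlockManager: the update is rejected
      case Some(info) =>
        val locations = blockLocations.getOrElseUpdate(blockId, mutable.Set.empty)
        if (size > 0) {
          info.blocks(blockId) = size
          locations += id
        } else {
          info.blocks -= blockId
          locations -= id
          if (locations.isEmpty) blockLocations -= blockId
        }
        true
    }

  def getLocations(blockId: String): Seq[BlockManagerId] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

  // Losing an executor removes it from every replica set it appeared in.
  def removeExecutor(execId: String): Unit =
    blockManagerIdByExecutor.remove(execId).foreach { id =>
      blockManagerInfo.remove(id).foreach { info =>
        info.blocks.keys.foreach { blockId =>
          blockLocations.get(blockId).foreach { locs =>
            locs -= id
            if (locs.isEmpty) blockLocations -= blockId
          }
        }
      }
    }
}
```

With two executors caching the same partition, `getLocations` returns both. After one executor is removed, only the surviving replica is reported.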

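2. The Slave Role

1. BlockManagerStorageEndpoint

It receives commands from the Master and performs the corresponding cleanup, such as removing blocks, RDDs, shuffles, and broadcasts. It also answers the Master's requests for block status. Block status changes on a slave are reported to the Master by the slave's BlockManager, through BlockManagerMaster (for example with UpdateBlockInfo). The StorageEndpoint itself only responds to the Master and never initiates messages to it.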

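3. Message Passing

1. Messages from Slave to Master

The cases below come from BlockManagerMasterEndpoint.receiveAndReply. Besides reports from the slaves (registration, block status updates, location queries), the endpoint also handles requests issued on the driver, such as RemoveRdd and RemoveShuffle.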

```scala
case RegisterBlockManager(
    id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister) =>
  // Sent when an Executor starts, to register its BlockManager
  context.reply(
    register(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, endpoint, isReRegister))

case _updateBlockInfo @
    UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
  // Sent whenever a block's status changes, to report its metadata
  @inline def handleResult(success: Boolean): Unit = {
    // SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
    // returns false since the block info would be updated again later.
    if (success) {
      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
    }
    context.reply(success)
  }

  if (blockId.isShuffle) {
    updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult)
  } else {
    handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
  }

case GetLocations(blockId) =>
  // Look up where a block is stored
  context.reply(getLocations(blockId))

case GetLocationsAndStatus(blockId, requesterHost) =>
  context.reply(getLocationsAndStatus(blockId, requesterHost))

case GetLocationsMultipleBlockIds(blockIds) =>
  context.reply(getLocationsMultipleBlockIds(blockIds))

case GetPeers(blockManagerId) =>
  context.reply(getPeers(blockManagerId))

case GetExecutorEndpointRef(executorId) =>
  context.reply(getExecutorEndpointRef(executorId))

case GetMemoryStatus =>
  context.reply(memoryStatus)

case GetStorageStatus =>
  context.reply(storageStatus)

case GetBlockStatus(blockId, askStorageEndpoints) =>
  context.reply(blockStatus(blockId, askStorageEndpoints))

case GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter) =>
  context.reply(getShufflePushMergerLocations(numMergersNeeded, hostsToFilter))

case RemoveShufflePushMergerLocation(host) =>
  context.reply(removeShufflePushMergerLocation(host))

case IsExecutorAlive(executorId) =>
  context.reply(blockManagerIdByExecutor.contains(executorId))

case GetMatchingBlockIds(filter, askStorageEndpoints) =>
  context.reply(getMatchingBlockIds(filter, askStorageEndpoints))

case RemoveRdd(rddId) =>
  context.reply(removeRdd(rddId))

case RemoveShuffle(shuffleId) =>
  context.reply(removeShuffle(shuffleId))

case RemoveBroadcast(broadcastId, removeFromDriver) =>
  context.reply(removeBroadcast(broadcastId, removeFromDriver))

case RemoveBlock(blockId) =>
  removeBlockFromWorkers(blockId)
  context.reply(true)

case RemoveExecutor(execId) =>
  removeExecutor(execId)
  context.reply(true)

case DecommissionBlockManagers(executorIds) =>
  // Mark corresponding BlockManagers as being decommissioning by adding them to
  // decommissioningBlockManagerSet, so they won't be used to replicate or migrate blocks.
  // Note that BlockManagerStorageEndpoint will be notified about decommissioning when the
  // executor is notified(see BlockManager.decommissionSelf), so we don't need to send the
  // notification here.
  val bms = executorIds.flatMap(blockManagerIdByExecutor.get)
  logInfo(log"Mark BlockManagers (${MDC(BLOCK_MANAGER_IDS, bms.mkString(", "))}) as " +
    log"being decommissioning.")
  decommissioningBlockManagerSet ++= bms
  context.reply(true)

case GetReplicateInfoForRDDBlocks(blockManagerId) =>
  context.reply(getReplicateInfoForRDDBlocks(blockManagerId))

case StopBlockManagerMaster =>
  context.reply(true)
  stop()

case UpdateRDDBlockTaskInfo(blockId, taskId) =>
  // This is to report the information that a rdd block(with `blockId`) is computed
  // and cached by task(with `taskId`). And this happens right after the task finished
  // computing/caching the block only when the block is not visible yet. And the rdd
  // block will be marked as visible when the corresponding task finished successfully.
  context.reply(updateRDDBlockTaskInfo(blockId, taskId))

case GetRDDBlockVisibility(blockId) =>
  // Get the visibility status of a specific rdd block.
  context.reply(isRDDBlockVisible(blockId))

case UpdateRDDBlockVisibility(taskId, visible) =>
  // This is to report the information that whether rdd blocks computed by task(with `taskId`)
  // can be turned to be visible. This is reported by DAGScheduler right after task completes.
  // If the task finished successfully, rdd blocks can be turned to be visible, otherwise rdd
  // blocks' visibility status won't change.
  context.reply(updateRDDBlockVisibility(taskId, visible))
```
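The endpoint above is a `PartialFunction[Any, Unit]` that pattern-matches on message case classes and answers through `context.reply`. The sketch below reproduces that dispatch style in a self-contained form. Several names are assumptions, not Spark's API:

- `ReplyContext` is a hypothetical stand-in for Spark's `RpcCallContext`.
- The three message classes are simplified versions of the real ones.
- The `postEvent` callback stands in for `listenerBus.post`.

Like the SPARK-30594 comment, the sketch only posts an event when the update was accepted.

```scala
import scala.collection.mutable

// Hypothetical simplified messages; Spark's real ones carry more fields.
sealed trait ToMaster
final case class RegisterBlockManager(executorId: String) extends ToMaster
final case class UpdateBlockInfo(executorId: String, blockId: String, size: Long) extends ToMaster
final case class GetLocations(blockId: String) extends ToMaster

// Hypothetical stand-in for Spark's RpcCallContext.
trait ReplyContext { def reply(response: Any): Unit }

final class MasterEndpointSketch(postEvent: String => Unit) {
  private val registered = mutable.Set.empty[String]
  private val locations = mutable.Map.empty[String, mutable.Set[String]]

  def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
    case RegisterBlockManager(execId) =>
      registered += execId
      context.reply(true)

    case msg @ UpdateBlockInfo(execId, blockId, size) =>
      val success = registered.contains(execId) // unregistered senders are rejected
      if (success) {
        val locs = locations.getOrElseUpdate(blockId, mutable.Set.empty)
        if (size > 0) locs += execId else locs -= execId
        postEvent(s"updated: $msg") // only posted when the update was accepted
      }
      context.reply(success)

    case GetLocations(blockId) =>
      context.reply(locations.get(blockId).map(_.toSet).getOrElse(Set.empty[String]))
  }
}
```

Because the handler is a partial function, an unknown message is simply not matched (`isDefinedAt` returns false). The caller can then decide how to report the error.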

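2. Messages from Master to Slave

The cases below come from BlockManagerStorageEndpoint.receiveAndReply. Removals run asynchronously through doAsync, which replies once the work has finished.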

```scala
case RemoveBlock(blockId) =>
  // Delete the given block
  doAsync[Boolean](log"removing block ${MDC(BLOCK_ID, blockId)}", context) {
    blockManager.removeBlock(blockId)
    true
  }

case RemoveRdd(rddId) =>
  // Delete every block belonging to the RDD
  doAsync[Int](log"removing RDD ${MDC(RDD_ID, rddId)}", context) {
    blockManager.removeRdd(rddId)
  }

case RemoveShuffle(shuffleId) =>
  // Delete the shuffle's data
  doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) {
    if (mapOutputTracker != null) {
      mapOutputTracker.unregisterShuffle(shuffleId)
    }
    val shuffleManager = SparkEnv.get.shuffleManager
    if (shuffleManager != null) {
      shuffleManager.unregisterShuffle(shuffleId)
    } else {
      logDebug(log"Ignore remove shuffle ${MDC(SHUFFLE_ID, shuffleId)}")
      true
    }
  }

case DecommissionBlockManager =>
  context.reply(blockManager.decommissionSelf())

case RemoveBroadcast(broadcastId, _) =>
  // Delete the broadcast variable's blocks
  doAsync[Int](log"removing broadcast ${MDC(BROADCAST_ID, broadcastId)}", context) {
    blockManager.removeBroadcast(broadcastId, tellMaster = true)
  }

case GetBlockStatus(blockId, _) =>
  context.reply(blockManager.getStatus(blockId))

case GetMatchingBlockIds(filter, _) =>
  context.reply(blockManager.getMatchingBlockIds(filter))

case TriggerThreadDump =>
  context.reply(Utils.getThreadDump())

case TriggerHeapHistogram =>
  context.reply(Utils.getHeapHistogram())

case ReplicateBlock(blockId, replicas, maxReplicas) =>
  context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))

case MarkRDDBlockAsVisible(blockId) =>
  // The message is sent from driver to ask the block manager to mark the rdd block with
  // `blockId` to be visible now. This happens in 2 scenarios:
  // 1. A task computing/caching the rdd block finished successfully and the rdd block can be
  //    turned to be visible. Driver will ask all block managers hosting the rdd block to mark
  //    the block as visible.
  // 2. Once a replica of a visible block is cached and reported, driver will also ask the
  //    block manager to mark the block as visible immediately.
  context.reply(blockManager.blockInfoManager.tryMarkBlockAsVisible(blockId))
```
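The storage endpoint runs removals inside `doAsync`, so the endpoint does not wait for them. The reply, or the failure, is sent when the work completes. The sketch below shows that pattern with `scala.concurrent.Future`, under these assumptions:

- `AsyncReplyContext` is a hypothetical stand-in for `RpcCallContext`, which offers `reply` and `sendFailure`.
- The `blocks` map replaces the local BlockManager.
- `StorageEndpointSketch` and its simplified messages are illustrative names only.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical stand-in for Spark's RpcCallContext.
trait AsyncReplyContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

sealed trait ToStorage
final case class RemoveBlock(blockId: String) extends ToStorage
final case class RemoveRdd(rddId: Int) extends ToStorage

final class StorageEndpointSketch(implicit ec: ExecutionContext) {
  // blockId -> payload; stands in for the local BlockManager's stores
  val blocks = TrieMap.empty[String, Array[Byte]]

  // Run `body` on the execution context, then reply with its result or its failure.
  private def doAsync[T](context: AsyncReplyContext)(body: => T): Unit =
    Future(body).onComplete {
      case Success(response) => context.reply(response)
      case Failure(e)        => context.sendFailure(e)
    }

  def receiveAndReply(context: AsyncReplyContext): PartialFunction[Any, Unit] = {
    case RemoveBlock(blockId) =>
      doAsync(context) { blocks.remove(blockId); true }
    case RemoveRdd(rddId) =>
      doAsync(context) {
        val prefix = s"rdd_${rddId}_"
        val victims = blocks.keys.filter(_.startsWith(prefix)).toList
        victims.foreach(blocks.remove)
        victims.size // reply with the number of removed blocks
      }
  }
}
```

Handing the work to an execution context lets the endpoint keep processing other messages. The requester still gets a definite answer, including the exception if the removal fails.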

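2. Basic Concepts

1. BlockData

BlockData abstracts away how a block is stored and provides several ways to read its data, such as an InputStream or a ByteBuffer. Callers must call BlockData#dispose once they are done with the block.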
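The contract is "read through the abstraction, then always dispose". The sketch below illustrates it with a hypothetical `BlockDataSketch` trait. It is not Spark's BlockData, which also offers Netty and chunked-buffer views. `ByteArrayBlockData` and `withBlockData` are illustrative names as well. The caller releases the block in `finally`, so it is freed even when reading throws.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer

// Hypothetical, simplified version of the BlockData contract.
trait BlockDataSketch {
  def toInputStream(): InputStream
  def toByteBuffer(): ByteBuffer
  def size: Long
  def dispose(): Unit // release whatever backs the block (buffers, file handles, locks)
}

final class ByteArrayBlockData(bytes: Array[Byte]) extends BlockDataSketch {
  private var disposed = false
  private def checkOpen(): Unit = require(!disposed, "block data already disposed")

  def toInputStream(): InputStream = { checkOpen(); new ByteArrayInputStream(bytes) }
  def toByteBuffer(): ByteBuffer = { checkOpen(); ByteBuffer.wrap(bytes).asReadOnlyBuffer() }
  def size: Long = bytes.length.toLong
  def dispose(): Unit = disposed = true
  def isDisposed: Boolean = disposed
}

object BlockDataUsage {
  // The caller owns the BlockData and must dispose it even if reading fails.
  def withBlockData[T](data: BlockDataSketch)(read: BlockDataSketch => T): T =
    try read(data) finally data.dispose()
}
```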

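2. BlockId

The unique identifier of a block. Each BlockId subtype renders to a name string. object BlockId defines the following regular expressions to parse a name back into a BlockId: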


```scala
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_BATCH = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
val SHUFFLE_PUSH = "shufflePush_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_MERGED = "shuffleMerged_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_MERGED_DATA = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_MERGED_INDEX = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+)_([0-9]+).index".r
val SHUFFLE_MERGED_META = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+)_([0-9]+).meta".r
val SHUFFLE_CHUNK = "shuffleChunk_([0-9]+)_([0-9]+)_([0-9]+)_([0-9]+)".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
val PYTHON_STREAM = "python-stream-([0-9]+)-([0-9]+)".r
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r
```
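Regex extractors like these let a block name be matched straight into a typed ID. The round trip is shown below, reduced to three of the patterns. The sealed trait and the `RDDBlock`, `ShuffleBlock` and `BroadcastBlock` case classes are hypothetical simplified mirrors of Spark's RDDBlockId, ShuffleBlockId and BroadcastBlockId. `parse` stands in for `BlockId.apply` and is not Spark's exact code.

```scala
// Hypothetical simplified mirror of Spark's BlockId hierarchy.
sealed trait BlockIdSketch { def name: String }
final case class RDDBlock(rddId: Int, splitIndex: Int) extends BlockIdSketch {
  def name: String = s"rdd_${rddId}_$splitIndex"
}
final case class ShuffleBlock(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockIdSketch {
  def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}
final case class BroadcastBlock(broadcastId: Long, field: String = "") extends BlockIdSketch {
  def name: String = s"broadcast_$broadcastId" + (if (field.isEmpty) "" else s"_$field")
}

object BlockIdSketch {
  private val RDD = "rdd_([0-9]+)_([0-9]+)".r
  private val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  private val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r

  // Regex extractors must match the whole string, so "rdd_1_2_3" does not match RDD.
  def parse(name: String): BlockIdSketch = name match {
    case RDD(rddId, split)                   => RDDBlock(rddId.toInt, split.toInt)
    case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlock(shuffleId.toInt, mapId.toLong, reduceId.toInt)
    case BROADCAST(id, field)                => BroadcastBlock(id.toLong, field.stripPrefix("_"))
    case _ => throw new IllegalArgumentException(s"Unrecognized BlockId: $name")
  }
}
```

Encoding the type in the name means a bare file name on disk, or a string in an RPC message, is enough to recover the full BlockId.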

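3. BlockInfo

Tracks the metadata of a single block, such as its storage level, size, and read/write lock state. BlockInfo instances are held by BlockInfoManager.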

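3. Core Components

1. BlockManager

BlockManager is the central component of the storage system. It runs on every node in Spark, both the Driver and the Executors. Its core job is to manage disk, on-heap memory, and off-heap memory in a unified way.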
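Unified management means one entry point decides where a block goes from its storage level. The sketch below models that decision: try memory first, then fall back to disk only if the level allows it. It rests on several simplifications:

- `Level` is a hypothetical reduction of StorageLevel to `useMemory`/`useDisk`.
- `MiniBlockManager` is a hypothetical stand-in, with plain maps in place of MemoryStore and DiskStore.
- It never evicts other blocks to make room, so it is not Spark's full `doPut` logic.

```scala
import scala.collection.mutable

// Hypothetical simplified storage level; Spark's StorageLevel also has
// useOffHeap, deserialized and replication.
final case class Level(useMemory: Boolean, useDisk: Boolean)
object Level {
  val MEMORY_ONLY: Level = Level(useMemory = true, useDisk = false)
  val MEMORY_AND_DISK: Level = Level(useMemory = true, useDisk = true)
  val DISK_ONLY: Level = Level(useMemory = false, useDisk = true)
}

sealed trait Placement
case object InMemory extends Placement
case object OnDisk extends Placement
case object NotStored extends Placement

// Hypothetical stand-in for BlockManager over a bounded memory store and a disk store.
final class MiniBlockManager(memoryCapacity: Long) {
  private val memory = mutable.Map.empty[String, Array[Byte]]
  private val disk = mutable.Map.empty[String, Array[Byte]]
  private def memoryUsed: Long = memory.valuesIterator.map(_.length.toLong).sum

  def putBytes(blockId: String, bytes: Array[Byte], level: Level): Placement =
    if (level.useMemory && memoryUsed + bytes.length <= memoryCapacity) {
      memory(blockId) = bytes
      InMemory
    } else if (level.useDisk) {
      disk(blockId) = bytes // MEMORY_AND_DISK falls back to disk when memory is full
      OnDisk
    } else {
      NotStored // a MEMORY_ONLY block that does not fit is simply not cached
    }

  // Memory is checked before disk, as the faster tier.
  def getBytes(blockId: String): Option[Array[Byte]] =
    memory.get(blockId).orElse(disk.get(blockId))
}
```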

2. BlockManagerMaster

BlockManagerMaster holds a reference to BlockManagerMasterEndpoint. All exchange of BlockManager information between the Driver and the Executors goes through BlockManagerMaster.

3. BlockInfoManager

Manages the BlockInfo metadata of every block, including the per-block read and write locks.
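The per-block locks follow a readers-writer discipline: many tasks may read a block at once, but a writer needs exclusive access. The sketch below implements that rule with `synchronized`, `wait` and `notifyAll`, assuming the following:

- `BlockInfoSketch` keeps only `readerCount` and `writerTask`.
- `LockManagerSketch` and its methods are hypothetical names that only loosely follow Spark's `lockForReading`/`lockForWriting`/`unlock`.
- Unlike Spark, it does not track which task holds each read lock.

```scala
import scala.collection.mutable

// Hypothetical per-block metadata, reduced to lock state.
final class BlockInfoSketch(val level: String) {
  var readerCount: Int = 0
  var writerTask: Long = BlockInfoSketch.NoWriter
}
object BlockInfoSketch { val NoWriter: Long = -1L }

// Many readers OR one writer per block; all state is guarded by `this`.
final class LockManagerSketch {
  private val infos = mutable.Map.empty[String, BlockInfoSketch]

  def register(blockId: String, level: String): Unit = synchronized {
    if (!infos.contains(blockId)) infos(blockId) = new BlockInfoSketch(level)
  }

  // Caller must hold the monitor. If blocking, waits until `canLock` holds, then applies `lock`.
  private def acquire(blockId: String, blocking: Boolean)
                     (canLock: BlockInfoSketch => Boolean)
                     (lock: BlockInfoSketch => Unit): Option[BlockInfoSketch] = {
    var info = infos.get(blockId)
    while (blocking && info.exists(i => !canLock(i))) {
      wait() // released by notifyAll() in unlock
      info = infos.get(blockId)
    }
    info.filter(canLock).map { i => lock(i); i }
  }

  def lockForReading(blockId: String, blocking: Boolean = true): Option[BlockInfoSketch] =
    synchronized {
      acquire(blockId, blocking)(_.writerTask == BlockInfoSketch.NoWriter)(i => i.readerCount += 1)
    }

  def lockForWriting(taskId: Long, blockId: String, blocking: Boolean = true): Option[BlockInfoSketch] =
    synchronized {
      acquire(blockId, blocking)(i => i.writerTask == BlockInfoSketch.NoWriter && i.readerCount == 0)(
        i => i.writerTask = taskId)
    }

  def unlock(blockId: String): Unit = synchronized {
    infos.get(blockId).foreach { info =>
      if (info.writerTask != BlockInfoSketch.NoWriter) info.writerTask = BlockInfoSketch.NoWriter
      else if (info.readerCount > 0) info.readerCount -= 1
    }
    notifyAll() // wake waiting tasks so they re-check their condition
  }
}
```

The non-blocking mode matters for eviction. A store that wants to drop a block should skip it if someone is reading it, rather than wait.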

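4. BlockTransferService

Block transfer service: fetches blocks from remote nodes and uploads blocks to them. The default implementation is NettyBlockTransferService.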

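5. DiskBlockManager

Disk block manager: creates and maintains the mapping between logical blocks and physical files in the local directories.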

6. DiskStore

Stores blocks on disk, using DiskBlockManager to map a logical BlockId to a local file (e.g. /tmp/blockmgr-.../3e/rdd_<id>).
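The two-hex-digit directory in that path comes from hashing the block name. The sketch below follows the scheme of DiskBlockManager.getFile as the author understands it. A non-negative hash of the file name picks first a local `blockmgr-*` directory and then a subdirectory, formatted with `%02x`. Two points are assumptions: the default of 64 subdirectories per local directory, and the class name `DiskLayoutSketch`. The sketch also skips creating the directories.

```scala
import java.io.File

// Hypothetical sketch of the name -> file mapping; no directories are created.
final class DiskLayoutSketch(localDirs: IndexedSeq[File], subDirsPerLocalDir: Int = 64) {
  require(localDirs.nonEmpty && subDirsPerLocalDir > 0)

  private def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h) // abs(Int.MinValue) would stay negative
  }

  def getFile(blockName: String): File = {
    val hash = nonNegativeHash(blockName)
    val dirId = hash % localDirs.length                          // which blockmgr-* directory
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // which subdirectory
    new File(new File(localDirs(dirId), "%02x".format(subDirId)), blockName)
  }
}
```

Because the mapping is a pure function of the name, any component that knows a BlockId can find its file without a lookup table. Spreading files over many subdirectories also keeps each directory small.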

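7. MemoryManager

Memory manager, responsible for allocating and releasing memory on the node and for dividing it between storage and execution.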

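8. MemoryStore

Stores blocks in memory, either as arrays of deserialized Java objects or as serialized ByteBuffers.

Blocks are kept in a LinkedHashMap ordered by access, so when space runs out they are evicted in LRU (least recently used) order.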
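The sketch below shows an access-ordered `java.util.LinkedHashMap` (the third constructor argument `true`) acting as an LRU store. Its iteration order runs from least to most recently used, so eviction just removes entries from the front until the new block fits. `LruMemoryStoreSketch` is a hypothetical name, and the sketch leaves out part of Spark's real eviction path. That path also skips blocks it cannot lock, and blocks of the RDD that is being cached.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ListBuffer

// Hypothetical bounded store; accessOrder = true makes iteration order LRU -> MRU.
final class LruMemoryStoreSketch(maxBytes: Long) {
  private val entries = new JLinkedHashMap[String, Array[Byte]](32, 0.75f, true)
  private var used = 0L

  // get() counts as an access and moves the entry to the most-recently-used end.
  def get(blockId: String): Option[Array[Byte]] = synchronized { Option(entries.get(blockId)) }

  /** Stores the block, evicting least-recently-used blocks if needed; returns evicted IDs. */
  def put(blockId: String, bytes: Array[Byte]): Seq[String] = synchronized {
    require(bytes.length <= maxBytes, s"block $blockId is larger than the whole store")
    Option(entries.remove(blockId)).foreach(old => used -= old.length)
    val evicted = ListBuffer.empty[String]
    val it = entries.entrySet().iterator()
    while (used + bytes.length > maxBytes && it.hasNext) {
      val e = it.next()
      used -= e.getValue.length
      evicted += e.getKey
      it.remove()
    }
    entries.put(blockId, bytes)
    used += bytes.length
    evicted.toList
  }
}
```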

1. MemoryEntry

DeserializedMemoryEntry: fast to access, no deserialization required.

SerializedMemoryEntry: compact, takes less memory, and puts less pressure on the GC.
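The trade-off between the two entry kinds shows up in how a read works. A deserialized entry hands back its objects directly. A serialized entry stores one byte array and must be deserialized on every read. The sketch below models this, under these assumptions:

- `MemoryEntrySketch`, `DeserializedEntry` and `SerializedEntry` are hypothetical names.
- Java serialization stands in for Spark's configurable serializer.
- The `size` of a deserialized entry is supplied by the caller. Spark estimates it instead.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical simplified mirror of Spark's two MemoryEntry kinds.
sealed trait MemoryEntrySketch[T] { def size: Long }

// Objects kept as-is: reads are free, but the JVM object graph is larger and adds GC work.
final case class DeserializedEntry[T](values: Array[T], size: Long) extends MemoryEntrySketch[T]

// One compact byte array: reads must deserialize first.
final case class SerializedEntry[T](bytes: Array[Byte]) extends MemoryEntrySketch[T] {
  def size: Long = bytes.length.toLong
}

object MemoryEntrySketch {
  def serialize[T](values: Array[T]): SerializedEntry[T] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    try out.writeObject(values) finally out.close()
    SerializedEntry[T](bos.toByteArray)
  }

  def read[T](entry: MemoryEntrySketch[T]): Array[T] = entry match {
    case DeserializedEntry(values, _) => values // no deserialization needed
    case SerializedEntry(bytes) =>
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
      try in.readObject().asInstanceOf[Array[T]] finally in.close()
  }
}
```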

