
Spark Execution Flow and Core Components (Part 3): Task Execution

I. Launch Modes

1. Standalone


  1. Resource request: the Driver asks the Master for Executor resources
  2. Executor launch: the Master schedules Workers to start the Executors
  3. Registration: each Executor registers directly with the Driver (see the configuration sketch below)
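For concreteness, here is a minimal application sketch that exercises these three steps; the master URL, memory, and core settings are illustrative placeholders, not values from a real cluster:

```scala
import org.apache.spark.sql.SparkSession

object StandaloneApp {
  def main(args: Array[String]): Unit = {
    // Step 1: the driver created here asks the standalone Master
    // (spark://... is a placeholder address) for executor resources.
    val spark = SparkSession.builder()
      .appName("standalone-demo")
      .master("spark://master-host:7077")     // hypothetical master URL
      .config("spark.executor.memory", "2g")  // illustrative sizing
      .config("spark.cores.max", "4")
      .getOrCreate()

    // After the Master schedules Workers to start Executors (step 2)
    // and they register back with this driver (step 3), tasks can run:
    println(spark.sparkContext.parallelize(1 to 100).sum())
    spark.stop()
  }
}
```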

2. YARN


  1. The Driver requests an ApplicationMaster (AM) container from the YARN ResourceManager (RM)
  2. The RM assigns a NodeManager (NM) to launch the AM (in yarn-client mode the AM is only a resource proxy and runs no user code)
  3. The AM registers with the RM
  4. The AM requests Executor containers from the RM according to the application's resource needs
  5. The RM allocates containers on multiple NMs
  6. Each NM launches an ExecutorBackend process
  7. Registration: each Executor registers with the Driver inside the AM (see the configuration sketch below)
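A comparable sketch for YARN, assuming HADOOP_CONF_DIR/YARN_CONF_DIR point at the cluster configuration; in practice these options are usually supplied through spark-submit rather than hard-coded:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object YarnClientApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("yarn-demo")
      .setMaster("yarn")                          // step 1: driver asks the RM for an AM container
      .set("spark.submit.deployMode", "client")   // AM acts only as a resource proxy (step 2)
      .set("spark.executor.instances", "4")       // step 4: AM requests this many executor containers
      .set("spark.executor.memory", "2g")         // illustrative sizing
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).count())     // runs after executor registration (step 7)
    sc.stop()
  }
}
```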

II. Core Components for Executor-Side Task Execution

  1. Driver-side components
    • CoarseGrainedSchedulerBackend: handles communication with Executors
    • TaskSchedulerImpl: core task-scheduling logic
    • DAGScheduler: DAG scheduling and Stage management
    • BlockManagerMaster: coordinates the block managers
    • MapOutputTrackerMaster: tracks shuffle map outputs
  2. Executor-side components (a simplified sketch of how the two sides hand work off follows this list)
    • CoarseGrainedExecutorBackend: the Executor's RPC endpoint
    • Executor: the task-execution engine
    • TaskRunner: wraps each task in an execution thread
    • BlockManager: manages local data blocks
    • ShuffleManager: controls shuffle reads and writes
    • ExecutorSource: exposes executor metrics
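Seen together, the handoff is: DAGScheduler splits the job into Stages and hands each Stage's task set to TaskSchedulerImpl, which asks CoarseGrainedSchedulerBackend to ship tasks to registered Executors. A deliberately simplified toy sketch of that chain (the trait and method names here are illustrative, not Spark's real signatures):

```scala
// Toy model only: the real Spark classes have much richer interfaces.
trait SchedulerBackendLike {           // stands in for CoarseGrainedSchedulerBackend
  def launchOnExecutor(serializedTask: Array[Byte]): Unit
}

class ToyTaskScheduler(backend: SchedulerBackendLike) {  // TaskSchedulerImpl's role
  def submit(tasks: Seq[Array[Byte]]): Unit =
    tasks.foreach(backend.launchOnExecutor)              // pick executors, send LaunchTask
}

class ToyDAGScheduler(taskScheduler: ToyTaskScheduler) { // DAGScheduler's role
  def runStage(stageTasks: Seq[Array[Byte]]): Unit =
    taskScheduler.submit(stageTasks)                     // one task set per stage
}
```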

III. Core Flow of Executor-Side Task Execution

1. Task Reception and Initialization

  • CoarseGrainedExecutorBackend receives the task
```scala
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo(log"Got assigned task ${MDC(LogKeys.TASK_ID, taskDesc.taskId)}")
    executor.launchTask(this, taskDesc)
  }
```
  • Executor launches the task
```scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val taskId = taskDescription.taskId
  val tr = createTaskRunner(context, taskDescription)
  runningTasks.put(taskId, tr)
  val killMark = killMarks.get(taskId)
  if (killMark != null) {
    tr.kill(killMark._1, killMark._2)
    killMarks.remove(taskId)
  }
  threadPool.execute(tr)
  if (decommissioned) {
    log.error(s"Launching a task while in decommissioned state.")
  }
}
```

2. Task Execution

  • TaskRunner.run
```scala
// 1. Class loading and dependency management
updateDependencies(
  taskDescription.artifacts.files,
  taskDescription.artifacts.jars,
  taskDescription.artifacts.archives,
  isolatedSession)

// 2. Deserialize the task
task = ser.deserialize[Task[Any]](
  taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)

// 3. Memory management
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
task.setTaskMemoryManager(taskMemoryManager)

// 4. Run the task
val value = Utils.tryWithSafeFinally {
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = taskDescription.attemptNumber,
    metricsSystem = env.metricsSystem,
    cpus = taskDescription.cpus,
    resources = resources,
    plugins = plugins)
  threwException = false
  res
} {
  // Release block locks held by this task
  val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
  // Release managed memory and detect leaks
  val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
  if (freedMemory > 0 && !threwException) {
    val errMsg = log"Managed memory leak detected; size = " +
      log"${LogMDC(NUM_BYTES, freedMemory)} bytes, ${LogMDC(TASK_NAME, taskName)}"
    if (conf.get(UNSAFE_EXCEPTION_ON_MEMORY_LEAK)) {
      throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
    } else {
      logWarning(errMsg)
    }
  }
  if (releasedLocks.nonEmpty && !threwException) {
    val errMsg =
      log"${LogMDC(NUM_RELEASED_LOCKS, releasedLocks.size)} block locks" +
        log" were not released by ${LogMDC(TASK_NAME, taskName)}\n" +
        log" ${LogMDC(RELEASED_LOCKS, releasedLocks.mkString("[", ", ", "]"))}"
    if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
      throw SparkException.internalError(errMsg.message, category = "EXECUTOR")
    } else {
      logInfo(errMsg)
    }
  }
}

// 5. Report final status back to the driver
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
```
  • Task.run
```scala
// With hadoop.caller.context.enabled = true on the Hadoop side, Spark tags
// HDFS audit-log entries with the task's caller context. This is useful for
// troubleshooting, e.g. tracing a surge of small files back to a specific
// Spark job/stage/task.
new CallerContext(
  "TASK",
  SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
  appId,
  appAttemptId,
  jobId,
  Option(stageId),
  Option(stageAttemptId),
  Option(taskAttemptId),
  Option(attemptNumber)).setCurrentContext()

// Launch the task with its listeners
context.runTaskWithListeners(this)
```
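To get this audit-log tagging, the Hadoop switch mentioned in the comment can be forwarded from the Spark side; a minimal sketch (the spark.hadoop. prefix passes the property into the Hadoop Configuration):

```scala
import org.apache.spark.sql.SparkSession

// Enables Hadoop's caller context so HDFS audit-log entries carry the
// Spark app/job/stage/task identifiers set by CallerContext above.
val spark = SparkSession.builder()
  .appName("audit-context-demo")
  .config("spark.hadoop.hadoop.caller.context.enabled", "true")
  .getOrCreate()
```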

3. Shuffle Handling

  • ShuffleMapTask

A ShuffleMapTask prepares shuffle data (map-side output) for the downstream Stage and produces a MapStatus, which records the location and sizes of that output.

```scala
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTimeNs = System.nanoTime()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// Deserialize the RDD and its ShuffleDependency from the task broadcast
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L

val rdd = rddAndDep._1
val dep = rddAndDep._2
// While we use the old shuffle fetch protocol, we use partitionId as mapId in the
// ShuffleBlockId construction.
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
  partitionId
} else {
  context.taskAttemptId()
}
dep.shuffleWriterProcessor.write(
  rdd.iterator(partition, context), dep, mapId, partitionId, context)
```
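The write(...) call above ends by producing the MapStatus that the driver-side MapOutputTrackerMaster records. A simplified stand-in for the information it carries (the real org.apache.spark.scheduler.MapStatus stores a BlockManagerId and compressed size estimates; field names here are illustrative):

```scala
// Simplified model of MapStatus: reducers consult one of these per map
// task to decide where to fetch each shuffle block from.
case class ToyMapStatus(
    executorHostPort: String,   // where the map output lives (BlockManagerId in Spark)
    mapTaskId: Long,            // which map attempt produced it
    blockSizes: Array[Long])    // estimated bytes per reduce partition
```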
  • ResultTask
```scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  func(context, rdd.iterator(partition, context))
}
```
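Putting the two task types together: every stage that feeds a shuffle runs as ShuffleMapTasks, and the job's final stage runs as ResultTasks whose return values go back to the driver. A minimal word-count sketch (the input path is a hypothetical placeholder):

```scala
import org.apache.spark.sql.SparkSession

object TwoStageJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("two-stage-demo")
      .master("local[2]")        // local mode just for illustration
      .getOrCreate()
    val sc = spark.sparkContext

    val counts = sc.textFile("hdfs:///tmp/input.txt")  // hypothetical path
      .flatMap(_.split("\\s+"))
      .map(w => (w, 1))
      .reduceByKey(_ + _)  // shuffle boundary: the upstream stage runs as ShuffleMapTasks
      .collect()           // final stage runs as ResultTasks; results return to the driver

    counts.take(10).foreach(println)
    spark.stop()
  }
}
```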

IV. Core Communication Mechanism

| Message | Direction | Content |
|---|---|---|
| RegisterExecutor | Executor → Driver | executorId, hostPort |
| RegisteredExecutor | Driver → Executor | registration acknowledgement |
| LaunchTask | Driver → Executor | serialized TaskDescription |
| StatusUpdate | Executor → Driver | taskId, state, result data |
| KillTask | Driver → Executor | kill the specified task |
| StopExecutor | Driver → Executor | instruction to shut down the Executor |
| Heartbeat | Executor → Driver | heartbeat plus metrics |
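In Spark these control messages are defined as case classes/objects in CoarseGrainedClusterMessages and exchanged over the RPC endpoints above. A trimmed-down sketch of their shape, with fields cut to the ones in the table (the real definitions carry more fields):

```scala
import java.nio.ByteBuffer

// Simplified shapes of the driver<->executor control messages.
sealed trait ToyClusterMessage
case class RegisterExecutor(executorId: String, hostPort: String) extends ToyClusterMessage
case object RegisteredExecutor extends ToyClusterMessage
case class LaunchTask(serializedTaskDescription: ByteBuffer) extends ToyClusterMessage
case class StatusUpdate(taskId: Long, state: String, data: ByteBuffer) extends ToyClusterMessage
case class KillTask(taskId: Long, interruptThread: Boolean) extends ToyClusterMessage
case object StopExecutor extends ToyClusterMessage
```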

V. Executor Thread Model

```
Executor JVM Process
├── CoarseGrainedExecutorBackend (Netty RPC endpoint)
├── ThreadPool (CachedThreadPool)
│   ├── TaskRunner 1
│   ├── TaskRunner 2
│   └── ...
├── BlockManager
│   ├── MemoryStore (on-heap/off-heap)
│   └── DiskStore
└── ShuffleManager
    ├── SortShuffleWriter
    └── UnsafeShuffleWriter
```
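The task pool in this diagram is an unbounded cached pool, so the number of concurrently running TaskRunners is bounded by the cores the executor is granted rather than by the pool itself. A minimal sketch of the same pattern (not Spark's actual code, which also names threads and does fuller bookkeeping in Executor):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

object ToyExecutor {
  private val threadPool = Executors.newCachedThreadPool()  // mirrors Executor's cached pool
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val runner: Runnable = () => {
      try body()
      finally runningTasks.remove(taskId)  // slot freed when the runner finishes
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)             // one thread per concurrently running task
  }
}
```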
