当前位置: 首页 > news >正文

Spark 运行流程核心组件(一)作业提交

1、Job启动流程

在这里插入图片描述

1、Client触发 SparkContext 初始化

2、SparkContextMaster 注册应用

3、Master 调度 Worker 启动 Executor

4、Worker 进程启动 Executor

5、DAGScheduler 将作业分解为 Stage

6、TaskScheduler 分配 TaskExecutor

2、核心组件

组件职责
SparkContext应用入口,协调各组件,管理应用生命周期。
DAGScheduler将 Job 拆分为 Stage,构建 DAG,提交 TaskSet 给 TaskScheduler。
TaskScheduler调度 Task 到 Executor,处理故障重试。
CoarseGrainedSchedulerBackend与集群管理器交互,申请资源,管理 Executor。
ExternalClusterManager抽象层,适配不同集群(Standalone/YARN/Mesos)。
Master & WorkerStandalone 模式下管理集群资源(Master 分配资源,Worker 启动 Executor)。
Executor在 Worker 上运行,执行 Task,管理内存/磁盘。
CoarseGrainedExecutorBackendExecutor 的通信代理,接收 Task,返回状态/结果。
Task计算单元(ShuffleMapTask / ResultTask)。
ShuffleManager管理 Shuffle 数据读写(如 SortShuffleManager)。

3、工作流程

1、SparkContext

负责资源申请、任务提交、与集群管理器通信。

调用runJob方法,将RDD操作传递给DAGScheduler

2、DAGScheduler

将Job拆分为Stage(DAG),处理Shuffle依赖,提交TaskSet给TaskScheduler。

1、DAGSchedulerEvent

/* 作业生命周期事件 */
JobSubmitted //新作业提交时触发
JobCancelled //单个作业被取消
JobGroupCancelled //作业组整体取消
JobTagCancelled //按标签批量取消作业
AllJobsCancelled //取消所有运行中的作业/* 阶段执行事件 */
MapStageSubmitted //Shuffle Map阶段提交
StageCancelled //单个阶段取消
StageFailed //阶段执行失败
ResubmitFailedStages //自动重试失败阶段 ,默认4次/* 任务调度事件 */
TaskSetFailed //整个任务集失败,默认4次
SpeculativeTaskSubmitted //启动推测执行任务
UnschedulableTaskSetAdded //任务集进入待调度队列
UnschedulableTaskSetRemoved //任务集离开待调度队列/* Shuffle 优化事件 */
RegisterMergeStatuses //注册Shuffle合并状态
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle数据推送完成/* 资源管理事件 */
ExecutorAdded //新Executor注册成功
ExecutorLost //Executor异常丢失
WorkerRemoved //工作节点移除/* 执行过程事件 */
BeginEvent //任务集开始执行 
GettingResultEvent //驱动程序主动获取任务结果
CompletionEvent //作业/阶段完成

2、stage拆分流程

*ResultStage (执行作的最后一个阶段)、ShuffleMapStage (shuffle映射输出文件)*

  1. 用户行动操作触发submitJob,发送JobSubmitted事件。
  2. handleJobSubmitted处理事件,调用createResultStage创建ResultStage。
  3. createResultStage调用getOrCreateParentStages获取父Stage,父Stage的创建会递归进行。
  4. 在创建父Stage的过程中,遇到宽依赖则创建ShuffleMapStage,并递归创建其父Stage。
  5. 当所有父Stage都创建完成后,回到handleJobSubmitted,调用submitStage提交ResultStage。
  6. submitStage检查父Stage是否完成,如果有未完成的父Stage,则递归提交父Stage;否则,提交当前Stage(调用submitMissingTasks)。
  7. submitMissingTasks为Stage创建任务(ShuffleMapTask或ResultTask),并提交给TaskScheduler执行。

3、宽窄依赖切分

private def stageDependsOn(stage: Stage, target: Stage): Boolean = {if (stage == target) {return true}// DFS遍历RDD依赖树val visitedRdds = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new ListBuffer[RDD[_]]waitingForVisit += stage.rdddef visit(rdd: RDD[_]): Unit = {if (!visitedRdds(rdd)) {visitedRdds += rddfor (dep <- rdd.dependencies) {dep match {// 宽依赖:创建新的ShuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {waitingForVisit.prepend(mapStage.rdd)}  // Otherwise there's no need to follow the dependency back// 窄依赖:继续回溯case narrowDep: NarrowDependency[_] =>waitingForVisit.prepend(narrowDep.rdd)}}}}while (waitingForVisit.nonEmpty) {visit(waitingForVisit.remove(0))}visitedRdds.contains(target.rdd)}

3、TaskScheduler

接收TaskSet,按调度策略(FIFO/FAIR)将Task分配给Executor。

1、执行流程

1、DAGScheduler 调用 taskScheduler.submitTasks() 后,任务进入 TaskScheduler 调度阶段

2、任务提交submitTasks

// TaskSetManager管理任务集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到调度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 触发资源分配
backend.reviveOffers()

3、资源分配 (Driver)

// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {driverEndpoint.send(ReviveOffers)  // 向DriverEndpoint发送消息
}// DriverEndpoint处理
case ReviveOffers =>makeOffers()  // 触发资源分配

4、资源分配核心

private def makeOffers(): Unit = {// Make sure no executor is killed while some task is launching on itval taskDescs = withLock {// 1. 获取所有可用Executor资源val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }val workOffers = activeExecutors.map {case (id, executorData) => buildWorkerOffer(id, executorData)}.toIndexedSeq// 2. 调用任务调度器分配任务scheduler.resourceOffers(workOffers, true)}// 3. 启动分配的任务if (taskDescs.nonEmpty) {launchTasks(taskDescs)}
}

5、任务启动

// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {for (task <- tasks.flatten) {// 1. 序列化任务val serializedTask = TaskDescription.encode(task)// 2. 检查任务大小if (serializedTask.limit() >= maxRpcMessageSize) {Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)// Do resources allocation here. The allocated resources will get released after the task// finishes.executorData.freeCores -= task.cpustask.resources.foreach { case (rName, addressAmounts) =>executorData.resourcesInfo(rName).acquire(addressAmounts)}logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 发送任务到ExecutorexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}
}
http://www.dtcms.com/a/327590.html

相关文章:

  • 干货分享|如何从0到1掌握R语言数据分析
  • 小红书笔记信息获取_实在智能RPA源码解读
  • 邦纳BANNER相机视觉加镜头PresencePLUSP4 RICOH FL-CC2514-2M工业相机
  • C++实现LINGO模型处理程序
  • Java结课案例-景点人数统计的几种场景
  • 日期格式化成英文月,必須指定語言環境
  • Secure CRT做代理转发
  • HTTP应用层协议-长连接
  • 记对外国某服务器的内网渗透
  • C++少儿编程(二十二)—条件结构
  • 机械臂运动规划与控制12讲
  • SQL 语言分类
  • 后端学习路线
  • 3D文档控件Aspose.3D实用教程:在 C# 中将 3MF 文件转换为 STL
  • 开疆智能Ethernet转ModbusTCP网关连接发那科机器人与三菱PLC配置案例
  • Spring Boot部署万亿参数模型推理方案(深度解析)
  • css之再谈浮动定位float(深入理解篇)
  • 物联网、大数据与云计算持续发展,楼宇自控系统应用日益广泛
  • 黑马程序员mysql课程p65 安装linux版本的mysql遇到问题
  • [密码学实战]基于国密TLCP协议的Java服务端实现详解(四十四)
  • 【基于DesignStart的M3 SoC】
  • 4/5G中频段频谱全球使用现状概述(截止2025 年7月)
  • 【unity实战】在 Unity 中实现卡牌翻转或者翻书的效果
  • 现代化水库运行管理矩阵建设的要点
  • 学习笔记《区块链技术与应用》ETH 第二天 状态树
  • 解决 HTTP 请求 RequestBody 只能被读取一次的问题
  • 敏捷开发的关键点是什么?深入探索!
  • Windows server服务器上部署python项目域名访问(超详细教程)
  • Vue 3 + Elementui + TypeScript 实现左侧菜单定位右侧内容
  • 【实时Linux实战系列】实时智能监控与异常检测