当前位置: 首页 > news >正文

Spark源码中的线程池


1. newFixedThreadPool(定长线程池)

​Spark 源码位置​​:org.apache.spark.util.ThreadUtils工具类创建,广泛应用于调度、通信等模块。

​核心设计意图​​:处理​​稳定、持续、且需要严格控制并发度​​的核心任务。

​具体实现案例​​:

​案例1:Driver 端的调度通信线程池​

// 源码位置:CoarseGrainedSchedulerBackend.scala
private var driverEndpoint: RpcEndpointRef = _// 创建固定大小的线程池处理驱动器消息
override protected def createDriverEndpoint(): DriverEndpoint = {new DriverEndpoint(rpcEnv, "driver", scheduler)
}// 在DriverEndpoint中,使用固定线程池处理任务状态更新等核心消息
private val threadPool = ThreadUtils.newDaemonFixedThreadPool(numCores, "driver-message-dispatcher")  // 线程数通常与CPU核心数相关

​为什么用 FixedThreadPool?​

  • ​稳定性​​:与集群管理器(如 YARN ResourceManager)的通信、任务状态更新是 Spark 的生命线,需要稳定可靠的线程保障。

  • ​资源可控​​:避免创建过多线程导致 Driver 端内存溢出,因为 Driver 通常部署在资源受限的节点上。

  • ​任务特性​​:这些消息处理任务属于​​持续型任务​​,与应用程序生命周期一致。


2. newSingleThreadExecutor(单线程化线程池)

​核心设计意图​​:保证​​任务执行的严格顺序性​​,避免并发导致的竞态条件。

​具体实现案例​​:

​案例1:LiveListenerBus事件总线(核心中的核心)​

// 源码位置:LiveListenerBus.scala
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))// 创建单线程处理所有事件
private lazy val listenerThread = new Thread(s"SparkListenerBus-${name}") {override def run(): Unit = {while (!stopped.get()) {try {val event = eventQueue.take()  // 阻塞获取事件postToAll(event)               // 顺序处理事件} catch {case _: InterruptedException =>}}}
}

​为什么必须用单线程?​

​如果事件处理乱序,比如变成 3 -> 1 -> 2,会发生什么?​

这在 UI 上会给用户造成极大的困惑,而且会使基于事件流的监控系统(如指标计算、日志分析)计算出错误的结果(例如,认为一个已完成Stage中的任务还在运行)。

​因此,保证事件被处理的顺序与事件产生的顺序完全一致,是维护整个系统状态一致性的基石。​

  • ​事件顺序性​​:Spark 事件(如 TaskStartTaskEnd)有严格的因果关系。如果并发处理,Web UI 可能先显示任务完成再显示任务开始,导致状态混乱。

  • 状态一致性​​:监听器(Listener)可能维护内部状态,并发访问需要额外同步,单线程从根本上避免这个问题。

    这里说下顺序性,以及单线程化线程是否有可能带来阻塞?

    看一个具体的例子。假设有三个事件接连发生:
  • SparkListenerTaskStart(任务A开始)

  • SparkListenerTaskEnd(任务A结束)

  • SparkListenerStageCompleted(任务A所属的Stage完成)

  • Web UI 会先显示 “Stage 已完成”

  • 然后显示 “任务A 正在运行”

  • 最后才显示 “任务A 已结束”

        阻塞问题?

阻塞问题确实存在,但 Spark 通过出色的​​异步化和生产-消费者模型​​将其影响降到了最低。关键点在于:

  • ​生产快,消费慢​​:负责生成事件的线程(如任务线程、调度器线程)只需将事件对象放入队列(eventQueue.put(event)),这个操作非常快,几乎是瞬间完成的,然后它们就可以立即返回去执行核心的计算任务,​​不会被事件处理拖慢​​。

  • ​阻塞被隔离​​:所谓的“阻塞”只发生在那个​​唯一的事件处理线程​​上。如果它处理得慢,那么事件会在队列中堆积,但不会影响前台执行任务的线程。这是一种​​有益的背压机制​​:当系统处理不过来时,通过队列积压来减缓事件产生的速度,而不是让系统崩溃。

为什么用链表实现的阻塞队列?

  • ​解耦​​:事件产生(如任务执行)和事件处理(如更新UI)完全分离。任务线程只需“扔”出事件,无需等待处理完成,极大提升核心计算任务的性能。

  • ​流量削峰​​:任务提交高峰时,可能瞬间产生大量事件。队列作为缓冲区,防止事件洪流冲垮处理系统。

  • ​顺序保证​​:LinkedBlockingQueue的 FIFO(先进先出)特性保证了事件被处理的顺序与产生顺序一致,对于日志、监控等场景至关重要。

 ​​“ArrayBlockingQueue和LinkedBlockingQueue区别”​​。

考量点

如果使用 ArrayBlockingQueue

Spark 选择 LinkedBlockingQueue的原因

​容量​

必须预先指定固定容量。若容量太小,在事件高峰时生产者线程会因put操作而阻塞,影响核心任务。

​默认无界​​,可应对任意瞬时流量高峰,避免阻塞生产者。同时可通过参数spark.scheduler.listenerbus.eventqueue.size设置容量上限,防止内存溢出。

​吞吐量​

生产者和消费者共用一把锁,高并发下锁竞争激烈,吞吐量较低。

​双锁设计​​(putLock和 takeLock),生产者和消费者的操作基本不会相互阻塞,在高并发的事件发布场景下吞吐量更高。

​内存​

预先分配固定大小的连续数组,可能造成内存浪费。

​链表结构​​,按需分配节点,内存使用更高效、灵活。

​案例2:ContextCleaner清理线程​

// 源码位置:ContextCleaner.scala
private val cleaningThread = new Thread() { override def run(): Unit = {while (!stopped) {try {val reference = referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)if (reference != null) {logDebug(s"Cleaner cleaning $reference")reference.clear()}} catch {case _: InterruptedException =>}}}
}

​设计意图​​:清理任务是​​后台低优先级任务​​,单线程顺序执行足够,且能避免复杂的并发控制。


3. newCachedThreadPool(可缓存线程池)

​核心设计意图​​:处理​​突发性、短生命周期、高吞吐量​​的异步任务。

​具体实现案例​​:

​案例1:TaskResultGetter任务结果获取​

// 源码位置:TaskResultGetter.scala
private val getTaskResultExecutor = {// 创建可缓存线程池,应对结果获取的突发流量ThreadUtils.newDaemonCachedThreadPool("task-result-getter", conf.get(MAX_TASK_RESULT_GETTER_THREADS))
}def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer): Unit = {// 将结果处理任务提交到缓存线程池getTaskResultExecutor.execute(new Runnable {override def run(): Unit = {try {handleSuccessfulTask(taskSetManager, tid, serializedData)} catch {case e: Exception => logError("Exception in task-result-getter", e)}}})
}

​为什么用 CachedThreadPool?​

  • ​突发性​​:任务可能在某个时刻集中完成,产生大量需要反序列化和处理的结果。

  • ​短时性​​:每个结果的处理时间较短,适合线程快速回收。

  • ​弹性需求​​:任务完成数量不确定,需要线程池能弹性伸缩。

​风险控制​​:Spark 通过 spark.task.resultGetter.threads参数限制最大线程数,防止无限创建线程导致 OOM。


4. newScheduledThreadPool(支持延迟/周期任务线程池)

​核心设计意图​​:执行​​延迟任务、周期性任务、超时控制​​等需要时间调度的功能。

​具体实现案例​​:

​案例1:ExecutorAllocationManager动态资源分配​

// 源码位置:ExecutorAllocationManager.scala
private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-allocation-manager")// 定时调度资源分配策略(默认每秒执行一次)
executor.scheduleWithFixedDelay(new Runnable {override def run(): Unit = {try {// 1. 检查待处理任务数量// 2. 根据策略添加或移除Executorschedule()} catch {case _: InterruptedException =>}}
}, 0, intervalMillis, TimeUnit.MILLISECONDS)

​案例2:心跳检测与超时控制​

// 源码位置:多个组件中,如 HeartbeatReceiver.scala
private val heartbeatExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-sender")// 定期发送心跳
heartbeatExecutor.scheduleAtFixedRate(new Runnable {override def run(): Unit = {if (System.currentTimeMillis() - lastHeartbeatTime > timeout) {// 处理超时逻辑handleTimeout()} else {// 发送心跳sendHeartbeat()}}
}, 0, heartbeatInterval, TimeUnit.MILLISECONDS)

​设计意图​​:

  • ​周期性​​:资源分配策略、心跳检测等需要定期执行。

  • ​延迟性​​:超时控制、重试机制等需要延迟执行。


总结:

线程池类型

Spark 应用场景

设计哲学

关键配置参数

​FixedThreadPool​

核心调度、网络通信

​稳定压倒一切​

spark.rpc.io.serverThreads

​SingleThreadExecutor​

事件总线、资源清理

​顺序性保证正确性​

spark.scheduler.listenerbus.eventqueue.size

​CachedThreadPool​

结果处理、临时任务

​弹性应对突发流量​

spark.task.resultGetter.threads

​ScheduledThreadPool​

资源分配、心跳检测

​时间驱动型任务​

spark.dynamicAllocation.schedulerInterval

http://www.dtcms.com/a/418302.html

相关文章:

  • Kafka06-进阶-尚硅谷
  • TDengine 时序函数 IRATE 用户手册
  • 网站模板源码下载广告网站建设
  • 一键部署 Spring Boot 到远程 Docker 容器
  • Docker 入门:容器化开发的强大工具
  • iOS 26 全景揭秘,新界面、功能创新、兼容挑战与各种工具在新版系统中的定位
  • 北京交易中心网站电商网站建设需要
  • 【ansible/K8s】K8s的自动化部署源码分享
  • C++STL之list
  • CentOS 7安装部署RabbitMQ
  • 本地怎么远程调试服务器
  • AndroidID重置功能开发
  • 【Byte 类型】编程基石:揭开 `Byte`(字节)的神秘面纱
  • 天津做网站哪家服务好北京正邦品牌设计公司
  • 外贸搜素网站android studio开发app实例
  • 5. Prompt 提示词
  • android 自定义样式 Toast 实现(兼容 Android 4.1+~Android 16(API 16))
  • android SharedPreferences 工具类 * 兼容 Android 16+ (API 16)
  • 宁波易通建设网站网站备案信息代码
  • 阿里云OpenLake及行业解决方案年度发布,助力千行百业Data+AI一体化融合
  • 独立站收款方式有哪些
  • 2025 年 Python 数据分析全栈学习路线:从入门到精通的进阶指南
  • 行业类网站应如何建设网站怎么建设以及维护
  • Go 和云原生 的现状和发展前景
  • C# 中Byte类型转化问题
  • 紫外UV相机在机器视觉检测方向的应用
  • 一款国产开源免费的项目管理工具 - Kanass,超级轻量、简洁
  • 自己做的网站百度搜到新增接入 新增网站
  • (七——下)复习(分布式链路追踪/Rabiit MQ使用/Api Gateway)
  • 前端八股文 Vue上