Spark源码中的线程池
1. newFixedThreadPool
(定长线程池)
Spark 源码位置:org.apache.spark.util.ThreadUtils
工具类创建,广泛应用于调度、通信等模块。
核心设计意图:处理稳定、持续、且需要严格控制并发度的核心任务。
具体实现案例:
案例1:Driver 端的调度通信线程池
// 源码位置:CoarseGrainedSchedulerBackend.scala
private var driverEndpoint: RpcEndpointRef = _// 创建固定大小的线程池处理驱动器消息
override protected def createDriverEndpoint(): DriverEndpoint = {new DriverEndpoint(rpcEnv, "driver", scheduler)
}// 在DriverEndpoint中,使用固定线程池处理任务状态更新等核心消息
private val threadPool = ThreadUtils.newDaemonFixedThreadPool(numCores, "driver-message-dispatcher") // 线程数通常与CPU核心数相关
为什么用 FixedThreadPool?
稳定性:与集群管理器(如 YARN ResourceManager)的通信、任务状态更新是 Spark 的生命线,需要稳定可靠的线程保障。
资源可控:避免创建过多线程导致 Driver 端内存溢出,因为 Driver 通常部署在资源受限的节点上。
任务特性:这些消息处理任务属于持续型任务,与应用程序生命周期一致。
2. newSingleThreadExecutor
(单线程化线程池)
核心设计意图:保证任务执行的严格顺序性,避免并发导致的竞态条件。
具体实现案例:
案例1:LiveListenerBus
事件总线(核心中的核心)
// 源码位置:LiveListenerBus.scala
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))// 创建单线程处理所有事件
private lazy val listenerThread = new Thread(s"SparkListenerBus-${name}") {override def run(): Unit = {while (!stopped.get()) {try {val event = eventQueue.take() // 阻塞获取事件postToAll(event) // 顺序处理事件} catch {case _: InterruptedException =>}}}
}
为什么必须用单线程?
如果事件处理乱序,比如变成 3 -> 1 -> 2,会发生什么?
这在 UI 上会给用户造成极大的困惑,而且会使基于事件流的监控系统(如指标计算、日志分析)计算出错误的结果(例如,认为一个已完成Stage中的任务还在运行)。
因此,保证事件被处理的顺序与事件产生的顺序完全一致,是维护整个系统状态一致性的基石。
事件顺序性:Spark 事件(如
TaskStart
→TaskEnd
)有严格的因果关系。如果并发处理,Web UI 可能先显示任务完成再显示任务开始,导致状态混乱。状态一致性:监听器(Listener)可能维护内部状态,并发访问需要额外同步,单线程从根本上避免这个问题。
这里说下顺序性,以及单线程化线程是否有可能带来阻塞?
看一个具体的例子。假设有三个事件接连发生:SparkListenerTaskStart
(任务A开始)SparkListenerTaskEnd
(任务A结束)SparkListenerStageCompleted
(任务A所属的Stage完成)Web UI 会先显示 “Stage 已完成”
然后显示 “任务A 正在运行”
最后才显示 “任务A 已结束”
阻塞问题?
阻塞问题确实存在,但 Spark 通过出色的异步化和生产-消费者模型将其影响降到了最低。关键点在于:
生产快,消费慢:负责生成事件的线程(如任务线程、调度器线程)只需将事件对象放入队列(
eventQueue.put(event)
),这个操作非常快,几乎是瞬间完成的,然后它们就可以立即返回去执行核心的计算任务,不会被事件处理拖慢。阻塞被隔离:所谓的“阻塞”只发生在那个唯一的事件处理线程上。如果它处理得慢,那么事件会在队列中堆积,但不会影响前台执行任务的线程。这是一种有益的背压机制:当系统处理不过来时,通过队列积压来减缓事件产生的速度,而不是让系统崩溃。
为什么用链表实现的阻塞队列?
解耦:事件产生(如任务执行)和事件处理(如更新UI)完全分离。任务线程只需“扔”出事件,无需等待处理完成,极大提升核心计算任务的性能。
流量削峰:任务提交高峰时,可能瞬间产生大量事件。队列作为缓冲区,防止事件洪流冲垮处理系统。
顺序保证:
LinkedBlockingQueue
的 FIFO(先进先出)特性保证了事件被处理的顺序与产生顺序一致,对于日志、监控等场景至关重要。
“ArrayBlockingQueue和LinkedBlockingQueue区别”。
考量点 | 如果使用 | Spark 选择 |
---|---|---|
容量 | 必须预先指定固定容量。若容量太小,在事件高峰时生产者线程会因 | 默认无界,可应对任意瞬时流量高峰,避免阻塞生产者。同时可通过参数 |
吞吐量 | 生产者和消费者共用一把锁,高并发下锁竞争激烈,吞吐量较低。 | 双锁设计( |
内存 | 预先分配固定大小的连续数组,可能造成内存浪费。 | 链表结构,按需分配节点,内存使用更高效、灵活。 |
案例2:ContextCleaner
清理线程
// 源码位置:ContextCleaner.scala
private val cleaningThread = new Thread() { override def run(): Unit = {while (!stopped) {try {val reference = referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)if (reference != null) {logDebug(s"Cleaner cleaning $reference")reference.clear()}} catch {case _: InterruptedException =>}}}
}
设计意图:清理任务是后台低优先级任务,单线程顺序执行足够,且能避免复杂的并发控制。
3. newCachedThreadPool
(可缓存线程池)
核心设计意图:处理突发性、短生命周期、高吞吐量的异步任务。
具体实现案例:
案例1:TaskResultGetter
任务结果获取
// 源码位置:TaskResultGetter.scala
private val getTaskResultExecutor = {// 创建可缓存线程池,应对结果获取的突发流量ThreadUtils.newDaemonCachedThreadPool("task-result-getter", conf.get(MAX_TASK_RESULT_GETTER_THREADS))
}def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer): Unit = {// 将结果处理任务提交到缓存线程池getTaskResultExecutor.execute(new Runnable {override def run(): Unit = {try {handleSuccessfulTask(taskSetManager, tid, serializedData)} catch {case e: Exception => logError("Exception in task-result-getter", e)}}})
}
为什么用 CachedThreadPool?
突发性:任务可能在某个时刻集中完成,产生大量需要反序列化和处理的结果。
短时性:每个结果的处理时间较短,适合线程快速回收。
弹性需求:任务完成数量不确定,需要线程池能弹性伸缩。
风险控制:Spark 通过 spark.task.resultGetter.threads
参数限制最大线程数,防止无限创建线程导致 OOM。
4. newScheduledThreadPool
(支持延迟/周期任务线程池)
核心设计意图:执行延迟任务、周期性任务、超时控制等需要时间调度的功能。
具体实现案例:
案例1:ExecutorAllocationManager
动态资源分配
// 源码位置:ExecutorAllocationManager.scala
private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("executor-allocation-manager")// 定时调度资源分配策略(默认每秒执行一次)
executor.scheduleWithFixedDelay(new Runnable {override def run(): Unit = {try {// 1. 检查待处理任务数量// 2. 根据策略添加或移除Executorschedule()} catch {case _: InterruptedException =>}}
}, 0, intervalMillis, TimeUnit.MILLISECONDS)
案例2:心跳检测与超时控制
// 源码位置:多个组件中,如 HeartbeatReceiver.scala
private val heartbeatExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-sender")// 定期发送心跳
heartbeatExecutor.scheduleAtFixedRate(new Runnable {override def run(): Unit = {if (System.currentTimeMillis() - lastHeartbeatTime > timeout) {// 处理超时逻辑handleTimeout()} else {// 发送心跳sendHeartbeat()}}
}, 0, heartbeatInterval, TimeUnit.MILLISECONDS)
设计意图:
周期性:资源分配策略、心跳检测等需要定期执行。
延迟性:超时控制、重试机制等需要延迟执行。
总结:
线程池类型 | Spark 应用场景 | 设计哲学 | 关键配置参数 |
---|---|---|---|
FixedThreadPool | 核心调度、网络通信 | 稳定压倒一切 |
|
SingleThreadExecutor | 事件总线、资源清理 | 顺序性保证正确性 |
|
CachedThreadPool | 结果处理、临时任务 | 弹性应对突发流量 |
|
ScheduledThreadPool | 资源分配、心跳检测 | 时间驱动型任务 |
|