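[Spark] The Event Bus Mechanism
The event bus receives events and delivers them to listeners. When an application starts, Spark starts the LiveListenerBus from within SparkContext.
Starting the ListenerBus in SparkContext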
private def setupAndStartListenerBus(): Unit = {
  registerListener(conf.get(EXTRA_LISTENERS))

  // Register internal listeners. If failed, don't stop the spark context
  registerListener(conf.get(INTERNAL_EXTRA_LISTENERS), stopIfFailed = false)
  registerListener(conf.get(APP_STATUS_EXTRA_LISTENERS),
    queueName = LiveListenerBus.APP_STATUS_QUEUE, stopIfFailed = false)

  listenerBus.start(this, _env.metricsSystem)
  _listenerBusStarted = true
}
ListenerBus
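This is the base trait for all event bus implementations. The type parameter L is the listener type and E is the event type: the bus accepts events of type E and delivers them to the registered listeners of type L. listenersPlusTimers holds each listener together with its optional timer.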
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Marked `private[spark]` for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
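The shape of such a generic bus can be illustrated with a minimal sketch. It is not Spark's code: MiniListenerBus, RecordingListener and StringBus are hypothetical names, timers are left out, and only the idea of a CopyOnWriteArrayList of listeners plus an abstract doPostEvent is kept.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified bus: L is the listener type, E is the event type.
trait MiniListenerBus[L <: AnyRef, E] {
  // Copy-on-write list: an iterator sees a stable snapshot even if
  // another thread registers or removes a listener meanwhile.
  private val registered = new CopyOnWriteArrayList[L]()

  final def addListener(listener: L): Unit = registered.add(listener)

  final def removeListener(listener: L): Unit = registered.remove(listener)

  // Subclasses decide how a single event is handed to a single listener.
  protected def doPostEvent(listener: L, event: E): Unit

  // Deliver one event to every listener registered at this moment.
  def postToAll(event: E): Unit = {
    val iter = registered.iterator()
    while (iter.hasNext) {
      doPostEvent(iter.next(), event)
    }
  }
}

// Illustrative listener that simply records what it receives.
class RecordingListener {
  val seen = ArrayBuffer.empty[String]
}

// Illustrative bus carrying String events to RecordingListener instances.
class StringBus extends MiniListenerBus[RecordingListener, String] {
  override protected def doPostEvent(listener: RecordingListener, event: String): Unit =
    listener.seen += event
}
```

A listener only receives events posted after it was added, which is why the registration order relative to postToAll matters.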
LiveListenerBus
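As an event bus, LiveListenerBus provides listener registration and event posting, both built on top of AsyncEventQueue.
It also maintains queues, a list of AsyncEventQueue (AEQ) instances, so a single LiveListenerBus contains multiple event queues. The list is a CopyOnWriteArrayList, which keeps it thread-safe.
Listener registration
Add a listener to a specific queue: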
/**
 * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
 * of each other (each one uses a separate thread for delivering events), allowing slower
 * listeners to be somewhat isolated from others.
 */
private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }

  // Look the queue up by name: if it already exists, add the listener to it;
  // otherwise create a new AEQ.
  queues.asScala.find(_.name == queue) match {
    case Some(queue) =>
      queue.addListener(listener)

    case None =>
      val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
      newQueue.addListener(listener)
      if (started.get()) {
        // Starting the queue also starts its dispatchThread.
        newQueue.start(sparkContext)
      }
      queues.add(newQueue)
  }
}
This calls addListener on ListenerBus, the parent type of AEQ, which stores the listener and its timer in listenersPlusTimers:
/**
 * Add a listener to listen events. This method is thread-safe and can be called in any thread.
 */
final def addListener(listener: L): Unit = {
  listenersPlusTimers.add((listener, getTimer(listener)))
}
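The find-or-create step in addToQueue can be tried out in isolation. The following is a minimal sketch under simplifying assumptions: NamedQueue and MiniBus are hypothetical stand-ins, listeners are plain strings, and nothing is started or stopped.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for AsyncEventQueue: a name plus its listeners.
class NamedQueue(val name: String) {
  val listeners = new CopyOnWriteArrayList[String]()
  def addListener(listener: String): Unit = listeners.add(listener)
}

// Hypothetical stand-in for LiveListenerBus, reduced to queue bookkeeping.
class MiniBus {
  private val queues = new CopyOnWriteArrayList[NamedQueue]()

  private def findQueue(name: String): Option[NamedQueue] = {
    val it = queues.iterator()
    while (it.hasNext) {
      val q = it.next()
      if (q.name == name) return Some(q)
    }
    None
  }

  // synchronized so that two threads cannot both create a queue with the same name.
  def addToQueue(listener: String, queueName: String): Unit = synchronized {
    findQueue(queueName) match {
      case Some(q) =>
        q.addListener(listener)
      case None =>
        val q = new NamedQueue(queueName)
        q.addListener(listener)
        queues.add(q)
    }
  }

  // Queue names in creation order.
  def queueNames: List[String] = {
    val names = ArrayBuffer.empty[String]
    val it = queues.iterator()
    while (it.hasNext) names += it.next().name
    names.toList
  }

  def listenerCount(queueName: String): Int =
    findQueue(queueName).map(_.listeners.size()).getOrElse(0)
}
```

Registering two listeners under the same name yields one queue with two listeners, while a new name yields a second, independent queue.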
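Event posting
queuedEvents holds a list of SparkListenerEvent. Its purpose is to buffer any events received before the LiveListenerBus has started; once the bus starts, these buffered events are delivered first.
After the LiveListenerBus has started, events are posted normally through postToQueues(event).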
/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
  // Once the bus has stopped, events are simply dropped.
  if (stopped.get()) {
    return
  }

  // Increment the metric that counts events posted to the bus.
  metrics.numEventsPosted.inc()

  // If the event buffer is null, it means the bus has been started and we can avoid
  // synchronization and post events directly to the queues. This should be the most
  // common case during the life of the bus.
  // Note: before start(), queuedEvents is a ListBuffer that temporarily buffers every posted
  // event. start() sets it to null, so null here means the bus has already started.
  if (queuedEvents == null) {
    postToQueues(event)
    return
  }

  // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
  // calling start() picks up the new event.
  // Note: synchronized guards both the check of `started` and the append to queuedEvents.
  synchronized {
    if (!started.get()) {
      queuedEvents += event
      return
    }
  }

  // If the bus was already started when the check above was made, just post directly to the
  // queues.
  postToQueues(event)
}
postToQueues iterates over queues and posts the event to every AEQ:
private def postToQueues(event: SparkListenerEvent): Unit = {
  val it = queues.iterator()
  while (it.hasNext()) {
    it.next().post(event)
  }
}
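Starting the LiveListenerBus: the start method does the following.
- Guards against a double start: started is flipped with compareAndSet, and a second call throws IllegalStateException.
- Starts every queue and replays the buffered events into each of them.
- Registers the bus metrics with the metrics system.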
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
  if (!started.compareAndSet(false, true)) {
    throw new IllegalStateException("LiveListenerBus already started.")
  }

  this.sparkContext = sc
  queues.asScala.foreach { q =>
    q.start(sc)
    queuedEvents.foreach(q.post)
  }
  queuedEvents = null
  metricsSystem.registerSource(metrics)
}
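The interplay between post and start (buffer before start, replay on start, direct delivery afterwards) can be sketched on its own. This is a simplified illustration, not Spark's implementation: BufferingBus is a hypothetical class, events are strings, the queues are replaced by a single delivered list, and marking the buffer @volatile is a choice made for this sketch.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical bus that buffers events until start() is called.
class BufferingBus {
  private val started = new AtomicBoolean(false)

  // Non-null only before start(). Null afterwards, which signals the fast path.
  @volatile private var pending: ListBuffer[String] = ListBuffer.empty[String]

  // Stand-in for "post to every queue": just records the event.
  val delivered = ListBuffer.empty[String]

  private def deliver(event: String): Unit =
    delivered.synchronized { delivered += event }

  def post(event: String): Unit = {
    if (pending == null) {
      // Fast path: the bus has started, no locking needed on this object.
      deliver(event)
    } else {
      // Slow path: decide under the lock whether the event must be buffered.
      val buffered = synchronized {
        if (!started.get()) {
          pending += event
          true
        } else {
          false
        }
      }
      if (!buffered) deliver(event)
    }
  }

  def start(): Unit = synchronized {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("BufferingBus already started.")
    }
    // Replay everything that arrived before start, in arrival order.
    pending.foreach(deliver)
    pending = null
  }
}
```

Because start() and the buffering branch of post() take the same lock, an event is either appended before the replay or delivered directly after it, never lost between the two.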
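The start method of AEQ stores the SparkContext and starts the dispatch thread, which begins consuming events from the queue and delivering them to the listeners.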
/**
 * Start an asynchronous thread to dispatch events to the underlying listeners.
 *
 * @param sc Used to stop the SparkContext in case the async dispatcher fails.
 */
private[scheduler] def start(sc: SparkContext): Unit = {
  if (started.compareAndSet(false, true)) {
    this.sc = sc
    dispatchThread.start()
  } else {
    throw new IllegalStateException(s"$name already started!")
  }
}

private val dispatchThread = new Thread(s"spark-listener-group-$name") {
  // Daemon thread.
  setDaemon(true)
  // Error handling: run() is wrapped in Utils.tryOrStopSparkContext, which stops the
  // SparkContext if dispatch() fails with an exception.
  override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
    dispatch()
  }
}
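AEQ (AsyncEventQueue)
AsyncEventQueue implements SparkListenerBus and is the building block of LiveListenerBus, the event bus in SparkContext.
It works as asynchronous communication over a message queue, which brings two advantages:
1. Decoupling: the event sender and the event listener do not depend on each other.
2. Asynchrony: the sender returns as soon as the event has been handed to the queue, without waiting for the listeners to process it. This reduces blocking on the sender side and improves performance.
Its constructor takes 4 parameters: the queue name, the Spark configuration, a LiveListenerBusMetrics and the LiveListenerBus.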
private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {
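eventQueue is a LinkedBlockingQueue that stores SparkListenerEvent instances.
droppedEventsCounter records how many events have been dropped so far; it is incremented by 1 whenever offer(event) fails.
AEQ.post
This puts the event into the LinkedBlockingQueue, where it waits to be processed.
At this point the delivery path of an event can be summarized as: LiveListenerBus.post → postToQueues → q.post → queue.offer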
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity)

def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  if (eventQueue.offer(event)) {
    return
  }
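The reason offer can fail is that the queue is bounded and offer never blocks. A minimal sketch of that drop-on-full behavior, assuming a hypothetical DroppingQueue class with string events and a plain counter in place of Spark's metrics:

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical bounded queue that counts events it could not accept.
class DroppingQueue(capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val dropped = new AtomicLong(0L)

  // offer() returns false immediately when the queue is full, so the
  // caller is never blocked; the event is counted as dropped instead.
  def post(event: String): Boolean = {
    val accepted = eventQueue.offer(event)
    if (!accepted) {
      dropped.incrementAndGet()
    }
    accepted
  }

  def droppedCount: Long = dropped.get()

  def size: Int = eventQueue.size()
}
```

The trade-off is deliberate: a slow consumer costs lost events rather than a stalled producer.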
dispatch is the core consumer loop: it blocks on take() and keeps processing events until it receives POISON_PILL.
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  var next: SparkListenerEvent = eventQueue.take()
  while (next != POISON_PILL) {
    val ctx = processingTime.time()
    try {
      super.postToAll(next)
    } finally {
      ctx.stop()
    }
    eventCount.decrementAndGet()
    next = eventQueue.take()
  }
  eventCount.decrementAndGet()
}
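The poison-pill pattern used by dispatch can be shown with a small self-contained worker. This is a sketch only: Msg, Work, PoisonPill and Worker are hypothetical names, and the queue is unbounded here so that put never blocks.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

// Hypothetical message types: real work items plus a sentinel that ends the loop.
sealed trait Msg
final case class Work(payload: String) extends Msg
case object PoisonPill extends Msg

class Worker(name: String) {
  private val queue = new LinkedBlockingQueue[Msg]()

  // Payloads in the order the consumer thread handled them.
  val handled = new CopyOnWriteArrayList[String]()

  private val thread = new Thread(s"worker-$name") {
    // Daemon thread: it does not keep the JVM alive on its own.
    setDaemon(true)

    override def run(): Unit = {
      // take() blocks until a message is available.
      var next = queue.take()
      while (next != PoisonPill) {
        next match {
          case Work(payload) => handled.add(payload)
          case PoisonPill => () // not reachable inside the loop
        }
        next = queue.take()
      }
    }
  }

  def start(): Unit = thread.start()

  def post(payload: String): Unit = queue.put(Work(payload))

  // Enqueue the sentinel behind all pending work, then wait for the thread to exit.
  def stop(): Unit = {
    queue.put(PoisonPill)
    thread.join()
  }
}
```

Since the sentinel travels through the same FIFO queue as the events, everything posted before stop() is processed before the loop exits.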
Once an event has been taken, dispatch calls postToAll on ListenerBus, which iterates over all listeners registered in the current queue:
/**
 * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
 * `postToAll` in the same thread for all events.
 */
def postToAll(event: E): Unit = {
  // JavaConverters can create a JIterableWrapper if we use asScala.
  // However, this method will be called frequently. To avoid the wrapper cost, here we use
  // Java Iterator directly.
  val iter = listenersPlusTimers.iterator
  while (iter.hasNext) {
    val listenerAndMaybeTimer = iter.next()
    val listener = listenerAndMaybeTimer._1
    val maybeTimer = listenerAndMaybeTimer._2
    val maybeTimerContext = if (maybeTimer.isDefined) {
      maybeTimer.get.time()
    } else {
      null
    }
    lazy val listenerName = Utils.getFormattedClassName(listener)
    try {
      doPostEvent(listener, event)
      if (Thread.interrupted()) {
        // We want to throw the InterruptedException right away so we can associate the interrupt
        // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
        throw new InterruptedException()
      }
    } catch {
      case ie: InterruptedException =>
        logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie)
        removeListenerOnError(listener)
      case NonFatal(e) if !isIgnorableException(e) =>
        logError(s"Listener ${listenerName} threw an exception", e)
    } finally {
      if (maybeTimerContext != null) {
        val elapsed = maybeTimerContext.stop()
        if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
          logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
            s"${elapsed / 1000000000d}s.")
        }
      }
    }
  }
}
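One property of postToAll worth isolating is that an exception thrown by one listener does not prevent the remaining listeners from receiving the event. A minimal sketch of that per-listener isolation, assuming listeners are plain functions and a hypothetical IsolatedDelivery helper that counts failures instead of logging them:

```scala
import scala.util.control.NonFatal

// Hypothetical helper: delivers an event to each listener independently.
object IsolatedDelivery {
  // Returns how many listeners threw a non-fatal exception.
  def postToAll(listeners: Seq[String => Unit], event: String): Int = {
    var failures = 0
    listeners.foreach { listener =>
      try {
        listener(event)
      } catch {
        // Swallow the failure so the next listener still gets the event.
        case NonFatal(_) => failures += 1
      }
    }
    failures
  }
}
```

Timing, interrupt handling and listener removal, which the real method also performs, are intentionally left out here.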
SparkListenerBus
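This is the base trait for the event buses inside Spark Core. It extends ListenerBus and implements doPostEvent, which pattern-matches on the event and calls the corresponding listener method.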
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
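As this shows, listeners are subtypes of SparkListenerInterface, and events are subtypes of SparkListenerEvent.
SparkListenerInterface declares the handler methods for all events, while the events themselves are defined as subtypes of SparkListenerEvent and named "SparkListener" + event name.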
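The same "match on the event type, call the matching handler" dispatch can be reproduced with a toy event hierarchy. All names below (DemoEvent, DemoJobStart, DemoJobEnd, DemoListener, DemoDispatcher, JobLog) are hypothetical, and the no-op default handlers in the trait are a convenience of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical event hierarchy, named in the "prefix + event name" style.
sealed trait DemoEvent
final case class DemoJobStart(jobId: Int) extends DemoEvent
final case class DemoJobEnd(jobId: Int) extends DemoEvent

// Hypothetical listener interface: one handler per event type, no-op by default.
trait DemoListener {
  def onJobStart(event: DemoJobStart): Unit = ()
  def onJobEnd(event: DemoJobEnd): Unit = ()
}

object DemoDispatcher {
  // Route each event to the handler that corresponds to its runtime type.
  def doPostEvent(listener: DemoListener, event: DemoEvent): Unit = event match {
    case jobStart: DemoJobStart => listener.onJobStart(jobStart)
    case jobEnd: DemoJobEnd => listener.onJobEnd(jobEnd)
  }
}

// A listener that only cares about job completion.
class JobLog extends DemoListener {
  val lines = ArrayBuffer.empty[String]
  override def onJobEnd(event: DemoJobEnd): Unit =
    lines += s"job ${event.jobId} ended"
}
```

A listener overrides only the handlers it needs; events it does not handle fall through to the no-op defaults.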