
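[Spark] The Event Bus Mechanism

The event bus receives events and delivers them to the registered listeners. When an application starts, Spark starts a LiveListenerBus inside SparkContext.

Starting the listener bus in SparkContext: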

private def setupAndStartListenerBus(): Unit = {
  registerListener(conf.get(EXTRA_LISTENERS))
  // Register internal listeners. If failed, don't stop the spark context
  registerListener(conf.get(INTERNAL_EXTRA_LISTENERS), stopIfFailed = false)
  registerListener(conf.get(APP_STATUS_EXTRA_LISTENERS),
    queueName = LiveListenerBus.APP_STATUS_QUEUE, stopIfFailed = false)
  listenerBus.start(this, _env.metricsSystem)
  _listenerBusStarted = true
}

ListenerBus

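ListenerBus is the base trait of every event bus implementation. The type parameter L is the listener type and E is the event type: the bus accepts events of type E and delivers them to listeners of type L. listenersPlusTimers holds each listener together with its optional timer.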

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Marked `private[spark]` for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
  // ...
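
To make the shape of this trait concrete, here is a minimal sketch of a generic listener bus backed by a CopyOnWriteArrayList. It is not Spark's code: MiniListenerBus, Recorder and RecordingBus are hypothetical names, and the timers are left out.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Spark's ListenerBus (no timers, no logging).
trait MiniListenerBus[L <: AnyRef, E] {
  // Copy-on-write: registration copies the backing array, iteration reads a snapshot.
  private val listeners = new CopyOnWriteArrayList[L]()

  final def addListener(listener: L): Unit = listeners.add(listener)

  final def removeListener(listener: L): Unit = listeners.remove(listener)

  def listenerCount: Int = listeners.size()

  // Subclasses decide how a single event reaches a single listener.
  protected def doPostEvent(listener: L, event: E): Unit

  def postToAll(event: E): Unit = {
    val iter = listeners.iterator()
    while (iter.hasNext) {
      doPostEvent(iter.next(), event)
    }
  }
}

// Example listener type (assumption): it just records the events it sees.
final class Recorder {
  val seen: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

final class RecordingBus extends MiniListenerBus[Recorder, String] {
  protected def doPostEvent(listener: Recorder, event: String): Unit =
    listener.seen += event
}
```

Because iteration works on a snapshot, a listener can be added or removed while postToAll is running without a ConcurrentModificationException; the price is an array copy on every registration, which is acceptable when listeners change rarely.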

LiveListenerBus

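LiveListenerBus is the event bus used by SparkContext. It offers listener registration and event posting, both built on top of AsyncEventQueue (AEQ).
It also maintains queues, a list of AsyncEventQueue instances, so a single LiveListenerBus contains several independent event queues. The list is a CopyOnWriteArrayList, which keeps concurrent reads and updates thread-safe.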

Listener registration

Adding a listener to a specific queue:

/**
 * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
 * of each other (each one uses a separate thread for delivering events), allowing slower
 * listeners to be somewhat isolated from others.
 */
private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }

  // Look the queue up by name: reuse it if it exists, otherwise create a new AEQ.
  queues.asScala.find(_.name == queue) match {
    case Some(queue) =>
      queue.addListener(listener)

    case None =>
      val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
      newQueue.addListener(listener)
      if (started.get()) {
        // This starts the queue's dispatchThread.
        newQueue.start(sparkContext)
      }
      queues.add(newQueue)
  }
}
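
The find-or-create step can be isolated in a small sketch. This is an illustration under simplifying assumptions, not Spark's implementation: NamedQueue and QueueRegistry are hypothetical names, listeners are plain strings, and no dispatch thread is started.

```scala
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for AsyncEventQueue: just a name plus its listeners.
final class NamedQueue(val name: String) {
  private val listeners = new CopyOnWriteArrayList[String]()
  def addListener(listener: String): Unit = listeners.add(listener)
  def listenerCount: Int = listeners.size()
}

final class QueueRegistry {
  private val queues = new CopyOnWriteArrayList[NamedQueue]()

  // synchronized makes "look up, then maybe create" atomic, so two threads
  // cannot both create a queue with the same name.
  def addToQueue(listener: String, queueName: String): Unit = synchronized {
    find(queueName) match {
      case Some(q) =>
        q.addListener(listener)
      case None =>
        val q = new NamedQueue(queueName)
        q.addListener(listener)
        queues.add(q)
    }
  }

  // Linear scan by name; the number of queues is expected to stay small.
  def find(queueName: String): Option[NamedQueue] = {
    val it = queues.iterator()
    var result: Option[NamedQueue] = None
    while (result.isEmpty && it.hasNext) {
      val q = it.next()
      if (q.name == queueName) result = Some(q)
    }
    result
  }

  def queueCount: Int = queues.size()
}
```

Registering two listeners under the same name yields one queue with two listeners, while a new name yields a second, independent queue.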

addToQueue calls addListener, which AEQ inherits from ListenerBus, to store the listener and its timer in listenersPlusTimers:

/**
 * Add a listener to listen events. This method is thread-safe and can be called in any thread.
 */
final def addListener(listener: L): Unit = {
  listenersPlusTimers.add((listener, getTimer(listener)))
}
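
Event posting

queuedEvents is a buffer of SparkListenerEvent. It caches events that arrive before the LiveListenerBus has started; on startup, these buffered events are delivered first.
Once the bus has started, post delivers events directly through postToQueues(event).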

/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
  // Once the bus has been stopped, events are silently dropped.
  if (stopped.get()) {
    return
  }

  // Count every event posted to the bus.
  metrics.numEventsPosted.inc()

  // If the event buffer is null, it means the bus has been started and we can avoid
  // synchronization and post events directly to the queues. This should be the most
  // common case during the life of the bus.
  // In other words: before start(), queuedEvents is a ListBuffer that temporarily holds
  // every posted event; start() sets it to null once the bus is running.
  if (queuedEvents == null) {
    postToQueues(event)
    return
  }

  // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
  // calling start() picks up the new event.
  // The synchronized block guards both the check of `started` and the append to queuedEvents.
  synchronized {
    if (!started.get()) {
      queuedEvents += event
      return
    }
  }

  // If the bus was already started when the check above was made, just post directly to the
  // queues.
  postToQueues(event)
}

postToQueues iterates over queues and posts the event to every AEQ:

private def postToQueues(event: SparkListenerEvent): Unit = {
  val it = queues.iterator()
  while (it.hasNext()) {
    it.next().post(event)
  }
}

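Starting LiveListenerBus. The start method:

  1. Guards against a second start: if the bus is already started, compareAndSet fails and an IllegalStateException is thrown.
  2. Starts every queue and replays the buffered events into each of them.
  3. Registers the bus metrics with the metrics system.
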
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
  if (!started.compareAndSet(false, true)) {
    throw new IllegalStateException("LiveListenerBus already started.")
  }

  this.sparkContext = sc
  queues.asScala.foreach { q =>
    q.start(sc)
    queuedEvents.foreach(q.post)
  }
  queuedEvents = null
  metricsSystem.registerSource(metrics)
}
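
The interplay between post and start (buffer before start, replay on start, direct delivery afterwards) can be sketched in isolation. This is a simplified illustration, not Spark's code: BufferingBus and deliver are hypothetical names, deliver stands in for postToQueues, events are plain strings, and the buffer is assumed to be a volatile field so that posting threads observe the switch to null.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical simplified bus; `deliver` stands in for postToQueues.
final class BufferingBus(deliver: String => Unit) {
  private val started = new AtomicBoolean(false)

  // Non-null only before start(); volatile so posters see the switch to null.
  @volatile private var buffered: ListBuffer[String] = ListBuffer.empty[String]

  def post(event: String): Unit = {
    if (buffered == null) {
      // Fast path: the bus is running, no locking needed.
      deliver(event)
    } else {
      // Slow path: decide under the lock whether to buffer or to deliver.
      val wasBuffered = synchronized {
        if (!started.get()) {
          buffered += event
          true
        } else {
          false
        }
      }
      if (!wasBuffered) deliver(event)
    }
  }

  def start(): Unit = synchronized {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("already started")
    }
    buffered.foreach(deliver) // replay in arrival order
    buffered = null
  }
}
```

Because start() and the slow path of post share the same lock, an event is either appended before the replay begins or delivered directly after it; it cannot be lost in between.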

AEQ's start method stores the SparkContext and starts the dispatch thread, which begins consuming events from the queue and delivering them to the listeners:

/**
 * Start an asynchronous thread to dispatch events to the underlying listeners.
 *
 * @param sc Used to stop the SparkContext in case the async dispatcher fails.
 */
private[scheduler] def start(sc: SparkContext): Unit = {
  if (started.compareAndSet(false, true)) {
    this.sc = sc
    dispatchThread.start()
  } else {
    throw new IllegalStateException(s"$name already started!")
  }
}

private val dispatchThread = new Thread(s"spark-listener-group-$name") {
  // Daemon thread.
  setDaemon(true)
  // Utils.tryOrStopSparkContext wraps the loop so that an exception stops the SparkContext.
  override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
    dispatch()
  }
}

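AEQ (AsyncEventQueue)

AsyncEventQueue implements SparkListenerBus and is the building block of LiveListenerBus, the event bus in SparkContext.
It works as a message queue with asynchronous delivery, which brings two benefits:

  1. It decouples event senders from event listeners.
  2. It is asynchronous: a sender returns as soon as the event is placed on the queue, without waiting for listeners to process it, so senders block less and throughput improves.

Its constructor takes four parameters: the queue name, the Spark configuration, a LiveListenerBusMetrics and the owning LiveListenerBus.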

private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

eventQueue is a LinkedBlockingQueue that holds SparkListenerEvent instances.
droppedEventsCounter counts the events that have been dropped; it is incremented by 1 each time offer(event) fails.

AEQ.post
post places the event on the LinkedBlockingQueue, where it waits to be processed.
The full delivery path is therefore: LiveListenerBus.post → postToQueues → q.post → eventQueue.offer

private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity)

def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  if (eventQueue.offer(event)) {
    return
  }
  // ...
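
The drop-on-full behaviour follows from how offer works on a bounded LinkedBlockingQueue. A minimal sketch, assuming a hypothetical DroppingQueue class and omitting the logging Spark performs when events are dropped:

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical simplified queue; `capacity` plays the role of the configured queue size.
final class DroppingQueue[E](capacity: Int) {
  private val queue = new LinkedBlockingQueue[E](capacity)
  private val dropped = new AtomicLong(0L)

  // Returns true if the event was enqueued, false if it was dropped.
  def post(event: E): Boolean = {
    // offer() never blocks: it returns false immediately when the queue is full.
    val accepted = queue.offer(event)
    if (!accepted) dropped.incrementAndGet()
    accepted
  }

  def droppedCount: Long = dropped.get()

  def size: Int = queue.size()
}
```

Using offer rather than put is the design choice that keeps the sender non-blocking: a slow listener costs dropped events instead of stalling the thread that posts them.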

dispatch is the core consumer loop: it blocks on take() and keeps processing events until it receives POISON_PILL.

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  var next: SparkListenerEvent = eventQueue.take()
  while (next != POISON_PILL) {
    val ctx = processingTime.time()
    try {
      super.postToAll(next)
    } finally {
      ctx.stop()
    }
    eventCount.decrementAndGet()
    next = eventQueue.take()
  }
  eventCount.decrementAndGet()
}
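
The poison-pill pattern used by this loop can be tried out on its own. The sketch below is an illustration only: PoisonPillDemo and runOnce are hypothetical names, events are plain strings, and the sentinel is a dedicated String instance compared by reference.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object PoisonPillDemo {
  // Feeds `events` to a consumer thread, stops it with a sentinel,
  // and returns what the consumer handled, in order.
  def runOnce(events: Seq[String]): List[String] = {
    // A fresh instance, so reference equality can never match a real event.
    val poisonPill = new String("<stop>")
    val queue = new LinkedBlockingQueue[String]()
    val handled = ListBuffer.empty[String]

    val worker = new Thread("demo-dispatch") {
      setDaemon(true) // a daemon thread does not keep the JVM alive
      override def run(): Unit = {
        var next = queue.take() // blocks until an element is available
        while (!(next eq poisonPill)) {
          handled += next
          next = queue.take()
        }
      }
    }

    worker.start()
    events.foreach(e => queue.put(e))
    queue.put(poisonPill) // ask the consumer to exit after draining
    worker.join()         // join makes the worker's writes visible here
    handled.toList
  }
}
```

Since the sentinel travels through the same FIFO queue as the events, everything posted before it is processed before the thread exits.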

For each event, dispatch calls ListenerBus.postToAll, which iterates over all listeners registered on the current queue:

/**
 * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
 * `postToAll` in the same thread for all events.
 */
def postToAll(event: E): Unit = {
  // JavaConverters can create a JIterableWrapper if we use asScala.
  // However, this method will be called frequently. To avoid the wrapper cost, here we use
  // Java Iterator directly.
  val iter = listenersPlusTimers.iterator
  while (iter.hasNext) {
    val listenerAndMaybeTimer = iter.next()
    val listener = listenerAndMaybeTimer._1
    val maybeTimer = listenerAndMaybeTimer._2
    val maybeTimerContext = if (maybeTimer.isDefined) {
      maybeTimer.get.time()
    } else {
      null
    }
    lazy val listenerName = Utils.getFormattedClassName(listener)
    try {
      doPostEvent(listener, event)
      if (Thread.interrupted()) {
        // We want to throw the InterruptedException right away so we can associate the interrupt
        // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
        throw new InterruptedException()
      }
    } catch {
      case ie: InterruptedException =>
        logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie)
        removeListenerOnError(listener)
      case NonFatal(e) if !isIgnorableException(e) =>
        logError(s"Listener ${listenerName} threw an exception", e)
    } finally {
      if (maybeTimerContext != null) {
        val elapsed = maybeTimerContext.stop()
        if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
          logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
            s"${elapsed / 1000000000d}s.")
        }
      }
    }
  }
}
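
One property worth isolating from postToAll is that a failing listener does not prevent the remaining listeners from seeing the event. The following sketch shows only that aspect; IsolatedDelivery is a hypothetical name, listeners are modelled as plain functions, and timers, interrupt handling and logging are left out.

```scala
import scala.util.control.NonFatal

object IsolatedDelivery {
  // Delivers `event` to every listener and returns the indices of the listeners that threw.
  def postToAll[E](listeners: Seq[E => Unit], event: E): List[Int] = {
    val failed = List.newBuilder[Int]
    var i = 0
    val it = listeners.iterator
    while (it.hasNext) {
      val listener = it.next()
      try {
        listener(event)
      } catch {
        // A non-fatal failure is recorded and the loop moves on to the next listener.
        case NonFatal(_) => failed += i
      }
      i += 1
    }
    failed.result()
  }
}
```

NonFatal deliberately does not match errors such as OutOfMemoryError or InterruptedException, so those still propagate to the caller instead of being swallowed.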

SparkListenerBus

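SparkListenerBus is the base trait for the event buses inside Spark Core. It extends ListenerBus and implements doPostEvent, which pattern-matches on the event and calls the corresponding listener callback.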

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        // ...

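As the code shows, listeners are implementations of SparkListenerInterface and events are subtypes of SparkListenerEvent.
SparkListenerInterface declares a handler method for every event, and the events are defined as subtypes of SparkListenerEvent whose names follow the pattern "SparkListener" + event name.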
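
The type-based routing done by doPostEvent can be reproduced with a miniature event hierarchy. Everything in this sketch is hypothetical (DemoEvent, DemoListener, DemoDispatch, JobLog), and the no-op default callbacks are a convenience of the sketch rather than a statement about SparkListenerInterface.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature event hierarchy, one case class per kind of event.
sealed trait DemoEvent
final case class DemoJobStart(jobId: Int) extends DemoEvent
final case class DemoJobEnd(jobId: Int, succeeded: Boolean) extends DemoEvent

// One callback per event type; no-op defaults let a listener override only what it needs.
trait DemoListener {
  def onJobStart(event: DemoJobStart): Unit = ()
  def onJobEnd(event: DemoJobEnd): Unit = ()
}

object DemoDispatch {
  // Routes an event to the matching callback based on its runtime type.
  def doPostEvent(listener: DemoListener, event: DemoEvent): Unit = event match {
    case e: DemoJobStart => listener.onJobStart(e)
    case e: DemoJobEnd => listener.onJobEnd(e)
  }
}

// Example listener that only cares about finished jobs.
final class JobLog extends DemoListener {
  val lines: ArrayBuffer[String] = ArrayBuffer.empty[String]

  override def onJobEnd(event: DemoJobEnd): Unit =
    lines += s"job ${event.jobId} succeeded=${event.succeeded}"
}
```

Posting a DemoJobStart followed by a DemoJobEnd to a JobLog leaves exactly one line in it, because the start event falls through to the no-op default.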
