当前位置: 首页 > news >正文

Flink Source源码解析

Source

Source执行时由SourceOperatorCoordinator和SourceReader两部分组成,SourceOperatorCoordinator执行SourceSplitEnumerator逻辑,管理SourceSplit, 将SourceSplit分配给SourceReader;SourceReader则读取SourceSplit的具体内容, 传递给SourceOutput发送给下游。

Source接口抽象了工厂方法,可生产SplitEnumerator和SourceReader, 因此,自定义Source时主要关注实现自定义SourceReader和SourceEnumerator。

SourceReader与SourceEnumerator通信图:

img

OperatorCoordinator&SplitEnumerator

OperatorCoordinator是在JobMaster启动时创建的,包括SourceOperatorCoordinator, SourceOperatorCoordinator会调用Source接口的createEnumerator方法生产SourceSplitEnumerator。

在创建ExecutionJobVertex时,在initialize方法中会为每个ExecutionJobVertex创建OperatorCoordinatorHolder, OperatorCoordinatorHolder持有对应OperatorID的OperatorCoordinator, 其中SourceOperator对应的SourceCoordinator是通过SourceCoordinatorProvider::getCoordinator创建出来。

SourceCoordinator启动时会调用Source::createEnumerator创建SplitEnumerator并启动,流程如下:

JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultOperatorCoordinatorHandler::startAllOperatorCoordinators
DefaultOperatorCoordinatorHandler::startOperatorCoordinators
OperatorCoordinatorHolder::start
SourceCoordinator::start
Source::createEnumerator
SplitEnumerator::start

SourceCoordinator启动后就可以处理SourceReader的请求,比如请求Split分配事件RequestSplitEvent,SourceReader注册事件ReaderRegistrationEvent, Watermark报告ReportedWatermarkEvent

SourceReader

img

SourceReader是SourceOperator在启动时创建,SourceOperator和SourceCoordinatorProvider由SourceOperatorFactory创建。

SourceTask启动时加载SourceOperator,创建SourceReader, 向SourceOperatorCoordinator注册Reader后启动Reader, 向SourceCoordinator发送RequestSplitEvent事件。

SourceReader启动流程如下:

TaskManager.Task::run
TaskManager.Task::doRun
TaskManager.Task::restoreAndInvoke //调用TaskInvokable.invoke方法
StreamTask::invokeStreamTask::restoreInternalSourceOperatorStreamTask::init //创建StreamTaskSourceInputSourceOperator::initReaderSource::createReader  //创建SourceReaderStreamTask::restoreGatesOperatorChain::initializeStateAndOpenOperatorsSourceOperator::open SourceOperator::registerReader  //注册ReaderSourceReader::start //启动Reader, 可向Coordinator发送RequestSplitEvent请求Split
StreamTask::runMailboxLoop
MailboxProcessor::runMailboxLoop
StreamTask::processInput
StreamOneInputProcessor::processInput
StreamTaskSourceInput::emitNext 
SourceOperator::emitNextSourceOperator::emitNextNotReadingSourceOperator::initializeMainOutput //创建Source ReaderOutput
SourceReader::pollNext  //循环调用从Queue获取RecordsWithSplitIds数据传递给RecordEmitter发送出去
SourceReader::moveToNextSplit 
SplitContext::getOrCreateSplitOutput //为每个split创建SourceOutput
RecordEmitter::emitRecord
SourceOutput::collect //发送数据到下游
RequestSplitEvent

SourceReader启动后在start方法中向SourceCoordinator发送RequestSplitEvent事件请求Split, SourceCoordinator通过SplitEnumerator获取Split封装成AddSplitEvent返回给SourceReader

SourceCoordinator处理RequestSplitEvent流程:

DefaultOperatorCoordinatorHandler::deliverOperatorEventToCoordinator
OperatorCoordinatorHolder::handleEventFromOperator
SourceCoordinator::handleEventFromOperator
SourceCoordinator::handleRequestSplitEvent   //判断是否有可用split
SplitEnumerator::handleSplitRequest  
SplitEnumeratorContext::assignSplit  //返回split给SourceReader
SourceCoordinatorContext::assignSplits
SourceCoordinatorContext::assignSplitsToAttempts //封装AddSplitEvent消息发送给SourceReader

SingleThreadMultiplexSourceReaderBase

为方便自定义SourceReader, Flink提供了SingleThreadMultiplexSourceReaderBase/SingleThreadFetcherManager实现单线程单分片串行或多分片多路同时读取模型。 SourceReader处理AddSplitEvent事件时会创建一个线程执行SplitFetcher逻辑,将获取的split信息传递给SplitReader, 只要存在未完成的split, SplitFetcher线程就会不断调用SplitReader::fetch获取RecordsWithSplitIds,将RecordsWithSplitIds存入Queue中 给SourceReader主线程消费

AddSplitEvent事件处理流程:

SourceOperator::handleOperatorEvent
SourceOperator::handleAddSplitsEvent
SourceReaderBase::addSplits
SplitContext::new 创建SplitContext, 记录split状态信息
SingleThreadFetcherManager::addSplits  //添加split
SingleThreadFetcherManager::createSplitFetcher  //创建SplitFetcher线程并启动执行
SplitFetcher::new //Feacher控制中心,保存分配的split信息,以及从split读取的数据
FetchTask::new  //创建FetchTask, 从split读取数据,放入Queue
SplitFetcher::addSplits //封装成AddSplitsTask
AddSplitsTask::newSplitFetcher::run //优先执行AddSplitsTask,将split信息传递给SplitReader,其次调用FetchTask,只要当前的SplitFetcher还有分配的split未读取完成,会不断调用FetchTask::run方法读取数据
AddSplitsTask::run
SplitReader::handleSplitsChanges 
FetchTask::run 
SplitReader::fetch //获取RecordsWithSplitIds

img

自定义SourceReader时如果继承自SingleThreadMultiplexSourceReaderBase,则主要实现SplitReader逻辑,关注如何根据split信息从数据源获取记录。

Flink CDC中各种数据源SourceReader的实现就是继承自SingleThreadMultiplexSourceReaderBase。

Ref:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

https://zhuanlan.zhihu.com/p/454440159

http://www.dtcms.com/a/581177.html

相关文章:

  • 春招准备之MyBatis框架篇
  • 华为交换机上配置基于 IP 地址的 ACL
  • 【C++练习】31. C++计算最大公约数(GCD)
  • 从普通屏到 明基RD320U:一台显示器如何提升我的编码效率?
  • 从 ACID 到 MVCC,MySQL 事务与隔离级别超详解
  • 植物生理研究的精准量化:光合作用测定仪的应用与前景
  • Win电脑文字转语音,不限使用次数和字数!可将文字文本内容转换为朗读配音的音频文件!多功能语音合成,内置多语种、多角色语音配音模型,支持普通话标准发音和方言!
  • 网页设计与网站建设作业答案淘宝宝贝关键词排名查询工具
  • 2025年CSP-X复赛真题及题解(山东):T2IOI串
  • 基于网易CodeWave智能开发平台构建宝可梦图鉴
  • Ubuntu2204降内核版本
  • 数据在网络上的转发过程
  • 跨地域传文件太麻烦?Nginx+cpolar 让本地服务直接公网访问
  • ASP.NET MVC 数据验证进阶:用 IValidatableObject 实现自定义验证逻辑 引言:为什么需要 “自定义验证”?
  • 网站流量报表摄像头怎么做直播网站
  • XMOS与飞腾云联袂以模块化方案大幅加速音频产品落地
  • AI 下的 Agent 技术全览
  • 唐山免费网站制作wordpress企业cms开发
  • Windows 里用 Linux 不卡顿?WSL + cpolar让跨系统开发变简单
  • Java 全栈 Devs【应用】:用Spring Boot、MinIO 实现文件上传存储,结合 OnlyOffice 实现文件预览
  • 优化SEO表现的方法:有效利用关键词和长尾关键词的策略
  • 协同感知:未来智能系统的“神经中枢”与跨域融合引擎
  • 做淘宝客网站的流程4399网页版入口
  • 氛围编程走远,规格驱动开发降临
  • 硅基计划6.0 JavaEE 叁 文件IO
  • python+django/flask的篮球馆/足球场地/运动场地预约系统
  • 网站做零售node.js网站开发框架
  • AUTOSAR Adaptive Platform ——Platform Health Management (PHM)
  • 云空间网站qq刷赞网站如何做分站
  • 【技术教程】Python/Node.js 调用拼多多商品详情 API 示例详解