当前位置: 首页 > wzjs >正文

南阳网站排名价格网站用图怎么做文件小质量高

南阳网站排名价格,网站用图怎么做文件小质量高,关键词搜索引擎工具爱站,成都装修设计公司推荐前绪 记第一次跟踪seatunnel的任务运行过程三——解析配置的具体方法getLogicalDag 正文 书接上文 ImmutablePair<List<Action>, Set<URL>> immutablePair getJobConfigParser().parse(null);在前一篇文章中说到getLogicDag()方法的第一行&#xff08;如…

前绪

记第一次跟踪seatunnel的任务运行过程三——解析配置的具体方法getLogicalDag

正文

书接上文

ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse(null);

在前一篇文章中说到getLogicDag()方法的第一行(如上),执行了一个解析方法获得了action和jar包资源对,这一步就是由parse()方法来执行的。本片文章就是对parse()方法的源码探索。

任务配置文件

先回顾一下执行seatunnel.sh时命令中‘ --config ’指定的任务配置文件的结构:

env{}
source{}
transform{}
sink{}

主要有以上四部分

获得任务配置解析器

getJobConfigParser()方法也org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment中的一个受保护的方法,在这个方法中直接返回了一个MultipleTableJobConfigParser(多表任务配置解析器)对象。这也与Seatunnel官网宣传的支持耽搁文件中同时同步多张表的数据相一致。
所以对parse()方法的调用,实际就是由MultipleTableJobConfigParser的parse方法的承担执行的。
在getJobConfigParser()方法方法中,有对‘从保存的检查点开始任务’的支持。但是从执行的代码路径上来看,创建ClientJobExecutionEnvironment对象的时候,其中isStartWithSavePoint参数给的是false,所以并没有启用。
确实我们的探索是从一个全新的任务开始并不是从一个save point开始。

填充jobClient对象(解析env)

this.fillJobConfigAndCommonJars();

获取‘ --config ’指定的配置文件中的env部分的内容:

  • 提取任务模式(jobModel),是stream流式的任务,还是batch批任务;
  • 提取任务命名;
  • 提取其他env部分的配置;
  • 提取env部分指定的第三方jar包的uri,并且封装为commonPluginJars(公共插件jar包);

解析source、tranform、sink

List<? extends Config> sourceConfigs = TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "source", Collections.emptyList());
List<? extends Config> transformConfigs = TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "transform", Collections.emptyList());
List<? extends Config> sinkConfigs =  TypesafeConfigUtils.getConfigList(seaTunnelJobConfig, "sink", Collections.emptyList());

解析source、transform、sink部分的内容,获取各部分的配置信息。

解析connector

List<URL> sourceConnectorJars = getConnectorJarList(sourceConfigs, PluginType.SOURCE);
List<URL> transformConnectorJars = getConnectorJarList(transformConfigs, PluginType.TRANSFORM);
List<URL> sinkConnectorJars = getConnectorJarList(sinkConfigs, PluginType.SINK);

获得source、transform、sink三部分的使用的connector信息。

换用seatunnel自己的classLoader

ClassLoader parentClassLoader = Thread.currentThread().getContextClassLoader();

将当前使用中的classLoader暂存。

List<URL> sourceJars = Stream.of(sourceConnectorJars, transformConnectorJars).flatMap(Collection::stream).distinct().collect(Collectors.toList());
ClassLoader sourceAndTransformClassLoader = getClassLoader(classLoaderService, parentClassLoader, sourceJars);
ClassLoader sinkClassLoader = getClassLoader(classLoaderService, parentClassLoader, sinkConnectorJars);

将source和transform的connector jar包资源合并去重,然后分别创建“source与transform共用的classLoader”以及“sink使用的classLoader”。

Thread.currentThread().setContextClassLoader(sourceAndTransformClassLoader);

先使用“source与transform共用的classLoader”进行处理

检查配置文件是否可有构建成DAG

ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, sinkConfigs);

检查source、sink、transform(如有)能否组成一个DAG。

checkGraph方法中

if (CollectionUtils.isEmpty(sources) || CollectionUtils.isEmpty(sinks)) {throw new JobDefineCheckException("Source And Sink can not be null");
}

如果缺少source或sink直接报错,source和sink作为执行任务的必备元素,不可或缺。

if (isSimpleGraph(sources, transforms, sinks)) {checkSimpleGraph(sources, transforms, sinks);return;}
checkComplexGraph(sources, transforms, sinks);

如果是简单的图(graph)则执行简单查询,执行完直接返回,否则就执行复杂检查。

是否是简单图:isSimpleGraph(sources, transforms, sinks)
sources.size() == 1 && sinks.size() == 1 && (CollectionUtils.isEmpty(transforms) || transforms.size() == 1)

如果source下只有一个内容,并且sink下也只有一个内容,并且没有transform或只有一个transform,则认为是简单图。
这也是最常用的两种配置模式

  1. 单source+单sink
  2. 单source+单transform+单sink
执行简单检查:checkSimpleGraph(sources, transforms, sinks)
ReadonlyConfig source = ReadonlyConfig.fromConfig(sources.get(0));
ReadonlyConfig sink = ReadonlyConfig.fromConfig(sinks.get(0));
if (transforms.size() == 0) {checkEdge(source, sink);
} else {ReadonlyConfig transform = ReadonlyConfig.fromConfig(transforms.get(0));checkEdge(source, transform);checkEdge(transform, sink);
}
  • 如果没有transform,就检查source的plugin_output(之前版本是result_table_name)和sink的plugin_input(之前版本是source_table_name)是否匹配。
  • 如果有transform,就检查source的plugin_output(之前版本是result_table_name)和transform的plugin_input(之前版本是source_table_name)是否匹配;以及就检查transform的plugin_output(之前版本是result_table_name)和sink的plugin_input(之前版本是source_table_name)是否匹配。
    在此过程中,若配置文件中没有配置plugin_input和plugin_output,则使用默认的名称default-identifier代替。

解析source、transform、sink配置内容

for (int configIndex = 0; configIndex < sourceConfigs.size(); configIndex++) {Config sourceConfig = sourceConfigs.get(configIndex);Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =parseSource(configIndex, sourceConfig, sourceAndTransformClassLoader);tableWithActionMap.put(tuple2._1(), tuple2._2());}log.info("start generating all transforms.");parseTransforms(transformConfigs, sourceAndTransformClassLoader, tableWithActionMap);Thread.currentThread().setContextClassLoader(sinkClassLoader);log.info("start generating all sinks.");List<Action> sinkActions = new ArrayList<>();for (int configIndex = 0; configIndex < sinkConfigs.size(); configIndex++) {Config sinkConfig = sinkConfigs.get(configIndex);sinkActions.addAll(parseSink(configIndex, sinkConfig, sinkClassLoader, tableWithActionMap));}Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);return new ImmutablePair<>(sinkActions, factoryUrls);

逐步解析source、transform、sink的配置内容,最终组装成action返回。
从MultipleTableJobConfigParser的类名刻制,这是支持多表配置文件解析的,所以在解析source、transform、sink时都有循环便利的代码存在。

结束

 Thread.currentThread().setContextClassLoader(parentClassLoader);if (classLoaderService != null) {classLoaderService.releaseClassLoader(Long.parseLong(jobConfig.getJobContext().getJobId()), sourceJars);classLoaderService.releaseClassLoader( Long.parseLong(jobConfig.getJobContext().getJobId()), sinkConnectorJars);}

换回默认的类加载器,并释放“source与transform共用的classLoader”以及“sink使用的classLoader”资源。


文章转载自:

http://LveLJr9E.rLzxr.cn
http://xjVlcaSi.rLzxr.cn
http://KHjr6jel.rLzxr.cn
http://CRdyHNbv.rLzxr.cn
http://Im4bDv1O.rLzxr.cn
http://P1BGIKB6.rLzxr.cn
http://rd0VS4GQ.rLzxr.cn
http://W2Sz4k78.rLzxr.cn
http://yRVXdOuV.rLzxr.cn
http://4FowoFwG.rLzxr.cn
http://pmi7ChpH.rLzxr.cn
http://ReTbMxcs.rLzxr.cn
http://AHNxEuPX.rLzxr.cn
http://TwCohSV2.rLzxr.cn
http://uLKw0u3h.rLzxr.cn
http://CXRrYiVr.rLzxr.cn
http://267zbZlj.rLzxr.cn
http://IivSw6TM.rLzxr.cn
http://PqxMy1bm.rLzxr.cn
http://mPGtpXi6.rLzxr.cn
http://PlVk1gkQ.rLzxr.cn
http://D331ifH1.rLzxr.cn
http://PkogYzPG.rLzxr.cn
http://r25eNlpt.rLzxr.cn
http://41yhvgsN.rLzxr.cn
http://CWV9htbo.rLzxr.cn
http://a8l9qRbL.rLzxr.cn
http://cz7kKZFL.rLzxr.cn
http://nES49C7u.rLzxr.cn
http://EKbiSj7B.rLzxr.cn
http://www.dtcms.com/wzjs/758220.html

相关文章:

  • 百度账号购买网站网络营销有哪些主要策略
  • 连云港网站推广优化推广qq群的网站
  • 网站推广哪个好如何做网站优化并快速提高权重
  • 湛江做网站公司变更地址多少钱
  • 天河做网站公司免费做公众号的网站
  • 企业官网网站建设免费做班级网站的实训报告
  • 想在网站里添加超链接怎么做做繁体书的网站
  • 做网站一天网站制作如何
  • 稳定的网站建设土地违法建设投诉网站
  • wordpress全站固定链接怎样无货源开网店
  • 如何提高网站收录网站做树状结构有什么作用
  • 网站开发记科目提供网站建设备案报价
  • wordpress地址和站点地址区别有没有可以在网站上做试卷的
  • 网站建设属于哪个专业湖北网络科技有限公司
  • 如何做网站公司名seo做网站遇到各种问题
  • 嘉兴定制型网站建设自己电脑做服务器上传网站 需要备案吗
  • 里面云智能建站企业班组建设案例
  • 用织梦做的网站 图片打开很慢设计游戏的软件
  • 济南企业网站制作费用域名网站备案
  • 网站开发安全管理建筑工程网官网招聘资料员
  • 工商注册在哪个网站大连专业网站设计服务商
  • 织梦html5手机网站模板爱站网长尾词挖掘工具
  • 网站建设技术服务合同怎么让网站分享有图片
  • 济南网站制作企业phpstudy搭建本地网站
  • 兰州网站建设 冰雨网页设计培训评价怎么写
  • 平度168网站建设移动互联网 传统网站
  • 网站购买空间信阳百度推广公司电话
  • 网站适配手机怎么做网站怎么做双机房切换
  • 电商网站建设价格低wordpress更改邮箱
  • 网站建设个人职责网络推广专员是干嘛的