当前位置: 首页 > news >正文

记第一次跟踪seatunnel的任务运行过程三——解析配置的具体方法getLogicalDag

前绪

记第一次跟踪seatunnel的任务运行过程二——ClientJobExecutionEnvironment的execture方法

从这里开始,就是使用seatunnel-2.3.9的源码了。前面部分没有变化,2.3.X版本都是通用的。
建议打开源码,边读文章,边阅读源码

正文

getLogicalDag()方法还是在ClientJobExecutionEnvironment这个类中。

关键词DAG

DAG:有向无环图。
LogicalDag:在此可以理解为一个seatunnel job的运行结构图。管理的是从source到transform到sink的过程。

解析配置文件,生成资源对

ImmutablePair<List<Action>, Set<URL>> immutablePair = getJobConfigParser().parse(null);

getJobConfigParser().parse()方法中解析在seatunnel执行名中使用‘–config’指定的配置文件,将其中的source、transformer、sink解析成一个个的anction,并且将每个action(即source、transform、sink)所需要用到的jar包地址提取出来。

收集全部的action,以备后用(后面还收集了全部的jar包资源)

actions.addAll(immutablePair.getLeft());

使用actions这个对象,直接引用所有的action,方便后续的使用。例如:遍历所有的action进行某个动作处理。

读去配置,确定是否自动上传jar包

boolean enableUploadConnectorJarPackage = seaTunnelConfig.getEngineConfig().getConnectorJarStorageConfig().getEnable();

前面已经解析出来的所有的action和对应用到的jar包,这里就是根据配置是否将jar自动上传到服务器。
默认值是:false,即不自动上传。代表着需要提前将需要用到的jar包上传到seatunnel的lib文件夹下。
这里的配置就是从${SEATUNNEL_HOME}/config/seatunnel.yaml这个配置文件中解析出来了的。但是2.3.9版本的seatunnel.yaml中默认是没有seatunnel.engine.jar-storage.enable这一项的,所以使用的基本都是默认值,即:false。

seatunnnl.yaml配置文件完整版及解析

seatunnel.yaml文件的解析对象对应的是org.apache.seatunnel.engine.common.config.server.ServerConfigOptions这个类。
seatunnel.yaml中配置不全且没有明确的说明,可以到这个文件中查找。

处理jar包

配置seatunnel.engine.jar-storage.enable=true,上传jar包

 if (enableUploadConnectorJarPackage) {
            Set<ConnectorJarIdentifier> commonJarIdentifiers = connectorPackageClient.uploadCommonPluginJars(Long.parseLong(jobConfig.getJobContext().getJobId()), commonPluginJars);
            Set<URL> commonPluginJarUrls = getJarUrlsFromIdentifiers(commonJarIdentifiers);
            Set<ConnectorJarIdentifier> pluginJarIdentifiers = new HashSet<>();
            uploadActionPluginJar(actions, pluginJarIdentifiers);
            Set<URL> connectorPluginJarUrls = getJarUrlsFromIdentifiers(pluginJarIdentifiers);
            connectorJarIdentifiers.addAll(commonJarIdentifiers);
            connectorJarIdentifiers.addAll(pluginJarIdentifiers);
            jarUrls.addAll(commonPluginJarUrls);
            jarUrls.addAll(connectorPluginJarUrls);
            actions.forEach(
                    action -> {
                        addCommonPluginJarsToAction(
                                action, commonPluginJarUrls, commonJarIdentifiers);
                    });
        }

如果要上传jar包,则将公共插件的jar包、前面解析出来的action使用到的jar包上传上去。
收集所有的jar包,并且给每个action添加公共插件jar包。

配置seatunnel.engine.jar-storage.enable=false(默认),不上传jar包

jarUrls.addAll(commonPluginJars);
            jarUrls.addAll(immutablePair.getRight());
            actions.forEach(
                    action -> {
                        addCommonPluginJarsToAction(
                                action, new HashSet<>(commonPluginJars), Collections.emptySet());
                    });

收集所有的jar包,并且给每个action添加公共插件jar包。

结束:生成logicDag

getLogicalDagGenerator().generate()

生成一个logicDag,并返回。

后续

记第一次跟踪seatunnel的任务运行过程四——getJobConfigParser().parse()的动作

相关文章:

  • 论文调研 | 一些开源的AI代码生成模型调研及总结【更新于250313】
  • 专题三x的平方根
  • 动态调试实战:Frida脚本编写与内存注入
  • 【实战ES】实战 Elasticsearch:快速上手与深度实践-附录-1-常用命令速查表-集群健康检查、索引生命周期管理、故障诊断命令
  • stable Diffusion 中的 VAE是什么
  • P3390 【模板】矩阵快速幂
  • Redis项目_黑马点评
  • 【JavaEE】Spring Web MVC
  • 蓝队基本技能 web入侵指南 日志分析 后门查杀 流量分析
  • Houdini学习笔记
  • 2024 CCPC Liaoning Provincial Contest K
  • 【C++】每日一练(用队列实现栈)
  • VSTO(C#)Excel开发6:与窗体交互
  • Java网络多线程
  • TCP网络协议
  • 评委打分5个评委 去掉一个最高分和一个最低分 取平均分
  • Java高频面试之集合-11
  • 【2025】基于springboot+vue+uniapp的厨师预约上门做菜小程序(源码、万字文档、图文修改、调试答疑)
  • 使用Qt创建悬浮窗口
  • NPU的工作原理:神经网络计算的流水线
  • 临朐在线网站建设/百度在线翻译
  • 无锡网站建设上海韵茵/seo 优化思路
  • 人民政府门户网站首页/电工培训机构
  • java做门户网站/揭阳seo快速排名
  • 如何解决旅游网站建设问题/站长工具之家seo查询
  • 网站名字备案流程/软件定制开发