当前位置: 首页 > wzjs >正文

网站根目录验证文件是什么泸州网站seo

网站根目录验证文件是什么,泸州网站seo,wordpress 登录页面,网站建设有云端吗目录 1. 任务启动入口 2. 任务执行命令类:FlinkTaskExecuteCommand 3. FlinkExecution的创建与初始化 3.1 核心组件初始化 3.2 关键对象说明 4. 任务执行:FlinkExecution.execute() 5. Source处理流程 5.1 插件初始化 5.2 数据流生成 6. Trans…

目录

1. 任务启动入口

2. 任务执行命令类:FlinkTaskExecuteCommand

3. FlinkExecution的创建与初始化

3.1 核心组件初始化

3.2 关键对象说明

4. 任务执行:FlinkExecution.execute()

5. Source处理流程

5.1 插件初始化

5.2 数据流生成

6. Transform处理流程

6.1 插件初始化

6.2 转换执行

7. Sink处理流程

7.1 插件初始化

7.2 数据输出

执行流程全景图

关键设计总结


        本文基于SeaTunnel 2.3.x源码分析Flink引擎执行流程,以seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/SeaTunnelApiExample.java为入口,完整解析Flink引擎的执行流程。


1. 任务启动入口

启动类核心代码:

// 1. 初始化Flink启动命令参数
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
​
// 2. 执行SeaTunnel.run()回调Flink执行命令
SeaTunnel.run(flinkCommandArgs.buildCommand());
  • buildCommand()返回FlinkTaskExecuteCommand实例

  • SeaTunnel.run()最终调用FlinkTaskExecuteCommand.execute()


2. 任务执行命令类:FlinkTaskExecuteCommand

核心执行流程:

public void execute() {// 1. 解析配置文件生成Config对象Config config = ConfigBuilder.of(configFile);// 2. 创建FlinkExecution实例FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);// 3. 执行任务seaTunnelTaskExecution.execute();
}

3. FlinkExecution的创建与初始化
3.1 核心组件初始化
public FlinkExecution(Config config) {// 创建三大处理器this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE), jobContext);this.transformPluginExecuteProcessor = new TransformExecuteProcessor(jarPaths, TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()),jobContext);this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths, config.getConfigList(Constants.SINK), jobContext);// 初始化Flink执行环境this.flinkRuntimeEnvironment = FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));// 为处理器注入运行时环境this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
}
3.2 关键对象说明
组件类型功能
sourcePluginExecuteProcessorSourceExecuteProcessor处理数据源接入
transformPluginExecuteProcessorTransformExecuteProcessor处理数据转换逻辑
sinkPluginExecuteProcessorSinkExecuteProcessor处理数据输出
flinkRuntimeEnvironmentFlinkRuntimeEnvironment封装Flink StreamExecutionEnvironment

4. 任务执行:FlinkExecution.execute()

DAG构建流程:

public void execute() {// 初始化数据流集合List<DataStream<Row>> dataStreams = new ArrayList<>();// 按顺序执行三大组件dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);dataStreams = transformPluginExecuteProcessor.execute(dataStreams);sinkPluginExecuteProcessor.execute(dataStreams);
}

5. Source处理流程

5.1 插件初始化

调用链:

SourceExecuteProcessor()→ super(jarPaths, pluginConfigs, jobContext) // 调用父类构造器→ this.plugins = initializePlugins(jarPaths, pluginConfigs)

插件加载核心逻辑:

protected List<SeaTunnelSource> initializePlugins(...) {SeaTunnelSourcePluginDiscovery discovery = new SeaTunnelSourcePluginDiscovery();List<SeaTunnelSource> sources = new ArrayList<>();for (Config sourceConfig : pluginConfigs) {// 1. 识别插件类型PluginIdentifier identifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));// 2. 加载依赖JARjars.addAll(discovery.getPluginJarPaths(Lists.newArrayList(identifier)));// 3. 创建插件实例SeaTunnelSource source = discovery.createPluginInstance(identifier);// 4. 初始化插件source.prepare(sourceConfig);source.setJobContext(jobContext);// 5. 批处理模式校验if (jobContext.getJobMode() == JobMode.BATCH && source.getBoundedness() == Boundedness.UNBOUNDED) {throw new UnsupportedOperationException("Unbounded source not support batch mode");}sources.add(source);}jarPaths.addAll(jars);return sources;
}
5.2 数据流生成

执行入口:

public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {StreamExecutionEnvironment env = flinkRuntimeEnvironment.getStreamExecutionEnvironment();List<DataStream<Row>> sources = new ArrayList<>();for (int i = 0; i < plugins.size(); i++) {SeaTunnelSource internalSource = plugins.get(i);// 1. 创建SourceFunctionBaseSeaTunnelSourceFunction sourceFunction;if (internalSource instanceof SupportCoordinate) {sourceFunction = new SeaTunnelCoordinatedSource(internalSource);} else {sourceFunction = new SeaTunnelParallelSource(internalSource); // Flink默认路径}// 2. 创建DataStreamSourceDataStreamSource<Row> sourceStream = addSource(env,sourceFunction,"SeaTunnel " + internalSource.getClass().getSimpleName(),internalSource.getBoundedness() == Boundedness.BOUNDED);// 3. 设置并行度if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {sourceStream.setParallelism(pluginConfig.getInt(CommonOptions.PARALLELISM.key()));}// 4. 注册结果表registerResultTable(pluginConfig, sourceStream);sources.add(sourceStream);}return sources;
}

结果表注册逻辑:

void registerResultTable(Config config, DataStream<Row> dataStream) {if (config.hasPath(RESULT_TABLE_NAME)) {String tableName = config.getString(RESULT_TABLE_NAME);StreamTableEnvironment tableEnv = getStreamTableEnvironment();if (!TableUtil.tableExists(tableEnv, tableName)) {if (config.hasPath("field_name")) {// 带字段名的注册方式tableEnv.registerDataStream(tableName, dataStream, config.getString("field_name"));} else {// 默认注册方式tableEnv.registerDataStream(tableName, dataStream);}}}
}

6. Transform处理流程

6.1 插件初始化

关键校验逻辑:

public void prepare(Config pluginConfig) {// 必须包含source_table_name和result_table_nameif (!pluginConfig.hasPath(SOURCE_TABLE_NAME) || !pluginConfig.hasPath(RESULT_TABLE_NAME)) {throw new IllegalArgumentException("Missing required table name config");}// 输入输出表名不能相同if (Objects.equals(pluginConfig.getString(SOURCE_TABLE_NAME),pluginConfig.getString(RESULT_TABLE_NAME)) {throw new IllegalArgumentException("Source and result table names must be different");}this.inputTableName = pluginConfig.getString(SOURCE_TABLE_NAME);this.outputTableName = pluginConfig.getString(RESULT_TABLE_NAME);// 调用具体Transform的配置初始化setConfig(pluginConfig);
}
6.2 转换执行

核心处理流程:

public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {DataStream<Row> input = upstreamDataStreams.get(0); // 默认使用第一个上游流for (int i = 0; i < plugins.size(); i++) {SeaTunnelTransform transform = plugins.get(i);Config pluginConfig = pluginConfigs.get(i);// 1. 获取输入流(通过source_table_name查找)DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);// 2. 执行转换input = flinkTransform(transform, stream);// 3. 注册结果表registerResultTable(pluginConfig, input);}return Collections.singletonList(input);
}

转换算子实现:

protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {// 类型系统转换SeaTunnelDataType inputType = TypeConverterUtils.convert(stream.getType());transform.setTypeInfo(inputType);// 创建行转换器FlinkRowConverter inputConverter = new FlinkRowConverter(inputType);FlinkRowConverter outputConverter = new FlinkRowConverter(transform.getProducedType());// 通过flatMap实现转换逻辑return stream.flatMap(new FlatMapFunction<Row, Row>() {@Overridepublic void flatMap(Row value, Collector<Row> out) {// 类型转换SeaTunnelRow inRow = inputConverter.reconvert(value);// 执行Transform核心逻辑SeaTunnelRow outRow = (SeaTunnelRow) transform.map(inRow);if (outRow != null) {// 输出类型转换Row result = outputConverter.convert(outRow);out.collect(result);}}}, TypeConverterUtils.convert(transform.getProducedType()));
}

7. Sink处理流程

7.1 插件初始化

特殊处理逻辑:

protected List<SeaTunnelSink> initializePlugins(...) {for (Config sinkConfig : pluginConfigs) {SeaTunnelSink sink = discovery.createPluginInstance(identifier);sink.prepare(sinkConfig);// 数据保存模式处理if (sink instanceof SupportDataSaveMode) {SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;saveModeSink.checkOptions(sinkConfig); // 校验配置}}
}
7.2 数据输出

执行流程:

public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {DataStream<Row> input = upstreamDataStreams.get(0);for (int i = 0; i < plugins.size(); i++) {Config sinkConfig = pluginConfigs.get(i);SeaTunnelSink sink = plugins.get(i);// 1. 获取输入流DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);// 2. 设置类型信息sink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));// 3. 处理数据保存模式if (sink instanceof SupportDataSaveMode) {SupportDataSaveMode saveModeSink = (SupportDataSaveMode) sink;DataSaveMode saveMode = saveModeSink.getDataSaveMode();saveModeSink.handleSaveMode(saveMode); // 处理保存模式}// 4. 适配Flink Sink APIDataStreamSink<Row> dataStreamSink = stream.sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(sink))).name(sink.getPluginName());// 5. 设置并行度if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {dataStreamSink.setParallelism(sinkConfig.getInt(CommonOptions.PARALLELISM.key()));}}return null; // Sink是终点,无下游数据流
}

FlinkSink适配器伪代码:

class FlinkSink implements SinkFunction<Row> {private final SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink;public void invoke(Row value) {// 将Flink Row转换为SeaTunnelRowSeaTunnelRow row = converter.convert(value);// 调用SeaTunnel Sink写入逻辑sink.write(row);}
}

执行流程全景图


关键设计总结

  1. 插件化架构

    • 通过SPI机制动态加载Source/Transform/Sink插件

    • 插件发现机制:SeaTunnel*PluginDiscovery

  2. 表名映射机制

    • source_table_nameresult_table_name构成DAG链路

    • 通过registerDataStream()实现表注册

  3. 类型系统转换

    • TypeConverterUtils处理SeaTunnel类型与Flink类型的转换

    • FlinkRowConverter实现行数据双向转换

  4. 执行环境封装

    • FlinkRuntimeEnvironment统一管理执行环境

    • 兼容批流一体:通过JobMode控制执行模式

  5. 适配器模式

    • SeaTunnelParallelSource适配SeaTunnelSource到Flink SourceFunction

    • FlinkSink适配SeaTunnelSink到Flink SinkFunction

        本文完整解析了SeaTunnel Flink引擎从任务启动到DAG构建的全流程,重点突出了插件加载机制、类型转换系统和表名映射等核心设计,为理解SeaTunnel内部工作原理提供了详细参考。

http://www.dtcms.com/wzjs/350772.html

相关文章:

  • 快速网站优化哪家好软文推广怎么做
  • 正规招聘网站有哪些营销页面设计
  • 订阅号栏目里做微网站比较有名的个人网站
  • 网站建设 国家标准新闻发稿渠道
  • 江苏h5响应式网站建设设计营销推广的平台
  • 福州网站建设金森seo广告
  • 企业网站制作百度竞价一个月5000够吗
  • asp网站上传保定网站seo
  • 绿色主色调的网站seo推广专员工作好做吗
  • 有没有专门做布料的网站app推广代理加盟
  • 中关村手机之家官网台州seo公司
  • 长沙做网站街网络营销优化推广公司
  • 做网站要钱吗免费下载百度一下
  • 表白网页在线生成网站企业网站seo优化
  • 上海网站开发设计公司seo教程搜索引擎优化入门与进阶
  • 连州网站建设软文模板300字
  • 仙游网站建设巨量算数数据分析
  • indesign做网站如何做个网站推广自己产品
  • 武陟县住房和城乡建设局网站常用网站推广方法及资源
  • 免费网站制作知乎黑帽seo寄生虫
  • 三五互联网站管理登录地址成都百度推广联系方式
  • 苏州营销网站建设专业的网页制作公司
  • 电子商务网站建设软件开发课设旧版优化大师
  • 大连网站建设公司排名抖音seo是什么
  • 网站别人能打开我打不开佛山市人民政府门户网站
  • 网络优化工程师需要学什么seo检查工具
  • 石家庄做网站汉狮网络搜索引擎seo关键词优化
  • 网页转向功能网站百度广告点击软件源码
  • 便宜网站建设宁德市旅游景点大全
  • ic外贸网站建设b站免费版入口