当前位置: 首页 > news >正文

杭州公司网站设计外贸手工做兼职的网站

杭州公司网站设计,外贸手工做兼职的网站,设计师培训 网站,关于建筑的网站有哪些内容Flink Stream API 源码走读 - 总结 概述 本文档详细分析Flink数据流处理的完整流程,从API调用到Transformation构建,再到最终的执行图生成。通过源码分析,深入理解Flink如何将用户的流式API调用转换为可执行的数据流图。 1. Source阶段&a…

Flink Stream API 源码走读 - 总结

概述

本文档详细分析Flink数据流处理的完整流程,从API调用到Transformation构建,再到最终的执行图生成。通过源码分析,深入理解Flink如何将用户的流式API调用转换为可执行的数据流图。
流程概览

1. Source阶段:数据源的创建

1.1 API调用

// 用户代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

1.2 内部处理流程

当调用env.socketTextStream()时,Flink内部会:

  1. 创建SourceFunction

    • 创建SocketTextStreamFunction实例
    • 封装Socket连接逻辑和数据读取逻辑
  2. 生成LegacySourceTransformation

    // 源码位置:LegacySourceTransformation.java
    public class LegacySourceTransformation<T> extends PhysicalTransformation<T> {private final StreamOperatorFactory<T> operatorFactory;// ...
    }
    
    • 包含具体的运算逻辑(SocketTextStreamFunction)
    • 通过SimpleOperatorFactory.of(operator)包装成算子工厂
    • 这是一个物理Transformation,包含实际的计算逻辑
  3. 返回DataStreamSource

    // DataStreamSource继承自SingleOutputStreamOperator
    public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {// this.transformation 引用上面创建的LegacySourceTransformation
    }
    

2. Map阶段:数据转换操作

2.1 API调用

// 用户代码
SingleOutputStreamOperator<String> mapped = source.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.toUpperCase();}
});

2.2 内部处理流程

当调用map()方法时:

  1. 类型推断

    // DataStream.java 第588行
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true);return map(mapper, outType);
    }
    
  2. 创建StreamMap算子

    // DataStream.java 第631行
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    
  3. 生成OneInputTransformation

    // DataStream.doTransform() 方法
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,    // 引用上一个transformation(LegacySourceTransformation)operatorName,          // "Map"operatorFactory,       // SimpleOperatorFactory.of(StreamMap)outTypeInfo,          // 输出类型信息environment.getParallelism(),false
    );
    
  4. 添加到执行环境

    // 将物理transformation添加到transformations列表
    getExecutionEnvironment().addOperator(resultTransform);
    
  5. 返回新的DataStream

    SingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator(environment, resultTransform);
    

3. FlatMap阶段:扁平化映射操作

3.1 API调用

// 用户代码
SingleOutputStreamOperator<String> flatMapped = mapped.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}
});

3.2 内部处理流程

FlatMap操作的处理流程与Map类似:

  1. 创建StreamFlatMap算子:包装用户的FlatMapFunction
  2. 生成OneInputTransformation:引用上一个Map的transformation
  3. 添加到transformations列表:这是物理transformation
  4. 返回新的SingleOutputStreamOperator

4. KeyBy阶段:数据分组操作

4.1 API调用

// 用户代码
KeyedStream<String, String> keyed = flatMapped.keyBy(value -> value);

4.2 内部处理流程

KeyBy操作比较特殊:

  1. 生成PartitionTransformation

    // 源码位置:PartitionTransformation.java
    public class PartitionTransformation<T> extends Transformation<T> {private final Transformation<T> input;  // 引用上一个transformationprivate final StreamPartitioner<T> partitioner;  // 分区器,不是算子工厂
    }
    
  2. 关键特点

    • 这是一个虚拟transformation,不包含实际的计算逻辑
    • 持有的是KeyGroupStreamPartitioner分区器,而不是算子工厂
    • 不会添加到transformations列表
    • 用于指导数据在网络中的分发策略
  3. 返回KeyedStream

    public class KeyedStream<T, KEY> extends DataStream<T> {// 继承自DataStream,但提供了按键分组后的特殊API
    }
    

5. Window阶段:窗口操作

5.1 API调用

// 用户代码
WindowedStream<String, String, TimeWindow> windowed =keyed.timeWindow(Time.seconds(5));

5.2 内部处理流程

Window操作创建了一个特殊的中间对象:

  1. 创建WindowedStream

    // 源码位置:WindowedStream.java 第79行
    public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;  // 保持对上游KeyedStream的引用this.builder = new WindowOperatorBuilder<>(windowAssigner,                    // 窗口分配器windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),  // 触发器input.getExecutionConfig(),        // 执行配置input.getType(),                   // 输入类型input.getKeySelector(),            // Key选择器input.getKeyType()                 // Key类型);
    }
    
  2. WindowedStream特点

    • 不是真正的DataStream,只是一个工具类/API容器
    • 不能调用map、flatMap等通用流操作
    • 只能调用窗口相关的API:reduce、apply、process等
    • 持有WindowOperatorBuilder,用于后续构建窗口算子

6. Reduce阶段:窗口聚合操作

6.1 API调用

// 用户代码
SingleOutputStreamOperator<String> result = windowed.reduce(new ReduceFunction<String>() {@Overridepublic String reduce(String value1, String value2) throws Exception {return value1 + "," + value2;}
});

6.2 内部处理流程

这是窗口操作的核心实现:

  1. 构建WindowOperator

    // WindowedStream.java 第230行
    OneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);
    
  2. WindowOperatorBuilder.reduce()方法

    • 创建WindowOperator实例
    • 将用户的ReduceFunction和内部的PassThroughWindowFunction组合
    • WindowOperator包含完整的窗口处理逻辑:
      • 窗口分配(WindowAssigner)
      • 窗口触发(Trigger)
      • 数据聚合(ReduceFunction)
      • 结果输出(WindowFunction)
  3. 生成OneInputTransformation

    // WindowedStream.java 第233行
    return input.transform(opName, resultType, operator).setDescription(opDescription);
    
    • 这是一个物理transformation
    • 包含具体的窗口+聚合运算逻辑
    • 会被添加到transformations列表中
  4. 返回SingleOutputStreamOperator

    • 又回到了正常的DataStream
    • 可以继续进行其他流操作

7. Sink阶段:数据输出操作

7.1 API调用

// 用户代码
DataStreamSink<String> sink = result.print();

7.2 内部处理流程

Sink操作是数据流的终点:

  1. 创建PrintSinkFunction

    // 内部会创建PrintSinkFunction实例
    // 包含将数据打印到控制台的逻辑
    
  2. 生成LegacySinkTransformation

    // 通过SimpleOperatorFactory.of()包装成算子工厂
    // 创建LegacySinkTransformation实例
    LegacySinkTransformation<T> sinkTransformation = new LegacySinkTransformation<>(input.getTransformation(),  // 引用上一个transformationname,                       // "Sink: Print to Std. Out"operatorFactory,           // SimpleOperatorFactory.of(StreamSink)parallelism
    );
    
  3. 添加到transformations列表

    • 这是一个物理transformation
    • 包含具体的数据处理逻辑(打印到屏幕)
    • 会被添加到环境的transformations列表中
  4. 返回DataStreamSink

    public class DataStreamSink<T> {private final Transformation<T> transformation;// 不是真正的流,只是一个封装工具类// 不能再调用map、flatMap等API// 只持有自己的transformation引用
    }
    

8. Transformation链式结构分析

8.1 物理Transformation vs 虚拟Transformation

物理Transformation(会添加到transformations列表):

  • LegacySourceTransformation:包含Source算子逻辑
  • OneInputTransformation:包含Map、FlatMap、Window等算子逻辑
  • LegacySinkTransformation:包含Sink算子逻辑

虚拟Transformation(不会添加到transformations列表):

  • PartitionTransformation:只包含分区策略,用于指导数据分发

8.2 引用关系链

LegacySourceTransformation (Source)↑ input
OneInputTransformation (Map)↑ input
OneInputTransformation (FlatMap)↑ input
PartitionTransformation (KeyBy) [虚拟]↑ input
OneInputTransformation (Window+Reduce)↑ input
LegacySinkTransformation (Sink)

8.3 关键特点

  1. Source Transformation

    • 没有input引用(数据流的起点)
    • 包含数据源逻辑
  2. 中间Transformation

    • 都有input成员变量,引用上一个transformation
    • 形成链式结构
  3. Sink Transformation

    • 有input引用,但没有下游
    • 数据流的终点
  4. 遍历能力

    • 通过最后一个transformation可以向上遍历整个链路
    • 这是Flink构建执行图的基础

返回目录

Flink 源码系列 - 前言

http://www.dtcms.com/a/519443.html

相关文章:

  • 深入浅出Langchain4j——构建Java大语言模型应用的新范式
  • Babylon.js学习之路《添加自定义摇杆控制相机》
  • 【JAVA 进阶】SpringBoot集成Sa-Token权限校验框架深度解析
  • 【CMakeLists.txt】Qt6 依赖配置详解
  • 用js做网站登录网页成品
  • 数据库安全网关:从“看得见访问”到“控得住风险”的关键一层
  • 对泊松过程的理解
  • 【数论】质数筛(埃氏筛、欧拉筛)
  • 扩展名网站兰州做网站一咨询兰州做网站公司
  • 华为OD-Java面经-21届考研
  • Excel拆分和合并优化版本
  • 智能网联汽车:当汽车遇上“智慧网络”
  • 常规点光源在工业视觉检测上的应用
  • C++新特性——正则表达式
  • 基于卷积神经网络的汽车类型识别系统,resnet50,vgg16,resnet34【pytorch框架,python代码】
  • 设计 企业网站电脑系统网站建设
  • 做网站业务的怎么找资源网站推广名片
  • FPGA强化- HDMI显示器驱动设计与验证
  • 【PPT-ungroup PPT解组合,python无水印】
  • Java 17 环境下 EasyPoi 反射访问异常分析与解决方案(ExcelImportUtil.importExcelMore)
  • SpringBoot+alibaba的easyexcel实现前端使用excel表格批量插入
  • 重大更新,LVGL有UI编辑器用了
  • 多场景 VR 教学编辑器:重构教学流程的场景化实践
  • 公司做网站让我们销售单页面网站模板怎么做
  • 广州微网站建设价位广东网站建设公司
  • 基于 Spring AI Alibaba + Nacos 的分布式 Multi-Agent 构建指南
  • 《与幽灵作战:Python 棘手 Bug 的调试策略与实战技巧》
  • 使用Requests和lxml实现飞卢小说网小说爬取
  • bug 记录 - 路由守卫 beforeRouteLeave 与 confirm 结合,不生效问题
  • 数据库字段类型bit容易被忽视的bug