当前位置: 首页 > news >正文

Flink Stream API 源码走读 - map 和 flatMap

概述

本文深入分析了 Flink 中 map()flatMap() 方法的源码实现,展示了从 Function 到 Operator 再到 Transformation 的完整转换流程。

前置知识回顾

DataStream 的核心结构

public class DataStream<T> {protected final StreamExecutionEnvironment environment;  // 执行环境protected final Transformation<T> transformation;        // 转换操作
}

重要理解:

  • 每个 DataStream 都包含一个 Transformation
  • Transformation 持有上一个 Transformation 的引用,形成链条
  • 执行环境 environment 在整个调用链中保持不变

map() 方法源码分析

1. map 方法的重载链

// 用户调用入口
dataStream.map(new MapFunction<String, Integer>() {@Overridepublic Integer map(String value) throws Exception {return value.length();}
})
map(MapFunction<T,R> mapper)
抽取返回类型
TypeExtractor.getMapReturnTypes()
map(mapper, outType)
transform('Map', outType, new StreamMap(mapper))

2. 类型信息抽取

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {// 1. 抽取输出类型信息TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true);// 2. 调用重载方法return map(mapper, outType);
}

关键点:

  • clean(mapper) - 对用户函数进行序列化检查
  • TypeExtractor.getMapReturnTypes() - 抽取 MapFunction 的返回类型
  • 解决 Java 泛型擦除问题

3. Function → Operator 转换

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {// 将 MapFunction 封装成 StreamMap Operatorreturn transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

转换过程:

  • MapFunction (用户逻辑) → StreamMap (算子)
  • StreamMap 继承自 AbstractUdfStreamOperator
  • 这是第二个核心概念:Operator(算子)

4. transform 方法分析

public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {// 将 Operator 包装成 OperatorFactoryreturn doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

重要概念:

  • SimpleOperatorFactory - 简单算子工厂
  • 算子工厂是对算子的进一步包装
  • 提供 getOperator()createStreamOperator() 方法

5. doTransform 核心逻辑

protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName,TypeInformation<R> outTypeInfo,StreamOperatorFactory<R> operatorFactory) {// 1. 创建 OneInputTransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,    // 上一个 transformationoperatorName,          // 算子名称 "Map"operatorFactory,       // 算子工厂outTypeInfo,          // 输出类型environment.getParallelism(),  // 并行度false                 // 是否并行配置);// 2. 创建新的 DataStreamSingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);// 3. 将 transformation 添加到执行环境getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

核心概念转换流程

MapFunction
(用户逻辑)
StreamMap
(算子)
SimpleOperatorFactory
(算子工厂)
OneInputTransformation
(单输入转换)
SingleOutputStreamOperator
(新的DataStream)

转换详解

  1. Function → Operator

    MapFunction<String, Integer> mapper = ...;
    StreamMap<String, Integer> operator = new StreamMap<>(mapper);
    
  2. Operator → OperatorFactory

    SimpleOperatorFactory<Integer> factory = SimpleOperatorFactory.of(operator);
    
  3. OperatorFactory → Transformation

    OneInputTransformation<String, Integer> transformation = new OneInputTransformation<>(this.transformation, "Map", factory, outType, parallelism, false);
    
  4. Transformation → DataStream

    SingleOutputStreamOperator<Integer> result = new SingleOutputStreamOperator<>(environment, transformation);
    

OneInputTransformation 详解

为什么叫 OneInput?

public class OneInputTransformation<IN, OUT> extends Transformation<OUT> {private final Transformation<IN> input;  // 只有一个输入// ...
}

命名含义:

  • OneInput - 表示只有一个输入流
  • 对应的还有 TwoInputTransformation (如 join、union 操作)
  • 体现了不同算子的输入特性

Transformation 链条

SourceTransformation
(socketTextStream)
OneInputTransformation
(map)
OneInputTransformation
(flatMap)
OneInputTransformation
(filter)
SinkTransformation
(print)

flatMap() 方法源码分析

1. flatMap 与 map 的相似性

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {// 1. 抽取输出类型TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true);// 2. 调用重载方法return flatMap(flatMapper, outType);
}public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {// 将 FlatMapFunction 封装成 StreamFlatMap Operatorreturn transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}

2. 与 map 的差异

特性mapflatMap
函数接口MapFunction<T, R>FlatMapFunction<T, R>
算子实现StreamMapStreamFlatMap
输出特性1对1映射1对多映射
算子名称“Map”“Flat Map”

3. 相同的处理流程

FlatMapFunction
StreamFlatMap
SimpleOperatorFactory
OneInputTransformation
SingleOutputStreamOperator

流程一致性:

  • 都经过相同的 doTransform 方法
  • 都创建 OneInputTransformation
  • 都返回 SingleOutputStreamOperator

执行环境的 Transformation 管理

addOperator 方法

// 在 doTransform 中调用
getExecutionEnvironment().addOperator(resultTransform);
// StreamExecutionEnvironment 中的实现
public class StreamExecutionEnvironment {private final List<Transformation<?>> transformations = new ArrayList<>();public void addOperator(Transformation<?> transformation) {transformations.add(transformation);}
}

重要作用:

  • 将每个 Transformation 添加到环境的列表中
  • 为后续生成 JobGraph 做准备
  • 形成完整的 Transformation 树

命名问题的吐槽

容易混淆的命名

  1. SingleOutputStreamOperator

    // 实际上是个 DataStream,不是 Operator!
    public class SingleOutputStreamOperator<T> extends DataStream<T>
    
  2. addOperator vs addTransformation

    // 方法名叫 addOperator,实际添加的是 Transformation
    environment.addOperator(transformation);  // 应该叫 addTransformation
    

建议:

  • 忽略这些命名问题,理解本质
  • SingleOutputStreamOperator 就是 DataStream
  • 重点关注概念转换流程

链式调用的实现

返回值分析

DataStream<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Integer> mapped = source.map(...);      // 返回 DataStream
SingleOutputStreamOperator<String> flatMapped = mapped.flatMap(...); // 返回 DataStream
KeyedStream<String, String> keyed = flatMapped.keyBy(...);          // 返回 KeyedStream

链式调用原理:

  • 每个操作都返回某种形式的 DataStream
  • SingleOutputStreamOperator 继承自 DataStream
  • 所有 API 方法都定义在 DataStream

为什么叫 SingleOutput?

public class SingleOutputStreamOperator<T> extends DataStream<T>

命名含义:

  • SingleOutput - 表示只有一个输出流
  • 区别于可能有多个输出的算子
  • 体现了算子的输出特性

总结

核心流程回顾

  1. 用户调用 dataStream.map(mapFunction)
  2. 类型抽取 通过 TypeExtractor 获取返回类型
  3. Function→OperatorMapFunction 封装成 StreamMap
  4. Operator→Factory 将算子包装成 SimpleOperatorFactory
  5. Factory→Transformation 创建 OneInputTransformation
  6. Transformation→DataStream 创建新的 SingleOutputStreamOperator
  7. 环境管理Transformation 添加到执行环境

设计模式体现

  • 装饰器模式: Function → Operator → Factory → Transformation → DataStream
  • 工厂模式: SimpleOperatorFactory 封装算子创建逻辑
  • 建造者模式: 逐步构建复杂的 Transformation 对象

关键技术点

  • 类型安全: 通过 TypeInformation 解决泛型擦除
  • 链式调用: 每个操作返回新的 DataStream
  • 延迟执行: 只构建 Transformation 树,不立即执行
  • 统一抽象: map 和 flatMap 使用相同的处理框架

下节预告

Flink Stream API 源码走读 - keyby


重要提醒:

  • 忽略混淆的命名,关注核心概念
  • SingleOutputStreamOperator 本质就是 DataStream
  • 重点理解 Function → Operator → Transformation → DataStream 的转换流程
http://www.dtcms.com/a/332810.html

相关文章:

  • ETH持续上涨推动DEX热潮,交易活跃度飙升的XBIT表现强势出圈
  • MySQL 全面指南:从入门到精通——深入解析安装、配置、操作与优化
  • 从阻塞到异步:Java IO 模型进化史 ——BIO、NIO、AIO 深度解析
  • Cherryusb UAC例程对接STM32 SAI播放音乐和录音(下)=>USB+SAI+TX+RX+DMA控制WM8978播放和录音实验
  • 【嵌入式FreeRTOS#补充1】临界区
  • K-means 聚类算法学习笔记
  • 解锁PostgreSQL专家认证增强驱动引擎
  • 打靶日常-sql注入(手工+sqlmap)
  • 136-基于Spark的酒店数据分析系统
  • Python Sqlalchemy数据库连接
  • 紫金桥RealSCADA:国产工业大脑,智造安全基石
  • 【已解决】在Spring Boot工程中,若未识别到resources/db文件夹下的SQL文件
  • JavaScript 防抖(Debounce)与节流(Throttle)
  • 易道博识康铁钢:大小模型深度融合是现阶段OCR的最佳解决方案
  • 【Trans2025】计算机视觉|UMFormer:即插即用!让遥感图像分割更精准!
  • Notepad++插件开发实战指南
  • Radar Forward-Looking Imaging Based on Chirp Beam Scanning论文阅读
  • 《WINDOWS 环境下32位汇编语言程序设计》第1章 背景知识
  • 【Linux】探索Linux虚拟地址空间及其管理机制
  • C# HangFire的使用
  • 概率论基础教程第2章概率论公理(习题和解答)
  • 在 Linux 服务器搭建Coturn即ICE/TURN/STUN实现P2P(点对点)直连
  • HarmonyOS 实战:用 @Observed + @ObjectLink 玩转多组件实时数据更新
  • pyecharts可视化图表-pie:从入门到精通(进阶篇)
  • Python 数据可视化:柱状图/热力图绘制实例解析
  • 概率论基础教程第2章概率论公理
  • 享元模式C++
  • 基于深度学习的零件缺陷识别方法研究(LW+源码+讲解+部署)
  • 力扣hot100 | 普通数组 | 53. 最大子数组和、56. 合并区间、189. 轮转数组、238. 除自身以外数组的乘积、41. 缺失的第一个正数
  • 什么才是真正的白盒测试?