当前位置: 首页 > news >正文

Flink的数据流图中的数据通道 StreamEdge 详解

        本文从基础原理到代码层面逐步解释 Flink 的数据通道 StreamEdge,尽量让初学者也能理解。

主要思路:从概念开始,逐步深入到实现细节,并结合伪代码来逐步推导。


第一步:什么是 StreamEdge?

        StreamEdge 是 Flink 中用于表示数据流图(Stream Graph)中两个算子(Operator)之间数据传输关系的对象。简单来说,它是一条“数据通道”,定义了数据如何从一个算子流向下游的另一个算子。想象成一条水管,上游的水(数据)通过这条管子流到下游。

  • 为什么需要 StreamEdge?
    Flink 是一个分布式流处理框架,数据需要在不同的任务(Task)之间传递。StreamEdge 负责描述这种传递的逻辑,包括“从哪里来”(源算子)、“到哪里去”(目标算子)、“数据如何分区”(Partitioning)等。

第二步:数据流图的基础

        在 Flink 中,用户定义的程序(比如“读取数据 -> 过滤 -> 聚合”)会被转换成一个逻辑执行计划,叫做 StreamGraph。这个图由以下部分组成:

  1. StreamNode:表示算子(比如 map、filter 等操作)。
  2. StreamEdge:表示算子之间的连接。

例如:

输入数据 -> map 操作 -> sink 操作

在 StreamGraph 中:

  • 有 3 个 StreamNode:输入源(source)、map、sink。
  • 有 2 个 StreamEdge:输入源(source)到 map,map 到 sink。

StreamEdge 的作用是连接这些节点,并定义数据传输的规则。


第三步:StreamEdge 的核心属性

        StreamEdge 是一个 Java 类,我们来看看它的主要属性(基于 Flink 源代码,比如 org.apache.flink.streaming.api.graph.StreamEdge):

  1. 源节点 ID(sourceId)
    表示数据从哪个算子发出。
  2. 目标节点 ID(targetId)
    表示数据流向哪个算子。
  3. 分区策略(Partitioning)
    定义数据如何分配到下游。比如:
    • FORWARD:直接发给下游的同一个任务。
    • HASH:根据键(key)哈希分配。
    • BROADCAST:发送到下游所有任务。
  4. 数据类型(OutputTag)
    如果有侧输出(side output),会标记数据的类型。

用生活中的例子比喻:

  • 源节点和目标节点像是寄信的“发件人”和“收件人”。
  • 分区策略像是“快递分拣规则”(按地址分、全部发、随机发等)。

第四步:StreamEdge 的创建过程

让我们一步步推导 StreamEdge 如何在 Flink 中生成:

1. 用户代码

假设写了一个简单的 Flink 程序:

DataStream<String> input = env.fromElements("a", "b", "c");
DataStream<String> mapped = input.map(x -> x.toUpperCase());
mapped.print();

这里有 3 个操作:输入源 -> map -> print。

2. 构建 StreamGraph

Flink 的 StreamGraphGenerator 会把这个程序翻译成图:

  • 创建 StreamNode:为每个算子分配一个 ID。
  • 创建 StreamEdge:连接这些节点。

伪代码(简化的生成过程):

StreamNode sourceNode = new StreamNode(1, "Source");
StreamNode mapNode = new StreamNode(2, "Map");
StreamNode sinkNode = new StreamNode(3, "Sink");

StreamEdge edge1 = new StreamEdge(sourceNode, mapNode, Partitioning.FORWARD);
StreamEdge edge2 = new StreamEdge(mapNode, sinkNode, Partitioning.FORWARD);
3. 分区策略的推导
  • 从 source 到 map,数据是顺序传递的,所以用 FORWARD
  • 从 map 到 sink,也是直接输出,所以也是 FORWARD

如果用户加了 keyBy(比如按某个字段分组),分区策略会变成 HASH,数据会根据键的哈希值分配。


第五步:底层实现与数据传输

StreamEdge 只是逻辑表示,实际数据传输发生在运行时,由 StreamTask 和 RecordWriter 完成。

  1. 序列化与发送

    • 数据被序列化(比如 "A" 变成字节数组)。
    • RecordWriter 根据 StreamEdge 的分区策略决定发送目标。
  2. 网络传输

    • 如果下游在另一台机器上,数据通过网络发送(基于 Netty)。
    • 分区策略决定数据发到哪个子任务(Subtask)。
  3. 反序列化与处理

    • 下游任务收到数据,反序列化后交给目标算子处理。

用伪代码表示:

class RecordWriter {
    void emit(Record record, StreamEdge edge) {
        if (edge.partitioning == FORWARD) {
            sendToLocalTask(edge.targetId, record);
        } else if (edge.partitioning == HASH) {
            int targetSubtask = hash(record.key) % numSubtasks;
            sendToRemoteTask(edge.targetId, targetSubtask, record);
        }
    }
}

第六步:从代码层面看 StreamEdge

以下是 StreamEdge 类的简化版(基于 Flink 1.18 左右的源码):

public class StreamEdge {
    private final int sourceId;           // 源节点 ID
    private final int targetId;           // 目标节点 ID
    private final StreamPartitioner<?> partitioner; // 分区策略
    private final OutputTag outputTag;    // 侧输出标记(可选)

    public StreamEdge(StreamNode source, StreamNode target, StreamPartitioner<?> partitioner) {
        this.sourceId = source.getId();
        this.targetId = target.getId();
        this.partitioner = partitioner;
        this.outputTag = null; // 默认无侧输出
    }

    public int getSourceId() { return sourceId; }
    public int getTargetId() { return targetId; }
    public StreamPartitioner<?> getPartitioner() { return partitioner; }
}
  • StreamPartitioner 是一个接口,定义了具体分区逻辑(比如 ForwardPartitionerKeyGroupStreamPartitioner)。

第七步:完整推导示例

假设输入是 {("key1", 1), ("key2", 2)},经过 keyBy(key) 和 sum(value)

  1. StreamGraph

    • StreamNode1:Source
    • StreamNode2:KeyBy + Sum
    • StreamEdge:Source -> KeyBy,分区策略为 HASH
  2. 数据流

    • ("key1", 1) 的哈希值算出目标子任务 0。
    • ("key2", 2) 的哈希值算出目标子任务 1。
    • RecordWriter 根据 StreamEdge 的 HASH 策略发送。
  3. 结果

    • 子任务 0 计算 "key1" 的和。
    • 子任务 1 计算 "key2" 的和。

总结

StreamEdge 是 Flink 数据流处理的核心桥梁:

  • 逻辑层面:定义算子间的数据流向和分区规则。
  • 运行时层面:指导数据如何在分布式环境中传输。
  • 代码层面:通过属性和分区器实现灵活的分发。

通过上面的逐步推导,从“水管”比喻到代码实现,希望你对 StreamEdge 的原理有了清晰的理解!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.dtcms.com/a/124032.html

相关文章:

  • 如何保持自己在职场的核心竞争力
  • Python贝叶斯回归、强化学习分析医疗健康数据拟合截断删失数据与参数估计3实例
  • icoding题解排序
  • NO.87十六届蓝桥杯备战|动态规划-完全背包|疯狂的采药|Buying Hay|纪念品(C++)
  • x265 编码器中运动搜索 ME 方法对比实验
  • C++基础精讲-03
  • 苍穹外卖总结
  • 【Web API系列】WebSocketStream API 深度实践:构建高吞吐量实时应用的流式通信方案
  • 23种设计模式生活化场景,帮助理解
  • 洛谷刷题Day1——P1706+P1157+P2089+P3654
  • 要查看 FAISS 使用的 OpenMP 版本,需根据安装方式和系统环境采用不同方法。以下是具体步骤和原理分析:
  • [设计模式]发布订阅者模式解耦业务和UI(以Axios拦截器处理响应状态为例)
  • Spring Boot 自动加载流程详解
  • 8.3.5 ToolStripContainer(工具栏容器)控件
  • 线代第四课:行列式的性质
  • 电子元件浸入式冷却
  • 对重大保险风险测试的算法理解
  • Dify 插件开发笔记
  • MyBatis深度解析与实战指南:细节完整,从入门到精通
  • Windows下进行Redis for Windows安装
  • Linux服务器——搭建Zabbix
  • 02-redis-数据结构实现原理
  • 移动端六大语言速记:第12部分 - 测试与优化
  • vue-ganttastic在vue3中使用示例
  • CISA关键措施要求解析:提升组织网络安全的实用指南
  • 近两年年化是177.6%,wxpython+backtrader+quantstats的智能投研平台(系统源码+策略下载)
  • LangChain4j(1):初步认识Java 集成 LLM 的技术架构
  • Dart逆向之函数调用
  • AI | 字节跳动 AI 中文IDE编辑器 Trae 初体验
  • java线程安全-单例模式-线程通信