当前位置: 首页 > news >正文

一文对最新版本 Flink 反压机制全景深度解析(附源码)

一文对最新版本 Flink 反压机制全景深度解析-附源码

  • 1. 反压形成的根本原因
    • 1.1 反压的本质
    • 1.2 反压产生的典型场景
  • 2. Buffer 管理体系(反压的物理基础)
    • 2.1 三层 Buffer 架构
    • 2.2 NetworkBufferPool 核心源码
    • 2.3 LocalBufferPool - 动态内存管理
  • 3. Credit-Based Flow Control(信用流量控制)
    • 3.1 核心思想
    • 3.2 Credit 传递流程
    • 3.3 RemoteInputChannel - Downstream Credit 管理
    • 3.4 PartitionRequestServerHandler - Upstream Credit 处理
    • 3.5 CreditBasedSequenceNumberingViewReader - 数据发送控制
  • 4. 反压传播链路(多层次分析)
    • 4.1 完整的反压传播路径
    • 4.2 代码层面的阻塞点
    • 4.3 反压传播的时序图
  • 5. 上下游感知反压的机制
    • 5.1 Downstream 如何感知压力?
    • 5.2 Upstream 如何感知反压?
    • 5.3 反压监控指标(Task Metrics)
  • 6. 反压处理策略(源码级优化)
    • 6.1 Buffer Debloating(动态缓冲区调整)
    • 6.2 Overdraft Buffers(透支缓冲)
    • 6.3 Credit 累积与批量通知
  • 7. 反压的级联效应(系统视角)
    • 7.1 反压如何影响 Checkpoint
    • 7.2 反压与 Watermark 的交互
  • 8. 完整示例:反压从产生到解决
    • 8.1 场景:Kafka Source → Slow Sink
    • 8.2 反压形成过程(时间线)
    • 8.3 监控指标表现
    • 8.4 解决方案
  • 9. 核心源码路径总结
  • 10. 总结

1. 反压形成的根本原因

1.1 反压的本质

数据生产速度 > 数据消费速度 ↓
缓冲区填满↓
生产者被阻塞↓
反压形成并向上游传播

1.2 反压产生的典型场景

场景1: 算子处理能力不足
Source (1000 records/s)Map (1000 r/s)Window (100 r/s) ← 瓶颈!↓反压产生场景2: 外部系统慢
Kafka Source → Transform → **Sink to DB** ← DB响应慢↓反压传播到Source场景3: 数据倾斜
Key "A": 90% 数据 ← 过载!
Key "B": 5% 数据
Key "C": 5% 数据场景4: GC 压力
大状态 + 频繁GC → Task线程暂停 → 反压

2. Buffer 管理体系(反压的物理基础)

2.1 三层 Buffer 架构

┌─────────────────────────────────────────────────────────────┐
│              Layer 1: NetworkBufferPool                      │
│              (全局共享 Buffer 池)                             │
├─────────────────────────────────────────────────────────────┤
│  • 固定大小的 MemorySegment 集合                              │
│  • 由 TaskManager 启动时分配                                  │
│  • 默认 32KB per segment                                     │
│  • 动态在 LocalBufferPool 之间重分配                          │
└─────────────────────────────────────────────────────────────┘↓ 分配给                   ↓ 分配给
┌──────────────────────────┐  ┌──────────────────────────┐
│  Layer 2: LocalBufferPool │  │  LocalBufferPool         │
│  (InputGate 专属)         │  │  (ResultPartition 专属)   │
├──────────────────────────┤  ├──────────────────────────┤
│  • 每个 InputGate 一个    │  │  • 每个 ResultPartition  │
│  • 可动态调整大小          │  │    一个                   │
│  • 支持 overdr专用
│  • Exclusive Buffers     │  │                           │
│    (专用于某个 channel)   │  │                           │
│  • Floating Buffers      │  │                           │
│    (共享于所有 channels)  │  │                           │
└──────────────────────────┘  └──────────────────────────┘

2.2 NetworkBufferPool 核心源码

// flink-runtime/.../network/buffer/NetworkBufferPool.javapublic class NetworkBufferPool implements BufferPoolFactory {// 全局 Buffer 总数private final int totalNumberOfMemorySegments;// 单个 Segment 大小(默认 32KB)private final int memorySegmentSize;// 可用的内存段队列private final ArrayDeque<MemorySegment> availableMemorySegments;// 所有 LocalBufferPoolprivate final Set<LocalBufferPool> allBufferPools = new HashSet<>();/*** 请求 Buffer(阻塞式)* 关键:如果没有可用 Buffer,线程会在这里阻塞!*/private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsToRequest) throws IOException {final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);final Deadline deadline = Deadline.fromNow(requestSegmentsTimeout);while (true) {if (isDestroyed) {throw new IllegalStateException("Buffer pool is destroyed.");}MemorySegment segment;synchronized (availableMemorySegments) {// 🔥 尝试获取 Segmentsegment = internalRequestMemorySegment();if (segment == null) {// ⏳ 没有可用 Buffer,等待 2 秒availableMemorySegments.wait(2000);}}if (segment != null) {segments.add(segment);}if (segments.size() >= numberOfSegmentsToRequest) {break;  // 请求满足}if (!deadline.hasTimeLeft()) {// ❌ 超时抛异常throw new IOException("Timeout requesting buffers");}}return segments;}
}

2.3 LocalBufferPool - 动态内存管理

// flink-runtime/.../network/buffer/LocalBufferPool.javapublic class LocalBufferPool implements BufferPool {// 当前池子可用的 Bufferprivate final ArrayDeque<MemorySegment> availableMemorySegments;// 每个 subpartition 的 Buffer 计数private final int[] subpartitionBuffersCount;// 最大 Buffer 数(上限)private int maxUsedBuffers;/*** 🔥 阻塞式请求 Buffer* Task 线程会在这里被阻塞,形成反压!*/private MemorySegment requestMemorySegmentBlocking(int targetChannel)throws InterruptedException {MemorySegment segment;// 循环等待直到有 Buffer 可用while ((segment = requestMemorySegment(targetChannel)) == null) {try {// ⏳ 等待 Buffer 可用的 FuturegetAvailableFuture().get();  // 阻塞点!} catch (ExecutionException e) {LOG.error("The available future is completed exceptionally.", e);ExceptionUtils.rethrow(e);}}return segment;}/*** 非阻塞式请求*/@Nullableprivate MemorySegment requestMemorySegment(int targetChannel) {MemorySegment segment = null;synchronized (availableMemorySegments) {checkDestroyed();// 1. 优先从本地池子取if (!availableMemorySegments.isEmpty()) {segment = availableMemorySegments.poll();}// 2. 如果达到上限,尝试获取 Overdraft Bufferelse if (isRequestedSizeReached()) {segment = requestOverdraftMemorySegmentFromGlobal();}if (segment == null) {return null;  // 🚫 没有可用 Buffer}// 更新 per-channel 计数if (targetChannel != UNKNOWN_CHANNEL) {subpartitionBuffersCount[targetChannel]++;// 达到per-channel上限if (subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {unavailableSubpartitionsCount++;}}checkAndUpdateAvailability();}return segment;}
}

http://www.dtcms.com/a/477758.html

相关文章:

  • 从硅谷到全球:新思科技(Synopsys)的发展史与产业深耕之路
  • 网站建设wang1314公司图案设计
  • 【AES加密专题】7.AES全局函数的编写
  • EPSON TG2016SMN:低功耗温补晶振延长电池设备续航
  • Qt C++ 教程:无边框窗体 + 自定义标题栏 + 圆角 + 拖拽拉升 + 阴影
  • 用 Gradle 实现自动化测试:集成 JUnit、TestNG,生成测试报告
  • 邵阳市住房和建设局网站西安做网站收费价格
  • 【QT界面设计学习篇】qt快速开发技巧
  • Hadoop面试题及详细答案 110题 (86-95)-- Hadoop生态系统工具
  • 基于单片机电器断路器保护器系统Proteus仿真(含全部资料)
  • 如何做天猫网站怎么做win10原版系统下载网站
  • FocusAny开源 #2:速算本Calculator
  • Typecho独立页面能否支持多个自定义永久链接路径(如 /special/ 和 /other/)
  • uniapp学习【路由跳转 +数据请求+本地存储+常用组件】
  • ads基本量的含义和计算方程(1.直流扫描)
  • ORACLE 高危漏洞(9.8分)
  • 【检索:LSM】7、LSM树深度解析:为什么日志系统首选LSM树而非B+树?从原理到实践
  • 网站推广句子快照关键词优化
  • (解决)重装系统电脑账户进不去被停用,PIN无法验证,提示0xc0000234
  • 属于门户网站的有个人网站制作多少钱
  • wpf 命令理解
  • [好用工具] 一款mac/windows电脑历史剪切板工具,类似著名的Paste
  • 【Qt开发】输入类控件(七)-> QSlider
  • Oracle Exadata一体机简介 1千多个W
  • Caffeinated for Mac 防止屏幕睡眠工具
  • Trae官网炫酷特效与vue-bits的使用
  • 网站内页修改关键字抖音广告投放平台官网
  • Artstudio Pro for Mac 绘图与图片编辑软件
  • 上班没事做看什么网站wordpress主题官方
  • .NET Framework 4.0和Visual Studio 2010的串口通信类