A Panoramic Deep Dive into the Backpressure Mechanism in the Latest Version of Flink (with Source Code)
- 1. The Root Causes of Backpressure
- 2. The Buffer Management System (the Physical Foundation of Backpressure)
- 2.1 The Three-Layer Buffer Architecture
- 2.2 NetworkBufferPool Core Source Code
- 2.3 LocalBufferPool - Dynamic Memory Management
- 3. Credit-Based Flow Control
- 3.1 Core Idea
- 3.2 How Credits Are Exchanged
- 3.3 RemoteInputChannel - Downstream Credit Management
- 3.4 PartitionRequestServerHandler - Upstream Credit Handling
- 3.5 CreditBasedSequenceNumberingViewReader - Controlling Data Transmission
- 4. The Backpressure Propagation Chain (a Multi-Level Analysis)
- 4.1 The Complete Backpressure Propagation Path
- 4.2 Blocking Points at the Code Level
- 4.3 Sequence Diagram of Backpressure Propagation
- 5. How Upstream and Downstream Perceive Backpressure
- 5.1 How Does the Downstream Sense Pressure?
- 5.2 How Does the Upstream Sense Backpressure?
- 5.3 Backpressure Monitoring Metrics (Task Metrics)
- 6. Backpressure Handling Strategies (Source-Level Optimizations)
- 6.1 Buffer Debloating (Dynamic Buffer Resizing)
- 6.2 Overdraft Buffers
- 6.3 Credit Accumulation and Batched Notification
- 7. The Cascading Effects of Backpressure (a System-Level View)
- 7.1 How Backpressure Affects Checkpoints
- 7.2 The Interaction Between Backpressure and Watermarks
- 8. A Complete Example: Backpressure from Onset to Resolution
- 8.1 Scenario: Kafka Source → Slow Sink
- 8.2 How Backpressure Builds Up (Timeline)
- 8.3 What the Monitoring Metrics Show
- 8.4 Solutions
- 9. Summary of Key Source Code Paths
- 10. Conclusion
1. The Root Causes of Backpressure
1.1 The Essence of Backpressure

Data production rate > data consumption rate
        ↓
Buffers fill up
        ↓
The producer is blocked
        ↓
Backpressure forms and propagates upstream
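Before diving into Flink's internals, this chain can be reproduced with nothing more than a bounded queue. The sketch below is plain Java (the class name, queue capacity, and sleep interval are made up for illustration): once the buffer is full, `put()` blocks, so the producer is forced down to the consumer's rate, which is exactly the blocking behavior Flink's buffer pools provide.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy illustration of backpressure: a bounded buffer throttles a fast producer. */
public class BackpressureDemo {

    public static void main(String[] args) throws InterruptedException {
        // A small, bounded "buffer pool" sitting between producer and consumer.
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(8);

        Thread producer = new Thread(() -> {
            for (int i = 0; ; i++) {
                try {
                    // Blocks as soon as the buffer is full: the producer is
                    // slowed down to the consumer's pace (backpressure).
                    buffer.put(i);
                    System.out.println("produced " + i + ", queued=" + buffer.size());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Integer value = buffer.take();
                    Thread.sleep(100); // slow consumer: ~10 records/s
                    System.out.println("consumed " + value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        producer.start();
        consumer.start();
        Thread.sleep(3_000);
        producer.interrupt();
        consumer.interrupt();
    }
}
```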
1.2 Typical Scenarios That Produce Backpressure

Scenario 1: An operator with insufficient processing capacity
Source (1000 records/s) → Map (1000 r/s) → Window (100 r/s) ← bottleneck!
        ↓
Backpressure arises

Scenario 2: A slow external system
Kafka Source → Transform → **Sink to DB** ← slow DB responses
        ↓
Backpressure propagates back to the Source

Scenario 3: Data skew
Key "A": 90% of the data ← overloaded!
Key "B": 5% of the data
Key "C": 5% of the data

Scenario 4: GC pressure
Large state + frequent GC → task threads pause → backpressure
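Scenario 2 is the easiest to reproduce locally. The following is a minimal DataStream sketch, not code from the later example in this article: the class name, the anonymous functions, and the 10 ms sleep are purely illustrative. Running it and opening the Flink web UI shows the source and map subtasks being reported as backpressured while the sink shows as busy.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/** Minimal job that provokes backpressure through an artificially slow sink. */
public class SlowSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSequence(0, Long.MAX_VALUE)              // fast, effectively unbounded source
                .map(new MapFunction<Long, Long>() {     // cheap transformation
                    @Override
                    public Long map(Long value) {
                        return value * 2;
                    }
                })
                .addSink(new SinkFunction<Long>() {      // stands in for a slow external system
                    @Override
                    public void invoke(Long value, Context context) throws Exception {
                        Thread.sleep(10);                // ~100 records/s per subtask
                    }
                });

        env.execute("slow-sink-backpressure-demo");
    }
}
```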
2. The Buffer Management System (the Physical Foundation of Backpressure)
2.1 The Three-Layer Buffer Architecture

┌────────────────────────────────────────────────────────┐
│               Layer 1: NetworkBufferPool                │
│             (globally shared buffer pool)               │
├────────────────────────────────────────────────────────┤
│ • A fixed set of MemorySegments                         │
│ • Allocated when the TaskManager starts                 │
│ • 32 KB per segment by default                          │
│ • Dynamically redistributed among LocalBufferPools      │
└────────────────────────────────────────────────────────┘
        ↓ allocates to                  ↓ allocates to
┌──────────────────────────┐    ┌──────────────────────────┐
│ Layer 2: LocalBufferPool │    │     LocalBufferPool      │
│ (owned by an InputGate)  │    │  (per ResultPartition)   │
├──────────────────────────┤    ├──────────────────────────┤
│ • One per InputGate      │    │ • One per                │
│ • Dynamically resizable  │    │   ResultPartition        │
│ • Supports overdraft     │    │                          │
│   buffers                │    │                          │
│ • Exclusive buffers      │    │                          │
│   (dedicated to one      │    │                          │
│    channel)              │    │                          │
│ • Floating buffers       │    │                          │
│   (shared by all         │    │                          │
│    channels)             │    │                          │
└──────────────────────────┘    └──────────────────────────┘
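The sizes in this diagram are driven by a handful of TaskManager options. As a rough sketch, the snippet below sets them programmatically for a local run (in a real deployment they would normally live in flink-conf.yaml); the concrete values are placeholders, not tuning recommendations.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Sketch: wiring the network-buffer options that back the diagram above. */
public class NetworkBufferConfig {

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Size of each MemorySegment in the NetworkBufferPool (default: 32 kb).
        conf.setString("taskmanager.memory.segment-size", "32kb");

        // Fraction and bounds of Flink memory reserved for network buffers.
        conf.setString("taskmanager.memory.network.fraction", "0.1");
        conf.setString("taskmanager.memory.network.min", "64mb");
        conf.setString("taskmanager.memory.network.max", "1gb");

        // Exclusive buffers per channel and floating buffers per gate
        // (see the "Exclusive / Floating buffers" bullets above).
        conf.setString("taskmanager.network.memory.buffers-per-channel", "2");
        conf.setString("taskmanager.network.memory.floating-buffers-per-gate", "8");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job with this configuration ...
    }
}
```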
2.2 NetworkBufferPool Core Source Code

public class NetworkBufferPool implements BufferPoolFactory {

    // Total number and size of the MemorySegments owned by this global pool.
    private final int totalNumberOfMemorySegments;
    private final int memorySegmentSize;

    // Segments not currently handed out to any LocalBufferPool.
    private final ArrayDeque<MemorySegment> availableMemorySegments;

    // All LocalBufferPools created from this global pool.
    private final Set<LocalBufferPool> allBufferPools = new HashSet<>();

    // Requests a batch of segments, waiting (in 2-second intervals) until enough
    // segments are available or the request deadline expires.
    // Note: the try/catch that wraps this loop in the original source is omitted here.
    private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsToRequest)
            throws IOException {
        final List<MemorySegment> segments = new ArrayList<>(numberOfSegmentsToRequest);
        final Deadline deadline = Deadline.fromNow(requestSegmentsTimeout);

        while (true) {
            if (isDestroyed) {
                throw new IllegalStateException("Buffer pool is destroyed.");
            }

            MemorySegment segment;
            synchronized (availableMemorySegments) {
                segment = internalRequestMemorySegment();
                if (segment == null) {
                    // Wait up to 2 seconds for a segment to be recycled back.
                    availableMemorySegments.wait(2000);
                }
            }

            if (segment != null) {
                segments.add(segment);
            }

            if (segments.size() >= numberOfSegmentsToRequest) {
                break;
            }

            if (!deadline.hasTimeLeft()) {
                throw new IOException("Timeout requesting buffers");
            }
        }
        return segments;
    }
}
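A quick sanity check of what these fields mean in practice: with the default 32 KB segment size, the number of segments in the global pool is simply the network memory budget divided by the segment size. The arithmetic below uses made-up sizes, only to make that relationship concrete.

```java
/** Back-of-the-envelope sizing for the global pool (illustrative numbers only). */
public class NetworkBufferMath {

    public static void main(String[] args) {
        long networkMemoryBytes = 1L << 30;   // assume 1 GB reserved for network buffers
        int segmentSizeBytes = 32 * 1024;     // 32 KB per MemorySegment (default)

        long totalSegments = networkMemoryBytes / segmentSizeBytes;
        System.out.println("totalNumberOfMemorySegments = " + totalSegments); // 32768

        // Each in-flight buffer holds up to 32 KB of serialized records, so the
        // segment count bounds how much data can sit in this TaskManager's
        // network stack before producers start blocking.
        System.out.println("max in-flight bytes = " + totalSegments * segmentSizeBytes);
    }
}
```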
2.3 LocalBufferPool - Dynamic Memory Management

public class LocalBufferPool implements BufferPool {

    // Segments currently free in this local pool.
    private final ArrayDeque<MemorySegment> availableMemorySegments;

    // Buffers currently held per target subpartition, used to enforce
    // maxBuffersPerChannel so one channel cannot monopolize the pool.
    private final int[] subpartitionBuffersCount;

    private int maxUsedBuffers;

    // Called when a task needs a buffer for outgoing data. If the pool is
    // exhausted, this blocks on the availability future; this is the exact
    // point where a producing task starts to feel backpressure.
    private MemorySegment requestMemorySegmentBlocking(int targetChannel)
            throws InterruptedException {
        MemorySegment segment;
        while ((segment = requestMemorySegment(targetChannel)) == null) {
            try {
                // Park until a buffer is recycled back into this pool.
                getAvailableFuture().get();
            } catch (ExecutionException e) {
                LOG.error("The available future is completed exceptionally.", e);
                ExceptionUtils.rethrow(e);
            }
        }
        return segment;
    }

    @Nullable
    private MemorySegment requestMemorySegment(int targetChannel) {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            checkDestroyed();

            if (!availableMemorySegments.isEmpty()) {
                segment = availableMemorySegments.poll();
            } else if (isRequestedSizeReached()) {
                // The configured pool size is exhausted: try to borrow an
                // overdraft buffer from the global NetworkBufferPool.
                segment = requestOverdraftMemorySegmentFromGlobal();
            }

            if (segment == null) {
                return null;
            }

            if (targetChannel != UNKNOWN_CHANNEL) {
                subpartitionBuffersCount[targetChannel]++;
                if (subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
                    // This subpartition has hit its per-channel limit.
                    unavailableSubpartitionsCount++;
                }
            }
            checkAndUpdateAvailability();
        }
        return segment;
    }
}
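The `getAvailableFuture().get()` call above is where the blocking actually happens: the requesting task parks on a future that only completes when a buffer is recycled back into the pool. The toy pool below is not Flink code, just a minimal, self-contained illustration of the same availability-future pattern with invented names.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Toy pool mimicking LocalBufferPool's availability-future pattern. */
public class ToyBufferPool {

    private final ArrayDeque<byte[]> freeSegments = new ArrayDeque<>();
    private CompletableFuture<Void> availableFuture = CompletableFuture.completedFuture(null);

    public ToyBufferPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            freeSegments.add(new byte[segmentSize]);
        }
    }

    /** Blocks until a segment is available, like requestMemorySegmentBlocking(). */
    public byte[] requestBlocking() throws Exception {
        while (true) {
            CompletableFuture<Void> toWaitFor;
            synchronized (this) {
                byte[] segment = freeSegments.poll();
                if (segment != null) {
                    if (freeSegments.isEmpty()) {
                        // Last free segment handed out: the pool becomes unavailable.
                        availableFuture = new CompletableFuture<>();
                    }
                    return segment;
                }
                toWaitFor = availableFuture;
            }
            // Park here until recycle() completes the future; this is the
            // moment a producing task "feels" backpressure.
            toWaitFor.get();
        }
    }

    /** Returns a segment to the pool and wakes up blocked requesters. */
    public void recycle(byte[] segment) {
        CompletableFuture<Void> toComplete = null;
        synchronized (this) {
            freeSegments.add(segment);
            if (!availableFuture.isDone()) {
                toComplete = availableFuture;
            }
        }
        if (toComplete != null) {
            toComplete.complete(null);
        }
    }
}
```

Using a future instead of plain wait/notify lets the same signal serve both this blocking path and non-blocking availability checks, which is also how the surrounding runtime can account for the time a task spends waiting for output buffers.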