当前位置: 首页 > news >正文

Flink NetworkBufferPool核心原理解析

Flink NetworkBufferPool 深度分析

NetworkBufferPool 是 Flink 网络栈的核心内存管理器。其基本工作原理是在启动时预先分配一块固定大小的内存(通常是堆外内存),并将其切分为多个大小相等的 MemorySegment。它本身不直接向任务提供 Buffer,而是作为一个工厂,为各个数据交换通道(如 ResultSubpartition 或 InputChannel)创建专属的 LocalBufferPool。

NetworkBufferPool 负责在这些 LocalBufferPool 之间进行 MemorySegment 的全局管理和动态分配,旨在实现高效的内存复用、减少 GC 压力,并为网络流控提供基础。

内存段的生命周期管理 (Memory Segment Lifecycle)

这是 NetworkBufferPool 最基础的职责:创建、存储、分配和销毁内存段。

​核心代码示例:​

// ...existing code...
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, Duration requestSegmentsTimeout) {// ...existing code...try {this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate);} catch (OutOfMemoryError err) {// ...existing code...}try {for (int i = 0; i < numberOfSegmentsToAllocate; i++) {availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));}} catch (OutOfMemoryError err) {// ...existing code...}// ...existing code...
}
@Nullable
public MemorySegment requestPooledMemorySegment() {synchronized (availableMemorySegments) {return internalRequestMemorySegment();}
}
// ...existing code...
public void recyclePooledMemorySegment(MemorySegment segment) {// ...existing code...internalRecycleMemorySegments(Collections.singleton(checkNotNull(segment)));
}
public void destroy() {synchronized (factoryLock) {isDestroyed = true;}synchronized (availableMemorySegments) {MemorySegment segment;while ((segment = availableMemorySegments.poll()) != null) {segment.free();}}
}
// ...existing code...

​角色与价值:​

  • ​角色​​: 作为所有网络缓冲区的物理内存源头。它在构造函数中一次性申请所有内存,在 destroy 方法中统一释放。availableMemorySegments (一个 ArrayDeque) 作为空闲内存段的中央存储。

  • ​价值​​: 通过预分配和池化管理,完全避免了在运行时频繁地向操作系统申请和释放内存,极大地降低了系统调用开销和内存碎片,特别是对于堆外内存,有效规避了由 DirectByteBuffer 引起的 GC 问题。


两级缓冲池结构 (Two-Tier Pool Hierarchy)

NetworkBufferPool 并不直接被任务使用,而是通过创建 LocalBufferPool 来服务于具体的网络组件。

​核心代码示例:​

// ...existing code...
@Override
public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {return internalCreateBufferPool(numRequiredBuffers, maxUsedBuffers, 0, Integer.MAX_VALUE, 0);
}
private BufferPool internalCreateBufferPool(// ...) throws IOException {synchronized (factoryLock) {// ...// We are good to go, create a new buffer pool and redistribute// non-fixed size buffers.LocalBufferPool localBufferPool = new LocalBufferPool(this,numRequiredBuffers,// ...);allBufferPools.add(localBufferPool);if (numRequiredBuffers < maxUsedBuffers) {resizableBufferPools.add(localBufferPool);}redistributeBuffers();return localBufferPool;}
}
// ...existing code...

​角色与价值:​

  • ​角色​​: NetworkBufferPool 扮演“全局总管”和“工厂”的角色,LocalBufferPool 则是面向具体消费者的“区域代理”。这种分层结构清晰地隔离了全局资源管理和局部缓冲逻辑。

  • ​价值​​: 实现了关注点分离。NetworkBufferPool 专注于全局内存的分配和回收策略,而 LocalBufferPool 则处理具体任务的缓冲申请、流控、回收等逻辑。这种设计使得系统扩展性更强,管理更清晰。


动态缓冲区再分配 (Dynamic Buffer Redistribution)

这是 NetworkBufferPool 设计中的一个精髓。它能够根据当前所有 LocalBufferPool 的需求,动态地分配那些“浮动”的、非必需的缓冲区。

​核心代码示例:​

//\flink\flink-runtime\src\main\java\org\apache\flink\runtime\io\network\buffer\NetworkBufferPool.java
// ...existing code...
// Must be called from synchronized block
private void redistributeBuffers() {assert Thread.holdsLock(factoryLock);if (resizableBufferPools.isEmpty()) {return;}// All buffers, which are not among the required onesfinal int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;        // ...long totalCapacity = 0; // long to avoid int overflowfor (LocalBufferPool bufferPool : resizableBufferPools) {int excessMax = bufferPool.getMaxNumberOfMemorySegments()- bufferPool.getNumberOfRequiredMemorySegments();totalCapacity += Math.min(numAvailableMemorySegment, excessMax);}// ...// 计算每个可伸缩的 bufferPool 能获得的额外 buffer 数量for (LocalBufferPool bufferPool : resizableBufferPools) {// ...final int mySize = MathUtils.checkedDownCast(memorySegmentsToDistribute * totalPartsUsed / totalCapacity- numDistributedMemorySegment);numDistributedMemorySegment += mySize;bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);}// ...
}
// ...existing code...

​角色与价值:​

  • ​角色​​: 作为一个资源调度器。它首先保证每个 LocalBufferPool 获得其声明的“必需”(numRequiredBuffers)缓冲区,然后将剩余的缓冲区,按比例分配给那些有能力使用更多缓冲区的池(resizableBufferPools)。

  • ​价值​​: 实现了内存资源的公平性和利用率最大化。当系统负载不均时,可以将空闲的内存资源动态地倾斜给繁忙的数据通道,提高整体吞吐量。当新的任务加入或旧任务结束时,该机制能自动调整,使系统具备良好的弹性。


异步可用性通知 (Asynchronous Availability Notification)

当池中没有可用 Buffer 时,请求者不必阻塞等待,而是可以注册一个回调。

​核心代码示例:​

// flink-runtime\src\main\java\org\apache\flink\runtime\io\network\buffer\NetworkBufferPool.java
// ...existing code...
private final AvailabilityHelper availabilityHelper = new AvailabilityHelper();
// ...existing code...
private void internalRecycleMemorySegments(Collection<MemorySegment> segments) {CompletableFuture<?> toNotify = null;synchronized (availableMemorySegments) {if (availableMemorySegments.isEmpty() && !segments.isEmpty()) {toNotify = availabilityHelper.getUnavailableToResetAvailable();}availableMemorySegments.addAll(segments);availableMemorySegments.notifyAll();}if (toNotify != null) {toNotify.complete(null);}
}
// ...existing code...
@Override
public CompletableFuture<?> getAvailableFuture() {return availabilityHelper.getAvailableFuture();
}
// ...existing code...

​角色与价值:​

  • ​角色​​: AvailabilityHelper 结合 CompletableFuture 构成了一个非阻塞的通知机制。

  • ​价值​​: 这是支撑 Flink 网络栈实现高效异步 I/O 的关键。当消费者(如 InputChannel)请求 Buffer 失败时,它可以获取一个 Future 并注册回调,然后继续处理其他任务。当有 Buffer 被回收时,NetworkBufferPool 会 complete 这个 Future,触发回调,从而唤醒消费者继续读取数据。这避免了线程阻塞,提高了 CPU 利用率。


设计分析

独特的设计思想:

  • ​全局-局部两级管理模式​​: 如上所述,NetworkBufferPool (全局) + LocalBufferPool (局部) 的模式,是典型的“分而治之”思想。它将复杂的全局资源调度与具体的业务逻辑解耦,使得每一层的职责都非常清晰。

  • ​“必需”与“浮动”资源分离​​: 将 Buffer 需求区分为 numRequiredBuffers (必需的,保证任务能启动) 和 maxUsedBuffers - numRequiredBuffers (浮动的,用于提升性能) 两部分。这种设计为动态资源分配提供了基础,保证了系统的稳定性和弹性。

性能优化:

  • ​精细化锁策略​​: NetworkBufferPool 使用了多个锁(factoryLock 和 availableMemorySegments),而不是一个全局大锁。factoryLock 用于保护缓冲池的创建、销毁和重分配等管理操作,而 availableMemorySegments 锁则专门保护内存段队列的并发访问。这种分离降低了锁的粒度,减少了热点路径(如高频的 request/recycle 操作)的锁竞争。

  • ​无锁化的可用性通知​​: 使用 CompletableFuture 实现异步通知,替代了传统的 wait/notify 或 Condition。这使得等待方线程无需阻塞,可以被调度去执行其他任务,是典型的响应式编程范式应用,对提升系统吞吐量至关重要。


可迁移的应用经验

从 NetworkBufferPool 的设计中,我们可以提炼出以下通用设计原则:

原则一:池化与预分配

  • ​核心思想​​: 对于那些创建开销大、使用频繁的稀缺资源(如内存、数据库连接、线程),应采用预先创建并放入池中进行循环利用的模式。

  • ​应用场景示例​​: 设计一个高性能的日志服务。可以在服务启动时,预先创建一批 LogEvent 对象并放入对象池。当需要记录日志时,从池中获取一个对象,填充数据后传递给后台线程处理。处理完毕后,清空 LogEvent 对象并将其归还到池中,而不是销毁。这可以显著减少高并发下日志记录造成的内存分配和 GC 压力。

原则二:分层与分治

  • ​核心思想​​: 将一个复杂的系统分解为多个层次或模块,每个层次/模块有明确的职责边界。上层负责策略和协调,下层负责执行和具体实现。

  • ​应用场景示例​​: 构建一个多租户的云存储服务。可以设计一个两级配额管理系统。

    • ​全局管理器 (类似 NetworkBufferPool)​​: 负责管理整个集群的总存储容量,以及在不同租户之间分配基础配额和弹性配额。

    • ​租户管理器 (类似 LocalBufferPool)​​: 负责管理单个租户内部的存储使用,例如为该租户下的不同应用或用户分配空间,并执行具体的读写权限控制。

原则三:异步化与事件驱动 (Asynchronization and Event-driven)

  • ​核心思想​​: 在处理 I/O 密集型或需要等待资源的任务时,用异步回调或事件通知机制代替同步阻塞等待。调用者发起请求后立即返回,当资源可用或操作完成时,通过事件或 Future 回调来触发后续处理。

  • ​应用场景示例​​: 开发一个需要调用多个下游微服务的聚合网关。当网关收到一个请求后,它需要调用服务 A、B、C。

    • ​同步阻塞方式​​: 依次调用 A、B、C,总耗时 = A耗时 + B耗时 + C耗时。

    • ​异步化方式​​: 并行发起对 A、B、C 的异步调用,每个调用返回一个 CompletableFuture。然后使用 CompletableFuture.allOf() 等待所有结果返回。总耗时约等于最慢的那个服务的耗时。这极大地降低了请求延迟,提升了网关的并发处理能力。


Flink LocalBufferPool 深度分析

LocalBufferPool 是 Flink 两级内存管理体系中的"局部"缓冲池。它由全局的 NetworkBufferPool 创建,并服务于特定的网络数据交换实体(如一个结果分区或一个输入门)。

其核心目标是作为其所有者(例如一个任务)与全局 NetworkBufferPool 之间的中介,负责管理一个动态大小的 Buffer 子集。它处理来自任务的具体 Buffer 请求,实现局部的缓冲和流控(反压),并将使用完毕的 Buffer 回收给 NetworkBufferPool,从而实现了资源隔离和精细化管理。

缓冲区的申请与流转 (Buffer Request and Flow)

​角色与价值:​

  • 角色:作为缓冲区的直接提供者。它首先尝试从自己的本地可用队列 availableMemorySegments中获取 MemorySegment。如果本地没有,它会检查是否已达到其当前配额 (currentPoolSize)。若未达到,则会向全局的 NetworkBufferPool 申请新的 Segment。

  • 价值:这种机制实现了缓冲区的按需申请和本地缓存。它避免了每次请求都与全局池交互,降低了全局锁的竞争,提升了性能。同时,通过 currentPoolSize 限制,确保了单个 LocalBufferPool 不会无限制地消耗全局资源。

​核心代码示例:​

// flink-runtime\src\main\java\org\apache\flink\runtime\io\network\buffer\LocalBufferPool.java
// ...existing code...
@Nullable
private MemorySegment requestMemorySegment(int targetChannel) {MemorySegment segment = null;synchronized (availableMemorySegments) {checkDestroyed();if (!availableMemorySegments.isEmpty()) {segment = availableMemorySegments.poll();} else if (isRequestedSizeReached()) {// Only when the buffer request reaches the upper limit(i.e. current pool size),// requests an overdraft buffer.segment = requestOverdraftMemorySegmentFromGlobal();}if (segment == null) {return null;}// ...checkAndUpdateAvailability();}return segment;
}@GuardedBy("availableMemorySegments")
private boolean requestMemorySegmentFromGlobal() {assert Thread.holdsLock(availableMemorySegments);if (isRequestedSizeReached()) {return false;}MemorySegment segment = requestPooledMemorySegment();if (segment != null) {availableMemorySegments.add(segment);return true;}return false;
}
// ...existing code...

动态池大小与懒惰回收

​角色与价值:​

  • 角色:作为一个可伸缩的资源容器。setNumBuffers方法允许 NetworkBufferPool 调整其配额。当配额减少时,LocalBufferPool 并不立即强制回收正在使用的 Buffer,而是在 Buffer 被归还 (recycle) 时,通过检查 hasExcessBuffers()来判断是否需要将这个 Buffer 直接还给全局池,而不是放回本地池。

  • 价值:这种"懒惰回收"策略非常优雅。它避免了复杂且低效的"抢占"逻辑(即从使用者手中夺回 Buffer),简化了设计。资源在自然流转过程中平滑地调整,既实现了动态性,又保证了系统的高效运行。

​核心代码示例:​

// flink-runtime\src\main\java\org\apache\flink\runtime\io\network\buffer\LocalBufferPool.java
// ...existing code...
@Override
public void setNumBuffers(int numBuffers) {// ...synchronized (availableMemorySegments) {// ...currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);returnExcessMemorySegments();// ...}// ...
}private void recycle(MemorySegment segment, int channel) {// ...synchronized (availableMemorySegments) {// ...if (isDestroyed || hasExcessBuffers()) {returnMemorySegment(segment);return;} else {// ...}}// ...
}@GuardedBy("availableMemorySegments")
private boolean hasExcessBuffers() {return numberOfRequestedMemorySegments > currentPoolSize;
}
// ...existing code...

异步可用性与反压通知

​角色与价值:​

  • 角色:作为反压信号的媒介。当一个任务请求 Buffer 失败时,它可以将自己注册为一个 BufferListener。当有 Buffer 被回收时,LocalBufferPool 会从 registeredListeners队列中取出一个监听器并通知它 Buffer 可用。

  • 价值:这是典型的事件驱动设计。它将"拉"(Pull)模型(任务不断轮询 Buffer)转换为了"推"(Push)模型(池在 Buffer 可用时通知任务)。这避免了线程的忙等待,使得任务线程在没有 Buffer 时可以被释放去执行其他工作,极大地提高了 CPU 效率和系统吞吐量。

​核心代码示例:​

// flink-runtime\src\main\java\org\apache\flink\runtime\io\network\buffer\LocalBufferPool.java
// ...existing code...
@Override
public boolean addBufferListener(BufferListener listener) {synchronized (availableMemorySegments) {if (!availableMemorySegments.isEmpty() || isDestroyed) {return false;}registeredListeners.add(listener);return true;}
}private void recycle(MemorySegment segment, int channel) {BufferListener listener;// ...do {synchronized (availableMemorySegments) {// ...listener = registeredListeners.poll();if (listener == null) {availableMemorySegments.add(segment);// ...break;}// ...}} while (!fireBufferAvailableNotification(listener, segment));// ...
}
// ...existing code...

子分区配额管理

​角色与价值:​

  • 角色:作为一个更细粒度的资源控制器。它为每个子分区维护一个计数器 subpartitionBuffersCount,并确保其不超过 maxBuffersPerChannel的限制。

  • 价值:这是一种重要的公平性保障机制。它能有效防止因为某个下游消费者处理缓慢,导致其对应的子分区占用了本地池中所有的 Buffer,从而饿死其他正常的子分区。这保证了即使在部分反压的情况下,数据流也能在其他通道上继续进行。

​核心代码示例:​

// flink-runtime\src\main\java\org\apache\flink\runtime\io\network\buffer\LocalBufferPool.java
// ...existing code...
private MemorySegment requestMemorySegment(int targetChannel) {// ...synchronized (availableMemorySegments) {// ...if (targetChannel != UNKNOWN_CHANNEL) {if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {unavailableSubpartitionsCount++;}}checkAndUpdateAvailability();}return segment;
}private void recycle(MemorySegment segment, int channel) {// ...synchronized (availableMemorySegments) {if (channel != UNKNOWN_CHANNEL) {if (subpartitionBuffersCount[channel]-- == maxBuffersPerChannel) {unavailableSubpartitionsCount--;}}// ...}// ...
}
// ...existing code...

设计亮点

独特的设计思想:

  1. ​委托与代理模式​​:LocalBufferPool 完美扮演了 NetworkBufferPool 的代理角色。它向任务暴露了简单的 request/recycle 接口,同时内部封装了与全局池交互、动态调整、处理反压等复杂逻辑。

  2. ​信用/透支机制 (Overdraft Buffers)​​:maxOverdraftBuffersPerGate的设计允许在池大小已满的情况下,还能"透支"申请少量 Buffer。这为系统提供了额外的弹性,可以缓解一些临界状态下的死锁问题,提高鲁棒性。

性能优化:

  1. ​避免全局锁竞争​​:通过在 LocalBufferPool 缓存一定数量的 Buffer,绝大多数的申请和回收操作都只在本地池的锁(availableMemorySegments)下完成,显著减少了对 NetworkBufferPool 全局锁的访问,提高了并发性能。

  2. ​异步化交互​​:当本地池需要向全局池申请 Buffer 但全局池为空时,它会注册一个异步回调 (onGlobalPoolAvailable),而不是同步阻塞等待。这使得任务线程不会被 I/O 或资源等待所阻塞,是实现高吞吐异步处理的核心。


可迁移的应用经验

从 LocalBufferPool 的设计中,我们可以提炼出以下通用设计原则:

原则一:分层缓存与就近原则

​核心思想​​:对于全局共享资源,建立一个多级缓存/池化结构。高频操作应尽可能在最近的、小范围的"本地"缓存/池中完成,以减少对全局、昂贵的资源的访问。

​应用场景示例​​:设计一个多用户的配置中心。可以设计一个两级缓存:

  • 全局缓存 (类似 NetworkBufferPool):在配置中心服务端,用 Redis 或内存缓存所有配置。

  • 本地缓存 (类似 LocalBufferPool):在每个应用客户端,维持一个 JVM 内存缓存。客户端启动时从服务端拉取全量配置。日常请求首先访问本地缓存,只有在本地缓存没有或需要更新时,才向服务端发起请求。这大大降低了配置中心的网络负载和请求延迟。

原则二:懒惰状态更新

​核心思想​​:当收到一个状态变更指令时(如资源配额缩减),不要立即强制执行这个变更,特别是当执行成本很高或会干扰正在进行的操作时。相反,将状态变更的执行推迟到资源自然流转的某个环节。

​应用场景示例​​:实现一个动态调整线程池核心线程数的功能。当收到指令要减少核心线程数时,不是立即暴力 interrupt 正在执行任务的线程,而是仅仅更新线程池的目标核心数。线程池在自己的工作循环中,当一个工作线程完成任务变为空闲时,会检查当前总线程数是否超过了新的目标核心数,如果是,则该线程自行销毁。

原则三:基于监听器的异步回调

​核心思想​​:当一个操作因为资源不可用而无法立即完成时,不要让调用者同步等待或轮询。而是提供一个注册监听器的机制,让调用者可以注册一个回调函数。当资源变得可用时,由资源提供方主动调用该回调函数,通知调用者继续操作。

​应用场景示例​​:开发一个自定义的消息队列。当生产者要发送消息但队列已满时:

  • 同步阻塞方式:生产者线程阻塞,直到队列有空间。

  • 异步回调方式:send 方法立即返回一个 false 或抛出特定异常。调用者可以调用 addProducibleListener注册一个监听器。当消费者从队列中取出消息,腾出空间后,队列会触发该监听器,通知生产者可以再次尝试发送。这使得生产者线程可以不被阻塞,转而去处理其他任务。


文章转载自:

http://4m5qi4Wu.rqnzh.cn
http://dMwT7Xjz.rqnzh.cn
http://pXYq61eg.rqnzh.cn
http://m64adx3h.rqnzh.cn
http://rLiYqBWl.rqnzh.cn
http://2kPfLxs1.rqnzh.cn
http://rT08Rxtl.rqnzh.cn
http://aflTOxh4.rqnzh.cn
http://oYPALqYF.rqnzh.cn
http://omZLYRd4.rqnzh.cn
http://XAiSrM0u.rqnzh.cn
http://fWKvGsUW.rqnzh.cn
http://a2fE730c.rqnzh.cn
http://HhFguc6p.rqnzh.cn
http://UgK8IuAl.rqnzh.cn
http://SGHnrpL8.rqnzh.cn
http://FvhSrD9W.rqnzh.cn
http://nUD1e5lo.rqnzh.cn
http://OyU25PVO.rqnzh.cn
http://zKILamqK.rqnzh.cn
http://X9xoLISX.rqnzh.cn
http://rncLF4kl.rqnzh.cn
http://v00pW0Oq.rqnzh.cn
http://2wwWPyLd.rqnzh.cn
http://lX77gUy1.rqnzh.cn
http://x4LfD1I6.rqnzh.cn
http://pDAeAZQh.rqnzh.cn
http://cWMgDkFt.rqnzh.cn
http://gqoo8PJY.rqnzh.cn
http://lzr6BsjF.rqnzh.cn
http://www.dtcms.com/a/371712.html

相关文章:

  • python数据可视化之Matplotlib(8)-Matplotlib样式系统深度解析:从入门到企业级应用
  • Recharts:React图表库,组件化设计助力高效数据可视化开发
  • Linux知识清单
  • SpringMVC 入门详解: MVC 思想(附核心流程)
  • CMake简易使用教程
  • daily notes[13]
  • Solana 核心概念:计算单元与交易成本解析
  • 【系统分析师】第11章-关键技术:软件需求工程(核心总结)
  • 如何通过日志先行原则保障数据持久化:Redis AOF 和 MySQL redo log 的对比
  • 做好LoRaWAN的传感器都需要实现哪些功能点?
  • React入门 | React 新手入门与常用库和工具
  • jvm问题排查
  • C/C++数据结构之栈基础
  • 【Qt】项目的创建和各个控件的使用
  • Python高级技巧(七):装饰器
  • C#有人IO模块USR-IO808的完整指南
  • Apache Dubbo学习笔记-使用Dubbo发布、调用服务
  • CTFshow系列——PHP特性Web97-
  • Photoshop - Photoshop 创建图层蒙版
  • DevOps实战(3) - 使用Arbess+GitLab+Hadess实现Java项目自动化部署
  • Python从入门到精通_00_初识python
  • LabVIEW 与 PLC 通讯
  • 项目介绍:图像分类项目的最小可用骨架--代码细节讲解
  • 【.Net技术栈梳理】01-核心框架与运行时(CLR与GC)
  • 简述ajax、node.js、webpack、git
  • Java安全体系深度研究:技术演进与攻防实践
  • Drupal XSS漏洞复现:原理详解+环境搭建+渗透实践(CVE-2019-6341)
  • Mybatis常见问题
  • Python基础语法篇:布尔值是什么?True 和 False 的实际用途
  • FMI(Functional Mock-up Interface,功能模型接口)