当前位置: 首页 > news >正文

Netty对象池ObjectPool源码解析

ObjectPool 

io.netty.util.internal.ObjectPool 类的结构和源码实现。

ObjectPool 是 Netty 中一个轻量级的对象池接口。它的主要目的是提供一个通用的对象复用机制,以减少对象的创建和垃圾回收开销,从而提高应用程序的性能。

核心设计理念

ObjectPool 的设计非常简洁,它定义了对象池的基本行为:获取对象 (get()),以及如何创建新对象和回收对象(通过内部接口 ObjectCreator 和 Handle)。

其核心思想是委托

  • 对象创建的委托ObjectPool 本身不负责如何创建具体的对象实例,而是将这个责任委托给用户提供的 ObjectCreator 实现。
  • 对象回收的委托:当一个从池中获取的对象不再被使用时,用户通过与该对象关联的 Handle 来通知对象池,以便对象池可以回收并复用这个对象。

主要组成部分

ObjectPool 类本身是一个抽象类,并定义了几个关键的内部接口和静态工厂方法。


package io.netty.util.internal;import io.netty.util.Recycler;/*** Light-weight object pool.** @param <T> the type of the pooled object*/
public abstract class ObjectPool<T> {ObjectPool() { } // 包级私有构造函数,防止外部直接继承/*** Get a {@link Object} from the {@link ObjectPool}. The returned {@link Object} may be created via* {@link ObjectCreator#newObject(Handle)} if no pooled {@link Object} is ready to be reused.*/public abstract T get(); // 抽象方法,用于从池中获取对象/*** Handle for an pooled {@link Object} that will be used to notify the {@link ObjectPool} once it can* reuse the pooled {@link Object} again.* @param <T>*/public interface Handle<T> { // 句柄接口,用于回收对象/*** Recycle the {@link Object} if possible and so make it ready to be reused.*/void recycle(T self); // 回收对象的方法}/*** Creates a new Object which references the given {@link Handle} and calls {@link Handle#recycle(Object)} once* it can be re-used.** @param <T> the type of the pooled object*/public interface ObjectCreator<T> { // 对象创建者接口/*** Creates an returns a new {@link Object} that can be used and later recycled via* {@link Handle#recycle(Object)}.** @param handle can NOT be null.*/T newObject(Handle<T> handle); // 创建新对象的方法,需要传入一个 Handle}/*** Creates a new {@link ObjectPool} which will use the given {@link ObjectCreator} to create the {@link Object}* that should be pooled.*/public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {// 静态工厂方法,用于创建 ObjectPool 实例// 内部实际创建的是 RecyclerObjectPool 实例return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));}private static final class RecyclerObjectPool<T> extends ObjectPool<T> {// ObjectPool 的一个具体实现,基于 Netty 的 Recyclerprivate final Recycler<T> recycler; // 内部持有一个 Recycler 实例RecyclerObjectPool(final ObjectCreator<T> creator) {// 构造函数,初始化 Recyclerrecycler = new Recycler<T>() { // 匿名内部类继承 Recycler@Overrideprotected T newObject(Handle<T> handle) {// Recycler 的 newObject 方法委托给外部传入的 ObjectCreatorreturn creator.newObject(handle);}};}@Overridepublic T get() {// get 方法直接调用 Recycler 的 get 方法return recycler.get();}}
}
  • abstract class ObjectPool<T>:

    • ObjectPool(): 包级私有的构造函数,意味着这个抽象类不能在包外部被直接继承。通常,用户会通过其静态工厂方法 newPool() 来获取实例。
    • public abstract T get(): 核心方法,用于从对象池中获取一个类型为 T 的对象。如果池中有可用的对象,则返回一个复用的对象;否则,通常会通过 ObjectCreator 创建一个新的对象。
  • interface Handle<T>:

    • void recycle(T self): 这个接口定义了一个句柄,每个从池中获取的对象都会关联一个这样的句柄。当对象使用完毕后,调用此方法将对象自身传递回去,通知对象池该对象可以被回收复用了。
  • interface ObjectCreator<T>:

    • T newObject(Handle<T> handle): 这个接口定义了如何创建新的池化对象。实现者需要提供创建具体对象 T 的逻辑。关键在于,创建新对象时,必须将传入的 handle 与新创建的对象关联起来。这个 handle 就是将来用于回收该对象的凭证。
  • static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator):

    • 这是一个静态工厂方法,是获取 ObjectPool 实例的推荐方式。
    • 它接收一个 ObjectCreator 作为参数,并确保其不为 null
    • 内部实际上创建并返回了一个 RecyclerObjectPool 的实例。这意味着 ObjectPool 的默认实现是基于 Netty 强大的 Recycler 类。
  • final class RecyclerObjectPool<T> extends ObjectPool<T>:

    • 这是 ObjectPool 的一个私有静态内部实现类。
    • recycler: 它内部持有一个 io.netty.util.Recycler<T> 的实例。Recycler 是 Netty 中一个更复杂、线程本地优化的对象池实现。
    • 构造函数 RecyclerObjectPool(final ObjectCreator<T> creator):
      • 在构造时,它会创建一个 Recycler 的匿名子类实例。
      • 关键在于,它重写了 Recycler 的 newObject(Handle<T> handle) 方法,并将调用委托给了外部传入的 creator.newObject(handle)。这样,Recycler 在需要创建新对象时,就会使用用户定义的逻辑。
    • public T get():
      • 此方法直接调用其内部 recycler.get() 方法。Recycler 会负责从其线程本地缓存或者共享池中获取对象,或者在必要时创建新对象。

工作流程

  1. 创建对象池: 用户通过 ObjectPool.newPool(objectCreator) 创建一个对象池实例。用户需要提供一个 ObjectCreator 的实现,告诉对象池如何创建新的对象。

    // 示例:创建一个 PooledObject 的对象池
    ObjectPool.ObjectCreator<PooledObject> creator = new ObjectPool.ObjectCreator<PooledObject>() {@Overridepublic PooledObject newObject(ObjectPool.Handle<PooledObject> handle) {return new PooledObject(handle); // 创建新对象时,传入 handle}
    };
    ObjectPool<PooledObject> pool = ObjectPool.newPool(creator);
    
  2. 获取对象: 当需要一个对象时,调用 pool.get()

    • RecyclerObjectPool 会调用其内部 recycler.get()
    • Recycler 会尝试从其内部缓存(通常是线程本地的 Stack)中获取一个可用的对象。
    • 如果缓存为空,Recycler 会调用其 newObject(Handle<T> handle) 方法。
    • 由于 RecyclerObjectPool 重写了此方法,最终会调用用户提供的 creator.newObject(handle) 来创建一个新的对象。这个新创建的对象会与一个由 Recycler 内部管理的 Handle 相关联。
  3. 使用对象: 应用程序正常使用从池中获取的对象。

  4. 回收对象: 当对象使用完毕后,应用程序需要调用与该对象关联的 Handle 的 recycle(T self) 方法。

    // 假设 PooledObject 有一个 getHandle() 方法返回其关联的 Handle
    // 或者 Handle 本身就是对象的一部分
    PooledObject obj = pool.get();
    try {// ... 使用 obj ...
    } finally {obj.getAssociatedHandle().recycle(obj); // 回收对象
    }
    
    • 调用 handle.recycle(obj) 会通知 Recycler 该对象可以被回收。
    • Recycler 会将该对象放回其内部缓存中,以备后续的 get() 调用复用。

与 io.netty.util.Recycler 的关系

ObjectPool 可以看作是 Recycler 的一个简化和更通用的门面 (Facade)。Recycler 本身是一个非常强大且复杂的对象池,它考虑了多线程环境下的性能优化,使用了线程本地存储 (ThreadLocal) 来减少竞争,并有复杂的回收策略。

ObjectPool 通过封装 Recycler,为用户提供了一个更易于理解和使用的 API,同时保留了 Recycler 的性能优势。用户只需要关注如何创建对象 (ObjectCreator) 和如何通过 Handle 回收对象,而不需要了解 Recycler 内部的复杂机制。

使用场景示例

在 Netty 的源码中,ObjectPool 被广泛用于各种需要对象复用的场景,例如:

PooledByteBuf 的子类: 如 PooledUnsafeHeapByteBufPooledDuplicatedByteBufPooledUnsafeDirectByteBufPooledHeapByteBuf。它们都使用 ObjectPool (间接通过 Recycler) 来复用 ByteBuf 对象本身,而不是每次都创建新的 ByteBuf 实例。

PooledUnsafeHeapByteBuf.java

// ...
final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf {private static final ObjectPool<PooledUnsafeHeapByteBuf> RECYCLER = ObjectPool.newPool(new ObjectCreator<PooledUnsafeHeapByteBuf>() {@Overridepublic PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {return new PooledUnsafeHeapByteBuf(handle, 0);}});static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {PooledUnsafeHeapByteBuf buf = RECYCLER.get(); // 从池中获取buf.reuse(maxCapacity);return buf;}// ...// 在 PooledByteBuf 的 release() 方法中,会调用 recyclerHandle.recycle(this)
}

PoolThreadCache.Entry: 在 PoolThreadCache 内部,用于缓存内存块信息的 Entry 对象也是通过 ObjectPool 进行管理的。

PoolThreadCache.java

// ...private static final class MemoryRegionCache<T> {// ...@SuppressWarnings("rawtypes")private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {@SuppressWarnings("unchecked")@Overridepublic Entry newObject(Handle<Entry> handle) {return new Entry(handle);}});// ...@SuppressWarnings("rawtypes")private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {Entry entry = RECYCLER.get(); // 获取 Entryentry.chunk = chunk;entry.nioBuffer = nioBuffer;entry.handle = handle;entry.normCapacity = normCapacity;return entry;}// Entry 的 recycle() 方法会调用其 recyclerHandle.recycle(this)}
// ...

总结

ObjectPool 是 Netty 提供的一个简洁、高效的对象池抽象。它通过将对象的创建和回收逻辑委托给用户,并利用内部强大的 Recycler 实现,为开发者提供了一种方便的方式来复用对象,减少 GC 压力,提升应用性能。其设计清晰,易于使用,是 Netty 高性能基础设施的重要组成部分。

Recycler类结构分析

Recycler 是 Netty 提供的一个轻量级对象池,它基于 ThreadLocal 实现,旨在减少对象分配和垃圾回收的开销,从而提高应用程序性能。

主要结构和特性

  1. 泛型设计 (<T>)Recycler<T>  可以用于回收和重用任何类型的对象 T
  2. 抽象方法 newObject(Handle<T> handle): 这是 Recycler 的核心抽象方法。子类必须实现此方法来定义如何创建一个新的对象实例。Handle 参数用于将新创建的对象与回收器关联起来,以便后续回收。
  3. ThreadLocal 存储 (FastThreadLocal<LocalPool<T>> threadLocal):
    • 每个线程都拥有一个独立的 LocalPool 实例。这避免了多线程竞争,提高了并发性能。
    • Netty 使用了其自定义的 FastThreadLocal,它针对 Netty 的线程模型(尤其是 FastThreadLocalThread)进行了优化。
    • initialValue(): 当线程首次访问时,会创建一个新的 LocalPool
    • onRemoval(): 当线程结束或 FastThreadLocal 被移除时,会清理 LocalPool 中的资源,例如清空其内部的 pooledHandles 队列。
  4. 配置参数:
    • DEFAULT_MAX_CAPACITY_PER_THREAD: 每个线程的 LocalPool 默认最大容量。可以通过系统属性 io.netty.recycler.maxCapacityPerThread 或 io.netty.recycler.maxCapacity 配置。
    • RATIO: 控制对象回收的频率。默认情况下,每尝试回收8个从未被回收过的对象,才真正允许一个对象进入池中。这有助于防止池容量因突发分配而过度膨胀。可以通过系统属性 io.netty.recycler.ratio 配置。
    • DEFAULT_QUEUE_CHUNK_SIZE_PER_THREADLocalPool 从共享队列(如果使用)或其内部队列批量获取/释放对象的块大小。可以通过系统属性 io.netty.recycler.chunkSize 配置。
    • BLOCKING_POOL: 是否使用阻塞队列 (BlockingMessageQueue) 作为 LocalPool 内部的 pooledHandles。默认为 false,使用 PlatformDependent.newMpscQueue() 创建的非阻塞多生产者单消费者(MPSC)队列。可以通过系统属性 io.netty.recycler.blocking 配置。
    • BATCH_FAST_TL_ONLY: 一个优化选项,决定 LocalPool 的 owner 线程是否仅限于 FastThreadLocalThread
  5. 核心方法:
    • get(): 从对象池中获取一个对象。如果池为空,则调用 newObject() 创建一个新对象。
    • recycle(T o, Handle<T> handle) (已废弃,推荐使用 Handle#recycle(Object)): 将对象 o 通过其关联的 Handle 回收到池中。
  6. Handle<T> 接口与 DefaultHandle<T> 实现:
    • Handle<T> 是一个标记接口,代表池中对象的句柄 ObjectPool.Handle<T>。
    • DefaultHandle<T> 是 Handle 的具体实现,它持有实际的对象 value 和一个指向其所属 LocalPool 的引用。
    • DefaultHandle 内部使用 AtomicIntegerFieldUpdater 来管理其状态 (STATE_CLAIMED 或 STATE_AVAILABLE),确保线程安全地更新对象是否可用的状态。
  7. LocalPool<T> 内部类:
    • 这是实际存储和管理回收对象的单元,每个线程一个。
    • pooledHandles: 一个 MessagePassingQueue<DefaultHandle<T>>,用于存储可用的 DefaultHandle 对象。根据 BLOCKING_POOL 的配置,它可以是 BlockingMessageQueue 或 MPSC 队列。
    • batch: 一个 ArrayDeque<DefaultHandle<T>>,作为从 pooledHandles 批量获取或向其批量释放对象的本地缓存,减少对 pooledHandles 的直接操作次数,提高效率。
    • ownerLocalPool 所属的线程。用于判断是否可以直接将对象放入 batch
    • ratioCounter 和 ratioInterval: 用于实现 RATIO 配置的回收策略。只有当 ratioCounter 达到 ratioInterval 时,newHandle() 才会真正创建一个新的 DefaultHandle,否则返回 null,意味着新创建的对象不会被池化管理(除非通过其他方式获取到 Handle)。
    • claim(): 从 LocalPool 中获取一个 DefaultHandle。它首先尝试从 batch 中获取,如果 batch 为空,则从 pooledHandles 中批量 drain 一些 DefaultHandle 到 batch 中。
    • release(DefaultHandle<T> handle, boolean guarded): 将一个 DefaultHandle 释放回 LocalPool。如果当前线程是 owner 且 batch 未满,则直接放入 batch;否则,放入 pooledHandles 队列。它还会检查 owner 线程是否已终止,如果是,则清空池。
  8. NOOP_HANDLE: 一个特殊的 Handle 实现,它的 recycle 方法是空操作。当 maxCapacityPerThread 为0,或者在不支持 FastThreadLocal 的虚拟线程上时,新创建的对象会关联到这个 NOOP_HANDLE,意味着这些对象不会被回收。

类层次结构

主要组件关系

Recycler<T>
├── Handle<T> (接口) - 对象句柄
│   └── EnhancedHandle<T> (抽象类)
│       └── DefaultHandle<T> (实现类)
├── LocalPool<T> - 线程本地对象池
└── BlockingMessageQueue<T> - 调试用的阻塞队列

Handle层次

  • ObjectPool.Handle<T>: Netty 内部 通用的对象池句柄接口。
  • Recycler.Handle<T> extends ObjectPool.Handle<T>Recycler 特有的句柄接口,目前只是继承,没有添加新方法。
  • Recycler.EnhancedHandle<T> implements Recycler.Handle<T>: 一个抽象基类,引入了 unguardedRecycle 方法。设计为 Recycler 内部使用。
  • Recycler.DefaultHandle<T> extends EnhancedHandle<T>Handle 的具体实现,包含了上述所有核心逻辑。

Handle接口

public interface Handle<T> extends ObjectPool.Handle<T> { }

EnhancedHandle抽象类

public abstract static class EnhancedHandle<T> implements Handle<T> {public abstract void unguardedRecycle(Object object);
}

DefaultHandle实现

private static final class DefaultHandle<T> extends EnhancedHandle<T> {private static final int STATE_CLAIMED = 0;    // 已获取状态private static final int STATE_AVAILABLE = 1;  // 可用状态private volatile int state;private final LocalPool<T> localPool;private T value;
}

Recycler、Handle、LocalPool 之间的关系和设计原因

这三个组件协同工作,构成了一个高效的、基于线程本地存储的对象池。

  • Recycler<T> (对象回收器/池的入口):

    • 角色: 用户与对象池交互的顶层 API。用户通过它来获取 (get()) 对象和(间接地通过 Handle)回收对象。
    • 核心机制: 它内部使用了一个 FastThreadLocal<LocalPool<T>> threadLocal。这意味着每个线程都有自己独立的 LocalPool 实例。这是 Recycler 高性能的关键,因为它极大地减少了线程间的同步开销。
    • 职责:
      • 提供 get() 方法从池中获取对象。
      • 定义 protected abstract T newObject(Handle<T> handle); 方法,由子类实现,用于在池为空时创建新的对象实例。
  • LocalPool<T> (线程本地池):

    • 角色: 实际存储和管理当前线程可回收对象的单元。每个 Recycler 实例在每个线程中都会有一个对应的 LocalPool
    • 核心组件:
      • MessagePassingQueue<DefaultHandle<T>> pooledHandles: 一个队列(通常是 MPSC 无锁队列或在特定配置下的阻塞队列),用于存储当前线程可用的 DefaultHandle 对象。MPSC 队列允许其他线程安全地将对象回收给这个 LocalPool
      • ArrayDeque<DefaultHandle<T>> batch: 一个本地的、非线程安全的双端队列,作为 pooledHandles 的一个批处理缓存。当从 pooledHandles 取出或放入时,会先经过这个 batch,以减少对 pooledHandles 的直接操作次数,提高效率。
      • Thread owner: 记录拥有此 LocalPool 的线程。
    • 职责:
      • claim(): 尝试从 batch 或 pooledHandles 中获取一个可用的 DefaultHandle
      • release(DefaultHandle<T> handle, boolean guarded): 将一个 DefaultHandle 回收进池中(优先放入 batch,其次是 pooledHandles)。
      • newHandle(): 根据一定的比率 (ratio) 控制是否创建新的 DefaultHandle 实例。
  • Handle<T> (具体为 DefaultHandle<T>) (对象的句柄/包装器):

    • 角色: 代表池中的一个可回收对象。它包装了实际的对象 (value),并持有其回收状态 (state)。用户通过 Handle 来将对象归还给池。
    • 核心组件 (DefaultHandle):
      • volatile int state: 标记对象是已被占用 (STATE_CLAIMED) 还是可用 (STATE_AVAILABLE)。
      • LocalPool<T> localPool: 一个指向其所属的 LocalPool 的引用。当调用 recycle 时,Handle 需要知道应该把自己放回哪个池。
      • T value: 实际被池化的对象。
    • 职责:
      • recycle(Object object): 当用户用完对象后,调用此方法。它会校验对象,然后调用其 localPool 的 release 方法将自身(Handle)放回池中。
      • get(): 获取 Handle 包装的实际对象。
      • set(T value): 设置 Handle 包装的实际对象。
      • 状态转换方法如 toClaimed() 和 toAvailable()

为什么要这样设计和互相引用?

  1. 高性能与低竞争:

    • Recycler 使用 FastThreadLocal 为每个线程创建一个 LocalPool,使得绝大多数的对象获取和回收操作都在线程内部完成,避免了全局锁和跨线程同步的开销。
    • LocalPool 中的 batch 进一步减少了对底层共享队列 pooledHandles 的访问频率。
    • MPSC 队列 (pooledHandles) 允许高效地处理跨线程回收的场景(一个线程创建的对象被另一个线程回收)。
  2. 封装与职责分离:

    • Recycler 作为用户接口,隐藏了复杂的内部实现。
    • LocalPool 管理线程本地的池化逻辑。
    • Handle 封装了被池化对象及其状态,并提供了回收的入口。
  3. 对象生命周期管理:

    • 当调用 Recycler.get() 时:
      1. Recycler 获取当前线程的 LocalPool
      2. LocalPool.claim() 尝试从池中获取一个 DefaultHandle
      3. 如果池中没有,LocalPool.newHandle() 会创建一个新的 DefaultHandle(如果满足 ratio 条件),然后 Recycler 的 newObject(handle) 方法会被调用来创建实际的对象 T,并将其设置到 DefaultHandle 中。
      4. 如果池中有可用的 DefaultHandle,则直接使用其关联的 value
    • 当调用 Handle.recycle(object) 时:
      1. DefaultHandle 校验传入的 object
      2. DefaultHandle 调用其持有的 localPool 引用上的 release() 方法,将自己(DefaultHandle)放回池中。

为什么需要 Handle

  1. 解耦 (Decoupling)Handle 将池化逻辑(如对象的状态、如何回收等)与被池化的对象(业务对象 T)分离开来。业务对象 T 不需要实现任何特殊的接口或包含额外的字段来支持池化。Recycler 通过 Handle 来间接操作和管理 T
  2. 状态管理 (State Management): 池中的对象有两种基本状态:正在被使用(claimed)或空闲可用(available)。Handle (具体是 DefaultHandle 中的 state 字段) 负责精确地追踪这个状态,这对于防止对象被错误地多次回收或在被使用时被回收至关重要。
  3. 归属管理 (Ownership and Return Path)Handle 知道它属于哪个 LocalPool (线程本地池)。当你调用 handle.recycle(object) 时,Handle 能够将自身(以及它包装的对象)送回到正确的 LocalPool 中。
  4. 统一接口 (Uniform Interface)Recycler 需要一种统一的方式来处理所有类型的池化对象。Handle 提供了这个统一的抽象层。

Handle (特别是 DefaultHandle) 做了什么?

让我们分析 DefaultHandle<T> 的关键组成部分和行为:

  • private T value;

    • 作用: 这个字段持有实际被池化的对象实例。当你从 Recycler 获取一个对象时,你最终得到的就是这个 value
  • private volatile int state;

    • 作用: 这是一个至关重要的字段,用于标记当前 Handle (以及其 value) 的状态。
      • STATE_CLAIMED (0): 表示该 Handle 及其关联的 value 当前正被应用程序使用,尚未被回收。
      • STATE_AVAILABLE (1): 表示该 Handle 及其关联的 value 已经被回收,可以被对象池重新分配和复用。
    • 初始化: 当一个新的 DefaultHandle 实例被创建时,state 字段默认为 0 (即 STATE_CLAIMED)。这是因为新创建的 Handle 通常是为了包装一个即将被使用的新对象,所以它一开始就是“被占用”状态。注释 // State is initialised to STATE_CLAIMED (aka. 0) so they can be released. 指的是它初始为 CLAIMED 状态,因此后续可以被 release (即回收)。
    • 更新STATE_UPDATER (一个 AtomicIntegerFieldUpdater) 用于原子地更新这个状态,确保线程安全。
  • private final LocalPool<T> localPool;

    • 作用: 这个字段持有一个对其所属的 LocalPool (线程本地池) 的引用。
    • 必要性: 当调用 handle.recycle(object) 时,DefaultHandle 需要知道应该将自己归还给哪个具体的 LocalPool 实例。因为 Recycler 是基于 FastThreadLocal 的,每个线程都有自己的 LocalPool
  • DefaultHandle(LocalPool<T> localPool) (构造函数)

    • 作用: 初始化 DefaultHandle,最重要的是保存 localPool 的引用。
  • public void recycle(Object object) 和 public void unguardedRecycle(Object object)

    • 作用: 这是用户将对象归还给池的入口点。
    • 行为:
      1. 首先检查传入的 object 是否确实是当前 Handle 所持有的 value,防止错误回收。
      2. 然后调用 localPool.release(this, boolean guarded) 方法,将当前 DefaultHandle 实例(this)交由 LocalPool 处理回收逻辑。guarded 参数决定了回收时的检查严格程度。
  • T get() 和 void set(T value)

    • 作用get() 用于从 Handle 中获取实际的池化对象 valueset() 用于在 Handle 被创建并与一个新的对象关联时,设置其 value
  • void toClaimed()

    • 作用: 当一个 Handle 从 LocalPool 中被取出(即一个对象被认领)时调用。
    • 行为: 断言当前状态必须是 STATE_AVAILABLE,然后通过 STATE_UPDATER.lazySet(this, STATE_CLAIMED) 将状态原子地设置为 STATE_CLAIMED
  • void toAvailable() 和 void unguardedToAvailable()

    • 作用: 当一个 Handle 被回收进 LocalPool 时调用。
    • 行为:
      • toAvailable(): 使用 STATE_UPDATER.getAndSet(this, STATE_AVAILABLE) 原子地设置状态为 STATE_AVAILABLE,并检查之前的状态,如果之前已经是 STATE_AVAILABLE,则抛出异常(防止重复回收)。
      • unguardedToAvailable(): 类似地设置状态为 STATE_AVAILABLE,但使用 lazySet,并且其重复回收检查是在 lazySet 之前。

对象获取: get()

Recycler.java

// ... existing code ...@SuppressWarnings("unchecked")public final T get() {// 1. 如果最大容量为0,或者当前是虚拟线程且不支持FastThreadLocal,则直接创建新对象,不走池化逻辑if (maxCapacityPerThread == 0 ||(PlatformDependent.isVirtualThread(Thread.currentThread()) &&!FastThreadLocalThread.currentThreadHasFastThreadLocal())) {return newObject((Handle<T>) NOOP_HANDLE);}// 2. 获取当前线程的 LocalPoolLocalPool<T> localPool = threadLocal.get();// 3. 尝试从 LocalPool 中声明一个 HandleDefaultHandle<T> handle = localPool.claim();T obj;if (handle == null) { // 4. 如果没有可用的 Handle// 4a. 尝试创建一个新的 Handle (受 RATIO 控制)handle = localPool.newHandle();if (handle != null) {// 4b. 如果成功创建 Handle,则创建新对象并关联obj = newObject(handle);handle.set(obj);} else {// 4c. 如果 RATIO 限制了 Handle 的创建,则创建新对象,但不池化 (使用 NOOP_HANDLE)obj = newObject((Handle<T>) NOOP_HANDLE);}} else {// 5. 如果获取到已有的 Handle,直接获取其关联的对象obj = handle.get();}return obj;}
// ... existing code ...

逻辑解释:

  1. 首先检查是否启用了回收 (maxCapacityPerThread > 0) 以及当前线程环境是否适合使用 FastThreadLocal(特别是针对虚拟线程)。如果不满足条件,则直接调用 newObject() 创建一个与 NOOP_HANDLE 关联的新对象,该对象不会被回收。
  2. 通过 threadLocal.get() 获取当前线程绑定的 LocalPool 实例。如果这是线程第一次调用,会触发 threadLocal.initialValue() 来创建一个新的 LocalPool
  3. 调用 localPool.claim() 尝试从池中获取一个可用的 DefaultHandle
  4. 如果 claim() 返回 null(池中没有可用的 DefaultHandle): a. 调用 localPool.newHandle()。这个方法会根据 RATIO 配置来决定是否真的创建一个新的 DefaultHandle。 b. 如果 newHandle() 成功返回了一个新的 DefaultHandle,则调用用户实现的 newObject(handle) 来创建业务对象,并将该对象设置到 handle 中。 c. 如果 newHandle() 返回 null(因为 RATIO 的限制),则同样调用 newObject(),但传递的是 NOOP_HANDLE,意味着这个新对象不会被池管理。
  5. 如果 claim() 返回了一个非 null 的 DefaultHandle,说明从池中成功获取了一个之前回收的对象。直接通过 handle.get() 获取该对象实例。

对象回收

DefaultHandle.recycle(Object object)  和  LocalPool.release(DefaultHandle<T> handle, boolean guarded)

当用户调用 handle.recycle(object) 时:

Recycler.java

// ... existing code ...private static final class DefaultHandle<T> extends EnhancedHandle<T> {
// ... existing code ...@Overridepublic void recycle(Object object) {if (object != value) {throw new IllegalArgumentException("object does not belong to handle");}// 委托给 LocalPool 的 release 方法localPool.release(this, true);}@Overridepublic void unguardedRecycle(Object object) {if (object != value) {throw new IllegalArgumentException("object does not belong to handle");}// 委托给 LocalPool 的 release 方法,unguarded 表示不做状态检查的强制设置localPool.release(this, false);}
// ... existing code ...void toAvailable() {// 使用 CAS 更新状态为 AVAILABLEint prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);if (prev == STATE_AVAILABLE) {throw new IllegalStateException("Object has been recycled already.");}}void unguardedToAvailable() {int prev = state;if (prev == STATE_AVAILABLE) {throw new IllegalStateException("Object has been recycled already.");}// 使用 lazySet 更新状态,性能稍好,但有序性保证较弱STATE_UPDATER.lazySet(this, STATE_AVAILABLE);}}private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
// ... existing code ...void release(DefaultHandle<T> handle, boolean guarded) {if (guarded) {// 1. 将 Handle 状态设置为 AVAILABLE (原子操作,防止重复回收)handle.toAvailable();} else {handle.unguardedToAvailable();}Thread owner = this.owner;// 2. 如果当前线程是 LocalPool 的所有者,并且本地批处理队列 batch 未满if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {// 2a. 直接将 Handle 添加到 batch 中accept(handle); // batch.addLast(handle);} else if (owner != null && isTerminated(owner)) {// 3. 如果 owner 线程已终止,则清空池this.owner = null;pooledHandles = null;// 4. 否则 (非 owner 线程,或 batch 已满,或 owner 为 null 但 pooledHandles 存在)} else {MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;if (handles != null) {// 4a. 将 Handle 添加到 pooledHandles 队列中 (通常是 MPSC 队列)handles.relaxedOffer(handle);}}}
// ... existing code ...@Overridepublic void accept(DefaultHandle<T> e) {batch.addLast(e);}
// ... existing code ...}
// ... existing code ...

逻辑解释:

  1. DefaultHandle.recycle() 首先会校验传入的对象是否是当前 Handle 所持有的对象。然后调用 localPool.release(this, true)unguardedRecycle 类似,但调用 localPool.release(this, false)
  2. LocalPool.release():
    1. a. 根据 guarded 参数,调用 handle.toAvailable() 或 handle.unguardedToAvailable() 将 Handle 的状态(原子地)更新为 STATE_AVAILABLE。如果对象已经被回收,会抛出 IllegalStateException
    2. b. 优化路径: 如果当前线程 (Thread.currentThread()) 正是此 LocalPool 的 owner 线程,并且其本地缓存 batch (一个 ArrayDeque) 尚未满 (batch.size() < chunkSize),则直接将 handle 添加到 batch 的末尾。这是一种快速路径,避免了对共享队列 pooledHandles 的操作。
    3. c. Owner 终止处理: 如果 owner 线程已经终止 (isTerminated(owner) 返回 true),则认为此 LocalPool 不再有效,将其 owner 和 pooledHandles 都置为 null,相当于清空了这个线程的池。
    4. d. 常规路径: 如果不满足快速路径条件(例如,回收操作发生在不同的线程,或者 batch 已满,或者 owner 为 null 但 pooledHandles 仍然存在),并且 pooledHandles 队列不为 null,则调用 pooledHandles.relaxedOffer(handle) 将 handle 放回主存储队列。relaxedOffer 通常是 MPSC 队列的非阻塞或低竞争入队操作。

LocalPool.claim() - 从池中获取 Handle

Recycler.java

// ... existing code ...private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
// ... existing code ...DefaultHandle<T> claim() {MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;// 1. 如果 pooledHandles 为 null (可能已被清理),则无法获取if (handles == null) {return null;}// 2. 如果本地批处理队列 batch 为空if (batch.isEmpty()) {// 2a. 从 pooledHandles 队列中批量取出最多 chunkSize 个 Handle 到 batch 中//    this (LocalPool) 实现了 MessagePassingQueue.Consumer<DefaultHandle<T>>接口//    其 accept(DefaultHandle<T> e) 方法会将 e 添加到 batch 的末尾handles.drain(this, chunkSize);}// 3. 从 batch 的末尾取出一个 Handle (LIFO 行为)DefaultHandle<T> handle = batch.pollLast();if (null != handle) {// 4. 如果成功取出,将其状态设置为 CLAIMEDhandle.toClaimed();}return handle;}}
// ... existing code ...

逻辑解释:

  1. 首先检查 pooledHandles 是否为 null。如果是,则表示池不可用,返回 null
  2. 检查本地缓存 batch 是否为空。
    1. 如果 batch 为空,则调用 handles.drain(this, chunkSize)。这个操作会尝试从 pooledHandles 队列中取出最多 chunkSize 个 DefaultHandle 对象。drain 方法会将取出的每个对象传递给 this(即 LocalPool 实例)的 accept 方法。
    2.  LocalPool.accept(DefaultHandle<T> e) 方法会将传入的 handle 添加到 batch 的末尾 (batch.addLast(e))。
  3. 从 batch 的末尾 (batch.pollLast()) 取出一个 DefaultHandle。这使得 batch 表现为栈(LIFO)。
  4. 如果成功从 batch 中获取到一个 handle,则调用 handle.toClaimed() 将其状态设置为 STATE_CLAIMED,表示该对象已被取出并正在使用。

LocalPool.newHandle() 创建新的 Handle (受 RATIO 控制)

Recycler.java

// ... existing code ...private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
// ... existing code ...DefaultHandle<T> newHandle() {// 1. ratioCounter 自增if (++ratioCounter >= ratioInterval) {// 2. 如果 ratioCounter 达到或超过 ratioInterval (默认为 RATIO 配置值)//    则重置 ratioCounter,并创建一个新的 DefaultHandleratioCounter = 0;return new DefaultHandle<T>(this);}// 3. 否则,返回 null,表示此次不创建池化的 Handlereturn null;}
// ... existing code ...}
// ... existing code ...

逻辑解释:

  1. 每次调用 newHandle() 时,ratioCounter 都会自增。
  2. 只有当 ratioCounter 达到 ratioInterval(在 Recycler 构造函数中初始化,通常等于静态配置 RATIO)时,才会真正创建一个新的 DefaultHandle<T> 实例,并将 ratioCounter 重置为0。
  3. 如果 ratioCounter 未达到 ratioInterval,则返回 null。这意味着,并非每次需要新对象时都会创建一个可被池化的 Handle。这个机制(RATIO)用于平滑池的增长,避免因短暂的分配高峰导致池急剧膨胀。当 newHandle() 返回 null 时,Recycler.get() 方法会创建一个与 NOOP_HANDLE 关联的对象,这个对象在使用完毕后不会被回收。

性能优化特性

批处理机制

  • 本地batch:减少对共享队列的访问
  • 批量拖拽:一次性从队列获取多个对象到batch
  • chunkSize控制:平衡内存使用和性能

虚拟线程支持

if (PlatformDependent.isVirtualThread(Thread.currentThread()) &&!FastThreadLocalThread.currentThreadHasFastThreadLocal()) {return newObject((Handle<T>) NOOP_HANDLE);  // 虚拟线程使用NOOP
}

线程终止检测

private static boolean isTerminated(Thread owner) {// J9 JVM优化:避免使用getState()的性能问题return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
}

内部类 BlockingMessageQueue<T>

Recycler.java 文件中

这个类 BlockingMessageQueue<T> 是 MessagePassingQueue<T> 接口的一个实现。根据注释,它的主要设计目标和特点如下:

  1. 调试用途

    • 它是一个主要用于调试目的的消息传递队列实现。
    • 它提供了与 PlatformDependent#newMpscQueue(int) 可能返回的队列类似的功能,但实现方式更简单,易于理解和调试。
  2. 线程安全

    • 通过在几乎所有公共方法上使用 synchronized 关键字(同步监视器锁,锁的是 this 即 BlockingMessageQueue 实例本身),来保证线程安全。这意味着在任何时候只有一个线程可以执行该实例的任何同步方法。
  3. 底层数据结构

    • 内部使用 java.util.ArrayDeque<T> 作为实际存储元素的队列。
    • 选择 ArrayDeque 的原因在注释中有详细说明:
      • 比 LinkedList 或 LinkedBlockingQueue 更节省空间。
      • 需要队列 API,所以不选择 ArrayList
      • ConcurrentLinkedQueue 是无界的,并且其 size() 操作是 O(n) 复杂度的,而这里需要有界队列。
      • ArrayBlockingQueue 会预先分配其最大容量的内存,而这些队列通常容量很大(每个线程可能一个),但实际存储的元素可能相对较少,ArrayDeque 则可以按需增长(虽然在这个实现中,外部逻辑通过 maxCapacity 限制了其大小)。
  4. 有界队列

    • 构造函数接收一个 maxCapacity 参数,用于限制队列可以存储的最大元素数量。
    • offer(T e) 方法在队列已满时会返回 false,不会阻塞。
  5. 核心方法实现

    • offer(T e): 尝试将元素添加到队列尾部,如果队列未满则成功并返回 true,否则返回 false
    • poll(): 移除并返回队列头部的元素,如果队列为空则返回 null
    • peek(): 返回队列头部的元素但不移除,如果队列为空则返回 null
    • size(): 返回队列中的元素数量。
    • clear(): 清空队列。
    • isEmpty(): 检查队列是否为空。
    • capacity(): 返回队列的最大容量。
  6. relaxedOfferrelaxedPollrelaxedPeek

    • 这些 "relaxed" 版本的方法直接调用了它们对应的同步版本 (offerpollpeek)。在这个特定的 BlockingMessageQueue 实现中,它们并没有提供比同步版本更宽松的语义或更高的性能(因为它们仍然是同步的)。
  7. drain(Consumer<T> c, int limit)

    • 这个方法会从队列中取出最多 limit 个元素,并传递给提供的 Consumer<T> c 进行处理。它会返回实际取出的元素数量。
  8. 不支持的操作

    • 注释明确指出,批量的 fill 操作以及部分 drain 的重载方法(如无 limit 参数的 drain,以及带有 WaitStrategy 和 ExitCondition 的 drain/fill)在此实现中均不支持,调用它们会抛出 UnsupportedOperationException。这进一步印证了它是一个相对简单、用于特定(调试)目的的实现。

      总结

      Netty 的 Recycler 通过以下机制实现高效的对象池:

      • 线程局部化: 每个线程拥有自己的 LocalPool,避免了跨线程同步的开销。
      • 批量操作LocalPool 中的 batch (ArrayDeque) 和 chunkSize 参数允许批量从主队列 pooledHandles 获取或向其归还对象,减少了对主队列的访问频率。
      • MPSC 队列: 默认情况下,pooledHandles 使用 MPSC (Multi-Producer Single-Consumer) 队列,适合于多个生产者线程(其他线程归还对象)和一个消费者线程(LocalPool 的 owner 线程获取对象)的场景。
      • 回收比率 (RATIO): 通过 RATIO 控制新创建对象被池化的概率,防止池大小对短期分配突发过于敏感,有助于稳定池的容量。
      • 原子状态更新DefaultHandle 使用 AtomicIntegerFieldUpdater 来安全地更新其可用状态。
      • FastThreadLocal 优化: 针对 Netty 的 FastThreadLocalThread 进行了优化。

      这种设计使得 Recycler 在高并发场景下能够提供低延迟的对象分配和回收,是 Netty 高性能特性的一个重要组成部分。

      相关文章:

    1. 网站开发社交网络功能的作用百度大数据
    2. 毕业设计论文网seo服务建议
    3. 购物网站怎么建立百度推广登录入口
    4. 做pc端网站精英扫描图片找原图
    5. 淘宝客api采集发布到wordpress优化大师官网下载
    6. 性价比最高网站建设品牌网站设计
    7. arthas助力Java程序Full GC频率大降!
    8. NVIDIA A100 GPU的计算与内存层级结构
    9. day042-负载均衡与web集群搭建
    10. AR/VR 显示画质失真?OAS 体全息光栅案例来解决
    11. Vue Devtools “Open in Editor” 配置教程(适用于 VSCode 等主流编辑器)
    12. Codex+ 自建中转 API 部署教程(Windows 版)
    13. 3 大语言模型预训练数据-3.2 数据处理-3.2.2 冗余去除——1.SimHash算法处理冗余信息的核心原理
    14. react中使用3D折线图跟3D曲面图
    15. 分布式环境下 Spring Boot 项目基于雪花算法的唯一 ID 生成方案
    16. 【LLaMA-Factory 实战系列】四、API 篇 - 部署推理服务与批量调用实战
    17. 国道观察者手记
    18. computed()、watch() 与 watchEffect()
    19. Android14音频子系统-Audio HAL分析
    20. H5录音、图文视频IndexDB储存最佳实践:用AI生成语音备忘录
    21. 华为云Flexus+DeepSeek征文|基于Dify+ModelArts打造智能客服工单处理系统
    22. 了解笔记本电脑制造:从品牌到代工厂的全产业链
    23. Android14音频子系统-ASoC-ALSA之DAPM电源管理子系统
    24. 鸿蒙与h5的交互
    25. 基于Kafka实现企业级大数据迁移的完整指南
    26. 2025学年湖北省职业院校技能大赛 “信息安全管理与评估”赛项 样题卷(一)