Netty对象池ObjectPool源码解析
ObjectPool
io.netty.util.internal.ObjectPool
类的结构和源码实现。
ObjectPool
是 Netty 中一个轻量级的对象池接口。它的主要目的是提供一个通用的对象复用机制,以减少对象的创建和垃圾回收开销,从而提高应用程序的性能。
核心设计理念
ObjectPool
的设计非常简洁,它定义了对象池的基本行为:获取对象 (get()
),以及如何创建新对象和回收对象(通过内部接口 ObjectCreator
和 Handle
)。
其核心思想是委托:
- 对象创建的委托:
ObjectPool
本身不负责如何创建具体的对象实例,而是将这个责任委托给用户提供的ObjectCreator
实现。 - 对象回收的委托:当一个从池中获取的对象不再被使用时,用户通过与该对象关联的
Handle
来通知对象池,以便对象池可以回收并复用这个对象。
主要组成部分
ObjectPool
类本身是一个抽象类,并定义了几个关键的内部接口和静态工厂方法。
package io.netty.util.internal;import io.netty.util.Recycler;/*** Light-weight object pool.** @param <T> the type of the pooled object*/
public abstract class ObjectPool<T> {ObjectPool() { } // 包级私有构造函数,防止外部直接继承/*** Get a {@link Object} from the {@link ObjectPool}. The returned {@link Object} may be created via* {@link ObjectCreator#newObject(Handle)} if no pooled {@link Object} is ready to be reused.*/public abstract T get(); // 抽象方法,用于从池中获取对象/*** Handle for an pooled {@link Object} that will be used to notify the {@link ObjectPool} once it can* reuse the pooled {@link Object} again.* @param <T>*/public interface Handle<T> { // 句柄接口,用于回收对象/*** Recycle the {@link Object} if possible and so make it ready to be reused.*/void recycle(T self); // 回收对象的方法}/*** Creates a new Object which references the given {@link Handle} and calls {@link Handle#recycle(Object)} once* it can be re-used.** @param <T> the type of the pooled object*/public interface ObjectCreator<T> { // 对象创建者接口/*** Creates an returns a new {@link Object} that can be used and later recycled via* {@link Handle#recycle(Object)}.** @param handle can NOT be null.*/T newObject(Handle<T> handle); // 创建新对象的方法,需要传入一个 Handle}/*** Creates a new {@link ObjectPool} which will use the given {@link ObjectCreator} to create the {@link Object}* that should be pooled.*/public static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator) {// 静态工厂方法,用于创建 ObjectPool 实例// 内部实际创建的是 RecyclerObjectPool 实例return new RecyclerObjectPool<T>(ObjectUtil.checkNotNull(creator, "creator"));}private static final class RecyclerObjectPool<T> extends ObjectPool<T> {// ObjectPool 的一个具体实现,基于 Netty 的 Recyclerprivate final Recycler<T> recycler; // 内部持有一个 Recycler 实例RecyclerObjectPool(final ObjectCreator<T> creator) {// 构造函数,初始化 Recyclerrecycler = new Recycler<T>() { // 匿名内部类继承 Recycler@Overrideprotected T newObject(Handle<T> handle) {// Recycler 的 newObject 方法委托给外部传入的 ObjectCreatorreturn creator.newObject(handle);}};}@Overridepublic T get() {// get 方法直接调用 Recycler 的 get 方法return recycler.get();}}
}
-
abstract class ObjectPool<T>
:ObjectPool()
: 包级私有的构造函数,意味着这个抽象类不能在包外部被直接继承。通常,用户会通过其静态工厂方法newPool()
来获取实例。public abstract T get()
: 核心方法,用于从对象池中获取一个类型为T
的对象。如果池中有可用的对象,则返回一个复用的对象;否则,通常会通过ObjectCreator
创建一个新的对象。
-
interface Handle<T>
:void recycle(T self)
: 这个接口定义了一个句柄,每个从池中获取的对象都会关联一个这样的句柄。当对象使用完毕后,调用此方法将对象自身传递回去,通知对象池该对象可以被回收复用了。
-
interface ObjectCreator<T>
:T newObject(Handle<T> handle)
: 这个接口定义了如何创建新的池化对象。实现者需要提供创建具体对象T
的逻辑。关键在于,创建新对象时,必须将传入的handle
与新创建的对象关联起来。这个handle
就是将来用于回收该对象的凭证。
-
static <T> ObjectPool<T> newPool(final ObjectCreator<T> creator)
:- 这是一个静态工厂方法,是获取
ObjectPool
实例的推荐方式。 - 它接收一个
ObjectCreator
作为参数,并确保其不为null
。 - 内部实际上创建并返回了一个
RecyclerObjectPool
的实例。这意味着ObjectPool
的默认实现是基于 Netty 强大的Recycler
类。
- 这是一个静态工厂方法,是获取
-
final class RecyclerObjectPool<T> extends ObjectPool<T>
:- 这是
ObjectPool
的一个私有静态内部实现类。 recycler
: 它内部持有一个io.netty.util.Recycler<T>
的实例。Recycler
是 Netty 中一个更复杂、线程本地优化的对象池实现。- 构造函数
RecyclerObjectPool(final ObjectCreator<T> creator)
:- 在构造时,它会创建一个
Recycler
的匿名子类实例。 - 关键在于,它重写了
Recycler
的newObject(Handle<T> handle)
方法,并将调用委托给了外部传入的creator.newObject(handle)
。这样,Recycler
在需要创建新对象时,就会使用用户定义的逻辑。
- 在构造时,它会创建一个
public T get()
:- 此方法直接调用其内部
recycler.get()
方法。Recycler
会负责从其线程本地缓存或者共享池中获取对象,或者在必要时创建新对象。
- 此方法直接调用其内部
- 这是
工作流程
-
创建对象池: 用户通过
ObjectPool.newPool(objectCreator)
创建一个对象池实例。用户需要提供一个ObjectCreator
的实现,告诉对象池如何创建新的对象。// 示例:创建一个 PooledObject 的对象池 ObjectPool.ObjectCreator<PooledObject> creator = new ObjectPool.ObjectCreator<PooledObject>() {@Overridepublic PooledObject newObject(ObjectPool.Handle<PooledObject> handle) {return new PooledObject(handle); // 创建新对象时,传入 handle} }; ObjectPool<PooledObject> pool = ObjectPool.newPool(creator);
-
获取对象: 当需要一个对象时,调用
pool.get()
。RecyclerObjectPool
会调用其内部recycler.get()
。Recycler
会尝试从其内部缓存(通常是线程本地的Stack
)中获取一个可用的对象。- 如果缓存为空,
Recycler
会调用其newObject(Handle<T> handle)
方法。 - 由于
RecyclerObjectPool
重写了此方法,最终会调用用户提供的creator.newObject(handle)
来创建一个新的对象。这个新创建的对象会与一个由Recycler
内部管理的Handle
相关联。
-
使用对象: 应用程序正常使用从池中获取的对象。
-
回收对象: 当对象使用完毕后,应用程序需要调用与该对象关联的
Handle
的recycle(T self)
方法。// 假设 PooledObject 有一个 getHandle() 方法返回其关联的 Handle // 或者 Handle 本身就是对象的一部分 PooledObject obj = pool.get(); try {// ... 使用 obj ... } finally {obj.getAssociatedHandle().recycle(obj); // 回收对象 }
- 调用
handle.recycle(obj)
会通知Recycler
该对象可以被回收。 Recycler
会将该对象放回其内部缓存中,以备后续的get()
调用复用。
- 调用
与 io.netty.util.Recycler
的关系
ObjectPool
可以看作是 Recycler
的一个简化和更通用的门面 (Facade)。Recycler
本身是一个非常强大且复杂的对象池,它考虑了多线程环境下的性能优化,使用了线程本地存储 (ThreadLocal) 来减少竞争,并有复杂的回收策略。
ObjectPool
通过封装 Recycler
,为用户提供了一个更易于理解和使用的 API,同时保留了 Recycler
的性能优势。用户只需要关注如何创建对象 (ObjectCreator
) 和如何通过 Handle
回收对象,而不需要了解 Recycler
内部的复杂机制。
使用场景示例
在 Netty 的源码中,ObjectPool
被广泛用于各种需要对象复用的场景,例如:
PooledByteBuf
的子类: 如 PooledUnsafeHeapByteBuf
, PooledDuplicatedByteBuf
, PooledUnsafeDirectByteBuf
, PooledHeapByteBuf
。它们都使用 ObjectPool
(间接通过 Recycler
) 来复用 ByteBuf
对象本身,而不是每次都创建新的 ByteBuf
实例。
PooledUnsafeHeapByteBuf.java
// ...
final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf {private static final ObjectPool<PooledUnsafeHeapByteBuf> RECYCLER = ObjectPool.newPool(new ObjectCreator<PooledUnsafeHeapByteBuf>() {@Overridepublic PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {return new PooledUnsafeHeapByteBuf(handle, 0);}});static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {PooledUnsafeHeapByteBuf buf = RECYCLER.get(); // 从池中获取buf.reuse(maxCapacity);return buf;}// ...// 在 PooledByteBuf 的 release() 方法中,会调用 recyclerHandle.recycle(this)
}
PoolThreadCache.Entry
: 在 PoolThreadCache
内部,用于缓存内存块信息的 Entry
对象也是通过 ObjectPool
进行管理的。
PoolThreadCache.java
// ...private static final class MemoryRegionCache<T> {// ...@SuppressWarnings("rawtypes")private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {@SuppressWarnings("unchecked")@Overridepublic Entry newObject(Handle<Entry> handle) {return new Entry(handle);}});// ...@SuppressWarnings("rawtypes")private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {Entry entry = RECYCLER.get(); // 获取 Entryentry.chunk = chunk;entry.nioBuffer = nioBuffer;entry.handle = handle;entry.normCapacity = normCapacity;return entry;}// Entry 的 recycle() 方法会调用其 recyclerHandle.recycle(this)}
// ...
总结
ObjectPool
是 Netty 提供的一个简洁、高效的对象池抽象。它通过将对象的创建和回收逻辑委托给用户,并利用内部强大的 Recycler
实现,为开发者提供了一种方便的方式来复用对象,减少 GC 压力,提升应用性能。其设计清晰,易于使用,是 Netty 高性能基础设施的重要组成部分。
Recycler类结构分析
Recycler 是 Netty 提供的一个轻量级对象池,它基于 ThreadLocal 实现,旨在减少对象分配和垃圾回收的开销,从而提高应用程序性能。
主要结构和特性
- 泛型设计 (
<T>
):Recycler<T>
可以用于回收和重用任何类型的对象T
。 - 抽象方法
newObject(Handle<T> handle)
: 这是Recycler
的核心抽象方法。子类必须实现此方法来定义如何创建一个新的对象实例。Handle
参数用于将新创建的对象与回收器关联起来,以便后续回收。 ThreadLocal
存储 (FastThreadLocal<LocalPool<T>> threadLocal
):- 每个线程都拥有一个独立的
LocalPool
实例。这避免了多线程竞争,提高了并发性能。 - Netty 使用了其自定义的
FastThreadLocal
,它针对 Netty 的线程模型(尤其是FastThreadLocalThread
)进行了优化。 initialValue()
: 当线程首次访问时,会创建一个新的LocalPool
。onRemoval()
: 当线程结束或FastThreadLocal
被移除时,会清理LocalPool
中的资源,例如清空其内部的pooledHandles
队列。
- 每个线程都拥有一个独立的
- 配置参数:
DEFAULT_MAX_CAPACITY_PER_THREAD
: 每个线程的LocalPool
默认最大容量。可以通过系统属性io.netty.recycler.maxCapacityPerThread
或io.netty.recycler.maxCapacity
配置。RATIO
: 控制对象回收的频率。默认情况下,每尝试回收8个从未被回收过的对象,才真正允许一个对象进入池中。这有助于防止池容量因突发分配而过度膨胀。可以通过系统属性io.netty.recycler.ratio
配置。DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD
:LocalPool
从共享队列(如果使用)或其内部队列批量获取/释放对象的块大小。可以通过系统属性io.netty.recycler.chunkSize
配置。BLOCKING_POOL
: 是否使用阻塞队列 (BlockingMessageQueue
) 作为LocalPool
内部的pooledHandles
。默认为false
,使用PlatformDependent.newMpscQueue()
创建的非阻塞多生产者单消费者(MPSC)队列。可以通过系统属性io.netty.recycler.blocking
配置。BATCH_FAST_TL_ONLY
: 一个优化选项,决定LocalPool
的owner
线程是否仅限于FastThreadLocalThread
。
- 核心方法:
get()
: 从对象池中获取一个对象。如果池为空,则调用newObject()
创建一个新对象。recycle(T o, Handle<T> handle)
(已废弃,推荐使用Handle#recycle(Object)
): 将对象o
通过其关联的Handle
回收到池中。
Handle<T>
接口与DefaultHandle<T>
实现:Handle<T>
是一个标记接口,代表池中对象的句柄 ObjectPool.Handle<T>。DefaultHandle<T>
是Handle
的具体实现,它持有实际的对象value
和一个指向其所属LocalPool
的引用。DefaultHandle
内部使用AtomicIntegerFieldUpdater
来管理其状态 (STATE_CLAIMED
或STATE_AVAILABLE
),确保线程安全地更新对象是否可用的状态。
LocalPool<T>
内部类:- 这是实际存储和管理回收对象的单元,每个线程一个。
pooledHandles
: 一个MessagePassingQueue<DefaultHandle<T>>
,用于存储可用的DefaultHandle
对象。根据BLOCKING_POOL
的配置,它可以是BlockingMessageQueue
或 MPSC 队列。batch
: 一个ArrayDeque<DefaultHandle<T>>
,作为从pooledHandles
批量获取或向其批量释放对象的本地缓存,减少对pooledHandles
的直接操作次数,提高效率。owner
:LocalPool
所属的线程。用于判断是否可以直接将对象放入batch
。ratioCounter
和ratioInterval
: 用于实现RATIO
配置的回收策略。只有当ratioCounter
达到ratioInterval
时,newHandle()
才会真正创建一个新的DefaultHandle
,否则返回null
,意味着新创建的对象不会被池化管理(除非通过其他方式获取到Handle
)。claim()
: 从LocalPool
中获取一个DefaultHandle
。它首先尝试从batch
中获取,如果batch
为空,则从pooledHandles
中批量drain
一些DefaultHandle
到batch
中。release(DefaultHandle<T> handle, boolean guarded)
: 将一个DefaultHandle
释放回LocalPool
。如果当前线程是owner
且batch
未满,则直接放入batch
;否则,放入pooledHandles
队列。它还会检查owner
线程是否已终止,如果是,则清空池。
NOOP_HANDLE
: 一个特殊的Handle
实现,它的recycle
方法是空操作。当maxCapacityPerThread
为0,或者在不支持FastThreadLocal
的虚拟线程上时,新创建的对象会关联到这个NOOP_HANDLE
,意味着这些对象不会被回收。
类层次结构
主要组件关系
Recycler<T>
├── Handle<T> (接口) - 对象句柄
│ └── EnhancedHandle<T> (抽象类)
│ └── DefaultHandle<T> (实现类)
├── LocalPool<T> - 线程本地对象池
└── BlockingMessageQueue<T> - 调试用的阻塞队列
Handle层次
ObjectPool.Handle<T>
: Netty 内部 通用的对象池句柄接口。Recycler.Handle<T> extends ObjectPool.Handle<T>
:Recycler
特有的句柄接口,目前只是继承,没有添加新方法。Recycler.EnhancedHandle<T> implements Recycler.Handle<T>
: 一个抽象基类,引入了unguardedRecycle
方法。设计为Recycler
内部使用。Recycler.DefaultHandle<T> extends EnhancedHandle<T>
:Handle
的具体实现,包含了上述所有核心逻辑。
Handle接口
public interface Handle<T> extends ObjectPool.Handle<T> { }
EnhancedHandle抽象类
public abstract static class EnhancedHandle<T> implements Handle<T> {public abstract void unguardedRecycle(Object object);
}
DefaultHandle实现
private static final class DefaultHandle<T> extends EnhancedHandle<T> {private static final int STATE_CLAIMED = 0; // 已获取状态private static final int STATE_AVAILABLE = 1; // 可用状态private volatile int state;private final LocalPool<T> localPool;private T value;
}
Recycler、Handle、LocalPool 之间的关系和设计原因
这三个组件协同工作,构成了一个高效的、基于线程本地存储的对象池。
-
Recycler<T>
(对象回收器/池的入口):- 角色: 用户与对象池交互的顶层 API。用户通过它来获取 (
get()
) 对象和(间接地通过Handle
)回收对象。 - 核心机制: 它内部使用了一个
FastThreadLocal<LocalPool<T>> threadLocal
。这意味着每个线程都有自己独立的LocalPool
实例。这是Recycler
高性能的关键,因为它极大地减少了线程间的同步开销。 - 职责:
- 提供
get()
方法从池中获取对象。 - 定义
protected abstract T newObject(Handle<T> handle);
方法,由子类实现,用于在池为空时创建新的对象实例。
- 提供
- 角色: 用户与对象池交互的顶层 API。用户通过它来获取 (
-
LocalPool<T>
(线程本地池):- 角色: 实际存储和管理当前线程可回收对象的单元。每个
Recycler
实例在每个线程中都会有一个对应的LocalPool
。 - 核心组件:
MessagePassingQueue<DefaultHandle<T>> pooledHandles
: 一个队列(通常是 MPSC 无锁队列或在特定配置下的阻塞队列),用于存储当前线程可用的DefaultHandle
对象。MPSC 队列允许其他线程安全地将对象回收给这个LocalPool
。ArrayDeque<DefaultHandle<T>> batch
: 一个本地的、非线程安全的双端队列,作为pooledHandles
的一个批处理缓存。当从pooledHandles
取出或放入时,会先经过这个batch
,以减少对pooledHandles
的直接操作次数,提高效率。Thread owner
: 记录拥有此LocalPool
的线程。
- 职责:
claim()
: 尝试从batch
或pooledHandles
中获取一个可用的DefaultHandle
。release(DefaultHandle<T> handle, boolean guarded)
: 将一个DefaultHandle
回收进池中(优先放入batch
,其次是pooledHandles
)。newHandle()
: 根据一定的比率 (ratio
) 控制是否创建新的DefaultHandle
实例。
- 角色: 实际存储和管理当前线程可回收对象的单元。每个
-
Handle<T>
(具体为DefaultHandle<T>
) (对象的句柄/包装器):- 角色: 代表池中的一个可回收对象。它包装了实际的对象 (
value
),并持有其回收状态 (state
)。用户通过Handle
来将对象归还给池。 - 核心组件 (
DefaultHandle
):volatile int state
: 标记对象是已被占用 (STATE_CLAIMED
) 还是可用 (STATE_AVAILABLE
)。LocalPool<T> localPool
: 一个指向其所属的LocalPool
的引用。当调用recycle
时,Handle
需要知道应该把自己放回哪个池。T value
: 实际被池化的对象。
- 职责:
recycle(Object object)
: 当用户用完对象后,调用此方法。它会校验对象,然后调用其localPool
的release
方法将自身(Handle
)放回池中。get()
: 获取Handle
包装的实际对象。set(T value)
: 设置Handle
包装的实际对象。- 状态转换方法如
toClaimed()
和toAvailable()
。
- 角色: 代表池中的一个可回收对象。它包装了实际的对象 (
为什么要这样设计和互相引用?
-
高性能与低竞争:
Recycler
使用FastThreadLocal
为每个线程创建一个LocalPool
,使得绝大多数的对象获取和回收操作都在线程内部完成,避免了全局锁和跨线程同步的开销。LocalPool
中的batch
进一步减少了对底层共享队列pooledHandles
的访问频率。- MPSC 队列 (
pooledHandles
) 允许高效地处理跨线程回收的场景(一个线程创建的对象被另一个线程回收)。
-
封装与职责分离:
Recycler
作为用户接口,隐藏了复杂的内部实现。LocalPool
管理线程本地的池化逻辑。Handle
封装了被池化对象及其状态,并提供了回收的入口。
-
对象生命周期管理:
- 当调用
Recycler.get()
时:Recycler
获取当前线程的LocalPool
。LocalPool.claim()
尝试从池中获取一个DefaultHandle
。- 如果池中没有,
LocalPool.newHandle()
会创建一个新的DefaultHandle
(如果满足ratio
条件),然后Recycler
的newObject(handle)
方法会被调用来创建实际的对象T
,并将其设置到DefaultHandle
中。 - 如果池中有可用的
DefaultHandle
,则直接使用其关联的value
。
- 当调用
Handle.recycle(object)
时:DefaultHandle
校验传入的object
。DefaultHandle
调用其持有的localPool
引用上的release()
方法,将自己(DefaultHandle
)放回池中。
- 当调用
为什么需要 Handle
?
- 解耦 (Decoupling):
Handle
将池化逻辑(如对象的状态、如何回收等)与被池化的对象(业务对象T
)分离开来。业务对象T
不需要实现任何特殊的接口或包含额外的字段来支持池化。Recycler
通过Handle
来间接操作和管理T
。 - 状态管理 (State Management): 池中的对象有两种基本状态:正在被使用(claimed)或空闲可用(available)。
Handle
(具体是DefaultHandle
中的state
字段) 负责精确地追踪这个状态,这对于防止对象被错误地多次回收或在被使用时被回收至关重要。 - 归属管理 (Ownership and Return Path):
Handle
知道它属于哪个LocalPool
(线程本地池)。当你调用handle.recycle(object)
时,Handle
能够将自身(以及它包装的对象)送回到正确的LocalPool
中。 - 统一接口 (Uniform Interface):
Recycler
需要一种统一的方式来处理所有类型的池化对象。Handle
提供了这个统一的抽象层。
Handle
(特别是 DefaultHandle
) 做了什么?
让我们分析 DefaultHandle<T>
的关键组成部分和行为:
-
private T value;
- 作用: 这个字段持有实际被池化的对象实例。当你从
Recycler
获取一个对象时,你最终得到的就是这个value
。
- 作用: 这个字段持有实际被池化的对象实例。当你从
-
private volatile int state;
- 作用: 这是一个至关重要的字段,用于标记当前
Handle
(以及其value
) 的状态。STATE_CLAIMED (0)
: 表示该Handle
及其关联的value
当前正被应用程序使用,尚未被回收。STATE_AVAILABLE (1)
: 表示该Handle
及其关联的value
已经被回收,可以被对象池重新分配和复用。
- 初始化: 当一个新的
DefaultHandle
实例被创建时,state
字段默认为0
(即STATE_CLAIMED
)。这是因为新创建的Handle
通常是为了包装一个即将被使用的新对象,所以它一开始就是“被占用”状态。注释// State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
指的是它初始为CLAIMED
状态,因此后续可以被release
(即回收)。 - 更新:
STATE_UPDATER
(一个AtomicIntegerFieldUpdater
) 用于原子地更新这个状态,确保线程安全。
- 作用: 这是一个至关重要的字段,用于标记当前
-
private final LocalPool<T> localPool;
- 作用: 这个字段持有一个对其所属的
LocalPool
(线程本地池) 的引用。 - 必要性: 当调用
handle.recycle(object)
时,DefaultHandle
需要知道应该将自己归还给哪个具体的LocalPool
实例。因为Recycler
是基于FastThreadLocal
的,每个线程都有自己的LocalPool
。
- 作用: 这个字段持有一个对其所属的
-
DefaultHandle(LocalPool<T> localPool)
(构造函数)- 作用: 初始化
DefaultHandle
,最重要的是保存localPool
的引用。
- 作用: 初始化
-
public void recycle(Object object)
和public void unguardedRecycle(Object object)
- 作用: 这是用户将对象归还给池的入口点。
- 行为:
- 首先检查传入的
object
是否确实是当前Handle
所持有的value
,防止错误回收。 - 然后调用
localPool.release(this, boolean guarded)
方法,将当前DefaultHandle
实例(this
)交由LocalPool
处理回收逻辑。guarded
参数决定了回收时的检查严格程度。
- 首先检查传入的
-
T get()
和void set(T value)
- 作用:
get()
用于从Handle
中获取实际的池化对象value
。set()
用于在Handle
被创建并与一个新的对象关联时,设置其value
。
- 作用:
-
void toClaimed()
- 作用: 当一个
Handle
从LocalPool
中被取出(即一个对象被认领)时调用。 - 行为: 断言当前状态必须是
STATE_AVAILABLE
,然后通过STATE_UPDATER.lazySet(this, STATE_CLAIMED)
将状态原子地设置为STATE_CLAIMED
。
- 作用: 当一个
-
void toAvailable()
和void unguardedToAvailable()
- 作用: 当一个
Handle
被回收进LocalPool
时调用。 - 行为:
toAvailable()
: 使用STATE_UPDATER.getAndSet(this, STATE_AVAILABLE)
原子地设置状态为STATE_AVAILABLE
,并检查之前的状态,如果之前已经是STATE_AVAILABLE
,则抛出异常(防止重复回收)。unguardedToAvailable()
: 类似地设置状态为STATE_AVAILABLE
,但使用lazySet
,并且其重复回收检查是在lazySet
之前。
- 作用: 当一个
对象获取: get()
Recycler.java
// ... existing code ...@SuppressWarnings("unchecked")public final T get() {// 1. 如果最大容量为0,或者当前是虚拟线程且不支持FastThreadLocal,则直接创建新对象,不走池化逻辑if (maxCapacityPerThread == 0 ||(PlatformDependent.isVirtualThread(Thread.currentThread()) &&!FastThreadLocalThread.currentThreadHasFastThreadLocal())) {return newObject((Handle<T>) NOOP_HANDLE);}// 2. 获取当前线程的 LocalPoolLocalPool<T> localPool = threadLocal.get();// 3. 尝试从 LocalPool 中声明一个 HandleDefaultHandle<T> handle = localPool.claim();T obj;if (handle == null) { // 4. 如果没有可用的 Handle// 4a. 尝试创建一个新的 Handle (受 RATIO 控制)handle = localPool.newHandle();if (handle != null) {// 4b. 如果成功创建 Handle,则创建新对象并关联obj = newObject(handle);handle.set(obj);} else {// 4c. 如果 RATIO 限制了 Handle 的创建,则创建新对象,但不池化 (使用 NOOP_HANDLE)obj = newObject((Handle<T>) NOOP_HANDLE);}} else {// 5. 如果获取到已有的 Handle,直接获取其关联的对象obj = handle.get();}return obj;}
// ... existing code ...
逻辑解释:
- 首先检查是否启用了回收 (
maxCapacityPerThread > 0
) 以及当前线程环境是否适合使用FastThreadLocal
(特别是针对虚拟线程)。如果不满足条件,则直接调用newObject()
创建一个与NOOP_HANDLE
关联的新对象,该对象不会被回收。 - 通过
threadLocal.get()
获取当前线程绑定的LocalPool
实例。如果这是线程第一次调用,会触发threadLocal.initialValue()
来创建一个新的LocalPool
。 - 调用
localPool.claim()
尝试从池中获取一个可用的DefaultHandle
。 - 如果
claim()
返回null
(池中没有可用的DefaultHandle
): a. 调用localPool.newHandle()
。这个方法会根据RATIO
配置来决定是否真的创建一个新的DefaultHandle
。 b. 如果newHandle()
成功返回了一个新的DefaultHandle
,则调用用户实现的newObject(handle)
来创建业务对象,并将该对象设置到handle
中。 c. 如果newHandle()
返回null
(因为RATIO
的限制),则同样调用newObject()
,但传递的是NOOP_HANDLE
,意味着这个新对象不会被池管理。 - 如果
claim()
返回了一个非null
的DefaultHandle
,说明从池中成功获取了一个之前回收的对象。直接通过handle.get()
获取该对象实例。
对象回收
DefaultHandle.recycle(Object object)
和 LocalPool.release(DefaultHandle<T> handle, boolean guarded)
当用户调用 handle.recycle(object)
时:
Recycler.java
// ... existing code ...private static final class DefaultHandle<T> extends EnhancedHandle<T> {
// ... existing code ...@Overridepublic void recycle(Object object) {if (object != value) {throw new IllegalArgumentException("object does not belong to handle");}// 委托给 LocalPool 的 release 方法localPool.release(this, true);}@Overridepublic void unguardedRecycle(Object object) {if (object != value) {throw new IllegalArgumentException("object does not belong to handle");}// 委托给 LocalPool 的 release 方法,unguarded 表示不做状态检查的强制设置localPool.release(this, false);}
// ... existing code ...void toAvailable() {// 使用 CAS 更新状态为 AVAILABLEint prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);if (prev == STATE_AVAILABLE) {throw new IllegalStateException("Object has been recycled already.");}}void unguardedToAvailable() {int prev = state;if (prev == STATE_AVAILABLE) {throw new IllegalStateException("Object has been recycled already.");}// 使用 lazySet 更新状态,性能稍好,但有序性保证较弱STATE_UPDATER.lazySet(this, STATE_AVAILABLE);}}private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
// ... existing code ...void release(DefaultHandle<T> handle, boolean guarded) {if (guarded) {// 1. 将 Handle 状态设置为 AVAILABLE (原子操作,防止重复回收)handle.toAvailable();} else {handle.unguardedToAvailable();}Thread owner = this.owner;// 2. 如果当前线程是 LocalPool 的所有者,并且本地批处理队列 batch 未满if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {// 2a. 直接将 Handle 添加到 batch 中accept(handle); // batch.addLast(handle);} else if (owner != null && isTerminated(owner)) {// 3. 如果 owner 线程已终止,则清空池this.owner = null;pooledHandles = null;// 4. 否则 (非 owner 线程,或 batch 已满,或 owner 为 null 但 pooledHandles 存在)} else {MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;if (handles != null) {// 4a. 将 Handle 添加到 pooledHandles 队列中 (通常是 MPSC 队列)handles.relaxedOffer(handle);}}}
// ... existing code ...@Overridepublic void accept(DefaultHandle<T> e) {batch.addLast(e);}
// ... existing code ...}
// ... existing code ...
逻辑解释:
DefaultHandle.recycle()
首先会校验传入的对象是否是当前Handle
所持有的对象。然后调用localPool.release(this, true)
。unguardedRecycle
类似,但调用localPool.release(this, false)
。LocalPool.release()
:- a. 根据
guarded
参数,调用handle.toAvailable()
或handle.unguardedToAvailable()
将Handle
的状态(原子地)更新为STATE_AVAILABLE
。如果对象已经被回收,会抛出IllegalStateException
。 - b. 优化路径: 如果当前线程 (
Thread.currentThread()
) 正是此LocalPool
的owner
线程,并且其本地缓存batch
(一个ArrayDeque
) 尚未满 (batch.size() < chunkSize
),则直接将handle
添加到batch
的末尾。这是一种快速路径,避免了对共享队列pooledHandles
的操作。 - c. Owner 终止处理: 如果
owner
线程已经终止 (isTerminated(owner)
返回true
),则认为此LocalPool
不再有效,将其owner
和pooledHandles
都置为null
,相当于清空了这个线程的池。 - d. 常规路径: 如果不满足快速路径条件(例如,回收操作发生在不同的线程,或者
batch
已满,或者owner
为null
但pooledHandles
仍然存在),并且pooledHandles
队列不为null
,则调用pooledHandles.relaxedOffer(handle)
将handle
放回主存储队列。relaxedOffer
通常是 MPSC 队列的非阻塞或低竞争入队操作。
- a. 根据
LocalPool.claim()
- 从池中获取 Handle
Recycler.java
// ... existing code ...private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
// ... existing code ...DefaultHandle<T> claim() {MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;// 1. 如果 pooledHandles 为 null (可能已被清理),则无法获取if (handles == null) {return null;}// 2. 如果本地批处理队列 batch 为空if (batch.isEmpty()) {// 2a. 从 pooledHandles 队列中批量取出最多 chunkSize 个 Handle 到 batch 中// this (LocalPool) 实现了 MessagePassingQueue.Consumer<DefaultHandle<T>>接口// 其 accept(DefaultHandle<T> e) 方法会将 e 添加到 batch 的末尾handles.drain(this, chunkSize);}// 3. 从 batch 的末尾取出一个 Handle (LIFO 行为)DefaultHandle<T> handle = batch.pollLast();if (null != handle) {// 4. 如果成功取出,将其状态设置为 CLAIMEDhandle.toClaimed();}return handle;}}
// ... existing code ...
逻辑解释:
- 首先检查
pooledHandles
是否为null
。如果是,则表示池不可用,返回null
。 - 检查本地缓存
batch
是否为空。- 如果
batch
为空,则调用handles.drain(this, chunkSize)
。这个操作会尝试从pooledHandles
队列中取出最多chunkSize
个DefaultHandle
对象。drain
方法会将取出的每个对象传递给this
(即LocalPool
实例)的accept
方法。 -
LocalPool.accept(DefaultHandle<T> e)
方法会将传入的handle
添加到batch
的末尾 (batch.addLast(e)
)。
- 如果
- 从
batch
的末尾 (batch.pollLast()
) 取出一个DefaultHandle
。这使得batch
表现为栈(LIFO)。 - 如果成功从
batch
中获取到一个handle
,则调用handle.toClaimed()
将其状态设置为STATE_CLAIMED
,表示该对象已被取出并正在使用。
LocalPool.newHandle()
创建新的 Handle (受 RATIO 控制)
Recycler.java
// ... existing code ...private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
// ... existing code ...DefaultHandle<T> newHandle() {// 1. ratioCounter 自增if (++ratioCounter >= ratioInterval) {// 2. 如果 ratioCounter 达到或超过 ratioInterval (默认为 RATIO 配置值)// 则重置 ratioCounter,并创建一个新的 DefaultHandleratioCounter = 0;return new DefaultHandle<T>(this);}// 3. 否则,返回 null,表示此次不创建池化的 Handlereturn null;}
// ... existing code ...}
// ... existing code ...
逻辑解释:
- 每次调用
newHandle()
时,ratioCounter
都会自增。 - 只有当
ratioCounter
达到ratioInterval
(在Recycler
构造函数中初始化,通常等于静态配置RATIO
)时,才会真正创建一个新的DefaultHandle<T>
实例,并将ratioCounter
重置为0。 - 如果
ratioCounter
未达到ratioInterval
,则返回null
。这意味着,并非每次需要新对象时都会创建一个可被池化的Handle
。这个机制(RATIO
)用于平滑池的增长,避免因短暂的分配高峰导致池急剧膨胀。当newHandle()
返回null
时,Recycler.get()
方法会创建一个与NOOP_HANDLE
关联的对象,这个对象在使用完毕后不会被回收。
性能优化特性
批处理机制
- 本地batch:减少对共享队列的访问
- 批量拖拽:一次性从队列获取多个对象到batch
- chunkSize控制:平衡内存使用和性能
虚拟线程支持
if (PlatformDependent.isVirtualThread(Thread.currentThread()) &&!FastThreadLocalThread.currentThreadHasFastThreadLocal()) {return newObject((Handle<T>) NOOP_HANDLE); // 虚拟线程使用NOOP
}
线程终止检测
private static boolean isTerminated(Thread owner) {// J9 JVM优化:避免使用getState()的性能问题return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
}
内部类 BlockingMessageQueue<T>
Recycler.java
文件中
这个类 BlockingMessageQueue<T>
是 MessagePassingQueue<T>
接口的一个实现。根据注释,它的主要设计目标和特点如下:
-
调试用途:
- 它是一个主要用于调试目的的消息传递队列实现。
- 它提供了与
PlatformDependent#newMpscQueue(int)
可能返回的队列类似的功能,但实现方式更简单,易于理解和调试。
-
线程安全:
- 通过在几乎所有公共方法上使用
synchronized
关键字(同步监视器锁,锁的是this
即BlockingMessageQueue
实例本身),来保证线程安全。这意味着在任何时候只有一个线程可以执行该实例的任何同步方法。
- 通过在几乎所有公共方法上使用
-
底层数据结构:
- 内部使用
java.util.ArrayDeque<T>
作为实际存储元素的队列。 - 选择
ArrayDeque
的原因在注释中有详细说明:- 比
LinkedList
或LinkedBlockingQueue
更节省空间。 - 需要队列 API,所以不选择
ArrayList
。 ConcurrentLinkedQueue
是无界的,并且其size()
操作是 O(n) 复杂度的,而这里需要有界队列。ArrayBlockingQueue
会预先分配其最大容量的内存,而这些队列通常容量很大(每个线程可能一个),但实际存储的元素可能相对较少,ArrayDeque
则可以按需增长(虽然在这个实现中,外部逻辑通过maxCapacity
限制了其大小)。
- 比
- 内部使用
-
有界队列:
- 构造函数接收一个
maxCapacity
参数,用于限制队列可以存储的最大元素数量。 offer(T e)
方法在队列已满时会返回false
,不会阻塞。
- 构造函数接收一个
-
核心方法实现:
offer(T e)
: 尝试将元素添加到队列尾部,如果队列未满则成功并返回true
,否则返回false
。poll()
: 移除并返回队列头部的元素,如果队列为空则返回null
。peek()
: 返回队列头部的元素但不移除,如果队列为空则返回null
。size()
: 返回队列中的元素数量。clear()
: 清空队列。isEmpty()
: 检查队列是否为空。capacity()
: 返回队列的最大容量。
-
relaxedOffer
,relaxedPoll
,relaxedPeek
:- 这些 "relaxed" 版本的方法直接调用了它们对应的同步版本 (
offer
,poll
,peek
)。在这个特定的BlockingMessageQueue
实现中,它们并没有提供比同步版本更宽松的语义或更高的性能(因为它们仍然是同步的)。
- 这些 "relaxed" 版本的方法直接调用了它们对应的同步版本 (
-
drain(Consumer<T> c, int limit)
:- 这个方法会从队列中取出最多
limit
个元素,并传递给提供的Consumer<T> c
进行处理。它会返回实际取出的元素数量。
- 这个方法会从队列中取出最多
-
不支持的操作:
- 注释明确指出,批量的
fill
操作以及部分drain
的重载方法(如无limit
参数的drain
,以及带有WaitStrategy
和ExitCondition
的drain
/fill
)在此实现中均不支持,调用它们会抛出UnsupportedOperationException
。这进一步印证了它是一个相对简单、用于特定(调试)目的的实现。
- 注释明确指出,批量的
总结
Netty 的 Recycler
通过以下机制实现高效的对象池:
- 线程局部化: 每个线程拥有自己的
LocalPool
,避免了跨线程同步的开销。 - 批量操作:
LocalPool
中的batch
(ArrayDeque) 和chunkSize
参数允许批量从主队列pooledHandles
获取或向其归还对象,减少了对主队列的访问频率。 - MPSC 队列: 默认情况下,
pooledHandles
使用 MPSC (Multi-Producer Single-Consumer) 队列,适合于多个生产者线程(其他线程归还对象)和一个消费者线程(LocalPool
的owner
线程获取对象)的场景。 - 回收比率 (RATIO): 通过
RATIO
控制新创建对象被池化的概率,防止池大小对短期分配突发过于敏感,有助于稳定池的容量。 - 原子状态更新:
DefaultHandle
使用AtomicIntegerFieldUpdater
来安全地更新其可用状态。 - FastThreadLocal 优化: 针对 Netty 的
FastThreadLocalThread
进行了优化。
这种设计使得 Recycler
在高并发场景下能够提供低延迟的对象分配和回收,是 Netty 高性能特性的一个重要组成部分。