Netty Source Code: 6. ByteBuf Principles (Part 2)
Outline
1. Common questions about ByteBuf
2. ByteBuf structure and key APIs
3. Classification of ByteBuf
4. Additional notes on the classification of ByteBuf
5. The three main aspects of ByteBuf
6. The memory allocator ByteBufAllocator
7. The two major subclasses of ByteBufAllocator
8. How PoolArena allocates memory
7. The two major subclasses of ByteBufAllocator
(1) Introduction to UnpooledByteBufAllocator
(2) Introduction to PooledByteBufAllocator
(3) The structure of PooledByteBufAllocator
(4) Summary of how PooledByteBufAllocator creates a ByteBuf
(1) Introduction to UnpooledByteBufAllocator
To create an UnpooledHeapByteBuf, a new byte array is allocated directly and the reader/writer indexes are initialized to 0. To create an UnpooledDirectByteBuf, a DirectByteBuffer object is created directly. Note: Netty decides on its own whether to use the unsafe variants; if the platform supports obtaining the Unsafe object, the unsafe variants are used.
Comparing the getByte() methods of UnpooledUnsafeHeapByteBuf and UnpooledHeapByteBuf shows the difference between unsafe and non-unsafe: the unsafe variant ultimately reads data via the object's memory address plus an offset, while the non-unsafe variant reads data via array indexing or the JDK ByteBuffer API. In general, reading through the Unsafe object is somewhat faster than the non-unsafe path, because it operates directly on memory addresses.
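As a quick illustration before the source listings below, here is a minimal usage sketch (the class name and sizes are ours, not taken from Netty); whether the unsafe or the non-unsafe implementation is returned depends on PlatformDependent.hasUnsafe().
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

//Illustrative usage only, not part of the Netty source quoted below
public class UnpooledAllocatorExample {
    public static void main(String[] args) {
        //preferDirect=false: buffer() without an explicit type returns a heap buffer
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf heapBuf = allocator.heapBuffer(16);    //UnpooledUnsafeHeapByteBuf or UnpooledHeapByteBuf
        ByteBuf directBuf = allocator.directBuffer(16);//UnpooledUnsafeDirectByteBuf or UnpooledDirectByteBuf
        heapBuf.writeByte(1);
        //getByte() ends up in UnsafeByteBufUtil.getByte() or HeapByteBufUtil.getByte()
        System.out.println(heapBuf.getByte(0));
        heapBuf.release();
        directBuf.release();
    }
}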
//Simplistic ByteBufAllocator implementation that does not pool anything.
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
...
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
...
}
//1. UnpooledUnsafeHeapByteBuf: create an unpooled heap-memory ByteBuf via unsafe
final class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
...
//Creates a new heap buffer with a newly allocated byte array.
//@param initialCapacity the initial capacity of the underlying byte array
//@param maxCapacity the max capacity of the underlying byte array
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
public byte getByte(int index) {
checkIndex(index);
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
...
}
//2. UnpooledHeapByteBuf: create an unpooled heap-memory ByteBuf without unsafe
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
byte[] array;
...
//Creates a new heap buffer with a newly allocated byte array.
//@param initialCapacity the initial capacity of the underlying byte array
//@param maxCapacity the max capacity of the underlying byte array
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}
private UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
...
this.alloc = alloc;
setArray(initialArray);//store the directly created byte array
setIndex(readerIndex, writerIndex);//set the reader and writer indexes to 0
}
private void setArray(byte[] initialArray) {
array = initialArray;
...
}
@Override
public byte getByte(int index) {
ensureAccessible();
return _getByte(index);
}
@Override
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}
...
}
//3. UnsafeByteBufUtil.newUnsafeDirectByteBuf(): create an unpooled direct-memory ByteBuf
final class UnsafeByteBufUtil {
...
static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
}
public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
ByteBuffer buffer;
...
//Creates a new direct buffer.
//@param initialCapacity the initial capacity of the underlying direct buffer
//@param maxCapacity the maximum capacity of the underlying direct buffer
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
...
this.alloc = alloc;
setByteBuffer(allocateDirect(initialCapacity), false);
}
//Allocate a new direct ByteBuffer with the given initialCapacity.
protected ByteBuffer allocateDirect(int initialCapacity) {
//allocate a DirectByteBuffer object directly via ByteBuffer.allocateDirect()
return ByteBuffer.allocateDirect(initialCapacity);
}
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
...
this.buffer = buffer;
...
}
}
//4. UnpooledDirectByteBuf: create an unpooled direct-memory ByteBuf without unsafe
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
private ByteBuffer buffer;
...
//Creates a new direct buffer.
//@param initialCapacity the initial capacity of the underlying direct buffer
//@param maxCapacity the maximum capacity of the underlying direct buffer
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
...
this.alloc = alloc;
//allocate a DirectByteBuffer object directly via ByteBuffer.allocateDirect()
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
private void setByteBuffer(ByteBuffer buffer) {
...
this.buffer = buffer;
...
}
}
public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer> {
...
//Allocates a new direct byte buffer.
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
...
}
//5. The difference between unsafe and non-unsafe
final class UnsafeByteBufUtil {
//the unsafe variant calls this method
static byte getByte(byte[] array, int index) {
return PlatformDependent.getByte(array, index);
}
...
}
final class HeapByteBufUtil {
//the non-unsafe variant calls this method
static byte getByte(byte[] memory, int index) {
return memory[index];
}
...
}
(2) Introduction to PooledByteBufAllocator
PooledByteBufAllocator's newHeapBuffer() and newDirectBuffer() methods both first obtain a PoolThreadCache object via threadCache, then get a heapArena or directArena object from it, and finally allocate memory through that arena's allocate() method.
The concrete steps are as follows:
Step 1: Obtain the thread-local cache PoolThreadCache
Since newHeapBuffer() and newDirectBuffer() may be called by multiple threads concurrently, threadCache.get() returns the current thread's cache, a PoolThreadCache object (threadCache itself is a PoolThreadLocalCache).
PoolThreadLocalCache extends FastThreadLocal, which can be thought of as the JDK's ThreadLocal, only faster.
Each thread has its own PoolThreadCache, which holds two arenas: heapArena for heap memory and directArena for off-heap (direct) memory.
Step 2: Allocate memory on the Arena held by the thread-local cache
The word "arena" literally means a competition ground.
When a PooledByteBufAllocator is created, two PoolArena arrays are created: heapArenas and directArenas. By default the size of both arrays is twice the number of CPU cores, the same as the default number of NIO threads, so that each thread can have its own PoolArena.
In PoolThreadLocalCache's initialValue() method, one PoolArena is taken from the PoolArena array and bound to the current thread. Because each thread then works mostly on its own PoolArena, lock contention during allocation is greatly reduced.
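Before walking through the source listing below, here is a minimal usage sketch (the class name and sizes are ours): every thread that allocates through the same PooledByteBufAllocator goes through its own PoolThreadCache and is therefore bound to its own PoolArena.
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

//Illustrative usage only, not part of the Netty source quoted below
public class PooledAllocatorExample {
    public static void main(String[] args) throws InterruptedException {
        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
        Runnable task = () -> {
            //newDirectBuffer() calls threadCache.get() to obtain this thread's PoolThreadCache,
            //which is bound to one directArena; allocation then happens on that arena
            ByteBuf buf = allocator.directBuffer(1024);
            buf.writeInt(42);
            buf.release();//the released memory may be cached in the PoolThreadCache for reuse
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}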
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final PoolThreadLocalCache threadCache;
private final PoolArena<byte[]>[] heapArenas;//each thread is bound to one PoolArena
private final PoolArena<ByteBuffer>[] directArenas;//each thread is bound to one PoolArena
//each MemoryRegionCache element in threadCache.tinySubPageHeapCaches can cache at most 512 ByteBufs
private final int tinyCacheSize;
//each MemoryRegionCache element in threadCache.smallSubPageHeapCaches can cache at most 256 ByteBufs
private final int smallCacheSize;
//each MemoryRegionCache element in threadCache.normalHeapCaches can cache at most 64 ByteBufs
private final int normalCacheSize;
...
static {
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
DEFAULT_PAGE_SIZE = defaultPageSize;
int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
DEFAULT_MAX_ORDER = defaultMaxOrder;
final Runtime runtime = Runtime.getRuntime();
final int defaultMinNumArena = runtime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;//8K * 2^11 = 16M
DEFAULT_NUM_HEAP_ARENA = Math.max(0,SystemPropertyUtil.getInt("io.netty.allocator.numHeapArenas",
(int) Math.min(defaultMinNumArena, runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,SystemPropertyUtil.getInt("io.netty.allocator.numDirectArenas",
(int) Math.min(defaultMinNumArena, PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
//cache sizes
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
//32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt("io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
//the number of threshold of allocations when cached entries will be freed up if not frequently used
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt("io.netty.allocator.cacheTrimInterval", 8192);
...
}
public PooledByteBufAllocator() {
this(false);
}
public PooledByteBufAllocator(boolean preferDirect) {
this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
}
//defaults: pageSize=8K=8192, maxOrder=11, tinyCacheSize=512, smallCacheSize=256, normalCacheSize=64
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena,
int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
super(preferDirect);
//initialize the PoolThreadLocalCache
this.threadCache = new PoolThreadLocalCache();
this.tinyCacheSize = tinyCacheSize;//512
this.smallCacheSize = smallCacheSize;//256
this.normalCacheSize = normalCacheSize;//64
//chunkSize = 8K * 2^11 = 16M
final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
...
//pageShifts = 13
int pageShifts = validateAndCalculatePageShifts(pageSize);
if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
}
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
ByteBuf buf;
if (heapArena != null) {
//allocate heap memory
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
//allocate direct memory
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
...
}
final class PoolThreadCache {
//the PoolArena objects bound to this thread
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
//cache queues of reusable memory regions (byte[] / ByteBuffer)
//Hold the caches for the different size classes, which are tiny, small and normal.
//32 MemoryRegionCache elements, caching SubPage-level memory of sizes 16B, 32B, 48B, ..., 480B, 496B
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
//4 MemoryRegionCache elements, caching SubPage-level memory of sizes 512B, 1K, 2K, 4K
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
//3 MemoryRegionCache elements, caching Page-level memory of sizes 8K, 16K, 32K
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
...
}
(3) The structure of PooledByteBufAllocator
(4) Summary of how PooledByteBufAllocator creates a ByteBuf
Each time a thread calls PoolThreadLocalCache's get() method it obtains a PoolThreadCache object, and through that PoolThreadCache it can reach the PoolArena bound to the thread.
The role of this PoolThreadCache object is to use FastThreadLocal to hold, in its member fields, one PoolArena object taken from the PoolArena arrays of the PooledByteBufAllocator.
For example, the 1st thread is bound to one PoolArena in the allocator's heapArenas array, the 2nd thread to another, and so on: initialValue() uses leastUsedArena() to pick the arena currently shared by the fewest threads, so threads are spread across the arenas and each thread ends up bound to one PoolArena.
Besides allocating directly on this PoolArena, the PoolThreadCache can also allocate from the ByteBuffer or byte[] cache lists it maintains. For example, suppose a 1024-byte ByteBuf is created through the PooledByteBufAllocator, used, and then released; when another 1024-byte allocation is requested later, there is no need to allocate on the PoolArena again: the memory can be taken straight from the ByteBuffer or byte[] cache lists kept in the PoolThreadCache.
The PooledByteBufAllocator maintains three values: tinyCacheSize, smallCacheSize and normalCacheSize. tinyCacheSize means at most 512 tiny ByteBufs can be cached, smallCacheSize means at most 256 small ByteBufs, and normalCacheSize means at most 64 normal ByteBufs.
When a PoolThreadCache object is created, these three values are passed in and used to build the following caches (see the sketch after this list):
tinySubPageHeapCaches,
smallSubPageHeapCaches,
normalHeapCaches,
tinySubPageDirectCaches,
smallSubPageDirectCaches,
normalDirectCaches.
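To make the cache layout concrete, the sketch below is a simplified model of our own (not Netty code): a PoolThreadCache is essentially three groups of bounded queues per memory type, and the queue capacities come from tinyCacheSize, smallCacheSize and normalCacheSize.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

//Simplified model of a PoolThreadCache's cache layout, for illustration only (not Netty code)
public class ThreadCacheLayoutModel {
    //32 tiny size classes: 16B, 32B, ..., 496B; each queue is bounded by tinyCacheSize (512)
    final List<Queue<long[]>> tinyCaches = newCaches(32);
    //4 small size classes: 512B, 1K, 2K, 4K; each queue is bounded by smallCacheSize (256)
    final List<Queue<long[]>> smallCaches = newCaches(4);
    //3 normal size classes: 8K, 16K, 32K; each queue is bounded by normalCacheSize (64)
    final List<Queue<long[]>> normalCaches = newCaches(3);

    private static List<Queue<long[]>> newCaches(int sizeClasses) {
        List<Queue<long[]>> caches = new ArrayList<>(sizeClasses);
        for (int i = 0; i < sizeClasses; i++) {
            //each queue plays the role of one MemoryRegionCache; an entry stands in for a cached memory region
            caches.add(new ArrayDeque<>());
        }
        return caches;
    }
}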
8. How PoolArena allocates memory
When PooledByteBufAllocator allocates memory in newHeapBuffer() and newDirectBuffer(), it executes heapArena.allocate() and directArena.allocate() respectively, i.e. it calls PoolArena's allocate() method. Inside PoolArena.allocate(), a PooledByteBuf object is created via the abstract method newByteBuf(), which is implemented by PoolArena's subclasses DirectArena and HeapArena.
The overall logic of PoolArena's allocate() method is: first obtain a PooledByteBuf object through the subclass-implemented newByteBuf() method, then allocate memory for it, trying the thread-private PoolThreadCache first. The allocation essentially grabs a block of memory and initializes the memory address fields inside the PooledByteBuf object.
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final PoolThreadLocalCache threadCache;
private final PoolArena<byte[]>[] heapArenas;//each thread is bound to one PoolArena
private final PoolArena<ByteBuffer>[] directArenas;//each thread is bound to one PoolArena
...
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
ByteBuf buf;
if (heapArena != null) {
//allocate heap memory
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
//allocate direct memory
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
...
}
abstract class PoolArena<T> implements PoolArenaMetric {
...
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);//create the ByteBuf object
allocate(cache, buf, reqCapacity);//allocate memory for the ByteBuf, trying the PoolThreadCache first
return buf;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) {//capacity < pageSize, i.e. the requested memory is less than 8K
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) {//< 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
//cache hit: was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
//cache hit: was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
//Synchronize on the head.
//This is needed as PoolChunk#allocateSubpage(int) and PoolChunk#free(long) may modify the doubly linked list as well.
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
//cache miss
allocateNormal(buf, reqCapacity, normCapacity);
return;
}
if (normCapacity <= chunkSize) {//the requested memory is at least 8K (pageSize) but no more than 16M (chunkSize)
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
//cache hit: was able to allocate out of the cache so move on
return;
}
//cache miss
allocateNormal(buf, reqCapacity, normCapacity);
} else {//the requested memory is larger than 16M
//Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);
static final class HeapArena extends PoolArena<byte[]> {
...
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
: PooledHeapByteBuf.newInstance(maxCapacity);
}
}
static final class DirectArena extends PoolArena<ByteBuffer> {
...
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
}
...
}
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
@Override
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
...
}
final class PooledUnsafeHeapByteBuf extends PooledHeapByteBuf {
private static final Recycler<PooledUnsafeHeapByteBuf> RECYCLER = new Recycler<PooledUnsafeHeapByteBuf>() {
@Override
protected PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {
return new PooledUnsafeHeapByteBuf(handle, 0);
}
};
static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {
PooledUnsafeHeapByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
...
}
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0);
}
};
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
...
}
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
...
}
The memory allocation logic of PoolArena.allocate() is as follows:
Step 1: PoolArena.newByteBuf() first tries to get a PooledByteBuf from the RECYCLER object pool and reuse it; if none is available, a new PooledByteBuf is created. Taking DirectArena's newByteBuf() as an example, it obtains a PooledByteBuf via RECYCLER.get(). RECYCLER is an object pool with recycling support, so RECYCLER.get() means: if the pool already holds a PooledByteBuf, take one out; otherwise create one. Because the returned PooledByteBuf may have come out of the recycling pool, buf.reuse() is called to reset it before it is returned.
Step 2: PoolArena.allocate() then tries to allocate memory from the PoolThreadCache. If a ByteBuf of roughly the same size class was used and released earlier, the cached memory for that size class can be taken directly from the corresponding cache list and used for this allocation.
Step 3: If the PoolThreadCache lookup misses, the actual memory allocation is performed on the PoolArena.
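The size-class routing inside allocate() can be summarized with the sketch below (our own simplification using the default pageSize = 8K and chunkSize = 16M; the real code first normalizes the requested capacity and tries the PoolThreadCache before falling back to the arena):
//Illustrative summary of the routing decision in PoolArena.allocate(), not Netty code
public class SizeClassExample {
    static final int PAGE_SIZE = 8192;             //default io.netty.allocator.pageSize
    static final int CHUNK_SIZE = PAGE_SIZE << 11; //pageSize << maxOrder = 8K * 2^11 = 16M

    static String sizeClass(int normCapacity) {
        if (normCapacity < 512) {
            return "tiny";   //tiny cache / tinySubpagePools
        } else if (normCapacity < PAGE_SIZE) {
            return "small";  //small cache / smallSubpagePools
        } else if (normCapacity <= CHUNK_SIZE) {
            return "normal"; //normal cache / allocateNormal() on a PoolChunk
        } else {
            return "huge";   //allocateHuge(), never cached
        }
    }

    public static void main(String[] args) {
        System.out.println(sizeClass(496));              //tiny
        System.out.println(sizeClass(4096));             //small
        System.out.println(sizeClass(8192));             //normal
        System.out.println(sizeClass(32 * 1024 * 1024)); //huge
    }
}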