C++ 高并发内存池项目——无锁化设计、TLS 线程隔离与内存碎片优化
🔥个人主页🔥:孤寂大仙V
🌈收录专栏🌈:C++
🔖流水不争,争的是滔滔不息
- 一、 tcmalloc 简介 —— 高并发内存池的鼻祖级方案
- 二、高并发内存池的开胃菜:对象池(ObjectPool)
- 先谈谈池化技术与malloc
- 实现对象池
- ObjectPool 的优缺点与局限性
- 三、高并发内存池的设计与实现
- 详细聊聊为什么要设计高并发内存池?
- 内存申请机制
- (1)ThreadCache的设计(高并发内存池的第一战线)
- 前置需要的数据结构和算法
- threadcache申请内存的整体逻辑
- ThreadCache的内存申请(Allocate)
- FetchFromCentralCache 跨层协作(和 CentralCache怎么合作)
- (2)CentralCache的设计
- 前置需要的数据结构
- CentralCache申请内存的整体逻辑
- FetchRangeObj从中心缓存获取内存
- GetoneSpon 跨层协作(和Pagecage如何合作)
- (3)PageCache的设计
- pagecache申请内存的整体逻辑
- NewSpon申请span(大块页分割)
- MapObjectToSpon
- 内存申请的整体过程简述
- 内存释放机制
- (1)ThreadCache的内存释放设计
- Deallocate函数 释放自由链表的逻辑
- (2)CentralCache的内存释放设计
- ReleaseListToSpans函数 释放centralcache中span的逻辑
- (3)PageCache的内存释放设计
- ReleaseSpanToPageCache函数把Span进行合并
- 优化
- 大对象申请逻辑
- 申请内存
- 释放内存
- 用对象池优化new和delete
- 优化内存释放函数减少调用
- 性能测试与性能瓶颈
在我们项目开始之前先了解一下这个高并发内存池的原型项目。
一、 tcmalloc 简介 —— 高并发内存池的鼻祖级方案
在我们自己实现高并发内存池之前,必须要提到 Google 的开源项目 tcmalloc (Thread-Caching Malloc)。它是 Google 为高并发场景优化过的内存分配器,广泛应用在 gperftools、Chrome、MySQL、gRPC 等项目中。
- 为什么要有 tcmalloc?
标准库里的 malloc/free 在多线程场景下有两个大问题:
-
锁竞争严重
多个线程同时 malloc/free,必须抢同一把全局锁,导致性能大幅下降。 -
内存碎片严重
标准分配器不能很好地复用小块内存,容易造成堆空间碎片,浪费内存。
tcmalloc 的目标就是:
👉 降低锁竞争,保证高并发性能。
👉 减少内存碎片,让分配更高效。
- tcmalloc 的核心思想
tcmalloc 采用了 多层缓存结构,这一点和我们要写的高并发内存池思路一致:
-
ThreadCache (线程缓存)
每个线程维护自己的一份小对象缓存,分配小对象时 不需要加锁,直接从本线程的缓存里取。
解决了高并发下的锁竞争问题。 -
CentralCache (中心缓存)
当某个线程的缓存不足时,会向全局的 CentralCache 申请一批对象;
线程缓存里空闲太多时,也会把一部分归还给 CentralCache。
起到“调度员”的作用,平衡各线程的内存使用。 -
PageHeap (页堆 / PageCache)
负责向操作系统(如 mmap/brk)申请大块内存,并把它们按页(通常 8KB)为单位管理。
避免直接和系统频繁交互,提高分配效率。
简单来说,tcmalloc 就是:
线程局部缓存 → 中央缓存 → 页堆
这三层架构最大化减少了系统调用和锁的竞争。
- tcmalloc 的特点
-
极快的小对象分配速度(远超标准 malloc)
-
低碎片率(通过对象对齐和批量管理实现)
-
高并发友好(ThreadCache 消除了大部分锁竞争)
- 与我的高并发内存池的关系
我的高并发内存池项目,就借鉴了 tcmalloc 的设计理念:
-
采用 ThreadCache + CentralCache + PageCache 的三层结构
-
按照 不同大小的桶(SizeClass) 管理对象,避免碎片
-
使用 自由链表 管理小对象,快速复用
可以说,tcmalloc 是“工业级标准答案”,而我做的项目就是它的 学习版 + 简化实现,通过它来理解并实践高并发内存分配器的核心思想。
二、高并发内存池的开胃菜:对象池(ObjectPool)
先谈谈池化技术与malloc
在高性能编程领域,我们经常会听到“池化技术”(Pooling Technology)。它的核心思想其实很简单:
“把昂贵的资源先准备好、集中管理、循环复用,而不是每次都重新创建和销毁。”这样一来,就能避免频繁的资源申请与释放带来的高开销,尤其适合高并发场景。
池化技术的核心机制就是
- 批量创建:资源提前准备好,避免临时开销。
- 集中管理:资源由池统一维护,分配/回收更高效。
- 循环复用:用过的资源不销毁,直接放回池中等待下次使用。
更简单的理解就是,先申请一批资源等着,这样不就比要一次资源申请一次资源效率更高吗?
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存不是直接去堆获取内存的。
这里可以把malloc理解成一个内存池:malloc()相当于向操作系统“批发”了⼀块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
对于malloc实现,不同平台底层实现方法不一样:windows的vs中用的微软自己实现的,linux的gcc⽤的glibc中的ptmalloc。
实现对象池
在开始实现真正的高并发内存池之前,我们先来看看一个简化版的 对象池 (ObjectPool)。它的思想很简单:
- 批量申请内存:不是每次 new 就找系统申请,而是一次性向系统申请一大块内存。
- 切分复用:把大块内存切分成多个小对象需要的空间,后续申请对象时直接复用这部分内存。
- 自由链表管理:用一个单链表管理已经回收的对象,下次再申请时就能直接取出来用,避免频繁 malloc/free 的开销。
总体代码如下
#pragma once#include <iostream>
#include <vector>
using namespace std;template<class T>
class ObjectPool
{
public:ObjectPool():_memory(nullptr), _freelist(nullptr), _leftbytes(0){}// 申请一个对象T* New(){T* obj = nullptr;// 优先从自由链表中取if (_freelist){obj = (T*)_freelist;_freelist = *(void**)_freelist;}else{// 如果大块内存不够,就再申请一块if (_leftbytes < sizeof(T)){_leftbytes = 128 * 1024;_memory = (char*)malloc(_leftbytes);if (_memory == nullptr){throw bad_alloc();}}// 从大块内存中切一个对象出来obj = (T*)_memory;size_t SizeObj = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += SizeObj;_leftbytes -= SizeObj;}// 定位 new:调用构造函数,但不重新分配空间new (obj)T;return obj;}// 回收对象,放回自由链表void Delete(T* obj){obj->~T(); // 手动调用析构函数*(void**)obj = _freelist; // 头插_freelist = obj;}private:char* _memory; // 指向大块内存的指针void* _freelist; // 自由链表int _leftbytes; // 剩余空间
};
_memory是指向大块内存的指针,_freelist自由链表链接那些用完的内存对象。_leftbytes表示大块内存中剩余空间的大小。
-
优先复用:
如果 _freelist(自由链表)里有空闲对象,就直接拿来复用,避免再次申请内存。注意这里的_freelist链表是如何链接到一起的,这里我们想要被回收的内存块(也就是首地址)能及时被找到然后再次利用,因此采取把对应回收内存块的首地址用链表形式管理起来;也就是每个内存块里面前4/8个字节保存一下下一个内存块的首地址,这样就连接起来了。 -
不够就扩容:
如果没有可用对象,且 _leftbytes < sizeof(T),说明大块内存也不足,就向系统申请一块新的大内存(128KB)。 -
从大块切分:
把大块内存 _memory 按对象大小切出一份,指针后移,剩余字节数 _leftbytes 相应减少。
注意这里取 SizeObj = max(sizeof(T), sizeof(void*)),保证切出来的内存至少能存一个指针(为了以后能挂链表)。 -
调用构造函数:
new (obj)T; 是 定位 new,它不会重新分配空间,而是直接在已有内存上调用 T 的构造函数。这样就统一了对象的构造过程。
T* New() //方法 向对象池申请一个 T 类型的对象,并返回这个对象的指针。
{T* obj = nullptr; //申请的小内存对象 if (_freelist) //自由链表部为空{obj = (T*)_freelist; //自由链表头部重当小内存_freelist = *(void**)_freelist;}else{if (_leftbytes < sizeof(T)) //大块内存中剩余空间都小于要申请的小内存了{_leftbytes = 128 * 1024;_memory = (char*)malloc(_leftbytes);//申请大块内存if (_memory == nullptr){throw bad_alloc();}}obj = (T*)_memory; //从大块内存中拿到小内存 size_t SizeObj = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += SizeObj; //指向大块内存的指针移动 _leftbytes -= SizeObj;}new (obj)T; //定位new,统一构造不要开空间就构造效率太低return obj;
}
-
调用析构函数
由于我们自己管理内存,所以必须手动调用 obj->~T(); 来释放对象占用的资源(比如对象里开过文件、锁等)。 -
挂到自由链表
把 obj 的头部强转成 void**,存放 _freelist 的地址(相当于链表的 next 指针)。 -
更新链表头
把 _freelist 移到 obj 上,实现“头插法”。
这样下次 New() 时就能直接复用该对象的内存。
void Delete(T* obj)
{obj->~T(); // 1. 手动调用析构函数*(void**)obj = _freelist; // 2. 把对象头部改写为指向自由链表_freelist = obj; // 3. 头插到自由链表
}
ObjectPool 的优缺点与局限性
在前面对 ObjectPool 的申请与释放流程解析后,我们可以总结出几点 优点与局限。
优点
-
提高了分配效率
Delete 并没有真正意义上释放对象的内存,而是把这块空间挂到 freelist 中,下次直接复用。
相比系统的 new/delete 每次都可能触发 malloc/free,对象池显然减少了时间开销。 -
避免外部碎片
每个 ObjectPool 只服务于一种固定大小的对象类型 T。
这种“定长分配”的模式,保证了不会产生“外部碎片”(不同类型对象交错导致的空间浪费)。
局限与问题
-
只能服务于单一类型
ObjectPool 绑定到某个具体类型 T,不同类型就需要不同的池。
优点是避免了外部碎片,但局限性在于灵活性较差。 -
“占着茅坑不拉屎”问题
如果某个类型的对象池曾经申请了大量内存块,但后续这些对象不再使用,它们仍然挂在 freelist 中,无法被其他对象类型复用。
这就可能导致 内存浪费:看似系统有空闲内存,但却被某个 ObjectPool 占着不用。 -
对系统其他对象“不公平”
因为每个类型的池是独占的,如果一个类型霸占了大块内存,其他类型即便需要空间,也不能直接利用这些“闲置”的内存。
三、高并发内存池的设计与实现
详细聊聊为什么要设计高并发内存池?
在单线程、小规模程序里,我们直接用系统提供的 malloc/free 或 new/delete 就可以了。但一旦进入高并发场景,这些通用内存分配器就会暴露出严重的问题。
- 系统分配器的性能瓶颈
系统分配器(glibc malloc / Windows HeapAlloc)是通用的,需要处理各种复杂场景(大小不一、跨线程、跨模块)。在多线程下,malloc/free 内部往往需要 全局锁 来保证堆结构的一致性。高并发场景中,线程之间会频繁竞争这把锁,导致分配/释放速度急剧下降。
结论:性能不够用,成了全局热点。
- 内存碎片问题
外部碎片:不同大小的对象混杂,释放后留下的小空洞无法复用。
内部碎片:系统对齐开销大,申请 1KB 可能实际分配 1.1KB+。
在大规模业务里,这些碎片会迅速累积,导致 内存利用率下降,甚至出现 明明有空闲内存却申请失败 的情况。
结论:浪费严重,影响大规模稳定性。
- 不同线程间的不公平
系统分配器无法区分「哪个线程占用了多少内存」。有的线程可能申请很多但释放少,长期占用资源;另一些线程可能频繁申请释放,但不得不频繁和系统打交道,性能被拖累。
结论:资源调度不公平,容易导致线程间性能差异。
- 高并发应用的需求
现代高性能服务(如 Web 服务器、游戏服务器、数据库内核)往往有这些特点:
小对象申请频繁:比如网络消息结构体、任务节点、连接对象。申请/释放速度要求极高:要在纳秒级别完成。多线程同时操作:每个线程既要独立高效,又要和全局公平共享内存。
结论:必须设计一套 专门针对高并发优化的内存池。
- 高并发内存池的设计目标
所以,我们需要的内存池要解决以下痛点:性能高:分配/释放尽量 O(1),避免全局锁。碎片少:通过分段对齐、桶化机制,控制最坏碎片率。并发友好:线程本地缓存(ThreadCache),大大降低锁竞争。公平性:通过 CentralCache 统一调度,避免单线程独占。伸缩性:支持从系统按页分配和合并,能适应不同规模的应用。
设计高并发内存池,就是为了在多线程环境下,把 “快” 和 “省” 这两件事做到极致。它比系统分配器快(无锁、本地缓存)。它比随意 malloc/free 省(低碎片率,公平利用)。这就是为什么 Google 要开源 tcmalloc,而我们要学习并实现属于自己的高并发内存池。
三层结构
- ThreadCache(线程缓存)
ThreadCache 是每个线程私有的缓存区域(线程的本地存储),主要负责 小于 256KB 的内存分配。由于每个线程独享自己的 ThreadCache,所以在这里申请内存 不需要加锁,也就避免了多线程竞争。这正是高并发内存池能做到高效的关键之一。
- CentralCache(中心缓存)
CentralCache 是所有线程共享的缓存池,ThreadCache 会按需从 CentralCache 中获取对象。同时,CentralCache 会在合适的时机回收各个 ThreadCache 中的部分对象,避免单个线程长期占用过多内存,从而实现多线程之间更均衡的调度。承上启下的作用。
因为 CentralCache 是共享资源,所以这里存在竞争,取对象时需要加锁。但它采用了 分桶锁(桶级别的细粒度锁),并且只有在 ThreadCache 为空时才会访问 CentralCache,因此实际竞争并不会太激烈。
- PageCache(页缓存)
PageCache 位于 CentralCache 之上,负责以页(Page)为单位管理内存。当 CentralCache 中没有可用对象时,会向 PageCache 申请一定数量的页(Span),再将这些页切割成固定大小的小块,分配给 CentralCache 使用。当一个 Span 管理的所有小块都被回收时,PageCache 会将其回收,并尝试合并相邻的空闲页,重新拼成更大的页,以减少和缓解内存碎片问题。
优势
- 内存浪费少(控制碎片率)
内部碎片通过 SizeClass 分段 + 对齐,最坏浪费率 ≤ 12.5%。举例:申请 9B,实际分配 16B,浪费 7B;比例 7/9≈77%,但在整体池子上,浪费率被数学证明控制在 ≤ 1/8。
外部碎片桶内都是定长块,没有外部碎片。PageCache 还能合并空闲 Span,避免大块被割碎。
- 分配释放快
ThreadCache 层操作 O(1):Pop/Push 就是链表头插/头删。无锁(TLS 隔离)。
CentralCache/ PageCache 的锁竞争被稀释:ThreadCache 每次去 CentralCache 拿一批对象(慢启动算法 NumMoveSize)。这样一个桶里的请求不用频繁加锁。
- 并发友好
Thread Local Storage (pTLSThreadCache) 保证线程独占本地桶。线程间只在真正缺页或释放过量时才会竞争 CentralCache/PageCache 的锁。高并发场景下,99% 的申请和释放都是 线程内部无锁完成。
- 灵活伸缩
慢启动算法 (NumMoveSize):第一次去 CentralCache 只拿几个对象。如果 ThreadCache 桶用得快,会逐渐增加一次批量获取的数量。避免一开始就占用太多内存,又能动态提升效率。
回收机制 (ListTooLong):如果本地桶的对象过多,ThreadCache 会把一部分还给 CentralCache。防止某个线程独占过多内存,保证系统公平性
ThreadCache 负责“就近快取”,CentralCache 负责“全局均衡”,PageCache 负责“页级整形”。
内存申请机制
(1)ThreadCache的设计(高并发内存池的第一战线)
在内存池中,为了管理不同大小的内存块,采用了多个自由链表(FreeList),每个链表维护一类固定大小的对象。
这里引入了对齐机制:不是每个字节大小都单独维护一个链表,否则需要的数组数量会非常多,管理也会变得复杂。相反,内存池会把一段区间的请求大小对齐到一个固定粒度,比如 5、6、7 字节都会被对齐到 8 字节,从而共用同一个自由链表。
这种做法的好处是减少了链表数量,提高了查找和管理效率。但它也带来一个副作用:内碎片。
例如,用户请求 7 字节,系统实际分配 8 字节,多出来的 1 字节不能利用,就浪费掉了。虽然存在一定的浪费,但通过合理的分段和对齐策略,这种浪费通常是可控的,并且能够换取更高的分配效率。
此外,哈希映射(或数组索引映射) 用来快速定位应该使用哪一个自由链表桶,从而让“请求大小 → 对应链表”的过程变成 O(1),大幅度降低了内存分配的查找成本。
需要注意的是:最小对齐数通常设为 8 字节,这是因为要保证能至少存放一个指针大小(在 64 位系统上正好是 8 字节),方便通过对象头部串联成链表。
(这个图只是形象的反应了这个映射关系,不要被迷惑。其一个自由链表的块大小 = 某个对齐后的大小
自由链表(FreeList):维护的是一类固定大小的内存块。对齐数(Alignment):决定了「请求的字节数」最终被映射到哪个链表的块大小。)
前置需要的数据结构和算法
这里有一个公共类(common.h),用于封装自由链表的结构以及对齐数 哈希映射等。
自由链表代码如下
class FreeList
{
public:void Push(void* obj) //头插{assert(obj);NextObj(obj) = _freelist;_freelist = obj;++_size;}void* Pop() //头删 弹出一个内存{assert(_freelist);void* obj = _freelist;_freelist = NextObj(obj);--_size;return obj;}bool Empty(){return _freelist == nullptr;} size_t& MaxSize() {return _MaxSize;}void PushRange(void* start,void* end,size_t n){NextObj(end) = _freelist;_freelist = start;_size += n;}void PopRange(void*& start,void*& end,size_t n){assert(n);start = _freelist;end = start;for (size_t i = 0; i < n - 1; i++){end = NextObj(end);}_freelist = NextObj(end);NextObj(end) = nullptr;_size -= n;}size_t Size(){return _size;}
private:void* _freelist = nullptr;size_t _MaxSize = 1;size_t _size = 0;
};
static void*& NextObj(void* obj)
{return *(void**)obj;
}
通过 NextObj(void*) 把空闲对象串起来,省额外元数据开销。
Push:把用完的内存插入链表中。
Pop:从自由链表中取出内存。
Empty:判断该自由链表中是否有节点,进而判断是否需要到central cache中取内存。
其他方法用到时再具体聊。
Push
头插一个对象到空闲链表里。
NextObj(obj) = _freelist → 新对象的 next 指针指向旧头结点
_freelist = obj → 更新链表头
_size++ → 空闲对象数量 +1
Pop()
从头删:弹出一个对象返回给上层。
拿 _freelist 出来
_freelist = NextObj(obj) → 头后移
_size–
Empty()
判断是否为空,直接看 _freelist 是否为 nullptr。
如下是对齐数等算法的实现
class SizeClass
{
public://对齐数static inline size_t _RoundUp(size_t bytes, size_t alignNum){return (bytes + alignNum - 1) & ~(alignNum - 1);}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}}static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (static_cast<unsigned long long>(1) << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}static inline size_t NumMoveSize(size_t size) //慢启动ThreadCache申请所得到的size{size_t num = MAX_BYTES / size; //threadcache要小内存一次性给多个对象if (num < 2) //要大内存一次性给少量对象num = 2;if (num > 512)num = 512;return num;}static size_t NumMovePage(size_t size)// 计算一次向系统获取几个页{size_t num = NumMoveSize(size);size_t npage = num * size;npage >>= PAGE_SHIFT;if (npage == 0)npage = 1;return npage;}
};
RoundUp:根据要的字节,计算对齐数。
Index:计算映射到哪一个自由链表桶。
这里把它写成静态成员函数是为方便在没有类对象情况下对函数的直接调用,因为现在并没有成员变量所以没必要实例化出对象。可以把它做为内联函数,减少函数栈帧的开辟。
RoundUp(size)
根据大小落在哪个区间,选择不同的对齐粒度:
[1,128] → 8 byte 对齐
[129,1024] → 16 byte 对齐
[1025,8KB] → 128 byte 对齐
[8KB+1,64KB] → 1KB 对齐
[64KB+1,256KB] → 8KB 对齐
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16) 0-16个桶
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
_RoundUp(bytes, alignNum)
按对齐数 alignNum 向上取整。
位运算 (bytes + alignNum - 1) & ~(alignNum - 1) 保证对齐。
Index(bytes)
根据大小,分区间计算桶号(用 group_array 保存前面区间桶的偏移量)。
举个例子:
64 byte → 在第 8 个桶(因为 64/8 -1=7)。
200 byte → 在第二个区间,桶号要加上前 16 个 offset。
_Index(bytes, align_shift)
计算落在某个区间内的桶号。
核心公式:
((bytes + (1 << align_shift) - 1) >> align_shift) - 1
就是 向上对齐后 / 对齐大小 - 1。
“分段 + 桶化 + 对齐”的做法,本质上就是在做“减少内存浪费 & 提高分配速度”的权衡。
优势一:减少内存碎片
-
避免外部碎片
外部碎片是指:不同大小的内存块交错使用后,可能出现很多“空洞”但无法被利用。
桶化之后,每个桶里的对象都是固定大小(比如 16B 桶只放 16B 的对象),所以桶内不会产生外部碎片。 -
控制内部碎片
内部碎片是指:用户申请的大小和实际分配的大小之间的差额。比如用户要 9B,实际分配 16B,多浪费 7B。
但为什么不直接精确分配 9B?因为那样桶就会爆炸(每个字节都一个桶,效率极差)。
所以 分段 + 对齐就是在做折中:小对象 → 对齐粒度小(8B、16B),内部碎片少;大对象 → 对齐粒度大(128B、1KB),桶数量可控。这样保证了整体浪费可控,而桶数量也不会过多。
优势二:快速定位 & 高效复用
通过 Index(size),可以 O(1) 算出用户申请的大小该落在哪个桶。桶内是 FreeList(单链表),分配时只要从链表头取一个即可,非常快。回收时,只要把对象插回链表头,也很快。相比系统的 malloc/free,少了复杂的堆管理逻辑,性能提升显著。
优势三:按需分段,公平利用
小对象数量最多 → 给它最密集的桶划分(比如 0~128B → 16 个桶,8B 对齐)。
大对象数量相对少 → 用粗对齐减少桶数量(比如 64KB~256KB → 8KB 对齐)。
这种分段设计既避免了浪费,又让内存池结构不会爆炸。
threadcache申请内存的整体逻辑
threadCache.h
ThreadCache 是整个内存池里离用户最近的一层:线程的本地存储每个线程各有一份(TLS:_declspec(thread) ThreadCache* pTLSThreadCache)。负责 ≤ MAX_BYTES(你设的是 256KB) 的小对象分配。热路径完全 无锁(因为“线程私有”),快得飞起。
#pragma once
#include "Common.h"
class ThreadCache
{
public:void* Allocate(size_t size); //申请内存对象就是从桶中拿到小内存void Deallocate(void* ptr, size_t size); //释放内存对象就是吧用完的内存放回去(自由链表)void* FetchFromCentralCache(size_t index, size_t size);//内存不够了去中心缓存拿void ListTooLoog(FreeList& list, size_t size);//释放内存,链表过长,回收内存到中心缓存
private:FreeList _freelists[MAX_BUCKET];
};
//线程本地存储
// 每个线程都会拥有自己独立的 pTLSThreadCache 指针变量,互不干扰,互不共享,自动隔离!
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
ThreadCache的内存申请(Allocate)
void* ThreadCache::Allocate(size_t size) //申请内存
{assert(size < MAX_BYTES);size_t alignNum = SizeClass::RoundUp(size);//对齐数size_t index = SizeClass::Index(size); //映射的桶if (!_freelists[index].Empty()){return _freelists[index].Pop(); //弹出内存去工作//弹出的内存块就是这个桶所对应的大小,整个自由链表的个数就是桶的个数rr}else{return FetchFromCentralCache(index, alignNum); //没有内存去中心缓冲申请内存}
}
常态:O(1) 无锁 Pop。缺货时 才触发跨层交互:去 CentralCache 批量拿一串对象回来,摊薄锁开销与切分成本。
传进来申请的字节数,根据算法计算对齐数然后计算所映射的桶。如果这个自由链表桶不为空,就让块内存弹出去工作。也有可能threadcache没有内存了,这时就要去CentralCache中心缓存去申请内存。
FetchFromCentralCache 跨层协作(和 CentralCache怎么合作)
和上文中threadcache中没内存去centralcache申请环环相扣
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)//从中心缓存获取内存
{//慢启动算法//NumMoveSize根据算法已经得到了应该获得的内存的大小,//在设置慢启动自动和NumMoveSize获得的大小一样才用NumMoveSize获得的大小size_t batchNum = min(_freelists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freelists[index].MaxSize() == batchNum){_freelists[index].MaxSize() += 1;}void* Start = nullptr;void* End = nullptr;//从中心缓存中根据上面得到的获取对象的数量进行获取,但是不一定想获取多少就获取多少size_t actual = CentralCache::GetInstance()->FetchRangeObj(Start, End, batchNum, size);assert(actual >= 1);if (actual == 1){assert(Start == End);return Start;}else{//把得到的内存插入自由链表中_freelists[index].PushRange(NextObj(Start), End, actual-1);//那一块内存返回给用户,剩下的插入链表return Start; //因为调这个方法的时候已经缺内存了}
}
当线程缓存里的某个桶空了,就得去 CentralCache 拿一批对象回来补充 CentralCache 会找到(或新建)对应大小的 Spon(Span),把 Span 里的小块串成 spon->_freeList,再一次性切给 ThreadCache。
这里就用到了common.h中的SizeClass类中的慢启动算法NumMoveSize方法。放到这里讲。
慢启动算法理论推荐值如下:
static inline size_t NumMoveSize(size_t size) //慢启动ThreadCache申请所得到的size{size_t num = MAX_BYTES / size; //threadcache要小内存一次性给多个对象if (num < 2) //要大内存一次性给少量对象num = 2;if (num > 512)num = 512;return num;}
threadcache要小内存多给几个对象,要大内存一次性给少量对象。
size_t batchNum = min(_freelists[index].MaxSize(), SizeClass::NumMoveSize(size));
先决定这次要从 CentralCache 拿多少个对象。SizeClass::NumMoveSize(size) → 理论推荐值(比如小对象多拿,大对象少拿)。_freelists[index].MaxSize() → 当前桶允许的最大批量(慢启动调节用)。两者取最小值。
if (_freelists[index].MaxSize() == batchNum)
{_freelists[index].MaxSize() += 1;
}
慢启动算法:刚开始,MaxSize 很小(默认 1,freelist中设置好了)。每次真正“吃满”了,就 +1。这样慢慢放宽批量申请的上限。好处:避免一上来就把内存要太多,导致别的线程饿死。
size_t actual = CentralCache::GetInstance()->FetchRangeObj(Start, End, batchNum, size);
assert(actual >= 1);
向 CentralCache 申请一批对象。要 batchNum 个,但不一定全给(可能内存紧张)。actual 表示实际拿到的数量。保证至少拿到 1 个。FetchRangeObj方法就要在Centralcache的类中实现。
if (actual == 1)
{assert(Start == End);return Start;
}
如果只拿到 1 个:直接返回给用户,不存 freelist。
_freelists[index].PushRange(NextObj(Start), End, actual-1);
return Start;
如果拿到多个:第一个对象 Start 返回给用户。剩下 (actual-1) 个批量挂到该桶的 freelist 中备用。
这里的PushRange就是把获得的内存对象,一块挂到挂到该桶的freelist中备用。是common.h中的方法
PushRange(start,end,n)一次性把 [start, end] 这一段 n 个对象挂到当前 freelist 的头。end->next = _freelist → 接在原来的链表前,_freelist = start → 更新头,_size += n。
慢启动调节
freelist 的 MaxSize 从小到大逐步增加,不会一口吃成胖子。这样线程缓存“成长”是温和的,避免系统抖动。
批量获取 + 局部缓存
一次拿多个对象,减少锁竞争。自己用一个,剩下的留在 freelist 里,下次直接用。
(2)CentralCache的设计
CentralCache 是线程本地缓存(ThreadCache)和页级缓存(PageCache)之间的中间调度层。职责大致三条:按 size-class 管理一组 Spon(span)链表(每个桶一个 SponList)。在 thread 缺货时,批量从某个 Spon 切出若干小块返回给 ThreadCache(FetchRangeObj)。在 thread 回收过多内存或归还对象时,把对象放回对应 Spon,并在空 Spon 时把 span 返还给 PageCache(ReleaseListToSpans)。central cache映射的spanlist中所有span的都没有内存以后,则需要向pagecache申请⼀个新的span对象,拿到span以后将span管理的内存按大小切好作为⾃由链表链接到⼀起。然后从span中取对象给threadcache。
换一句话:CentralCache 是“局部缓存的粮仓 + 全局页的分配员”。启承上启下的作用。
前置需要的数据结构
central cache也是⼀个哈希桶结构,他的哈希桶的映射关系跟threadcache是⼀样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了⼀个个小内存块对象挂在span的自由链表中。
//中心缓存 一个页单位为spon
//管理多个连续页大块内存跨度结构
struct Spon
{Spon():_prev(nullptr),_next(nullptr), _n(0),_pageid(0), _usecount(0){}PAGE_ID _pageid; //大块内存的起始地址 页号size_t _n; //页的数量Spon* _prev; //前后指针Spon* _next;size_t _usecount;//切好的小内存块,用了就++还回来就--void* _freeList = nullptr;//Spon下面的小内存的自由链表
};
//双向链表连接Spons
class SponList
{
public:SponList(){_head = new Spon;_head->_prev = _head;_head->_next = _head;}void Insert(Spon* pos,Spon* newpos) //插入{assert(pos);assert(newpos);Spon* prev = pos->_prev;prev->_next = newpos;newpos->_prev = prev;pos->_prev = newpos;newpos->_next = pos;}void Erase(Spon* pos) //删除{assert(pos);Spon* prev = pos->_prev;Spon* next = pos->_next;prev->_next = next;next->_prev = prev;}Spon* begin(){return _head->_next;}Spon* end(){return _head;}bool Empty(){return _head->_next == _head;}void PushFront(Spon* spon){Insert(begin(), spon);}Spon* PopFront(){Spon* front = _head->_next;Erase(front);return front;}
private:Spon* _head; //头节点
public:std::mutex _mtx;
};
Spon(也就是 span)
_pageid:span 起始页号(page-level id)
_n:占用页数(span 长度)
_freeList:span 内部的小块自由链表(同 size-class 的对象)
_usecount:当前被分配出去且尚未归还的对象数量
_prev/_next:双向链表连接 Spon(放在 _sponlists[index] 中)
pageid + n 用来在 PageCache 层实现合并/切割(coalesce / split)。只有知道页范围,才能把相邻 span 合并成更大的连续块,或把大 span 切成多个需要大小的 k-span 给中央缓存。
_freeList 用于快速分配/回收 span 内的“固定大小”小块(同一个 size-class)。
_usecount 负责跟踪“正在被外部使用的小块数”,这是判断何时可以把 span 回收给 PageCache 的关键。
SponList
每个 size-class 一个 SponList(有头结点),并带一个桶级别的 mutex(_mtx)。
构造函数 new Spon 生成一个哨兵 _head,并把头的 _prev/_next 指向自己。这样 begin() == end() 判断空链表非常简单。
Insert(pos, newpos):把 newpos 插到 pos 之前(常用于 PushFront)。
Erase(pos):删除 pos,但实现没有把 pos->_prev/_next 置 nullptr —— 推荐在 Erase 后置空以避免悬指针。
PushFront / PopFront:常用于 PageCache 的按页数量桶管理(比如 _sponlists[k] 存放所有长度为 k 的空闲 span)。
CentralCache申请内存的整体逻辑
CentralCache.h
//中心缓存 单例模式 饿汉单例
class CentralCache
{
public:static CentralCache* GetInstance(){return &_SInst;}//获取Spon对象Spon* GetoneSpon(SponList& sponlist, size_t size);//从中心缓存中获取内存对象size_t FetchRangeObj(void*& Start, void*& End, size_t batchNum, size_t size);//*&在函数内部修改 Start 和 End 的值(也就是两个指针本身的地址),并将它们“返回”给调用者。void ReleaseListToSpans(void* start,size_t byte_size);
private:SponList _sponlists[MAX_BUCKET];
private:CentralCache(){ }CentralCache(const CentralCache&) = delete;//禁止拷贝构造static CentralCache _SInst;//单例指针};
CentralCache 是所有线程共享的全局资源,因此必须加锁来保证线程安全。但它并不是直接在哈希桶外套一把“大锁”,而是为哈希桶中每条 span 链分别设置独立的锁,也就是所谓的桶锁。这样一来,只有当两个线程在相同类型的内存块耗尽后,同时去申请 span,并且恰好映射到同一条 span 链时,才会发生锁竞争。由于触发条件苛刻,这种竞争的概率极低,从而保证了高效的并发性能。
_sponlists[MAX_BUCKET]:每个桶维护多个 span(不同 span 可能对应不同连续页区间但都属于该 size-class)。FreeList(在 Spon 内部和 ThreadCache 中)单链表,用对象头部存 next 指针(NextObj 辅助函数)。
CentralCache和PageCache都采用单例模式 ,是因为 CentralCache / PageCache 天生只能有一个。它保证了内存池的全局一致性、线程安全和高效管理。
FetchRangeObj从中心缓存获取内存
在上面聊的threadcache申请内存的设计中有和centralcache跨层协作,就是去中心缓存申请内存FetchRangeObj方法,这里讲从中心缓存中具体的获取方法。
//从中心缓存中获取内存
size_t CentralCache::FetchRangeObj(void*& Start, void*& End, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size); //桶大小_sponlists[index]._mtx.lock(); //上锁Spon* spon = GetoneSpon(_sponlists[index], size); //获取Sponassert(spon);assert(spon->_freeList);Start = spon->_freeList;End = Start;int i = 0;int num = 1; //实际需要拿走的小内存对象 while (i < batchNum - 1 && NextObj(End) != nullptr){End = NextObj(End);++i;++num;}spon->_freeList = NextObj(End);NextObj(End) = nullptr; //这是拿走的内存的最后一个节点的下一位置为空spon->_usecount += num;_sponlists[index]._mtx.unlock();//解锁return num;
}
这个方法简单来说就是从CentralCache中拿spon下的小内存空间,FetchRangeObj(void*& Start, void*& End, size_t batchNum, size_t size)在某个桶上,调用此函数从一个 Spon 上取出 最多 batchNum 个 小块,返回起止 Start…End 并把 Spon 的 freelist 向后推进,同时 spon->_usecount += num。该函数负责上/解桶锁,保证并发安全。
整体流程,先根据字节数找到对应的span桶,然后上桶锁,通过GetoneSpon方法来获取spon。根据batchnum和两个指针start和end,拿到实际需要拿走的小内存的对象。把spon的freelist 向后推进到 End 的下一个,然后把我们返回的区间断开(使start-end成为独立链),计数_usecount记下拿走了几个内存。然后拿到的内存链接到threadcache的freelist(这里就和前面threadcache申请内存部分耦合了)。
GetoneSpon 跨层协作(和Pagecage如何合作)
//获取一个非空Spon
Spon* CentralCache::GetoneSpon(SponList& sponlist, size_t size)
{//遍历这个spon链表中是否还有未分配的Spon对象Spon* its = sponlist.begin();while (its != sponlist.end()){if (its->_freeList != nullptr) //这个spon对象下还有小内存{return its;}else{its = its->_next;}}sponlist._mtx.unlock();//解掉桶锁,如果其他线程释放内存回来对象回来,不会阻塞。//为空了去PageCache申请内存//pagecache申请的内存是大块的spon在这里细分小块,形成自由链表PageCache::GetInstance()->_page_Mtx.lock();Spon* Spon = PageCache::GetInstance()->NewSpon(SizeClass::NumMovePage(size));//严格意义上是页缓存申请页大小,原始大内存,申请的连续内存,尚未切分成小对象。PageCache::GetInstance()->_page_Mtx.unlock();char* Start = (char*)(Spon->_pageid << PAGE_SHIFT); //起始地址size_t bytes = Spon->_n << PAGE_SHIFT; //页大小char* End = Start + bytes;// ——————————————————————————————————————//| | //页中申请的大块内存// ______________________________________ //^ ^ //| |//start end////____________//| | Spon //切割成spon//____________// | |// v v//[obj1]->[obj2]Spon->_freeList = Start; //切割成sponStart += size;void* tail = Spon->_freeList; //尾插的形式while (Start < End){NextObj(tail) = Start;tail = NextObj(tail);Start += size;}NextObj(tail) = nullptr;// 切好span以后,需要把span挂到桶里面去的时候,再加锁sponlist._mtx.lock();//页缓存中获取的内存给中心缓存需要内存的桶sponlist.PushFront(Spon);return Spon;
}
GetoneSpon(SponList& sponlist, size_t size):在给定桶(sponlist)里找一个还有空闲小块的 Spon(span)。如果找不到,就去 PageCache 申请新的连续页(一个新的 span),把它切成小块并挂回到该 sponlist,最后返回一个非空的 Span。重要不变式:函数返回时,调用者仍然应当拥有对应桶的 mutex(也就是说函数结束时桶锁是持有状态)。
这里的加锁与解锁非常关键,但是我们先把内存的申请逻辑捋一遍在谈锁机制。
内存申请逻辑:前面的FetchRangeObj找到spon就在span下切割小内存,如果找不到span就要调用GetoneSpon方法来获取新的spon。遍历这个span链表判断是否有span对象下有未分配的小内存,有内存就要返回这个spanlist下的小内存。如果spanlist中没有span,就要去PageCache中接着去申请内存了(与pagecache进行联动),解掉桶锁,加上pagecache的锁,然后去pagecache中申请内存。从pagecache申请来的大块内存切割成spon,然后尾插的形式挂到centralcache中的sponlist下。然后加上桶锁。保证新的 span 被正确挂到桶链表头。返回这个新 span 给 FetchRangeObj,然后就走FetchRangeObj的逻辑。
锁的逻辑:
线程缓存缺内存 → 找 CentralCache,ThreadCache 调用 CentralCache::FetchRangeObj,希望拿到一批对象。先找到对应桶 _sponlists[index](index 根据 size-class 算出来)。这里的桶加锁防止多个线程同时从该桶里取内存,导致 freelist/链表状态混乱(上图蓝色锁)。然后调用 GetoneSpon,遍历spanlist是否有可用对象,没有知道啊span对象去PageCache中申请大块内存,先解桶锁(关键点)因为后续要去 PageCache 申请,耗时长,如果还占着桶锁会阻塞别的线程(比如别的线程正在回收对象回这个桶)(上面红色锁)。给 PageCache 加锁保证 PageCache 的页分配是线程安全的。(上面黄色锁),去pagecache申请内存,申请完内存回来解锁(黄色锁)。把新的span挂回centralcache的桶,然后再次给桶加锁(红色锁)。
//下面这个引用的流程由ai生成
当线程缓存(ThreadCache)缺内存 时,会调用 CentralCache::FetchRangeObj
向中心缓存要一批对象。流程如下:
定位桶并加锁(蓝色锁) 根据 size-class 算出 index,找到 _sponlists[index]。 给该桶加锁,保证只有一个线程能同时操作这个桶的 freelist/spanlist,避免链表状态被多个线程并发修改。
尝试获取可用 Span 调用 GetoneSpon 遍历该桶的 spanlist,看是否存在还有小对象可分配的 span。 如果找到了,直接返回(桶锁还保持着)。
如果该桶没有可用 Span:解桶锁 → 申请新内存(红色锁 → 黄色锁)
- 解桶锁(关键点!) 因为后续要去 PageCache 申请大块内存,这一步耗时可能很长。 如果还占着桶锁,会把其他线程卡死 —— 特别是那些想要回收内存到该桶的线程,就会一直阻塞。
- 给 PageCache 加锁(黄色锁) PageCache 是全局的,内部要维护页号映射表和 span 管理链表,必须用一把全局大锁保护。 在这里安全地向 PageCache 申请新 span。
- 申请完成后解 PageCache 锁,避免长时间持有全局锁。
- 把新 span 挂回 CentralCache 桶 把 PageCache 切好的 span 挂到 _sponlists[index] 的链表中。 挂之前需要 再次加桶锁(蓝色锁),保证插入过程安全。
注意,如上图红色锁和蓝色锁是同一把锁,下面看一下,容易绕晕:
_sponlists[index],这是 CentralCache 里定义的数组,数组元素类型就是 SponList。每个桶对应一个 size-class。所以 _sponlists[index] 表示某个具体桶。
sponlist(函数参数)GetoneSpon 的参数:SponList& sponlist。当你在 FetchRangeObj 调用 GetoneSpon(_sponlists[index], size) 时,其实就是把 _sponlists[index] 传进去了。
sponlist 就是 _sponlists[index] 的别名(引用)。它们指向的就是同一块数据,同一把锁。蓝色锁」「红色锁」其实就是同一把桶锁,只不过在不同作用域下你用不同名字访问了它,看起来好像有两把。
这里锁机制的优势
细粒度桶锁 → 提高并发度,不同大小内存请求互不干扰。
短期持锁 → 遇到耗时操作先释放,避免长时间阻塞。
两级锁分离 → CentralCache 桶锁(局部)、PageCache 全局锁(全局),分工明确。
接口不变式 → 保证调用方看到的是“始终持桶锁”的一致状态,简化使用。
这套锁机制是“细粒度 + 短时间持锁 + 分层管理”,兼顾了高并发和线程安全。
(3)PageCache的设计
pagecache申请内存的整体逻辑
pageCache.h
#include"Common.h"//页缓存 单例模式 饿汉模式
class PageCache
{
public:static PageCache* GetInstance(){return &_SInst;}Spon* NewSpon(size_t k);Spon* MapObjectToSpon(void* obj); //判断当前小内存所处的页号也就是所处于那个spanvoid ReleaseSpanToPageCache(Spon* spon);std::mutex _page_Mtx;
private:SponList _sponlists[NPAGES];std::unordered_map<PAGE_ID, Spon*> _idSpanMap; //建立页号和spon的指针的映射关系PageCache(){}PageCache(const PageCache&) = delete; //禁止拷贝构造static PageCache _SInst;//单例指针 注意饿汉模式先创建对象,所以这里不用指针
};
先明确一个概念,CentralCache 和 PageCache 里都出现了 “桶”,但两者的概念完全不同。
CentralCache 的桶🪣
定义:CentralCache::_sponlists[index]
粒度:一个桶对应一个 size-class(比如 16B、32B、64B …)。
内容:每个桶里是一个 SponList,存放切分好的 span(Spon),span 里再切分成小对象。
作用:线程缓存(ThreadCache)缺小对象时,去 CentralCache 的桶里拿。每个桶只管理某一类固定大小的小对象。
PageCache 的桶🪣
定义:PageCache 里通常有 _pageLists[N],N 是页数。
粒度:一个桶对应一类 连续页数量(比如 1 页、2 页、4 页、8 页 …)。
内容:每个桶是一个链表,存放若干 span,每个 span 表示一大块连续的物理页。
作用:当 CentralCache 需要新 span(切分小块用)时,从 PageCache 的桶里取对应大小的连续页。PageCache 只关心页级内存块,完全不涉及小对象切分。
Page Cache 位于 Central Cache 之上,以页(page)为单位进行存储和分配。当 Central Cache 中没有可用的小块内存时,Page Cache 会分配出若干页,再将这些页切割成固定大小的小块交给 Central Cache 使用。当某个 span 内的所有对象都被回收后,Page Cache 会将该 span 收回,并尝试与相邻的空闲页进行合并,形成更大的连续页块,从而有效缓解内存碎片问题。
NewSpon申请span(大块页分割)
通过GetOneSpan从CentralCache中取到了Span,遍历没有找到span,就要newspan向pagecache申请内存,pagecache先检查对应位置有没有span,如果没有则向更⼤⻚寻找⼀个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到⼀个span,则将10页pagespan分裂为⼀个4页pagespan和⼀个6页pagespan
Spon* PageCache::NewSpon(size_t k)
{assert(k > 0 && k < NPAGES);//获取一个k页的sponif (!_sponlists[k].Empty()){return _sponlists[k].PopFront();}//如果没有找到的k页的spon,去下一个桶中for (int i = k + 1; i < NPAGES; i++) //找下一个桶,但是此时页就多出来了得做切割{if (!_sponlists[i].Empty()){Spon* nSpon = _sponlists[i].PopFront();Spon* kSpon = new Spon; //切割下来的sponkSpon->_pageid = nSpon->_pageid;kSpon->_n = k;nSpon->_pageid += k;nSpon->_n -= k;_sponlists[nSpon->_n].PushFront(nSpon);//把切下来的页挂回去//割下来的页,就是给threadcache用的页span,每个都要建立映射关系,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpon->_n; i++){_idSpanMap[kSpon->_pageid + i] = kSpon;}return kSpon;}}//如果到最后一个桶中都没有页就要系统去堆上申请空间了Spon* bigSpon = new Spon;void* ptr = SystemAlloc(NPAGES - 1);bigSpon->_pageid = ((PAGE_ID)ptr >> PAGE_SHIFT);bigSpon->_n = NPAGES - 1;_sponlists[bigSpon->_n].PushFront(bigSpon);return NewSpon(k);
}
补充 common.h中的算法NumMovePage
centralcache中GetInstance()->NewSpon(SizeClass::NumMovePage(size));,通过这个算法**NumMovePage(size)**计算一次性向pagecache要几个页num = NumMoveSize(size); size_t npage = num * size;npage >>= PAGE_SHIFT; if (npage == 0) npage = 1;return npage; } ```
newspon的流程
assert(k > 0 && k < NPAGES);
k 必须在 1…NPAGES-1 之间。
如果 _sponlists[k]
(第 k 页大小的桶)非空,直接 PopFront() 返回一个已有的 k-page span。
从更大块切割(split) 否则,从 k+1 到 NPAGES-1 逐桶查找第一个非空桶:弹出一个更大的 nSpon(比如有 8 页)新建 kSpon,把 k 页从 nSpon 前端切出来:kSpon->_pageid = nSpon->_pageid; kSpon->_n = k;
nSpon->_pageid += k; nSpon->_n -= k;
把 nSpon(剩余部分)按新的页数放回相应 _sponlists[nSpon->_n]
。为 kSpon 在 _idSpanMap 中建立每页到 kSpon 的映射(页 -> span)。返回 kSpon。
都没找到 → 向系统申请大块 new 一个 bigSpon,SystemAlloc(NPAGES - 1)
向 OS 申请 NPAGES-1
页。bigSpon->_pageid = ptr >> PAGE_SHIFT; bigSpon->_n = NPAGES - 1;
把 bigSpon 挂入 _sponlists[bigSpon->_n]
,然后递归 return NewSpon(k)
;(这次一定能命中)。
简单来说它的作用就是:给 CentralCache 拿一个恰好 k 页的 span,没有就切,有大块就拆,没有大块就找操作系统要。先看对应桶:如果 _sponlists[k] 里有现成的 k 页 span,直接拿。没有就切割:去更大的桶里找,切出前 k 页作为新 span,把剩余挂回对应桶。全都没有就 OS 申请:向系统申请一大块(NPAGES-1 页),挂回桶,再递归拿 k 页。
MapObjectToSpon
//根据建立的映射关系,拿到页的起始地址
//只要是属于这个页中的地址,左移PAGE_SHIFT位就可以得到对应的起始地址 也就拿到了Spon
Spon* PageCache::MapObjectToSpon(void* obj)
{PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); //拿到这个span对应的起始地址 auto itr = _idSpanMap.find(id);if (itr != _idSpanMap.end()) //找到了{return itr->second;}else{assert(false);return nullptr;}
这里其实是为了方便还内存,后面内存释放机制在详细讲
内存申请的整体过程简述
整个小对象内存分配过程,可以分为三层:
线程缓存 (ThreadCache)
每个线程有自己的一份小对象缓存(freelist)。如果 freelist 里有空闲对象,就直接拿,速度最快,无锁。
中心缓存 (CentralCache)
当线程缓存不够用时,从 CentralCache 对应 size-class 的桶里批量取一批。如果该桶下没有 span,就去 PageCache 申请。粒度:小对象批发 → ThreadCache。
页缓存 (PageCache)
维护以页为单位的大块内存(span)。CentralCache 缺 span 时,就从 PageCache 拿。如果 PageCache 对应桶没有,就去更大桶里切割;如果所有桶都没货,就直接向操作系统申请。粒度:大块页分配 → CentralCache。
线程要小对象 → ThreadCache → (不够) → CentralCache → (不够) → PageCache → (不够) → 系统堆。
内存释放机制
(1)ThreadCache的内存释放设计
Deallocate函数 释放自由链表的逻辑
void ThreadCache::Deallocate(void* ptr, size_t size)//之前自由链表中申请的内存用完了放回去
{assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,内存对象插图进自由链表还回去size_t index = SizeClass::Index(size);_freelists[index].Push(ptr);//当这个桶中的链表节点的个数大于之前申请的最大的个数就说明需要释放了if (_freelists[index].Size() >= _freelists[index].MaxSize()){ListTooLoog(_freelists[index], size);}
}void ThreadCache::ListTooLoog(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;list.PopRange(start,end,list.MaxSize());//把链表中的这一段CentralCache::GetInstance()->ReleaseListToSpans(start,size); //释放spon
}
把用完的内存对象,用SizeClass::Index
找到映射的自由链表桶,然后把还回来的内存对象插入对应的自由链表桶。当这个桶的链表节点的个数大于之前申请的最大的个数就说明需要释放了。这时候调用ListTooLoog(_freelists[index], size);
这个函数,把自由链表还给span
LisTooLong函数 实现与centralcache的联动
void ThreadCache::ListTooLoog(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;
//// 1) 从本地 FreeList 弹出一段(长度 = MaxSize),形成 [start, ..., end] 的单链list.PopRange(start,end,list.MaxSize());//把链表中的这一段// 2) 一把“整段”还给 CentralCache(批量归还)CentralCache::GetInstance()->ReleaseListToSpans(start,size); //释放spon
}
把threadcache下还回来的自由链表要还给centralcache的span。使用start和end进行控制需要回收的这段小内存,调用PopRange函数(common.h中freelist的函数)。
void PopRange(void*& start,void*& end,size_t n)
{assert(n>=_size);start = _freelist;end = start;for (size_t i = 0; i < n - 1; i++){end = NextObj(end);}_freelist = NextObj(end);NextObj(end) = nullptr;_size -= n;
}
批量弹出:PopRange 不是一次弹一个,而是直接拿走 MaxSize 个节点,构成一段链表 [start…end]。这是O(k) 的简单指针走访,不分配、不拷贝。
批量归还:一次调用把一段链直接交给 CentralCache::ReleaseListToSpans,降低中心锁的竞争与系统调用次数。
(2)CentralCache的内存释放设计
ReleaseListToSpans函数 释放centralcache中span的逻辑
这块逻辑是 ThreadCache 回收小对象 → CentralCache 接收 → 必要时再返还给 PageCache 的“回流路径”。
把 ThreadCache 吐回来的那一段小对象,重新挂回它们所属的 Span。如果某个 Span 已经“满血”(所有小块都回来了),就把这个 Span 从 CentralCache 中摘掉,返还给 PageCache。简而言之,它是内存回收链路中的 中转站:对象 → Span → PageCache。
void CentralCache::ReleaseListToSpans(void* start, size_t size)//把小内存还给对应的span
{size_t index = SizeClass::Index(size);_sponlists[index]._mtx.lock();while (start){void* Next = NextObj(start);Spon* spon = PageCache::GetInstance()->MapObjectToSpon(start); //需要释放的spanNextObj(start) = spon->_freeList; //把小内存放进对应的spon中spon->_freeList = start;spon->_usecount--;if (spon->_usecount == 0) //=0说明span的切分出去的所有小块内存都回来了 这个span可以在回收给pagecache了{_sponlists[index].Erase(spon);spon->_freeList = nullptr;spon->_prev = nullptr;spon->_next = nullptr;_sponlists[index]._mtx.unlock();PageCache::GetInstance()->_page_Mtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(spon);//把span还给pagecachePageCache::GetInstance()->_page_Mtx.unlock();_sponlists[index]._mtx.lock();}start = Next;}_sponlists[index]._mtx.unlock();
}
步骤逻辑
1. 找对象对应的 Span
Spon* spon = PageCache::GetInstance()->MapObjectToSpon(start);
CentralCache 并不直接知道这个小对象属于哪个 Span。
所以通过 PageCache 的 id → span 映射表快速定位(本质是页号映射)。
下面是对映射逻辑以及MapObjectToSpon函数的讲解
(这里的映射定义在pagecache中,因为后面pagecache的回收也要用到)
这里就是 小对象 → 找到它所在 Span 的关键步骤,底层依赖的正是 _idSpanMap 这个 页号到 Span 的哈希映射表。在整个内存管理体系里,对象回收时必须知道它属于哪个 Span,否则 CentralCache
无法把它正确挂回去,更无法判断 Span 是否该回收给 PageCache。这里用到的就是 _idSpanMap,一个 页号到 Span
指针的映射表。基本思路
- 页号的计算 每个对象(小内存块)一定来自某个 Span,而 Span 是由一组连续的页组成。 只要知道对象的虚拟地址,就能反推出它属于哪一页:PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
PAGE_SHIFT = log₂(page_size)相当于把地址除以页大小,得到该对象所在的页号。 例子: 假设 page size =
4KB (PAGE_SHIFT = 12) 对象地址 = 0x80456000 id = 0x80456000 >> 12 =
0x80456 所以它属于页号 0x80456。
2.映射表查找 每个 Span 在被分配时,PageCache 都会为它覆盖的每一页建立映射关系:_idSpanMap[kSpon->_pageid + i] = kSpon; key = 页号
value = 指向 Span 的指针 这样,给定任意对象地址,我们只需:取页号,在 _idSpanMap 查找对应的
Span,结果找到:直接返回对应 Span 指针,找不到:assert(false),说明逻辑出了 bug(对象不属于任何 Span)。核心思想:对象地址 → 页号 → 查 _idSpanMap → 定位 Span。
2. 回收到 Span 的 freeList
NextObj(start) = spon->_freeList;
spon->_freeList = start;
用简单的头插法把对象挂回 spon->_freeList。spon->_usecount-- 保持已用计数一致。
3. Span 彻底空闲时的释放流程
当 usecount == 0,说明该 span 中的小对象全部回来了,整个 Span 不再被任何线程使用。
此时 CentralCache 把 Span 从桶链表中摘掉,并交还给 PageCache。这里就用到了ReleaseSpanToPageCache函数需要与pagecache进行协调联动。
这是这段代码里最值得注意的点:
桶锁:保护当前 size-class 的 span 链表和对象挂接。
PageCache 锁:保护页分配/回收的线程安全。
为什么在释放 span 时 先解桶锁再加 PageCache 锁?
避免“长时间持桶锁” → 如果直接持有桶锁再去操作 PageCache,就可能拖慢别的线程(比如正在申请/释放小对象的线程)。这里的做法是:短锁快路径(先解锁)+ 慢路径另加锁(PageCache 释放)。再回来时重新加桶锁,保证不破坏不变式。
其实这里加锁与解锁的逻辑与申请内存时,去pagecache中申请内存的逻辑大相径庭。
(3)PageCache的内存释放设计
ReleaseSpanToPageCache函数把Span进行合并
当 CentralCache 把一个 整块 Spon 还给 PageCache 时,PageCache 不会傻乎乎地直接丢回桶里,而是会尝试和前后相邻的空闲 Spon 合并,这样可以缓解内存碎片问题(小块太多用处不大,大块更容易再切分出去)。
void PageCache::ReleaseSpanToPageCache(Spon* spon)
{assert(spon);PAGE_ID id = spon->_pageid;// 向前合并while (true){//对span前后的页,尝试进行合并,缓解内存碎片的问题PAGE_ID prev= spon->_pageid - 1;auto ret = _idSpanMap.find(prev);if (ret != _idSpanMap.end() && !ret->second->_isUse){Spon* prevSpon = ret->second;// 如果合并后的页数超过最大 NPAGES,就不能合并if (spon->_n + prevSpon->_n >= NPAGES)break;// 更新当前 spanspon->_pageid = prevSpon->_pageid;spon->_n += prevSpon->_n;// 把前面的 span 从桶里移除并删除_sponlists[prevSpon->_n].Erase(prevSpon);delete prevSpon;}else break;}// 向后合并while (true){PAGE_ID next = spon->_pageid + spon->_n;auto ret = _idSpanMap.find(next);if (ret != _idSpanMap.end() && !ret->second->_isUse){Spon* nextSpon = ret->second;if (spon->_n + nextSpon->_n >= NPAGES)break;spon->_n += nextSpon->_n;_sponlists[nextSpon->_n].Erase(nextSpon);delete nextSpon;}else break;}// 标记为未使用spon->_isUse = false;// 把合并后的 span 挂回桶里_sponlists[spon->_n].PushFront(spon);// 更新映射表(注意要把所有页都映射回这个 span)for (PAGE_ID i = 0; i < spon->_n; i++){_idSpanMap[spon->_pageid + i] = spon;}}
所以 ReleaseSpanToPageCache 的核心任务是:
尝试前向合并。
尝试后向合并。
把合并好的 span 放回 PageCache 的桶中。
更新映射表 _idSpanMap。
下面引用是前置条件
先明确isUse的概念
申请内存的时候当在pagecache申请到Span,把这个Sapn的_isuse标记为true。usecount 和 isUse 看起来像是重复,但它们的语义完全不同,所以才需要同时存在。
usecount 的作用
usecount 表示:这个 span 下的小对象正在被使用的数量。 比如一个 span 被切成了 128 个小块,当 ThreadCache 拿出去 50 个,还剩 78 个在 freelist 里,那 usecount = 50。当对象被还回时→ usecount–。当 usecount == 0 时,说明这个 span 的所有小块都回来了,理论上 span 可以回收给PageCache。问题点:
usecount 只能精确描述 小对象分配/回收 的状态,但它不是 PageCache 回收的唯一依据。isUse 的作用
isUse 表示:这个 span 是否被 CentralCache/ThreadCache 占用。当 CentralCache 从 PageCache 拿到一个 span 并开始切小块时,isUse = true。当 CentralCache确认这个 span 完全回收(usecount == 0 且它已经从桶里移除)时,isUse = false。为什么要加它: 因为在 PageCache 做前后合并时,不能只靠 usecount 来判断 span 是否可合并。
举个冲突场景
假设我们只有 usecount 没有 isUse: 一个 span 被分出去 128 个小块。有一瞬间所有小块都被还回来了 →
usecount == 0。但这时候 CentralCache 可能还没真正释放 span 回 PageCache(因为它还要先把 span
从桶里摘掉,然后再调用 PageCache::ReleaseSpanToPageCache)。如果 PageCache 在这个时机发现
usecount == 0,就可能错误地把 span 当成空闲合并掉,导致内存管理冲突。总结
usecount:跟踪 span 下的小对象分配/回收情况。什么时候对象被用,什么时候归还。 isUse:标记整个 span 是否已经交还 PageCache,可以参与前后合并。
为什么在 NewSpan 时必须插入首尾两个页号的映射?
在 NewSpan 时为什么要这样设置
_idSpanMap[nSpon->_pageid] = nSpon;
_idSpanMap[nSpon->_pageid + nSpon->_n - 1] = nSpon;
nSpon->_pageid :这个 Span 的起始页号。 nSpon->_pageid + nSpon->_n - 1 :这个 Span的结束页号。
也就是说:在申请新 Span 时,把 首尾页号 都映射到 nSpon。
这样做的目的
- 支持合并查找 在 ReleaseSpanToPageCache 里会尝试前后合并:
PAGE_ID prev = spon->_pageid - 1;
// 前一页
PAGE_ID next = spon->_pageid + spon->_n;
// 后一页。
如果前一页或后一页的页号能在 _idSpanMap 查到,就能知道是否有相邻 Span。只有首尾页号被映射,查找效率高,不用记录中间所有页。- 减少存储开销 如果每一页都存映射,一个 512 页的大 Span 就得存 512 条映射。现在只存 首尾,合并时通过查前一页/后一页就够用了。真正需要“地址 → Span” 时,用起始页号算就行,不需要中间页号都记。
当 Span 被释放回 PageCache,要进行合并:向前合并:查 spon->_pageid - 1 是否有空闲 Span。向后合并:查
spon->_pageid + spon->_n 是否有空闲 Span。如果这两处页号在 _idSpanMap 里能找到且对应 Span
不是 is-use,那么就能安全合并。这就是为什么在 NewSpan 时必须插入首尾两个页号的映射:不然释放时找不到邻居,就没法合并了。
逻辑步骤
void PageCache::ReleaseSpanToPageCache(Spon* spon)
{assert(spon);PAGE_ID id = spon->_pageid;
参数是要释放的Span,_pageid是这块span的其实页号
前向合并
// 向前合并
while (true)
{//对span前后的页,尝试进行合并,缓解内存碎片的问题PAGE_ID prev= spon->_pageid - 1;auto ret = _idSpanMap.find(prev);if (ret != _idSpanMap.end() && !ret->second->_isUse){Spon* prevSpon = ret->second;// 如果合并后的页数超过最大 NPAGES,就不能合并if (spon->_n + prevSpon->_n >= NPAGES)break;// 更新当前 spanspon->_pageid = prevSpon->_pageid;spon->_n += prevSpon->_n;// 把前面的 span 从桶里移除并删除_sponlists[prevSpon->_n].Erase(prevSpon);delete prevSpon;}else break;
}
找到当前 span 前一页,看它是不是空闲 span 的一部分。如果是,就把这两个 span 合并。更新 spon->_pageid(新的起始页就是更靠前的 prevSpon 的起始页)。更新 spon->_n(页数累加)。把旧的 span 从桶里删除并释放。如果找不到,或者前一个 span 在用,就直接停。
向后合并
// 向后合并
while (true)
{PAGE_ID next = spon->_pageid + spon->_n;auto ret = _idSpanMap.find(next);if (ret != _idSpanMap.end() && !ret->second->_isUse){Spon* nextSpon = ret->second;if (spon->_n + nextSpon->_n >= NPAGES)break;spon->_n += nextSpon->_n;_sponlists[nextSpon->_n].Erase(nextSpon);delete nextSpon;}else break;
}
看看当前 span 的 尾页 后面紧跟的页号是不是属于一个空闲 span。如果是,就合并,增加 span 的 _n。同样把旧的 nextSpon 从桶里移除并删除。
注意:这里不用更新 _pageid,因为合并的是后面的 span。
标记与回收
// 标记为未使用
spon->_isUse = false;// 把合并后的 span 挂回桶里
_sponlists[spon->_n].PushFront(spon);
把span标记为“空闲”
按照它的页数_n放回对应的桶。(比如合并后有16页,就放到_sponlists[16])
更新映射表
// 更新映射表(注意要把所有页都映射回这个 span)
for (PAGE_ID i = 0; i < spon->_n; i++)
{_idSpanMap[spon->_pageid + i] = spon;
}
每一页号都必须更新映射表 _idSpanMap。
这样以后只要给定一个对象指针,通过页号计算,就能快速找到它所属的 span。
内存池为什么要前后页合并,其实是为了对付内存碎片。
什么是前后合并
假设你的 PageCache 管理的是 页粒度内存(比如每页 8KB)。线程申请小对象时,最终会从PageCache 里拿一段 Span(可能是 2 页、4 页……)。线程释放对象,Span 可能会变空(usecount ==0),于是回到 PageCache。这时候,如果我们只把 空 Span 丢回桶里,不去管它前后的邻居页,就会出现这样的情况:
[SpanA: 2页 空闲] [SpanB: 3页 已占用] [SpanC: 2页 空闲]
虽然一共空闲了4页,但它们是零散的,没有拼成大页。 如果业务突然需要一个4页的Span,你就分不出来了,即使实际上有4页的空闲空间。这就是外碎片问题。为什么要合并
通过“前后页合并”,PageCache 能在释放时把相邻的空闲 Span 拼起来:释放 SpanC 后:
[SpanA: 2页 空闲] [SpanB: 3页 已占用] [SpanC: 2页 空闲]
、
再释放 SpanB后(空闲):[SpanA: 2页 空闲] [SpanB+C: 5页 空闲]
再合并 SpanA:[SpanA+B+C: 7页 空闲]
这样一来,大对象/大页的申请就有了可能。
前后页合并的目的,就是减少外部碎片,把零散的小Span拼成大Span,从而保证大对象申请时有足够的连续页可用。
为什么要同时“往前”和“往后”
只往前:如果释放的是“最右的块”,就可能错过机会。只往后:如果释放的是“最左的块”也可能错过。往前+往后:不管释放的是哪一块,都能把它的左右相邻空闲块合并成更大的块。
优化
大对象申请逻辑
在内存池设计里,ThreadCache 专门负责 小对象分配,性能优先。我们这里设置的上限是 256KB,也就是说:
<=256KB 的申请:走 ThreadCache 的自由链表,快速分配。>256KB 的申请:不再经过 ThreadCache,而是直接去 PageCache 拿大块页。
为什么?因为如果让 ThreadCache 也去维护这么大的对象,它会产生几个问题:
自由链表空间浪费
大对象不可能像小对象一样频繁复用,存到 ThreadCache 里容易长时间闲置,占用大量内存。
竞争加剧
大对象分配涉及到多页拼接,这些逻辑已经超出 ThreadCache 的职责范围。放在里面会把它搞得很臃肿。
PageCache 更适合管理大块
PageCache 本来就是以页为单位(8KB)管理的,申请大于 256KB 的对象,直接去 PageCache 找 Span 才是正路。
申请内存
static inline size_t RoundUp(size_t size)
{if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{return _RoundUp(size,1<<PAGE_SHIFT); //大于256kb的申请逻辑}
}
大对象是以页为单位申请的,所以需要做页对齐。这里 (1 << PAGE_SHIFT) 比 8*1024 更灵活。因为 PAGE_SHIFT 是可配置的,未来想调整页大小,只要改一处就行。
static void* ConcurrentAlloc(size_t size)
{if (size > MAX_BYTES){// 页对齐size_t bytes = SizeClass::RoundUp(size);// 上锁,因为 PageCache 是全局共享的PageCache::GetInstance()->_page_Mtx.lock();// 申请页数 = 对齐后的字节数 / 页大小Spon* spon = PageCache::GetInstance()->NewSpon(bytes >> PAGE_SHIFT);PageCache::GetInstance()->_page_Mtx.unlock();// 返回 span 的起始地址void* ptr = (void*)(spon->_pageid << PAGE_SHIFT);return ptr;}else{.......}}
大于 256KB 的申请直接走 PageCache,不会进入 ThreadCache。
分配前必须做 页对齐,否则可能出现跨页不连续的问题。
PageCache 是全局资源,必须加锁,避免多线程同时修改 _idSpanMap 出错。
返回地址的时候,是 span->_pageid << PAGE_SHIFT,这就是对应的大块内存的首地址。
Spon* PageCache::NewSpon(size_t k)
{if (k > NPAGES - 1){assert(k > 0);Spon* kSpon = new Spon;void* ptr = SystemAlloc(k);kSpon->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;kSpon->_n = k;_idSpanMap[kSpon->_pageid] = kSpon;return kSpon;} ..........
}
对于小于128页的内存申请NewSpan无需修改,如果用户需求是>128页呢?此时PageCache桶就无法满足,我们直接向系统申请即可。
SystemAlloc(k):直接向操作系统要 k 页。
kSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
ptr 是系统返回的地址。
右移 PAGE_SHIFT(例如 13,相当于除以 8192),得到页号。
页号就是这个 span 的唯一标识。
_idSpanMap[kSpan->_pageId] = kSpan;
建立页号 → span 的映射关系。
后面通过某个地址,可以用 _idSpanMap 快速找到它属于哪个 span。
释放内存
我们内存池分两类情况:
小块内存(≤ 256KB):走 ThreadCache / CentralCache,释放时回收到对应的自由链表桶。
大块内存(> 256KB):直接跟系统交互,不走 ThreadCache。
所以ConcurrentFree 和 ReleaseSpanToPageCache 就是专门处理大块内存释放的。
static void ConcurrentFree(void* ptr, size_t size){Spon* spon = PageCache::GetInstance()->MapObjectToSpon(ptr);if (size > MAX_BYTES){PageCache::GetInstance()->_page_Mtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_page_Mtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}}
MapObjectToSpan(ptr):通过地址找到它对应的 Span。
如果 size > MAX_BYTES (256KB) → 说明是大块内存,不走普通逻辑。
加锁是必须的,因为大块内存操作直接改 _idSpanMap,不是线程私有的。
调用 ReleaseSpanToPageCache(span) 释放大块内存。
void PageCache::ReleaseSpanToPageCache(Spon* spon)
{if (spon->_n > NPAGES - 1){void* ptr = (void*)(spon->_pageid << PAGE_SHIFT);SystemFree(ptr);delete spon;return;}...........
}
判断 span->_n > NPAGES - 1:超过 PageCache 能管理的最大桶范围,说明这是 直接向系统申请的 span。
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);把页号左移回去,得到 span 的真实地址。
SystemFree(ptr);把内存交还给系统。
delete span;把 span 元信息结构释放掉。
用对象池优化new和delete
最开始的讲的对象池的实现,这个对象池优于new和delete 也就是优于malloc。所以我们要把高并发内存池项目中的new和delete改一下。
私有成员变量,引入对象池。
在之前用到过new和delete的地方,改为对象池的new和delete。
优化内存释放函数减少调用
//static void ConcurrentFree(void* ptr, size_t size)优化前
static void ConcurrentFree(void* ptr)//优化后
{Spon* spon = PageCache::GetInstance()->MapObjectToSpon(ptr);size_t size = spon->_objsize;if (size > MAX_BYTES){PageCache::GetInstance()->_page_Mtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(spon);PageCache::GetInstance()->_page_Mtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
之前内存释放,每次都要添加字节数的大小,这样不仅非常繁琐还容易出错。所以这里通过一个非常巧妙的方式优化一下。
span结构中,添加一个成员变量 来记录size的大小。每次去pagecache申请内存的时候·都要记录一下size的大小。然后根据span和页号的映射就能是走大内存的释放逻辑还是走小内存的释放逻辑了。
正因为有 _idSpanMap,所以:我们不需要额外传 size;只要 ptr -> pageId -> span,就能定位这块内存属于哪一类、多少页;然后根据 span 的 _n(页数)和 _objsize(小对象大小)来决定释放到 ThreadCache 还是 PageCache。
性能测试与性能瓶颈
#include "ConcurrentAllo.h"
#include <thread>
#include <vector>
#include <atomic>
#include <iostream>
#include <chrono>
#include<cstdio>
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++)free(v[i]);size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%zu个线程并发执行%zu轮次,每轮次malloc %zu次: 花费:%.2f ms\n",nworks, rounds, ntimes, (double)malloc_costtime);printf("%zu个线程并发执行%zu轮次,每轮次free %zu次: 花费:%.2f ms\n",nworks, rounds, ntimes, (double)free_costtime);printf("%zu个线程并发malloc&free %zu次,总计花费:%.2f ms\n",nworks, nworks * rounds * ntimes, (double)(malloc_costtime + free_costtime));
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(ConcurrentAllo::ConcurrentAlloc(16));//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentAllo::ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread)t.join();printf("%zu个线程并发执行%zu轮次,每轮次concurrent alloc %zu次: 花费:%.2f ms\n",nworks, rounds, ntimes, (double)malloc_costtime);printf("%zu个线程并发执行%zu轮次,每轮次concurrent dealloc %zu次: 花费:%.2f ms\n",nworks, rounds, ntimes, (double)free_costtime);printf("%zu个线程并发concurrent alloc&dealloc %zu次,总计花费:%.2f ms\n",nworks, nworks * rounds * ntimes, (double)(malloc_costtime + free_costtime));
}
int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
debug下运行
release下运行
我们发现有性能瓶颈,高并发内存池的申请是比malloc快很多,但是释放却比malloc慢,下面来解决这一问题。
通过基数树进行优化,引入tcmalloc的基数树实现pagemap
#pragma once
#include"Common.h"// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1<<PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize>>PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
修改这里映射那里也要修改,进而之前映射的地方也要修改
注意基数树一层和二层只能在32(x86)位下运行
基数树在内存分配器中的应用
在提供的内存分配器代码中,基数树(Radix Tree)用于PageCache中,通过TCMalloc_PageMap实现页面ID(PAGE_ID)到Span结构的映射。基数树是一种高效的数据结构,用于快速查找和管理大范围的键值对,特别适合内存分配器中快速定位内存块所属的Span。
基数树的作用
基数树在PageCache::MapObjectToSpan中用于根据内存对象的地址计算页面ID,并快速找到对应的Span结构。这对于内存分配和释放至关重要,因为它允许分配器高效地确定任意内存块所属的页面范围,进而管理内存的分配和回收。
基数树的实现
代码中提供了三种基数树实现:TCMalloc_PageMap1(单层数组)、TCMalloc_PageMap2(两层基数树)和TCMalloc_PageMap3(三层基数树)。实际使用的是TCMalloc_PageMap1,我们以它为例讲解大致流程。
TCMalloc_PageMap1(单层数组)
结构:
TCMalloc_PageMap1是一个简单的单层数组,数组大小为2BITS(BITS=32-PAGE_SHIFT,通常PAGE_SHIFT=13,因此数组大小为219)。
初始化:
在构造函数中,通过SystemAlloc分配一大块内存来存储数组(array_),并用memset初始化为0。
数组的每个元素是一个void*指针,指向对应的Span结构。
存储(set):
set(Number k, void* v):将页面ID k映射到Span指针v,直接将v存储在array_[k]。
页面ID k由内存地址右移PAGE_SHIFT位(通常13位,表示8KB页面)得到。
查找(get):
get(Number k):根据页面ID k直接访问array_[k],返回对应的Span指针。
如果k超出范围(k >> BITS > 0),返回NULL。
特点:
简单高效,适合页面ID范围较小的场景。
空间开销较大(2^19 * sizeof(void*)字节,通常为4MB或8MB),但查询时间为O(1)。
为什么用基数树?
高效性:基数树提供快速的键值查找(O(1)或O(log k),取决于层数),适合高频的页面ID到Span映射查询。
空间优化:相比直接使用std::unordered_map,基数树(特别是多层实现)在处理大范围键时更节省内存。
线程安全:PageCache使用互斥锁(_pageMtx)保护基数树操作,确保并发安全。
gitee:高并发内存池