EfficMultiCoreMemoryPool项目
目录
1. 定长内存池介绍
2. tcmalloc整体框架了解
2.1 ThreadCache设计
各区间内存浪费分析
2.1.1 ThreadCache申请内存
2.1.2 ThreadCache释放内存
2.1.3 TLS线程局部变量
2.2 CentralCache设计
2.2.1 CentralCache申请内存
ThreadCache --> CentralCache的申请内存 - FetchRangeObj
GetOneSpan给FetchRangeObj返还给Thread Cache
2.2.2 CentralCache释放内存:
2.3 PageCache设计
2.3.1 PageCache申请内存
2.3.2 PageCache释放内存
3. 项目优化
3.1 解决大于256KB的情况
3.1.1 申请内存情况
3.1.2 释放内存情况
3.2 清理项目中使用的malloc-->new
3.3 优化项目的ConcurrentFree接口
3.4 项目运行调试+测试
项目错误
1. new(obj)T 定位new错误
2. PopRange访问异常
3.5 基数树的项目优化
3.6 项目缺陷
3.7 对比malloc
编辑 3.8 打包为静态库
编辑 3.9 项目链接:
1. 定长内存池介绍
-
定长内存池是一个用于解决
new/malloc
API申请内存时产生大量内存碎片的简易方案,该方案只能申请固定大小的内存,具有局限性,定长内存池主旨是通过跳过malloc
直接向堆区申请一大块内存使用指针memory
进行管理和分配,对于释放的内存碎片使用自由链表freelist
进行回收管理,实现内存碎片的再次利用,从而提高性能! -
了解定长内存池的实现方式可以让我们了解内存池的概念!
-
为什么使用
char*
用于管理系统调用申请的大块内存,由于char
为1字节,这样方便适用申请各种大小的内存块。 -
自由链表
freelist
用于链接管理归还Delete
的内存块,即使用过的内存块。我们使用归还内存碎片的前4个字节
或者前8个字节
用于指向下一个内存碎片的地址。这个由使用机器是多少位决定!因此我们划分这个内存块的头4字节/8字节
来作为存储指针,建议使用*(void**)
,首先void*
是一个固定大小的指针,它的大小由机器决定,因此这种用法就可以不要考虑当前使用的机器是多少位了! -
我们回收内存碎片需要使用到内存碎片头
4/8
字节去指向下一块碎片,但是我们可能出现申请的内存没有这么大,因此我们需要保证至少要申请一个指针大小的内存块,但这会可能造成内碎片问题!(有可能我们此次申请对象只需要1/2/3字节)。
内碎片(Internal Fragmentation)
定义: 当分配给进程的内存块实际大小大于进程实际需要的大小时,剩余未使用的部分称为内碎片。这些碎片位于已分配的内存块内部,无法被其他进程使用。
外碎片(External Fragmentation)
定义: 内存中存在大量分散的小块空闲内存,虽然它们的总容量足够满足某个请求,但由于不连续,无法合并成一个足够大的连续块供进程使用。
-
我们申请内存块时,会分配给对象一块T大小的固定内存,然后由
T* object
指向这块内存,_memory指针指向_memory + sizeof(T)
后面的内存块,但是这会造成一个问题,我们不知道什么时候_memory所指向的内存块使用完全了,指针一直往后走并不会指向空,可能是越界访问到其他未分配的内存块,因此我们需要一个可以时刻记录内存块剩余大小的变量!-->remainbytes
-
同时可能到最后
_memory
所指向的内存块已经不够分配一个大小为T(remainbytes < sizeof(T))
的对象了,在此时我们需要向底层再次申请一块内存块!
-
使用完的内存块归还需要链接到
freelist
自由链表,以待再次分配使用,所以我们在申请_memory
内存块之前需要先检查自由链表是否存在着可以使用的内存块,由于是定长内存池,因此自由链表链接的内存块一定是可以使用的,因为内存块的大小一样,但对于后面我们要实现的tcmalloc
就需要考虑了! -
我们
_memory
使用申请的大块内存要直接使用系统接口去调用,跳过malloc
,这边我们是Windows系统,使用的系统调用是VirtualAlloc
,同时我们需要注意我们获取了内存块之后需要使用定位new,和手动调用析构函数!
VirtualAlloc
是 Windows 操作系统提供的一个底层内存管理 API,允许应用程序直接在进程的虚拟地址空间中保留(reserve)或提交(commit)内存区域。它是 Windows 内存管理的核心接口之一,常用于需要精细控制内存分配的场景(如高性能计算、自定义内存池、硬件交互等)。
LPVOID VirtualAlloc(
LPVOID lpAddress, // 期望的内存起始地址(通常设为 NULL,由系统决定)
SIZE_T dwSize, // 要分配的内存大小(字节为单位,需对齐到页面边界)
DWORD flAllocationType,// 分配类型(如 MEM_COMMIT 或 MEM_RESERVE)
DWORD flProtect // 内存保护选项(如 PAGE_READWRITE)
);
核心参数说明
lpAddress
指定内存区域的期望起始地址。通常设为
NULL
,由系统自动选择合适地址。若要手动指定地址,需确保地址是系统分配粒度的倍数(通常为 64KB),可通过
GetSystemInfo
获取具体值。
dwSize
要分配的内存大小(字节)。实际分配时,系统会按内存页大小(通常 4KB)向上对齐。
flAllocationType
内存分配类型,常用标志:
MEM_RESERVE
保留虚拟地址空间,但不分配物理内存或页面文件空间。用于预先占用连续的地址范围。
MEM_COMMIT
提交物理内存或页面文件空间,使其可用于程序。可与MEM_RESERVE
结合使用。
MEM_RESET
标记内存为未使用(用于优化内存使用,不影响数据)。
MEM_LARGE_PAGES
使用大页内存(需权限和系统支持,提升性能)。
flProtect
内存保护属性,控制访问权限:
PAGE_READONLY
:只读
PAGE_READWRITE
:可读写
PAGE_EXECUTE_READ
:可执行和读取
PAGE_EXECUTE_READWRITE
:可执行、读写
PAGE_NOACCESS
:禁止访问
#pragma once
#include <iostream>
#include <vector>
#pragma once
#include "Comm.h"
#include <Windows.h>// 系统调用申请内存块
inline static void* SystemCallGetMemory(size_t Size)
{
#ifdef _WIN64VirtualAlloc()
#elif _WIN32void* ptr = VirtualAlloc(nullptr, Size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// Linux brk()/mmap()
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}// 定长内存池
template<class T>
class ObjectPool
{
public:ObjectPool() :_memory(nullptr),_remainbytes(0),_freelist(nullptr){}T* New(){T* obj = nullptr;if (_freelist) // 如果自由链表存在着内存块,优先使用{// 头删void* next = *((void**)_freelist);obj = (T*)_freelist;_freelist = next;}else{// 向_memory申请内存if (_remainbytes < sizeof(T)) // 如果当前内存已经不能申请一个对象所需要的大小T则去底层申请内存块{_remainbytes = 128 * 1024; /* 一次申请 128 KB */_memory = (char*)SystemCallGetMemory(_remainbytes);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); // 最少分配一个指针大小的内存块_memory += objSize;_remainbytes -= objSize;}// 使用定位newnew(obj)T;return obj;}void Delete(T* obj){// 手动调用析构函数obj->~T();// 头插(两种情况都适用)// 1. _freelist -> nullptr// obj// 2. _freelist -> prevobj// obj*(void**)obj = _freelist; // *(void**) 表示一个指针的大小,_WIN32 / _WIN64 分别是 4位/8位,这种写法适用不同机器位数!_freelist = obj;}~ObjectPool() {}
private:char* _memory = nullptr; // 指向大块内存的指针size_t _remainbytes = 0; // 用于记录_memory指向的大块内存的剩余量void* _freelist = nullptr; // 自由链表,用于管理归还的内存块
};// ObjectPool性能测试
struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 5;// 每轮申请释放多少次const size_t N = 100000;std::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();std::vector<TreeNode*> v2;v2.reserve(N);ObjectPool<TreeNode> TNPool;size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < N; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();std::cout << "new cost time:" << end1 - begin1 << std::endl;std::cout << "object pool cost time:" << end2 - begin2 << std::endl;std::cout << std::endl;
}
#include "Comm.h"
#include "ObjectPool.h"int main()
{// Release 测试对比10次for(int i = 0; i < 10; ++i)TestObjectPool();return 0;
}
测试对比我们发现相较于malloc,我们实现的定长内存池申请和释放固定大小的对象性能显著提升!
2. tcmalloc整体框架了解
整体框架了解图示:
2.1 ThreadCache设计
-
线程缓存独属于每一个线程使用,因此在高并发场景下,多线程并发申请内存时只需要申请自己本身
thread cache
, 不用涉及到争抢资源的问题,因此不需要加锁,这也是tcmalloc
在高并发条件下高效的原因。 -
每个线程独享的
thread cache
的大小为256KB
,一旦超过这个大小就需要向下一层CentralCache
申请空间使用。 -
Thread cache
是一种Hash桶的设计,每个映射的位置都挂着自由链表,每个自由链表上挂着不同大小的内存块,为了解决不同类型申请内存块的问题(定长内存池没有解决的),Hash的下标映射按字节位映射,但是一个thread cache
的大小为256KB
, 我们不可能一个字节一个字节的映射,256 * 1024 = 262,144
这个下标太多了,因此我们在允许一定程度内存浪费(内碎片)的基础上,将对应大小的对象进行对齐到一定字节数,设计下标:-
[1, 128] 8字节对齐 哈希桶[0, 16)
-
[128 + 1, 1024] 16字节对齐 哈希桶[16, 72)
-
[1024 + 1, 8 * 1024] 128字节对齐 哈希桶[72, 128)
-
[8 * 1024 + 1, 64 * 1024] 1024字节对齐 哈希桶[128, 184)
-
[64 * 1024 + 1, 256 * 1024] 8 * 1024字节对齐 哈希桶[184, 208)
-
-
无论是申请内存块,还是归还内存块都是如此,我们现在来计算这种对齐设计导致内碎片造成内存浪费问题,综合来看对于内碎片的浪费影响不是太大。
各区间内存浪费分析
-
1~128字节(8字节对齐)
-
最大浪费:7字节(例如申请1字节时)
-
浪费比例:最高87.5%(1字节申请)
-
-
129~1024字节(16字节对齐)
-
最大浪费:15字节(例如申请129字节时)
-
浪费比例:最高11.7%(129字节申请)
-
-
1025~8KB(128字节对齐)
-
最大浪费:127字节(例如申请1025字节时)
-
浪费比例:最高12.4%(1025字节申请)
-
-
8KB+1~64KB(1024字节对齐)
-
最大浪费:1023字节(例如申请8193字节时)
-
浪费比例:最高12.5%(8193字节申请)
-
-
64KB+1~256KB(8KB对齐)
-
最大浪费:8191字节(例如申请65537字节时)
-
浪费比例:最高12.5%(65537字节申请)
-
-
class ThreadCache
{
public:void* Allocate(size_t size); // 分配内存空间void Deallocate(void* ptr, size_t size); // 回收内存空间private:freelist _freelists[FREELISTSIZE]; // 管理_freelist自由链表的哈希桶
};// 底层freelist对象
class freelist
{
public:// 插入void Push(void* obj){assert(obj);Nextobj(obj) = _freelist;_freelist = obj;}// 弹出void* Pop(){assert(_freelist);void* next = *(void**)_freelist;void* obj = _freelist;_freelist = next;return obj;}// 检测当前的自由链表是否为nullptrbool Empty(){return _freelist == nullptr;}private:void* _freelist = nullptr;
};
2.1.1 ThreadCache申请内存
-
我们每个线程需要申请对象大小就需要拿到对应的内存块,根据前面的Hash桶设计,我们需要找到对应申请对象大小的Hash桶下标
index
,从链接该下标index
的自由链表_freelist
中拿取内存块,因此我们需要根据对象大小找到对应的下标,我们需要封装一个函数去完成这个任务:
// 计算对象对应需要申请的哈希桶下标
// 传入参数: size 申请对象的字节大小 alignNum 对于该对象需要对齐的字节对齐数
static inline size_t _GetHashIndex(size_t size, size_t alignNum)
{if (size % alignNum == 0){return size / alignNum - 1;}else{return size / alignNum;}
}static inline size_t GetHashIndex(size_t size)
{assert(size < MAXBYTESSIZE);if (size <= 128){return _GetHashIndex(size, 8);}else if (size <= 1024){return _GetHashIndex(size, 16);}else if (size <= 8 * 1024){return _GetHashIndex(size, 128);}else if (size <= 64 * 1024){return _GetHashIndex(size, 1024);}else if (size <= 256 * 1024){return _GetHashIndex(size, 8 * 1024);}else{assert(false);}
}
-
找到指定的Hash桶下标,我们需要查看该Hash桶挂载的自由链表
注意:如果当前ThreadCache没有内存块需要向CentralCache申请需要告诉CentralCache对应的对齐数方便拿取指定大小的内存块!_freelist
是否存在可以使用的内存块,如果存在直接Pop一个内存块以供使用,如果没有就向下一层CentralCache
申请内存块!
// 计算对象大小按对齐数大小对齐
// 传入参数: size 申请对象的字节大小 alignNum 对于该对象需要对齐的字节对齐数
static inline size_t _GetAlignNum(size_t size, size_t alignNum)
{ size_t AlignNum = 0;if (size % alignNum != 0) // 说明没有对齐{AlignNum = ((size / alignNum) + 1) * alignNum; // 按对齐数对齐}else // 说明已经对齐{AlignNum = size;}return AlignNum;
}static inline size_t GetAlignNum(size_t size)
{assert(size > 0);if (size <= 128){return _GetAlignNum(size, 8);}else if (size <= 1024){return _GetAlignNum(size, 16);}else if (size <= 8 * 1024){return _GetAlignNum(size, 128);}else if (size <= 64 * 1024){return _GetAlignNum(size, 1024);}else if (size <= 256 * 1024){return _GetAlignNum(size, 8 * 1024);}else{assert(false);return -1;}
}
void* ThreadCache::Allocate(size_t size)
{// 申请的字节数不能大于最大申请字节数assert(size <= MAXBYTESSIZE);// 获取对齐后的大小 size_t alignSize = SizeClass::GetAlignNum(size);// 获取对应Hash桶的下标 size_t index = SizeClass::GetHashIndex(size); if (!_freelists[index].Empty()) {// ThreadCache中Hash桶所管理的自由链表存在内存块直接申请return _freelists[index].Pop();}else{// 如果ThreadCache中对应的Hash桶没有内存可以申请,向上一层CentralCache申请return FetchFromCentralCache(index, alignSize); }}
2.1.2 ThreadCache释放内存
归还内存不需要free掉,只需要将归还的内存块链接到对应Hash桶的自由链表中即可!当满足一定的条件后:项目中实现的是满足实际的内存块数大于我们一次批量申请的内存块数就需要进行回收!
void ThreadCache::Deallocate(void* ptr, size_t size) // TODO,后续需要处理一下,不用传size
{assert(ptr);assert(size <= MAXBYTESSIZE);// 找出对应下标的Hash桶,将归还的内存块链接到指定的下标size_t index = SizeClass::GetHashIndex(size);_freelists[index].Push(ptr);// 当_freelists[index]对应的内存块数量 >= 一次批量申请的个数时我们对其进行回收if (_freelists[index].Size() >= _freelists[index].GetMaxSize()){ListTooLong(_freelists[index], size);}}
ListTooLong函数的作用是去ThreadCache
底层将对应要归还的内存块摘取下来,这个摘取的大小最好不要是全部拿走,这边拿走GetMaxSize
的数量即可,拿到对应的内存块之后就可以传递给下一层ReleaseListToSpans
进行回收了!
class freelist
{
public:// 插入void Push(void* obj){assert(obj);Nextobj(obj) = _freelist;_freelist = obj;++_size;}// 弹出void* Pop(){assert(_freelist);void* next = Nextobj(_freelist);void* obj = _freelist;_freelist = next;--_size;return obj;}// 检测是否为nullptrbool Empty(){return _freelist == nullptr;}// 慢开始算法的增长数size_t& GetMaxSize(){return _maxsize;}size_t Size(){return _size;}// 循环插入void PushRange(void* start, void* end, size_t n){// 不论原本的_freelist指向是否为nullptrNextobj(end) = _freelist;_freelist = start;_size += n;}// 循环弹出void PopRange(void*& start, void*& end, size_t n){assert(n >= _size); // 取n个肯定是 <= _size(真实的大小)start = _freelist;end = start;for (int i = 0; i < n - 1; ++i){end = Nextobj(end);}_freelist = Nextobj(end);Nextobj(end) = nullptr;_size -= n;}private:void* _freelist = nullptr;size_t _maxsize = 1; // 用于慢开始反馈调节算法的增长调节size_t _size = 0; // Hash映射下标_freelist链接的内存块数量
};
void ThreadCache::ListTooLong(freelist& list, size_t size)
{void* start = nullptr, * end = nullptr;// 将GetMaxSize()大小数量的内存块链从list弹出// start / end 输出型参数list.PopRange(start, end, list.GetMaxSize());// 继续将要回收的内存块链传递给下一层CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
2.1.3 TLS线程局部变量
前面我们的设计是每个线程独享一个ThreadCache对象,该线程从中申请和释放内存,但是我们知道线程对于全局变量都是共享的,怎么让每个线程拥有独属于自己的局部对象呢?--TLS
#ifdef _WIN32// 为每一个线程都分配一个ThreadCache只由自己独享,TLS线程局部,添加static只在当前文件生效static __declspec(thread) ThreadCache* pTLS_ThreadCache = nullptr;
#elif// Linux TODOstatic __thread ThreadCache* pTLS_ThreadCache = nullptr;
#endif
static void* ConcurrentAlloc(size_t size)
{// 通过TLS每个线程无锁的获取自己的专属的ThreadCache对象if (pTLS_ThreadCache == nullptr){pTLS_ThreadCache = new ThreadCache;}// For Text// std::cout << std::this_thread::get_id() << ":" << pTLS_ThreadCache << std::endl;return pTLS_ThreadCache->Allocate(size);
}static void ConcurrentFree(void* ptr, size_t size) // TODO后续处理不要传size
{assert(pTLS_ThreadCache);pTLS_ThreadCache->Deallocate(ptr, size);
}
2.2 CentralCache设计
-
为了方便
ThreadCache
的申请使用,CentralCache
的设计也是一个Hash桶自由链表,这个Hash桶的映射下标和ThreadCache一致,这是为了方便取到对应大小的内存块使用。 -
不同于
ThreadCache
的是CentralCache
每个下标映射挂载的是一个Span
双向链表对象,Span
表示的是以页为单位从PageCache
申请的大块内存块,然后切分成对应的小块内存挂载到每个Span下。
#pragma once
#include "Comm.h"// 单列模式:饿汉模式
class CentralCache
{
public:// 获取CentralCache单列对象static CentralCache* GetInstance(){return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 将一定数量的对象释放到span跨度void ReleaseListToSpans(void* start, size_t size);private:SpanList _spanlists[FREELISTSIZE]; // 大小和ThreadCache一致的双向SpanList对象Hash桶结构
private:// 防止拷贝/赋值重载/构造CentralCache(){}CentralCache(const CentralCache&) = delete;CentralCache operator=(const CentralCache&) = delete;// 单列饿汉static CentralCache _sInst;
};
2.2.1 CentralCache申请内存
-
从 Thread Cache 到 Central Cache
-
当 Thread Cache 中没有可用内存时,会批量
bathNum
向 Central Cache 申请内存对象。 -
申请数量采用类似 TCP 拥塞控制的“慢开始算法”动态调整。
-
Central Cache 通过哈希映射的 SpanList 管理 Span,从中取出对象分配给 Thread Cache。
-
此过程需加锁(桶锁),以提高并发效率。
-
-
从 Central Cache 到 Page Cache
-
若 Central Cache 的 SpanList 中所有 Span 均无可用内存,则向 Page Cache 申请新 Span。
-
Page Cache 返回 Span 后,Central Cache 将其管理的内存
按大小切分
为自由链表,并分配对象给 Thread Cache。
-
-
Span 使用计数
-
Central Cache 的 Span 通过
use_count
记录已分配的对象数。 -
每分配一个对象给 Thread Cache,
use_count
递增(++use_count
)。
-
ThreadCache --> CentralCache的申请内存 - FetchRangeObj
我们在前面说到如果对应的线程向自己的ThreadCache
申请内存块失败后,会继续向下一层CentralCache
申请,并且这一次的申请是批量的申请(这个批量申请不能过多,也不可以过少,要做到动态的一个调节),由于这两层结构的底层都是相同映射的Hash桶结构,因此需要对应大小的内存块就可以去找对应大小Hash映射下标的SpanList
,从SpanList
拿到一个非空的Span
,获取该Span
下自由链表挂载的内存块。
// 从中心缓存获取一定数量的对象给thread cache
// start / end 输出型参数
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{// 1. 计算要找的Hash桶下标size_t index = SizeClass::GetHashIndex(size);// 2. 上锁,防止多线程访问的线程安全问题_spanlists[index]._mtx.lock();// 3. 从当前Hash桶下标获取一个非空的SpanSpan* span = GetOneSpan(_spanlists[index], size);// 判断拿取的span是否正确assert(span);assert(span->_freelist);// 4. 从Span中获取batchNum个对象内存块start = span->_freelist;end = start;size_t i = 0, actualNum = 1;// [1] span->_freelist有足够的对象内存块,就获取足够内存块// [2] span->_freelist没有足够的对象内存块,能拿多少就拿多少while (i < batchNum - 1 && Nextobj(end) != nullptr){end = Nextobj(end);++i;++actualNum;}// 切断内存块链接span->_freelist = Nextobj(end);Nextobj(end) = nullptr;// 解锁_spanlists[index]._mtx.unlock();// 返回真实拿到的内存块数量return actualNum;
}
// 一次thread cache从中心缓存获取多少个
static size_t NumMoveSize(size_t size)
{assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高, 最高不超过512个// 大对象一次批量上限低,最低少于2个// MAXBYTESSIZE 256KB int num = MAXBYTESSIZE / size;// num = 256KB / 256KB = 1 --> 至少分配2个// num = 256KB / 8byte = 32,768 --> 最多分配512个if (num < 2)num = 2;if (num > 512)num = 512;return num;
}class freelist
{
public:// 插入void Push(void* obj){assert(obj);Nextobj(obj) = _freelist;_freelist = obj;}// 弹出void* Pop(){assert(_freelist);void* next = Nextobj(_freelist);void* obj = _freelist;_freelist = next;return obj;}// 检测是否为nullptrbool Empty(){return _freelist == nullptr;}// 慢开始算法的增长数size_t& GetMaxSize(){return _maxsize;}// 循环插入void PushRange(void* start, void* end){// 不论原本的_freelist指向是否为nullptrNextobj(end) = _freelist;_freelist = start;}private:void* _freelist = nullptr;size_t _maxsize = 1; // 用于慢开始反馈调节算法的增长调节
};
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// 慢开始反馈调节算法// 1. 一开始不会向CentralCache申请过多内存块,防止用不完// 2. 如果不断申请size大小内存的需求增多,那么batchNum会不断的增长,直到上限(上限由NumMoveSize决定)// 3、size越大,一次向central cache要的batchNum就越小(NumMoveSize控制上限)// 4、size越小,一次向central cache要的batchNum就越大(慢开始反馈调节算法)size_t batchNum = std::min(_freelists[index].GetMaxSize(), SizeClass::NumMoveSize(size));if (_freelists[index].GetMaxSize() == batchNum){_freelists[index].GetMaxSize() += 1;}void* start = nullptr, * end = nullptr;// 我们想要batch(批量)Num个内存块,但是实际上不一定有这么多,有多少拿多少,actualNum实际内存块的个数size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);// 至少有1个assert(actualNum > 1);// 1. 如果申请到一个直接返回给申请的对象使用// 2. 如果申请到的是多个,要将剩下多余的内存块链接到当前Hash的自由链表中if (actualNum == 1){assert(start == end);return start;}else{_freelists[index].PushRange(Nextobj(start), end);// 一旦使用,start头4/8字节会被覆盖,不用考虑是否置为nullptrreturn start;}
}
GetOneSpan给FetchRangeObj返还给Thread Cache
我们知道CentralCache
的设计也是Hash桶,这个桶对应的index挂载着不少的Span
,我们的ThreadCache
需要的内存块链就要从其中的一个Span
下拿取,因此我们需要先获取一个非空的Span
以供拿取!
// 管理多个连续页大块内存跨度Span结构
class Span
{
public:PAGE_ID _pageid = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _prev = nullptr; // 双向链表结构Span* _next = nullptr;size_t _useCount = 0; // 小块内存分配给ThreadCache的使用计数void* _freelist = nullptr; // 管理小块内存的自由链表bool _isUse = false;
};// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head == _head->_next;}void PushFront(Span* newSpan){Insert(Begin(), newSpan);}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);// newSpan// prev posSpan* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);// prev pos nextSpan* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev; }private:Span* _head; // 哨兵位
public:std::mutex _mtx; // 桶锁
};
2.2.2 CentralCache释放内存:
-
从 Thread Cache 回收到 Central Cache
-
当 Thread Cache 内存过多或线程销毁时,内存会被释放回 Central Cache。
-
每回收一个对象,对应 Span 的
use_count
递减(--use_count
)。
-
-
从 Central Cache 回收到 Page Cache
-
当 Span 的
use_count
减至 0 时,表示所有对象已回收。 -
该 Span 将被释放回 Page Cache,Page Cache 会尝试合并相邻的空闲页以形成更大的连续内存块。
-
// 将一定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{// 算出属于哪个桶indexsize_t index = SizeClass::GetHashIndex(size);_spanlists[index]._mtx.lock(); // 访问临界资源上锁while (start){void* next = Nextobj(start);// 获取指定的span,这个是怎么实现的,为什么指定对象可以找到对应的span????? 一串内存块链可能前两个和后两个属于不同的页// 我们申请了两个页,页号分别是: 2000 2001,那么在2000 -- 2001 之间的所有地址 >> PAGE_SHIFT都是2000Span* span = PageCache::GetInstance()->MapObjectToSpan(start);// 头插Nextobj(start) = span->_freelist;span->_freelist = start;span->_useCount--;// 判断span是否所有小块内存块都归还完毕// 如果完毕,这个span就可以再回收给Page Cache,Page Cache可以再尝试去做前后页的合并if (span->_useCount == 0){// 从Central Cache哈希桶取掉要合并的span_spanlists[index].Erase(span);span->_prev = nullptr;span->_next = nullptr;span->_freelist = nullptr; // 链接的内存块我们已经不会用到了,置为nullptr// 由于span已经从Central Cache哈希桶取掉,不属于临界资源,我们可以暂时去掉桶锁,方便其他线程申请和释放_spanlists[index]._mtx.unlock();// 我们要访问Page Cache将span合并,这个过程需要上锁访问!PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();// 有可能我们获取的start内存块链,其中前面的内存块属于一个span满足归还条件,后面的内存块属于另一个span需要访问// span归还之后还要访问其他的内存块需要加上锁_spanlists[index]._mtx.lock();}start = next;}_spanlists[index]._mtx.unlock(); // 解锁
}
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT); // 为什么这边可以通过页号算出指定的spanauto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{// 出错了,不可能不存在对应的Spanassert(false);return nullptr;}
}
获取指定的span,这个是怎么实现的,为什么指定对象可以找到对应的span:
void Testaddress()
{// 测试我们的MapObjectToSpan是否能通过address获取pageid拿到具体的span// 2000页 2001页PAGE_ID id1 = 2000;PAGE_ID id2 = 2001;char* p1 = (char*)(id1 << PAGE_SHIFT);char* p2 = (char*)(id2 << PAGE_SHIFT);while (p1 < p2){std::cout << "p1:" << (void*)p1 << ":" << ((PAGE_ID)p1 >> PAGE_SHIFT) << std::endl;p1 += 8; // 8bytes切分}std::cout << "p2:" << (void*)p2 << ":" << ((PAGE_ID)p2 >> PAGE_SHIFT) << std::endl;
}
2.3 PageCache设计
-
PageCache
底层的设计也是哈希桶的结构,但每个index
下标的映射不同于前面的ThreadCache
和CentralCache
,PageCache
的下标index
映射是以页page
来进行的,最大的映射位是128page
页,每页Page
的大小可以进行调节,本项目使用的的8KB
为一页。 -
PageCache
没有对应的管理页内存块时会向底层的堆要内存空间,堆给PageCache
只申请128页进行挂载,如果需要使用其他页大小的span
,使用更大的span
来划分即可!如我现在想要一个2页的空间,PageCache
向堆申请了128页,此时划分128页为2页和126页,2页的span向上返回,126页span挂载到对应的index
下标。
#pragma once
#include "Comm.h"// 单列模式:饿汉模式
class PageCache
{
public:// 获取PageCache单列对象static PageCache* GetInstance(){return &_sInst;}Span* NewSpan(size_t k); // k 表示申请的page页数 // 获取从对象到span的映射Span* MapObjectToSpan(void* obj);// 释放空闲span回到Pagecache,并合并相邻的spanvoid ReleaseSpanToPageCache(Span* span);private:SpanList _spanlists[PAGESIZE]; // 管理存放PageCache的Span哈希桶结构// 防止拷贝/赋值重载/构造PageCache() {}PageCache(const PageCache&) = delete;PageCache operator=(const PageCache&) = delete;// 单列饿汉static PageCache _sInst;std::unordered_map<PAGE_ID, Span*> _idSpanMap; // 映射页号和其对应的Span,这种做法可以使用页号来查找Spanpublic:std::mutex _pageMtx;
};
2.3.1 PageCache申请内存
申请内存
-
central cache 向 page cache 申请内存时,page cache 先检查对应位置有无 span,无则向更大页找 span,找到后分裂。如申请 4 页 page,若 4 页后无 span,在 10 页找到则分裂为 4 页和 6 页 span。
-
若 _spanList[128] 无合适 span,系统用 mmap、brk 或 VirtualAlloc 等申请 128 页 span 挂自由链表,再重复步骤 1。
-
central cache 和 page cache 核心结构都是 spanlist 哈希桶,但 central cache 按 thread cache 大小对齐映射,spanlist 中 span 内存切成小块自由链表;page cache 按桶号映射,i 号桶挂 i 页内存。
Span* PageCache::NewSpan(size_t k)
{// 申请的页数k > 0 && k < PAGESIZEassert(k > 0 && k < PAGESIZE);// 先查看第k个桶是否存在有span可供使用if (!_spanlists[k].Empty()){Span* kSpan = _spanlists[k].PopFront();// 注意这边弹出使用span之前需要建立一个映射// 建立id和span的映射,方便Central Cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageid + i] = kSpan;}return kSpan;}// 查看后续其他桶是否存在可以划分的spanfor (int i = k + 1; i < PAGESIZE; ++i){if (!_spanlists[i].Empty()){Span* nSpan = _spanlists[i].PopFront();Span* kSpan = new Span;// 划分k页给kSpankSpan->_pageid = nSpan->_pageid;kSpan->_n = k;// 将剩下的n-k页挂载到指定的Hash桶下标映射nSpan->_pageid += k;nSpan->_n -= k;_spanlists[nSpan->_n].PushFront(nSpan);// 存储nSpan的首位页号跟nSpan的映射,方便Page Cache回收内存时进行的合并查找_idSpanMap[nSpan->_pageid] = nSpan;// _pageid:1000 _n:5 1000 - 1004,因此需要-1 _idSpanMap[nSpan->_pageid + nSpan->_n - 1] = nSpan;// 建立id和span的映射,方便Central Cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageid + i] = kSpan;}return kSpan;}}// 走到这边说明后续没有大页的span划分// 需要向堆直接申请一个128页的spanSpan* Bigspan = new Span;void* ptr = SystemCallGetMemory((PAGESIZE - 1) << PAGE_SHIFT); // 128 * 8KBBigspan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;Bigspan->_n = PAGESIZE - 1;_spanlists[Bigspan->_n].PushFront(Bigspan);return NewSpan(k); // 进行一个回调复用代码
}
注意:其中的_idSpanMap映射是为了后续释放内存时使用
2.3.2 PageCache释放内存
释放内存
-
central cache 释放 span 时,依次找其前后 page id 未使用的空闲 span 尝试合并,减少内存碎片。
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{// 向前探查是否存在可以合并的span, 缓解内存碎片(外碎片)问题while (1){PAGE_ID prevId = span->_pageid - 1;auto ret = _idSpanMap.find(prevId);// 如果前面页号不存在就不合并了,防止访问到没有分配给我们的内存if (ret == _idSpanMap.end())break;// 前面相邻的页的span在使用就不合并了。这边为什么不可以使用_useCount判断一个span是否在使用中????// _useCount == 0,可能是两种情况:1. 就是内存块全部归还回来了 2. 这个span刚刚从Central Cache申请准备使用// 如果用_useCount判断,可能会涉及线程安全问题!!Span* prevSpan = ret->second;if (prevSpan->_isUse == true)break;// 合并出超过128页的span没办法管理,不合并了if ((prevSpan->_n + span->_n) > PAGESIZE - 1)break;// 开始合并span->_pageid = prevSpan->_pageid;span->_n += prevSpan->_n;// 将合并的span从桶中拿下来_spanlists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}// 向后合并while (1){PAGE_ID nextId = span->_pageid + span->_n;auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end())break;Span* nextSpan = ret->second;if (nextSpan->_isUse == true)break;if ((nextSpan->_n + span->_n) > PAGESIZE - 1)break;// 合并span->_n += nextSpan->_n;_spanlists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}// 将合并成功的span插入_spanlists[span->_n].PushFront(span);// 设置状态为未使用状态span->_isUse = false;// 加载首尾页号到_idSpanMap,方便以后合并内存块_idSpanMap[span->_pageid] = span;_idSpanMap[span->_pageid + span->_n - 1] = span;
}
3. 项目优化
3.1 解决大于256KB的情况
3.1.1 申请内存情况
前面我们在介绍整个项目框架时每个线程独享的TLS的最大申请空间是256KB,一旦超过这个申请的内存,我们就需要在调用ConcurrentAlloc
函数时做一些处理。
-
内存块 <= 256KB
--> 三层缓存Cache内存池 -
内存块 > 256KB
-
PageCache最大的申请页数为
128
页,而256KB
按8KB
为一页的情况下是32
页,也就是说,申请的内存块以页为单位计算,如果申请的内存在32 ~ 128
页之间向PageCache
拿取即可! -
申请的内存块 > 128 页的大小,直接向堆申请对应的空间
-
static void* ConcurrentAlloc(size_t size)
{// 如果申请的内存块大小超过256KBif (size > MAXBYTESSIZE){// 算出对齐数,大于256KB的内存按一页对齐size_t alignSize = SizeClass::GetAlignNum(size);// 计算需要的页数size_t kpage = alignSize >> PAGE_SHIFT;// 直接去找PageCache要内存PageCache::GetInstance()->_pageMtx.lock(); // 访问PageCache临界资源需要上锁Span* span = PageCache::GetInstance()->NewSpan(kpage);PageCache::GetInstance()->_pageMtx.unlock(); // 解锁// 获取并且返回void* ptr = (void*)(span->_pageid << PAGE_SHIFT);return ptr;}else{// 通过TLS每个线程无锁的获取自己的专属的ThreadCache对象if (pTLS_ThreadCache == nullptr){pTLS_ThreadCache = new ThreadCache;}std::cout << std::this_thread::get_id() << ":" << pTLS_ThreadCache << std::endl;return pTLS_ThreadCache->Allocate(size);}
}
Span* PageCache::NewSpan(size_t k)
{// 申请的页数k > 0 assert(k > 0);// 如果申请的页数 > 128页,直接向堆要if (k > PAGESIZE - 1){void* ptr = SystemCallGetMemory(k << PAGE_SHIFT);Span* span = new Span;span->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;// 做好span和页号直接的映射_idSpanMap[span->_pageid] = span;// 返回return span;}// 先查看第k个桶是否存在有span可供使用if (!_spanlists[k].Empty()){return _spanlists->PopFront();}// 查看后续其他桶是否存在可以划分的spanfor (int i = k + 1; i < PAGESIZE; ++i){if (!_spanlists[i].Empty()){Span* nSpan = _spanlists[i].PopFront();Span* kSpan = new Span;// 划分k页给kSpankSpan->_pageid = nSpan->_pageid;kSpan->_n = k;// 将剩下的n-k页挂载到指定的Hash桶下标映射nSpan->_pageid += k;nSpan->_n -= k;_spanlists[nSpan->_n].PushFront(nSpan);// 存储nSpan的首位页号跟nSpan的映射,方便Page Cache回收内存时进行的合并查找_idSpanMap[nSpan->_pageid] = nSpan;// _pageid:1000 _n:5 1000 - 1004,因此需要-1 _idSpanMap[nSpan->_pageid + nSpan->_n - 1] = nSpan;// 建立id和span的映射,方便Central Cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageid + i] = kSpan;}return kSpan;}}// 走到这边说明后续没有大页的span划分// 需要向堆直接申请一个128页的spanSpan* Bigspan = new Span;void* ptr = SystemCallGetMemory((PAGESIZE - 1) << PAGE_SHIFT); // 128 * 8KBBigspan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;Bigspan->_n = PAGESIZE - 1;_spanlists[Bigspan->_n].PushFront(Bigspan);return NewSpan(k); // 进行一个回调复用代码
}
3.1.2 释放内存情况
释放内存的情况也是如此,需要特殊的处理。
-
内存块 <= 256KB
--> 三层缓存Cache内存池 -
内存块 > 256KB
-
PageCache最大的挂载页数为
128
页,而256KB
按8KB
为一页的情况下是32
页,也就是说,释放的内存块以页为单位计算,如果释放的内存在32 ~ 128
页之间挂载到PageCache
即可! -
释放的内存块 > 128 页的大小,直接向堆释放对应的空间即可
-
// 系统调用释放内存块
inline static void SystemCallFreeMemory(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else// sbrk unmmap等
#endif
}
static void ConcurrentFree(void* ptr, size_t size)
{// 如果释放的内存块大小超过256KBif (size > MAXBYTESSIZE){Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);PageCache::GetInstance()->_pageMtx.lock(); // 访问PageCache临界资源需要上锁PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock(); // 访问PageCache临界资源需要上锁}else{assert(pTLS_ThreadCache);pTLS_ThreadCache->Deallocate(ptr, size);}
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{// 如果释放的页数 > 128页,直接还给堆if (span->_n > PAGESIZE - 1){void* ptr = (void*)(span->_pageid << PAGE_SHIFT);SystemCallFreeMemory(ptr);delete span;return;}// 向前探查是否存在可以合并的span, 缓解内存碎片(外碎片)问题while (1){PAGE_ID prevId = span->_pageid - 1;auto ret = _idSpanMap.find(prevId);// 如果前面页号不存在就不合并了,防止访问到没有分配给我们的内存if (ret == _idSpanMap.end())break;// 前面相邻的页的span在使用就不合并了。这边为什么不可以使用_useCount判断一个span是否在使用中????// _useCount == 0,可能是两种情况:1. 就是内存块全部归还回来了 2. 这个span刚刚从Central Cache申请准备使用// 如果用_useCount判断,可能会涉及线程安全问题!!Span* prevSpan = ret->second;if (prevSpan->_isUse == true)break;// 合并出超过128页的span没办法管理,不合并了if ((prevSpan->_n + span->_n) > PAGESIZE - 1)break;// 开始合并span->_pageid = prevSpan->_pageid;span->_n += prevSpan->_n;// 将合并的span从桶中拿下来_spanlists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}// 向后合并while (1){PAGE_ID nextId = span->_pageid + span->_n;auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end())break;Span* nextSpan = ret->second;if (nextSpan->_isUse == true)break;if ((nextSpan->_n + span->_n) > PAGESIZE - 1)break;// 合并span->_n += nextSpan->_n;_spanlists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}// 将合并成功的span插入_spanlists[span->_n].PushFront(span);// 设置状态为未使用状态span->_isUse = false;// 加载首尾页号到_idSpanMap,方便以后合并内存块_idSpanMap[span->_pageid] = span;_idSpanMap[span->_pageid + span->_n - 1] = span;
}
3.2 清理项目中使用的malloc-->new
前面我们项目中使用很多new,我们作为自己的内存池是不允许去调用malloc中的方法的,因此我们需要对此进行一系列优化,前面我们实现了固定大小的对象内存池,此时就恰好可以发挥作用了!
#pragma once
#include "Comm.h"
#include "ObjectPool.h" // 引入固定大小的对象池子,替换malloc->new// 单列模式:饿汉模式
class PageCache
{
public:// 获取PageCache单列对象static PageCache* GetInstance(){return &_sInst;}Span* NewSpan(size_t k); // k 表示申请的page页数 // 获取从对象到span的映射Span* MapObjectToSpan(void* obj);// 释放空闲span回到Pagecache,并合并相邻的spanvoid ReleaseSpanToPageCache(Span* span);private:SpanList _spanlists[PAGESIZE]; // 管理存放PageCache的Span哈希桶结构// 防止拷贝/赋值重载/构造PageCache() {}PageCache(const PageCache&) = delete;PageCache operator=(const PageCache&) = delete;// 单列饿汉static PageCache _sInst;std::unordered_map<PAGE_ID, Span*> _idSpanMap; // 映射页号和其对应的Span,这种做法可以使用页号来查找Span// 固定大小对象池ObjectPool<Span> _spanPool;
public:std::mutex _pageMtx;
};
// 通过TLS每个线程无锁的获取自己的专属的ThreadCache对象
if (pTLS_ThreadCache == nullptr)
{static ObjectPool<ThreadCache> TcPool;// pTLS_ThreadCache = new ThreadCache;pTLS_ThreadCache = TcPool.New();
}
3.3 优化项目的ConcurrentFree接口
在前面项目调用ConcurrentFree
接口回收内存块时,我们常常需要传入回收内存块的指定大小,但是我们平时使用delete并没有传入大小,我们需要优化这个问题!
我们这边的做法是在Span类中添加一个记录大小的成员变量!
// 管理多个连续页大块内存跨度Span结构
class Span
{
public:PAGE_ID _pageid = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _prev = nullptr; // 双向链表结构Span* _next = nullptr;size_t _useCount = 0; // 小块内存分配给ThreadCache的使用计数void* _freelist = nullptr; // 管理小块内存的自由链表size_t _objsize = 0; // 记录内存块大小bool _isUse = false;
};
static void ConcurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objsize;// 如果释放的内存块大小超过256KBif (size > MAXBYTESSIZE){PageCache::GetInstance()->_pageMtx.lock(); // 访问PageCache临界资源需要上锁PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock(); // 访问PageCache临界资源需要上锁}else{assert(pTLS_ThreadCache);pTLS_ThreadCache->Deallocate(ptr, size);}
}
Span* PageCache::NewSpan(size_t k)
{// 申请的页数k > 0 assert(k > 0);// 如果申请的页数 > 128页,直接向堆要if (k > PAGESIZE - 1){void* ptr = SystemCallGetMemory(k << PAGE_SHIFT);// Span* span = new Span;Span* span = _spanPool.New();span->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;span->_objsize = k << PAGE_SHIFT;// 做好span和页号直接的映射_idSpanMap[span->_pageid] = span;// 返回return span;}// 先查看第k个桶是否存在有span可供使用if (!_spanlists[k].Empty()){return _spanlists[k].PopFront(); }// 查看后续其他桶是否存在可以划分的spanfor (int i = k + 1; i < PAGESIZE; ++i){if (!_spanlists[i].Empty()){Span* nSpan = _spanlists[i].PopFront();// Span* kSpan = new Span;Span* kSpan = _spanPool.New();// 划分k页给kSpankSpan->_pageid = nSpan->_pageid;kSpan->_n = k;kSpan->_objsize += k << PAGE_SHIFT;// 将剩下的n-k页挂载到指定的Hash桶下标映射nSpan->_pageid += k;nSpan->_n -= k;nSpan->_objsize -= k << PAGE_SHIFT;_spanlists[nSpan->_n].PushFront(nSpan);// 存储nSpan的首位页号跟nSpan的映射,方便Page Cache回收内存时进行的合并查找_idSpanMap[nSpan->_pageid] = nSpan;// _pageid:1000 _n:5 1000 - 1004,因此需要-1 _idSpanMap[nSpan->_pageid + nSpan->_n - 1] = nSpan;// 建立id和span的映射,方便Central Cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageid + i] = kSpan;}return kSpan;}}// 走到这边说明后续没有大页的span划分// 需要向堆直接申请一个128页的span// Span* Bigspan = new Span;Span* Bigspan = _spanPool.New();void* ptr = SystemCallGetMemory((PAGESIZE - 1) << PAGE_SHIFT); // 128 * 8KBBigspan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;Bigspan->_n = PAGESIZE - 1;Bigspan->_objsize = (PAGESIZE - 1) << PAGE_SHIFT;_spanlists[Bigspan->_n].PushFront(Bigspan);return NewSpan(k); // 进行一个回调复用代码
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{// 如果释放的页数 > 128页,直接还给堆if (span->_n > PAGESIZE - 1){void* ptr = (void*)(span->_pageid << PAGE_SHIFT);SystemCallFreeMemory(ptr);// delete span;_spanPool.Delete(span);return;}// 向前探查是否存在可以合并的span, 缓解内存碎片(外碎片)问题while (1){PAGE_ID prevId = span->_pageid - 1;auto ret = _idSpanMap.find(prevId);// 如果前面页号不存在就不合并了,防止访问到没有分配给我们的内存if (ret == _idSpanMap.end())break;// 前面相邻的页的span在使用就不合并了。这边为什么不可以使用_useCount判断一个span是否在使用中????// _useCount == 0,可能是两种情况:1. 就是内存块全部归还回来了 2. 这个span刚刚从Central Cache申请准备使用// 如果用_useCount判断,可能会涉及线程安全问题!!Span* prevSpan = ret->second;if (prevSpan->_isUse == true)break;// 合并出超过128页的span没办法管理,不合并了if ((prevSpan->_n + span->_n) > PAGESIZE - 1)break;// 开始合并span->_pageid = prevSpan->_pageid;span->_n += prevSpan->_n;span->_objsize += prevSpan->_objsize;// 将合并的span从桶中拿下来_spanlists[prevSpan->_n].Erase(prevSpan);// delete prevSpan;_spanPool.Delete(prevSpan);}// 向后合并while (1){PAGE_ID nextId = span->_pageid + span->_n;auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end())break;Span* nextSpan = ret->second;if (nextSpan->_isUse == true)break;if ((nextSpan->_n + span->_n) > PAGESIZE - 1)break;// 合并span->_n += nextSpan->_n;span->_objsize += nextSpan->_objsize;_spanlists[nextSpan->_n].Erase(nextSpan);// delete nextSpan;_spanPool.Delete(nextSpan);}// 将合并成功的span插入_spanlists[span->_n].PushFront(span);// 设置状态为未使用状态span->_isUse = false;// 加载首尾页号到_idSpanMap,方便以后合并内存块_idSpanMap[span->_pageid] = span;_idSpanMap[span->_pageid + span->_n - 1] = span;
}
3.4 项目运行调试+测试
#include "ConcurrentAlloc.h"
#include <atomic>
#include <ctime>// ntimes 一轮申请和释放内存的次数
// nworks 线程数量
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = { 0 };std::atomic<size_t> free_costtime = { 0 };for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%zu个线程并发执行%zu轮次,每轮次malloc %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%zu个线程并发执行%zu轮次,每轮次free %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%zu个线程并发malloc&free %zu次,总计花费:%zu ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%zu个线程并发执行%zu轮次,每轮次concurrent alloc %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%zu个线程并发执行%zu轮次,每轮次concurrent dealloc %zu次: 花费:%zu ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%zu个线程并发concurrent alloc&dealloc %zu次,总计花费:%zu ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}int main()
{size_t n = 1000;std::cout << "==========================================================" << std::endl;BenchmarkConcurrentMalloc(n, 4, 10);std::cout << std::endl << std::endl;BenchmarkMalloc(n, 4, 10);std::cout << "==========================================================" << std::endl;return 0;
}
项目错误
1. new(obj)T 定位new错误
经过调试我发现
_memory
指向的内存为nullptr
(可能是使用完了),但此时的_remainbytes
并没去申请内存,我考虑到该项目可能是在多线程并发访问下去申请ThreadCache
所导致的问题,pTLS_ThreadCache
是在申请之后独属于某一个线程,在申请之前所有线程会访问临界资源TcPool.New()
,因此我决定在New内部加上锁。
// pTLS_ThreadCache = new ThreadCache;
static ObjectPool<ThreadCache> TcPool;
pTLS_ThreadCache = TcPool.New();
// 定长内存池
template<class T>
class ObjectPool
{
public:T* New(){std::unique_lock<std::mutex> lock(_lock);T* obj = nullptr;if (_freelist != nullptr) // 如果自由链表存在着内存块,优先使用{// 头删void* next = *((void**)_freelist);obj = (T*)_freelist;_freelist = next;}else{// 向_memory申请内存if (_remainbytes < sizeof(T)) // 如果当前内存已经不能申请一个对象所需要的大小T则去底层申请内存块{_remainbytes = 128 * 1024; /* 一次申请 128 KB */_memory = (char*)SystemCallGetMemory(_remainbytes);if (_memory == nullptr){throw std::bad_alloc();}}// Debugif (_memory == nullptr){int x = 0;}obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); // 最少分配一个指针大小的内存块_memory += objSize;_remainbytes -= objSize;}// 使用定位new,初始化new(obj)T;return obj;}void Delete(T* obj){// 手动调用析构函数obj->~T();// 头插(两种情况都适用)// 1. _freelist -> nullptr// obj// 2. _freelist -> prevobj// objNextobj(obj) = _freelist; // *(void**) 表示一个指针的大小,_WIN32 / _WIN64 分别是 4位/8位,这种写法适用不同机器位数!_freelist = obj;}~ObjectPool() {}
private:char* _memory = nullptr; // 指向大块内存的指针size_t _remainbytes = 0; // 用于记录_memory指向的大块内存的剩余量void* _freelist = nullptr; // 自由链表,用于管理归还的内存块std::mutex _lock; // 锁,用于多线程访问申请内存时防止同时占用一块内存
};
2. PopRange访问异常
运行之后发现,end访问到了nullptr。经过断点调试发现start实践的内存块链个数和我们要取下的个数不一致
发现截取start链接的内存链时发现个数异常的多,当时初步认为应该是访问到了不属于start链的其他内存块,因此继续向上查找start链的来源。一般一个一个取和一个一个拿是基本不会出错的,问题可能在放入一条链,因此我去查了
void PushRange(void* start, void* end, size_t n)
函数
发现内存块链的实际数量和传入给
_size
加上的n
不一致,是不是n
传入错误了?经过搜查发现actualNum是CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size)
函数中得到的,我们向内查找打上测试断点。
发现截走切断的内存链异常的长,查看对应切断内存链的代码也没有错误,只能去查找span->_freelist
的来源,进一步进入GetOneSpan
查看。
// 3. 从当前Hash桶下标获取一个非空的Span
Span* span = GetOneSpan(_spanlists[index], size);
assert(span);
assert(span->_freelist);
最后发现我们在start走到end后,tail没有将头部指向end置为nullptr,导致链接这end后面的内存块导致获取了没有分配的内存块!
3.5 基数树的项目优化
经过上面项目的错误探查我们已经申请和释放的问题,下面使用测试对比代码进行项目的性能测试
我们发现在Debug下项目的性能对比malloc申请内存的提升不是太大,反而释放内存的性能相较于free还慢,总体来看项目和malloc的性能大差不差,因此我们需要分析我们实现的这个项目性能的瓶颈在什么地方,是否能进行一个优化。
我们先做一个项目性能的瓶颈分析:
我们发现使用MapObjectToSpan时的映射占用了很多的时间,这是锁导致的,那么我们该怎么去进行一个优化呢?这边参考官方tcmalloc实现的是通过基数树去实现的一个优化!
基数树实际上就是一个分层的哈希表,根据所分层数不同可分成单层基数树、二层基数树、三层基数树等。
-
单层基数树
单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储数组中以该页号为下标的位置上。
最坏的情况下我们需要建立所有页号与其span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应span的指针。
//单层基数树
template <int BITS>
class TCMalloc_PageMap1 {
public:typedef uintptr_t Number;explicit TCMalloc_PageMap1() {size_t size = sizeof(void*) << BITS; //需要开辟数组的大小size_t alignSize = SizeClass::_GetAlignNum(size, 1 << PAGE_SHIFT); //按页对齐后的大小array_ = (void**)SystemCallGetMemory(alignSize); //向堆申请空间memset(array_, 0, size); //对申请到的内存进行清理}void* get(Number k) const {if ((k >> BITS) > 0) {//k的范围不在[0, 2^BITS-1]return NULL;}return array_[k]; //返回该页号对应的span}void set(Number k, void* v) {assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS-1]array_[k] = v; //建立映射}
private:void** array_; //存储映射关系的数组static const int LENGTH = 1 << BITS; //页的数目
};
代码中的非类型模板参数BITS表示存储页号最多需要比特位的个数。在32位下我们传入的是32-PAGE_SHIFT,在64位下传入的是64-PAGE_SHIFT。而其中的LENGTH成员代表的就是页号的数目,即2^BITS。
比如32位平台下,以一页大小为8K为例,此时页的数目为2^32 / 2^13 = 2^19,因此存储页号最多需要19个比特位,此时传入非类型模板参数的值就是32-13=19。由于32位平台下指针的大小是4字节,因此该数组的大小就是2 ^19 * 4 = 2 ^ 21 = 2M,内存消耗不大,是可行的。但是如果是在64位平台下,此时数组的大小是 2^51 * 8 = 2 ^ 54 = 2^24 G,这显然是不可行的,实际上对于64位的平台,我们需要使用三层基数树。
-
二层基数树
这里还是以32位平台下,一页的大小为8K为例来说明,此时存储页号最多需要19个比特位。而二层基数树实际上就是把这19个比特位分为两次进行映射。
比如用前5个比特位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的比特位在基数树的第二层进行映射,映射后最终得到该页号对应的span指针。
二层基数树中,第一层的数组占用2^5 * 4 = 2^7 Byte空间,第二层的数组最多占用 2^5 * 2^14 *4 = 2 ^ 21 = 2 M。二层基数树相比一层基数树的好处就是,一层基数树必须一开始就把2M的数组开辟出来,而二层基数树一开始时只需要将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组就行了。
//二层基数树
template <int BITS>
class TCMalloc_PageMap2 {
private:static const int ROOT_BITS = 5; //第一层对应页号的前5个比特位static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储元素的个数static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储元素的个数//第一层数组中存储的元素类型struct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; //第一层数组
public:typedef uintptr_t Number;explicit TCMalloc_PageMap2() {memset(root_, 0, sizeof(root_)); //将第一层的空间进行清理PreallocateMoreMemory(); //直接将第二层全部开辟}void* get(Number k) const {const Number i1 = k >> LEAF_BITS; //第一层对应的下标const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标if ((k >> BITS) > 0 || root_[i1] == NULL) {//页号值不在范围或没有建立过映射return NULL;}return root_[i1]->values[i2]; //返回该页号对应span的指针}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS; //第一层对应的下标const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标assert(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v; //建立该页号与对应span的映射}//确保映射[start,start_n-1]页号的空间是开辟好了的bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;if (i1 >= ROOT_LENGTH) //页号超出范围return false;if (root_[i1] == NULL) {//第一层i1下标指向的空间未开辟//开辟对应空间static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查}return true;}void PreallocateMoreMemory() {Ensure(0, 1 << BITS); //将第二层的空间全部开辟好}
};
因此在第二层基数树中有一个Ensure函数,当需要建立某一页号与其span之间的映射关系时,需要先调用该Ensure函数确保用于映射该页号的空间是开辟了的,如果没有开辟则会立即开辟。
而在32位平台下,就算第二层基数树第二层的数组全部开辟出来也就消耗了2M的空间,内存消耗也不算太多,因此我们可以在构造第二层基数树的时候就把第二层数组全部开辟出来。
-
三层基数层
此时只有当要建立某一页号的映射关系时,再开辟对应的数组空间,而没有建立映射的页号就可以不用开辟其对应的数据空间 ,此时就能在一定程度上节省内存空间。
//三层基数树
template <int BITS>
class TCMalloc_PageMap3 {
private:static const int INTERIOR_BITS = (BITS + 2) / 3; //第一、二层对应页号的比特位个数static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二层存储元素的个数static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数static const int LEAF_LENGTH = 1 << LEAF_BITS; //第三层存储元素的个数struct Node {Node* ptrs[INTERIOR_LENGTH];};struct Leaf {void* values[LEAF_LENGTH];};Node* NewNode() {static ObjectPool<Node> nodePool;Node* result = nodePool.New();if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}Node* root_;
public:typedef uintptr_t Number;explicit TCMalloc_PageMap3() {root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一层对应的下标const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标const Number i3 = k & (LEAF_LENGTH - 1); //第三层对应的下标//页号超出范围,或映射该页号的空间未开辟if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回该页号对应span的指针}void set(Number k, void* v) {assert(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一层对应的下标const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标const Number i3 = k & (LEAF_LENGTH - 1); //第三层对应的下标Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立该页号与对应span的映射}//确保映射[start,start+n-1]页号的空间是开辟好了的bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); //第一层对应的下标const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下标值超出范围return false;if (root_->ptrs[i1] == NULL) { //第一层i1下标指向的空间未开辟//开辟对应空间Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}if (root_->ptrs[i1]->ptrs[i2] == NULL) { //第二层i2下标指向的空间未开辟//开辟对应空间static ObjectPool<Leaf> leafPool;Leaf* leaf = leafPool.New();if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查}return true;}void PreallocateMoreMemory(){}
};
-
为什么基数树不需要加锁
当某个线程在读取映射关系时,可能另一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的。
因为C++中的map底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的结构都有可能会发生变化。比如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。
而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在
page cache
进行的。也就是说,读取映射时读取的都是对应span
的_useCount
不等于0的页【寻找,归还】,而建立映射时建立的都是对应span
的_useCount
等于0的页,所以说我们不会同时对同一个页进行读取映射和建立映射的操作。
3.6 项目缺陷
-
没有实现在Linux下的代码
-
不支持64位操作系统
3.7 对比malloc
-
申请固定大小的内存
-
申请不同内存大小的内存
3.8 打包为静态库
3.9 项目链接:
EfficMultiCoreMemoryPool