C++项目实战——高性能内存池(四)
目录
1、高并发内存池——pagecache
2、逐步实现pagecache
3、代码展示
common.h
centralcache.h
centralcache.cpp
pagecache.h
pagecache.cpp
之前的文章中介绍了threadcache和centralcache,那么接下来该详细介绍pagecache,并实现从pagecache中申请空间
1、高并发内存池——pagecache
pagecache的结构就与threadcache和centralcache略有不同了,虽然也是哈希桶的结构,但是这里并没有用到之前的对齐规则,而是使用页数来进行直接的桶号对应,下图就是对pagecache结构的形象描述
由于之前申请都是按照字节来进行申请的,那么当我们在pagecache申请空间时,申请到的都是以页为单位的大块内存,我们需要将大块内存切成对应字节大小的小对象,放到centralcache的span下的自由链表中以备使用,而当我们在pagecache中申请对象时,如果对应的哈希桶下没有对象,我们并不是直接去向系统进行申请,而是在pagecache中继续向下寻找页数更大的对象,如果找到了,就把这个更大的对象拆分成我们要的大小进行返回,并把拆分下的另一部分放到对应的哈希桶下。举例来说,如果我们要找4page的对象,我们发现4page的哈希桶下为空,于是我们继续向下查找,发现10page的哈希桶不为空,那我们就把10page切成4page和6page,将我们要的4page返回使用,而剩下的6page就放到对应的6page桶下挂起来。如果我们一直没有找到更大的对象时,就说明pagecache中无法申请到对象,那么我们就只能去系统中进行申请
2、逐步实现pagecache
pagecache中总共只有128个桶,可以放下从1page~128page大小的对象,每页为8KB,那么咱们可以用常量来表示,这样一来,如果我们后续要更改成更大的内存池也可以更加方便。
//pagecache中哈希桶的数量
static const size_t NPAGES = 129;
//pagecache中每个页的大小
static const size_t PAGE_SHIFT = 13;
pagecache和centralcache一样,是所有线程共享一个,因此我们还是使用单例模式,pagecache中的对象依然是使用Spanlist
class pagecache
{
public:static pagecache* GetInstance(){return &_sInst;}//获取一个k页的spanSpan* NewSpan(size_t k);
private:SpanList _span[NPAGES];pagecache(){}pagecache(const pagecache&) = delete;static pagecache _sInst;};
那么接下来我们就可以完成之前在centralcache里面写的向pagecache申请空间,这里申请到的空间是以页为单位的大块内存,我们需要计算出需要申请多少页的对象,申请到以后,要将这部分空间切割为小对象挂在span下面以供threadcache申请使用
所以我们先要写一个计算申请页数的方法
//计算需要向pagecache申请多少个页static size_t NumMovePage(size_t size){//计算需要申请的对象个数size_t num = NumMoveSize(size);//申请对象的大小为申请对象的个数*每个对象的字节数size_t npage = num * size;//申请对象的大小/每个页的大小就是页的个数npage = npage >> PAGE_SHIFT;//至少申请一个页if (npage < 1)npage = 1;return npage;}
再是向pagecache申请对象的方法,但是当我们将申请来的大块内存进行切分时需要对这部分内存进行定位,也就是要知道这部分内存的页号和字节数大小,因此我们需要再Span的结构体中加上一些内容用于表示这两个数据
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endifstruct Span
{void* _freelist = nullptr;//切好的小块对象的自由链表Span* _next = nullptr;//指向下一个结点Span* _prev = nullptr;//指向前一个结点PAGE_ID _pageid = 0;//页号size_t _n = 0;//页的数量};
准备工作做好以后就可以完成申请对象的方法了
//在centralcache中获取一个非空的span,如果有则返回该span,如果没有则向pagecache申请对象
Span* centralcache::GetOneSpan(SpanList& list, size_t size)
{//从当前的Spanlist的起始位置开始Span* it = list.Begin();//遍历寻找到非空的spanwhile (it != list.End()){if (it->_freelist != nullptr)return it;it = it->_next;}//走到这里就说明没有非空的span,那么就需要向pagecache进行申请空间Span* span = pagecache::GetInstance()->NewSpan(sizeclass::NumMovePage(size));//指针指向大块内存的起始位置和结束位置,便于后续切分小对象char* start = (char*)(span->_pageid << PAGE_SHIFT);size_t bytenum = span->_n << PAGE_SHIFT;char* end = start + bytenum;//从起始位置开始,切分size字节大小的小对象,挂到Span下的自由链表中span->_freelist = start;void* tail = start;start += size;while (start < end){Nextobj(tail) = start;tail = Nextobj(tail);start += size;}Nextobj(tail) = nullptr;//将申请好空间的span放到Spanlist里,以备threadcache下次申请时使用list.PushFront(span);return span;
}
那么下一步就是把上面代码中调用的pagecache中获取对象的方法进行实现,根据我们之前所说的,寻找对应的哈希桶拿取对象,如果该哈希桶下为空,则向下查找更大的哈希桶,直到找到以后取出对象并一分为二,如果找不到则只能去堆上申请一块大的空间,并先将这块空间放到pagecache中,通过再次调用获取对象的方法来获取我们需要的空间大小
//获取一个k页的span
Span* pagecache::NewSpan(size_t k)
{//如果pagecache上对应哈希桶不为空,直接获取spanif (!_spanlist[k].Empty()){return _spanlist[k].PopFront();}//如果pagecache上对应哈希桶为空,继续向下找到一个更大的不为空的哈希桶for (size_t i = k + 1; i < NPAGES; i++){//找到更大的哈希桶,取出哈希桶里的对象一分为二if (!_spanlist[i].Empty()){Span* nspan = _spanlist[i].PopFront();Span* kspan = new Span;//一份切成需要的k页大小kspan->_pageid = nspan->_pageid;kspan->_n = k;//一份为切完k页大小后剩下的nspan->_pageid -= k;nspan->_n -= k;//将剩下的这份插入对应的哈希桶中_spanlist[nspan->_n].PushFront(nspan);return kspan;}}//pagecache上没有可以申请的对象,只能去堆上申请Span* bigspan = new Span;//去堆上申请一个pagecache可以存放的最大的对象void* ptr = SystemAlloc(NPAGES - 1);//计算出这个大对象的页号bigspan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;//计算出这个大对象的页的数量bigspan->_n = NPAGES - 1;//把这个大对象放到pagecache里对应的哈希桶下以备使用_spanlist[bigspan->_n].PushFront(bigspan);//从堆上申请好对象以后就可以给centralcache分配空间了,再次调用该方法实现分配空间return NewSpan(k);
}
这里还有新增一个从堆上获取对象的方法
#ifdef _WIN32
#include <windows.h>
#else
// ...
#endif// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}
到这里,我们已经实现了整个申请对象的流程,但是还有一点需要注意的是,由于threadcache是每个进程独享的,所以在申请空间时可以直接申请而不需要考虑与其他进程的竞争关系,但是centralcache和pagecache在多个进程中都只有一份,所以不同的进程使用起来就会打架,那么对于这样的两个内存池就需要使用锁来实现互斥访问
根据centralcache和pagecache的特性,使用的锁也不一样,centralcache是只会去对应的哈希桶中进行申请对象,因此我们只需要使用桶锁即可,也就是说,不同的进程可以同时访问centralcache的不同哈希桶且互不影响,而pagecache则必须使用整个大锁,因为在pagecache中申请对象时是并不确定从哪个哈希桶中拿取对象的。
分析到这里,我们就需要将之前所写的代码中需要加锁的地方都加上,这里的锁其实也很形象,当家里有人时,为了不让其他人进来就要关门上锁,而家里没有人时,为了能让人进来,就不能关门上锁,这里就不具体展示上锁了,只在之前的代码上做了一些小调整,可以在代码展示部分留心仔细看一下
3、代码展示
这里由于threadcache.h threadcache.cpp ConcurrentAlloc.h这几个文件没有变化,就不在这里进行重复展示了,有需要的话可以参考上一篇文章
common.h
#include<iostream>
#include<assert.h>
#include<algorithm>
#include<mutex>
using namespace std;//内存池的大小
static const size_t MAX_BYTES = 256 * 1028;
//哈希桶的数量
static const size_t FREELIST = 208;//pagecache中哈希桶的数量
static const size_t NPAGES = 129;
//pagecache中每个页的大小
static const size_t PAGE_SHIFT = 13;#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif#ifdef _WIN32
#include <windows.h>
#else
// ...
#endif// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}//取当前结点的内容
static void*& Nextobj(void* obj)
{return *(void**)obj;
}
class FreeList
{
public://头插于自由链表void* push(void* obj){assert(obj);Nextobj(obj) = _freelist;_freelist = obj;return _freelist;}void* pushRange(void* start, void* end){Nextobj(end) = _freelist;_freelist = start;return _freelist;}//头删于自由链表void* pop(){assert(_freelist);_freelist = Nextobj(_freelist);return _freelist;}//判断自由链表是否为空bool empty(){return _freelist == nullptr;}size_t& Maxsize(){return _Maxsize;}
private:void* _freelist = nullptr;size_t _Maxsize = 1;
};class sizeclass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128] 8byte对齐 freelist[0,16)// [128+1,1024] 16byte对齐 freelist[16,72)// [1024+1,8*1024] 128byte对齐 freelist[72,128)// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)// //辅助计算实际需要申请的字节数大小static size_t _round(size_t size, size_t alignNum){if (size % alignNum != 0)return (size / alignNum + 1) * alignNum;elsereturn size;}//计算实际需要申请的字节数大小static size_t Round(size_t size){assert(size > 0 && size <= 256 * 1024);if (size >= 1 && size <= 128)return _round(size, 8);else if (size > 128 && size <= 1024)return _round(size, 16);else if (size > 1024 && size <= 8 * 1024)return _round(size, 128);else if (size > 8 * 1024 && size <= 64 * 1024)return _round(size, 1024);elsereturn _round(size, 8 * 1024);}//辅助计算所在的哈希桶static size_t _index(size_t size, size_t alignNum){if (size % alignNum != 0)return size / alignNum;elsereturn size / alignNum - 1;}//计算在哪个哈希桶申请空间static size_t Index(size_t size){assert(size > 0 && size <= 256 * 1024);if (size >= 1 && size <= 128)return _index(size, 8);else if (size > 128 && size <= 1024)return _index(size, 16);else if (size > 1024 && size <= 8 * 1024)return _index(size, 128);else if (size > 8 * 1024 && size <= 64 * 1024)return _index(size, 1024);elsereturn _index(size, 8 * 1024);}//threadcache一次从centralcache获取多少个对象static size_t NumMoveSize(size_t size){// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低size_t num = MAX_BYTES / size;if (num > 512)num = 512;else if (num < 2)num = 2;return num;}//计算需要向pagecache申请多少个页static size_t NumMovePage(size_t size){//计算需要申请的对象个数size_t num = NumMoveSize(size);//申请对象的大小为申请对象的个数*每个对象的字节数size_t npage = num * size;//申请对象的大小/每个页的大小就是页的个数npage = npage >> PAGE_SHIFT;//至少申请一个页if (npage < 1)npage = 1;return npage;}
};struct Span
{void* _freelist = nullptr;//切好的小块对象的自由链表Span* _next = nullptr;//指向下一个结点Span* _prev = nullptr;//指向前一个结点PAGE_ID _pageid = 0;//页号size_t _n = 0;//页的数量};
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}//返回Spanlist的起始位置Span* Begin(){return _head->_next;}//返回Spanlist的尾位置Span* End(){return _head;}//在pos位置的结点前插入一个新结点void Insert(Span* pos,Span* newspan){assert(pos);assert(newspan);Span* prev = pos->_prev;newspan->_next = pos;newspan->_prev = prev;pos->_prev = newspan;prev->_next = newspan;}//头插新结点void PushFront(Span* newspan){assert(newspan);Insert(Begin(), newspan);}//删除pos位置的结点void Erase(Span* pos){assert(pos);Span* prev = pos->_prev;pos->_next->_prev = prev;prev->_next = pos->_next;}//取出一个头结点Span* PopFront(){Span* front = Begin();Erase(front);return front;}//判断链表是否为空bool Empty(){return _head->_next = _head;}mutex _mtx;//桶锁
private:Span* _head;
};
centralcache.h
#include"common.h"
class centralcache
{
public:static centralcache* GetInstance(){return &_sInst;}//在centralcache中获取一个非空的span,如果有则返回该span,如果没有则向pagecache申请对象Span* GetOneSpan(SpanList& list, size_t size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:centralcache(){}centralcache(const centralcache&) = delete;static centralcache _sInst;SpanList _spanlist[FREELIST];
};
centralcache.cpp
#include"common.h"
#include"centralcache.h"
#include"pagecache.h"
centralcache centralcache::_sInst;//在centralcache中获取一个非空的span,如果有则返回该span,如果没有则向pagecache申请对象
Span* centralcache::GetOneSpan(SpanList& list, size_t size)
{//从当前的Spanlist的起始位置开始Span* it = list.Begin();//遍历寻找到非空的spanwhile (it != list.End()){if (it->_freelist != nullptr)return it;it = it->_next;}//这里继续上锁会导致需要释放的空间无法完成释放,而去pagecache申请对象并切割的过程其他进程本就无法参与,因此开桶锁list._mtx.unlock();//要用pagecache了,上大锁pagecache::GetInstance()->_pagemtx.lock();//走到这里就说明没有非空的span,那么就需要向pagecache进行申请空间Span* span = pagecache::GetInstance()->NewSpan(sizeclass::NumMovePage(size));//用完pagecache了可以开锁pagecache::GetInstance()->_pagemtx.unlock();//指针指向大块内存的起始位置和结束位置,便于后续切分小对象char* start = (char*)(span->_pageid << PAGE_SHIFT);size_t bytenum = span->_n << PAGE_SHIFT;char* end = start + bytenum;//从起始位置开始,切分size字节大小的小对象,挂到Span下的自由链表中span->_freelist = start;void* tail = start;start += size;while (start < end){Nextobj(tail) = start;tail = Nextobj(tail);start += size;}Nextobj(tail) = nullptr;//上桶锁list._mtx.lock();//将申请好空间的span放到Spanlist里,以备threadcache下次申请时使用list.PushFront(span);return span;
}// 从中心缓存获取一定数量的对象给thread cache
size_t centralcache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{//计算出centralcache的哈希桶号size_t index = sizeclass::Index(size);//上桶锁_spanlist[index]._mtx.lock();//从对应的哈希桶中获取一个非空的spanSpan* span = GetOneSpan(_spanlist[index], size);//起始指针指向获取对象的起始位置start = span->_freelist;//先将尾指针也放在起始位置end = start;//此时已经实际拿取到了一个对象size_t actualNum = 1;size_t i = 1;//通过循环来拿取到计划拿取的数量,如果没有那么多对象,则有多少取多少while (i<batchNum && Nextobj(end)){i++;end = Nextobj(end);actualNum++;}//span下挂着的自由链表为拿取后剩下的对象span->_freelist = Nextobj(end);//开锁_spanlist[index]._mtx.unlock();//返回实际拿取的对象数量return actualNum;
}
pagecache.h
#include"common.h"
class pagecache
{
public:static pagecache* GetInstance(){return &_sInst;}//获取一个k页的spanSpan* NewSpan(size_t k);//大锁mutex _pagemtx;
private:SpanList _spanlist[NPAGES];pagecache(){}pagecache(const pagecache&) = delete;static pagecache _sInst;};
pagecache.cpp
#include"common.h"
#include"pagecache.h"
pagecache pagecache::_sInst;//获取一个k页的span
Span* pagecache::NewSpan(size_t k)
{//如果pagecache上对应哈希桶不为空,直接获取spanif (!_spanlist[k].Empty()){return _spanlist[k].PopFront();}//如果pagecache上对应哈希桶为空,继续向下找到一个更大的不为空的哈希桶for (size_t i = k + 1; i < NPAGES; i++){//找到更大的哈希桶,取出哈希桶里的对象一分为二if (!_spanlist[i].Empty()){Span* nspan = _spanlist[i].PopFront();Span* kspan = new Span;//一份切成需要的k页大小kspan->_pageid = nspan->_pageid;kspan->_n = k;//一份为切完k页大小后剩下的nspan->_pageid -= k;nspan->_n -= k;//将剩下的这份插入对应的哈希桶中_spanlist[nspan->_n].PushFront(nspan);return kspan;}}//这里先开锁,如果需要释放对象可以先使用_pagemtx.unlock();//pagecache上没有可以申请的对象,只能去堆上申请Span* bigspan = new Span;//去堆上申请一个pagecache可以存放的最大的对象void* ptr = SystemAlloc(NPAGES - 1);//计算出这个大对象的页号bigspan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;//计算出这个大对象的页的数量bigspan->_n = NPAGES - 1;//要回来了,关门上大锁_pagemtx.lock();//把这个大对象放到pagecache里对应的哈希桶下以备使用_spanlist[bigspan->_n].PushFront(bigspan);//从堆上申请好对象以后就可以给centralcache分配空间了,再次调用该方法实现分配空间return NewSpan(k);
}