【项目笔记】高并发内存池项目剖析(二)
【项目笔记】高并发内存池项目剖析(二)
🔥个人主页:大白的编程日记
🔥专栏:项目笔记
文章目录
- 【项目笔记】高并发内存池项目剖析(二)
- 前言
- 一.central cache
- 1.1 central cache整体框架
- 1.2 centralcache桶的结构
- 1.3 spanlist类
- 1.4 单例模式
- 1.5 慢反馈调节算法
- 1.6 NumMoveSize
- 1.7 GetOneSpan
- 1.8 FetchRangeObj
- 1.9 FetchFromCentralCache
- 二.Page cache
- 2.1 Page cache整体框架
- 2.2 GetOneSpan
- 2.3 NewSpan
- 2.4 锁的问题
- 2.5 申请内存过程联调
- 后言
前言
哈喽,各位小伙伴大家好!上期我们讲了高并发内存池的整体框架和Threadcache 今天我们来讲一下Centralcache和Pagecache 今天我们讲的是Linux基本指令及其分析(二)。话不多说,我们进入正题!向大厂冲锋!
一.central cache
1.1 central cache整体框架
当Threadcache的内存不足时候就会想centralcache申请内存
central cache 是一个哈希桶结构,他的哈希桶的映射关系跟 thread cache 是一样的。不同的是他的每个哈希桶位置挂是SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个大小内存块对挂在 span 的自由链表中。
哈希桶的每个位置下面挂的都是 Span 对象连接的链表,不同的是:
- 8Byte 映射位置下面挂的 span 中的页被切成 8Byte 大小的对象的自由链表。
- 256KB 位置的 span 中的页被切成 256KB 大小对象的自由链表。
申请内存:
-
当 thread cache 中没有内存时,就会批量向 central cache 申请一些内存对象,这里的批量获取对象的数量使用了类似网络协议拥塞控制的慢开始算法;central cache 也有一个哈希映射的 spanlist,spanlist 中挂着 span,从 span 中取出对象给 thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
-
central cache 映射的 spanlist 中所有 span 的都没有内存以后,则需要向 page cache 申请一个新的 span 对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span 中取对象给 thread cache。
-
central cache 的中挂的 span 中 use_count 记录分配了多少个对象出去,分配一个对象给 thread cache,就 ++use_count
释放内存:
- 当 thread cache 过长或者线程销毁,则会将内存释放回 central cache 中的,释放回来时 --use_count。当 use_count 减到 0 时则表示所有对象都回到了 span,则将 span 释放回 page cache,page cache 中会对前后相邻的空闲页进行合并。
1.2 centralcache桶的结构
当一个span的小内存申请完后 才会向pagecache申请下一个span
但是可能申请下一个span是 前一个span的内存又归还了 所以哪一个span有内存是不确定的
centralcache桶是一个span的双向链表 因为当span的自由链表的小块内存时需要删除归还给pagecache 而双向链表删除效率高!
1.3 spanlist类
这里我们就可以创建一个spanlist的类管理span的双向链表
主要负责插入一段span区间 删除一段span区间 以及删除 插入
注意删除不需要释放span 因为他还有归还给pagecache 从链表中拿掉即可!
同时使用mutex互斥锁作为桶锁!
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}void PushFront(Span* cur){Insert(_head->_next, cur);}Span* PopFront(){Span* ret = _head->_next;Pop(ret);return ret;}//pos之前插入void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;newSpan->_next = pos;newSpan->_prev = prev;prev->_next = newSpan;pos->_prev = newSpan;}//删除posvoid Pop(Span* pos){assert(pos);assert(pos != _head);/* pos->_prev->_next = pos->_next;pos->_next->_prev = pos->_prev;*//*pos->_next = pos->_prev = nullptr;*///不用删除节点因为还要回收给上一层合并减少内存碎片Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}
private:Span* _head;
public:std::mutex _mutex;//桶锁
};
1.4 单例模式
我们的线程没有内存时都需要去centralcaceh去获取 并且他们获取的都是一个centralcache 这种场景我们就可以把
centralcaceh设计为单例模式 单例模式有懒汉和饿汉 这里我们用的是懒汉模式
- 主要就是=delete删除类的拷贝和构造函数 不让别人创建对象 定义一个私有cnetralcaceh 提供公共函数全局访问点获取centralcaceh
1.5 慢反馈调节算法
- 现在我们Threadcache没有了就要像centrralcache申请 那每次申请我们就只给一个吗?
这样就会高频访问centralcache 多线程访问同一个桶就需要加锁 效率就比较低?所以我们可以给每次可以给多一点
就像vector的2倍扩容一样。所以就有了慢反馈调节算法!
每次申请小对象我们一次给的上限就可以给多一点
大对象我们一次给的上限我们就给少一点 否则浪费太多内存
刚开始不要给那么多 因为可能用不完 而是随着要次数增加
每次给的对象个数也增加 直到上限为止就不增加了!
1.6 NumMoveSize
控制上限让256kb/size size对象越大 分配个数越小 反之越大
然后上限至少为2 至多为512 否则上线太大浪费内存也多
static const size_t MAX_BYTES = 256 * 1024;
//计算依次申请的上下限内存块
static size_t NumMoveSize(size_t size)
{assert(size>0);size_t n = MAX_BYTES/size;if (n < 2){n = 2;}if (n > 512){n = 512;}return n;}
1.7 GetOneSpan
获取centralcache中对应桶第一个不为空的span
涉及pagecache 后面再去实现
1.8 FetchRangeObj
先加上桶锁 然后找到对应桶中第一个部位空的span 然后让end从对应span的第一个自由链表节点开始 向后走n-1步
如果不足n个节点有多少给多少 下一个节点为空就返回 最后让自由链表的头结点链接end->next即可 end->next=nullptr
返回获取的节点个数 start和end分别指向切割链表的头和尾
//从中心缓存获取一定数量的对象给threadcache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);//向centraul申请要加锁_spanList[index]._mutex.lock();//找到central第一个不为空的spanSpan* span = CentralCache::GetOneSpan(_spanList[index], size);assert(span);assert(span->_freeList);start = end = span->_freeList;size_t k = 1, i = 0;//如果后面的链表不足有多少给多少//让end向后走到最后一个节点的位置while (i < batchNum - 1 && Nextobj(end)){end = Nextobj(end);i++;k++;}//让原来的自由链表指向end的下一个节点//end的下一个节点指向空span->_freeList = Nextobj(end);Nextobj(end) = nullptr;span->_useCount += k;_spanList[index]._mutex.unlock();//解锁return k;
}
// 将一定数量的对象释放到central对应的span中
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{//计算桶下标size_t index = SizeClass::Index(size);//对应桶加锁_spanList[index]._mutex.lock();while (start){void* next = Nextobj(start);//算出内存块所处的spanSpan * span = PageCache::GetInstance()->MapobjectToSpan(start);//头插到对应span的自由链表 申请出去的引用计数--Nextobj(start) = span->_freeList;span->_freeList = start;span->_useCount--;//引用计数为0 说明全部归还 此时就可以上page归还//page归还时可以进行前后页合并减少内存碎片if (span->_useCount == 0){//弹出对应的span 自由链表置空_spanList[index].Pop(span);span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;//归还时 先把桶锁结掉_spanList[index]._mutex.unlock();//向归还page过程加锁PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanList[index]._mutex.lock();//归还完后把桶锁加上}start = next;}_spanList[index]._mutex.unlock();//归还桶锁解掉
}
1.9 FetchFromCentralCache
慢增长计算 申请个数 然后FetchRangeObj获取能获取的节点个数和链表 将链表的头结点返回给Threadcache
将剩余节点链入后对应Threadcache的桶中
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// 慢开始反馈调节算法// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完// 2、如果你一直要size大小内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大size_t num = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));//cout << "向中心缓存申请:" << num << "块内存块" << endl;if (num ==_freeLists[index].MaxSize()){_freeLists[index].MaxSize()+=1;}void* start=nullptr, *end=nullptr;//实际能获取的内存块个数size_t n = CentralCache::GetInstance()->FetchRangeObj(start, end, num, size);//cout << "实际获取内存块:" << n << "块内存块" << endl;assert(n >= 1);//第一个内存块返回 剩下挂载到自由链表里面if (n == 1){assert(start == end);return start;}else {//cout<<"挂载前:"<<_freeLists[index].Size() << endl;_freeLists[index].PushRange(Nextobj(start), end,n-1);/* cout << "挂载前:" << _freeLists[index].Size() << endl;*/return start;}
}
二.Page cache
2.1 Page cache整体框架
申请内存:
-
当
central cache
向page cache
申请内存时,page cache
先检查对应位置有没有span
,如果没有则向更大页寻找一个span
,如果找到则分裂成两个。比如:申请的是 4 页page
,4 页page
后面没有挂span
,则向后面寻找更大的span
,假设在 10 页page
位置找到一个span
,则将 10 页page span
分裂为一个 4 页page span
和一个 6 页page span
。 -
如果找到
_spanList[128]
都没有合适的span
,则向系统使用mmap、brk
或者是VirtualAlloc
等方式申请 128 页 page span 挂在自由链表中,再重复 1 中的过程。 -
需要注意的是
central cache
和page cache
的核心结构都是spanlist
的哈希桶,但是他们是有本质区别的,central cache
中哈希桶,是按照thread cache
一样的大小对齐关系映射的,他的spanlist
中挂的span
中的内存都被映射关系切好链接成小块内存的自由链表。而page cache
中的spanlist
则是按下标桶号映射的,也就是说span 1
号桶中挂的span
都是 1 页内存。
释放内存:
- 如果
central cache
释放回一个span
,则依次寻找span
的前后page id
的没有在使用的空闲span
,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span
,减少内存碎片。
2.2 GetOneSpan
//获取一个不为空的Span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{//先遍历central的哈希桶 不为空直接返回Span* pos = list.Begin();while (pos != list.End()){if (pos->_freeList){return pos;}else{pos = pos->_next;}}//只能向Page申请list._mutex.unlock();//先解锁central的锁 让其他线程可以归还内存块 减少消耗//申请page内存要加锁 申请之后在释放PageCache::GetInstance()->_pageMtx.lock();Span* big = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));big->_isUse = true;big->_size = byte_size;PageCache::GetInstance()->_pageMtx.unlock();//切分的过程中不需要加锁 因为其他线程现在也拿不到内存块char* start = (char*)(big->_pageId<< PAGE_SHIFT);size_t size = big->_n<< PAGE_SHIFT;char* end = start+size;big->_freeList = start;int i = 1;void* prev = big->_freeList;start += byte_size;//切割span形成自由链表while (start<end){i++;/*assert(Nextobj(prev));*/Nextobj(prev) = start;prev = Nextobj(prev);start += byte_size;}Nextobj(prev) = nullptr;list._mutex.lock();//插入central之前要加锁 因为其他进程可能正在归还插入list.PushFront(big);return big;
}
2.3 NewSpan
//获取一个K页的span
Span* PageCache::PageCache::NewSpan(size_t k)
{//大于128页直接向系统的堆区申请if (k > NPAGES - 1){void* ptr = SystemAlloc(k);/*Span* span = new Span;*/Span* span =_spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;span->_size = k;//PageCache::GetInstance()->_idSpanMap.set(span->_pageId, span);if (span->_pageId >= 5000){int a = 0;}return span;}else{assert(k > 0 && k < NPAGES);//如果k页的桶不为空 弹出第一个spanif (!_spanList[k].Empty()){Span* kSpan=_spanList[k].PopFront();for (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap.set(kSpan->_pageId + i,kSpan);}if (kSpan->_pageId >= 100000){int a = 0;}return kSpan;}//否则向后查看更大的页的桶for (int i = k + 1; i < NPAGES; i++){//找到更大的页的桶if (!_spanList[i].Empty()){//获取第一个spanSpan* kSpan = _spanPool.New();Span* nSpan = _spanList[i].PopFront();if (nSpan->_pageId >= 10000){int a = 0;}//将span分割为k的页和n-k的页kSpan->_pageId = nSpan->_pageId;nSpan->_pageId += k;nSpan->_n -= k;kSpan->_n = k;_idSpanMap.set(nSpan->_pageId, nSpan);_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);for (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap.set(kSpan->_pageId + i, kSpan);}//将n-k的页插入到n-k的桶中_spanList[nSpan->_n].PushFront(nSpan);if (kSpan->_pageId >= 100000){int a = 0;}return kSpan;}}//否则只能向系统申请128页的内存块Span* tmp = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1);tmp->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;tmp->_n = NPAGES - 1;//将内存块插入128页的桶中//再次复用代码将其分割_spanList[tmp->_n].PushFront(tmp);/* for (auto& x : _idSpanMap){if (x.second->_pageId >=5000){int a = 0;}}*/for (auto& x : _spanList){auto it = x.Begin();while (it != x.End()){if (it->_pageId >= 5000){int a = 0;}it=it->_next;}}return NewSpan(k);}
}
2.4 锁的问题
因为我们是复用代码所以不能在里面NewSpan里面加锁 否则就会死锁 我们可以在centralcache访问pagecache前加锁即可
- 如果在central向page申请内存的过程中 从在申请到内存切分没挂载到centaral的过程中central是否要加锁?
- 不需要 因为此时其他进程拿不到内存块 可是此时还没挂载 其他进程进来申请页也找不到内存块啊!
- 但是有可能有thread的线程要归还内存到central的桶里面 所以我们不加锁 减少无畏消耗
- 所以central要去page申请内存之前先把桶锁解掉 直到的那个申请的内存切分好了
- 要挂载到central桶之前 此时需要枷锁 因为此时其他线程可能正在申请或归还访问这个桶
2.5 申请内存过程联调
现在我们就可以来测试一下申请内存的过程了
- 测试代码
void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);//申请1块 拿走1块 还剩0块 下一次一次申请2块void* p2 = ConcurrentAlloc(8);//申请2块 拿走1块 还剩1块 下一次一次申请3块void* p3 = ConcurrentAlloc(1);//没有申请 拿走1块 还剩0块 下一次一次申请3块void* p4 = ConcurrentAlloc(7);//去central申请 申请3块 拿走一块还剩2块void* p5 = ConcurrentAlloc(8);//没有申请 拿走1块 还剩1块 下一次申请3块cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;cout << p4 << endl;cout << p5 << endl;/*ConcurrentFree(p1,6);ConcurrentFree(p2,8);ConcurrentFree(p3,1);ConcurrentFree(p4,7);ConcurrentFree(p5,8);*/
}
这里可以发现第一次申请好的地址和可以桶页号进行转化
这里申请好的地址的前1页的大小会被切分呢为自由链表 观察内存发现也没有问题
这里我们申请5次内存 根据日志的输出我们发现也没有问题 同时每次申请的小块内存都是被1页切分的
所以他们的地址是连续的 都是8递增的!也没有问题!
void TestConcurrentAlloc2()
{for (size_t i = 0; i < 1024; i++){void* p1 = ConcurrentAlloc(6);cout << p1 << endl;}//申请1024次每次拿到8字节大小 第一次申请1页的内存//1025次申请内存用完 此时就会去page在去申请内存void* p2 = ConcurrentAlloc(8);cout << p2 << endl;
}
可以发现此时对应的哈希桶里面还有一个span 也就是我们第一次申请的span 但是此时他的自由链表已经为空了
说明他的1024个8字节的内存已经被申请完了 所以此时他又会去pagecache申请内存 这次就会把上次分解后的127页的内存块
分解为126和1页的内存 继续切分为1024个内存块
后言
这就是Centralcache和Pagecache。大家自己好好消化!今天就分享到这! 感谢各位的耐心垂阅!咱们下期见!拜拜~