【项目日记】高并发内存池--page cache
目录
申请内存
释放内存
PageCache代码框架
windows下直接向堆申请页为单位的大块内存
申请内存
1.当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如,申请的是4页的span,4页后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
2.如果找到_spanList[128]都没有合适的span,则向系统使申请128页的page span挂在自由链表中,再重复1中的过程。
3.需要注意的是,central cache和page cache的核心结构都是spanlist的哈希桶,但是它们有本质的区别,central cache中的哈希桶,是按跟thread cache一样的大小对齐关系映射的,它的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache中的spanlist是按下标桶号映射的,也就是说第i号桶挂的span都是i页内存。
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
//大于128 page直接向堆申请
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
//先检查第k个桶里面有没有span
if (!_spanlists[k].Empty())
{
Span* kSpan = _spanlists[k].PopFront();
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (size_t i = 0; i < kSpan->_n; i++)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
//检查一下后面的桶里面有没有span,如果有可以把它进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
//把当前这个桶切分成一个k页的span和一个n-k页的span
//k页的span返回给central cache
//n-k页span挂到第n-k桶中去
if (!_spanlists[i].Empty())
{
Span* nSpan = _spanlists[i].PopFront();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();
//在nSpan的头部切一个k页下来
//k页span返回
//nSpan再挂到对应映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanlists[nSpan->_n].PushFront(nSpan);
//存储nSpan的首尾页号跟nSpan映射,方便page cache回收内存
//进行合并查找
/*_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;*/
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
//建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (size_t i = 0; i < kSpan->_n; i++)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
}
//走到这个位置说明没有大页的span了
//这个时候就去找堆要一个128页的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanlists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
central cache中用的是一把大锁,而不是桶锁,用桶锁可能导致频繁加锁解锁(在central cache中不是一次只访问某一个桶,可能这个桶里没有page,就会向更大的page桶去要),这样导致效率低。
释放内存
如果central cache释放回一个span,则依次寻找span的前后page id(页号)的span是否空闲,如果有就合并,合并出更大的页,如果可以合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//大于128page的直接还给堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
//对span前后的页,尝试进行合并,缓解内存碎片的问题
while (1)
{
PAGE_ID prev_Id = span->_pageId - 1;
//auto ret = _idSpanMap.find(prev_Id);
前面的页号没有,不合并了
//if (ret == _idSpanMap.end())
//{
// break;
//}
auto ret = (Span*)_idSpanMap.get(prev_Id);
if (ret == nullptr)
{
break;
}
//前面相邻页的span在使用,不合并了
Span* prevSpan = ret;
if (prevSpan->_isUse == true)
{
break;
}
// 合并出超过128页的span没办法管理,不合并了
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanlists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan; // 这里释放的是new出来的span对象,而不是对应的内存,
//内存是SystemAlloc申请出来的,应该由SystemFree释放
_spanPool.Delete(prevSpan);
}
//向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
/*auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}*/
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)
{
break;
}
Span* nextSpan = ret;
if (nextSpan->_isUse == true)
{
break;
}
if (span->_n + nextSpan->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanlists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
_spanlists[span->_n].PushFront(span);
span->_isUse = false;
/*_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;*/
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
另外,我们在查找地址所对应的span时,可以通过_idSpanMap中去找。这个_idSpanMap我们会在PageCache中申请和切分大块Span时会维护。
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
//std::unique_lock<std::mutex> lock(_pageMtx);
/*auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}*/
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}
PageCache代码框架
//page cache也用单例模式
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
//释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
//获取一个K页的span
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
PageCache() {}
PageCache(const PageCache&) = delete;
PageCache& operator=(const PageCache&) = delete;
SpanList _spanlists[NPAGES];
ObjectPool<Span> _spanPool; //定长内存池,用它来new出Span对象,而不是用系统接口new
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
static PageCache _sInst;
};
windows下直接向堆申请页为单位的大块内存
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage * (1 << PAGE_SHIFT), MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}