高并发内存池 内存释放回收(6)
文章目录
- 前言
- 一、threadcache回收内存
- 二、centralcache回收内存
- 三、pagecache回收内存
- 总结
前言
Hello,我们继续乘胜追击
本篇难度较大,大家要好好学一下
一、threadcache回收内存
- 当某个线程申请的对象不用了,可以将其释放给 thread cache ,然后 thread cache 将该对象插入到对应哈希桶的自由链表当中即可。
- 但是随着线程不断的释放,对应自由链表的长度也会越来越长,这些内存堆积在一个 thread cache 中就是一种浪费,我们应该将这些内存还给 central cache ,这样一来,这些内存对其他线程来说也是可申请的,因此当 thread cache 某个桶当中的自由链表太长时我们可以进行一些处理。
- 如果 thread cache 某个桶当中自由链表的长度超过它一次批量向 central cache 申请的对象个数,那么此时我们就要把该自由链表当中的这些对象还给 central cache 。
// 释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找出对应的自由链表桶将对象插入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);// 当自由链表长度大于一次批量申请的对象个数时就开始还一段list给central cacheif (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}
当自由链表的长度大于一次批量申请的对象时,我们具体的做法就是,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给 central cache 中对应的 span 即可
// 释放对象时,链表过长时,回收内存回到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;// 从list取出批量个对象list.PopRange(start, end, list.MaxSize());// 将取出的对象还给central cache对应的spanCentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
从上述代码可以看出, FreeList类 需要支持用 Size函数 获取自由链表中对象的个数,还需要支持用 PopRange 函数从自由链表中取出指定个数的对象。因此我们需要给 FreeList类 增加一个对应的 PopRange函数 ,然后再增加一个 _size 成员变量,该成员变量用于记录当前自由链表中对象的个数,当我们向自由链表插入或删除对象时,都应该更新 _size 的值。
// 管理切分好的小对象的自由链表
class FreeList
{
public:// 将释放的对象头插到自由链表void Push(void* obj){assert(obj);//头插NextObj(obj) = _freeList;_freeList = obj;_size++;}// 从自由链表头部获取一个对象void* Pop(){assert(_freeList);//头删void* obj = _freeList;_freeList = NextObj(_freeList);_size--;return obj;}// 插入一段范围的对象到自由链表void PushRange(void* start, void* end, size_t n){assert(start);assert(end);//头插NextObj(end) = _freeList;_freeList = start; _size += n;}// 从自由链表获取一段范围的对象void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);//头删start = _freeList;end = start;for (size_t i = 0; i < n - 1;i++){end = NextObj(end);}_freeList = NextObj(end); // 自由链表指向end的下一个对象NextObj(end) = nullptr; // 取出的一段链表的表尾置空_size -= n;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}
private:void* _freeList = nullptr; // 自由链表size_t _maxSize = 1;size_t _size = 0;
};
对于 FreeList类 当中的 PushRange 成员函数,我们最好也像 PopRange 一样给它增加一个参数,表示插入对象的个数,不然我们这时还需要通过遍历统计插入对象的个数
因此之前在调用 PushRange 的地方就需要修改一下,而我们实际就在一个地方调用过 PushRange 函数,并且此时插入对象的个数也是很容易知道的。当时 thread cache 从 central cache 获取了 actualNum 个对象,将其中的一个返回给了申请对象的线程,剩下的 actualNum - 1 个挂到了 thread cache 对应的桶当中,所以这里插入对象的个数就是 actualNum - 1。
// 从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum >= 1); if (actualNum == 1){assert(start == end);return start;}else{_freeLists[index].PushRange(Next(start),end, actualNum - 1); // 修改部分return start;}
}
-
当 thread cache 的某个自由链表过长时,我们实际就是把这个自由链表当中全部的对象都还给 central cache 了,但这里在设计 PopRange 接口时还是设计的是取出指定个数的对象,因为在某些情况下当自由链表过长时,我们可能并不一定想把链表中全部的对象都取出来还给 central cache ,这样设计就是为了增加代码的可修改性。
-
当我们判断 thread cache 是否应该还对象给 central cache 时,还可以综合考虑每个 thread cache 整体的大小。比如当某个 thread cache 的总占用大小超过一定阈值时,我们就将该 thread cache 当中的对象还一些给 central cache ,这样就尽量避免了某个线程的 thread cache 占用太多的内存。对于这一点,在 tcmalloc 当中就是考虑到了的。(但是对我们来说还是太难了,至少我实在是不会)
二、centralcache回收内存
当 thread cache 中某个自由链表太长时,会将自由链表当中的这些对象还给 central cache 中的 span
还给 central cache 的这些对象不一定都是属于同一个 span 的。 central cache 中的每个哈希桶当中可能都不止一个 span ,因此当我们计算出还回来的对象应该还给 central cache 的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个 span
至于说为什么还给 central cache 的这些对象不一定都是属于同一个 span 的,答案在这里
至于说为什么需要区分 Span,原因在这里
那么如何根据对象的地址得到对象所在的页号?
某个页当中的所有地址除以页的大小都等该页的页号。比如我们这里假设一页的大小是100,那么地址0 ~ 99都属于第0页,它们除以100都等于0,而地址100 ~ 199都属于第1页,它们除以100都等于1
如何找到一个对象对应的span?
虽然我们现在可以通过对象的地址得到其所在的页号,但是我们还是不能知道这个对象到底属于哪一个 span 。因为一个 span 管理的可能是多个页
为了解决这个问题,我们可以建立 页号 和 span 之间的映射。由于这个映射关系在 page cache 进行 span 的合并时也需要用到,因此我们直接将其存放到 page cache 里面。这时我们就需要在 PageCache类 当中添加一个映射关系了,这里可以用 C++ 当中的 unordered_map 进行实现,并且添加一个函数接口,用于让 central cache 获取这里的映射关系。
PageCache类当中新增的成员变量及其成员函数
// 单例模式
class PageCache
{
public:// 获取从对象到span的映射Span* MapObjectToSpan(void* obj);
private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
每当 page cache 分配 span 给 central cache 时,都需要记录一下 页号 和 span 之间的映射关系。此后当 thread cache 还对象给 central cache 时,才知道应该具体还给哪一个 span
因此当 central cache 在调用 NewSpan 接口向 page cache 申请 k页 的 span 时, page cache 在返回这个 k页 的 span 给 central cache 之前,应该建立这 k个页号 与该 span 之间的映射关系
// 获取一个K页的span(加映射版本)
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES); // 保证在桶的范围内// 1.检查第k个桶里面有没有Span,有则返回if (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();// 建立页号与span的映射,方便central cache回收小块内存查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}// 2.检查一下后面的桶里面有没有span,有则将其切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的头部切一个k页下来kSpan->_pageID = nSpan->_pageID;kSpan->_n = k;// 更新nSpan位置nSpan->_pageID += k;nSpan->_n -= k;// 将剩下的挂到对应映射的位置_spanLists[nSpan->_n].PushFront(nSpan);for (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}}// 3.没有大页的span,找堆申请128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1); // 申请128页内存bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1; // 页数// 将128页span挂接到128号桶上_spanLists[bigSpan->_n].PushFront(bigSpan);// 递归调用自己(避免代码重复)return NewSpan(k);
}
此时我们就可以通过对象的地址找到该对象对应的 span 了,直接将该对象的地址除以页的大小得到页号,然后在 unordered_map 当中找到其对应的 span 即可
// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; // 计算页号auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
注意:当我们要通过某个页号查找其对应的 span 时,该页号与其 span 之间的映射一定是建立过的,如果此时我们没有在 unordered_map 当中找到,则说明我们之前的代码逻辑有问题,因此当没有找到对应的 span 时可以直接用断言结束程序,以表明程序逻辑出错
这时当 thread cache 还对象给 central cache 时,就可以依次遍历这些对象,将这些对象插入到其对应 span 的自由链表当中,并且及时更新该 span 的 _useCount 计数即可
在 thread cache 还对象给 central cache 的过程中,如果 central cache 中某个 span 的 _useCount 减到 0 时,说明这个 span 分配出去的对象全部都还回来了,那么此时就可以将这个 span 再进一步还给 page cache
//将一定数量的对象还给对应的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock(); // 加锁while (start){void* next = NextObj(start); // 记录下一个Span* span = PageCache::GetInstance()->MapObjectToSpan(start);// 将对象头插到span的自由链表NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--; // 更新被分配给thread cache的计数if (span->_useCount == 0) // 说明这个span分配出去的对象全部都回来了{// 此时这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并_spanLists[index].Erase(span);span->_freeList = nullptr; // 自由链表置空span->_next = nullptr;span->_prev = nullptr;// 释放span给page cache时,使用page cache的锁就可以了,这时把桶锁解掉_spanLists[index]._mtx.unlock(); // 解桶锁PageCache::GetInstance()->_pageMtx.lock(); // 加大锁PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock(); // 解大锁_spanLists[index]._mtx.lock(); // 加桶锁}start = next;}_spanLists[index]._mtx.unlock(); // 解锁
}
如果要把某个 span 还给 page cache ,我们需要先将这个 span 从 central cache 对应的双链表中移除,然后再将该 span 的自由链表置空,因为 page cache 中的 span 是不需要切分成一个个的小对象的,以及该 span 的前后指针也都应该置空,因为之后要将其插入到 page cache 对应的双链表中。但 span 当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了
在 central cache 还 span 给 page cache 时也存在锁的问题,此时需要先将 central cache 中对应的桶锁解掉,然后再加上 page cache 的大锁之后才能进入 page cache 进行相关操作,当处理完毕回到 central cache 时,除了将 page cache 的大锁解掉,还需要立刻加上 central cache 对应的桶锁,然后将还未还完对象继续还给 central cache 中对应的 span
三、pagecache回收内存
- 如果 central cache 中有某个 span 的 _useCount 减到 0 了,那么 central cache 就需要将这个 span 还给 page cache 了。
- 这个过程看似是非常简单的, page cache 只需将还回来的 span 挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题, page cache 还需要尝试将还回来的 span 与其他空闲的 span 进行合并。
合并的过程可以分为 向前合并 和 向后合并 。如果还回来的 span 的起始页号是 num ,该 span 所管理的页数是 n 。那么在向前合并时,就需要判断第 num-1 页对应 span 是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。而在向后合并时,就需要判断第 num+n 页对应的 span 是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。
因此 page cache 在合并 span 时,是需要通过页号获取到对应的 span 的,这就是我们要把页号与 span 之间的映射关系存储到 page cache 的原因
当我们通过页号找到其对应的 span 时,这个 span 此时可能挂在 page cache ,也可能挂在 central cache 。而在合并时我们只能合并挂在 page cache 的 span ,因为挂在 central cache 的 span 当中的对象正在被其他线程使用。
可是我们不能通过 span 结构当中的 _useCount 成员,来判断某个 span 到底是在 central cache 还是在 page cache 。因为当 central cache 刚向 page cache 申请到一个 span 时,这个 span 的 _useCount 就是等于 0 的,这时可能当我们正在对该 span 进行切分的时候, page cache 就把这个 span 拿去进行合并了,这显然是不合理的。
基于上面的分析,我们可以在 span 结构中再增加一个 _isUse 成员,用于标记这个 span 是否正在被使用,而当一个 span 结构被创建时我们默认该 span 是没有被使用的。
// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageID = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _next = nullptr; // 双向链表结构Span* _prev = nullptr;size_t _useCount = 0; // 切好的小内存块,被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表bool _isUse = false; // 是否在被使用
};
当 central cache 向 page cache 申请到一个 span 时 (CentralCache::GetOneSpan函数) ,需要立即将该 span 的 _isUse 改为 true
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//...Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));span->_isUse = true; // 设置为truePageCache::GetInstance()->_pageMtx.unlock();//...
}
当 central cache 将某个 span 还给 page cache 时 (PageCache::ReleaseSpanToPageCache函数) ,也就需要将该 span 的 _isUse 改成 false
// 释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//...span->_isUse = false;
}
由于在合并 page cache 当中的 span 时,需要通过页号找到其对应的 span ,而一个 span 是在被分配给 central cache 时,才建立的各个页号与 span 之间的映射关系,因此 page cache 当中的 span 也需要建立页号与 span 之间的映射关系。
与 central cache 中的 span 不同的是,在 page cache 中,只需建立一个 span 的首尾页号与该 span 之间的映射关系。因为当一个 span 在尝试进行合并时,如果是往前合并,那么只需要通过一个 span 的尾页找到这个 span ,如果是向后合并,那么只需要通过一个 span 的首页找到这个 span 。也就是说,在进行合并时我们只需要用到 span 与其首尾页之间的映射关系就够了。
因此当我们申请 k页 的 span 时,如果是将 n页 的 span 切成了一个 k页 的 span 和一个 n-k页 的 span ,我们除了需要建立 k页 span 中每个页与该 span 之间的映射关系之外,还需要建立剩下的 n-k页 的 span 与其首尾页之间的映射关系。
// 获取一个K页的span(加映射版本)
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES); // 保证在桶的范围内// 1.检查第k个桶里面有没有Span,有则返回if (!_spanLists[k].Empty()){Span* kSpan = _spanLists[k].PopFront();// 建立页号与span的映射,方便central cache回收小块内存查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}// 2.检查一下后面的桶里面有没有span,有则将其切分for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;// 在nSpan的头部切一个k页下来kSpan->_pageID = nSpan->_pageID;kSpan->_n = k;// 更新nSpan位置nSpan->_pageID += k;nSpan->_n -= k;// 将剩下的挂到对应映射的位置_spanLists[nSpan->_n].PushFront(nSpan);for (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageID + i] = kSpan;}return kSpan;}}// 3.没有大页的span,找堆申请128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1); // 申请128页内存bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1; // 页数// 将128页span挂接到128号桶上_spanLists[bigSpan->_n].PushFront(bigSpan);// 递归调用自己(避免代码重复)return NewSpan(k);
}
总结
本篇难度相当大,后面可能还会有类似难度的文章,大家要好好消化!!