【高并发内存池】六、三种缓存的回收内存过程
文章目录
- 前言
- Ⅰ. `thread cache`的内存回收
- Ⅱ. `central cache`的内存回收
- Ⅲ. `page cache`的内存回收

前言
前面我们将内存的申请流程都走通了,现在就是内存回收的过程,主要是从 thread cache
开始,一层一层往下回收,因为我们调用的申请内存接口也是从 thread cache
开始的!
而每一层回收内存的方式其实都不太一样,因为我们还有一个负载均衡的机制,就是当一个线程缓存中内存块太多了,我们要将这些内存块合并成一个大的内存块对象也就是 span
然后返回给中心缓存,而中心缓存则将那些线程都还回来的 span
对象再组织成一个更大的 span
对象返回给页缓存保存起来!所以我们需要将它们分开来讲解!
Ⅰ. thread cache
的内存回收
线程缓存的内存回收其实并不难,就是将用户不需要的小内存块重新插入到对应哈希桶的空闲链表中,此时加入一个负载均衡的判断:判断一下此时如果对应空闲链表中的内存块个数大于了当前申请内存块的数量的时候,我们就认为当前申请内存块的数量对应的内存块是不用到的,就将它们从空闲链表中弹出,然后返回这些内存块的起始地址 start
给中心缓存,这样子做的目的是防止这些不用的内存块被浪费了!
当然 tcmalloc
源码中实现的会更加复杂,它还会判断整个哈希桶中的内存块的个数来进行负载均衡,而我们只是一个简化版本,不过重要的是学习这个思想!
所以我们来完善一下之前 thread cache
中遗留下的 deallocate()
函数,如下所示:
// ThreadCache.cpp文件
// 释放内存接口
void ThreadCache::deallocate(void* ptr, size_t size)
{assert(size <= THREAD_MAX_SIZE); // 要求申请的空间是小于256KB的assert(ptr != nullptr);// 将ptr指向的空间插入到对应的哈希桶中即可size_t index = AlignClass::get_index(size);_freelists[index].push(ptr);// 当一个桶中的内存块超过了其当前申请内存块的数量的时候,// 就将当前申请的内存块数量的内存块还给central cache,防止占用太多内存块if (_freelists[index].get_size() > _freelists[index].get_maxsize())give_back_memory(_freelists[index], size);
}
此时需要提供一些细节,比如上面的获取当前空闲链表的内存块个数 get_size()
函数,以及待会 give_back_memory()
函数中需要用到将多个内存块从空闲链表弹出的操作,所以要提供一个 pop_range()
接口,然后因为我们需要对内存块进行计数,所以需要在之前的 push
、pop
、append
函数中处理一下计数,所以我们都在 FreeList
类中完善这些接口,具体看注释部分即可:
// Common.h文件
class FreeList
{
private:void* _freelist = nullptr; // 指向空闲链表头节点的指针size_t _size = 0; // 表示当前空闲链表中内存块的个数size_t _maxsize = 1; // 表示当前空闲链表申请空间块时允许获得的最大个数(会不断递增)
public:// 将单个内存对象头插插入空闲链表void push(void* obj){assert(obj != nullptr);get_next(obj) = _freelist;_freelist = obj;_size++; // 记得累加个数}// 将多个内存对象头插插入空闲链表void append(void* start, void* end, size_t size){assert(start != nullptr && end != nullptr);get_next(end) = _freelist;_freelist = start;_size += size; // 记得累加个数}// 头删方式取出空闲链表中的空间void* pop(){assert(_freelist != nullptr);void* obj = _freelist;_freelist = get_next(obj);_size--; // 记得减去个数return obj;}// 将多个内存块从空闲链表弹出,通过输出型参数获取首尾地址void pop_range(void*& start, void*& end, size_t num){// 1. 先定位首尾位置(end要停在第num块上,所以实际走的是num-1步)start = _freelist;end = start;for (int i = 0; i < num - 1; ++i)end = get_next(end);// 2. 将该区间的内存块与空闲链表断开,然后将end的next置为空即可_freelist = get_next(end);get_next(end) = nullptr;// 3. 最后别忘了内存块个数要减少_size -= num;}bool empty() { return _freelist == nullptr; }size_t& get_maxsize() { return _maxsize; }size_t& get_size() { return _size; }
};
最后就是来实现另一个辅助接口 give_back_memory()
,因为我们上面 deallocate()
接口只是负责判断是否需要启动负载均衡操作,而具体操作要交给该函数来实现!
其实并不难,其内部就是调用中心缓存的一个接口 merge_memory_from_ThreadCache()
来合并到中心缓存中,具体的实现我们到中心缓存部分再讲,下面是 give_back_memory()
函数的实现:
// ThreadCache.cpp
// 释放对象时,链表过长时,归还对应数量的内存块给central cache
void ThreadCache::give_back_memory(FreeList& list, size_t size)
{// 1. 先从当前空闲链表中取出当前申请内存块的数量个内存块void* start = nullptr;void* end = nullptr;list.pop_range(start, end, list.get_maxsize());// 2. 然后调用central cache的接口将这些内存块合并到central cache中CentralCache::get_instance()->merge_memory_from_ThreadCache(start, size);
}
Ⅱ. central cache
的内存回收
现在中心缓存要完成的无非就是上面遗留下的
merge_memory_from_ThreadCache()
接口,但是现在有个问题就是,我们拿到了要回收到中心缓存的一串小内存块链表的起始地址start
,并且知道它们的总大小为size
,此时我们需要找到这些小内存块对应的是中心缓存中的哪个span
对象呀,这怎么办❓❓
其实可以 用得到的每个小内存块的地址 start
来除以一页的大小(我们规定是 8K
),就能得到该小内存块对应的起始页号,然后我们可以暴力一点,去中心缓存或者页缓存中遍历所有的 span
对象的页号进行比对,就能确认每个小内存块对应的是哪个 span
对象了(因为小内存块的排序也是混乱的,所以需要每个小内存块都去查找),但问题是这种查找方式的时间复杂度比较高,是 O(n^2)
,这是我们无法接受的!
既然说到了比对,我们就想到可以用哈希表来快速索引,所以我们可以 在 PageCache
类中添加一个哈希表,建立一下每个页号与对应 span
对象的映射关系,这样子我们得到了页号就能快速找到其对应的 span
对象了,时间是非常快的!(至于为什么不是在 CentralCache
类中添加哈希表,是因为后面 PageCache
也会用到页号来查找对应内存块的对象,那我们干脆就存放在 PageCache
类中就行了,就不用再多存一份了!)
下面只需要关心注释部分:
// PageCache.h
#pragma once
#include "Common.h"class PageCache
{
private:SpanList _spanlists[PAGELIST_NUMS];std::mutex _mtx; static PageCache _page_instance; std::unordered_map<page_t, Span*> _tables; // 存放页号与对应span对象的映射关系
public:static PageCache* get_instance() { return &_page_instance; }Span* new_span(size_t k);std::mutex& get_mutex() { return _mtx; }
private:PageCache() {}PageCache(const PageCache&) = delete;PageCache& operator=(const PageCache&) = delete;
};
然后我们需要 在 new_span()
函数中添加上建立页号和对应 span
对象关系的操作,也就两行代码,如下所示:(也是只需要关心注释部分)
// PageCache.cpp
// 获取一个k页大小的span
Span* PageCache::new_span(size_t k)
{// 看看当前k页大小的哈希桶中是否有可用的span对象,找到了直接将其取出然后进行返回即可if (!_spanlists[k].empty()){Span* span = _spanlists[k].pop_front();// 建立back中每个页号和span的映射,方便central cache回收小块内存时,查找对应的spanfor (page_t i = 0; i < span->_num; ++i)_tables[span->_pid + i] = span;return span;}for (int i = k + 1; i < PAGELIST_NUMS; ++i){if (!_spanlists[i].empty()){Span* front = _spanlists[i].pop_front();Span* back = new Span;back->_pid = front->_pid;back->_num = k;front->_pid += k;front->_num -= k;_spanlists[front->_num].push_front(front);/*对于front中每个页号与span的映射,我们只需要记录首尾页号即可,因为front是没有分配中心缓存使用的,所以后面我们再合并已经分配的那些内存块的时候,是中心发散的去查找内存块左右临近的内存块进行合并的,此时我们对于front的页号与span的映射只需要记录下首尾即可,而不需要向back一页每个小内存块的页号都要去进行映射,因为back是被分配去使用的,所以需要进行每个页号的映射*/_tables[front->_pid] = front;_tables[front->_pid + front->_num - 1] = front;// 建立页号和span的映射,方便central cache回收小块内存时,查找对应的spanfor (page_t i = 0; i < back->_num - 1; ++i)_tables[back->_pid + i] = back;return back;}}void* ptr = SystemAlloc(PAGELIST_NUMS - 1);Span* newspan = new Span;newspan->_num = PAGELIST_NUMS - 1;newspan->_pid = (page_t)ptr >> PAGE_SHIFT;_spanlists[PAGELIST_NUMS - 1].push_front(newspan);return new_span(k);
}
此时我们可以提供一个功能函数,用于将传入的内存块地址获取对应的 span
对象指针,如下所示:
// PageCache.cpp
// 根据传入的内存块地址返回对应的span对象的指针
Span* PageCache::get_span_from_pageID(void* ptr)
{// 1. 先根据地址求出其所属的页号page_t pid = ((page_t)ptr >> PAGE_SHIFT);// 2. 找到对应的页号对应的span指针进行返回auto it = _tables.find(pid);if (it != _tables.end())return it->second;elsereturn nullptr;
}
有了上面的铺垫,我们就能实现 merge_memory_from_ThreadCache()
函数了,但是需要借助到页缓存中实现的 merge_memory_from_CentralCache()
接口,这个接口等下面我们将页缓存的回收内存再来讲!剩下的具体步骤参考下面代码以及注释:
// CentralCache.cpp
// 将size大小的内存块合并到对应的span哈希桶中
void CentralCache::merge_memory_from_ThreadCache(void* start, size_t size)
{// 1. 获取对应哈希桶下标并且加上桶锁size_t index = AlignClass::get_index(size);_spanlists[index].get_mutex().lock();// 2. 遍历每一个小内存块while (start != nullptr){// 3. 找到小内存块对应的span对象,并将该小内存块链接到该span上面(先记录下start后面的指针next,防止start修改连接后丢失)void* next = get_next(start);Span* span = PageCache::get_instance()->get_span_from_pageID(start);get_next(start) = span->_freelist;span->_freelist = start;// 4. 减少span的使用个数,然后判断是否可以回收给PageCachespan->_use_count--;if (span->_use_count == 0){// 可以的话调用PageCache对应的接口回收该span对象// 再次之前要先将该span对象从central cache中弹出,并且修改span的链接属性_spanlists[index].erase(span);span->_freelist = nullptr;span->_next = span->_prev = nullptr;// 因为涉及到页缓存操作,所以需要加锁,并且此时可以顺便先将中心缓存的锁给释放,让其它线程可以执行,提高效率_spanlists[index].get_mutex().unlock();PageCache::get_instance()->get_mutex().lock();PageCache::get_instance()->merge_memory_from_CentralCache(span); // 这步就是PageCache回收span对象的操作PageCache::get_instance()->get_mutex().unlock();_spanlists[index].get_mutex().lock();}// 5. 别忘了让start迭代start = next;}// 6. 最后别忘了释放桶锁_spanlists[index].get_mutex().unlock();
}
Ⅲ. page cache
的内存回收
页缓存的内存回收,就是尝试将中心缓存返回的小 span
对象以及其相邻的空闲的 span
对象合并成一个页。比如中心缓存中一个 span
对象中有 3
页,其左边有一个空闲的 span
对象为 2
页大小,那么就将它们两个合并成一个 5
页大小的 span
对象交给页缓存管理!
但此时有一个问题,就是我们得知道其相邻的 span
对象是否为空闲状态,如果我们用 _use_count == 0
去判断为空闲的话,是不合适的,因为有可能别的线程正在准备使用该相邻的内存块,但是还没有将其 _use_count
进行改变,此时如果我们就将其判断为空闲然后回收的话,那么别的线程就出错了!
所以需要 在 Span
结构体中再添加一个布尔值变量 _is_used
来表示当前对象是否被线程使用着。
// 管理以页为单位的大内存块
struct Span
{page_t _pid = 0; // 大块内存起始页的页号size_t _num = 0; // 页的个数Span* _next = nullptr; // 双向链表结构Span* _prev = nullptr;size_t _use_count = 0; // 当前分配给ThreadCache对象的小内存块个数void* _freelist = nullptr; // 当前大内存块对应的空闲链表bool _is_used = false; // 表示当前对象是否被线程使用着
};
然后我们只需要用中心扩散法,向前和向后合并空闲的内存块即可,最后再将合并好的内存块挂到 page cache
中,如下所示,具体参考代码注释:
// PageCache.cpp
// 将CentralCache传来的span对象合并到页缓存中管理
void PageCache::merge_memory_from_CentralCache(Span* span)
{// 1. 向前合并空闲的span对象while (true){// 获取前面一个span对象(如果没有则直接退出)page_t prev_pid = span->_pid - 1;auto it = _tables.find(prev_pid);if (it == _tables.end())break;// 如果span对象被使用着,或者和当前span的页数加起来超过了PAGELIST_NUMS-1页的话,则直接退出Span* prev_span = it->second;if (prev_span->_is_used == true)break;if (prev_span->_num + span->_num > PAGELIST_NUMS - 1)break;// 走到这说明prev_span对象是可以合并的,则将其进行合并操作// 合并操作其实只需要修改span的属性即可,然后将原本prev_span从哈希表中去除,最后释放prev_span空间的内容即可span->_pid = prev_span->_pid;span->_num += prev_span->_num;_spanlists[prev_span->_num].erase(prev_span);delete prev_span;}// 2. 向后合并空闲的span对象(和上面是类似的操作,就是位置变了而已)while (true){// 获取后面一个span对象(如果没有则直接退出)page_t next_pid = span->_pid + span->_num;auto it = _tables.find(next_pid);if (it == _tables.end())break;// 如果span对象被使用着,或者和当前span的页数加起来超过了PAGELIST_NUMS-1页的话,则直接退出Span* next_span = it->second;if (next_span->_is_used == true)break;if (next_span->_num + span->_num > PAGELIST_NUMS - 1)break;// 走到这说明next_span对象是可以合并的,则将其进行合并操作// 合并操作其实只需要修改span的属性即可,然后将原本next_span从哈希表中去除,最后释放next_span空间的内容即可span->_num += next_span->_num;_spanlists[next_span->_num].erase(next_span);delete next_span;}// 3. 将合并后的span对象插入到page cache的哈希桶中,然后设置其映射关系_spanlists[span->_num].push_front(span);span->_is_used = false;_tables[span->_pid] = span;_tables[span->_pid + span->_num - 1] = span;
}