【高并发内存池】从零到一的项目之centralcache整体结构设计及核心实现
个人主页 : zxctscl
专栏 【C++】、 【C语言】、 【Linux】、 【数据结构】、 【算法】
如有转载请先通知
文章目录
- 前言
- 1. central cache整体结构
- 2. central cache基础结构
- 2.1 span类设计
- 2.2 SpanList带头双向循环链表设计
- 2.3 central cache类设计
- 3. central cache核心实现
- 3.1 单例模式
- 3.2 thread cache获取central cache对象
- 3.2.1 thread cache获取central cache对象设计
- 3.2.2 ThreadCache::FetchFromCentralCache()代码实现
- 3.3 central cache.cpp实现
- 3.3.1 从中心缓存获取一定数量的对象给thread cache
- 3.3.2 central cache.cpp代码
- 3.4 central cache.h代码
前言
接上回的项目 【高并发内存池】从零到一的项目之高并发内存池整体框架设计及thread cache设计继续分享项目创做过程及代码。
1. central cache整体结构
central cache
与thread cache
不同的是:
thread cache
是每一个线程独享的,而central cache
是当所有线程没有内存时候都会去着它,所以central cache
里面是要加锁的。
这里central cache
用的是桶锁,就是每一个桶都有一个锁。
central cache
也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
对比一下thread cache
:
thread cache
下面是一个一个切好的对象,而central cache
挂的是一个一个的span
。
span
是跨度的意思,它来管理以页为单位的大块内存,可能会有多个span
。
// 管理以页为单位的大块内存
struct Span
{PageID _pageId = 0; // 页号size_t _n = 0; // 页的数量Span* _next = nullptr;Span* _prev = nullptr;void* _list = nullptr; // 大块内存切小链接起来,这样回收回来的内存也方便链接size_t _usecount = 0; // 使用计数,==0 说明所有对象都回来了size_t _objsize = 0; // 切出来的单个对象的大小bool _isUse = false; // 是否在使用
};
申请内存:
- 当
thread cache
中没有内存时,就会批量向central cache
申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache
也有一个哈希映射的spanlist
,spanlist
中挂着span
,从span
中取出对象给thread cache
,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。 central cache
映射的spanlist
中所有span
的都没有内存以后,则需要向page cache
申请一个新的span
对象,拿到span
以后将span
管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache
。central cache
的中挂的span
中use_count
记录分配了多少个对象出去,分配一个对象给thread cache
,就++use_count
。
释放内存:
- 当
thread_cache
过长或者线程销毁,则会将内存释放回central cache
中的,释放回来时--use_count
。当use_count
减到0时则表示所有对象都回到了span
,则将span
释放回page cache
,page cache
中会对前后相邻的空闲页进行合并。
2. central cache基础结构
central cache和thread cache的对齐规则是一样的。
不同的是thread cache挂的是小块内存,central cache挂的是span,span的页又会根据映射被切成小块。
没有一个span里面有对象,才会申请新的span。
2.1 span类设计
span设计成了双向链表的形式,因为涉及到内存的申请和释放,释放回来的又重新链接到span上,当span对象全部回来之后,就把span还给page cache,就得删除这个span,进行前后页的合并,解决内存碎片和外碎片的问题。
为了方便插入删除,span就设计成带头双向循环链表。
span管理多个连续页大块内存跨度结构,它里面需要页号。
什么是页号呢?
一个进程的地址空间,如果在32位程序里面,它是4G也就是2的32次方个字节,它被分成8k(2的13次方个字节)就有2的19次方个页。本质上跟地址是一样的。
但是如果在64位程序上面,2的64次方个字节被分成2的13次方个字节,就有2的51次方个页。
如果页号用size_t的话,在32位上是可以的,但64位下就是不行的。
这里就能用条件编译,_WIN64必须在前面,如果在_WIN32上面,没有_WIN64,就会走下面的_WIN32,就会执行typedef size_t PAGE_ID。
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
PAGE_ID _pageId在32位置下就是4字节,64位下就是8字节
span类设计代码:
// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _next = nullptr; // 双向链表的结构Span* _prev = nullptr;size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表
};
2.2 SpanList带头双向循环链表设计
SpanList类就一个成员变量Span* _head
构造就直接指向新的span
插入前先判断插入位置和新span在不在,在将新的newapan插入到pos位置之前:
// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;// prev newspan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
};
2.3 central cache类设计
它成员变量就是哈希桶 _spanLists[NFREELIST]
,在thread cache
里面是几号桶,在central cache
里面就是几号桶。
桶锁的设计
在SpanList设计上加上桶锁:
public:std::mutex _mtx; // 桶锁
// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;// prev newspan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
3. central cache核心实现
3.1 单例模式
thread cache如何获取到central cache的对象呢?
可以用全局变量,而项目要求全局只有唯一一个,每一个线程独享的central cache
的对象。
想要每一个线程都有独享的central cache
的对象,那么就把central cache
设置为单例模式下的饿汉模式。
static CentralCache _sInst;
单例模式不想让别人创建对象,就把构造函数设置成私有,构造函数初始化列表不写,默认就对自定义的成员变量初始化,就会调用SpanList进行初始化。
为了防止拿到对象以后,拷贝构造,把拷贝构造函数也设置成私有:
private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
提供一个公有成员函数GetInstance(),获取实例对象。
// 单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
3.2 thread cache获取central cache对象
3.2.1 thread cache获取central cache对象设计
每一个线程找到它独享的thread cache
对象,去调用thread cache
对象里面的Allocate()
,Allocate()
算好它桶的位置,如果这个位置有,就直接弹一个对象出去就行;如果没有,就得调用FetchFromCentralCache()
。
thread cache
需要一个,central cache
就分配一个吗?
就会有不断申请内存的需求,thread cache
需要就找central cache
申请,thread cache
本身是无锁的,但是central cache
有锁,出现锁竞争就麻烦,效率就降低了。
所以central cache
就多给一些,thread cache
需要一个,central cache
就分配10个,后面的9次来申请,都是无锁的;那么第11次申请就是有锁的。
thread cache
需要一个,central cache
就分配100个,就会造成浪费。
所以这里就采用慢开始反馈调节算法。
在Common.h中:
// 一次thread cache从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}
每一个_freeLists桶都有MaxSize(),批量申请的多少,就是MaxSize()和SizeClass::NumMoveSize(size)中小的那个;
size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
第二次再来申请的时候,如果批量申请的大小等于_freeLists[index].MaxSize(),那么_freeLists[index].MaxSize()加1。
if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}
慢开始反馈调节算法
1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
3、size越大,一次向central cache要的batchNum就越小
4、size越小,一次向central cache要的batchNum就越大
拿到了CentralCache对象,就得记录它的开始和结束位置。如果实际得到CentralCache对象的空间,可能与需要的不同,就得记录下实际拿到的大小size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
如果实际就获取到一个,就返回start;获取多个,就把这多个对象插入到自由链表中:
if (actualNum == 1){assert(start == end);return start;}else{_freeLists[index].PushRange(NextObj(start), end);return start;}
在Common.h中自由链表中,实现一定范围头插。
void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}
3.2.2 ThreadCache::FetchFromCentralCache()代码实现
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// 慢开始反馈调节算法// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum > 1);if (actualNum == 1){assert(start == end);return start;}else{_freeLists[index].PushRange(NextObj(start), end);return start;}
}
3.3 central cache.cpp实现
3.3.1 从中心缓存获取一定数量的对象给thread cache
获取非空的span,在后面博客中会具体实现,在这里假设已经实现了。
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// ...return nullptr;
}
想要从中心缓存获取一定数量的对象给thread cache,可能会出现多个线程找到同一个桶,首先就得先加锁:_spanLists[index]._mtx.lock();
加了锁最后一定得解锁:_spanLists[index]._mtx.unlock();
在span里面找非空的span:Span* span = GetOneSpan(_spanLists[index], size);
得判断一下获取到的span是不是空。
central cache.cpp下
从span中获取batchNum个对象,从start位置开始要3个
还得考虑把拿出三个在span置为空,用start就比较麻烦,那么就用end往后走batchNum-1步:
让span的next指针指向end的next,再让end的next指向空
start = span->_freeList;end = start;for (size_t i = 0;i<batchNum-1;++i){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;
但是可能会存在一个问题,batchNum大于span里面对象数量。
此时就span里面有多少就给多少:
start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;
3.3.2 central cache.cpp代码
#include "CentralCache.h"CentralCache CentralCache::_sInst;// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// ...return nullptr;
}// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);// 从span中获取batchNum个对象// 如果不够batchNum个,有多少拿多少start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;_spanLists[index]._mtx.unlock();return actualNum;
}
3.4 central cache.h代码
#pragma once#include "Common.h"// 单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
有问题请指出,大家一起进步!!!