【高并发内存池——项目】central cache 讲解
提示:高并发内存池完整项目代码,在主页专栏项目中
文章目录
目录
提示:高并发内存池完整项目代码,在主页专栏项目中
高并发内存池整体框架设计
一、central cache 完整代码实现
Common.h
ThreadCache.h
ThreadCache.cpp
CentralCache.h
CentralCache.cpp
ConcurrentAlloc.h
二、核心代码解析
1. CentralCache类定义
2. 跨度结构Span
3. 跨度链表SpanList
4. 批量获取内存对象
5. 智能批量分配策略
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本 ⾝其实已经很优秀,那么我们项⽬的原型tcmalloc就是在多线程⾼并发的场景下更胜⼀筹,所以这次 我们实现的内存池需要考虑以下⼏⽅⾯的问题。
1.性能问题。
2. 多线程环境下,锁竞争问题。
3. 内存碎⽚问题。
concurrent memorypool主要由以下3个部分构成:
1. threadcache:线程缓存是每个线程独有的,⽤于⼩于256KB的内存的分配,线程从这⾥申请内存 不需要加锁,每个线程独享⼀个cache,这也就是这个并发线程池⾼效的地⽅。
2. central cache:中⼼缓存是所有线程所共享,threadcache是按需从centralcache中获取的对 象。centralcache合适的时机回收threadcache中的对象,避免⼀个线程占⽤了太多的内存,⽽其 他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的⽬的。centralcache是存在竞 争的,所以从这⾥取内存对象是需要加锁,⾸先这⾥⽤的是桶锁,其次只有threadcache的没有内 存对象时才会找centralcache,所以这⾥竞争不会很激烈。
3. pagecache:⻚缓存是在centralcache缓存上⾯的⼀层缓存,存储的内存是以⻚为单位存储及分 配的,centralcache没有内存对象时,从pagecache分配出⼀定数量的page,并切割成定⻓⼤⼩ 的⼩块内存,分配给centralcache。当⼀个span的⼏个跨度⻚的对象都回收以后,pagecache会 回收centralcache满⾜条件的span对象,并且合并相邻的⻚,组成更⼤的⻚,缓解内存碎⽚的问 题。
一、central cache 完整代码实现
Common.h
#pragma once#include <iostream>
#include <vector>
#include <algorithm>
#include <time.h>
#include <assert.h>
#include <thread>
#include <mutex>using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else// linux
#endifstatic void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);NextObj(obj) = _freeList;_freeList = obj;}void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}void* Pop(){assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}private:void* _freeList = nullptr;size_t _maxSize = 1;
};// 计算对象大小的对齐映射规则
class SizeClass
{
public:static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128) return _RoundUp(size, 8);else if (size <= 1024) return _RoundUp(size, 16);else if (size <= 8*1024) return _RoundUp(size, 128);else if (size <= 64*1024) return _RoundUp(size, 1024);else if (size <= 256 * 1024) return _RoundUp(size, 8*1024);else { assert(false); return -1; }}static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) return _Index(bytes, 3);else if (bytes <= 1024) return _Index(bytes - 128, 4) + group_array[0];else if (bytes <= 8 * 1024) return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];else if (bytes <= 64 * 1024) return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];else if (bytes <= 256 * 1024) return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];else { assert(false); }return -1;}static size_t NumMoveSize(size_t size){assert(size > 0);int num = MAX_BYTES / size;if (num < 2) num = 2;if (num > 512) num = 512;return num;}
};// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0;size_t _n = 0;Span* _next = nullptr;Span* _prev = nullptr;size_t _useCount = 0;void* _freeList = nullptr;
};// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx;
};
ThreadCache.h
#pragma once
#include "Common.h"class ThreadCache
{
public:void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
ThreadCache.cpp
#include "ThreadCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{return nullptr;
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}void* ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);
}
CentralCache.h
#pragma once
#include "Common.h"class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}Span* GetOneSpan(SpanList& list, size_t byte_size);size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];CentralCache() {}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
CentralCache.cpp
#include "CentralCache.h"CentralCache CentralCache::_sInst;Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{return nullptr;
}size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;_spanLists[index]._mtx.unlock();return actualNum;
}
ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size)
{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
二、核心代码解析
当线程申请某一大小的内存时,如果thread cache中对应的自由链表不为空,那么直接取出一个内存块进行返回即可,但如果此时该自由链表为空,那么这时thread cache就需要向central cache申请内存了。
central cache的结构与thread cache是一样的,它们都是哈希桶的结构,并且它们遵循的对齐映射规则都是一样的。这样做的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去取内存就行了。
central cache与thread cache有两个明显不同的地方,首先,thread cache是每个线程独享的,而central cache是所有线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是需要加锁的。
但central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。
central cache与thread cache的第二个不同之处就是,thread cache的每个桶中挂的是一个个切好的内存块,而central cache的每个桶中挂的是一个个的span。
每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶这些内存块被切成了对应的大小。
1. CentralCache类定义
class CentralCache
{
public:static CentralCache* GetInstance() // 单例模式{return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST]; // 跨度链表数组private:CentralCache() {} // 私有构造函数CentralCache(const CentralCache&) = delete; // 禁止拷贝static CentralCache _sInst; // 单例实例
};
CentralCache采用单例模式设计,确保整个进程中只有一个实例。它维护了一个SpanList数组,每个元素对应不同大小的内存块。
2. 跨度结构Span
struct Span
{PAGE_ID _pageId = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _next = nullptr; // 双向链表结构Span* _prev = nullptr;size_t _useCount = 0; // 被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表
};
Span是内存管理的基本单位,代表一块连续的内存页:_pageId和_n:标识内存位置和大小_useCount:跟踪内存使用情况,为0时可回收_freeList:管理切分后的小内存块
3. 跨度链表SpanList
class SpanList
{
public:SpanList() // 构造带头双向循环链表{_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan); // 插入操作void Erase(Span* pos); // 删除操作private:Span* _head;
public:std::mutex _mtx; // 桶锁,每个桶独立加锁
};
SpanList采用带头双向循环链表结构,这种设计:插入删除操作高效(O(1)时间复杂度)每个桶有独立的锁,减少锁竞争便于遍历和管理Span
4. 批量获取内存对象
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock(); // 加桶锁Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);// 从span中获取batchNum个对象start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;span->_useCount += actualNum; // 增加使用计数_spanLists[index]._mtx.unlock(); // 解桶锁return actualNum;
}
这个过程体现了CentralCache的核心工作流程:
-
按大小定位:通过
SizeClass::Index
找到对应的内存桶 -
加桶锁:只锁定当前操作的桶,其他桶仍可并发访问
-
获取Span:找到有可用内存的Span
-
批量提取:一次性获取多个对象,减少频繁申请
-
更新计数:记录Span的使用情况
5. 智能批量分配策略
// 一次thread cache从中心缓存获取多少个
static size_t NumMoveSize(size_t size)
{assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;
}
这个函数实现了慢启动的批量分配策略:
-
小对象:一次性获取较多数量(最多512个)
-
大对象:一次性获取较少数量(最少2个)
-
动态调整:根据对象大小智能计算批量数量