高并发内存池
1. 项目介绍
该项目的原型是Google的开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代内存分配函数malloc和free
我们将tcmalloc的核心框架提取出来,模拟实现一个简单版本的高并发内存池,目的是学习tcmalloc的精华
主要的技术栈:
- 数据结构:链表、哈希桶
- C/C++:基础语法
- 操作系统:内存管理、多线程
- 设计模式:单例模式
2. 池化技术
所谓池化技术,就是一次向系统申请过量的资源,没用完的自己管理起来,以备不时之需
之所以要一次申请过量的资源,是因为向系统申请资源是有很大的开销的,频繁的向系统申请资源会降低系统的效率
如果一次申请过量的资源,下次再要时,就不需要向系统申请了,从而提高整个系统的性能
内存池、进程池、线程池、连接池等都使用了池化技术
内存池
所谓内存池,就是程序预先向操作系统申请一块大量的内存,当需要申请内存时,从内存池中获取,需要释放内存时,向内存池中释放;当程序结束时,内存池中的内存才会归还给操作系统
内存池的作用:
- 提升了系统整体的效率
- 减少系统的内存外碎片问题
上述图中,两块内存已经释放,但不连续的状况,称为内存外碎片问题
malloc
平常想从堆上获取内存,调用malloc函数
实际上,malloc底层也是采用内存池的方式,也就是说,调用malloc并不是直接从堆上获取内存,也是从malloc中的内存池获取内存
既然malloc也是采用内存池实现的,为啥我们还要重新设计自己的内存池?
malloc它适用于大部分场景,既然是通用的,就代表它的性能不会太高;接下来我们设计的内存池针对不同的场景有不同的内存池
3. 定长内存池
- 预先向系统申请一块大内存,挂在memory下
- 用户要申请内存时,向定长内存池获取:首先查看freelist是否为空,不为空直接从freelist取下内存块返回,否则从memory中切下内存返回给用户
- 用户要释放内存时,定长内存池将释放的内存块挂在freelist下,以链表的方式组织起来
- freelist中每个内存块的前4/8字节指向下一个内存块
#ifdef _WIN32
#include <windows.h>
#elif
// linux下的头文件
#endif
void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#elif
// linux下申请内存的系统调用
#endif
if (ptr == nullptr) throw std::bad_alloc();
return ptr;
}
// 返回当前内存块的下一个内存块
void*& NextObj(void* ptr)
{
return *(void**)ptr;
}
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;
if (_freeList)
{
void* next = NextObj(_freeList);
obj = (T*)_freeList;
_freeList = next;
}
else
{
// 一个内存块必须能容纳一个指针
size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
if (_remain < objsize)
{
_remain = 128 * 1024;
_memory = (char*)SystemAlloc(_remain >> 13);
}
obj = (T*)_memory;
_memory += objsize;
_remain -= objsize;
}
// 定位new,显示调用T的构造函数
new(obj)T;
return obj;
}
void Delete(T* ptr)
{
// 显示调用T的析构函数
ptr->~T();
NextObj(ptr) = _freeList;
_freeList = ptr;
}
private:
char* _memory = nullptr;
size_t _remain = 0;
void* _freeList = nullptr;
};
4. 高并发内存池整体框架
大部分的应用场景都是多线程的,必然存在锁竞争的问题;tcmalloc在多线程并发场景下比malloc要略胜一筹,因此,我们模拟实现的tcmalloc主要考虑的问题有:
- 多线程锁竞争
- 性能
- 内存碎片
高并发内存池主要由3个部分构成
ThreadCache
- 每个线程独享一个ThreadCache,因此ThreadCache不需要加锁
- ThreadCache中有多个freelist,每个freelist下挂着不同大小的内存块,用户申请内存最先从这里获取,释放内存也向这里释放,挂在指定的freelist下
- 当某个freelist下内存块的个数超过某个值,将该freelist下的所有内存块归还给CentralCache
CentralCache
- 当ThreadCache不能满足用户的要求,ThreadCache按需向CentralCache获取内存块
- CentralCache在合适的时候回收ThreadCache中的对象,避免一个线程占了太多内存,达到内存分配在多个线程中更均衡的按需调度的目的
- 所有线程共享一个CentralCache,因此需要加锁
PageCache
- 该模块的每个对象是以页为单位的大块内存
- 当CentralCache分配出去的小块内存都回收时,PageCache会回收CentralCache中的对象,同时跟自身的对象合并成更大的页,缓解内存碎片的问题
- 多个线程共享一个PageCache,因此需要加锁
5. 申请内存的逻辑
ThreadCache
ThreadCache是一个哈希桶的结构,每个桶对应不同大小的内存块
申请内存时,先将要申请的空间大小对齐到某个数,再根据这个对齐数计算映射到哪个桶,再去对应的桶中取出内存块
对齐数与映射到的桶之间的关系:
字节数 | 对齐数 | 映射到的桶 |
---|---|---|
[1, 128] | 8 | [0, 15] |
[128 + 1, 1024] | 16 | [16, 71] |
[1024 + 1, 8 * 1024] | 128 | [72, 127] |
[8 * 1024 + 1, 64 * 1024] | 1024 | [128, 183] |
[64 * 1024 + 1, 256 * 1024] | 8 * 1024 | [184, 207] |
能向ThreadCache申请的内存最大为256KB,总共有208个桶
这样做会有一定的内存浪费,也叫做内碎片问题,比如要申请129字节,但对齐到144字节,有15字节浪费了
总体的浪费率保持在10%左右
如何根据字节数算出它对齐到的数以及映射到哪个桶?
// Communal.h
class SizeClass
{
inline static size_t _AlignUp(size_t size, size_t align)
{
if (size % align == 0) return size;
return (size / align + 1) * align;
}
// 计算对齐到的数
static size_t AlignUp(size_t size)
{
if (size <= 128) return _AlignUp(size, 8);
else if (size <= 1024) return _AlignUp(size, 16);
else if (size <= 8 * 1024) return _AlignUp(size, 128);
else if (size <= 64 * 1024) return _AlignUp(size, 1024);
else if (size <= 256 * 1024) return _AlignUp(size, 8 * 1024);
else
{
assert(false);
return -1;
}
}
inline static size_t _Index(size_t size, size_t align)
{
if (size % align == 0) return size / align - 1;
return size / align;
}
// 计算映射到的桶的下标
static size_t Index(size_t size)
{
int array[4] = { 16, 56, 56, 56 };
if (size <= 128)
return _Index(size, 8);
else if (size <= 1024)
return _Index(size - 128, 16) + array[0];
else if (size <= 8 * 1024)
return _Index(size - 1024, 128) + array[0] + array[1];
else if (size <= 64 * 1024)
return _Index(size - 8 * 1024, 1024) + array[0] + array[1] + array[2];
else if (size <= 256 * 1024)
return _Index(size - 64 * 1024, 8 * 1024) + array[0] + array[1] + array[2] + array[3];
else
{
assert(false);
return -1;
}
}
};
首先是最上层ConcurrentAlloc,如果ThreadCache的指针对象为空,则创建一个ThreadCache,向ThreadCache申请内存
void* ConcurrentAlloc(size_t size)
{
assert(size <= MAX_SIZE); // 后续调整
if (pThreadCache == nullptr)
pThreadCache = new ThreadCache;
return pThreadCache->Allocate(size);
}
ThreadCache的内存申请结构
// Communal.h
static void*& NextObj(void* ptr)
{
return *(void**)ptr;
}
class FreeList
{
public:
bool Empty()
{
return _freeList == nullptr;
}
void* PopFront()
{
void* ptr = _freeList;
_freeList = NextObj(_freeList);
return ptr;
}
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
}
size_t& Factor()
{
return _factor;
}
private:
void* _freeList = nullptr;
size_t _factor = 1; // 调节因子
};
// ThreadCache.h
class ThreadCache
{
public:
// 从ThreadCache中获取内存块
void* Allocate(size_t size);
// 向CentralCache获取一定数量的小块内存
void* FetchMemoryFromCentralCache(size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// 每个线程独有一个ThreadCache
static _declspec(thread) ThreadCache* pThreadCache = nullptr;
// ThreadCache.cpp
void* ThreadCache::Allocate(size_t size)
{
size_t alignsize = SizeClass::AlignUp(size);
size_t index = SizeClass::Index(size);
if (!_freeLists[index].Empty())
return _freeLists[index].PopFront();
return FetchMemoryFromCentralCache(size);
}
void* ThreadCache::FetchMemoryFromCentralCache(size_t size)
{
size_t alignsize = SizeClass::AlignUp(size);
size_t index = SizeClass::Index(size);
/*
* 慢开始调节机制:
* 一方面:小内存给多点,大内存给少点,通过NumOfMemory控制
* 另一方面:每个FreeList中都有一个调节因子factor,随着要的次数增多,factor值越大,一次给出的内存个数越多
* 取两者中的较小值
* 通过这种机制,达到不至于最开始给的太多导致用不完
*/
int num = std::min(_freeLists[index].Factor(), SizeClass::NumOfMemory(alignsize));
if (num == _freeLists[index].Factor())
_freeLists[index].Factor()++;
void* start = nullptr, * end = nullptr;
size_t actual = CentralCache::GetSingleton()->RemoveMemory(size, start, end, num);
if (actual == 1)
assert(start == end);
else
_freeLists[index].PushRange(NextObj(start), end, actual - 1);
return start;
}
CentralCache
CentralCache中每个桶都有一个SpanList的双向循环链表结构,节点是一个个的Span对象
Span对象中记录着当前Span管理的大块内存起始id、页数、分配给ThreadCache小块内存的个数等属性
由于不同的线程可能会去不同的SpanList获取内存,因此,这里使用桶锁
// Communal.h
struct Span
{
PAGE_ID _page_id = 0; // Span管理的内存的起始id
size_t _n = 0; // Span管理内存的页数
size_t _count = 0; // CentralCache分配出去的小块内存个数
Span* _prev = nullptr;
Span* _next = nullptr;
void* _freeList = nullptr; // 自由链表
};
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
private:
Span* _head = nullptr;
public:
std::mutex _mtx; // 桶锁
};
所有线程公用一个CentralCache,将CentralCache设计为单例模式,在.cpp文件中定义单例对象
// CentralCache.h
class CentralCache
{
public:
// 获取单例对象
static CentralCache* GetSingleton()
{
return &_singleton;
}
// 分配num个内存块给ThreadCache,返回实际分配内存块的个数
size_t RemoveMemory(size_t size, void*& start, void*& end, int num);
// 获取一个Span对象
Span* GetOneSpan(SpanList& spanList, size_t size);
private:
SpanList _spanLists[NFREELIST];
static CentralCache _singleton; // 单例模式
CentralCache() {}
CentralCache(const CentralCache&) = delete;
};
CentralCache分配指定数量的小块内存给ThreadCache时,先找到对应的桶,在桶中查找是否有Span,其自由链表不为空,这是GetOneSpan的逻辑
拿到一个Span后,其自由链表下小块内存的个数不一定满足需要,这时就有多少给多少
整个RemoveMemory过程分为两步,1) 获取Span,2) 取出小块内存,不管是哪一步,都需要对_spanLists进行修改,因此,操作之前进行加锁,返回实际给出的小块内存的个数
// CentralCache.cpp
size_t CentralCache::RemoveMemory(size_t size, void*& start, void*& end, int num)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size);
assert(span);
assert(span->_freeList);
start = span->_freeList;
end = start;
int actual = 1;
while (actual < num && NextObj(end) != nullptr)
{
end = NextObj(end);
actual++;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return actual;
}
而在GetOneSpan中,如果没有找到满足要求的Span,则向PageCache申请Span
向PageCache申请之前,其他线程可能要对当前桶申请/释放内存,因此先将桶解锁,申请完Span后,要将新Span插入到当前桶前在进行加锁,从而提高整体的效率
获取到新Span后,根据页数计算出起始地址和结束地址,切分成小块内存,挂到指定的桶中
// CentralCache.cpp
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
Span* cur = spanList.Begin();
while (cur != spanList.End())
{
if (!cur->_freeList) return cur;
cur = cur->_next;
}
// 先将spanList的锁解开
// 因为向PageCache NewSpan期间可能有其他线程来获取/释放内存块
spanList._mtx.unlock();
PageCache::GetSingleton()->_mtx.lock();
Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
PageCache::GetSingleton()->_mtx.unlock();
// 将新获得的Span对象进行小块内存的切分
char* start = (char*)(span->_page_id << PAGE_SHIFT);
span->_freeList = start;
size_t total = span->_n << PAGE_SHIFT;
char* end = start + total;
void* prev = start;
while (start < end - size)
{
start += size;
NextObj(prev) = start;
prev = start;
}
NextObj(prev) = nullptr;
spanList._mtx.lock();
spanList.PushFront(span);
return span;
}
PageCache
PageCache中,同样也有很多桶,每个桶中都有一个SpanList,挂着多个Span,不同的是,每个Span管理的是以页为单位的大块内存;同时,PageCache使用全局锁,因为如果在当前页没找到Span,会去更大的页找,如果频繁的加锁解锁,会降低整体的效率
// PageCache.h
class PageCache
{
public:
static PageCache* GetSingleton()
{
return &_singleton;
}
// 给CentralCache一个Span对象
Span* NewSpan(size_t kpage);
private:
SpanList _pageLists[NPAGE];
static PageCache _singleton;
PageCache(){}
PageCache(const PageCache&) = delete;
public:
std::mutex _mtx; // 全局锁
};
申请的页数满足: 大内存申请的页数多,小内存申请的页数少
// Communal.h
static size_t NumOfPage(size_t size)
{
size_t num = NumOfMemory(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0) npage = 1;
return npage;
}
在NewSpan中,如果在当前页没有Span可以分配,则在更大的页中找,将大页数分割成两个小页数,如果找到128页还是没有,则向系统申请一个128页的内存
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
assert(kpage < NPAGE);
if (!_pageLists[kpage].Empty())
return _pageLists[kpage].PopFront();
for (size_t i = kpage + 1; i < NPAGE; i++)
{
if (_pageLists[i].Empty()) continue;
Span* nSpan = _pageLists[i].PopFront();
Span* kSpan = new Span;
nSpan->_n -= kpage;
kSpan->_n = kpage;
kSpan->_page_id = nSpan->_page_id + nSpan->_n;
_pageLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
// 向系统申请
void* ptr = SystemAlloc(NPAGE - 1);
Span* bigSpan = new Span;
bigSpan->_n = NPAGE - 1;
bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
_pageLists[NPAGE - 1].PushFront(bigSpan);
return NewSpan(kpage);
}
单元测试
// UnitTest.cpp
#include "ConcurrentAlloc.h"
void Alloc1()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(6);
}
}
void Alloc2()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(7);
}
}
void TLSTest()
{
std::thread t1(Alloc1);
t1.join();
std::thread t2(Alloc2);
t2.join();
}
void TestConcurrentAlloc1()
{
void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);
void* p4 = ConcurrentAlloc(7);
void* p5 = ConcurrentAlloc(8);
std::cout << p1 << std::endl;
std::cout << p2 << std::endl;
std::cout << p3 << std::endl;
std::cout << p4 << std::endl;
std::cout << p5 << std::endl;
}
void TestConcurrentAlloc2()
{
for (size_t i = 0; i < 1024; ++i)
{
void* p1 = ConcurrentAlloc(16);
std::cout << p1 << std::endl;
}
void* p2 = ConcurrentAlloc(8);
std::cout << p2 << std::endl;
}
int main()
{
//TestConcurrentAlloc1();
//TestConcurrentAlloc2();
//TLSTest();
return 0;
}
6. 释放内存的逻辑
从哪个桶申请的内存,就释放回哪个桶,释放内存时也要计算对应桶的下标
因此,释放内存时需要传递内存大小(暂时的做法)
void ConcurrentFree(void* ptr, size_t size)
{
assert(pThreadCache);
pThreadCache->Free(ptr, size);
}
ThreadCache
// ThreadCache.h
class ThreadCache
{
public:
// 从ThreadCache中获取内存块
void* Allocate(size_t size);
// 向CentralCache获取一定数量的小块内存
void* FetchMemoryFromCentralCache(size_t size);
// 释放内存回ThreadCache
void Free(void* ptr, size_t size);
// 当FreeList中自由链表下的小块内存个数过多,归还给CentralCache
void ListTooLong(FreeList& freeList, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// 每个线程独有一个ThreadCache
static _declspec(thread) ThreadCache* pThreadCache = nullptr;
释放内存回ThreadCache时,直接将内存块挂到对应桶的自由链表下
为了避免一个线程独占过多内存,规定:当桶中自由链表下小块内存的个数超过了该桶的调节因子,就将该桶下的所有小块内存归还给CentralCache
void ThreadCache::Free(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_SIZE);
size_t index = SizeClass::Index(size);
_freeLists[index].PushFront(ptr);
if (_freeLists[index].Size() >= _freeLists[index].Factor())
ListTooLong(_freeLists[index], size);
}
void ThreadCache::ListTooLong(FreeList& freeList, size_t size)
{
void* start = nullptr, * end = nullptr;
freeList.PopRange(start, end, freeList.Size());
CentralCache::GetSingleton()->ReleaseToCentralCache(start, size);
}
CentralCache
要想将小块内存释放回CentralCache,首先得知道当前内存是由CentralCache中哪个Span分出来的
因此,代码中需要有PAGE_ID到Span*的映射关系,因为由内存的起始地址,就能计算出PAGE_ID
PAGE_ID = 内存起始地址 / 一页的大小
使用unordered_map数据结构存储映射关系,将数据结构放到PageCache中,因为在PageCache中也需要
用到PAGE_ID到Span*的映射关系
// PageCache.h
class PageCache
{
// ...
private:
// ...
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
// ...
};
在通过PAEG_ID查找Span*之前,使用PageCache中的全局锁进行加锁
// PageCache.cpp
Span* PageCache::IdToSpan(PAGE_ID id)
{
std::unique_lock<std::mutex> lock(_mtx);
auto it = _idSpanMap.find(id);
if (it == _idSpanMap.end())
{
assert(false);
return nullptr;
}
return it->second;
}
因此,PageCache在分配Span给CentralCache之前,需要先将Span管理的大块内存的所有PAEG_ID与Span*建立映射关系
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
// ...
for (size_t i = kpage + 1; i < NPAGE; i++)
{
if (_pageLists[i].Empty()) continue;
Span* nSpan = _pageLists[i].PopFront();
Span* kSpan = new Span;
nSpan->_n -= kpage;
kSpan->_n = kpage;
kSpan->_page_id = nSpan->_page_id + nSpan->_n;
_idSpanMap[nSpan->_page_id] = nSpan;
_idSpanMap[nSpan->_page_id + nSpan->_n - 1] = nSpan;
for (size_t i = 0; i < kSpan->_n; i++)
_idSpanMap[kSpan->_page_id + i] = kSpan;
_pageLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
// ...
}
ReleaseToCentralCache()的逻辑:
先对桶进行加锁,遍历小块内存,首先找到对应的Span,将小块内存头插到Span的自由链表下,Span中的count–,如果count == 0,表示当前Span分配出去的小块内存已全部归还,这时就需要将Span归还给PageCache,归还之前,先对桶解锁,因为归还过程中可能有其他线程对当前桶进行操作,提高效率,归还给PageCache之后,再加锁,依次往复,直到遍历完小块内存,对桶解锁
// CentralCache.cpp
void CentralCache::ReleaseToCentralCache(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
PAGE_ID id = (PAGE_ID)start >> PAGE_SHIFT;
Span* span = PageCache::GetSingleton()->IdToSpan(id);
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_count--;
// 当前span分配给ThreadCache的小块内存已全部归还
if (span->_count == 0)
{
_spanLists[index].Erase(span);
span->_prev = span->_next = nullptr;
span->_freeList = nullptr;
_spanLists[index]._mtx.unlock();
PageCache::GetSingleton()->_mtx.lock();
PageCache::GetSingleton()->ReleaseSpanToPageCache(span);
PageCache::GetSingleton()->_mtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
PageCache
// PageCache.h
class PageCache
{
public:
static PageCache* GetSingleton()
{
return &_singleton;
}
// 给CentralCache一个Span对象
Span* NewSpan(size_t kpage);
// 查找id对应的Span
Span* IdToSpan(PAGE_ID id);
// CetralCache将Span归还给PageCache
void ReleaseSpanToPageCache(Span* span);
private:
SpanList _pageLists[NPAGE];
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
static PageCache _singleton;
PageCache(){}
PageCache(const PageCache&) = delete;
public:
std::mutex _mtx; // 全局锁
};
ReleaseSpanToPageCache()的逻辑:
CentralCache归还给PageCache一个Span,PageCache争取将Span与其他Span合并成一个更大的页,首先向前查找当前Span起始PAGE_ID - 1的Span,再向后查找当前Span结束PAGE_ID + 1的Span,如果遇到一下三种情况,则不合并:
- 要查找的Span不存在
- 找到的Span在被使用中(如果Span被分配给了CentralCache)
- 找到的Span与当前Span合并后的页数大于了128页
如何表示当前Span正在被使用?在Span中添加属性isusing,在CentralCache调用完NewSpan()后,即可将Span的isusing属性置true
// Communal.h
struct Span
{
// ...
bool _isusing = false; // 当前Span是否在被使用
};
// CentralCache.cpp
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
// ...
Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
span->_isusing = true;
//...
}
由于需要在PageCache中查找Span,因此,挂在PageCache中的Span需要将Span的起始PAGE_ID和结束PAGE_ID添加到_idSpanMap中
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
assert(kpage < NPAGE);
if (!_pageLists[kpage].Empty())
{
Span* span = _pageLists[kpage].PopFront();
for (size_t i = 0; i < span->_n; i++)
_idSpanMap[span->_page_id + i] = span;
return span;
}
for (size_t i = kpage + 1; i < NPAGE; i++)
{
if (_pageLists[i].Empty()) continue;
Span* nSpan = _pageLists[i].PopFront();
Span* kSpan = new Span;
nSpan->_n -= kpage;
kSpan->_n = kpage;
kSpan->_page_id = nSpan->_page_id + nSpan->_n;
_idSpanMap[nSpan->_page_id] = nSpan;
_idSpanMap[nSpan->_page_id + nSpan->_n - 1] = nSpan;
for (size_t i = 0; i < kSpan->_n; i++)
_idSpanMap[kSpan->_page_id + i] = kSpan;
_pageLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
// 向系统申请
void* ptr = SystemAlloc(NPAGE - 1);
Span* bigSpan = new Span;
bigSpan->_n = NPAGE - 1;
bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
_pageLists[NPAGE - 1].PushFront(bigSpan);
return NewSpan(kpage);
}
合并完成之后,记得将原来的Span销毁,并将新Span的起始PAGE_ID和结束PAGE_ID添加到_idSpanMap中,Span的使用状态置为false,并加到PageCache中
// PageCache.cpp
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 向前合并
while (1)
{
PAGE_ID prev = span->_page_id - 1;
auto it = _idSpanMap.find(prev);
if (it == _idSpanMap.end()) break;
Span* prevSpan = it->second;
if (prevSpan->_isusing == true) break;
if (prevSpan->_n + span->_n > NPAGE - 1) break;
_pageLists[prevSpan->_n].Erase(prevSpan);
span->_page_id = prevSpan->_page_id;
span->_n += prevSpan->_n;
delete prevSpan;
}
// 向后合并
while (1)
{
PAGE_ID next = span->_page_id + span->_n;
auto it = _idSpanMap.find(next);
if (it == _idSpanMap.end()) break;
Span* nextSpan = it->second;
if (nextSpan == nullptr) break;
if (nextSpan->_isusing == true) break;
if (nextSpan->_n + span->_n > NPAGE - 1) break;
_pageLists[nextSpan->_n].Erase(nextSpan);
span->_n += nextSpan->_n;
delete nextSpan;
}
_idSpanMap[span->_page_id] = span;
_idSpanMap[span->_page_id + span->_n - 1] = span;
span->_isusing = false;
_pageLists[span->_n].PushFront(span);
}
单元测试
// UnitTest.cpp
void MultiThreadAlloc1()
{
std::vector<void*> v;
for (size_t i = 0; i < 1000; ++i)
{
void* ptr = ConcurrentAlloc(6);
v.push_back(ptr);
}
for (auto e : v)
{
ConcurrentFree(e, 6);
}
}
void MultiThreadAlloc2()
{
std::vector<void*> v;
for (size_t i = 0; i < 1000; ++i)
{
void* ptr = ConcurrentAlloc(16);
v.push_back(ptr);
}
for (auto e : v)
{
ConcurrentFree(e, 16);
}
}
void TestMultiThread()
{
std::thread t1(MultiThreadAlloc1);
std::thread t2(MultiThreadAlloc2);
t1.join();
t2.join();
}
int main()
{
TestMultiThread();
return 0;
}
7. 代码测试
// BenchMark.cpp
#include"ConcurrentAlloc.h"
// ntimes 一轮申请和释放内存的次数
// nworks 线程的个数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i], (16 + i) % 8192 + 1);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
int main()
{
size_t n = 10000;
BenchmarkConcurrentMalloc(n, 4, 10);
std::cout << std::endl;
//BenchmarkMalloc(n, 4, 10);
return 0;
}
8. 细节优化
超过32页的内存的申请与释放
如果要申请和释放的内存超过32页,则直接向PageCache申请和释放
而在PageCache中,如果要申请和释放的内存超过128页,则直接向系统申请和释放
// ConcurrentAlloc.h
void* ConcurrentAlloc(size_t size)
{
// 如果size > 32页,直接找PageCache申请
if (size > MAX_SIZE)
{
size_t alignsize = SizeClass::AlignUp(size);
size_t kpage = SizeClass::NumOfPage(alignsize);
PageCache::GetSingleton()->_mtx.lock();
Span* span = PageCache::GetSingleton()->NewSpan(kpage);
PageCache::GetSingleton()->_mtx.unlock();
return (void*)(span->_page_id << PAGE_SHIFT);
}
assert(size <= MAX_SIZE);
if (pThreadCache == nullptr)
pThreadCache = new ThreadCache;
return pThreadCache->Allocate(size);
}
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
// kpage > 128,直接向系统申请
if (kpage > NPAGE - 1)
{
void* ptr = SystemAlloc(kpage);
Span* span = new Span;
span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = kpage;
_idSpanMap[span->_page_id] = span;
return span;
}
// ...
}
// ConcurrentAlloc.h
void ConcurrentFree(void* ptr, size_t size)
{
// 如果size > 32页,直接释放给PageCache
if (size > MAX_SIZE)
{
PAGE_ID id = (PAGE_ID)ptr >> PAGE_SHIFT;
Span* span = PageCache::GetSingleton()->IdToSpan(id);
PageCache::GetSingleton()->_mtx.lock();
PageCache::GetSingleton()->ReleaseSpanToPageCache(span);
PageCache::GetSingleton()->_mtx.unlock();
}
else
{
assert(pThreadCache);
pThreadCache->Free(ptr, size);
}
}
// PageCache.cpp
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGE - 1)
{
void* ptr = (void*)(span->_page_id >> PAGE_SHIFT);
SystemFree(ptr);
delete span;
return;
}
// ...
}
释放无需指定内存大小
在Span中添加一个_objSize属性,当CentralCache向PageCache后去到新Span时,更新该属性值
// Communal.h
struct Span
{
// ...
size_t _objSize = 0;
};
// CentralCache.cpp
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
// ...
Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
span->_isusing = true;
span->_objSize = size;
// ...
}
// ConcurrentAlloc.h
void* ConcurrentAlloc(size_t size)
{
// 如果size > 32页,直接找PageCache申请
if (size > MAX_SIZE)
{
size_t alignsize = SizeClass::AlignUp(size);
size_t kpage = SizeClass::NumOfPage(alignsize);
PageCache::GetSingleton()->_mtx.lock();
Span* span = PageCache::GetSingleton()->NewSpan(kpage);
span->_objSize = alignsize;
PageCache::GetSingleton()->_mtx.unlock();
return (void*)(span->_page_id << PAGE_SHIFT);
}
assert(size <= MAX_SIZE);
if (pThreadCache == nullptr)
pThreadCache = new ThreadCache;
return pThreadCache->Allocate(size);
}
void ConcurrentFree(void* ptr)
{
PAGE_ID id = (PAGE_ID)ptr >> PAGE_SHIFT;
Span* span = PageCache::GetSingleton()->IdToSpan(id);
size_t size = span->_objSize;
// 如果size > 32页,直接释放给PageCache
if (size > MAX_SIZE)
{
PageCache::GetSingleton()->_mtx.lock();
PageCache::GetSingleton()->ReleaseSpanToPageCache(span);
PageCache::GetSingleton()->_mtx.unlock();
}
else
{
assert(pThreadCache);
pThreadCache->Free(ptr, size);
}
}
接入定长内存池
将项目中所有使用new和delete的地方改成向定长内存池申请和销毁
class PageCache
{
// ...
ObjectPool<Span> _spanPool;
// ...
};
使用_spanPool修改项目中所有使用new和delete的地方
void* ConcurrentAlloc(size_t size)
{
// ...
static ObjectPool<ThreadCache> pool;
if (pThreadCache == nullptr)
pThreadCache = pool.New();
// ...
}
基数树优化
结果多轮测试,发现我们的高并发内存池总体效率是要比malloc和free要低的
实际上,每次查找PAGE_ID对应的Span时,都需要保证线程安全,频繁的加锁解锁会降低整体的效率,我们的高并发内存池效率低下主要是这个原因
可以使用基数树存储PAGE_ID到Span的映射关系
其本质就是利用数组的下标表示PAGE_ID,数组的内容表示Span,这样查找映射时,既不需要加锁解锁,又能在O(1)的时间内找到
在x86的环境下,系统总内存为232,一页的大小为213,总的页数为2^32 / 2^13 = 219,也就是说总共有219个PAGE_ID
// PageMap.h
template<size_t BITS>
class PageMap
{
public:
PageMap()
{
size_t size = sizeof(void*) * _length;
size_t alignSize = SizeClass::AlignUp(size);
_array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(_array, 0, alignSize);
}
void set(PAGE_ID id, void* ptr)
{
_array[id] = ptr;
}
void* get(PAGE_ID id)
{
if ((id >> BITS) > 0) return nullptr;
return _array[id];
}
private:
size_t _length = 1 << BITS;
void** _array = nullptr;
};
class PageCache
{
// ...
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
PageMap<32 - PAGE_SHIFT> _idSpanMap;
// ...
};
然后将项目中_idSpanMap替换成基数树中的成员函数即可
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
// kpage > 128,直接向系统申请
if (kpage > NPAGE - 1)
{
void* ptr = SystemAlloc(kpage);
Span* span = _spanPool.New();
span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = kpage;
_idSpanMap.set(span->_page_id, span);
return span;
}
assert(kpage < NPAGE);
if (!_pageLists[kpage].Empty())
{
Span* span = _pageLists[kpage].PopFront();
for (size_t i = 0; i < span->_n; i++)
_idSpanMap.set(span->_page_id + i, span);
return span;
}
for (size_t i = kpage + 1; i < NPAGE; i++)
{
if (_pageLists[i].Empty()) continue;
Span* nSpan = _pageLists[i].PopFront();
Span* kSpan = _spanPool.New();
nSpan->_n -= kpage;
kSpan->_n = kpage;
kSpan->_page_id = nSpan->_page_id + nSpan->_n;
_idSpanMap.set(nSpan->_page_id, nSpan);
_idSpanMap.set(nSpan->_page_id + nSpan->_n - 1, nSpan);
for (size_t i = 0; i < kSpan->_n; i++)
_idSpanMap.set(kSpan->_page_id + i, kSpan);
_pageLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
// 向系统申请
void* ptr = SystemAlloc(NPAGE - 1);
Span* bigSpan = _spanPool.New();
bigSpan->_n = NPAGE - 1;
bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
_pageLists[NPAGE - 1].PushFront(bigSpan);
return NewSpan(kpage);
}
Span* PageCache::IdToSpan(PAGE_ID id)
{
Span* span = (Span*)_idSpanMap.get(id);
return span;
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGE - 1)
{
void* ptr = (void*)(span->_page_id >> PAGE_SHIFT);
SystemFree(ptr);
_spanPool.Delete(span);
return;
}
// 向前合并
while (1)
{
PAGE_ID prev = span->_page_id - 1;
Span* prevSpan = (Span*)_idSpanMap.get(prev);
if(prevSpan == nullptr) break;
if (prevSpan->_isusing == true) break;
if (prevSpan->_n + span->_n > NPAGE - 1) break;
_pageLists[prevSpan->_n].Erase(prevSpan);
span->_page_id = prevSpan->_page_id;
span->_n += prevSpan->_n;
_spanPool.Delete(prevSpan);
}
// 向后合并
while (1)
{
PAGE_ID next = span->_page_id + span->_n;
Span* nextSpan = (Span*)_idSpanMap.get(next);
if (nextSpan == nullptr) break;
if (nextSpan->_isusing == true) break;
if (nextSpan->_n + span->_n > NPAGE - 1) break;
_pageLists[nextSpan->_n].Erase(nextSpan);
span->_n += nextSpan->_n;
_spanPool.Delete(nextSpan);
}
_idSpanMap.set(span->_page_id, span);
_idSpanMap.set(span->_page_id + span->_n - 1, span);
span->_isusing = false;
span->_objSize = 0;
_pageLists[span->_n].PushFront(span);
}
结果多轮测试,我们的高并发内存池要比malloc和free高效很多
项目位置:高并发内存池