【项目实践】高并发内存池
一、项目介绍
当前项目是实现一个高并发的内存池,它的原型是google的一个开源项目tcmalloc(Thread-Caching Malloc),即线程缓存的malloc。 实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free……)。
本篇博客就是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习tcmalloc的精华。当前项目会用到C/C++、数据结构(链表、哈希桶)、操作系统的内存管理、单例模式、多线程、互斥锁等等方面的知识。有兴趣的小伙伴可以研读一下 tcmalloc源码 。
二、内存池的介绍
2.1 了解池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。 之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,从而大大的提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求;当处理完这个请求,线程又进入睡眠状态。
2.2 内存池的概念
内存池是指程序预先从操作系统申请一块足够大的内存空间。 当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
2.3 内存池解决的问题
内存池主要解决的当然是 效率 和 内存碎片 的问题。
什么是内存碎片?
内存碎片分为外碎片和内碎片。外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
2.4 重新认识malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,malloc就是一个内存池。malloc()相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。下面有几篇关于这块的文章,感兴趣的小伙伴可以去简单看看、了解一下。
一文了解,Linux内存管理,malloc、free 实现原理
malloc()背后的实现原理——内存池
malloc的底层实现(ptmalloc)
三、设计定长内存池
作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池来先熟悉一下简单内存池是如何控制的。
这里我们介绍一下windows和Linux下如何直接向堆申请页为单位的大块内存:
VirtualAlloc
Linux进程分配内存的两种方式–brk() 和mmap()
//FixSizeConcurrentMemoryPool.hpp
#pragma once
#include <cstdlib>
#include <sys/mman.h> // for mmap, munmap
#include <unistd.h> // for getpagesize()
namespace FixSizeMemoryPoolModule
{
//直接去堆上申请空间
inline static void *SystemAlloc(size_t kpage)
{
void *ptr = nullptr;
#ifdef _WIN32 || _WIN64
ptr = VirtualAlloc(0, kpage << 13), MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
//linux下使用mmap实现
size_t pagesize = getpagesize(); // 获取系统页大小(一般是 4096 字节)
size_t bytes = kpage * pagesize; // 计算要分配的总字节数
ptr = mmap(nullptr, bytes, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
//定长内存池的实现
template <typename T>
class FixSizeMemoryPool
{
public:
T *New()
{
T *obj = nullptr;
if (_freelist)
{
// 如果自由链表中存在销毁的结点就直接使用
obj = (T *)_freelist;
_freelist = *((void **)_freelist);
}
else
{
if (_leftsize < sizeof(T))
{
// 当前剩余空间不足就会申请内存
_leftsize = 128 * 1024;
// 这里使用的还是malloc接口,效率已经提高了一些,但是我们可以使用系统调用来完成
//_memory = (char *)malloc(_leftsize);
// 系统调用实现
_memory = (char *)SystemAlloc(_leftsize);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
// 申请内存空间
obj = (T *)_memory;
// 至少要存储一个指针的大小
int objsize = sizeof(T) < sizeof(void *) ? sizeof(void *) : sizeof(T);
_memory += objsize;
_leftsize -= objsize;
}
// 定位new初始化
new (obj) T;
return obj;
}
void Delete(T *obj)
{
// 显示调用析构函数
obj->~T();
// 头插法插入
// void**强转后解引用可以使得每个平台都兼容
*((void **)obj) = _freelist;
_freelist = obj;
}
private:
char *_memory = nullptr; // 指向内存池的指针(char*方便进行加减操作)
int _leftsize = 0; // 剩余的内存空间大小
void *_freelist = nullptr; // 用于回收内存的自由链表
};
}
//test.cc
#include<iostream>
#include<vector>
#include<time.h>
#include"FixSizeConcurrentMemoryPool.hpp"
using std::cout;
using std::endl;
using namespace FixSizeMemoryPoolModule;
//对比使用内存池和直接使用malloc的时间差距
struct TreeNode
{
int _val;
TreeNode *_left;
TreeNode *_right;
TreeNode()
: _val(0), _left(nullptr), _right(nullptr)
{
}
};
void TestFixSizeMemoryPoolModule()
{
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 1000000;
std::vector<TreeNode *> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
FixSizeMemoryPool<TreeNode> TNPool;
std::vector<TreeNode *> v2;
v2.reserve(N);
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < 100000; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "FixSizeMemoryPool cost time:" << end2 - begin2 << endl;
}
int main()
{
TestObjectPool();
return 0;
}
四、设计高并发内存池
对于下面的代码部分,我建议在看的时候,先看小于256KB的申请流程(只看申请),履清从用户开始调用TCMalloc::Malloc之后的步骤,接下来看释放的逻辑。这时的你就会对这个项目有了一个宏观的认识,再然后就结合讲解(在gitee的代码仓库中有思维导图)来理解大于256的逻辑,最后看优化部分,理解基数树的优化就可以对该项目有一个深刻的认识。
4.1 高并发内存池的整体框架
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀了,但是我们项目的原型tcmalloc却是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几个方面的问题:
1.性能问题
2.多线程环境下,锁竞争问题
3.内存碎片问题
• thread cache: 线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
• central cache: 中心缓存是所有线程所共享的,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存而其它线程的内存吃紧的情况,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里的竞争不会很激烈。
• page cache: 页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
//TCMalloc.hpp
//初次查看该代码时,忽略掉大于256KB的内容,只需记住该代码是调用了ThreadCache的函数获取、释放即可,了解完整体结构自然会明白其他的部分
#pragma once
#include "ThreadCache.hpp"
#include "PageCache.hpp"
#include "CorrectionIndex.hpp"
#include "FixSizeConcurrentMemoryPool.hpp"
using namespace ThreadCacheModule;
using namespace PageCacheModule;
using namespace CorrectionIndexModule;
using namespace FixSizeMemoryPoolModule;
struct TCMalloc
{
//提供给外层的获取内存池的接口,保证每个线程都有
static void* Malloc(size_t size)
{
//我们的Page Cache的大小是128*8K,因此这里我们的内存申请可以分成三个部分:
// 0~32页(0~256KB) 33~128页(256KB + 1~1024KB) 大于128页(1024KB+1)
//对应的申请内存的方式: 逐层申请 向PageCache申请 向堆申请(在Page Cache中进行)
if (size > MAXMEMORYSIZE)
{
// 算出对齐数
size_t align_num = CorrectionIndex::AlignedNum(size);
//算出所需的页数
size_t page_num = CorrectionIndex::PageNum(align_num);
//直接从page cache中申请
PageCache::GetInstance()->GetMutex().lock();
Span* span=PageCache::GetInstance()->GetSpan(page_num);
span->_size = size;
PageCache::GetInstance()->GetMutex().unlock();
return (void*)(span->_id << PAGESIZESHIFT);
}
else
{
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
static FixSizeMemoryPool< ThreadCache> tcPool;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
}
return pTLSThreadCache->Allocate(size);
}
}
static void Free(void* ptr)
{
//地址获得span,span得到size
Span* span = PageCache::GetInstance()->IdToSpan(ptr);
size_t size = span->_size;
if (size > MAXMEMORYSIZE)
{
PageCache::GetInstance()->GetMutex().lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->GetMutex().unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
};
4.2 thread cache 的整体框架
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
设计思路:
申请内存:
• 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
• 如果自由链表_freeLists[i]中有对象,则直接pop一个内存对象返回;如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
• 当释放内存小于256KB时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象push到_freeLists[i]。
• 当链表的长度过长时,回收一部分内存对象到central cache。
//FreeList.hpp
//本文件主要用于对自由链表进行管理
#pragma once
#include <assert.h>
namespace FreeListModule
{
//自由链表的管理
class FreeList
{
public:
void Push(void*& obj)
{
//头插法
*((void**)obj) = _freelist;
_freelist = obj;
_size++;
}
void Push(void*& start, void*& end, size_t num)
{
*((void**)end) = _freelist;
_freelist = start;
_size += num;
}
void* Pop()
{
//头删法
assert(_freelist);
void* obj = _freelist;
_freelist = *((void**)_freelist);
_size--;
return obj;
}
void Pop(void*& begin, void*& end, size_t num)
{
//取出num个内存单元返回
begin = _freelist;
end = begin;
for (int i = 0; i < num - 1; i++)
{
end = *(void**)end;
}
_freelist = *(void**)end;
*(void**)end = nullptr;
_size -= num;
}
bool Empty()
{
return _freelist == nullptr;
}
size_t& Count()
{
return _count;
}
size_t Size()
{
return _size;
}
private:
size_t _count = 1; //慢增长次数
size_t _size = 0; //实际个数
void* _freelist = nullptr; //自由链表
};
}
//CorrectionIndex.hpp
//该类主要用于计算一些常见数据,如对齐数,桶号……
//该文件是一次性给出,后续的文件在算其他与数字相关的数据时也会包含该文件
#pragma once
#include <stddef.h>
#include <assert.h>
namespace CorrectionIndexModule
{
static const size_t MAXMEMORYSIZE = 256 * 1024; //最大内存
static const size_t PAGESIZESHIFT = 13; //页的大小为8*1024,这里仅用2的幂次表示
static const size_t MAXBUCKETNUM = 208; //桶的个数
static const size_t MAXPAGESIZE = 129; //页的最大个数 128
//修正指数类
class CorrectionIndex
{
public:
//对齐数
static size_t AlignedNum(size_t byte, int alignment_rules)
{
//位运算效率更高
return (((byte)+alignment_rules - 1) & ~(alignment_rules - 1));
}
//桶号
static size_t BucketNum(size_t byte, int bucket_shift_rules)
{
//位运算效率更高
return ((byte + (1 << bucket_shift_rules) - 1) >> bucket_shift_rules) - 1;
}
//对齐数的修正
static size_t AlignedNum(size_t byte)
{
//我们这里采用这样的对齐规则可以使得整体控制在最多10%左右的内碎片浪费
// 数字范围 对齐规则 对应的桶
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
if (byte <= 128)
{
return AlignedNum(byte, 8);
}
else if (byte <= 1024)
{
return AlignedNum(byte, 16);
}
else if (byte <= 8 * 1024)
{
return AlignedNum(byte, 128);
}
else if (byte <= 64 * 1024)
{
return AlignedNum(byte, 1024);
}
else if (byte <= 256 * 1024)
{
return AlignedNum(byte, 8 * 1024);
}
else
{
return AlignedNum(byte, 1<<PAGESIZESHIFT);
}
}
//所在桶的编号
static size_t BucketNum(size_t byte)
{
int bucketarray[] = { 0,16,72,128,184 };
if (byte <= 128)
{
return BucketNum(byte, 3) + bucketarray[0];
}
else if (byte <= 1024)
{
return BucketNum(byte - 128, 4) + bucketarray[1];
}
else if (byte <= 8 * 1024)
{
return BucketNum(byte - 1024, 7) + bucketarray[2];
}
else if (byte <= 64 * 1024)
{
return BucketNum(byte - 8 * 1024, 10) + bucketarray[3];
}
else if (byte <= 256 * 1024)
{
return BucketNum(byte - 64 * 1024, 13) + bucketarray[4];
}
else
{
assert(false);
return -1;
}
}
//申请的内存的个数
static size_t ObtainedNum(size_t size)
{
//为直接从PageCaceh申请计算的
if (size > MAXMEMORYSIZE)
return 1;
//最多给512,最少给两个
//内存块小就多给点,内存块大就少给点
size_t retnum = MAXMEMORYSIZE / size;
if (retnum > 512)
return 512;
if (retnum < 2)
return 2;
return retnum;
}
//申请的页数
static size_t PageNum(size_t size)
{
size_t memorynum = ObtainedNum(size);
size_t pagenum = ((memorynum * size) >> PAGESIZESHIFT);
if (!pagenum) pagenum++;
return pagenum;
}
};
}
//ThreadCache.hpp
//该文件是第一层文件,
#pragma once
#include <assert.h>
#include "FreeList.hpp"
#include "CorrectionIndex.hpp"
#include "CentralCache.hpp"
namespace ThreadCacheModule
{
using namespace FreeListModule;
using namespace CorrectionIndexModule;
using namespace CentralCacheModule;
class ThreadCache
{
public:
// 申请内存对象
void* Allocate(size_t size)
{
assert(size < MAXMEMORYSIZE);
// 算出对齐数
size_t align_num = CorrectionIndex::AlignedNum(size);
// 算出在哪个桶中
size_t bucket_num = CorrectionIndex::BucketNum(size);
// 开始申请内存
if (!_freelist[bucket_num].Empty())
{
// 如果ThreadCache中存在这样大小的内存,就直接返回一个
return _freelist[bucket_num].Pop();
}
else
{
// 说明当前ThreadCache中不存在这样大小的内存片,只能从下层的CentealCache中获取
return FetchFromCentralCache(bucket_num, align_num);
}
}
// 从CentralCache获取内存对象,size这里是已经对齐后的
void* FetchFromCentralCache(size_t bucket_num, size_t size)
{
//使用慢调节算法
size_t obtainednum = min(_freelist[bucket_num].Count(), CorrectionIndex::ObtainedNum(size));
if (obtainednum == _freelist[bucket_num].Count())
_freelist[bucket_num].Count()++;
//从Central Cache获取多个内存块,使用obtainednum更新实际获取的内存块个数
void* start = nullptr, * end = nullptr;
obtainednum = CentralCache::GetInstance()->FetchRangeObj(start, end, obtainednum, size);
if (obtainednum == 1)
{
assert(start == end);
return start;
}
else if (obtainednum > 1)
{
//将多余的内存块用自由链表管理起来
_freelist[bucket_num].Push(*((void**)start), end, obtainednum-1);
return start;
}
else
{
assert(false);
return nullptr;
}
}
// 释放内存对象
void Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAXMEMORYSIZE);
// 直接将内存片放回到freelist中
size_t bucket_num = CorrectionIndex::BucketNum(size);
_freelist[bucket_num].Push(ptr);
//当长度过大时就会释放回Central Cache
if (_freelist[bucket_num].Size() >= _freelist[bucket_num].Count())
{
ListTooLong(_freelist[bucket_num], size);
}
}
void ListTooLong(FreeList& freelist, size_t size)
{
void* begin = nullptr, * end = nullptr;
//取得要被释放的区间长度
freelist.Pop(begin, end, freelist.Count());
CentralCache::GetInstance()->ReleaseListToSpans(begin, size);
}
private:
FreeList _freelist[MAXBUCKETNUM]; // 自由列表桶
};
// 每个线程都要有对应的ThreadCache,这就需要使用到线程的局部性原理
// 这里不在条件编译Linux或者Windows系统么认识使用C++11标准下定义的关键字
thread_local ThreadCache* pTLSThreadCache = nullptr;
}
4.3 central cache 的整体框架
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是它的每个哈希桶位置挂是SpanList链表结构,每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
设计思路:
单例模式
我们这里的central cache是被多个thread cache共享使用的,故而需要设计为单例模式
申请内存:
• 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
• central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
• central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count
释放内存:
• 当thread_cache过长或者线程销毁,则会将内存释放回central cache中,释放回来时--use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
//SpanList.hpp
#pragma once
#include<mutex>
#include<assert.h>
namespace SpanListModule
{
using PageId = size_t;
struct Span
{
PageId _id = 0; //页号
size_t _num = 0; //页的大小
Span* _next = nullptr; //下一个页
Span* _prev = nullptr; //前一个页
int _usernum = 0; //当前使用者的数量
void* _freelist = nullptr; //切好的自由链表
bool _isuse = false; //是否在被使用
size_t _size; //存储对象的大小
};
class SpanList
{
public:
SpanList()
{
_headspan = new Span;
_headspan->_next = _headspan;
_headspan->_prev = _headspan;
}
Span* Begin()
{
return _headspan->_next;
}
Span* End()
{
return _headspan;
}
void Insert(Span* pos, Span* newspan)
{
assert(pos);
assert(newspan);
Span* prev = pos->_prev;
prev->_next = newspan;
newspan->_prev = prev;
newspan->_next = pos;
pos->_prev = newspan;
}
void Erase(Span* pos)
{
assert(pos);
Span* next = pos->_next;
Span* prev = pos->_prev;
prev->_next = next;
next->_prev = prev;
}
void Pushfront(Span* pos)
{
Insert(Begin(), pos);
}
Span* Popfront()
{
Span* front = _headspan->_next;
Erase(front);
return front;
}
~SpanList()
{
}
std::mutex& GetMutex()
{
return _lock;
}
bool Empty()
{
return _headspan == _headspan->_next;
}
private:
Span* _headspan;
std::mutex _lock; //这是一个桶锁,交由Central Cache使用
};
}
//CentralCache.hpp
#pragma once
#include<Windows.h>
#include "SpanList.hpp"
#include "CorrectionIndex.hpp"
#include "PageCache.hpp"
namespace CentralCacheModule
{
using namespace SpanListModule;
using namespace CorrectionIndexModule;
using namespace PageCacheModule;
//单例模式
class CentralCache
{
private:
CentralCache() {}
CentralCache(const CentralCache&) = delete;
CentralCache& operator==(const CentralCache&) = delete;
//在spanlist中找到一个大块Span,没有的话向Page Cache申请
Span* GetOneSpan(SpanList& spanlist, size_t size)
{
Span* begin = spanlist.Begin();
while (begin != spanlist.End())
{
if (begin->_freelist) return begin;
begin = begin->_next;
}
spanlist.GetMutex().unlock();
//从page cache中获取新的页,防止同时向Page Cache申请需要加锁
PageCache::GetInstance()->GetMutex().lock();
size_t pagenum = CorrectionIndex::PageNum(size);
Span* newspan = PageCache::GetInstance()->GetSpan(pagenum);
newspan->_isuse = true;//标识正在被使用
newspan->_size = size;//记录内存块的大小
PageCache::GetInstance()->GetMutex().unlock();
//对获取的页进行切割
char* start = (char*)(newspan->_id << PAGESIZESHIFT);//char* 方便切割
if (IsBadWritePtr(start, sizeof(void*)))
{
std::cerr << "Error: Invalid memory address: " << (void*)start << std::endl;
throw std::runtime_error("Invalid memory address");
}
char* end = start + (newspan->_num << PAGESIZESHIFT);
newspan->_freelist = start;
start += size;
void* tail = newspan->_freelist;
while (start < end)
{
*((void**)tail) = start;
tail = start;
start += size;
}
*((void**)tail) = nullptr;
//将Span挂到SpanList中
spanlist.GetMutex().lock();
spanlist.Insert(spanlist.Begin(), newspan);
return newspan;
}
public:
//返回单例
static CentralCache* GetInstance()
{
return &_instance;
}
// 从Central Cache获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
//计算桶号
size_t bucketnum = CorrectionIndex::BucketNum(size);
//实际获取的数量,注意:这里是从1开始
size_t actuallynum = 1;
//上桶锁
_spanlists[bucketnum].GetMutex().lock();
//获取大块span
Span* span = GetOneSpan(_spanlists[bucketnum], size);
//获取一定数量的内存对象
start = span->_freelist;
end = start;
for (int i = 0; i < batchNum - 1/*这样的话刚好可以使得end指向最后一个切割的位置*/ && *((void**)end); i++)
{
end = *((void**)end);
++actuallynum;
}
//更新节点关系
span->_freelist = *((void**)end);
*((void**)end) = nullptr;
span->_usernum += actuallynum;
_spanlists[bucketnum].GetMutex().unlock();
//返回实际获取到的内存片数量
return actuallynum;
}
//将Thread Cache中的内存块释放回Central Cache
void ReleaseListToSpans(void* start, size_t size)
{
//获取桶号
size_t bucketnum = CorrectionIndex::BucketNum(size);
//上桶锁
_spanlists[bucketnum].GetMutex().lock();
//将该区间的内存依次挂载到freelist中
while (start)
{
void* next = *(void**)start;
Span* span = PageCache::GetInstance()->IdToSpan(start);
//头插
*((void**)start) = span->_freelist;
span->_freelist = start;
span->_usernum--;
if (span->_usernum == 0)
{
//清空变量
_spanlists[bucketnum].Erase(span);
span->_freelist = span->_next = span->_prev = nullptr;
// 释放span给page cache时,使用page cache的锁就可以了
// 这时把桶锁解掉
_spanlists[bucketnum].GetMutex().unlock();
//这时候就可以将span块还给page cache了
PageCache::GetInstance()->GetMutex().lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->GetMutex().unlock();
_spanlists[bucketnum].GetMutex().lock();
}
start = next;
}
_spanlists[bucketnum].GetMutex().unlock();
}
private:
SpanList _spanlists[MAXBUCKETNUM]; //自由链表桶
static CentralCache _instance; //单例对象
};
CentralCache CentralCache::_instance; //单例对象实例化
}
4.4 page cache 的整体框架
page cache也是一个哈希结构,它其中存储的也是spanlists。需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,它的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
设计思路:
申请内存:
• 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
• 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
释放内存:
• 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
//SyastemCall.hpp
//该部分代码仅需理解基本逻辑即可,这一部分代码就是原生的系统调用接口
#pragma once
#ifdef _WIN32
#include<Windows.h>
#elif
#include <sys/mman.h> // for mmap, munmap
#include <unistd.h> // for getpagesize()
#endif
namespace SystemCallModule
{
//直接去堆上申请空间
inline static void* SystemAlloc(size_t kpage)
{
void* ptr = nullptr;
#ifdef _WIN32
ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
//linux下使用mmap实现
size_t pagesize = getpagesize(); // 获取系统页大小(一般是 4096 字节)
size_t bytes = kpage * pagesize; // 计算要分配的总字节数
ptr = mmap(nullptr, bytes, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
#endif
if (ptr == nullptr)
assert(false);
return ptr;
}
// 释放内存
inline static void SystemFree(void* ptr, size_t kpage)
{
#ifdef _WIN32
// Windows 直接释放
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// Linux/macOS 使用 munmap
if (ptr != MAP_FAILED && ptr != nullptr)
{
size_t pagesize = getpagesize();
size_t bytes = kpage * pagesize;
munmap(ptr, bytes);
}
#endif
}
}
//SpanList.hpp
#pragma once
#include<mutex>
#include<unordered_map>
#include "CorrectionIndex.hpp"
#include "SpanList.hpp"
#include "SystemCall.hpp"
//初次查看代码时,先忽略掉下面两个文件的内容引用,即看注释的部分即可,下面会对这个文件的引入做出说明
#include "FixSizeConcurrentMemoryPool.hpp"
#include "PageMap.hpp"
namespace PageCacheModule
{
using PAGE_ID = size_t;
using namespace CorrectionIndexModule;
using namespace SpanListModule;
using namespace SystemCallModule;
using namespace FixSizeMemoryPoolModule;
//单例模式
class PageCache
{
PageCache() {}
PageCache(const PageCache&) = delete;
PageCache operator==(const PageCache&) = delete;
public:
static PageCache* GetInstance()
{
return &_instance;
}
Span* GetSpan(size_t pagenum)
{
//当申请的页数大于128时
if (pagenum >= MAXPAGESIZE)
{
void* ptr = SystemAlloc(pagenum);
//Span* span = new Span;
Span* span = _spanpool.New();
span->_id = ((PageId)span >> PAGESIZESHIFT);
span->_num = pagenum;
//_idtoSpan[span->_id] = span;
_idSpanMap.set(span->_id, span);
return span;
}
//对应的pagenum位置要是存在还可使用的page就返回
if (!_spanlists[pagenum].Empty())
{
Span* targetPage = _spanlists[pagenum].Popfront();
//建立targetPage中Id与span的映射关系
for (size_t i = 0; i < targetPage->_num; i++)
{
//_idtoSpan[targetPage->_id + i] = targetPage;
_idSpanMap.set(targetPage->_id + i, targetPage);
}
return targetPage;
}
//找到使用大的Page进行切分
for (int i = pagenum + 1; i < MAXPAGESIZE; i++)
{
if (!_spanlists[i].Empty())
{
Span* otherPage = _spanlists[i].Popfront();
//Span* targetPage = new Span;
Span* targetPage = _spanpool.New();
targetPage->_id = otherPage->_id;
targetPage->_num = pagenum;
otherPage->_id += pagenum;
otherPage->_num -= pagenum;
_spanlists[otherPage->_num].Pushfront(otherPage);
//建立targetPage中Id与span的映射关系
for (size_t i = 0; i < targetPage->_num; i++)
{
//_idtoSpan[targetPage->_id + i] = targetPage;
_idSpanMap.set(targetPage->_id + i, targetPage);
}
return targetPage;
}
}
//只能从系统申请空间了
//Span* newspan = new Span;
Span* newspan = _spanpool.New();
void* ptr = SystemAlloc(MAXPAGESIZE - 1);
newspan->_id = ((PAGE_ID)ptr >> PAGESIZESHIFT);
newspan->_num = MAXPAGESIZE - 1;
_spanlists[newspan->_num].Pushfront(newspan);
return GetSpan(pagenum);
}
std::mutex& GetMutex()
{
return _lock;
}
Span* IdToSpan(void* addr)
{
PageId id = ((PageId)addr >> PAGESIZESHIFT);
//加锁
//std::unique_lock<std::mutex> lock(_lock);
//auto ret = _idtoSpan.find(id);
auto ret = (Span*)_idSpanMap.get(id);
//if (ret == _idtoSpan.end())
if (ret == nullptr)
{
assert(false);
return nullptr;
}
else
{
//return ret->second;
return ret;
}
}
void ReleaseSpanToPageCache(Span* span)
{
//关于大于128页的情况
if (span->_num >= MAXPAGESIZE)
{
void* ptr = (void*)(span->_id << PAGESIZESHIFT);
SystemFree(ptr,span->_num);
//delete span;
_spanpool.Delete(span);
}
else
{
// 对span前后的页,尝试进行合并,缓解内存碎片问题
while (true)
{
PAGE_ID prevId = span->_id - 1;
//auto ret = _idtoSpan.find(prevId);
auto ret = (Span*)_idSpanMap.get(prevId);
// 前面的页号没有,不合并了
/*if (ret == _idtoSpan.end())
{
break;
}*/
if (ret == nullptr)
{
break;
}
// 前面相邻页的span在使用,不合并了
//Span* prevSpan = ret->second;
Span* prevSpan = ret;
if (prevSpan->_isuse == true)
{
break;
}
// 合并出超过128页的span没办法管理,不合并了
if (prevSpan->_num + span->_num > MAXPAGESIZE - 1)
{
break;
}
span->_id = prevSpan->_id;
span->_num += prevSpan->_num;
_spanlists[prevSpan->_num].Erase(prevSpan);
//delete prevSpan;
_spanpool.Delete(prevSpan);
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_id + span->_num;
//auto ret = _idtoSpan.find(nextId);
//if (ret == _idtoSpan.end())
//{
// break;
//}
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)
{
break;
}
//Span* nextSpan = ret->second;
Span* nextSpan = ret;
if (nextSpan->_isuse == true)
{
break;
}
if (nextSpan->_num + span->_num > MAXPAGESIZE - 1)
{
break;
}
span->_num += nextSpan->_num;
_spanlists[nextSpan->_num].Erase(nextSpan);
//delete nextSpan;
_spanpool.Delete(nextSpan);
}
_spanlists[span->_num].Pushfront(span);
span->_isuse = false;
//_idtoSpan[span->_id] = span;
//_idtoSpan[span->_id + span->_num - 1] = span;
_idSpanMap.set(span->_id, span);
_idSpanMap.set(span->_id + span->_num - 1, span);
}
}
private:
SpanList _spanlists[MAXPAGESIZE];
//std::unordered_map<PageId, Span*> _idtoSpan;
TCMalloc_PageMap1<32 - PAGESIZESHIFT> _idSpanMap;//32位机器下的pagemap
std::mutex _lock; //全局锁
static PageCache _instance; //单例模式
FixSizeMemoryPool<Span> _spanpool;//用以替换new和delete
};
PageCache PageCache::_instance;
}
4.5 对当前内存池的优化
先给出一段测试代码:
#include<iostream>
#include<atomic>
#include<mutex>
#include"TCMalloc.hpp"
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(TCMalloc::Malloc(16));
//v.push_back(TCMalloc::Malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
TCMalloc::Free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
int main()
{
size_t n = 100000;
std::cout << "==========================================================" << std::endl;
BenchmarkConcurrentMalloc(n, 4, 10);
std::cout << std::endl << std::endl;
BenchmarkMalloc(n, 4, 10);
std::cout << "==========================================================" << std::endl;
return 0;
}
当将我们已有的代码(注释的部分)进行测试的时候会发现我们的TCMalloc的时间消耗是要远大于malloc的,这和我们的初衷是相悖的,那为什么我们的代码会耗时更长呢?
这时我们就可以使用性能测试工具来进行测试,VS2022上就有对应的测试工具(Alt+F2):
经过分析我们可以看到在PageCache中调用每个函数的时间消耗是最大的,这是因为我们在PageCaceh中使用了哈希桶结构,我们的STL容器它不是线程安全的,这也就是我们上面加桶锁的原因,加锁就是导致我们耗时的原因之一。因此我们可以采用基数树的方式进行ID与Span的映射来避免加锁:
//PageMap.hpp
#pragma once
#include"CorrectionIndex.hpp"
#include"FixSizeConcurrentMemoryPool.hpp"
using namespace CorrectionIndexModule;
using namespace FixSizeMemoryPoolModule;
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;//长度
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = CorrectionIndex::AlignedNum(size, 1 << PAGESIZESHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGESIZESHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static FixSizeMemoryPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
这样的话大家也就理解了为什么我上面的函数中使用的都是get,set系列的接口来确定哈希的映射关系。
除了上面这个原因之外,我们的TCMalloc应该脱离malloc new这样的调用接口,以避免性能的算是,因此我们采用了上面设计的定长内存池来满足我们的malloc操作,从而使得我们的TCMalloc彻底的脱离了malloc这种语言级接口。
最后附上我们的项目成功展示:
可见,在多线程情况下使用TCMalloc将效率提高了至少50%。
至于源码与思维导图请看我的gitee仓库。