仿tcmalloc高并发内存池
文章目录
- 项目介绍
- 项目要求
- 什么是内存池
- 定长内存池的实现
- 高并发内存池整体框架设计
- threadcache
- threadcache整体设计
- threadcache哈希桶映射对齐规则
- threadcacheTLS无锁访问
- centralcache
- centralcache整体设计
- centralcache结构设计
- centralcache核心实现
- pagecache
- pagecache整体设计
- pagecache中获取Span
- 申请内存过程联调
- 出现的问题
- 测试一
- 测试二
- threadcache回收内存
- centralcache回收内存
- pagecache回收内存
- 释放内存过程联调
- 大于256KB内存申请问题
- 使用定长内存池配合脱离使用new
- 释放对象时优化为不传对象大小
- 多线程环境下对比malloc测试
- 复杂问题的调试技巧
- 性能瓶颈分析
- 针对性能瓶颈使用基数树进行优化
- 使用基数树进行优化代码实现
- 源码
项目介绍
本项目实现的是一个高并发的内存池,它的原型是Google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替换系统的内存分配相关函数malloc和free。这个项⽬是把tcmalloc最核⼼的框架简化后拿出来,模拟实现出⼀个⾃⼰的⾼并发内存池。
项目要求
这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的知识。
什么是内存池
池化技术
池化技术(Pooling Technique)是一种资源管理策略,通过预先分配和复用资源(如内存、线程、对象、连接等),减少资源创建和销毁的开销,从而提高系统性能和资源利用率。常见的池化技术有:内存池,线程池,连接池,对象池,缓存池等。
内存池
内存池(Memory Pool)是一种内存管理技术,用于高效地分配和释放内存资源。它通过预先分配一块较大的内存空间,并将其划分为多个固定大小的内存块,从而避免频繁地向操作系统申请和释放内存,提高内存分配的效率和性能。
内存池主要解决的问题
内存池主要解决的就是效率的问题,它能够避免让程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题。
内存碎片分为内部碎片和外部碎片:
- 外部碎片是一些空闲的小块内存区域,由于这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配申请需求。
- 内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
注意: 内存池尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生。
malloc
C/C++中我们要动态申请内存并不是直接去堆申请的,而是通过malloc函数去申请的,包括C++中的new实际上也是封装了malloc函数的。
我们申请内存块时是先调用malloc,malloc再去向操作系统申请内存。malloc实际就是一个内存池,malloc相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用,当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如Windows的VS系列中的malloc就是微软自行实现的,而Linux下的gcc用的是glibc中的ptmalloc。
定长内存池的实现
malloc其实就是一个通用的内存池,在什么场景下都可以使用,但这也意味着malloc在什么场景下都不会有很高的性能,因为malloc并不是针对某种场景专门设计的。
定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们可以将其性能做到极致,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。
我们可以通过实现定长内存池来熟悉一下对简单内存池的控制,其次,这个定长内存池后面会作为高并发内存池的一个基础组件。
实现定长
在实现定长内存池时要做到“定长”有很多种方法,比如我们可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N。
//定长内存池
template<size_t N>
class ObjectPool
{};
此外,定长内存池也叫做对象池,在创建对象池时,对象池可以根据传入的对象类型的大小来实现“定长”,因此我们可以通过使用模板参数来实现“定长”,比如创建定长内存池时传入的对象类型是int,那么该内存池就只支持4字节大小内存的申请和释放。
template<class T>
class ObjectPool
{};
直接向堆申请空间
既然是内存池,那么我们首先得向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下,可以调用VirtualAlloc函数;在Linux下,可以调用brk或mmap函数。
#ifdef _WIN32#include <Windows.h>
#else
//
#endif//在堆上按页申请空间 4KB的整数倍 -> 8KB
inline static void* SystemAlloc(size_t kpage)
{
//windows
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
//Linux
#else//Linux下的brk,mmap
#endifif (ptr == nullptr){throw std::bad_alloc();}return ptr;
}
这里我们可以通过条件编译将对应平台下向堆申请内存的函数进行封装,此后我们就不必再关心当前所在平台,当我们需要直接向堆申请内存时直接调用我们封装后的SystemAlloc函数即可。
定长内存池中的成员变量
对于向堆申请到的大块内存,我们可以用一个指针来对其进行管理,但仅用一个指针肯定是不够的,我们还需要用一个变量来记录这块内存的长度。
由于此后我们需要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的类型决定了指针向前或向后走一步有多大距离,对于字符指针来说,当我们需要向后移动n个字节时,直接对字符指针进行加n操作即可。
其次,释放回来的定长内存块也需要被管理,我们可以将这些释放回来的定长内存块链接成一个链表,这里我们将管理释放回来的内存块的链表叫做自由链表,为了能找到这个自由链表,我们还需要一个指向自由链表的指针。
因此,定长内存池当中包含三个成员变量:
- _memory:指向内存块的指针
- _remainBytes:切分内存块后剩余字节数
- _freeList:还回来的内存块链接的自由链表的头指针
内存池如何管理释放的对象?
对于还回来的定长内存块,我们可以用自由链表将其链接起来,但我们并不需要为其专门定义链式结构,我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台)作为指针,存储后面内存块的起始地址即可。
因此在向自由链表插入被释放的内存块时,先让该内存块的前4个字节或8个字节存储自由链表中第一个内存块的地址,然后再让_freeList指向该内存块即可,也就是一个简单的链表头插操作。
如何让一个指针在32位平台下解引用后能向后访问4个字节,在64位平台下解引用后能向后访问8个字节?
首先我们得知道,32位平台下指针的大小是4个字节,64位平台下指针的大小是8个字节。而指针指向数据的类型,决定了指针解引用后能向后访问的空间大小,因此我们这里需要的是一个指向指针的指针,这里使用二级指针就行了。
void* obj;
*(void**)obj;//解引用后向后访问void*长度的数据
需要注意的是,在释放对象时,我们应该显示调用该对象的析构函数清理该对象,因为该对象可能还管理着其他某些资源,如果不对其进行清理那么这些资源将无法被释放,就会导致内存泄漏。
void Delete(T* obj)
{//显示调用T的析构函数obj->~T();//头插*(void**)obj = _freeList;_freeList = obj;
}
内存池如何为我们申请对象?
当我们申请对象时,内存池应该优先把还回来的内存块对象再次重复利用,因此如果自由链表当中有内存块的话,就直接从自由链表头删一个内存块进行返回即可。
如果自由链表当中没有内存块,那么我们就在大块内存中切出定长的内存块进行返回,当内存块切出后及时更新_memory指针的指向,以及_remainBytes的值即可。
需要特别注意的是,由于当内存块释放时我们需要将内存块链接到自由链表当中,因此我们必须保证切出来的对象至少能够存储得下一个地址,所以当对象的大小小于当前所在平台指针的大小时,需要按指针的大小进行内存块的切分。
此外,当大块内存已经不足以切分出一个对象时,我们就应该调用我们封装的SystemAlloc函数,再次向堆申请一块内存空间,此时也要注意及时更新_memory指针的指向,以及_remainBytes的值。
需要注意的是,与释放对象时需要显示调用该对象的析构函数一样,当内存块切分出来后,我们也应该使用定位new,显示调用该对象的构造函数对其进行初始化。
T* New()
{T* obj = nullptr;//优先使用回来的内存块对象if (_freeList != nullptr){void* next = *(void**)_freeList;obj = (T*)_freeList;_freeList = next;}else{//剩余空间不够一个完整的T大小,开辟新的空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//_memory = (char*)malloc(_remainBytes);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;//预防T类型空间不够void*大小size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}//定位new,显示调用T的构造函数进行初始化new(obj) T;return obj;
}
定长内存池整体代码
#pragma once
#include <iostream>
#include <vector>using std::cout;
using std::endl;#ifdef _WIN32#include <Windows.h>
#else
//
#endif//定长内存池
//template<size_t N>
//class ObjectPool
//{
//
//};//在堆上按页申请空间 4KB的整数倍 -> 8KB
inline static void* SystemAlloc(size_t kpage)
{
//windows
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
//Linux
#else//Linux下的brk,mmap
#endifif (ptr == nullptr){throw std::bad_alloc();}return ptr;
}template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;//优先使用回来的内存块对象if (_freeList != nullptr){void* next = *(void**)_freeList;obj = (T*)_freeList;_freeList = next;}else{//剩余空间不够一个完整的T大小,开辟新的空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//_memory = (char*)malloc(_remainBytes);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;//预防T类型空间不够void*大小size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}//定位new,显示调用T的构造函数进行初始化new(obj) T;return obj;}void Delete(T* obj){//显示调用T的析构函数obj->~T();//头插*(void**)obj = _freeList;_freeList = obj;}private:char* _memory = nullptr;//指向内存块的指针size_t _remainBytes = 0;//切分内存块后剩余字节数void* _freeList = nullptr;//还回来的内存块链接的自由链表的头指针
};
高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本⾝其实已经很优秀,但是在并发场景下可能会因为频繁的加锁和解锁导致效率有所降低,而该项目的原型是基于tcmalloc实现的一种在多线程高并发场景下更胜一筹的内存池。
在实现内存池时我们一般需要考虑到性能问题和内存碎片的问题,但对于高并发内存池来说,我们还需要考虑在多线程环境下的锁竞争问题。
高并发内存池主要由以下三个部分构成:
-
thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
-
central cache:中心缓存是所有线程共享,thread cache是按需从central cache中获取对象。central cache要在合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,但这里用的是
桶锁
,其次只有thread cache没有内存对象时才会找central cache,所以这里竞争不会很激烈。 -
page cache:页缓存是在central cache缓存下面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。page cache访问是需要加锁的,这里不能是桶锁,而是需要锁住整个page cache。
threadcache
threadcache整体设计
定长内存池只支持固定大小内存块的申请释放,因此定长内存池中只需要一个自由链表管理释放回来的内存块。现在我们要支持申请和释放不同大小的内存块,那么我们就需要多个自由链表来管理释放回来的内存块,因此thread cache实际上一个哈希桶结构,每个桶中存放的都是一个自由链表。
thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。
这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如我们让这些字节数都按照8字节进行向上对齐,那么thread cache的结构就是下面这样的,此时当线程申请1~ 8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。
因此当线程要申请某一大小的内存块时,就需要经过某种计算得到对齐后的字节数,进而找到对应的哈希桶,如果该哈希桶中的自由链表中有内存块,那就从自由链表中头删一个内存块进行返回;如果该自由链表已经为空了,那么就需要向下一层的central cache进行获取了。
但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片。
先对自由链表这个结构进行封装
static void*& NextObj(void* obj)
{return *(void**)obj;
}//管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);//头插NextObj(obj) = _freeList;_freeList = obj;}void* Pop(){assert(_freeList);//头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}private:void* _freeList = nullptr;//指向自由链表的头指针
};
thread cache实际就是一个数组,数组中存储的就是一个个的自由链表。
threadcache哈希桶映射对齐规则
如何进行对齐?
上面已经说了,不是每个字节数都对应一个自由链表,这样开销太大了,因此我们需要制定一个合适的映射对齐规则。
首先,这些内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为我们必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针。
但如果所有的字节数都按照8字节进行对齐的话,那么我们就需要建立256 × 1024 ÷ 8 = 32768 个桶,这个数量还是比较多的,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐,具体对齐方式如下:
虽然对齐产生的内碎片会引起一定程度的空间浪费,但按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。
对齐和映射函数
//计算对象大小的对齐映射规则
class SizeClass
{
public://获取向上对齐后的字节数static inline size_t RoundUp(size_t bytes);//获取对应哈希桶的下标static inline size_t Index(size_t bytes);
};
注意:SizeClass类当中的成员函数最好设置为静态成员函数,否则我们在调用这些函数时就需要通过对象去调用,并且对于这些可能会频繁调用的函数,可以考虑将其设置为内联函数。
计算对齐后长度
在获取某一字节数向上对齐后的字节数时,可以先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理。
//计算对象对齐后的大小
static inline size_t RoundUp(size_t bytes)
{if (bytes <= 128){return _RoundUp(bytes, 8);}else if (bytes <= 1024){return _RoundUp(bytes, 16);}else if (bytes <= 8 * 1024){return _RoundUp(bytes, 128);}else if (bytes <= 64 * 1024){return _RoundUp(bytes, 1024);}else if (bytes <= 256 * 1024){return _RoundUp(bytes, 8 * 1024);}else{assert(false);}return -1;
}
此时我们就需要编写一个子函数,该子函数需要通过对齐数计算出某一字节数对齐后的字节数,最容易想到的就是下面这种写法。
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{if (bytes % alignNum == 0){return bytes;}else{return (bytes / alignNum + 1) * alignNum;}
}
除了上述写法,我们还可以通过位运算的方式来进行计算,虽然位运算可能并没有上面的写法容易理解,但计算机执行位运算的速度是比执行乘法和除法更快的。
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{return (bytes + alignNum - 1) & ~(alignNum - 1);
}
计算下标
在获取某一字节数对应的哈希桶下标时,也是先判断该字节数属于哪一个区间,然后再通过调用一个子函数进行进一步处理。
//计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{assert(bytes <= MAX_BYTES);//每个区间的链数static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128){return _Index(bytes, 3);}else if (bytes <= 1024){//都从1开始,加上前面桶的数量return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];}else if (bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2]+ group_array[3];}else{assert(false);}return -1;
}
此时我们需要编写一个子函数来继续进行处理,容易想到的就是根据对齐数来计算某一字节数对应的下标。
static inline size_t _Index(size_t bytes, size_t alignNum)
{if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}
}
当然,为了提高效率下面也提供了一个用位运算来解决的方法,需要注意的是,此时我们并不是传入该字节数的对齐数,而是将对齐数写成2的n次方的形式后,将这个n值进行传入。比如对齐数是8,传入的就是3。
static inline size_t _Index(size_t bytes, size_t align_shift)
{return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
ThreadCache类
按照上述的对齐规则,thread cache中桶的个数,也就是自由链表的个数是208,以及thread cache允许申请的最大内存大小256KB,进行数据定义。这里不用宏定义,因为c++宏定义容易出现bug
static const size_t MAX_BYTES = 256 * 1024;//thread chache申请的最大字节
static const size_t NFREELIST = 208;//自由链表哈希桶的大小
定义ThreadCache类
class ThreadCache
{
public://申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);//从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);private:FreeList _freeLists[NFREELIST];//桶
};
在thread cache申请对象时,通过所给字节数计算出对应的哈希桶下标,如果桶中自由链表不为空,则从该自由链表中取出一个对象进行返回即可;但如果此时自由链表为空,那么我们就需要从central cache进行获取
这里的FetchFromCentralCache函数也是thread cache类中的一个成员函数,在后面再进行具体实现。
//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{return nullptr;
}//申请内存对象 -> 从自由链表获取对象
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);//大小size_t alignSize = SizeClass::RoundUp(size);//下标size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{//向中心缓存申请空间return FetchFromCentralCache(index, alignSize);}}
在Thread Cache释放内存对象时,需要先找到它的哈希下标值,然后插入自由链表桶即可。
//释放内存对象 -> 放到自由链表里面
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);//找到对应位置插入自由链表size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);}
threadcacheTLS无锁访问
每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?我们不能将这个thread cache创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度。
要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage)
,这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。避免了多线程访问共享变量的竞争锁和数据冲突。
//TLS thread local storage
//线程局部存储(TLS),是一种变量的存储方法
//这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
//避免了多线程访问共享变量的竞争锁和数据冲突
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
//申请
static void* ConcurrentAlloc(size_t size)
{//通过TLS 每个线程无锁的获取专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}//释放
static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}
这两个函数通过线程局部存储实现无锁的内存分配和释放,每个线程使用自己专属的ThreadCache对象来高效管理内存。
centralcache
centralcache整体设计
当线程申请某一大小的内存时,如果thread cache中对应的自由链表不为空,那么直接取出一个内存块进行返回即可,但如果此时该自由链表为空,那么这时thread cache就需要向central cache申请内存了。
central cache的结构与thread cache是一样的,它们都是哈希桶的结构,并且它们遵循的对齐映射规则都是一样的。这样做的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去取内存就行了。
central cache与thread cache的不同
-
central cache与thread cache有两个明显不同的地方,首先,thread cache是每个线程独享的,而central cache是所有线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是需要加锁的。但central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。
-
thread cache哈希桶位置挂的是一个个切分好的小块内存, central cache的每个哈希桶位置挂是SpanList链表结构,每个映射桶下⾯的span中的⼤内存块被按映射关系切成了⼀个个⼩内存块对象挂在span的⾃由链表中。每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶这些内存块被切成了对应的大小。
centralcache结构设计
页号的类型
每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是2^32;
而在64位平台下,进程地址空间的大小就是 2^64。页的大小一般是4K或者8K,我们以8K为例。在32位平台下,进程地址空间就可以被分成 2 ^32 / 2 ^13 = 2 ^19 个页;在64位平台下,进程地址空间就可以被分成 2^64 / 2^13 = 2^51个页。
为了确保页号( PAGE_ID) 这个类型有足够的位数来存储其对应平台下所有可能出现的页号。64位平台需要64位类型,32位平台32位类型就够用了。
#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else//Linux...
#endif
注意:在32位下,_WIN32有定义,_WIN64没有定义;而在64位下,_WIN32和_WIN64都有定义。因此在条件编译时,我们应该先判断_WIN64是否有定义,再判断_WIN32是否有定义。
span的结构
central cache的每个桶里挂的是一个个的span,span是一个管理以页为单位的大块内存,span的结构如下:
//管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0;//大块内存起始页的页号size_t _n = 0;//页的数量Span* _prev = nullptr;//双向链表的结构Span* _next = nullptr;size_t _useCount = 0;;//切分好的小块内存分配给thread chache的个数void* _freeList = nullptr;//切分好的小块内存的自由链表
};
- 页号是为了回收span时知道该位置的前后页,方便page cache进行合并页
- 页的数量是确定该span管理了多少页
- 双向链表的结构是为了回收span给page cache时,方便从链表里面删除
- _useCount是切分好的小块内存分配给thread chache的个数,为0说明central cache分配给thread cache的小块内存全部回来了,此时可以交给page cache进行合并
- _freeList是切分好的小块内存的自由链表
双链表结构
central cache的每个哈希桶里面存储的都是一个双链表结构,对于该双链表结构我们可以对其进行封装。
//封装带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_prev = _head;_head->_next = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;//prev newSpan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;//prev pos nextprev->_next = next;next->_prev = prev;}private:Span* _head;//哨兵位
public:std::mutex _mtx;//桶锁
};
需要注意的是,从双链表删除的span会还给下一层的page cache,相当于只是把这个span从双链表中移除,因此不需要对删除的span进行delete操作。
central cache的结构
central cache的映射规则和thread cache是一样的,因此central cache里面哈希桶的个数也是208。这样当thread cache的某个桶没有内存了,就可以直接去central cache对应的哈希桶进行申请就行了。
class CentralCache
{
public://...
private:SpanList _spanList[NFREELIST];
};
centralcache核心实现
central cache的实现方式
central cache和page cache在整个进程中只有一个,对于这种只能创建一个对象的类,我们可以将其设置为单例模式。
单例模式可以保证系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式相对较复杂,我们这里使用饿汉模式就足够了。
//单例模式 -> 饿汉 -> 静态对象
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}//获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t size);//从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanList[NFREELIST];//桶private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;//声明
};
静态成员记得要在外面定义
CentralCache CentralCache::_sInst;//定义
慢开始反馈调节算法
central cache一次给thread cache的对象不能太多,也不能太少,所有我们这里使用慢开始反馈调节算法, 小对象一次批量上限高, 大对象一次批量上限低。
class SizeClass
{
public://....//一次thread cache 从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);//[2, 512] 一次批量移动多少个大小的上限(慢开始)//小对象一次批量上限高//大对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}
};
虽然这里进行了限制,但是一次申请512也是很多,我们可以在FreeList里面增加_maxSize成员,用来
控制向central cache申请的对象个数
//管理切分好的小对象的自由链表
class FreeList
{
public:size_t& MaxSize(){return _maxSize;}private:void* _freeList = nullptr;//指向自由链表的头指针size_t _maxSize = 1;//用于控制向central cache申请的对象个数
};
//申请的批量对象个数
size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
此时当thread cache申请对象时,我们会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSize的值,那么还会将thread cache中该自由链表_maxSize的值进行加一。这样以来,控制效果更好了。
从中心缓存获取对象
每次thread cache向central cache申请对象时,我们先通过慢开始反馈调节算法计算出本次应该申请的对象的个数,然后再向central cache进行申请。
-
如果thread cache最终申请到对象的个数就是一个,那么直接将该对象返回即可。为什么需要返回一个申请到的对象呢?因为thread cache要向central cache申请对象,其实由于某个线程向thread cache申请对象但thread cache当中没有,这才导致thread cache要向central cache申请对象。因此central cache将对象返回给thread cache后,thread cache会再将该对象返回给申请对象的线程。
-
如果thread cache最终申请到的是多个对象,那么除了将第一个对象返回之外,还需要将剩下的对象挂到thread cache对应的哈希桶当中。
//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{//慢开始反馈调节算法//1. 开始不会向certral cache一次批量要太多,可能用不完//2. 不断用这个size向certral cache申请, batchNum会不断增大,直到上限//3. size越大,向certral cache申请的batchNum越小//4. size越小,向certral cache申请的batchNum越大//申请的批量对象个数size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (batchNum == _freeLists[index].MaxSize())_freeLists[index].MaxSize() += 1;void* start = nullptr;void* end = nullptr;//获取真实的申请批量对象个数size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);//至少有一个assert(actualNum >= 1);//申请到对象的个数是一个,则直接将这一个对象返回即可if (actualNum == 1){assert(start == end);return start;}//申请到对象的个数是多个,还需要将剩下的对象挂到thread cache中对应的哈希桶中else{_freeLists[index].PushRange(NextObj(start), end);return start;}}
从中心缓存获取一定数量的对象
从中心缓存获取batchNum数量的对象给thread cache,这些对象肯定都是从central cache对应哈希桶的某个span中取出来的,因此取出来的这n个对象是链接在一起的,我们只需要得到这段链表的头和尾即可,这里可以采用输出型参数进行获取。
//从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanList[index]._mtx.lock();//获取一个非空的spanSpan* span = GetOneSpan(_spanList[index], size);assert(span);assert(span->_freeList);//从span中获取batchNum个对象//如果不够batchNum,有多少拿多少start = span->_freeList;end = start;size_t i = 0;//真实获取的对象size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){i++;actualNum++;end = NextObj(end);}//取完后将剩下的对象继续链接到自由链表span->_freeList = NextObj(end);NextObj(end) = nullptr;//更新被分配给thread cache计数span->_useCount += actualNum;_spanList[index]._mtx.unlock();return actualNum;
}
- 由于central cache是所有线程共享的,所以我们在访问central cache中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后再将桶锁解掉。
- 从span中获取batchNum个对象,如果不够batchNum,有多少拿多少。
注意,虽然我们实际申请到对象的个数可能比n要小,但这并不会产生任何影响。因为thread cache的本意就是向central cache申请一个对象,我们之所以要一次多申请一些对象,是因为这样一来下次线程再申请相同大小的对象时就可以直接在thread cache里面获取了,而不用再向central cache申请对象。
获取一个非空的span函数与page cache关联,后面再介绍。
//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//...return nullptr;
}
插入一段范围的对象到自由链表
此外,如果thread cache最终从central cache获取到的对象个数是大于一的,那么我们还需要将剩下的对象插入到thread cache中对应的哈希桶中,为了能让自由链表支持插入一段范围的对象,我们还需要在FreeList类中增加一个对应的成员函数。
//管理切分好的小对象的自由链表
class FreeList
{
public:void PushRange(void* start, void* end){assert(start);assert(end);NextObj(end) = _freeList;_freeList = start;}
private:void* _freeList = nullptr;//指向自由链表的头指针size_t _maxSize = 1;//用于控制向central cache申请的对象个数
};
pagecache
pagecache整体设计
-
page cache与central cache一样,它们都是哈希桶的结构,并且page cache的每个哈希桶中里挂的也是一个个的span,这些span也是按照双链表的结构链接起来的。
-
page cache的映射规则与central cache, thread cache都不相同。page cache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span。
-
central cache每个桶中的span被切成了一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span就应该由central cache自己来决定。
至于page cache当中究竟有多少个桶,这就要看你最大想挂几页的span了,这里我们就最大挂128页的span,为了让桶号与页号对应起来,我们可以将第0号桶空出来不用,因此我们需要将哈希桶的个数设置为129。
static const size_t NPAGES = 129;//page cache哈希桶的个数
定义PageCache类
当每个线程的thread cache没有内存时都会向central cache申请,此时多个线程的thread cache如果访问的不是central cache的同一个桶,那么这些线程是可以同时进行访问的。这时central cache的多个桶就可能同时向page cache申请内存的,所以page cache也是存在线程安全问题的,因此在访问page cache时也必须要加锁。
但是在page cache这里我们不能使用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小后再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。
也就是说,在访问page cache时,我们可能需要访问page cache中的多个桶,如果page cache用桶锁就会出现大量频繁的加锁和解锁,导致程序的效率低下。因此我们在访问page cache时使用没有使用桶锁,而是用一个大锁将整个page cache给锁住。
page cache在整个进程中也是只能存在一个的,因此我们也需要将其设置为单例模式。
//一个以页为单位的span自由链表
//单例模式
class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}//获取一个k页的spanSpan* NewSpan(size_t k);//全局锁std::mutex _pageMtx;private:SpanList _spanLists[NPAGES];PageCache(){}PageCache(const PageCache&) = delete;static PageCache _sInst;
};
静态成员在外面定义
//定义
PageCache PageCache::_sInst;
在page cache获取一个k页的span的过程
如果central cache要获取一个n页的span,那我们就可以在page cache的第n号桶中取出一个span返回给central cache即可,但如果第n号桶中没有span了,这时我们要继续在后面的桶当中寻找span。
直接向堆申请以页为单位的内存时,我们应该尽量申请大块一点的内存块,因为此时申请到的内存是连续的,当线程需要内存时我们可以将其切小后分配给线程,而当线程将内存释放后我们又可以将其合并成大块的连续内存。如果我们向堆申请内存时是小块小块的申请的,那么我们申请到的内存就不一定是连续的了。
因此,当第n号桶中没有span时,我们可以继续找第n+1号桶,因为我们可以将n+1页的span切分成一个n页的span和一个1页的span,这时我们就可以将n页的span返回,而将切分后1页的span挂到1号桶中。但如果后面的桶当中都没有span,这时我们就只能向堆申请一个128页的内存块,并将其用一个span结构管理起来,然后将128页的span切分成n页的span和128-n页的span,其中n页的span返回给central cache,而128-n页的span就挂到第128-n号桶中。
即每次向堆申请的都是128页大小的内存块,central cache要的这些span实际都是由128页的span切分出来的。
pagecache中获取Span
获取一个非空的span
给SpanList类提供Begin和End成员函数,分别用于获取双链表中的第一个span和最后一个span的下一个位置,也就是头结点。以及判定是否为空。
//封装带头双向循环链表
class SpanList
{
public:Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}
private:Span* _head;//哨兵位
public:std::mutex _mtx;//桶锁
};
在SizeClass里面添加central cache一次向page cache申请的页数的函数
//计算对象大小的对齐映射规则
class SizeClass
{
public://central cache一次向page cache获取多少页static size_t NumMovePage(size_t size){//thread cache一次向central cache获取的对象个数size_t num = NumMoveSize(size);size_t npage = num * size;//一共需要的字节数npage >>= PAGE_SHIFT;//转换成页数//起码一页if (npage == 0)npage = 1;return npage;}};
PAGE_SHIFT是一页的大小,这里设置为13,即8K
static const size_t PAGE_SHIFT = 13;//一页的大小 2 ^ 13 = 8 K
当central cache申请到若干页的span后,还需要将这个span切成一个个对应大小的对象挂到该span的自由链表当中。
如何找到一个span所管理的内存块呢?首先需要计算出该span的起始地址,我们可以用这个span的起始页号乘以一页的大小即可得到这个span的起始地址,然后用这个span的页数乘以一页的大小就可以得到这个span所管理的内存块的大小,用起始地址加上内存块的大小即可得到这块内存块的结束位置。
//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//查看当前spanList里面是否有非空的spanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr)return it;elseit = it->_next;}//先解锁central cache,这样别的线程释放对象回来,不会阻塞list._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();//spanList里面没有空的span,向page cache申请Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));PageCache::GetInstance()->_pageMtx.unlock();//获取span进行切分,不需要加锁,因为这时别的线程访问不到这个span//得到大块span的起始地址和内存大小(字节数)char* start = (char*)(span->_pageId << PAGE_SHIFT);size_t bytes = span->_n << PAGE_SHIFT;//把大块内存切分成size大小的对象链接起来char* end = start + bytes;//先切分一个做头,方便尾插span->_freeList = start;start += size;void* tail = span->_freeList;while (start < end){NextObj(tail) = start;tail = NextObj(tail);start += size;}//尾置空NextObj(tail) = nullptr;//现在切分好了,直接头插进spanList,再次加锁list._mtx.lock();list.PushFront(span);return span;
}
这里建议在访问page cache前,先把central cache对应的桶锁解掉。虽然此时central cache的这个桶当中是没有内存供其他thread cache申请的,但thread cache除了申请内存还会释放内存,如果在访问page cache前将central cache对应的桶锁解掉,那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被阻塞。
基于上面的操作,我们还需要添加SpanList的头插操作,为了后面取出空的span,相当于删除操作,将SpanList添加头删
//封装带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_prev = _head;_head->_next = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}void PushFront(Span* span){Insert(Begin(), span);}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;//prev newSpan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;//prev pos nextprev->_next = next;next->_prev = prev;}private:Span* _head;//哨兵位
public:std::mutex _mtx;//桶锁
};
如果page cache的第k号桶中没有span,我们就应该继续找后面的桶,只要后面任意一个桶中有一个n页span,我们就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页的span返回给central cache,再将n-k页的span挂到page cache的第n-k号桶即可。
但如果后面的桶中也都没有span,此时我们就需要向堆申请一个128页的span了,在向堆申请内存时,直接调用我们封装的SystemAlloc函数即可。
需要注意的是,向堆申请内存后得到的是这块内存的起始地址,此时我们需要将该地址转换为页号。由于我们向堆申请内存时都是按页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//优先查看第k个桶有没有spanif (!_spanLists[k].Empty()){return _spanLists[k].PopFront();}//查看后面的桶有没有span,如果有将其切分 //如果是第n个桶 -> 第k个桶 第 n-k 个桶for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在span前面切分k页kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的span链接到spanList对应的位置_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}//如果都没有,向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//递归调用自己,避免代码重复return NewSpan(k);
}
申请内存过程联调
出现的问题
完成上面的代码,进行编译时会出现错误,在ThreadCache::FetchFromCentralCache里面的min函数出现问题。这是因为algorithm头文件里面的min函数是一个函数模板,而Windows.h里面的min是一个宏。调用函数模板需要进行参数类型的推导,因此我们调用std::min的时候,编译器认为是一个宏,就错误了,这里我们去掉std::,让编译器使用这个宏。
测试一
这里线程申请5次内存,内存对齐后都是8,访问thread cache的0号桶
void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);void* p4 = ConcurrentAlloc(7);void* p5 = ConcurrentAlloc(8);cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;cout << p4 << endl;cout << p5 << endl;}
线程第一次申请内存,需要创建一个专属的thread cache对象
通过索引访问到了0号桶,0号桶里面没有自由链表,向Central cache申请内存块
在向central cache申请内存块之前,需要先由NumMoveSize函数计算, thread cache一次最多可向central cache申请8字节大小的对象的个数是512个。然后与该自由链表的_maxSize比较,得到最小值是1,即本次thread cache 向central cache申请的8字节大小对象的个数是1个。
由于本次最小值与_maxSize相等,所有该自由链表的_maxSize变成2。
获取对象之前,central cache需要先锁住该0号桶
先遍历该桶,没有找到一个非空span
在central cache向page cache申请内存之前,需要将该0号桶的锁给释放,然后page cache加锁,才能继续访问page cache
在向page cache申请内存时,由于central cache一次给thread cache 8字节对象的上限是512,对应就需要4096字节,所需字节数不足一页就按一页算,所以这里central cache就需要向page cache申请一页的内存块。
此时page cache的第1个桶以及之后的桶当中都是没有span的,因此page cache需要直接向堆申请一个128页的span。
可以看到,用于管理申请到的128页内存的span信息。
可以看到, 按页向堆申请的内存块的起始地址和页号之间是可以相互转换的。
现在将申请到的128页的span插入到page cache的第128号桶当中,然后再调用一次NewSpan,然后在第128号桶里面找到第一个span
此时把这个128页的span拿出来,切分成1页的span和127页的span,将1页的span返回给central cache,而把127页的span挂到page cache的127号桶。
从page cache返回后,就可以把page cache的大锁解掉了,但紧接着还要将获取到的1页的span进行切分,因此这里没有立刻重新加上central cache对应的桶锁。
在进行切分的时候,先通过该span的起始页号得到该span的起始地址,然后通过该span的页数得到该span所管理内存块的总的字节数。
在确定内存块的开始和结束后,就可以将其切分成一个个8字节大小的对象挂到该span的自由链表中了。可以看到,切分出来的每个8字节大小的对象的前四个字节存储的都是下一个8字节对象的起始地址。
当切分结束后再获取central cache第0号桶的桶锁,然后将这个切好的span插入到central cache的第0号桶中,然后返回这个非空span
thread cache向central cache申请了一个对象,因此拿到这个非空的span后,直接从这个span里面取出一个对象,此时该span的_useCount由0变成了1。
因为thread cache实际向central cache只申请一个对象,直接返回该对象即可。
当线程第二次申请内存块时就不会再创建thread cache了,因为第一次申请时就已经创建好了,此时该线程直接获取到对应的thread cache进行内存块申请
当该线程第二次申请8字节大小的对象时,此时thread cache的0号桶中还是没有对象的,因为第一次thread cache只向central cache申请了一个8字节对象,所以这次申请时还需要再向central cache申请对象。
thread cache向central cache申请对象时,thread cache第0号桶中自由链表的_maxSize已经慢增长到2了,所以这次在向central cache申请对象时就会申请2个。如果下一次thread cache再向central cache申请8字节大小的对象,那么central cache会一次性给thread cache3个,这就是所谓的慢增长。
由于第一次central cache向page cache申请了一页的内存块,并将其切成了1024个8字节大小的对象,因此这次thread cache向central cache申请2两个8字节的对象时,central cache的第0号桶当中是有对象的,直接返回两个给thread cache即可,而不用再向page cache申请内存了。
但线程实际申请的只是一个8字节对象,因此thread cache除了将一个对象返回之外,还需要将剩下的一个对象挂到thread cache的第0号桶当中。
如此,当线程第三次申请1字节的内存时,由于1字节对齐后也是8字节,此时thread cache也就不需要再向central cache申请内存块了,直接将第0号桶当中之前剩下的一个8字节对象返回即可。
程序结果如下
可以看到,该线程确实只有一个pTLSThreadCache实例,并且五次申请的内存是连续的。
测试二
让线程申请1024次8字节的对象,然后通过调试观察在第1025次申请时,central cache是否会再向page cache申请内存块。
void TestConcurrentAlloc2()
{for (size_t i = 0; i < 1024; i++){void* p1 = ConcurrentAlloc(6);cout << p1 << endl;}void* p2 = ConcurrentAlloc(8);cout << p2 << endl;
}
因为central cache第一次就是向page cache申请的一页内存,这一页内存被切成了1024个8字节大小的对象,当这1024个对象全部被申请之后,再申请8字节大小的对象时central cache当中就没有对象了,此时就应该向page cache申请内存块。
可以看到,第1025次申请8字节大小的对象时,central cache第0号桶中的这个span的_useCount已经增加到了1024,也就是说这1024个对象都已经被线程申请了,此时central cache就需要再向page cache申请一页的span来进行切分。
这次central cache在向page cache申请一页的内存时,page cache就是将127页span切分成了1页的span和126页的span了,然后central cache拿到这1页的span后,又会将其切分成1024块8字节大小的内存块以供thread cache申请。
threadcache回收内存
释放内存时,将对象回收到对应的自由链表里面,当该自由链表的长度过长时,有可能用不到该链表,导致一直占用内存资源,所以我们需要回收一定数量的对象到central cache
threadcache类定义
//释放对象,链表过长时,回收一段list给central cache
void ListTooLong(FreeList& list, size_t n);
那么什么时候自由链表的需要回收呢?当链表长度大于一次申请的内存长度时将list还给central cache
//释放内存对象 -> 放到自由链表里面
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);//找到对应位置插入自由链表size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);//当链表长度大于一次申请的内存长度时将list还给central cacheif (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}}
对于ListTooLong,需要将该自由链表切分,取出MaxSize长度的链表给central cache进行处理
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr;void* end = nullptr;//将list取出MaxSize长度list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
在centralcache类定义函数用来函数这段list放到对应的span里面,后面我们在centralcache回收内存里面进行实现
//回收一定数量的对象到对应的span
void ReleaseListToSpans(void* start, size_t size);
基于上面的Size函数,在FreeList里面定义一个记录自由链表长度的成员_size,并且增加该成员的对外函数,修改之前的插入删除操作
//管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);//头插NextObj(obj) = _freeList;_freeList = obj;++_size;}void PushRange(void* start, void* end, size_t n){assert(start);assert(end);NextObj(end) = _freeList;_freeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);start = _freeList;end = start;for(size_t i = 0;i < n - 1;i++){end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}void* Pop(){assert(_freeList);//头删void* obj = _freeList;_freeList = NextObj(obj);--_size;return obj;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}private:void* _freeList = nullptr;//指向自由链表的头指针size_t _maxSize = 1;//用于控制向central cache申请的对象个数size_t _size;//记录该自由链表的长度
};
对于删除一段自由链表的PopRange,我们让end进行移动,移动次数为n -1,这样方便我们给切分的list的NextObj置空
centralcache回收内存
上面我们回收了一段自由链表,那么如何通过地址找到页号呢?
地址除以页的大小 = 页号
假设页号为2000, 一页的大小为8k,那么页号对应的地址为 2000* 8*1024=FA 0000, 可以知道在页号为2001之前的地址除以8k都是一个页号,也就是一个span
如此在pagecache定义页号与span的映射成员变量
//记录页号与span的映射关系
std::map<PAGE_ID, Span*> _idSpanMap;
在获取一个k页的span里面,填写对应id与span的映射关系,方便回收central cahce的小块内存时,找到对应的span
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//优先查看第k个桶有没有spanif (!_spanLists[k].Empty()){return _spanLists[k].PopFront();}//查看后面的桶有没有span,如果有将其切分 //如果是第n个桶 -> 第k个桶 第 n-k 个桶for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在span前面切分k页kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的span链接到spanList对应的位置_spanLists[nSpan->_n].PushFront(nSpan);//建立id与span的映射关系,方便回收central cahce的小块内存时,找到对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//如果都没有,向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//递归调用自己,避免代码重复return NewSpan(k);
}
从对象到span的映射也要得到,这里将地址直接除以页大小即可得到页号
//获取从对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
centralcache回收List到对应的span,首先将该自由链表start得到对应的span,然后头插进该span的自由链表,该span的_useCount- -,直到为0,说明
该span切分好的对象都回来了,回收此span给page cache
//回收List到对应的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{size_t index = SizeClass::Index(size);_spanList[index]._mtx.lock();while (start){void* next = NextObj(start);Span* span = PageCache::GetInstance()->MapObjToSpan(start);//头插NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;//说明该span切分好的对象都回来了,回收此span给page cacheif (span->_useCount == 0){//这个span回收给page cache,让page cache进行前后页的合并_spanList[index].Erase(span);//自由链表置空,前后结果置空span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;//移除完span后,可以解锁,让后面central cache申请释放内存不阻塞_spanList[index]._mtx.unlock();PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanList[index]._mtx.lock();}start = next;}_spanList[index]._mtx.unlock();
}
这个我们在pagecache回收内存时完成
//释放空闲的span回收到page cahce,并且合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{}
pagecache回收内存
pagecache回收该span时,需要查看该span的前后页进行合并,那么还需要添加新成员_isUse用来判定该span是否正在被central cache使用。
不能使用_useCount=0来判断,因为如果刚好一个线程申请新的span,此时central cache分配一个已有的span,该span的_useCount=0,此span不能参与合并页,会出现线程安全的问题
//管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0;//大块内存起始页的页号size_t _n = 0;//页的数量Span* _prev = nullptr;//双向链表的结构Span* _next = nullptr;size_t _useCount = 0;;//切分好的小块内存分配给thread chache的个数bool _isUse = false;//是否在被使用void* _freeList = nullptr;//切分好的小块内存的自由链表
};
centralcache获取一个非空的span里面设置span使用的状态
//记录该span正在使用
span->_isUse = true;
在PageCache获取一个k页的span里面,将切分好的nSpan首尾页号跟nSpan映射,方便page cache回收内存时,进行合并查找
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//优先查看第k个桶有没有spanif (!_spanLists[k].Empty()){return _spanLists[k].PopFront();}//查看后面的桶有没有span,如果有将其切分 //如果是第n个桶 -> 第k个桶 第 n-k 个桶for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在span前面切分k页kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的span链接到spanList对应的位置_spanLists[nSpan->_n].PushFront(nSpan);//将切分好的nSpan首尾页号跟nSpan映射,// 方便page cache回收内存时,进行合并查找_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立id与span的映射关系,方便回收central cahce的小块内存时,找到对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//如果都没有,向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//递归调用自己,避免代码重复return NewSpan(k);
}
对于ReleaseSpanToPageCache,释放空闲的span回收到page cahce,并且合并相邻的span,可以前后合并。
前后合并分为三种情况:
- 没有找到页号,退出合并
- 找到的span正在使用,退出合并
- 找到的span与本span合并后长度大于128页,无法管理,退出合并
在合并后将该span插入对应页桶里面,并且映射首尾页号,方便后面再次合并,设置span状态未使用。
//释放空闲的span回收到page cahce,并且合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//对span的前后页,尝试进行合并,解决内存碎片(外碎片)的问题//向前合并while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);//没有找到,直接结束if (ret == _idSpanMap.end()){break;}Span* prevSpan = ret->second;//该span正在被使用,直接结束if (prevSpan->_isUse == true){break;}//合并后长度太长,不能管理,直接结束if (prevSpan->_n + span->_n > NPAGES - 1){break;}//可以合并了span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;//将prevSpan从桶里面释放_spanLists[prevSpan->_n].Erase(prevSpan);//释放prevSpan结构delete prevSpan;}//向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);//没有找到if (ret == _idSpanMap.end()){break;}Span* nextSpan = ret->second;//正在使用if (nextSpan->_isUse == true){break;}//合并长度太大,无法管理if (nextSpan->_n + span->_n > NPAGES - 1){break;}//可以合并了span->_n += nextSpan->_n;//将nextSpan从桶里面释放_spanLists[nextSpan->_n].Erase(nextSpan);//释放nextSpan结构delete nextSpan;}//前后页合并完毕,插入桶里面_spanLists[span->_n].PushFront(span);//将该span的首尾页号与span映射,方便后面的page cache回收前后页_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;//设置该span未被使用span->_isUse = false;}
释放内存过程联调
我们申请3个空间并且释放观察其释放过程,这三个空间都向threadcache申请8字节的空间,位于0号桶。
void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);void* p2 = ConcurrentAlloc(8);void* p3 = ConcurrentAlloc(1);cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;ConcurrentFree(p1, 6);ConcurrentFree(p2, 8);ConcurrentFree(p3, 1);}
刚开始p1申请的_maxSize为1,threadcache返回一个对象,p2申请的_maxSize为2,threadcache返回两个对象,给p2一个,自由链表还剩一个,
p3申请时可以从自由链表剩的一个直接返回,此时_size = 0,_maxSize = 3,
_useCount = 3
回收p1后该自由链表_size = 1,此时_size = 1小于 _maxSize = 3,不进行进一步回收工作
回收p2后该自由链表_size = 2,此时_size 还是小于_maxSize, 不进行进一步回收工作
回收p3该自由链表_size = 3满足回收条件,将该链表进行回收
通过PopRange将长度为3的自由链表移除桶,通过centralcache找到对应的span,可以看到此时_useCount = 3
当把第三块小对象还给span时,该span的_useCount = 0,说明该span所有的小块内存都回来了,接下来就可以把该span从centralcache里面移除,将span交给pagecache用来合并前后页
该span向前查找没有找到,向后查找到了页数为127的span
将该span于nextSpan合并,直接将span的页数加上nextSpan的页数即可
前后页合并完毕,插入pagecache桶里面,然后将该span的首尾页号与span映射,方便后面的page cache回收前后页,最后设置该span未被使用。
大于256KB内存申请问题
threadcache申请最大内存为256KB, 一页大小为8KB,所以threadcache最大可以申请256 / 8 = 32 页,而pagecache最大申请为128页,大于 128 页的直接向堆申请
申请内存大小 | 申请方式 |
---|---|
size <= 256KB(32页) | 向threadcache申请 |
32 页< size <= 128页 | 向pagecache申请 |
size > 128页 | 直接向堆申请 |
申请大于256KB的内存,也需要进行内存对齐,这里直接按一页进行对齐
//计算对象对齐后的大小
static inline size_t RoundUp(size_t bytes)
{if (bytes <= 128){return _RoundUp(bytes, 8);}else if (bytes <= 1024){return _RoundUp(bytes, 16);}else if (bytes <= 8 * 1024){return _RoundUp(bytes, 128);}else if (bytes <= 64 * 1024){return _RoundUp(bytes, 1024);}else if (bytes <= 256 * 1024){return _RoundUp(bytes, 8 * 1024);}else{return _RoundUp(bytes, 1 << PAGE_SHIFT);}return -1;
}
在进行并发分配时,先判断申请的内存大小是否大于256KB,计算对齐后申请的页数,向pagecache申请kPage页数的span,然后在里面区分具体向堆申请还是向pagecache申请。
//申请
static void* ConcurrentAlloc(size_t size)
{//判断申请的内存大小是否大于256KBif (size > MAX_BYTES){//计算对齐后申请的页数size_t alignSize = SizeClass::RoundUp(size);PAGE_ID kPage = (PAGE_ID)(alignSize >> PAGE_SHIFT);//申请kPage页的spanPageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kPage);PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{//通过TLS 每个线程无锁的获取专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}}
PageCache::NewSpan获取一个k页的span, 申请超过128页的span直接向堆申请, 超过256KB(32页),不超过128页的继续向pagecache申请即可
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0);//申请超过128页的span直接向堆申请if (k > NPAGES - 1){void* ptr = SystemAlloc(k);Span* span = new Span;span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;//将页号与span映射_idSpanMap[span->_pageId] = span;return span;}//优先查看第k个桶有没有spanif (!_spanLists[k].Empty()){return _spanLists[k].PopFront();}//查看后面的桶有没有span,如果有将其切分 //如果是第n个桶 -> 第k个桶 第 n-k 个桶for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在span前面切分k页kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//将剩下的span链接到spanList对应的位置_spanLists[nSpan->_n].PushFront(nSpan);//将切分好的nSpan首尾页号跟nSpan映射,// 方便page cache回收内存时,进行合并查找_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//建立id与span的映射关系,方便回收central cahce的小块内存时,找到对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; i++){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//如果都没有,向堆申请一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//递归调用自己,避免代码重复return NewSpan(k);
}
申请大于256KB内存问题解决了,接下来就是释放问题
首先判断是否是大于256KB的内存释放,如果是,就通过映射关系找到对应的span, 找到后前往pagecache进行释放该span
//释放
static void ConcurrentFree(void* ptr, size_t size)
{//大于128KB的内存释放if (size > MAX_BYTES){//根据映射关系找到spanSpan* span = PageCache::GetInstance()->MapObjToSpan(ptr);//释放spanPageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}}
在pagecache,ReleaseSpanToPageCache释放内存时,首先判断是否是直接向堆申请的内存,如果是就直接由堆释放。如果是32页 ~ 128页中间由pagecache申请的span, 就继续进行合并span,归还给pagecache页桶,供后面centralcache向pagecache申请空的span
//释放空闲的span回收到page cahce,并且合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//判断是否是直接向堆申请的空间if (span->_n > NPAGES - 1){void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);delete span;return;}//对span的前后页,尝试进行合并,解决内存碎片(外碎片)的问题//向前合并while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);//没有找到,直接结束if (ret == _idSpanMap.end()){break;}Span* prevSpan = ret->second;//该span正在被使用,直接结束if (prevSpan->_isUse == true){break;}//合并后长度太长,不能管理,直接结束if (prevSpan->_n + span->_n > NPAGES - 1){break;}//可以合并了span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;//将prevSpan从桶里面释放_spanLists[prevSpan->_n].Erase(prevSpan);//释放prevSpan结构delete prevSpan;}//向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);//没有找到if (ret == _idSpanMap.end()){break;}Span* nextSpan = ret->second;//正在使用if (nextSpan->_isUse == true){break;}//合并长度太大,无法管理if (nextSpan->_n + span->_n > NPAGES - 1){break;}//可以合并了span->_n += nextSpan->_n;//将nextSpan从桶里面释放_spanLists[nextSpan->_n].Erase(nextSpan);//释放nextSpan结构delete nextSpan;}//前后页合并完毕,插入桶里面_spanLists[span->_n].PushFront(span);//将该span的首尾页号与span映射,方便后面的page cache回收前后页_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;//设置该span未被使用span->_isUse = false;}
这里我们封装堆释放空间的函数,还是用条件编译,可以跨平台使用。
//在堆上释放空间
inline static void SystemFree(void* ptr)
{//windows
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);#elif//Linux下的bsbrk, unmmap
#endif
}
测试函数
这里我们用p1申请257KB的内存,如果程序正常运行,可以看到是直接向pagecache申请内存的, 后面由pagecache回收到页桶里面。p2申请129页的内存,如果程序正常运行,可以看到是直接向堆申请空间,并且直接由堆回收空间。
void BigAlloc()
{//直接向pagecache申请void* p1 = ConcurrentAlloc(257 * 1024);ConcurrentFree(p1, 257 * 1024);//直接向堆申请void* p2 = ConcurrentAlloc(129 * 8 * 1024);ConcurrentFree(p2, 129 * 8 * 1024);
}
调试分析
对于p1直接向pagecache申请内存,计算的页号为33,pagecache向堆申请128页,分配给p1的span33页
在释放时,可以看到找到的这个与之前申请的地址一样。
在pagecache里面,该span与后面的span进行了合并,又回到了128页,供下次符合条件的内存申请。
对于p2申请页数是129,超过了pagecache申请的范围,直接向堆申请129页的空间
可以看到最后该span也由堆进行释放
使用定长内存池配合脱离使用new
既然这是仿tcmalloc高并发内存池,那么我们就要脱离malloc,new里面封装了malloc,这里借助定长内存池,脱离上面程序出现的new与delete。
class PageCache
{
public://...
private://使用定长内存池,脱离new,deleteObjectPool<Span> _spanPool;
};
在代码里面出现的new与delete,都可以换成定长内存池申请与释放
/*Span* span = new Span;*/
Span* span = _spanPool.New();
/*delete span;*/
_spanPool.Delete(span);
在并发分配的Alloc, 每一个线程第一次访问时都需要用new申请创建threadcache对象,这里也可以使用定长内存池。
这里我们将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建自己的thread cache时,都在个定长内存池中申请内存就行了。
但注意在从该定长内存池中申请内存时需要加锁,防止多个线程同时申请自己的ThreadCache对象而导致线程安全问题。
//申请
static void* ConcurrentAlloc(size_t size)
{//判断申请的内存大小是否大于256KBif (size > MAX_BYTES){//计算对齐后申请的页数size_t alignSize = SizeClass::RoundUp(size);PAGE_ID kPage = (PAGE_ID)(alignSize >> PAGE_SHIFT);//申请kPage页的spanPageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kPage);PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{//通过TLS 每个线程无锁的获取专属的ThreadCache对象if (pTLSThreadCache == nullptr){static std::mutex tcMtx;static ObjectPool<ThreadCache> tcPool;/*pTLSThreadCache = new ThreadCache;*/tcMtx.lock();pTLSThreadCache = tcPool.New();tcMtx.unlock();}cout << std::this_thread::get_id() << " : " << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}}
释放对象时优化为不传对象大小
malloc申请内存需要传入申请的大小,free释放内存只需要传入释放的指针。
我们这里要把释放对象,优化为只传入指针,不要传大小。那么又该如何控制释放的内存是走三级缓存(threadcache, centralcache, pagecache),还是走大块内存的pagecache,或者直接走堆释放。
可以改变Span的结构,增加一个成员用来记录切分的对象的大小。
//管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0;//大块内存起始页的页号size_t _n = 0;//页的数量Span* _prev = nullptr;//双向链表的结构Span* _next = nullptr;size_t _useCount = 0;;//切分好的小块内存分配给thread chache的个数bool _isUse = false;//是否在被使用void* _freeList = nullptr;//切分好的小块内存的自由链表size_t _objSize = 0;//切分好的对象的大小
};
然后在申请span后记录该对象的大小,即可在释放内存时,根据对象的大小选择什么方式释放。
//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{...PageCache::GetInstance()->_pageMtx.lock();//spanList里面没有空的span,向page cache申请Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));//储存该内存块的大小span->_objSize = size;//记录该span正在使用span->_isUse = true;PageCache::GetInstance()->_pageMtx.unlock();...
}
//申请
static void* ConcurrentAlloc(size_t size)
{//判断申请的内存大小是否大于256KBif (size > MAX_BYTES){//计算对齐后申请的页数size_t alignSize = SizeClass::RoundUp(size);PAGE_ID kPage = (PAGE_ID)(alignSize >> PAGE_SHIFT);//申请kPage页的spanPageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kPage);span->_objSize = alignSize;PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}...
}
读取映射关系时的加锁问题
MapObjToSpan作为获取从对象到span的映射,在当前程序的访问中会出现线程安全的问题。在pagecache使用map的映射关系时,由于本身加了锁所以不会出现问题,但是在释放内存的时候以及centralcache回收List到对应的span,是没有锁的,会出现一个线程根据map获取映射关系,另一个线程修改这个map,导致出现线程安全的问题。
为了解决这个问题,我们在MapObjToSpan里面添加一个由RAII管理的unique_lock锁,在访问的时候自动加锁,退出函数时自动解锁。
//获取从对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//为了安全访问std::unique_lock<std::mutex> mtx(_pageMtx);auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
多线程环境下对比malloc测试
之前都是各个功能单元的测试,接下来进行前面的测试。比较多线程环境下的高并发内存池与malloc的性能。
//测试系统的malloc,free
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; k++){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; j++){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin1);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned)free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, (unsigned)malloc_costtime + free_costtime);}
这个函数的参数
- ntimes 一次申请和释放内存的次数
- nworks 线程的个数
- rounds 轮次
注意,我们创建线程时让线程执行的是lambda表达式,而我们这里在使用lambda表达式时,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此我们可以将各个线程消耗的时间累加到一起。
这里我们创建4个线程,让每个线程执行10万次,总共执行40万次。
固定大小内存的申请和释放
v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));
可以看到固定大小内存的申请和释放,高并发内存池花费的时间比较多,原因在于定长内存的申请,都会集中在一个桶里面,threadcache不断回收内存,centralcache都访问一个桶,桶锁的作用没有体现出来。
不同大小内存的申请和释放
v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
可以看到现在高并发内存池的效率提高了不少,但是还是很接近malloc,我们是替代malloc,所以还要再次优化。
复杂问题的调试技巧
在多线程模式下,程序运行出现问题,不好调试,这里介绍三种方法用来调试。
条件断点
程序出现报错,如果是断言起了作用,一般写个满足这个断言的判断,然后打上断点,但是空语句是不能打断点的,这里随便加上一句语句就可以了。
查看函数栈帧
在程序调用多次函数错误时,可以查看函数栈帧,不断往前面回退,查看是否符合逻辑。
在弹出的窗口点击对应的函数即可不断回退,在里面查看各个变量的状态。
疑似死循环时中断程序
如果程序一直不进入断点,有可能是死循环的问题。这里可以点击全部中断,程序就会在当前运行的地方停止。
性能瓶颈分析
前面可以看到当前的程序与malloc还是有点差距,那么瓶颈在哪里呢?这里就需要性能分析的工具进行分析了。
VS编译器下性能分析的操作步骤
将n设置为1000,减少分析时间,同时屏蔽malloc,因为我们分析的是高并发内存池。
在debug模式下,点击调试,选择性能探查器
可以看到大部分时间都花在了MapObjToSpan,即查找对应的span这个函数上面,接下来就针对这个方面进行优化。
针对性能瓶颈使用基数树进行优化
基数树实际就是分层的哈希表,这里分了三层,分别为单层奇数树,二层奇数树,三层奇数树。
单层基数树
单层基数树实际采用的就是直接定址法,每一个页号对应span的地址就存储数组中在以该页号为下标的位置。最坏的情况下我们需要建立所有页号与其span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应span的指针。
非类型模板参数BITS表示存储页号最多需要比特位的个数。32位为32 - PAGE_SHIFT,64位为 64 - PAGE_SHIFT。
32位下一共有2 ^ 32内存,一页的大小为2 ^ 13,所以有 2 ^ 32 / 2 ^ 13 = 2 ^ 19个页。指针大小为4字节,故一共所需2 ^ 19 * 4 = 2 ^ 21字节=2MB内存,消耗不大。
64位下一共有2 ^ 64内存,一页的大小位2 ^ 13,所以有2 ^ 64 / 2 ^ 13 = 2 ^ 51 个页。指针大小为8字节,故一共所需2 ^ 51 * 8 = 2 ^ 54字节=2 ^ 24GB,这个明显太大了,对于64位用三层基数树。
//单层基数树
//BITS 表示存储页号最多需要比特位的个数
template<int BITS>
class TCMalloc_PageMap1
{
public:typedef uintptr_t Number;//explicit防止编译器隐式类型转换explicit TCMalloc_PageMap1(){//全部用来开辟指针需要的大小size_t size = sizeof(void*) << BITS;//对齐后大小size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, size);}void* get(Number k) const{//k的范围为[0, 2^BITS - 1]if ((k >> BITS) > 0){return NULL;}return array_[k];}void set(Number k, void* v){//k的范围为[0, 2^BITS - 1]assert((k >> BITS) == 0);array_[k] = v;}private:static const int LENGTH = 1 << BITS;//页的个数void** array_;//储存映射关系的数组
};
二层基数树
前面我们知道在32位下需要19位储存页号,二层基数树就是把这19位分了两次映射。用前5位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的位在基数树的第二层进行映射,映射后最终得到该页号对应的span指针。
在二层基数树中,第一层的数组占用2 ^ 5 * 4 = 2 ^ 7Byte空间,第二层最多占用2 ^ 5 * 2 ^ 14 * 4 = 2 ^ 21
Byte = 2MB空间。二层基数树相比单层基数树的好处就是,单层基数树必须一开始就把2 MB 的数组开辟出来,而二层基数树一开始时只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组就行了。
//二层基数树
template<int BITS>
class TCMalloc_PageMap2
{
private:static const int ROOT_BITS = 5;//第一层对应页号的前5个比特位static const int ROOT_LENFTH = 1 << ROOT_BITS;//第一层储存元素的个数static const int LEAF_BITS = BITS - ROOT_BITS;//第二层对应页号的其余比特位static const int LEAF_LENGTH = 1 << LEAF_BITS;//第二层储存元素的个数//第一层数组中储存元素类型struct Leaf{void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENFTH];//第一层数组public:typedef uintptr_t Number;explicit TCMalloc_PageMap2(){memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const{//第一层对应的下标const Number i1 = k >> LEAF_BITS;//第二层对应的下标const Number i2 = k & (LEAF_LENGTH - 1);//页号超过范围或者没有建立映射关系if ((k >> BITS) > 0 || root_[i1] == NULL){return NULL:}return root_[i1]->values[i2];}void set(Number k, void* v){//第一层对应的下标const Number i1 = k >> LEAF_BITS;//第二层对应的下标const Number i2 = k & (LEAF_LENGTH - 1);assert(i1 < ROOT_LENFTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n){for (Number key = start; key < start + n - 1;){const Number i1 = key >> LEAF_BITS;if (i1 >= ROOT_LENFTH){return false;}//该下标的空间没有开辟if (root_[i1] == NULL){static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}//继续后面的检查key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory(){//将第二层的空间全部开辟好Ensure(0, 1 << BITS);}};
三层基数树
三层基数树类似二层基数树,只不过多了一层映射关系。三层基数树适合64位下的映射关系,之前那两个适合32位下的映射关系。
//三层基数树
template<int BITS>
class TCMalloc_PageMap3
{
private://第一、二层对应页号的比特位个数static const int INTERIOR_BITS = (BITS + 2) / 3;//第一、二层存储元素的个数static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;//第三层对应页号的比特位个数static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;//第三层存储元素的个数static const int LEAF_LENGTH = 1 << LEAF_BITS;struct Node{Node* ptrs[INTERIOR_LENGTH];};struct Leaf{void* values[LEAF_LENGTH];};Node* NewNode(){static ObjectPool<Node> nodePool;Node* result = nodePool.New();if (result != NULL){memset(result, 0, sizeof(*result));}return result;}Node* root_;public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(){root_ = NewNode();}void* get(Number k) const{//第一层对应的下标const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);//第二层对应的下标const Number i2 = (k >> (LEAF_BITS)) & (INTERIOR_LENGTH - 1);//第三层对应的下标const Number i3 = k & (LEAF_BITS - 1);if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL){return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2]->values[i3]);}void set(Number k, void* v){assert((k >> BITS) == 0);//第一层对应的下标const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);//第二层对应的下标const Number i2 = (k >> (LEAF_BITS)) & (INTERIOR_LENGTH - 1);//第三层对应的下标const Number i3 = k & (LEAF_BITS - 1);//确保第k页的空间开辟好了Ensure(k, 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2]->values[i3]) = v;}bool Ensure(Number k, size_t n){for (Number start = k; start <= k + n - 1;){//第一层对应的下标const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);//第二层对应的下标const Number i2 = (k >> (LEAF_BITS)) & (INTERIOR_LENGTH - 1);if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH){return false;}if (root_->ptrs[i1] == NULL){Node* node = NewNode();if (node == NULL)return false;root_ptrs[i1] = node;}if (root_->ptrs[i1]->ptrs[i2] == NULL){static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();if (leaf == NULL)return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}//继续后面的检查key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;return true;}}void PreallocateMoreMemory(){//将第三层的空间全部开辟好Ensure(0, 1 << BITS);}};
使用基数树进行优化代码实现
基于此,我们改造代码,这里是32位进行更改
class PageCache
{
public://...
private://记录页号与span的映射关系//std::map<PAGE_ID, Span*> _idSpanMap;TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
};
建立页号与span的映射
_idSpanMap.set(kSpan->_pageId + i, kSpan);
读取某一页号对应的span
Span* ret = (Span*)_idSpanMap.get(id);
现在用于读取映射关系的MapObjectToSpan函数内部就不需要加锁
//获取从对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//为了安全访问//std::unique_lock<std::mutex> mtx(_pageMtx);//auto ret = _idSpanMap.find(id);Span* ret = (Span*)_idSpanMap.get(id);//if (ret != _idSpanMap.end())if(ret != NULL){//return ret->second;return ret;}else{assert(false);return nullptr;}
}
为什么读取基数树映射关系时不需要加锁?
当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的。
因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的结构都有可能会发生变化。比如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。
而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在page cache进行的。也就是说,读取映射时读取的都是对应span的_useCount不等于0的页,而建立映射时建立的都是对应span的_useCount等于0的页,所以说我们不会同时对同一个页进行读取映射和建立映射的操作。
再次对比malloc进行测试
可以看到效率明显提升了好几倍
源码
高并发内存池