当前位置: 首页 > news >正文

【项目设计】高并发内存池

        高并发内存池(tcmalloc),是谷歌的一项开源项目,用于替代传统的malloc,实现高效的多线程内存管理。

目录

一、项目介绍

        1. 池化技术

        2. 内存池

        3. tcmalloc与malloc

二、开胃菜 --- 定长内存池设计

        1. 定长内存池申请释放逻辑

        2. 定长内存池的设计思路

3. 向系统申请与释放内存

4. FreeList的代码设计

5. alloc / free

6. New / Delete

三、项目整体框架设计思路

        1. 项目整体框架介绍

        2. (bytes <= 256)内存的申请逻辑

        3. (bytes <= 256)内存的释放逻辑

四、高并发内存池 --- thread cache

五、高并发内存池 --- central cache

六、高并发内存池 --- pages cache

七、高并发内存池  --- 大块内存的申请

八、高并发内存池  --- 定长内存池替换new/malloc

九、高并发内存池  --- 基数树存储[PAGE_ID,Span*]的映射

十、源代码


一、项目介绍

        1. 池化技术

        何为池化技术?一次申请过量资源用以留存备用来避免频繁申请资源导致的性能损耗

        2. 内存池

        内存池,向系统申请内存的时候是过量申请的,多申请出来的内存用于下次使用;当释放内存的时候,也不会把这些内存还给操作系统,而是当进程结束后,才会真正的归还内存于操作系统。我们平时用的malloc的底层代码设计,就采用了内存池。

        内存池主要解决的问题有:性能问题(系统调用的性能消耗),一定程度上解决外碎片问题(小块内存空间因不连续而造成的浪费),减少内碎片问题的发生。

        3. tcmalloc与malloc

        多线程环境下,为了不破坏malloc底层,不可避免的会出现锁竞争问题tcmalloc对多线程环境下,锁竞争问题有了更进一步的优化,解决。所以,tcmalloc在多线程环境下,会比malloc更胜一筹;

二、开胃菜 --- 定长内存池设计

        1. 定长内存池申请释放逻辑

        所谓定长内存池只负责一种大小的内存申请释放;用户申请sizebytes内存的时候,首先会去检查_freelist指向的链表是否有剩余,若有剩余,则返回第一个结点的地址,若无剩余,则去查看_memory指向的内存块剩余的内存大小,是否满足申请需求sizebytes,若满足,则从中切割sizebytes大小的内存返回,若依旧不满足,则向系统申请过量内存。释放内存的时候,将内存块头插到空闲链表_freelist中即可。

        2. 定长内存池的设计思路
    class FreeList;template<class T> // T为类型,sizeof T 可求出需要申请内存的大小class ObjectPool // 定长内存池{public:T* alloc(); // 仅完成内存申请void free(T* obj); // 仅完成内存的释放T* New(); // 完成内存申请 + 对象初始化void Delete(T* obj); // 完成内存释放 + 对象销毁private:char* _memory = nullptr; // 指向系统申请的过量内存size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数FreeList _freelist; // 自由链表,管理还回来的小块内存std::mutex _mtx; // 多个线程使用同一个内存池,需要互斥锁保持同步};class FreeList // 管理小块内存的自由链表{public:void clear(); // _freelist = nullptr,_size = 0void push_front(void* obj); // 头插void pop_front(); // 头删void* front() const; // 获取第一个内存块地址bool empty() const; // 判断链表是否为空size_t size() const; // 返回FreeList管理的小块内存个数private:void* _freelist = nullptr;size_t _size = 0;};

         正在被管理的每一块内存,会将起始4字节/8字节空间存储下一块内存的起始地址,最后一块内存的起始4/8字节值为nullptr。

3. 向系统申请与释放内存
#define UNIT_PAGE_SHIFT 13 // 1 << 13即为一页内存的大小 #include <Windows.h>void* SystemAlloc(size_t kpage) // 按页申请内存{
#ifdef _WIN32 // windows系统void* ptr = VirtualAlloc(0, kpage << UNIT_PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#elif __linux__ // linux系统
#endifif (ptr == nullptr) throw std::bad_alloc();return ptr;}void SystemFree(void* ptr){if (ptr == nullptr) return;
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#elif __linux__ // linux系统
#endif}
4. FreeList的代码设计
class FreeList // 管理小块内存的自由链表
{
public:void clear();void push_front(void* obj);void pop_front();void* front() const;bool empty() const;size_t size() const;
private:void* _freelist = nullptr;size_t _size = 0;
};// 指针ptr前4/8个字节的信息
static inline void*& NextMemoryAddr(void* ptr) { return *(void**)ptr; } void FreeList::clear()
{_freelist = nullptr;_size = 0;
}
void FreeList::push_front(void* obj)
{assert(obj);NextMemoryAddr(obj) = _freelist;_freelist = obj;++_size;
}
void FreeList::pop_front()
{assert(_freelist);_freelist = NextMemoryAddr(_freelist);--_size;
}
void* FreeList::front() const
{assert(_freelist);return _freelist;
}
bool FreeList::empty() const
{return _size == 0;
}
size_t FreeList::size() const
{return _size;
}

         结合上文所讲的定长内存池的设计思路,只要熟悉链表的相关操作,FreeList的类实现就不成问题,本质就是阉割后的单向链表。为什么要封装FreeList?后续各种模块都需要使用FreeList类,甚至需要FreeList进行函数之间的传参。所以进行封装是优化代码逻辑不可避免的。

5. alloc / free
		T* alloc() // 仅完成内存申请{std::lock_guard<std::mutex> lock(_mtx);// 优先把还回来内存块对象,再次重复利用if (_freelist.empty() == false){T* obj = (T*)_freelist.front();_freelist.pop_front();return obj;}else{// 剩余内存不够一个对象大小时,则重新开大块空间if (_remainBytes < sizeof(T)){_remainBytes = 131072; // 128字节_memory = (char*)SystemAlloc(_remainBytes >> UNIT_PAGE_SHIFT);if (_memory == nullptr)throw std::bad_alloc();}T* obj = (T*)_memory;size_t objSize = sizeof(T) >= sizeof(void*) ? sizeof(T) : sizeof(void*);_memory += objSize;_remainBytes -= objSize;return obj;}}void free(T* obj){std::lock_guard<std::mutex> lock(_mtx);_freelist.push_front((void*)obj);}

        有一个问题需要解决,那就是申请的内存大小小于一个指针类型所占内存的大小,32位下指针是4字节大小,64位下指针是8字节大小。如何才能统一处理呢? 所以在申请内存的时候,将内存大小最少要开辟为sizeof(void*),即可完成统一处理。

6. New / Delete
	T* New() // 完成内存申请 + 对象初始化{T* obj = alloc();new(obj)T;return obj;}void Delete(T* obj) // 完成内存释放 + 对象销毁{obj->~T();free(obj);}

    三、项目整体框架设计思路

            1. 项目整体框架介绍

    "Common.h"#define MAX_BYTES_COUNT (1 << 18) // 单次从内存池中申请的最大内存 256KB
    #define MIN_BATCH_COUNT 1   // 所获取内存块的批量下限
    #define MAX_BATCH_COUNT 512 // 所获取内存块的批量上限
    #define MAX_BARREL_COUNT 208 // thread cache与central cache的最大桶数
    #define MAX_PAGES_COUNT 129  // 单个span的最大页数
    #define UNIT_PAGE_SHIFT 13 // 一页的大小8KB
    "ConMalloc.h"// 用户调用该函数,申请size大小的内存,成功则返回内存的首地址
    void* CacheAlloc(size_t size){ ...; } 
    // 用户调用该函数,释放obj指向的内存
    void CacheFree(void* obj){ ...; }

            ConMalloc核心功能与特点介绍:

    • CacheAlloc对应库里的malloc,用来申请内存
    • CacheFree对应库里的free ,用来释放内存
    "ThreadCache.h"#define MAX_BARREL_COUNT 208class ThreadCache
    {
    public:void* Alloc(size_t sizeByte); // 申请内存void FetchFromCentralCache(size_t alignBytes); // ThreadCache内存不足void Dealloc(void* obj, size_t sizeByte); // 回收内存void RecycleFromThreadCache(size_t index, size_t count); // ThreadCache占用内存过多
    private:Tool::FreeList _freelist[MAX_BARREL_COUNT]; // 管理小块内存的自由链表集合
    };extern thread_local ThreadCache* thread_cache; // 创建了一个线程看见的全局变量

            ThreadCache核心功能与特点介绍:

    • ThreadCache类会在每一个线程的上下文实例化一份,调用ThreadCache相关接口的时候,不存在线程安全等问题
    • ThreadCache::Alloc与ThreadCache::FetchFromCentralCache:当被调用时,首先通过sizeByte经过一系列的计算找到对应的桶_freelist[index],如果_freelist[index]不为空,则返回第一个结点。如果_freelist[index]为空,则调用ThreadCache::FetchFromCentralCache,向CentralCache获取内存。
    • ThreadCache::Dealloc与ThreadCache::RecycleFromThreadCache:当被调用时,首先通过sizeByte经过一系列的计算找到对应的桶_freelist[index],后进行头插即可,以便下次使用;而ThreadCache::RecycleFromThreadCache,则是通过某些算法,为了避免ThreadCache占用空闲内存过多,且其它线程看不到,所造成内存利用效率不高等问题。
    "CentralCache.h"class CentralCache
    {
    public:// apply memory for thread cacheunsigned Alloc(FreeList& retList,size_t alignBytes, size_t count); Span* FetchFromPagesCache(size_t alignBytes, size_t count);// Recycle memory for thread cachevoid Dealloc(FreeList list); void RecycleFromPagesCache(Span* span);void lock(size_t index) { _mtx[index].lock(); }void unlock(size_t index) { _mtx[index].unlock(); }private:SpanList _spanlist[MAX_BARREL_COUNT]; // 管理大块内存的链表集合std::mutex _mtx[MAX_BARREL_COUNT]; // 桶锁
    };
    "PagesCache.h"class PagesCache : public nocopy
    {
    public:Span* Alloc(unsigned pages);void Dealloc(Span* span);
    private:SpanList _pages[MAX_PAGES_COUNT]; // 管理大块内存的集合std::mutex _mtx; 
    };

              其余模块的核心功能与特点介绍:

      • 每一个模块都离不开这四个接口:申请内存、自身所管理的内存不足向上一层申请、释放内存、自身所负责的内存过度归还内存给上一层。而PagesCache没有内存的时候,会直接向系统申请。至于释放内存与系统,则会等到整个程序运行结束后。
      • CentralCache与PagesCache是全局可见,所以多线程访问的时候,需要申请互斥锁,以保持同步。
      • CentralCache是桶锁,只有多线程同时访问到具体某个桶的时候,才会存在竞争。
      • PagesCache则不同,只要调用该类的接口,就需要申请锁。
      #include <Windows.h>
      void* SystemAlloc(size_t kpage)
      {
      #ifdef _WIN32 // windows系统void* ptr = VirtualAlloc(0, kpage << UNIT_PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
      #elif __linux__ // linux系统
      #endifif (ptr == nullptr) throw std::bad_alloc();return ptr;
      }
      void SystemFree(void* ptr)
      {if (ptr == nullptr) return;
      #ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
      #elif __linux__ // linux系统
      #endif
      class FreeList // 管理小块内存的自由链表
      {
      public:void clear();void push_front(void* obj);void pop_front();void* front() const;bool empty() const;size_t size() const;
      private:void* _freelist = nullptr;size_t _size = 0;
      };

              FreeList类 --- 管理小块内存的数据结构(单链表)

      class Span  // 大块内存的信息描述
      {
      public:PAGE_ID _page_id = 0; // 起始页PAGE_NUM _page_len = 0; // 页的长度bool _use_if = false; // 是否正在使用(挂载在CentralCache位置即算使用)unsigned _size = 0;// 记录该Span切割后小块内存的大小unsigned _use_count = 0; // 正在使用的小块个数FreeList _freelist; // 对(由span平均分割后)小块内存的管理;Span* _prev = nullptr; // 前一个大块内存的地址Span* _next = nullptr; // 下一个大块内存的地址
      };

              Span类 --- 以页为单位,对大块内存的组织(我规定的是一页为8KB),Span的存在,最大的用处在于缓解外碎片问题。

      class SpanList // 大块内存的管理 
      {
      public:static void erase(Span* span); // 将span从所属的带头双向循环链表中剥离出来SpanList(){_head = new Span;_head->_prev = _head;_head->_next = _head;}bool empty();Span* front();Span* GetNoEmptySpan();void pop_front();void push_front(Span* obj);private:Span* _head; // 头指针
      };

              SpanList --- 管理大块内存的数据结构(带头双向循环链表)

              2. (bytes <= 256)内存的申请逻辑

              3. (bytes <= 256)内存的释放逻辑

      四、高并发内存池 --- thread cache

              ThreadCache,负责申请内存<256KB的小块内存,我们已经知道开辟的内存大小至少为8字节(因为64位平台下,指针的大小为8字节),如果用自由链表管理所有大小的内存块,即大概需要256KB * 8 ≈ 200w字节,而每个线程上下文都会存在一个ThreadCache对象,那将太恐怖了。所以我们需要采用一个内存对齐数,但是内存对齐数采用也会有不同的影响

      • 采用的内存对齐数较小:无法缓解_freelist的长度过大问题,如对齐数为8、16、32、都不太行。
      • 采用的内存对齐数较大:因内存对齐所造成内碎片问题则会很严重。

              所以区间[1,256KB],不应该采用的一样的内存对齐数,应该根据区间的不同设置不同的内存对齐数。下面则有一组关于内存对齐数的设计,以及求对齐数与对应桶位置的函数声明与实现。

       整体控制在最多10%左右的内碎⽚浪费
       [1,128] 8byte对⻬ freelist[0,16)
       [128+1,1024] 16byte对⻬ freelist[16,72)
       [1024+1,8*1024] 128byte对⻬ freelist[72,128)
       [8*1024+1,64*1024] 1024byte对⻬ freelist[128,184)
       [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208)

      所以freelist的桶数只需要208个。

      static inline size_t _Align(size_t sizeBytes, size_t alignShift)
      {return (sizeBytes + ((1 << alignShift) - 1)) & (~((1 << alignShift) - 1));
      }
      static inline size_t _Index(size_t alignBytes, size_t alignShift)
      {return ((alignBytes + (1 << alignShift) - 1) >> alignShift) - 1;
      }
      size_t Align(size_t sizeByte)
      {assert(sizeByte); // 申请大小为0的字节int align_shift[] = { 3,4,7,10,13 };if (sizeByte <= 128)return _Align(sizeByte, align_shift[0]);else if (sizeByte <= 1024)return _Align(sizeByte, align_shift[1]);else if (sizeByte <= 8 * 1024)return _Align(sizeByte, align_shift[2]);else if (sizeByte <= 64 * 1024)return _Align(sizeByte, align_shift[3]);elsereturn _Align(sizeByte, align_shift[4]);
      }
      size_t Index(size_t alignByte)
      {assert(alignByte);int align_shift[] = { 3,4,7,10,13 };int prev_count[] = { 0,16,72,128,184 };if (alignByte <= 128)return _Index(alignByte, align_shift[0]) + prev_count[0];else if (alignByte <= 1024)return _Index(alignByte - 128, align_shift[1]) + prev_count[1];else if (alignByte <= 8 * 1024)return _Index(alignByte - 1024, align_shift[2]) + prev_count[2];else if (alignByte <= 64 * 1024)return _Index(alignByte - 8 * 1024, align_shift[3]) + prev_count[3];elsereturn _Index(alignByte - 64 * 1024, align_shift[4]) + prev_count[4];
      }

               因为ThreadCache在每个线程的上下文都会实例化一份,所以申请内存的时候是线程安全的,不需要锁。而这个特性也是tcmalloc会比库里malloc在多线程环境下,效率更高的地方所在。

      // 创建了一个线程看见的全局变量

      thread_local ThreadCache* thread_cache = nullptr;

      void* ThreadCache::Alloc(size_t sizeByte)
      {assert(sizeByte <= MAX_BYTES_COUNT);// step1 --- 通过size进行对齐与桶下标映射size_t alignBytes = Align(sizeByte);size_t index = Index(alignBytes);// step2 --- 若thread cache已无内存,则先从Center Cache中获取批量内存if (_freelist[index].empty())FetchFromCentralCache(alignBytes);// step3 --- 从Thread Cache中获取内存void* obj = _freelist[index].front();_freelist[index].pop_front();return obj;
      }void ThreadCache::FetchFromCentralCache(size_t alignBytes) // 从CentralCache中获取内存
      {size_t index = Index(alignBytes);// step1 --- 计算获取内存的批量// 慢开始算法,通过_batch的调节,申请的次数越多,批量值则越大。int batchSize = std::min(_batch[index], BatchNum(alignBytes));_batch[index] += (batchSize < BatchNum(alignBytes) ? 1 : 0);// step2 --- 向central cache申请内存FreeList list; // 输出型参数,用来接受获得的内存CentralCache::GetCentralCache()->lock(index);int acturalSize = CentralCache::GetCentralCache()->Alloc(list, alignBytes, batchSize);CentralCache::GetCentralCache()->unlock(index);assert(acturalSize >= 1); // 至少申请成功一块,否则程序出现问题// step3 --- 将list插入到_freelist[index]while (list.empty() == false){auto front = list.front();list.pop_front();_freelist[index].push_front(front);}
      }

              ThreadCache::FetchFromCentralCache():ThreadCache去向CentralCache申请内存的时候,应该申请过量的内存,以便下次的使用。但一次申请多少块内存才合适呢?其次,在大部分情况下,较大内存使用的频率会比小块内存的频繁低。所以就有了下面函数的设计。

      unsigned BatchNum(size_t alignByte) // 计算获取内存的批量
      {assert(alignByte);// 申请的内存块单位大小与获取的内存批量成反比int retCount = MAX_BYTES_COUNT / alignByte;// 调下限retCount = std::max(MIN_BATCH_COUNT, retCount);// 调上限retCount = std::min(MAX_BATCH_COUNT, retCount);return retCount;
      }
      void ThreadCache::Dealloc(void* obj, size_t sizeByte)
      {// 1. 我有个疑问,要是obj是野指针,怎么办???,凡是obj指向的地址,没有对应的Span就是野指针// 2. 我又有个疑问,要是obj重复释放,怎么办// step1 --- 通过size进行对齐与桶下标映射size_t alignBytes = Align(sizeByte);size_t index = Index(alignBytes);// step2 --- 完成thread cache的回收_freelist[index].push_front(obj);// step3 --- 如果_freelist[index]已经存在的个数,>单次申请的批量,进行回收if (_freelist[index].size() >= _batch[index])RecycleFromThreadCache(index, _batch[index]); // 回收的位置,回收的个数
      }
      void ThreadCache::RecycleFromThreadCache(size_t index,size_t count)
      {FreeList list; // 回收的内存快while (!_freelist[index].empty() && list.size() < count){auto front = _freelist[index].front();_freelist[index].pop_front();list.push_front(front);}// 调用central cache的相应接口CentralCache::GetCentralCache()->lock(index);CentralCache::GetCentralCache()->Dealloc(list);CentralCache::GetCentralCache()->unlock(index);
      }

              回收内存的时候,找到内存块对应的索引位置,然后头插即可。当_freelist[index].size() >= _batch[index]的时候。则开始回收_freelist[index]上全部的内存块。

      五、高并发内存池 --- central cache

               CentralCache类的特点:不可拷贝的、符合单例模式,全局唯一、多个线程访问同一个桶的时候,需要申请锁。

      class CentralCache : public nocopy
      {CentralCache() = default;static CentralCache central_cache;
      public:static CentralCache* GetCentralCache() {// 饿汉模式return &central_cache;}unsigned Alloc(FreeList& retList,size_t alignBytes, size_t count); // apply memory for thread cacheSpan* FetchFromPagesCache(size_t alignBytes, size_t count);void Dealloc(FreeList list); // Recycle memory for thread cachevoid RecycleFromPagesCache(Span* span);void lock(size_t index) { _mtx[index].lock(); }void unlock(size_t index) { _mtx[index].unlock(); }private:SpanList _spanlist[MAX_BARREL_COUNT]; // 管理大块内存的链表集合std::mutex _mtx[MAX_BARREL_COUNT]; // 桶锁
      };

              为了简化项目代码的复杂度,我们对图示所用的带头双向循环链表进行了封装。

      class SpanList // 大块内存的管理 
      {
      public:static void erase(Span* span); // 将span从所属的带头双向循环链表中剥离出来SpanList(){_head = new Span;_head->_prev = _head;_head->_next = _head;}bool empty();Span* front();Span* GetNoEmptySpan();void pop_front();void push_front(Span* obj);private:Span* _head; // 头指针
      };bool SpanList::empty()
      {assert(_head);return _head->_next == _head;
      }
      Span* SpanList::front()
      {assert(!empty());return _head->_next;
      }
      Span* SpanList::GetNoEmptySpan()
      {if (empty()) return nullptr;auto cur = _head->_next;while (cur != _head){if (cur->_freelist.empty() == false)return cur;cur = cur->_next;}return nullptr;
      }
      void SpanList::pop_front()
      {assert(!empty());erase(_head->_next);
      }
      void SpanList::push_front(Span* obj)
      {auto next = _head->_next;obj->_next = next;obj->_prev = _head;next->_prev = obj;_head->_next = obj;
      }
      void SpanList::erase(Span* obj) // 将span从所属的带头双向循环链表中剥离出来
      {assert(obj); assert(obj->_prev && obj->_next);auto prev = obj->_prev;auto next = obj->_next;prev->_next = next;next->_prev = prev;obj->_prev = obj->_next = nullptr;
      }
      unsigned CentralCache::Alloc(FreeList& retList, size_t alignBytes, size_t count)
      {// stpe0 --- 根据所需申请内存大小alignBytes,求出对应的索引size_t index = Index(alignBytes);// step1 --- 在_spanlist[index]找到一个非空的span,否则,就继续向pages cache申请内存Span* span = _spanlist[index].GetNoEmptySpan();if (span == nullptr){unlock(index);span = FetchFromPagesCache(alignBytes, count);lock(index);_spanlist[index].push_front(span);}// step2 --- 获取actualSize个内存块(actualSize<=count)unsigned actualSize = 0;while (span->_freelist.empty() == false && actualSize < count){auto front = span->_freelist.front();span->_freelist.pop_front();span->_use_count++;retList.push_front(front);actualSize++;}return actualSize;
      }
      Span* CentralCache::FetchFromPagesCache(size_t alignBytes, size_t count)
      {// step1 --- 计算我要获取多大的span,我得知道内存块大小(alignBytes)、批量内存个数(可通过对齐内存进行计算)PAGE_NUM len = PagesNum(alignBytes * count);// step2 --- 从PagesCache中获取一个spanPagesCache::GetCentralCache()->mtx().lock();Span* newSpan = PagesCache::GetCentralCache()->Alloc(len);newSpan->_use_if = true;newSpan->_size = alignBytes;newSpan->_use_count = 0;newSpan->_freelist.clear();PagesCache::GetCentralCache()->mtx().unlock();// 此时span已进入Central cache层,修改_use_if状态// step3 --- 将新获得的Span进行切割,切割出alignBytes的内存块挂在自由链表上,其余的忽略char* start = (char*)(newSpan->_page_id << UNIT_PAGE_SHIFT);char* end = (char*)((newSpan->_page_id + newSpan->_page_len) << UNIT_PAGE_SHIFT);while ((size_t)(end - start) >= alignBytes) // 左闭右开{// 将[start,end]这段内存切快插到span的自由链表上,头插即可char* next = start + alignBytes;newSpan->_freelist.push_front(start);start = next;}return newSpan;
      }
      void CentralCache::Dealloc(FreeList list) // Recycle memory for thread cache
      {{std::lock_guard<std::mutex> lock(PagesCache::GetCentralCache()->mtx());auto& hash = PagesCache::GetCentralCache()->Hash();while (list.empty() == false){auto front = list.front(); // front指向内存块的起始地址list.pop_front();PAGE_ID pageid = AddrToPageID(front);Span* span = hash[pageid];span->_use_count--;span->_freelist.push_front(front);// 要离开Central Cache层了if (span->_use_count == 0){span->_use_if = false;span->_size = 0;span->_freelist.clear();RecycleFromPagesCache(span);}}}
      }
      void CentralCache::RecycleFromPagesCache(Span* span)
      {// 1. 将span从central cache挂载位置剥离下来 ???SpanList::erase(span);// 2. 交给pages cachaPagesCache::GetCentralCache()->Dealloc(span);
      }

      六、高并发内存池 --- pages cache

      #pragma once#include "Common.h"
      #include <unordered_map>class PagesCache : public nocopy
      {static PagesCache* ptr_pages_cache;static std::mutex global_mtx;
      public:static PagesCache* GetCentralCache(){//if (ptr_pages_cache)//	return ptr_pages_cache;std::lock_guard<std::mutex> lock(global_mtx); // 自动加锁if (ptr_pages_cache == nullptr){static ObjectPool <PagesCache> pages_cache_pool;// 内存申请 与 对象构造分离 其实饿汉单例模式能解决问题且更简单,但不影响正确性,我不想修改了ptr_pages_cache = pages_cache_pool.alloc();new(ptr_pages_cache)PagesCache;}return ptr_pages_cache;}Span* Alloc(unsigned pages);void Dealloc(Span* span);std::mutex& mtx() { return _mtx; }std::unordered_map<PAGE_ID, Span*>& Hash() { return _hash; };ObjectPool<Span>& SpanPool() { return _span_pool; };private:SpanList _pages[MAX_PAGES_COUNT];std::unordered_map<PAGE_ID, Span*> _hash;ObjectPool<Span> _span_pool;std::mutex _mtx; // 使用PagesCache的接口,需要先申请锁,但个人认为_hash与内存池的使用可单独用个锁,等调试通了再说吧.// 其次函数内部使用锁,可优化函数的使用
      };
      
      #include "PagesCache.h"
      #include <windows.h>PagesCache* PagesCache::ptr_pages_cache = nullptr;
      std::mutex PagesCache::global_mtx;Span* PagesCache::Alloc(unsigned pagelen)
      {// 1. 尝试在[pagelen,MAX_PAGES_COUNT-1]中查找一个>=pagelen的spanfor (unsigned i = pagelen; i < MAX_PAGES_COUNT; i++){SpanList& head = _pages[i];if (head.empty() == false){Span* ret = _pages[i].front();_pages[i].pop_front();// 1. 如果是一个大小合适的Span,将其Span每一页进行记录,返回给上一层即可// 2. 如果是较大的Span,分割成[待返回Span,剩余Span]if (ret->_page_len == pagelen){// 记录所有ret页号的对应for (PAGE_ID i = ret->_page_id; i < ret->_page_id + ret->_page_len; i++){_hash[i] = ret;}return ret;}else{Span* remain = _span_pool.New();remain->_page_id = ret->_page_id + pagelen;remain->_page_len = ret->_page_len - pagelen;_pages[remain->_page_len].push_front(remain);ret->_page_len = pagelen;// 记录起始页号与结束页号_hash[remain->_page_id] = remain;_hash[remain->_page_id + remain->_page_len - 1] = remain;// 记录所有ret页号的对应for (PAGE_ID i = ret->_page_id; i < ret->_page_id + ret->_page_len; i++){_hash[i] = ret;}return ret;}}}// 2. 向系统申请一块128(MAX_PAGES_COUNT - 1)页的内存(暂时先用malloc向系统申请空间,后期优化用系统调用)// 1. 但是系统申请的内存块首地址,不一定会是8KB的倍数啊,应该是需要系统直接帮我们申请一个对齐到8KB的内存块void* ptr = SystemAlloc(MAX_PAGES_COUNT - 1);// 根据所申请内存的起始地址填充spanSpan* newSpan = _span_pool.New();newSpan->_page_id = (PAGE_ID)ptr >> UNIT_PAGE_SHIFT;newSpan->_page_len = MAX_PAGES_COUNT - 1;// 挂载Span_pages[newSpan->_page_len].push_front(newSpan);// 记录起始页号与结束页号[_page_id,_page_len - 1]_hash[newSpan->_page_id] = newSpan;_hash[newSpan->_page_id + newSpan->_page_len - 1] = newSpan;return Alloc(pagelen);
      }
      #include <iostream>
      std::mutex mmmm;
      void PagesCache::Dealloc(Span* span)
      {// 1.从Span的PAGE_ID往前查找,合并Span,直到PAGE_NUM == MAX_PAGES_COUNT - 1;PAGE_ID prev_page_id = span->_page_id - 1;while (span->_page_len < MAX_PAGES_COUNT){auto ret = _hash.find(prev_page_id);if (ret == _hash.end()){break;}Span* prev_span = ret->second;// 找到了前面的Spanif (prev_span->_page_len + span->_page_len >= MAX_PAGES_COUNT) break;if (prev_span->_prev == nullptr || prev_span->_next == nullptr){int x = 0;}if (prev_span->_use_if == false){span->_page_id = prev_span->_page_id;span->_page_len = prev_span->_page_len + span->_page_len;prev_page_id = span->_page_id - 1; SpanList::erase(prev_span);_span_pool.Delete(prev_span);// 删除Span之后理应有后续的处理for (PAGE_ID i = span->_page_id; i < span->_page_id + span->_page_len; i++){_hash[i] = span;}}else{break;}}// 2.从Span的PAGE_ID往后查找,合并Span,直到PAGE_NUM == MAX_PAGES_COUNT - 1;PAGE_ID next_page_id = span->_page_id + span->_page_len;while (span->_page_len < MAX_PAGES_COUNT){auto ret = _hash.find(next_page_id);if (ret == _hash.end()){break;}Span* next_span = ret->second;if (next_span->_prev == nullptr || next_span->_next == nullptr){int x = 0;}// 找到了后面的Spanif (next_span->_page_len + span->_page_len >= MAX_PAGES_COUNT) break;if (next_span->_use_if == false){span->_page_len = next_span->_page_len + span->_page_len;next_page_id = span->_page_id + span->_page_len;SpanList::erase(next_span);_span_pool.Delete(next_span);// 删除Span之后理应有后续的处理for (PAGE_ID i = span->_page_id; i < span->_page_id + span->_page_len; i++)_hash[i] = span;}else{break;}}// 3.完成挂载if (span->_page_len < MAX_PAGES_COUNT)_pages[span->_page_len].push_front(span);elseassert(-1);
      }
      

      七、高并发内存池  --- 大块内存的申请

              当我们完成了高并发内存池的核心功能:申请小于256KB内存,与释放小于256KB内存。我们就需要考虑大块内存的申请问题。我是如何设计大块内存的申请逻辑呢?请看如图所示:

              所以我们就需要针对CacheAlloc函数接口,完成对上面所述逻辑的封装;当然,我们已经理解并完成了项目申请<=256KB大小内存的代码后,完成这个工作,是手拿把掐般的容易。代码和注释如下:

      void* CacheAlloc(size_t size)
      {if (size > ((MAX_PAGES_COUNT - 1) << UNIT_PAGE_SHIFT)) // size > 128 pages{PAGE_NUM pageNum = PagesNum(size);void* ptr = SystemAlloc(pageNum); // 这是我们封装的系统调用{Span* newSpan = PagesCache::GetCentralCache()->SpanPool().New();newSpan->_page_id = AddrToPageID(ptr);newSpan->_page_len = pageNum;newSpan->_use_if = true;newSpan->_size = size; newSpan->_use_count = 1;PagesCache::GetCentralCache()->Hash().set(newSpan->_page_id, newSpan);}return ptr;}else if (size > MAX_BYTES_COUNT) // size > 256 KB{PAGE_NUM pageNum = PagesNum(size);Span* newSpan = nullptr;{std::lock_guard<std::mutex> lock(PagesCache::GetCentralCache()->mtx());newSpan = PagesCache::GetCentralCache()->Alloc(pageNum); // 从Pages Cache层申请内存}newSpan->_use_if = true;newSpan->_size = size;newSpan->_use_count = 1;return (void*)((newSpan->_page_id) << UNIT_PAGE_SHIFT);}else // size > 0 bytes{if (thread_cache == nullptr){static ObjectPool <ThreadCache> thread_cache_pool;thread_cache = thread_cache_pool.New();}return thread_cache->Alloc(size);}
      }

              这里还有一个特别重要的问题,需要着重强调与阐述的:

      1. 我们使用的内存都是有页号的 --- 内存 / 8KB,就是页号
      2. 每一块内存都是被Span描述的 --- ThreadCache层使用的小块内存都是由大块内存切割而来,而Span是以页为单位的管理的大块内存

              所以我们所使用的内存,就得保存内存的相关信息,如:该内存块的大小、来自哪一个Span(被切割使用,还是整体使用)、该内存对应的页号、Span是否被使用等等重要信息。

      • 如果你不知道该内存块的大小,那释放该内存的时候就有如何知道释放多少字节;其次申请内存的情况分三种,那释放内存的时候自然也有三种情况,你又改如何判断呢?
      • CentralCache回收ThreadCache内存的时候,你是通过修改Span::的use_count变量-1,头插Span::freelist指向的链表。如果你不知道该内存块来自哪一个Span,你又该如何进行这个工作呢?
      • 我们又知道Span::use_if,如变量名含义那样:是否被使用,闲置在PagesCache层为true,其余情况为false。可是会有人会想,这是一个来标记一个Span是否在PagesCache层,还是在CentralCache层的变量。可是当一个Span正在合并大块内存的时候,他前面的那一块Span也碰巧正在合并大块内存,而他们的成员属性都是false,那他们直接就会相互合并,导致了一个多线程问题:数据不一致问题。
      • 这些工作,都是上面我们完成内存池的核心逻辑的时候,就已经做过的。但这个认知很重要。

              如果你不注意这一点,轻则代码运行逻辑出现错误,重则出现野指针问题。多线程环境下,会有更棘手的线程安全问题(一共有三处线程安全问题,我被线程安全问题折磨了三四天)

              所以无论从系统堆中申请的内存,还是从三层缓存中申请的内存,都要保证有对应的数据结构Span对象,并正确的描述着。且维护好全局的页表:页号与Span*的对应关系。因为内存块首地址可以求出页号。

              完成了项目申请内存的全部工作后,释放内存就很小儿科的问题,反着来呗,代码如下:

      void CacheFree(void* obj)
      {if (obj == nullptr)return;// step1 --- 根据obj求出obj指向内存块对应的size大小PAGE_ID pageId = AddrToPageID(obj);Span* span = (Span*)PagesCache::GetCentralCache()->Hash().get(pageId);if (span == nullptr)assert(-1); // 按理来说,不会走到这个地方,用户使用的每一块内存的所在页都应该有所记录size_t size = span->_size;// step2 --- 根据size的大小释放内存if (size > ((MAX_PAGES_COUNT - 1) << UNIT_PAGE_SHIFT)) // size > 128 pages{SystemFree(obj);PagesCache::GetCentralCache()->Hash().set(pageId,nullptr);PagesCache::GetCentralCache()->SpanPool().Delete(span);}else if (size > MAX_BYTES_COUNT) // size > 256 KB{span->_use_if = false;span->_size = 0;span->_use_count = 0;{std::lock_guard<std::mutex> lock(PagesCache::GetCentralCache()->mtx());PagesCache::GetCentralCache()->Dealloc(span);}}else{if (thread_cache == nullptr)assert(-1);thread_cache->Dealloc(obj, size);}
      }

      八、高并发内存池  --- 定长内存池替换new/malloc

              项目中会使用new的地方无非就两处:生成ThreadCache对象、生成Span对象用以管理大块内存;我们在开始写项目前,就已经完成了定长内存池的设计,替换malloc,那还不简单?(其余我们项目中使用的unordered_map底层也使用了malloc,这个问题也可以解决,请看:九、高并发内存池  --- 基数树存储[PAGE_ID,Span*]的映射)

              生成ThreadCache对象:(因为项目我是先写完的,可能上面有的地方已经采用了这种写法)

      		if (thread_cache == nullptr){static ObjectPool <ThreadCache> thread_cache_pool;thread_cache = thread_cache_pool.New();}return thread_cache->Alloc(size);

              生成Span对象用以管理大块内存:只要PagesCache层有生成Span对象的需要求,所以我们在PagesCache类中添加一个成员变量

      class PagesCache : public nocopy
      {
      public:static PagesCache* GetCentralCache() { return &pages_cache; }// ...ObjectPool<Span>& SpanPool() { return _span_pool; };
      private:SpanList _pages[MAX_PAGES_COUNT];std::mutex _mtx;#ifdef _WIN64 // windows 64TCMalloc_PageMap3<64 - UNIT_PAGE_SHIFT> _hash;
      #elif _WIN32 // windows 32TCMalloc_PageMap2<32 - UNIT_PAGE_SHIFT> _hash;
      #endifObjectPool<Span> _span_pool; // 这就是我们添加的Span对象池
      };

      九、高并发内存池  --- 基数树存储[PAGE_ID,Span*]的映射

      #pragma once#include"Common.h"template <size_t BITS>
      class TCMalloc_PageMap1 {
      private:static const size_t LENGTH = 1 << BITS; void** array_;public:typedef uintptr_t Number;explicit TCMalloc_PageMap1(){const size_t bytes = sizeof(void*) * LENGTH;array_ = static_cast<void**>(SystemAlloc(PagesNum(bytes)));memset(array_, 0, bytes);}// 简化边界检查,使用位运算替代比较void* get(Number k) const {return (k >> BITS) ? nullptr : array_[k];}// 添加断言确保k在有效范围内void set(Number k, void* v) {assert((k >> BITS) == 0);array_[k] = v;}
      };template <size_t BITS>
      class TCMalloc_PageMap2 {
      private:static const size_t ROOT_BITS = 5;static const size_t LEAF_BITS = BITS - ROOT_BITS;static const size_t ROOT_LENGTH = 1 << ROOT_BITS;static const size_t LEAF_LENGTH = 1 << LEAF_BITS;struct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH];// ObjectPool<Leaf> leaf_pool_;  // 使用成员变量而非静态变量public:typedef uintptr_t Number;explicit TCMalloc_PageMap2(){memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}// 使用更简洁的位运算表达式void* get(Number k) const {if ((k >> BITS) > 0) return nullptr;const auto i1 = k >> LEAF_BITS;const auto i2 = k & (LEAF_LENGTH - 1);return root_[i1] ? root_[i1]->values[i2] : nullptr;}// 添加更详细的断言检查void set(Number k, void* v) {assert((k >> BITS) == 0);assert(root_[k >> LEAF_BITS] != nullptr);const auto i1 = k >> LEAF_BITS;const auto i2 = k & (LEAF_LENGTH - 1);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key < start + n;) {const auto i1 = key >> LEAF_BITS;if (i1 >= ROOT_LENGTH) return false;if (!root_[i1]) {//Leaf* leaf = leaf_pool_.New();// 直接通过系统调用申请内存Leaf* leaf = static_cast<Leaf*>(SystemAlloc(PagesNum(sizeof(Leaf))));if (!leaf) return false;memset(leaf, 0, sizeof(Leaf));root_[i1] = leaf;}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {Ensure(0, 1 << BITS);}
      };// 三级基数树
      // [root_] (Node**)        // 第0层:根指针(固定)
      // |
      // [Node]                // 第1层:动态分配(i1索引)
      // |
      // [Node]                // 第2层:动态分配(i2索引)
      // |
      // [Leaf]                // 第3层:实际存储数据(i3索引)
      // |
      // [values[LEAF_LENGTH]]    // 数据数组
      template <size_t BITS>
      class TCMalloc_PageMap3 {
      private:// 定义每一级消耗的位数static const size_t INTERIOR_BITS = (BITS + 2) / 3; // 向上取整static const size_t INTERIOR_LENGTH = 1 << INTERIOR_BITS;// 定义叶子级消耗的位数static const size_t LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const size_t LEAF_LENGTH = 1 << LEAF_BITS;// 中间节点结构struct Node {Node* ptrs[INTERIOR_LENGTH];};// 叶子节点结构struct Leaf {void* values[LEAF_LENGTH];};Node* root_;                          // 基数树的根节点//ObjectPool<Node> node_pool_;          // 中间节点内存池//ObjectPool<Leaf> leaf_pool_;          // 叶子节点内存池public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(){//root_ = node_pool_.New();root_ = static_cast<Node*>(SystemAlloc(PagesNum(sizeof(Node))));if (root_ == nullptr) {throw std::bad_alloc();}memset(root_, 0, sizeof(Node));}void* get(Number k) const {if ((k >> BITS) > 0) return nullptr;const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if (root_ == nullptr ||root_->ptrs[i1] == nullptr ||root_->ptrs[i1]->ptrs[i2] == nullptr) {return nullptr;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {assert((k >> BITS) == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);// 确保第一级节点存在if (root_->ptrs[i1] == nullptr) {//Node* n = node_pool_.New();Node* n = static_cast<Node*>(SystemAlloc(PagesNum(sizeof(Node))));if (n == nullptr) throw std::bad_alloc();memset(n, 0, sizeof(Node));root_->ptrs[i1] = n;}// 确保第二级节点存在if (root_->ptrs[i1]->ptrs[i2] == nullptr) {// Leaf* leaf = leaf_pool_.New();// 直接通过系统调用申请内存Leaf* leaf = static_cast<Leaf*>(SystemAlloc(PagesNum(sizeof(Leaf))));if (leaf == nullptr) throw std::bad_alloc();memset(leaf, 0, sizeof(Leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// 设置值reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) {return false;}// 确保第一级节点存在if (root_->ptrs[i1] == nullptr) {//Node* n = node_pool_.New();Node* n = static_cast<Node*>(SystemAlloc(PagesNum(sizeof(Node))));if (n == nullptr) return false;memset(n, 0, sizeof(Node));root_->ptrs[i1] = n;}// 确保第二级节点存在if (root_->ptrs[i1]->ptrs[i2] == nullptr) {//Leaf* leaf = leaf_pool_.New();// 直接通过系统调用申请内存Leaf* leaf = static_cast<Leaf*>(SystemAlloc(PagesNum(sizeof(Leaf))));if (leaf == nullptr) return false;memset(leaf, 0, sizeof(Leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {Ensure(0, 1 << BITS);}
      };

              基数树到底是什么,可以自行通过AI或者网上查阅资料进行了解,这里我将着重于讲解我写的代码是如何使用的,以及改如何理解它又是如何可行的。

              同理TCMalloc_PageMap3就是封装之后的三维数组,只需要开辟一维的数组空间,后面二维的可以根据需要再进行开辟,这个机制很类似与操作系统的页表。下面我们来讲解使用:

      #ifdef _WIN64 // windows 64using PageMap = TCMalloc_PageMap3<64 - UNIT_PAGE_SHIFT>;TCMalloc_PageMap3<64 - UNIT_PAGE_SHIFT> _hash;
      #elif _WIN32 // windows 32using PageMap = TCMalloc_PageMap2<32 - UNIT_PAGE_SHIFT>;TCMalloc_PageMap2<32 - UNIT_PAGE_SHIFT> _hash;
      #endif
      

      十、源代码

      高并发内存池源代码链接https://gitee.com/FSRMWK/c-language-project/tree/master/ConMalloc/ConMalloc        代码在windows32/64位平台下都可正常运行,若需要Linux平台下的源代码可通过评论告知。感谢大家的支持。

      http://www.dtcms.com/a/327776.html

      相关文章:

    • windows系统端口异常占用删除教程
    • Go面试题及详细答案120题(0-20)
    • [TryHackMe]Internal(hydra爆破+WordPress主题修改getshell+Chisel内网穿透)
    • 《Q————Mysql连接》
    • Linux软件编程:IO(二进制文件)、文件IO
    • 【25-cv-08993】T Miss Toys 启动章鱼宠物玩具版权维权,15 项动物玩偶版权均需警惕
    • 如何使用gpt进行模式微调(2)?
    • 使用Spring Boot对接欧州OCPP1.6充电桩:解决WebSocket连接自动断开问题
    • 无文件 WebShell攻击分析
    • php+apache+nginx 更换域名
    • SpringCloud 核心内容
    • 82. 删除排序链表中的重复元素 II
    • 计算机网络摘星题库800题笔记 第4章 网络层
    • “冒险玩家”姚琛「万里挑一」特别派对 打造全新沉浸式户外演出形式
    • Javase 之 字符串String类
    • 亚马逊手工制品类目重构:分类逻辑革新下的卖家应对策略与增长机遇
    • 高性能web服务器Tomcat
    • 嵌入式Linux内存管理面试题大全(含详细解析)
    • 元宇宙虚拟金融服务全景解析:技术创新、场景重构与未来趋势
    • 数据结构:链表栈的操作实现( Implementation os Stack using List)
    • LDAP 登录配置参数填写指南
    • 文件io ,缓冲区
    • 【智慧城市】2025年湖北大学暑期实训优秀作品(3):基于WebGIS的南京市古遗迹旅游管理系统
    • 简单的双向循环链表实现与使用指南
    • 小黑课堂计算机一级Office题库安装包2.93_Win中文_计算机二级考试_安装教程
    • 使用shell脚本执行需要root权限操作,解决APK只有系统权限问题
    • mysql参数调优之 sync_binlog (二)
    • 计算机网络摘星题库800题笔记 第2章 物理层
    • 防御保护11
    • Flutter GridView的基本使用