当前位置: 首页 > news >正文

【高并发内存池——项目】central cache 讲解

提示:高并发内存池完整项目代码,在主页专栏项目中

文章目录

目录

提示:高并发内存池完整项目代码,在主页专栏项目中

 高并发内存池整体框架设计

一、central cache 完整代码实现

Common.h

ThreadCache.h

ThreadCache.cpp

CentralCache.h

CentralCache.cpp

ConcurrentAlloc.h

二、核心代码解析

1. CentralCache类定义

2. 跨度结构Span

3. 跨度链表SpanList

4. 批量获取内存对象

5. 智能批量分配策略


 高并发内存池整体框架设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本 ⾝其实已经很优秀,那么我们项⽬的原型tcmalloc就是在多线程⾼并发的场景下更胜⼀筹,所以这次 我们实现的内存池需要考虑以下⼏⽅⾯的问题。

1.性能问题。

2. 多线程环境下,锁竞争问题。

3. 内存碎⽚问题。

concurrent memorypool主要由以下3个部分构成:

1. threadcache:线程缓存是每个线程独有的,⽤于⼩于256KB的内存的分配,线程从这⾥申请内存 不需要加锁,每个线程独享⼀个cache,这也就是这个并发线程池⾼效的地⽅。

2. central cache:中⼼缓存是所有线程所共享,threadcache是按需从centralcache中获取的对 象。centralcache合适的时机回收threadcache中的对象,避免⼀个线程占⽤了太多的内存,⽽其 他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的⽬的。centralcache是存在竞 争的,所以从这⾥取内存对象是需要加锁,⾸先这⾥⽤的是桶锁,其次只有threadcache的没有内 存对象时才会找centralcache,所以这⾥竞争不会很激烈。

3. pagecache:⻚缓存是在centralcache缓存上⾯的⼀层缓存,存储的内存是以⻚为单位存储及分 配的,centralcache没有内存对象时,从pagecache分配出⼀定数量的page,并切割成定⻓⼤⼩ 的⼩块内存,分配给centralcache。当⼀个span的⼏个跨度⻚的对象都回收以后,pagecache会 回收centralcache满⾜条件的span对象,并且合并相邻的⻚,组成更⼤的⻚,缓解内存碎⽚的问 题。

               

一、central cache 完整代码实现

Common.h

#pragma once#include <iostream>
#include <vector>
#include <algorithm>
#include <time.h>
#include <assert.h>
#include <thread>
#include <mutex>using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else// linux
#endifstatic void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);NextObj(obj) = _freeList;_freeList = obj;}void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}void* Pop(){assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}private:void* _freeList = nullptr;size_t _maxSize = 1;
};// 计算对象大小的对齐映射规则
class SizeClass
{
public:static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128) return _RoundUp(size, 8);else if (size <= 1024) return _RoundUp(size, 16);else if (size <= 8*1024) return _RoundUp(size, 128);else if (size <= 64*1024) return _RoundUp(size, 1024);else if (size <= 256 * 1024) return _RoundUp(size, 8*1024);else { assert(false); return -1; }}static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) return _Index(bytes, 3);else if (bytes <= 1024) return _Index(bytes - 128, 4) + group_array[0];else if (bytes <= 8 * 1024) return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];else if (bytes <= 64 * 1024) return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];else if (bytes <= 256 * 1024) return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];else { assert(false); }return -1;}static size_t NumMoveSize(size_t size){assert(size > 0);int num = MAX_BYTES / size;if (num < 2) num = 2;if (num > 512) num = 512;return num;}
};// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0;size_t  _n = 0;Span* _next = nullptr;Span* _prev = nullptr;size_t _useCount = 0;void* _freeList = nullptr;
};// 带头双向循环链表 
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx;
};

ThreadCache.h

#pragma once
#include "Common.h"class ThreadCache
{
public:void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

ThreadCache.cpp

#include "ThreadCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{return nullptr;
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}void* ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);
}

CentralCache.h

#pragma once
#include "Common.h"class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}Span* GetOneSpan(SpanList& list, size_t byte_size);size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];CentralCache() {}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};

CentralCache.cpp

#include "CentralCache.h"CentralCache CentralCache::_sInst;Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{return nullptr;
}size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;_spanLists[index]._mtx.unlock();return actualNum;
}

ConcurrentAlloc.h

#pragma once
#include "Common.h"
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size)
{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);

二、核心代码解析

当线程申请某一大小的内存时,如果thread cache中对应的自由链表不为空,那么直接取出一个内存块进行返回即可,但如果此时该自由链表为空,那么这时thread cache就需要向central cache申请内存了。

  central cache的结构与thread cache是一样的,它们都是哈希桶的结构,并且它们遵循的对齐映射规则都是一样的。这样做的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去取内存就行了。
 

central cache与thread cache有两个明显不同的地方,首先,thread cache是每个线程独享的,而central cache是所有线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是需要加锁的。

  但central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。

  central cache与thread cache的第二个不同之处就是,thread cache的每个桶中挂的是一个个切好的内存块,而central cache的每个桶中挂的是一个个的span。
         

每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶这些内存块被切成了对应的大小。

1. CentralCache类定义

class CentralCache
{
public:static CentralCache* GetInstance()  // 单例模式{return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];  // 跨度链表数组private:CentralCache() {}  // 私有构造函数CentralCache(const CentralCache&) = delete;  // 禁止拷贝static CentralCache _sInst;  // 单例实例
};
CentralCache采用单例模式设计,确保整个进程中只有一个实例。它维护了一个SpanList数组,每个元素对应不同大小的内存块。

2. 跨度结构Span

struct Span
{PAGE_ID _pageId = 0;     // 大块内存起始页的页号size_t  _n = 0;          // 页的数量Span* _next = nullptr;   // 双向链表结构Span* _prev = nullptr;size_t _useCount = 0;    // 被分配给thread cache的计数void* _freeList = nullptr;  // 切好的小块内存的自由链表
};
Span是内存管理的基本单位,代表一块连续的内存页:_pageId和_n:标识内存位置和大小_useCount:跟踪内存使用情况,为0时可回收_freeList:管理切分后的小内存块

    3. 跨度链表SpanList

     class SpanList
    {
    public:SpanList()  // 构造带头双向循环链表{_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan);  // 插入操作void Erase(Span* pos);                  // 删除操作private:Span* _head;
    public:std::mutex _mtx;  // 桶锁,每个桶独立加锁
    };
    SpanList采用带头双向循环链表结构,这种设计:插入删除操作高效(O(1)时间复杂度)每个桶有独立的锁,减少锁竞争便于遍历和管理Span

      4. 批量获取内存对象

      size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
      {size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();  // 加桶锁Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);// 从span中获取batchNum个对象start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;span->_useCount += actualNum;  // 增加使用计数_spanLists[index]._mtx.unlock();  // 解桶锁return actualNum;
      }

      这个过程体现了CentralCache的核心工作流程:

      1. 按大小定位:通过SizeClass::Index找到对应的内存桶

      2. 加桶锁:只锁定当前操作的桶,其他桶仍可并发访问

      3. 获取Span:找到有可用内存的Span

      4. 批量提取:一次性获取多个对象,减少频繁申请

      5. 更新计数:记录Span的使用情况

      5. 智能批量分配策略

      // 一次thread cache从中心缓存获取多少个
      static size_t NumMoveSize(size_t size)
      {assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;
      }

      这个函数实现了慢启动的批量分配策略:

      • 小对象:一次性获取较多数量(最多512个)

      • 大对象:一次性获取较少数量(最少2个)

      • 动态调整:根据对象大小智能计算批量数量

      http://www.dtcms.com/a/391965.html

      相关文章:

    • vue3 <el-image 的:src=“event.fileName[0]“ 长度为 “0“ 的元组类型 “[]“ 在索引 “0“ 处没有元素。
    • 问题记录: 跨服务接口调用日期类型字段格式转换问题
    • 亚马逊关键词按什么角度筛选?从人工摸索到智能化系统的全面升级
    • C语言基础【19】:指针6
    • 正则表达式【阿里版】
    • 使用云端GPU训练Lerobot
    • RNA-seq分析之基因ID转换
    • [视图功能9] 图表联动与多维度分析:打造协同动态的数据洞察仪表盘
    • Python基础 6》数据类型_列表(List)
    • 40、大模型工程平台全景对比 - 技术选型指南
    • BEVformer训练nusenes-mini数据集
    • 《Unity3D NavMeshAgent与Rigidbody移动同步问题的技术拆解》
    • Psy Protocol 技术核心解读
    • PS练习3:使用变形将图片放到实际场景中
    • 在排序数组中查找元素的第一个和最后一个位置
    • 一条命令在ubuntu安装vscode
    • 【开题答辩全过程】以 ASP.NET抗疫物资管理系统为例,包含答辩的问题和答案
    • 探饭 - 字节跳动推出的AI美食推荐助手
    • ZCC5515_耐压9.5V ,超低静态功耗5uA,完全替代CS5515
    • 端脑云AI生图体验:从提示词到精美肖像
    • 临界处有一条看不见的河
    • JavaWeb--day8-- Mybatis(正式)
    • 基于WSL BES2710编译环境搭建方法
    • 模块化设计逻辑:太阳镜气流单元 / 耳机可拆卸结构的装配精度与兼容性分析
    • 半监督学习实战:如何用少量标注数据获得媲美全监督学习的性能?
    • 作业3(初学CSS)
    • CSS基础(总结)
    • 【信创云架构 PACS系统】全网首发-基于JDK17+Vue3全新开发的信创国产化系统
    • 若依vue项目里面,使用到oss,这个 是什么
    • Linux中的Ubuntu系统安装配置 MATLAB 开发环境、离线安装非root安装vscode