High-Concurrency Memory Pool Log
Contents
1. Overview of the concurrent memory pool (Day 1)
2. thread cache
2.1 Hash-bucket alignment and mapping rules (Day 2)
2.2 Lock-free TLS access to the thread cache (Day 3)
3. central cache (Day 4)
3.1 Core implementation of the central cache
4. Page Cache
4.1 Fetching a span from the page cache
5. thread cache reclamation (Day 5)
6. central cache reclamation
7. PageCache reclamation
8. Allocating and freeing blocks larger than 256KB (Day 6)
9. Replacing new with a fixed-size object pool
10. Benchmarking malloc vs. ConcurrentAlloc under multithreaded concurrency
11. Optimizing with the radix tree from the tcmalloc source
1. Overview of the concurrent memory pool (Day 1)
It is built from three layers:
- thread cache: a per-thread cache serving allocations below 256KB. Each thread owns its own cache, so allocating from it needs no lock — this is exactly where the pool's concurrency win comes from.
- central cache: shared by all threads; thread caches fetch objects from it on demand. At suitable moments it reclaims objects back from thread caches, so one thread cannot hoard memory while others starve — memory is scheduled on demand and balanced across threads. Because it is shared it is contended and fetching from it needs a lock; but it uses per-bucket locks, and a thread cache only comes here once it has run out of objects, so contention stays mild.
- page cache: the layer above the central cache. It stores and hands out memory in units of pages: when the central cache runs out of objects, the page cache gives it a run of pages, which are sliced into fixed-size small blocks for the central cache. Once every object sliced from a span's pages has been returned, the page cache reclaims qualifying spans from the central cache and merges adjacent pages into larger ones, relieving fragmentation.
2. thread cache
The thread cache is a hash-bucket structure: each bucket is a free list of memory blocks whose size is determined by the bucket's position. Every thread gets its own ThreadCache object, so obtaining and returning objects here is lock-free.
Day 1 delivered the code for the two basic operations: a thread pushing a finished block back into its thread cache, and a thread popping a block out when it needs memory.
// Common.h
#pragma once
#include <assert.h>

// Each free block stores the address of the next block in its first word.
void*& NextObj(void* obj)
{
    return *(void**)obj;
}

class FreeList
{
public:
    void Push(void* obj)
    {
        assert(obj);
        NextObj(obj) = _freeList;
        _freeList = obj;
    }
    void* Pop()
    {
        assert(_freeList);
        void* obj = _freeList;
        _freeList = NextObj(obj);
        return obj;
    }
private:
    void* _freeList = nullptr;
};

// ThreadCache.h
#pragma once
#include "Common.h"

class ThreadCache
{
public:
    void* Allocate(size_t size);
    void Deallocate(void* ptr, size_t size);
private:
    FreeList _freeLists[208]; // one bucket per size class; the count is named NFREELIST from Day 2 on
};
(Day 2)
2.1 Hash-bucket alignment and mapping rules
// Keep total internal fragmentation to roughly 10% at most
// [1,128]               8-byte aligned       freelist[0,16)
// [128+1,1024]          16-byte aligned      freelist[16,72)
// [1024+1,8*1024]       128-byte aligned     freelist[72,128)
// [8*1024+1,64*1024]    1024-byte aligned    freelist[128,184)
// [64*1024+1,256*1024]  8*1024-byte aligned  freelist[184,208)
Why these tiers:
Small objects are numerous and sensitive to waste, so finer-grained alignment keeps internal fragmentation in check.
Large objects are rare; the absolute waste is bigger but its relative share is small, so coarser granularity cuts the number of size classes, shrinking metadata and keeping the hot path simple.
This tiering keeps overall internal fragmentation at roughly ≤10%. For example, a 130B request lands in the 144B bucket of the 16B tier, wasting 14B ≈ 10.8%; above 2KB the 128B tier wastes at most 127B, a ratio under about 6%.
If this feels like too much bother, uniform 8- or 16-byte alignment also works fine.
/*
// The straightforward version, easier to follow:
size_t _RoundUp(size_t size, size_t alignNum)
{
    size_t alignSize;
    if (size % alignNum != 0)
        alignSize = (size / alignNum + 1) * alignNum;
    else
        alignSize = size;
    return alignSize;
}
*/
// Bit-trick version; alignNum must be a power of two.
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
    return (bytes + alignNum - 1) & ~(alignNum - 1);
}

static inline size_t RoundUp(size_t bytes)
{
    if (bytes <= 128)             return _RoundUp(bytes, 8);
    else if (bytes <= 1024)       return _RoundUp(bytes, 16);
    else if (bytes <= 8 * 1024)   return _RoundUp(bytes, 128);
    else if (bytes <= 64 * 1024)  return _RoundUp(bytes, 1024);
    else if (bytes <= 256 * 1024) return _RoundUp(bytes, 8 * 1024);
    else { assert(false); return -1; }
}
This way of computing the aligned size is a trick borrowed from the experts (the commented-out version is easier to follow).
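To see why the trick works (it needs alignNum to be a power of two): adding alignNum - 1 pushes any unaligned value past the next boundary, and masking with ~(alignNum - 1) clears the low bits. A quick self-contained check I put together (not from the original notes):

#include <cassert>
#include <cstddef>

// Same bit trick as _RoundUp above; alignNum must be a power of two.
static inline size_t RoundUpCheck(size_t bytes, size_t alignNum)
{
    return (bytes + alignNum - 1) & ~(alignNum - 1);
}

int main()
{
    assert(RoundUpCheck(130, 16) == 144); // 130+15=145 = 0b10010001; clearing the low 4 bits gives 144
    assert(RoundUpCheck(128, 8) == 128);  // already aligned: unchanged
    assert(RoundUpCheck(1, 8) == 8);      // minimum block is one alignment unit
    return 0;
}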
Hash-bucket index mapping
static inline size_t _Index(size_t bytes, size_t align_shift)
{
    return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}

static inline size_t Index(size_t bytes)
{
    assert(bytes <= MAX_BYTES);
    // number of buckets in each alignment tier
    static int group_array[4] = { 16, 56, 56, 56 };
    if (bytes <= 128)
        return _Index(bytes, 3);
    else if (bytes <= 1024)
        return _Index(bytes - 128, 4) + group_array[0];
    else if (bytes <= 8 * 1024)
        return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
    else if (bytes <= 64 * 1024)
        return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
    else if (bytes <= 256 * 1024)
        return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
    else
        assert(false);
    return -1;
}
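A worked example (my own check, assuming the SizeClass above is in scope): for 130 bytes, Index(130) = _Index(130 - 128, 4) + 16 = ((2 + 15) >> 4) - 1 + 16 = 16, the first bucket of the 16-byte tier, which serves all sizes 129..144.

// Quick sanity check of the mapping.
void IndexMappingCheck()
{
    assert(SizeClass::Index(8) == 0);      // smallest bucket
    assert(SizeClass::Index(128) == 15);   // last 8-byte bucket
    assert(SizeClass::Index(129) == 16);   // first 16-byte bucket
    assert(SizeClass::Index(130) == 16);   // 129..144 all share bucket 16
    assert(SizeClass::RoundUp(130) == 144);
}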
Final code for Day 2
// Common.h
#pragma once
#include <assert.h>

static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;

void*& NextObj(void* obj)
{
    return *(void**)obj;
}

class FreeList
{
public:
    void Push(void* obj)
    {
        assert(obj);
        NextObj(obj) = _freeList;
        _freeList = obj;
    }
    void* Pop()
    {
        assert(_freeList);
        void* obj = _freeList;
        _freeList = NextObj(obj);
        return obj;
    }
    bool Empty()
    {
        return _freeList == nullptr;
    }
private:
    void* _freeList = nullptr;
};

class SizeClass
{
public:
    // Keep total internal fragmentation to roughly 10% at most
    // [1,128]               8-byte aligned       freelist[0,16)
    // [128+1,1024]          16-byte aligned      freelist[16,72)
    // [1024+1,8*1024]       128-byte aligned     freelist[72,128)
    // [8*1024+1,64*1024]    1024-byte aligned    freelist[128,184)
    // [64*1024+1,256*1024]  8*1024-byte aligned  freelist[184,208)
    // _RoundUp / RoundUp / _Index / Index: identical to the definitions
    // shown earlier in this section, omitted here.
};

// ThreadCache.h
#pragma once
#include "Common.h"

class ThreadCache
{
public:
    void* Allocate(size_t size);
    void Deallocate(void* ptr, size_t size);
    void* FetchFromCentralCache(size_t index, size_t size); // implemented on Day 4
private:
    FreeList _freeLists[NFREELIST];
};

// ThreadCache.cpp
#define _CRT_SECURE_NO_WARNINGS
#include "ThreadCache.h"

void* ThreadCache::Allocate(size_t size)
{
    assert(size <= MAX_BYTES);
    size_t alignSize = SizeClass::RoundUp(size);
    size_t index = SizeClass::Index(size);
    if (!_freeLists[index].Empty())
        return _freeLists[index].Pop();
    else
        return FetchFromCentralCache(index, alignSize);
}
(Day 3)
2.2 Lock-free TLS access to the thread cache
TLS — thread local storage.
Linux/gcc has one spelling for TLS and Windows/VS another; the snippets below show both.
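All three declarations below give every thread its own copy of the pointer; pick the one your toolchain supports (C++11 thread_local is the portable spelling):

// Windows / MSVC:
static __declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
// Linux / gcc extension:
static __thread ThreadCache* pTLSThreadCache = nullptr;
// Portable C++11:
static thread_local ThreadCache* pTLSThreadCache = nullptr;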
I. First: why does the thread cache need TLS at all?
The design intent of the thread cache is clear: give every thread its own cache so small allocations (≤256KB) are served locally without lock contention. But two traps hide here:
Thread identity: how does a thread safely find its own cache rather than someone else's?
Resource management: when a thread exits, how is the memory its cache holds reclaimed automatically?
Without TLS you would need something like a global hash table of <thread id, threadcache>, but every lookup would take a global lock — right back to the lock contention we were escaping.
II. TLS's three strengths: a perfect fit for the thread cache
The essence of TLS (thread-local storage) is the "thread-private global variable": every thread gets an independent copy, with no interference. Three properties solve our problems:
1. Lock-free initialization: each thread's cache gets its own "ID card"
Declare the cache pointer with thread_local; it is initialized on the thread's first access, with no lock anywhere:
// Every thread gets its own copy of pTLSThreadCache
static thread_local ThreadCache* pTLSThreadCache = nullptr;

// On the thread's first allocation:
if (pTLSThreadCache == nullptr)
{
    pTLSThreadCache = new ThreadCache(); // belongs to the current thread only
}
Effect: 100 threads means 100 independent pointers; they never fight over one object, so no lock is needed.
2. Direct local access: where the performance comes from
On every later allocation or free, the thread goes straight through its TLS pointer to its own cache:
Allocate: compute the size → find the matching hash bucket → pop a free block (all inside the thread)
Free: push straight back onto the local free list (O(1) time)
Compared with a traditional global cache, "contend for a lock" becomes "lock-free thread-local operation", and the advantage grows with concurrency.
3. Automatic cleanup: the backstop against leaks
TLS variables have a built-in advantage: they are destroyed automatically when their thread exits.
One caveat: for a plain pointer like pTLSThreadCache, only the pointer itself is destroyed automatically; actually freeing the ThreadCache it points to needs a destructor hook (see the sketch below) — with that in place, the old headache of "thread gone, cache memory still held" is solved without manual management.
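A minimal sketch of one such hook — my own illustration, not part of the project code: a thread_local guard object whose destructor runs at thread exit and releases the cache.

// Hypothetical RAII guard; names and cleanup policy are assumptions.
struct TLSCacheGuard
{
    ~TLSCacheGuard()
    {
        if (pTLSThreadCache)
        {
            delete pTLSThreadCache; // or first flush its lists back to the central cache
            pTLSThreadCache = nullptr;
        }
    }
};
static thread_local TLSCacheGuard tlsCacheGuard; // one guard per thread, destroyed at thread exit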
III. TLS + central cache: not just "private", also "balanced"
TLS only covers "thread-private"; the CentralCache is still needed for balance:
When a thread cache runs dry: the TLS pointer triggers a batch fetch from the central cache (with a slow-start algorithm governing the batch size)
When a thread cache grows too full: surplus memory is handed back to the central cache (so no single thread hoards)
The central cache uses per-bucket locks: requests for different size classes contend on different locks, cutting contention further
The three-layer logic:
thread TLS cache (lock-free fast path) → central cache (balancing scheduler) → page cache (page merging)
IV. One-sentence summary: the core value of TLS
It gives the thread cache "thread-private, lock-free, self-cleaning" semantics: the performance benefit of one-cache-per-thread without its management headaches. That is the performance secret of concurrent memory pools such as tcmalloc.
Test code:
// UnitTest.cpp
#define _CRT_SECURE_NO_WARNINGS
#include "ConcurrentAlloc.h"

void Alloc1()
{
    for (int i = 0; i < 5; i++)
    {
        void* ptr = ConcurrentAlloc(6);
    }
}

void Alloc2()
{
    for (int i = 0; i < 5; i++)
    {
        void* ptr = ConcurrentAlloc(7);
    }
}

void TLSTest()
{
    std::thread t1(Alloc1);
    t1.join();
    std::thread t2(Alloc2);
    t2.join();
}

int main()
{
    TLSTest();
    return 0;
}

// Common.h
#pragma once
#include <assert.h>
#include <thread>
#include <iostream>
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;

static void*& NextObj(void* obj)
{
    return *(void**)obj;
}

class FreeList
{
public:
    void Push(void* obj)
    {
        assert(obj);
        NextObj(obj) = _freeList;
        _freeList = obj;
    }
    void* Pop()
    {
        assert(_freeList);
        void* obj = _freeList;
        _freeList = NextObj(obj);
        return obj;
    }
    bool Empty()
    {
        return _freeList == nullptr;
    }
private:
    void* _freeList = nullptr;
};

class SizeClass
{
public:
    // Keep total internal fragmentation to roughly 10% at most
    // [1,128]               8-byte aligned       freelist[0,16)
    // [128+1,1024]          16-byte aligned      freelist[16,72)
    // [1024+1,8*1024]       128-byte aligned     freelist[72,128)
    // [8*1024+1,64*1024]    1024-byte aligned    freelist[128,184)
    // [64*1024+1,256*1024]  8*1024-byte aligned  freelist[184,208)
    // _RoundUp / RoundUp / _Index / Index: identical to the Day 2
    // definitions in section 2.1, omitted here.
};

// ConcurrentAlloc.h
#pragma once
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size)
{if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}std::cout << std::this_thread::get_id() <<":" << pTLSThreadCache << std::endl;return pTLSThreadCache->Allocate(size);
}static void ConcurrentFree(void* obj, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(obj, size);
}ThreadCache.h#pragma once
#include "Common.h"class ThreadCache
{
public:void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
3. central cache (Day 4)
The central cache is also a hash-bucket structure, with the same bucket mapping as the thread cache. The difference is that each bucket holds a SpanList: the big blocks inside each span under a bucket are pre-sliced, per that bucket's size class, into small objects strung on the span's free list.
Note the conditional compilation (#ifdef) around PAGE_ID in the Span struct: it is there for portability. Later, given a block's starting address, we must compute its page number, and an address is 8 bytes on a 64-bit platform but 4 bytes on x86, so PAGE_ID has to match the pointer width. (_WIN64 must be tested before _WIN32, because 64-bit Windows defines both macros.)
#ifdef _WIN64
    typedef unsigned long long PAGE_ID;
#elif _WIN32
    typedef size_t PAGE_ID;
#else
    // linux
#endif

struct Span
{
    PAGE_ID _pageId = 0;       // page number of the span's first page
    size_t _n = 0;             // number of pages
    Span* _prev = nullptr;     // doubly linked list structure
    Span* _next = nullptr;
    size_t _usecount = 0;      // count of sliced blocks handed out to thread caches
    void* _freeList = nullptr; // free list of the sliced small blocks
};

class SpanList
{
public:
    SpanList()
    {
        _head = new Span;
        _head->_prev = _head;
        _head->_next = _head;
    }
    void Insert(Span* pos, Span* newSpan)
    {
        assert(pos);
        assert(newSpan);
        Span* prev = pos->_prev;
        prev->_next = newSpan;
        newSpan->_prev = prev;
        newSpan->_next = pos;
        pos->_prev = newSpan;
    }
    void Erase(Span* pos)
    {
        assert(pos);
        assert(pos != _head);
        Span* prev = pos->_prev;
        Span* next = pos->_next;
        prev->_next = next;
        next->_prev = prev;
    }
private:
    Span* _head;        // sentinel node
public:
    std::mutex _mtx;    // per-bucket lock
};
3.1 Core implementation of the central cache
Question: when a thread cache runs out it asks the central cache — but for how many blocks? If the 8-byte list empties and we fetch one block at a time, every 8-byte allocation hits the central cache; many threads pound it and allocation slows down. If instead we fetch 200 at a time, we over-allocate: unless the surplus is returned promptly, other threads' caches may find the central cache empty, and the central cache in turn hammers the page cache for pages, wasting lots of memory.
Solution: a slow-start feedback algorithm (traced just below)
The larger the requested block, the fewer blocks one fetch from the central cache brings back
The smaller the requested block, the more blocks one fetch brings back
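Concretely, tracing the code below for 8-byte objects (my own worked trace): MaxSize starts at 1 and grows by 1 per fetch, while NumMoveSize caps the batch, so batches ramp up gradually — much like TCP's slow start.

// Slow-start trace for size = 8 (_maxSize initialized to 1 in FreeList):
// fetch #1: batchNum = min(1, 512) = 1  -> MaxSize becomes 2
// fetch #2: batchNum = min(2, 512) = 2  -> MaxSize becomes 3
// fetch #3: batchNum = min(3, 512) = 3  -> ...
// ...until batchNum reaches NumMoveSize(8) = min(256*1024/8, 512) = 512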
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
    // slow start: the smaller of the list's growing MaxSize and the size-class cap
    size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
    if (batchNum == _freeLists[index].MaxSize())
    {
        _freeLists[index].MaxSize() += 1; // grow the next batch by one
    }

    void* start = nullptr;
    void* end = nullptr;
    size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
    assert(actualNum > 0);

    if (actualNum == 1)
    {
        assert(start == end);
        return start;
    }
    else
    {
        // hand one block to the caller, stash the rest in the bucket
        _freeLists[index].PushRange(NextObj(start), end);
        return start;
    }
}

// in SizeClass: upper bound on how many objects one fetch may move
static size_t NumMoveSize(size_t size)
{
    assert(size > 0);
    size_t num = MAX_BYTES / size;
    if (num < 2) num = 2;     // big objects: move at least 2
    if (num > 512) num = 512; // small objects: move at most 512
    return num;
}
Multiple thread caches may hit the central cache at once, so each bucket takes its lock (a mutex) to keep the data consistent and intact. And since the central cache may hold fewer blocks than requested, we record and return how many it actually handed over.
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
    size_t index = SizeClass::Index(size);
    _spanLists[index]._mtx.lock();

    Span* span = GetOneSpan(_spanLists[index], size);
    assert(span);
    assert(span->_freeList);

    // pop up to batchNum objects off the span's free list
    start = span->_freeList;
    end = start;
    size_t i = 0;
    size_t actualNum = 1;
    while (i < batchNum - 1 && NextObj(end))
    {
        i++;
        actualNum++;
        end = NextObj(end);
    }
    span->_freeList = NextObj(end);
    NextObj(end) = nullptr;

    _spanLists[index]._mtx.unlock();
    return actualNum;
}
4. Page Cache
Both the central cache and the page cache have SpanList hash buckets at their core, but with an essential difference: the central cache's buckets follow the same size-class mapping as the thread cache, and the spans hanging in them are pre-sliced by that mapping into small-block free lists. The page cache's buckets are instead mapped by index directly: every span hanging in bucket i is an i-page span.
class PageCache
{
public:
    static PageCache* GetInstance()
    {
        return &_sInst;
    }
    Span* NewSpan(size_t k);    // fetch a k-page span

    std::mutex _pageMtx;        // one big lock for the whole page cache
private:
    SpanList _spanlists[NPAGES];

    PageCache() {}
    PageCache(const PageCache&) = delete;
    static PageCache _sInst;
};
Complete code for Day 4:
// Common.h
#pragma once
#include <assert.h>
#include <thread>
#include <iostream>
#include <mutex>
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;

#ifdef _WIN64
    typedef unsigned long long PAGE_ID;
#elif _WIN32
    typedef size_t PAGE_ID;
#else
    // linux
#endif

static void*& NextObj(void* obj)
{
    return *(void**)obj;
}

class FreeList
{
public:
    void Push(void* obj)
    {
        assert(obj);
        NextObj(obj) = _freeList;
        _freeList = obj;
    }
    void PushRange(void* start, void* end)
    {
        NextObj(end) = _freeList;
        _freeList = start;
    }
    void* Pop()
    {
        assert(_freeList);
        void* obj = _freeList;
        _freeList = NextObj(obj);
        return obj;
    }
    bool Empty()
    {
        return _freeList == nullptr;
    }
    size_t& MaxSize()
    {
        return _maxSize;
    }
private:
    void* _freeList = nullptr;
    size_t _maxSize = 1; // slow-start watermark, grows per fetch
};

class SizeClass
{
public:
    // Keep total internal fragmentation to roughly 10% at most
    // [1,128]               8-byte aligned       freelist[0,16)
    // [128+1,1024]          16-byte aligned      freelist[16,72)
    // [1024+1,8*1024]       128-byte aligned     freelist[72,128)
    // [8*1024+1,64*1024]    1024-byte aligned    freelist[128,184)
    // [64*1024+1,256*1024]  8*1024-byte aligned  freelist[184,208)
    // _RoundUp / RoundUp / _Index / Index: identical to section 2.1, omitted.

    // upper bound on how many objects one fetch may move (see 3.1)
    static size_t NumMoveSize(size_t size)
    {
        assert(size > 0);
        size_t num = MAX_BYTES / size;
        if (num < 2) num = 2;
        if (num > 512) num = 512;
        return num;
    }
};

struct Span
{
    PAGE_ID _pageId = 0;       // page number of the span's first page
    size_t _n = 0;             // number of pages
    Span* _prev = nullptr;     // doubly linked list structure
    Span* _next = nullptr;
    size_t _usecount = 0;      // count of sliced blocks handed out to thread caches
    void* _freeList = nullptr; // free list of the sliced small blocks
};

class SpanList
{
public:
    SpanList()
    {
        _head = new Span;
        _head->_prev = _head;
        _head->_next = _head;
    }
    void Insert(Span* pos, Span* newSpan)
    {
        assert(pos);
        assert(newSpan);
        Span* prev = pos->_prev;
        prev->_next = newSpan;
        newSpan->_prev = prev;
        newSpan->_next = pos;
        pos->_prev = newSpan;
    }
    void Erase(Span* pos)
    {
        assert(pos);
        assert(pos != _head);
        Span* prev = pos->_prev;
        Span* next = pos->_next;
        prev->_next = next;
        next->_prev = prev;
    }
private:
    Span* _head;        // sentinel node
public:
    std::mutex _mtx;    // per-bucket lock
};

// ThreadCache.h
#pragma once
#include "Common.h"

class ThreadCache
{
public:
    void* Allocate(size_t size);
    void Deallocate(void* ptr, size_t size);
    void* FetchFromCentralCache(size_t index, size_t size);
private:
    FreeList _freeLists[NFREELIST];
};

static __declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

// ThreadCache.cpp
#define _CRT_SECURE_NO_WARNINGS
#include "ThreadCache.h"
#include "CentralCache.h"

void* ThreadCache::Allocate(size_t size)
{
    assert(size <= MAX_BYTES);
    size_t alignSize = SizeClass::RoundUp(size);
    size_t index = SizeClass::Index(size);
    if (!_freeLists[index].Empty())
        return _freeLists[index].Pop();
    else
        return FetchFromCentralCache(index, alignSize);
}

void ThreadCache::Deallocate(void* ptr, size_t size)
{
    assert(ptr);
    assert(size <= MAX_BYTES);
    size_t index = SizeClass::Index(size);
    _freeLists[index].Push(ptr);
}

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
    // as in section 3.1
    size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
    if (batchNum == _freeLists[index].MaxSize())
    {
        _freeLists[index].MaxSize() += 1;
    }
    void* start = nullptr;
    void* end = nullptr;
    size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
    assert(actualNum > 0);
    if (actualNum == 1)
    {
        assert(start == end);
        return start;
    }
    else
    {
        _freeLists[index].PushRange(NextObj(start), end);
        return start;
    }
}

// CentralCache.h
#pragma once
#include "Common.h"

class CentralCache
{
public:
    static CentralCache* GetInstance()
    {
        return &_sInst;
    }
    size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
    Span* GetOneSpan(SpanList& list, size_t size);
private:
    SpanList _spanLists[NFREELIST];

    CentralCache() {}
    CentralCache(const CentralCache&) = delete;
    static CentralCache _sInst;
};

// CentralCache.cpp
#define _CRT_SECURE_NO_WARNINGS
#include "CentralCache.h"

CentralCache CentralCache::_sInst;

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
    // ... filled in later (section 4.1)
    return nullptr;
}

size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
    // as in section 3.1
    size_t index = SizeClass::Index(size);
    _spanLists[index]._mtx.lock();

    Span* span = GetOneSpan(_spanLists[index], size);
    assert(span);
    assert(span->_freeList);

    start = span->_freeList;
    end = start;
    size_t i = 0;
    size_t actualNum = 1;
    while (i < batchNum - 1 && NextObj(end))
    {
        i++;
        actualNum++;
        end = NextObj(end);
    }
    span->_freeList = NextObj(end);
    NextObj(end) = nullptr;

    _spanLists[index]._mtx.unlock();
    return actualNum;
}

// PageCache.h
#pragma once
#include "Common.h"

class PageCache
{
public:
    static PageCache* GetInstance()
    {
        return &_sInst;
    }
    Span* NewSpan(size_t k);

    std::mutex _pageMtx;
private:
    SpanList _spanlists[NPAGES];

    PageCache() {}
    PageCache(const PageCache&) = delete;
    static PageCache _sInst;
};
4.1 Fetching a span from the page cache
When the central cache asks the page cache for memory, the page cache first checks whether the matching bucket already holds a span. After the central cache receives the span it still has to process it: slice the span's memory into size-class blocks and hang the resulting free list in the central cache's matching bucket.
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
    // Walk the central cache's span list looking for a span
    // that still has free sliced blocks
    Span* it = list.Begin();
    while (it != list.End())
    {
        if (it->_freeList != nullptr)
            return it;
        else
            it = it->_next;
    }

    // No usable span in the central cache: ask the PageCache above.
    // SizeClass::NumMovePage(size) computes how many pages to request
    // for objects of this size.
    Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));

    // _pageId is a page number and PAGE_SHIFT the page-size shift
    // (PAGE_SHIFT = 13, i.e. 2^13 = 8192 = 8KB pages, so << PAGE_SHIFT
    // means "page number x 8KB"). _n is the span's page count, so bytes
    // is the span's total size, and start its first address, cast to
    // char* for easy pointer arithmetic.
    char* start = (char*)(span->_pageId << PAGE_SHIFT);
    size_t bytes = span->_n << PAGE_SHIFT;
    char* end = start + bytes;

    // Slice the span into size-byte blocks strung into its free list
    span->_freeList = start;
    start += size;
    void* tail = span->_freeList;
    while (start < end)
    {
        NextObj(tail) = start;
        tail = NextObj(tail);
        start += size;
    }
    NextObj(tail) = nullptr; // terminate the list

    list.PushFront(span);
    return span;
}
When the central cache asks for a span, the page cache first checks the bucket for that page count; if it is empty, it searches the larger buckets and, on finding a span, splits it in two. For example: we want a 2-page span but bucket 2 has none, so we search upward; say bucket 10 has a span — it is split into a 2-page span (returned) and an 8-page span (re-hung in bucket 8).
If even _spanlists[128] has nothing suitable, we ask the system (mmap/brk on Linux, VirtualAlloc on Windows) for a 128-page span, hang it in the list, and repeat the process above.
Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0 && k < NPAGES);

    // First check whether bucket k already has a span
    if (!_spanlists[k].Empty())
    {
        return _spanlists[k].PopFront();
    }

    // Check the larger buckets; if one has a span, split it
    for (size_t i = k + 1; i < NPAGES; i++)
    {
        if (!_spanlists[i].Empty())
        {
            Span* nSpan = _spanlists[i].PopFront();
            Span* kSpan = new Span;

            // Cut k pages off the head of nSpan:
            // the k-page span is returned,
            // the remainder re-hung in its matching bucket
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;
            nSpan->_pageId += k;
            nSpan->_n -= k;

            _spanlists[nSpan->_n].PushFront(nSpan);
            return kSpan;
        }
    }

    // Reaching here means no bucket holds a big enough span:
    // ask the heap for a 128-page span
    Span* bigSpan = new Span;
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;
    _spanlists[bigSpan->_n].PushFront(bigSpan);

    return NewSpan(k); // recurse: the split path above will now succeed
}
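GetOneSpan and NewSpan lean on a few helpers this log never lists: the PAGE_SHIFT constant, SizeClass::NumMovePage, and SpanList's Begin/End/PushFront/PopFront. A sketch consistent with how they are used above — NumMovePage follows the usual "pages needed for one batch" formula; treat the details as assumptions:

static const size_t PAGE_SHIFT = 13; // 8KB pages, as noted in GetOneSpan

// In SizeClass: pages to request from the page cache for one batch of size-byte objects.
static size_t NumMovePage(size_t size)
{
    size_t num = NumMoveSize(size);            // objects per batch
    size_t npage = (num * size) >> PAGE_SHIFT; // bytes -> pages
    if (npage == 0)
        npage = 1;                             // always at least one page
    return npage;
}

// In SpanList: iteration and front operations built on Insert/Erase.
Span* Begin() { return _head->_next; }
Span* End()   { return _head; }
bool  Empty() { return _head->_next == _head; }
void  PushFront(Span* span) { Insert(Begin(), span); }
Span* PopFront()
{
    Span* front = _head->_next;
    Erase(front);
    return front;
}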
5. thread cache reclamation (Day 5)
When a bucket's free list grows too long, we return part of it to the central cache: a long idle list is a lot of wasted space, and handing it back lets other threads that need lots of memory allocate from it.
void ThreadCache::Deallocate(void* ptr, size_t size)
{
    assert(ptr);
    assert(size <= MAX_BYTES);

    // Find the bucket mapped to this size and push the object in
    size_t index = SizeClass::Index(size);
    _freeLists[index].Push(ptr);

    // Once the list holds as many blocks as one batch fetch,
    // return a run of them to the central cache
    if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
    {
        ListTooLong(_freeLists[index], size);
    }
}

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
    void* start = nullptr;
    void* end = nullptr;
    list.PopRange(start, end, list.MaxSize()); // pop one batch's worth of blocks
    CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
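Deallocate and ListTooLong rely on two FreeList members this log hasn't shown: Size() and PopRange(). A minimal sketch consistent with the rest of FreeList — it assumes Push/Pop/PushRange also maintain a _size counter (which in turn suggests giving PushRange an explicit count parameter):

// Inside class FreeList -- sketch of the helpers used above.
size_t Size()
{
    return _size; // requires Push/Pop/PushRange to keep a size_t _size member updated
}
void PopRange(void*& start, void*& end, size_t n)
{
    assert(n <= _size);
    start = end = _freeList;
    for (size_t i = 0; i < n - 1; ++i) // walk to the n-th node
    {
        end = NextObj(end);
    }
    _freeList = NextObj(end);          // detach [start, end] from the list
    NextObj(end) = nullptr;
    _size -= n;
}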
6. central cache reclamation
When a thread cache's list grows too long (or a thread is destroyed), memory is released back into the central cache, and each returned object decrements its span's _usecount. When _usecount drops to 0, every object sliced from that span has come home, so the span is released back to the page cache, where adjacent free pages get merged.
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
    size_t index = SizeClass::Index(size);
    _spanLists[index]._mtx.lock();

    while (start)
    {
        void* next = NextObj(start);

        // Find which span this block was sliced from and push it back
        Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
        NextObj(start) = span->_freeList;
        span->_freeList = start;
        span->_usecount--;

        // All blocks sliced from this span have returned:
        // hand the span back to the page cache, which can then
        // try to merge it with its neighbouring pages
        if (span->_usecount == 0)
        {
            _spanLists[index].Erase(span);
            span->_freeList = nullptr;
            span->_next = nullptr;
            span->_prev = nullptr;

            // Releasing to the page cache only needs the page cache's
            // lock, so drop the bucket lock for the duration
            _spanLists[index]._mtx.unlock();

            PageCache::GetInstance()->_pageMtx.lock();
            PageCache::GetInstance()->ReleaseSpanToPageCache(span);
            PageCache::GetInstance()->_pageMtx.unlock();

            _spanLists[index]._mtx.lock();
        }

        start = next;
    }

    _spanLists[index]._mtx.unlock();
}

// Store page_id -> span in an unordered_map so we don't have to
// scan _spanlists to find a block's span
Span* PageCache::MapObjectToSpan(void* obj)
{
    PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
    auto ret = _idSpanMap.find(id);
    if (ret != _idSpanMap.end())
    {
        return ret->second;
    }
    else
    {
        assert(false);
        return nullptr;
    }
}
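For MapObjectToSpan to work, PageCache needs the map mentioned in the comment above, populated whenever spans are created or split (the NewSpan in section 8 records every page of an outgoing span). A sketch of the member addition:

// Added to PageCache's private members (needs #include <unordered_map>):
std::unordered_map<PAGE_ID, Span*> _idSpanMap; // page number -> owning Span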
7. PageCache reclamation
When the central cache releases a span back, we look up the spans adjacent to it by page id, in front and behind, checking whether they are free (to track this, Span gains a bool _isUse flag); after every successful merge we keep probing outward. Stitching finely split memory back into large spans this way relieves external fragmentation.
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // Try to merge the pages in front of and behind the span,
    // to relieve fragmentation

    // merge forward
    while (1)
    {
        PAGE_ID previd = span->_pageId - 1;
        auto ret = _idSpanMap.find(previd);

        // no such page number recorded: stop merging
        if (ret == _idSpanMap.end())
            break;

        // the adjacent span in front is in use: stop merging
        Span* prevspan = ret->second;
        if (prevspan->_isUse == true)
            break;

        // a merged span over 128 pages could not be managed: stop merging
        if (prevspan->_n + span->_n > NPAGES - 1)
            break;

        span->_n += prevspan->_n;
        span->_pageId = prevspan->_pageId;
        _spanlists[prevspan->_n].Erase(prevspan);
        delete prevspan;
    }

    // merge backward
    while (1)
    {
        PAGE_ID nextid = span->_pageId + span->_n;
        auto ret = _idSpanMap.find(nextid);
        if (ret == _idSpanMap.end())
            break;

        Span* nextspan = ret->second;
        if (nextspan->_isUse == true)
            break;
        if (nextspan->_n + span->_n > NPAGES - 1)
            break;

        span->_n += nextspan->_n;
        _spanlists[nextspan->_n].Erase(nextspan);
        delete nextspan;
    }

    // hang the merged span and record its boundary pages in the map
    _spanlists[span->_n].PushFront(span);
    span->_isUse = false;
    _idSpanMap[span->_pageId] = span;
    _idSpanMap[span->_pageId + span->_n - 1] = span;
}
8. Allocating and freeing blocks larger than 256KB (Day 6)
- Before allocating, check the requested size. If it exceeds MAX_BYTES = 256 * 1024, it is bigger than the largest bucket in the thread cache's hash table.
- Round the size up to its alignment (for big blocks this is page alignment; RoundUp's final else branch has to change from assert(false) to _RoundUp(bytes, 1 << PAGE_SHIFT)) and compute the number of pages needed.
- If the page count is at most 128 we can serve it straight from the page cache; since that may touch the page cache's buckets, we must take its lock.
- Finally, compute the block's starting address from the returned span's _pageId and return it.
if (size > MAX_BYTES)
{
    size_t alignSize = SizeClass::RoundUp(size); // page-aligned for big blocks
    size_t kpage = alignSize >> PAGE_SHIFT;

    PageCache::GetInstance()->_pageMtx.lock();
    Span* span = PageCache::GetInstance()->NewSpan(kpage);
    PageCache::GetInstance()->_pageMtx.unlock();

    void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
    return ptr;
}
NewSpan must now check whether the requested page count exceeds the largest bucket the page cache stores; if so, allocate straight from the system, otherwise fetch from the page cache's buckets as before.
Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0);

    // more than 128 pages: allocate directly from the system
    if (k > NPAGES - 1)
    {
        void* ptr = SystemAlloc(k);
        Span* span = new Span;
        span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
        span->_n = k;
        _idSpanMap[span->_pageId] = span;
        return span;
    }

    // First check whether bucket k already has a span
    if (!_spanlists[k].Empty())
    {
        return _spanlists[k].PopFront();
    }

    // Check the larger buckets; if one has a span, split it
    for (size_t i = k + 1; i < NPAGES; i++)
    {
        if (!_spanlists[i].Empty())
        {
            Span* nSpan = _spanlists[i].PopFront();
            Span* kSpan = new Span;

            // Cut k pages off the head of nSpan:
            // the k-page span is returned,
            // the remainder re-hung in its matching bucket
            kSpan->_pageId = nSpan->_pageId;
            kSpan->_n = k;
            nSpan->_pageId += k;
            nSpan->_n -= k;

            _spanlists[nSpan->_n].PushFront(nSpan);

            // for the remainder, only the boundary pages are needed for merging
            _idSpanMap[nSpan->_pageId] = nSpan;
            _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

            // every page of the outgoing span must map to it,
            // so freed objects can find their span
            for (size_t j = 0; j < kSpan->_n; j++)
            {
                _idSpanMap[kSpan->_pageId + j] = kSpan;
            }

            return kSpan;
        }
    }

    // No bucket holds a big enough span: ask the heap for 128 pages
    Span* bigSpan = new Span;
    void* ptr = SystemAlloc(NPAGES - 1);
    bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
    bigSpan->_n = NPAGES - 1;
    _spanlists[bigSpan->_n].PushFront(bigSpan);

    return NewSpan(k);
}
Since the <pageid, span*> container records every page's span, the starting address of a block to be freed gives us its span*, and the span's recorded page count gives us the size of the memory being freed — this is also what later lets ConcurrentFree drop its size parameter (the benchmark in section 10 already calls the one-argument form).
static void ConcurrentFree(void* obj, size_t size)
{
    if (size > MAX_BYTES)
    {
        // big block: release the whole span straight back to the page cache
        Span* span = PageCache::GetInstance()->MapObjectToSpan(obj);

        PageCache::GetInstance()->_pageMtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->_pageMtx.unlock();
    }
    else
    {
        assert(pTLSThreadCache);
        pTLSThreadCache->Deallocate(obj, size);
    }
}
If the page count being freed exceeds the largest bucket the page cache stores, release it straight back to the system; otherwise follow the normal path into the page cache's buckets.
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // more than 128 pages: hand straight back to the system
    if (span->_n > NPAGES - 1)
    {
        void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
        SystemFree(ptr);
        delete span;
        return;
    }

    // ... forward and backward merge loops exactly as in section 7 ...

    _spanlists[span->_n].PushFront(span);
    span->_isUse = false;
    _idSpanMap[span->_pageId] = span;
    _idSpanMap[span->_pageId + span->_n - 1] = span;
}
9. Replacing new with a fixed-size object pool
Since the point is to replace malloc with a tcmalloc-style allocator, our internals had better not call new themselves (new ultimately calls malloc). So every place that does `new Span` or `delete span` should switch to a fixed-size object pool's allocate/release instead.
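The log doesn't reproduce the pool itself; here is a minimal sketch of the ObjectPool<T> interface the radix-tree code in section 11 uses (leafPool.New()), assuming the usual "carve from a big slab, recycle via free list" design — details are my assumptions, not the project's exact code:

#include <new> // placement new

template<class T>
class ObjectPool
{
public:
    T* New()
    {
        T* obj = nullptr;
        if (_freeList)                        // reuse a recycled object first
        {
            obj = (T*)_freeList;
            _freeList = *(void**)_freeList;
        }
        else
        {
            if (_remainBytes < sizeof(T))     // slab exhausted: fetch a new one
            {
                _remainBytes = 128 * 1024;
                _memory = (char*)SystemAlloc(_remainBytes >> PAGE_SHIFT);
            }
            obj = (T*)_memory;
            // each slot must be able to hold the free-list link
            size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
            _memory += objSize;
            _remainBytes -= objSize;
        }
        new(obj) T;                           // run T's constructor in place
        return obj;
    }
    void Delete(T* obj)
    {
        obj->~T();                            // run the destructor, then recycle
        *(void**)obj = _freeList;
        _freeList = obj;
    }
private:
    char* _memory = nullptr;     // cursor into the current slab
    size_t _remainBytes = 0;     // bytes left in the slab
    void* _freeList = nullptr;   // recycled objects
};

With a `static ObjectPool<Span> _spanPool;` member in PageCache, `new Span` becomes `_spanPool.New()` and `delete span` becomes `_spanPool.Delete(span)`.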
10. Benchmarking malloc vs. ConcurrentAlloc under multithreaded concurrency
// Benchmark.cpp
#include "ConcurrentAlloc.h"
#include <vector>
#include <atomic>
#include <ctime>
#include <cstdio>

// ntimes: allocations and frees per round
// nworks: number of threads
// rounds: number of rounds
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime{ 0 };
    std::atomic<size_t> free_costtime{ 0 };

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&, k]() {
            std::vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(malloc(16));
                    //v.push_back(malloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    free(v[i]);
                }
                size_t end2 = clock();
                v.clear();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }
    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%zu threads ran %zu rounds, malloc %zu times per round: %zu ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());
    printf("%zu threads ran %zu rounds, free %zu times per round: %zu ms\n",
        nworks, rounds, ntimes, free_costtime.load());
    printf("%zu threads, %zu concurrent malloc&free in total: %zu ms\n",
        nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

// allocations per round / thread count / rounds
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime{ 0 };
    std::atomic<size_t> free_costtime{ 0 };

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&]() {
            std::vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(ConcurrentAlloc(16));
                    //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    ConcurrentFree(v[i]); // size is recovered from the span (section 8)
                }
                size_t end2 = clock();
                v.clear();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }
    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%zu threads ran %zu rounds, concurrent alloc %zu times per round: %zu ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());
    printf("%zu threads ran %zu rounds, concurrent dealloc %zu times per round: %zu ms\n",
        nworks, rounds, ntimes, free_costtime.load());
    printf("%zu threads, %zu concurrent alloc&dealloc in total: %zu ms\n",
        nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

int main()
{
    size_t n = 100000;
    std::cout << "==========================================================" << std::endl;
    BenchmarkConcurrentMalloc(n, 4, 10);
    std::cout << std::endl << std::endl;
    BenchmarkMalloc(n, 4, 10);
    std::cout << "==========================================================" << std::endl;
    return 0;
}
11. Optimizing with the radix tree from the tcmalloc source
// PageMap.h
#pragma once
#include "Common.h"

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
    static const int LENGTH = 1 << BITS;
    void** array_;

public:
    typedef uintptr_t Number;

    //explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
    explicit TCMalloc_PageMap1() {
        //array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
        size_t size = sizeof(void*) << BITS;
        size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
        array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
        memset(array_, 0, sizeof(void*) << BITS);
    }

    // Return the current value for KEY. Returns NULL if not yet set,
    // or if k is out of range.
    void* get(Number k) const {
        if ((k >> BITS) > 0) {
            return NULL;
        }
        return array_[k];
    }

    // REQUIRES "k" is in range "[0,2^BITS-1]".
    // REQUIRES "k" has been ensured before.
    //
    // Sets the value 'v' for key 'k'.
    void set(Number k, void* v) {
        array_[k] = v;
    }
};

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
    // Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
    static const int ROOT_BITS = 5;
    static const int ROOT_LENGTH = 1 << ROOT_BITS;
    static const int LEAF_BITS = BITS - ROOT_BITS;
    static const int LEAF_LENGTH = 1 << LEAF_BITS;

    // Leaf node
    struct Leaf {
        void* values[LEAF_LENGTH];
    };

    Leaf* root_[ROOT_LENGTH];     // Pointers to 32 child nodes
    void* (*allocator_)(size_t);  // Memory allocator

public:
    typedef uintptr_t Number;

    //explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
    explicit TCMalloc_PageMap2() {
        //allocator_ = allocator;
        memset(root_, 0, sizeof(root_));
        PreallocateMoreMemory();
    }

    void* get(Number k) const {
        const Number i1 = k >> LEAF_BITS;
        const Number i2 = k & (LEAF_LENGTH - 1);
        if ((k >> BITS) > 0 || root_[i1] == NULL) {
            return NULL;
        }
        return root_[i1]->values[i2];
    }

    void set(Number k, void* v) {
        const Number i1 = k >> LEAF_BITS;
        const Number i2 = k & (LEAF_LENGTH - 1);
        ASSERT(i1 < ROOT_LENGTH);
        root_[i1]->values[i2] = v;
    }

    bool Ensure(Number start, size_t n) {
        for (Number key = start; key <= start + n - 1;) {
            const Number i1 = key >> LEAF_BITS;

            // Check for overflow
            if (i1 >= ROOT_LENGTH)
                return false;

            // Make 2nd level node if necessary
            if (root_[i1] == NULL) {
                //Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
                //if (leaf == NULL) return false;
                static ObjectPool<Leaf> leafPool;
                Leaf* leaf = (Leaf*)leafPool.New();
                memset(leaf, 0, sizeof(*leaf));
                root_[i1] = leaf;
            }

            // Advance key past whatever is covered by this leaf node
            key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
        }
        return true;
    }

    void PreallocateMoreMemory() {
        // Allocate enough to keep track of all possible pages
        Ensure(0, 1 << BITS);
    }
};

// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
    // How many bits should we consume at each interior level
    static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
    static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

    // How many bits should we consume at leaf level
    static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
    static const int LEAF_LENGTH = 1 << LEAF_BITS;

    // Interior node
    struct Node {
        Node* ptrs[INTERIOR_LENGTH];
    };

    // Leaf node
    struct Leaf {
        void* values[LEAF_LENGTH];
    };

    Node* root_;                  // Root of radix tree
    void* (*allocator_)(size_t);  // Memory allocator

    Node* NewNode() {
        Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
        if (result != NULL) {
            memset(result, 0, sizeof(*result));
        }
        return result;
    }

public:
    typedef uintptr_t Number;

    explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
        allocator_ = allocator;
        root_ = NewNode();
    }

    void* get(Number k) const {
        const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
        const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
        const Number i3 = k & (LEAF_LENGTH - 1);
        if ((k >> BITS) > 0 ||
            root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
            return NULL;
        }
        return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
    }

    void set(Number k, void* v) {
        ASSERT(k >> BITS == 0);
        const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
        const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
        const Number i3 = k & (LEAF_LENGTH - 1);
        reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
    }

    bool Ensure(Number start, size_t n) {
        for (Number key = start; key <= start + n - 1;) {
            const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
            const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);

            // Check for overflow
            if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
                return false;

            // Make 2nd level node if necessary
            if (root_->ptrs[i1] == NULL) {
                Node* n = NewNode();
                if (n == NULL) return false;
                root_->ptrs[i1] = n;
            }

            // Make leaf node if necessary
            if (root_->ptrs[i1]->ptrs[i2] == NULL) {
                Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
                if (leaf == NULL) return false;
                memset(leaf, 0, sizeof(*leaf));
                root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
            }

            // Advance key past whatever is covered by this leaf node
            key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
        }
        return true;
    }

    void PreallocateMoreMemory() {}
};
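A sketch of how this gets wired in on a 32-bit build (my assumption for the parameters: with 8KB pages, the page-number space is 2^(32 - PAGE_SHIFT), so BITS = 19 and the single-level map preallocates a 2^19-entry array, about 4MB): replace PageCache's unordered_map with the radix tree, writes keep happening under _pageMtx, and reads no longer need any lock, because get() never rehashes or moves entries the way unordered_map can.

// In PageCache, replacing std::unordered_map<PAGE_ID, Span*>:
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;

// Writes (inside NewSpan / ReleaseSpanToPageCache, still under _pageMtx):
_idSpanMap.set(span->_pageId, span);

// Reads (hot path) are now lock-free:
Span* PageCache::MapObjectToSpan(void* obj)
{
    PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
    Span* ret = (Span*)_idSpanMap.get(id);
    assert(ret != nullptr);
    return ret;
}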
project: the code for this project