【高并发内存池——项目】page cache 回收内存
提示:高并发内存池完整项目代码,在主页专栏项目中
文章目录
目录
提示:高并发内存池完整项目代码,在主页专栏项目中
一、page cache 回收内存部分,实现代码
Common.h
ThreadCache.h
ThreadCache.cpp
二、核心数据结构:Span的全局管理
2.1 Page Cache的架构设计
三、内存回收核心:ReleaseSpanToPageCache
3.1 方法框架与锁设计
3.2 关键技术:伙伴系统与合并算法
四、映射表管理:_idSpanMap的核心作用
4.1 映射表的维护
4.2 映射表的查询优化
五、完整回收流程分析
5.1 从CentralCache到PageCache的旅程
5.2 合并策略的智慧
六、并发安全与性能优化
6.1 全局锁的设计考量
6.2 锁粒度优化策略
七、与系统内存的交互
7.1 系统内存申请
7.2 内存释放策略
一、page cache 回收内存部分,实现代码
Common.h
#pragma once#include <iostream>
#include <vector>
#include <unordered_map>
#include <algorithm>
#include <time.h>
#include <assert.h>
#include <thread>
#include <mutex>using std::cout;
using std::endl;

#ifdef _WIN32
#include <windows.h>
#else
// Linux headers (sys/mman.h etc.) would go here.
#endif

// Largest request served by ThreadCache; bigger requests bypass it.
static const size_t MAX_BYTES = 256 * 1024;
// Number of size-class buckets in ThreadCache / CentralCache.
static const size_t NFREELIST = 208;
// PageCache buckets: index i holds spans of exactly i pages (1..128).
static const size_t NPAGES = 129;
// One page is 2^PAGE_SHIFT = 8KB.
static const size_t PAGE_SHIFT = 13;

// Page number type: must be wide enough to hold (address >> PAGE_SHIFT).
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
// linux
// Fix: define PAGE_ID for non-Windows builds too — the original left it
// undefined, which broke compilation everywhere PAGE_ID is used.
typedef unsigned long long PAGE_ID;
#endif

// Ask the OS directly for kpage pages of committed memory.
// Throws std::bad_alloc on failure (and on platforms not implemented yet).
inline static void* SystemAlloc(size_t kpage)
{
    void* ptr = nullptr; // Fix: declared outside the #ifdef so every platform compiles.
#ifdef _WIN32
    // Fix: use PAGE_SHIFT instead of the hard-coded 13.
    ptr = VirtualAlloc(0, kpage << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
    // TODO: brk/mmap on linux.
#endif
    if (ptr == nullptr)
        throw std::bad_alloc();
    return ptr;
}

// Each free object stores the address of the next free object in its first
// sizeof(void*) bytes; NextObj exposes that embedded link for read/write.
static void*& NextObj(void* obj) { return *(void**)obj; }

// Intrusive singly linked list of free fixed-size objects (links live inside
// the objects themselves). Used as the per-size-class bucket in ThreadCache.
class FreeList
{
public:
    // Push one object onto the head of the list.
    void Push(void* obj)
    {
        assert(obj);
        NextObj(obj) = _freeList;
        _freeList = obj;
        ++_size;
    }

    // Push a pre-linked chain [start, end] of n objects onto the head.
    void PushRange(void* start, void* end, size_t n)
    {
        NextObj(end) = _freeList;
        _freeList = start;
        _size += n;
    }

    // Detach n objects from the head; [start, end] returns the chain.
    void PopRange(void*& start, void*& end, size_t n)
    {
        // Fix: also require n > 0 — the walk below would underflow on n == 0.
        assert(n > 0 && n <= _size);
        start = _freeList;
        end = start;
        for (size_t i = 0; i < n - 1; ++i)
            end = NextObj(end);
        _freeList = NextObj(end);
        NextObj(end) = nullptr;
        _size -= n;
    }

    // Pop a single object from the head.
    void* Pop()
    {
        assert(_freeList);
        void* obj = _freeList;
        _freeList = NextObj(obj);
        --_size;
        return obj;
    }

    bool Empty() { return _freeList == nullptr; }

    // Slow-start batch cap used by ThreadCache's feedback algorithm.
    size_t& MaxSize() { return _maxSize; }

    size_t Size() { return _size; }

private:
    void* _freeList = nullptr;
    size_t _maxSize = 1;
    size_t _size = 0;
};

// Size-class arithmetic: alignment rules, bucket index, and batch sizing.
class SizeClass
{
public:
    // Round bytes up to a multiple of alignNum (alignNum is a power of two).
    static inline size_t _RoundUp(size_t bytes, size_t alignNum)
    {
        return ((bytes + alignNum - 1) & ~(alignNum - 1));
    }

    // Alignment policy (keeps internal fragmentation around 10%):
    //   [1, 128]          align 8
    //   (128, 1024]       align 16
    //   (1KB, 8KB]        align 128
    //   (8KB, 64KB]       align 1024
    //   (64KB, 256KB]     align 8KB
    static inline size_t RoundUp(size_t size)
    {
        if (size <= 128) return _RoundUp(size, 8);
        else if (size <= 1024) return _RoundUp(size, 16);
        else if (size <= 8 * 1024) return _RoundUp(size, 128);
        else if (size <= 64 * 1024) return _RoundUp(size, 1024);
        else if (size <= 256 * 1024) return _RoundUp(size, 8 * 1024);
        else { assert(false); return static_cast<size_t>(-1); }
    }

    // Bucket index within one alignment group (alignment == 1 << align_shift).
    static inline size_t _Index(size_t bytes, size_t align_shift)
    {
        return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
    }

    // Global bucket index for a request of `bytes`.
    static inline size_t Index(size_t bytes)
    {
        assert(bytes <= MAX_BYTES);
        // Buckets contributed by each alignment group.
        static const size_t group_array[4] = { 16, 56, 56, 56 };
        if (bytes <= 128) return _Index(bytes, 3);
        else if (bytes <= 1024) return _Index(bytes - 128, 4) + group_array[0];
        else if (bytes <= 8 * 1024) return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
        else if (bytes <= 64 * 1024) return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
        else if (bytes <= 256 * 1024) return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
        else { assert(false); }
        return static_cast<size_t>(-1);
    }

    // Upper bound on how many objects ThreadCache fetches from CentralCache
    // at once: small sizes in big batches, big sizes in small batches.
    static size_t NumMoveSize(size_t size)
    {
        assert(size > 0);
        // Fix: size_t instead of int — avoids a signed/unsigned mismatch
        // against the size_t values it is compared and returned with.
        size_t num = MAX_BYTES / size;
        if (num < 2) num = 2;
        if (num > 512) num = 512;
        return num;
    }

    // How many pages CentralCache requests from PageCache for this size class.
    static size_t NumMovePage(size_t size)
    {
        size_t num = NumMoveSize(size);
        size_t npage = num * size;
        npage >>= PAGE_SHIFT;
        if (npage == 0) npage = 1;
        return npage;
    }
};

// A span: `_n` contiguous pages starting at page `_pageId`, plus the free
// list of fixed-size objects carved out of it.
struct Span
{
    PAGE_ID _pageId = 0;        // starting page number
    size_t _n = 0;              // number of pages
    Span* _next = nullptr;      // doubly linked list hooks
    Span* _prev = nullptr;
    size_t _useCount = 0;       // objects currently handed out to ThreadCaches
    void* _freeList = nullptr;  // free objects carved from this span
    bool _isUse = false;        // true while the span is owned by CentralCache
};

// Circular doubly linked list of spans with a sentinel head node.
class SpanList
{
public:
    SpanList()
    {
        _head = new Span;
        _head->_next = _head;
        _head->_prev = _head;
    }
    Span* Begin() { return _head->_next; }
    Span* End() { return _head; }
    bool Empty() { return _head->_next == _head; }
    void PushFront(Span* span) { Insert(Begin(), span); }
    Span* PopFront()
    {
        Span* front = _head->_next;
        Erase(front);
        return front;
    }
    void Insert(Span* pos, Span* newSpan)
    {
        assert(pos);
        assert(newSpan);
        Span* prev = pos->_prev;
        prev->_next = newSpan;
        newSpan->_prev = prev;
        newSpan->_next = pos;
        pos->_prev = newSpan;
    }
    // Unlink pos from the list; ownership stays with the caller (no delete).
    void Erase(Span* pos)
    {
        assert(pos);
        assert(pos != _head);
        Span* prev = pos->_prev;
        Span* next = pos->_next;
        prev->_next = next;
        next->_prev = prev;
    }
private:
    Span* _head;
public:
    std::mutex _mtx; // per-bucket lock used by CentralCache
};
ThreadCache.h
#pragma once
#include "Common.h"class ThreadCache
{
public:void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);void* FetchFromCentralCache(size_t index, size_t size);void ListTooLong(FreeList& list, size_t size);
private:FreeList _freeLists[NFREELIST];
};static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
ThreadCache.cpp
cpp#include "ThreadCache.h"
#include "CentralCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum) _freeLists[index].MaxSize() += 1;void* start = nullptr; void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum > 0);if (actualNum == 1) return start;else { _freeLists[index].PushRange(NextObj(start), end, actualNum - 1); return start; }
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()) return _freeLists[index].Pop();else return FetchFromCentralCache(index, alignSize);
}void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr); assert(size <= MAX_BYTES);size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);if (_freeLists[index].Size() >= _freeLists[index].MaxSize())ListTooLong(_freeLists[index], size);
}void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr; void* end = nullptr;list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
ThreadCache.cpp
#include "ThreadCache.h"
#include "CentralCache.h"

// Fetch a batch of `size`-byte objects for bucket `index` from CentralCache.
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
    // Slow-start feedback tuning:
    // 1. Don't ask CentralCache for a huge batch right away — it may go unused.
    // 2. If this size class keeps being requested, batchNum grows toward the cap.
    // 3. The larger the size, the smaller the batch requested at once.
    // 4. The smaller the size, the larger the batch requested at once.
    // Fix: (std::min) — the unqualified `min` only worked through the
    // <windows.h> macro; the parentheses also suppress that macro on MSVC.
    size_t batchNum = (std::min)(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
    if (_freeLists[index].MaxSize() == batchNum)
    {
        _freeLists[index].MaxSize() += 1;
    }

    void* start = nullptr;
    void* end = nullptr;
    size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
    assert(actualNum > 0);

    if (actualNum == 1)
    {
        assert(start == end);
        return start;
    }
    else
    {
        // Keep everything after the first object in the local bucket.
        _freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
        return start;
    }
}

// Allocate one object (size <= MAX_BYTES) from the thread-local cache.
void* ThreadCache::Allocate(size_t size)
{
    assert(size <= MAX_BYTES);
    size_t alignSize = SizeClass::RoundUp(size);
    size_t index = SizeClass::Index(size);

    if (!_freeLists[index].Empty())
    {
        return _freeLists[index].Pop();
    }
    else
    {
        return FetchFromCentralCache(index, alignSize);
    }
}

// Return one object to its bucket; spill to CentralCache when over-long.
void ThreadCache::Deallocate(void* ptr, size_t size)
{
    assert(ptr);
    assert(size <= MAX_BYTES);

    // Find the matching free-list bucket and push the object back.
    size_t index = SizeClass::Index(size);
    _freeLists[index].Push(ptr);

    // When the list holds at least one whole batch, return a segment
    // to CentralCache.
    if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
    {
        ListTooLong(_freeLists[index], size);
    }
}

// Detach one batch (MaxSize objects) and hand it back to CentralCache.
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
    void* start = nullptr;
    void* end = nullptr;
    list.PopRange(start, end, list.MaxSize());
    CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
二、核心数据结构:Span的全局管理
2.1 Page Cache的架构设计
class PageCache
{
public:static PageCache* GetInstance() { return &_sInst; }Span* NewSpan(size_t k); // 获取K页Spanvoid ReleaseSpanToPageCache(Span* span); // 归还Spanstd::mutex _pageMtx; // 全局页面锁private:SpanList _spanLists[NPAGES]; // 128个桶,管理1-128页的Spanstd::unordered_map<PAGE_ID, Span*> _idSpanMap; // 页号到Span的映射// ... 其他私有方法和成员
};
设计精妙之处:
-
桶状结构:按Span大小组织,快速查找合适的内存块
-
全局映射表:记录每个页属于哪个Span,支持快速合并
-
单例模式:全局唯一实例,统一管理所有内存资源
三、内存回收核心:ReleaseSpanToPageCache
3.1 方法框架与锁设计
// Return a completely idle span to PageCache and coalesce it with its free
// neighbors to fight external fragmentation.
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // Global lock — merging mutates _idSpanMap and several span buckets.
    std::unique_lock<std::mutex> lock(_pageMtx);

    // Merge forward: absorb the free span ending right before this one.
    while (true)
    {
        PAGE_ID prevId = span->_pageId - 1;
        auto it = _idSpanMap.find(prevId);
        // Stop if no span owns that page, or its owner is still in use.
        if (it == _idSpanMap.end() || it->second->_isUse)
        {
            break;
        }
        Span* prevSpan = it->second;
        // Fix: stop once the merged span would exceed the largest bucket
        // (NPAGES - 1 pages) — without this check the PushFront below would
        // index _spanLists out of bounds.
        if (prevSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        // Absorb prevSpan into span.
        span->_pageId = prevSpan->_pageId;
        span->_n += prevSpan->_n;
        _spanLists[prevSpan->_n].Erase(prevSpan);
        delete prevSpan;
    }

    // Merge backward: absorb the free span starting right after this one.
    while (true)
    {
        PAGE_ID nextId = span->_pageId + span->_n;
        auto it = _idSpanMap.find(nextId);
        if (it == _idSpanMap.end() || it->second->_isUse)
        {
            break;
        }
        Span* nextSpan = it->second;
        // Fix: same NPAGES - 1 bound as in the forward loop.
        if (nextSpan->_n + span->_n > NPAGES - 1)
        {
            break;
        }
        span->_n += nextSpan->_n;
        _spanLists[nextSpan->_n].Erase(nextSpan);
        delete nextSpan;
    }

    // Hang the merged span in the bucket for its (new) page count.
    _spanLists[span->_n].PushFront(span);
    span->_isUse = false;

    // Repoint every page of the merged span to its new owner.
    for (PAGE_ID i = 0; i < span->_n; ++i)
    {
        _idSpanMap[span->_pageId + i] = span;
    }
}
3.2 关键技术:伙伴系统与合并算法
向前合并算法:
// Compute the page number just before this span.
PAGE_ID prevId = span->_pageId - 1;

// Does some span own that page, and is it free?
auto it = _idSpanMap.find(prevId);
if (it != _idSpanMap.end() && !it->second->_isUse)
{
    // Perform the merge.
    Span* prevSpan = it->second;
    span->_pageId = prevSpan->_pageId; // new starting page number
    span->_n += prevSpan->_n;          // new page count

    // Clean up the absorbed span.
    _spanLists[prevSpan->_n].Erase(prevSpan);
    delete prevSpan;
}
向后合并算法类似,但是检查后面的Span。
四、映射表管理:_idSpanMap的核心作用
4.1 映射表的维护
// When a span is handed out, record every page it covers.
for (PAGE_ID i = 0; i < span->_n; ++i)
{
    _idSpanMap[span->_pageId + i] = span;
}

// After merging, repoint every page of the merged span.
for (PAGE_ID i = 0; i < mergedSpan->_n; ++i)
{
    _idSpanMap[mergedSpan->_pageId + i] = mergedSpan;
}
4.2 映射表的查询优化
使用 unordered_map 提供平均 O(1) 时间复杂度的页号查找,确保合并操作的高效性。
五、完整回收流程分析
5.1 从CentralCache到PageCache的旅程
-
触发条件:CentralCache发现Span完全空闲(
_useCount == 0
) -
移交Span:CentralCache调用
ReleaseSpanToPageCache(span)
-
合并准备:PageCache获取全局锁,开始合并操作
-
向前合并:检查并合并前面的空闲Span
-
向后合并:检查并合并后面的空闲Span
-
更新存储:将合并后的Span放入对应大小的桶中
-
更新映射:刷新页号到Span的映射关系
5.2 合并策略的智慧
为什么需要双向合并?
-
向前合并:解决左侧碎片
-
向后合并:解决右侧碎片
-
双向合并:最大程度减少外部碎片
示例:
text
初始状态: [空闲SpanA: 页1-2] [使用中] [空闲SpanB: 页4-5] [空闲SpanC: 页6-8]

SpanB归还后:
1. 向前合并:页4前是使用中,不能合并
2. 向后合并:页5后是SpanC(页6-8),可以合并
3. 结果:生成新的SpanD: 页4-8
六、并发安全与性能优化
6.1 全局锁的设计考量
std::mutex _pageMtx; // PageCache全局锁
为什么需要全局锁?
-
映射表
_idSpanMap
是全局数据结构 -
Span合并操作涉及多个桶的修改
-
保证原子性:合并过程中不能被中断
6.2 锁粒度优化策略
虽然使用全局锁,但通过以下方式优化:
-
快速操作:合并算法尽可能高效,减少持锁时间
-
批量处理:一次合并多个Span,减少整体锁竞争
-
无锁读取:
NewSpan
中的查找操作可以在特定情况下无锁进行
七、与系统内存的交互
7.1 系统内存申请
// Ask the OS for kpage pages of committed memory.
inline static void* SystemAlloc(size_t kpage)
{
    // Fix: declare ptr outside the #ifdef and move the return out of the
    // Windows branch — the original fell off the end of the function on
    // every non-Windows platform (undefined behavior).
    void* ptr = nullptr;
#ifdef _WIN32
    // kpage pages of 8KB each (13 == PAGE_SHIFT).
    ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#endif
    // Error handling: surface failure uniformly as bad_alloc.
    if (ptr == nullptr)
        throw std::bad_alloc();
    return ptr;
}
7.2 内存释放策略
PageCache通常不会立即将内存归还给系统,而是:
-
缓存空闲Span:保留一定数量的空闲Span供后续使用
-
延迟释放:只有在内存压力很大时才真正释放
-
智能策略:根据系统内存状态动态调整缓存策略