c++项目篇:高并发内存池项目开发记录01
高并发内存池项目开发记录01
目录
- 为什么要做这个项目
- 今天的进展
- 上午:理解设计思路
- 下午:开始写代码
- NextObj函数
- FreeList类
- SizeClass类
- 架构图解
- 遇到的问题
- 今天的收获
- 测试结果
- 项目文件结构
- 完整代码
- 下一步计划
- 如果你想跟着做
为什么要做这个项目
最近在深入学C++,感觉光看书做题还是太虚了,总觉得对底层的理解不够深入。特别是内存管理这块,平时写代码都是new、delete或者malloc、free,但从来没想过这些函数到底是怎么工作的。
前段时间看到TCMalloc的相关资料,突然意识到内存分配器原来这么复杂,还涉及到多线程、系统调用、性能优化等各种问题。就想着,能不能自己实现一个简化版的内存池,这样既能加深对C++的理解,也能学到一些系统编程的知识。
说实话,一开始我对"高并发"这个词还挺虚的,后来想了想,其实就是多线程环境下的性能问题。平时写多线程程序的时候确实感觉有时候莫名其妙就慢了,原来问题可能就在内存分配这里。
今天的进展
上午:理解设计思路
花了差不多2小时搞清楚为什么需要内存池。原来malloc在多线程下性能这么差,是因为内部有全局锁,所有线程都要排队。我之前写多线程程序的时候确实感觉有时候莫名其妙就慢了,原来问题在这里。
解决思路其实也不难理解:给每个线程分配自己的内存缓存,这样大部分时候就不用竞争锁了。但是这又带来新问题,万一某个线程的内存用完了怎么办?所以需要设计一个三层的架构:
- ThreadCache:线程本地缓存(无锁)
- CentralCache:中心缓存(平衡各线程)
- PageCache:页缓存(和操作系统交互)
下午:开始写代码
下午就开始动手了,主要实现Common.h里的基础数据结构。
NextObj函数
这个函数挺巧妙的,利用内存块本身来存储链表指针。一开始我还没理解,后来画了个图才明白:
//获取/设置对象的下一个节点
static void*& NextObj(void* obj) {return *(void**)obj;
}
就是把内存块的前8字节当作指针来用,这样就不需要额外分配链表节点了。
内存复用原理:
当内存块在自由链表中时:
┌─────────────────┐
│ 前8字节存储 │ ← 存储下一个节点的地址
│ 下一个节点地址 │
└─────────────────┘当内存块被分配出去时:
这8字节完全给用户使用,用户可以随便写
关键在于返回引用(void*&
),这样既可以读取也可以设置:
NextObj(obj1) = obj2; // 设置obj1指向obj2
void* next = NextObj(obj1); // 获取obj1指向的下一个节点
FreeList类
这个就是管理相同大小内存块的链表,支持Push和Pop操作。完整的类定义:
class FreeList {
public:// 单个对象插入(头插法)void Push(void* obj) {assert(obj);NextObj(obj) = _freeList; // obj指向原头节点_freeList = obj; // 头指针指向obj++_size;}// 单个对象弹出(头删法)void* Pop() {assert(_freeList);void* obj = _freeList; // 必须先保存头节点_freeList = NextObj(obj); // 头指针后移--_size;return obj;}// 批量插入(后面会用到)void PushRange(void* start, void* end, size_t n) {NextObj(end) = _freeList;_freeList = start;_size += n;}// 批量弹出(后面会用到)void PopRange(void*& start, void*& end, size_t n) {assert(n <= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; ++i) {end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}bool Empty() { return _freeList == nullptr; }size_t Size() { return _size; }size_t& MaxSize() { return _maxSize; }private:void* _freeList = nullptr; // 链表头指针size_t _maxSize = 1; // 最大批量数(慢增长算法用)size_t _size = 0; // 当前长度
};
写的时候有个小坑,Pop函数必须先保存头节点再移动指针,不然就找不到原来的头节点了。
SizeClass类
这个类负责内存对齐和索引计算,是整个设计的核心。为了减少内存碎片,采用分段对齐的策略:
- 小对象(1-128字节):8字节对齐
- 中对象(129-1024字节):16字节对齐
- 更大的对象:对齐粒度继续增大
核心对齐函数:
class SizeClass {
public:// 位运算对齐(高效版本)static inline size_t _RoundUp(size_t bytes, size_t alignNum) {return ((bytes + alignNum - 1) & ~(alignNum - 1));}// 根据大小范围选择对齐策略static inline size_t RoundUp(size_t size) {if (size <= 128) {return _RoundUp(size, 8); // 8字节对齐}else if (size <= 1024) {return _RoundUp(size, 16); // 16字节对齐}else if (size <= 8 * 1024) {return _RoundUp(size, 128); // 128字节对齐}else if (size <= 64 * 1024) {return _RoundUp(size, 1024); // 1024字节对齐}else if (size <= 256 * 1024) {return _RoundUp(size, 8 * 1024); // 8KB对齐}else {return _RoundUp(size, 1 << PAGE_SHIFT); // 按页对齐}}// 计算应该使用哪个FreeListstatic inline size_t Index(size_t bytes) {assert(bytes <= MAX_BYTES);static int group_array[4] = {16, 56, 56, 56}; // 每个范围的索引数量if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}// ... 更多范围}
};
位运算对齐原理(这个花了我不少时间理解):
对于8字节对齐:
7 → (7 + 8 - 1) & ~7 = 14 & 0xFFF8 = 8
9 → (9 + 8 - 1) & ~7 = 16 & 0xFFF8 = 16
16 → (16 + 8 - 1) & ~7 = 23 & 0xFFF8 = 16
Index函数的实现用了位运算优化,比直接用除法快很多。一开始我还搞不清楚那些移位操作,后来理解了就觉得很巧妙。
架构图解
为了更好地理解内存池的整体设计,这里用图来说明:
三层架构设计
FreeList链表结构
关键设计理念:
- 分层缓存:每层处理不同粒度的内存管理
- 无锁设计:ThreadCache每个线程独有,避免锁竞争
- 内存复用:利用内存块本身存储链表指针
- 分段对齐:不同大小采用不同对齐策略,平衡碎片和效率
遇到的问题
1. 指针转换理解困难
NextObj函数里的*(void**)obj
这个写法一开始看着很绕。后来画了内存布局图才理解,就是把obj当作"指向指针的指针"来用。
2. 编译错误
PAGE_ID类型定义的位置有问题,定义在文件末尾但前面就用了,导致编译报错。调整了一下定义顺序就好了。
今天的收获
-
理解了内存池的核心思想:用空间换时间,通过预分配内存来避免频繁的系统调用和锁竞争。
-
学会了内存复用技巧:利用内存块本身存储链表指针,这种做法在系统编程中很常见。
-
掌握了对齐算法:分段对齐不仅能控制碎片,还能简化内存管理。位运算版本的实现确实比除法快很多。
-
体验了真实的调试过程:编译错误、逻辑错误,每个都需要仔细分析。
测试结果
最后写了个简单的测试程序,验证了基本功能。
int main() {cout << "Testing NextObj..." << endl;// 测试NextObj函数,看能不能把内存块串成链表void* obj1 = malloc(8);void* obj2 = malloc(8);NextObj(obj1) = obj2; // obj1指向obj2assert(NextObj(obj1) == obj2);cout << "NextObj OK" << endl;// 测试FreeList的Push和Popcout << "Testing FreeList..." << endl;FreeList list;void* ptr1 = malloc(8);void* ptr2 = malloc(8);list.Push(ptr1);list.Push(ptr2); // 头插,ptr2应该在前面cout << "Size after push: " << list.Size() << endl;void* p1 = list.Pop(); // 应该是ptr2void* p2 = list.Pop(); // 应该是ptr1cout << "Pop worked: " << (p1 == ptr2) << ", " << (p2 == ptr1) << endl;// 测试SizeClass的对齐和索引计算cout << "Testing SizeClass..." << endl;cout << "RoundUp(7): " << SizeClass::RoundUp(7) << endl; // 应该是8cout << "RoundUp(9): " << SizeClass::RoundUp(9) << endl; // 应该是16cout << "Index(8): " << SizeClass::Index(8) << endl; // 应该是0cout << "Index(16): " << SizeClass::Index(16) << endl; // 应该是1cout << "All tests passed" << endl;return 0;
}
运行结果:
所有基础组件都工作正常,为明天实现ThreadCache打下了基础。
项目文件结构
经过今天的开发,项目结构变成了这样:
HighConcurrencyMemoryPool/
├── src/
│ └── Common.h # 核心数据结构
├── test/
│ ├── test_common.cpp # 测试代码
│ └── test_common.exe # 编译后的可执行文件
├── README.md
└── .gitignore
完整代码
为了方便学习,这里贴出今天实现的核心代码:
Common.h 完整实现
#pragma once#include <iostream>
#include <vector>
#include <algorithm>
#include <cstring>
#include <thread>
#include <mutex>
#include <cassert>#ifdef _WIN32
#include <windows.h>
#else
// Linux下的头文件,暂时先不写
#endifusing std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024; // 申请内存的上限 256KB
static const size_t NFREELIST = 208; // 自由链表的个数
static const size_t NPAGES = 129; // 页的数量
static const size_t PAGE_SHIFT = 13; // 页大小为8KB,即2^13typedef size_t PAGE_ID;// 获取/设置对象的下一个节点
static void*& NextObj(void* obj) {return *(void**)obj;
}// 自由链表 - 管理相同大小的内存块
class FreeList {
public:void Push(void* obj) {assert(obj);NextObj(obj) = _freeList;_freeList = obj;++_size;}void* Pop() {assert(_freeList);void* obj = _freeList;_freeList = NextObj(obj);--_size;return obj;}void PushRange(void* start, void* end, size_t n) {NextObj(end) = _freeList;_freeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n) {assert(n <= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; ++i) {end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}bool Empty() { return _freeList == nullptr; }size_t Size() { return _size; }size_t& MaxSize() { return _maxSize; }private:void* _freeList = nullptr;size_t _maxSize = 1;size_t _size = 0;
};// 大小控制类 - 负责内存对齐和索引计算
class SizeClass {
public:// 位运算实现内存对齐static inline size_t _RoundUp(size_t bytes, size_t alignNum) {return ((bytes + alignNum - 1) & ~(alignNum - 1));}// 根据对象大小获取对齐后的大小static inline size_t RoundUp(size_t size) {if (size <= 128) {return _RoundUp(size, 8);}else if (size <= 1024) {return _RoundUp(size, 16);}else if (size <= 8 * 1024) {return _RoundUp(size, 128);}else if (size <= 64 * 1024) {return _RoundUp(size, 1024);}else if (size <= 256 * 1024) {return _RoundUp(size, 8 * 1024);}else {return _RoundUp(size, 1 << PAGE_SHIFT);}}// 计算在某个范围内的索引static inline size_t _Index(size_t bytes, size_t align_shift) {return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 根据对象大小获取在自由链表数组中的下标static inline size_t Index(size_t bytes) {assert(bytes <= MAX_BYTES);static int group_array[4] = {16, 56, 56, 56};if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}assert(false);return -1;}
};// 页结构 - 管理连续的内存页
struct Span {PAGE_ID _pageId = 0; // 起始页号size_t _n = 0; // 页数Span* _next = nullptr; // 双向链表指针Span* _prev = nullptr;size_t _objSize = 0; // 切分的对象大小size_t _useCount = 0; // 已分配出去的对象数量void* _freeList = nullptr; // 剩余对象的自由链表bool _isUse = false; // 是否正在被CentralCache使用
};// 页链表 - 管理Span对象
class SpanList {
public:SpanList() {_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin() { return _head->_next; }Span* End() { return _head; }bool Empty() { return _head->_next == _head; }void PushFront(Span* span) {Insert(Begin(), span);}Span* PopFront() {Span* front = _head->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan) {assert(pos && newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos) {assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}public:std::mutex _mtx;private:Span* _head;
};// 向操作系统申请内存
inline static void* SystemAlloc(size_t kpage) {void* ptr = nullptr;#ifdef _WIN32ptr = VirtualAlloc(0,kpage << PAGE_SHIFT,MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE);
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}// 向操作系统释放内存
inline static void SystemFree(void* ptr) {
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else// Linux版本后面再实现
#endif
}
测试代码 test_common.cpp
#include "../src/Common.h"int main() {cout << "Testing NextObj..." << endl;// 测试NextObj函数void* obj1 = malloc(8);void* obj2 = malloc(8);NextObj(obj1) = obj2;assert(NextObj(obj1) == obj2);cout << "NextObj OK" << endl;// 测试FreeListcout << "Testing FreeList..." << endl;FreeList list;void* ptr1 = malloc(8);void* ptr2 = malloc(8);list.Push(ptr1);list.Push(ptr2);cout << "Size after push: " << list.Size() << endl;void* p1 = list.Pop();void* p2 = list.Pop();cout << "Pop worked: " << (p1 == ptr2) << ", " << (p2 == ptr1) << endl;// 测试SizeClasscout << "Testing SizeClass..." << endl;cout << "RoundUp(7): " << SizeClass::RoundUp(7) << endl;cout << "RoundUp(9): " << SizeClass::RoundUp(9) << endl;cout << "Index(8): " << SizeClass::Index(8) << endl;cout << "Index(16): " << SizeClass::Index(16) << endl;// 测试SpanListcout << "Testing SpanList..." << endl;SpanList spanList;cout << "SpanList empty: " << spanList.Empty() << endl;// 测试SystemAlloccout << "Testing SystemAlloc..." << endl;void* sysPtr = SystemAlloc(1);cout << "SystemAlloc got ptr: " << (sysPtr != nullptr) << endl;SystemFree(sysPtr);cout << "All tests passed" << endl;// 清理内存free(obj1);free(obj2);free(ptr1);free(ptr2);return 0;
}
完整代码已经提交到GitHub: HighConcurrencyMemoryPool
下一步计划
明天计划实现ThreadCache层,这是整个内存池的核心。主要包括:
- 线程本地存储(TLS)的使用
- Allocate和Deallocate接口
- 与CentralCache的交互机制
- 批量分配和慢增长算法
这部分应该会比较有挑战性,因为涉及到多线程和性能优化的细节。
总的来说,今天进展还不错,从一头雾水到基本理解了整个设计思路,还实现了核心的数据结构。虽然还有很多细节不完善,但至少有了一个可以工作的基础框架。
如果你想跟着做
如果你也想跟着学习实现内存池,这里是完整的搭建步骤:
1. 创建项目目录
mkdir HighConcurrencyMemoryPool
cd HighConcurrencyMemoryPool
mkdir src test
2. 创建基础文件
创建 src/Common.h
,按照上面的代码实现:
- 常量定义(MAX_BYTES, NFREELIST等)
- NextObj函数
- FreeList类
- SizeClass类
- Span结构和SpanList类
- SystemAlloc/SystemFree函数
3. 编写测试代码
创建 test/test_common.cpp
,验证各个组件的功能。
4. 编译运行
# Windows下用VS或者g++都可以
g++ -std=c++11 test/test_common.cpp -o test/test_common.exe
./test/test_common.exe
5. 版本控制
git init
git add .
git commit -m "初始化Common模块"
学习重点
如果你是初学者,重点理解这几个概念:
- 内存复用:NextObj如何利用内存块本身存储链表指针
- 位运算对齐:为什么
(n + align - 1) & ~(align - 1)
能实现对齐 - 分段策略:为什么要针对不同大小采用不同的对齐粒度
- 双向循环链表:SpanList为什么需要哨兵节点
每个知识点都值得深入研究,不要急着往下走。