高并发内存池的thread cache部分实现及测试
并发内存池的三个主要组成部分:
- 线程缓存(Thread Cache)
- 每个线程拥有独立的线程缓存,用于处理小于
256KB
的内存分配。 - 由于每个线程都有自己的缓存,线程在从线程缓存中分配内存时无需加锁,这有效避免了竞争,提升了并发性能。
- 每个线程拥有独立的线程缓存,用于处理小于
- 中心缓存(Central Cache)
- 中心缓存是全局共享的,负责管理所有线程缓存的内存。
- 当线程缓存中的内存不足时,会从中心缓存获取资源;同时,中心缓存也会定期回收线程缓存中的未使用对象,避免单个线程占用过多内存,导致资源不均衡。
- 因为所有线程都从中心缓存获取内存对象,中心缓存的访问会存在竞争,通常通过使用桶锁来减少锁竞争的影响。只有当线程缓存没有足够资源时,才会从中心缓存获取,这样能确保锁竞争不会过于激烈。
- 页缓存(Page Cache)
- 页缓存是管理页级内存分配的缓存,位于中心缓存的上一级。
- 当中心缓存中的内存不足时,页缓存会分配一定数量的页,并将其切割成固定大小的小块,供中心缓存使用。
- 页缓存通过回收和合并相邻的空闲页,缓解内存碎片(外碎片)问题。当多个页的内存块被回收后,它们会合并成更大的页,避免内存的浪费。
分层管理:
- 线程缓存:高效避免加锁,提升并发性能。
- 中心缓存:确保内存资源的均衡分配,减少锁竞争。
- 页缓存:减少内存碎片,优化内存回收和分配策略。
线程缓存(Thread Cache)
核心思想
为每个线程提供一个独立的缓存,使得线程在申请和释放内存时不需要加锁,从而提高性能。通过内存对齐和哈希桶的方式,控制内存碎片,同时避免了大量小内存分配带来的效率损失。inline
和static inline
则用来优化代码的执行效率。
- 线程局部存储(TLS):每个线程都会有一个自己的线程缓存(
ThreadCache
),它通过“哈希桶”结构来管理内存。每个哈希桶对应一个特定大小的内存块。 - 哈希桶结构:线程缓存内部是由多个哈希桶组成的,每个哈希桶代表一个内存大小区间。如果某个内存块请求的大小在某个区间内,线程就会根据大小映射到相应的哈希桶。每个哈希桶里维护着一个自由链表,用于管理空闲的内存对象。
内存申请
当一个线程申请内存时,如果申请的内存小于256KB
,首先会查找自己线程的缓存。如果哈希桶中有可用的内存块,直接返回。如果没有,就会从 central cache(中央缓存) 中批量获取对象,插入到哈希桶的自由链表里。
内存释放
- 当一个内存块被释放时,如果它小于
256KB
,线程会将其释放到自己的线程缓存里。释放时,通过计算哈希桶的位置,将对象加入到自由链表中。 - 如果链表中内存对象的数量过多,线程会将一部分内存回收到中央缓存中,以保持内存池的效率。
内存对齐与映射
- 内存对齐:为了减少内存碎片,内存分配时会进行对齐。根据内存请求的大小,内存会被分配到不同的对齐大小:
1
~128
字节的内存会对齐到8
字节。128
~1024
字节的内存对齐到16
字节。1024
~8KB
的内存对齐到128
字节,以此类推。
- 映射规则:这种内存对齐规则通过Roundup函数实现,它将内存请求的大小对齐到最接近的有效大小。例如,如果你请求
11
字节的内存,但由于对齐规则,最终会分配16
字节 - 哈希桶的映射:通过 Index函数 ,内存的对齐后大小会映射到一个特定的哈希桶。在计算时,会根据内存对齐的大小,确定内存块所在的桶。
static inline
与 inline
函数
inline
函数:是编译器在调用时尝试将函数代码插入调用点,从而避免函数调用的开销。这通常用于提高性能。static inline
函数:结合了static
和inline
的优势。static
确保该函数只在当前文件中可见,防止了多文件间重复定义,而inline
则减少了调用开销,提高了效率。
自由链表的设计
设计自由链表,其实就是实现一个单链表,方便插入删除,同时标识链表是否为空,自由链表在中心缓存中也有使用,所以放入common.h
中。
Common.h
#pragma once
#include <iostream>
#include <assert.h>
const int MAX_BYTES = 1024 * 256; // 最大内存块尺寸限制(256KB)
// 自由链表类 - 用于管理空闲内存块
class FreeList {
public:
// 将内存块插入链表头部
void Push(void* obj) {
assert(obj);
// 将obj头插到链表,利用内存块头部空间存储下一个节点指针
*(void**)obj = _free_list; // 将obj的前4/8字节指向当前链表头
_free_list = obj; // 更新链表头为当前对象
}
// 从链表头部弹出一个内存块
void* Pop() {
assert(_free_list); // 确保链表非空
void* obj = _free_list; // 获取当前链表头
_free_list = *(void**)obj; // 将链表头更新为下一个节点
return obj;
}
// 判断链表是否为空
bool Empty() {
return _free_list == nullptr;
}
private:
void* _free_list = nullptr; // 链表头指针(存储空闲内存块地址)
};
// 内存块尺寸对齐与索引计算类
class SizeClass {
public:
// 内存对齐辅助函数(将bytes向上对齐到alignNum的倍数)
static inline size_t _Roundup(size_t bytes, size_t alignNum) {
// 计算公式解释:(bytes + alignNum - 1) 确保超过对齐基数时进位
// & ~(alignNum - 1) 用于抹去低位实现对齐
// 示例:alignNum=8时,~(0b111) => 0b...11111000,清除后三位
return (bytes + alignNum - 1) & ~(alignNum - 1);
}
// 根据请求大小计算对齐后的实际分配大小
static size_t Roundup(size_t size) {
if (size <= 128) { // [1,128]字节按8字节对齐
return _Roundup(size, 8);
}
else if (size <= 1024) { // (128,1024]按16字节对齐
return _Roundup(size, 16);
}
else if (size <= 8 * 1024) { // (1KB,8KB]按128字节对齐
return _Roundup(size, 128);
}
else if (size <= 64 * 1024) { // (8KB,64KB]按1KB对齐
return _Roundup(size, 1024);
}
else if (size <= 128 * 1024) { // (64KB,128KB]按8KB对齐
return _Roundup(size, 8 * 1024);
}
else {
return 0; // 超过最大限制返回0(需配合断言使用)
}
}
// _Index计算的是当前size所在区域的第几个下标,所以Index的返回值需要加上前面所有区域的哈希桶的个数
static inline size_t _Index(size_t bytes, size_t align_shift) {
// align_shift表示对齐数的二进制位移量(如8字节对齐对应shift=3)
// 公式等效:(bytes + alignNum - 1) / alignNum - 1
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算内存块在对应规格数组中的索引位置
static inline size_t Index(size_t bytes) {
assert(bytes <= MAX_BYTES); // 确保请求大小在合法范围内
// 各规格组的链表数量(经验值设定)
// 对应不同区间:[8B对齐组][16B对齐组][128B对齐组][1KB对齐组][8KB对齐组]
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128) { // 8字节对齐区间(16个规格:8,16,...,128)
return _Index(bytes, 3); // 3=log2(8)
}
else if (bytes <= 1024) { // 16字节对齐区间(56个规格:144,160,...,1024)
return _Index(bytes - 128, 4) + group_array[0]; // 4=log2(16)
}
else if (bytes <= 8 * 1024) { // 128字节对齐区间(56个规格:1152,1280,...,8K)
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) { // 1KB对齐区间(56个规格:9K,10K,...,64K)
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) { // 8KB对齐区间(3个规格:72K,80K,...,256K)
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false); // 触发断言表示超出设计容量
}
return -1; // 无效返回值(实际会被断言拦截)
}
};
关键设计说明
- 自由链表管理
Push
/Pop
操作时间复杂度O(1)
- 利用内存块头部空间存储链表指针(节省管理开销)
- 分级内存对齐策略
- 索引计算优化
- 使用分组累计偏移量(group_array)快速定位规格位置
- 示例:1024字节请求计算过程:
Index = _Index(1024-128,16) + 16 = ((896 + 15)/16 - 1) + 16 = (911/16 - 1) + 16 = (56 - 1) + 16 = 71
- 性能优势
- 对齐操作使用位运算替代除法,效率提高
- 分级策略减少内存碎片
- 索引计算时间复杂度O(1),适合高频调用场景
线程缓存的设计
在有了上述的基础之后,我们可以开始搭建线程缓存(thread cache
)的框架。实际上,这个框架就是一个哈希桶,每个桶里都维护着一个自由链表,用来存储可用的内存对象。
为了让每个线程都有自己的缓存,我们可以使用一个叫做thread_local
的关键字。它的作用是声明一个线程局部存储 TLS
变量。线程局部存储 TLS
,是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
ThreadCache.h
#pragma once
#include <assert.h>
#include <thread>
#include "Common.h" // 包含公共定义(如FreeList、SizeClass等)
const size_t NFREELISTS = 208; // 自由链表总数(对应SizeClass的分组计算)
// 线程本地缓存类 - 每个线程独立的内存缓存
class ThreadCache {
public:
// TLS指针(延迟初始化)
static thread_local ThreadCache* pTLSThreadCache;
// 内存分配接口
void* Allocate(size_t size) {
assert(size <= MAX_BYTES); // 校验请求大小合法性
// 1. 计算对齐后的实际需求大小
size_t alignSize = SizeClass::Roundup(size);
// 2. 计算对应的自由链表索引
size_t index = SizeClass::Index(alignSize);
// 3. 优先从本地自由链表获取内存
if (!_freelists[index].Empty()) {
return _freelists[index].Pop();
}
// 4. 自由链表为空时从中央缓存批量获取
return FetchFromCentralCache(index, alignSize);
}
// 内存释放接口
void Deallocate(void* ptr, size_t size) {
assert(ptr && size <= MAX_BYTES); // 校验参数有效性
// 1. 计算原始分配大小
size_t alignSize = SizeClass::Roundup(size);
// 2. 计算对应的自由链表索引
size_t index = SizeClass::Index(alignSize);
// 3. 将内存块插回自由链表
_freelists[index].Push(ptr);
}
// 从中央缓存补充内存块(具体实现依赖CentralCache类)
void* FetchFromCentralCache(size_t index, size_t size) {
return nullptr;
}
private:
FreeList _freelists[NFREELISTS]; // 自由链表数组(每个元素管理特定尺寸内存块)
};
// 静态成员变量初始化
thread_local ThreadCache* ThreadCache::pTLSThreadCache = nullptr;
关键设计说明
- 线程本地存储(TLS)机制
// 使用示例(需在cpp文件中初始化): thread_local ThreadCache* ThreadCache::pTLSThreadCache = nullptr;
- 每个线程首次访问时初始化
pTLSThreadCache
- 线程退出时自动销毁,无需显式资源释放
- 每个线程首次访问时初始化
- 内存分配流程
- 内存释放流程
- 性能优化点
- 零锁竞争:通过
thread_local
实现无锁操作 - 缓存友好:高频操作完全在本地链表完成
- 批量传输:
FetchFromCentralCache
批量获取内存块
- 零锁竞争:通过
该实现需要配合CentralCache
中央缓存类和PageHeap
页堆类共同工作,形成完整的三层内存池架构。
只有线程缓存的内存池设计及测试
ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include "ThreadCache.h"
// 并发内存分配入口(仿tcmalloc实现)
// 参数:size - 请求分配的内存大小(字节)
static void* ConcurrentAlloc(size_t size ){
// TLS延迟初始化(每个线程首次调用时创建专属缓存)
if (ThreadCache::pTLSThreadCache == nullptr) {
// 保证线程安全:thread_local保证每个线程只初始化一次
ThreadCache::pTLSThreadCache = new ThreadCache();
}
std::cout << std::this_thread::get_id() << ":" << ThreadCache::pTLSThreadCache<< std::endl;
// 执行实际内存分配
void* ptr = ThreadCache::pTLSThreadCache->Allocate(size);
return ptr;
}
// 线程调用这个函数回收空间
static void ConcurrentFree(void* ptr){
// 后续完成
}
test.cpp
#include "ConcurrentAlloc.h"
// 分配内存的函数1
void Alloc1() {
// 循环进行5次内存分配,每次分配6字节的内存
for (size_t i = 0; i < 5; i++) {
void* ptr = ConcurrentAlloc(6); // 使用并发内存分配函数分配6字节内存
// 这里可以对 ptr 进行操作或释放内存
}
}
// 分配内存的函数2
void Alloc2() {
// 循环进行5次内存分配,每次分配7字节的内存
for (size_t i = 0; i < 5; i++) {
void* ptr = ConcurrentAlloc(7); // 使用并发内存分配函数分配7字节内存
// 这里可以对 ptr 进行操作或释放内存
}
}
// 测试线程本地存储(TLS)机制
void TestTLS() {
std::thread t1(Alloc1);
std::thread t2(Alloc2);
t1.join();
t2.join();
}
int main() {
// 调用TestTLS函数,启动并执行分配内存的测试
TestTLS();
}
推荐一下
https://github.com/0voice