【项目笔记】高并发内存池项目剖析(三)
·
【项目笔记】高并发内存池项目剖析(三)
🔥个人主页:大白的编程日记
🔥专栏:项目笔记
文章目录
- 【项目笔记】高并发内存池项目剖析(三)
- 前言
- 一.Threadcache释放内存
- 二.Centralcache释放内存
- 三.Pagecache释放内存
- 四.释放过程联调
- 五.项目优化&&项目遇到的问题解决过程
- 5.1 大于256kb的内存申请和释放
- 5.2 使用定长内存池替代new
- 5.3 释放内存的参数优化
- 5.4 性能测试
- 5.5 性能瓶颈分析
- 5.6 基数树优化
- 5.6.1 基数树结构
- 5.6.2 基数树的优化点
- 5.7 遇到的问题&&发现过程&&解决过程
- 5.7.1 Bug1
- 5.7.2 Bug2
- 5.8 调试技巧
- 后言
前言
哈喽,各位小伙伴大家好!上期我们讲了高并发内存池的申请流程 今天我们来讲一下释放流程以及最后的优化 话不多说,我们进入正题!向大厂冲锋!
一.Threadcache释放内存
当内存不用时 直接归还插入到Threadcache对应的桶中 但是如果Threadcache的桶的内存太多 其他线程又没有内存使用
此时我们就可以考虑当桶的对象超过一次慢增长申请的个数时
我们就归还一次慢增长申请的对象个数 此时就可以实现均衡调度
让其他线程也可以申请到当前线程空闲的内存
再调用ReleaseListToSpans向centralcache归还内存
//释放内存块
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);//计算对应桶下标size_t index = SizeClass::Index(size);//头插对应桶的自由链表中中_freeLists[index].Push(ptr);//如果自由链表长度超过慢增长一次申请的长度 向centralcache归还慢增长一次申请的长度的节点//这样可以实现负载均衡 链表太长了 就可以考虑归还中心缓存 //这样其他threadcache线程内存不足时 就可以在中心缓存中使用当前threadcache线程不使用的内存块了if (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);//更新自由链表节点/*_freeLists[index].Size() -= size;*/}
}
// 释放对象时,链表过长时,回收内存回到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{void* start = nullptr, *end = nullptr;//弹出对应桶start到end长度为size的链表//cout <<"list.MaxSize():" <<list.MaxSize() << " list.Size():"<<list.Size() << endl;list.PopRange(start, end, list.MaxSize());//向central插入归还的链表CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
二.Centralcache释放内存
现在我们要把Threadcache的内存对象归还到对应的桶中
但是因为这些内存块归还的时间都不一样 所以他们可能来自不同的span对象
- 问题:如何找到这些对象所在的span呢?
所以这里我们让2000到2001页的所有地址 假设每个对象大小为8字节 此时我们算出来的对象页号都是2000页!
我们就可以先算出对象的桶下标 桶枷锁和对应的span 然后遍历归还对象链表 头插到span的自由链表中 更新span的usecount
同时如果usecount==0 说明span的对象都归还了 此时我们就可以把span归还给pagecache 就可以进行前后页的合并减少内存碎片
同时归还给pagecache前我们解开span后就可以解开桶锁
让其他线程可以申请和归还内存 减少锁消耗 同时pagecache加锁
// 将一定数量的对象释放到central对应的span中
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{//计算桶下标size_t index = SizeClass::Index(size);//对应桶加锁_spanList[index]._mutex.lock();while (start){void* next = Nextobj(start);//算出内存块所处的spanSpan * span = PageCache::GetInstance()->MapobjectToSpan(start);//头插到对应span的自由链表 申请出去的引用计数--Nextobj(start) = span->_freeList;span->_freeList = start;span->_useCount--;//引用计数为0 说明全部归还 此时就可以上page归还//page归还时可以进行前后页合并减少内存碎片if (span->_useCount == 0){//弹出对应的span 自由链表置空_spanList[index].Pop(span);span->_freeList = nullptr;span->_next = nullptr;span->_prev = nullptr;//归还时 先把桶锁结掉_spanList[index]._mutex.unlock();//向归还page过程加锁PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanList[index]._mutex.lock();//归还完后把桶锁加上}start = next;}_spanList[index]._mutex.unlock();//归还桶锁解掉
}
三.Pagecache释放内存
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{//大于128页直接释放if (span->_n > NPAGES - 1){void* ptr = (void*)(span->_pageId << PAGE_SHIFT);SystemFree(ptr);_spanList[span->_n].Pop(span);}//向后查找可以合并的spanwhile (1){PAGE_ID id = span->_pageId - 1;auto ret = (Span*)_idSpanMap.get(id);//判断是否存在对应的页号的spanif (ret==nullptr){break;}//判断该span是否正在被使用if (ret->_isUse == true){break;}//判断长度是否超过128if (span->_n + ret->_n > 128){break;}//更新span的起始页号 更新页大小 将被合并的span弹出span->_pageId = ret->_pageId;span->_n += ret->_n;_spanList[ret->_n].Pop(ret);/*delete _idSpanMap[id];*/_spanPool.Delete(ret);}while (1){PAGE_ID id = span->_pageId+span->_n;auto ret = (Span*)_idSpanMap.get(id);//判断是否存在对应的页号的spanif (ret == nullptr){break;}//判断该span是否正在被使用if (ret->_isUse == true){break;}//判断长度是否超过128if (span->_n + ret->_n > 128){break;}// 更新页大小 将被合并的span弹出span->_n += ret->_n;_spanList[ret->_n].Pop(ret);/*delete _idSpanMap[id];*/_spanPool.Delete(ret);}//将合并后的span插入桶中 更新span的前后页映射 更新使用状态为false_idSpanMap.set(span->_pageId,span); _idSpanMap.set(span->_pageId + span->_n - 1,span);if (span->_pageId >= 5000){int a = 0;}//cout << span->_n - 1;span->_isUse = false;_spanList[span->_n].PushFront(span);
}
四.释放过程联调
- 测试代码:
void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);//申请1块 拿走1块 还剩0块 下一次一次申请2块void* p2 = ConcurrentAlloc(8);//申请2块 拿走1块 还剩1块 下一次一次申请3块void* p3 = ConcurrentAlloc(1);//没有申请 拿走1块 还剩0块 下一次一次申请3块void* p4 = ConcurrentAlloc(7);//去central申请 申请3块 拿走1块还剩2块 下一次申请4块void* p5 = ConcurrentAlloc(8);//没有申请 拿走1块 还剩1块 下一次申请4块void* p6 = ConcurrentAlloc(8);//没有申请 拿走1块还剩0块 下一次申请4块void* p7 = ConcurrentAlloc(8);//申请4块 拿走1块 还剩3块 下一次一次申请5块cout << p1 << endl;cout << p2 << endl;cout << p3 << endl;cout << p4 << endl;cout << p5 << endl;cout << p6 << endl;cout << p7 << endl;cout << "第1次归还:" << endl;ConcurrentFree(p1,6);//Maxsize:5 size:4cout << "第2次归还:" << endl;ConcurrentFree(p2,8);//Maxsize:5 size:5 归还cout << "第3次归还:" << endl;ConcurrentFree(p3,1);//Maxsize:5 size:1cout << "第4次归还:" << endl;ConcurrentFree(p4,7);//Maxsize:5 size:2cout << "第5次归还:" << endl;ConcurrentFree(p5,8);//Maxsize:5 size:3cout << "第6次归还:" << endl;ConcurrentFree(p6,8);//Maxsize:5 size:4cout << "第7次归还:" << endl;ConcurrentFree(p7,8);//Maxsize:5 size:5 归还
}
这里我们打印日志发现和我们预估的释放过程一样说明释放正常! 并且我们根据内存块的地址算出来的页号也是一样的!
并且第七次归还时maxsize和size都是5 符合我们的预期!
然后全部归还给centralcache后此时就会向pagecache合并归还
此时分割出来的1页的Span就会与127页的的Span进行合并出一个128页内存块 再插入对应的桶中
这里我们并行测试多线程的场景 运行发现没问题
void MultiThreadAlloc2()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(16);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 16);}
}
void MultiThreadAlloc1()
{std::vector<void*> v;for (size_t i = 0; i < 7; ++i){void* ptr = ConcurrentAlloc(6);v.push_back(ptr);}for (auto e : v){ConcurrentFree(e, 6);}
}
void TestMultiThread()
{std::thread t1(MultiThreadAlloc1);std::thread t2(MultiThreadAlloc2);t1.join();t2.join();
}
然后这里我们调试观察多线程进行centralcache向page归还合并的过程这里53216这个线程向前找到一个另一个线程申请页号为3017的页号 大小为1页的内存块
但是此时这个另一个线程还在使用 所以无法合并 只能先把自己的3016页的1页内存块进行归还
然后到另一个线程27196的归还内存块 此时前一个线程已经归还了 内存块不再使用 所以此时他向前找到前一个线程归还的3016页就可以合并出一个2页大小的内存块 所以他就是3017的页
然后两个线程各自拿走1页内存块 还剩余126的内存块
所以该线程继续向后合并就可以找到126页大小的内存块 页号为3018 此时就可以合并出申请的128页大小内存块 在插入128页的桶中
五.项目优化&&项目遇到的问题解决过程
5.1 大于256kb的内存申请和释放
大于256kb但是小于128页的内存直接向pagecache要内存 直接向pagecache要 这样还可以进行合并内存 内存池还在起作用
大于256kb并且大于128页 直接向系统申请和释放内存
测试代码
void BigAlloc()
{void* p1 = ConcurrentAlloc(257 * 1024);ConcurrentFree(p1,257 * 1024);void* p2 = ConcurrentAlloc(129 * 8 * 1024);ConcurrentFree(p2, 129 * 8 * 1024);
}
所以此时我们申请大于256kb但是小于128页的内存块时 此时就会向系统申请128页的内存块 然后切分为33页和95页的内存块
33页内存块就分配给我们 当我们归还时他又会和之前的95页的内存块进行合并 形成128页的内存 我们的内存池还可以起作用!
而当我们申请超过128页的内存大小时 此时就会指向向系统堆区申请和释放!
5.2 使用定长内存池替代new
- 我们的代码中申请span还是使用new 而我们本身就是为了替代malloc 所以我们就不能使用new
- 这里我们就可以使用定长内存池直接向堆申请和释放内存 效率比new要高
5.3 释放内存的参数优化
我们释放内存时需要传一个内存大小的参数 因为我们需要根据这个参数判断释放的内存是否大于256
进行进行分流处理 我们可以根据地址找到对应的span 而所有的内存块都是从span切分出来的
所以我们可以给span增加一个_size成员 我们只需要在page切分span时把size变为切分的大小
如果是直接向堆或pagecache申请 那span的大小就是申请内存块的大小
同时我们在申请内存修改哈希表时加了锁 但是我们在访问哈希表时没有加锁 这时就可能导致一个线程读取时 另一个线程正在修改 指针就会存在线程安全 问题
而我们访问哈希都是通过哈希访问的 所以在调用查询哈希的函数中加锁即可
5.4 性能测试
这里我们让n个线程并发执行n轮 每轮申请内存和释放内存x次 并统计总共所有线程每轮申请和释放的花费
分别对比使用malloc和我们的内存池的性能如何
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&,k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}int main()
{size_t n = 1000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
5.5 性能瓶颈分析
这里我们对比malloc和自己内存池 发现性能比malloc还要低
此时我们就需要借助性能分析的工具进行性能瓶颈分析 vs下自带了性能分析的工具
- 根据性能报告我们就可以发现我们的lock锁函数调用消耗占比很大 也就是因为锁竞争消耗太大导致我们的性能不如malloc 因为使用锁就会涉及上下文切换 阻塞等待 申请和释放锁等消耗
5.6 基数树优化
5.6.1 基数树结构
基数数结构就是一个分层的直接定制的哈希表
此时我们把哈希表改为基数树结构 修改调用哈希表的接口的地方即可 并且基数树本身更快
5.6.2 基数树的优化点
使用基数数我们可以不用对基数树加锁 就可以减少锁的性能消耗
并发访问需要解决数据不一致的问题 读读不会修改数据 数据可以保持一致 需要解决的场景就是读写 和写写 因为写的过程都加了锁 所以线程安全
-
结构不会改变:基数树的结构在访问前已经开好了 开好后删除不会动数据结构了
而红黑树和stl的开放定制法哈希表开好后删除会动数据结构 一旦修改数据结构就会涉及指针的修改导致线程安全问题 -
读写分离:读写不可能访问同一位置
-
因为基数树只有pagechace申请和释放内存的时候会写
因为是直接定制法的哈希表 并且开空间满足所有的页空间 不会出现冲突 也就不会访问其他位置 所以只访问对应的span的位置 不会访问其他span的位置 -
访问的过程空间已经开好了 而访问span* 一定是开空间之后 释放span之前
-
同一个span之后分给一个线程 所以每个线程都访问自己的span 其他线程不会访问别的线程的 span*
所以同一位置不会同时读写 线程安全 -
读写不分离:而红黑树或stl的哈希表是开放定制法
-
红黑树访问时需要遍历其他线程span来查找 而此时其他线程可能正在插入或删除span就会修改指针 会导致野指针问题
-
开放定制法的哈希表同一个桶也需要遍历其他span*的位置 其他线程也可能插入或删除时也要修改指针
此时当一个线程遍历读时其他线程就可能该位置写 同一位置就可以同时读写 指针指向就会出问题 导致野指针等问题
访问时数据结构不会改变和同一位置的读写分离保证了数据数据一致性 所以使用基数数可以不用加锁 减少锁的消耗提高性能!
5.7 遇到的问题&&发现过程&&解决过程
5.7.1 Bug1
void TestConcurrentAlloc1()
{void* p1 = ConcurrentAlloc(6);//申请1块 拿走1块 还剩0块 下一次一次申请2块void* p2 = ConcurrentAlloc(8);//申请2块 拿走1块 还剩1块 下一次一次申请3块void* p3 = ConcurrentAlloc(1);//没有申请 拿走1块 还剩0块 下一次一次申请3块void* p4 = ConcurrentAlloc(7);//去central申请 申请3块 拿走1块还剩2块 下一次申请4块
}
- 问题描述:测试申请流程时第4次 应该去
centralcache
桶的span中申请内存 按理说桶里面还有Span
可是却没有
所以继续去pagecache
申请
发现问题:对应桶里面没有Span 而centralcahce桶里面的Span又是从pagecahce申请来的之后插入的
所以我就去向centralcache向pagecache申请内存的函数中查看
就发现原来申请后的span 插入位置错了
竟然直接根据threadcache申请内存的大小直接作为centralcache哈希桶的下标插入span
这才导致下次去对应哈希桶查找时对应哈希桶为空!
- 解决问题:把插入下标为根据size计算后的哈希桶即可!
5.7.2 Bug2
- bug描述:进行多线程性能测试时发行写入权限冲突 说明我们访问了不该访问的地址空间
解决过程:这里start显示无法读取内存 说明start的值是有问题的
然后查看start的值是哪里来的 发现是Span的页号转化为的地址 然后发现span的页号很大
span的页号不应该那么大按理说 所以就是span的页号不正确导致转化出来的地址非法!
- big的span是从newspan来的 就可以去该函数的所有返回值前加上页号过大的判断语句
- 然后打一个断点 这就是条件断点 如果怀疑是某些条件导致的错误 可以使用这样的方式
- 然后F5调试 如果条件断点触发说明该条件成立 这里触发条件断点 果然是因为span的页号异常导致的
而span的页号都是在Pagecache申请和切分赋值的 所以可以去Pagecache中使用find查看页号赋值的地方
然后找出是三处设计页号修改的地方 此时第一处的页号有来自另一个span的页号 此时我们无需关注
因为我们本身就是要查看span的页号异常的问题 而这里只是span页号异常的传递而不是页号异常的原因
第二处是我们申请地址后通过位运算直接计算出页号 申请的地址自然没问题 运算的值也是没有问题的
第三处页号来自于我们之前存储页号和对应span映射的哈希表 所以大概率是因为我们的哈希表里面的span的值的问题导致的
而如果此时又纠结于哈希表里面的span的页号值的问题 此时又回到span页号的问题 此时不就是死循环了吗?
所以我们还要考虑一下页号确实是有问题 但是我们也需要考虑可能也存在映射位置的问题!
所以不管怎么我们可以find查找所有和哈希表映射有关的地方
此时我们终于发现问题了 我们在进行合并减少内存碎片时 对应的span后面的页号映射错了
应该是起始页号+页大小-1 所以此时我们就可以标记了一个错误的页号和span的映射
此时如果我们使用该错误的映射关系找到的span 此时的span已经被删除了 所以他的页号就是随机值
所以就会导致页号很大 导致转换的地址后就会访问到不属于我们的地址导致野指针问题!
此时我们程序就可以在多线程下正常运行了!
5.8 调试技巧
条件断点检测+调用栈帧+一一排查对应的参数 变量是否符合预期 排查成功发现没问题 就向上一次调用层查看 如果程序死循环调用全部中断 同时平时assert断言非常重要!
后言
这就是高并发内存池项目。大家自己好好消化!今天就分享到这! 感谢各位的耐心垂阅!咱们下期见!拜拜~