C++高性能内存池
目录
1. 项目介绍
1. 这个项目做的是什么?
2. 该项目要求的知识储备
2. 什么是内存池
1. 池化技术
2. 内存池
3. 内存池主要解决的问题
4.malloc
3. 先设计一个定长的内存池
4.高并发内存池 -- 整体框架设计
5. 高并发内存池 -- thread cache
6. 高并发内存池 -- central cache
7. 高并发内存池 -- page cache
8. 高并发内存池 -- thread cache回收内存
9. 高并发内存池 -- central cache回收内存
10. 高并发内存池 -- page cache回收内存
11. 高并发内存池 -- 大于256KB的大块内存申请问题
12. 高并发内存池 -- 使用定长内存池配合脱离使用new
13. 高并发内存池 -- 释放对象时优化为不传对象大小
14. 高并发内存池 -- 多线程环境下对比malloc测试
15. 高并发内存池 -- 性能瓶颈分析
16. 使用tcmalloc源码中实现基数树进行优化
17. 扩展学习及当前项目实现的不足
附录
几个内存池库的对比
tcmalloc源码学习
TCMALLOC源码阅读
如何设计内存池
tcmalloc源代码
1. 项目介绍
1. 这个项目做的是什么?
这个项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread--Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc,free)。
我们这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出了一个自己的高并发内存池,目的就是学习tcmalloc的精华。
tcmalloc: TCMalloc (google-perftools) 是用于优化C++写的多线程应用,比glibc 2.3的malloc快。这个模块可以用来让MySQL在高并发下内存占用更加稳定。https://gitee.com/mirrors/tcmalloc
2. 该项目要求的知识储备
这个项目会用到C/C++、数据结构(链表、哈希桶),操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。
2. 什么是内存池
1. 池化技术
所谓"池化技术"就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用"池"这种技术的地方,除了内存池,还有连接池,线程池,对象池等。以服务器上的线程池为例,它的主要思想就是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
2. 内存池
内存池是指程序预先从操作系统申请一块足够大的内存,此后,当程序中需要申请内存的时候,不要直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不正真将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
3. 内存池主要解决的问题
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
内存碎片分为外碎片和内碎片,上面图中的是外碎片问题。外碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。内碎片的化我们的项目中也会涉及到。
4.malloc
C/C++中我们要动态申请都是通过malloc去申请内存的,但是我们要知道,实际上我们不是直接去堆获取内存的,而malloc就是一个内存池,malloc()相当于向操作系统"批发"了一块较大的内存空间,然后"零售"给程序用。当全部"售完"或程序有大量的内存需求时,再根据实际需求向操作系统"进货"。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的是微软自己写的一套,linux gcc用的glibc中的ptmalloc。
下面有几篇关于这块的文章,大概可以简单看看了解一下。
一文了解,Linux内存管理,malloc、free 实现原理 - 知乎https://zhuanlan.zhihu.com/p/384022573malloc()背后的实现原理——内存池 - 阿照的日志
https://azhao.net/index.php/archives/81/malloc的底层实现(ptmalloc)-CSDN博客
https://blog.csdn.net/z_ryan/article/details/79950737
3. 先设计一个定长的内存池
我们知道申请内存使用的是malloc,malloc其实是一个通用的大众货,什么场景下都可以,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池,当然这个定长内存池再我们后面的高并发内存池中也有价值的,所以学习他的目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。
定长内存池设计
首先我们需要申请一个池子,把向系统申请的一大块内存给存起来,但是它还面临第二个问题,第二个问题就是内存池的所有管理除了申请还有释放,释放回来的内存怎么办呢?不能说把释放回来的内存给丢掉,也不能说把释放回来的内存还给操作系统,因为我们向系统申请其实是有要求的,比如我一次向系统申请了10万字节,那要还也要把这10万字节全还回去,但是现在申请了一大块内存,把它切成一小块一小块给要用的地方进行用,用了以后把它还回来的时候咋管理?
那这个定长内存池解决的就是固定大小,大小是固定的,比如20字节,30字节,这种比较纯粹的需求,但是它的性能可以达到极致,并且它不考虑内存碎片等问题。
那这个时候就涉及一个问题,它还回来的时候怎么办呢?
那它就要通过一个东西,_freeList,自由链表把还回来的内存挂起来,在要用的时候就可以给你。
windows和Linux下如何直接向堆申请页为单位的大块内存:
VirtualAlloc_百度百科https://baike.baidu.com/item/VirtualAlloc/1606859?fr=aladdinLinux进程分配内存的两种方式--brk() 和mmap() - VinoZhu - 博客园
https://www.cnblogs.com/vinozly/p/5489138.html定向内存池代码:
Thread--Caching Malloc: 这个项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread--Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc,free)。这个项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出了一个自己的高并发内存池,目的就是学习tcmalloc的精华。 - Gitee.comhttps://gitee.com/Axurea/thread--caching-malloc/tree/master/ConcurrentMemoyPool
4.高并发内存池 -- 整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题。
concurrent memory pool主要由以下3个部分构成:
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁的,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- oage cache:页缓存时在central cache缓存上面的一层缓存,缓存的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
5. 高并发内存池 -- thread cache
thread ache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象 的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
内碎片和外碎片分别在什么场景下产生的:
内碎片:就像上面的情况,因为对齐的原因,你要5字节,给你8字节,你要6字节,还给你8字节,最后有一些字节在分配的对象里面用不上。
外碎片:是一片连续的空间,被切成了好多块分出去,只有部分还回来了,但是它们不连续,比如两块加起来是300多,但是我刚好要300,这样也是不行的,因为两块空间不连续。
申请内存:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置,将对象Push到_freeList[i]。
- 当链表的长度过长,则回收一部分内存对象到central cache。
TLS--thread local storage:
线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。而熟知的全局变量,是所有线程都可以访问的,这样就不可避免需要锁来控制,增加了控制成本和代码复杂度。
linux gcc下 tls:
Thread Local Storage(线程局部存储)TLS - 知乎https://zhuanlan.zhihu.com/p/142418922
windows vs下tls:
线程本地存储(Thread Local Storage) - 坦坦荡荡 - 博客园https://www.cnblogs.com/liu6666/p/12729014.html
thread cache的映射:
当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,如果我们每个字节都挂一个自由链表的话,那么就得256 * 1024个自由链表,这样的话太多了,所以这个时候就不需要创建那么多自由链表,选择做一些平衡的牺牲。
比如说第一个位置挂的这个自由链表都是8字节的,第二个位置挂的是16字节的,第三个位置挂的都是24字节的,然后一直往下推的话就不需要挂那么多。
这个时候,如果你的需求是小于等于8的,那么就去第一个下面申请内存块对象,如果是大于8并且小于等于16,那么就去第二个下面申请内存块对象,这个时候存在的问题就是空间浪费,你要10字节,结果给了你16字节,这样是可以的。
但是你要的是5字节,我给你8字节,也就意味着后面3个字节永远用不上,这个就叫内碎片。
计算对象大小的对齐映射规则:
首先我们第一个肯定是按8字节对齐,如果按4字节对齐的话在32位下就没问题,但是在64位下就会有问题。
最简单粗暴的处理就是所有的都按照8对齐,1~8对齐到8,9~16对齐到16,那也得创建32768个自由链表的桶,可以,但是有点多。
所有我们就有了以下规则:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16]
// [128+1,1024] 16byte对齐 freelist[16,72]
// [1024+1,8*1024] 128byte对齐 freelist[72,128]
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184]
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208]
1~128之间都按照8字节对齐,129~1024按照16字节对齐,1K~8K按照128字节对齐。
对齐会引入一个问题,这个问题就是内碎片。
比如我要129字节,但是要按16对齐,所以实际是144字节,也就是浪费了15字节
实际上是浪费了百分之10。
比如说我要1025个字节,按照128字节对齐,实际上就分配了1152字节,也就是浪费了127字节。
实际上是浪费了百分之11。
对齐数越大,桶的数量就越少,最开始对齐数不敢太大,就浪费太多了,所以是分段来对齐的。对齐数不断增大。
对齐数的计算:
//普通人想到的
size_t _RoundUp(size_t size,size_t alignNum)
{size_t alignSize = 0;if (size % alignNum != 0){alignSize = (size / alignNum + 1) * alignNum;}else{alignSize = size;}return alignSize;
}
//高手想到的
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
我们具体来看看第二个,我们先算算1~8的。
桶的数量:
1~128,按8字节对齐的话,总共就16个桶,0~15。128 / 8 = 16。
129~1024,按照16字节对齐,总共56个桶。1024 - 128 (因为前128字节是按照8字节对齐的) 896 / 16 = 56个桶。
//计算桶的位置 - 我们自己写的
static inline size_t _Index(size_t bytes,size_t alignNum)
{if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}
}//计算桶的位置 - 高手写的
static inline size_t _Index(size_t bytes, size_t align_shift)
{return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
1左移align_shift算出的是对齐数,然后加上字节数再-1,然后在右移动align_shift,比如说align_shift是3,右移3位就相当于除8,然后在减1,减1是因为下标是从0开始的。
//计算映射到哪一个自由链表桶
static inline size_t Index(size_t bytes)
{assert(bytes < MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16,56,56,56 };if (bytes <= 128){return _Index(bytes, 3);}else if (bytes <= 1024){return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}
}
1~128是按照8字节对齐的,到后面就不是8对齐,是16,128对齐,前128字节对应16个桶,但是129到1024之间对应56号桶,桶的数量可以提前算好。
比如说我要算129,那129在多少号桶呢?
首先得先减掉128,因为128不是按16字节对齐的,是按8字节对齐的,剩下的1个是按照16对齐的,那就按照16对齐的算就可以了。
小于128就直接计算按照8对齐的就可以了,如果是在129到1024之间,先把128减掉,然后按照2的4次方,16字节对齐,然后算第几号桶,但是它已经不是从第0号桶开始排的了,前面已经排了15号,要从第16号桶开始排,那么就要把前面的桶的数量,前面的桶的数量我们写到一个数组中了,然后以此类推。
6. 高并发内存池 -- central cache
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂的是SpanList链表结构,不过每个映射桶下面的span中的大内存被映射关系切成了一个个小内存块对象挂在span的自由链表中。
我们的每个线程没有内存之后要申请内存找的是thread cache,每个线程独享一个thread cache,thread cache里面,如果挂的自由链表下面挂的有内存就去自由链表下面获取一个对象,比如说再24下面,如果这个下面有,就头删拿走一个,还有就是映射的那个下面没有,这个时候就只能找下一层,下一层就是central cache,central cache自己也是一个哈希桶的结构,他的结构跟thread cache还是有非常多的相似之处,它们的核心都是一个按这个大小的对齐规则映射的,它也是一个哈希桶结构,但是它们又有一些不同。
它们的映射规则都是一样的,这个时候就方便取对象,但是不同的是thread cache是每个线程独享一个,而central cache是每个线程如果没有内存都会找它,所以说central cache里面是需要加锁的,比如说线程1,16字节1号桶下面没有,线程2在16字节,1号桶下面也没有,所以两个线程同时来找它的1号桶,那都在1号桶里面取数据,不加锁的话是会有问题的,但是不同的是这块用的这个锁是一个桶锁,桶锁就是每个桶会有一个锁,如果线程1和线程2来了以后都找这个桶(16字节映射的这个1号桶),那这个时候谁先到谁获取锁,另外一个阻塞,等着前面线程搞完了,另外一个线程才能进来,假设线程1要的是1号桶,线程2要的是2号桶,那这个时候你获取你的锁,我获取我的锁,这个时候不会存在,对性能的影响也不大,(存在竞争的话势必有的会被阻塞住)。
central cache和thread cache不同的地方除了要加锁,不同的地方在于thread cache下面挂的是一个一个切好的对象,而central cache下面挂的不是一个一个切好的对象,这个下面挂的叫做一个一个的span,span是跨度的意思,span在这个地方管理的是以页为单位的大块内存。
//Span管理一个跨度的大块内存
//管理以页为单位的大块内存
//管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; //大块内存起始页的页号size_t _n = 0;Span* _next = nullptr; //双向链表的结构Span* _prev = nullptr;size_t _objSize = 0; //切好的小对象的大小size_t _useCount = 0; //切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; //切好的小块内存的自由链表bool _isUse = false; //是否被使用
};
还涉及的一个问题就是下面不止一个span,可能会有多个span,为什么会存在多个span呢?比如说thread cache的16这个位置没有,那就找central cache的16的位置,central cache的16的位置也没有的话就找下一层,下一层就是page cache,要一个大块内存,要到一个大块内存之后就会把它切好,切成16字节的好多个对象,然后的话比如你只要一个,那么我这里不一定给你一个,你要一个我一次尽量给你多给一点,比如说你这一次要一个,那么可以批量的给你10个,以后你还要16字节大小的时候就在thread cache可以取了,thread cache中取是最好的,它没有锁,但是这里如果只有一个span的话,这个span终究不够你用,比如说你这个线程对16字节的需求非常非常的大,比如我一个span是两页,切成好多个,一个span你拿完了,那我就申请下一个span,以此类推,所以说下面挂了好多个span,因为一个span的内存切出来不一定够你用。所以说它这个下面挂的是一个一个的span,然后这个span是以多个页为单位的大块内存,把这个大块内存又切小,切成了一个小块小块的内存挂载这,那这个小块是多少呢?8字节映射下面把这个span就切成8,16字节映射下面把这个大块内存就切成16,24就切成14,一直往下切,每个span页数大块内存不是一样大的,比如你要的是8字节,那么给页的话就给个一两页就可以了,比如说你这个下面是156K,那这个span就要更大一些,那谁来决定这个呢?页数就可以控制span的大小。
也就是说thread cache没有内存了,就找central cache,central cache把这个内存给你,把span里面的切给你,如果它这里没有span,或者所有的span就给完了,它会再找下一层,page cache。
它里面还有一个_usecount,_usecount这个值比如说你切出来,假如是1页,1页我们给8k,1页给8k的话就是1024*8,然后再除8字节,可以分成1024块,那我就切成1024块挂在span下,thread cache没有内存了都找central cache要,central cache然后就不断的分给thread cache,那么是怎么知道分完的呢?里面有个自由链表,分完的话自由链表就为空了,同时还在做另一件事情,_usecount也在++,最开始是0,不断++就加到1024了,那么这个有什么价值呢?
central cache要起到两层作用,
第一层就是均衡调度,均衡调度的意思是你这个线程不断的申请,我把内存给你,但是你现在线程现在不用了,把内存还给thread cache,thread cache就把它插入到对应的桶,但是有可能这个桶里面就挂了好多好多的内存,但是线程2要内存的话取不到线程1的啊,那么怎么办呢?也就是说thread cache不仅仅要插入进去,下面还需要做一些处理,如果下面挂的内存太多了,就要把它还给central cache,那这个时候线程1申请之后还回来的,线程2,线程3都可以用,你不够了可以给你给,但是你太多了要还给我,还给我我就可以分给其他线程,这就是均衡调度。
第二层就是,我把大块内存切分成小块给你的时候_usecount就++,你还回来的时候就--,比如说我这个span是一页,我切出了1024个8字节,最开始一个都没分配的时候usecount是0 ,说明没有人在用,然后有人再用,没分1个就加1,每分10个就加10,然后别人还回来就--,那么我是怎么知道以页为单位的这个大块内存都还回来了呢?申请出去是++,还回来是--,当usecount是0的时候,就意味着没人用了,全部就还回来了,所有对象都回到了span,所有对象都回到了span的话那对下一层就要做处理了,它把这个span要还的下一层,page cache,page cache会对前后的页进行合并,合并出更大块的内存,来解决内存碎片的问题,这个解决内存碎片是外碎片的问题。它不是完全解决,而是缓解内存碎片的问题。
central cache起的作用就是承上启下的作用。
总结:
申请内存:
- 当tthread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类型网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
- central cache的中挂的span中use_count记录分配了的多少个对象出去,分配一个对象给thread cache,就++use_count。
释放内存:
- 当thread_cahce 过长或者线程销毁,则会将内存释放回central cache中的,释放回来时-- use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
span下面挂的是一页或者多页的跨度,然后这个跨度又被切成了小对象,我们可以看到,有些span里面有,有些span里面没有,这里为什么要多个span呢?
假如现在有一个span,在8字节下面,一页,8K,然后这个span下面有一千多个对象,不可能一千多个都给你thread cache,给你你可能也用不完,其他的就浪费了,那我就可能批量的给你,给你5个,8个,或者10个,一个span中的内容有可能给线程1,有可能给线程2,有可能给线程3,这个span的小块对象用完了就再申请一个span,如果一个span下面为空,就说明它的小对象都被分配出去了,也有可能有两个span下面不为空,原因就是这个span还在用的时候又有一些线程内存太多了,还了一些回来,挂到这个span下面,所有就是哪个span里面有东西是不好说的,本来就是有来有回。所有会有多个span,哪个span下面还有对象,有多少对象都不好说。
这里的span结构为什么要设计成双向链表呢?
你要的时候_usecount会++,你还回来的我就--,直到减到0的时候就说明这个span对象全还回来了,然后在还给page cache,进行前后页的合并,那这个时候结构里面就涉及一个问题,这个问题就是可能某一个span回来了,这个span不知道在哪个位置,要删除掉,还给下层的page cache,所有我们设计成双向链表,设计成单向链表的话删除的话就比较麻烦,设计成双向链表的话最好设计成带头双向循环链表,因为带头双向循环任意位置插入删除最简单,又高效,又是O(1)。
页号:
所以我们第一个写_WIN64,第二个写_WIN32。
#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else//linux
#endif
单例模式:
一个线程通过TLS,获取到独享的thread cache,有内存了就直接取了,没有内存的话就找central cache,每个线程走到thread cache里面,如果没有内存,就要调FetchFromCentralCache这个函数,那么FetchFromCentralCache如何去获取到central cache呢?
- 第一种呢就是用全局的变量。
- 但是更好的方式,一般在项目中像central cache,page cache都一样,他们尽量要求全局只有唯一一个,每个线程都有独享的thread cache,但是整个进程中只有一个central cache,每个线程的thread cace都能很容易的找到它,这种场景就非常使用单例模式。
单例模式分为懒汉模式、饿汉模式,一般懒汉模式略复杂点,我们这个项目中用饿汉模式就可以了。
慢开始启动算法:
thread cache本身是无锁的,但是走到central cache,按理来说桶锁也还好,但是我们去竞争同一个桶的话,那这个锁的问题就严重了,那这个竞争还是非常严重的,效率就会有损失,那可不可以这样呢?你这次要一个,我多给你一点,就像增容一样,你空间不够了,要插入数据,不是增一个,而是增2倍。但是多给多少呢?最粗暴的就是你要1个,我给10个,那么你要1个,我给10个,后面的9次来申请,都是无锁的,那第11次申请又要走到它里面,那么我能不能再多给一些呢,比如给100个,1000个,可以,但是不好,给多了用不完,浪费,并且我们还要明白另外一个问题,如果你是8字节还好,给你10个,100个都还好,但如果后面的大对象呢?最好的就是小对象就可以多给一点,大对象就少给一点,并且最开始也不要给太多,比如说一开始就给10个,但是只用了一两个,剩下的也用不着。别人想用,可能你把资源占用了,基于这样的原因,这里用了一个慢开始反馈调节算法。
//一次thread cache从中心缓存获取多少个
static size_t NumMoveSize(size_t size)
{assert(size > 0);//[2,512],一次批量移动多少个对象的(慢启动)上限值//小对象一次批量上线高//小对象一次批量上线低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;
}
7. 高并发内存池 -- page cache
申请内存:
- 当central cache向page cache申请内存时,page cache线检查对应位置有没有span,如果没有则向更大页找一个span,如果找到则分裂成了两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap,brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache的核心结构都是spanlist的哈希桶,但是它们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小和对齐关系映射的,他的spanlist中挂的span中的内存都呗按映射关系切好连接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,页就是说第i号桶中挂的span都是i页内存。
释放内存:
- 如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果可以合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
page cache自己也是一个哈希桶,但是它的结构跟之前的结构有本质的区别,本质的区别在哪呢?它的这个哈希桶里面挂的也是一个一个的span,首先它有两个不一样的地方,page cache哈希桶映射挂的是span,而central cache也挂的span,但是它们有区别:
-
central cache 这块的映射规则跟thread cache保持一致,而page cache和前面不一致,page cache的映射规则是第1个桶中挂的是1页的span,第128个桶挂的是128页span。
-
central cache里面的span被切成了小对象,供给thread cache用,thread cache用完了或者不用了,太多了可以还给central cache,而page cache里面的span大块内存不切小,因为page cache针对是给central cache服务的,central cache没有span了,那你就按你的对象大小算,你要几页的span,也就是说thread cache找central cache,central cache如果来找span,如果没有span或者没有空闲的span的时候要找下一节,如果我要的单个对象小,比如我8字节这个桶中要span,那我就要一页,两页的就可以了,如果我要的是256K的,那我要的span可能是很多个页的,central cache找page cache的时候关注的是我要的是几页的span,那我是怎么知道我要的几页span是在哪呢? 它这个是直接定址法的映射,按页数,1页的就在第0个桶,两页的就在第一个桶,或者把第0个位置空出来,1页的就在第一个桶,两页的就在第2个桶。
另外一个问题就是span的central cache里面给thread cache,thread cache不用了还回来了,usecount--,减到0了以后那我把这个span要还给page cache,那page cache就会找前后的页进行合并,不管合并还是没合并,算出来我是一个10页或者5页的span,那我插入到哪个桶里面去呢?那这里也是直接定址法,你是几页就找几号桶,把这个span插入进去就可以了。
这块就是page cache的结构的设计。
我们最大申请的单个对象是256KB,当然我们的桶最大页可以变成256个,最大的单个对象是256,128 * 8(一页8k),最大的span已经是1M,就是1024KB,切你最大的对象可以切4个,当然向再放大一些也是可以的。
第一个thread cache没有内存找central cache,central cache找page cache,第二个thread cache没有内存找central cache,central cache找page cache,page cache也是需要加锁的,page cache里面不能用桶锁了,这里要用一个全局的,整体给锁住。
page cache没有就去找堆要了,但是我们找堆要尽量要的大一点,连续我切小了,还回来以后还能合并成一个大的,如果你要两页的span,找堆要,它是可能不连续的,如果两页的span没有,那我就往下找,找3页的span,将3页的span,切分成一个两页的span和一个1页的span,两页的给用,1页的挂在对应的桶的下面。申请的流程就是找对应位置,如果对应的位置没有,那么就要去切。
整体这样的好处就是合并,合并的意思就是central cache中的小对象都回来了,那么page cache是要做合并的。
最开始page cache中一个东西都没有挂,假设现在需要一个两页的span,但是两页这样地方没有,你是去找堆直接要一个两页的吗?不是,首先是往后去找,看有没有比两页大的,因为一个大页可以被切成小页,你有一个3页的,可以被切成两页和1页,所有就往后找,最极端的情况就是一个都没有,那这个时候怎么办呢?
最开始都为空的情况下,需求是两页的soan,那这个时候就是向系统申请一个128页的span,然后挂到128位置,我向系统申请,只会申请128页,你要两页的话,那么就切,切出来的两页给你用,剩下的126挂在对应的126的下面。
当thread有的内存不要了,就相当于这个线程malloc了,free回来了,太长了,满足一定条件就会给central cache,还到central cache对应的span,central分出去是++usecount,回来是--usecount,当某个span的usecount减到0的时候,说明它的这些小内存全回来了,没有人再用了,那我就可以把span还给page cache,而page cache里面就可以做一件事情,还回来一个span以后,比如说你的页号是100(假设当前是一个10页的span),那么就可以查一下前面99页所在的span和后面的110页所在的span是不是空闲,先往前查,如果这个是空闲,就合并,再往前查,如果再空闲,就继续合并,直到前面的不空闲了,就往后查,后面的空闲,也合并。
如果central cache中的span suecount等于0,说明切分给thread cache的小对象都回来了,则我们的central cache把这个span还给page cache,page cache通过页号查看前后相邻页是否空闲,空闲就合并,合并出更大的页,解决内存碎片的问题。
如果前后都是散的页,那么我申请大内存就申请不出来,那么如果能够合并,就不存在这个问题了,如果你的内存都还回来了,那么前后的页是一定空闲的,因为我本身就是一个大块的内存,向堆申请了一个128页的span,然后被切小了,分出去了,如果这些span都还回来了,那它去搜索前后的页进行合并,那么一定就会被合并成一个128页的span,那这个时候就不至于这些span都是两页三页的,你想用一个10页的我出不来,所以说page cache本质做的就是我一次向堆要一个大一点的,这个时候你要小块的,再按页给你切小,切小了当你还回来以后又会被合并大,所以说在page cache里面它在不同的桶之间可能会把大页给切小,切小了拿出去用,剩下的给挂起来,会把小页给拿出来,合大,那这个时候还能搞成桶锁吗?不能,两个线程同时获取到某个位置的span,同时切,切好了同时往对应的位置插入,所以说用一个桶锁是不行的,因为桶锁锁不住,我两个线程同时在操作这一块的时候你只把取对象给锁住了,那我取到了这个对象插入的时候还是解决不了,所以这里不敢用桶锁,因为这是整体,我再取页,合并,把大页切小,挂起来,还回去,所以说挂一把整体的大锁才是安全的。
有的人还是觉得用桶锁是可以的,但是有一个非常难控制的东西,就是说线程1要的是1页的span,线程2要的是两页的span,1页和两页都没有,我们都会往后去搜索,那么每找后面的桶都会加一下锁,否则3页的span,线程1要,线程2也要,它们就获取到同一个东西,那就会涉及到用桶锁也不是完全不可以的,用桶锁的话就涉及到不断的加锁,解锁,一把大锁更好,加锁解锁的消耗也挺大的。
central cache用桶锁就是因为我要用8字节,那么就只会找8字节,不会找其他的桶,page cache这里的可能会去找其他的桶,把大页切小,或者说释放的时候,要找小页,要把小页合大,所以说桶锁不是完全不可以,效率反而受到了影响,因为加锁解锁会导致线程频繁的睡眠,保存上下文,唤醒,都会有性能的消耗。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//加锁_sInst.GetMtx();// 先检查第k个桶里面有没有spanif (_spanLists[k].Empty()){return _spanLists->PopFront();}// 检查一下后面的桶里面有没有span,如果有,可以把它进行切分for (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){/** 切分成一个k页的span和一个n-k页的span* k页的span返回给central cache* n-k页的span挂到n-k桶中去*/Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切一个k页下来//k页的span返回//nSpan再挂到对应映射的位置kSpan->_pageid = nSpan->_pageid;kSpan->_n = k;nSpan->_pageid += k;nSpan->_n -= k;//把nSpan头插进去_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}//走到这个位置就说明后面没有大页的span了//这时就去找堆要一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;//计算页号bigSpan->_n = NPAGES - 1;//页数_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
}
我在断言后面加锁是肯定不行的,这里互斥锁锁住了会有一个特别恶心的问题,假设锁在这里,往下走,下面都没有,申请一个128的会再调自己,然后就把自己给锁住了。
这种也是可以解决的,有两种方式。
- 互斥锁中有个专门支持递归锁的,不会出现刚才的死锁,递归互斥锁的话可以解决刚才的问题,如果是递归调自己会检查一下。
- 可以分离一个子函数出来。
Span* PageCache::NewSpan(size_t k) {//加锁_sInst.GetMtx();return _NewSpan(k); }//获取一个k页的span Span* PageCache::_NewSpan(size_t k) {assert(k > 0 && k < NPAGES);// 先检查第k个桶里面有没有spanif (_spanLists[k].Empty()){return _spanLists->PopFront();}// 检查一下后面的桶里面有没有span,如果有,可以把它进行切分for (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){/** 切分成一个k页的span和一个n-k页的span* k页的span返回给central cache* n-k页的span挂到n-k桶中去*/Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切一个k页下来//k页的span返回//nSpan再挂到对应映射的位置kSpan->_pageid = nSpan->_pageid;kSpan->_n = k;nSpan->_pageid += k;nSpan->_n -= k;//把nSpan头插进去_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}//走到这个位置就说明后面没有大页的span了//这时就去找堆要一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;//计算页号bigSpan->_n = NPAGES - 1;//页数_spanLists[bigSpan->_n].PushFront(bigSpan);return _NewSpan(k); }
这里还涉及到的一个问题就是thread cache是无锁的,里面没内存的话就会去找central cache,但是central cache是桶锁,central cache没有内存的话就去找page cache,page cache是整把大锁,那就是central cache锁住了,page cache也锁住了,那去page cache中的时候要不要把cantral cache中的桶锁给解开呢?
答案是解了好,因为把桶锁给解了,走到page cache中去了,别的线程就可以访问这个位置了,有的人可能会这样想,一号线程再central cache中没找到对象就去找page cache,二号线程来了也找不到对象啊,然后又去找page cache,这里的一把大锁也给锁住了,但是别忘了,除了有申请,还有释放啊,如果central cache不解桶锁的话,直接去找page cache,带来的问题就是再有其他线程来,要访问这个桶,也访问不了,申请确实没啥价值,因为这个下面确实没有什么内存了,但是如果我是释放呢?也会锁到这个里面,我是释放的话我是可以往这些spanl cache中还内存,把释放也给堵住了不好,所以说最好解了。
不用,因为我去page cache获取到一个span了以后是加锁的,那我获取的k页的span只有我当前线程才能拿到,其他线程拿不到,下面的过程就是对获取的span进行切分了,这个过程不需要加锁,什么情况下需要加锁,多个线程需要竞争的情况才需要加锁。所以说只要访问这个桶才需要加锁,但是我们切分这个span其他线程访问不到,不需要加锁。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);// 先检查第k个桶里面有没有spanif (_spanLists[k].Empty()){return _spanLists->PopFront();}// 检查一下后面的桶里面有没有span,如果有,可以把它进行切分for (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){/** 切分成一个k页的span和一个n-k页的span* k页的span返回给central cache* n-k页的span挂到n-k桶中去*/Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切一个k页下来//k页的span返回//nSpan再挂到对应映射的位置kSpan->_pageid = nSpan->_pageid;kSpan->_n = k;nSpan->_pageid += k;nSpan->_n -= k;//把nSpan头插进去_spanLists[nSpan->_n].PushFront(nSpan);return kSpan;}}//走到这个位置就说明后面没有大页的span了//这时就去找堆要一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;//计算页号bigSpan->_n = NPAGES - 1;//页数_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
}
现在我要申请一个两页的span,两页的span后面都没有了,我现在去申请一个128页的span挂载这,我不如直接把128页的切成两页的和126页,126页挂下来,两页给上面,如果不直接切就会递归调自己,我明明知道两页这个位置没有,再从两页走到128页,最后再128页这个位置找到了,再来切分,这就是选择的问题,如果不递归调自己,直接切分,也是没有问题的。
有人会说,递归调自己,慢了,确实慢了,直接切相较于递归调自己再遍历一次所有的桶肯定快,无谓的消耗,因为我知道前面肯定没有,从本质上来说,肯定递归慢一点点,但是计算机太快了,几乎没啥影响,可以忽略的。
反而从代码的角度来说复用更重要。
8. 高并发内存池 -- thread cache回收内存
9. 高并发内存池 -- central cache回收内存
实际thread cache申请的时候一批一批的申请,thread cache中那个线程再用内存的时候,谁先还回来的是不确定的,满足大于一批的长度,然后就开始回收,那也就意味着每个下面有多个span,每个内存块到底数据哪一个span,都是不确定的,那么我怎么知道哪个对象是属于哪个span呢?
这些小块内存一定是由页内存切出来的,比如说这些小块内存是由两个页切出来的,比如是第2000页和第2001页,这两个页里面切出的内存,比如说前两块是属于2000这个页号的,后两块是属于2001这个页号的,那我怎么知道我是属于哪个页的呢?
你这个内存块是属于这个页之内,那内存地址就是从0xFA0000这个地址往上加,那这个地址除以8k是2000,那上面的地址除以8k也是2000,不仅仅是页的起始地址除8k是2000,因为页的起始地址除8k是2000整除,中间的不整除,余数也就丢了,要除尽地址也就走到了下一页。
void TestAddressShift()
{PAGE_ID id1 = 2000;PAGE_ID id2 = 2001;char* p1 = (char*)(id1 << PAGE_SHIFT);char* p2 = (char*)(id2 << PAGE_SHIFT);cout << (void*)p1 << endl;cout << (void*)p2 << endl;while (p1 < p2){ cout << (void*)p1 << ":" << ((PAGE_ID)p1 >> PAGE_SHIFT) << endl;p1 += 8;}
}
这里我们要注意的是p1是char*类型的,char*的话cout输出走char*会把它识别成字符串,遇到'\0'才终止,其它指针都按十六进制输出,但是char*会按字符串输出,所有我们要强转,不然输出会有问题。
我知道了内存块的地址,确实找不到对应的span,但是可以算出我是哪个页,如果我们暴力点的话我知道我是属于哪个页的话我就可以遍历一下这些span,这些span里面都记录了起始页号,我就可以计算在不在这些span的范围内,就可以知道它是属于哪个span了,这是第一种比较暴力的方式,但是这样的话就是一个n方的算法了,因为我每个内存块都需要把你的span遍历一遍,这个时候我们就可以搞一个映射,我们可以在page cache中加一个页号跟span的映射,因为page cache中也要用这个来合并内存。
10. 高并发内存池 -- page cache回收内存
为了解决内存锁片或者缓解内存碎片的问题,假设还回来了一个span,假设这个span是1页,我们第一种的处理方式就是直接把它挂到1页的span下面就可以了,但是这样的话就会导致一个问题,比如说刚才的128页被切成了好多1页,2页,3页,切出来给central cache用了,central cache都不用了,还回来了,page cache把它挂起来,都是一些零零散散的小块内存,假设都是10页之内的,但是我现在想要一个10页以上的内存,如果这些内存块前后拼一下,可能是连续的,但是现在要一个10页的内存,要不了,而这些内存碎片化的散在这里,所以我们要对这种外碎片问题解决一下,怎么解决呢?
内碎片问题我们解决不了,想要桶少就只能忍受内碎片,但内碎片没啥问题,内碎片我申请出去给别人用了,比如你只要6字节,我给了你8字节,有两字节的碎片,给你用了,然后你还回来的8字节下次还可以给你申请出去,只是暂时的,但是外碎片问题如果不去解决就一直在,所以这里的span回来了,我们先得对前后页尝试进行合并,缓解内存碎片问题(外碎片),而不是直接挂起来。
比如说我这个页是第1000页,那么pageid就是1000,n就是1,那么我们就可以找一下前面的页,找一下999,看999是不是空闲的,找了999所在页的span,如果空闲就合并,合并了还要再往前走,有可能还要再往前合并,直到不能合并,往后就找一下1001,如果pageid是2000,n是5,往前走找到是1999,找到这个页所在的span,假设1999所在的是两页的span,那么这两个就合出一个7页的span,往后就找2005,假设2005这个又是一个5页的span,那么我们就可以合并出12页的span,那么我们这里就要想办法找到前后页所在的span,因为前面的页再一个span,那个span起始页是多少,总共有几页,都不好说,我们要找前后页去进行合并,那么这里我们又要需要id到span的映射,它可以再central cache中通过内存找所对应的span,因为内存块的地址可以转换成页号,页号可以找到它所在的span,合并的时候找前后页也是一样的,不用转,直接通过页号找前后相邻页。
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
还有一个问题就是,通过页号找到了span,但是那个span能不能进行合并呢?
那么就要看两个问题,那个span有可能挂在page cache,只要在page cache中都可以合并,在central cache中不可以合并,正在用,那么我们应该怎么识别是在central cache还是在page cache中呢?有的同学可能会觉得在page cache中usecount都是0,需要内存,没有内存了才会把span切好给central cache,central cache把它切好了再用,听起来是没有问题的,但是实际上是有问题的,central cache在自己的spanlist中找span,没有找到,然后向page cache要了一个,要了一个之后还在切的过程中,这个时候还没有给别人分的时候,usecount就是0,然后正准备把切分的内存给了然后再增加usecount的时候,另外一个线程还回来了一个相邻页,然后把这个span拿过去合并,所以说不敢用usecount,用usecount可能会出现一个问题,就是刚把它从page cache拿到central cache,central cache又拿回来合并了,这个时候就有线程安全的问题。
本质就是你要合并你只能合并page cache的,因为page cache的没人再用,central cache的都有可能在用,但是central cache中并不是每个span都是0,也有可能正在切分的,从page cache中刚拿过来的,所以这个时候我们要想一个办法解决这个问题,我们可以增加一个布尔值,_isUse,来判断当前是否正在使用,默认是false,都可以合并的,只要这个span被分给了central cache,central cache的所有span肯定都是被分过来的,只要被分过来我们就把这个span中的_isUse的值修改为true。
我们先找一下前一个id合不合适,前一个id一定是span->_pageId - 1,比如我是2000,减1就成了1999,那这里能不能找到呢?其实是找不到的,因为我们的这个map中只有分给central cache的我们才缓存。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);// 先检查第k个桶里面有没有spanif (!_spanLists[k].Empty()){return _spanLists->PopFront();}// 检查一下后面的桶里面有没有span,如果有,可以把它进行切分for (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){/** 切分成一个k页的span和一个n-k页的span* k页的span返回给central cache* n-k页的span挂到n-k桶中去*/Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切一个k页下来//k页的span返回//nSpan再挂到对应映射的位置kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;//把nSpan头插进去_spanLists[nSpan->_n].PushFront(nSpan);// 建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageId + i] = kSpan;}return kSpan;}}//走到这个位置就说明后面没有大页的span了//这时就去找堆要一个128页的spanSpan* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;//计算页号bigSpan->_n = NPAGES - 1;//页数_spanLists[bigSpan->_n].PushFront(bigSpan);return NewSpan(k);
}
我们的NewSpan中,我们有一个n页的span,我们把这个n页的span分为k页的和n-k页的,那我把k页的span和它的id映射了,剩下的就挂到对应的位置,没必要把所有的都缓存起来,因为如果是后面的页的话去往前找只会找整个页的最后的位置,或者前面的页合并也只会找起始页。
只要是page cache中的页,首和尾都是被记录下来的,就算是在central cache中,也是被记录下来的。
它会一直往前合,有两种情况会停下来:
1. 合并的时候前面的页正在被使用,就不能往前合并了,不往前合了,但是我们可以往后合,向后合也是类似的道理,如果向前向后都不能合了,我们才介截至。
2. 一直合,和前一块合起来的话超过了128,就不能合了,因为合并的页要挂载page cache下,超过128的话就没办法管理了。
没内存的时候我们申请内存申请的就是128,那么合的时候怎么会超过128呢?
不排除向堆申请的时候申请的两个128页的span是连着的,也有可能第一个的一半和第二个的一半合并,这样也是没有问题的,因为内存池有一个特点,申请过来的内存是不会还给系统的,本质就是接管这个进程中的内存管理,存在这里和存在进程那没啥区别,这个进程只要结束了,这些内存都会释放。
// 释放空闲的span回到page cache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{// 对span前后的页,尝试进行合并,缓解内存碎片问题// 向前合并while (1){PAGE_ID prevId = span->_pageId - 1;auto ret = _idSpanMap.find(prevId);// 前面的页号没有,不合并了if (ret == _idSpanMap.end()){break;}// 前面相邻页的span在使用,不合并了Span* prevSpan = ret->second;if (prevSpan->_isUse == true){break;}span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);delete prevSpan;}// 向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto ret = _idSpanMap.find(nextId);if (ret == _idSpanMap.end()){break;}Span* nextSpan = ret->second;if (nextSpan->_isUse == true){break;}if (nextSpan->_n + span->_n > NPAGES - 1){break;}span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);delete nextSpan;}_spanLists[span->_n].PushFront(span);span->_isUse = false;_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId + span->_n - 1] = span;
}
11. 高并发内存池 -- 大于256KB的大块内存申请问题
小于256KB我们是先找thread cache,thread cache没有在去找central cache,central cache没有再去找page cache,那这样的话我们就可以分成两种情况:
1、 <= 256KB -> 三层缓存
2、> 256KB(32页) -> 我们直接考虑去找page cache,page cache把128页的都能满足,找page cache有一个好处,好处就是比如page cache剩了一个100页的,但是你要50页的,那么它可以把这个100页切成两个50页,其中一个50页给你用,用完了还回来,它就可以把两个50页合并掉,合并后挂起来,其他线程在来用小内存或者大内存,还可以切,但是大于128页就不行了,大于128页就直接去找堆申请了。
12. 高并发内存池 -- 使用定长内存池配合脱离使用new
我们上面写的定长内存池还是有使用的场景的,像tcmalloc本身就是要去替代malloc的,要替代malloc的话,那么tcmalloc内部是不能使用malloc的。
像我们当前项目有的地方是使用了new,那么我们可以变成我们之前写的定长内存池。
13. 高并发内存池 -- 释放对象时优化为不传对象大小
我们正常的申请内存给一个内存块的大小,就可以得到一个内存块,我们要释放内存的话按理来说给个指针就可以了,而我们实现的内存池为什么要传size呢?因为只有给了size才能区分是大块内存还是小块内存,定义的就是小于256KB的就是小块内存,大于256KB(32页到128页之间)走page cache,大于128页直接找系统要,就是分成了这样几层去走的,那我必须得知道这个size,那么我们不想传size应该怎么办呢?
其实要解决这个问题有很多很多种方式,你给我每个内存块对象,我都可以算在哪一个页,一个页里面的内存不仅仅是起始地址,是这里面的任意一块内存除以8K都可以算出页号,我们知道页号的话就好办了,我们之前有一个map,这个map存储的是PAGE_ID跟Span的映射,还有一种方式就是再建立一个map去存储页号跟size的映射,这种方式也是可以的。
std::unordered_map<PAGE_ID, size_t> _idSizeMap;
专门用一个容器去存页号跟size的映射,一个页里面的内存不可能切出两种大小,要么我们可以单独用一个容器进行存储,这是一种方式。
还有一种方式就是可以再简化一下,直接处理,我有内存块的地址,那我可以用内存块的地址计算出页号,通过页号就可以找到对应的span,那我在span中增加一个东西就可以了。
所有的span都是从page cache中拿来的,然后page cache就要按size去切,在切分的时候将size直接保存起来。
static void ConCurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if (size > MAX_BYTES){PageCache::GetInstance()->GetMtx().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMtx().unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
来一个指针,先获取到span,size就是span里面的_objSize,你是属于哪个span,那你这个span切出来都是这么大,size在大块内存里面可以区分到底是向堆要的还是向page cache要的,如果走的是三层缓存是要算在哪个桶里面,当然如果是大块内存的话这块还是要处理一下的。
static void* ConcurrentAlloc(size_t size)
{if (size > MAX_BYTES){size_t alignSize = SizeClass::RoundUp(size);size_t kpage = alignSize >> PAGE_SHIFT;PageCache::GetInstance()->GetMtx().lock();//加锁Span* span = PageCache::GetInstance()->NewSpan(kpage);span->_objSize = size;PageCache::GetInstance()->GetMtx().unlock();//解锁void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{// 通过TLS,每个线程无锁的获取自己的专属的ThreadCache对象if (pTLSThreadCache == nullptr){//pTLSThreadCache = new ThreadCache;static ObjectPool<ThreadCache> tcpool;pTLSThreadCache = tcpool.New();}// 获取线程号cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}
}
如果是小于256K的走的三层缓存,大于256K的是整了一个span,这就是为什么之前要搞一个span出来,如果找page cache是一个span,找堆,大于128页的时候也是直接VirtualAlloc或者brk搞一块内存出来,为什么要转成span呢?因为跟这块就可以结合了。统一都用span,那这个时候我们就可以把大小存在这个span当中。
注意:
STL容器中并不保证线程安全的问题,所以我们再访问unordered_map的时候要加锁。这块的问题就是申请span的时候有可能再访问这个map,再插入或者再修改,再释放的时候也有可能再修改或者再插入,有两个地方都再读这个map,一个地方就是释放内存的时候要读map,通过对象获取到页号,再通过页号获取到span,再central cache里面再释放一个对象的时候也一样给了一个小块内存也算属于哪一个span,到处都在访问,那么我们都知道,有些线程再读,有些线程在写,都是需要加锁的,那么应该怎么加锁?哪些地方需要加锁?
static void ConcurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if (size > MAX_BYTES){PageCache::GetInstance()->GetMtx().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMtx().unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
首先MapObjectToSpan不能放到下面的锁里面,因为你放到里面其他地方还没有保证加锁,我们先看一看我们其他地方访问map有没有被加锁呢?
其实是加了锁的,page cache里面的NewSpan和ReleaseSpanToPageCache这里面都在访问map,比如说里面的查找前后的页,但是这两个函数都是加了锁的,放到central cache里面加的锁,他把桶锁解了,把page cache的大锁都给加上了,但是读没加锁,有人在写,我去读,有可能把那个节点删掉了我还在读,或者我再遍历这个哈希桶,别人在插入数据,那这个时候就有可能出现线程安全的问题,有可能出现,这就要看多线程场景下复杂的程度,但是这种东西不能留隐患,终究会爆发。
/ 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;std::unique_lock<std::mutex> lock(_pageMtx);//出了函数的作用域自动解锁auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}
所以基于这样的原因,在这里我们还是要加锁的,包括访问这块数据的时候都是要加锁的。
所以我们的MapObjectToSpan接口要放在外面的,放在里面的话就成死锁了,因为加的是同一个锁,先把page cache的锁加上,再去调map里面又要加锁,因为其他地方也在用MapObjectToSpan这个接口。
14. 高并发内存池 -- 多线程环境下对比malloc测试
本身这个项目就是一个性能的项目,本质上就要搞出再并发的场景下面,再多线程场景下面要比malloc和free要更快。
测试代码:
#define _CRT_SECURE_NO_WARNINGS 1
#include"ConcurrentAlloc.h"// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(malloc(16));v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){//v.push_back(ConcurrentAlloc(16));v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, (unsigned int)(malloc_costtime + free_costtime));
}int main()
{size_t n = 10000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 1, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
测试结果(Debug):
测试结果(Release):
这个需要注意一个点,vs2013和vs2019、vs2022使用的库有所差异,所以我们再printf函数%u打印变量时,需要做unsigned int强转。
1. 定义atomic变量时
std::atomic<size_t> malloc_costtime = 0;
2. printf函数%u打印atomic变量时,需要做unsigned int强转
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, (unsigned int)malloc_costtime);
15. 高并发内存池 -- 性能瓶颈分析
刚才我们看到我们的性能对比malloc还是不行,那这个时候怎么性能优化呢?
这里我们要使用性能分析的工具,VS下有性能分析的工具,Linux下也有性能分析的工具。
首先我们要调成Debug模式,然后再调试中点击性能探查器。这里我把n从10000修改成了1000,如果是1000的话4个线程就是4000,本身就有点慢。
到这里我们点击检测,因为我们要检测的是函数的调用计时,不是看CPU的资源。
Deallocate占了百分之26的时间。
Deallocate里面ListTooLong占的时间最多。
ListTooLong里面的ReleaseListToSpans占用的时间最多。
ReleaseListToSpans中最大的消耗是两个,一个是里面的锁lock,再其实就是map中的锁,也消耗了很多。
这个项目的瓶颈点都再锁竞争上面。
MapObjectToSpan中find占了百分之七,但是锁占了百分之三十五。我们的这个锁的消耗其实是相当的大的。
tcmalloc源码中用基数数进行了优化,其实就算是一种哈希。存整型的话就把整型的一些位分段映射。
基数数存的id到span的映射,存的是span的指针,指针其实本质上也是一个整数,id也是一个整数,整数到整数的映射用基数数来存储会非常的合适,会比我们用map和unordered_map还会更快一些,最重要的是可以在读的时候不加锁。
16. 使用tcmalloc源码中实现基数树进行优化
均匀大小:
有时候哪个好都不好说,因为这种多线程竞争状况。有时候malloc还略快那么一点点,这个是均衡申请的,每个桶都去申请一部分。
单个大小:
数据量大了以后,map或者unordered_map使用查找消耗,第二个就是锁竞争的消耗,map越慢,锁的竞争就越激烈,数据量大了查找写入就慢,竞争就越激烈。
代码:
//pageMap.h#pragma once
#include "Common.h"//Single-level array// BITS在32位下是32-PAGE_SHIFT 64位下是64-PAGE_SHIFT
// BITS的意思就是存储页号需要多少位
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;}
};// Two-level radix tree
template <int BITS>
// 一层或者两层都适用与32位,用谁都一样,基本上没有区别,64位下就得用三层了。class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
单个大小:
均匀大小:
17. 扩展学习及当前项目实现的不足
实际中我们测试了,当前实现的并发内存池比malloc/free是更加高效的,那么我们能否替换到系统调用malloc呢?实际上是可以的。
不同平台替换方式不同。基于unix的系统上的glibc,使用了weak alies的方式替换。具体来说是因为这些入口含食宿都被定义成了weak symbols,再加上gcc支持alias attribute,所以替换就变成了这种通用的形式:
void* malloc(size_t size) THROW attribute__((alias(tc_malloc)))
因此所有malloc的调用都跳转到了tc_malloc的实现
具体参考这里:
GCC __attribute__ 之weak,alias属性_gcc weak alias-CSDN博客https://blog.csdn.net/BingoAmI/article/details/78683906有些平台不支持这样的东西,需要使用hook的钩子技术来做。
关于hook请看这里:
Hook技术 - zzfx - 博客园https://www.cnblogs.com/feng9exe/p/6015910.html
附录
几个内存池库的对比
内存优化总结:ptmalloc、tcmalloc和jemalloc_ptmalloc jemalloc tcmalloc-CSDN博客https://blog.csdn.net/junlon2006/article/details/77854898
tcmalloc源码学习
TCMalloc源码学习(一) - persistentsnail - 博客园https://www.cnblogs.com/persistentsnail/p/3442185.html
TCMALLOC源码阅读
TCMALLOC 源码阅读-CSDN博客https://blog.csdn.net/math715/article/details/80654167
如何设计内存池
如何设计内存池? - 知乎https://www.zhihu.com/question/25527491/answer/1851688957
tcmalloc源代码
线程本地存储(Thread Local Storage) - 坦坦荡荡 - 博客园如果一个变量是全局的,那么所有线程访问的是同一份,某一个线程对其修改会影响其他所有线程。如果我们需要一个变量在每个线程中都能访问,并且值在每个线程中互不影响,这就是TLS。 线程局部存储在不同平台有不同的实现,可移植性不好。线程局部存储不难实现,最简单的办法是建立一个全局表,通过当前线程ID去查询相https://cnblogs.com/liu6666/p/12729014.html