当前位置: 首页 > news >正文

高并发内存池

1. 项目介绍

该项目的原型是Google的开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代内存分配函数malloc和free

我们将tcmalloc的核心框架提取出来,模拟实现一个简单版本的高并发内存池,目的是学习tcmalloc的精华

主要的技术栈:

  • 数据结构:链表、哈希桶
  • C/C++:基础语法
  • 操作系统:内存管理、多线程
  • 设计模式:单例模式

2. 池化技术

所谓池化技术,就是一次向系统申请过量的资源,没用完的自己管理起来,以备不时之需

之所以要一次申请过量的资源,是因为向系统申请资源是有很大的开销的,频繁的向系统申请资源会降低系统的效率

如果一次申请过量的资源,下次再要时,就不需要向系统申请了,从而提高整个系统的性能

内存池、进程池、线程池、连接池等都使用了池化技术

内存池

所谓内存池,就是程序预先向操作系统申请一块大量的内存,当需要申请内存时,从内存池中获取,需要释放内存时,向内存池中释放;当程序结束时,内存池中的内存才会归还给操作系统

内存池的作用:

  • 提升了系统整体的效率
  • 减少系统的内存外碎片问题

在这里插入图片描述

上述图中,两块内存已经释放,但不连续的状况,称为内存外碎片问题

malloc

平常想从堆上获取内存,调用malloc函数

实际上,malloc底层也是采用内存池的方式,也就是说,调用malloc并不是直接从堆上获取内存,也是从malloc中的内存池获取内存

既然malloc也是采用内存池实现的,为啥我们还要重新设计自己的内存池?

malloc它适用于大部分场景,既然是通用的,就代表它的性能不会太高;接下来我们设计的内存池针对不同的场景有不同的内存池

在这里插入图片描述

3. 定长内存池

在这里插入图片描述

  • 预先向系统申请一块大内存,挂在memory下
  • 用户要申请内存时,向定长内存池获取:首先查看freelist是否为空,不为空直接从freelist取下内存块返回,否则从memory中切下内存返回给用户
  • 用户要释放内存时,定长内存池将释放的内存块挂在freelist下,以链表的方式组织起来
  • freelist中每个内存块的前4/8字节指向下一个内存块
#ifdef _WIN32
	#include <windows.h>
#elif
	// linux下的头文件
#endif

void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#elif
	// linux下申请内存的系统调用
#endif
	if (ptr == nullptr) throw std::bad_alloc();
	return ptr;
}

// 返回当前内存块的下一个内存块
void*& NextObj(void* ptr)
{
	return *(void**)ptr;
}

template<class T>
class ObjectPool
{
public:
	T* New()
	{
		T* obj = nullptr;
		if (_freeList)
		{
			void* next = NextObj(_freeList);
			obj = (T*)_freeList;
			_freeList = next;
		}
		else
		{
			// 一个内存块必须能容纳一个指针
			size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			if (_remain < objsize)
			{
				_remain = 128 * 1024;
				_memory = (char*)SystemAlloc(_remain >> 13);
			}
			obj = (T*)_memory;
			_memory += objsize;
			_remain -= objsize;
		}

		// 定位new,显示调用T的构造函数
		new(obj)T;

		return obj;
	}

	void Delete(T* ptr)
	{
		// 显示调用T的析构函数
		ptr->~T();

		NextObj(ptr) = _freeList;
		_freeList = ptr;
	}

private:
	char* _memory = nullptr;
	size_t _remain = 0;
	void* _freeList = nullptr;
};

4. 高并发内存池整体框架

大部分的应用场景都是多线程的,必然存在锁竞争的问题;tcmalloc在多线程并发场景下比malloc要略胜一筹,因此,我们模拟实现的tcmalloc主要考虑的问题有:

  • 多线程锁竞争
  • 性能
  • 内存碎片

高并发内存池主要由3个部分构成

  • ThreadCache
    • 每个线程独享一个ThreadCache,因此ThreadCache不需要加锁
    • ThreadCache中有多个freelist,每个freelist下挂着不同大小的内存块,用户申请内存最先从这里获取,释放内存也向这里释放,挂在指定的freelist下
    • 当某个freelist下内存块的个数超过某个值,将该freelist下的所有内存块归还给CentralCache
  • CentralCache
    • 当ThreadCache不能满足用户的要求,ThreadCache按需向CentralCache获取内存块
    • CentralCache在合适的时候回收ThreadCache中的对象,避免一个线程占了太多内存,达到内存分配在多个线程中更均衡的按需调度的目的
    • 所有线程共享一个CentralCache,因此需要加锁
  • PageCache
    • 该模块的每个对象是以页为单位的大块内存
    • 当CentralCache分配出去的小块内存都回收时,PageCache会回收CentralCache中的对象,同时跟自身的对象合并成更大的页,缓解内存碎片的问题
    • 多个线程共享一个PageCache,因此需要加锁

在这里插入图片描述

5. 申请内存的逻辑

ThreadCache

在这里插入图片描述

ThreadCache是一个哈希桶的结构,每个桶对应不同大小的内存块

申请内存时,先将要申请的空间大小对齐到某个数,再根据这个对齐数计算映射到哪个桶,再去对应的桶中取出内存块

对齐数与映射到的桶之间的关系:

字节数对齐数映射到的桶
[1, 128]8[0, 15]
[128 + 1, 1024]16[16, 71]
[1024 + 1, 8 * 1024]128[72, 127]
[8 * 1024 + 1, 64 * 1024]1024[128, 183]
[64 * 1024 + 1, 256 * 1024]8 * 1024[184, 207]

能向ThreadCache申请的内存最大为256KB,总共有208个桶

这样做会有一定的内存浪费,也叫做内碎片问题,比如要申请129字节,但对齐到144字节,有15字节浪费了

总体的浪费率保持在10%左右

如何根据字节数算出它对齐到的数以及映射到哪个桶?

// Communal.h

class SizeClass
{
	inline static size_t _AlignUp(size_t size, size_t align)
	{
		if (size % align == 0) return size;
		return (size / align + 1) * align;
	}

	// 计算对齐到的数
	static size_t AlignUp(size_t size)
	{
		if (size <= 128) return _AlignUp(size, 8);
		else if (size <= 1024) return _AlignUp(size, 16);
		else if (size <= 8 * 1024) return _AlignUp(size, 128);
		else if (size <= 64 * 1024) return _AlignUp(size, 1024);
		else if (size <= 256 * 1024) return _AlignUp(size, 8 * 1024);
		else
		{
			assert(false);
			return -1;
		}
	}

	inline static size_t _Index(size_t size, size_t align)
	{
		if (size % align == 0) return size / align - 1;
		return size / align;
	}

	// 计算映射到的桶的下标
	static size_t Index(size_t size)
	{
		int array[4] = { 16, 56, 56, 56 };
		if (size <= 128)
			return _Index(size, 8);
		else if (size <= 1024)
			return _Index(size - 128, 16) + array[0];
		else if (size <= 8 * 1024)
			return _Index(size - 1024, 128) + array[0] + array[1];
		else if (size <= 64 * 1024)
			return _Index(size - 8 * 1024, 1024) + array[0] + array[1] + array[2];
		else if (size <= 256 * 1024)
			return _Index(size - 64 * 1024, 8 * 1024) + array[0] + array[1] + array[2] + array[3];
		else
		{
			assert(false);
			return -1;
		}
	}
};

首先是最上层ConcurrentAlloc,如果ThreadCache的指针对象为空,则创建一个ThreadCache,向ThreadCache申请内存

void* ConcurrentAlloc(size_t size)
{
	assert(size <= MAX_SIZE); // 后续调整

	if (pThreadCache == nullptr)
		pThreadCache = new ThreadCache;
	return pThreadCache->Allocate(size);
}

ThreadCache的内存申请结构

// Communal.h

static void*& NextObj(void* ptr)
{
	return *(void**)ptr;
}

class FreeList
{
public:
	bool Empty()
	{
		return _freeList == nullptr;
	}

	void* PopFront()
	{
		void* ptr = _freeList;
		_freeList = NextObj(_freeList);
		return ptr;
	}

	void PushRange(void* start, void* end, size_t n)
	{
		NextObj(end) = _freeList;
		_freeList = start;
	}

	size_t& Factor()
	{
		return _factor;
	}

private:
	void* _freeList = nullptr;
	size_t _factor = 1; // 调节因子
};
// ThreadCache.h

class ThreadCache
{
public:
	// 从ThreadCache中获取内存块
	void* Allocate(size_t size);

	// 向CentralCache获取一定数量的小块内存
	void* FetchMemoryFromCentralCache(size_t size);

private:
	FreeList _freeLists[NFREELIST];
};

// 每个线程独有一个ThreadCache
static _declspec(thread) ThreadCache* pThreadCache = nullptr;
// ThreadCache.cpp

void* ThreadCache::Allocate(size_t size)
{
	size_t alignsize = SizeClass::AlignUp(size);
	size_t index = SizeClass::Index(size);

	if (!_freeLists[index].Empty())
		return _freeLists[index].PopFront();

	return FetchMemoryFromCentralCache(size);
}

void* ThreadCache::FetchMemoryFromCentralCache(size_t size)
{
	size_t alignsize = SizeClass::AlignUp(size);
	size_t index = SizeClass::Index(size);

	/*
	* 慢开始调节机制:
	* 一方面:小内存给多点,大内存给少点,通过NumOfMemory控制
	* 另一方面:每个FreeList中都有一个调节因子factor,随着要的次数增多,factor值越大,一次给出的内存个数越多
	* 取两者中的较小值
	* 通过这种机制,达到不至于最开始给的太多导致用不完
	*/
	int num = std::min(_freeLists[index].Factor(), SizeClass::NumOfMemory(alignsize));
	if (num == _freeLists[index].Factor())
		_freeLists[index].Factor()++;

	void* start = nullptr, * end = nullptr;
	size_t actual = CentralCache::GetSingleton()->RemoveMemory(size, start, end, num);
	if (actual == 1)
		assert(start == end);
	else
		_freeLists[index].PushRange(NextObj(start), end, actual - 1);

	return start;
}

CentralCache

在这里插入图片描述

CentralCache中每个桶都有一个SpanList的双向循环链表结构,节点是一个个的Span对象

Span对象中记录着当前Span管理的大块内存起始id、页数、分配给ThreadCache小块内存的个数等属性

由于不同的线程可能会去不同的SpanList获取内存,因此,这里使用桶锁

// Communal.h

struct Span
{
	PAGE_ID _page_id = 0;      // Span管理的内存的起始id
	size_t _n = 0;             // Span管理内存的页数
	size_t _count = 0;         // CentralCache分配出去的小块内存个数

	Span* _prev = nullptr;
	Span* _next = nullptr;

	void* _freeList = nullptr; // 自由链表
};

class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}
private:
	Span* _head = nullptr;
    
public:
	std::mutex _mtx;  // 桶锁
};

所有线程公用一个CentralCache,将CentralCache设计为单例模式,在.cpp文件中定义单例对象

// CentralCache.h

class CentralCache
{
public:
	// 获取单例对象
	static CentralCache* GetSingleton()
	{
		return &_singleton;
	}
	
	// 分配num个内存块给ThreadCache,返回实际分配内存块的个数
	size_t RemoveMemory(size_t size, void*& start, void*& end, int num);

	// 获取一个Span对象
	Span* GetOneSpan(SpanList& spanList, size_t size);

private:
	SpanList _spanLists[NFREELIST];

	static CentralCache _singleton; // 单例模式

	CentralCache() {}
	CentralCache(const CentralCache&) = delete;
};

CentralCache分配指定数量的小块内存给ThreadCache时,先找到对应的桶,在桶中查找是否有Span,其自由链表不为空,这是GetOneSpan的逻辑

拿到一个Span后,其自由链表下小块内存的个数不一定满足需要,这时就有多少给多少

整个RemoveMemory过程分为两步,1) 获取Span,2) 取出小块内存,不管是哪一步,都需要对_spanLists进行修改,因此,操作之前进行加锁,返回实际给出的小块内存的个数

// CentralCache.cpp

size_t CentralCache::RemoveMemory(size_t size, void*& start, void*& end, int num)
{
	size_t index = SizeClass::Index(size);

	_spanLists[index]._mtx.lock();

	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span);
	assert(span->_freeList);

	start = span->_freeList;
	end = start;
	int actual = 1;
	while (actual < num && NextObj(end) != nullptr)
	{
		end = NextObj(end);
		actual++;
	}
	span->_freeList = NextObj(end);
	NextObj(end) = nullptr;

	_spanLists[index]._mtx.unlock();

	return actual;
}

而在GetOneSpan中,如果没有找到满足要求的Span,则向PageCache申请Span

向PageCache申请之前,其他线程可能要对当前桶申请/释放内存,因此先将桶解锁,申请完Span后,要将新Span插入到当前桶前在进行加锁,从而提高整体的效率

获取到新Span后,根据页数计算出起始地址和结束地址,切分成小块内存,挂到指定的桶中

// CentralCache.cpp

Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
	Span* cur = spanList.Begin();
	while (cur != spanList.End())
	{
		if (!cur->_freeList) return cur;
		cur = cur->_next;
	}

	// 先将spanList的锁解开
	// 因为向PageCache NewSpan期间可能有其他线程来获取/释放内存块
	spanList._mtx.unlock();

	PageCache::GetSingleton()->_mtx.lock();

	Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));

	PageCache::GetSingleton()->_mtx.unlock();

	// 将新获得的Span对象进行小块内存的切分
	char* start = (char*)(span->_page_id << PAGE_SHIFT);
	span->_freeList = start;
	size_t total = span->_n << PAGE_SHIFT;
	char* end = start + total;
	void* prev = start;
	while (start < end - size)
	{
		start += size;
		NextObj(prev) = start;
		prev = start;
	}
	NextObj(prev) = nullptr;

	spanList._mtx.lock();

	spanList.PushFront(span);

	return span;
}

PageCache

在这里插入图片描述

PageCache中,同样也有很多桶,每个桶中都有一个SpanList,挂着多个Span,不同的是,每个Span管理的是以页为单位的大块内存;同时,PageCache使用全局锁,因为如果在当前页没找到Span,会去更大的页找,如果频繁的加锁解锁,会降低整体的效率

// PageCache.h

class PageCache
{
public:
	static PageCache* GetSingleton()
	{
		return &_singleton;
	}

	// 给CentralCache一个Span对象
	Span* NewSpan(size_t kpage);

private:
	SpanList _pageLists[NPAGE];

	static PageCache _singleton;

	PageCache(){}
	PageCache(const PageCache&) = delete;

public:
	std::mutex _mtx; // 全局锁
};

申请的页数满足: 大内存申请的页数多,小内存申请的页数少

// Communal.h

static size_t NumOfPage(size_t size)
{
    size_t num = NumOfMemory(size);
    size_t npage = num * size;
    npage >>= PAGE_SHIFT;
    if (npage == 0) npage = 1;
    return npage;
}

在NewSpan中,如果在当前页没有Span可以分配,则在更大的页中找,将大页数分割成两个小页数,如果找到128页还是没有,则向系统申请一个128页的内存

// PageCache.cpp

Span* PageCache::NewSpan(size_t kpage)
{
	assert(kpage < NPAGE);

	if (!_pageLists[kpage].Empty())
		return _pageLists[kpage].PopFront();

	for (size_t i = kpage + 1; i < NPAGE; i++)
	{
		if (_pageLists[i].Empty()) continue;

		Span* nSpan = _pageLists[i].PopFront();
		Span* kSpan = new Span;

		nSpan->_n -= kpage;
		kSpan->_n = kpage;
		kSpan->_page_id = nSpan->_page_id + nSpan->_n;

		_pageLists[nSpan->_n].PushFront(nSpan);
		return kSpan;
	}

	// 向系统申请
	void* ptr = SystemAlloc(NPAGE - 1);
	Span* bigSpan = new Span;
	bigSpan->_n = NPAGE - 1;
	bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
	_pageLists[NPAGE - 1].PushFront(bigSpan);
	
	return NewSpan(kpage);
}

单元测试

// UnitTest.cpp

#include "ConcurrentAlloc.h"

void Alloc1()
{
	for (size_t i = 0; i < 5; ++i)
	{
		void* ptr = ConcurrentAlloc(6);
	}
}

void Alloc2()
{
	for (size_t i = 0; i < 5; ++i)
	{
		void* ptr = ConcurrentAlloc(7);
	}
}

void TLSTest()
{
	std::thread t1(Alloc1);
	t1.join();

	std::thread t2(Alloc2);
	t2.join();
}

void TestConcurrentAlloc1()
{
	void* p1 = ConcurrentAlloc(6);
	void* p2 = ConcurrentAlloc(8);
	void* p3 = ConcurrentAlloc(1);
	void* p4 = ConcurrentAlloc(7);
	void* p5 = ConcurrentAlloc(8);

	std::cout << p1 << std::endl;
	std::cout << p2 << std::endl;
	std::cout << p3 << std::endl;
	std::cout << p4 << std::endl;
	std::cout << p5 << std::endl;
}

void TestConcurrentAlloc2()
{
	for (size_t i = 0; i < 1024; ++i)
	{
		void* p1 = ConcurrentAlloc(16);
		std::cout << p1 << std::endl;
	}

	void* p2 = ConcurrentAlloc(8);
	std::cout << p2 << std::endl;
}

int main()
{
	//TestConcurrentAlloc1();
	//TestConcurrentAlloc2();
	//TLSTest();

	return 0;
}

6. 释放内存的逻辑

从哪个桶申请的内存,就释放回哪个桶,释放内存时也要计算对应桶的下标

因此,释放内存时需要传递内存大小(暂时的做法)

void ConcurrentFree(void* ptr, size_t size)
{
	assert(pThreadCache);

	pThreadCache->Free(ptr, size);
}

ThreadCache

// ThreadCache.h

class ThreadCache
{
public:
	// 从ThreadCache中获取内存块
	void* Allocate(size_t size);

	// 向CentralCache获取一定数量的小块内存
	void* FetchMemoryFromCentralCache(size_t size);

	// 释放内存回ThreadCache
	void Free(void* ptr, size_t size);

	// 当FreeList中自由链表下的小块内存个数过多,归还给CentralCache
	void ListTooLong(FreeList& freeList, size_t size);

private:
	FreeList _freeLists[NFREELIST];
};

// 每个线程独有一个ThreadCache
static _declspec(thread) ThreadCache* pThreadCache = nullptr;

释放内存回ThreadCache时,直接将内存块挂到对应桶的自由链表下

为了避免一个线程独占过多内存,规定:当桶中自由链表下小块内存的个数超过了该桶的调节因子,就将该桶下的所有小块内存归还给CentralCache

void ThreadCache::Free(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_SIZE);

	size_t index = SizeClass::Index(size);

	_freeLists[index].PushFront(ptr);

	if (_freeLists[index].Size() >= _freeLists[index].Factor())
		ListTooLong(_freeLists[index], size);
}

void ThreadCache::ListTooLong(FreeList& freeList, size_t size)
{
	void* start = nullptr, * end = nullptr;
	freeList.PopRange(start, end, freeList.Size());

	CentralCache::GetSingleton()->ReleaseToCentralCache(start, size);
}

CentralCache

要想将小块内存释放回CentralCache,首先得知道当前内存是由CentralCache中哪个Span分出来的

因此,代码中需要有PAGE_ID到Span*的映射关系,因为由内存的起始地址,就能计算出PAGE_ID

PAGE_ID = 内存起始地址 / 一页的大小

使用unordered_map数据结构存储映射关系,将数据结构放到PageCache中,因为在PageCache中也需要

用到PAGE_ID到Span*的映射关系

// PageCache.h

class PageCache
{
    // ...
private:
	// ...
	std::unordered_map<PAGE_ID, Span*> _idSpanMap;
    // ...
};

在通过PAEG_ID查找Span*之前,使用PageCache中的全局锁进行加锁

// PageCache.cpp

Span* PageCache::IdToSpan(PAGE_ID id)
{
	std::unique_lock<std::mutex> lock(_mtx);

	auto it = _idSpanMap.find(id);
	if (it == _idSpanMap.end())
	{
		assert(false);
		return nullptr;
	}
	return it->second;
}

因此,PageCache在分配Span给CentralCache之前,需要先将Span管理的大块内存的所有PAEG_ID与Span*建立映射关系

// PageCache.cpp

Span* PageCache::NewSpan(size_t kpage)
{
	// ...
	for (size_t i = kpage + 1; i < NPAGE; i++)
	{
		if (_pageLists[i].Empty()) continue;

		Span* nSpan = _pageLists[i].PopFront();
		Span* kSpan = new Span;

		nSpan->_n -= kpage;
		kSpan->_n = kpage;
		kSpan->_page_id = nSpan->_page_id + nSpan->_n;

		_idSpanMap[nSpan->_page_id] = nSpan;
		_idSpanMap[nSpan->_page_id + nSpan->_n - 1] = nSpan;

		for (size_t i = 0; i < kSpan->_n; i++)
			_idSpanMap[kSpan->_page_id + i] = kSpan;

		_pageLists[nSpan->_n].PushFront(nSpan);
		return kSpan;
	}
    // ...
}

ReleaseToCentralCache()的逻辑:

先对桶进行加锁,遍历小块内存,首先找到对应的Span,将小块内存头插到Span的自由链表下,Span中的count–,如果count == 0,表示当前Span分配出去的小块内存已全部归还,这时就需要将Span归还给PageCache,归还之前,先对桶解锁,因为归还过程中可能有其他线程对当前桶进行操作,提高效率,归还给PageCache之后,再加锁,依次往复,直到遍历完小块内存,对桶解锁

// CentralCache.cpp

void CentralCache::ReleaseToCentralCache(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);

	_spanLists[index]._mtx.lock();

	while (start)
	{
		void* next = NextObj(start);

		PAGE_ID id = (PAGE_ID)start >> PAGE_SHIFT;
		Span* span = PageCache::GetSingleton()->IdToSpan(id);
		NextObj(start) = span->_freeList;
		span->_freeList = start;
		span->_count--;

		// 当前span分配给ThreadCache的小块内存已全部归还
		if (span->_count == 0)
		{
			_spanLists[index].Erase(span);
			span->_prev = span->_next = nullptr;
			span->_freeList = nullptr;

			_spanLists[index]._mtx.unlock();

			PageCache::GetSingleton()->_mtx.lock();

			PageCache::GetSingleton()->ReleaseSpanToPageCache(span);

			PageCache::GetSingleton()->_mtx.unlock();

			_spanLists[index]._mtx.lock();
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock();
}

PageCache

// PageCache.h

class PageCache
{
public:
	static PageCache* GetSingleton()
	{
		return &_singleton;
	}

	// 给CentralCache一个Span对象
	Span* NewSpan(size_t kpage);

	// 查找id对应的Span
	Span* IdToSpan(PAGE_ID id);
	
	// CetralCache将Span归还给PageCache
	void ReleaseSpanToPageCache(Span* span);

private:
	SpanList _pageLists[NPAGE];

	std::unordered_map<PAGE_ID, Span*> _idSpanMap;

	static PageCache _singleton;

	PageCache(){}
	PageCache(const PageCache&) = delete;

public:
	std::mutex _mtx; // 全局锁
};

ReleaseSpanToPageCache()的逻辑:

CentralCache归还给PageCache一个Span,PageCache争取将Span与其他Span合并成一个更大的页,首先向前查找当前Span起始PAGE_ID - 1的Span,再向后查找当前Span结束PAGE_ID + 1的Span,如果遇到一下三种情况,则不合并:

  1. 要查找的Span不存在
  2. 找到的Span在被使用中(如果Span被分配给了CentralCache)
  3. 找到的Span与当前Span合并后的页数大于了128页

如何表示当前Span正在被使用?在Span中添加属性isusing,在CentralCache调用完NewSpan()后,即可将Span的isusing属性置true

// Communal.h

struct Span
{
    // ...
	bool _isusing = false;     // 当前Span是否在被使用
};
// CentralCache.cpp

Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
    // ...
	Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
	span->_isusing = true;
    //...
}

由于需要在PageCache中查找Span,因此,挂在PageCache中的Span需要将Span的起始PAGE_ID和结束PAGE_ID添加到_idSpanMap中

// PageCache.cpp

Span* PageCache::NewSpan(size_t kpage)
{
	assert(kpage < NPAGE);

	if (!_pageLists[kpage].Empty())
	{
		Span* span = _pageLists[kpage].PopFront();
		for (size_t i = 0; i < span->_n; i++)
			_idSpanMap[span->_page_id + i] = span;
		return span;
	}

	for (size_t i = kpage + 1; i < NPAGE; i++)
	{
		if (_pageLists[i].Empty()) continue;

		Span* nSpan = _pageLists[i].PopFront();
		Span* kSpan = new Span;

		nSpan->_n -= kpage;
		kSpan->_n = kpage;
		kSpan->_page_id = nSpan->_page_id + nSpan->_n;

		_idSpanMap[nSpan->_page_id] = nSpan;
		_idSpanMap[nSpan->_page_id + nSpan->_n - 1] = nSpan;

		for (size_t i = 0; i < kSpan->_n; i++)
			_idSpanMap[kSpan->_page_id + i] = kSpan;

		_pageLists[nSpan->_n].PushFront(nSpan);
		return kSpan;
	}

	// 向系统申请
	void* ptr = SystemAlloc(NPAGE - 1);
	Span* bigSpan = new Span;
	bigSpan->_n = NPAGE - 1;
	bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
	_pageLists[NPAGE - 1].PushFront(bigSpan);
	
	return NewSpan(kpage);
}

合并完成之后,记得将原来的Span销毁,并将新Span的起始PAGE_ID和结束PAGE_ID添加到_idSpanMap中,Span的使用状态置为false,并加到PageCache中

// PageCache.cpp

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	// 向前合并
	while (1)
	{
		PAGE_ID prev = span->_page_id - 1;
		auto it = _idSpanMap.find(prev);
		if (it == _idSpanMap.end()) break;

		Span* prevSpan = it->second;
		if (prevSpan->_isusing == true) break;
		if (prevSpan->_n + span->_n > NPAGE - 1) break;

		_pageLists[prevSpan->_n].Erase(prevSpan);
		span->_page_id = prevSpan->_page_id;
		span->_n += prevSpan->_n;

		delete prevSpan;
	}

	// 向后合并
	while (1)
	{
		PAGE_ID next = span->_page_id + span->_n;
		auto it = _idSpanMap.find(next);
		if (it == _idSpanMap.end()) break;

		Span* nextSpan = it->second;
		if (nextSpan == nullptr) break;
		if (nextSpan->_isusing == true) break;
		if (nextSpan->_n + span->_n > NPAGE - 1) break;

		_pageLists[nextSpan->_n].Erase(nextSpan);
		span->_n += nextSpan->_n;

		delete nextSpan;
	}

	_idSpanMap[span->_page_id] = span;
	_idSpanMap[span->_page_id + span->_n - 1] = span;
	span->_isusing = false;
	_pageLists[span->_n].PushFront(span);
}

单元测试

// UnitTest.cpp

void MultiThreadAlloc1()
{
	std::vector<void*> v;
	for (size_t i = 0; i < 1000; ++i)
	{
		void* ptr = ConcurrentAlloc(6);
		v.push_back(ptr);
	}

	for (auto e : v)
	{
		ConcurrentFree(e, 6);
	}
}

void MultiThreadAlloc2()
{
	std::vector<void*> v;
	for (size_t i = 0; i < 1000; ++i)
	{
		void* ptr = ConcurrentAlloc(16);
		v.push_back(ptr);
	}

	for (auto e : v)
	{
		ConcurrentFree(e, 16);
	}
}

void TestMultiThread()
{
	std::thread t1(MultiThreadAlloc1);
	std::thread t2(MultiThreadAlloc2);

	t1.join();
	t2.join();
}

int main()
{
	TestMultiThread();

	return 0;
}

7. 代码测试

// BenchMark.cpp

#include"ConcurrentAlloc.h"

// ntimes 一轮申请和释放内存的次数
// nworks 线程的个数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i], (16 + i) % 8192 + 1);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

int main()
{
	size_t n = 10000;
	BenchmarkConcurrentMalloc(n, 4, 10);

	std::cout << std::endl;

	//BenchmarkMalloc(n, 4, 10);

	return 0;
}

在这里插入图片描述

8. 细节优化

超过32页的内存的申请与释放

如果要申请和释放的内存超过32页,则直接向PageCache申请和释放

而在PageCache中,如果要申请和释放的内存超过128页,则直接向系统申请和释放

// ConcurrentAlloc.h

void* ConcurrentAlloc(size_t size)
{
	// 如果size > 32页,直接找PageCache申请
	if (size > MAX_SIZE)
	{
		size_t alignsize = SizeClass::AlignUp(size);
		size_t kpage = SizeClass::NumOfPage(alignsize);

		PageCache::GetSingleton()->_mtx.lock();
		
		Span* span = PageCache::GetSingleton()->NewSpan(kpage);

		PageCache::GetSingleton()->_mtx.unlock();

		return (void*)(span->_page_id << PAGE_SHIFT);
	}

	assert(size <= MAX_SIZE);

	if (pThreadCache == nullptr)
		pThreadCache = new ThreadCache;
	return pThreadCache->Allocate(size);
}
// PageCache.cpp

Span* PageCache::NewSpan(size_t kpage)
{
	// kpage > 128,直接向系统申请
	if (kpage > NPAGE - 1)
	{
		void* ptr = SystemAlloc(kpage);
		Span* span = new Span;
		span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = kpage;
		_idSpanMap[span->_page_id] = span;
		return span;
	}
    
    // ...
}
// ConcurrentAlloc.h

void ConcurrentFree(void* ptr, size_t size)
{
	// 如果size > 32页,直接释放给PageCache
	if (size > MAX_SIZE)
	{
		PAGE_ID id = (PAGE_ID)ptr >> PAGE_SHIFT;
		Span* span = PageCache::GetSingleton()->IdToSpan(id);

		PageCache::GetSingleton()->_mtx.lock();

		PageCache::GetSingleton()->ReleaseSpanToPageCache(span);

		PageCache::GetSingleton()->_mtx.unlock();
	}
	else
	{
		assert(pThreadCache);

		pThreadCache->Free(ptr, size);
	}
}
// PageCache.cpp

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	if (span->_n > NPAGE - 1)
	{
		void* ptr = (void*)(span->_page_id >> PAGE_SHIFT);
		SystemFree(ptr);
		delete span;
		return;
	}
    // ...
}

释放无需指定内存大小

在Span中添加一个_objSize属性,当CentralCache向PageCache后去到新Span时,更新该属性值

// Communal.h

struct Span
{
    // ...
	size_t _objSize = 0;
};
// CentralCache.cpp

Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
    // ...
	Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
	span->_isusing = true;
	span->_objSize = size;
    // ...
}
// ConcurrentAlloc.h

void* ConcurrentAlloc(size_t size)
{
	// 如果size > 32页,直接找PageCache申请
	if (size > MAX_SIZE)
	{
		size_t alignsize = SizeClass::AlignUp(size);
		size_t kpage = SizeClass::NumOfPage(alignsize);

		PageCache::GetSingleton()->_mtx.lock();
		
		Span* span = PageCache::GetSingleton()->NewSpan(kpage);
		span->_objSize = alignsize;

		PageCache::GetSingleton()->_mtx.unlock();

		return (void*)(span->_page_id << PAGE_SHIFT);
	}

	assert(size <= MAX_SIZE);

	if (pThreadCache == nullptr)
		pThreadCache = new ThreadCache;
	return pThreadCache->Allocate(size);
}

void ConcurrentFree(void* ptr)
{
	PAGE_ID id = (PAGE_ID)ptr >> PAGE_SHIFT;
	Span* span = PageCache::GetSingleton()->IdToSpan(id);
	size_t size = span->_objSize;

	// 如果size > 32页,直接释放给PageCache
	if (size > MAX_SIZE)
	{
		PageCache::GetSingleton()->_mtx.lock();

		PageCache::GetSingleton()->ReleaseSpanToPageCache(span);

		PageCache::GetSingleton()->_mtx.unlock();
	}
	else
	{
		assert(pThreadCache);

		pThreadCache->Free(ptr, size);
	}
}

接入定长内存池

将项目中所有使用new和delete的地方改成向定长内存池申请和销毁

class PageCache
{
    // ...
	ObjectPool<Span> _spanPool;
    // ...
};

使用_spanPool修改项目中所有使用new和delete的地方

void* ConcurrentAlloc(size_t size)
{
    // ...
	static ObjectPool<ThreadCache> pool;
	if (pThreadCache == nullptr)
		pThreadCache = pool.New();
    // ...
}

基数树优化

结果多轮测试,发现我们的高并发内存池总体效率是要比malloc和free要低的

实际上,每次查找PAGE_ID对应的Span时,都需要保证线程安全,频繁的加锁解锁会降低整体的效率,我们的高并发内存池效率低下主要是这个原因

可以使用基数树存储PAGE_ID到Span的映射关系

其本质就是利用数组的下标表示PAGE_ID,数组的内容表示Span,这样查找映射时,既不需要加锁解锁,又能在O(1)的时间内找到

在x86的环境下,系统总内存为232,一页的大小为213,总的页数为2^32 / 2^13 = 219,也就是说总共有219个PAGE_ID

// PageMap.h

template<size_t BITS>
class PageMap
{
public:
	PageMap()
	{
		size_t size = sizeof(void*) * _length;
		size_t alignSize = SizeClass::AlignUp(size);
		_array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(_array, 0, alignSize);
	}

	void set(PAGE_ID id, void* ptr)
	{
		_array[id] = ptr;
	}

	void* get(PAGE_ID id)
	{
		if ((id >> BITS) > 0) return nullptr;
		return _array[id];
	}

private:
	size_t _length = 1 << BITS;
	void** _array = nullptr;
};
class PageCache
{
    // ...
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	PageMap<32 - PAGE_SHIFT> _idSpanMap;
    // ...
};

然后将项目中_idSpanMap替换成基数树中的成员函数即可

// PageCache.cpp

Span* PageCache::NewSpan(size_t kpage)
{
	// kpage > 128,直接向系统申请
	if (kpage > NPAGE - 1)
	{
		void* ptr = SystemAlloc(kpage);
		Span* span = _spanPool.New();
		span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = kpage;
		_idSpanMap.set(span->_page_id, span);
		return span;
	}

	assert(kpage < NPAGE);

	if (!_pageLists[kpage].Empty())
	{
		Span* span = _pageLists[kpage].PopFront();
		for (size_t i = 0; i < span->_n; i++)
			_idSpanMap.set(span->_page_id + i, span);
		return span;
	}

	for (size_t i = kpage + 1; i < NPAGE; i++)
	{
		if (_pageLists[i].Empty()) continue;

		Span* nSpan = _pageLists[i].PopFront();
		Span* kSpan = _spanPool.New();

		nSpan->_n -= kpage;
		kSpan->_n = kpage;
		kSpan->_page_id = nSpan->_page_id + nSpan->_n;

		_idSpanMap.set(nSpan->_page_id, nSpan);
		_idSpanMap.set(nSpan->_page_id + nSpan->_n - 1, nSpan);

		for (size_t i = 0; i < kSpan->_n; i++)
			_idSpanMap.set(kSpan->_page_id + i, kSpan);

		_pageLists[nSpan->_n].PushFront(nSpan);
		return kSpan;
	}

	// 向系统申请
	void* ptr = SystemAlloc(NPAGE - 1);
	Span* bigSpan = _spanPool.New();
	bigSpan->_n = NPAGE - 1;
	bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
	_pageLists[NPAGE - 1].PushFront(bigSpan);
	
	return NewSpan(kpage);
}

Span* PageCache::IdToSpan(PAGE_ID id)
{
	Span* span = (Span*)_idSpanMap.get(id);
	return span;
}

void PageCache::ReleaseSpanToPageCache(Span* span)
{
	if (span->_n > NPAGE - 1)
	{
		void* ptr = (void*)(span->_page_id >> PAGE_SHIFT);
		SystemFree(ptr);
		_spanPool.Delete(span);
		return;
	}

	// 向前合并
	while (1)
	{
		PAGE_ID prev = span->_page_id - 1;

		Span* prevSpan = (Span*)_idSpanMap.get(prev);
		if(prevSpan == nullptr) break;
		if (prevSpan->_isusing == true) break;
		if (prevSpan->_n + span->_n > NPAGE - 1) break;

		_pageLists[prevSpan->_n].Erase(prevSpan);
		span->_page_id = prevSpan->_page_id;
		span->_n += prevSpan->_n;

		_spanPool.Delete(prevSpan);

	}

	// 向后合并
	while (1)
	{
		PAGE_ID next = span->_page_id + span->_n;

		Span* nextSpan = (Span*)_idSpanMap.get(next);
		if (nextSpan == nullptr) break;
		if (nextSpan->_isusing == true) break;
		if (nextSpan->_n + span->_n > NPAGE - 1) break;

		_pageLists[nextSpan->_n].Erase(nextSpan);
		span->_n += nextSpan->_n;

		_spanPool.Delete(nextSpan);
	}

	_idSpanMap.set(span->_page_id, span);
	_idSpanMap.set(span->_page_id + span->_n - 1, span);

	span->_isusing = false;
	span->_objSize = 0;
	_pageLists[span->_n].PushFront(span);
}

在这里插入图片描述

结果多轮测试,我们的高并发内存池要比malloc和free高效很多

项目位置:高并发内存池

相关文章:

  • visual studio 中导入 benchmark
  • TouchSocket TcpService:构建高性能Tcp服务的终极利器
  • 某网关管理软件 9-12ping.php 命令执行漏洞(CVE-2025-1448)
  • YOLOv5
  • 基于 Milvus 和 BiomedBERT 的医学文献智能搜索系统
  • 如何通过iPaaS集成平台快速配置协议接口
  • vscode 源代码管理
  • Windows系统本地部署OpenManus对接Ollama调用本地AI大模型
  • Day 3
  • 快速入手-基于Django的主子表间操作mysql(五)
  • 学习111
  • VL开源模型实现文本生成图片
  • Python第六章06:列表的循环练习
  • 《白帽子讲 Web 安全》之开发语言安全深度解读
  • 17153. 班级活动(蓝桥杯-python)
  • CAN FD、传统CAN以及RS-485通信介绍
  • <C#> 详细介绍.net 三种依赖注入:AddTransient、AddScoped、AddSingleton 的区别
  • 如何在 Vue 项目中实现动态组件加载,有什么应用场景?
  • 又双叒叕Scrapy爬虫相关的面试题及详细解答
  • C++11 引入了的新特性与实例说明
  • 车展之战:国产狂飙、外资反扑、智驾变辅助
  • 建设银行南昌分行引金融“活水”,精准灌溉乡村沃土
  • 广东省副省长刘红兵跨省调任湖南省委常委、宣传部长
  • 王沪宁主持召开全国政协主席会议
  • 李在明涉嫌违反《公职选举法》案将于5月1日宣判
  • 《中国奇谭》首部动画电影《浪浪山小妖怪》定档8月2日