首页 > 系统相关 >高并发内存池

高并发内存池

时间:2024-09-19 17:53:12浏览次数:9  
标签:span void cache 并发 内存 我们 size

个人主页:Lei宝啊 

愿所有美好如期而遇


一、项目介绍

这个项目做的是什么?我们要实现一个高并发的内存池,当然,我们这个项目是参考google的一个开源项目tcmalloc而实现的一个迷你版的高并发内存池,实现高效的多线程内存管理,用于代替系统提供的malloc和free。



 二、内存池

1. 池化技术

池化技术就是一次性向系统申请过量的资源,然后自己管理。为什么要一次性申请过量的资源?其他人会告诉你是因为每一次申请资源都有较大开销,但是这些开销体现在哪里呢?

  1. 系统调用开销:在实际应用过程中,无论是申请内存,创建进程,线程等,都需要系统调用,而系统调用需要程序从用户态切换到内核态,调用完成后还要切换回去,这个过程是非常耗时的。每次资源的申请和释放都伴随着这样的过程,所以频繁的系统调用会大大增加系统的开销。
  2. 资源初始化和清理开销:对于数据库连接和线程等,他们的创建不仅仅是内存的申请和释放。在创建时,对数据库连接来说需要建立网络连接,对线程来说,需要分配一系列资源。这些操作对于系统来说也有一定的消耗。
  3. 资源竞争与同步开销:在多线程和多进程场景中,资源的分配和释放可能会引起竞争,需要采用同步机制来确保数据的一致性和线程安全,这些同步机制(信号量,锁)本身也会带来一定的开销,包括等待时间,硬件上下文切换等。
  4. 内存碎片及效率:频繁地申请和释放小块内存会导致内存碎片化,降低内存的利用率和访问效率(稍后我们后面会细说)。

在计算机中,有很多地方都采用了池化技术,除了内存池,还有对象池,线程池,连接池,进程池等。以线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。 

2. 内存池

内存池是指预先向系统申请一块较大的内存,此后,当程序需要申请内存时,不直接向系统申请,而是从内存池中获取;同理,释放内存时,也不交还给操作系统,而是交还给内存池,当程序退出时,内存池才会真正释放之前申请的内存。

3. 内存池解决的问题

内存池主要解决的是效率问题,以及内存碎片问题,什么是内存碎片?内存碎片分为内碎片和外碎片,这里我们先介绍外碎片,内碎片我们后面结合代码讲解。

此时如果我们再想申请超过256字节的内存,就申请不下来了,因为内存已经碎片化了,不连续。

官方点讲就是:外部碎片是⼀些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求。

4. malloc

C/C++都是通过malloc去动态申请内存的(C++调用的new底层还会调用operator new, 它里面会调用malloc),但是我们要知道,实际上我们不是直接去堆申请内存的。

事实上malloc就是一个内存池。我们平时向系统申请20字节,他真的只申请20字节吗?不是的,他会申请更多,下次申请时如果足够就直接用,如果不够再继续向OS申请。

malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的⼀套,linux gcc用的glibc中的ptmalloc。

三、定长内存池

1. 解释和实现

作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是⼀个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很⾼的性能。我们要实现的高并发内存池在多线程内存管理上要高效很多,接下来我们要实现的定长内存池将会作为他的一个基础组件。

_memory指向内存池开辟的空间,_freeList指向释放归还给内存池的空间,_remainCapacity指向_memory剩余的空间。

现在我们假设程序要调用New()向内存池申请空间,此时内存池是没有空间的,所以要开辟一段空间:_memory = (char*)malloc(128 * 1024); 那么_memory为什么是char*类型的呢?因为这个定长内存池我们想要设计成模版类,能够适应各种对象,如果_memory不设计成char*,那么对象申请空间后,_memory向后走时,一次移动的大小就不是一个字节,也就无法适应类似于char这样的对象或者自定义类型对象。那么开辟大小为什么是128 * 1024呢?这个我们后面会讲述。

_memory申请空间后,我们要判断他是否申请空间成功,如果申请失败,我们就抛异常:throw std::bad_alloc(); 申请成功后,就可以设置_remainCapacity的大小了。

于是我们顺利成章,定义一个T* obj作为返回值,obj = _memory, _memory += sizeof(T), _remainCapacity -= sizeof(T), 最后将obj返回。

    			
			_memory = (char*)malloc(128 * 1024);
			if (_memory == nullptr)
			{
				throw std::bad_alloc();
			}
			_remainCapacity = 128 * 1024;

			obj = (T*)_memory;
			_memory += sizeof(T);
			_remainCapacity -= sizeof(T);

那么_memory什么时候会去申请空间呢?_remainCapcity小于sizeof(T)时,申请空间,假如说,分配空间到最后几个字节,这几个字节不足以分配对象,那么这几个字节就丢弃掉不去使用,_memory再去申请。

			if (_remainCapacity < sizeof(T)) 
			{
				_memory = (char*)malloc(128 * 1024);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
				_remainCapacity = 128 * 1024;
			}

			obj = (T*)_memory;
			_memory += sizeof(T);
			_remainCapacity -= sizeof(T);

那么有New也就有Delete,我们如何释放对象呢?

obj指向的空间现在要释放,我们就可以利用他头上的几个字节去存放地址,指向被释放的对象空间,但是对象类型是什么我们不知道,如果对象大小为char或者short等,存不下一个指针的大小怎么办?这就要修改我们的New方法。

		obj = (T*)_memory;
		int objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
		_memory += objSize;
		_remainCapacity -= objSize;

接下来我们要将释放的对象空间挂在_freeList链表上,并且我们采用头插,效率比尾插更高。

现在问题来了,头插代码怎么写?*(int*) = nullptr ?是这样吗,取他的头四个字节?可是你怎么知道这是32位平台还是64位平台,指针大小是不一样的,所以我们if判断指针大小去决定使用int*或者longlong*吗?可以,但是我们还有更好的方法。

*(void**)obj = nullptr; 将obj强转为void**,意思就是obj现在是一个指向void*的指针,解引用obj就能够操控void*大小的空间,对他进行修改,而不同平台指针大小的问题就不需要我们去担心了。 

现在我们来谈Delete也就简单多了。

		*(void**)obj = _freeList;
		_freeList = obj;

现在,不仅仅是_memory可以分配对象空间,_freeList也可以,所以,我们之前的New逻辑可以进行更新了。

当_freeList有空间时,从头上去取空间,其实类似于头删。 

		if (_freeList)
		{
			void* next = *(void**)_freeList;
			obj = (T*)_freeList;
			_freeList = next;
		}

所以,我们也就可以写出整体代码了,至此,定长内存池基本完成:

#pragma once

#include <iostream>
using std::cout;
using std::endl;

template<class T>
class ObjectPool
{
private:
	char* _memory;       // 内存池大小,char*为分配内存时,加减合理
	void* _freeList;     // 还回来的内存,构建成链表
	int _remainCapacity; // 内存池的剩余大小

public:
	ObjectPool()
		:_memory(nullptr)
		,_freeList(nullptr)
		,_remainCapacity(0)
	{}

	T* New()
	{
		T* obj = nullptr;

		if (_freeList)
		{
			void* next = *(void**)_freeList;
			obj = (T*)_freeList;
			_freeList = next;
		}
		else
		{
            //剩余空间不足以分配一个对象,且链表没有空间
			if (_remainCapacity < sizeof(T)) 
			{
				_memory = (char*)malloc(128 * 1024);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
				_remainCapacity = 128 * 1024;
			}

			obj = (T*)_memory;
			int objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += objSize;
			_remainCapacity -= objSize;
		}

		new(obj)T;
		return obj;
	}

	void Delete(T* obj)
	{
		obj->~T();

		//将要归还的空间
		*(void**)obj = _freeList;
		_freeList = obj;
	}
};



对于new(obj)T,叫做定位new,是在已申请的原始内存空间中调用对象的构造函数初始化对象。关于obj->~T(); 显式调用析构函数,并不是释放内存池的空间,而是去释放对象内部逻辑可能去自己申请的空间。

另外要修改的是,我们这里申请空间不使用malloc,因为他也是一个内存池,用于处理各种复杂场景,而我们这里只是需要单纯的申请空间,于是我们可以使用系统调用去申请空间,在Windows操作系统上,这个系统调用叫:VirtualAlloc

LPVOID VirtualAlloc(  
  LPVOID lpAddress,    // 要分配的内存区域的地址  
  SIZE_T dwSize,       // 分配的大小(以字节为单位)  
  DWORD flAllocationType, // 分配的类型  
  DWORD flProtect        // 该内存的初始保护属性  
);

inline static void* SystemAlloc(size_t kpage)
{

#ifdef _WIN32
		void* ptr = VirtualAlloc(0, kpage * (1 << 10), \
                            MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
		// linux下brk mmap等
#endif
		if (ptr == nullptr)  throw std::bad_alloc();
	    return ptr;
}

_WIN32是一个在Windows编程中经常遇到的宏定义,它主要用于判断当前的编译环境是否为Windows 32位环境。_WIN32是VC(Visual C++)编译器在编译Windows程序时自动定义的一个宏。这意味着,只要在使用VC编译器编写Windows程序,无论目标平台是32位还是64位_WIN32宏通常都会被定义。但是,需要注意的是,在64位Windows系统上编译64位程序时,虽然_WIN32会被定义,但通常还会定义_WIN64宏来明确指示64位环境。

_memory = (char*)SystemAlloc(128);

2. 性能测试

#include "FixedLenMemPool.h"

#include <vector>
#include <time.h>

struct TreeNode
{
    int _val;
    TreeNode* _left;
    TreeNode* _right;
    TreeNode()
        :_val(0)
        ,_left(nullptr)
        ,_right(nullptr)
    {}
};

void TestObjectPool()
{
    // 申请释放的轮次
    const size_t Rounds = 3;
    // 每轮申请释放多少次
    const size_t N = 1000000;

    std::vector<TreeNode*> v1;
    v1.reserve(N);

    size_t begin1 = clock();
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v1.push_back(new TreeNode);
        }

        for (int i = 0; i < N; ++i)
        {
            delete v1[i];
        }
        v1.clear();
    }
    size_t end1 = clock();

    std::vector<TreeNode*> v2;
    v2.reserve(N);
    ObjectPool<TreeNode> TNPool;

    size_t begin2 = clock();
    for (size_t j = 0; j < Rounds; ++j)
    {
        for (int i = 0; i < N; ++i)
        {
            v2.push_back(TNPool.New());
        }

        for (int i = 0; i < N; ++i)
        {
            TNPool.Delete(v2[i]);
        }

        v2.clear();
    }
    size_t end2 = clock();

    cout << "new cost time:" << end1 - begin1 << endl;
    cout << "object pool cost time:" << end2 - begin2 << endl;
}

int main()
{
    TestObjectPool(); 
    return 0;
}

四、高并发内存池整体框架设计

现代开发环境大多都是多线程多进程,在申请内存的场景下,因为内存时临界资源,必然存在激烈的锁竞争问题,malloc本身其实已经足够优秀,但是我们的项目圆形tcmalloc就是在多线程高并发的场景更胜一筹,所以我们设计的内存池需要考虑下面几个问题:

  1. 性能问题
  2. 多线程环境下,锁竞争问题
  3. 内存碎片问题

高并发内存池主要由以下三个部分组成:

thread cache:线程缓存,每个线程独有一个thread cache,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,因为每个线程独有一个cache,这也是这个并发线程池高明的地方。

central cache:中⼼缓存是所有线程所共享,thread cache是按需从central cache中获取对象。当thread cache空间不足,就会从central cache中获取对象,central cache会在合适的时机回收thread cache中的对象,避免⼀个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的,central cache是存在竞争的,所以从这里取内存创建对象是需要加锁的,但是这里用的是桶锁,再一个只有thread cache没有内存创建对象时才会找central cache,所以这里竞争不会很激烈。

page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分 配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。page cache如果要再申请内存,就是从系统申请了。

当⼀个span {span通常指的是一段连续的内存区域, 可以被划分为多个更小的单元(如页或对象)以供分配} 的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。 

五、thread cache

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的的内存块对象的自由链表,不同桶的自由链表所指向的空间大小是不一样的(定长内存池办不到),也就是说,不同的对象可以向thread cache申请空间,并且,每个线程都有自己的thread cache,因此,在线程向各自的thread cache申请内存对象以及释放时,不会有锁的竞争。

我们首先创建ThreadCache类:

class ThreadCache
{
private:

public:
	// 申请和释放内存对象
	void* Allocate(size_t size);
	void Deallocate(void* ptr, size_t size);

	//从中心缓存获取内存对象
	void* FetchFromCentralCache(size_t index, size_t size);

};

类属性我们怎么设计呢?根据我们上面的分析,我们需要一个哈希桶,并且每个桶的位置挂上一个自由链表,那么现在,我们可以先设计出自由链表类(有了上面定长内存池的铺垫,这个应该很好理解):

class FreeList
{
private:
	void* _freelist;

	void*& nextobj(void* cur)
	{
		return *(void**)cur;
	}

public:

	FreeList() :_freelist(nullptr)
	{}

	void push(void* obj)
	{
		nextobj(obj) = _freelist;
		_freelist = obj;
	}

	void* pop()
	{
		void* obj = _freelist; 
		_freelist = nextobj(_freelist);;

		return obj;
	}

	bool Empty()
	{
		return _freelist == nullptr;
	}
};

我们哈希桶的设计打算使用数组,使用数组下标来映射桶的位置:

FreeList _freelist[MaxFreeListSize]; //MaxFreeListSize后面解释

到这里,我们其实可以发现一个问题,我们的自由链表中没有malloc空间之类的代码,这和我们的逻辑有关,我们设计的逻辑是,如果线程向thread cache申请内存对象,如果thread cache对应的桶的自由链表不为空,可以为他分配对象,那么就分配,否则,向central cache去申请内存对象,申请的内存对象释放时,不返回central cache,而是挂在thread cache的自由链表中,如果这个自由链表挂载的空闲内存对象过多,central cache中我们会回收这些内存对象。

我们上面提过,thread cache可以允许不同的对象来申请内存,我们的Allocate方法提供了size_t size参数,也就是要申请内存对象的大小,我们如何根据这个size大小来为其分配内存呢?是他要多少我们给多少吗?有上面定长内存池的铺垫,我们也知道,这是不行的,所以我们需要定制一个策略:

首先,我们设计的这个thread cache最大可申请内存对象大小为256KB,也就是256 * 1024字节,那么要申请内存的对象大小可能是1 ~ 256KB,所以,我们要有这么多个自由链表吗?当然不行,那么,按照这样的策略呢:申请1~8字节,都分配8字节,申请9~16字节,都分配16字节,以此类推,可行吗?按照这样的策略,自由链表的数量也达到了32KB个,这仍然不是我们所希望的,所以,我们来看看大神是如何设计的:

	// [1,128]                8byte对⻬       freelist[0,16)
	// [128+1,1024]           16byte对⻬      freelist[16,72)
	// [1024+1,8*1024]        128byte对⻬     freelist[72,128)
	// [8*1024+1,64*1024]     1024byte对⻬    freelist[128,184)
	// [64*1024+1,256*1024]   8*1024byte对⻬  freelist[184,208)

什么意思呢?我们解释第一行:意思就是,如果申请的内存对象大小在[1,128]这个区间内,分配内存的大小按照8的倍数对齐,每一个8的区间,对应一个自由链表,举个例子:

如果说申请内存对象大小为1字节,按照8对齐,分配8字节,对应自由链表的下标就是0号
如果说申请内存对象大小为9字节,按照8对齐,分配16字节,对应自由链表的下标就是1号

这样,根据我们上面的规则,最后的自由链表个数仅为208个(这就是MaxFreeListSize的由来),而且更妙的地方在于,这样设计,内碎片的浪费仅在10%左右:

我们不以申请1字节为例,我们是以整体情况而言,我们以129为例来计算:

如果申请129字节,那么会分配144字节的空间,有15字节的浪费,15 / 144 * 100% = 10.4%

如果申请1025字节,那么会分配1152字节,有127字节的浪费,127 / 1152 * 100% = 11%

那么我们的内存对象大小分配代码如何书写呢? 

	static size_tRoundUp(size_tsize)
	{
		if (size <= 128)
		{
			return _RoundUp(size, 8);
		}
		else if (size <= 1024)
		{
			return _RoundUp(size, 16);
		}
		else if (size <= 8 * 1024)
		{
			return _RoundUp(size, 128);
		}
		else if (size <= 64 * 1024)
		{
			return _RoundUp(size, 1024);
		}
		else if (size <= 256 * 1024)
		{
			return _RoundUp(size, 8 * 1024);
		}
		else
		{
			assert(false);
		}

		return -1;
	}

问题在于_RoundUp的设计,像博主这种普通人会怎么设计呢?

size_t _RoundUp(size_t size, size_t align)
{
    if(size % align == 0)
    {
        return size;
    }
    else
    {
        return (size / align + 1) * align;
    }
}

像这种频繁调用的函数,博主这种设计说实话不好,加减乘除还是比较吃资源,计算还是位运算更加高效,我们来看看大神的代码:

    static size_t _RoundUp(size_t size, size_t align)
	{
        //对于频繁调用的计算函数,位运算更高效
		return ((size + align - 1) & ~(align - 1)); 
	}

还是举个例子来解释:

如果我们申请的内存对象大小为1字节,size == 1, align == 8:

(size + 7) & ~(7)             我们只写出2进制中的前8位举例

00001000 & ~(00000111) = 8


如果我们申请的内存对象大小为9字节,size == 9, align == 8:

(size + 7) & ~(7)

00010000 & 11111000 = 16

现在,得到要申请内存对象,我们可以给他分配一个合适的内存大小了,与此同时,又产生一个问题:如何确定这个内存对象要从哪个自由链表去申请?

这里我们直接给出大神的代码:

	static size_t _Index(size_t size, size_t align)
	{
		return ((size + (1 << align) - 1) >> align) - 1;
	}

	static size_t Index(size_t size)
	{
		assert(size <= MaxSize);

		int group_size[4] = { 16, 72, 128, 184 };
		
		if (size <= 128)
		{
			return _Index(size, 3);
		}
		else if (size <= 1024)
		{
			return _Index(size - 128, 4) + group_size[0];
		}
		else if (size <= 8 * 1024)
		{
			return _Index(size - 1024, 7) + group_size[1];
		}
		else if (size <= 64 * 1024)
		{
			return _Index(size - 8 * 1024, 10) + group_size[2];
		}
		else if (size <= 256 * 1024)
		{
			return _Index(size - 64 * 1024, 13) + group_size[3];
		}
		else
		{
			assert(false);
		}

		return -1;
	}

于是,我们可以完善我们的Allocate方法和Deallocate方法:

void* ThreadCache::Allocate(size_t size)
{
	size_t realcapacity = Caculate::RoundUp(size);
	size_t index = Caculate::Index(size);

	if (!_freelist[index].Empty())
	{
		return _freelist[index].pop();
	}
	else
	{
		return FetchFromCentralCache(index, realcapacity);
	}

	return nullptr;
}

else里的情况就是映射的桶的位置处的自由链表为空,于是向central cache去申请内存对象。

void ThreadCache::Deallocate(void* ptr, size_t size)
{
	size_t index = Caculate::Index(size);
	_freelist[index].push(ptr);
}

//我们后面完善
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
	return nullptr;
}

现在,我们回到每个线程都有一个独立的thread cache,线程是如何拥有独立thread cache呢?什么时候创建了thread cache?

我们知道,线程之间全局变量是共享的,一个线程创建thead cache,其他线程也是可以看得见并使用的,我们如何使得一个线程创建的thread cache只有他自己看得见并使用呢?这就涉及到线程局部存储技术:TLS。

在C++中,TLS(Thread Local Storage,线程局部存储)是一种机制,它允许每个线程拥有自己的变量副本,从而避免多线程访问共享变量时可能发生的竞争条件和数据不一致问题。TLS 变量对于每个线程都是唯一的,确保了数据的安全性和隔离性。

在C++中,实现TLS有几种方式,包括使用编译器特定的扩展、标准库支持(如C++11及以后的线程支持库)以及操作系统提供的API。

1. 使用C++11及以后版本的thread_local关键字

thread_local int tls_counter = 0;


2. 使用编译器特定的TLS扩展(这些是非标准的)

// GCC/Clang  
__thread int tls_counter = 0;  
  
// MSVC  
// __declspec(thread) int tls_counter = 0;
 

3. 使用操作系统API

使用 TlsAlloc(Windows)或  pthread_key_create(POSIX线程)等函数来分配TLS索引,并在每个线程中设置和获取值。

我们使用thead_local以及__declspec举例测试:

//测试TLS
#include "ThreadCache.h"
#include <thread>

using namespace std;

thread_local ThreadCache* threadcache = nullptr;

void func1()
{
	threadcache = new ThreadCache();

	cout << "func1->threadcache : " <<  threadcache << \
                    " thread id : " << this_thread::get_id() << endl;
}

void func2()
{
	threadcache = new ThreadCache();

	cout << "func1->threadcache : " << threadcache << \
                    " thread id : " << this_thread::get_id() << endl;
}

int main()
{
	thread t1(func1);
	t1.join();

	thread t2(func2);
	t2.join();

	return 0;
}

我们可以发现,不同的两个线程各自独立拥有threadcache变量。

我们将thread_local换成__declspec(thread)在MVSC上效果是一样的,这里就不多做演示了。

 现在,我们将TLS技术结合进我们的thread cache代码中,在thread cache下新增:

static __declspec(thread)ThreadCache* threadcache = nullptr;

我们希望每一个线程都能够有自己独立的thread cache,这个thread cache什么时候创建呢?在第一次申请内存对象时,我们进行创建:

void* ConcurrentAlloc(size_t size)
{
	if (threadcache == nullptr)
	{
		threadcache = new ThreadCache();
	}

	return threadcache->Allocate(size);
}

这样,我们就可以使用这个方法去申请内存对象,且线程拥有自己独立的thread cache。最后,还有内存对象的释放:

void* ConcurrentFree(void* ptr, size_t size)
{
	assert(threadcache);
	threadcache->Deallocate(ptr, size);
}

总结:

在thread cache模块我们做了什么?首先,每个线程需要拥有一个独立的thread cache,于是我们创建了ThreadCache类,接着我们通过分析和画图得到了他的结构,是一个哈希桶,每个桶的位置是一个自由链表,于是我们设计了这个自由链表FreeList,有了这个自由链表,接着我们就需要定义我们的哈希桶需要多少个这样的自由链表,于是我们创建了Caculate类,里面我们学习大神的设计方法,根据不同申请内存大小的区间范围定下了不同的对齐方式,计算出了自由链表的数量为208,于是我们就有了这样的数据结构:FreeList _freelists[208]; 所以我们也就可以像他去申请内存对象以及释放了,就有了这样的两个方法:Allocate, Deallocate。对于Allocate方法,我们向thread cache申请内存对象,那么他会分配给我们多少字节的空间,以及从哪个自由链表去获取呢?于是我们通过学习大神的代码在Caculate中又写了Index以及RoundUp方法,这样我们就可以通过Allocate(size_t size),计算出了他将要给我们分配的对象大小,以及这个对象所在的自由链表的位置,如果这个自由链表是空的,那么就会向下一层CentralCache去申请内存对象,这个方法我们在下一层会去实现。对于Deallocate,我们释放内存对象并不需要delete,而是将内存对象返还到对应的自由链表中。至此,我们设计好了他的大致结构,并初步完成,接着,我们希望每一个线程都能拥有一个独立的thread cache,于是引入了TLS技术,也就是线程局部存储技术,并介绍了几种方式,接着在全局中定义了他,并将Allocate, Deallocate结合TLS进一步封装,于是每一个线程就能够通过这个方法拥有自己的thread cache并申请内存对象以及释放。

六、central cache

central cache也是一个哈希桶,不同于thread cache的是,中心缓存的桶下挂的是一个大内存span,并将span划分为一个个的小内存块挂在span的自由链表下分配给thread cache,就像这样:

图中,8Byte映射位置下挂的span中的页被划分为8字节的内存块的自由链表,那么256KB映射位置下挂的span中的页被划分为256Kb的内存块的自由链表。

Central Cache的结构如何设计?首先我们创建CentralCache类,类比ThreadCache需要设计FreeList,我们这里需要设计SpanList,FreeList里有void* _freelist,那么SpanList里就应该有Span* obj; Span结构我们又如何设计呢?

struct Span
{
	PageSize page_id; //页的起始号码
	size_t _n;        //页数

	Span* _prev;
	Span* _next;

	void* _freelist;
	size_t _usecount;

	Span()
		:page_id(0)
		,_n(0)
		,_prev(nullptr)
		,_next(nullptr)
		,_freelist(nullptr)
		,_usecount(0)
	{}
};

我们将Span设计成双向链表结构,将来希望在Span* pos位置增删时,能够达到O(1),我们谈页,OS的页一般来说是4KB或者8KB大小,_freelist就是自由链表,_usercout是分配出去的内存对象的数量,将来我们可以通过_freelist是否为空判断Span中是否还有对象,通过_usecount是否为0判断对象是否已经全部返还,每当有内存对象被分配,_usecount加上对应数量, 每当有内存对象被回收,_usecount减去对应数量,什么时候会回收呢?当thead cache对应的自由链表下的内存对象空闲过多时会将他们返还给CentralCache。

于是我们就可以设计SpanList类,这个链表我们设计为双向带头循环链表,所以,我们可以这样写:

//双向带头循环链表
class SpanList
{
private:
	Span* _head;

public:
	SpanList()
	{
		_head = new Span();
		_head->_next = _head;
		_head->_prev = _head;
	}
}

现在,我们完善这个类,完成增和删方法:

	void Insert(Span* pos, Span* newspan)
	{
		assert(pos && newspan);
		
		Span* prev = pos->_prev;
		
		newspan->_prev = prev;
		newspan->_next = pos;
		prev->_next = newspan;
		pos->_prev = newspan;
	}

	void Erase(Span* span)
	{
		assert(span && span != _head);

		Span* prev = span->_prev;
		Span* next = span->_next;

		prev->_next = next;
		next->_prev = prev;
	}

这个类对于CentralCache而言也就是一个桶,对于多线程来说,他向CentralCache申请内存时,由于CentralCache只有一个,如果说不同的线程向不同的桶申请内存对象,那么他们不会发生竞争,但是如果他们同时向CentralCache中的同一个桶去申请内存,我们就必须加锁,否则就会产生线程安全问题,所以,每一个桶都需要一把锁:

class SpanList
{
private:
	Span* _head;

public:
	std::mutex _mutex;
}

于是,我们的SpanList就初步设计完成了,接下来就是对CentralCache的设计,有了ThreadCache的经验,这里桶的个数应该也是208,于是:

class CentralCache
{
private:
	SpanList _spanlists[MaxFreeListSize];
}

并且由于我们希望CentralCache在全局中有唯一对象,所以我们将他设计成单例模式,并且我们此处采用饿汉模式,在main函数之前就创建,所以我们这样做:

class CentralCache
{
private:
	SpanList _spanlists[MaxFreeListSize];
	static CentralCache _instance;

	CentralCache(){}
	CentralCache(const CentralCache&) = delete;

public:
	static CentralCache* GetInstance()
	{
		return &_instance;
	}
}

我们将构造函数和拷贝构造函数私有,需要注意的是,如果我们将CentralCache这个类放在头文件当中,_instance的初始化就放在CentralCache的cpp源文件中,否则当头文件被多次包含时会发生重定义问题,即使你的头文件加上类似于#pragma once这样的预处理指令。

我们这里展开来讲:

#pragma once 是一个非标准的预处理指令,但它被许多现代编译器广泛支持,用于防止头文件被重复包含(或称为重复包含保护)。这个指令告诉编译器,在当前编译单元中,该头文件只应该被包含(include)一次。

在C和C++编程中,头文件(.h或.hpp文件)通常包含声明(如函数原型、类声明、宏定义等),这些声明需要在多个源文件中共享。为了防止因多次包含同一个头文件而导致的编译错误(如重复定义错误),传统上,人们会使用预处理宏(preprocessor macros)来提供保护。这种技术通常被称为“包含卫士”(Include Guards)或“头文件保护”(Header Guards)。

那么,如果我们在一个头文件中使用了包含卫士,但是仍然定义了类似于这样的全局变量:  int a = 1;在包含角度来看,因为我们使用了包含卫士,每个编译单元(cpp文件及其包含的头文件)不会在一个编译单元中被重复包含,但是,问题出现在链接阶段:每个包含这个头文件的源文件都会在自己的编译单元生成一个名为a的全局变量,当这些.cpp文件被编译成.o文件并最终链接一个可执行文件或库时,链接器会遇到多个名为a的全局变量,这会导致链接错误,因为在同一作用域内,全局变量不能有多个定义。


那么如果说,我们将int a改为static int a = 1;也就是说,在头文件中声明{当在头文件中使用 static 关键字声明一个变量时,这个声明实际上已经是一个定义(在C++中,对于基本数据类型如 intstatic 声明同时完成了声明和定义,除非该变量被显式初始化为extern)}了一个全局的静态变量。首先,static 变量具有内部链接属性,也就是说,该变量仅在其声明和定义的编译单元内可见和可访问,对其他编译单元是隐藏的,因此,当多个 .cpp 文件包含同一个头文件,且该头文件中声明了 static 变量时,每个 .cpp 文件都会生成一个独立的该变量的实例。这些实例之间互不影响,因为它们存在于不同的编译单元中。这样也就不会产生重定义的问题了。所以,static 在全局或命名空间作用域中的用法主要是限制变量的链接性,使其仅在当前编译单元内可见和可访问。

那么我们将static变量放在一个头文件所包含的类中,要说明的是,类中的static变量是属于类的,而不是类的特定实例,因此,所以类的实例共享一个static变量。在C++中,static 成员变量属于类,但它们在内存中的存储位置是在类的作用域之外的,所以,我们需要在类的外部去定义他们,现在有一个问题,我们将他定义在头文件中还是定义在源文件中?首先,类中的static变量不具有内部链接属性,它具有外部链接属性,因为访问这个变量时通过类名确定他的作用域去访问的,所以如果我们将他定义在头文件中,当多个源文件包含这个头文件时,链接阶段会出现重定义错误,所以,我们需要将他定义在源文件当中。

所以,我们的_instance定义在CentralCache.cpp中,将来其他线程要获取CentralCache对象,使用内部的GetInstance() 来获取这个唯一的CentralCache对象。

现在,可以拿到CentralCache的唯一对象,ThreadCache也就可以向他获取内存对象了:

//ThreadCache从CentralCache获取内存对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)

那么,我们要如何实现这个方法?现在,我们仍然要面对几个问题,CentralCache给ThreadCache分配对象分配多少个呢? 分配太多和太少都不好,我们希望有一个算法能够计算出一个合适的值,于是,我们在Caculate中实现了这样的方法:
 

	//确定ThreadCache申请内存对象的上下限,计算出合适的申请大小
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0 && size <= MaxSize);

		int num = MaxSize / size;
		if (num < 2) num = 2;
		if (num > 512) num = 512;

		return num;
	}

这样,对于8字节的对象,我们可以分配512个,对于256KB的对象,我们分配2个,这样也就决定了上下限。但是,对于8字节的内存对象申请,一次直接给出512个,真的用得完吗?我们不知道,所以我们再加入慢开始算法:

	//Caculate::NumMoveSize(size)后面决定上限
	int batchnum = min(Caculate::NumMoveSize(size), _freelist[index].Size());
	//慢开始算法
	if (batchnum == _freelist[index].Size()) _freelist[index].Size()++;

在FreeList中加入size_t _size属性,我们取NumMoveSize(size)和FreeList中_size两者更小的值,如果说batchnum的值和_size相同,那么这个_size++,如果对这个对象的申请比较频繁,我们也可以让_size加上一个较大值,将来如果这个值超过了NumMoveSize(size),那么上限的作用也就体现出来了。

接下来,就可以向CentralCache去申请batchnum个size大小的内存对象了,这个方法我们实现在CentralCache类中,如何设计这个方法呢?首先要有内存对象大小以及我们计算出的申请个数,接着,因为返回值只能返回一个值,所以我们可以设计两个输出型参数void* start, void* end,通过他们将申请好的内存对象带回来:

CentralCache::GetInstance()->FetchRangeObj(start, end, batchnum, size);

返回值我们设置为具体申请到的内存对象的数量。

接下来,我们得到这个返回值realnum,判断他,如果是1,那么直接将start返回,让用户直接去使用即可,如果大于1,那么返回start之前,将nextobj(start)和end之间的这些对象全部挂到_freelist[index]上:

	if (realnum == 1)
	{
		return start;
	}
	else
	{
		_freelist[index].RangeInsert(nextobj(start), end);
		return start;
	}

    //FreeList中新增方法,我们这里贴出来
	void RangeInsert(void* start, void* end)
	{
		assert(start && end);

		nextobj(end) = _freelist;
		_freelist = start;
	}

最后,我们实现一下FetchRangeObj方法,也就是从CentralCache获取内存对象,但是从它里面获取对象前,我们需要从它里面先获取一个自由链表不为空的Span,所以我们需要实现:

// 获取一个自由链表非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);

我们先假设这个函数实现了,先继续完善FetchRangeObj方法,通过Caculate中的Index方法获取到位置,将_spanlists[index]以及内存对象大小传给GetOneSpan方法获取到一个span后,从这个span上的自由链表拿下batchnum个内存对象,并用计数器记下实际上拿到的对象数量,用start以及end拿到这些对象,并从span中移除。整个过程加锁,因为到现在,要从指定的桶里去取内存对象。

//batchNUm是ThreadCache想要获取内存对象的数量
size_t CentralCache::FetchRangeObj(void*& start, void*& end, \
                                            size_t batchNum, size_t size)
{
	size_t index = Caculate::Index(size);
	_spanlists[index]._mutex.lock();

	Span* span = GetOneSpan(_spanlists[index], size);
	assert(span && span->_freelist);

	start = span->_freelist;
	end = start;

	size_t i = 1, actualnum = 1;
	while (i < batchNum && nextobj(end) != nullptr)
	{
		++i, ++actualnum;
		end = nextobj(end);
	}

	span->_freelist = nextobj(end);
	nextobj(end) = nullptr;

	_spanlists[index]._mutex.unlock();
	return actualnum;
}

总结:

在Central cache模块我们做了什么?同样的,我们首先画出了CentralCache的结构图,确定了其数据结构是哈希桶,以及每个桶都是一个SpanList结构,每一个Span下挂载的都是一个自由链表。接下来我们定义了Span以及实现了SpanList(采用带头双向循环链表),最终定下了_spanlists这样的数据结构,于是,我们接着定义了他的单例模式,实现了theadcache向centralcache申请内存对象的FetchFromCentralCache方法,在这个方法中,我们实现了慢开始算法以及上下限的方法,并接着将这个结果通过GetInstance传给了FetchRangeObj方法,在这个方法中,我们获取了一个可用的Span,并将分配的对象以及数量返回,在FetchFromCentralCache方法拿到后根据这个返回值判断返回start以及将对象链入对应的自由链表中。

七、page cache

page central也是一个哈希桶,和thread cache和central cache都不同的是,他索引使用的不再是内存对象的大小,而是页数,也就是他映射所在位置的spanList里span的页数。

关于为什么是128页,这个也是经过计算的,当然也可以更大,128页已经是1024KB,也就是1MB,即使分配256KB的内存对象也能分配四个。

我们怎么理解page cache,线程从thread cache开始申请内存对象,如果thread cache对应位置的自由链表不为空,那么就直接分配,否则就从central cache去申请内存对象,如果central cache对应位置的spanList中的span中的freelist不为空,那么返回这个span拿到一定数量的内存对象,否则,central cache向page cache去申请span,page cache要做的就是将这个span返回。

我们在central cache中曾经假设了Getonespan是完成的,现在我们去实现它:

根据上面我们的逻辑,首先我们需要去central cache中去寻找可用的span,并从上面拿下span返回,

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
	Span* begin = list.begin();
	while (begin != list.end())
	{
		if (begin->_freelist != nullptr)
			return begin;
		else
			begin = begin->_next;
	}
}

我们这里没有封装迭代器,直接使用了指针,begin和end方法的实现:

	Span* begin()
	{
		return _head->_next;
	}

	Span* end()
	{
		return _head;
	}

这个函数如果继续向下走,那么就证明central cache在这个桶中没有可用的span,就需要向page cache去申请,所以接下来,我们需要完成PageCache类。

PageCache数据结构也是哈希桶,而且每个桶的位置都是SpanList,于是我们可以写出:

SpanList _pageSpanLists[Npages];

Npages就是我们定义的static const int类型的常量,大小是129,我们希望1页就存放在数组下标为1的位置上,没有0页,下标为0的位置空出,接着,我们需要将PageCache设置为单例模式:

class PageCache
{
private:
	SpanList _pageSpanLists[Npages];
	static PageCache _instance;
	
public:
	static PageCache* GetInstance()
	{
		return &_instance;
	}
};

当然,定义我们仍然写在源文件中,PageCache PageCache::_instance;

现在,我们需要思考一个问题,central cache在thread cache向他申请内存时,对于每一个桶使用的是桶锁,那么对于我们pageCache来说,因为也是唯一的,所以应该也需要锁,但对他而言,我们使用桶锁好不好呢?

为了回答这个问题,我们先来说一下PageCache的工作原理,在central cache向page cache申请内存时,我们先计算出一个合适的申请页数,来到page cache对应的映射位置上,如果说有span,那么就返回,如果说没有呢?那么我们就继续从当前映射位置开始向后面的映射位置走,假如说我们找到了一个span那么就将他切分,假如说,这个span有n页,central cache申请的是k页,那么切分后,将k页的span返回,将n-k页的span重新push进PageCache中,当central cache中的span中分配的内存对象全部返回,那么就将这个span归还给PageCache,pagecache会将相邻页号的span进行合并,合并成更大的内存块。

在上面的过程中,一定是需要加锁的,因为我们对PageCache的_pageSpanLists有访问,而且在多线程下,上面的流程没有锁一定会出问题,那么现在,思考一下,桶锁好还是PageCache给central cache分配内存这个流程整体加锁好?

假设我们使用的是桶锁,现在,我们再走一遍流程:central cache向pagecache申请内存,计算好了申请页数,确定好了向PageCache哪个桶去申请,那么PageCache在对应桶分配内存时,加锁,但是,这个桶是空的,没有span,于是这个桶解锁,继续向后面的桶去遍历,在遍历这些桶时,仍然需要加锁,同时,并不能保证其他位置的桶的状态不被修改,于是,遍历过程中可能看到的PageCache的状态是不同的,这是其一;再一个,如果一直找不到可用的span,就加锁解锁往后遍历,这是其二;其三,如果不同线程以不同顺序遍历访问桶,且持有部分桶的锁,那么可能会造成死锁问题(就好比一个线程访问的是2页的桶,一个线程访问的3页的桶,第一个线程没找到span向后遍历时要申请3页桶的桶锁,但是3页桶的锁被第二个线程持有,且第二个线程发现3页桶也没有span,于是向后找,在5页桶发现了span进行切分,切分后要将切分后的span插入2页桶,于是去申请这个锁,但是这个锁被第一个线程持有,于是发生死锁)

所以,综上所述,桶锁带来的主要问题是:遍历复杂性,桶的竞争和死锁,锁的范围限制这些问题,那么你可能会问,为什么central cache就能使用桶锁了呢?那是因为central cache每个桶之间是互不干扰的,不会说256KB的span切分成其他大小的span,每个桶的访问是互不干扰的;再一个不需要遍历,没有加锁解锁的复杂性;而且也不会因此出现死锁问题,所以对他而言,使用桶锁是更优的。

所以,我们选择使用整体锁,也叫做大锁。

public:
	std::mutex _pageMtx;

于是,我们可以写PageCache分配内存的方法了,就叫做NewSpan:

	Span* NewSpan(size_t num);

接下来有一个问题,申请多少页呢?我们仍然希望能够找到一个合适的大小,于是我们在Caculate中新增方法:NumPageSize:

	//计算给一个Span多少页合适,根据内存对象大小size
	static size_t NumPageSize(size_t size)
	{
		//num --- 内存对象数
		int num = NumMoveSize(size);
		//memsize --- 这些内存对象的总大小
		int memsize = num * size;
		//计算这些内存对象需要多少页
		int pagesize = memsize >> Nshift; //右移13位,即除以8 * 1024
		if (pagesize < 1)
			pagesize = 1;

		return pagesize;
	}

我们将他接进NewSpan的参数中: 

Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
	Span* begin = list.begin();
	while (begin != list.end())
	{
		if (begin->_freelist != nullptr)
			return begin;
		else
			begin = begin->_next;
	}
	
	PageCache::GetInstance()->_pageMtx.lock();
	Span* span = PageCache::GetInstance()->NewSpan(Caculate::NumPageSize(size));
	PageCache::GetInstance()->_pageMtx.unlock();
}

好,我们假设NewSpan已经完成,已经返回了一个span,接下来要做什么?根据开始PageCache的结构图我们知道,这个span是没有划分freelist的,也就是说,在给central cache返回span之前,还需要将他划分成freelist这样的小内存对象,那么如何划分?

我们先列出span的成员:

struct Span
{
	PageSize page_id; //页的起始号码
	size_t _n;        //页数

	Span* _prev;
	Span* _next;

	void* _freelist;
	size_t _usecount;
}

页的起始号码是什么?我们说,一页有4KB或8KB,但是一般来说是8KB,我们就按照8KB来说(包括前后我们一直都是这样做的),前面定长内存池中我们使用Windows的系统调用来申请内存:

inline static void* SystemAlloc(size_t kpage)
{

#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage * (1 << Nshift), \
                        MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;
}

Nshift值为13,1 << 13也就是8KB,他会给我们返回一个地址,这个地址的值会是8KB的整数倍,对齐页面大小,所以,页号也就是这个地址除以8KB。

现在,我们能够通过页号找到分配的内存的起始地址以及末尾地址:

char* start = (char*)(span->page_id << Nshift);
char* end = (span->_n << Nshift) + start;

关于为什么我们用char*而不是void*也很好理解,char*在我们计算末尾地址时是可以计算的,void*就不行,末尾地址就是页数*8KB再加上起始地址。

这样我们也就可以对这块空间进行划分,交给freelist,具体如何划分呢?我们来看张图:

现在,我们需要将上面的内存划分到freelist上,头插好还是尾插好呢?事实上,都行,只是有一点,上面的一块内存是连续的,如果选择头插,那么从freelist上的内存对象来看,地址正好是从尾到头也就不连续了,而尾插的话,freelist尽管在我们看来是链式结构,但在物理上他是连续的,这样cpu缓存利用率高,所以我们选择尾插。

cpu缓存利用率(缓存命中率):衡量cpu缓存效率的一个重要指标,它指的是CPU在访问内存时,从缓存中成功获取数据的比例。

cpu缓存的工作原理:cpu缓存是为了解决cpu与内存之间速度匹配的问题,当cpu需要读取数据时,他会优先在缓存中查找,如果查找到了所需数据,那么就叫做缓存命中,则直接从缓存中读取,这比从内存中读取要快的多,但是如果没有命中,就需要·从内存中读取数据。


数组的物理连续性:数组在内存中的物理地址是连续的,这意为这cpu访问数组中的第一个元素并发现他不在缓存中时,它可以将包含该元素及其后续元素的一段连续内存读取到缓存中,而这段内存由于是连续的,很有可能被cpu接下来接着访问,因此,一但缓存了数组的初始部分,后续的数组访问很可能在缓存中命中,从而提高了缓存命中率。

对于链表而言,cpu访问链表的元素,由于物理地址不连续,加载一个链表节点到缓存中,并不保证后续的链表访问也会缓存命中,所以缓存命中率较低,如果该链表元素被频繁访问,cpu就可能需要频繁从内存读取数据。

所以,我们这里采用尾插,是可以提高缓存命中率的。

	span->_freelist = start;
	start += size;
	char* tail = (char*)span->_freelist;

	while (start < end) //有些时候加不整
	{
		nextobj(tail) = start;
		tail = start;
		start += size;
	}

	list.PushFront(span);

将划分好的span插入到central cache对应的桶中。

现在,我们思考一个问题,加锁问题,GetOneSpan这个方法,现在有两处需要加锁,一处是必须加,一处比较有争议,但是我们认为加上更好。最显而易见的一处就是:

	list._mutex.lock();
	list.PushFront(span);

但是,在FetchRangeObj中:

我们已经对list加了锁,这样会造成死锁?所以,在前面一定要解锁,在什么位置解锁,解锁会不会有什么不好的影响?

我们可以想一下,在 FetchRangeObj 中,调用GetOneSpan是为了从central cache对应桶中获取一个span,所以对这个桶加了锁,在GetOneSpan中,我们前面的逻辑是先遍历central cache找到可用的span,所以这里我们是不能解锁的,在遍历后,如果找到了,直接返回这个span,也就没有向page cache申请内存,list的加锁解锁问题也就不存在,如果说没找到可用的span,向page cache申请内存,那么在遍历后,我们就可以对list这个桶进行解锁,因为后面一个线程获取的span在其他线程那里是看不到的,因为此时的span*是一个函数内的临时变量,直到划分后,我们需要将这个span插入central cache中时,我们才需要加锁。

所以,我们在这里解锁:

// 获取一个非空的span, size---内存对象大小
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
	Span* begin = list.begin();
	while (begin != list.end())
	{
		if (begin->_freelist != nullptr)
			return begin;
		else
			begin = begin->_next;
	}

	//为什么加在这个位置:
	list._mutex.unlock();
	
	PageCache::GetInstance()->_pageMtx.lock();
	Span* span = PageCache::GetInstance()->NewSpan(Caculate::NumPageSize(size));
	PageCache::GetInstance()->_pageMtx.unlock();

	//划分span---_freelist
	char* start = (char*)(span->page_id << Nshift);
	char* end = (span->_n << Nshift) + start;

	span->_freelist = start;
	start += size;
	char* tail = (char*)span->_freelist;

	while (start != end)
	{
		nextobj(tail) = start;
		tail = start;
		start += size;
	}

	list._mutex.lock();
	list.PushFront(span);

	return span;
}

另外,在上述位置解锁还有一个好处,就是此时因为我们对这个桶解锁,当其他线程将申请的span中的自由链表中的内存对象释放给central cache时,能够还回来,不会因为加锁问题而阻塞。

即使当其他线程也FetchRangeObj访问这个桶,也没有任何问题,因为这个桶是空的当他进去向pagecache申请内存时,因为上一个线程还在申请,所以最多也就是阻塞,并且此时还会解锁(因为遍历后发现都是空的)。

现在,我们来完善NewSpan方法:

对于NewSpan来说,就是需要从PageCache中通过申请的页数索引拿到span返回给centralcache,通过判断这个位置的Freelist是否为空:

	//SpanList中
    bool Empty()
	{
		return _head->_next == _head;
	}

如果不为空那么直接返回,如果为空那么就向下走,遍历PageCache,找到不为空的span进行切分,将需要的span返回,另一个挂进PageCache中,如果没有找到不空的span,那么就通过我们的系统调用去申请128page的内存(我们当然希望申请大的页,因为这样内存碎片问题会更少,并且之后还可以进行合并),然后再去切分。

切分逻辑是什么呢?首先,假设我们是要对PageCache中获取到的span进行切分,而且是以页来进行切分,假设这个span有n页,我们要申请的是k页(k < n),那么就可以切分成n-k页的span和k页的span,接着我们就可以设置他们的page_id,以及页数,将n-k页的span插入PageCache,将k页的span返回。

//num是要申请的页数,直接使用num进行索引
Span* PageCache::NewSpan(size_t num)
{
	if (!_pageSpanLists[num].Empty())
		return _pageSpanLists[num].PopFront();

	for (int i = num + 1; i < Npages; ++i)
	{
		if (!_pageSpanLists[i].Empty())
		{
			Span* npage = _pageSpanLists[i].PopFront();
			Span* kpage = new Span();

			kpage->page_id = npage->page_id;
			npage->page_id += num;

			kpage->_n = num;
			npage->_n -= num;
	
			_pageSpanLists[npage->_n].PushFront(npage);

			return kpage;
		}
	}
}

那么如果是没有从PageCache中拿到span呢?那么我们就创建一个newspan,通过系统调用申请128page的内存,然后将申请好的内存首地址通过转换(左移Nshift)得到页号,以及将页数填为Npages - 1,然后再去切分,但是我们再像上面把代码写一次,有些重复,当然,我们可以将这段逻辑单独写一个函数,当然,我们有更好的方法:

我们直接将申请好的span插入到PageCache中并再调用一次NewSpan。

	Span* newspan = new Span();
	void* ptr = SystemAlloc(Npages - 1);
	newspan->page_id = (PageSize)ptr >> Nshift;
	newspan->_n = Npages - 1;

	_pageSpanLists[newspan->_n].PushFront(newspan);
	return NewSpan(num);

于是到这里,我们的PageCache初步完成。

总结:

在page cache模块我们做了什么?首先我们根据他的结构图,确定了他的数据结构也是一个哈希桶,不过他是按照申请页数来进行索引的,按照central cache向page cache申请span的逻辑,我们完善了GetOneSpan方法,并在这期间完成了PageCache类,填充了他的成员以及方法,定义了他的单例模式,接着在GetOneSpan中完成了NewSpan方法,这期间,我们还实现了Caculate中的MovePageSize方法,计算出合适的申请页数,在NewSpan中申请这个页数的span,并制定了切分策略,在GetOneSpan中完成对span的划分。

八、内存申请联调

 测试代码如下:

#include "ConcurrentAlloc.h"

void Test1()
{
	
	void* p1 = ConcurrentAlloc(1);
	cout << p1 << endl;
	void* p2 = ConcurrentAlloc(4);
	cout << p2 << endl;
	void* p3 = ConcurrentAlloc(8);
	cout << p3 << endl;
	void* p4 = ConcurrentAlloc(6);
	cout << p4 << endl;
	void* p5 = ConcurrentAlloc(3);
	cout << p5 << endl;

}

void Test2()
{
	for (int i = 1; i <= 1024; i++)
	{
		void* p = ConcurrentAlloc(i);
		cout << p << endl;
	}

	void* p2 = ConcurrentAlloc(17);
	cout << p2 << endl;
}

int main()
{
	//Test1();
	Test2();

	return 0;
}

测试结果正常。

九、thread cache内存释放

我们前面已经基本完成了对三层缓存的内存申请,现在我们将开始完成三层缓存对内存对象的回收和释放。

对于thread cache,我们有Deallocate方法:

void Deallocate(void* ptr, size_t size);

将要释放的内存对象及其大小传入,我们该如何实现函数体?首先这个内存对象时返回给thread cache的,而thread cache的数据结构是:FreeList _freelist[MaxFreeListSize]; 也就是说,我们需要根据size计算index将他插入对应的桶中,所以:

	size_t index = Caculate::Index(size);
	_freelist[index].push(ptr);

基本的内存对象回收我们可以做到了,但是一个桶下如果挂了很多空闲的内存对象,也不好,因为这样其他线程想用时是申请 不到的,同时因为他们是小的内存对象,如果不回收,那么central cache中的span也不会释放返回给page cache,也就合不出大的页。所以,我们需要一种策略来回收内存对象。

Google的tcmalloc中,使用的策略是根据freelist的长度以及内存对象的总大小来回收,我们这里只采用根据freelist的长度来回收内存对象,当freelists中内存对象的数量大于他一次性批量申请内存的大小,我们就回收一批内存对象,这个数量就是一次性批量申请内存的大小,为什么不全部回收?因为线程不一定就不会继续申请,如果全部回收,下一次线程还需要向central cache申请。

	if(_freelist[index].ObjNum() > _freelist[index].MaxSize())
		TooLongLists(_freelist[index], size);

我们如何实现TooLongLists?将对应的桶和内存对象大小传入,然后在这个函数内部批量回收内存对象,所以FreeList我们需要新增一个方法:RangePop

	void RangePop(void*& start, void*& end, int num)
	{
		assert(_objnum >= num);
		end = start = _freelist;
		
		for (int i = 0; i < num - 1; ++i)
		{
			end = nextobj(end);
		}

		_freelist = nextobj(end);
		nextobj(end) = nullptr;	
		_objnum -= num;
	}

事实上,参数中的void* end没有必要,我们直接在这个函数内部定义即可,不过有这个参数也无所谓,我们画个图来看这个方法的逻辑:

现在我们拿到了start指向的一串内存对象,通过central cache提供的方法将这一串内存对象回收。

十、central cache内存释放

void ReleaselistToSpan(void* start, size_t size);

size仍然是内存对象的大小,现在我们思考这个方法如何实现:我们先看central cache的数据结构SpanList _spanlists[MaxFreeListSize];  所以仍然是通过这个size大小确定是哪个桶要回收,计算出index后, 我们面临一个问题,这些内存对象虽然是一个桶的,但不一定是一个Span中的,我们如何确定哪一个内存对象是哪个Span的呢?现在我们知道的是内存对象的起始地址,而Span之间是这样的:

也就是说从0x007D6000到0x00FAE000之间的地址,页号都是1003,所以,我们可以通过使用内存对象的地址除以8KB,也就是右移13位计算出页号,通过页号来找对应的Span。

那么我们计算出页号后,难道需要遍历SpanList吗?不需要,我们使用unordered_map建立页号与Span*的映射即可:

std::unordered_map<size_t, Span*> _idSpanMap;

那么这个属性写在哪里呢?我们写在PageCache中,因为PageCache也用的到,现在又有一个问题,那么页号与Span*的映射在哪里,什么时候去建立呢?我们想想Span是从哪里得到的?一开始三层缓存都是空的,首先由PageCache向堆去申请内存,切分内存,才有了Span,这个时候我们取建立这个映射:

			for (PageSize i = 0; i < kpage->_n; ++i)
			{
				_idSpanMap[i + kpage->page_id] = kpage;
			}

			return kpage;

于是,我们就可以通过内存对象的起始地址转化为页号找到对应的Span,这个过程我们封装一个函数去实现:

Span* PageCache::MapObjectToSpan(void* obj)
{
	auto ret = _idSpanMap.find((PageSize)obj >> Nshift);
	if (ret != _idSpanMap.end())
		return ret->second;
	else
		assert(false);

	return nullptr;
}

于是,我们就可以遍历从start开始的内存对象,将他们一一回收,直到start为空:

	while (start != nullptr)
	{
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		Span* next = (Span*)nextobj(start);

		nextobj(start) = span->_freelist;
		span->_freelist = start;

		start = next;		
	}

 这个过程,我们需要加锁,因为我们在访问SpanList中的Span,并要将内存对象插入,所以:

void CentralCache::ReleaselistToSpan(void* start, size_t size)
{
	size_t index = Caculate::Index(size);
	_spanlists[index]._mutex.lock();

	while (start != nullptr)
	{
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		void* next = (void*)nextobj(start);

		nextobj(start) = span->_freelist;
		span->_freelist = start;

		start = next;	
	}

	_spanlists[index]._mutex.unlock();

}

那么,central cache中的Span什么时候被PageCache回收呢?我们一开始在central cache就说过,Span是通过_usecount是否为0来判断内存对象是否全部回来,所以我们使对应的Span的usecount--,然后判断是否为0,如果为0,那么就回收,并从SpanList中移除:

		if (--span->_usecount == 0)
		{
			_spanlists[index].Erase(span);
			span->_freelist = span->_next = span->_prev = nullptr;

			_spanlists[index]._mutex.unlock();

			PageCache::GetInstance()->_pageMtx.lock();
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock();

			_spanlists[index]._mutex.lock();
		}		

对于要移除的span,我们清空他的指针,防止野指针问题,对_spanlists[index]._mutex.unlock(); 我们为什么要解这个桶锁?因为我们对桶的操作已经做完了,span已经从桶中拿下,后面的操作是PageCache回收span的操作,所以这个桶锁我们可以解,让其他线程能够申请和回收内存。

于是,PageCache::GetInstance()->ReleaseSpanToPageCache(span);就是我们下一步要做的。

十一、page cache内存释放

这里我们如何做呢?PageCache要回收central cache的span,PageCache::GetInstance()->ReleaseSpanToPageCache(span);也传入了一个空闲的span,span中可以拿到申请的内存的起始页号,我们需要做的是:

找到span首尾页号的相邻页的span进行合并,合并出一个更大的内存,所以,我们需要找到span首尾页的相邻页号是否在_idSpanMap中有映射,所以,我们需要对切分的页npage进行首尾映射:

//num是要申请的页数,直接使用num进行索引
Span* PageCache::NewSpan(size_t num)
{
	if (!_pageSpanLists[num].Empty())
		return _pageSpanLists[num].PopFront();

	for (size_t i = num + 1; i < Npages; ++i)
	{
		if (!_pageSpanLists[i].Empty())
		{
			Span* npage = _pageSpanLists[i].PopFront();
			Span* kpage = new Span();

			kpage->page_id = npage->page_id;
			npage->page_id += num;

			kpage->_n = num;
			npage->_n -= num;
	
			_pageSpanLists[npage->_n].PushFront(npage);
			//将npage的首尾插入_idSpanMap中
			_idSpanMap[npage->page_id] = npage;
			_idSpanMap[npage->page_id + npage->_n - 1] = npage;

			for (PageSize i = 0; i < kpage->_n; ++i)
			{
				_idSpanMap[i + kpage->page_id] = kpage;
			}

			return kpage;
		}
	}

	Span* newspan = new Span();
	void* ptr = SystemAlloc(Npages - 1);
	newspan->page_id = (PageSize)ptr >> Nshift;
	newspan->_n = Npages - 1;

	_pageSpanLists[newspan->_n].PushFront(newspan);
	return NewSpan(num);
}

对于kpage来说,他即将被划分小内存对象,将来小内存对象一定要回收的,所以kpage的每个页号都需要建立映射,而对于npage来说,他即将被挂在PageCache上,所以他只需要考虑被申请以及被合并,如果被申请,那么这个时候在将每个页号映射,而被合并只需要他的首尾页号,所以不需要每个页号都建立映射关系。

那么我们就直接索引映射吗?当然不行,比如这种情况:

所以我们如何判断central cache是否正在使用呢?通过_usecount吗?这是不可行的,当central cache刚刚申请一个span时,_usecount为0,如果此时其他线程返回给PageCache一个span正好和这个刚申请的span页号相邻,那么两个span就合并了,但其中一个是被central cache申请下来正在使用的,所以我们不能通过_usecount来判断·,而是为Span新增一个属性:isUse,默认为false,当central cahce通过GetOneSpan拿到Span*时,将其中的isUse置为true,表示正在使用,这样,即使其他线程有上面图片的情况,我们通过isUse也能判断,前面的span是不能被合并的。

那么,当span合并了前面或者后面的span,有没有可能,合并后仍然还能和前面或者后面的页继续合并?是可能的,所以我们写成死循环,直到不能合并,或span的isUse为true时break。

并且,我们还有一种情况可以break,就是如果即将合并的两个span,两者的页数相加大于Npages-1,超过了PageCache能申请的页数的上限,那么我们就不合并,也许你会问,我们申请的页最大就是128页,而且span都是从128页切分下来的,他们合并怎么会大于128页呢?看图:

 

那么当:

所以,我们需要判断三次,然后才能进行合并,现在,我们来写代码:

	//向上合并
	while (1)
	{
		PageSize pageid = span->page_id - 1;
		auto temp = _idSpanMap.find(pageid);

		if (temp == _idSpanMap.end())
		{
			break;
		}
		
		Span* prevSpan = temp->second;
		if (prevSpan->isUse == true)
		{
			break;
		}
		
		if (prevSpan->page_id + span->page_id > Npages - 1)
		{
			break;
		}

		span->page_id = prevSpan->page_id;
		span->_n += prevSpan->_n;

		_pageSpanLists[prevSpan->page_id].Erase(prevSpan);
		delete prevSpan;
	}

	//向下合并
	while (1)
	{
		PageSize pageid = span->page_id + 1;
		auto temp = _idSpanMap.find(pageid);

		if (temp == _idSpanMap.end())
		{
			break;
		}

		Span* nextSpan = temp->second;
		if (nextSpan->isUse == true)
		{
			break;
		}

		if (nextSpan->page_id + span->page_id > Npages - 1)
		{
			break;
		}

		span->_n += nextSpan->_n;

		_pageSpanLists[nextSpan->page_id].Erase(nextSpan);
		delete nextSpan;
	}

为什么要delete prevSpan和nextSpan?因为他们被合并了,且Span*都是被new出来的,所以我们需要delete。

接着,我们就需要将合并完的span重新插入_pageSpanLists中,并设置isUse为false,因为这个span是从central cache中来的,isUse一定是true。最后别忘了建立这个span的首尾映射,不仅仅在切分时对npage建立映射,合并后的span也需要建立关于_idSpanMap的映射。

	//因为这个span是从centra cache中刚刚拿回来的,isUse一定是true
	span->isUse = false;
	_pageSpanLists[span->_n].PushFront(span);
	_idSpanMap[span->page_id] = span;
	_idSpanMap[span->page_id + span->_n - 1] = span;

十二、内存释放联调

void TestFree1()
{

	void* p1 = ConcurrentAlloc(1);
	void* p2 = ConcurrentAlloc(4);
	void* p3 = ConcurrentAlloc(8);
	void* p4 = ConcurrentAlloc(6);
	void* p5 = ConcurrentAlloc(3);

	ConcurrentFree(p1,1);
	ConcurrentFree(p2,4);
	ConcurrentFree(p3,8);
	ConcurrentFree(p4,6);
	ConcurrentFree(p5,3);
}

经过简单的测试,运行正常,我们后面会进行多线程场景测试以及性能分析。

十三、综合优化

解决释放内存对象传size问题

正常的内存释放函数不应该传内存对象大小,所以我们要想办法去掉这个参数,我们需要思考的是,如果我们去掉这个参数,如何找到这个内存对象是属于哪个Span,现在,我们已知的是,可以通过这个内存对象的地址去算出一个页号,所以我们通过这个页号找到对应的Span,那么我们就应该在这个Span中新增属性--内存对象大小_objSize,于是我们的代码有如下修改:

	//GetOneSpan
    PageCache::GetInstance()->_pageMtx.lock();
	Span* span = PageCache::GetInstance()->NewSpan(Caculate::NumPageSize(size));
	span->isUse = true;
	span->_objSize = size;
	PageCache::GetInstance()->_pageMtx.unlock();
static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;

    //......
}

大于256KB的大块内存申请问题

我们前面说过,ThreadCache只能申请最大256KB的内存对象,那么如果我想申请大于256KB的内存块呢?256KB是32page,大小只要不超过128page,我们的三层缓存就还能用,也就是说,不超过1024KB,大于256KB的内存块,我们可以直接向PageCache去申请,如果大于1024KB,那么就向堆去申请:

所以,我们在ConcurrentAlloc有这样的修改:

static void* ConcurrentAlloc(size_t size)
{
	if (size > MaxSize)
	{
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = \
        PageCache::GetInstance()->NewSpan(Caculate::RoundUp(size) >> Nshift);

		span->_objSize = size; //为释放做准备,不希望free函数传size
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->page_id << Nshift);
		return ptr;
	}
	else
	{
		if (threadcache == nullptr)
		{
			static ObjectPool<ThreadCache> tcPool;
			threadcache = new ThreadCache();
		}

		return threadcache->Allocate(size);
	}

	return nullptr;
}

我们向堆的申请也写在了NewSpan中,因为如果没有大于128page,那就按照正常逻辑去申请,如果大于128page,我们也统一在NewSpan中向堆去申请,这样的好处是当我们释放内存对象时,也能够统一去释放:

	if (num > Npages - 1)
	{
		Span* newspan = new Span();
		void* ptr = SystemAlloc(num);
		newspan->page_id = (PageSize)ptr >> Nshift;
		newspan->_n = num;

		_idSpanMap[newspan->page_id] = newspan;
		return newspan;
	}

所以,对于大块内存的释放,我们也有对应的修改,ConcurrentAlloc:

static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;

	if (size > MaxSize)
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(threadcache);
		threadcache->Deallocate(ptr, size);
	}
}

在ReleaseSpanToPageCache中我们统一释放:

	if (span->_n > Npages - 1)
	{
		void* ptr = (void*)(span->page_id << Nshift);
		SystemFree(ptr);

		delete span;
		return;
	}

使用定长内存池脱离使用new

我们这个项目的目的就是为了在多线程场景下取代new/delete/malloc/free,而在我们的项目当中,有这么几处使用new的地方,一个是ThreadCache的创建,剩下的就是Span的创建,而我们的定长内存池就是为了这样的对象准备的,所以,我们那些使用new、delete的地方都可以通过创建ObjectPool,使用他的New和Delete方法来替代,这里我们简单贴出代码:

		if (threadcache == nullptr)
		{
			static ObjectPool<ThreadCache> tcPool;
			threadcache = tcPool.New();
			//threadcache = new ThreadCache();
		}

	if (span->_n > Npages - 1)
	{
		void* ptr = (void*)(span->page_id << Nshift);
		SystemFree(ptr);

		g_objPool.Delete(span);
		return;
	}

 类似于这样,其他几处修改这里就不贴出了。

十四、性能分析

我们经过测试后得出:

我们发现两者的效率事实上差不多,但是我们预期是希望我们的项目在多线程场景下效率能够优于malloc等,所以我们需要对我们的代码进行优化,但是我们如何进行优化?我们怎么知道哪块代码的性能如何呢?

这就需要性能分析工具:

如果你的Visual studio没有设置链接器 /PROFILE,那么就在项目属性/链接器/命令行下,将/PROFILE添加到其他项目中,确定或应用,就可以使用性能分析工具了。

这里的图片丢了,各位自行测试。这里贴出结论,是Deallocate中的TooLongLists中的ReleaselistToSpan中的MapObjectToSpan消耗很大。

十五、基数树代替哈希表优化

关于我们为什么要使用基数树,有几点,第一点就是STL容器,类似于哈希表,申请空间本质还是去调malloc,第二点就是在访问他时需要加锁,而基数树不需要。

首先我们先介绍单层基数树,这适用于32位平台:

//BITS--> 32 - Nshift也就是页数 使用预处理 BitNum - Nshift
template <int BITS>
class TCMalloc_PageMap1 
{
private:
	static const int LENGTH = 1 << BITS;
	void** array_;    

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap1() 
	{
        //算出要申请的内存大小
		size_t size = LENGTH * sizeof(void*);	
         //按照页申请内存
		size_t actualSize = Caculate::_RoundUp(size, 1 << Nshift);

		array_ = (void**)SystemAlloc(actualSize >> Nshift);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	void* get(Number k) const 
	{
		if ((k >> BITS) > 0) 
		{
			return NULL;
		}

		return array_[k];
	}

	void set(Number k, void* v) 
	{
		array_[k] = v;
	}
};

这就好在我们不调用malloc,而是使用系统调用,再一个就是不需要加锁,因为我们之前效率不是很高的原因就是使用哈希表,当我们需要访问哈希表,读写时,都会加锁,像在NewSpan和ReleaseSpanToPageCache这两个函数中对哈希表进行写入,在ConcurrentFree和ReleaselistToSpan中对哈希表进行读我们其实可以发现,(先不从结构上来讲,因为从结构上来讲是有影响的,并且读写就不分离了)读写是分离的,也就是说读和写是互不影响的,在NewSpan时,new的同时这个新的Span一定不会被同时读,而ReleaseSpanToPageCache和ReleaselistToSpan是上下层关系,两者更不会同时操作一个Span,ConcurrentFree更是释放内存的上层,所以我们说,对读写事实上是分离的,但是我们需要加锁的原因就是哈希表的底层结构会因为插入和删除数据而变化,他的底层结构是桶或链表数组,也可以是更复杂的结构像红黑树,但是不管是哪个,如果发生写入,对读是有影响的,所以我们加了锁,其实主要原因还是他的写入会影响哈希表的结构。而我们上面的单层基数树在使用前就已经将空间全部开好,开的大小是2MB,然后让arrry_指向,这里采用了直接定址法,通过页号来索引Span*。

当然,还有两层三层基数树,两层的依然适用于32位机器,三层基数树就是给64位机器使用的,我们这里贴出两层基数树的代码,不过不多赘述,原理都一样,就是一个树形结构,不同的是页号的前14位是表示第二层的下标索引,从第15位到第19位是第一层的下标索引。

template <int BITS>
class TCMalloc_PageMap2 
{
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5;
	static const int ROOT_LENGTH = 1 << ROOT_BITS;
	static const int LEAF_BITS = BITS - ROOT_BITS;
	static const int LEAF_LENGTH = 1 << LEAF_BITS;
	// Leaf node
	struct Leaf 
	{
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
	void* (*allocator_)(size_t); // Memory allocator

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap2() 
	{	
		memset(root_, 0, sizeof(root_));
		PreallocateMoreMemory();
	}

	void* get(Number k) const 
	{
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);

		if ((k >> BITS) > 0 || root_[i1] == NULL) 
		{
			return NULL;
		}

		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) 
	{
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);

		assert(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	bool Ensure(Number start, size_t n) 
	{
		for (Number key = start; key <= start + n - 1;) 
		{
			const Number i1 = key >> LEAF_BITS;

			if (i1 >= ROOT_LENGTH)
				return false;

			if (root_[i1] == NULL) 
			{
				//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
				static ObjectPool<Leaf> objPool;
				Leaf* leaf = (Leaf*)objPool.New();

				if (leaf == NULL) return false;

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}
			// Advance key past whatever is covered by this leaf 
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	void PreallocateMoreMemory() 
	{
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

在最后,我们将代码中的哈希表替换为基数树即可,并将PageCache::MapObjectToSpan中的哈希表替换,去掉锁,因为基数树的结构是不会被改变的,他在使用前就已经创建好了,并且因为读写是分离的,所以事实上在这样的情况下,不加锁对于我们的基数树没有影响。 

现在我们可以看到的是,我们的效率明显高了很多。最后要说的是,我们的高并发内存池写出来后一般是要做成动态库或者静态库给别人使用。

至此,我们的高并发内存池项目就做完了。

十六、整理与总结

现在,我们对整个项目进行整理,拉通思路。

首先我们完成了一个定长内存池,也叫做对象池,在我们后续也确实在不少地方使用到了,比如对Span和ThreadCache,以及基数树中对Leaf的申请。

接着我们开始了ThreadCache的编写,于是我们首先创建了ThreadCache类,并且确定了结构为哈希表,由申请字节大小做key,这个申请字节大小我们定下了几个范围,每个范围都对应着一个对齐大小,由此,哈希表的下标范围和区间也就定下了,下标范围也就是0~207,申请内存对象大小最大为256KB,接着我们又要求每个线程独有一个ThreadCache缓存池,于是我们引入了TLS技术,使得每一个线程都独有一个缓存池。

接着线程向他们各自的线程需要申请空间,于是我们创建了Allocate方法去申请内存对象,并将他封装在ConcurrentAlloc方法中,但是显然我们的ThreadCache初始为空,所以他要向下一层去申请内存对象,于是我们开始了下一层的编写,下一层叫做CentralCache,由所有线程共享,数据结构为哈希表,也是由申请内存对象大小做key,但是value位置不再是freelist,而是Span,Span会切分成freelist,于是我们又在ThreadCache中创建了FetchFromCentralCache方法,向中心缓存去申请内存对象,以及在CentralCache中创建了FetchRangeObj方法,于是FetchFromCentralCache调用FetchRangeObj申请内存对象,在FetchRangeObj中调用GetOneSpan去申请Span,在FetchRangeObj中得到Span后将一定数量的内存对象设置到ThreadCache的数据结构中,并返回一个内存对象的空间,但是显然CentralCache是空的,于是CentralCache又向下一层去申请空间,于是我们创建了PageCache类,并在PageCache中创建了NewSpan方法,由GetOneSpan去调用NewSpan,由于PageCache一开始也是空的,所以他又向堆去申请,申请下来的页保存在PageCache的数据结构中,他的数据结构也是哈希表,只是以页为单位,并将申请好的页切分,切分后返回给NewSpan回到GetOneSpan方法,在GetOneSpan中将这个Span划分成freelist,然后返回到FetchFromCentralCache方法中,再返回到Allocate中,最后返回给用户。

释放时,就调用ConcurrentFree,将对象地址传入,于是此时有两条路,一是我们通过PageCache中的MapObjectToSpan找到对应的Span,然后调用ReleaseSpanToPageCache将这个Span释放,第二条路就是调用Deallocate,然后他去调用ReleaselistToSpan,在这个函数里再去调用ReleaseSpanToPageCache,将回来的Span进行合并。

然后,我们进行了一系列优化,其中最值得我们说的优化点在于基数树,因为我们的效率能够大幅提升就是因为经过我们性能分析后得到的结果显示Deallocate中的ToolongList中的MapObjectToSpan调用时间占用较长,因为他对哈希表_idSpanMap加了锁,深层次原因就是在Map中查找本身就比较耗费时间,当数据量变多后,对于锁的竞争也就更大,这样,调用这个函数就比较耗费时间了。

所以我们使用基数树去替代哈希表,这样就解决了锁的问题。

最后画个图总结下函数调用关系:

标签:span,void,cache,并发,内存,我们,size
From: https://blog.csdn.net/m0_74824254/article/details/141894860

相关文章

  • Go语言grequests库并发请求的实战案例
    在当今快速发展的互联网时代,数据的获取和处理速度成为了衡量一个系统性能的重要标准。Go语言以其并发处理能力而闻名,而grequests库则为Go语言的HTTP请求提供了简洁易用的API。本文将通过一个实战案例,介绍如何使用Go语言的grequests库来实现高效的并发HTTP请求。1.引言在......
  • 4G内存的32位系统,内核地址空间分配1G的情况下,为什么实际可用的低端内存只有800M左右?
    在32位系统中,即使物理内存为4GB,但由于地址空间和硬件限制,实际可用的低端内存(lowmemory)只有大约800MB左右。以下我将详细解释其中的原因。1.32位地址空间的限制1.132位系统的地址空间虚拟地址空间: 在32位系统中,虚拟地址空间的范围是0x00000000到0xFFFFFFFF,总共4G......
  • 【转】[C#] WebAPI 防止并发调用二(更多)
    转自:阿里的通义灵码接上篇:https://www.cnblogs.com/z5337/p/18181574在C#中防止接口的并发访问(即确保同一时间内只有一个线程能够访问某个资源或方法),可以通过多种方式实现。这里列出一些常见的方法:1.使用 lock 语句lock 是一种常用的同步原语,用于保护对共享资源的访问......
  • 优化下载性能:使用Python多线程与异步并发提升下载效率
    文章目录......
  • 揭秘:一行代码搞定.Net API高并发的烦恼!
            高并发下的接口请求重复提交问题在.Net开发中,我们经常遇到用户疯狂点击同一按钮,或者服务响应慢时重复发送请求,导致数据重复添加或混乱。这不仅浪费资源,更会得到错误的业务结果。如何高效解决这一普遍问题呢?        常规方案使用分布式锁 面对这问题......
  • C和指针:动态内存分配(malloc,calloc,realloc,free)
     动态内存分配⭐关联知识点:linux动态内存分配为什么使用动态内存分配声明数组必须用一个编译时常量指定数组的长度。但是,数组的长度常常在运行时才知道,由于它所需要的内存空间取决于输入数据。malloc和freemalloc和free,分别用于执行动态内存分配和释放。这些函数维护一个可用......
  • 探索Go语言中的Goroutine并发机制
    什么是Goroutine在Go语言中,Goroutine是程序中最基本的并发单位。事实上,每个Go程序都会自动创建一个goroutine,那就是主goroutine,程序启动时会立即执行。Goroutine是Go语言中处理并发问题的核心工具,因此理解它的工作原理至关重要。简而言之,Goroutine是并发执行的函数,这些函......
  • 深入理解Go并发编程:避免Goroutine泄漏与错误处理
    Go语言以其强大的并发模型和高效的协程(goroutine)而闻名。协程的轻量级和易用性使得并发编程变得更加简单。然而,如果不正确管理协程,可能会导致Goroutine泄漏,从而消耗系统资源,影响程序性能。本文将深入探讨如何避免Goroutine泄漏,并提供实用的代码示例和技巧,帮助您编写更加健壮......
  • Go语言并发编程之Channels详解
    并发编程是Go语言的一大特色,而channel(通道)则是Go语言中用于实现并发的核心工具之一。它源于CSP(CommunicatingSequentialProcesses)的概念,旨在让多个goroutine之间能够高效地进行通信和同步。本文将深入探讨channel的用法、原理和最佳实践,通过丰富的示例代码和详细的解释,帮......
  • 动态内存管理
    1.为什么要有动态内存分配我们已经掌握的内存开辟⽅式存在两个缺点 •空间开辟⼤⼩是固定的。•数组在申明的时候,必须指定数组的⻓度,数组空间⼀旦确定了⼤⼩不能调整但是对于空间的需求,不仅仅是上述的情况。有时候我们需要的空间⼤⼩在程序运⾏的时候才能知道,那数组的......