目录
项目源码:ConcurrentPool · 自律的阿龙/项目仓 - 码云 - 开源中国
项目介绍
高并发内存池,原型是谷歌(Google)的一个开源项目tcmalloc(Thread-Cache Malloc),线程缓存的malloc,目的是为了实现高效的多线程的内存管理。可以说是malloc和free在特定场景下的升级版。
这个技术是非常有名的,go语言的内存分配器用的就是tcmalloc。
使用场景:多线程应用,高性能服务器等。
我在这里实现的并不是完全实现,而是把tcmalloc的核心框架进行简化后的实现,就是一个简易版,只是为了学习tcmalloc的精华。
该项目的使用技术栈如下:
C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的技术。
内存池
池化技术
我把池化技术想成一个预备仓库,就像做电动车生意,不可能卖一台进货一台,而是先拥有一个仓库,里面就是预备品,一台电动车卖出后,要能及时补货。
所以池化技术:一次性向内存申请过量的资源,然后我们自己对这块内存进行分配管理。就是一个仓库
为什么要池化技术?
可以思考,上述例子如果我们卖一台进货一台,如果是生产地可能还好说,但如果我们不在生产地做的生意,电动车的组装好从生产地发出,到我们收到,这段时间成本可就大了。
记住一个点:在计算机中创建一个资源是有损耗的(相当于每次的进货)。
注意:当然不只是内存,还有很多需要储备的资源,都可以使用“池化技术”,线程池,对象池,连接池,线程池:一次申请多个线程,等待任务到达,从线程池中选择一个睡眠的线程完成任务。
内存池
所以对于内存池,我们就像一次进货一大批内存,卖完先从仓库里内存,仓库没了再继续进货。而且我们内存是可以回收的,不需要时就放回“池”中即可(二手电动车)而不是直接还给操作系统(产家),这样就可以大大提升效率。
在内存池中需要解决的问题
内存碎片:内存创建空间需要连续的空间
外碎片:浏览器释放后,在游戏和音乐中间有一块空闲空间,和下面有一块,此时我们空间够了,但是由于不连续新的程序没办法执行。
内碎片:由于内存中是有对齐规则的,所以在每一个对象里面可能还存在一部分的碎片
内存池解决了外部碎片,同时能减少内部碎片。
malloc
我们知道C++中的new 和 delete是调用了 malloc 和 free。但这两个函数也只是一个接口,他们还要进行往下调用系统调用。
但其实malloc其实也是一块内存池,它也是一次向内存申请一大块内存,然后我们需要使用时,是去和它拿而不是内存中拿,只有malloc申请的空间用完才会继续像操作系统进货。
定长内存池的实现
定长内存池:只能支持固定大小的内存申请的内存池,定长的就强制性要求,每次申请的空间都是相同的好处就是外部碎片就没了。
可以把定长内存池,现在你的电动车店只卖一种车型。
定长内存池会作为后期高并发内存池的实现的组件
定长实现
定长也有一个隐藏的含义:对象池,因为对象的大小都是一样的。
有两种方法:1.非类型模版参数 2.类型模版参数
非类型模版参数:
template<size_t N>
class ObjectPool
{};
这里要求程序猿在使用时传入定长size N,即一个整数作为定长内存大小。
类型模版参数:
template<class T>
class ObjectPool
{};
上面说定长的隐藏含义:对象池,所以只要用一个模版参数就能变形的要求定长,因为对象的大小按理说是一样的。
如何一次申请大块空间
不同的操作系统之间,底层的系统调用是不同的,而我们申请大块空间时要直接和操作系统要的,malloc函数是通用接口(标准),而对于不同的操作系统来说,底层调用的函数是不同的,就比如上面的Linux调用的是brk或mmap函数。 而我们如果用windows的话,接口是VirtualAlloc函数。
条件编译:#ifdef #else #endif 满足条件才编译下述代码 。
#ifdef _WIN32
#include <Windows.h>
#else
//...
#endif
//直接去堆上申请按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
throw抛出:不会强制结束程序,抛出异常,可以根据异常进行解决。
通过条件编译我们解决了不同平台运行的差异(底层函数)。
成员变量解释
我们上面的申请内存拿到了一个指针。可以思考指针只是能找到我们要的位置,即每次取出内存的首位置,但是我们剩余多少空间是不知道的,而且对于还回来的空间我们也没办法统计
所以我们成员变量有三个:
- _memory: 指向大块内存的指针
- _remainBytes: 大块内存剩余的空间
- _freeList: 用来管理还回来的定长内存的链表头指针
为什么要重新用一个结构来管理还回来的内存?
因为我们并不知道是那一块被使用的内存还回来了,就比如现在切了三块内存出去,2号先回来了,此时直接让它回去大块内存?那就很繁琐了
由上图看出2号还回去后,我们还要给他找对位置。所以重新用一个结构管理,在调用时先看_freelist是否为空,不为空先从_freelist中调用
如果重新用结构管理返回对象不就增大内存需求了吗?
当然如果你直接定义一个List肯定会销毁很多资源,所以也不会这样使用,而是用到了头指针的思想。
在一个对象销毁后,这块空间应该就是不使用了,所以此时这块空间我们把前4/8字节(32位/64位系统)定义为一个指针,这个指针的作用就是指向前一个销毁的对象。这样也就假定了我们定长内存的最小分配应该是大于等于一个指针的。
由上图我们看出我们进行的是头插操作,不能使用尾插,因为为了减少空间,如果我们用尾插,那我们的freelist指向最后一个块,找不到前面的,如果在用一个head指针的话就加大了空间,而用头插,则一个指针也就足够。
怎么使用这个void* _freelist?
void*& NextObj(void* ptr)
{
return (*(void**)ptr);
}
指针的解引用:void*是不能解引用的,因为不知道类型,所以void*是需要强转的。因为我们要固定访问一个指针的大小,所以转换成void** 解引用出来是一个void*,void*是一个指针所以是4/8字节的。void*不能再被解引用。为什么不用int**和int*。虽然原理相同但是void*用于指向随便一块空间,符合语法特性,int*指向的空间解引用是int。所以指向性不明确。
//释放对象
void Delete(T* obj)
{
//显示调用T的析构函数清理对象
obj->~T();
//将释放的对象头插到自由链表
NextObj(obj) = _freeList;
_freeList = obj;
}
记住我们这块空间是不用的,所以要调用析构函数,然后在指定指针。
为什么大块内存用char*
这个比上面的好理解,因为char*是固定一字节的,所以根据传进来的类型的大小用char*向后对象大小的距离就可以找到下一块。
内存池申请对象
申请对象有两种情况:
1._freelist不为空
_freelist不为空那就用_freelist的,因为是定长的所以从头取就可以了,然后_freelist指向下一个块
2._freelist为空
_freelist为空说明还没有归还的空间,此时就从大块内存中拿即可
同时要记得维护好remainBytes,当remainBytes用完了要去找内存拿。
内容申请模拟:
//申请对象
T* New()
{
T* obj = nullptr;
//优先把还回来的内存块对象,再次重复利用
if (_freeList != nullptr)
{
//从自由链表头删一个对象
obj = (T*)_freeList;
_freeList = NextObj(_freeList);
}
else
{
//保证对象能够存储得下地址
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
//剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < objSize)
{
_remainBytes = 128 * 1024;
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
//从大块内存中切出objSize字节的内存
obj = (T*)_memory;
_memory += objSize;
_remainBytes -= objSize;
}
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
定位new: 指定位置初始化对象(就是调用构造函数),但要保证这里的初始化对象和定位new的对象是同一种。
定长内存池模拟实现:
//定长内存池
template<class T>
class ObjectPool
{
public:
//申请对象
T* New()
{
T* obj = nullptr;
//优先把还回来的内存块对象,再次重复利用
if (_freeList != nullptr)
{
//从自由链表头删一个对象
obj = (T*)_freeList;
_freeList = NextObj(_freeList);
}
else
{
//保证对象能够存储得下地址
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
//剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < objSize)
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
//从大块内存中切出objSize字节的内存
obj = (T*)_memory;
_memory += objSize;
_remainBytes -= objSize;
}
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
//释放对象
void Delete(T* obj)
{
//显示调用T的析构函数清理对象
obj->~T();
//将释放的对象头插到自由链表
NextObj(obj) = _freeList;
_freeList = obj;
}
private:
char* _memory = nullptr; //指向大块内存的指针
size_t _remainBytes = 0; //大块内存在切分过程中剩余字节数
void* _freeList = nullptr; //还回来过程中链接的自由链表的头指针
};
性能测试:
构建一个树节点结构。然后上面用malloc 和 free,下面用我们的定长内存池的New 和 Delete
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
void TestObjectPool()
{
// 申请释放的轮次
const size_t Rounds = 3;
// 每轮申请释放多少次
const size_t N = 1000000;
std::vector<TreeNode*> v1;
v1.reserve(N);
//malloc和free
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
//定长内存池
ObjectPool<TreeNode> TNPool;
std::vector<TreeNode*> v2;
v2.reserve(N);
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
看出翻倍的快。 但是定长内存池缺陷就是只能是相同类型的申请。所以接下来就有了我们的高并发内存池。
高并发内存池框架设计
先说说tcmalloc 和 malloc的具体区别在哪? 由于现在大部分的应用采用的多线程,malloc已经很快乐,但是对于多线程中有锁竞争这个问题,频繁的加锁和解锁,效率就下来了,tcmalloc就是在多线程下减少锁竞争的问题,规避锁的使用。从而提高效率
高并发内存池的三小只
-
thread cache:顾名思义:线程缓存,所以就是每个线程独有的缓存。用于小于等于256kb的内存分配。
-
central cache:中心缓存,多个线程共享的缓存。当thread cache中没有内存或者用干时,可以从central cache中申请,thread cache都是从 central cache中拿的,用完以后要还回去的。
-
page cache:页缓存,以页为单位存储和分配,central cache 没有内存或内存用光时,从pagecache中获取,同时用完以后是要还回去的。因为存在内存碎片的问题,所以这里是会出现内存合并的
1.thread cache是各个线程独有的,所以不会出现锁竞争问题。
2.central cache中心分配,页缓存只管从系统中拿内存,而不用管理下面的thread cache怎么拿。central cache从上拿下来分配给threadcache,从而减轻pagecache的内存管理
3.pagecache以页为单位拿内存,通常一页是 8kb
套娃的过程
threadcache(主线1开始)
设计思路
前面的定长内存池:我们每次只能拿相同的内存,而threadcache有点像定长内存池的升级版,即threadcache可以有更多内存选择,这样的结构很像是一个hash桶,那就需要更多的freelist来管理了。
如果我们精细化到每个byte都给一个自由链表,那20w多个指针,这个内存也不可小视了,所以我们还需要更好的管理方法,下图就类似于每八个比特位划分一个链表,但是也依然还有 3w多个指针左右(最终选择也不是这样),但是已经大大减少了指针数量。
但是这样就可能出现内碎片,比如说24byte我们只用了20个byte,剩下的4个byte是没用的。
threadcache就是一个数组,里面存的是freelist。前面我们削减到了3w多个指针(3w个数组下标),现在我们再次进行削减。
threadcache哈希桶映射对齐规则
对齐规则:每块内存分配多大,这里我们根据不同的需求进行了对齐分配
如下图:
这是我们的对齐规则,1-128算是一种小需求所以精细化做好所以每8字节一块,128-1024 16字节一块,因为我们的需要的内存越大时,相对的如果还要要求精细化,那就会导致大量的freelist,这里我们越大对齐数就越大。虽然内碎片可能增多,但相对的减少了freelist。
浪费率
我们谈论129到1024的区间的浪费率,我们最多浪费15个字节,128 + 16 = 144,此时
15 / 144 = 10.42%。后面的区间也这样计算 最大也是 11.10.就说在我们使用的过程中最大的浪费也就是11%左右。第一区间 1 / 8 是12 .5%。但不可能每个情况都是最浪费的。所以最终的浪费是较少的
对齐映射函数的编写
//管理对齐和映射等关系
class SizeClass
{
public:
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes);
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes);
};
RoundUp:因为我们已经指定好内存大小了,所以RoundUp就是用来找到最近的能放下需要内存的桶。
Index:因为我们不是一个一个字节的桶,所以下标对应要做好,比如说第一个下标代表8byte的块,第二个下标代表16byte的块......
为什么要把成员函数设置为静态函数
非静态成员函数:类对象创建后,通过对象调用 ,但是函数是在代码区的,所以是编译时就有的。
静态成员函数:不需要创建类对象,只要指定类就能调用。
全是静态函数的类通常被称为工具类,里面不需要成员变量,只是封装了方法的类。
RoundUp实现
1.我们先划分区间:根据传入的字节数,选择不同的区间传入不同的对齐数
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
assert(false);
return -1;
}
}
可以看到我们里面写了一个子函数_RoundUp,代码复用,因为多个区间执行的逻辑是相同的
_RoundUp简单写法:
//一般写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
size_t alignSize = 0;
if (bytes%alignNum != 0)
{
alignSize = (bytes / alignNum + 1)*alignNum;
}
else
{
alignSize = bytes;
}
return alignSize;
}
alignsize:对齐后是指定区间的哪一个:比如说 17字节就是128字节区间中的第三个桶24字节的。
如果%对齐数 == 0说明这个 bytes就是一个桶的大小,!=0说明这个bytes要向上对齐。
进阶写法:
//位运算写法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1)&~(alignNum - 1));
}
使用比特位操作,这样的操作是明显更快的,但你说正常来想也是想不出来的,所以我们学习学习就好,对于上面的要验证就用一个数字带入验证即可。
Index实现
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes)
{
//每个区间有多少个自由链表
static size_t groupArray[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
{
return _Index(bytes, 3);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + groupArray[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];
}
else
{
assert(false);
return -1;
}
}
上述代码中groupArray是每一层有多少种内存块大小的个数。
传入的bytes 要减去 上一个区间的最大值,因为每一个区间有的个数不同,所以-前一个区间的最大值就是把前面区间跳过,可以从后面的 + groupArray看出。
后面的 3 4 7 10 13是步长。注意用步长的话要使用的是进阶写法:
//位运算写法
static inline size_t _Index(size_t bytes, size_t alignShift)
{
return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}
一样的进阶写法,大家学习学习,可以带入一个数字模拟一下,单纯想出来是很困难的。而我们
通常的写法如下:
//一般写法
static inline size_t _Index(size_t bytes, size_t alignNum)
{
size_t index = 0;
if (bytes%alignNum != 0)
{
index = bytes / alignNum;
}
else
{
index = bytes / alignNum - 1;
}
return index;
}
有余数时说明是大于对齐数的倍数的此时,找到的下标不用-1.
而没有余数,说明正好就是对齐数的倍数,下标-1.才是对的
注意:上面Index函数中传入的 3 4 7 10 13针对的是进阶写法,用于二进制的移位
而对于通常写法alignNum通常就是我们的每一层的对齐数:
ThreadCache类编写
//小于等于MAX_BYTES,就找thread cache申请
//大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由链表哈希桶的表大小
static const size_t NFREELISTS = 208;
相当于给一些特别的数字起了个别名,MAX_BYTES代表thread cache最大接收小 。NFREELISTS代表总共的freelist链表个数
申请对象
class ThreadCache
{
public:
//申请内存对象
void* Allocate(size_t size);
private:
FreeList _freeLists[NFREELISTS]; //哈希桶
};
下面是我们封装的两个函数,用于freelist的push和pop。
Nextobj在上面的定长内存池有实现:用void**和void*的联动指向下一个内存块。
//管理切分好的小对象的自由链表
class FreeList
{
public:
//将释放的对象头插到自由链表
void Push(void* obj)
{
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
}
//从自由链表头部获取一个对象
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(_freeList);
return obj;
}
private:
void* _freeList = nullptr; //自由链表
};
外面来了一个需求size,此时在threadcache中申请空间
//申请内存对象
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize);
}
}
有两种情况:1.如果我们threadcache这个大小的桶里有,那就直接拿走就好了
2.如果没有就要去centralcache中拿FetchFromCentralCache(index, alignSize);
FetchFromCentralCache(index, alignSize);在centralcache中获取内存(后面讲解)
threadcache无锁访问(不熟)
因为threadcache要求是每一个线程独有一个,如果我们把threadcache设置为一个全局变量,那全部线程都能访问就又要加锁了。
所以我们要实现每个线程只能访问自己的threadcache,TLS(thread local storage)线程局部存储,用TLS存储的变量,在他的线程中是可以全局访问,但是别的线程不可见。这样就完成了独立的threadcache。
//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
_declspec(thread): 是windows中的扩展关键字,显示指定后表示后面的变量每个线程独一份。
static把这个变量变成全局变量,其他线程不看见但是全局都可以用。即central cache可以访问每个线程的threadcache。
centralcache
在某个threadcache中没有内存可以给出时,这时就要找更上层的中间商:centralcache找他拿货(内存)。
centralcache的结构和threadcache的结构类似也是哈希桶的结构。二者的映射规则是一样的,所以说centralcache的桶的大小和threadcache一样,所以在映射时的规则是一样的。
centralcache和threadcache的不同点
1.threadcache是线程独享所以没有锁,但是centralcache是多进程共用所以是要上锁的,避免线程竞争问题。由下图看出我们的centralcache也是多个桶,所以上锁的规则就会有所改变,不是对整个centralcache上锁,而是对被使用的桶进行上锁。
2.可以看到虽然桶大小是一样的,但是centralcache 后面挂的是span (双链表),span里面挂的是一个个小块内存_freelist。threadcache就直接练的小块内存,直接是一个_freelist。
为什么要加一个span,而不是直接用小块内存
1.这就要说明pagecache里面,pagecache也是哈希桶结构单位是以页为单位的,里面存的是span,在centralcache需要内存时,根据需求从pagecache中获取页为单位的span,一页我们这里把看做8K即可。
centralcache结构设计
页号有多大
我们知道进程地址空间(地址的大小)在32位平台下,进程空间的大小为2的32次方为4G,但是有1G是分给内核的,剩下3G是用户空间,所以我们能有的地址就是2的32次方个。对于64位平台就是2的64次方个(这个量是很大的 int存不下)。一页是8k(2的13次方),所以32位的页号有
2的32次方 / 2的13次方 = 2的19次方,所以int就够了。64位同理
所以我们要区分32位和64位时页号的大小。
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
//linux
#endif
注意:win32下没有_win64的定义,但是_win64是 32 和 64 都有,所以不能让win32的定义在前面,win64也会进入win32的定义,而win32进入不了win64的。
span结构
//管理以页为单位的大块内存
struct Span
{
PAGE_ID _pageId = 0; //大块内存起始页的页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双链表结构
Span* _prev = nullptr;
size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr; //切好的小块内存的自由链表
};
Span是pagecache和centralcache共用的。
Span我们设计成了双链表的结构,里面存储了_pageID,_n,_usecount,_freelist;
_pageid: 这里记录的是第一个page的id,因为我们分配span时可能是多个page(256k需要32page),所以不同的span里面的page是不同的。
_n: 记录了当前span中有多少page(用于page)
_useCount: 记录当前的span有多少小块内存被使用(用于小块内存)
_freelist: 表示每个span里的小块内存链表
为什么要使用双链表
因为span 在没使用时(_usecount == 0)是要还给 pagecache的,这时要把span从centralcache中删除的,如果是单链表那删除时就要遍历一遍找到前一个节点,如果是双链表直接删除即可(前一个节点和后一个节点连接)。
span的双链表结构
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head); //不能删除哨兵位的头结点
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
成员变量:_head:头指针 _mtx:因为centralcache是共用的所以要上锁
我们封装了两个接口:1.插入span,2.删除span。
span并不需要析构,因为从centralcache用完后,是要还给pagecache的。
centralcache的核心类结构
//单例模式
class CentralCache
{
public:
//提供一个全局访问点
static CentralCache* GetInstance()
{
return &_sInst;
}
private:
SpanList _spanLists[NFREELISTS];
private:
CentralCache() //构造函数私有
{}
CentralCache(const CentralCache&) = delete; //防拷贝
CentralCache& operator=(CentralCache&) = delete;
static CentralCache _sInst;
};
我们的centralcache应该是单例的,全部的threadcache都是找的同一个centralcache。
所以我们的构造函数私有,然后把拷贝函数全都删除,且对象的获取只能通过GetInstance()
因为我们的centralcache _sInst是一个私有的对象。不放到public 是保证了封装性。
这里的单例模式是饿汉模式:即在函数开始时就对单例进行创建。
CentralCache CentralCache::_sInst;
直接写在全局上就相当于我们声明过这个对象了,不声明的话是不会生成的。
慢开始反馈算法
threadcache要内存时,我们是固定给呢?还是怎么给。如果固定给不可避免一个问题:如果我们固定每次获取10个,10个256kb就是2M,这还说的过去,但是如果是要8Byte呢?你也给10个,80Byte太少了,万一我需要更多呢?所以现在设计一个算法对大需求和小需求进行区分
//管理对齐和映射等关系
class SizeClass
{
public:
//thread cache一次从central cache获取对象的上限
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
//对象越小,计算出的上限越高
//对象越大,计算出的上限越低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
};
我们num首先根据大小分配(MAX_BYTES == 256KB),如果超过上限或低于下限,此时就用限制的个数,当然这是不够的,因为这样我们还是固定住了数量,当8byte的需求只有10个时你给512个也是有点多的
所以就有了慢启动算法(类似网络拥塞控制)
//管理切分好的小对象的自由链表
class FreeList
{
public:
size_t& MaxSize()
{
return _maxSize;
}
private:
void* _freeList = nullptr; //自由链表
size_t _maxSize = 1;
};
Freelist 这个结构里我们要有一个自由链表,还要一个_maxsize记录,申请空间时,我们会对比Nummovesize的值和_maxsize的值,二者取min,此时_maxsize++,下次就多给,直到有一次_maxsize 大于 Nummovesize后使用的才是固定的。侧面的写了,一个大小很少被使用时就少给,被多次使用时就要多给。_maxsize会越来越大。
从centralcache中获取对象
当我们的Maxsize == 1时,那就是返回一个,但如果要一次弹出很多时,那就是弹出一串 。这个怎么理解看下面的代码
//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节算法
//1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
//2、如果你不断有size大小的内存需求,那么batchNum就会不断增长,直到上限
size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (batchNum == _freeLists[index].MaxSize())
{
_freeLists[index].MaxSize() += 1;
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum >= 1); //至少有一个
if (actualNum == 1) //申请到对象的个数是一个,则直接将这一个对象返回即可
{
assert(start == end);
return start;
}
else //申请到对象的个数是多个,还需要将剩下的对象挂到thread cache中对应的哈希桶中
{
_freeLists[index].PushRange(NextObj(start), end);
return start;
}
}
batchnum:是本次要的个数。
如果batchnum == Maxsize 此时就让maxsize+1;(慢启动)
定义两个指针 start end,用来表示获得的一串小块内存的头和尾(也有可能是一个)
就像我们可能获取到三个我们就需要他的头尾,用于等一会把这一串插入threadcache中,因为插入时有头尾更方便。
FetchRangeObj()获取一串小块内存从centralcache中
//从central cache获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock(); //加锁
//在对应哈希桶中获取一个非空的span
Span* span = GetOneSpan(_spanLists[index], size);
assert(span); //span不为空
assert(span->_freeList); //span当中的自由链表也不为空
//从span中获取n个对象
//如果不够n个,有多少拿多少
start = span->_freeList;
end = span->_freeList;
size_t actualNum = 1;
while (NextObj(end)&&n - 1)
{
end = NextObj(end);
actualNum++;
n--;
}
span->_freeList = NextObj(end); //取完后剩下的对象继续放到自由链表
NextObj(end) = nullptr; //取出的一段链表的表尾置空
span->_useCount += actualNum; //更新被分配给thread cache的计数
_spanLists[index]._mtx.unlock(); //解锁
return actualNum;
}
我们之前说了centralcache是多线程共享,所以在从中取内存时要上锁。
1.index确定我们要取多大内存的桶。
2.Getonespan是从centralcache中获取一个非空的span。
3.我们获取小块内存时有两种情况:
- 小块内存足够 n 个。(那就取出n个)
- 小块内存不够 n 个。(不够n个那就有多少拿多少)
所以while循环有两种条件。判空和 n - 1.n - 1是因为我们的start 和 end本身就指向了一个小块内存(_freelist),然后只用向后走n - 1步就足够了。
因为我们的实际传出去actualnum可能和n不一样,所以不是直接用n
4.最后把这一串小块内存从_freelist中拔下来。同时usecount记录好。解锁
tips:如果我们取了比n少的个数有影响吗?当然没有,我们一次取多个只是为了后续减少threadcache向centralcache申请的次数。给出一个 threadcache也足够这一次请求。
PushRange(),将取出的小串内存放入freelist中
//管理切分好的小对象的自由链表
class FreeList
{
public:
//插入一段范围的对象到自由链表
void PushRange(void* start, void* end)
{
assert(start);
assert(end);
//头插
NextObj(end) = _freeList;
_freeList = start;
}
private:
void* _freeList = nullptr; //自由链表
size_t _maxSize = 1;
};
上面我们取出了一串内存,此时要做的就是插入这一串。(头插)
我们这一串是添加到 freelist上的,所以写在Freelist这个类里,对这个类的对象进行操作。
pagecache
我们的threadcache和centralcache和pagecache 三者其实都是hash桶结构,threadcache存的是_freelist小块内存,centralcache 存的是 span,但还连接了切好的_freelist(小块内存),而我们的pagecache 里面只有span ,这也就说明了centralcache 是中间缓存的原因。
pagecache只管以页为单位的span,至于centralcache的功能就是获取这个span,并且根据需求将span切分。这是由centralcache决定,pagecache不受影响。(有点类似经销商的感觉)
这里的page大小是不限的,128page就是 128page * 8k = 4 * 256kb。我们实现的是最大是这么大,你也可以把大小加大。因为下标对应的关系所以我们可以直接创建129个桶但是0号桶不用。
//page cache中哈希桶的个数
static const size_t NPAGES = 129;
pagecache怎么给centralcache分配span
1.最开始我们的pagecache中应该也是什么都没有的,此时有一个120page的申请,首先会先到120page这个桶里面找,目前是没有的,但 不是直接向系统拿120page,而是往上找。
2.往上找后,发现都没有,那这时就会直接申请一个128page的最大span,然后从这个128page中切出120page的大小给到120page的桶,剩下的8page给到8page的桶。
3.如果我们向上找时发现某一个桶有span那就直接用这个span,把它切分放到不同的桶里即可。
pagecache的实现
1.我们的pagecache也需要时线程安全的,之前我们的centralcache是每个桶都有一个锁,因为每个桶只用考虑怎么分配下去即可。而pagecache的锁是整体的锁不是桶锁,因为pagecache在分内存时会涉及到span切分合并。所以只是桶锁的话不安全
2.如果真的是桶锁的话效率也不高,因为我们切分时要不断地进行加锁解锁,反而还拖慢了节奏。
3.threadcache去centralcache里拿内存时因为centralcache里的对齐规则和threadcache是对应的。并且从pagecache拉取span时会把小块内存切分好,所以用桶锁。
和centralcache一样pagecache也是单例模式
//单例模式
class PageCache
{
public:
//提供一个全局访问点
static PageCache* GetInstance()
{
return &_sInst;
}
private:
SpanList _spanLists[NPAGES];
std::mutex _pageMtx; //大锁
private:
PageCache() //构造函数私有
{}
PageCache(const PageCache&) = delete; //防拷贝
static PageCache _sInst;
};
PageCache PageCache::_sInst;
pagecache中获取Span
获取一个非空的span(centralcache->threadcache)
threadcache申请小块内存时当然是找到对应桶里 一个还有小块内存的span。
用的方法是最古朴的遍历,找到非空就像上面一样弹出一串小块内存即可。但是如果我们没有非空的span呢?就是所有span全被使用干净或者这个桶上就没有span
此时就需要去pagecache中获取了,那一次获取多少个page呢?
//管理对齐和映射等关系
class SizeClass
{
public:
//central cache一次向page cache获取多少页
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size); //计算出thread cache一次向central cache申请对象的个数上限
size_t nPage = num*size; //num个size大小的对象所需的字节数
nPage >>= PAGE_SHIFT; //将字节数转换为页数
if (nPage == 0) //至少给一页
nPage = 1;
return nPage;
}
};
之前Nummovesize是用来控制 threadcache 向 centralcache获取大小的,这里我们根据centralcache需要的size * num 就可以知道要 num个size大小的小块内存的总共大小为 num *size
然后page的单位是8k ,PAGE_SHIFT是13,比特位右移13位就是除以8k。所以最终算出num * size 需要的page数量。但有可能计算出0(num * size < 8k),所以最小也要给1page。
Getonespan()获取一个非空span (代码实现)
上面的描述 是对整体运作的描述下面是代码:
//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
//1、先在spanList中寻找非空的span
Span* it = spanList.Begin();
while (it != spanList.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
//2、spanList中没有非空的span,只能向page cache申请
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
//计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
//把大块内存切成size大小的对象链接起来
char* end = start + bytes;
//先切一块下来去做尾,方便尾插
span->_freeList = start;
start += size;
void* tail = span->_freeList;
//尾插
while (start < end)
{
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}
NextObj(tail) = nullptr; //尾的指向置空
//将切好的span头插到spanList
spanList.PushFront(span);
return span;
}
1.遍历spanlist 如果找到非空span则直接返回
2.找不到就去pagecache中拿 NewSpan()
3.拿到以后这个span是需要切分为小块内存的。char*是一字节移动的,所以用char*
4.char* start:(char*)(span->_pageId << PAGE_SHIFT;后面的代码是为了找到start的地址即span开始的地址。记住左移<<page_shift就是乘以8k,右移就是除以8k,PAGE_SHIFT是我们在上面定义的13。 2的13次方是8k,(至于pageid 怎么得出的后面会有解释)
begin() 和 end() 不用说就是span双链表的头和尾。
//带头双向循环链表
class SpanList
{
public:
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
Newspan()获取k页的span
上面在我们遍历spanlist 中会出现两种情况:1.找到后直接返回,2.没有空闲的span
当没有空闲的span后,这时就需要我们去pagecache 中申请对应size 的 k页span了。对于1.情况我们是要从spanlist中pop出去一串小块内存。而2.情况,则是有可能为空,也有可能是没有空闲的
所以下面实现了判空和pop一串小块内存的逻辑代码
//带头双向循环链表
class SpanList
{
public:
bool Empty()
{
return _head == _head->_next;
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
对于pagecache我们上面介绍说,如果需求k页的桶里并没有span,那就向后面大的找,找到后就把大的切成k页span和另一个的span,把k页的给出去,把剩下的另一个span连到对应的桶下面即可,一瞬间就感觉我们的程序有生命力了。
如果后面也没有找到更大的span,那就需要找系统要了,这时就可以调用封装的systemAlloc,从系统中获取128页的span。
当我们申请下来时,我们的systemAlloc 返回的是这块内存的首地址。此时我们就可以把这个地址转换为一个页号pageid,我们一页是8k,所以让地址除以8k就可以代表页号了。上面我们获取span地址其实就是让页号乘以8k。
下面给出 Newspan()的实现代码
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
return _spanLists[k].PopFront();
}
//检查一下后面的桶里面有没有span,如果有可以将其进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
return kSpan;
}
}
//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
Span* bigSpan = new Span;
//之前封装的系统调用
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//尽量避免代码重复,递归调用自己
return NewSpan(k);
}
和我们说的逻辑一样
1.先判断k是否在可能范围内,不能小于1,且不能超过128页。
2.然后判断k页的桶是否有剩余。有就直接取出一个返回即可
3.k页的桶没有剩余,那就去后面的桶(k+1开始)找,如果找到就执行分块逻辑,把k页切出去,剩下的部分放到对应的桶后面
4.如果前三步都没返回,就说明现在后面根本就没有span了,那就向系统申请一个最大的128页的page,记住地址转pageid的逻辑。然后把这块内存放到128页的桶上
5.此时可以直接切出返回,但是逻辑和上面的一样所以可能出现代码冗余,所以我们可以采用递归的写法,因为我们现在有128页的span,所以在上面的代码就一定返回,不会导致死递归。
在向pagecache申请span时centralcache做什么?
还记得吗?我们调用centralcache是用的桶锁,那在我们去pagecache申请span这段期间centralcache是不动的,为了更大效率的提高,此时我们应该让centralcache继续工作起来,榨干员工的价值(qaq)
1.我们centralcache在一个桶里实际做的就两件事,分出span给threadcache,从threadcache中回收span。这时我们在进去pagecache时把centralcache的桶锁给解开,那需要回收的逻辑就能执行就不会卡住,分出span的逻辑会因为pagecache里面的锁而卡在newspan那里。所以是线程安全的。
2.那在newpan结束后,我们的工作就是先把桶锁继续加上,再把pagecache的大锁给打开。
下面的代码是我们要加锁解锁的部位在newspan时
spanList._mtx.unlock(); //解桶锁
PageCache::GetInstance()->_pageMtx.lock(); //加大锁
//从page cache申请k页的span
PageCache::GetInstance()->_pageMtx.unlock(); //解大锁
//进行span的切分...
spanList._mtx.lock(); //加桶锁
//将span挂到central cache对应的哈希桶
注意:这里是用到了两把锁:1.centralcache 的桶锁,2.pagecache的全局锁。
申请内存的联调(主线1完成,现在是使用阶段)
ConcurrentAlloc函数
这个名字就是并发调用,上面的三个主类threadcache和centralcache和pagecache已经完结。所以现在是来需求的时候。
static void* ConcurrentAlloc(size_t size)
{
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
return pTLSThreadCache->Allocate(size);
}
pTLSThreadCache还有印象吗?threadcache是每个线程各一份。但它要放在全局上使用,但放到全局会导致全部线程只能用一个threadcache,然后就有一个关键字
//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
min函数出错
我们上面执行逻辑后会发现,min函数总是报错(在慢启动时用了min),在windows中自己封装了一个min是用宏的,但是我们在使用时用的是std::min。即标准库中的min,这个min是一个模版函数。两个min会报错,因为不知道调用谁,所以我们把std去掉,用windows中的即可。这也体现了没有命名空间指向不清的问题。
联调测试1
我们现在单线程下看看,这堆代码是否符合我们的逻辑,然后主要观察是单桶的逻辑调用是否出错
void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);
我们申请三个在一个桶的内存。
1.创建PTLSThreadCache对象
之后就执行到了threadcache对象的Allocate函数(取小块内存)。
目前我们第一次进入时threadcache中是没有小块内存的,所以就去centralcache中获取。但是还记得我们的size是需要对齐的,所以 1 对齐到了 8 。然后根据size找到index下标即这个桶的下标(16 56 56 56,每个区间的桶数量)。
FetchfromCentralCache的逻辑是很长的我们分段讨论:
1.慢启动算法
我们的freelist对象中有一个Maxsize,以及Nummovesize 是给对应的size找到上下限的。
最开始我们的桶只会拿一个,慢慢地用的越多 += 1越多,拿的就越多,直到 == Nummovesize后,就不增加了。
2.从centralcache中获取batchNum个小块内存
我们算出下标后给对应的桶加锁。
然后我们就要取对应的桶中找到一个空闲的span
我们遍历spanlist 即对应的桶如果没找到,接下来就是给桶锁解锁,然后取pagecache中获取k页的span
3.从pagecache中获取k页page的span (newspan())
我们现在pagecache中没有span,所以会直接申请128页的page,然后切出k页(这里是1页就够了)所以会切成127页和1页的span,然后1页的给出,127页的会挂到127页的桶上
可以看到我们现在是申请了一个 128页的page _n == 128.
然后我们发现我们的_pageId == 1648,此时我们可以用1648 * 8k。然后就可以算出地址。因为ptr是16进制所以要对应
因为是递归函数,所以后面又会回到上面的代码,然后把128页给切割为 127页和 1页。
之后kspan就被返回了,然后外界就要进行锁的操作
4.回到centralcache对小块内存进行切分
1.页号是 地址除以8k的结果,所以要找到地址就用页号即可,然后感觉页的数量,1页是8k,算出这个span占了多大的内存(bytes)。
2.算出span的内存后,我们知道内存的申请是连续的,如果没有足够的连续内存是无法申请的。然后我们从首位置向后移动span大小,就是这块span的尾。
确定好首尾以后我们就可以进行切分了,从start开始,我们每次切出一块小块内存,然后把其放入_freelist中(尾插),最终切到start 和 end相交后。 可以看到我们的前八个字节指向的就是下一块
为什么要用尾插?因为我们分配下来地址是有顺序的,尾插使顺序和拉取下来时的地址一致,方便我们之后返还给pagecache 。
在我们切分好以后,剩下的就是把这个span给到当前的0号桶,这个操作也是原子的所以要加锁。
然后我们就要返回这个span给到上一个调用函数,然后把threadcache需要的1个8字节块给到threadcache,然后在这个切出一块的操作结束后,整体在centralcache的动作就结束了这时候把桶锁解开。
继续向上返回,到了threadcache里,根据actualnum 决定调用插入一个还是插入一串,这里是插入一个逻辑所以直接返回start因为是直接使用的,如果是一串则把头给出即可,剩下的放到threadcache对应的桶里。
此时第一次的操作结束:
第二次申请同一个桶的小块内存时因为慢启动算法的原因第一次只拿了一个,我们第二次还是要从centralcache里面拿,但是这次要拿两个,拿出两个以后,用了一个,现在threadcache中还剩一个
第三次申请同一个桶的小块内存时因为我们第二次要了两个,所以这次不用去centralcache中拿,直接从threadcache中获取即可。
所以,我们同一个桶用的越多,它进入centralcache和pagecache的次数就会越少。慢启动。越到后面给的越多。
第二次联调测试
我们知道第一次联调测试我们centralcache中获取的span是一页大小的,那对应就是8k,以8字节切分所以它就会有1024个小块(换算,8k实际是8192字节)。所以我们这次申请1025次8字节的桶,看看会不会继续向pagecache中获取
for (size_t i = 0; i < 1024; i++)
{
void* p1 = ConcurrentAlloc(6);
}
void* p2 = ConcurrentAlloc(6);
最终在GetOneSpan时,发现我们的_it 的next 和 prev 已经相同了说明现在只剩下头指针了。所以就会走到 newspan里面去
发现这次对127进行了切分,1的kspan被分了出去,剩下的126的nspan继续挂在126span的桶上。
所以我们的逻辑是没错的。
内存回收(主线2开始)
我们知道内存是非常稀缺的资源,如果我们在使用完后不对其进行回收,那就会导致浪费。就像这里的threadcache,使用完后如果一直挂着不还给centralcache,那pagecache在每次128页使用完就会申请,那最后如果申请的多了,那一个桶里面的小块内存是冗余的,所以在没有使用时要把小块内存回收。
threadcache内存回收
Dealloc函数
我们知道,随着使用,我们释放的内存是回到freelist中的,而不是大块内存。所以freelist会越来越长。所以我们可以根据freelist的长度给threadcache设计一个归还函数。
代码如下:
//释放内存对象
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
//找出对应的自由链表桶将对象插入
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//当自由链表长度大于一次批量申请的对象个数时就开始还一段list给central cache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
1.我们是直接把用完的小块内存先回到freelist。
2.我们归还的逻辑是:我们根据上一次跟centralcache拿了几个作对比,如果上次要的个数已经全部用完了,此时就可以把这一串小块内存还给centralcache。
ListTooLong就是归还函数:
//释放对象导致链表过长,回收内存到中心缓存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
//从list中取出一次批量个数的对象
list.PopRange(start, end, list.MaxSize());
//将取出的对象还给central cache中对应的span
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
1.我们先把这一串内存从Freelist中弹出Maxsize个小块内存。
2.我们这里的start和end是输入输出函数,即传入的变量更改后可以拿出来。下面的代码是Freelist的最终实现:我们既需要单个的push和pop也要一串的push和pop
//管理切分好的小对象的自由链表
class FreeList
{
public:
//将释放的对象头插到自由链表
void Push(void* obj)
{
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
_size++;
}
//从自由链表头部获取一个对象
void* Pop()
{
assert(_freeList);
//头删
void* obj = _freeList;
_freeList = NextObj(_freeList);
_size--;
return obj;
}
//插入一段范围的对象到自由链表
void PushRange(void* start, void* end, size_t n)
{
assert(start);
assert(end);
//头插
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
//从自由链表获取一段范围的对象
void PopRange(void*& start, void*& end, size_t n)
{
assert(n <= _size);
//头删
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1;i++)
{
end = NextObj(end);
}
_freeList = NextObj(end); //自由链表指向end的下一个对象
NextObj(end) = nullptr; //取出的一段链表的表尾置空
_size -= n;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr; //自由链表
size_t _maxSize = 1;
size_t _size = 0;
};
我们需要_maxsize用来统计这次给出的小块内存,_size表示的是threadcache中对应桶的小块内存数量
3.我们拿到start和end后,就要把这串还给centralcache了。
我们的逻辑是把小块内存全还给centralcache,为什么我们还要设计按个数pop和push
这其实提高了代码的可修改性,我们的归还思路不一定是根据maxsize归还。我们还可以设计出其他的逻辑,这种情况下弹出指定数量的pop。所以这个设计就是满足不同场景的需求的。毕竟同一个逻辑不同的厂家设计出来的也可能是不同的,根据需求设计(设计软件的目的就是为了利益的)
centralcache的内存回收
threadcache根据maxsize全归还给centralcache后,centralcache是把这一串小内存块,放回对应的span下。
我们的小块内存的归还是不确定的,因为使用过程是不同的。所以当我们有很多span时,我们还要考虑这个小块内存属于哪个span。记住:span是一块以页申请下来的连续的内存。
怎么知道小块内存对应的页号?
因为我们这一串小块内存也是有地址的,我们的小块内存的地址 / 8k就是页号pageid。为什么呢?我们知道计算机的除法是不管余数的。
所以假设我们现在有一页span 页号是 1,然后这个页号 * 8k,就是 8192就是这个页号的地址,假设我们的小块内存是2k一个。我们第一块的地址就是8192,第二块的地址 8192 + 2k。然后第二块地址/ 8k,还是8192,所以就是一个页。
怎么知道对应的页号属于哪个span?
我们知道一个span可能有多页。那我们怎么找到对应的span呢?
我们不知道这个页号是哪个span拔下来的,如果有很多span,要确定一个那就需要把全部存在的span都遍历一遍然后来找,那这个效率可想而知
说到查找效率,我们就应该想到哈希表,因为键值对的存储,查找o1的复杂度。快的不行~
因为我们的页和span产生的地方是pagecache所以在产生时就把对应关系打上。
//单例模式
class PageCache
{
public:
//获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
private:
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
};
然后在newspan时 我们根据地址 / 8k的计算出对应的页号,所以我们要在newspan就建立好映射关系。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查一下后面的桶里面有没有span,如果有可以将其进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//尽量避免代码重复,递归调用自己
return NewSpan(k);
}
然后我们建立 pageid和 span的联系应该是在分出span给centralcache时,根据span的页数,把每一页都和span建立联系(for循环)。
所以现在我们的页号和span有联系了,所以在threadcache返回时就可以先求出小块内存的页号,然后返回给对应的span了。
//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
上述函数就是给小块内存找到对应span的。这个过程是on的(find),n是哈希中元素的个数。
这个逻辑是一定能找到的,如果没有找到就说明之前的对应关系就错了。
centralcache回收内存函数
//将一定数量的对象还给对应的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock(); //加锁
while (start)
{
void* next = NextObj(start); //记录下一个
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//将对象头插到span的自由链表
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--; //更新被分配给thread cache的计数
if (span->_useCount == 0) //说明这个span分配出去的对象全部都回来了
{
//此时这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并
_spanLists[index].Erase(span);
span->_freeList = nullptr; //自由链表置空
span->_next = nullptr;
span->_prev = nullptr;
//释放span给page cache时,使用page cache的锁就可以了,这时把桶锁解掉
_spanLists[index]._mtx.unlock(); //解桶锁
PageCache::GetInstance()->_pageMtx.lock(); //加大锁
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock(); //解大锁
_spanLists[index]._mtx.lock(); //加桶锁
}
start = next;
}
_spanLists[index]._mtx.unlock(); //解锁
}
1.主逻辑:我们先确定threadcache返回的size是哪个桶的。
2.然后传入的start是这一串的小块内存,遍历根据页号对应关系把对应的小块内存放给对应的span。
3.如果某个span的_usecount == 0这时就要还给pagecache了,但是和之前一样在调用pagecache时centralcache可以解锁,然后给pagecache的大锁加上。
4.直到全部的小块内存都归还好了,centralcache才可以解锁(原子操作)。中间使用pagecache时也解锁。
pagecache内存回收
这一步有这个项目的精华:解决内存碎片问题,我们在pagecache回收时,并不是简单的将span挂到对应的桶上,而是有可能进行合并的。(从前面看出来span包含的页越大越好,因为可以分块)。
pagecache中的前后页合并
我们先知道合并的逻辑:我们申请下来的内存是连续的所以合并时也是要连续的。所以只能连续页进行合并,我们第一次申请是128页的span,之后可能被切成了很多份,这些不同的块返还的时间也不一样,申请的大小可能也不一样。
我们在一个页归还时执行合并逻辑,先看看前面的页是否空闲(span是否空闲),即这个span是否已经还给了pagecache。后面的页也是同理。
那我们怎么判断一个span是否在被使用呢?
1.错误的想法:_usecount == 0时说明这个块已经没人使用,那在我们刚申请下来span时_usecount == 0,在他还没分给 centralcache时。这就导致我们程序在切分span时如果这时候合并逻辑也在进行,就会把这个切分好的块就又被合并了。
2.我们再增加一个变量_isUse.用来判断这个span是否被使用(即是否挂在pagecache上)。
//管理以页为单位的大块内存
struct Span
{
PAGE_ID _pageId = 0; //大块内存起始页的页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双链表结构
Span* _prev = nullptr;
size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr; //切好的小块内存的自由链表
bool _isUse = false; //是否在被使用
};
所以在centralcache 申请span时把 _isUse置为true。_usecount == 0时 在变成false。
我们知道之前在我们把K页的span切出去时,我们把这K页都和span做了映射关系。那对于切剩下的n页的span,我们是没有建立关系的。
而我们合并时需要知道前一页或者后一页的span是否空闲,此时我们是找不到对应的span的,因为我们知道页号,但是如果这个页号是尾页号,我们的span中存的是首页号,我们就不知道是哪个span。
因为我们合并时是前后一页合并,所以我们只用知道前一页和后一页的span的情况,所以不用全部页都建立联系,首尾页建立好联系后,之后合并只要知道首尾其中一个页号,根据span中的_n变量知道有几页就可以合并了。
所以我们还要建立挂在pagecache的映射关系:
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查一下后面的桶里面有没有span,如果有可以将其进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//尽量避免代码重复,递归调用自己
return NewSpan(k);
}
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
相当于就是在newspan中加了两句代码。
之后就是我们合并逻辑的代码:
//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//对span的前后页,尝试进行合并,缓解内存碎片问题
//1、向前合并
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
//前面的页号没有(还未向系统申请),停止向前合并
if (ret == _idSpanMap.end())
{
break;
}
//前面的页号对应的span正在被使用,停止向前合并
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
//合并出超过128页的span无法进行管理,停止向前合并
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
//进行向前合并
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
//将prevSpan从对应的双链表中移除
_spanLists[prevSpan->_n].Erase(prevSpan);
delete prevSpan;
}
//2、向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
//后面的页号没有(还未向系统申请),停止向后合并
if (ret == _idSpanMap.end())
{
break;
}
//后面的页号对应的span正在被使用,停止向后合并
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
//合并出超过128页的span无法进行管理,停止向后合并
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
//进行向后合并
span->_n += nextSpan->_n;
//将nextSpan从对应的双链表中移除
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
//将合并后的span挂到对应的双链表当中
_spanLists[span->_n].PushFront(span);
//建立该span与其首尾页的映射
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
//将该span设置为未被使用的状态
span->_isUse = false;
}
两个主逻辑:向前合并和向后合并
1.三个判断:
- 第一个判断前一个页得有映射关系,如果没有就说明前面的这个页号还没申请过。
- 第二个判断就是判断当前的span是否被使用
- 第三个判断如果前面一块span + 我当前要合并的span 的 页数大于128,那管理不了就不管。
2.如果三个判断都过了,那就通过页号找到了空闲的span(挂在pagecache里的)。
3.合并逻辑:把span的n加到当前合并的块上,然后对应的被合并的块delete,把新的大块加到对应的桶中。
4.合并结束后,记住在pagecache里span和页的映射关系。
pagecache 中页号对应 和 centralcache 中页号对应
我们看到pagecache只需要前后两个页,而centralcache需要全部页。
原因很简单:
我们的centralcache 中的span是要切成很多小块内存的,threadcache归还时,小块内存的地址 / 8k可以算出对应的页号,此时用这个页号来判断在哪个span,从而规划小块内存centralcache。
而我们pagecache中的页关系的对应主要是为了span之间的合并,而我们合并时并不需要知道每一个页号,我们只想知道前后两页对应的span是否空闲,空闲就把span合并,所以只要前后。
释放内存过程联调 (主线2结束)
ConcurrentFree函数
三个cache的释放逻辑也结束了,和申请一样,我们也提供一个ConcurrentFree函数。
记住了,我们不管申请还是释放都是一种类似套娃的过程,都是先调用threadcache,在threadcache内部它会去调用centralcache和pagecache。
static void ConcurrentFree(void* ptr, size_t size/*暂时*/)
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
上面的size其实是不用传的,因为用户指定传入删除大小有点麻烦,所以我们等会内置方法解决。
联调测试
void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);
ConcurrentFree(p1, 6);
ConcurrentFree(p2, 8);
ConcurrentFree(p3, 1);
之前申请流程已经看过了,现在来看释放流程。
上面是前两次的释放过程,因为我们前两次归还size最多只会到2,而没有大于等于 MAXSIZE,所以前两次都是把小块内存挂到threadcache的0号桶上就结束了。
第三次时0号桶的size应该就变成3了,此时会归还小块内存给centralcache
我们在listtoolong中我们调用poprange(获得小块内存的头尾,方便插入span)
接下来就是调用归还逻辑把这一串小块内存还给centralcache。
因为poprange获取了start,然后我们通过遍历start到空,就能获得这一串小块内存。然后调用pagecache中的MapObjectToSpan获取对应的span,此时span的_useCount--(后续==0要进行回收)。
最终_useCount == 0,此时归还内存给pagecache。
还记得使用pagecache时centralcache解锁,给pagecache的大锁加上。然后调用relesespan的函数。
向前找没找到:因为我们切分时的逻辑是从前切,剩下的保存。
我们向后找找到之前127页的span,此时让二者的 _n相加就变成了128页,然后把这个span挂到128页的桶上,之前的127页的就可以删掉了。
之后就是把合并好的内存挂好,然后建立首尾页映射span的关系。并且要把_isUse置为false,不然这块内存就一直都使用不了了。
pagecache使用完后,就把大锁解开,把桶锁继续挂上,因为可能还有span要归还。
最后桶中操作结束就把桶锁解开即可。
大于256KB的内存申请
前面我们的逻辑:不管多大都向threadcache申请,这就会出现一个问题,我们threadcache最大就256KB的桶,256KB == 32页,那如果有更大的内存需求怎么办呢?
我们的pagecache最大是128页,所以在128页之内的大于32页的内存申请,可以直接向pagecache申请,此时这块内存就不用交给centralcache和threadcache了,而是直接使用,使用结束后,继续回到pagecache。
当申请内存大于128页时,就要回归堆取内存的过程了。
RoundUp函数更改
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
//大于256KB的按页对齐
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
}
我们之前对于else是直接断言退出的,考虑到大于256KB的使用,我们看到里面1<<PAGE_SHIFT其实就是8K的意思:因为我们大于256KB的申请它的向上对齐就是向上对齐1页了。
PAGE_SHIFT:还是通过比特位的移动 乘以或除以8K。
concurrentAlloc函数的更改
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES) //大于256KB的内存申请
{
//计算出对齐后需要申请的页数
size_t alignSize = SizeClass::RoundUp(size);
size_t kPage = alignSize >> PAGE_SHIFT;
//向page cache申请kPage页的span
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kPage);
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
1.现在我们需要根据大小进行判断了,之前是直接调用threadcache,这里就需要直接去pagecache中调用了,roundup算出向上对齐数(大小),右移 page_shift算出页数
2.大于256KB直接调用NewSpan()即可,但要记得调用newspan是需要上锁的。
3.我们的newspan返回值是span*,所以我们可以通过span找到页号,然后利用页号 * 8k == 地址,找到这大块内存的起始地址。
newspan()函数更改
我们对于小于等于128页的内存,pagecache还能支持,但是大于128页的就需要更改成去堆中申请了可以直接调用之前的SystemAlloc()函数。
//获取一个k页的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
if (k > NPAGES - 1) //大于128页直接找堆申请
{
void* ptr = SystemAlloc(k);
Span* span = new Span;
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//建立页号与span之间的映射
_idSpanMap[span->_pageId] = span;
return span;
}
//先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
//检查一下后面的桶里面有没有span,如果有可以将其进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
//在nSpan的头部切k页下来
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
//将剩下的挂到对应映射的位置
_spanLists[nSpan->_n].PushFront(nSpan);
//存储nSpan的首尾页号与nSpan之间的映射,方便page cache合并span时进行前后页的查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
//建立页号与span的映射,方便central cache回收小块内存时查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//走到这里说明后面没有大页的span了,这时就向堆申请一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
//尽量避免代码重复,递归调用自己
return NewSpan(k);
}
其实就是多加了一个判断,看看申请的内存是否大于128页。大于就调用SystemAlloc(页数)
1.我们还是创建了一个span,虽然这个span不用插入任何桶,但是可以用span和页号的对应关系把页号给保存好,可以用于后期的大块内存回归返还内存。
大于256KB内存的释放
和申请一样是分三种情况的,所以释放函数也要整体修改一遍。
ConcurrentFree()函数修改
static void ConcurrentFree(void* ptr, size_t size)
{
if (size > MAX_BYTES) //大于256KB的内存释放
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
和Alloc一样,小于256KB的内存还是通过之前走的threadcache那一套回收。大于256KB的直接去pagecache中的release函数中。
因为pagecache的合并是span的合并所以要先通过块内存的地址(转换为页号)找到对应的span。
ReleaseSpanToPageCache()函数更改
//释放空闲的span回到PageCache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGES - 1) //大于128页直接释放给堆
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
delete span;
return;
}
//对span的前后页,尝试进行合并,缓解内存碎片问题
//1、向前合并
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
//前面的页号没有(还未向系统申请),停止向前合并
if (ret == _idSpanMap.end())
{
break;
}
//前面的页号对应的span正在被使用,停止向前合并
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
//合并出超过128页的span无法进行管理,停止向前合并
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
//进行向前合并
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
//将prevSpan从对应的双链表中移除
_spanLists[prevSpan->_n].Erase(prevSpan);
delete prevSpan;
}
//2、向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
//后面的页号没有(还未向系统申请),停止向后合并
if (ret == _idSpanMap.end())
{
break;
}
//后面的页号对应的span正在被使用,停止向后合并
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
//合并出超过128页的span无法进行管理,停止向后合并
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
//进行向后合并
span->_n += nextSpan->_n;
//将nextSpan从对应的双链表中移除
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
//将合并后的span挂到对应的双链表当中
_spanLists[span->_n].PushFront(span);
//建立该span与其首尾页的映射
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
//将该span设置为未被使用的状态
span->_isUse = false;
}
其实逻辑没什么大变化:只是在大于128页时要特判,选择直接归还给内存。
SystemFree()函数
//直接将内存还给堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
//linux下sbrk unmmap等
#endif
}
和Alloc一样就是把不同系统的申请空间和释放空间的函数根据条件编译进行封装,代码就可以进行复用了。
256KB以上内存的申请释放测试
//找page cache申请
void* p1 = ConcurrentAlloc(257 * 1024); //257KB
ConcurrentFree(p1, 257 * 1024);
//找堆申请
void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129页
ConcurrentFree(p2, 129 * 8 * 1024);
联调测试
申请257KB时,我们直接调用NewSpan()函数进行申请,可以看到申请页数为33页
在是否内存时也是直接调用的pagecache的释放函数,没有走threadcache。
对于大于128页的内存,我们直接调用了SystemAlloc和Free
最终没有走我们高并发内存这一套直接向堆申请的空间。
定长内存池升级项目(强强联手)
前面不是写了定长内存池吗?这个内存池的缺点就是只能申请相同的内存,但是和我们的高并发内存打配合就是强强联手,因为我们发现只要是小于等于128页的内存我们只要pagecache有就不会去堆上申请。每次申请是因为不够然后从内存中只申请128页内存。
那我们的定长内存池就能派上用场了,我们定长内存池中固定为128页的内存块,之后pagecache直接来定长内存池拿,效率就又高了,减少了内核态和用户态的切换
当然不只是pagecache中可以使用,其它只要使用到new的地方都可以切换成定长内存池的new
//单例模式
class PageCache
{
public:
//...
private:
ObjectPool<Span> _spanPool;
};
我们在pagecache中加一个定长内存池对象(组合关系)
//申请span对象
Span* span = _spanPool.New();
//释放span对象
_spanPool.Delete(span);
把申请空间的逻辑变为定长内存池的
threadcache申请替换
//通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
static std::mutex tcMtx;
static ObjectPool<ThreadCache> tcPool;
tcMtx.lock();
pTLSThreadCache = tcPool.New();
tcMtx.unlock();
}
因为threadcache是多线程的,所以在new时要加锁。
spanlist头结点申请替换
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = _spanPool.New();
_head->_next = _head;
_head->_prev = _head;
}
private:
Span* _head;
static ObjectPool<Span> _spanPool;
};
为什么使用静态(static)的定长内存池
因为我们定长内存池是给相同大小的对象使用的,在申请空间时应该是相同的对象申请同一个内存池,static申请为静态变量,表示这个对象只有一份且全局可用。(在类中的private静态对象只能对应的类对象才能调用)
释放内存时的优化(不传size)
我们之前的内存释放时,后面的size是写的待定的,因为在我们写代码时,还要传size,万一这个代码很长这个size我们已经记不住了,那就很麻烦。所以我们可以升级我们的函数
//管理以页为单位的大块内存
struct Span
{
PAGE_ID _pageId = 0; //大块内存起始页的页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双链表结构
Span* _prev = nullptr;
size_t _objSize = 0; //切好的小对象的大小
size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
void* _freeList = nullptr; //切好的小块内存的自由链表
bool _isUse = false; //是否在被使用
};
因为释放内存是threadcache给centralcache,centralcache给pagecache,而需要小块内存size的只有threadcache向centralcache时,所以我们可以在span中可以记录小当前切成小块的size。_objsize.
我们的记录时机在:我们newspan下来后都可以,因为此时centralcache要对span进行切分,当前函数有size的记录。
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;
释放内存使用objsize
static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
if (size > MAX_BYTES) //大于256KB的内存释放
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
可以看到之前我们Free还有两个参数,记录后size在函数内获取,之后调用就方便了。
哈希表调用时加锁问题
我们知道STL的容器都是线程不安全的,这里我们在pagecache中建立了多个关系,此时线程是安全的因为pagecache一时间只能一个线程进入
但是我们在centralcache中需要通过调用哈希表找到对应的span来让threadcache归还内存 ,此时线程就不安全了,因为centralcache是桶锁,可能多个桶都在调用这个关系,然后此时pagecache在进行更改,然后有线程直接查看了,就是线程不安全了。
所以在查看关系是要加锁
//获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
std::unique_lock<std::mutex> lock(_pageMtx); //构造时加锁,析构时自动解锁
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
多线程下对比malloc测试
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime);
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}
int main()
{
size_t n = 10000;
cout << "==========================================================" <<
endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" <<
endl;
return 0;
}
很简单:1.我们写了两个函数分别调用concurrentmalloc和malloc,然后通过时间作对比。
- ntimes:单轮次申请和释放内存的次数。
- nworks:线程数。
- rounds:轮次。
固定大小的内存申请和释放
v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));
两边都一直申请16字节。对比结果:
发现我们的concurrentmalloc慢了很多。
原因:
固定的对象申请的都是同一个桶,所以占用了同一个桶锁,而我们不用全局锁用桶锁就是为了让threadcache可以同一时间访问不同的锁,同一个锁一直解锁开锁就慢了。
不同大小内存的申请和释放
v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
此时我们完成一个不停+i 但不超过8KB的对象申请。
还是比不过:也从侧面说明了malloc其实是很强大的。
多线程调试技巧
条件断点
在使用assert会直接结束程序,而这样我们并不清楚哪里出错了,只知道断言错了,但变量内容什么的都看不见,此时我们通过条件断点,就可以让程序停在这一块,而不是直接退出
函数帧栈
调试->窗口->调用堆栈。
双击桢栈 就可以跳回当前函数之前调用的函数里面,这样就可以查看已经过去的函数里的内容
死循环判断
比如说设置了一个断点但是一直没有运行到,那很可能就是死循环了,在调试->全部中断,此时会直接跳出死循环的点,然后到下一步就可以确认哪里死循环
性能分析工具
我们写出高并发内存池是为了提高malloc效率的,但是上面被完爆了,所以我们通过性能分析工具看看究竟是哪里慢了,根据这些地方在进行优化
VS编译器下的性能分析工具使用
调试->性能和诊断
但是要注意:这个分析是很慢的,所以数据量可以减少
int main()
{
size_t n = 1000;
cout << "==========================================================" <<
endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
//BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" <<
endl;
return 0;
}
把n改为1000.并且把malloc的调用屏蔽,因为我们只想看tcmalloc的情况。
接下来就是根据想法选择
然后就可以通过分析结果看到函数消耗时间占比
性能结果分析
发现Dealloc(释放)和MapObjectToSpan(通过地址找span)消耗时间是最多的
在Dealloc中ListTooLong是占用时间最久的
在ListTooLong中是ReleaseListToSpan最久。
最终发现在Dealloc中也是MapObjectToSpan占用时间最久。
为什么占用的久?
发现锁就占了40%,所以想办法让哈希表不用加锁。 怎么让哈希表不加锁就能访问呢?基数树法
那我们就找到对应的解决方法
基数树优化
基数树就是分层的哈希表。单层双层三层等。
基数树优化的点在哪里?
我们知道在建立映射关系时,unordered_map和map正在修改或者添加新的对应关系时,unordered_map可能就涉及到了扩容,map红黑树可能涉及到了红黑树的旋转。所以在做这些操作时是线程不安全的,此时需要加锁,而如果在这时读取关系就会导致错误。所以基数树就是给所有可能得页号单独开一个空间存储span*,此时就不会相互影响,就可以不用加锁了
注意:会导致错误的是操作,读取只是会读到错误的元素。在线程不安全时
单层基数树
单层基数树就是直接采用的直接定址法,事先就开辟好哈希表的大小,这个大小根据页号的大小来定,我们知道32位下的地址在0 - 2的32次方,页号除以8k就可以得出可以有2的19次方大小的页号。
我们这个单层基数树哈希表就是让每一个页号都对应一个span,而不是多个页号指向一个span。
所以我们开辟的空间应该和应该和我们的页号大小一样。才能保证每一个页号都对应一个span,即每个下标就是页号,每个下标存的是一个span的指针。
//单层基数树
template <int BITS>
class TCMalloc_PageMap1
{
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap1()
{
size_t size = sizeof(void*) << BITS; //需要开辟数组的大小
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按页对齐后的大小
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申请空间
memset(array_, 0, size); //对申请到的内存进行清理
}
void* get(Number k) const
{
if ((k >> BITS) > 0) //k的范围不在[0, 2^BITS-1]
{
return NULL;
}
return array_[k]; //返回该页号对应的span
}
void set(Number k, void* v)
{
assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS-1]
array_[k] = v; //建立映射
}
private:
void** array_; //存储映射关系的数组
static const int LENGTH = 1 << BITS; //页的数目
};
void*是一个模版问题,我们实现一个基数树不仅可以用来存span*其他内容也可以,所以不用限定成span*
1.我们的模版给了一个BITS代表我们需要开辟的指针数组大小的比特位。
2.LENGTH = 1 << BITS,就代表了我们指针数组的大小。void** _array是这个数组的本体。
3.我们根据一个指针的大小<<8k,BITS,算出的就是需要开辟空间的大小。
4.然后二维指针 _array 指向这块内存。
5.set就是分别设置每一位(页号)对应的span,然后是以void*传入的。如果超过了范围应该出错
6.get就是获取对应页号的span的。当然也不能越界。
为什么用void**
我们基数树管理的是一个void*数组,所以用void**指向这个数组(c语言指针知识)。
malloc和tcmalloc再次进行对比
成就感就来了,我们的tcmalloc打败了malloc。
总结:
项目源码:ConcurrentPool · 自律的阿龙/项目仓 - 码云 - 开源中国
我们的主线逻辑就是:threadcache centralcache pagecache他们之间的申请释放流程。
他们之间就是一种类似套娃的逻辑:
申请逻辑:从threadcache开始,如果有就返回,没有就向后一层借。
释放的逻辑也是一样:从threadcache开始,如果满足判断就释放内存给上一层,在pagecache中还要对span进行合并防止内存碎片的问题。
之后对于大于256KB的内存的释放我们有两种情况:
1.大于256kb(32页)小于128页:此时直接从pagecache中获取。
2.大于128页的:此时我们的pagecache最高就是128,所以就需要我们直接去堆上申请 == malloc。
项目优化
我们发现我们的性能一开始是不如malloc的,所以我们用基数树优化,优化了STL哈希表的线程不安全加锁问题。
centralcache用的是桶锁。pagecache用的是全局锁。threadcache线程独享所以不用锁。
部分细节
定长内存池:只能申请固定长度大小的内存池,和我们的tcmalloc配合起来后强强联手。
调试技巧:条件断点,栈桢分析,死循环中断。
性能分析工具:VS中自带的在调试下。
释放时不用传size:span中再加一个变量记录
span是否被使用:不能用_usecount,所以在设定一个变量_isUse。
感谢您的阅读,也感谢自己在认知道路上更进一步。
标签:span,简化版,void,开源,内存,centralcache,pagecache,size From: https://blog.csdn.net/a1275174052/article/details/144165015