目录
.3- thread cache 向 central cache 申请内存资源
.1- central cache 向 page cache 申请内存资源
2)thread cache 向 central cache 归还内存
3)central cache 向 page cache 归还内存
一、项目背景
1)mini 版的 tcmalloc
高并发内存池是一种专门设计用于高并发环境下的内存管理机制。它的原型是 Google 的一个开源项目 tcmalloc,全称 Thread-Caching Malloc,实现了高效的多线程内存管理,用于替换系统的内存分配相关函数 malloc 和 free。
tcmalloc 的知名度是非常高的,例如 Go 语言就直接用它做了自己的内存分配器。该项目就是把 tcmalloc 中最核心的框架简化后拿出来,模拟实现出一个 mini 版的高并发内存池,目的就是学习 tcmalloc 的精华。
在高并发系统中,大量的线程或进程可能会频繁地申请和释放小块的内存,这种情况下,传统的内存分配方式(如使用 malloc 和 free)可能会因为频繁的系统调用、锁竞争和内存碎片等问题而导致性能瓶颈。高并发内存池则通过以下方式来解决这些问题:
- 预先分配内存:内存池会预先从操作系统中申请一大块内存,并将其划分为多个固定大小的内存块或可变大小的内存块(根据具体实现而定),这些内存块在内存池中进行管理,而不是直接由操作系统管理。
- 减少系统调用:由于内存池预先分配了内存,因此当需要分配内存时,可以直接从内存池中获取,而无需频繁地向操作系统发起内存分配请求,从而减少了系统调用的次数。
- 优化锁机制:在多线程环境下,内存池的访问需要是线程安全的。高并发内存池通常会采用高效的锁机制(如自旋锁、读写锁、无锁数据结构等),以减少锁竞争并提高并发性能。
- 减少内存碎片:内存池通过管理固定大小的内存块或采用高效的内存分配算法,可以显著减少内存碎片的产生,提高内存的利用率和访问效率。
- 快速分配和回收:内存池中的内存块可以快速分配和回收,因为它们已经预先分配好了,并且内存池内部通常会有高效的内存管理策略来优化分配和回收过程。
- 可定制性:高并发内存池通常支持可定制性,允许用户根据应用程序的需求调整内存块的大小、数量和管理策略等。
高并发内存池的应用场景非常广泛,特别是在需要处理大量并发请求和频繁内存操作的应用程序中,如数据库、Web服务器、游戏服务器、实时交易系统等。通过使用高并发内存池,这些应用程序可以显著提高内存管理的效率和性能,从而改善整体的系统性能和用户体验。
2)内存池是什么
要了解内存池是什么,首先要知道池化技术。
所谓“池化技术”,就是程序先向系统申请过量的资源,然后⾃⼰管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较⼤的开销,不如提前申请好了,这样使⽤时就会变得⾮常快捷,⼤⼤提⾼程序运⾏效率。
这就好比,从前有座山,山顶有座庙,庙里有许多和尚要喝水,而水在山下的湖里,每次下山去湖里打水再返回庙里,来来回回十分麻烦,于是为了更方便地取水和用水,和尚们在半山腰建了一个池子来储水,这样一来,定时把湖水定量地送往半山腰,等每次庙里缺水了就只需要到半山腰的池子打水即可,节省了大量上山下山的时间和人力。
而这就是池化技术,所谓池化就是将原本要跑很远、跑多次才能拿到的东西,按需屯在往返中途的"池"中,从此以后往"池"中存、从"池"中取。
在计算机中,有很多使⽤“池”这种技术的地⽅,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若⼲数量的线程,让它们处于睡眠状态,当接收到 客⼾端的请求时,唤醒池中某个睡眠的线程,让它来处理客⼾端的请求,当处理完这个请求,线程⼜进⼊睡眠状态。
【Tips】池化技术的优点
- 减少内存碎片化:内存池化技术通过预先分配一定大小的内存块,并在程序运行过程中重复使用这些内存块,避免了频繁地进行内存分配和释放操作。这样可以减少内存碎片化的问题,提高内存的利用率。
- 降低内存管理开销:频繁的内存分配和释放操作会带来较大的开销,包括时间开销和空间开销。而通过内存池化技术,可以避免多次的内存分配和释放,从而大大降低了内存管理的开销,提高了程序的运行效率。
- 提升程序性能:通过减少内存碎片化、降低内存管理开销,内存池化技术可以提升程序的整体性能。它能够减少因频繁内存操作而导致的性能下降,使程序更高效地利用内存资源,加快数据的访问速度,提高程序的响应能力和执行效率。
话又说回内存池。内存池是指,程序预先向操作系统申请一块足够大的内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当释放内存的时候,并不是真正将内存返回给操作系统,而是将内存返回给内存池。当程序退出时(或某个特定时间),内存池才将之前申请的内存真正释放。
内存池主要解决的就是效率的问题,它能够避免让程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题。
【补】内存碎片
内存碎片分为内部碎片和外部碎片,其中:
- 内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。
- 外部碎片是一些空闲的小块内存区域,由于这些内存空间不连续,以至于合计的内存足够,但是不能满足一些内存分配申请需求。
3)C/C++ 的 malloc 和 new
在 C/C++ 中,要动态申请内存并不是直接去堆申请的,而是通过 malloc() 或封装了 malloc() 的关键字 new(本质是 operator new 函数)去申请的。
申请内存块时,是先调用 malloc,malloc 再去向操作系统申请内存。而 malloc 其实就是一个内存池,它相当于从操作系统中“批发”了一块较大的内存空间,然后“零售”给程序用,当内存空间全部“售完”或程序有更大量的内存需求时,它会再按需向操作系统“进货”。
malloc 的实现方式有很多种,一般不同编译器平台用的都是不同的,例如 Windows 的 VS 系列中的 malloc 是微软自行实现的,而 Linux 下的 gcc 用的是 glibc 中的 ptmalloc。
4)所用技术栈和项目环境
- 技术栈:C/ C++/ C++11。
- 项目环境:Windows、vs 2013 / vs 2019 / vs 2022 / vscode。
二、实现定长内存池
malloc 其实是一个通用的内存池,虽然在任何场景下都可以使用,但这也意味着它在任何场景下都不会有很高的性能。
而定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们可以将其性能发挥到极致,且不需要考虑内存碎片等问题。
我们可以通过实现一个定长内存池,来熟悉一下对简单内存池的控制。其次,这个定长内存池在之后也会作为高并发内存池的一个基础组件。
1)基本框架
定长内存池的基础功能,就是从内存中申请一段大块空间,以切分的方式按需分配给进程或线程使用,等使用完后又将切分的定长的小块内存归还回内存池中,内存池会专门好维护这些小块内存,以便下次继续使用,直到程序服务彻底关停时,再将内存池中的所有内存空间统一地释放、归还给系统。
对于向堆申请到的大块内存,我们可以用一个指针来对其进行管理。
而归还回内存池的小块内存,我们可以将它们构建成一个链表数据结构来管理。这个管理归还/释放的定长内存的链表,就叫做自由链表,为了能找到这个自由链表,我们需要一个指向自由链表头节点的指针。
特别的,自由链表中的节点并不是一个个封装了元素和 next 指针的结构体,而单纯就是一个个定长内存,它们之所以能被构建成链表,是因为每块定长内存的前 4 个(32位下)或前 8 个(64位下)字节存放了下一块定长内存的地址,而指向自由链表头节点的指针中,存放的其实就是位于头节点的定长内存的地址。
由此,我们可以为定长内存池创建一个 ObjectPool 类,且可以初步得到 ObjectPool 类的基本结构了:
- ObjectPool.h
#pragma once
#include<iostream>
using std::cout;
using std::endl;
//template<size_t N> /*非类型的模板参数规定大小*/
template<class T> /*一个对象的大小是固定的,类型模板参数也可以起到规定大小的作用*/
class ObjectPool
{
public:
//向进程或线程提供申请空间的接口
T* New()
{}
//向进程或线程提供释放空间的接口
void Delete(T* obj)
{}
private:
char* _memory=nullptr; //指向一次性向系统申请的大块内存空间,char*固定占一个字节,方便每次从内存中切空间
void* _freeList=nullptr;//自由链表的头指针,存了自由链表的头节点的地址,用于管理已向内存申请、使用后被归还的内存块
// 自由链表中每个节点都是一块被归还的内存,其中存放的是下一个节点的地址
};
2)申请内存块
一个进程或线程要从内存池中取到内存资源来使用,内存池中首先要有内存资源,其次,内存池还要按需将内存资源分给进程或线程。
由此,进程或线程申请内存块的过程就可以大致分为:
- 先向系统申请一段大块内存到内存池中;
- 将内存池中的内存资源进行定长切分,并将切分好的内存资源返回给进程或线程。
- ObjectPool.h
#pragma once
#include<iostream>
using std::cout;
using std::endl;
template<class T>
class ObjectPool
{
public:
//申请空间
T* New()
{
//1.申请大块内存
if( _memory == nullptr)
{
_memory = (char*)malloc(128 * 1024);//一次向内存申请128kb空间
if (_memory == nullptr)
throw bad_alloc();//申请失败则抛异常
}
//2.将大块内存切下一块(一个对象的大小)并返回
T* obj = (T*)_memory;
_memory += sizeof(T);//调整大块内存的可用空间
return obj;
}
//释放空间
void Delete(T* obj)
{}
private:
char* _memory=nullptr;
void* _freeList=nullptr;
};
此时我们编写的申请大块内存的执行条件是 _memory == nullptr。
如果已经申请的大块内存全部用完了,由于无法知道已申请的大块内存是否用完,就更别提后续怎么继续申请大块内存了,因此我们还需要一个成员变量来实时记录大块内存的使用情况。
而申请大块内存的执行条件,可以根据大块内存的使用情况来编写。
- ObjectPool.h
#pragma once
#include<iostream>
using std::cout;
using std::endl;
template<class T>
class ObjectPool
{
public:
//申请空间
T* New()
{
//1.申请大块内存
//if( _memory == nullptr) //该执行条件无法判断大块内存是否还有富余可用
//if(_remainBytes == 0) //该执行条件只适合,一个对象的大小能被128整除的情景
if (_remainBytes < sizeof(T))//剩余内存不够一个对象的大小,就重新申请大块空间
{
_remainBytes = 128 * 1024; //重置剩余字节数
_memory = (char*)malloc(128 * 1024);//一次向内存申请128kb空间
if (_memory == nullptr)
throw bad_alloc();
}
//2.将大块内存切下一块(一个对象的大小)并返回
T* obj = (T*)_memory;
_memory += sizeof(T);//调整大块内存的可用空间
_remainBytes -= sizeof(T);
return obj;
}
//释放空间
void Delete(T* obj)
{}
private:
char* _memory=nullptr;
size_t _remainBytes = 0;//大块内存被切分后的剩余字节数
void* _freeList=nullptr;
};
3)释放内存块
一个进程或线程要释放自己的内存资源,其实是将原先申请的定长内存块归还到内存池中。
归还的定长内存块会以特定的方式,被构建成一个自由链表,在 ObjectPool 类中,成员指针 _freeList 就存的是自由链表中位于头节点的定长内存块的地址,而在自由链表中,每个节点都是一个定长内存块,它们的前 4 个或前 8 个字节都存放有下一个定长内存块的地址。
由此,进程或线程释放定长内存块的过程,其实就是维护自由链表的过程。
在释放定长内存块时,如果自由链表中原先没有内存块,就需要将内存块接入自由链表。
如果原先已经有内存块了,就将内存块头插入自由链表,而选用头插的方式是因为,自由链表中使每个节点能够链接起来的特性。
构建节点之间链接的过程,其实是对指针变量以及内存块前几个字节进行赋值的过程,其中还涉及一些关键的细节,请见以下代码和相关注释:
- ObjectPool.h
#pragma once
#include<iostream>
using std::cout;
using std::endl;
template<class T>
class ObjectPool
{
public:
//申请空间
T* New()
{
//...
}
//释放空间
void Delete(T* obj)
{
if (_freeList == nullptr)
{
//1.自由链表还不存在,就将 obj 的内存块地址存入头指针
_freeList = obj;
//2.每个节点中的前4个或前8个字节存了下一个节点的地址,因此接入自由链表的内存块中也要存入下一个节点的地址
//*(int*)obj = nullptr;
// //这样写,在32位下可行,但在64位下不可行,
// //因为64位下地址是8字节的,int本身固定只有4字节,会导致数据丢失
//重点!
*(void**)obj = nullptr;
//void*(指针类型)在32位下是4字节,在64位下是8字节
}
else
{
//自由链表已经存在了,就将内存块接入自由链表中
//由于每个节点的前4个或前8个字节存了下一个节点的地址,因此内存块通过头插接入自由链表是最方便的
*(void**)obj = _freeList;
_freeList = obj;
}
}
private:
char* _memory=nullptr;
size_t _remainBytes = 0;
void* _freeList=nullptr;
};
不过,关于自由链表中是否已有内存块的分类讨论,其实可以合并成一种情况,因为头插本身对分类讨论的两种情况均适用。
- ObjectPool.h
#pragma once
#include<iostream>
using std::cout;
using std::endl;
template<class T>
class ObjectPool
{
public:
//申请空间
T* New()
{
//...
}
//释放空间
void Delete(T* obj)
{
if (_freeList == nullptr)
{
//但实际上,将内存块头插接入自由链表的过程,无所谓自由链表本身是否存在
*(void**)obj = _freeList;
_freeList = obj;
}
private:
char* _memory=nullptr;
size_t _remainBytes = 0;
void* _freeList=nullptr;
};
另外,进程或线程每次从内存池申请资源的时候,其实未必是直接从大块内存上切走一块的,还可能在自由链表中有之前用完并归还回内存池的资源,因此, 每次申请资源应优先从自由链表中获取,自由链表中不够了再从大块内存上切。
- ObjectPool.h
#pragma once
#include<iostream>
using std::cout;
using std::endl;
template<class T>
class ObjectPool
{
public:
//申请空间
T* New()
{
T* obj = nullptr;
//优先将归还的定长内存块,再次重复利用
//具体方式是对自由链表进行头删
if (_freeList)
{
void* next = *(void**)_freeList;
obj = (T*)_freeList;
_freeList = next;
return obj;
}
else
{
//申请大块内存
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;
_memory = (char*)malloc(128 * 1024);
if (_memory == nullptr)
throw bad_alloc();
}
//将大块内存切下一块(一个对象的大小)并返回
obj = (T*)_memory;
_memory += sizeof(T);
_remainBytes -= sizeof(T);
return obj;
}
}
private:
char* _memory=nullptr;
size_t _remainBytes = 0;
void* _freeList=nullptr;
};
4)细节优化和性能测试
- ObjectPool.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
using std::cout;
using std::endl;
#ifdef _WIN32
#include<windows.h>
#else
// ...
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
//windows系统下使用的是VirtualAlloc
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
template<class T>
class ObjectPool
{
public:
//申请空间
T* New()
{
T* obj = nullptr;
//优先将归还的定长内存块,再次重复利用
if (_freeList)
{
void* next = *(void**)_freeList;
obj = (T*)_freeList;
_freeList = next;
}
else
{
//申请大块内存
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(128 * 1024);
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
throw std::bad_alloc();
}
//将大块内存切下一块(一个对象的大小)并返回
obj = (T*)_memory;
//这样分配空间存在一些问题
/*_memory += sizeof(T);
_remainBytes -= sizeof(T);*/
//如果T是一个int类型,它的大小固定是4字节,但在64位下,一个地址的大小是8字节,链表节点就存不下这个地址了
size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//一个对象的大小不及一个指针,就把一个指针的大小作为定长
_memory += objsize;
_remainBytes -= objsize;
}
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
//释放空间
void Delete(T* obj)
{
//显示调用析构函数清理对象
obj->~T();
*(void**)obj = _freeList;
_freeList = obj;
}
private:
char* _memory=nullptr;
size_t _remainBytes = 0;
void* _freeList=nullptr;
};
//以下为性能测试代码
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
void TestObjectPool()
{
// 申请释放的轮次
const size_t Rounds = 5;
// 每轮申请释放多少次
const size_t N = 100000;
//申请和释放v1,测试关键字new(封装了malloc)
std::vector<TreeNode*> v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
///申请和释放v2,测试我们自己的定长内存池
std::vector<TreeNode*> v2;
v2.reserve(N);
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
//打印v1和v2的时间差
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
- UnitTest.cpp
#include "ObjectPool.h"
int main()
{
TestObjectPool();
return 0;
}
在 vs2022 下切换至 release 模式,编译运行后结果如下:
可见定长内存池的效率要比 malloc 高上不少。
三、高并发内存池的整体框架
在高并发系统中,大量的线程或进程可能会频繁地申请和释放小块的内存,这种情况下,传统的内存分配方式(如使用 malloc 和 free)可能会因为频繁的系统调用、锁竞争和内存碎片等问题而导致性能瓶颈,因此,在设计高并发内存池时,需着重考虑以下问题:
- 性能问题。
- 多线程环境下的锁竞争问题。
- 内存碎⽚问题。
而针对这三个问题,高并发内存池被设计为三层,它们分别是:
- thread cache(线程缓存): theard cache 是每个线程独有的,用于小于等于 256KB 的内存分配。线程从这一层申请内存并不需要加锁,且每个线程独享⼀个 theard cache,这就使得“高”并发线程池非常高效。
- central cache(中心缓存): central cache 会按需给 thread cache 分配内存资源,因此它是所有线程所共享的。central cache 还能在合适的时机回收 thread cache 中的资源,避免⼀个线程占用太多资源而引发资源竞争,这样就可以为多个线程更均衡地调度资源。由于 central cache 本质是所有线程所共享的,因此其内部是存在竞争的,从其内部申请资源也就需要加锁;不过 central cache 使用的是桶锁,且只有当 thread cache 中的内存资源耗尽才会向上层 central cache 继续申请,因此 central cache 内部的锁竞争其实不会特别激烈。
- page cache(页缓存):page cache 是在 central cache 之上的⼀层缓存,其中的内存是以页为单位来进行存储及分配的。当下层 central cache 中没有内存资源时,page cache 会分配出⼀定数量的页,并将其切割成定长的小块内存,再分配给 central cache;而当 central cache 中的内存资源满足一定条件时,page cache 就会在合适的时机将其进行回收,并将回收的内存尽可能地进行合并,以组成更大的连续内存块,进而缓解内存碎片的问题。
在下文中,我们将对 thread cache、central cache、page cache 这三个模块逐一进行实现。
四、实现 thread cache 申请内存
1)基本框架
thread cache 的整体设计类似于上文中的定长内存池,也通过自由链表管理释放回来的内存块,或从上层 central cache 申请一块内存放到自由链表中。
thread cache 支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要 20 多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。
不过,我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐,例如让这些字节数都按照 8 字节进行向上对齐,那么 thread cache 的结构就可以被设计成一个哈希桶。
thread cache 被设计成一个哈希桶就意味着, thread cache 本身可以是一张线性的哈希表,而这张哈希表上“挂满”了自由链表,由此哈希表存储的映射关系为 <内存块大小,管理相应内存块的自由链表>。
例如当线程申请 1~8 字节的内存时, thread cache 会统一地给出 8 字节的内存块,具体方式是从哈希表中 8 字节所对应位置上的、管理着大小为 8 字节的内存块的自由链表中,取一个节点返回给申请内存的线程;而当线程申请 9~16 字节的内存时,thread cache 又会统一地给出 16 字节的内存块......以此类推。
我们先定义一个公共的头文件,将所有文件都需要用到的一些内容编写在其中。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<thread>
#include<assert.h>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static void*& NextObj(void* obj)//取前4/8个字节,可读可写
{
return *(void**)obj;
}
class FreeList //管理切分好的定长内存块的自由链表
{
public:
//头插一个节点
void Push(void* obj)
{
assert(obj);
NextObj(obj) = _freeList;//用前4/8个字节存下一个节点的地址
_freeList = obj;
}
//头删一个节点
void* Pop()
{
assert(_freeList);
void* obj = _freeList; //记录头节点的地址
_freeList = NextObj(obj);//将头指针指向头节点的下一个节点
return obj; //返回头节点
}
//判断链表是否为空
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;//自由链表的头指针
};
然后再分别定义一个 ThreadCache 头文件和源文件,在其中编写 ThreadCache 的主要代码。
- ThreadCache.h
#pragma once
#include"Common.h"
class ThreadCache
{
public:
//申请内存
void* Allocate(size_t size);
//释放内存
void Deallocate(void* ptr, size_t size);
//从 central cache 中获取资源
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[]; //挂满自由链表的哈希桶,可哈希桶具体的大小该是多少呢?
};
- ThreadCache.cpp
#include"ThreadCache.h"
void* ThreadCache::Allocate(size_t size)
{}
void ThreadCache::Deallocate(void* ptr,size_t size)
{}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{}
为了确定 ThreadCache 类中哈希桶 _freeLists 的大小,以及完善申请内存的 Allocate() 和释放内存的 Deallocate(),我们不得不先来思考存储了 <内存块大小,管理相应内存块的自由链表>的哈希桶中,映射对齐规则具体是怎样的。
2)哈希桶中的映射对齐规则
.1- 内存块大小的向上对齐
如果每个字节数都对应一个自由链表,那么哈希桶的空间开销就太大了,因此我们需要制定一个更合适的映射对齐规则。
由于这些内存块是会被链接到自由链表上的,且我们必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针,因此最初按 8 字节进行对齐肯定是最合适的。但如果所有的字节数都按照 8 字节进行对齐的话,就需要建立 256 × 1024 ÷ 8 = 32768 个桶,这个空间开销还是不小。那么,还有没有更好的对齐规则呢?
对此,tcmalloc 中提供了一个很好的方案,这里我们将其简化,就得到了以下对其规则:
虽然对齐产生的内碎片会引起一定程度的空间浪费,但按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。需要说明的是,1~128这个区间我们不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,这里我们就从第二个区间开始进行计算。
由公式“浪费率 = 对齐后的字节数 / 浪费的字节数”,我们要得到某个区间的最大浪费率,就应该让分子尽量取大,让分母尽量取小。比如在 129~1024 这个区间,该区域的对齐数是 16,那么最大浪费的字节数就是 15,而最小对齐后的字节数就是这个区间内的前 16 个数所对齐到的字节数,也就是144,那么该区间的最大浪费率也就是15 ÷ 144 ≈ 10.42% 。同理,“1024 + 1 ~ 8 x 1024” 区间的最大浪费率为127 ÷ 1152 ≈ 11.02%,“8 x 1024 + 1 ~ 64 x 1024” 区间的最大浪费率为 1023 ÷ 9216 ≈ 11.10% 。
由以上对齐规则,ThreadCache 类中哈希桶 _freeLists 的大小只需定为 208 即可。
我们将这个对齐规则写成一个 SizeClass 类,并放在 Common.h 下。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static void*& NextObj(void* obj)//取前4/8个字节,可读可写
{
return *(void**)obj;
}
//管理切分好的定长内存块的自由链表
class FreeList
{
public:
//头插一个节点
void Push(void* obj)
{
assert(obj);
NextObj(obj) = _freeList;//用前4/8个字节存下一个节点的地址
_freeList = obj;
}
//头删一个节点
void* Pop()
{
assert(_freeList);
void* obj = _freeList; //记录头节点的地址
_freeList = NextObj(obj);//将头指针指向头节点的下一个节点
return obj; //返回头节点
}
//判断链表是否为空
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;//自由链表的头指针
};
//计算定长内存块的对齐映射规则
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));//进行向上对齐
}
//获取向上对齐后的字节数
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);
}
else
{
assert(false);
return -1;
}
}
//...
};
特别的,向上对齐是以位运算的方式来进行的,其中,RoundUp 的子函数 _RoundUp 的参数分别为 bytes(定长内存块的字节大小) 和 alignNum(相应的对齐数)。
假设 bytes 为 10 字节,则 bytes 对应的对齐数为 8,根据算式 “(bytes + alignNum - 1) & ~(alignNum - 1)” ,alignNum - 1(8-1)得 7,而 7 就是一个低三位为 1、其余位为 0 的二进制序列(00111);然后我们将 10 与 7 相加,相当于将 10 字节中不够 8 字节的部分给补上了。
alignNum - 1(值为7)按位取反后,会得到一个高位为 1、低位为 0 的二进制序列(11000),将其和 10 进行与运算,相当于屏蔽了 10 与 7 相加的值的低三位,且保持了该值的其余位不变,于是就得到了 10 字节按 8 字节对齐后的值,即 16 字节。
.2- 内存块在哈希桶中的下标
除了要获取内存块大小向上对齐后的字节数,还要获取内存块在哈希桶中对应的下标,以便知道内存块要链接到哪个自由链表中。
这里依然是用上文中的对齐规则:
获取内存块在哈希桶中对应下标的代码,同样编写在 Common.h 下的 SizeClass 类中。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static void*& NextObj(void* obj)//取前4/8个字节,可读可写
{
return *(void**)obj;
}
//管理切分好的定长内存块的自由链表
class FreeList
{
public:
//头插一个节点
void Push(void* obj)
{
assert(obj);
NextObj(obj) = _freeList;//用前4/8个字节存下一个节点的地址
_freeList = obj;
}
//头删一个节点
void* Pop()
{
assert(_freeList);
void* obj = _freeList; //记录头节点的地址
_freeList = NextObj(obj);//将头指针指向头节点的下一个节点
return obj; //返回头节点
}
//判断链表是否为空
bool Empty()
{
return _freeList == nullptr;
}
private:
void* _freeList = nullptr;//自由链表的头指针
};
//计算定长内存块的对齐映射规则
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
//获取向上对齐后的字节数
//...
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
//获取对应哈希桶的下标
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
static int group_array[4] = { 16, 56, 56, 56 };// 记录每个区间有多少个链表
if (bytes <= 128) {
return _Index(bytes, 3);
}
else if (bytes <= 1024) {
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024) {
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024) {
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024) {
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else {
assert(false);
}
return -1;
}
};
特别的,获取下标也是以位运算的方式来进行的,其中 Index() 的子函数 _Index() 的参数分别为 bytes(内存块的字节大小)和 align_shift(左移的转换位数,例如 align_shift 为 3,8 = 2 ^ 3,且二进制序列 0001 左移 3 位就能得到 8)。
假设 bytes 为 10 字节,则 bytes 对应的对齐数为 8,相应的,align_shift 就为 3。根据算式“(bytes + (1 << alignShift) - 1) >> align_shift) - 1”,“1 << alignShift” 得到的就是 8,然后 8 − 1 = 7 ,再与 bytes 相加,相当于还是让 10 与 7 相加
让 10 与 7 相加的结果再右移 align_shift 位,实际上就是让这个结果去除以 8,也就相当于屏蔽了该结果的二进制序列的低三位,此时就得到了 2。
而最后还需要减 1,是因为哈希桶数组的下标是从 0 开始的。
3)TLS 无锁访问
TLS(Thread Local Storage,线程本地存储)是一种变量的存储方法。
为了使每个线程都有一个自己独享的 thread cache,可以轻易想到将 thread cache 创建为全局的,但单纯将 thread cache 创建为全局变量,thread cache 就是所有线程共享的了,这样就不可避免地需要锁来控制,增加了控制成本和代码复杂度。
而 TLS 可以使一个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
- ThreadCache.h
#pragma once
#include"Common.h"
class ThreadCache
{
public:
//申请内存
void* Allocate(size_t size);
//释放内存
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// TLS - thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
//TLS的使用较为简单,只需对ThreadCache添加这句即可
然后,我们就可以将 ThreadCache 的成员函数们进一步完善了:
- ThreadCache.cpp
#include"ThreadCache.h"
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);//求向上对齐数
size_t index = SizeClass::Index(size); //求在哈希桶中的下标
if (!_freeLists[index].Empty()) //相应下标处的自由链表有资源,就从中取资源来用
{
return _freeLists[index].Pop();
}
else //没有就向上层申请资源
{
return FetchFromCentralCache(index, alignSize);
}
}
void ThreadCache::Deallocate(void* ptr,size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找映射的自由链表桶,将内存块插入
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
return nullptr;
}
为了测试 TLS,我们在此处引入以下代码:
- ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include"ThreadCache.h"
static void* ConcurrentAlloc(size_t size)
{
// 通过TLS,每个线程无锁的获取自己的专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
//打印线程号和相应的内存块地址
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
static void ConcurrentFree(void* ptr,size_t size)
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr,size);
}
- UnitTest.cpp
#include "ObjectPool.h"
#include"ConcurrentAlloc.h"
void Alloc1()
{
for (size_t i = 0; i < 5; i++)
{
void* ptr = ConcurrentAlloc(6);
}
}
void Alloc2()
{
for (size_t i = 0; i < 5; i++)
{
void* ptr = ConcurrentAlloc(7);
}
}
void TLSTest()
{
//创建两个线程,测试TLS
//只要它们所持有的内存块地址不同,就说明TLS确实可以支持无锁访问
std::thread t1(Alloc1);
t1.join();
std::thread t2(Alloc2);
t2.join();
}
int main()
{
//TestObjectPool();
TLSTest();
return 0;
}
将上文中所有代码在 vs 下编译运行后,可以看到以下结果:
由图,TID 不同的两个线程所持有的内存块地址也不同,这说明 TLS 使每个 thread cache 对象在它所在的线程内是全局可访问的,但是不能被其他线程访问到,也就是说,TLS 可以使每个线程都有一个自己独享的 thread cache,且每个线程可以无锁地获取自己的专属的 ThreadCache 对象。
五、实现 central cache 申请内存
1)基本框架
当线程申请某一大小的内存时,如果 thread cache 中对应的自由链表不为空,那么直接从中取出一个内存块返回即可;但如果该自由链表为空,那么 thread cache 就需要向 central cache 申请内存了。
central cache 的结构与 thread cache 是一样,都是哈希桶,且它们遵循着相同的对齐映射规则。这样做的好处是,当 thread cache 的某个桶中没有内存了,可以直接到 central cache 中对应的哈希桶里去取内存,十分方便。
不过,central cache 与 thread cache 有两个明显不同的地方。
首先,central cache 是所有线程共享的,而不是每个线程独享的,因此访问 central cache 时需要加锁。每个线程的 thread cache 没有内存了,都会去找 central cache 申请,也就是说,在访问 central cache 时是需要加锁的。但在加锁时,并非要将整个 central cache 全部锁上,因为 central cache 用的是桶锁,每个桶都有一个锁。只有当多个线程同时访问同一个桶时,才会存在锁竞争,如果是多个线程同时访问不同桶,就不会存在锁竞争。
其次,central cache 的每个桶中挂的是一个个的 span(管理一个跨度的大块内存的结构体),而非一个个切好的定长内存块。每个 span 管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的形式链接起来的,并且每个 span 里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其所在的哈希桶,这些内存块被切成了对应的大小。
【Tips】thread cache 如何向 central cache 申请内存
当 thread cache 中没有内存时,就会批量向 central cache 申请⼀些内存对象,这⾥的批量获取对象的过程,使用了类似网络 tcp 协议拥塞控制的慢开始算法。central cache 也有⼀个哈希映射的 spanlist,spanlist 中挂着 span,从 span 中取出对象给 thread cache,这个过程是需要加锁的,不过,这⾥使⽤的是⼀个桶锁,可以尽可能提⾼效率。
当 central cache 映射的 spanlist 中的所有 span 都没有内存以后,则需要向上层 page cache申请⼀个新的 span 对象。在拿到新的 span 以后,将 span 管理的内存按⼤⼩切好并构建成一个⾃由链表,以支持从 span 中取内存块给 thread cache。
另外,central cache 挂的 span 中,有一个计数器 use_count,记录分配了多少个内存块出去,每分配⼀个内存块给 thread cache,就 ++use_count。
【Tips】thread cache 如何将内存归还回 central cache
当 thread cache 过⻓或线程被销毁,则会将内存释放回 central cache 中。每释放回来一个内存块,就 -- use_count。当 use_count 减到0时,则表⽰所有内存块都回到了 span 中,此时就将 span 释放回 page cache, page cache 中会对前后相邻的空闲⻚进⾏合并。
.1- 页号的存储
每个程序运行起来后都有自己的进程地址空间,在 32 位平台下,进程地址空间的大小是 2^32;而在 64 位平台下,进程地址空间的大小就是 2^64。
页的大小一般是 4K 或者 8K。以 8K为例,在32位平台下,进程地址空间就可以被分成 2^32 ÷ 2^13 = 2^19 个页;在 64 位平台下,进程地址空间就可以被分成 2^64 ÷ 2^13 = 2^51 个页。页号本质与地址是一样的,它们都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位。
由于页号在 64 位平台下的取值范围是 [ 0,2^51 ),因此不能简单的用一个无符号整型来存储页号,需要借助条件编译来解决这个问题。需要注意的是,在 32 位下,_WIN32 有定义,_WIN64 没有定义;而在 64 位下,_WIN32 和 _WIN64 都有定义。因此在条件编译时,我们应该先判断 _WIN64 是否有定义,再判断 _WIN32 是否有定义。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
//管理切分好的定长内存块的自由链表
//...
//计算定长内存块的对齐映射规则
//...
//管理多个连续页大块内存跨度结构
struct Span
{
//...
};
//带头双向循环链表
class SpanList
{
//...
};
.2- 定义管理页的结构 Span
central cache 是哈希桶结构,每个桶里挂的都是一个个双向循环链表 spanList,链表中的每一个节点 span 都是一个管理以页为单位的大块内存的结构体。
span 结构体中既封装了页的相关字段,也有维护双向链表结构的前驱指针和后继指针,还有对分配给 thread cache 的内存块进行计数的计数器,以及管理着切好的小块内存的自由链表头指针。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
//管理切分好的定长内存块的自由链表
//...
//计算定长内存块的对齐映射规则
//...
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
};
//带头双向循环链表
class SpanList
{
//...
};
【Tips】span 成员字段的说明
对于 span 管理的以页为单位的大块内存,我们需要知道这块内存具体在哪一个位置,便于之后 page cache 进行前后页的合并,因此 span 结构当中会记录所管理大块内存起始页的页号。
至于每一个 span 管理的到底是多少个页,这并不是固定的,需要根据多方面的因素来控制,因此 span 结构当中有一个 _n 成员,该成员就代表着该 span 管理的页的数量。
此外,每个 span 管理的大块内存,都会被切成相应大小的内存块挂到当前span的自由链表中,比如 8 Byte 哈希桶中的 span,会被切成一个个 8 Byte 大小的内存块挂到当前span的自由链表中,因此 span 结构中需要存储切好的小块内存的自由链表。
span 结构当中的 _useCount 成员记录的就是,当前 span 中切好的小块内存,被分配给thread cache 的计数,当某个 span 的 _useCount 计数变为 0 时,代表当前span切出去的内存块对象全部还回来了,此时 central cache 就可以将这个 span 再还给 page cache。
每个桶当中的 span 是以双链表的形式组织起来的,当我们需要将某个 span 归还给 page cache 时,就可以很方便的将该 span 从双链表结构中移出。如果用单链表结构的话就比较麻烦了,因为单链表在删除时,需要知道当前节点的前一个节点。
.3- 定义双向链表 SpanList
central cache 的每个桶里存储的都是一个由 span 构建的双向链表 spanList,我们可以定义一个 spanList 类来实现这个双向链表,其中包含的成员变量有链表的头指针、桶锁,成员函数有基本的插入节点和删除节点。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
//管理切分好的定长内存块的自由链表
//...
//计算定长内存块的对齐映射规则
//...
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//在pos位置插入
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
//先找到pos的前一个位置
Span* prev = pos->_prev;
//prev<-->newSpan<-->pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
//先找到pos前后的位置
Span* prev = pos->_prev;
Span* next = pos->_next;
//prev<-->next
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
需要注意的是,尽管双向链表的节点(一个 span)是在链表的构造函数中 new 出来的,但由于从双向链表删除的 span 会还给下一层的 page cache,相当于只是把这个 span 从双链表中移除,因此不需要对删除的 span 进行 delete 操作。
.4- 定义 CentralCache 类
central cache 的映射规则和 thread cache是一样的,因此 central cache 里面哈希桶的个数也是208,桶中存储就是上文中定义的双向链表。
这里,我们在项目中再创建一个 CentralCache.h 头文件,将 central cache 的本体 CentralCache 类编写在其中。
- CentralCache.h
#pragma once
#include"Common.h"
class CentralCache
{
public:
//...
private:
SpanList _spanLists[NFREELIST];
};
2)与 thread cache 联动
.1- CentralCache 类应为单例模式
为了使每个线程都有一个属于自己的 thread cache,我们是用了 TLS 来实现每个线程无锁访问属于自己的 thread cache 的。而 central cache 和 page cache 在整个进程中只有一个,对于这种只能创建一个对象的类,我们需将其设置为单例模式,这样可以保证系统中该类只有一个实例。
为了保证 CentralCache 类只能创建一个对象,我们需要将 central cache 的构造函数设置为私有,且在拷贝构造函数的后面加上 =delete 进行修饰;此外,还需要在 CentralCache 类中添加一个静态的成员变量,且提供一个公有的成员函数用于获取单例对象。这样,当程序运行起来后,程序中就只有一个单例的 CentralCache 对象了。
- CentralCache.h
#pragma once
#include"Common.h"
class CentralCache //单例的饿汉模式
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
//...
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
- CentralCache.cpp
//_sInst的声明与定义不在同一个头文件中,
//避免所有包含了CentralCache.h的源文件下都有_sInst
#include"CentralCache.h"
CentralCache CentralCache::_sInst;
//...
.2- 慢开始反馈调节算法
当 thread cache 向central cache申请内存时,如果 central cache 给的太少,那么 thread cache 在短时间内用完了又会来申请;但如果一次性给的太多了,可能 thread cache 用不完,进而造成浪费。
为了解决这个资源分配问题,我们采用了一个慢开始反馈调节算法——当 thread cache 向central cache 申请内存时,如果申请的是较小的定长内存块,就可以稍微多分配一些;如果申请的是较大的定长内存块,就可以稍微少分配一些。
我们可以根据 thread cache 申请的内存块的大小,计算出 central cache 具体要分配的内存块数量,且将内存块数量控制在 2~512 之间,也就是说,thread cache 要申请的内存块再小, central cache 最多一次性给出 512 个;thread cache 要申请的内存块再大,central cache 至少一次性给出 2 个。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
//管理切分好的定长内存块的自由链表
//...
//计算定长内存块的对齐映射规则
class SizeClass
{
public:
//...
//慢开始反馈调节算法
//thread cache一次从central cache获取对象的上限
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
//对象越小,计算出的上限越高
//对象越大,计算出的上限越低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
};
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//在pos位置插入
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
//先找到pos的前一个位置
Span* prev = pos->_prev;
//prev<-->newSpan<-->pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
//先找到pos前后的位置
Span* prev = pos->_prev;
Span* next = pos->_next;
//prev<-->next
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
但就算申请的内存块很小,一次性给出 512 个,分配的空间资源也是比较多的。
由此,我们可以在 Common.h 文件下编写的自由链表 FreeList 类中,增加一个 _maxSize 成员变量,用于记录一个自由链表中的内存块数量,_maxSize 的初始值设置为1,且类内会提供一个公有成员函数便于外部获取 _maxSize。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
//管理切分好的定长内存块的自由链表
class FreeList
{
public:
//头插一个节点
void Push(void* obj)
{
assert(obj);
NextObj(obj) = _freeList;//用前4/8个字节存下一个节点的地址
_freeList = obj;
}
//范围头插(一次头插多个节点)
void PushRange(void* start, void* end)
{
NextObj(end) = _freeList;
_freeList = start;
}
//头删一个节点
void* Pop()
{
assert(_freeList);
void* obj = _freeList; //记录头节点的地址
_freeList = NextObj(obj);//将头指针指向头节点的下一个节点
return obj; //返回头节点
}
//判断链表是否为空
bool Empty()
{
return _freeList == nullptr;
}
//获取 _maxSize
size_t& MaxSize()
{
return _maxSize;
}
private:
void* _freeList = nullptr;//自由链表的头指针
size_t _maxSize = 1; //记录一个自由链表中的内存块数量
};
//计算定长内存块的对齐映射规则
class SizeClass
{
public:
//...
//thread cache一次从central cache获取对象的上限
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
//对象越小,计算出的上限越高
//对象越大,计算出的上限越低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
};
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//在pos位置插入
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
//先找到pos的前一个位置
Span* prev = pos->_prev;
//prev<-->newSpan<-->pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
//先找到pos前后的位置
Span* prev = pos->_prev;
Span* next = pos->_next;
//prev<-->next
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
当 thread cache 向 central cache 申请内存块时,我们会比较 _maxSize 和由慢开始反馈调节算法得出的值,取出其中的较小值作为本次申请内存块对象的个数。如果本次申请采用的是 _maxSize 的值,那么还需要对 thread cache 中相应的自由链表的 _maxSize 进行 + 1 操作。
由此,thread cache 第一次向 central cache 申请某大小的对象时,申请到的内存块数量统一是 1 个,但下一次 thread cache 再向 central cache 申请同样大小的内存块时,就会申请到两个。直到该自由链表中 _maxSize 的值,增长到超过由慢开始反馈调节算法得出的值后,就不会继续增长了,此后申请到的内存块个数就是由慢开始反馈调节算法得出的值。
而慢开始反馈调节算法的调用,和 _maxSize 的比较操作,应是在 ThreadCache 类中,负责从 central cache 获取对象的成员函数 FetchFromCentralCache() 中进行的,且要实现 thread cache 向 central cache 申请内存资源,也就是实现 ThreadCache::FetchFromCentralCache()。
- ThreadCache.h
#pragma once
#include"Common.h"
class ThreadCache
{
public:
//申请内存
void* Allocate(size_t size);
//释放内存
void Deallocate(void* ptr, size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// TLS - thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
- ThreadCache.cpp
#include"ThreadCache.h"
#include"CentralCache.h"
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);//求向上对齐数
size_t index = SizeClass::Index(size); //求在哈希桶中的下标
if (!_freeLists[index].Empty()) //相应下标处的自由链表有资源,就从中取资源来用
{
return _freeLists[index].Pop();
}
else //没有就向上层申请资源
{
return FetchFromCentralCache(index, alignSize);
}
}
void ThreadCache::Deallocate(void* ptr,size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找映射的自由链表桶,将内存块插入
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//...
}
.3- thread cache 向 central cache 申请内存资源
要实现 thread cache 向 central cache 申请内存资源,也就是实现 ThreadCache::FetchFromCentralCache()。
我们先将慢开始反馈调节算法的调用、_maxSize 的比较操作添加到 FetchFromCentralCache() 中。
- ThreadCache.cpp
#include"ThreadCache.h"
#include"CentralCache.h"
void* ThreadCache::Allocate(size_t size)
{
//...
}
void ThreadCache::Deallocate(void* ptr,size_t size)
{
//...
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果不断有这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小;size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
//...
}
至此,我们暂时求得了 thread cache 要向 central cache 一次性申请的内存块数量 batchNum。接下来,应该是从 central cache 中获取 batchNum 个内存块,那么 CentralCache 类中还要提供一个成员函数,以支持 ThreadCache::FetchFromCentralCache() 可以从 central cache 中申请到内存。
- CentralCache.h
#pragma once
#include"Common.h"
class CentralCache //单例的饿汉模式
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 从 page cache 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// central cache分配一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
//参数说明:切割的起始位置、结束位置,内存块的个数,一个内存块的字节大小
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
- CentralCache.cpp
#include"CentralCache.h"
CentralCache CentralCache::_sInst;
// 从 page cache 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// ...
}
// central cache分配一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
//加锁
_spanLists[index]._mtx.lock();
//在相应的哈希桶中获取一个非空的span
Span* span = GetOneSpan(_spanLists[index],size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum个对象,如果不够batchNum个,就有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0; //防越界!_freeList中的节点可能不够batchNum个
size_t actualNum = 1;//记录实际获取的内存块对象的数量
while(i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end); //end从头往后走 batchNum - 1 步
i++; //防越界
++actualNum; //记录实际获取的内存块对象的数量
}
span->_freeList = NextObj(end);//取完后,剩下的对象继续放到自由链表
NextObj(end) = nullptr; //取出的一段链表的表尾置空
span->_useCount += actualNum; //更新被分配给thread cache的计数
//解锁
_spanLists[index]._mtx.unlock();
return actualNum; //最终返回实际取到的内存块数量
}
【Tips】 CentralCache::FetchRangeObj() 的实现细节
由于 central cache 是所有线程共享的,因此在访问 central cache 中的哈希桶时,需要先给对应的哈希桶加上桶锁,在获取到对象后再将桶锁解掉。
在向 central cache 获取对象时,先是在 central cache 对应的哈希桶中获取到一个非空的 span,然后从这个 span 下的自由链表中,取出 batchNum 个对象即可。但可能这个非空的 span 的自由链表下,对象的个数不足 batchNum 个,那此时有多少给多少就行了。
换句话说,thread cache 实际从 central cache 获得的对象的个数,可能与我们传入的 batchNum 的值是不一样的,因此我们需要统计本次申请过程中,thread cache 实际获取到的对象个数,并根据该值及时更新 span 中相应的计数器 _useCount。
另外需要注意的是,虽然实际申请到对象的个数可能比 batchNum 要小,但这并不会产生任何不好的影响,因为 thread cache 的本意就是向 central cache 申请至少一个对象,我们之所以要将这个过程设计为一次性多申请一些对象,是为了下次线程再申请相同大小的对象时,就可以直接在 thread cache 里获取了,而不用再向central cache申请,以此提高效率。
然后,我们就可以在 ThreadCache::FetchFromCentralCache() 调用 CentralCache::FetchRangeObj(),支持 thread cache 向 central cache 申请内存资源了。
另外,我们还需要根据申请的内存块数量,来返回申请的内存块或其起始地址。如果申请的数量为 1,则直接返回这一个申请的内存块对象即可;如果申请的数量大于 1,则需要先将剩下的内存块对象挂到 thread cache 中对应的哈希桶里,再返回一个申请的内存块对象。
- ThreadCache.cpp
#include"ThreadCache.h"
#include"CentralCache.h"
void* ThreadCache::Allocate(size_t size)
{
//...
}
void ThreadCache::Deallocate(void* ptr,size_t size)
{
//...
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果不断有这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小;size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
void* start = nullptr, * end = nullptr;//两个输入输出型参数
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum >= 1);
//申请到对象的个数是一个,则直接将这一个对象返回即可
if (actualNum == 1)
{
assert(start == end);
return start;
}
//申请到对象的个数是多个,还需要将剩下的对象挂到thread cache中对应的哈希桶中
else //actualNum > 1
{
_freeLists[index].PushRange(NextObj(start), end);//范围头插,已经写在上文中的Common.h里了
return start;
}
}
六、实现 page cache 申请
1)基本框架
page cache 与 central cache 一样,都是哈希桶的结构,且 page cache 的每个哈希桶中里挂的也是一个个被构建成双向链表的 span。
不过,page cache 的映射规则与 central cache、thread cache 都不相同,却更加简单明了,采用的是直接定址法,比如 1 号桶挂的都是 1 页的 span,2 号桶挂的都是 2 页的 span......以此类推。
central cache 的每个桶中的 span,被切成了一个个对应大小的内存块对象,以供 thread cache 申请。而 page cache 则不同,page cache 中的 span 是没有被进一步切小的,因为 page cache 服务的是 central cache 而非 thread cache。当 central cache 没有 span 时,向 page cache 申请的是某一固定页数的 span,而如何切分申请到的这个span就应该由 central cache 自己来决定。
【Tips】central cache 如何向 page cache 申请内存资源
page cache 中究竟有多少个桶,与其中最大挂多少页的 span 有关,这里我们就最大挂128 页的span。这是因为,线程申请单个内存块对象最大为 256KB,而 128 页可以被切成 4 个 256KB 的对象,对于线程的申请来说是绰绰有余的。如果想在 page cache 中挂更大的span,也可以根据具体需求进行设置。为了让桶号与页号对应起来,我们可以将第 0 号桶空出来不用,将哈希桶的个数设置为129。
当 central cache 向 page cache 申请内存时,page cache 先检查对应的页位置有没有 span,如果没有,则向更⼤的页位置寻找⼀个 span,如果找到则将其分裂成两个。⽐如:申请的是 4 ⻚ page,但 4 ⻚ page 后⾯没有挂 span,则向后⾯寻找更⼤的 span;如果在 10 ⻚ page 位置找到⼀个 span,则将 10 ⻚ page span 分裂 为⼀个 4 ⻚ page span 和⼀个 6 ⻚ page span; 4 ⻚ page span 被挂到 4 页对应的桶里并返回给 central cache 使用,而 6 ⻚ page span 则被挂到 6 页对应的桶中待用。
如果 page cache 的 _spanList[129] 中都没有合适的span,则向系统使⽤ mmap、brk 或 VirtualAlloc 等⽅式,申请 128 ⻚ page span 挂在⾃由链表中,再重复上述的申请过程。
需要注意的是 central cache 和 page cache 的核⼼结构都是 spanlist 的哈希桶,但是他们是有本质区别的。central cache 中哈希桶,是按跟 thread cache ⼀样的⼤⼩对⻬关系映射的,他的 spanlist 中挂的 span 中的内存,都被按映射关系切好链接成⼩块内存的⾃由链表;⽽ page cache 中的 spanlist 则是按下标桶号映射的,也就是说第 i 号桶中挂的 span 都是 i ⻚内存。
另外,page cache 的哈希桶是需要一把大锁的,需在哈希桶被访问的时候让哈希桶整体被锁住。central cache 向 page cache 申请内存并访问某个桶使,page cache可能会将其他桶中大页的 span 切小后再给 central cache;而当 central cache 将某个 span 归还给 page cache 时,page cache 也会尝试将该 span 与其他桶中的 span 进行合并——也就是说,在访问 page cache 时,可能需要访问 page cache 的多个桶。如果 page cache 用的是桶锁,就会涉及频繁的加锁和解锁,导致程序的效率低下,代码编写也十分麻烦;而用一个大锁将整个 page cache 给锁住,就能避免这些问题,同时又能保证线程安全。
和 central cache 一样,page cache 在整个进程中也应只有一个,因此也应被设计为单例模式。
- PageCache.h
#pragma once
#include"Common.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取一个k页的span
Span* NewSpan(size_t k);
private:
SpanList _spanLists[NPAGES];//哈希桶,挂的是以⻚为单位的span
public:
std::mutex _pageMtx; //大锁
private:
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
- PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_sInst;
Span* PageCache::NewSpan(size_t k)
{}
2)与 central cache 联动
.1- central cache 向 page cache 申请内存资源
central cache 无法按需给 thread cache 分配足够的内存资源时,就会向 page cache 申请内存资源,并将其进行切分,再分配给 thread cache。
上文中,我们已经完善了 central cache 为 thread cache 分配内存的接口 FetchRangeObj()。
- CentralCache.cpp
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_sInst;
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{}
// central cache分配一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
//加锁
_spanLists[index]._mtx.lock();
//在相应的哈希桶中获取一个非空的span
Span* span = GetOneSpan(_spanLists[index],size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum个对象,如果不够batchNum个,就有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0; //防越界!_freeList中的节点可能不够batchNum个
size_t actualNum = 1;//记录实际获取的内存块对象的数量
while(i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end); //end从头往后走 batchNum - 1 步
i++; //防越界
++actualNum; //记录实际获取的内存块对象的数量
}
span->_freeList = NextObj(end);//取完后,剩下的对象继续放到自由链表
NextObj(end) = nullptr; //取出的一段链表的表尾置空
span->_useCount += actualNum; //更新被分配给thread cache的计数
//解锁
_spanLists[index]._mtx.unlock();
return actualNum;//最终返回实际取到的内存块数量
}
接下来,我们继续完善 central cache 向 page cache 申请内存资源的接口 CentralCache::GetOneSpan()。
thread cache 向 central cache 申请内存块对象时,central cache 需要先从对应的哈希桶中获取到一个非空的 span,然后从这个非空的 span 中取出若干内存块对象返回给 thread cache。具体的方式是,先遍历 central cache 对应哈希桶中的双链表,如果该双链表中有非空的 span,那么直接将该 span 进行返回即可。
为了方便遍历这个双链表,我们可以模仿迭代器的遍历方式,给 SpanList 类提供 Begin() 和 End() 成员函数,分别用于获取双链表中的第一个 span 和最后一个 span 的下一个位置,也就是头节点的下一个位置和头节点。同时,我们引入头插和头删的成员函数,方便后续控制哈希桶中的spanList 双向链表。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
using std::cout;
using std::endl;
//...
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//取首
Span* Begin()
{
return _head->_next;
}
//取尾
Span* End()
{
return _head;
}
//验空
bool Empty()
{
return _head->_next == _head;
}
//头插
void PushFront(Span* span)
{
Insert(Begin(), span);
}
//头删
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
//在pos位置插入
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
//先找到pos的前一个位置
Span* prev = pos->_prev;
//prev<-->newSpan<-->pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
//先找到pos前后的位置
Span* prev = pos->_prev;
Span* next = pos->_next;
//prev<-->next
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
但如果遍历双链表后,发现双链表中没有 span,那么 central cache 就需要向 page cache 申请内存块了。
特别的,page cache 中的 span 不再是以字节为单位,而是以页为单位了。
我们可以先根据内存块对象的大小计算出,thread cache 一次向 central cache 申请对象的个数上限,然后将这个上限的值乘以单个对象的大小,就算出了具体需要多少字节,最后再将这个算出来的字节数转换为页数。若转换后不够一页,则按一页去申请,否则转换出来是几页,就申请几页。如此就保证了,central cache 向 page cache 申请内存时,能够尽量使申请到的内存满足 thread cache 向 central cache 申请的上限。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
using std::cout;
using std::endl;
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13; //页大小转换偏移,即一页定义为2^13,也就是8KB
//...
//计算定长内存块的对齐映射规则
class SizeClass
{
public:
//...
//thread cache一次从central cache获取对象的上限
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
//对象越小,计算出的上限越高
//对象越大,计算出的上限越低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//取首
Span* Begin()
{
return _head->_next;
}
//取尾
Span* End()
{
return _head;
}
//验空
bool Empty()
{
return _head->_next == _head;
}
//头插
void PushFront(Span* span)
{
Insert(Begin(), span);
}
//头删
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
//在pos位置插入
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
//先找到pos的前一个位置
Span* prev = pos->_prev;
//prev<-->newSpan<-->pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
//先找到pos前后的位置
Span* prev = pos->_prev;
Span* next = pos->_next;
//prev<-->next
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
当 central cache 申请到若干页的 span 后,还需要将这个 span 切成一个个对应大小的对象,并挂到该 span 对应的自由链表中。
central cache 要向 page cache 申请,就需要 PageCache 类为其提供一个接口 PageCache::NewSpan(),来获取若干页大小的 span,这样,在 CentralCache::GetOneSpan() 中按需调用该接口,即可申请到若干页的 span(PageCache::NewSpan() 在下一小节完善)。
- PageCache.h
#pragma once
#include"Common.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取一个k页的span
Span* NewSpan(size_t k);
private:
SpanList _spanLists[NPAGES];//哈希桶
public:
std::mutex _pageMtx;//大锁
private:
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
而要找到一个 span 所管理的内存块,可以通过以下方式:首先需要计算出该span的起始地址,我们可以用这个span的起始页号乘以一页的大小,即可得到这个 span 的起始地址;用这个 span 的页数乘以一页的大小,即可得到这个span所管理的内存块的大小;用起始地址加上内存块的大小,即可得到这块内存块的结束位置。
明确了这块内存的起始和结束位置后,我们就可以进行切分了。
即根据所需对象的大小,每次从大块内存切出一块固定大小的内存块尾插到 span 的自由链表中。使用尾插是因为,这样可以使尾插到自由链表中的内存块对象,看起来是按照链式结构链接起来的,且实际它们在物理上就是连续的,如此,当我们把这些连续内存分配给某个线程使用时,可以提高该线程的 CPU 缓存利用率。
把 span 切好后,就将其挂到 central cache 对应哈希桶的 spanList 中,这里我们使用的是上文中 SpanList 类提供的头插。使用头插来将切好的 span 挂到哈希桶中,这样当 central cache 下一次从该桶下的 spanList 中获取非空 span 时,一来就能找到。
- CentralCache.cpp
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_sInst;
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
// 1、查看当前的spanlist中是否有还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
//2、代码走到这里,说明没有空闲的span,只能找page cache去申请
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));
// 计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageId << PAGE_SHIFT);//char*是一字节的,更方便控制切分,更精确
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
//3、把大块内存切小,并用自由链表链接起来
// 1)先切一块下来去做头,方便尾插
span->_freeList = start;//标识原先的头
start += byte_size; //切一块
void* tail = span->_freeList;//标识切下的一块
// 2)尾插
while (start < end)
{
NextObj(tail) = start;
tail = NextObj(tail);
start += byte_size;
}
NextObj(tail) = nullptr; //最终将尾的指向置空
//4、将切好的span头插到spanList
list.PushFront(span);
return span;
}
// central cache分配一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
//加锁
_spanLists[index]._mtx.lock();
//在相应的哈希桶中获取一个非空的span
Span* span = GetOneSpan(_spanLists[index],size);
assert(span);
assert(span->_freeList);
//从span中获取batchNum个对象,如果不够batchNum个,就有多少拿多少
start = span->_freeList;
end = start;
size_t i = 0; //防越界!_freeList中的节点可能不够batchNum个
size_t actualNum = 1;//记录实际获取的内存块对象的数量
while(i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end); //end从头往后走 batchNum - 1 步
i++; //防越界
++actualNum; //记录实际获取的内存块对象的数量
}
span->_freeList = NextObj(end);//取完后,剩下的对象继续放到自由链表
NextObj(end) = nullptr; //取出的一段链表的表尾置空
span->_useCount += actualNum; //更新被分配给thread cache的计数
//解锁
_spanLists[index]._mtx.unlock();
return actualNum;//最终返回实际取到的内存块数量
}
.2- page cache 向系统申请内存资源
当调用 CentralCache::GetOneSpan() 从 central cache 的某个哈希桶获取一个非空的 span时,如果该桶中没有 span了,那么此时 central cache 就需要向 page cache 申请若干页的 span了,也就是在 CentralCache::GetOneSpan() 调用 PageCache::NewSpan()。
page cache 是直接按照页数进行映射的,因此我们要从page cache获取一个k页的span,就应该直接先去找 page cache 的第k号桶。
如果第 k 号桶中有 span,那我们直接使用上文中 SpanList 类提供的头删,将一个 span 从双向链表中删除,并返回给 central cache 即可。
如果 page cache 的第 k 号桶中没有span,就应该继续找后续的桶。只要后续任意一个桶中有一个 n 页span,就可以将其切分成一个 k 页的 span 和一个 n - k 页的 span,然后将切出来 k 页的 span 返回给 central cache,并将 n - k 页的 span 挂到 page cache 的第 n - k 号桶即可。
但如果后面的桶中也都没有 span,就需要向堆申请一个 128 页的 span了,具体的方式是,调用 windows 系统下提供的 VirtualAlloc() 接口。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
using std::cout;
using std::endl;
#ifdef _WIN32
#include<windows.h>
#else
// ...
#endif
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13; //页大小转换偏移,即一页定义为2^13,也就是8KB
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
//windows系统下使用的是VirtualAlloc
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
//...
//计算定长内存块的对齐映射规则
class SizeClass
{
public:
//...
//thread cache一次从central cache获取对象的上限
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
//对象越小,计算出的上限越高
//对象越大,计算出的上限越低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
};
//带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//取首
Span* Begin()
{
return _head->_next;
}
//取尾
Span* End()
{
return _head;
}
//验空
bool Empty()
{
return _head->_next == _head;
}
//头插
void PushFront(Span* span)
{
Insert(Begin(), span);
}
//头删
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
//在pos位置插入
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
//先找到pos的前一个位置
Span* prev = pos->_prev;
//prev<-->newSpan<-->pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
//先找到pos前后的位置
Span* prev = pos->_prev;
Span* next = pos->_next;
//prev<-->next
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
另外需要注意的是,向堆申请内存后,得到的是这块内存的起始地址,此时还需要将该地址转换为页号。由于我们向堆申请内存时都是按页进行申请的,因此我们直接将该地址除以一页的大小即可得到对应的页号。
- PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_sInst;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
// 先检查第k个桶里面有没有span
if (!_spanLists[k].Empty())
{
return _spanLists[k].PopFront();//有就返回
}
//再检查一下后面的桶里面有没有span,有就把它进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
// 在nSpan的头部切一个k页下来,将k页span返回,将nSpan再挂到对应的映射位置
//更新kSpan的页号和页数
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
//更新nSpan的页号和页数
nSpan->_pageId += k;
nSpan->_n -= k;
//将nSpan再挂到对应的映射位置
_spanLists[nSpan->_n].PushFront(nSpan);
//将k页span返回
return kSpan;
}
}
// 代码走到这个位置,就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
【Tips】CentralCache::GetOneSpan() 的加锁细节
向堆申请到128页的 span 后,需要将其切分成 k 页的 span 和 128-k 页的 span,但为了尽量避免出现重复的代码,我们最好不要再编写对应的切分代码,可以先将申请到的 128 页的 span 挂到 page cache 对应的哈希桶中,然后再递归调用 NewSpan() 即可,此时在往后找 span 时,就一定会在第 128 号桶中找到该 span,然后进行切分。
但这里有一个问题:当 central cache 向 page cache 申请内存时,central cache 对应的哈希桶是处于加锁的状态的,那在访问 page cache 之前,我们也应该先把 central cache 对应的桶锁解掉。
虽然此时 central cache 的这个桶中是没有内存供其他 thread cache 申请的,但 thread cache 除了申请内存还会释放内存,如果在访问 page cache 前,将central cache对应的桶锁解掉,那么此时当其他 thread cache 想要归还内存到 central cache 的这个桶时,就不会被阻塞。
因此在调用 NewSpan() 之前,我们需要先将 central cache 对应的桶锁解掉,然后再将page cache 的大锁加上。而当申请到 k 页的 span 后,我们需要将 page cache 的大锁解掉,但此时不需要立刻获取到 central cache 中对应的桶锁,因为 central cache 拿到 k 页的span 后还会对其进行切分操作,因此可以在将切好的 span 挂到 central cache 对应的桶上时,再获取对应的桶锁。
- CentralCache.cpp
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_sInst;
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
//先把central cache的桶锁解掉,这样其他线程释放内存,不会被阻塞
list._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();//对page cache加锁
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));
PageCache::GetInstance()->_pageMtx.unlock();//解锁
//对获取的span进行切分,无须加锁,因为此时其他线程访问不到这个span
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
span->_freeList = start;
start += byte_size;
void* tail = span->_freeList;
while (start < end)
{
NextObj(tail) = start;
tail = NextObj(tail);
start += byte_size;
}
NextObj(tail) = nullptr;
//切好span以后,需要把span挂到相应的桶里,此时需要加锁
list._mtx.lock();
list.PushFront(span);
return span;
}
// central cache分配一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
//加锁
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index],size);
assert(span);
assert(span->_freeList);
start = span->_freeList;
end = start;
size_t i = 0;
size_t actualNum = 1;
while(i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
i++;
++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
span->_useCount += actualNum;
//解锁
_spanLists[index]._mtx.unlock();
return actualNum;
}
七、实现释放内存资源
1)线程向 thread cache 归还内存
当某个线程申请的对象不用了,可以将其释放给 thread cache,然后 thread cache 将该对象插入到对应哈希桶的自由链表中即可。
但随着 thread cache 不断回收线程所释放的内存块对象,相应的自由链表的长度也会越来越长。这些内存堆积在某一个 thread cache 中就是一种浪费,因此应该将它们向上还给 central cache,这样一来,这些内存对其他线程来说也是可申请的。
- ThreadCache.h
#pragma once
#include"Common.h"
class ThreadCache
{
public:
// 申请内存
void* Allocate(size_t size);
// 从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);
// 释放内存
void Deallocate(void* ptr, size_t size);
// 线程释放对象导致链表过长时,回收内存回到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// TLS - thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
- ThreadCache.cpp
#include"ThreadCache.h"
#include"CentralCache.h"
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize);
}
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
size_t batchNum = min(_freeLists[index].MaxSize(),SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
void* start = nullptr, * end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum >= 1);
if (actualNum == 1)
{
assert(start == end);
return start;
}
else //actualNum > 1
{
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1); //此处需修改,PushRange()的新参数见下文
return start;
}
}
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找映射的自由链表桶,将内存块插入
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//当链表长度大于一次性批量申请的内存时,就开始归还一部分给cetral cache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index],size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
//从相应的自由链表中批量取出内存对象
list.PopRange(start, end, list.MaxSize());
//将取出的内存对象归还给在central cache中对应的span
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
为了支持 thread cache 回收和归还内存的接口,我们还需要在实现自由链表功能的 FreeList 类中,添加一个新的成员变量_size,以记录当前自由链表中对象的个数,同时要对 FreeList 类中的各个成员函数进行功能调整;然后再添加一个成员函数 Size() 以实时获取自由链表中对象的个数,和一个成员函数 PopRange() 以从自由链表中取出指定个数的对象。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
using std::cout;
using std::endl;
//...
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13; //页大小转换偏移,即一页定义为2^13,也就是8KB
//...
static void*& NextObj(void* obj)//取前4/8个字节,可读可写
{
return *(void**)obj;
}
//管理切分好的定长内存块的自由链表
class FreeList
{
public:
//头插一个节点
void Push(void* obj)
{
assert(obj);
NextObj(obj) = _freeList;//用前4/8个字节存下一个节点的地址
_freeList = obj;
++_size;
}
//范围头插
void PushRange(void* start, void* end,size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
//头删一个节点
void* Pop()
{
assert(_freeList);
void* obj = _freeList; //记录头节点的地址
_freeList = NextObj(obj);//将头指针指向头节点的下一个节点
--_size;
return obj; //返回头节点
}
//范围头删
void PopRange(void*& start, void*& end, size_t n)
{
assert(n <= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; i++)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
//判断链表是否为空
bool Empty()
{
return _freeList == nullptr;
}
//获取 _maxSize
size_t& MaxSize()
{
return _maxSize;
}
//获取_size
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;//自由链表的头指针
size_t _maxSize = 1; //一个自由链表中的内存块数量上限
size_t _size = 0; //当前自由链表中内存块的数量
};
//...
2)thread cache 向 central cache 归还内存
当 thread cache 中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span,具体方式是在 ThreadCache::Deallocate() 中调用了 ThreadCache::ListTooLong()。
而 ThreadCache::ListTooLong 要将内存对象归还给 central cache 中对应的 span,还需要 central cache 提供一个接口 CentralCache::ReleaseListToSpans,使其能成功将一定数量的对象释放到相应的 span。
- CentralCache.h
#pragma once
#include"Common.h"
class CentralCache //单例的饿汉模式
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 从 page cache 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// central cache分配一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
但需要注意的是,还给 central cache 的内存块对象不一定都属于同一个 span 。central cache中 的每个哈希桶中可能都不止有一个 span,因此在确定内存块对象应该还给 central cache 的哪一个桶后,还需要确定它们应该还给该桶中的哪一个 span。
【Tips】如何根据内存块对象的地址得到其对应的页号
某个页中的所有地址,除以页的大小,都等该页的页号。
假设一页的大小是 100,那么地址 0~99 都属于第 0 页,它们除以 100 都等于 0,而地址 100~199 都属于第 1 页,它们除以 100 都等于 1。
而实际上,一页的大小为 2^13 字节,即 8 k。假设有两个页号,一个为 2000,另一个为 2001,这两个页号之间的所有地址除以 2^13,都等于 2000,因为它们都是属于第 2000 页的。
由此,我们可以通过这个关系,来进行地址和页号之间的换算。
为了方便 central cache 根据内存块对象的地址得到其对应的页号,我们在 page cache 的 PageCache 类中增加一个存储 span 地址和页号的映射的成员变量,和可以根据映射获取页号的成员函数。
- PageCache.h
#pragma once
#include"Common.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取从内存块对象到span的映射
Span* MapObjToSpan(void* obj);
//获取一个k页的span
Span* NewSpan(size_t k);
private:
SpanList _spanLists[NPAGES];//哈希桶
std::unordered_map<PAGE_ID, Span*> _idSpanMap;//建立span的地址和其对应页号的映射
public:
std::mutex _pageMtx;//大锁
private:
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
- PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_sInst;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
//在切分内存块时,顺便建立page id和span的映射,
//方便central cache回收小块内存时,查找对应的span
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
//获取从内存块对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//计算页号
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
此时,当 thread cache 还对象给 central cache 时,就可以依次遍历这些对象,将这些对象插入到其对应 span 的自由链表当中,并及时更新该 span 的 _useCount 计数。而在此过程中,如果 central cache 中某个 span 的 _useCount 计数减到了 0 ,就说明这个 span 分配出去的对象全部都还回来了,于是就可以将这个 span 再进一步还给 page cache。
需要注意的是,要把某个span还给page cache,需要先将这个 span 从 central cache 对应的桶中移除,同时将该 span 的自由链表头指针置空、前后指针也置空,方便后续将其插入到page cache 对应桶中的双向链表。但 span 中记录的起始页号和页数是万万不能清除的,否则对应的内存块就找不到了。
另外,在 central cache 还 span 给 page cache 时,还存在线程安全问题需要留心,例如对 central cache 的桶锁问题和对 page cache 的大锁问题(详见以下代码注释)。
- CentralCache.cpp
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_sInst;
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
//...
}
// central cache分配一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
//...
}
// 将一定数量的对象释放到span跨度
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size)
{
size_t index = SizeClass::Index(byte_size);
_spanLists[index]._mtx.lock();//加桶锁
while (start)
{
void* next = NextObj(start);//先记录一下,下一个节点
Span* span = PageCache::GetInstance()->MapObjToSpan(start);//根据page id找到对应span
//将内存块对象头插到对应span的自由链表
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--; //每挂一个内存块对象回对应span的自由链表,分配给thread cache的内存块计数-1
if (span->_useCount == 0)//分配给thread cache的内存块都挂完了
{
//将这个span回收给page cache,让page cache可以尝试去左前后页的合并
_spanLists[index].Erase(span); //将该span从桶中取出
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
_spanLists[index]._mtx.unlock(); //先解桶锁,避免其他正在申请或释放的线程被阻塞
PageCache::GetInstance()->_pageMtx.lock(); //把page cache锁上
PageCache::GetInstance()->ReleaseSpanToPageCache(span); //将该span还给page cache
PageCache::GetInstance()->_pageMtx.unlock(); //解掉page cache的大锁
_spanLists[index]._mtx.lock(); //结束后要把桶锁加回来
}
start = next;
}
_spanLists[index]._mtx.unlock();//解锁
}
且为了能支持 CentralCache::ReleaseListToSpans 将内存块归还给 page cache,PageCache 类中还需提供一个成员函数 PageCache::ReleaseSpanToPageCache()。
- PageCache.h
#pragma once
#include"Common.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取从内存块对象到span的映射
Span* MapObjToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
//获取一个k页的span
Span* NewSpan(size_t k);
private:
SpanList _spanLists[NPAGES];//哈希桶
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
public:
std::mutex _pageMtx;//大锁
private:
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
3)central cache 向 page cache 归还内存
如果 central cache 中有某个 span 的_useCount 计数减为了 0,那么 central cache 就可以将该 span 向上还给 page cache 了,而 page cache 则既需要尝试该 span 与其他空闲的 span 进行合并,还需要将合并后的 span 挂到对应的哈希桶上,这个过程是在 PageCache 类提供的成员函数 PageCache::ReleaseSpanToPageCache() 实现的。
【Tips】page cache 如何进行前后页的合并
合并的过程可以分为与前页合并和与后页合并。
如果还回来的 span 的起始页号是 num,且该 span 所管理的页数是 n,那么在与前页合并时,就需要判断第 num-1 页对应 span 是否空闲,是则可以将其进行合并,且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。而与后页合并时,就需要判断第 num + n 页对应的 span 是否空闲,是则可以将其进行合并,且合并后也需要继续向后尝试进行合并,直到不能进行合并为止。
在合并期间,page cache 可以借由存储了页号与 span 之间映射的成员变量 _idSpanMap,来通过页号获取到对应的span的。
但需要注意的是,当通过页号找到其对应的 span 时,该 span 此时可能挂在 page cache,也可能挂在 central cache,而由于挂在 central cache 的 span 中的内存块对象正在被其他线程使用,因此我们只能合并挂在 page cache 的 span。那么,我们就需要判断,该 span 究竟挂在哪里。
不难想到通过 span 结构中的 _useCount 计数,来判断某个span到底是在central cache还是在page cache。但当 central cache 刚向 page cache 申请到一个 span 时,这个 span 的 _useCount 原本就是等于 0 的,如果我们正在对该 span 进行切分,page cache 就可能会把该 span 拿去进行合并了,这显然是不合理的。
由此,我们不能使用 _useCount 计数来进行判断,于是就还需在 span 结构中再增加一个 _isUse 字段,用于标记该 span 是否正在被使用。
在一个 span 结构被创建之初,我们默认该 span 是没有被使用的;当 central cache 向 page cache 申请到一个 span 时,需要立即将该 span 的 _isUse 改为 true;而当 central cache 将某个 span 还给 page cache 时,也就需要将该 span 的 _isUse 改成 false。
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<unordered_map>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
using std::cout;
using std::endl;
//...
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0; //大块内存的起始页页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //双向链表的结构
Span* _prev = nullptr;
size_t _useCount = 0; //记录分配给thread cache的定长内存块的数量
void* _freeList = nullptr; //由定长内存块构建的自由链表的头指针
bool _isUse = false; //标识该span是否在被使用(默认是没被使用的)
};
//...
由于在合并 page cache 中的span时,需要通过页号找到其对应的 span,而一个 span 是在被分配给 central cache 时,才建立各个页号与 span 之间的映射关系的,因此 page cache 中的 span 其实也需要建立页号与 span 之间的映射关系。
但与 central cache 中的 span 不同的是,page cache 中只需建立一个 span 的首尾页号与该span 之间的映射关系。这是因为,当一个 span 在尝试进行合并时,如果是往前合并,那么只需要通过一个 span 的尾页找到这个 span;如果是向后合并,那么只需要通过一个 span 的首页找到这个 span。也就是说,在进行合并时,其实用到 span 与其首尾页之间的映射关系就够了。
由此,当我们调用 PageCache 类提供的 PageCache::NewSpan() 申请 k 页的 span 时,如果是将 n 页的 span 切成了一个 k 页的 span 和一个 n - k 页的 span,除了需要建立 k 页 span 中每个页与该 span 之间的映射关系之外,还需要建立剩下的 n - k 页的 span 与其首尾页之间的映射关系。
而 page cache 中的 span 与其首尾页之间都建立了映射关系,就方便进行 span 的合并了。
【Tips】PageCache::ReleaseSpanToPageCache() 的实现细节
在向前或向后进行合并的过程中:
- 如果没有通过页号获取到其对应的 span,说明对应到该页的内存块还未申请,此时停止合并即可。
- 如果通过页号获取到了其对应的 span,但该 span 处于被使用的状态,那也必须停止合并。
- 如果合并后的 span 大于 128 页,则不能进行本次合并,因为 page cache 无法对大于 128 页的 span 进行管理。
在合并span时,由于这个 span 是在 page cache 的某个哈希桶中的,因此在合并后,还需要将其从对应的双向链表中移除,然后 delete 掉这个被合并了的 span 结构。
在合并结束后,除了将合并后的 span 挂到 page cache 对应哈希桶中,还需要建立该 span 与其首尾页之间的映射关系,便于此后继续合并。
- PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_sInst;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0 && k < NPAGES);
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
//存储nSpan的首尾页号与nSpan的映射,方便page cache回收内存时进行合并查找
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
//获取从内存块对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//计算页号
auto ret = _idSpanMap.find(id);
if (ret == _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
//将还回来的span与其他空闲的span进行前后页合并,缓解内存碎片问题
//向前合并
while (1)
{
PAGE_ID prevId = span->_pageId - 1;//找归还的span的前一个
auto ret = _idSpanMap.find(prevId);
if (ret != _idSpanMap.end())// 不存在前一个页号,就不合并了
{
break;
}
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)//前一个页号的span在被使用中,就不合并了
{
break;
}
if (prevSpan->_n + span->_n > NPAGES - 1)// 合并后如果超过128页,span就没办法管理,那也不合并了
{
break;
}
//代码走到这里,可以合并了
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);//将prevSpan从对应的双链表中移除
delete prevSpan;
}
//向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())// 不存在后一个页号,就不合并了
{
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)//后一个页号的span在被使用中,就不合并了
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)// 合并后如果超过128页,span就没办法管理,那也不合并了
{
break;
}
//代码走到这里,可以合并了
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);//将nextSpan从对应的双链表中移除
delete nextSpan;
}
//将合并后的span挂到page cache对应的桶中
_spanLists[span->_n].PushFront(span);
span->_isUse = false;//将该span设置为未被使用的状态
//建立该span与其首尾页的映射
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
八、其他细节优化
1)大于 256KB 的大块内存的申请和释放
每个线程都独享一个 thread cache ,但 thread cache 是用于申请小于等于 256 KB 的内存的,当线程对内存的需求大于 256KB ,thread cache 显然无法满足。
对于大于 256KB 的内存申请,也就是大于 32 页的内存申请,我们可以考虑直接让线程向 page cache 申请,毕竟 page cache 中最大的页有 128。但如果内存申请比 128 页还大,就只能向堆申请了。
需要注意的是,当申请的内存大于 256KB 时,虽然不是从 thread cache 进行获取,但在分配内存时,也是需要进行向上对齐的,对于大于 256KB 的内存,我们可以直接按页进行对齐。
既然如此,当释放内存时,也相应地需要判断释放的内存块对象的大小。
这里,小编将项目中需要修改的部分全部展示出来:
- Common.h
//增加向堆释放内存的接口
#pragma once
#include<iostream>
#include<vector>
#include<unordered_map>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
using std::cout;
using std::endl;
#ifdef _WIN32
#include <windows.h>
#else
// ...
#endif
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13; //页大小转换偏移,即一页定义为2^13,也就是8KB
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
// 直接向堆释放空间
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
//...
- ConcurrentAlloc.h
//修改申请和释放内存的接口的内部实现
#pragma once
#include "Common.h"
#include"ThreadCache.h"
#include"PageCache.h"
static void* ConcurrentAlloc(size_t size)
{
if (size > MAXBYTE)//大于256KB的内存申请
{
//计算出对齐后需要申请的页数
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;
//直接向page cache申请kPage页的span
PageCache::GetInstance()->_pageMtx.lock();//加锁
Span* span = PageCache::GetInstance()->NewSpan(kpage);
PageCache::GetInstance()->_pageMtx.unlock();//解锁
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else //小于等于256KB的内存申请
{
// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象
if (pTLSThreadCache == nullptr)
{
pTLSThreadCache = new ThreadCache;
}
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
static void ConcurrentFree(void* ptr,size_t size)
{
if (size > MAXBYTE)//大于256KB的内存释放
{
Span* span = PageCache::GetInstance()->MapObjToSpan(ptr);//获取相应页号
PageCache::GetInstance()->_pageMtx.lock();//加锁
PageCache::GetInstance()->ReleaseSpanToPageCache(span);//直接向page cache释放,最终释放回堆
PageCache::GetInstance()->_pageMtx.unlock();//解锁
}
else //小于等于256KB的内存释放
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
- PageCache.cpp
//修改PageCache::NewSpan()申请分配内存的实现细节
//和PageCache::ReleaseSpanToPageCache()释放归还内存的实现细节
#include"PageCache.h"
PageCache PageCache::_sInst;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
if (k > NPAGES - 1)//大于128页直接找堆申请
{
void* ptr = SystemAlloc(k);
Span* span = new Span;
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
_idSpanMap[span->_pageId] = span;//建立页号与span之间的映射
return span;
}
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = new Span;
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
Span* bigSpan = new Span;
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
//获取从内存块对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{
//...
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGES - 1) //大于128页的,就直接还给堆
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
delete span;
return;
}
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
if (ret != _idSpanMap.end())
{
break;
}
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
delete prevSpan;
}
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
delete nextSpan;
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
2)通过定长内存池脱离 new 的使用
tcmalloc 是要在高并发场景下替代 malloc 进行内存申请的,因此 tcmalloc 在实现的时,其内部是不调用 malloc 的。当前我们的代码中,存在通过 new 获取到的内存,而 new 在底层实际上就是封装了malloc,是调用了 malloc 的。
为了完全脱离 malloc ,我们将上文中实现的定长内存池引入,来代替 new 向堆申请空间。
- ObjectPool.h
#pragma once
#include"Common.h"
template<class T> /*一个对象的大小是固定的,类型模板参数也可以起到规定大小的作用*/
class ObjectPool
{
public:
//申请空间
T* New()
{
T* obj = nullptr;
//优先将归还的定长内存块,再次重复利用
//具体方式是对自由链表进行头删
if (_freeList)
{
void* next = *(void**)_freeList;
obj = (T*)_freeList;
_freeList = next;
}
else
{
//申请大块内存
if (_remainBytes < sizeof(T))//剩余内存不够一个对象的大小,就重新申请大块空间
{
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(128 * 1024);//一次向内存申请128kb空间
_memory = (char*)SystemAlloc(_remainBytes >> 13);
if (_memory == nullptr)
throw std::bad_alloc();
}
//将大块内存切下一块(一个对象的大小)并返回
obj = (T*)_memory;
size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//一个对象的大小不及一个指针,就把一个指针的大小作为定长
_memory += objsize;
_remainBytes -= objsize;
}
//定位new,显示调用T的构造函数初始化
new(obj)T;
return obj;
}
//释放空间
void Delete(T* obj)
{
*(void**)obj = _freeList;
_freeList = obj;
}
private:
char* _memory=nullptr; //指向一次性向系统申请的大块内存空间,char*占一个字节,方便每次从内存中切空间
size_t _remainBytes = 0;//大块内存被切分的剩余字节数
void* _freeList=nullptr;//自由链表的头指针,存了自由链表的头节点的地址,用于管理已向内存申请、使用后被归还的内存块
// 自由链表中每个节点都是一块被归还的内存,其中存放的是下一个节点的地址
};
span对象基本都是在 page cache 层创建的,因此我们可以在 PageCache 类中定义一个成员变量 _spanPool,以便申请和释放 span 对象。
- PageCache.h
#pragma once
#include"Common.h"
#include"ObjectPool.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
Span* MapObjToSpan(void* obj);
void ReleaseSpanToPageCache(Span* span);
Span* NewSpan(size_t k);
private:
SpanList _spanLists[NPAGES];
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
ObjectPool<Span> _spanPool; //定长内存池
public:
std::mutex _pageMtx;
private:
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
由此,项目中使用 new 的地方,均要替换为调用定长内存池提供的 New(),而使用 delete 的地方,均要替换为调用定长内存池提供的 Delete()。
- PageCache.cpp
//修改PageCache::NewSpan()申请分配内存的实现细节
//和PageCache::ReleaseSpanToPageCache()释放归还内存的实现细节
#include"PageCache.h"
PageCache PageCache::_sInst;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
if (k > NPAGES - 1)//大于128页直接找堆申请
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();//调用定长内存池提供的New()向堆上申请空间
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
_idSpanMap[span->_pageId] = span;
return span;
}
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();//调用定长内存池提供的New()向堆上申请空间
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
_idSpanMap[nSpan->_pageId] = nSpan;
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
}
return kSpan;
}
}
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();//调用定长内存池提供的New()向堆上申请空间
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
//获取从内存块对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{
//...
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span); //调用定长内存池提供的Delete()向堆释放空间
return;
}
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
if (ret != _idSpanMap.end())
{
break;
}
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);//调用定长内存池提供的Delete()向堆释放空间
}
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}
Span* nextSpan = ret->second;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);//调用定长内存池提供的Delete()向堆释放空间
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
- ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include"ThreadCache.h"
#include"PageCache.h"
static void* ConcurrentAlloc(size_t size)
{
if (size > MAXBYTE)//大于256KB的内存申请
{
//...
}
else //小于等于256KB的内存申请
{
if (pTLSThreadCache == nullptr)
{
//pTLSThreadCache = new ThreadCache;
static ObjectPool<ThreadCache> tcPool;
pTLSThreadCache = tcPool.New();
}
//cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; //for debug
return pTLSThreadCache->Allocate(size);
}
}
static void ConcurrentFree(void* ptr,size_t size)
{
//...
}
- Common.h
#pragma once
#include<iostream>
#include<vector>
#include<unordered_map>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>
template<class T>
class ObjectPool;
using std::cout;
using std::endl;
//...
//带头双向循环链表
class SpanList
{
public:
SpanList();//构造函数的声明,与定长内存池对象一起在类外定义
//...
private:
Span* _head;
static ObjectPool<Span> _spanPool; //静态的定长内存池对象,全局只有一个,但需在类外定义
public:
std::mutex _mtx;
};
- Common.cpp
#include "Common.h"
#include "ObjectPool.h"
ObjectPool<Span> SpanList::_spanPool;
SpanList::SpanList()
{
//_head = new Span;
_head = _spanPool.New();
_head->_next = _head;
_head->_prev = _head;
}
3)释放内存块时,优化为不传内存块大小
使用 free() 释放内存时,只需要传入指向这块内存的指针即可。
目前我们实现的内存池,在释放对象时除了需要传入指向该对象的指针,还需要传入该对象的大小,这是为了区分释放操作的实现细节。如果释放的是大于 256KB 的对象,需要根据对象的大小来判断这块内存到底应该还给 page cache,还是应该直接还给堆;如果释放的是小于等于 256KB 的对象,需要根据对象的大小计算出应该还给 thread cache 的哪一个哈希桶。
如果也想做到,在释放对象时不用传入对象的大小,可以这样实现:
在 Span 结构中再增加一个 _objSize 成员,该成员代表着这个 span 管理的内存块被切成的一个个对象的大小。
- Common.h
//...
//管理多个连续页大块内存跨度结构
struct Span
{
PAGE_ID _pageId = 0;
size_t _n = 0;
Span* _next = nullptr;
Span* _prev = nullptr;
size_t _objSize = 0; //切好的小内存的大小
size_t _useCount = 0;
void* _freeList = nullptr;
bool _isUse = false;
};
//...
同时,项目中的其他代码要进行相应的修改。
- CentralCache.cpp
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t byte_size)
{
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
list._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte_size));
span->_isUse = true; //修改标识为被使用
span->_objSize = byte_size; //向page cache获取一个k页的span后,将其大小填入_objSize字段
PageCache::GetInstance()->_pageMtx.unlock();
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
span->_freeList = start;
start += byte_size;
void* tail = span->_freeList;
while (start < end)
{
NextObj(tail) = start;
tail = NextObj(tail);
start += byte_size;
}
NextObj(tail) = nullptr;
list._mtx.lock();
list.PushFront(span);
return span;
}
- ConcurrentAlloc.h
#pragma once
#include "Common.h"
#include"ThreadCache.h"
#include"PageCache.h"
static void* ConcurrentAlloc(size_t size)
{
if (size > MAXBYTE)//大于256KB的内存申请
{
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objSize = size; //将申请内存块的大小填入_objSize字段
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
if (pTLSThreadCache == nullptr)
{
//pTLSThreadCache = new ThreadCache;
static ObjectPool<ThreadCache> tcPool;
pTLSThreadCache = tcPool.New();
}
cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
return pTLSThreadCache->Allocate(size);
}
}
static void ConcurrentFree(void* ptr) //优化为不传对象大小
{
Span* span = PageCache::GetInstance()->MapObjToSpan(ptr);//获取相应页号
size_t size = span->_objSize; //获取内存块对象的大小
if (size > MAXBYTE)
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
4)读取 page cache 中映射关系时的加锁问题
我们用 PageCache 类的一个成员变量来存储页号与 span 之间的映射关系,以便后续通过页号找到相应的 span。
对于当前代码来说,如果我们此时正在 page cache 进行相关操作,那么访问这个映射关系是安全的,因为当进入 page cache 之前是需要加锁的,因此可以保证此时只有一个线程在进行访问。但如果是在 central cache 访问,或是在调用 ConcurrentFree() 释放内存时访问,那么就存在线程安全的问题,此时可能有其他线程正在 page cache 中进行某些操作,且也在访问这个映射关系,因此当在 page cache 外部访问这个映射关系时是需要加锁的。
也就是说,在调用 PageCache::MapObjToSpan() 时,其实是需要加锁的。
- PageCache.cpp
//...
//获取从内存块对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;
//C++提供的unique_lock,支持构造时加锁,析构时自动解锁
std::unique_lock<std::mutex> lock(_pageMtx);//加锁,出函数作用域会自动解锁
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}
}
//...
5)针对性能瓶颈使用基数树进行优化
尽管当前项目维护了线程安全,但项目中锁带来的竞争仍会导致性能瓶颈,例如频繁调用 PageCache::MapObjToSpan() 等接口,会频繁涉及锁的申请和释放,进而影响程序运行的效率。
针对这一点,tcmalloc 中使用了基数树代替了 PageCache 类中的 unordered map 成员来进行优化,使得 PageCache::MapObjToSpan() 在读取页号与 span 的映射时可以不必加锁。
【Tips】为什么读取基数树映射关系的时候不需要加锁呢
当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是 C++ 中的 map 还是 unordered_map,在读取映射关系时都是需要加锁的。因为 C++ 中 map 的底层数据结构是红黑树,unordered_map 的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的结构都有可能会发生变化。比如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。
而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在 page cache 进行的。也就是说,读取映射时读取的都是对应 span 的_useCount 不等于 0 的页,而建立映射时建立的都是对应 span 的 _useCount 等于 0 的页,因此我们不会同时对同一个页进行读取映射和建立映射的操作。
.1- 基数树是什么
基数树本质是一个分层的哈希表,根据所分层数不同可分为单层基数树、二层基数树、三层基数树等。
- 单层基数树
单层基数树实际采用的就是直接定址法,每一个页号对应 span 的地址就存储数组中在以该页号为下标的位置。最坏的情况下,我们需要建立所有页号与其span之间的映射关系,因此这个数组中元素个数应该与页号的数目相同,数组中每个位置存储的就是对应span的指针。
//单层基数树
template <int BITS>
class TCMalloc_PageMap1
{
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap1()
{
size_t size = sizeof(void*) << BITS; //需要开辟数组的大小
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按页对齐后的大小
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申请空间
memset(array_, 0, size); //对申请到的内存进行清理
}
void* get(Number k) const
{
if ((k >> BITS) > 0) //k的范围不在[0, 2^BITS-1]
{
return NULL;
}
return array_[k]; //返回该页号对应的span
}
void set(Number k, void* v)
{
assert((k >> BITS) == 0); //k的范围必须在[0, 2^BITS-1]
array_[k] = v; //建立映射
}
private:
void** array_; //存储映射关系的数组
static const int LENGTH = 1 << BITS; //页的数目
};
需要建立映射时,就调用成员函数 set(),需要读取映射关系时,就调用成员函数 get()。
代码中的非类型模板参数 BITS,表示存储页号最多需要比特位的个数。在 32 位下,我们传入的是 32-PAGE_SHIFT,在 64 位下传入的则是 64-PAGE_SHIFT。而其中的 LENGTH 成员代表的就是页号的数目,即 2 ^ BITS。
以 32 位平台下,一页大小的为 8K 为例,此时页的数目就是2 ^32 ÷ 2 ^13 = 2 ^19
,因此存储页号最多需要 19 个比特位,此时传入非类型模板参数的值就是 32 − 13 = 19。由于 32 位平台下一个指针的大小是 4 字节,因此该数组的大小就是2 ^19 × 4 = 2 ^21 = 2 M,内存消耗其实不大,是可行的。但如果是在 64 位平台下,此时该数组的大小是2 ^51 × 8 = 2 ^54 = 2 ^24 G,这显然是不可行的。而实际上对于 64 位的平台,我们需要使用三层基数树。
- 二层基数树
这里还是以 32 位平台下,一页的大小为 8K 为例来说明二层基数树。
此时,存储页号最多需要 19 个比特位,而二层基数树会把这 19 个比特位分为两次进行映射,例如用前 5 个比特位在基数树的第一层进行映射,映射后得到对应的第二层,然后用剩下的比特位在基数树的第二层进行映射,映射后最终得到该页号对应的 span 指针。
//二层基数树
template <int BITS>
class TCMalloc_PageMap2
{
private:
static const int ROOT_BITS = 5; //第一层对应页号的前5个比特位
static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层存储元素的个数
static const int LEAF_BITS = BITS - ROOT_BITS; //第二层对应页号的其余比特位
static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层存储元素的个数
//第一层数组中存储的元素类型
struct Leaf
{
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; //第一层数组
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap2()
{
memset(root_, 0, sizeof(root_)); //将第一层的空间进行清理
PreallocateMoreMemory(); //直接将第二层全部开辟
}
void* get(Number k) const
{
const Number i1 = k >> LEAF_BITS; //第一层对应的下标
const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
if ((k >> BITS) > 0 || root_[i1] == NULL) //页号值不在范围或没有建立过映射
{
return NULL;
}
return root_[i1]->values[i2]; //返回该页号对应span的指针
}
void set(Number k, void* v)
{
const Number i1 = k >> LEAF_BITS; //第一层对应的下标
const Number i2 = k & (LEAF_LENGTH - 1); //第二层对应的下标
assert(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v; //建立该页号与对应span的映射
}
//确保映射[start,start_n-1]页号的空间是开辟好了的
bool Ensure(Number start, size_t n)
{
for (Number key = start; key <= start + n - 1;)
{
const Number i1 = key >> LEAF_BITS;
if (i1 >= ROOT_LENGTH) //页号超出范围
return false;
if (root_[i1] == NULL) //第一层i1下标指向的空间未开辟
{
//开辟对应空间
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
}
return true;
}
void PreallocateMoreMemory()
{
Ensure(0, 1 << BITS); //将第二层的空间全部开辟好
}
};
在二层基数树中,第一层的数组占用2 ^5 × 4 = 2 ^7 字节的空间,而第二层的数组最多占用2 ^5 × 2 ^14 × 4 = 2 ^21 = 2M。二层基数树相比一层基数树的好处就是,一层基数树必须一开始就把 2M 的数组开辟出来,而二层基数树一开始时只需将第一层的数组开辟出来,当需要进行某一页号映射时再开辟对应的第二层的数组就行了。
因此在二层基数树中提供了一个成员函数 Ensure(),当需要建立某一页号与其 span 之间的映射关系时,需要先调用 Ensure() 确保用于映射该页号的空间是开辟了的,如果没有开辟则会立即开辟。
而在 32 位平台下,就算将二层基数树第二层的数组全部开辟出来也就消耗了 2M 的空间,内存消耗也不算太多,因此我们可以在构造二层基数树时就把第二层的数组全部开辟出来。
- 三层基数树
对于 64 位的平台,就要用到三层基数树了。
与二层基数树类似,三层基数树实际上就是把存储页号的若干比特位分为三次进行映射,只有当要建立某一页号的映射关系时,再开辟对应的数组空间,而没有建立映射的页号就可以不用开辟其对应的数组空间,此时就能在一定程度上节省内存空间。
//三层基数树
template <int BITS>
class TCMalloc_PageMap3
{
private:
static const int INTERIOR_BITS = (BITS + 2) / 3; //第一、二层对应页号的比特位个数
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二层存储元素的个数
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三层对应页号的比特位个数
static const int LEAF_LENGTH = 1 << LEAF_BITS; //第三层存储元素的个数
struct Node
{
Node* ptrs[INTERIOR_LENGTH];
};
struct Leaf
{
void* values[LEAF_LENGTH];
};
Node* NewNode()
{
static ObjectPool<Node> nodePool;
Node* result = nodePool.New();
if (result != NULL)
{
memset(result, 0, sizeof(*result));
}
return result;
}
Node* root_;
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3()
{
root_ = NewNode();
}
void* get(Number k) const
{
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一层对应的下标
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
const Number i3 = k & (LEAF_LENGTH - 1); //第三层对应的下标
//页号超出范围,或映射该页号的空间未开辟
if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL)
{
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回该页号对应span的指针
}
void set(Number k, void* v)
{
assert(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS); //第一层对应的下标
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
const Number i3 = k & (LEAF_LENGTH - 1); //第三层对应的下标
Ensure(k, 1); //确保映射第k页页号的空间是开辟好了的
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立该页号与对应span的映射
}
//确保映射[start,start+n-1]页号的空间是开辟好了的
bool Ensure(Number start, size_t n)
{
for (Number key = start; key <= start + n - 1;)
{
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS); //第一层对应的下标
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二层对应的下标
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下标值超出范围
return false;
if (root_->ptrs[i1] == NULL) //第一层i1下标指向的空间未开辟
{
//开辟对应空间
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二层i2下标指向的空间未开辟
{
//开辟对应空间
static ObjectPool<Leaf> leafPool;
Leaf* leaf = leafPool.New();
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //继续后续检查
}
return true;
}
void PreallocateMoreMemory()
{}
};
当我们要建立某一页号的映射关系时,需要先确保存储该页映射的数组空间是开辟好了的,也就是调用其成员函数 Ensure(),而如果对应数组空间未开辟,则会立马开辟对应的空间。
.2- 代码实现
我们在项目中新创建一个 PageMap 头文件,将上文中基数树的代码全部放在该头文件下。
- PageMap
#pragma once
#include"Common.h"
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
然后我们将基数树引入 page cache 中来优化性能。由于我们的项目当前是 32 位平台的(x86),因此这里用几层基数树都可以。
- PageCache.h
#pragma once
#include"Common.h"
#include"ObjectPool.h"
#include"PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
//获取从内存块对象到span的映射
Span* MapObjToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
//获取一个k页的span
Span* NewSpan(size_t k);
private:
SpanList _spanLists[NPAGES];//哈希桶
//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
ObjectPool<Span> _spanPool;
public:
std::mutex _pageMtx;//大锁
private:
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};
- PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_sInst;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
if (k > NPAGES - 1) {
void* ptr = SystemAlloc(k);
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
//再检查一下后面的桶里面有没有span,有就把它进行切分
for (size_t i = k + 1; i < NPAGES; i++)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = _spanPool.New();
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
//_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
//将k页span返回
return kSpan;
}
}
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
//获取从内存块对象到span的映射
Span* PageCache::MapObjToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;
//std::unique_lock<std::mutex> lock(_pageMtx);
//auto ret = _idSpanMap.find(id);
//if (ret != _idSpanMap.end())
//{
// return ret->second;
//}
//else
//{
// assert(false);
// return nullptr;
//}
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}
// 释放空闲span回到Pagecache,并合并相邻的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
_spanPool.Delete(span);
return;
}
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
//auto ret = _idSpanMap.find(prevId);
//if (ret == _idSpanMap.end())
//{
// break;
//}
auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr)
{
break;
}
Span* prevSpan = ret;
if (prevSpan->_isUse == true)
{
break;
}
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
//向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
//auto ret = _idSpanMap.find(nextId);
//if (ret == _idSpanMap.end())
//{
// break;
//}
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)
{
break;
}
Span* nextSpan = ret;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
_spanLists[span->_n].PushFront(span);
span->_isUse = false;//将该span设置为未被使用的状态
//_idSpanMap[span->_pageId] = span;
//_idSpanMap[span->_pageId + span->_n - 1] = span;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
.3- 对比 malloc 进行简单的性能测试
这里我们引入一段代码,来简单测试我们的项目与 malloc 的性能。
- Benchmark.cpp
#include"ConcurrentAlloc.h"
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime(0);
std::atomic<size_t> free_costtime(0);
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(malloc(16));
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%d ms\n",
nworks, rounds, ntimes, (int)malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%d ms\n",
nworks, rounds, ntimes, (int)free_costtime);
printf("%u个线程并发malloc&free %u次,总计花费:%d ms\n",
nworks, nworks * rounds * ntimes, (int)(malloc_costtime + free_costtime));
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime(0);
std::atomic<size_t> free_costtime(0);
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, (int)malloc_costtime);
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, (int)free_costtime);
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, (int)(malloc_costtime + free_costtime));
}
int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
可以看到,我们的高并发内存池比 malloc 要快得多。
九、项目总结
- 项目目的:通过模拟实现 tcmalloc 的核心功能,学习积累开发经验和提升代码能力。
- 项目原理及框架:
- 技术栈:C/ C++/ C++11。
- 项目环境:Windows、vs 2013 / vs 2019 / vs 2022 / vscode。
- gitee 源码地址:https://gitee.com/the-driest-one-in-varoran/concurrent-memory-pool
-
项目拓展:
标签:span,void,cache,并发,内存,Span,综合,size From: https://blog.csdn.net/waluolandao/article/details/142098617(1)打包成动静态库
在 vs2022 中,右键单击解决方案资源管理器中的项目名称,然后点击属性。
此时会弹出一些选项,按照以下图示就可以选择将其打包成静态库或动态库了。
(2)替换到系统调用 malloc
实际 Google 开源的 tcmalloc 是会直接用于替换 malloc 的,不同平台替换的方式不同。比如基于 Unix 的系统上的 glibc,使用了 weak alias 的方式替换;而对于某些其他平台,需要使用 hook 的钩子技术来做。相关资料——