目录
- 项目介绍
- 什么是内存池
- 定长内存池
- 高并发内存池
- 基数树结构
- 申请和释放接口
- 多线程并发环境测试
- 全文件
- 扩展和不足
- 参考资料
1. 项目介绍
1. 这个项目做的是什么?
当前项目是实现一个高并发的内存池,它的原型是google的一个开源项目tcmalloc,全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程管理,用于替代系统的内存分配函数(malloc、free)
我们这个项目式简化了之后的,模拟实现一个自己的高并发内存池,目的是学习tcmallco的精华,这种方式类似STL容器的学习,但是相比STL,代码量和复杂度都上升了很多,要有心里准备
tcmalloc是顶级的高手写出来的,知名度也非常高,不少公司也在用,Go语言直接用它做了自己的内存分配器
2. 项目要求的知识储备和难度
会用到c/c++,数据结构(链表,哈希桶),操作系统内存管理,单例模式,多线程,互斥锁等等知识
2. 什么是内存池
1. 池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。。之所以要申请过量的资源,是因为每次申请资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有线程池,对象池等,以服务器上的线程池为例,主要思想是:先启动若干数量的线程,让他们处于睡眠状态,当接收到客户端请求时,唤醒其中某个睡眠的线程,让它来处理客户端的请求,当完成这个请求后,线程又进入睡眠状态
2. 内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内促时,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或特定时间)时,内存池才将之前申请的内存真正释放
3. 内存池主要解决的问题
主要解决的是效率问题,其次作为系统内存分配器的角度,还需要解决内存碎片的问题
内存碎片
在需要补充说明的是内存碎片分为外碎片和内碎片,上面讲的外碎片问题,外部碎片是一些空闲的连续内存区域大小,这些内存空间不连续,以至于合计的内存足够,但是不能满足一些的内存分配申请需求,内部碎片式由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用,内碎片问题,会解释
4. malloc
c/c++中要动态申请内存是通过malloc去申请的,但是我们要知道,实际不是直接去获取内存的。而malloc是一个内存池,malloc()相当于向操作系统“批发了”一块较大的内存空间,然后“零售”给程序用,当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs用的微软自己的,linux gcc用的glibc的ptmalloc。下面几片文章
一文了解,Linux内存管理,malloc、free 实现原理
malloc()背后的实现原理
malloc的底层实现(ptmalloc)
3. 先设计一个定长的内存池
malloc在什么场景下都可以用,意味着不会有很高的性能,下面实现一个定长内存池,定长内存池简单了解内存池的框架,同时也是它的一个组件
windows和linux下如何直接向堆申请页为单位的大块内存
VirtualAlloc
brk和mmap
框架结构
成员一段内存,用剩余字节数来维护。当用完还回来时,用一个链表结构保存空间,下次使用直接在链表里取一个
定长申请的大小用模板参数传递,每次申请一个T对象大小。首先看链表里有没有,如果有直接在链表里取一个对象空间。如果没有,从大内存中切一个对象,如果大内存为空或不够了,调用系统函数申请固定大小的大内存
释放的时候直接插入到链表中
ObjectPool.h
#pragma once
// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage) throw (std::bad_alloc)
{
#ifdef _WIN32
// 1页8k
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE,
PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
template <class T>
class ObjectPool
{
public:
T* New() throw (std::bad_alloc)
{
T* obj = nullptr;
// 优先把还回来的内存重复利用
if (_free_list != nullptr)
{
// 转void**保证取到不同的指针大小
void* next = *((void**)_free_list);
obj = (T*)_free_list;
_free_list = next;
}
else
{
// 剩余内存不够一个对象大小, 则重新开大块空间
if (_remain < sizeof(T))
{
_remain = 128 * 1024;
//_memory = (char*)malloc(_remain);
_memory = (char*)SystemAlloc(_remain >> 13); // 计算页数
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
size_t obj_size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
obj = (T*)_memory;
_memory += obj_size;
_remain -= obj_size;
// 定位new初始化, 显示调用T的构造
new(obj)T;
}
// 定位new,
return obj;
}
void Delete(T* obj) noexcept
{
// 显示调用析构函数清理对象
obj->~T();
*(void**)obj = _free_list;
_free_list = obj;
}
private:
char* _memory = nullptr; // 指向大块内存的指针
size_t _remain = 0; // 大块内存剩下的字节数
void* _free_list = nullptr; // 自由链表的头指针
};
关键点
- 自由链表结构
自由链表中每一个挂的都是一个对象大小的内存块,freelist指向第一个内存的地址,后面的为了连接起来,都会用每一个内存块的前一部分存储下一个内存块的地址,最后一个指向空,这样就连接了起来
取出一个内存块,也就是头删的过程,freelist指向第一个内存块,头删返回这个内存块,并让freelist指向后面的内存块,就需要取出存放地址的数据。如何确保取出一个指针大小,32位指针是4字节,64位指针是8字节,直接转为int*等等取出都不能保证,当强转为二级指针后,取内容就能确保是一个指针的大小,插入也是如此
- 位移计算页数
封装的SystemAlloc函数需要传入的是页数,这里默认一页是8k。当没有内存时申请128k大小,右移13位就是除以8k,计算出需要多少页
- 对象大小
T作为需要申请的内存大小,多大都有可能,但自由链表里挂内存时必须确保足够字节数存放下一个内存块的指针,所以判断当对象大小不够一个指针时,将大小扩展为指针大小,虽然返回的还是需要的大小,但维护的时候是扩展后的大小
- 构造和析构
如果需要的是类对象,由于是直接给的地址所以不会构造和析构,这里调用定位new构造,释放的时候显示调用析构
4. 高并发内存池框架
很多开发环境都是多核多线程,申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身已经很优秀,tcmalloc在多线程高并发的场景下更胜一筹,所以实现的内存池要考虑以下问题:
1.性能问题
2.多线程环境下,锁竞争问题
3.内存碎片问题
concurrent memory pool主要由以下3个部分组成
1.thread cache:线程缓存时每个线程独有的,用于小于256kb的内存的分配,线程从这里申请不需要加锁,每个线程独享一个cache,这就是并发线程池高效的地方
2.central cache:中心缓存时所有线程共享,threadcache按需从centra cache中获取的对象。central cache会在合适的时机回收thread cache对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以这里取内存对象是需要加锁的,这里用的是桶锁,只有thread chache没有内存对象时才会找central cache,所以竞争不会很激烈
3.page cache:页缓存时central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cahce会回收cengral cache中满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片问题
thread cache
thrad cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表,每个线程都有一个thrad cache对象,这样每个线程在这里获取对象和释放都是无锁的
申请内存
1.当内存申请size<=256kb,先获取到现场本地存储的thread cahhe对象,计算size映射的哈希桶自由链表下标i
2.如果自由链表_freelist[i]中有对象,则直接Pop一个内存对象返回
3.如果_freelist[i]中没有对象时,则批量从central cche中获取一定数量的镀锡,插入到自由链表并返回一个对象
释放内存
1.当释放内存小于256k时将内存释放回threadchache,计算size映射自由链表桶位置i,将对象push到_freelist[i]
2.当链表长度过长,则回收一部分内存对象到central cache
TLS-thread locak storage
linux gcc下 tls
框架
成员自由链表数组,数组的大小取决于对于小于256k内存的映射规则
函数,申请内存和释放内存,释放时需要传入大小,方便挂回桶中,如果内存不够了,需要从centralcache取空间,内存过大,将内存还给central
用tls保证每个线程拥有一个独立的对象
ThreadCache.h
#pragma once
#include "Common.h"
// 线程内存池
class ThreadCache
{
public:
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// central内存池取空间
void* FetchCentralCache(size_t index, size_t size);
// 释放对象时,链表过长,回收内存到中心缓存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freelist[NUM_FREELISTS]; // 大小取决于桶的映射规则
};
// tls每个线程一个局部ThreadCache
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
ThreadCache.cpp
#include "ThreadCache.h"
#include "CentralCache.h"
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t align_size = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
// 桶中为空, 在central内存池取
if (!_freelist[index].Empty())
{
return _freelist[index].Pop();
}
else
{
return FetchCentralCache(index, align_size);
}
}
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 插入回自由链表
size_t index = SizeClass::Index(size);
_freelist[index].Push(ptr);
// 当链表长度大于一次批量申请的内存时就开始还一段list给central cache
if (_freelist[index].Size() >= _freelist[index].MaxSize())
{
ListTooLong(_freelist[index], size);
}
}
void* ThreadCache::FetchCentralCache(size_t index, size_t size)
{
// 慢开始反馈调节算法
// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batch_num = min(_freelist[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freelist[index].MaxSize() == batch_num)
{
_freelist[index].MaxSize() += 1;
}
void* start = nullptr; // 输出参数
void* end = nullptr;
size_t actual_num = CentralCache::GetInstance()->FetchRangeObj(start, end, batch_num, size);
assert(actual_num > 0);
if (actual_num == 1)
{
assert(start == end);
return start;
}
else
{
// 插入到自由链表
_freelist[index].PushRange(NextObj(start), end, actual_num - 1);
return start;
}
return nullptr;
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
// 中心回收
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
自由链表和哈希桶的映射
static const size_t MAX_BYTES = 256 * 1024;
static const size_t NUM_FREELISTS = 208;
static const size_t NUM_PAGES = 129;
static const size_t PAGE_SHIFT = 13; // 一页大小
// 条件编译, 32位没有64定义, 所以先判断64
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#endif
// 获取自由链表下一个内存块
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
// 桶的映射规则, 计算出在哪个桶
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
计算对齐数为标准向上对齐的内存大小
//static inline size_t _RoundUp(size_t bytes, size_t align_num)
//{
// if (bytes % align_num != 0)
// {
// return (bytes / align_num + 1) * align_num; // 对齐
// }
// else
// {
// return bytes; // 整除不用改变
// }
//}
// 计算对齐数为标准向上对齐的内存大小
static inline size_t _RoundUp(size_t bytes, size_t align_num)
{
// 进一位去掉余数
return (bytes + align_num - 1) & ~(align_num - 1);
}
// 按照对齐规则向上对齐
static inline size_t RoundUp(size_t bytes)
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
}
计算桶的下标
//size_t _Index(size_t bytes, size_t align_num)
//{
// if (bytes % align_num == 0)
// {
// return bytes / align_num - 1;
// }
// else
// {
// return bytes / align_num;
// }
//}
// 计算映射桶的下标
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每个区间有多少链
static int group_array[4] = {16, 56, 56, 56};
if (bytes <= 128)
{
return _Index(bytes, 3);
}
else if (bytes <= 1024)
{
// 减去8字节对齐的内存, 加上之前桶的下标
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
}
else
{
assert(false);
}
return -1;
}
// 一次thread cache从中心获取多少个
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512], 一次性批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 大对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
{
num = 2;
}
if (num > 512)
{
num = 512;
}
return num;
}
// 计算一次向系统获取几个页
// 单个对象 8byte
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num * size;
npage >>= PAGE_SHIFT;
if (npage == 0)
{
npage = 1;
}
return npage;
}
};
关键点
- 映射规则
将需要申请的内存按映射大小对齐,这样就可以让自由链表容量不需要太大,也可以更好的利用。申请时将用户传入的大小先向上对齐到最近的对齐数,根据对齐数计算出桶的下标,然后访问到对应的自由链表结构做插入删除
- 从central取内存的慢开始调节方法
当没有内存向central要时,可以多要一些,这样下次再需要的时候就可以直接分配了,那么要多少?不能一开始就要很多,这里用一个慢开始的方法,用两个数据控制,一个是不断增长的maxsize,一个是根据需要的大小计算,需要的内存小就多给一些,需要的大就少给一些。mansize从1开始,这两个取小的那个,由maxsize不断向更多的数量调节,NumMove函数最终控制一个上限。当当前空闲的内存大于等于当前批量申请的maxsize,证明有很多内存不用了,就向central回收一个maxsize大小的内存
central cache
central cache也是一个哈希桶结构,哈希桶的映射关系跟thread cache是一样的。不同的是它的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大块内存块被按映射关系切成了一个个小内存对象挂在span的自由链表中
申请内存
1.当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法,central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread chache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率
2.central cache映射的spanlist中所有span的都没有内存以后,则需要向page chache申请一个新的span对象,拿到span以后将span管理的内存大小切好作为自由链表连接在一起,然后span中取对象给thread cache
3.cnetral cache中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++usecount
释放内存
当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并
框架
成员span的链表数组,span链表存储span的头节点,就可以找到所有span。每一个span结构是一页内存,需要妥善管理,合并申请等都是按页管理,维护span的页号,这块span的页数量,双向链表,计数,span内分割的对象大小,自由链表,是否在使用
central只能有一个,所以用单例模式
函数,申请批量内存给thread,释放批量内存,如果没有内存,从page申请span
CentralCache.h
#pragma once
#include "Common.h"
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_inst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 中心获取一定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batch_num, size_t size);
// 将一定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
CentralCache(){}
CentralCache(const CentralCache&) = delete;
private:
SpanList _spanlist[NUM_FREELISTS];
static CentralCache _inst;
};
CentralCache.cpp
#include "CentralCache.h"
#include "PageCache.h"
CentralCache CentralCache::_inst; // 在cpp创建, 不会过多包含
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
// 查看当前的spanlist中是否有还有未分配对象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freelist != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}
// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
// page cache申请加锁
PageCache::GetInstance()->_page_mtx.lock();
// 走到这里说没有空闲span了,只能找page cache要
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_obj_size = size;
span->_isuse = true;
PageCache::GetInstance()->_page_mtx.unlock();
if (span->_pageid > 200000)
{
int x = 0;
}
// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span
// 计算span的大块内存的起始地址和大块内存的大小(字节数)
char* start = (char*)(span->_pageid << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
// 大块内存切成自由链表连接起来
span->_freelist = start;
start += size;
void* tail = span->_freelist;
while (start < end)
{
NextObj(tail) = start;
tail = NextObj(tail); // tail = strat
start += size;
}
NextObj(tail) = nullptr;
// 切好span以后,需要把span挂到桶里面去的时候,再加锁
list._mtx.lock();
// span插入到list中
list.PushFront(span);
return span;
}
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batch_num, size_t size)
{
size_t index = SizeClass::Index(size);
// 同一个桶加锁
_spanlist[index]._mtx.lock();
// 获取一个span跨度
Span* span = GetOneSpan(_spanlist[index], size);
assert(span);
assert(span->_freelist);
// 从span中获取batchnum个对象
// 如果不够, 有多少拿多少
start = span->_freelist;
end = start;
size_t i = 0;
size_t actual_num = 1;
while (i < batch_num - 1 && NextObj(end) != nullptr) // 要batch_num个, 走-1步
{
end = NextObj(end);
i++;
actual_num++;
}
// 给完中心链表更新
span->_freelist = NextObj(end);
// 给的链表设置结尾
NextObj(end) = nullptr;
span->_usecount += actual_num;
_spanlist[index]._mtx.unlock();
return actual_num;
}
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanlist[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
// 查找属于哪个span,头插进去
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
NextObj(start) = span->_freelist;
span->_freelist = start;
span->_usecount--;
// 说明span切分的所有小块内存都回来了
// 这个span就可以再回收给page cache, pagecache可以再尝试做前后页的合并
if (span->_usecount == 0)
{
_spanlist[index].Erase(span);
span->_freelist = nullptr;
span->_next = span->_prev = nullptr;
_spanlist[index]._mtx.unlock();
PageCache::GetInstance()->_page_mtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCahe(span);
PageCache::GetInstance()->_page_mtx.unlock();
_spanlist[index]._mtx.lock();
}
start = next;
}
_spanlist[index]._mtx.unlock();
}
以页为单位的大内存管理span的定义及spanlist定义
// 管理页为单位的大块内存
struct Span
{
PAGE_ID _pageid = 0; // 页号
size_t _n = 0; // 页数量
struct Span* _prev = nullptr; // 双向循环
struct Span* _next = nullptr;
size_t _usecount = 0; // 计数,0说明对象都还回来了
size_t _obj_size = 0; // 切分的对象大小
void* _freelist = nullptr; // 自由链表连接
bool _isuse = false; // 使用
};
// 带头双向循环链表
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_prev = _head;
_head->_next = _head;
}
void Insert(struct Span* pos, struct Span* new_span)
{
assert(pos);
assert(new_span);
struct Span* prev = pos->_prev;
prev->_next = new_span;
new_span->_prev = prev;
new_span->_next = pos;
pos->_prev = new_span;
}
void PushFront(struct Span* new_span)
{
Insert(Begin(), new_span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
void Erase(struct Span* pos)
{
assert(pos);
assert(pos != _head);
struct Span* prev = pos->_prev;
struct Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
private:
struct Span* _head;
public:
std::mutex _mtx; // 桶锁
};
关键点
- 归还内存正确管理页
归还时每次通过起始地址得到这段内存在哪个页,头插到span中,这个span的usecount为0时,说明内存都回来了,前后页合并
- page申请span后切分
如果链表中没有span了,就向page申请,申请的数量一次性也多一些,以最大限度256k内存分配为标准,大的分少点,小的分多点,控制上下限,最多512个,计算512个需要多少页,最少分配1页。申请到后按字节大小切分,最后挂到central对应的桶中。由批量申请函数返回内存
page cache
申请内存
1.当central向page申请内存时,page先检查对应的位置有没有span,如果没有则向更大页寻找一个span,如果找到分成两份。比如:申请的是4页的page,4页的后面没有挂span,向后面寻找更大页的span,假设在10页的page位置找到一个span,将10页的page span分成一个4页的span和6页的span。4页返回,6页换位置挂起来
2.如果找到_spanlist[128]都没有合适的span,则向系统使用mmap、brk或者是Virtual等方式申请128页page span挂在自由链表中,再重复1的过程
3.需要注意的是central和page的核心结构都是spanlist的哈希桶,但是本质有区别,central的哈希桶是和thread一样的大小对齐关系映射的,它的spanlist中挂的span中内存都被按映射关系切好链接成小块的自由链表,而page中的spanlist是按下标桶号映射的,也就是说第i号桶挂的span都是i页的内存
释放内存
如果centra释放回一个span,则依次寻找span的前后page id有没有使用的空闲span,看看是否可以合并,如果合并继续向后寻找,这样就可以将切小的内存合并成更大的span,减少内存碎片
windows和linux下如何直接向堆申请页为单位的大块内存
VirtualAlloc
brk和mmap
框架
成员,用定长内存池代替malloc的内存管理,spanlist的哈希桶数组,基数树结构的哈希结构存储id和span指针的映射
函数,申请k页的span,回收span并合并相邻页,通过地址获取对应的span
PageCache.h
#pragma once
#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_inst;
}
// 获取一个K页的span
Span* NewSpan(size_t k);
// 获取对象到span的映射
Span* MapObjectToSpan(void* obj);
// 回收空间span, 并合并相邻的span
void ReleaseSpanToPageCahe(Span* span);
public:
std::mutex _page_mtx;
private:
PageCache(){}
PageCache(const PageCache&) = delete;
private:
SpanList _spanlist[NUM_PAGES];
//std::unordered_map<PAGE_ID, Span*> _id_span_map;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _id_span_map;
static PageCache _inst;
ObjectPool<Span> _span_pool;
};
PageCache.cpp
#include "PageCache.h"
PageCache PageCache::_inst;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128 page的直接向堆申请
if (k > NUM_PAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _span_pool.New();
span->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
if (span->_pageid > 200000)
{
int x = 0;
}
//_id_span_map[span->_pageid] = span;
_id_span_map.set(span->_pageid, span);
return span;
}
// 先检查第k个桶里有没有span
if (!_spanlist[k].Empty())
{
Span* span = _spanlist[k].PopFront();
if (span->_pageid > 20000)
{
int x = 0;
}
// 建立id和span的映射,方便central cache回收时检查小块内存、
for (PAGE_ID i = 0; i < span->_n; i++)
{
//_id_span_map[span->_pageid + i] = span;
_id_span_map.set(span->_pageid + i, span);
}
return span;
}
// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分
for (size_t i = k + 1; i < NUM_PAGES; i++)
{
if (!_spanlist[i].Empty())
{
Span* nspan = _spanlist[i].PopFront();
Span* kspan = _span_pool.New();
if (nspan->_pageid > 200000)
{
int x = 0;
}
if (kspan->_pageid > 200000)
{
int x = 0;
}
// 在nSpan的头部切一个k页下来
// k页span返回
// nSpan再挂到对应映射的位置
kspan->_pageid = nspan->_pageid;
kspan->_n = k;
nspan->_pageid += k;
nspan->_n -= k;
_spanlist[nspan->_n].PushFront(nspan);
// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时进行的合并查找
/*_id_span_map[nspan->_pageid] = nspan;
_id_span_map[nspan->_pageid + nspan->_n - 1] = nspan;*/
_id_span_map.set(nspan->_pageid, nspan);
_id_span_map.set(nspan->_pageid + nspan->_n - 1, nspan);
// 要切出去的span加入映射
for (PAGE_ID i = 0; i < kspan->_n; i++)
{
//_id_span_map[kspan->_pageid + i] = kspan;
_id_span_map.set(kspan->_pageid + i, kspan);
}
return kspan;
}
}
// 走到这个位置就说明后面没有大页的span了
// 这时就去找堆要一个128页的span
//Span* big_span = new Span;
Span* big_span = _span_pool.New();
void* ptr = SystemAlloc(NUM_PAGES - 1);
big_span->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;
big_span->_n = NUM_PAGES - 1;
_spanlist[big_span->_n].PushFront(big_span);
if (big_span->_pageid > 20000)
{
int x = 0;
}
return NewSpan(k);
}
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
/*std::unique_lock<std::mutex> lock(_page_mtx);
auto ret = _id_span_map.find(id);
if (ret != _id_span_map.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}*/
auto ret = (Span*)_id_span_map.get(id);
assert(ret != nullptr);
return ret;
}
void PageCache::ReleaseSpanToPageCahe(Span* span)
{
// 大于128 page直接还给堆
if (span->_n > NUM_PAGES)
{
void* ptr = (void*)(span->_pageid << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_span_pool.Delete(span);
return;
}
// 对span的前后页尝试合并,缓解内存碎片问题
while (1)
{
PAGE_ID pre_id = span->_pageid - 1;
//auto ret = _id_span_map.find(pre_id);
页号没有,不合并
//if (ret == _id_span_map.end())
//{
// break;
//}
//
auto ret = (Span*)_id_span_map.get(pre_id);
if (ret == nullptr)
{
break;
}
// 页在使用,不合并
Span* pre_span = ret;
if (pre_span->_isuse == true)
{
break;
}
// 合并出超过128页的span没办法管理,不合并了
if (pre_span->_n + span->_n > NUM_PAGES - 1)
{
break;
}
span->_pageid = pre_span->_pageid;
span->_n += pre_span->_n;
// 删除合并了的span
_spanlist[pre_span->_n].Erase(pre_span);
//delete pre_span;
_span_pool.Delete(pre_span);
}
while (1)
{
PAGE_ID next_id = span->_pageid + span->_n;
//auto ret = _id_span_map.find(next_id);
页号没有,不合并
//if (ret == _id_span_map.end())
//{
// break;
//}
//
auto ret = (Span*)_id_span_map.get(next_id);
if (ret == nullptr)
{
break;
}
// 页在使用,不合并
Span* next_span = ret;
if (next_span->_isuse == true)
{
break;
}
// 合并出超过128页的span没办法管理,不合并了
if (next_span->_n + span->_n > NUM_PAGES - 1)
{
break;
}
span->_n += next_span->_n;
// 删除合并了的span
_spanlist[next_span->_n].Erase(next_span);
//delete next_span;
_span_pool.Delete(next_span);
}
// 合并后挂回去
_spanlist[span->_n].PushFront(span);
span->_isuse = false;
// 加入映射
/*_id_span_map[span->_pageid] = span;
_id_span_map[span->_pageid + span->_n - 1] = span;*/
_id_span_map.set(span->_pageid, span);
_id_span_map.set(span->_pageid + span->_n - 1, span);
}
关键点
- 三层缓存,小于小于256k的申请走thread,如果申请大于128页的内存直接向系统申请,同样用span结构管理,加入到映射中,归还也直接给系统,如果大于256k小于128页的,可以直接从page里申请
- 存储页映射时,因为合并要和前后页合并,所以这个页可能会成为头也可能是尾,所以存储头尾的映射
- 合并时,只需要修改span的结构,将合并了的span从list中删掉,最后合并好的加入映射,挂到链表中
5. 优化速度的基数树结构
测试时发现,接近一半的时间消耗在加锁和解锁的过程中,在这里面,主要消耗的有map通过地址返回span的查询,所以对这个结构优化,使用了基数树
基数树类似于位图,有三层,32位的环境用1和2层,64位用第三层。成员有两个,一个是存储页号总共需要多少位,另一个是数组,每个元素是一个指针。32位总共有232个地址,一页大小8k,所以总共有219个页,需要19位,模板参数BITS填19,LENGTH就是219
构造的时候开辟219(void)指针的大小,就是总共需要开辟的空间,大约是2m。插入的时候传入页号和地址,页号就是下标
get函数直接返回地址
第2层用了两个层,两个数组映射。总共需要19位,第一个数组5位,第二个14位,第一个数组就是32个位置,每一个指针保存的都是214大小的数组。当得到一个页号时,右移14位,就取到了剩下5位的值,就是第一个数组的下标。用k与2的14次方-1,也就是保留低14位,得到第二个数组的下标。虽然结构不一样,但是总共用的空间和上面一样,都是2m。这样的好处是第二层中不需要的空间可以暂时不开,在用的时候再开,提高了空间的利用
第一层是结构数组,里面保存的又是一个数组,这样就是二级映射
第三层和第二层类似,用了三层的映射,这样每一层需要的大小更小,因为在64位中,2的51次方大小不能一次性开出来。分成三层,在需要的时候开对应的空间
PageMap.h
#pragma once
#include"Common.h"
// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool<Leaf> leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};
6. 统一的申请和释放接口
申请和释放不直接调用thread,而是提供一个统一的接口,初始化一个线程局部存储的对象调用函数
ConcurrentAlloc.hpp
#pragma once
#include "Common.h"
#include "ThreadCache.h"
#include "PageCache.h"
#include "ObjectPool.h"
static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES)
{
size_t align_size = SizeClass::RoundUp(size);
size_t kpage = align_size >> PAGE_SHIFT;
PageCache::GetInstance()->_page_mtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_obj_size = size;
PageCache::GetInstance()->_page_mtx.unlock();
void* ptr = (void*)(span->_pageid << PAGE_SHIFT);
return ptr;
}
else
{
if (pTLSThreadCache == nullptr)
{
//pTLSThreadCache = new ThreadCache;
static ObjectPool<ThreadCache> tc_pool;
pTLSThreadCache = tc_pool.New();
}
return pTLSThreadCache->Allocate(size);
}
}
static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_obj_size;
if (size > MAX_BYTES)
{
PageCache::GetInstance()->_page_mtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCahe(span);
PageCache::GetInstance()->_page_mtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
7. 多线程并发环境下,malloc和ConcurrentAlloc的对比
分几轮大量申请释放,分别对比malloc和tcmalloc在定长和变长申请的时间消耗
Benchmark.cpp
#include "Common.h"
#include"ConcurrentAlloc.hpp"
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(malloc(16));
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(ConcurrentAlloc(16));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
测试为4个线程同时申请10轮,每轮10000次,relase32位,先是定长然后是变长
ConcurrentAlloc在并发环境下效率有明显提高,但本项目旨在学习tcmalloc的结构,为简化版本,并不是tcmalloc真正的效果,应用场景也不全面,不能代表最终结果
8. 全文件
9. 扩展和不足
上面已经测试了结果,那么能否替换系统的malloc,可以的。不同平台的替换方式不同,基于unix的系统上的glibc,使用了weak alias的方式替换,具体说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持alias attribute,所以替换就变成了这种通用形式:
void* mallco (size_t size) THROW attribute_((alias(tc_malloc)))
因此所有malloc的调用都跳转到了tc_malloc的实现
具体参考:GCC attribute 之weak alias属性
有些平台不支持这样的东西,需要使用hook的钩子技术来做
关于hook:hook
10. 参考资料
几个内存池库的对比
tcmalloc源码学习
tcmalloc源码阅读
如何设计内存池?-码农的荒岛求生
tcmalloc源代码