高并发内存池项目
文章目录
项目介绍
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目TCmalloc,TCmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free).我们这个项目是把TCmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的就是学习TCamlloc的精华,不是为了造出来一个更好的轮子.
项目地址:https://gitee.com/RachelByers/concurrentmemorypool
技术栈需求
- C/C++ 11,单例模式,哈希桶,链表等等数据结构
- C++多线程,互斥锁
- 操作系统内存管理
浅谈池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态.
内存池
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放.
内存池主要解决的问题
内存池主要解决的当然是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。那么什么是内存碎片呢?
内存碎片分为两种,内碎片&&外碎片
外碎片:外步碎片就是指在分配空间时候外部碎片是一些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够, 但是不能满足一些的内存分配申请需求.
内碎片:内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用.
重新认识malloc和free
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的.
而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,Linux GCC用的glibc中的ptmalloc。
第一步!设计一个定长内存池
定长内存池,顾名思义,就是对可申请内存长度做了限制,实例出来一个定长为int的内存池就只能申请4/8个字节的对象.这个定长内存池是我们接触池化技术的一个开胃菜,当然,这个定长内存池也是对后边整个项目有作用的.
//eg:ObjectPool.h
#pragma once
#include<iostream>
#ifdef _WIN32
#include<Windows.h>
#else
//LinuxHeader
#endif // WIN32
const static int MemorySize = 128 * 1024;
template<class T>
class ObjectPool
{
public:
inline static void* SystemCallAlloc(size_t size)
{
#ifdef _WIN32
void* ret = VirtualAlloc(nullptr, size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
if (ret == nullptr)
{
throw std::bad_alloc();
}
return ret;
#else
//Linux brk();
#endif // WIN32
}
T* New()
{
T* obj = nullptr;
if (_freelist)
{
obj = (T*)_freelist;
void* next = *(void**)_freelist;
_freelist = next;
}
else
{
if (_remain < sizeof(T))
{
//如果剩余空间小于申请的空间,重新申请一块内存
_memory = (char*)SystemCallAlloc(MemorySize);//128KB
if (_memory == nullptr)
{
throw std::bad_alloc();
}
_remain = MemorySize;
obj = (T*)_memory;
_memory += sizeof(T);
_remain -= sizeof(T);
}
else
{
obj = (T*)_memory;
_memory += sizeof(T);
_remain -= sizeof(T);
}
}
new(obj)T;
return obj;
}
void Delete(T* obj)
{
*(void**)obj = _freelist;
_freelist = (void*)obj;
}
private:
char* _memory = nullptr;
size_t _remain = 0;
void* _freelist = nullptr;
};
重要原理
1.首先定长内存池的长度,我们可以用泛型模板来确定,这个并不是难点.
2.定长内存池,既然是池化技术,就必须可以存储一定的资源,来等待下次的申请,我们默认一次申请128KB的内存,不够了再次申请,128KB通过静态常量"MemorySize"来定义.
3.申请出来的空间,我们存在 memory 里,remain存放剩余的空间字节数,这些都比较容易理解,但是这个内存池他也要具有回收内存的功能,简而言之就是也要可以delete内存,那么delete是怎么实现的呢?
- 我们可以简单分析,回收的内存长度都是一致的,他申请的长度也是如此,那么为了方便管理,我们直接拿链表将回收的内存串起来不就 方便管理了嘛?没错!我们也是这么处理的,但并不是直接使用STL容器的链表进行管理.
- 关于我们链表的实现,首先理解void* 是什么,void* 是无类型指针.他的用途和普通的int* 并不是完全相同的,因为它本身没有类型,所以 无法进行加减运算.他也无法直接进行解引用操作,使用时候必须进行强转才可以,这些都是因为他是无类型指针的原因.那么void**呢?
- 我们来看void* * ,乍一看,他应该和void * 是一类货色,其实不然,他是具有类型的,他的类型是void* ,当我们对他进行解引用时候,他会把sizeof(void*)个字节大小给我们使用.这样我们便可以取出一片空间里一个指针所需要的空间大小了.
- 我们将链表的后一个节点指针直接存放在整个对象内存的前N个字节.删除时进行头插,实现方法为 (void*) delobj=_freelist;
- 当申请内存时候先看链表内有没有空间可分配,若没有则去剩余空间查看,若还是没有,则去找OS申请内存.
综上就是定长内存池实现的注意细节
高并发内存池整体框架
我们的项目主要针对的是多线程高并发申请释放内存的场景,多线程申请必然会存在强烈的锁竞争问题.这也是内存池的性能瓶颈,malloc本身作为一个内存池,他已经很优秀了,我们的项目就是要达到在多线程情境下比malloc更胜一筹!
主要考虑的问题如下:
- 性能问题
- 多线程下锁竞争问题
- 内存碎片问题
CurrentMemoryPool 主要由三层结构组成
- thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
- central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
图解:
ThreadCache
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
申请内存
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存
- 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
- 当链表的长度过长,则回收一部分内存对象到central cache
TLS–thread local storage:
在 Windows 操作系统下,线程本地存储(TLS)是一种机制,用于在不同线程之间管理和存储线程特有的数据。Windows 提供了几种方法来实现线程本地存储,主要包括使用 Windows API 函数来管理和访问线程局部数据。
TLS在编译器中的支持
在 C++11 和之后的标准中,可以使用 thread_local
关键字来声明线程本地变量:
thread_local int myThreadLocalValue = 0;
Common Code:
/*****************************************************************//**
* \file Common.h
* \brief
*
*
* \author Rachel
* \date August 2024
*********************************************************************/
#pragma once
#include<iostream>
#include<time.h>
#include<assert.h>
#include<thread>
#include<unordered_map>
#include<algorithm>
#include<mutex>
#include"ObjectPool.h"
#ifdef _WIN32
#include<Windows.h>
#elif
//Linux下头文件
#endif
using std::cout;
using std::endl;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
//Linux...
#endif
const static int MAX_BYTES = 256 * 1024;//单个申请内存最大256KB
const static int N_FREElISTS = 208;
const static int NPAGES = 129;
const static int PAGE_SHIFT = 13;
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
static size_t RoundUp(size_t bytes)
{
//计算向上对齐到多少字节
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
//超过256KB
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
}
static size_t Index(size_t bytes)
{
//计算在哈希桶的下标
assert(bytes <= MAX_BYTES);
static size_t group_array[4] = { 16,56,56,56 };
if (bytes <= 128)
{
return _Index(bytes, 8);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 16) + group_array[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 128) + group_array[0] + group_array[1];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 1024) + group_array[0] + group_array[1] + group_array[2];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 8 * 1024) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
}
else
{
//超过256KB
assert(false);
}
}
static size_t NumMoveSize(size_t size)
{//一次从central Cache获取对象多少
assert(size);
//小空间申请一次上限高,大空间申请一次上限低
int num = MAX_BYTES / size;
if (num < 2)num = 2;
if (num > 512)num = 512;
return num;
}
static size_t NumMovePages(size_t bytes)
{
size_t n = NumMoveSize(bytes);
bytes *= n;
size_t pages = bytes >> PAGE_SHIFT;
if (pages < 1)
{
pages = 1;
}
return pages;
}
static inline size_t _RoundUp(size_t bytes, size_t align)
{
size_t alignNum;
if (bytes % align != 0)
{
alignNum = (bytes / align + 1) * align;
}
else
{
alignNum = bytes;
}
return alignNum;
}
static inline int _Index(size_t bytes, size_t align)
{
if (bytes % align == 0)
{
return bytes / align - 1;
}
else
{
return bytes / align;
}
}
};
ThreadCache code:
//ThreadCache.h
class ThreadCache
{
public:
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
void* FetchFromCentralCache(size_t index, size_t alignSize);
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[N_FREElISTS];//本质是一个哈希桶,数组多大取决于映射关系怎么设计
};
static _declspec(thread)ThreadCache* pTLSThreadCache = nullptr;
//ThreadCache.cpp
#define _CRT_SECURE_NO_WARNINGS
#include"ThreadCache.h"
#include"CentralCache.h"
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);
size_t index = SizeClass::Index(size);
//判断哈希桶index下标下有没有空间可分配
if (!_freeLists[index].isEmpty())
{
return _freeLists[index].Pop();
}
else
{
//若index下为空了,则从上层拉取下来新空间;
return FetchFromCentralCache(index, alignSize);
}
}
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
//先找要归还的桶(_freelists[index]),然后直接Push进自由链表就可以
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//当ThreadCache自由链表中的内存过长,开始释放部分还给CentralCache
if (_freeLists[index].MaxSize()<= _freeLists[index].size())
{
ListTooLong(_freeLists[index], size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
//将ThreadCache内的链表拿掉一部分
void* start = nullptr, * end = nullptr;
list.PopRange(start, end, list.MaxSize());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t alignSize)
{
//慢反馈调节算法
//这里std::min 和windows的min冲突了,使用括号括起来避免宏替换
size_t batchNum = (std::min)(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(alignSize));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
void* start = nullptr, * end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, alignSize);
assert(actualNum>0);
if (actualNum == 1)
{
assert(start == end);
return start;
}
else
{
_freeLists[index].PushRange(NextObj(start), end,actualNum-1);
return start;
}
}
CentralCache
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
申请内存:
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
- central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count
释放内存:
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache, page cache中会对前后相邻的空闲页进行合并。
Common.h Code:
#pragma once
#include<iostream>
#include<time.h>
#include<assert.h>
#include<thread>
#include<unordered_map>
#include<algorithm>
#include<mutex>
#include"ObjectPool.h"
#ifdef _WIN32
#include<Windows.h>
#elif
//Linux下头文件
#endif
using std::cout;
using std::endl;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
//Linux...
#endif
const static int MAX_BYTES = 256 * 1024;//单个申请内存最大256KB
const static int N_FREElISTS = 208;
const static int NPAGES = 129;
const static int PAGE_SHIFT = 13;
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
static void* SystemCallAlloc(size_t kPage)
{
#ifdef WIN32
void* ret = VirtualAlloc(nullptr, kPage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
if (ret == nullptr)
{
std::cout << "bad alloc" << std::endl;
throw std::bad_alloc();
}
return ret;
#else
//Linux brk();
#endif // WIN32
}
static void SystemCallFree(void* ptr)
{
#ifdef WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
//Linux ();
#endif // WIN32
}
class SizeClass
{
public:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
static size_t RoundUp(size_t bytes)
{
//计算向上对齐到多少字节
if (bytes <= 128)
{
return _RoundUp(bytes, 8);
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16);
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else
{
//超过256KB
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
}
static size_t Index(size_t bytes)
{
//计算在哈希桶的下标
assert(bytes <= MAX_BYTES);
static size_t group_array[4] = { 16,56,56,56 };
if (bytes <= 128)
{
return _Index(bytes, 8);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 16) + group_array[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 128) + group_array[0] + group_array[1];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 1024) + group_array[0] + group_array[1] + group_array[2];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 8 * 1024) + group_array[0] + group_array[1] + group_array[2] + group_array[3];
}
else
{
//超过256KB
assert(false);
}
}
static size_t NumMoveSize(size_t size)
{//一次从central Cache获取对象多少
assert(size);
//小空间申请一次上限高,大空间申请一次上限低
int num = MAX_BYTES / size;
if (num < 2)num = 2;
if (num > 512)num = 512;
return num;
}
static size_t NumMovePages(size_t bytes)
{
size_t n = NumMoveSize(bytes);
bytes *= n;
size_t pages = bytes >> PAGE_SHIFT;
if (pages < 1)
{
pages = 1;
}
return pages;
}
static inline size_t _RoundUp(size_t bytes, size_t align)
{
size_t alignNum;
if (bytes % align != 0)
{
alignNum = (bytes / align + 1) * align;
}
else
{
alignNum = bytes;
}
return alignNum;
}
static inline int _Index(size_t bytes, size_t align)
{
if (bytes % align == 0)
{
return bytes / align - 1;
}
else
{
return bytes / align;
}
}
};
class FreeList
{
public:
void Push(void* obj)
{
//头插
assert(obj);
NextObj(obj) = _freeList;
//*(void**)obj = _freeList;
_freeList = obj;
_size++;
}
void* Pop()
{
assert(_freeList);
void* obj = _freeList;
_freeList = NextObj(obj);
_size--;
return obj;
}
bool isEmpty()
{
return _freeList == nullptr;
}
void PushRange(void* start, void* end, size_t actualNum)
{
NextObj(end) = _freeList;
_freeList = start;
_size += actualNum;
}
void PopRange(void*& start, void*& end, size_t max_size)
{
assert(max_size <= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < max_size - 1; i++)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= max_size;
}
size_t& MaxSize() { return _maxSize; }
size_t size() { return _size; }
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
size_t _size = 0;//记录自由链表长度,超过maxSize自动释放一部分给CentralCache
};
struct Span
{//管理多个连续页大块内存跨度结构
PAGE_ID _pageId = 0; //大块内存起始页号
size_t _n = 0; //页数
void* _freeList = nullptr; //切好小块内存链表
Span* _prev = nullptr; //Span双向链表
Span* _next = nullptr; //Span双向链表
size_t _usedCount = 0; //切好的小块内存,被分配给threadCache的计数
bool _isUsed = false; //表示这块span有没有被分配给CentralCache或者正在被分配
size_t _objSize = 0; //表示这个span里切割存放的小块内存大小
};
class SpanList
{//带头双向循环Span链表
public:
SpanList()
{
_head = _spanPool.New();
_head->_next = _head;
_head->_prev = _head;
}
~SpanList()
{
_spanPool.Delete(_head);
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos && newSpan);
Span* prev = pos->_prev;
newSpan->_prev = prev;
newSpan->_next = pos;
prev->_next = newSpan;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
prev->_next = pos->_next;
pos->_next->_prev = prev;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool isEmpty()
{
return _head->_next == _head;
}
private:
Span* _head;
ObjectPool<Span> _spanPool;
public:
std::mutex _mutex; //桶锁
};
CentralCache Code:
//CentralCache.h
#pragma once
#include"Common.h"
class CentralCache
{
public:
static CentralCache* GetInstance();//单例模式
size_t FetchRangeObj(void*& start, void*& end, size_t n, size_t bytes);//从CentralCache获取一定数量对象给ThreadCache
Span* GetOneSpan(SpanList& list, size_t bytes);
void ReleaseListToSpans(void* start, size_t size);
private:
CentralCache() {}
CentralCache(const CentralCache&) = delete;
SpanList _spanLists[N_FREElISTS];
static CentralCache _instance;
};
//CentralCache.cpp
#include"CentralCache.h"
#include"PageCache.h"
CentralCache CentralCache::_instance;
CentralCache* CentralCache::GetInstance()
{//单例模式
return &_instance;
}
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t bytes)
{
//返回值实际拿到的个数
//计算出CentralCache下的哈希桶index
size_t index = SizeClass::Index(bytes);
//给index下的桶加锁
_spanLists[index]._mutex.lock();
Span* span = GetOneSpan(_spanLists[index], bytes);
//判span和_freelist不为空
assert(span);
assert(span->_freeList);
//给span保存小对象大小
span->_objSize = bytes;
//从span中开始给start和end分配空间,默认分配bactNum个,如果不够则有多少拿多少
size_t i = 0;
start = span->_freeList;
end = start;
while (i < batchNum - 1 && NextObj(end) != nullptr)
{
end = NextObj(end);
++i;
}
span->_freeList = NextObj(end);
size_t actualNum = i + 1;
span->_usedCount += actualNum;
_spanLists[index]._mutex.unlock();
return actualNum;
}
Span* CentralCache::GetOneSpan(SpanList& list, size_t bytes)
{
Span* it = list.Begin();
//遍历下层ThreadCache所需要大小的CentralCache对应位置的Spanlist是否有符合条件的
while (it != list.End())
{
if (it->_freeList != nullptr)
{
//如果存在则符合,直接返回
return it;
}
it = it->_next;
}
list._mutex.unlock();
PageCache::GetInstance()->_pageMutex.lock();
//到此时已经遍历完CentralCache对应位置的spanlist,说明对应位置的spanlist均为空.只能找PageCache要Span
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePages(bytes));
span->_isUsed = true;
PageCache::GetInstance()->_pageMutex.unlock();
//为下层空间释放留出机会
//切分申请到的span
//1.计算span的大小
//2.计算span起始地址
//3.计算span结束地址
size_t spanSize = span->_n << PAGE_SHIFT;
char* start = (char*)(span->_pageId << PAGE_SHIFT);
char* end = start + spanSize;
//切分
span->_freeList = start;
start += bytes;
void* tail = span->_freeList;
//切分然后插入到_freelist
while (start < end)
{
NextObj(tail) = start;
tail = start;
start += bytes;
}
//切好span,将span挂载到桶里要再加锁
NextObj(tail) = nullptr;
list._mutex.lock();
//将span插入到CentralCache对应的桶中,即是list桶
list.PushFront(span);
return span;
}
/**
* void ReleaseListToSpans
*
*
* \param start 表示开始地址,不需要传end,因为最后会以nullptr结尾
* \param size 用来计算CentralCache里的桶号
*/
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mutex.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//找到start所在的span,将其放入span
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_usedCount--;
if (span->_usedCount == 0)
{
//说明这个span所有小块内存都回来了,这个span可以归还给PageCache了
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
_spanLists[index]._mutex.unlock();
PageCache::GetInstance()->_pageMutex.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMutex.unlock();
_spanLists[index]._mutex.lock();
}
start = next;
}
_spanLists[index]._mutex.unlock();
}
PageCache
申请内存:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存:
如果central cache释放回一个span,则依次寻找span的前后page id的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片
PageCache Code:
//PageCache.h
#include"Common.h"
#include"ObjectPool.h"
#include"PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance();
Span* MapObjectToSpan(void* ptr);
void ReleaseSpanToPageCache(Span* span);
Span* NewSpan(size_t k);//获取一个K页的span
std::mutex _pageMutex;
private:
static PageCache _instance;
SpanList _spanLists[NPAGES];
ObjectPool<Span> _spanPool;
//优化后的在注释
//TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap; //这里只做32位适配
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
private:
PageCache() {}
PageCache(const PageCache&) = delete;
};
//PageCache.cpp
#include"PageCache.h"
PageCache PageCache::_instance;
/**
* 单例模式.
*
* \return单例对象
*/
PageCache* PageCache::GetInstance()
{
return &_instance;
}
Span* PageCache::MapObjectToSpan(void* ptr)
{
//std::unique_lock<std::mutex> lock(_pageMutex); //使用基数树就不需要加锁了
PAGE_ID id = (PAGE_ID)(ptr) >> PAGE_SHIFT;
Span* ret = (Span*)_idSpanMap.get(id);
if (ret != nullptr)
{
return ret;
}
else
{
//返回的地址必定是申请出来的,申请出来的必定会保存在map中,不存在找不到的情况,但是若是在合并内存则可能存在找不到的情况,故返回nullptr
return nullptr;
}
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > NPAGES - 1)
{
//调用系统释放
void* ptr = (void*)((span->_pageId) << PAGE_SHIFT);
SystemCallFree(ptr);
}
else
{
while (true)
{//向前合并
PAGE_ID prev = (span->_pageId - 1);
Span* ret = (Span*)_idSpanMap.get(prev);
if (ret == nullptr)break;
Span* prevSpan = ret;
if (prevSpan->_isUsed == true)break;
if (span->_n + prevSpan->_n > NPAGES - 1)break;
//到这里则可以开始进行合并
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
_spanPool.Delete(prevSpan);
}
while (true)
{//向后合并
PAGE_ID next = (span->_pageId + span->_n);
Span* ret = (Span*)_idSpanMap.get(next);
if (ret == nullptr)break;
Span* nextSpan = ret;
if (nextSpan->_isUsed == true)break;
if (span->_n + nextSpan->_n > NPAGES - 1)break;
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
_spanPool.Delete(nextSpan);
}
_spanLists[span->_n].PushFront(span);
span->_isUsed = false;
//优化后的在注释
_idSpanMap[span->_pageId] = span;
//_idSpanMap.set(span->_pageId, span);
_idSpanMap[span->_pageId + span->_n - 1] = span;
//_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
}
/* .由于PageCache和CentralCache和ThreadCache的哈希映射规则不同
* \param k的含义是上层申请K个页大小的Span对象
* \return 返回符合要求或者处理后符合规则的Span地址
*/
Span* PageCache::NewSpan(size_t k)//K is memory pages
{
assert(k > 0);
if (k < NPAGES - 1) {
if (!_spanLists[k].isEmpty())
{//先检查第K个桶有没有span
Span* kSpan = _spanLists[k].PopFront();
//大坑!这里取走k记得缓存每个的位置,不然换回来时候可能找不到,因为我们这个我们可能只换存了首尾的地址,因为他可能是上次剩下的nSpan
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
//优化后的在注释
_idSpanMap[kSpan->_pageId + i] = kSpan;
//_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
else
{
for (size_t i = k + 1; i < NPAGES; i++)//第K个桶为空,检查后边的桶.如果有则拆分为K个页表返回,n-k的页表插入PageCache的哈希桶中
{
if (!_spanLists[i].isEmpty())//不为空,开始切分
{
Span* nSpan = _spanLists[i].PopFront();
Span* kSpan = _spanPool.New();
//从nSpan头部切下来个kSpan
kSpan->_pageId = nSpan->_pageId;
nSpan->_pageId += k;
kSpan->_n = k;
nSpan->_n -= k;
//将(n-k)_span插入到哈希桶
_spanLists[nSpan->_n].PushFront(nSpan);
//建立在PageCache未用的span映射,即是nSpan的映射,用作内存合并
//优化后的在注释
_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
//_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
nSpan->_isUsed = false;
//建立页号和span的映射,方便回收内存,原因是每个小内存/PageSize 均可以得到页号,建立页号到地址的映射关系,可以在ThreadCache向Central还内存时直接查找.
for (PAGE_ID i = 0; i < kSpan->_n; i++)
{
_idSpanMap[kSpan->_pageId + i] = kSpan;
//_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
//kspan is returned toCentralCache
return kSpan;
}
}
//运行到此说明k桶后面均没有span
//找Heap申请128页的span
Span* bigSpan = _spanPool.New();
void* ptr = SystemCallAlloc(NPAGES - 1);
//装载bigSpan
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);//递归再来一次,这次不会进入到systemCallAlloc,直接在最后进行拆分返回
}
}
else
{
//大于128页的内存找系统直接申请
void* ret = SystemCallAlloc(k);
Span* SuperBigSpan = _spanPool.New();
SuperBigSpan->_pageId = ((PAGE_ID)(ret)) >> PAGE_SHIFT;
SuperBigSpan->_n = k;
_idSpanMap[SuperBigSpan->_pageId] = SuperBigSpan;
//_idSpanMap.set(SuperBigSpan->_pageId, SuperBigSpan);
return SuperBigSpan;
}
}
多线程并发环境下,malloc和ConcurrentAlloc申请和释放内存效率对比
基准测试代码 BenchMark.cpp
#define _CRT_SECURE_NO_WARNINGS
#include"ConcurrentAlloc.h"
#include<atomic>
#include<vector>
#include<thread>
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次malloc " << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;
cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次 free " << ntimes << "次: 花费:" << free_costtime << "ms" << endl;
cout << nworks << "个线程并发 malloc&free" << nworks * rounds * ntimes << "次,总花费:" << malloc_costtime + free_costtime << "ms" << endl;
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
cout << nworks << "个线程并发执行" << rounds << "轮次,每轮次concurrent alloc " << ntimes << "次: 花费:" << malloc_costtime << "ms" << endl;
cout << nworks << "个线程并发执行" <<rounds << "轮次,每轮次concurrent dealloc " << ntimes << "次: 花费:" <<free_costtime<<"ms" << endl;
cout << nworks << "个线程并发concurrent alloc&dealloc" << nworks * rounds * ntimes << "次,总花费:"<< malloc_costtime + free_costtime <<"ms" << endl;
}
int main()
{
size_t n = 100000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}
运行结果:
性能瓶颈分析
我们看到,我们的内存池效率并没有malloc高,下边我们开始对性能进行分析,这里我们借助VS自带的性能探查器进行分析
性能问题:
可看到这里PageCache里的锁竞争是非常激烈的,大部分性能开销是在MapObjectToSpan这个函数这里的.那么我们就需要对这里想办法进行优化.
优化方案:
我们知道在32位系统下,内存地址一共有232个,将其换算成页,也就是2(32-13)即2^19个
如果我们将每页的地址直接定址映射在一张哈希表上那么我们,我们在通过MapObjectToSpan进行查找的时候是不是就不需要加锁了呀,因为同一个页我们不可能同时申请和释放的.这样效率就提高上去了.
我们将他封装成一个类,提供set和get的方法用来写入和查询.替换之前的unorderedmap
这样也就彻底的脱离了malloc了,因为STL的unorderedmap底层扩容也是使用的new
,项目内的堆内存申请统一使用之前开胃菜里的定长内存池来申请,这里也就前后呼应了.
PageCache里的代码我已经用注释写了一份优化后的代码,这里不再另行展示了,代码量有点多了.直接上效果
固定长度内存申请对比
malloc花费64ms
currentalloc花费18ms
分散内存申请对比
malloc花费2143ms
courrentalloc花费178ms
性能分析器对比
可以看到,锁的竞争下降幅度还是很大的.
这个项目到这里已经接近尾声了,目前还存在一些不足,比如如何去替换malloc,每次手动使用还是比较麻烦的,可不可以替换malloc呢?实际是可以的.
这里作为一个后续的扩展有时间可以了解一些东西比如钩子技术,和GCC attribute 之weak,alias属性.