内存对齐与伪共享
时间测试类
该类会在后续的测试中用于运行时间测试。
// public/timer.h
#include <chrono>
#include <iostream>
#include <functional>
struct ScopeTimer
{
ScopeTimer(const char *msg):_msg(msg),_now(std::chrono::high_resolution_clock::now()){}
~ScopeTimer(){
std::cout << _msg << ",espaced " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - _now).count() << "ms" << std::endl;
}
const char *_msg;
std::chrono::high_resolution_clock::time_point _now;
};
void test_cycle(std::function<void()> f,long cycle,const char *msg)
{
ScopeTimer s(msg);
for (long index = 0;index < cycle;++index)
{
f();
}
}
内存对齐
内存对齐规则
我们知道,在我们写结构体时,默认情况下编译器会自动对这些数据结构进行对齐,对齐的规则为按照最大的pod
类型进行对齐。同时,我们也可以强制指定对齐的大小,此时将按照默认与指定的字节中较小的进行对齐。在程序员看来,内存是一个一个字节的,但是现代操作系统在进行内存管理时会要求内存按照N字节进行对齐,一般是4
字节。假设有如下几个结构体,按照不同的对齐规则,有不同的大小。
#include <iostream>
// 使用默认对齐
struct t1
{
char c;
int a;
char b;
short d;
};
// 强制1字节对齐
#pragma pack(1)
struct t2
{
char c;
int a;
char b;
short d;
};
struct t3
{
char c;
int a;
short d;
};
// 恢复默认对齐
#pragma pack()
int main()
{
std::cout << "sizeof(t1):" << sizeof(t1) << std::endl; // 12
std::cout << "sizeof(t2):" << sizeof(t2) << std::endl; // 8
std::cout << "sizeof(t3):" << sizeof(t3) << std::endl; // 7
return 0;
}
内存对齐对读取次数的影响
同样的,对于上面的2个结构体t1
和t2
,在要访问成员变量a
时,内存访问的次数是不一样的。对于t1
,由于是4
字节对齐,t1.a
本身就是符合内存对齐要求的,因此只需要一次存取。对于t2
,由于是1
字节强制对齐,t2.a
在内存中的布局如下:
struct t2
{
char c; // 0
int a; // 1、2、3、4
char b; // 5
short d; // 6、7
};
要访问t2.a
,首先要将0~3
字节和4~7
字节分2
次读取到内存中,然后再从1~4
字节获取变量t2.a
的值,因此未对齐的数据结构的访问速度要比对齐的数据结构访问次数慢不止2倍。经过测试,实际的访问速度基本没有差别,是测试代码有问题??测试代码如下:
#include <thread>
#include <iostream>
#include <chrono>
#include "../public/timer.h"
// 使用默认对齐
struct t1
{
char c{0};
int a{0};
char b{0};
short d{0};
};
// 强制1字节对齐
#pragma pack(1)
struct t2
{
char c{0};
int a{0};
char b{0};
short d{0};
};
#pragma pack()
long cycle{10000};
int main()
{
std::thread x1(
[](){
struct t1 a;
ScopeTimer s("inc t1.a");
for (long index = 0;index < cycle;++index)
{
a.a++;
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}
);
std::thread x2([](){
struct t2 a;
ScopeTimer s("inc t2.a");
for (long index = 0;index < cycle;++index)
{
a.a++;
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
});
x1.join();
x2.join();
return 0;
}
//inc t2.a,espaced 7110ms
//inc t1.a,espaced 7115ms
嗯,这是由于cache line
造成的吗??还是说内存对齐与否造成的影响本身是可以忽略的??后续在介绍cpu cache line
之后,在排除了缓存命中问题后,使用数组来再次测试内存对齐与否的性能差异。
cpu cache line
现代cpu
都带有缓存,一般分为3级,离cpu
越近的缓存存取速度越快,同时缓存的容量越小。现代cpu
的一级缓存一般大小为4~64k
,并且存取时是以cache line
的形式进行的。我们日常使用的cpu cache line
一般64
字节,也就是cpu
在读取数据时会一次性的从上级内存将64
字节的数据读取到当前缓存中。因此,当我们要读取一个long
类型的数据时,cpu
实际上会将和它临近的一些字节一起读取到一级缓存中,以满足一次读取一个cache line
的要求。
伪共享
如果cpu
只有一个核,在多线程编程时,每个线程进行切换时,都需要将当前线程的上下文进行保存,然后加载下次要运行的线程的上下文,这就叫做上下文切换。现代cpu
一般都会有多个核,因此实际运行时会有多个线程并行运行,每个核都有独立的缓存,正常情况下并行运行的2
个线程如果没有访问或者修改相同内存是不会相互影响。但由于cache line
的存在,如果一个线程修改了运行在另外一个核上线程cache line
上的某一数据,则此时cpu
需要重新加载该cache line
上的数据。我们可以通过以下代码来证明该现象的存在:
#include "../public/timer.h"
#include <thread>
struct Array
{
long size{100};
long curIndex{0};
};
void incIndex(struct Array &arr)
{
arr.curIndex++;
}
void getSize(struct Array &arr)
{
long s = arr.size;
}
constexpr long maxIndex{100000000};
int main(int argc,char **argv)
{
struct Array arr;
{
ScopeTimer s("main");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
}
std::thread t1([&arr](){
ScopeTimer s("thread1");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
});
std::thread t2([&arr](){
ScopeTimer s("thread2");
long index{0};
while(index++ < maxIndex)
{
getSize(arr);
}
});
t1.join();
t2.join();
return 0;
}
//main,espaced 250ms
//thread2,espaced 606ms
//thread1,espaced 699ms
main
函数中的输出证明对Array
的N
次递增只需要250ms
,但是当我们在独立线程中让线程1
对curIndex
进行递增,让线程2
获取size
的值,可以看到他们的运行时间都有答复提升。这是由于线程1
和线程2
分别运行在不同的cpu
核心上,线程2
的cpu
会同时将curIndex
和size
同时读取到cache line
,当线程1
修改了curIndex
的时候,会造成线程2
中的curIndex
的值发生改变,虽然线程2
不关心curIndex
,但是此时cpu
还是需要重新从内存获取整个cache line
的数据,因此造成运行时间的大幅提升。如果在两个线程都运行getSize
,可以看到两个线程运行的时间都在200ms
左右。
解决伪共享
虽然两个线程访问的数据是独立的,但是可能会存在某一线程修改的数据,在另外一个线程的cache line
中,这样会造成另外一个线程需要重新存取整个cache line
,这种现象叫做伪共享。要解决伪共享,就要避免多个线程的cache line
相互影响,此时我们可以通过强制补充不需要的数据,让我们要访问的数据相互隔离,避免cache line
的影响。测试代码如下:
#include "../public/timer.h"
#include <thread>
struct Array
{
long size{100};
char padding[64-sizeof(long)];
long curIndex{0};
};
void incIndex(struct Array &arr)
{
arr.curIndex++;
}
void getSize(struct Array &arr)
{
long s = arr.size;
}
constexpr long maxIndex{100000000};
int main(int argc,char **argv)
{
struct Array arr;
{
ScopeTimer s("main incIndex");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
}
{
ScopeTimer s("main getSize");
long index{0};
while(index++ < maxIndex)
{
getSize(arr);
}
}
std::thread t1([&arr](){
ScopeTimer s("thread1 incIndex");
long index{0};
while(index++ < maxIndex)
{
incIndex(arr);
}
});
std::thread t2([&arr](){
ScopeTimer s("thread2 getSize");
long index{0};
while(index++ < maxIndex)
{
getSize(arr);
}
});
t1.join();
t2.join();
return 0;
}
//main incIndex,espaced 244ms
//main getSize,espaced 236ms
//thread2 getSize,espaced 192ms
//thread1 incIndex,espaced 259ms
由于我们在要访问的数据中添加了padding
,此时两个线程要访问/修改的数据都是相互独立的,可以看到运行的时间基本和他们在main
函数中依次运行时大致一致。
测试代码
屏蔽cache line测试内存对齐的影响
在内存对齐对读取次数的影响
的测试中,我们看到对齐和不对齐实际的测试结果和我们预期不符,两者的运行时间大致一致。这可能是由于cache line
的原因造成的。这里我们创建2个结构体,大小都是64
字节,然后动态创建数组,并对数组中的每个元素的处于4
字节对齐位置和非对齐位置进行累加,这样可以避免由于cache line
对同一元素进行访问时由于缓存造成N
次循环中实际只有第一次访问时是存在差异的。通过测试,我们可以看到内存对齐的访问速度是非内存对齐的9
倍!!!
#include <thread>标签:index,struct,c++,long,char,线程,内存,对齐 From: https://blog.51cto.com/u_6650004/5997218
#include <iostream>
#include <chrono>
#include "../public/timer.h"
// 使用默认对齐
struct t5
{
char c{0};
int a{0};
char b{0};
short d{0};
char x[64-12];
};
// 强制1字节对齐
#pragma pack(1)
struct t6
{
char c;
int a;
char x[64-5];
};
#pragma pack()
constexpr long cycle{100000000};
int main()
{
std::cout << "sizeof(t5):" << sizeof(t5) << std::endl; // 64
std::cout << "sizeof(t6):" << sizeof(t6) << std::endl; // 64
// 提前申请内存,避免内存申请造成的差异
struct t5 * a1 = new struct t5[cycle];
struct t6 * a2 = new struct t6[cycle];
// 对内存对齐的N个元素分别进行1次访问
{
ScopeTimer s("inc t5.a");
for (long index = 0;index < cycle;++index)
{
a1[index].a++;
}
}
// 对非内存对齐的N个元素分别进行1次访问
{
ScopeTimer s("inc t6.a");
for (long index = 0;index < cycle;++index)
{
a2[index].a++;
}
}
}
//sizeof(t5):64
//sizeof(t6):64
//inc t5.a,espaced 568ms
//inc t6.a,espaced 4611ms