参考:《Professional c++》,《并发编程实战》
1 基本概念
1.1 竞争
原子性
"原子"(atomic)操作是指一种不可分割的操作, 即在执行过程中不会被中断的操作。这种操作要么完全执行,要么完全不执行,不会出现部分执行的情况。
应用场景
- 计数器:在多线程环境下安全地递增或递减计数器。
- 标志位:设置或检查某个标志位,以确保操作的原子性。
- 交换操作:在多线程环境下安全地交换两个变量的值。
- 比较并交换(CAS, Compare And Swap):用于实现无锁的数据结构和算法。
竞争条件的概念
当操作不是原子时, 可能发生竞争条件.
竞争条件(Race Condition)指的是并发编程中出现的现象或问题.
不是逻辑意义上的"条件"
例子
有两个线程对同一个变量操作, 初始值为1
时间 | 线程 1 (递增) | 线程 2 (递减) |
---|---|---|
1 | 加载值 (值 = 1) | |
2 | 递增值 (值 = 2) | |
3 | 加载值 (值 = 1) | |
4 | 递减值 (值 = 0) | |
5 | 存储值 (值 = 2) | |
6 | 存储值 (值 = 0) |
- 这两个线程发生了竞争, "同时" 处理一个值.
- 如果没有竞争, 而是原子操作, 结果应该是
1
竞争的后果
发生撕裂, 导致数据不一致, 程序出错
1.2 撕裂
撕裂概念
撕裂是竞争条件的一种特定情况
- 当多个线程同时访问和修改共享变量时, 由于缺乏适当的同步机制,导致读取到部分更新的值。
- 撕裂关注的是单个变量的读写操作是否原子化。
1.3 互斥锁和死锁
阻塞的概念
[[八股--操作系统#进程的状态]]
当阻塞状态时, 进程/线程让出 cpu 资源, 并等待一些事件(以便进入就绪态)
互斥锁
互斥锁的基本操作包括
- 锁定(Lock):一个线程在进入临界区之前需要锁定互斥锁。如果互斥锁已经被其他线程锁定,则当前线程将阻塞,直到互斥锁被解锁。
- 解锁(Unlock):当线程完成了对共享资源的访问后,需要解锁互斥锁,以便其他被阻塞的线程可以继续执行。
- 尝试锁定(Try Lock):尝试锁定互斥锁,如果锁定成功则返回
true
,否则立即返回false
. 对于两种情况可自定义不同的操作.
c++ 互斥体
在 c++中, 互斥体类型为 std::mutex
, 这是最基本的互斥锁,用于保护共享资源。
- 它有两个基本的成员
lock
,unlock
- 当被某个线程
lock
后, 其他线程无法lock
它, 他们被阻塞. 直到unlock
- 一般
lock()
操作应该写在 执行需要保护的操作前
例子: 使用锁后, 线程将会分别进行, 不会发生竞争条件
std::mutex mtx; //全局变量 互斥体
void print_block (int n, char c) {
mtx.lock(); //执行开始时 锁定
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
mtx.unlock(); //结束时解锁
}
int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');
th1.join();
th2.join();
return 0;
}
死锁概念
两个线程都在等待对方 释放锁/资源. 因此它们同时被阻塞
例子
std::mutex mtx1;
std::mutex mtx2;
void func1(){
mtx1.lock();
std::cout<<"我是函数1, 锁定了mtx1"<<std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 休眠2秒
mtx2.lock();
mtx1.unlock();
mtx2.unlock();
}
void func2(){
mtx2.lock();
std::cout<<"我是函数2, 锁定了mtx2"<<std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2)); // 休眠2秒
mtx1.lock();
mtx1.unlock();
mtx2.unlock();
}
int main ()
{
std::thread th1 (func1);
std::thread th2 (func2);
th1.join();
th2.join();
return 0;
}
程序开始后, 两个锁都被两个线程分别锁定, 然后他们开始争抢对方的锁. 这就死锁了
1.4 原子
原子操作
- 原子操作指的是一种不可分割的操作
- 要么完全执行完毕,要么完全不执行,期间不会被中断
- 可以避免竞争条件(Race Conditions)和数据不一致的问题
- 在不使用同步机制 (比如互斥锁机制) 下, 实现线程安全
c++中的原子类型
- 通过
<atomic>
头文件中的std::atomic
模板类来实现 std::atomic
可以对基础数据类型(如整数、指针等)进行原子操作,同时保证这些操作的线程安全性。
例子
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
std::atomic<int> counter{0}; //原子变量
void increment(int num_iterations) {
for (int i = 0; i < num_iterations; ++i) {
counter.fetch_add(1, std::memory_order_relaxed);
//fetch_add 是一个原子加法操作
//指定内存序为memory_order_relaxed
//不对内存操作进行排序,仅保证操作的原子性
}
}
int main() {
const int num_threads = 10;
const int num_iterations = 100000;
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment, num_iterations);
}
for (auto& t : threads) {
t.join();
}
std::cout << "Final counter value: " << counter.load() << std::endl;
return 0;
}
1.5 伪共享
现代计算机的缓存结构
内存层次结构包括多个缓存层次(通常称为L1、L2和L3缓存),
这些缓存旨在减少处理器访问主内存(RAM)的延迟。
每个缓存层次通常有不同的大小和速度
缓存行
缓存行是处理器缓存中的最小单位,通常为64字节
作用如下
- 减少内存访问延迟:如果缓存命中, 则不会访问内存.
- 批量传输数据:由于缓存行包含多个字节的数据,处理器在一次内存访问中可以加载更多的数据,从而提高内存访问的效率。
- 缓存一致性:缓存一致性协议确保多个处理器核心在访问和修改共享数据时能够保持数据的一致性。当一个核心修改了某个缓存行中的数据,该缓存行在其他核心的缓存中会被标记为“无效”,迫使其他核心在下次访问该缓存行时重新加载最新的数据
伪共享的概念
- 多个线程使用多个处理器核心, 修改不同变量, 但这些变量恰好位于同一个缓存行中时
- 当一个核心修改了一个缓存行中的变量时,整个缓存行会被标记为无效 Invalid
- 导致其他核心重新加载该缓存行
- 这样会导致频繁的缓存同步开销
为什么当一个核心修改时, 要将它标记无效?
为了保护数据的一致性:
当某个核心修改了缓存行中的数据后,其它核心中的副本数据已经不再有效,需要重新从主内存中获取最新的数据。
避免伪共享
为避免伪共享,可以采取以下措施:
- 数据对齐:确保不同线程操作的数据位于不同的缓存行中。
- 适当的填充:在变量之间增加填充空间,以确保它们不会共享同一个缓存行。
#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
// 定义一个结构体,确保两个变量位于不同的缓存行
struct alignas(64) PaddedInt {
std::atomic<int> value;
};
PaddedInt var1;
PaddedInt var2;
void increment(PaddedInt& var, int count) {
for (int i = 0; i < count; ++i) {
++var.value;
}
}
int main() {
const int iterations = 1000000;
std::thread t1(increment, std::ref(var1), iterations);
std::thread t2(increment, std::ref(var2), iterations);
t1.join();
t2.join();
std::cout << "var1: " << var1.value << "\n";
std::cout << "var2: " << var2.value << "\n";
return 0;
}
2 多线程编程基础
std:: this_thread 命名空间
它是一个命名空间, 含有很多函数
作用: 这些函数都用于查询当前代码所在线程的信息, 并控制当前线程, 比如如下函数
//命名空间的一些函数如下:
get_id(); //返回当前线程的唯一标识符 std::thread::id
sleep_for(duration); //让当前线程休眠指定的时长
sleep_until(time_point); //让当前线程休眠到指定时间
yield(); //提示操作系统当前线程可以让出处理器时间片
thread_local 关键字
- 为变量指定了[[§1. 变量和基本类型, 类别#存储期的概念|存储期]] 属性.
- 用于定义变量时, 将变量标记为线程局部化, 也就说, 当线程使用该变量时, 它使用的是它的一个拷贝.
#include <thread>
using namespace std;
thread_local int n = 0; //线程局部化声明
void f() { ++n; }
void g(int &k) { ++k; }
int main() {
thread t1(f);
t1.join();
cout << n << endl; //操作后,依然是0
int k = 0;
thread t2(g, std::ref(k));
t2.join();
cout << k << endl;
}
停止某个线程
有两种办法 在一个线程中停止另一个线程.
- 设置用于通信的变量
- 设置一个共享变量, 让一个 A线程修改它, 另一线程监听 B它.
- B 不断地检查变量, 满足特定条件时, 就停止B.
- 使用
jthread
jthread 介绍
它创建一种能自动 join
的线程对象
- 在对象析构时, 自动调用
join()
函数 - 支持协作式取消 (c++20新特性)
什么是协作式取消?
协作式和强行停止不同
协作式取消是接收到取消命令 (停止命令) 后, 继续执行一些收尾工作, 然后退出.
如何实现协作式取消?
通过 jthread
和 std::stop_token
实现
- 当
jthread
执行某个函数任务时, 它自动传入一个stop_token
对象作为函数的第一个参数. - 在外部可以通过 jthread 的成员函数, 控制
stop_token
的状态, 任务可以监听状态并做出反应.
例子
#include <iostream>
#include <thread>
#include <chrono>
#include <stop_token>
void task(std::stop_token stopToken) {
while (!stopToken.stop_requested()) {
std::cout << "Running task..." << std::endl;
}
std::cout << "Task is stopping..." << std::endl;
}
int main() {
std::jthread t(task); // std::jthread会自动传递stop_token对象
std::this_thread::sleep_for(std::chrono::seconds(1));
t.request_stop(); // 请求停止线程
// jthread自动连接在作用域结束时
return 0;
}
3 原子操作
3.1 atomic 类型模板
原子类型模板 atomic 介绍
atomic<T>
是一种以T
为构建的原子类型.atomic
本身是类型模板- 其提供的操作都是原子性的,保证了在多线程环境下的数据安全
- 对于类型
A
,它的原子类型std::atomic<A>
不能直接调用A
的成员函数- 如果允许直接调用成员函数,那么成员函数内部的代码可能会修改原子对象的状态, 从而破坏了
std::atomic
的线程安全保证
- 如果允许直接调用成员函数,那么成员函数内部的代码可能会修改原子对象的状态, 从而破坏了
原子模板不一定支持自定义的类型, 除非满足某些条件
- 可简单复制的: 该类型的复制构造函数、复制赋值运算符和析构函数都必须是编译器默认生成的,或者具有等效的行为。可以按位复制,而不会产生任何副作用
- 所有的成员是可原子化的
例子
atomic<int> at_int = 10; //定义一个原子类型 变量
对于基础内置整型, 可以直接使用下划线的类型名
![[Pasted image 20240716200112.png|925]]
atomic 的 fetch 成员函数
整型的原子类型有一些 fetch
成员, 用于数值操作
它的成员函数都是原子性的.
它返回值是操作前的备份值
![[Pasted image 20240716222307.png]]
例子
atomic<int> a = 10;
std::cout<< a.fetch_add(5); //输出10
std::cout<< a; //输出15
原子类型 使用其原来的成员
类型 T 对应的 atomic<T>
类型, 不能直接调用类型 T 的成员函数, 但有下面一些方法
- 使用
load()
获取值,操作后再store()
回去:
std::atomic<A> atomic_a;
A a = atomic_a.load(); // 获取当前值
a.member_function(); //使用成员函数
atomic_a.store(a); // 将修改后的值存储回去
这种方法可以确保对 A
对象的修改是原子性的,但它需要进行两次原子操作,可能会影响性能。
- 使用
compare_exchange_weak()
或compare_exchange_strong()
实现原子更新:
std::atomic<A> atomic_a;
A expected = atomic_a.load();
A desired;
do {
desired = expected;
desired.member_function();
} while (!atomic_a.compare_exchange_weak(expected, desired));
这种方法使用循环和比较交换操作来实现原子更新,可以避免多次原子操作,但代码相对复杂一些。
- 将第1条封装, 将成员函数封装成自由函数,并使用原子操作
void update_a(std::atomic<A>& atomic_a) {
A a = atomic_a.load();
a.member_function();
atomic_a.store(a);
}
3.2 内存序
什么是内存序?
在多处理器核心系统中用于控制不同处理器上的操作如何看到其他处理器上内存访问顺序
memory_order_relaxed
: 松散序,只保证原子操作的完整性,不保证操作的顺序。即,加载和存储操作的顺序可以被处理器自由重排。memory_order_consume
: 消费序,较少使用,确保只读取依赖于原子操作结果的数据必须按照正确的顺序来执行。在实际应用中,由于其复杂性和难以正确实现,很多编译器将其视为与memory_order_acquire相同。memory_order_acquire
: 获取序,用于读取操作,保证该操作之前的所有写入在当前处理器中可见。这防止了之后的读取或写入被重排到原子操作之前。memory_order_release
: 释放序,用于写入操作,保证该操作之后的所有读取或写入不会被重排到原子操作之前。memory_order_acq_rel
:获取-释放序,结合了memory_order_acquire和memory_order_release的效果,适用于同时具有读取和写入需求的原子操作。memory_order_seq_cst
: 顺序一致序,是最严格的内存序,保证所有线程看到相同的操作顺序。顺序一致性操作比其他内存序操作更昂贵(性能上的开销更大),但提供了最直观的多线程行为
越是严格的内存序表明开销可能越大, 这是因为操作系统需要做额外的同步操作
原子运算指定内存序
- 原子操作(即它的成员)可接受额外参数, 指定内存顺序
- 它的函数声明带有默认实参, 指定了默认内存序为最严格的
memory_order_seq_cst
template <typename T>
T atomic<T>::fetch_add(T value, memory_order __m = memory_order_seq_cst);
3.3 原子智能指针和引用
智能指针的风险
shared_ptr
不能被拷贝shared_ptr
中引用计数的控制模块是线程安全的, 保证指向的内存只被释放一次.shared_ptr
其他设计内容则是线程不安全的. 比如在多个线程中调用reset
为此 c++20引入了 `atomic<shared_ptr>` , 保证其线程安全.
**注意:** 对 `atomic<shared_ptr>` 所指对象调用非 const 方法仍然是 *线程不安全的*, 应该自定义同步控制.
原子引用 atomic_ref
atomic_ref
类型对象 是可拷贝的- 一旦某个量被
atomic_ref
对象绑定, 则该量不能通过其他手段访问. - 一旦
std::atomic_ref
对象被销毁,它所引用的对象将不再受到原子性保护 std::atomic_ref
是类型安全的,它确保只能对与引用类型相同的对象进行原子操作
例子
使用原子引用时, 不能对它引用的东西访问
#include <atomic>
int main() {
int a = 1;
int &Ra = a;
std::atomic_ref<int> ref_a(a);
std::atomic_ref<int> ref_b(a); //ok
Ra = 2; // 错误, Segmentation fault, 不能访问了
std::cout << a;
std::cout << ref_a; // 错误, atomic_ref<>没有重载<<运算符
}
例子
使用 atomic 和 atomic_ref 防止数据竞争
void increase(int &counter) { // 每隔1ms增加counter的值
for (int i = 0; i < 100; ++i) {
++counter;
std::this_thread::sleep_for(std::chrono::milliseconds(1)); // 休眠1ms
}
}
int main() {
int counter = 0; //由于counter不是原子性的, 在递增时发生争用, 导致结果不为1000
vector<thread> threads;
for (int i = 0; i < 10; ++i) {
threads.push_back(thread(increase, std::ref(counter)));
}
for (auto &t : threads) {
t.join();
}
}
为解决上述问题, 有两种方法
//方法1, 将main中的 counter 改为原子类型
atomic<int> counter{0};
//方法2, 修改函数increase, 使用std::atomic_ref
void increase(int &counter) {
atomic_ref<int> c_ref{counter};
for (int i = 0; i < 100; ++i) {
++c_ref;
std::this_thread::sleep_for(1ms); // 休眠1ms
}
}
llvmclang64套件似乎没有 该类型
等待原子变量
4 互斥和锁
自旋锁
- 是互斥锁的一种形式
- 可以使用
atomic_flag
类型实现. - 使用自旋的方式, 尝试获取锁
- 进入线程后, 它开始使用自旋操作(也称为忙碌循环), 尝试获取锁, 也就是尝试 lock()
- 当锁定后, 就能执行后面的任务了.
忙碌循环是指在获取自旋锁时,如果锁当前不可用,线程**不会进入阻塞状态**,而是在一个循环中不断地尝试获取锁,**持续占用 CPU 资源进行检查**,直到成功获取锁为止。
- 优点: 当锁被占用的时间很短时,线程不需要进行上下文切换
- 缺点: 可能浪费大量浪费 CPU
atomic_flag spinlock = ATOMIC_FLAG_INIT;
4.2 互斥体类 mutex
互斥体介绍
c++提供了
- 非定时的互斥体类型 :
mutex
,recursive_mutex
,shared_mutex
.
- 定时的互斥体类型 :
timed_mutex
,recursive_timed_mutex
,shared_timed_mutex
所有的锁都支持的操作如下:
![[Pasted image 20240730101541.png]]
![[Pasted image 20240730101415.png]]
定时的互斥体
带有 timed
前缀的定时互斥体, 他们支持特殊的方法 尝试锁定
在指定时间内, 会不断尝试锁定, 而不会阻塞休眠.
![[Pasted image 20240730101736.png]]
所谓**定时**的互斥体类型, 就是可以定一个时间 t, 在指定时间内, 会不断尝试锁定, 而不会阻塞休眠.
超过了这个时间还没获取到锁, 就放弃
shared 的互斥体, 读写互斥体
- 带有 shard 前缀的互斥体, 称为读写互斥体
- 它可以被多个线程同时用
lock_shared()
锁定lock_shared
获取读锁/共享锁, 也就是说, 多个线程可以同时读取它- 对应的可以用
unlock_shared
解开读锁
- 对应的可以用
lock
是获取写锁/独占锁, 同一时刻只能被一个线程独占
- 当一个线程获取写锁, 那么其他线程将无法获取读锁/写锁
- 当一个线程获取读锁, 那么其他线程不能获取写锁, 但可以获取读锁
4.3 锁类型 lock
锁类介绍
- 锁类一般和互斥体类同时使用, 用于自动获取 互斥体的锁, 以及自动释放 互斥体的锁
- 锁类是 RAII (Resource Acquisition Is Initialization, 资源获取即初始化) 类型
- 将资源的获取(分配)和释放与对象的生命周期绑定在一起。具体来说,就是在对象的构造函数中获取资源,在对象的析构函数中释放资源
- 有四种类型的锁
lock_guard, unique_lock, shared_lock, scoped_lock
std::lock_guard
- 一般用于管理互斥体上的(写)锁, 可以对互斥体上的锁进行 自动上锁和解锁.
- 它简化了互斥量的使用
- 它的构造函数是
explicit
的, 必须使用直接初始化的方式, 不能用拷贝初始化
源码
template <typename _Mutex>
class lock_guard {
public:
typedef _Mutex mutex_type;
explicit lock_guard(mutex_type &__m) : _M_device(__m) {
_M_device.lock(); //在初始化后立即上锁
}
lock_guard(mutex_type &__m, adopt_lock_t) noexcept : _M_device(__m) {} //如果使用了adopt_lock_t,那么将不会在构造时上锁
~lock_guard() { _M_device.unlock(); }
//禁止复制和赋值
lock_guard(const lock_guard &) = delete;
lock_guard &operator=(const lock_guard &) = delete;
private:
mutex_type &_M_device;
};
struct adopt_lock_t { //类型adopt_lock_t只是一种用于标记的类型, 本身没有数据
explicit adopt_lock_t() = default;
};
- 锁类型在构造时, 调用
mutex
的lock
成员对互斥体上锁 - 如果构造时传入
adopt_lock_t
则什么也不做adopt_lock_t
类型是一个空格类型, 它只是用于作为一种标志, 调用不上锁的构造函数
- 在析构时调用
unlock
- 不可复制和赋值, 都是删除的函数.
锁类型就是用来*自动管理互斥体*的
利用局部变量*自动销毁调用析构函数的机制*, 在析构函数中释放 互斥体
std::scoped_lock
- 它的源码和
lock_guard
非常相似, 但是有一点区别是, 它是可变参数模板, 可以同时获取多个mutex
的锁. - 它接收的
adopt_lock_t
必须作为第一个参数. - 当它绑定多个
mutex
时, 调用的是 [[#std lock()]] 函数, 而非成员函数
std::unique_lock
它是一个类型模板, 模板参数为传入的 mutex 的类型
有多种使用方法, 它的构造函数除了接受一个 mutex_type 的引用, 还可以接受另一个对象.
通过例子解释
using namespace std;
mutex mtx;
void do_something1(){
unique_lock<mutex> lock(mtx); //1
// ...
}
void do_something2(){
unique_lock<mutex> lock(mtx, std::try_to_lock); //2
if(lock.owns_lock()){
// ...
}else{
// ...
}
}
void do_something3(){
unique_lock<mutex> lock(mtx, std::defer_lock); //3
// ...
lock.lock(); //手动锁定
// ...
lock.unlock(); //手动解锁
}
- 不传入额外对象, 此时
unique_lock
作用类似于lock_guard
, 它会在 mtx 可用时立即锁定.(如果不能锁定, 则进入阻塞态) - 传入
try_to_lock
对象作为标记, 在对象构造后它将立即进行尝试锁定.- 如果锁定成功, 则"拥有锁", 它的成员函数
owns_lock
返回 true. - 可以设计两条分支语句.
- 如果锁定成功, 则"拥有锁", 它的成员函数
- 传入
defer_lock
, 表示延迟锁定, 对象构造完成后, 它什么也不做,- 在构造之后可以手动锁定, 手动调用方法
lock
和unlock
. - 也可以调用方法
try_lock
进行尝试锁定
- 在构造之后可以手动锁定, 手动调用方法
锁定策略:
- **尝试锁定**:
- 程序尝试获取锁, 比如使用try_lock函数, 如果获取到锁, 它返回true, 否则返回false. 根据返回值编写条件分支进行不同处理
- 在锁定失败后, 仍然可以手动锁定或者继续尝试锁定
- 在锁定成功的分支中, 不能再锁定了
- **手动锁定**:
- 手动获取锁, 如果锁被占用, 则进入阻塞态
- **延迟锁定**:
- 锁对象构造后, 不进行操作.
策略 | 描述 |
---|---|
std::adopt_lock |
如果 mutex 已经上锁, 然后绑定到一个 lock 类型, 则应该加上该标志, 表示不要在构造函数中调用 lock 方法. 它只能绑定已锁定的 mutex |
std::defer_lock |
延迟获取锁, 表示不要在构造函数中调用 lock 方法. |
std::try_to_lock |
尝试获取锁 |
std::timed_lock |
在指定时间内尝试获取锁 |
std::shared_lock
- 只能绑定 shared 类别的 mutex. 只能管理 读写共享锁
- 如果要管理 shared 类 mutex 的写锁, 则应该使用
lock_guard
或unique_lock
- 它和 unique_lock 有相同名称的成员函数, 但可调用的方法在底层有区别
- 它的
lock()
成员, 实际上是调用互斥体的lock_shared()
成员 - 当直接绑定时, 它将锁定 读写锁/共享锁, 即调用互斥体的
lock_shared()
成员
- 它的
std::scoped_lock
- 它是一个可变参数模板类型, 可以同时绑定多个互斥体.
- 绑定的互斥体类型可以不同, 但初始化时, 都将调用它们的 独占/写锁定方法.
- 不能传入锁定策略标志 (比如
std::defer_lock
)- 如果需要策略, 则应该使用
unique_lock
- 如果需要策略, 则应该使用
using namespace std;
shared_mutex smtx;
mutex mtx;
void func(int &counter) { // 每隔1ms增加counter的值
std::scoped_lock<shared_mutex, mutex> lock(smtx, mtx);
std::this_thread::sleep_for(1s);
}
4.4 一次性锁定多个 mutex
除了上述的 scoped_lock
, 还有一些模板函数
std::lock()
- 是可变参数模板函数, 可以同时锁定多个
mutex
互斥体. - 锁定时顺序是随机的
- 如果发生异常, 则立即 解锁 它刚刚锁定的那些互斥体. 可视为一种原子操作
std::try_lock()
当都成功时, 返回-1
4.5 std::call_once
call_once
可以确保函数只调用一次- 它和
std::once_flag
配套使用, 用于记录函数是否已经被调用过
call_once 接收两个参数
- 第一个是
once_flag
对象 - 第二个是 可调用对象
std::once_flag flag;
void do_once() {
std::call_once(flag, []() { std::cout << "Called once" << std::endl; });
}
int main() {
std::thread t1(do_once);
std::thread t2(do_once);
std::thread t3(do_once);
t1.join();
t2.join();
t3.join();
return 0;
}
5 信号量
std::counting_semaphore
他是一个类型模板, 它的模板参数为 一种整数类型, 表示信号量的最大值 (资源数)
std::binary_semaphore
实际上, 它是类型别名
using std::binary_semaphore = std::counting_semaphore<1>;
6 条件变量 condition_variable
condition_variable 例子
条件变量(Condition Variable)是用于线程间同步的一种机制,
允许一个线程阻塞并等待另一个线程发送信号来唤醒它。
例子
using namespace std;
mutex mtx;
condition_variable cv;
bool ready = false;
void print_id(int id) {
unique_lock<mutex> lck(mtx);
while (!ready)
cv.wait(lck); //这等价于 cv.wait(lck, []{return ready;})
//ready 用于防止意外唤醒
cout << "Thread " << id << '\n';
}
void go() {
unique_lock<mutex> lck(mtx);
ready = true;
cv.notify_all(); // 通知所有等待的线程, 使他们结束休眠进入就绪态
}
int main() {
// 创建并启动 10 个线程
thread threads[10];
for (int i = 0; i < 10; ++i)
threads[i] = thread(print_id, i);
cout << "10 threads ready to race...\n";
go(); // 设置 ready 为 true 并通知所有等待的线程
// 等待所有线程完成
for (auto &th : threads)
th.join();
return 0;
}
condition_variable
通过wait
函数, 释放了获取的 mutex 锁, 并进入阻塞态, 并等待 cv 的通知- 一旦调用
notify_all
时, 就唤醒所有相关的线程, 让他们进入就绪态
为什么用使用 `notify` ?
一般来说线程会在一段时间后自动醒来, 但这样太被动了, 不如直接在条件可行的时候就唤醒它
wait 成员函数
函数原型:
void condition_variable::wait(unique_lock<mutex>& __lock); //1
template <typename _Predicate> //2
void condition_variable::wait(
unique_lock<mutex> &__lock,
_Predicate __p
) {
while (!__p())
wait(__lock);
}
不带谓词的版本
wait
成员函数一般接受一个锁类型对象- 如
unique_lock
类型对象
- 如
- 当线程运行到
wait
时, 将自动释放那个锁对象绑定的互斥体, 并使当前线程进入阻塞态- 为什么要释放锁? 因为该线程要阻塞了, 不占用资源, 也就没有必要持有锁
- 此时其他的线程可以获取这个锁了.
- 原理是, 调用了锁对象的
unlock()
成员函数
- 当线程唤醒后, 将重新上锁, 然后继续执行
wait()
之后的代码.- 使用了
wait()
函数的线程可以被cv.notify_one()
或cv.notify.all()
唤醒 - 或者由于调度机制而自动唤醒. 称为[[#虚假唤醒]]
- 使用了
带谓词的版本
它会检查谓词. 如果醒来后, 谓词不通过, 那么会重新释放锁, 进入阻塞态
如果线程被唤醒, 会检查谓词 __p()
, 如果不满足则继续阻塞等待.
- 注意传入的谓词是一个可调用对象, 而不能是一个变量.
- 如果要传入一个
bool
型变量, 可以使用 lambda 表达式捕获它的引用, 直接返回这个变量.
例子
#include <condition_variable>
#include <mutex>
#include <iostream>
#include <chrono>
#include <thread>
using namespace std;
condition_variable cv;
mutex mtx;
bool ready = false;
auto Ready = [] {return ready; };
void f1() {
unique_lock<mutex> lk{ mtx };
cout << "f1开始" << endl;
cv.wait(lk, Ready);
cout << "f1真正开始" << endl;
//...
}
void f2() {
unique_lock<mutex> lk{ mtx };
cout << "f2开始" << endl;
std::this_thread::sleep_for(chrono::milliseconds(1000));
ready = true; // 如果将这行注释, 则f1将永远阻塞在wait处.
cv.notify_one();
}
int main() {
std::thread t1(f1);
std::this_thread::sleep_for(chrono::milliseconds(100));
std::thread t2(f2);
t1.join();
t2.join();
}
虚假唤醒
- 即使 cv 没有调用
notify_all
或notify_once
, 线程也可能醒来 - 为了防止虚假唤醒, 可以设置 额外变量并用
while
循环检查, 或者使用谓词版本的wait
.
notify 成员
将那些被条件变量 使用 wait
导致阻塞的线程唤醒
通知线程和等待线程使用同一个锁的情况
using namespace std::chrono_literals;
std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cout << "我是wait,我将进入等待状态 \n";
cv.wait(lk, []{ return i == 1; });
std::cout << "完成等待; i == " << i << '\n';
done = true;
}
void signals()
{
std::this_thread::sleep_for(200ms);
std::cout << "我是通知者, 我马上要发出通知了\n";
cv.notify_one(); //这个通知是失败的,因为 i 依然等于1
std::unique_lock<std::mutex> lk(cv_m); //获取锁
i = 1;
while (!done)
{
std::cout << "我是通知者 我要发出有效的通知\n";
lk.unlock();
cv.notify_one();
std::this_thread::sleep_for(300ms);
lk.lock();
}
}
int main()
{
std::thread t1(waits), t2(signals);
t1.join();
t2.join();
}
- 首先 等待线程 获取锁
- 然后由于 wait 函数, 释放锁, 进入阻塞态
- 通知进程休眠了200ms, 然后开始通知, 一旦发出通知
- 等待进程醒来, 获取锁, 这导致通知进程无法获取锁.
- 等待进程发现 谓词不能通过, 因此重新释放锁, 进入阻塞
- 通知进程又可以获取锁了, 然后继续执行, 将变量
i=1
- 此时
!done
是 true 的, 进入 while 循环- 释放锁, 然后发出通知, 并休眠300ms
- 等待进程醒来, 发现谓词通过,
wait
函数返回并上锁- 等待进程完成等待, 并将
done=true
- 线程执行结束后, 将释放锁
- 等待进程完成等待, 并将
- 通知进程 休眠结束, 然后重新上锁
- 最后结束while 循环, 通知进程结束, 释放锁
7 future, promise 和 async
异步的概念
异步编程 是一种编程范式,它允许程序在执行一个可能长期运行的任务的同时继续对其他事件做出反应而不必等待任务完成。简单来说,就是让程序不必按照严格的顺序一行一行地执行,而是可以同时处理多个任务。
异步和并行并发的区别?
- 异步是一种编程模型,它本身不关心程序是否并发或并行执行。
- 并发和并行是程序执行的方式,它们可以通过异步编程模型来实现,但异步编程模型本身并不等同于并发或并行。
future 类型模板
基本概念
- 用于表示异步操作的结果
future<T>
中的T
表示结果的类型std::future
对象是不可复制的,但可以使用移动语义来转移其所有权- 一个
std::future
只能被一个线程使用。如果需要多个线程访问同一个异步结果,可以使用std::shared_future
创建 future
- 通过
std::async(Func)
来创建, 他将返回一个future<T>
对象, 其中T
正是调用体Func
所返回的类型 - 通过
std::promise
类型的get_future
成员来获取与 承诺体对应的future
体.
成员函数
- 使用
get()
成员可以通过阻塞当前线程, 直到与future
绑定的内容 被计算出. - 使用
wait()
成员作用类似, 但他不返回那个 绑定的内容
primise 类型模板
promise
对象只能被移动, 不能复制.primise
提供了存储值或异常的功能. 存储的值可以通过与它绑定的future
对象来访问- 通过
get_future
成员函数可以得到 与它绑定的future
对象
void func(std::promise<int> x){
x.set_value(1314);
}
int main(){
std::promise<int> P;
std::future<int> F = P.get_future();
func(std::move(P)); //这里必须使用std::move, 这将触发移动构造函数.
}
future 和 promise 使用的例子
future
和promise
一般成对使用.future
表示未来的值,promise
表示承诺会给出一个未来.promise
对象不可复制, 只能移动
一个例子如下:
#include <iostream>
#include <thread>
#include <future>
void calculate(std::promise<int>&& promiseObj) {
int result = 42; // 模拟计算
promiseObj.set_value(result); // 设置计算结果
}
int main() {
std::promise<int> promiseObj;
std::future<int> futureObj = promiseObj.get_future(); // 获取与 promise 关联的 future 对象
std::thread t(calculate, std::move(promiseObj)); // 将 promise 传递给线程
std::cout << "Waiting for result...\n";
int result = futureObj.get(); // 获取计算结果(阻塞直到结果可用)
std::cout << "Result: " << result << std::endl;
t.join();
return 0;
}
要点:
- 通过
promiseObj.get_future()
获取关联的 future 对象,- 可以在一开始就获取. 他们是绑定的.
promiseObj.set_value
可以设置与其关联的 future 对象的内容.- 最后通过
futureObj.get()
获取到这个设置的值.- 当使用 get 方法后, 程序将阻塞直到 这个未来值真的到来.
- 如果 `promise` 由于移动到线程中, 它具有线程存储期. 当线程完成时, 它被析构.
- 而先前通过 `get_future` 得到的, 与它关联的 `future` 对象不会伴随着失效.
- 当如果在获取结果前, `promise`就失效了, 那么关联的 `future` 对象将无法获得值.
- 如果`promise`对象被移动, 那么原来的 `future` 和新的 `promise` 关联.
async 函数模板
async 基本用法
async
是一个函数模板- 有两种形式
template<class F, class...Args>
std::future<ReturnType> std::async(std::launch launch_policy, F &&function, Args&&...args);
template<class F, class...Args>
std::future<ReturnType> std::async(F &&function, Args&&...args);
std::future<ReturnType>
对象代表异步操作的结果,ReturnType
是异步函数function
的返回类型。launch_policy
是启动策略,控制异步任务的启动方式,可以是以下值或它们的组合:std::launch::async
:异步执行,保证在新的线程中执行函数。std::launch::deferred
:延迟执行,直到调用future
对象的get()
或wait()
方法时才执行函数。
如何知道返回类型 ReturnType?
利用如下元函数可以解析其返回的类型
std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
例子
int func(int x){
std::sleep_for(5s);
std::cout<<"func\n";
return x;
}
int main(){
auto future_int = std::async(std::launch::async,f,42);
std::cout<<"11111111111111\n";
int result = funture_int.get(); //调用get时, 将等待异步程序执行结束
std::cout <<"22222222222222\n";
std::cout << result;
}
packaged_task 类型模板
packaged_task 基本用法
作用和用法
- 它用于将任何可调用对象 包装起来,
- 包装起来后可以用于异步调用.
- 将它作为任务启动后 (即调用
operator()
), 它的返回值将存储在共享状态中. - 它有一个
get_future
成员函数, 用于得到与该 callable 体对应的future
对象. 通过该future
对象可以访问它的返回值
例子
int func(int a, int b) {
return a + b;
}
int main() {
std::packaged_task<int(int, int)> packfun{ func };
std::future<int> myFuture = packfun.get_future();
packfun(1, 1);
std::cout << myFuture.get() << std::endl;
}
packaged_task 不能嵌套打包
除了移动构造函数 (移动显然不意味着嵌套) 可以传入 packaged_task<>
类型的参数,
其他构造函数都不接收 packaged_task<>
类型的参数, 防止嵌套打包
源码
//future
template <class _Rp, class... _ArgTypes>
class packaged_task<_Rp(_ArgTypes...)> {
template <
class _Fp,
class = __enable_if_t<!is_same<__remove_cvref_t<_Fp>, packaged_task>::value> //用于约束模板实参
>
explicit packaged_task(_Fp&& __f) : __f_(std::forward<_Fp>(__f)) {}
template <
class _Fp,
class _Allocator,
class = __enable_if_t<!is_same<__remove_cvref_t<_Fp>, packaged_task>::value> //用于约束模板实参
>
packaged_task(allocator_arg_t, const _Allocator& __a, _Fp&& __f)
: __f_(allocator_arg_t(),
__a, std::forward<_Fp>(__f)),
__p_(allocator_arg_t(), __a) {}
};
原理是: class = __enable_if_t<!is_same<__remove_cvref_t<_Fp>, packaged_task>::value>
提供了对模板实参的约束, 要求 _Fp
不能和 packaged_task
类型相同.
packaged_task 如何打包 lambda 和函数指针?
从它的构造函数上看, 似乎不能接受 lambda 和函数指针作为构造函数的参数
但可以通过如下[[2. 类型模板#自定义推导指引|自定义推导规则]]来实现.
//1
template <class _Rp, class... _Args>
packaged_task(_Rp (*)(_Args...)) -> packaged_task<_Rp(_Args...)>;
//2
template <class _Fp, class _Stripped = typename __strip_signature<decltype(&_Fp::operator())>::type>
packaged_task(_Fp) -> packaged_task<_Stripped>;
- 如果传入模板实参是函数指针类型
_Rp (*)(_Args...)
, 那么- 被引导为
_Rp(_Args...)
, 从而使用类模板packaged_task<_Rp(_Args...)>
- 被引导为
- 如果传入的模板实参是 lambda 对象的类型
_Fp
, 那么decltype(&_Fp::operator())
解析它的 operator 成员函数指针类型__strip_signature
删除成员函数指针类型的 类签名- 最后引导为
_Stripped
类型, 它是函数指针类型 - 再通过
//1
的引导, 使用类模板packaged_task<_Rp(_Args...)>
异步工具的比较
标签:std,20,--,lock,C++,互斥,int,线程,atomic From: https://www.cnblogs.com/easify/p/18642689https://www.zhihu.com/question/547132461/answer/2657296340?utm_psn=1850628489222422530