等待事件或等待其他条件
坐车案例
想象一种情况:假设晚上坐车外出,如何才能确保不坐过站又能使自己最轻松?
方法一:不睡觉,时刻关注自己的位置
1 #include <iostream> 2 #include <thread> 3 #include <mutex> 4 using namespace std; 5 6 mutex _mtx; 7 bool bFlag = false; 8 void wait_for_flag() 9 { 10 auto startTime = chrono::steady_clock::now(); 11 while (1) 12 { 13 unique_lock<mutex> lock(_mtx); 14 if (bFlag) 15 { 16 auto endTime = chrono::steady_clock::now(); 17 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 18 cout << "wait_for_flag consume : " << dCount << endl; 19 return; 20 } 21 } 22 } 23 24 void set_flag() 25 { 26 auto startTime = chrono::steady_clock::now(); 27 unique_lock<mutex> lock(_mtx); 28 for (int i = 0; i < 5; i++) 29 { 30 lock.unlock(); 31 //do something comsume 1000ms 32 this_thread::sleep_for(chrono::milliseconds(1000)); 33 lock.lock(); 34 } 35 36 bFlag = true; 37 auto endTime = chrono::steady_clock::now(); 38 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 39 cout << "set_flag consume : " << dCount << endl; 40 } 41 42 int main() 43 { 44 thread th1(wait_for_flag); 45 thread th2(set_flag); 46 th1.join(); 47 th2.join(); 48 return 0; 49 }
这种方式存在双重浪费:
- 线程 th1(wait_for_flag)须不断查验标志,浪费原本有用的处理时间,这部分计算资源原本可以留给其他线程使用。
- 线程 th1(wait_for_flag)每次循环都需要给互斥上锁,导致其他线程无法加锁。如果 th2 此时完成操作,则需要等待 th1 释放互斥才能操作。
程序输出如下:
set_flag consume : 5045.39 wait_for_flag consume : 5045.97
两个线程执行时间相近,但查看任务管理器,发现Debug程序CPU占用率始终保持10%。
方法二:通过设定多个闹钟,每隔一段时间叫醒自己
1 void wait_for_flag() 2 { 3 auto startTime = chrono::steady_clock::now(); 4 unique_lock<mutex> lock(_mtx); 5 while (!bFlag) 6 { 7 lock.unlock(); 8 //设置 500ms 的闹钟 9 this_thread::sleep_for(chrono::milliseconds(500)); 10 lock.lock(); 11 } 12 13 auto endTime = chrono::steady_clock::now(); 14 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 15 cout << "wait_for_flag consume : " << dCount << endl; 16 }
上面代码中引用了 this_thread::sleep_for()函数,如果暂时不满足条件,就让线程休眠。这确有改进,因为线程休眠,所以处理时间不再被浪费(不用熬夜)。但是,还是存在缺陷,休眠间隔时间难以确定。如果设置太短,会导致频繁检验,如果设置太长,又可能导致过度休眠(到站还没响)。如果线程 th2 完成了任务,线程 th1 却没有被及时唤醒,就会导致延迟。
上面的代码将休眠时间设置为500ms,CPU占用率始终为0%,但两个线程的运行时间相差过大。运行结果如下:
set_flag consume : 5061.66 wait_for_flag consume : 5570.77
方法三:让列车员叫醒你(使用 c++提供的同步机制)
若数据存在先后处理关系,线程甲需要等待线程乙完成处理后才能开始操作,那么线程甲则需等待线程乙完成并且触发事件,其中最基本的方式是条件变量。
1 mutex _mtx; 2 bool bFlag = false; 3 condition_variable _cond; //条件变量 4 void wait_for_flag() 5 { 6 auto startTime = chrono::steady_clock::now(); 7 unique_lock<mutex> lock(_mtx); 8 _cond.wait(lock, []() {return bFlag; }); //等待 9 10 auto endTime = chrono::steady_clock::now(); 11 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 12 cout << "wait_for_flag consume : " << dCount << endl; 13 } 14 15 void set_flag() 16 { 17 auto startTime = chrono::steady_clock::now(); 18 unique_lock<mutex> lock(_mtx); 19 for (int i = 0; i < 5; i++) 20 { 21 lock.unlock(); 22 //do something comsume 1000ms 23 this_thread::sleep_for(chrono::milliseconds(1000)); 24 lock.lock(); 25 } 26 27 bFlag = true; 28 _cond.notify_one(); //通知 29 auto endTime = chrono::steady_clock::now(); 30 double dCount = chrono::duration<double, std::milli>(endTime - startTime).count(); 31 cout << "set_flag consume : " << dCount << endl; 32 }
引用条件变量后,两线程执行时间相差不大,程序输出如下:
set_flag consume : 5015.84 wait_for_flag consume : 5016.75
注:上述案例的测试结果可能不尽相同,理解意思即可。
条件变量
C++标准库提供了条件变量的两种实现:
- std::condition_variable,只能和std::mutex一起使用。(推荐)
- std::condition_variable_any,只要某一类型符合成为互斥的最低标准,就能与其一起使用。
二者都在标准库的头文件<condition_variable>内声明。
std::condition_variable
构造函数
condition_variable(); ~condition_variable(); condition_variable(const condition_variable&) = delete; condition_variable& operator=(const condition_variable&) = delete;
不支持拷贝、也不支持移动。
通知
void notify_one(); //唤醒一个等待者 void notify_all(); //唤醒所有等待者
wait()函数
void wait(unique_lock<mutex>& _Lck); void wait(unique_lock<mutex>& _Lck, _Predicate _Pred);
参数:
- _Lck:独占锁,需要多次调用加锁、解锁操作。
- _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。
含义:
使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。
该函数执行过程如下:
- 当程序流程执行到wait时,如果指定了_Pred参数,wait会先执行_Pred。如果_Pred返回true,wait函数执行完毕,返回,执行后续代码;如果_Pred返回false,先将_Lck解锁并阻塞当前线程,等待其他线程唤醒。如果没有指定_Pred参数,等价于返回false的情况。
- 后续,其他线程调用notify_one()或notify_all()函数唤醒当前线程。当前线程被唤醒,先将_Lck上锁,如果指定_Pred参数,则先进行检查,根据返回值决定是否阻塞。如果没有指定_Pred参数,wait函数执行完毕,返回,执行后续代码。
wait_for()函数
template <class _Rep, class _Period> cv_status wait_for( unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time); template <class _Rep, class _Period, class _Predicate> bool wait_for( unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred);
参数:
- _Lck:独占锁
- _Rel_time:等待所消耗的最大时间
- _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。
- cv_status:如果在最大时间时间内被唤醒,则wait_for()函数返回cv_status::no_timeout,否则wait_for()函数返回cv_status::timeout。
- bool:返回_Pred的返回值。
使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。如果等待时常超过_Rel_time,wait函数将返回。
wait_until()函数
template <class _Clock, class _Duration> cv_status wait_until( unique_lock<mutex>&_Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time); template <class _Clock, class _Duration, class _Predicate> bool wait_until( unique_lock<mutex>&_Lck, const chrono::time_point<_Clock, _Duration>&_Abs_time, _Predicate _Pred);
参数:
- _Lck:独占锁
- _Abs_time:指定停止等待的时间点
- _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。
返回值:
- cv_status:如果在指定的时间点之前被唤醒,则wait_until()函数返回cv_status::no_timeout,否则wait_until()函数返回cv_status::timeout。
- bool:返回_Pred的返回值。
含义:
使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。如果等待时常超过指定的_Abs_time时间点,wait函数将返回。
std::condition_variable_any
std::condition_variable_any与std::condition_variable类似,这里只简单列出成员函数,具体含义可以参考上面的std::condition_variable。
构造函数
condition_variable_any(); ~condition_variable_any(); condition_variable_any(const condition_variable_any&) = delete; condition_variable_any& operator=(const condition_variable_any&) = delete;
不支持拷贝、也不支持移动。
通知
void notify_one(); void notify_all();
wait()函数
template <class _Lock> void wait(_Lock& _Lck); template <class _Lock, class _Predicate> void wait(_Lock& _Lck, _Predicate _Pred);
wait_for()函数
template <class _Lock, class _Rep, class _Period> cv_status wait_for( _Lock & _Lck, const chrono::duration<_Rep, _Period>& _Rel_time); template <class _Lock, class _Rep, class _Period, class _Predicate> bool wait_for( _Lock & _Lck, const chrono::duration<_Rep, _Period>&_Rel_time, _Predicate _Pred);
wait_until()函数
template <class _Lock, class _Clock, class _Duration> cv_status wait_until(_Lock & _Lck, const chrono::time_point<_Clock, _Duration>&_Abs_time) template <class _Lock, class _Clock, class _Duration, class _Predicate> bool wait_until(_Lock & _Lck, const chrono::time_point<_Clock, _Duration>&_Abs_time, _Predicate _Pred)
虚假唤醒
当线程从休眠状态中被唤醒,却发现等待条件未满足时,因而无事,这种情况被称为虚假唤醒。发生虚假唤醒最常见的情况是,多个线程争抢同一个条件,例如:
1 mutex _mtx; 2 condition_variable _cond; 3 queue<int> _dataQueue; 4 5 void data_preparation_thread() 6 { 7 while (true) 8 { 9 int _data = rand(); 10 { 11 std::lock_guard<mutex> lock(_mtx); 12 _dataQueue.push(_data); 13 } 14 _cond.notify_all(); 15 this_thread::sleep_for(chrono::milliseconds(1000)); 16 } 17 } 18 19 void data_processing_thread() 20 { 21 while (true) 22 { 23 std::unique_lock<mutex> lock(_mtx); 24 _cond.wait(lock, []() 25 { 26 bool bEmpty = _dataQueue.empty(); 27 if (bEmpty) 28 cout << this_thread::get_id() << " be spurious waken up\n"; 29 30 return !bEmpty; 31 }); 32 int _data = _dataQueue.front(); 33 _dataQueue.pop(); 34 lock.unlock(); 35 36 cout << "threadID : " << this_thread::get_id() << " data = " << _data << endl; 37 } 38 } 39 40 int main() 41 { 42 srand(time(NULL)); 43 44 thread th1(data_processing_thread); 45 thread th2(data_processing_thread); 46 thread th3(data_preparation_thread); 47 th1.join(); 48 th2.join(); 49 th3.join(); 50 return 0; 51 }
两个线程竞争队列中的一条数据,总有一个是被虚假唤醒的。
唤醒丢失
1 void wait_for_flag() 2 { 3 unique_lock<mutex> lock(_mtx); 4 _cond.wait(lock); //等待 5 } 6 7 void set_flag() 8 { 9 unique_lock<mutex> lock(_mtx); 10 bFlag = true; 11 _cond.notify_one(); //通知 12 } 13 14 int main() 15 { 16 thread th1(set_flag); 17 thread th2(wait_for_flag); 18 th1.join(); 19 th2.join(); 20 return 0; 21 }
先执行set_flag()函数,设置数据,然后通知等待线程,此时wait_for_flag()函数还未进入等待状态,导致通知信号丢失,后续又无新的通知信号,导致线程一直处于阻塞状态。
使用 future 等待一次性事件发生
本节介绍std::future类,该类一般用于处理一次性事件,可以获取该事件的返回值。
我们可以在某个线程启动一个目标事件,该目标事件由新的线程去执行,并获取一个std::future对象。之后,该线程执行自己余下的任务,等到未来某一时刻,获取目标事件中执行的结果。
C++标准程序库有两种future,分别由两个类模板实现,其声明都位于标准库的头文件<future>内:
- std::future:独占future。
- std::shared_future:共享future。
它们的设计参照了std::unique_ptr和std::shared_ptr。同一目标事件仅仅允许关联唯一一个std::future实例,但可以关联多个std::shared_future实例。大致含义是:目标事件中的返回结果,如果想被其他多个线程访问,则应该使用std::shared_future,否则使用std::future。
std::future
std::future类提供了访问异步操作执行结果的机制。通过std::async、std::packaged_task或std::promise创建的异步操作,这些函数会返回一个std::future对象,该对象中保存了异步操作的执行结果。但是,该类的结果并不是共享的,即,该结果只能访问一次。
构造函数
future(); ~future(); future(future&& _Other); future& operator=(future&& _Right); future(const future&) = delete; future& operator=(const future&) = delete;
仅支持移动语义,不支持拷贝。
valid()函数
bool valid() const;
检测当前结果是否就绪。
get()函数
template <class _Ty> _Ty get();
阻塞,等待future拥有合法结果并返回该结果。结果返回后,释放共享状态,后续调用valid()函数将返回false。若调用该函数前valid()为false,则行为未定义。
wait()函数
void wait() const;
阻塞直至结果变得可用,该函数执行后,valid() == true。
wait_for()函数
template <class _Rep, class _Per> future_status wait_for(const chrono::duration<_Rep, _Per>& _Rel_time);
等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回。
future_status有如下取值:
- future_status::ready:共享状态就绪。
- future_status::timeout:共享状态在经过指定的等待时间内仍未就绪。
- future_status::deferred:共享状态持有的函数正在延迟运行,结果将在显式请求时计算。
wait_until()函数
template <class _Clock, class _Dur> future_status wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time);
等待结果,如果在已经到达指定的时间点时仍然无法得到结果,则返回。
share()函数
template <class _Ty> shared_future<_Ty> share();
将本对象移动到std::shared_future对象。
std::shared_future
类似std::future类,同样提供了访问异步操作执行结果的机制,与std::future类不同的是,该类的结果可以被访问多次(可以连续多次调用get()函数)。
构造函数
shared_future(); //构造函数 ~shared_future(); //析构函数 //支持拷贝 shared_future(const shared_future& _Other); shared_future& operator=(const shared_future& _Right); //支持移动 shared_future(future<_Ty>&& _Other); shared_future& operator=(shared_future&& _Right);
支持拷贝,可以复制多个std::shared_future对象指向同一异步结果。每个线程通过自身的std::shared_future对象副本访问共享的异步结果,这一操作是安全的。
valid()函数
bool valid() const;
检测当前结果是否可用。
get()函数
template <class _Ty> const _Ty& get() const;
阻塞,等待shared_future 拥有合法结果并获取它。若调用此函数前valid()为false,则行为未定义。
等待
void wait() const; template <class _Rep, class _Per> future_status wait_for( const chrono::duration<_Rep, _Per>&_Rel_time); template <class _Clock, class _Dur> future_status wait_until( const chrono::time_point<_Clock, _Dur>& _Abs_time);
和std::future相似。
案例-构造std::shared_future对象
1 std::promise<int> pro; 2 std::future<int> _fu = pro.get_future(); 3 std::shared_future<int> _sfu = std::move(_fu); //显示 4 5 std::promise<int> pro; 6 std::shared_future<int> _sfu = pro.get_future(); //隐式 7 8 std::promise<int> pro; 9 std::future<int> _fu = pro.get_future(); 10 std::shared_future<int> _sfu = _fu.share(); //share函数
std::async()函数-从后台任务返回值
std::async()函数用于构建后台任务,并允许调用者在未来某一时刻获取该任务的返回值。
函数定义
template <class _Fty, class... _ArgTypes> std::future<...> async(_Fty&& _Fnarg, _ArgTypes&&... _Args); template <class _Fty, class... _ArgTypes> std::future<...> async(launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args);参数:
- _Policy:该函数执行方式,有如下几种取值:
- std::launch::async:异步执行。该函数执行后,会启动新线程执行可调用对象。
- std::launch::deferred:惰性执行。该函数执行后,并不会启动线程,而是等到后续需要获取结果时,由获取值的线程直接执行可调用对象。如果没有调用get()或者wait(),可调用对象不会执行。
- std::launch::async | std::launch::deferred:由系统自行决定采用其中一个。
- _Fnarg:可调用对象。
- _Args:传递给可调用对象的参数包。
返回值:
std::future对象,通过该对象可用获取后台任务的返回值。
参数传递流程
//省略Res_Data类 int Entry(Res_Data data) { cout << "-----------"; return 5; } int main() { Res_Data _data; auto _fu = std::async(Entry, _data); cout << _fu.get() << endl; return 0; }
输出如下:
008FF9E3 Constractor 008FF6E8 Copy Constractor 008FF33C Move Constractor 00CE0854 Move Constractor 008FF33C Destractor 008FF6E8 Destractor 009FDE74 Move Constractor ----------- 009FDE74 Destractor 00CE0854 Destractor 5 008FF9E3 Destractor
结论:一次拷贝,3次移动。
案例
int ThreadEntry() { cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl; std::this_thread::sleep_for(std::chrono::milliseconds(5000)); cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl; return 5; } int main() { cout << "main threadId : " << std::this_thread::get_id() << endl; std::future<int> fu = std::async(ThreadEntry); cout << "main thread to do something" << endl; cout << fu.get() << endl; //等待线程结束并获取值,不能重复调用 //fu.wait(); //仅等待线程结束 cout << "main thread end doing something" << endl; return 0; }
如何判断std::async的执行方式
如果在构建异步任务时,不指定std::async的执行方式,那么操作系统会根据当前系统资源自动决定采用 std::launch::async 或 std::launch::deferred,那么,在程序中如何获知当前的执行方式呢?方法如下:
1 std::future _fu = std::async(ThreadEntry); 2 future_status status = _fu.wait_for(std::chrono::microseconds(0));//等待0ms 3 if (status == future_status::deferred) //延时执行 4 { 5 //todo 6 } 7 else if (status == future_status::ready || status == future_status::timeout) 8 { 9 //todo 10 }
std::future类的wait_for和wait_until成员函数会返回future_status枚举类型,该类型中记录当前的执行方式。
std::async与std::thread的区别
std::thread是创建一个线程执行相关操作,当系统资源紧张时,可能会导致线程创建失败。
std::async是创建一个异步任务,是否创建线程取决于操作系统,当系统资源紧张时,该函数可能不会创建线程,这取决于函数的执行方式。
std::thread不支持获取线程的执行结果,而std::async可以通过std::future获取异步任务的执行结果。
std::packaged_task - 关联future实例和任务
类模板 std::packaged_task 用于包装任何可调用对象,使之能被异步调用。可调用对象执行后,其返回值保存在std::future对象中,可用通过成员函数get_future()函数获取。
若一项庞杂的操作能分解为多个子任务,则可把它们分别包装到多个std::packaged_task实例之中,再传递给任务调度器或线程池。这就隐藏了细节,使任务抽象化,让调度器得以专注处理std::packaged_task<>实例,无须纠缠于形形色色的任务函数。
构造函数
packaged_task(); //默认构造函数 ~packaged_task(); //析构函数 template <class _Fty2, ...> packaged_task(_Fty2&& _Fnarg) //传入一个可调用对象 //支持移动 packaged_task(packaged_task&& _Other); packaged_task& operator=(packaged_task&& _Other); //不支持拷贝 packaged_task(const packaged_task&) = delete; packaged_task& operator=(const packaged_task&) = delete;
packaged_task的实例化与std::function类似,比如:
int ThreadEntry(string& str, double dValue, char* pBuf); std::packaged_task<int(string&, double, char*)> task(ThreadEntry);
valid()函数
bool valid() const;
检查任务对象是否拥有合法的可调用对象。
swap()函数
void swap(packaged_task& _Other);
交换std::packaged_task中包装的可调用对象和与之关联的std::future对象。
get_future()函数
future<_Ret> get_future();
返回与之关联的std::future对象。
operator()()函数
void operator()(_ArgTypes... _Args);
调用被包装的可调用对象,可调用对象执行完后,将返回值保存到std::future对象中,并使std::future对象变为可用状态。
make_ready_at_thread_exit()函数
void make_ready_at_thread_exit(_ArgTypes... _Args);
该函数会调用被包装的可调用对象,可调用对象执行完后,将返回值保存到std::future对象中,但不会立马让std::future对象变为可用,而是等到该线程结束后才使其可用。
例如:
1 int ThreadEntry() 2 { 3 cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl; 4 std::this_thread::sleep_for(std::chrono::milliseconds(5000)); 5 cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl; 6 return 5; 7 } 8 9 int main() 10 { 11 std::packaged_task<int(void)> task(ThreadEntry); 12 std::future<int> _fu = task.get_future(); 13 14 task.make_ready_at_thread_exit(); //执行该函数后,_fu未就绪,调用get将出错 15 //task(); //_fu立马就绪,调用get()不会出错 16 cout << "value = " << _fu.get() << endl; 17 return 0; 18 }
正确用法:
1 int main() 2 { 3 std::packaged_task<int(void)> task(ThreadEntry); 4 std::future<int> _fu = task.get_future(); 5 6 thread th1([&]() {task.make_ready_at_thread_exit(); }); 7 th1.detach(); 8 9 cout << "value = " << _fu.get() << endl; 10 return 0; 11 }
reset()函数
void reset();
抛弃之前的std::future对象和执行结果,重新构建新的std::future对象。不影响可调用对象。
案例-在线程间传递任务
1 std::mutex m; 2 std::deque<std::packaged_task<void()>> tasks; 3 4 void gui_thread() 5 { 6 while (...) 7 { 8 std::packaged_task<void()> task; 9 { 10 std::lock_guard<std::mutex> lk(m); 11 if (tasks.empty()) 12 continue; 13 task = std::move(tasks.front()); 14 tasks.pop_front(); 15 } 16 task(); 17 //其他操作 18 } 19 } 20 21 template<typename Func> 22 std::future<void> post_task_for_gui_thread(Func f) 23 { 24 std::packaged_task<void()> task(f); 25 std::future<void> res = task.get_future(); 26 std::lock_guard<std::mutex> lk(m); 27 tasks.push_back(std::move(task)); 28 return res; 29 }
std::promise-创建std::promise
std::promise给出了一种异步求值的方法,能与某个std::future对象关联,延后读出需要求取的值。配对的std::promise和std::future可实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的promise设定关联的值,使future准备就绪。
1 void setValue(int nData, std::promise<int>& _pro) 2 { 3 cout << "set value threadID : " << this_thread::get_id() << endl; 4 this_thread::sleep_for(chrono::seconds(3)); //休息3秒 5 _pro.set_value( nData * 2 + 5 ); 6 } 7 8 void getValue(std::future<int>& _fu) 9 { 10 cout << "get value threadID : " << this_thread::get_id() << endl; 11 cout << "data = " << _fu.get() << endl; 12 } 13 14 int main() 15 { 16 std::promise<int> pro; 17 std::future fu = pro.get_future(); 18 19 thread th1(setValue, 10, std::ref(pro)); 20 thread th2(getValue, std::ref(fu)); 21 22 th1.join(); 23 th2.join(); 24 return 0; 25 }
构造函数
promise(); ~promise(); template <class _Alloc> promise(allocator_arg_t, const _Alloc& _Al); //移动构造 promise(promise&& _Other); promise& operator=(promise&& _Other); //无拷贝 promise(const promise&) = delete; promise& operator=(const promise&) = delete;
swap()函数
void swap(promise& _Other);
交换2个std::promise对象的内容,以及与之关联的std::future对象。
get_future()函数
future<_Ty> get_future();
获取std::future对象。
set_value()函数
void set_value(const _Ty& _Val); void set_value(_Ty&& _Val);
将_Val存储到共享状态,并使其进入就绪状态。如果std::promise对象在销毁时还未设置值,会用异常代替。
set_value_at_thread_exit()函数
void set_value_at_thread_exit(const _Ty& _Val); void set_value_at_thread_exit(_Ty&& _Val);
将_Val存储到共享状态,在线程结束时,使共享状态进入就绪状态。
set_exception()函数
void set_exception(exception_ptr _Exc);
将异常指针_Excl存储到共享状态,并使其进入就绪状态。
set_exception_at_thread_exit()函数
void set_exception_at_thread_exit(exception_ptr _Exc);
将异常指针_Excl存储到共享状态,在线程结束时,使共享状态进入就绪状态。
案例
1 void process_connections(vector<ConnectPackage>& connections) 2 { 3 while (!done(connections)) 4 { 5 for (auto itr : connections) 6 { 7 if (itr->has_incoming_data()) //有数据传入,接收 8 { 9 data_packet data = connection->incoming(); 10 std::promise<payload_type>& p = connection->get_promise(data.id); 11 p.set_value(data.payload); 12 } 13 14 if (connection->has_outgoing_data()) //有数据需向外传递 15 { 16 outgoing_packet data = connection->top_of_outgoing_queue(); 17 connection->send(data.payload); 18 data.promise.set_value(true); 19 } 20 } 21 } 22 }
将异常保存到future中
在允许多线程程序时,如果有异常情况发生,异常值如何向外传递是我们需要考虑的一个问题,例如:
1 double square_root(double x) 2 { 3 if (x < 0) 4 throw std::out_of_range("x < 0"); 5 6 return sqrt(x); 7 } 8 9 int main() 10 { 11 std::future<double> f = std::async(square_root, -1); 12 double dVal = f.get(); 13 return 0; 14 }
std::future实例化为double类型,因此f.get()将会返回一个double类型的值,那异常该如何传递呢?
c++规定,若经由std::async()调用的函数抛出异常,则会被保存到future中,代替本该设定的值,future随之进入就绪状态,等到其成员函数get()被调用,存储在内的异常即被重新抛出。假如我们把任务函数包装在std::packaged_task对象内,也依然如是。若包装的任务函数在执行时抛出异常,则会代替本应求得的结果,被保存到future内并使其准备就绪。只要调用get(),该异常就会被再次抛出。
对于std::promise类,可以调用调用成员函数set_exception()设置异常,例如:
1 try 2 { 3 pro.set_value(dVal); 4 } 5 catch (...) 6 { 7 pro.set_exception(std::make_exception_ptr(std::logic_error("error"))); 8 }
另一个方法是,直接销毁与std::future对象关联的std::promise对象或std::packaged_task对象。如果关联的future未能准备就绪,无论销毁两者中的哪一个,其析构函数都会将异常std::future_error存储为异步任务的状态数据,它的值是错误代码是std::future_errc::broken_promise。
限时等待
时钟周期-std::ratio
时间周期是指时钟的计时单位,每隔多长时间计数一次,用std::chrono::ratio来描述,该类定义如下:
template<std::intmax_t Num, std::intmax_t Denom = 1> class ratio;
std::chrono::ratio是一个比例类,其中Num代表分子,Denom代表分母。用于描述时间时,默认单位是秒,即:
ratio<2> //代表2秒 ratio<60> //代表1分钟 ratio<60*60> //代表1小时 ratio<1,1000> //代表1毫秒 ratio<1,1000000> //代表1微妙
时钟类
就C++标准库而言,时钟(clock)是时间信息的来源。具体来说,每种时钟都是一个类,提供4项关键信息:
- 当前时刻。
- 时间值的类型(int、double)。
- 该时钟的计时单元的长度(std::chrono::duration)。
- 计时速率是否恒定,即能否将该时钟视为恒稳时钟(steady clock)。
c++给出下列几种时钟:
- std::chrono::system_clock:
- std::chrono::steady_clock:
- std::high_resolution_clock:
其中,high_resolution_clock 被定义为:
using high_resolution_clock = steady_clock;
时钟类中都提供了一个静态成员函数now(),用于返回当前时刻。另一个静态成员is_steady,用于判断当前始终的计时速率是否恒定且无法调整,如果is_steady=true,则该时钟为恒定时钟。通常,std::chrono::system_clock类不是恒稳时钟,而std::chrono::steady_clock是一个恒定时钟。
时长类
c++标准库用std::chrono::duration<>类描述时长,该类定义如下:
template<class Rep, class Period = std::ratio<1,1>> class duration;
其中,Rep代表时钟数的类型;Period代表时钟周期,描述每一个计时周期代表多少秒。c++标准库也定义了一些常用的时长:
typedef duration<Rep ratio<3600,1>> hours; //小时 typedef duration<Rep ratio<60,1>> minutes; //分钟 typedef duration<Rep ratio<1,1>> seconds; //秒 typedef duration<Rep ratio<1,1000>> milliseconds; //毫秒 typedef duration<Rep ratio<1,1000000>> microseconds; //微妙 typedef duration<Rep ratio<1,1000000000>> nanoseconds; //纳秒
使用也非常方便,如果需要使线程休眠3秒,代码如下:
std::this_thread::sleep_for(std::chrono::seconds(3)); //休眠3秒
std::chrono::duration_cast时间转换
如果需要在两个时间周期转换,可以使用std::chrono::duration_cast<>进行显示转换,例如:
1 std::chrono::milliseconds ms(54802); 2 std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms); 3 4 cout << ms.count() << endl; //54802 5 cout << s.count() << endl; //54
强转将导致结果被截断。
count()函数
时长类支持算术运算,我们将时长乘或除以一个数值(数值应与时长类的计数类型相符)或对两个时长进行加减,就能得出一个新时长。可以通过count()函数获取该计时单位的数量:
1 std::chrono::seconds s = std::chrono::seconds(1) * 5; 2 cout << s.count() << endl; //5
时间点类
在c++标准库中,用std::chrono::time_point<>表示时间点,时间点是一个时间跨度,始于一个称为时钟纪元的特定时刻(一般是1970年1月1日0时0分0秒),终于该时间点本身。
template <class _Clock, class _Duration = typename _Clock::duration> class time_point;
其中,_Clock表示所参考的时钟,_Duration表示计时单元。
std::chrono::time_point类内提供了一个成员函数time_since_epoch(),该函数返回从时钟纪元到该时间点的时长跨度,下面例子是计算当前时间距1970年1月1日相隔多少天:
1 int main() 2 { 3 using days_type = std::chrono::duration<int, std::ratio<60 * 60 * 24>>; 4 5 std::chrono::time_point<std::chrono::system_clock, days_type> today = 6 std::chrono::time_point_cast<days_type>(std::chrono::system_clock::now()); 7 8 cout << today.time_since_epoch().count() << endl; 9 }
可以对某个时间进行加减操作,从而计算出新的时间点,例如:
std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500);
上面代码计算出未来500毫秒时刻,我们可以用它来指定绝对超时时刻。在前面的案例中,也给出了计算某段代码执行时长的例子。
接受超时时限的函数
在c++标准库中,以_for结尾的,一般对应一个时长;以_until()结尾的,一般对应一个时间点。
1 std::chrono::milliseconds ms(500); 2 this_thread::sleep_for(ms); 3 4 auto tp = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500); 5 this_thread::sleep_until(tp);
前面介绍的几个类中,都有包含_for和_until()结尾的函数,例如:std::this_thread、std::condition_variable、std::future、std::unique_lock等。
运用同步操作简化代码
注:该小节整理的不是很好,有部分内容我自己也没做过测试,大伙粗略看一下(或者跳过)。
利用future进行函数式编程
函数式编程是指一种编程风格,函数调用的结果完全取决于参数,而不依赖任何外部状态。若我们以相同的参数调用同一个函数两次,结果会完全一致。有一个函数类型叫“纯函数”,它的含义是:它产生的作用被完全限制在返回值上,而不会改动任何外部状态。这非常契合并发编程,只要共享数据没有改动,就不会引发条件竞争,因而无须动用互斥保护。
下面例子给出了快速排序并发版本:(大概理解一下就好,实际效率非常低)
template<typename T> std::list<T> parallel_quick_sort(std::list<T> input) { if (input.empty()) return input; std::list<T> result; result.splice(result.begin(), input, input.begin()); const T& pivot = *result.begin(); auto divide_point = std::partition(input.begin(), input.end(), [&pivot](const T& t) { return t < pivot; }); std::list<T> lower_part; lower_part.splice(lower_part.begin(), input, input.begin(), divide_point); std::future<std::list<T>> fu = std::async(parallel_quick_sort<T>, std::move(lower_part)); auto new_higher = parallel_quick_sort(std::move(input)); result.splice(result.end(), new_higher); result.splice(result.begin(), fu.get()); return result; }
使用消息传递进行同步
CSP(通信式串行线程):线程间没有共享数据,只负责接收消息,然后根据消息做出响应。比如去ATM机取钱,取款人需要先插入银行卡,之后输入密码,验证通过后输入金额,确认并等待机器吐钱。如果用线程来完成这个功能,则会有一个插卡线程,检测有没有插入银行卡,只有等卡插入了,才会通知输入线程显示并接收用户输入的密码,验证通过后,才会通知取钱线程,这样一环套一环的效果,没有完成前一项,就无法切换至下一项,这就是CSP。
符合并发技术规约的后续风格并发
并发技术规约在名字空间std::experimental内,提供了std::experimental::promise和std::experimental::packaged_task,二者与原始版本的差异是都返回std::experimental::future实例。
std::experimental::future提供了一个新的关键特性-then()后续。所谓后续是指,一旦future就绪,该函数立马执行某个函数。
并发技术规约中并没有提供与std::async()等价的函数,我们可以自己实现一个spawn_async函数,功能和std::async()函数类似:
template<typename Func> std::experimental::future<decltype(std::declval<Func>()())> spawn_async(Func&& func) { std::experimental::promise<decltype(std::declval<Func>()())> pro; auto fu = pro.get_future(); auto cbk = [_pro = std::move(pro), _func = std::decay_t<Func>(func)]() mutable { try { _pro.set_value_at_thread_exit(_func()); } catch (...) { _pro.set_exception_at_thread_exit(std::current_exception()); } }; std::thread th(cbk); th.detach(); return fu; }
后续函数的连锁调用
假定有一系列耗时的任务需要执行,而且,为了让主线程抽身执行其他任务,我们想按异步方式执行这些任务。例如,当用户登录应用程序时,我们就需向后端服务器发送信息以验证身份;完成身份验证之后,我们需再次向后端服务器请求其账户信息;最后,一旦取得了相关信息,就做出更新并显式呈现。
串行方式顺序执行代码如下:
1 void process_login(std::string const& username, std::string const& password) 2 { 3 try 4 { 5 user_id const id = backend.authenticate_user(username, password); //验证身份 6 user_data const info_to_display = backend.request_current_info(id); //请求账户信息 7 update_display(info_to_display); //更新显示 8 } 9 catch (std::exception& e) 10 { 11 display_error(e); 12 } 13 }
如果该代码在UI线程中执行,相关请求可能会较慢从而阻塞UI线程。
并行执行如下:
1 std::future<void> process_login(std::string const& username, std::string const& password) 2 { 3 auto cbk = [=]() 4 { 5 try 6 { 7 const user_id id = backend.authenticate_user(username, password); //验证身份 8 const user_data info_to_display = backend.request_current_info(id); //请求账户信息 9 update_display(info_to_display); //更新显示 10 } 11 catch (std::exception& e) 12 { 13 display_error(e); 14 } 15 }; 16 17 return std::async(std::launch::async, cbk); 18 }
大多数程序员可能会这样书写,实际上这也存在浪费。全部代码集中到后台的某一线程上运行,该后台线程仍会消耗大量资源以逐一完成各任务,因而发生阻塞。若这样的任务很多,那将导致大量线程无所事事,它们只是在空等。
我们需要按照后续函数的方式,将任务接合,形成调用链,每完成一个任务,就执行下一个。使用then()改进代码:
1 std::experimental::future<void> process_login(std::string const& username, std::string const& password) 2 { 3 //验证身份 4 auto cbk1 = [=]() 5 { 6 return backend.authenticate_user(username, password); 7 }; 8 9 //请求账户信息 10 auto cbk2 = [](std::experimental::future<user_id> id) 11 { 12 return backend.request_current_info(id.get()); 13 }; 14 15 //更新显示 16 auto cbk3 = [](std::experimental::future<user_data> info_to_display) 17 { 18 try { 19 update_display(info_to_display.get()); 20 } 21 catch (std::exception& e) { 22 display_error(e); 23 } 24 }; 25 26 return spawn_async(cbk1).then(cbk2).then(cbk3); 27 }
登录流程被拆分成一系列任务,每个任务各自作为上一个任务的后续函数,接合成调用链。
最后作者还提供了一个写法,但我没理解的太懂,先贴代码,后续补充:
1 std::experimental::future<void> process_login(std::string const& username, std::string const& password) 2 { 3 auto cbk1 = [](std::experimental::future<user_id> id) 4 { 5 return backend.async_request_current_info(id.get()); 6 }; 7 8 auto cbk2 = [](std::experimental::future<user_data> info_to_display) 9 { 10 try 11 { 12 update_display(info_to_display.get()); 13 } 14 catch (std::exception& e) 15 { 16 display_error(e); 17 } 18 }; 19 20 return backend.async_authenticate_user(username, password).then(cbk1).then(cbk2); 21 }
等待多个future全部就绪
假定有大量数据需要处理,而且每项数据都能独立完成,此时我们会想到生成一组异步任务分别处理数据,最后通过future等待所有任务完成。但是,有可能我们需要将所有的结果汇总起来,则需要等待所有线程将任务完成,为此可能需要新开一个线程,让该线程不断检查future是否就绪,代码如下:
1 std::future<FinalResult> process_data(std::vector<MyData>& vec) 2 { 3 size_t const chunk_size = whatever; 4 std::vector<std::future<ChunkResult>> results; 5 6 for (auto begin = vec.begin(), end = vec.end(); beg != end;) 7 { 8 const size_t remaining_size = end - begin; 9 const size_t this_chunk_size = std::min(remaining_size, chunk_size); 10 results.push_back(std::async(process_chunk, begin, begin + this_chunk_size)); 11 begin += this_chunk_size; 12 } 13 14 //新开线程,不断检查future是否就绪 15 auto cbk = [all_results = std::move(results)]() 16 { 17 std::vector<ChunkResult> v; 18 for (auto& f : all_results) 19 { 20 v.push_back(f.get()); 21 } 22 }; 23 return std::async(cbk); 24 }
std::experimental::when_all()函数
可以使用std::experimental::when_all()函数避免,该函数传入若干个需要等待的future对象,它在内部会生成并返回一个总future对象,等到传入的future全部就绪,总future也随之就绪。
std::experimental::when_all()函数产生一个新的future,将传入的多个future全部包装在内。
上述代码可以改为如下:
1 std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec) 2 { 3 size_t const chunk_size = whatever; 4 std::vector<std::experimental::future<ChunkResult>> results; 5 for (auto begin = vec.begin(), end = vec.end(); beg != end;) 6 { 7 const size_t remaining_size = end - begin; 8 const size_t this_chunk_size = std::min(remaining_size, chunk_size); 9 results.push_back(spawn_async(process_chunk, begin, begin + this_chunk_size)); 10 begin += this_chunk_size; 11 } 12 13 auto cbk = [](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results) 14 { 15 std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get(); 16 std::vector<ChunkResult> v; 17 v.reserve(all_results.size()); 18 19 for (auto& f : all_results) 20 { 21 v.push_back(f.get()); 22 } 23 return gather_results(v); 24 }; 25 26 return std::experimental::when_all(results.begin(), results.end()).then(cbk); 27 }
等待多个future中任意一个就绪
假定,我们依据某些具体条件,从庞大的数据集中查找值。不过,若存在多个值同时满足要求,则选取其中任意一个皆可。为此,我们可以生成多个线程,它们分别查找数据集的子集;若有线程找到了符合条件的值,就设立标志示意其他线程停止查找,并设置最终结果的值。
std::experimental::when_any()函数
我们可以采用std::experimental::when_any()函数统筹众多future,它生成一个新的future返回给调用者,只要原来的future中至少有一个准备就绪,则该新future也随之就绪。新future中包含了一个std::experimental::when_any_result<>内部实例,该实例由一个序列和一个索引值组成,其中序列包含传入的全体future,索引值则指明哪个future就绪。
代码如下:
1 struct DoneCheck 2 { 3 std::shared_ptr<std::experimental::promise<FinalResult>> final_result; 4 5 DoneCheck(std::shared_ptr<std::experimental::promise<FinalResult>> final_result_) : final_result(std::move(final_result_)) {} 6 7 void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param) 8 { 9 auto results = results_param.get(); 10 const MyData* ready_result = results.futures[results.index].get(); 11 12 if (ready_result) 13 final_result->set_value(process_found_value(*ready_result)); 14 else 15 { 16 results.futures.erase(results.futures.begin() + results.index); 17 if (!results.futures.empty()) 18 { 19 std::experimental::when_any(results.futures.begin(), results.futures.end()).then(std::move(*this)); 20 } 21 else 22 { 23 final_result->set_exception(std::make_exception_ptr(std::runtime_error("Not found"))); 24 } 25 } 26 } 27 }; 28 29 std::experimental::future<FinalResult> find_and_process_value(std::vector<MyData>& data) 30 { 31 const unsigned concurrency = std::thread::hardware_concurrency(); 32 const unsigned num_tasks = (concurrency > 0) ? concurrency : 2; 33 34 std::vector<std::experimental::future<MyData*>> results; 35 36 auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks; 37 38 auto chunk_begin = data.begin(); 39 40 std::shared_ptr<std::atomic<bool>> done_flag = std::make_shared<std::atomic<bool>>(false); 41 42 for (unsigned i = 0; i < num_tasks; ++i) 43 { 44 auto chunk_end = (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end(); 45 46 auto cbk = [=]() 47 { 48 for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end); ++entry) 49 { 50 if (matches_find_criteria(*entry)) 51 { 52 *done_flag = true; 53 return &*entry; 54 } 55 } 56 return (MyData*)nullptr; 57 }; 58 59 std::experimental::future<MyData*> _fu = spawn_async(cbk); 60 results.push_back(std::move(_fu)); 61 chunk_begin = chunk_end; 62 } 63 64 std::shared_ptr<std::experimental::promise<FinalResult>> final_result = std::make_shared<std::experimental::promise<FinalResult>>(); 65 66 std::experimental::when_any(results.begin(), results.end()).then(DoneCheck(final_result)); 67 return final_result->get_future(); 68 }
上面代码别看了,要命...(书上的更要命0_0),下面这句最要紧:
void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param)
返回的新future中保存的是一个std::experimental::when_any_result类型的实例,调用get函数可以获取future序列和索引值,如下:
1 auto results = results_param.get(); 2 const MyData* ready_result = results.futures[results.index].get();
上面代码中,results.futures是future序列,results.index是索引值,results.futures[results.index]是就绪的future。
基本的线程闩类std::experimental::latch
std::experimental::latch的构造函数接收唯一一个参数,在构建该类对象时,我们需通过这个参数设定其计数器的初值。接下来,每当等待的目标事件发生时,我们就在线程闩对象上调用count_down(),一旦计数器减到0,它就进入就绪状态。若我们要等待线程闩的状态变为就绪,则在其上调用wait()。若需检查其是否已经就绪,则调用is_ready()。最后,假如我们要使计数器减持,同时要等待它减到0,则应该调用count_down_and_wait()。
例子如下:
1 void foo() 2 { 3 unsigned const thread_count = 5; 4 std::experimental::latch done(thread_count); 5 my_data data[thread_count]; 6 std::vector<std::future<void>> threads; 7 8 for (unsigned i = 0; i < thread_count; ++i) 9 { 10 auto cbk = [&, i]() 11 { 12 data[i] = make_data(i); 13 done.count_down(); 14 do_more_stuff(); 15 }; 16 17 std::future<void> _fu = std::async(std::launch::async, cbk); 18 threads.push_back(std::move(_fu)); 19 } 20 21 done.wait(); 22 process_data(data, thread_count); 23 }
基本的线程卡类std::experimental::barrier
并发技术规约提出了两种线程卡:
- std::experimental::barrier
- std::experimental:: flex_barrier
假定有一组线程在协同处理某些数据,各线程相互独立,分别处理数据,因此操作过程不必同步。但是,只有在全部线程都完成各自的处理后,才可以操作下一项数据或开始后续处理,线程卡就是为了应对这种场景。为了同步一组线程,我们创建线程卡,并指定参与同步的线程数目。线程在完成自身的处理后,就运行到线程卡处,通过在线程卡对象上调用arrive_and_wait()等待同步组的其他线程。只要组内最后一个线程也运行至此,所有线程即被释放,线程卡会自我重置。接着,线程组视具体情况各自继续,或处理下一项数据,或进行下一阶段的处理。
代码如下:
1 void process_data(data_source& source, data_sink& sink) 2 { 3 const unsigned concurrency = std::thread::hardware_concurrency(); 4 const unsigned num_threads = (concurrency > 0) ? concurrency : 2; //线程个数 5 6 std::experimental::barrier sync(num_threads); 7 std::vector<joining_thread> threads(num_threads); 8 9 std::vector<data_chunk> chunks; 10 result_block result; 11 12 for (unsigned i = 0; i < num_threads; ++i) 13 { 14 auto cbk = [&, i]() 15 { 16 while (!source.done()) 17 { 18 if (!i) //在0号线程上做分割 19 { 20 data_block current_block = source.get_next_data_block(); 21 chunks = divide_into_chunks(current_block, num_threads); //分成num_threads块 22 } 23 24 sync.arrive_and_wait(); //其他线程等待0号线程分割完 25 result.set_chunk(i, num_threads, process(chunks[i])); //各个线程进行相关处理 26 sync.arrive_and_wait(); //等待所有线程处理完 27 28 if (!i) 29 { 30 sink.write_data(std::move(result)); //由0号线程负责处理写入操作 31 } 32 } 33 }; 34 35 threads[i] = joining_thread(cbk); //创建std::thread线程,会自动调用join() 36 } 37 }
std::experimental::flex_barrier
std::experimental::flex_barrier是std::experimental::latch的灵活版本,二者的不同之处在于,前者具备另一个构造函数,其参数既接收线程的数目,还接收补全函数。只要全部线程都运行到线程卡处,该函数就会在其中一个线程上运行(并且是唯一一个)。
代码如下:
1 void process_data(data_source& source, data_sink& sink) 2 { 3 const unsigned concurrency = std::thread::hardware_concurrency(); 4 const unsigned num_threads = (concurrency > 0) ? concurrency : 2; 5 std::vector<data_chunk> chunks; 6 7 auto split_source = [&] () 8 { 9 if (!source.done()) 10 { 11 data_block current_block = source.get_next_data_block(); 12 chunks = divide_into_chunks(current_block, num_threads); 13 } 14 }; 15 16 //先对数据进行分割 17 split_source(); 18 19 result_block result; 20 21 auto cbk1 = [&]() 22 { 23 sink.write_data(std::move(result)); 24 split_source(); 25 return -1; 26 }; 27 28 //等到线程都到达时,其中一个线程会调用cbk1补全函数 29 std::experimental::flex_barrier sync(num_threads, cbk1); 30 31 std::vector<joining_thread> threads(num_threads); 32 for (unsigned i = 0; i < num_threads; ++i) 33 { 34 auto cbk2 = [&, i] 35 { 36 while (!source.done()) 37 { 38 result.set_chunk(i, num_threads, process(chunks[i])); //各个线程进行相关处理 39 sync.arrive_and_wait(); //等待所有线程就位(就位后其中一个线程会调用补全函数cbk1) 40 } 41 }; 42 threads[i] = joining_thread(cbk2); //创建std::thread线程,会自动调用join() 43 } 44 }
Copyright
本文参考至《c++并发编程实战》 第二版,作者:安东尼·威廉姆斯。本人阅读后添加了自己的理解并整理,方便后续查找,可能存在错误,欢迎大家指正,感谢!
标签:std,const,函数,编程,c++,并发,future,线程,wait From: https://www.cnblogs.com/BroccoliFighter/p/17720580.html