首页 > 编程语言 >c++并发编程实战-第4章 并发操作的同步

c++并发编程实战-第4章 并发操作的同步

时间:2023-09-21 17:56:18浏览次数:44  
标签:std const 函数 编程 c++ 并发 future 线程 wait

等待事件或等待其他条件

坐车案例

想象一种情况:假设晚上坐车外出,如何才能确保不坐过站又能使自己最轻松?

方法一:不睡觉,时刻关注自己的位置

 1 #include <iostream>
 2 #include <thread>
 3 #include <mutex>
 4 using namespace std;
 5 
 6 mutex _mtx;
 7 bool bFlag = false;
 8 void wait_for_flag()
 9 {
10     auto startTime = chrono::steady_clock::now();
11     while (1)
12     {
13         unique_lock<mutex> lock(_mtx);
14         if (bFlag)
15         {
16             auto endTime = chrono::steady_clock::now();
17             double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
18             cout << "wait_for_flag consume : " << dCount << endl;
19             return;
20         }
21     }
22 }
23 
24 void set_flag()
25 {
26     auto startTime = chrono::steady_clock::now();
27     unique_lock<mutex> lock(_mtx);
28     for (int i = 0; i < 5; i++)
29     {
30         lock.unlock();
31         //do something comsume 1000ms
32         this_thread::sleep_for(chrono::milliseconds(1000));
33         lock.lock();
34     }
35 
36     bFlag = true;
37     auto endTime = chrono::steady_clock::now();
38     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
39     cout << "set_flag consume : " << dCount << endl;
40 }
41 
42 int main()
43 {
44     thread th1(wait_for_flag);
45     thread th2(set_flag);
46     th1.join();
47     th2.join();
48     return 0;
49 }

这种方式存在双重浪费:

  • 线程 th1(wait_for_flag)须不断查验标志,浪费原本有用的处理时间,这部分计算资源原本可以留给其他线程使用。
  • 线程 th1(wait_for_flag)每次循环都需要给互斥上锁,导致其他线程无法加锁。如果 th2 此时完成操作,则需要等待 th1 释放互斥才能操作。

程序输出如下:

set_flag consume : 5045.39
wait_for_flag consume : 5045.97

两个线程执行时间相近,但查看任务管理器,发现Debug程序CPU占用率始终保持10%。

方法二:通过设定多个闹钟,每隔一段时间叫醒自己

 1 void wait_for_flag()
 2 {
 3     auto startTime = chrono::steady_clock::now();
 4     unique_lock<mutex> lock(_mtx);
 5     while (!bFlag)
 6     {
 7         lock.unlock();
 8         //设置 500ms 的闹钟
 9         this_thread::sleep_for(chrono::milliseconds(500));    
10         lock.lock();
11     }
12 
13     auto endTime = chrono::steady_clock::now();
14     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
15     cout << "wait_for_flag consume : " << dCount << endl;
16 }

上面代码中引用了 this_thread::sleep_for()函数,如果暂时不满足条件,就让线程休眠。这确有改进,因为线程休眠,所以处理时间不再被浪费(不用熬夜)。但是,还是存在缺陷,休眠间隔时间难以确定。如果设置太短,会导致频繁检验,如果设置太长,又可能导致过度休眠(到站还没响)。如果线程 th2 完成了任务,线程 th1 却没有被及时唤醒,就会导致延迟。

上面的代码将休眠时间设置为500ms,CPU占用率始终为0%,但两个线程的运行时间相差过大。运行结果如下:

set_flag consume : 5061.66
wait_for_flag consume : 5570.77

方法三:让列车员叫醒你(使用 c++提供的同步机制)

若数据存在先后处理关系,线程甲需要等待线程乙完成处理后才能开始操作,那么线程甲则需等待线程乙完成并且触发事件,其中最基本的方式是条件变量。

 1 mutex _mtx;
 2 bool bFlag = false;
 3 condition_variable _cond;    //条件变量
 4 void wait_for_flag()
 5 {
 6     auto startTime = chrono::steady_clock::now();
 7     unique_lock<mutex> lock(_mtx);
 8     _cond.wait(lock, []() {return bFlag; });    //等待
 9 
10     auto endTime = chrono::steady_clock::now();
11     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
12     cout << "wait_for_flag consume : " << dCount << endl;
13 }
14 
15 void set_flag()
16 {
17     auto startTime = chrono::steady_clock::now();
18     unique_lock<mutex> lock(_mtx);
19     for (int i = 0; i < 5; i++)
20     {
21         lock.unlock();
22         //do something comsume 1000ms
23         this_thread::sleep_for(chrono::milliseconds(1000));
24         lock.lock();
25     }
26 
27     bFlag = true;
28     _cond.notify_one();    //通知
29     auto endTime = chrono::steady_clock::now();
30     double dCount = chrono::duration<double, std::milli>(endTime - startTime).count();
31     cout << "set_flag consume : " << dCount << endl;
32 }

引用条件变量后,两线程执行时间相差不大,程序输出如下:

set_flag consume : 5015.84
wait_for_flag consume : 5016.75

注:上述案例的测试结果可能不尽相同,理解意思即可。

条件变量

C++标准库提供了条件变量的两种实现:

  • std::condition_variable,只能和std::mutex一起使用。(推荐)
  • std::condition_variable_any,只要某一类型符合成为互斥的最低标准,就能与其一起使用。

二者都在标准库的头文件<condition_variable>内声明。

std::condition_variable

构造函数

condition_variable();
~condition_variable();

condition_variable(const condition_variable&) = delete;
condition_variable& operator=(const condition_variable&) = delete;

不支持拷贝、也不支持移动。

通知

void notify_one();      //唤醒一个等待者
void notify_all();      //唤醒所有等待者

wait()函数

void wait(unique_lock<mutex>& _Lck);
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred);

参数:

  • _Lck:独占锁,需要多次调用加锁、解锁操作。
  • _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。

含义:

使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。

该函数执行过程如下:

  • 当程序流程执行到wait时,如果指定了_Pred参数,wait会先执行_Pred。如果_Pred返回true,wait函数执行完毕,返回,执行后续代码;如果_Pred返回false,先将_Lck解锁并阻塞当前线程,等待其他线程唤醒。如果没有指定_Pred参数,等价于返回false的情况。
  • 后续,其他线程调用notify_one()或notify_all()函数唤醒当前线程。当前线程被唤醒,先将_Lck上锁,如果指定_Pred参数,则先进行检查,根据返回值决定是否阻塞。如果没有指定_Pred参数,wait函数执行完毕,返回,执行后续代码。

wait_for()函数

template <class _Rep, class _Period>
cv_status wait_for(
            unique_lock<mutex>& _Lck,
            const chrono::duration<_Rep, _Period>& _Rel_time);

template <class _Rep, class _Period, class _Predicate>
bool wait_for(
        unique_lock<mutex>& _Lck,
        const chrono::duration<_Rep, _Period>& _Rel_time,
        _Predicate _Pred);

参数:

  • _Lck:独占锁
  • _Rel_time:等待所消耗的最大时间
  • _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。
返回值:
  • cv_status:如果在最大时间时间内被唤醒,则wait_for()函数返回cv_status::no_timeout,否则wait_for()函数返回cv_status::timeout。
  • bool:返回_Pred的返回值。
含义:

使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。如果等待时常超过_Rel_time,wait函数将返回。

wait_until()函数

template <class _Clock, class _Duration>
cv_status wait_until(
            unique_lock<mutex>&_Lck,
            const chrono::time_point<_Clock, _Duration>& _Abs_time);

template <class _Clock, class _Duration, class _Predicate>
bool wait_until(
        unique_lock<mutex>&_Lck,
        const chrono::time_point<_Clock, _Duration>&_Abs_time,
        _Predicate _Pred);

参数:

  • _Lck:独占锁
  • _Abs_time:指定停止等待的时间点
  • _Pred:一个返回bool类型的可调用对象,用于检查条件是否成立。

返回值:

  • cv_status:如果在指定的时间点之前被唤醒,则wait_until()函数返回cv_status::no_timeout,否则wait_until()函数返回cv_status::timeout。
  • bool:返回_Pred的返回值。

含义:

使当前线程进入休眠状态,等待其他线程调用notify_one()函数或notify_all()函数唤醒。如果等待时常超过指定的_Abs_time时间点,wait函数将返回。

std::condition_variable_any

std::condition_variable_any与std::condition_variable类似,这里只简单列出成员函数,具体含义可以参考上面的std::condition_variable。

构造函数

condition_variable_any();
~condition_variable_any();

condition_variable_any(const condition_variable_any&) = delete;
condition_variable_any& operator=(const condition_variable_any&) = delete;

不支持拷贝、也不支持移动。

通知

void notify_one();
void notify_all();

wait()函数

template <class _Lock>
void wait(_Lock& _Lck);

template <class _Lock, class _Predicate>
void wait(_Lock& _Lck, _Predicate _Pred);

wait_for()函数

template <class _Lock, class _Rep, class _Period>
cv_status wait_for(
            _Lock & _Lck,
            const chrono::duration<_Rep, _Period>& _Rel_time);

template <class _Lock, class _Rep, class _Period, class _Predicate>
bool wait_for(
        _Lock & _Lck,
        const chrono::duration<_Rep, _Period>&_Rel_time,
        _Predicate _Pred);

wait_until()函数

template <class _Lock, class _Clock, class _Duration>
cv_status wait_until(_Lock & _Lck,
            const chrono::time_point<_Clock, _Duration>&_Abs_time)

template <class _Lock, class _Clock, class _Duration, class _Predicate>
bool wait_until(_Lock & _Lck,
        const chrono::time_point<_Clock, _Duration>&_Abs_time,
        _Predicate _Pred)

虚假唤醒

当线程从休眠状态中被唤醒,却发现等待条件未满足时,因而无事,这种情况被称为虚假唤醒。发生虚假唤醒最常见的情况是,多个线程争抢同一个条件,例如:

 1 mutex _mtx;
 2 condition_variable        _cond;
 3 queue<int> _dataQueue;
 4 
 5 void data_preparation_thread()
 6 {
 7     while (true)
 8     {
 9         int _data = rand();
10         {
11             std::lock_guard<mutex> lock(_mtx);
12             _dataQueue.push(_data);
13         }
14         _cond.notify_all();
15         this_thread::sleep_for(chrono::milliseconds(1000));
16     }
17 }
18 
19 void data_processing_thread()
20 {
21     while (true)
22     {
23         std::unique_lock<mutex> lock(_mtx);
24         _cond.wait(lock, []()
25             {
26                 bool bEmpty = _dataQueue.empty();
27                 if (bEmpty)
28                     cout << this_thread::get_id() << " be spurious waken up\n";
29 
30                 return !bEmpty;
31             });
32         int _data = _dataQueue.front();
33         _dataQueue.pop();
34         lock.unlock();
35 
36         cout << "threadID : " << this_thread::get_id() << " data = " << _data << endl;
37     }
38 }
39 
40 int main()
41 {
42     srand(time(NULL));
43 
44     thread th1(data_processing_thread);
45     thread th2(data_processing_thread);
46     thread th3(data_preparation_thread);
47     th1.join();
48     th2.join();
49     th3.join();
50     return 0;
51 }

两个线程竞争队列中的一条数据,总有一个是被虚假唤醒的。

唤醒丢失

 1 void wait_for_flag()
 2 {
 3     unique_lock<mutex> lock(_mtx);
 4     _cond.wait(lock);    //等待
 5 }
 6 
 7 void set_flag()
 8 {
 9     unique_lock<mutex> lock(_mtx);
10     bFlag = true;
11     _cond.notify_one();    //通知
12 }
13 
14 int main()
15 {
16     thread th1(set_flag);
17     thread th2(wait_for_flag);
18     th1.join();
19     th2.join();
20     return 0;
21 }

先执行set_flag()函数,设置数据,然后通知等待线程,此时wait_for_flag()函数还未进入等待状态,导致通知信号丢失,后续又无新的通知信号,导致线程一直处于阻塞状态。

使用 future 等待一次性事件发生

本节介绍std::future类,该类一般用于处理一次性事件,可以获取该事件的返回值。

我们可以在某个线程启动一个目标事件,该目标事件由新的线程去执行,并获取一个std::future对象。之后,该线程执行自己余下的任务,等到未来某一时刻,获取目标事件中执行的结果。

C++标准程序库有两种future,分别由两个类模板实现,其声明都位于标准库的头文件<future>内:

  • std::future:独占future。
  • std::shared_future:共享future。

它们的设计参照了std::unique_ptr和std::shared_ptr。同一目标事件仅仅允许关联唯一一个std::future实例,但可以关联多个std::shared_future实例。大致含义是:目标事件中的返回结果,如果想被其他多个线程访问,则应该使用std::shared_future,否则使用std::future。

std::future

std::future类提供了访问异步操作执行结果的机制。通过std::async、std::packaged_task或std::promise创建的异步操作,这些函数会返回一个std::future对象,该对象中保存了异步操作的执行结果。但是,该类的结果并不是共享的,即,该结果只能访问一次。

构造函数

future();
~future();
future(future&& _Other);
future& operator=(future&& _Right);

future(const future&) = delete;
future& operator=(const future&) = delete;

仅支持移动语义,不支持拷贝。

valid()函数

bool valid() const;

检测当前结果是否就绪。

get()函数

template <class _Ty>
_Ty get();

阻塞,等待future拥有合法结果并返回该结果。结果返回后,释放共享状态,后续调用valid()函数将返回false。若调用该函数前valid()为false,则行为未定义。

wait()函数

void wait() const;

阻塞直至结果变得可用,该函数执行后,valid() == true。

wait_for()函数

template <class _Rep, class _Per>
future_status wait_for(const chrono::duration<_Rep, _Per>& _Rel_time);

等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回。

future_status有如下取值:

  • future_status::ready:共享状态就绪。
  • future_status::timeout:共享状态在经过指定的等待时间内仍未就绪。
  • future_status::deferred:共享状态持有的函数正在延迟运行,结果将在显式请求时计算。

wait_until()函数

template <class _Clock, class _Dur>
future_status wait_until(const chrono::time_point<_Clock, _Dur>& _Abs_time);

等待结果,如果在已经到达指定的时间点时仍然无法得到结果,则返回。

share()函数

template <class _Ty>
shared_future<_Ty> share();

将本对象移动到std::shared_future对象。

std::shared_future

类似std::future类,同样提供了访问异步操作执行结果的机制,与std::future类不同的是,该类的结果可以被访问多次(可以连续多次调用get()函数)。

构造函数

shared_future();      //构造函数
~shared_future();     //析构函数

//支持拷贝
shared_future(const shared_future& _Other);
shared_future& operator=(const shared_future& _Right);

//支持移动
shared_future(future<_Ty>&& _Other);
shared_future& operator=(shared_future&& _Right);

支持拷贝,可以复制多个std::shared_future对象指向同一异步结果。每个线程通过自身的std::shared_future对象副本访问共享的异步结果,这一操作是安全的。

valid()函数

bool valid() const;

检测当前结果是否可用。

get()函数

template <class _Ty>
const _Ty& get() const;

阻塞,等待shared_future 拥有合法结果并获取它。若调用此函数前valid()为false,则行为未定义。

等待

void wait() const;

template <class _Rep, class _Per>
future_status wait_for(
    const chrono::duration<_Rep, _Per>&_Rel_time);

template <class _Clock, class _Dur>
future_status wait_until(
    const chrono::time_point<_Clock, _Dur>& _Abs_time);

和std::future相似。

案例-构造std::shared_future对象

 1 std::promise<int> pro;
 2 std::future<int> _fu = pro.get_future();
 3 std::shared_future<int> _sfu = std::move(_fu);        //显示
 4 
 5 std::promise<int> pro;
 6 std::shared_future<int> _sfu = pro.get_future();        //隐式
 7 
 8 std::promise<int> pro;
 9 std::future<int> _fu = pro.get_future();
10 std::shared_future<int> _sfu = _fu.share();        //share函数

std::async()函数-从后台任务返回值

std::async()函数用于构建后台任务,并允许调用者在未来某一时刻获取该任务的返回值。

函数定义

template <class _Fty, class... _ArgTypes>
std::future<...> async(_Fty&& _Fnarg, _ArgTypes&&... _Args);

template <class _Fty, class... _ArgTypes>
std::future<...> async(launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args);
参数:
  • _Policy:该函数执行方式,有如下几种取值:
    • std::launch::async:异步执行。该函数执行后,会启动新线程执行可调用对象。
    • std::launch::deferred:惰性执行。该函数执行后,并不会启动线程,而是等到后续需要获取结果时,由获取值的线程直接执行可调用对象。如果没有调用get()或者wait(),可调用对象不会执行。
    • std::launch::async | std::launch::deferred:由系统自行决定采用其中一个。
  • _Fnarg:可调用对象。
  • _Args:传递给可调用对象的参数包。

返回值:

std::future对象,通过该对象可用获取后台任务的返回值。

参数传递流程

//省略Res_Data类
int Entry(Res_Data data)
{
    cout << "-----------";
    return 5;
}

int main()
{
    Res_Data _data;
    auto _fu = std::async(Entry, _data);
    cout << _fu.get() << endl;
    return 0;
}

输出如下:

008FF9E3  Constractor
008FF6E8  Copy Constractor
008FF33C  Move Constractor
00CE0854  Move Constractor
008FF33C  Destractor
008FF6E8  Destractor
009FDE74  Move Constractor
-----------
009FDE74  Destractor
00CE0854  Destractor
5
008FF9E3  Destractor

结论:一次拷贝,3次移动。

案例

int ThreadEntry()
{
    cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(5000));
    cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl;
    return 5;
}

int main()
{
    cout << "main threadId : " << std::this_thread::get_id() << endl;
    std::future<int> fu = std::async(ThreadEntry);
    cout << "main thread to do something" << endl;

    cout << fu.get() << endl;            //等待线程结束并获取值,不能重复调用
    //fu.wait();                        //仅等待线程结束

    cout << "main thread end doing something" << endl;

    return 0;
}

如何判断std::async的执行方式

如果在构建异步任务时,不指定std::async的执行方式,那么操作系统会根据当前系统资源自动决定采用 std::launch::async 或 std::launch::deferred,那么,在程序中如何获知当前的执行方式呢?方法如下:

 1 std::future _fu = std::async(ThreadEntry);
 2 future_status status = _fu.wait_for(std::chrono::microseconds(0));//等待0ms
 3 if (status == future_status::deferred)        //延时执行
 4 {
 5     //todo
 6 }
 7 else if (status == future_status::ready || status == future_status::timeout)
 8 {
 9     //todo
10 }

std::future类的wait_for和wait_until成员函数会返回future_status枚举类型,该类型中记录当前的执行方式。

std::async与std::thread的区别

std::thread是创建一个线程执行相关操作,当系统资源紧张时,可能会导致线程创建失败。

std::async是创建一个异步任务,是否创建线程取决于操作系统,当系统资源紧张时,该函数可能不会创建线程,这取决于函数的执行方式。

std::thread不支持获取线程的执行结果,而std::async可以通过std::future获取异步任务的执行结果。

std::packaged_task - 关联future实例和任务

类模板 std::packaged_task 用于包装任何可调用对象,使之能被异步调用。可调用对象执行后,其返回值保存在std::future对象中,可用通过成员函数get_future()函数获取。

若一项庞杂的操作能分解为多个子任务,则可把它们分别包装到多个std::packaged_task实例之中,再传递给任务调度器或线程池。这就隐藏了细节,使任务抽象化,让调度器得以专注处理std::packaged_task<>实例,无须纠缠于形形色色的任务函数。

构造函数

packaged_task();      //默认构造函数
~packaged_task();     //析构函数

template <class _Fty2, ...>
packaged_task(_Fty2&& _Fnarg)    //传入一个可调用对象

//支持移动
packaged_task(packaged_task&& _Other);
packaged_task& operator=(packaged_task&& _Other);

//不支持拷贝
packaged_task(const packaged_task&) = delete;
packaged_task& operator=(const packaged_task&) = delete;

packaged_task的实例化与std::function类似,比如:

int ThreadEntry(string& str, double dValue, char* pBuf);

std::packaged_task<int(string&, double, char*)> task(ThreadEntry);

valid()函数

bool valid() const;

检查任务对象是否拥有合法的可调用对象。

swap()函数

void swap(packaged_task& _Other);

交换std::packaged_task中包装的可调用对象和与之关联的std::future对象。

get_future()函数

future<_Ret> get_future();

返回与之关联的std::future对象。

operator()()函数

void operator()(_ArgTypes... _Args);

调用被包装的可调用对象,可调用对象执行完后,将返回值保存到std::future对象中,并使std::future对象变为可用状态。

make_ready_at_thread_exit()函数

void make_ready_at_thread_exit(_ArgTypes... _Args);

该函数会调用被包装的可调用对象,可调用对象执行完后,将返回值保存到std::future对象中,但不会立马让std::future对象变为可用,而是等到该线程结束后才使其可用。

例如:

 1 int ThreadEntry()
 2 {
 3     cout << "son threadId : " << std::this_thread::get_id() << " start to do something!" << endl;
 4     std::this_thread::sleep_for(std::chrono::milliseconds(5000));
 5     cout << "son threadId : " << std::this_thread::get_id() << " end doing something!" << endl;
 6     return 5;
 7 }
 8 
 9 int main()
10 {
11     std::packaged_task<int(void)> task(ThreadEntry);
12     std::future<int> _fu = task.get_future();
13 
14     task.make_ready_at_thread_exit(); //执行该函数后,_fu未就绪,调用get将出错
15     //task();   //_fu立马就绪,调用get()不会出错
16     cout << "value = " << _fu.get() << endl;
17     return 0;
18 }

正确用法:

 1 int main()
 2 {
 3     std::packaged_task<int(void)> task(ThreadEntry);
 4     std::future<int> _fu = task.get_future();
 5 
 6     thread th1([&]() {task.make_ready_at_thread_exit(); });
 7     th1.detach();
 8     
 9     cout << "value = " << _fu.get() << endl;
10     return 0;
11 }

reset()函数

void reset();

抛弃之前的std::future对象和执行结果,重新构建新的std::future对象。不影响可调用对象。

案例-在线程间传递任务

 1 std::mutex m;
 2 std::deque<std::packaged_task<void()>> tasks;
 3 
 4 void gui_thread()
 5 {
 6     while (...)   
 7     {
 8         std::packaged_task<void()> task;
 9         {
10             std::lock_guard<std::mutex> lk(m);
11             if (tasks.empty())    
12                 continue;
13             task = std::move(tasks.front());    
14             tasks.pop_front();
15         }
16         task();
17         //其他操作
18     }
19 }
20 
21 template<typename Func>
22 std::future<void> post_task_for_gui_thread(Func f)
23 {
24     std::packaged_task<void()> task(f);  
25     std::future<void> res = task.get_future();  
26     std::lock_guard<std::mutex> lk(m);
27     tasks.push_back(std::move(task));    
28     return res;    
29 }

std::promise-创建std::promise

std::promise给出了一种异步求值的方法,能与某个std::future对象关联,延后读出需要求取的值。配对的std::promise和std::future可实现下面的工作机制:等待数据的线程在future上阻塞,而提供数据的线程利用相配的promise设定关联的值,使future准备就绪。

 1 void setValue(int nData, std::promise<int>& _pro)
 2 {
 3     cout << "set value threadID : " << this_thread::get_id() << endl;
 4     this_thread::sleep_for(chrono::seconds(3));        //休息3秒
 5     _pro.set_value( nData * 2 + 5 );
 6 }
 7 
 8 void getValue(std::future<int>& _fu)
 9 {
10     cout << "get value threadID : " << this_thread::get_id() << endl;
11     cout << "data = " << _fu.get() << endl;
12 }
13 
14 int main()
15 {
16     std::promise<int> pro;
17     std::future fu = pro.get_future();
18 
19     thread th1(setValue, 10, std::ref(pro));
20     thread th2(getValue, std::ref(fu));
21 
22     th1.join();
23     th2.join();
24     return 0;
25 }

构造函数

promise();
~promise();

template <class _Alloc>
promise(allocator_arg_t, const _Alloc& _Al);

//移动构造
promise(promise&& _Other);
promise& operator=(promise&& _Other);

//无拷贝
promise(const promise&) = delete;
promise& operator=(const promise&) = delete;

swap()函数

void swap(promise& _Other);

交换2个std::promise对象的内容,以及与之关联的std::future对象。

get_future()函数

future<_Ty> get_future();

获取std::future对象。

set_value()函数

void set_value(const _Ty& _Val);

void set_value(_Ty&& _Val);

将_Val存储到共享状态,并使其进入就绪状态。如果std::promise对象在销毁时还未设置值,会用异常代替。

set_value_at_thread_exit()函数

void set_value_at_thread_exit(const _Ty& _Val);

void set_value_at_thread_exit(_Ty&& _Val);

将_Val存储到共享状态,在线程结束时,使共享状态进入就绪状态。

set_exception()函数

void set_exception(exception_ptr _Exc);

将异常指针_Excl存储到共享状态,并使其进入就绪状态。

set_exception_at_thread_exit()函数

void set_exception_at_thread_exit(exception_ptr _Exc);

将异常指针_Excl存储到共享状态,在线程结束时,使共享状态进入就绪状态。

案例

 1 void process_connections(vector<ConnectPackage>& connections)
 2 {
 3     while (!done(connections))
 4     {
 5         for (auto itr : connections)
 6         {
 7             if (itr->has_incoming_data())        //有数据传入,接收
 8             {
 9                 data_packet data = connection->incoming();
10                 std::promise<payload_type>& p = connection->get_promise(data.id);
11                 p.set_value(data.payload);
12             }
13 
14             if (connection->has_outgoing_data())   //有数据需向外传递
15             {
16                 outgoing_packet data = connection->top_of_outgoing_queue();
17                 connection->send(data.payload);
18                 data.promise.set_value(true);
19             }
20         }
21     }
22 }

将异常保存到future中

在允许多线程程序时,如果有异常情况发生,异常值如何向外传递是我们需要考虑的一个问题,例如:

 1 double square_root(double x)
 2 {
 3     if (x < 0)
 4         throw std::out_of_range("x < 0");
 5     
 6     return sqrt(x);
 7 }
 8 
 9 int main()
10 {
11     std::future<double> f = std::async(square_root, -1);
12     double dVal = f.get();
13     return 0;
14 }

std::future实例化为double类型,因此f.get()将会返回一个double类型的值,那异常该如何传递呢?

c++规定,若经由std::async()调用的函数抛出异常,则会被保存到future中,代替本该设定的值,future随之进入就绪状态,等到其成员函数get()被调用,存储在内的异常即被重新抛出。假如我们把任务函数包装在std::packaged_task对象内,也依然如是。若包装的任务函数在执行时抛出异常,则会代替本应求得的结果,被保存到future内并使其准备就绪。只要调用get(),该异常就会被再次抛出。

对于std::promise类,可以调用调用成员函数set_exception()设置异常,例如:

1 try
2 {
3     pro.set_value(dVal);
4 }
5 catch (...)
6 {
7     pro.set_exception(std::make_exception_ptr(std::logic_error("error")));
8 }

另一个方法是,直接销毁与std::future对象关联的std::promise对象或std::packaged_task对象。如果关联的future未能准备就绪,无论销毁两者中的哪一个,其析构函数都会将异常std::future_error存储为异步任务的状态数据,它的值是错误代码是std::future_errc::broken_promise。

限时等待

时钟周期-std::ratio

时间周期是指时钟的计时单位,每隔多长时间计数一次,用std::chrono::ratio来描述,该类定义如下:

template<std::intmax_t Num, std::intmax_t Denom = 1>
class ratio;

std::chrono::ratio是一个比例类,其中Num代表分子,Denom代表分母。用于描述时间时,默认单位是秒,即:

ratio<2>          //代表2秒
ratio<60>         //代表1分钟
ratio<60*60>      //代表1小时
ratio<1,1000>    //代表1毫秒
ratio<1,1000000> //代表1微妙

时钟类

就C++标准库而言,时钟(clock)是时间信息的来源。具体来说,每种时钟都是一个类,提供4项关键信息:

  • 当前时刻。
  • 时间值的类型(int、double)。
  • 该时钟的计时单元的长度(std::chrono::duration)。
  • 计时速率是否恒定,即能否将该时钟视为恒稳时钟(steady clock)。

c++给出下列几种时钟:

  • std::chrono::system_clock:
  • std::chrono::steady_clock:
  • std::high_resolution_clock:

其中,high_resolution_clock 被定义为:

using high_resolution_clock = steady_clock;

时钟类中都提供了一个静态成员函数now(),用于返回当前时刻。另一个静态成员is_steady,用于判断当前始终的计时速率是否恒定且无法调整,如果is_steady=true,则该时钟为恒定时钟。通常,std::chrono::system_clock类不是恒稳时钟,而std::chrono::steady_clock是一个恒定时钟。

时长类

c++标准库用std::chrono::duration<>类描述时长,该类定义如下:

template<class Rep, class Period = std::ratio<1,1>>
class duration;

其中,Rep代表时钟数的类型;Period代表时钟周期,描述每一个计时周期代表多少秒。c++标准库也定义了一些常用的时长:

typedef duration<Rep ratio<3600,1>> hours;              //小时
typedef duration<Rep ratio<60,1>> minutes;              //分钟
typedef duration<Rep ratio<1,1>> seconds;               //秒
typedef duration<Rep ratio<1,1000>> milliseconds;       //毫秒
typedef duration<Rep ratio<1,1000000>> microseconds;    //微妙
typedef duration<Rep ratio<1,1000000000>> nanoseconds;  //纳秒

使用也非常方便,如果需要使线程休眠3秒,代码如下:

std::this_thread::sleep_for(std::chrono::seconds(3));    //休眠3秒

std::chrono::duration_cast时间转换

如果需要在两个时间周期转换,可以使用std::chrono::duration_cast<>进行显示转换,例如:

1 std::chrono::milliseconds ms(54802);
2 std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);
3 
4 cout << ms.count() << endl;    //54802
5 cout << s.count() << endl;     //54

强转将导致结果被截断。

count()函数

时长类支持算术运算,我们将时长乘或除以一个数值(数值应与时长类的计数类型相符)或对两个时长进行加减,就能得出一个新时长。可以通过count()函数获取该计时单位的数量:

1 std::chrono::seconds s = std::chrono::seconds(1) * 5;
2 cout << s.count() << endl;      //5

时间点类

在c++标准库中,用std::chrono::time_point<>表示时间点,时间点是一个时间跨度,始于一个称为时钟纪元的特定时刻(一般是1970年1月1日0时0分0秒),终于该时间点本身。

template <class _Clock, class _Duration = typename _Clock::duration>
class time_point;

其中,_Clock表示所参考的时钟,_Duration表示计时单元。

std::chrono::time_point类内提供了一个成员函数time_since_epoch(),该函数返回从时钟纪元到该时间点的时长跨度,下面例子是计算当前时间距1970年1月1日相隔多少天:

1 int main()
2 {
3     using days_type = std::chrono::duration<int, std::ratio<60 * 60 * 24>>;
4 
5     std::chrono::time_point<std::chrono::system_clock, days_type> today = 
6         std::chrono::time_point_cast<days_type>(std::chrono::system_clock::now());
7     
8     cout << today.time_since_epoch().count() << endl;
9 }

可以对某个时间进行加减操作,从而计算出新的时间点,例如:

std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500);

上面代码计算出未来500毫秒时刻,我们可以用它来指定绝对超时时刻。在前面的案例中,也给出了计算某段代码执行时长的例子。

接受超时时限的函数

在c++标准库中,以_for结尾的,一般对应一个时长;以_until()结尾的,一般对应一个时间点。

1 std::chrono::milliseconds ms(500);
2 this_thread::sleep_for(ms);
3 
4 auto tp = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(500);
5 this_thread::sleep_until(tp);

前面介绍的几个类中,都有包含_for和_until()结尾的函数,例如:std::this_thread、std::condition_variable、std::future、std::unique_lock等。

运用同步操作简化代码

注:该小节整理的不是很好,有部分内容我自己也没做过测试,大伙粗略看一下(或者跳过)。

利用future进行函数式编程

函数式编程是指一种编程风格,函数调用的结果完全取决于参数,而不依赖任何外部状态。若我们以相同的参数调用同一个函数两次,结果会完全一致。有一个函数类型叫“纯函数”,它的含义是:它产生的作用被完全限制在返回值上,而不会改动任何外部状态。这非常契合并发编程,只要共享数据没有改动,就不会引发条件竞争,因而无须动用互斥保护。

下面例子给出了快速排序并发版本:(大概理解一下就好,实际效率非常低)

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if (input.empty()) return input;

    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    const T& pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(), [&pivot](const T& t) { return t < pivot; });
    std::list<T> lower_part;
    lower_part.splice(lower_part.begin(), input, input.begin(), divide_point);

    std::future<std::list<T>> fu = std::async(parallel_quick_sort<T>, std::move(lower_part));
    auto new_higher = parallel_quick_sort(std::move(input));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), fu.get());
    return result;
}

使用消息传递进行同步

CSP(通信式串行线程):线程间没有共享数据,只负责接收消息,然后根据消息做出响应。比如去ATM机取钱,取款人需要先插入银行卡,之后输入密码,验证通过后输入金额,确认并等待机器吐钱。如果用线程来完成这个功能,则会有一个插卡线程,检测有没有插入银行卡,只有等卡插入了,才会通知输入线程显示并接收用户输入的密码,验证通过后,才会通知取钱线程,这样一环套一环的效果,没有完成前一项,就无法切换至下一项,这就是CSP。

符合并发技术规约的后续风格并发

并发技术规约在名字空间std::experimental内,提供了std::experimental::promise和std::experimental::packaged_task,二者与原始版本的差异是都返回std::experimental::future实例。

std::experimental::future提供了一个新的关键特性-then()后续。所谓后续是指,一旦future就绪,该函数立马执行某个函数。

并发技术规约中并没有提供与std::async()等价的函数,我们可以自己实现一个spawn_async函数,功能和std::async()函数类似:

template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())> spawn_async(Func&& func)
{
    std::experimental::promise<decltype(std::declval<Func>()())> pro;
    auto fu = pro.get_future();

    auto cbk = [_pro = std::move(pro), _func = std::decay_t<Func>(func)]() mutable 
    {
        try
        {
            _pro.set_value_at_thread_exit(_func());
        }
        catch (...)
        {
            _pro.set_exception_at_thread_exit(std::current_exception());
        }
    };

    std::thread th(cbk);
    th.detach();
    return fu;
}

后续函数的连锁调用

假定有一系列耗时的任务需要执行,而且,为了让主线程抽身执行其他任务,我们想按异步方式执行这些任务。例如,当用户登录应用程序时,我们就需向后端服务器发送信息以验证身份;完成身份验证之后,我们需再次向后端服务器请求其账户信息;最后,一旦取得了相关信息,就做出更新并显式呈现。

串行方式顺序执行代码如下:

 1 void process_login(std::string const& username, std::string const& password)
 2 {
 3     try 
 4     {
 5         user_id const id = backend.authenticate_user(username, password);        //验证身份
 6         user_data const info_to_display = backend.request_current_info(id);        //请求账户信息
 7         update_display(info_to_display);        //更新显示
 8     }
 9     catch (std::exception& e) 
10     {
11         display_error(e);
12     }
13 }

如果该代码在UI线程中执行,相关请求可能会较慢从而阻塞UI线程。

并行执行如下:

 1 std::future<void> process_login(std::string const& username, std::string const& password)
 2 {
 3     auto cbk = [=]()
 4     {
 5         try
 6         {
 7             const user_id id = backend.authenticate_user(username, password);        //验证身份
 8             const user_data info_to_display = backend.request_current_info(id);        //请求账户信息
 9             update_display(info_to_display);        //更新显示
10         }
11         catch (std::exception& e)
12         {
13             display_error(e);
14         }
15     };
16 
17     return std::async(std::launch::async, cbk);
18 }

 

大多数程序员可能会这样书写,实际上这也存在浪费。全部代码集中到后台的某一线程上运行,该后台线程仍会消耗大量资源以逐一完成各任务,因而发生阻塞。若这样的任务很多,那将导致大量线程无所事事,它们只是在空等。

我们需要按照后续函数的方式,将任务接合,形成调用链,每完成一个任务,就执行下一个。使用then()改进代码:

 1 std::experimental::future<void> process_login(std::string const& username, std::string const& password)
 2 {
 3     //验证身份
 4     auto cbk1 = [=]()
 5     {
 6         return backend.authenticate_user(username, password);
 7     };
 8 
 9     //请求账户信息
10     auto cbk2 = [](std::experimental::future<user_id> id)
11     {
12         return backend.request_current_info(id.get());
13     };
14 
15     //更新显示
16     auto cbk3 = [](std::experimental::future<user_data> info_to_display)
17     {
18         try {
19             update_display(info_to_display.get());
20         }
21         catch (std::exception& e) {
22             display_error(e);
23         }
24     };
25 
26     return spawn_async(cbk1).then(cbk2).then(cbk3);
27 }

登录流程被拆分成一系列任务,每个任务各自作为上一个任务的后续函数,接合成调用链。

最后作者还提供了一个写法,但我没理解的太懂,先贴代码,后续补充:

 1 std::experimental::future<void> process_login(std::string const& username, std::string const& password)
 2 {
 3     auto cbk1 = [](std::experimental::future<user_id> id)
 4     {
 5         return backend.async_request_current_info(id.get());
 6     };
 7 
 8     auto cbk2 = [](std::experimental::future<user_data> info_to_display)
 9     {
10         try
11         {
12             update_display(info_to_display.get());
13         }
14         catch (std::exception& e)
15         {
16             display_error(e);
17         }
18     };
19 
20     return backend.async_authenticate_user(username, password).then(cbk1).then(cbk2);
21 }

等待多个future全部就绪

假定有大量数据需要处理,而且每项数据都能独立完成,此时我们会想到生成一组异步任务分别处理数据,最后通过future等待所有任务完成。但是,有可能我们需要将所有的结果汇总起来,则需要等待所有线程将任务完成,为此可能需要新开一个线程,让该线程不断检查future是否就绪,代码如下:

 1 std::future<FinalResult> process_data(std::vector<MyData>& vec)
 2 {
 3     size_t const chunk_size = whatever;
 4     std::vector<std::future<ChunkResult>> results;
 5 
 6     for (auto begin = vec.begin(), end = vec.end(); beg != end;) 
 7     {
 8         const size_t  remaining_size = end - begin;
 9         const size_t  this_chunk_size = std::min(remaining_size, chunk_size);
10         results.push_back(std::async(process_chunk, begin, begin + this_chunk_size));
11         begin += this_chunk_size;
12     }
13 
14     //新开线程,不断检查future是否就绪
15     auto cbk = [all_results = std::move(results)]()
16     {
17         std::vector<ChunkResult> v;
18         for (auto& f : all_results)
19         {
20             v.push_back(f.get());
21         }
22     };
23     return std::async(cbk);
24 }

std::experimental::when_all()函数

可以使用std::experimental::when_all()函数避免,该函数传入若干个需要等待的future对象,它在内部会生成并返回一个总future对象,等到传入的future全部就绪,总future也随之就绪。

std::experimental::when_all()函数产生一个新的future,将传入的多个future全部包装在内。

上述代码可以改为如下:

 1 std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec)
 2 {
 3     size_t const chunk_size = whatever;
 4     std::vector<std::experimental::future<ChunkResult>> results;
 5     for (auto begin = vec.begin(), end = vec.end(); beg != end;) 
 6     {
 7         const size_t remaining_size = end - begin;
 8         const size_t this_chunk_size = std::min(remaining_size, chunk_size);
 9         results.push_back(spawn_async(process_chunk, begin, begin + this_chunk_size));        
10         begin += this_chunk_size;
11     }
12 
13     auto cbk = [](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results)
14     {
15         std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get();
16         std::vector<ChunkResult> v;
17         v.reserve(all_results.size());
18 
19         for (auto& f : all_results)
20         {
21             v.push_back(f.get());
22         }
23         return gather_results(v);
24     };
25 
26     return std::experimental::when_all(results.begin(), results.end()).then(cbk);
27 }

等待多个future中任意一个就绪

假定,我们依据某些具体条件,从庞大的数据集中查找值。不过,若存在多个值同时满足要求,则选取其中任意一个皆可。为此,我们可以生成多个线程,它们分别查找数据集的子集;若有线程找到了符合条件的值,就设立标志示意其他线程停止查找,并设置最终结果的值。

std::experimental::when_any()函数

我们可以采用std::experimental::when_any()函数统筹众多future,它生成一个新的future返回给调用者,只要原来的future中至少有一个准备就绪,则该新future也随之就绪。新future中包含了一个std::experimental::when_any_result<>内部实例,该实例由一个序列和一个索引值组成,其中序列包含传入的全体future,索引值则指明哪个future就绪。

代码如下:

 1 struct DoneCheck
 2 {
 3     std::shared_ptr<std::experimental::promise<FinalResult>> final_result;
 4 
 5     DoneCheck(std::shared_ptr<std::experimental::promise<FinalResult>> final_result_) : final_result(std::move(final_result_)) {}
 6 
 7     void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param)
 8     {
 9         auto results = results_param.get();
10         const MyData* ready_result = results.futures[results.index].get();
11 
12         if (ready_result)
13             final_result->set_value(process_found_value(*ready_result));
14         else
15         {
16             results.futures.erase(results.futures.begin() + results.index);
17             if (!results.futures.empty())
18             {
19                 std::experimental::when_any(results.futures.begin(), results.futures.end()).then(std::move(*this));
20             }
21             else
22             {
23                 final_result->set_exception(std::make_exception_ptr(std::runtime_error("Not found")));
24             }
25         }
26     }
27 };
28 
29 std::experimental::future<FinalResult> find_and_process_value(std::vector<MyData>& data)
30 {
31     const unsigned  concurrency = std::thread::hardware_concurrency();
32     const unsigned  num_tasks = (concurrency > 0) ? concurrency : 2;
33 
34     std::vector<std::experimental::future<MyData*>> results;
35 
36     auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
37 
38     auto chunk_begin = data.begin();
39 
40     std::shared_ptr<std::atomic<bool>> done_flag = std::make_shared<std::atomic<bool>>(false);
41 
42     for (unsigned i = 0; i < num_tasks; ++i)
43     {
44         auto chunk_end = (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
45 
46         auto cbk = [=]()
47         {
48             for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end); ++entry)
49             {
50                 if (matches_find_criteria(*entry))
51                 {
52                     *done_flag = true;
53                     return &*entry;
54                 }
55             }
56             return (MyData*)nullptr;
57         };
58 
59         std::experimental::future<MyData*> _fu = spawn_async(cbk);
60         results.push_back(std::move(_fu));
61         chunk_begin = chunk_end;
62     }
63 
64     std::shared_ptr<std::experimental::promise<FinalResult>> final_result = std::make_shared<std::experimental::promise<FinalResult>>();
65 
66     std::experimental::when_any(results.begin(), results.end()).then(DoneCheck(final_result));
67     return final_result->get_future();
68 }

上面代码别看了,要命...(书上的更要命0_0),下面这句最要紧:

void operator()(std::experimental::future<std::experimental::when_any_result<std::vector<std::experimental::future<MyData*>>>> results_param)

返回的新future中保存的是一个std::experimental::when_any_result类型的实例,调用get函数可以获取future序列和索引值,如下:

1 auto results = results_param.get();
2 const MyData* ready_result = results.futures[results.index].get();

上面代码中,results.futures是future序列,results.index是索引值,results.futures[results.index]是就绪的future。

基本的线程闩类std::experimental::latch

std::experimental::latch的构造函数接收唯一一个参数,在构建该类对象时,我们需通过这个参数设定其计数器的初值。接下来,每当等待的目标事件发生时,我们就在线程闩对象上调用count_down(),一旦计数器减到0,它就进入就绪状态。若我们要等待线程闩的状态变为就绪,则在其上调用wait()。若需检查其是否已经就绪,则调用is_ready()。最后,假如我们要使计数器减持,同时要等待它减到0,则应该调用count_down_and_wait()。

例子如下:

 1 void foo() 
 2 {
 3     unsigned const thread_count = 5;
 4     std::experimental::latch done(thread_count);
 5     my_data data[thread_count];
 6     std::vector<std::future<void>> threads;
 7 
 8     for (unsigned i = 0; i < thread_count; ++i)
 9     {
10         auto cbk = [&, i]()
11         {
12             data[i] = make_data(i);
13             done.count_down();
14             do_more_stuff();
15         };
16 
17         std::future<void> _fu = std::async(std::launch::async, cbk);
18         threads.push_back(std::move(_fu));
19     }
20 
21     done.wait();
22     process_data(data, thread_count);
23 }

基本的线程卡类std::experimental::barrier

并发技术规约提出了两种线程卡:

  • std::experimental::barrier
  • std::experimental:: flex_barrier

假定有一组线程在协同处理某些数据,各线程相互独立,分别处理数据,因此操作过程不必同步。但是,只有在全部线程都完成各自的处理后,才可以操作下一项数据或开始后续处理,线程卡就是为了应对这种场景。为了同步一组线程,我们创建线程卡,并指定参与同步的线程数目。线程在完成自身的处理后,就运行到线程卡处,通过在线程卡对象上调用arrive_and_wait()等待同步组的其他线程。只要组内最后一个线程也运行至此,所有线程即被释放,线程卡会自我重置。接着,线程组视具体情况各自继续,或处理下一项数据,或进行下一阶段的处理。

代码如下:

 1 void process_data(data_source& source, data_sink& sink)
 2 {
 3     const unsigned concurrency = std::thread::hardware_concurrency();
 4     const unsigned num_threads = (concurrency > 0) ? concurrency : 2;            //线程个数
 5 
 6     std::experimental::barrier sync(num_threads);
 7     std::vector<joining_thread> threads(num_threads);
 8 
 9     std::vector<data_chunk> chunks;
10     result_block result;
11 
12     for (unsigned i = 0; i < num_threads; ++i)
13     {
14         auto cbk = [&, i]()
15         {
16             while (!source.done())
17             {
18                 if (!i) //在0号线程上做分割
19                 {
20                     data_block current_block = source.get_next_data_block();    
21                     chunks = divide_into_chunks(current_block, num_threads);    //分成num_threads块
22                 }
23 
24                 sync.arrive_and_wait();        //其他线程等待0号线程分割完
25                 result.set_chunk(i, num_threads, process(chunks[i]));        //各个线程进行相关处理
26                 sync.arrive_and_wait();        //等待所有线程处理完
27 
28                 if (!i) 
29                 {
30                     sink.write_data(std::move(result));        //由0号线程负责处理写入操作
31                 }
32             }
33         };
34 
35         threads[i] = joining_thread(cbk);            //创建std::thread线程,会自动调用join()
36     }
37 }

std::experimental::flex_barrier

std::experimental::flex_barrier是std::experimental::latch的灵活版本,二者的不同之处在于,前者具备另一个构造函数,其参数既接收线程的数目,还接收补全函数。只要全部线程都运行到线程卡处,该函数就会在其中一个线程上运行(并且是唯一一个)。

代码如下:

 1 void process_data(data_source& source, data_sink& sink)
 2 {
 3     const unsigned concurrency = std::thread::hardware_concurrency();
 4     const unsigned num_threads = (concurrency > 0) ? concurrency : 2;
 5     std::vector<data_chunk> chunks;
 6 
 7     auto split_source = [&] ()
 8     {
 9         if (!source.done()) 
10         {
11             data_block current_block = source.get_next_data_block();
12             chunks = divide_into_chunks(current_block, num_threads);
13         }
14     };
15 
16     //先对数据进行分割
17     split_source();
18 
19     result_block result;
20 
21     auto cbk1 = [&]()
22     {
23         sink.write_data(std::move(result));
24         split_source();
25         return -1;
26     };
27 
28     //等到线程都到达时,其中一个线程会调用cbk1补全函数
29     std::experimental::flex_barrier sync(num_threads, cbk1);
30 
31     std::vector<joining_thread> threads(num_threads);
32     for (unsigned i = 0; i < num_threads; ++i) 
33     {
34         auto cbk2 = [&, i]
35         {
36             while (!source.done())
37             {
38                 result.set_chunk(i, num_threads, process(chunks[i]));    //各个线程进行相关处理
39                 sync.arrive_and_wait();        //等待所有线程就位(就位后其中一个线程会调用补全函数cbk1)
40             }
41         };
42         threads[i] = joining_thread(cbk2);        //创建std::thread线程,会自动调用join()
43     }
44 }

Copyright

本文参考至《c++并发编程实战》 第二版,作者:安东尼·威廉姆斯。本人阅读后添加了自己的理解并整理,方便后续查找,可能存在错误,欢迎大家指正,感谢!

 

 

标签:std,const,函数,编程,c++,并发,future,线程,wait
From: https://www.cnblogs.com/BroccoliFighter/p/17720580.html

相关文章

  • 《探索C++多线程》:condition_variable源码(一)
    https://blog.csdn.net/hujingshuang/article/details/70596630    现在接着学习关于多线程编程的特征,在这一节,将会了解到多线程中的condition_variable(条件变量)的相关知识。     在头文件<condition_variable>中有两种条件变量的类声明与定义:condition_varia......
  • 那些让我世界观崩塌的c/c++玩法
    if('\0'==0){printf("true");}else{printf("false");}----------------------------------------------inti=false; if('\0'||0||NULL||i){ printf("true&q......
  • Win32编程之通过SetWindowsHookEx注入DLL(十六)
    一、SetWindowsHookEx函数SetWindowsHookEx是用于在Windows操作系统中设置全局或本地的钩子(hook)。钩子是一种用于监视并拦截特定事件或消息的机制,通常用于拦截和处理键盘输入、鼠标操作、窗口消息等。SetWindowsHookEx允许你安装一个全局或本地的钩子过程,以便在事件发生时执行......
  • 结对编程博客
    结对编程队友:软件2103黄晖凯项目结构如下图通过项目结构可以清晰的看出每个JAVA类的实现功能,便于查阅和修改代码,这是优点。 为了满足个人项目要求建立的抽象类,过于简单抽象, 在主方法后面加上throwsIOException,确保在出现异常的情况下不会崩溃,无法处理,使得代码更加健康。......
  • c++中的四种cast转换?
    c++中的四种cast转换是:static_cast,const_cast,dynamic_cast,reinteroret_cast;static_cast:1.用于类层次结构中父类和子类之间指针或引用的转换。进行多态向上转换(子类指针或引用转换成父类)是安全的,多态向下转化是不安全的(把父类指针或引用转换成子类的指针或引用,没有动态类型检查)......
  • c++ 引用
     引用最大的价值是避免复制 #include<iostream>usingnamespacestd;voidmethod2(int&param){param=param+1;cout<<"method2inner:"<<param<<endl;//method2inner:2}voidmethod1(intparam){param=pa......
  • c++中指针和引用的区别?
    1.指针是一个实体,需要分配内存空间,引用是一个变量的别名,不需要分配内存空间。2.引用在定义的时候必须进行初始化,并且不能改变。引用的值不能为NULL,指针在定义的时候不一定要初始化,并且指针所指向的空间是可变的,可以指向NULL。3.sizeof指针得到的是指针本事的大小,sizeof引用得到......
  • 并发编程系列-AQS
    AbstractQueuedSynchronizer(AQS)是一个抽象队列同步器,它用于构建依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。该类的目的在于提供基本功能的封装,适用于大多数需要使用单个原子int值表示同步状态的同步器。举例来说,ReentrantLock、Semaphore以及FutureTask等都是基于AQS......
  • 【c&c++】C++中memset()函数的用法详解
    头文件:cstring 或 memory话说刚开始使用memset的时候一直以为memset是对每一个int赋值的,心里想有了memset还要for循环对数组进行初始化干嘛。但其实memset这个函数的作用是将数字以单个字节逐个拷贝的方式放到指定的内存中去memset(dp,0,sizeof(dp));int类型的变量一般占......
  • Win32编程之函数转发注入DLL(十五)
    一、创建目标DLL文件DLL名称:targetdll.dll头文件(targetdll.h):#pragmaonce__declspec(dllexport)void__stdcallhello();__declspec(dllexport)int__stdcalladd(inta,intb);源文件(targetdll.cpp)#include<stdio.h>#include"targetdll.h"void_......