Waiting for one-off events with futures #include <future>
1. std::future
c++标准库使用了一种叫做 std::future
的东西来模拟一次性等待事件(one-off event)。(见末尾补充)
线程可以周期性地在 future
上等待较短的时间,以查看事件是否已经发生,同时在检查之间执行一些其他任务。或者,它可以执行另一项任务,直到需要这个事件发生后才能继续执行,然后等待future
变成 ready
状态。因为是一次性的,所以 future
不能重置 reset。
#include <future>
包含两个类模板 unique futures (std::future<>)
和 shared futures (std::shared_future<>)
。std::future
的一个实例是引用其关联事件的唯一实例,而std::shared_future
的多个实例可能引用同一事件(与 unique_ptr 和 shared_ptr 很像)。模板参数就是关联的数据类型,在没有关联数据的地方应该使用偏特化的 std:future<void>
和 std::shared_future<void>
。
如果多个线程需要访问单个 future
对象,它们必须通过互斥锁或其他同步机制来保护访问。但是多个线程可以各自访问 std::shared_future<>
的副本,而不需要进一步的同步,即使它们都引用相同的异步结果。
2. Returning values from background tasks 从后台任务返回值
这里需要用到 std::async
函数模板(也是声明在 <future>
头文件中)。可以使用std::async
来启动一个不需要立即得到结果的异步任务。与等待 std::thread
对象不同,std::async
返回一个 std::future
对象,它将最终保存函数的返回值。当需要该值时,只需在future
上调用 get(),线程会阻塞,直到 future
准备好,然后返回该值。
#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}
与std::thread
相同,async
允许通过向调用中添加额外的参数来向函数传递额外的参数(可以是 [成员函数,成员对象或指针,成员函数参数],或者 [函数,函数参数] 等)。与std::thread
一样,如果参数是右值,则通过移动原始值来创建副本。
#include <string>
#include <future>
struct X
{
void foo(int,std::string const&);
std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello");
auto f2=std::async(&X::bar,x,"goodbye");
struct Y
{
double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141);
auto f4=std::async(std::ref(y),2.718);
X baz(X&);
std::async(baz,std::ref(x));
class move_only
{
public:
move_only();
move_only(move_only&&)
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;
void operator()();
};
auto f5=std::async(move_only());
默认情况下,决定std::async
是否启动一个新线程,或者当等待future
调用wait()
或get()
时再同步地运行,是由实现它的库或者系统决定的。当然用户也可以自己选择,可以在 async
中传递一个参数,该参数的类型是std::launch
,可以是std::launch::deferred
,表示函数调用将被延迟到 future
调用wait()
或get()
, std::launch::async
,表示函数必须在自己的线程上运行,或者 std::launch::deferred | std::launch::async
表示由库或系统自己选择。
auto f6=std::async(std::launch::async,Y(),1.2);
auto f7=std::async(std::launch::deferred,baz,std::ref(x));
auto f8=std::async(
std::launch::deferred | std::launch::async,
baz,std::ref(x));
auto f9=std::async(baz,std::ref(x));
f7.wait();
然而,这并不是将 std::future
与任务关联的唯一方法,还可以将任务包装在std::packaged_task<>
类模板的实例中,或者编写代码使用std::promise<>
类模板显式设置值。packaged_ task
是比std::promise
更高层次的抽象。
3. Associating a task with a future 将任务和 future 关联
std::packaged_task<>
将future
绑定到函数或可调用对象。当调用 std:: packaged_task<>
对象时,它将调用这个函数或可调用对象,并将返回值作为相关数据存储。
std::packaged_task<>
的模板参数是一个函数签名。函数签名的返回类型将标识从get_future()
成员函数返回的std::future<>
的类型,而函数签名的参数列表用于指定任务函数的参数列表签名或可调用对象的()
操作符的参数列表签名。比如下面这个 packaged_task
的特化部分实现。
template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
template<typename Callable>
explicit packaged_task(Callable&& f);
std::future<std::string> get_future();
void operator()(std::vector<char>*,int);
};
从上面可以看出,std::packaged_task
对象是一个可调用对象,它可以包装在std::function
对象中,作为线程函数传递给std::thread
,或者传递给需要可调用对象的另一个函数,甚至可以直接调用。下面作者给出了一个例子,在线程间传递任务:7 创建一个任务;8 从任务中取出 future;9 将任务放在任务列表中,等待其他线程执行任务;10 将 future 从函数中返回;5 执行线程拿出任务;6 执行任务,任务执行结束会将 10 中返回的 future 中的 shared state 设置为 ready,即调用 future 即可得到任务执行结果。
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread() // 1
{
while(!gui_shutdown_message_received()) // 2
{
get_and_process_gui_message(); // 3
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty()) // 4
continue;
task=std::move(tasks.front()); // 5
tasks.pop_front();
}
task(); // 6
}
}
std::thread gui_bg_thread(gui_thread);
template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f); // 7
std::future<void> res=task.get_future(); // 8
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task)); // 9
return res; // 10
}
总结一下 packaged_task
和future
的关系:packaged_task
相当于将一个函数或者可调用对象(这里称为任务)与 future
进行绑定,即通过 packaged_task 的 get_future 成员函数可以获得这个任务的 future 对象。而 packaged_task 本身又是一个可调用对象,因此可以像对待一个普通对象一样对待它,而且它也是一个可调用对象,因此可以像函数一样对待它。回想一下最开始的 async 函数,future 是在定义 async 函数时就返回了,且要么是延迟执行任务,要么是马上在新线程中执行任务(它本身是一个函数,因此不可能使用如 vector 之类的容器来存储它)。
4. std::promise
前面的那两种方法,任务都是用函数表示的,那么那些不能用简单的函数调用来表示的任务,或者那些结果可能来自多个地方的任务,该怎么办呢?这些情况可以通过创建future
的第三种方法来处理:使用std::promise
显式地设置值。
std::promise<T>
提供了一种设置值(类型为T)的方法,后面可以通过关联的std::future<T>
对象读取该值。也就是说,可以先获取 promise
相关联的 future
对象,然后设置 promise
的值,这样 future
通过 get
成员函数就可以得到该值。
可以通过调用get_future()
成员函数来获得与给定std::promise
相关联的std::future
对象,就像std::packaged_task
一样。当设置了promise
的值(使用set_value()
成员函数)时,future
就准备好了,可以用来检索存储的值。如果在未设置值的情况下销毁std::promise
,则会存储异常。
// https://cplusplus.com/reference/future/promise/
// promise example
#include <iostream> // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future
void print_int (std::future<int>& fut) {
int x = fut.get();
std::cout << "value: " << x << '\n';
}
int main ()
{
std::promise<int> prom; // create promise
std::future<int> fut = prom.get_future(); // engagement with future
std::thread th1 (print_int, std::ref(fut)); // send future to new thread
prom.set_value (10); // fulfill promise
// (synchronizes with getting the future)
th1.join();
return 0;
}
作者给出的一个网络连接的例子:
#include <future>
void process_connections(connection_set& connections)
{
while(!done(connections)) // 1
{
for(connection_iterator // 2
connection=connections.begin(),end=connections.end();
connection!=end;
++connection)
{
if(connection->has_incoming_data()) // 3
{
data_packet data=connection->incoming();
std::promise<payload_type>& p=
connection->get_promise(data.id); // 4
p.set_value(data.payload);
}
if(connection->has_outgoing_data()) // 5
{
outgoing_packet data=
connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true); // 6
}
}
}
}
5. Saving an exception for the future future 保存异常
看如下一个例子:
double square_root(double x)
{
if(x<0)
{
throw std::out_of_range(âx<0â);
}
return sqrt(x);
}
std::future<double> f=std::async(square_root,-1);
double y=f.get();
如果作为std::async
的一部分被调用的函数调用抛出了一个异常,这个异常被存储在future中以代替存储的值,future就准备好了,然后调用get()
重新抛出存储的异常。
如果将函数包装在std::packaged_task
中,也会发生同样的情况——在调用任务时,如果被包装的函数抛出异常,则该异常将存储在将来的结果中,以便在调用get()
时抛出。
当然,std::promise
通过显式的函数调用提供了同样的功能。如果希望存储异常而不是值,则调用set_exception()
成员函数而不是set_value()
。
extern std::promise<double> some_promise;
try
{
some_promise.set_value(calculate_value());
}
catch(...)
{
some_promise.set_exception(std::current_exception());
}
它使用std::current_exception()
来检索抛出的异常;这里的替代方法是使用std::make_exception_ptr()
直接存储一个新的异常而不抛出:
some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo ")));
在future
中存储异常的另一种方法是销毁与future
关联的std::promise
或std::packaged_task
,而不调用promise
上的set
函数或调用打包的任务。
6. Waiting from multiple threads 从多个线程等待
然而 std::future
有它的局限性,就像最开始说的那样,只有一个线程可以等待结果。如果需要从多个线程等待相同的事件,则需要使用std::shared_future
来代替。std::future
仅可移动(因此所有权可以在实例之间转移,但一次只有一个实例引用特定的异步结果),std::shared_future
实例是可复制的(因此可以有多个对象引用相同的关联状态)。
现在,使用 std::shared_future
,单个对象上的成员函数仍然是不同步的,因此在从多个线程访问单个对象时,为了避免数据竞争,必须使用锁来保护访问(这里是说如果多个线程使用一个shared_future
对象,那么多个线程不能同时调用它的成员函数(因为存在共享资源,这些操作是加锁的),所以说这仍然是不同步的)。使用它的首选方法是将shared_future
对象的副本传递给每个线程,这样每个线程就可以安全地访问自己的本地shared_future
对象,因为内部内容现在已经被库正确地同步了。如果每个线程通过自己的std::shared_future
对象访问共享异步状态,那么从多个线程访问该状态是安全的。
将 future 转化为 shared_future:
std::promise<int> p;
std::future<int> f(p.get_future());
assert(f.valid());
std::shared_future<int> sf(std::move(f));
assert(!f.valid());
assert(sf.valid());
// another
std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future());
std::future
有一个share()
成员函数,可用来创建新的std::shared_future
,并且可以直接转移future
的所有权。
std::promise< std::map< SomeIndexType, SomeDataType, SomeComparator,
SomeAllocator>::iterator> p;
auto sf=p.get_future().share();
// 推导为 std::shared_future<std::map<SomeIndexType,
// SomeDataType, SomeComparator, SomeAllocator>::iterator>
有时需要限定等待事件的时间,为了处理这种情况,许多等待函数都有允许指定超时的变体。具体怎么做,见下节。
补充:
A future is an object that can retrieve a value from some provider object or function, properly synchronizing this access if in different threads.
"Valid" futures are future objects associated to a shared state, and are constructed by calling one of the following functions:
future objects are only useful when they are valid. Default-constructed future objects are not valid (unless move-assigned a valid future).
Calling future::get on a valid future blocks the thread until the provider makes the shared state ready (either by setting a value or an exception to it). This way, two threads can be synchronized by one waiting for the other to set a value.
/* 不知是否可以这样理解: future 与某一个 shared state 相关联,调用 future 的 get 或者 wait 等函数时,会检查 shared state 状态是否是 ready 状态,从而判断是否能够 provider object 的值或者函数的返回值,而该 shared state 的 ready 状态就是由 provider object 或者函数进行设置的,因此就可以实现线程间的同步. */
The lifetime of the shared state lasts at least until the last object with which it is associated releases it or is destroyed. Therefore, if associated to a future, the shared state can survive the object from which it was obtained in the first place (if any).
参考:
[1] Anthony Williams - C++ Concurrency in Action-Manning Publications (2019).