1. Functional programming with futures 使用 future 的函数式编程
函数式编程指的是一种编程方式,其函数调用的结果只依赖于函数参数,而不依赖于任何其他外部状态。一个纯粹的函数不会修改任何外部状态,比如数学库中的 sin 等函数。不修改共享数据,就不存在条件竞争,那也就没有必要使用互斥量保护共享数据。由于 future 可以在线程间相互传递,因此可以很好地运用在函数式编程模式并发(FP-style concurrency)中。
- FP-STYLE QUICKSORT
串行模式的快速排序:
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin()); // 1
T const& pivot=*result.begin(); // 2
auto divide_point=std::partition(input.begin(),input.end(),
[&](T const& t){return t<pivot;}); // 3
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),
divide_point); // 4
auto new_lower(
sequential_quick_sort(std::move(lower_part))); // 5
auto new_higher(
sequential_quick_sort(std::move(input))); // 6
result.splice(result.end(),new_higher); // 7
result.splice(result.begin(),new_lower); // 8
return result;
}
并发模式:
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),
[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),
divide_point);
std::future<std::list<T> > new_lower( // 1
std::async(¶llel_quick_sort<T>,std::move(lower_part)));
auto new_higher(
parallel_quick_sort(std::move(input))); // 2
result.splice(result.end(),new_higher); // 3
result.splice(result.begin(),new_lower.get()); // 4
return result;
}
使用 std::packaged_task 进行封装:
template<typename F,typename A>
std::future<std::result_of<F(A&&)>::type>
spawn_task(F&& f,A&& a)
{
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)>
task(std::move(f)));
std::future<result_type> res(task.get_future());
std::thread t(std::move(task),std::move(a));
t.detach();
return res;
}
FP并不是唯一避免共享可变数据的并发编程范式;另一种范式是CSP(Communicating Sequential Processer),其中线程在概念上是完全独立的,没有共享数据,但有允许消息在它们之间传递的通信通道。这种范型被Erlang语言所采纳,并且常用在MPI(Message Passing Interface,消息传递接口)上做高性能运算。
2. Synchronizing operations with message passing 使用消息传递的同步操作
CSP的概念很简单:没有共享数据时,每个线程可以基于所接收到的信息独立运行,所有消息都是通过消息队列传递。类似于状态机,根据输入进行状态更新。
具体实现参考 CSP。
3. Continuation-style concurrency with the Concurrency TS 扩展规范中的持续式并发
并发技术扩展规范在std::experiment命名空间中提供了新的类型std::promise和std::packaged_task。与std命名空间中类型完全不同,其返回实例类型为std::experimental::future,而不是std::future。
"When the data is ready, then do this processing". This is exactly what continuations give us; unsurprisingly, the member function to add a continuation to a future is called then(). Given a future fut, a continuation is added with the call fut.then(continuation).
与std::future类似 , std::experimental::future的存储值也只能检索一次。如果future处于持续使用状态,其他代码就不能访问这个furture。因此,使用fut.then()为fut添加持续性后,对原始fut的操作就是非法的。另外,调用fut.then()会返回一个新future,这个新future会持有持续性调用的结果。
std::experimental::future<int> find_the_answer();
auto fut=find_the_answer();
auto fut2=fut.then(find_the_question);
assert(!fut.valid());
assert(fut2.valid());
具体实现和使用参考 TS。
4. Chaining continuations 链式的持续式并发
假设有一系列耗时的任务要执行,并且希望异步地执行它们,以便为其他任务腾出主线程。例如,当用户登录到应用程序时,可能需要将凭证发送到后端进行身份验证;然后,当详细信息经过身份验证后,向后端进一步请求有关用户帐户的信息;最后,当检索到该信息后,用相关信息更新显示。
void process_login(std::string const& username, std::string const& password)
{
try{
user_id const id = backend.authenticate_user(username, password);
user_data const info_to_display = backend.request_current_info(id);
update_display(info_to_display);
} catch(std::exception& e){
display_error(e);
}
}
异步处理:
std::future<void> process_login(
std::string const& username, std::string const& password)
{
return std::async(std::launch::async,[=](){
try{
user_id const id = backend.authenticate_user(username, password);
user_data const info_to_display =
backend.request_current_info(id);
update_display(info_to_display);
} catch(std::exception& e){
display_error(e);
}
});
}
链式持续式:
std::experimental::future<void> process_login(
std::string const& username, std::string const& password)
{
return spawn_async([=](){
return backend.authenticate_user(username, password);
}).then([](std::experimental::future<user_id> id){
return backend.request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display){
try{
update_display(info_to_display.get());
} catch(std::exception& e){
display_error(e);
}
});
}
全异步处理:上面可以发现 backend.async_authenticate_user(username, password)返回 std::experimental::future<user_id> 会比返回user_id更加合适。
std::experimental::future<void> process_login(
std::string const& username, std::string const& password)
{
return backend.async_authenticate_user(username, password).then(
[](std::experimental::future<user_id> id){
return backend.async_request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display){
try{
update_display(info_to_display.get());
} catch(std::exception& e){
display_error(e);
}
});
}
此外还支持 std::experimental::shared_future同样支持持续性,具体参考 TS。
5. Waiting for more than one future 等待多个 future
使用 std::async 从多个 future 中收集结果
std::future<FinalResult> process_data(std::vector<MyData>& vec)
{
size_t const chunk_size = whatever;
std::vector<std::future<ChunkResult>> results;
for (auto begin=vec.begin(), end=vec.end(); beg!=end;){
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
std::async(process_chunk, begin, begin+this_chunk_size));
begin += this_chunk_size;
}
return std::async([all_results=std::move(results)](){
std::vector<ChunkResult> v;
v.reserve(all_results.size());
for (auto& f : all_results)
{
v.push_back(f.get()); // 1
}
return gather_results(v);
});
}
此代码生成一个新的异步任务来等待结果,然后在结果都可用时处理它们。但是,因为它分别等待每个任务,所以当每个结果可用时,它会被1的调度程序反复唤醒,然后在发现另一个结果尚未准备好时再次返回睡眠状态。这不仅占用了执行等待的线程,而且在每个future准备就绪时增加了额外的上下文切换,从而增加了额外的开销。使用std::experimental::when_all,可以避免这种等待和切换。将待等待的future集合传递给when_all,它将返回一个新的future,当集合中的所有future都准备好时,该future就准备好了,它还可以和 then 搭配。
std::experimental::future<FinalResult> process_data(
std::vector<MyData>& vec)
{
size_t const chunk_size = whatever;
std::vector<std::experimental::future<ChunkResult>> results;
for (auto begin = vec.begin(), end = vec.end(); beg != end){
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
spawn_async(
process_chunk, begin, begin+this_chunk_size));
begin += this_chunk_size;
}
return std::experimental::when_all(
results.begin(), results.end()).then( // 1
[](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results){
std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get();
std::vector<ChunkResult> v;
v.reserve(all_results.size());
for (auto& f: all_results){
v.push_back(f.get()); // 2
}
return gather_results(v);
});
}
为了补充when_all,还有when_any。这创建了一个future,当提供的任何一个future准备好时,它就准备好了。如果生成了多个任务以利用可用并发性,但需要在第一个任务准备好后进行一些操作,那么这种方法非常有效。
6. Waiting for the first future in a set with when_any 使用 when_any 等待第一个 ready 的 future
假设要在一个大型数据集中搜索一个满足特定条件的值,但如果有多个这样的值,那么任何一个都可以。这时可以生成多个线程,每个线程检查数据的一个子集;如果给定的线程找到一个合适的值,那么它设置一个标志,指示其他线程应该停止搜索,然后设置最终返回值。
在这里,可以使用std::experimental::when_any将future集聚集在一起,并提供一个新的future集,当至少一个future准备好时,这个future集就准备好了。when_all提供了一个future,它包装了传入的future集合,而when_any则添加了一个额外的层,并将集合与一个索引值结合起来,该索引值指示哪个future变成了 ready,并将这个future添加到std::experimental::when_any_result类模板实例中。
std::experimental::future<FinalResult>
find_and_process_value(std::vector<MyData> &data)
{
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_tasks = (concurrency > 0)? concurrency : 2;
std::vector<std::experimental::future<MyData *>> results;
auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
auto chunk_begin = data.begin();
std::shared_ptr<std::atomic<bool>> done_flag =
std::make_shared<std::atomic<bool>>(false);
for (unsigned i = 0; i < num_tasks; ++i){ // 1
auto chunk_end =
(i < (num_tasks - 1)? chunk_begin + chunk_size : data.end());
results.push_back(spawn_async([=]{ // 2
for (auto entry = chunk_begin;
!*done_flag && (entry != chunk_end);
++entry){
if (matches_find_criteria(*entry)){
*done_flag = true;
return &*entry;
}
}
return (MyData *)nullptr;
}));
chunk_begin = chunk_end;
}
std::shared_ptr<std::experimental::promise<FinalResult>> final_result =
std::make_shared<std::experimental::promise<FinalResult>>();
struct DoneCheck {
std::shared_ptr<std::experimental::promise<FinalResult>>
final_result;
DoneCheck(
std::shared_ptr<std::experimental::promise<FinalResult>>
final_result_)
: final_result(std::move(final_result_)) {}
void operator()( // 4
std::experimental::future<std::experimental::when_any_result<
std::vector<std::experimental::future<MyData *>>>>
results_param) {
auto results = results_param.get();
MyData *const ready_result =
results.futures[results.index].get(); // 5
if (ready_result)
final_result->set_value( // 6
process_found_value(*ready_result));
else {
results.futures.erase(
results.futures.begin() + results.index); // 7
if (!results.futures.empty()) {
std::experimental::when_any( // 8
results.futures.begin(), results.futures.end())
.then(std::move(*this));
} else {
final_result->set_exception(
std::make_exception_ptr( // 9
std::runtime_error("not found")));
}
}
}
}
std::experimental::when_any(results.begin(), results.end())
.then(DoneCheck(final_result)); // 3
return final_result->get_future(); // 10
}
也可以显示将 future 作为参数。
std::experimental::future<int> f1=spawn_async(func1);
std::experimental::future<std::string> f2=spawn_async(func2);
std::experimental::future<double> f3=spawn_async(func3);
std::experimental::future<
std::tuple<
std::experimental::future<int>,
std::experimental::future<std::string>,
std::experimental::future<double>>> result=
std::experimental::when_all(std::move(f1),std::move(f2),std::move(f3));
7. Latches and barriers in the Concurrency TS 锁存器和栅栏
锁存器是一种同步对象,当计数器减为0时,就处于就绪态了。锁存器是基于其输出特性——当处于就绪态时,就会保持就绪态,直到被销毁。因此,锁存器是为同步一系列事件的轻量级机制。
栅栏是一个可重用的同步组件,用于一组线程之间的内部同步。而锁存器并不关心哪个线程会使计数器减少——同一个线程可以使计数器减少多次,或者多个线程可以使计数器减少一次,或者两者的某种组合。栅栏,每个线程每个周期只能到达栅栏一次。当线程到达栅栏时,它们会阻塞,直到所有涉及的线程都到达栅栏,这时它们全部被释放。栅栏可以被重用。
- A basic latch type: std::experimental::latch
std::experimental::latch声明在<experimental/latch>头文件中。构造std::experimental::latch时,将计数器的值作为构造函数的唯一参数。当等待的事件发生,就会调用锁存器count_down成员函数。当计数器为0时,锁存器状态变为就绪。可以调用wait成员函数对锁存器进行阻塞,直到等待的锁存器处于就绪状态。如果需要对锁存器是否就绪的状态进行检查,可调用is_ready成员函数。想要减少计数器1并阻塞直至0,则可以调用count_down_and_wait成员函数。
void foo(){
unsigned const thread_count=...;
latch done(thread_count); // 1
my_data data[thread_count];
std::vector<std::future<void> > threads;
for(unsigned i=0;i<thread_count;++i)
threads.push_back(std::async(std::launch::async,[&,i]{ // 2
data[i]=make_data(i);
done.count_down(); // 3
do_more_stuff(); // 4
}));
done.wait(); // 5
process_data(data,thread_count); // 6
} // 7
6处的数据处理可能需要与线程的最终处理同步进行4——所以在std::future析构之前7,无法保证所有线程都已完成。需要注意的是,2传递给std::async的Lambda表达式中,通过引用的方式对除了i之外的所有内容进行捕获,而i是通过值捕获的方式进行传递。这是因为i是这里的循环计数器,数据和完成状态是共享访问的,所以通过引用捕获将会导致数据竞争和未定义的行为。此外,这里只要一个锁存器就够了,因为线程在数据准备好之后,还有其他任务要做。否则,就需要在处理数据前等待所有future,从确保所有任务都已经完成。
除了锁存器之外,并发技术扩展规范还提供了用于同步一组线程的可复用的同步对象——栅栏。
- 栅栏
并发技术扩展规范提供了两种栅栏机制,<experimental/barrier>头文件中,分别为:std::experimental::barrier 和std::experimental::flex_barrier 。前者更简单,开销更低。后者更灵活,开销较大。假设有一组线程正在操作一些数据。每个线程都可以独立于其他线程对数据进行处理,因此在处理过程中不需要同步,但所有线程必须在处理下一个数据项或完成后续的处理之前,要保证所有的线程都完成了它们当前的处理任务。std::experimental::barrier正是针对这种场景的。可以为同步组指定线程的数量,并为这组线程构造栅栏。当每个线程完成其处理任务时,都会到达栅栏处,并且通过调用栅栏对象的arrive_and_wait成员函数,等待小组的其他线程。当最后一个线程抵达时,所有线程将被释放,栅栏重置。组中的线程可以继续接下来的任务,或是处理下一个数据项,或是进入下一个处理阶段。
锁存器一旦就绪就会保持状态,不会有释放等待线程、重置、复用的过程。栅栏也只能用于一组线程内的同步——除非组中只有一个线程,否则无法等待栅栏就绪。可以通过显式调用栅栏对象的arrive_and_drop成员函数让线程退出组,这样就不用再受栅栏的约束,所以下一个周期到达的线程数就必须要比当前周期到达的线程数少一个了。
result_chunk process(data_chunk);
std::vector<data_chunk>
divide_into_chunks(data_block data, unsigned num_threads);
void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::experimental::barrier sync(num_threads);
std::vector<joining_thread> threads(num_threads);
std::vector<data_chunk> chunks;
result_block result;
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // 6
if (!i) { // 1
data_block current_block =
source.get_next_data_block();
chunks = divide_into_chunks(
current_block, num_threads);
}
sync.arrive_and_wait(); // 2
result.set_chunk(i, num_threads, process(chunks[i])); // 3
sync.arrive_and_wait(); // 4
if (!i) { // 5
sink.write_data(std::move(result));
}
}
});
}
} // 7
并发技术扩展规范不止提供了一种栅栏,与std::experimental::barrier相同, std::experimental::flex_barrier这个类型的栅栏更加的灵活。灵活之处在于,栅栏拥有完成阶段,一旦参与线程集中的所有线程都到达同步点,则由参与线程之一去执行完成阶段。
8. std::experimental::flex_barrier—std::experimental::barrier's flexible friend
std::experimental::flex_barrier 与std::experimental::barrier有一点不同:有一个额外的构造函数,需要传入一个完整的函数和线程数量。当所有线程都到达栅栏处,那么这个函数就由其中一个线程运行。其不仅指定了串行代码的运行方式,还提供了一种修改下一个周期到达栅栏处线程个数的方式。对于线程的计数可以修改成任何数字,无论这个数字比当前数字高或低。这样,开发者就能确定下一次到达栅栏处的线程数量了。
void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::vector<data_chunk> chunks;
auto split_source = [&] { // 1
if (!source.done()) {
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
};
split_source(); // 2
result_block result;
std::experimental::flex_barrier sync(num_threads, [&] { // 3
sink.write_data(std::move(result));
split_source(); // 4
return -1; // 5
});
std::vector<joining_thread> threads(num_threads);
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // 6
result.set_chunk(i, num_threads, process(chunks[i]));
sync.arrive_and_wait(); // 7
}
});
}
}
sync对象的类型为 std::experimental::flex_barrier,并且需要将一个完整的函数和线程数量对实例进行构造3。该函数会在所有线程抵达栅栏处的时候,运行在0号线程上,然后由0号线程调用Lambda表达式对数据进行拆分,当拆分结束后,下一轮迭代开始4。返回值-1表示线程数目保持不变,返回值为0或其他数值则指定的是下一个周期中参与迭代的线程数量。
参考:
[1] Anthony Williams - C++ Concurrency in Action-Manning Publications (2019).
[2] https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019