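Chapter 8: Designing Concurrent Code
Dividing work by data
Dividing data among threads before processing begins is very similar to the way the MPI and OpenMP frameworks divide work.
A task is split into a set of parallel tasks, the worker threads run these tasks independently, and their results are combined in a final reduction step.
This approach is effective, but it only works when the data can be divided up front.
Consider quicksort, which has two basic steps: partition the data into two parts around a pivot value (the elements that come before the pivot and the elements that come after it), then recursively sort the two parts.
Here you cannot parallelize by dividing the data in advance, because the partitions only become known while the data is being processed. Instead, the overall task has to be divided recursively.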
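For comparison, here is a minimal sequential sketch of the recursive structure that the parallel version divides up (the name `sequential_quick_sort` is illustrative). Each call takes the first element as the pivot, partitions the rest with `std::partition`, and recurses on both parts; those two recursive calls are the independent pieces of work a parallel version can hand to other threads.

```cpp
#include <algorithm>
#include <list>
#include <utility>

// Sequential recursive quicksort on std::list: the pivot is the first element,
// the remaining elements are partitioned around it, and both parts are sorted recursively.
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
    if (input.empty())
        return input;
    std::list<T> result;
    // move the pivot node into result without copying the element
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();
    // elements less than the pivot are moved to the front of input
    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t) { return t < pivot; });
    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
    // these two calls are independent: a parallel version can run one on another thread
    std::list<T> new_lower(sequential_quick_sort(std::move(lower_part)));
    std::list<T> new_higher(sequential_quick_sort(std::move(input)));
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower);
    return result;
}
```

Splicing list nodes keeps the reference to the pivot valid, which is why the sketch (like the parallel code) works on `std::list` rather than `std::vector`.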
Below is a parallel quicksort that uses a stack of chunks waiting to be sorted:
#include <iostream>
#include <thread>
#include <future>
#include <atomic>
#include <algorithm>
#include <vector>
#include <stack>
#include <list>
#include <memory>
#include <mutex>
#include <chrono>
template<typename T>
struct sorter {
struct chunk_to_sort {
std::list<T>data;
std::promise<std::list<T>>inner_promise;
};
unsigned const max_thread_count;
std::mutex mtx; // protects chunks and threads, which several threads touch
std::stack<std::shared_ptr<chunk_to_sort>>chunks;
std::vector<std::thread>threads;
std::atomic<bool>end_of_data;
sorter() :
max_thread_count(std::thread::hardware_concurrency() > 1 ? std::thread::hardware_concurrency() - 1 : 1),
end_of_data(false) { return; }
~sorter()
{
end_of_data = true; // tell the workers to leave sort_thread()
// every chunk is finished by now, so no thread adds to threads any more
for (unsigned i = 0; i < threads.size(); i++)
if(threads[i].joinable())
threads[i].join();
return;
}
void sort_chunk(std::shared_ptr<chunk_to_sort>const& chunk)
{
chunk->inner_promise.set_value(do_sort(chunk->data));
return;
}
void try_sort_chunk()
{
std::shared_ptr<chunk_to_sort>chunk;
{
std::lock_guard<std::mutex>guard(mtx);
if (chunks.empty()) // never call top() on an empty stack
return;
chunk = chunks.top();
chunks.pop();
}
sort_chunk(chunk); // sort outside the lock
return;
}
void sort_thread()
{
while (!end_of_data) {
try_sort_chunk();
std::this_thread::yield();
}
return;
}
std::list<T>do_sort(std::list<T>& chunk_data)
{
if (chunk_data.empty())
return chunk_data;
std::list<T>result;
result.splice(result.begin(), chunk_data, chunk_data.begin()); // the first element is the pivot
T const& partition_val = *(result.begin());
typename std::list<T>::iterator divide_point = std::partition(chunk_data.begin(),
chunk_data.end(),
[&](T const& val) {return val < partition_val; }
);
// the lower part becomes a chunk that any thread may sort
std::shared_ptr<chunk_to_sort>new_lower_chunk = std::make_shared<chunk_to_sort>();
new_lower_chunk->data.splice(new_lower_chunk->data.end(), chunk_data, chunk_data.begin(), divide_point);
std::future<std::list<T>>new_lower = new_lower_chunk->inner_promise.get_future();
{
std::lock_guard<std::mutex>guard(mtx);
chunks.push(new_lower_chunk);
if (threads.size() < max_thread_count)
threads.push_back(std::thread(&sorter<T>::sort_thread, this));
}
// the upper part is sorted by the current thread
std::list<T>new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
// while the lower part is not ready, help by sorting other pending chunks
while (new_lower.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
try_sort_chunk();
result.splice(result.begin(), new_lower.get());
return result;
}
};
template<typename T>
std::list<T>parallel_quick_sort(std::list<T>input)
{
if (input.empty())
return input;
sorter<T>s;
return s.do_sort(input);
}
int main()
{
std::list<double>datas, results;
double temp_data;
for (int i = 0; i < 10; i++) {
std::cin >> temp_data;
datas.push_back(temp_data);
}
results = parallel_quick_sort(datas);
for (auto iter : results)
std::cout << iter << std::endl;
return 0;
}
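In do_sort(), the lower part (the elements less than the pivot) is packed into a chunk and pushed onto the stack so that another thread can sort it, while the current thread sorts the upper part (new_higher) itself by recursing.
try_sort_chunk() pops a chunk off the stack and sorts it. It is called both from sort_thread() and from do_sort() (while waiting for the lower part to be ready), which reflects the idea of a simple thread pool.
Dividing work by task type
Dividing work among threads by giving each thread a different chunk of data still rests on the assumption that every thread does the same work on its chunk.
Another way to divide work is to make the threads specialists, i.e. different threads perform different tasks.
This way of dividing work comes from using concurrency to separate concerns: each thread has a different task and works independently of the others.
However, separating concerns across threads has two main dangers. First, you may separate the wrong concerns, which leads to lots of data being shared between threads. Second, you may get deadlock, with different threads each ending up waiting for the others.
Both of these come down to too much communication between threads.
If your task consists of applying the same sequence of operations to many independent data items, you can use a pipeline to exploit the concurrency available in the system.
Data flows in at one end, passes through a series of operations, and flows out at the other end.
To divide work this way, you create a separate thread for each stage of the pipeline, i.e. one thread per operation in the sequence. When an operation completes, the data element is put on a queue to be picked up by the next thread.
Processing data with a pipeline may take longer for the whole batch, but the results come out more smoothly and at a more regular rate.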
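A minimal sketch of a two-stage pipeline, assuming C++17 (`std::optional`). The queue class `stage_queue` and the function `run_pipeline` are hypothetical helpers written for this illustration; an empty `std::optional` serves as the end-of-data marker that each stage passes downstream.

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

// Blocking queue that hands items from one pipeline stage to the next.
template<typename T>
class stage_queue {
    std::queue<std::optional<T>> items;   // std::nullopt means "no more data"
    std::mutex m;
    std::condition_variable cv;
public:
    void push(std::optional<T> item)
    {
        {
            std::lock_guard<std::mutex> lk(m);
            items.push(std::move(item));
        }
        cv.notify_one();
    }
    std::optional<T> pop()
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return !items.empty(); });
        std::optional<T> item = std::move(items.front());
        items.pop();
        return item;
    }
};

// One thread per operation: stage 1 squares each value, stage 2 adds one.
// Items flow through the stages concurrently and come out in input order.
std::vector<int> run_pipeline(std::vector<int> const& input)
{
    stage_queue<int> q1, q2;
    std::vector<int> output;
    std::thread square_stage([&] {
        while (auto item = q1.pop())
            q2.push(*item * *item);
        q2.push(std::nullopt);            // pass the end marker downstream
    });
    std::thread add_stage([&] {
        while (auto item = q2.pop())
            output.push_back(*item + 1);  // only this thread touches output
    });
    for (int x : input)
        q1.push(x);                       // feed the first stage
    q1.push(std::nullopt);
    square_stage.join();
    add_stage.join();
    return output;
}
```

For example, `run_pipeline({1, 2, 3})` yields `{2, 5, 10}`. While stage 2 works on one item, stage 1 is already processing the next, which is what gives the steady flow of results described above.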
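Factors affecting the performance of concurrent code
The number of processors and their architecture are the first and most important factor in the performance of a multithreaded program.
If there are more threads ready to run than there are hardware threads, processing power is wasted on switching between them; this is called oversubscription.
And even if you have accounted for every thread your own program runs, you can still be affected by other programs running at the same time.
If two threads running on different processors read the same data at the same time, there is usually no problem.
However, if one of the threads modifies the data, the change takes time to propagate to the other processor's cache.
If one processor is ready to update a value but another processor is already updating it, so that the first has to wait for that change to propagate, this is called high contention. If the processors rarely have to wait for each other, it is called low contention.
- Cache ping-pong: in a loop with high contention, the shared data is passed back and forth between the processors' caches; this is called cache ping-pong.
The most effective way to fix cache ping-pong is to avoid, as far as possible, having two threads contend for the same memory location.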
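One common way to follow this advice is to let each thread accumulate into a private local variable and touch the shared variable only once at the end. A minimal sketch; `count_even` is a hypothetical example function and assumes `num_threads >= 1`:

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Counts the even numbers in data with num_threads threads (num_threads >= 1).
// Each thread counts into a local variable and updates the shared atomic only once,
// instead of incrementing it for every match (which would bounce its cache line
// between processors on every increment).
long count_even(std::vector<int> const& data, unsigned num_threads)
{
    std::atomic<long> total(0);
    std::vector<std::thread> threads;
    std::size_t const block = data.size() / num_threads;
    for (unsigned t = 0; t < num_threads; ++t) {
        std::size_t const begin = t * block;
        // the last thread also takes the leftover elements
        std::size_t const end = (t + 1 == num_threads) ? data.size() : begin + block;
        threads.emplace_back([&data, &total, begin, end] {
            long local = 0;                 // private to this thread: no contention
            for (std::size_t i = begin; i < end; ++i)
                if (data[i] % 2 == 0)
                    ++local;
            total.fetch_add(local);         // one shared write per thread
        });
    }
    for (auto& th : threads)
        th.join();
    return total.load();
}
```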
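- False sharing: the smallest unit a processor cache works with is usually not a single memory address but a small block of memory called a cache line, typically 32 to 64 bytes. A cache line can therefore be shared between threads even when the data items in it are not, which creates a tricky ownership problem. This is false sharing.
False sharing can cause cache ping-pong even though no data is actually shared, which is exactly what we want to avoid.
To reduce the impact of false sharing, divide the data so that items used by the same thread sit next to each other in memory, and items used by different threads are kept apart.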
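A minimal sketch of keeping per-thread data on separate cache lines, assuming C++17 (over-aligned types in `std::vector`) and a 64-byte cache line. The line size is an assumption about the hardware; C++17 also declares `std::hardware_destructive_interference_size` in `<new>`, but not every compiler provides it. `padded_counter` and `count_with_padding` are hypothetical names.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Assumption: 64-byte cache lines (common, but hardware dependent).
constexpr std::size_t cache_line_size = 64;

// Each counter is aligned to (and padded out to) its own cache line,
// so threads writing neighbouring counters do not falsely share a line.
struct alignas(cache_line_size) padded_counter {
    long value = 0;
};

// Every thread increments only its own counter; the results are summed at the end.
long count_with_padding(unsigned num_threads, long increments_per_thread)
{
    std::vector<padded_counter> counters(num_threads);
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < num_threads; ++t)
        threads.emplace_back([&counters, t, increments_per_thread] {
            for (long i = 0; i < increments_per_thread; ++i)
                ++counters[t].value;
        });
    for (auto& th : threads)
        th.join();
    long sum = 0;
    for (auto const& c : counters)
        sum += c.value;
    return sum;
}
```

Without `alignas`, several `long` counters would fit in one cache line and every increment would invalidate that line in the other cores' caches, even though no counter is actually shared.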
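Exception safety in parallel algorithms
Exception safety is an essential aspect of good C++ code.
Parallel algorithms usually require more care about exceptions than ordinary (sequential) algorithms.
If an operation in a sequential algorithm throws an exception, the algorithm only has to make sure it cleans up after itself to avoid resource leaks and broken invariants.
In a parallel algorithm, however, if the function running on a thread exits with an exception, std::terminate is called and the whole application is terminated.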
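So an exception must never leave a thread function. A minimal sketch of one way to move it to the joining thread: catch everything on the worker, store it in a `std::exception_ptr`, and rethrow it after `join()`. This is the same idea `std::future` relies on internally; `run_and_rethrow` is a hypothetical helper name.

```cpp
#include <exception>
#include <stdexcept>
#include <thread>

// Runs f on a new thread. Any exception is caught on the worker (so std::terminate
// is not called), carried over in a std::exception_ptr, and rethrown after join().
template<typename Func>
void run_and_rethrow(Func f)
{
    std::exception_ptr error;
    std::thread worker([&error, &f] {
        try {
            f();
        } catch (...) {
            error = std::current_exception();  // capture instead of letting it escape
        }
    });
    worker.join();          // join() makes the worker's write to error visible here
    if (error)
        std::rethrow_exception(error);
}
```

Calling `run_and_rethrow([] { throw std::runtime_error("worker failed"); })` throws the `std::runtime_error` in the calling thread, where an ordinary `try`/`catch` can handle it.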
Let's look at an example:
#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
#include <algorithm>
#include <functional>
#include <chrono>
template<typename Iterator,typename T>
struct accumulate_block
{
// result refers to the caller's storage (passed with std::ref)
void operator()(Iterator first, Iterator last, T& result)
{
result = std::accumulate(first, last, result);
return;
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<T>results(num_threads);
std::vector<std::thread>threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(accumulate_block<Iterator, T>(),
block_start,
block_end,
std::ref(results[i])
);
block_start = block_end;
}
results[num_threads - 1] = std::accumulate(block_start, last, results[num_threads - 1]);
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
return std::accumulate(results.begin(), results.end(), init);
}
const int NUM = 1234567;
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long long temp_data; // the sum (about 7.6e11) does not fit in a 32-bit unsigned long
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
// elapsed time in seconds, without assuming the clock's tick period
run_time = std::chrono::duration<double>(std::chrono::steady_clock::now() - begin_time).count();
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
return 0;
}
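This example provides a parallel version of std::accumulate(). However, it is not exception safe.
The std::thread constructor, the std::accumulate call on the main thread, and accumulate_block() running on a worker thread can all throw. An exception in a worker terminates the program, and an exception on the main thread destroys std::thread objects that are still joinable, which also calls std::terminate.
To address these problems, the code is modified as follows: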
#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
#include <algorithm>
#include <functional>
#include <chrono>
template<typename Iterator,typename T>
struct accumulate_block
{
T operator()(Iterator first, Iterator last)
{
return std::accumulate(first, last, T());
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<T>>futures(num_threads - 1);
std::vector<std::thread>threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<T(Iterator, Iterator)>the_task((accumulate_block<Iterator, T>()));
futures[i] = the_task.get_future();
threads[i] = std::thread(std::move(the_task), block_start, block_end);
block_start = block_end;
}
T last_result = std::accumulate(block_start, last, T());
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); i++)
result = result + futures[i].get();
result = result + last_result;
return result;
}
const int NUM = 1234567;
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long long temp_data; // the sum (about 7.6e11) does not fit in a 32-bit unsigned long
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
// elapsed time in seconds, without assuming the clock's tick period
run_time = std::chrono::duration<double>(std::chrono::steady_clock::now() - begin_time).count();
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
return 0;
}
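First change: accumulate_block now returns its result directly instead of writing it through a reference to a storage location, and std::packaged_task together with std::future is used to make the worker threads exception safe.
Second change: a std::vector<std::future<T>> holds the futures. When a task runs, its future captures either the result or the exception, and calling get() on the main thread returns the value or rethrows the exception.
As a result, if more than one worker thread throws, only one of the exceptions is propagated.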
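The mechanism both changes rely on can be seen in isolation: a `std::packaged_task` that throws on another thread stores the exception in its shared state, and `future::get()` rethrows it in the caller. A minimal sketch; `divide` and `exception_reaches_caller` are hypothetical names used only for illustration.

```cpp
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>

// Throws when b is zero; run on a worker thread via std::packaged_task.
int divide(int a, int b)
{
    if (b == 0)
        throw std::domain_error("division by zero");
    return a / b;
}

// Returns true if the exception thrown on the worker arrives here through future::get().
bool exception_reaches_caller()
{
    std::packaged_task<int(int, int)> task(divide);
    std::future<int> result = task.get_future();
    std::thread worker(std::move(task), 1, 0);  // the exception is stored, not escaped
    worker.join();
    try {
        result.get();                           // rethrows the stored std::domain_error
    } catch (std::domain_error const&) {
        return true;
    }
    return false;
}
```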
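This still leaves the possibility that a std::thread is never join()ed: if an exception is thrown on the main thread (for example by a std::thread constructor or by the final std::accumulate) while some threads are still joinable, std::terminate is called. Here is a further version: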
#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
#include <algorithm>
#include <functional>
#include <chrono>
template<typename Iterator,typename T>
struct accumulate_block
{
T operator()(Iterator first, Iterator last)
{
return std::accumulate(first, last, T());
}
};
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) { return; }
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<T>>futures(num_threads - 1);
std::vector<std::thread>threads(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<T(Iterator, Iterator)>the_task((accumulate_block<Iterator, T>()));
futures[i] = the_task.get_future();
threads[i] = std::thread(std::move(the_task), block_start, block_end);
block_start = block_end;
}
T last_result = std::accumulate(block_start, last, T());
// no explicit join here: futures[i].get() waits for each result, and joiner joins every thread on any exit path
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); i++)
result = result + futures[i].get();
result = result + last_result;
return result;
}
const int NUM = 1234567;
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
try {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long long temp_data; // the sum (about 7.6e11) does not fit in a 32-bit unsigned long
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
// elapsed time in seconds, without assuming the clock's tick period
run_time = std::chrono::duration<double>(std::chrono::steady_clock::now() - begin_time).count();
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
catch (...) {
std::cout << "something wrong happened" << std::endl;
}
}
return 0;
}
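Here the join_threads class manages the threads: its destructor joins every joinable thread, so the threads are joined however the function exits. A try-catch block has also been added to main.
Below is a parallel version of std::accumulate() implemented with std::async():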
#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <numeric>
#include <iterator>
#include <chrono>
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
unsigned long const max_chunk_size = 25;
if (length <= max_chunk_size)
return std::accumulate(first, last, init);
else {
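Iterator mid_point = first;
std::advance(mid_point, length / 2); // split the range in half; without this the range never shrinks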
std::future<T>first_half_result = std::async(
parallel_accumulate<Iterator, T>,
first,
mid_point,
init
);
T second_half_result = parallel_accumulate(mid_point, last, T());
return first_half_result.get() + second_half_result;
}
}
const int NUM = 10000; // larger than max_chunk_size, so the range is actually split into async tasks
const int TURNS = 100;
int main()
{
for (int j = 0; j < TURNS; j++) {
try {
std::chrono::steady_clock::time_point begin_time;
std::vector<unsigned long>datas;
unsigned long temp_data;
double run_time;
begin_time = std::chrono::steady_clock::now();
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
temp_data = parallel_accumulate(datas.begin(),
datas.end(),
0.0
);
run_time = std::chrono::duration<double>(std::chrono::steady_clock::now() - begin_time).count(); // seconds
std::cout << "data sum:" << temp_data << std::endl;
std::cout << "run_time:" << run_time << "s" << std::endl;
std::cout << "turn:" << j + 1 << std::endl;
}
catch (...) {
std::cout << "something wrong happened" << std::endl;
}
}
return 0;
}
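This version divides the work recursively into asynchronous tasks.
It is exception safe: an exception thrown in a task is stored in its std::future and rethrown by get(), and a std::future obtained from std::async blocks in its destructor until an asynchronously launched task has finished, so no task is left running. The recursion depth is only about log2(length / max_chunk_size), so a large input does not overflow the stack as long as mid_point really is advanced to the middle of the range. If mid_point is left equal to first, every call recurses on the same full range, and the stack overflows as soon as the input is larger than max_chunk_size.
Improving responsiveness with concurrency
Many modern GUI frameworks are event driven: the user performs actions on the user interface by typing on the keyboard or moving the mouse, which produces a series of events or messages that the application then handles.
To make sure all events and messages are handled correctly, the application typically has an event loop like the one below: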
while(true)
{
event_data event=get_event();
if(event.type==quit)
break;
process(event);
}
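The details of the API vary, of course, but the structure is generally the same: wait for an event, process it, then wait for the next one.
By using concurrency to separate concerns, you can run long tasks on a new thread and let a dedicated GUI thread handle the events.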
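A console sketch of this design, with no real GUI framework: `event_data`, `event_queue`, and `run_event_loop` are hypothetical names modeled on the loop above. The loop hands each long job to a background thread and keeps handling events; the worker reports back by posting a `task_done` event. After `quit`, the loop waits only until every started job has reported.

```cpp
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Illustrative event type; real GUI frameworks define their own.
struct event_data {
    enum kind_t { start_task, task_done, quit } kind;
    int value;
};

// Blocking queue of events shared by the GUI thread and background threads.
class event_queue {
    std::queue<event_data> q;
    std::mutex m;
    std::condition_variable cv;
public:
    void post(event_data e)
    {
        {
            std::lock_guard<std::mutex> lk(m);
            q.push(e);
        }
        cv.notify_one();
    }
    event_data get_event()
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return !q.empty(); });
        event_data e = q.front();
        q.pop();
        return e;
    }
};

// The loop never runs a long computation itself. Returns the results that were reported.
std::vector<int> run_event_loop(event_queue& events)
{
    std::vector<int> results;
    std::vector<std::thread> workers;
    int pending = 0;
    bool quitting = false;
    while (!(quitting && pending == 0)) {
        event_data event = events.get_event();
        if (event.kind == event_data::quit) {
            quitting = true;
        } else if (event.kind == event_data::start_task) {
            ++pending;
            int n = event.value;
            workers.emplace_back([&events, n] {
                int sum = 0;
                for (int i = 1; i <= n; ++i)    // stands in for a long-running job
                    sum += i;
                events.post({event_data::task_done, sum});
            });
        } else {                                // task_done
            --pending;
            results.push_back(event.value);     // e.g. update the display
        }
    }
    for (auto& w : workers)
        w.join();
    return results;
}
```

Because the workers finish in no particular order, the order of `results` is not fixed; only the GUI thread ever touches `results`, so it needs no lock.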
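Let's revisit what we learned in Chapter 4 about passing tasks to a GUI thread with std::packaged_task. The example below is Windows-only and uses the ege graphics library (graphics.h):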
#include <iostream>
#include <random>
#include <thread>
#include <mutex>
#include <future>
#include <vector>
#include <deque>
#include <chrono>
#include <ctime>
#include <windows.h>
#include "include/graphics.h"
#pragma comment(lib,"graphics64.lib")
#define PRESSED(nVirtKey) ((GetKeyState(nVirtKey) & (1<<(sizeof(SHORT)*8-1))) != 0)
#define TOGGLED(nVirtKey) ((GetKeyState(nVirtKey) & 1) != 0)
struct point_2d {
double x, y, r; // circle centre and radius
};
std::mutex lk;
std::deque<std::packaged_task<void(point_2d)>>tasks;
std::deque<point_2d>datas;
void mainGuiTask()
{
ege::initgraph(640, 480);
ege::setcaption(L"parallel draw");
ege::setbkcolor(ege::BLACK);
ege::setcolor(ege::LIGHTGREEN);
for (; ege::is_run(); delay_fps(60)) {
ege::cleardevice();
std::packaged_task<void(point_2d)>gui_task;
point_2d task_data;
{
std::lock_guard<std::mutex>guard(lk);
if (tasks.empty())
continue;
gui_task = std::move(tasks.front());
tasks.pop_front();
task_data = std::move(datas.front());
datas.pop_front();
}
gui_task(task_data);
}
ege::closegraph();
return;
}
template<typename Func>
std::future<void>postTask(Func func, point_2d task_data)
{
std::packaged_task<void(point_2d)>post_task(func);
std::future<void>res = post_task.get_future();
// the task and its argument are pushed under the same lock,
// so the GUI thread always pops a matching pair
std::lock_guard<std::mutex>guard(lk);
tasks.push_back(std::move(post_task));
datas.push_back(task_data);
return res;
}
void mainVKboardTask()
{
std::uniform_real_distribution<double> u_x(0.0, 640.0);
std::uniform_real_distribution<double> u_y(0.0, 480.0);
std::uniform_real_distribution<double> u_r(50.0, 150.0);
std::default_random_engine engine(time(0));
point_2d temp_point{}; // zero-initialized until the first random point is generated
POINT cursor;
double temp_x, temp_y, temp_r;
for (; ege::is_run();) {
if (PRESSED(32)) { // 32 is VK_SPACE
postTask(
[](point_2d ilist){ ege::circle(
ilist.x,
ilist.y,
ilist.r
);
return;
},
temp_point
);
GetCursorPos(&cursor);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
temp_x = u_x(engine);
temp_y = u_y(engine);
temp_r = u_r(engine);
temp_point = { temp_x,temp_y,temp_r };
}
return;
}
int main()
{
std::thread gui_thread(mainGuiTask);
std::thread mouse_thread(mainVKboardTask);
gui_thread.join();
mouse_thread.join();
return 0;
}
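This program illustrates the general design of a multithreaded GUI program: only the GUI thread draws, and other threads post work to it through a queue of std::packaged_task objects.
Designing concurrent code in practice
A parallel implementation of std::for_each
std::for_each is conceptually simple: it calls the user-supplied function on each element of the range in turn.
To implement a parallel version, you only need to divide the range into blocks and assign each block to a thread.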
#include <iostream>
#include <future>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
#include <algorithm>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<void>>futures(num_threads - 1);
std::vector<std::thread>threads(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void(void)>task(
[=]() {
std::for_each(block_start, block_end, f);
return;
}
);
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task));
block_start = block_end;
}
std::for_each(block_start, last, f);
for (unsigned long i = 0; i < (num_threads - 1); i++)
futures[i].get();
return;
}
std::mutex lk;
void test_func(int data)
{
std::lock_guard<std::mutex>guard(lk);
std::cout << "the data is:" << data << std::endl;
std::cout << "from thread:" << std::this_thread::get_id() << std::endl;
return;
}
const int SIZE = 1000;
int main()
{
std::vector<int>datas;
for (int i = 0; i < SIZE; i++)
datas.push_back(i);
parallel_for_each(datas.begin(), datas.end(), test_func);
return 0;
}
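That is the parallel version of std::for_each. Each block's std::future<void> carries no value; it is used only to wait for the block and to propagate an exception from the worker thread back to the caller.
By dividing the work recursively with std::async, the code can be made even shorter: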
#include <iostream>
#include <future>
#include <thread>
#include <mutex>
#include <vector>
#include <algorithm>
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
if (length < (2 * min_per_thread))
std::for_each(first, last, f);
else {
Iterator const mid_point = first + length / 2;
std::future<void>first_half = std::async(&parallel_for_each<Iterator, Func>,
first,
mid_point,
f
);
parallel_for_each(mid_point, last, f);
first_half.get();
}
return;
}
std::mutex lk;
void test_func(int data)
{
std::lock_guard<std::mutex>guard(lk);
std::cout << "the data is:" << data << std::endl;
std::cout << "from thread:" << std::this_thread::get_id() << std::endl;
return;
}
const int SIZE = 1000;
int main()
{
std::vector<int>datas;
for (int i = 0; i < SIZE; i++)
datas.push_back(i);
parallel_for_each(datas.begin(), datas.end(), test_func);
return 0;
}
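Dividing the work with std::async like this is quite convenient.
A parallel implementation of std::find
std::find is a useful algorithm to consider next, because it is one of several algorithms that can finish without processing every element.
Once std::find finds the first element in the range that matches the search criterion, it does not need to examine any other elements.
To meet this requirement, once one thread finds a matching element we need to interrupt the other threads.
One way to signal this is to use an atomic variable as a flag and check the flag after processing each element.
There are two choices for returning the value and propagating exceptions: use an array of futures (or std::packaged_task) to transfer the values and exceptions and then process the results in the main thread, or use a std::promise to set the final result directly from the worker threads.
For the parallel std::find, std::promise is the better fit: whichever thread finds a match first can set the result immediately, without waiting for the other threads.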
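This works because a `std::promise` accepts only one result: the standard specifies that `set_value` and `set_exception` behave as if they acquire a single mutex associated with the promise, and a second call throws `std::future_error` (`promise_already_satisfied`). That is why `find_element` in the listing below wraps its calls in `try`/`catch`. A minimal sketch of several threads racing to report; `first_reporter` is a hypothetical name.

```cpp
#include <future>
#include <thread>
#include <vector>

// Several threads race to report through one std::promise.
// Only the first set_value succeeds; the others get std::future_error and ignore it.
int first_reporter(unsigned num_threads)
{
    std::promise<int> result;
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < num_threads; ++i)
        threads.emplace_back([&result, i] {
            try {
                result.set_value(static_cast<int>(i));
            } catch (std::future_error const&) {
                // another thread already set the result
            }
        });
    for (auto& t : threads)
        t.join();
    return result.get_future().get();   // the value from whichever thread won
}
```

Which thread wins is not specified, so only the range of the returned value is predictable.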
#include <iostream>
#include <future>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
#include <atomic>
#include <cmath>
#include <algorithm>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match)
{
struct find_element
{
void operator()(Iterator begin,
Iterator end,
MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag)
{
try {
for (; (begin != end) && !done_flag->load(); begin++)
if (*begin == match) {
result->set_value(begin);
done_flag->store(true);
return;
}
}
catch (...) {
try {
result->set_exception(std::current_exception());
done_flag->store(true);
}
catch (...) {}
}
}
};
unsigned long const length = std::distance(first, last);
if (!length)
return last;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::promise<Iterator>result;
std::atomic<bool>done_flag(false);
std::vector<std::thread>threads(num_threads - 1);
{
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(find_element(), block_start, block_end, match, &result, &done_flag);
block_start = block_end;
}
find_element()(block_start, last, match, &result, &done_flag);
}
if (!done_flag.load())
return last;
return result.get_future().get();
}
const int NUM = 100;
int main()
{
std::vector<double>datas;
for (int i = 0; i < NUM; i++)
datas.push_back(std::sqrt(i));
// both sides compute std::sqrt(15.0), so comparing with == is exact here
auto iter = parallel_find(datas.begin(),
datas.end(),
std::sqrt(15)
);
if (iter == datas.end()) { // never dereference end()
std::cout << "not found" << std::endl;
return 0;
}
std::cout << "try to find:" << *iter << std::endl;
std::cout << "the index is:" << (int)(iter - datas.begin()) << std::endl;
return 0;
}
Likewise, there is a std::async version of the std::find algorithm above:
#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <atomic>
#include <cmath>
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match, std::atomic<bool>& done)
{
try {
unsigned long const length = std::distance(first, last);
unsigned long const min_per_threads = 25;
if (length < (2 * min_per_threads)) {
for (; (first != last) && !done.load(); first++)
if (*first == match) {
done = true;
return first;
}
return last;
}
else {
Iterator const mid_point = first + (length / 2);
std::future<Iterator>async_result = std::async(&parallel_find<Iterator,MatchType>, mid_point, last, match, std::ref(done));
Iterator const direct_result = parallel_find(first, mid_point, match, done);
return (direct_result == mid_point) ? async_result.get() : direct_result;
}
}
catch (...) {
done = true;
throw;
}
}
const int NUM = 100;
int main()
{
std::vector<double>datas;
std::atomic<bool>done(false); // must start as false: before C++20 a default-constructed atomic<bool> is uninitialized
for (int i = 0; i < NUM; i++)
datas.push_back(std::sqrt(i));
auto iter = parallel_find(datas.begin(),
datas.end(),
std::sqrt(15),
done
);
if (iter == datas.end()) { // never dereference end()
std::cout << "not found" << std::endl;
return 0;
}
std::cout << "try to find:" << *iter << std::endl;
std::cout << "the index is:" << (int)(iter - datas.begin()) << std::endl;
return 0;
}
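The std::async version takes an extra parameter, a std::atomic<bool> flag.
This is because, as the task is divided recursively, every thread has to share the same flag. The caller must initialize it to false.
A parallel implementation of std::partial_sum
std::partial_sum computes the running totals of a range: each element is replaced by the sum of itself and all the elements before it.
One way to compute the partial sums of a range is to compute the partial sums of independent chunks, then add the value of the last element of the first chunk to the elements of the next chunk, and so on.
As with the initial division into chunks, adding in the previous chunk's partial sum can also be done in parallel. If the last element of each chunk is updated first, then while the second thread updates the next chunk, the first thread can update the remaining elements of its own chunk, and so on. This is similar to the pipeline discussed earlier.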
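Ignoring threads for a moment, the arithmetic of the chunked method can be checked with a small sequential sketch (`chunked_partial_sum` is a hypothetical name; `chunk_size` must be greater than zero). Step 1 is the part the worker threads do independently; step 2 carries each chunk's final value into the next chunk.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Sequential model of the chunked partial sum.
std::vector<long> chunked_partial_sum(std::vector<long> data, std::size_t chunk_size)
{
    std::size_t const n = data.size();
    // step 1: independent partial sums inside each chunk
    for (std::size_t start = 0; start < n; start += chunk_size) {
        std::size_t const end = std::min(start + chunk_size, n);
        std::partial_sum(data.begin() + start, data.begin() + end, data.begin() + start);
    }
    // step 2: add the (already final) last element of the previous chunk to every element of the next
    for (std::size_t start = chunk_size; start < n; start += chunk_size) {
        long const carry = data[start - 1];
        std::size_t const end = std::min(start + chunk_size, n);
        for (std::size_t i = start; i < end; ++i)
            data[i] += carry;
    }
    return data;
}
```

For `{1, 2, 3, 4, 5, 6, 7}` with chunks of 3, step 1 gives `{1, 3, 6 | 4, 9, 15 | 7}`; adding 6 and then 21 gives `{1, 3, 6, 10, 15, 21, 28}`. In the threaded version, updating each chunk's last element first is what lets the next chunk start before the current one is finished.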
#include <iostream>
#include <future>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
#include <algorithm>
#include <iterator>
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator,typename ValueType>
void parallel_partial_sum(Iterator first, Iterator last,ValueType temp)
{
unsigned long const length = std::distance(first, last);
if (!length)
return;
struct process_chunk
{
void operator()(Iterator begin,
Iterator last,
std::future<ValueType>* previous_end_value,
std::promise<ValueType>* end_value)
{
try {
Iterator end = last;
++end;
std::partial_sum(begin, end, begin);
if (previous_end_value) {
ValueType addend = previous_end_value->get(); // get() returns by value, so a non-const reference cannot bind to it
*last += addend;
if (end_value)
end_value->set_value(*last);
std::for_each(begin,
last,
[addend]( ValueType& item) {item += addend; }
);
}
else if (end_value)
end_value->set_value(*last);
}
catch (...) {
if (end_value)
end_value->set_exception(std::current_exception());
else
throw;
}
}
};
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::thread>threads(num_threads - 1);
std::vector<std::promise<ValueType>>end_values(num_threads - 1);
std::vector<std::future<ValueType>>previous_end_values;
previous_end_values.reserve(num_threads - 1);
join_threads joiner(threads);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); i++) {
Iterator block_last = block_start;
std::advance(block_last, block_size - 1);
threads[i] = std::thread(process_chunk(), block_start, block_last, (i != 0) ? &previous_end_values[i - 1] : nullptr, &end_values[i]); // each chunk sets its own promise
block_start = block_last;
block_start++;
previous_end_values.push_back(end_values[i].get_future());
}
Iterator final_element = block_start;
std::advance(final_element, std::distance(block_start, last) - 1);
process_chunk()(block_start, final_element, (num_threads > 1) ? &previous_end_values.back() : 0, 0);
return ;
}
const int NUM = 100;
int main()
{
std::vector<unsigned long>datas;
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
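parallel_partial_sum(datas.begin(), datas.end(),datas[0]); // datas[0] is only used to deduce the element type
for (auto value : datas)
std::cout << value << std::endl;
return 0;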
}
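Below is a parallel partial_sum based on pairwise updates: in each round every element adds in the value a fixed distance before it (1, 2, 4, ... positions), and a barrier keeps all the threads in step between rounds. It uses one thread per element.
Both of the textbook's parallel std::partial_sum examples have significant problems, so the code below is adapted from the textbook listing.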
#include <iostream>
#include <future>
#include <thread>
#include <mutex>
#include <vector>
#include <numeric>
#include <atomic>
#include <iterator>
struct barrier
{
std::atomic<unsigned>count, spaces, generation;
barrier(unsigned count_) :
count(count_), spaces(count_), generation(0) { return; }
void wait()
{
unsigned const gen = generation.load();
if (!--spaces) {
spaces = count.load();
generation++;
}
else {
while (generation.load() == gen)
std::this_thread::yield();
}
return;
}
void done_waiting()
{
count--;
if (!--spaces) {
spaces = count.load();
generation++;
}
return;
}
};
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_) :
threads(threads_) {
return;
}
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); i++)
if (threads[i].joinable())
threads[i].join();
return;
}
};
template<typename Iterator>
void parallel_partial_sum(Iterator first, Iterator last)
{
typedef typename std::iterator_traits<Iterator>::value_type value_type;
struct process_element
{
void operator()(Iterator first,
Iterator last,
std::vector<value_type>& buffer,
unsigned i,
barrier& b)
{
value_type& ith_element = *(first + i);
bool update_source = false;
for (unsigned step = 0, stride = 1; stride <= i; step++, stride *= 2) {
value_type const& source = (step % 2) ? buffer[i] : ith_element;
value_type& dest = (step % 2) ? ith_element : buffer[i];
value_type const& addend = (step % 2) ? buffer[i - stride] : *(first + i - stride);
dest = source + addend;
update_source = !(step % 2);
b.wait();
}
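if (update_source)
ith_element = buffer[i]; // last write went to buffer: copy it back to the element
else
buffer[i] = ith_element; // last write went to the element (or no round ran): mirror it into buffer,
// because later odd rounds still read buffer[i] as an addend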
b.done_waiting();
}
};
unsigned long const length = std::distance(first, last);
if (length <= 1)
return;
std::vector<value_type>buffer(length);
barrier b(length);
std::vector<std::thread>threads(length - 1);
join_threads joiner(threads);
for (unsigned long i = 0; i < (length - 1); i++)
threads[i] = std::thread(process_element(), first, last, std::ref(buffer), i, std::ref(b));
process_element()(first, last, buffer, length - 1, b); // the calling thread handles the last element
return;
}
const int NUM = 100; // one thread per element, so keep this small
int main()
{
std::vector<unsigned long>datas;
for (unsigned long i = 0; i < NUM; i++)
datas.push_back(i);
parallel_partial_sum(datas.begin(), datas.end());
for (auto iter : datas)
std::cout << iter << std::endl;
return 0;
}
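The pairwise algorithm the threads carry out can be modeled sequentially to check the arithmetic (`pairwise_partial_sum` is a hypothetical name). In round k, every element at index i >= 2^k adds the element 2^k positions before it, reading only the previous round's values; double buffering plays the role of the barrier. After about log2(n) rounds each element holds its prefix sum.

```cpp
#include <cstddef>
#include <vector>

// Sequential model of the pairwise (Hillis-Steele style) scan.
std::vector<long> pairwise_partial_sum(std::vector<long> values)
{
    std::vector<long> next(values.size());
    for (std::size_t stride = 1; stride < values.size(); stride *= 2) {
        // every element of this round reads only the previous round's values
        for (std::size_t i = 0; i < values.size(); ++i)
            next[i] = (i >= stride) ? values[i] + values[i - stride] : values[i];
        values.swap(next);   // this round's output is the next round's input
    }
    return values;
}
```

For `{1, 2, 3, 4}`: stride 1 gives `{1, 3, 5, 7}` and stride 2 gives `{1, 3, 6, 10}`. This also shows why elements whose thread has stopped must keep their final value readable in both the element and `buffer`: later rounds may read them from either location.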
From: https://www.cnblogs.com/mesonoxian/p/18024215