
C++ Multithreading, Chapter 9: Advanced Thread Management


Chapter 9: Advanced Thread Management


Note: because of problems in the textbook itself, the examples in this chapter have significant issues. Please redesign them yourself once you understand the ideas.

On most systems it is impractical to give every task that could be executed in parallel with other tasks its own thread.

A thread pool, however, lets you make the most of the concurrency the hardware can provide.

With a thread pool, tasks that can be executed concurrently are submitted to the pool, where they are placed on a queue of pending work.

Each task is taken off the queue by one of the worker threads and executed.

A worker thread's job is simply to take tasks off the queue and run them whenever it is idle.

The simplest thread pool

In its simplest form, a thread pool is an object holding a fixed number of worker threads that process the work.

When you have work to do, you call a function that puts it on the queue of pending work.

Each worker thread takes work off the queue, runs the task, and then goes back to the queue for more.

Here is an implementation of such a simplest-possible thread pool:

#include <iostream>
#include <thread>
#include <future>
#include <functional>
#include <atomic>

#include <vector>
#include <queue>


class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_) {
        return;
    }
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); i++)
            if (threads[i].joinable())
                threads[i].join();
        return;
    }
};

class thread_pool
{
    std::atomic_bool done;
    // Note: std::queue is not thread-safe; a real pool needs a thread-safe
    // queue (or a mutex) here. This listing keeps the book's simplified form.
    std::queue<std::function<void()>> work_queue;
    std::vector<std::thread> threads;
    join_threads joiner;

    void worker_thread()
    {
        while (!done) {
            if (!work_queue.empty()) {
                std::function<void()> task = std::move(work_queue.front());
                work_queue.pop();
                if (task)
                    task();
            }
            else
                std::this_thread::yield();
        }
        return;
    }

public:
    thread_pool() :
        done(false), joiner(threads)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; i++)
                threads.push_back(std::thread(&thread_pool::worker_thread, this));
        }
        catch (...) {
            done = true;
            throw;
        }
        return;
    }
    ~thread_pool()
    {
        done = true;
        return;
    }
    template<typename FunctionType>
    void submit(FunctionType f)
    {
        work_queue.push(std::function<void()>(f));
        return;
    }
};

To use the thread pool, you just submit() the tasks you want executed to the work queue.
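
For instance, ignoring the synchronization issues noted in the comments above, a minimal usage sketch of this thread_pool might look like the following; the sleep at the end is only there because this simple pool offers no way to wait for a submitted task to finish:

#include <chrono>

int main()
{
    thread_pool pool;                          // starts hardware_concurrency() workers
    for (int i = 0; i < 8; i++)
        pool.submit([i] { std::cout << "task " << i << "\n"; });
    // Crude: give the workers a moment to drain the queue before the pool
    // is destroyed.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return 0;
}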

In many cases such a simple thread pool is sufficient.

However, used this way it can easily lead to problems such as deadlock.

In simple cases, divide-and-conquer with std::async may be the better solution.
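
As a point of comparison, here is a rough divide-and-conquer sum written with std::async instead of a thread pool (a sketch, not a listing from the book; async_accumulate and the chunk threshold are made up for illustration). The futures provide the waiting that the simple pool above lacks:

#include <future>
#include <numeric>
#include <vector>
#include <iostream>

// Recursively split the range in half; the lower half may run on another
// thread while the upper half runs on the current one.
template<typename Iterator, typename T>
T async_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last);
    if (length <= 10000)                      // small chunk: just do it serially
        return std::accumulate(first, last, init);
    Iterator mid = first;
    std::advance(mid, length / 2);
    std::future<T> lower = std::async(std::launch::async,
        [=] { return async_accumulate(first, mid, init); });
    T upper = async_accumulate(mid, last, T());
    return lower.get() + upper;
}

int main()
{
    std::vector<int> v(100000, 1);
    std::cout << async_accumulate(v.begin(), v.end(), 0) << std::endl;   // prints 100000
    return 0;
}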

Waiting for tasks submitted to a thread pool

Unlike ordinary parallel code, when you use a thread pool you need to wait for the tasks submitted to it to complete, rather than waiting for the worker threads themselves.

Code built directly on std::async gets this waiting for free, whereas with a thread pool you would have to arrange it by hand with condition variables and the like.

By moving that complexity into the thread pool itself, you can wait for a task's completion directly.

You can have submit() return a task handle of some sort, which can then be used to wait for the task to finish.

This task handle wraps the condition-variable or future machinery, simplifying the code that uses the pool.

Here is the code for such a thread pool:

#include <iostream>

#include <thread>
#include <future>
#include <mutex>
#include <atomic>

#include <memory>
#include <type_traits>
#include <vector>
#include <queue>

class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_) {
        return;
    }
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); i++)
            if (threads[i].joinable())
                threads[i].join();
        return;
    }
};
class function_wrapper
{
    struct impl_base{
        virtual void call() = 0;
        virtual ~impl_base() { return; }
    };
    std::unique_ptr<impl_base>impl;
    template<typename F>
    struct impl_type:impl_base
    {
        F f;
        impl_type(F&& f_) :
            f(std::move(f_)) { return; }
        void call()
        {
            f();
            return;
        }
    };
public:
    template<typename F>
    function_wrapper(F&& f) :
        impl(new impl_type<F>(std::move(f))) { return; }
    function_wrapper() = default;
    function_wrapper(function_wrapper&& other) :
        impl(std::move(other.impl)) { return; }

    void operator()()
    {
        impl->call();
        return;
    }
    function_wrapper& operator= (function_wrapper&& other)
    {
        impl = std::move(other.impl);
        return *this;
    }
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};

class thread_pool
{
    std::atomic_bool done;
    // Note: as in the previous listing, this queue is not protected by a
    // mutex; the book's full version uses a thread-safe queue instead.
    std::queue<function_wrapper> work_queue;
    std::vector<std::thread> threads;
    join_threads joiner;

    void worker_thread()
    {
        while (!done) {
            if (!work_queue.empty()) {
                function_wrapper task = std::move(work_queue.front());
                work_queue.pop();
                task();
            }
            else
                std::this_thread::yield();
        }
    }
public:
    template<typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type>
        submit(FunctionType f)
    {
        typedef typename std::result_of<FunctionType()>::type result_type;
        std::packaged_task<result_type()>task(std::move(f));
        std::future<result_type>res(task.get_future());
        work_queue.push(std::move(task));
        return res;
    }
    thread_pool() :
        done(false), joiner(threads)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; i++)
                threads.push_back(std::thread(&thread_pool::worker_thread, this));
        }
        catch (...) {
            done = true;
            throw;
        }
        return;
    }
    ~thread_pool()
    {
        done = true;
        return;
    }
};

To demonstrate how this thread pool is used, here is a parallel accumulate built on top of it.

#include <iostream>
#include <cmath>

#include <thread>
#include <future>
#include <mutex>
#include <atomic>

#include <memory>
#include <type_traits>
#include <numeric>
#include <vector>
#include <queue>

class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_) {
        return;
    }
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); i++)
            if (threads[i].joinable())
                threads[i].join();
        return;
    }
};
class function_wrapper
{
    struct impl_base{
        virtual void call() = 0;
        virtual ~impl_base() { return; }
    };
    std::unique_ptr<impl_base>impl;
    template<typename F>
    struct impl_type:impl_base
    {
        F f;
        impl_type(F&& f_) :
            f(std::move(f_)) { return; }
        void call()
        {
            f();
            return;
        }
    };
public:
    template<typename F>
    function_wrapper(F&& f) :
        impl(new impl_type<F>(std::move(f))) { return; }
    function_wrapper() = default;
    function_wrapper(function_wrapper&& other) :
        impl(std::move(other.impl)) { return; }

    void operator()()
    {
        impl->call();
        return;
    }
    function_wrapper& operator= (function_wrapper&& other)
    {
        impl = std::move(other.impl);
        return *this;
    }
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};

class thread_pool
{
    std::atomic_bool done;
    // Note: as before, this queue is not protected by a mutex; the book's
    // full version uses a thread-safe queue instead.
    std::queue<function_wrapper> work_queue;
    std::vector<std::thread> threads;
    join_threads joiner;

    void worker_thread()
    {
        while (!done) {
            if (!work_queue.empty()) {
                function_wrapper task = std::move(work_queue.front());
                work_queue.pop();
                task();
            }
            else
                std::this_thread::yield();
        }
    }
public:
    void run_pending_task();
    template<typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type>
        submit(FunctionType f)
    {
        typedef typename std::result_of<FunctionType()>::type result_type;
        std::packaged_task<result_type()>task(std::move(f));
        std::future<result_type>res(task.get_future());
        work_queue.push(std::move(task));
        return res;
    }
    thread_pool() :
        done(false), joiner(threads)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; i++)
                threads.push_back(std::thread(&thread_pool::worker_thread, this));
        }
        catch (...) {
            done = true;
            throw;
        }
        return;
    }
    ~thread_pool()
    {
        done = true;
        return;
    }
};
void thread_pool::run_pending_task()
{
    if (!work_queue.empty()) {
        function_wrapper task = std::move(work_queue.front());
        work_queue.pop();
        task();
    }
    else
        std::this_thread::yield();
    return;
}

template<typename Iterator, typename T>
struct accumulate_block
{
    T operator()(Iterator first, Iterator last)
    {
        return std::accumulate(first, last, T());
    }
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{   
    unsigned long const length = std::distance(first, last);
    if (!length)
        return init;
    unsigned long const block_size = 25;
    unsigned long const num_blocks = (length + block_size - 1) / block_size;

    std::vector<std::future<T>>futures(num_blocks - 1);
    thread_pool pool;

    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_blocks - 1); i++) {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        // Capture the iterators by value: block_start is advanced on the next
        // loop iteration and block_end is about to go out of scope, so a
        // by-reference capture would read the wrong (or a dangling) range.
        auto block_accumulate = [block_start, block_end]() -> T {
            return std::accumulate(block_start, block_end, T());
        };
        futures[i] = pool.submit(block_accumulate);
        block_start = block_end;
    }
    T last_result = accumulate_block<Iterator, T>()(block_start, last);
    T result = init;
    for (unsigned long i = 0; i < (num_blocks - 1); i++)
        result = result + futures[i].get();
    result = result + last_result;

    return result;
}

int main()
{
    std::vector<double>datas;
    double result;
    try {
        for (int i = 0; i < 10; i++)
            datas.push_back(std::sqrt(i));

        for (auto iter : datas)
            std::cout << iter << " ";
        std::cout << std::endl;
        result = parallel_accumulate(datas.begin(), datas.end(), 0.0);
        std::cout << result << std::endl;
    }
    catch (...) {
        std::cout << "wow,something wrong." << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return 0;
}

Tasks that wait for other tasks

When a thread pool manages the task list and its associated threads, you shouldn't have to access the task list explicitly to make this work; what you need is to modify the pool so it handles it automatically.

The simplest way to do that is to add a new function to the thread pool that runs a task from the queue, and to manage the loop yourself.

A more advanced thread pool implementation might instead add logic to the wait functions to handle this case, perhaps by giving priority to the tasks being waited on.

Below is a thread-pool-based quicksort, as provided by the textbook.

Note that, as written, it produced an internal compiler error just like the code in the previous chapter. (These chapters of the textbook contain rather a lot of errors.)

#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <atomic>
#include <chrono>

#include <memory>
#include <type_traits>
#include <numeric>
#include <algorithm>

#include <vector>
#include <list>
#include <queue>

class join_threads {
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_) {
        return;
    }
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); i++)
            if (threads[i].joinable())
                threads[i].join();
        return;
    }
};

class function_wrapper {
    struct impl_base {
        virtual void call() = 0;
        virtual ~impl_base() { return; }
    };
    std::unique_ptr<impl_base>impl;
    template<typename F>
    struct impl_type : public impl_base {
        F f;
        impl_type(F&& f_) :
            f(std::move(f_)) {
            return;
        }
        void call()
        {
            f();
            return;
        }
    };
public:
    template<typename F>
    function_wrapper(F&& f) :
        impl(new impl_type<F>(std::move(f))) {
        return;
    }
    function_wrapper() = default;
    function_wrapper(function_wrapper&& other) :
        impl(std::move(other.impl)) {
        return;
    }

    void operator()()
    {
        impl->call();
        return;
    }
    function_wrapper& operator= (function_wrapper&& other)
    {
        impl = std::move(other.impl);
        return *this;
    }
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};

class thread_pool {
    std::atomic_bool done;
    // Note: the book's full version uses a thread-safe queue here; a plain
    // std::queue is kept for brevity but is not safe for concurrent access.
    std::queue<function_wrapper> work_queue;
    std::vector<std::thread> threads;
    join_threads joiner;

    void worker_thread()
    {
        while (!done) {
            if (!work_queue.empty()) {
                function_wrapper task = std::move(work_queue.front());
                work_queue.pop();
                task();
            }
            else
                std::this_thread::yield();
        }
    }
public:
    void run_pending_task();
    template<typename FunctionType>
    std::future<typename std::invoke_result<FunctionType>::type>
        submit(FunctionType f)
    {
        typedef typename std::invoke_result<FunctionType>::type result_type;
        std::packaged_task<result_type()> task(std::move(f));
        std::future<result_type> res(task.get_future());
        work_queue.push(std::move(task));
        return res;
    }
    thread_pool() :
        done(false), joiner(threads)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; i++)
                threads.push_back(std::thread(&thread_pool::worker_thread, this));
        }
        catch (...) {
            done = true;
            throw;
        }
        return;
    }
    ~thread_pool()
    {
        done = true;
        return;
    }
};

void thread_pool::run_pending_task()
{
    if (!work_queue.empty()) {
        function_wrapper task = std::move(work_queue.front());
        work_queue.pop();
        task();
    }
    else
        std::this_thread::yield();
    return;
}

template<typename T>
struct sorter {
    thread_pool pool;
    std::list<T> do_sort(std::list<T>& chunk_data)
    {
        if (chunk_data.empty())
            return chunk_data;

        std::list<T> result;
        result.splice(result.begin(), chunk_data, chunk_data.begin());
        T const& partition_val = *result.begin();
        typename std::list<T>::iterator divide_point = std::partition(
            chunk_data.begin(),
            chunk_data.end(),
            [&](T const& val) {
                return val < partition_val;
            }
        );

        std::list<T> new_lower_chunk;
        new_lower_chunk.splice(new_lower_chunk.end(), chunk_data, chunk_data.begin(), divide_point);
        // The lambda must be mutable so the captured chunk can be passed to
        // do_sort(), which takes a non-const reference.
        std::future<std::list<T>> new_lower = pool.submit(
            [this, new_lower_chunk = std::move(new_lower_chunk)]() mutable {
                return do_sort(new_lower_chunk);
            }
        );

        std::list<T> new_higher(do_sort(chunk_data));
        result.splice(result.end(), new_higher);
        // Instead of blocking, keep running other queued tasks while the
        // lower chunk is being sorted; this is what run_pending_task() is
        // for, and it prevents deadlock when every worker is itself waiting
        // on a sub-task.
        while (new_lower.wait_for(std::chrono::seconds(0)) == std::future_status::timeout)
            pool.run_pending_task();
        result.splice(result.begin(), new_lower.get());

        return result;
    }
    std::list<T> parallel_quick_sort(std::list<T> input)
    {
        if (input.empty())
            return input;
        // Use this sorter's own pool instead of constructing a second one.
        return do_sort(input);
    }
};

int main()
{
    std::list<double>datas;
    sorter<double> temp_sorter;
    double temp_data;
    try {
        for (int i = 0; i < 10; i++) {
            std::cin >> temp_data;
            datas.push_back(temp_data);
        }

        std::cout << "before arrange" << std::endl;
        for (auto iter : datas)
            std::cout << iter << " " << std::ends;
        std::cout << std::endl;

        temp_sorter.parallel_quick_sort(datas);
        std::cout << "after arranged" << std::endl;
        for (auto iter : datas)
            std::cout << iter << " " << std::ends;
        std::cout << std::endl;
    }
    catch (...) {
        std::cout << "wow,something wrong." << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return 0;
}

Avoiding contention on the work queue

Every time a thread calls submit(), it adds a new item to the single shared work queue.

As the number of processors grows, the contention on that queue increases, and this can seriously hurt performance.

Even if you use a lock-free queue, cache ping-pong can still be very costly.

One way to avoid cache ping-pong is to use a separate work queue per thread.

Each thread then posts new tasks to its own queue and only takes work from the global queue when its own queue is empty.

Here is the thread pool modified in this way:

class join_threads {
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_) {
        return;
    }
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); i++)
            if (threads[i].joinable())
                threads[i].join();
        return;
    }
};

class function_wrapper {
    struct impl_base {
        virtual void call() = 0;
        virtual ~impl_base() { return; }
    };
    std::unique_ptr<impl_base>impl;
    template<typename F>
    struct impl_type : public impl_base {
        F f;
        impl_type(F&& f_) :
            f(std::move(f_)) {
            return;
        }
        void call()
        {
            f();
            return;
        }
    };
public:
    template<typename F>
    function_wrapper(F&& f) :
        impl(new impl_type<F>(std::move(f))) {
        return;
    }
    function_wrapper() = default;
    function_wrapper(function_wrapper&& other) :
        impl(std::move(other.impl)) {
        return;
    }

    void operator()()
    {
        impl->call();
        return;
    }
    function_wrapper& operator= (function_wrapper&& other)
    {
        impl = std::move(other.impl);
        return *this;
    }
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};

class thread_pool
{
    typedef std::queue<function_wrapper> local_queue_type;

    std::atomic_bool done;
    std::queue<function_wrapper> pool_work_queue;
    // Each worker thread gets its own queue; on threads that are not part of
    // the pool the pointer stays empty, which is how submit() tells the two
    // cases apart.
    static thread_local std::unique_ptr<local_queue_type> local_work_queue;
    std::vector<std::thread> threads;
    join_threads joiner;

    void worker_thread()
    {
        local_work_queue.reset(new local_queue_type);
        while (!done)
            run_pending_task();
    }
public:
    template<typename FunctionType>
    std::future<typename std::invoke_result<FunctionType>::type> submit(FunctionType f)
    {
        typedef typename std::invoke_result<FunctionType>::type result_type;
        std::packaged_task<result_type()> task(std::move(f));
        std::future<result_type> res(task.get_future());
        if (local_work_queue)
            local_work_queue->push(std::move(task));
        else
            pool_work_queue.push(std::move(task));
        return res;
    }
    void run_pending_task()
    {
        function_wrapper task;
        if (local_work_queue && !local_work_queue->empty()) {
            task = std::move(local_work_queue->front());
            local_work_queue->pop();
            task();
        }
        else if (!pool_work_queue.empty()) {
            task = std::move(pool_work_queue.front());
            pool_work_queue.pop();
            task();
        }
        else
            std::this_thread::yield();
    }

    thread_pool() :
        done(false), joiner(threads)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; i++)
                threads.push_back(std::thread(&thread_pool::worker_thread, this));
        }
        catch (...) {
            done = true;
            throw;
        }
        return;
    }
    ~thread_pool()
    {
        done = true;
        return;
    }
};
// Out-of-class definition of the thread_local member; without it the
// program fails to link.
thread_local std::unique_ptr<std::queue<function_wrapper>> thread_pool::local_work_queue;

Using local queues works well for reducing contention on the global queue, but an uneven distribution of work can still hurt efficiency, with one thread's queue full while others sit idle.

This leads to work stealing: a thread should be allowed to steal work from other threads' private queues.

Work stealing

To allow an idle thread to run tasks queued on other threads, each worker's private queue must be accessible from run_pending_task() when it tries to steal work.

This requires either that each worker thread register its private queue with the thread pool, or that each thread be handed a work queue allocated by the pool.

In addition, the data in the work queues must be properly synchronized and protected.

Below is an example of a thread pool that supports work stealing. Like the examples above it comes from the textbook and contains plenty of errors.

#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <atomic>

#include <memory>
#include <type_traits>
#include <numeric>
#include <functional>

#include <algorithm>
#include <vector>
#include <list>
#include <deque>
#include <queue>

class join_threads {
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_) {
        return;
    }
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); i++)
            if (threads[i].joinable())
                threads[i].join();
        return;
    }
};

class function_wrapper {
    struct impl_base {
        virtual void call() = 0;
        virtual ~impl_base() { return; }
    };
    std::unique_ptr<impl_base>impl;
    template<typename F>
    struct impl_type : public impl_base {
        F f;
        impl_type(F&& f_) :
            f(std::move(f_)) {
            return;
        }
        void call()
        {
            f();
            return;
        }
    };
public:
    template<typename F>
    function_wrapper(F&& f) :
        impl(new impl_type<F>(std::move(f))) {
        return;
    }
    function_wrapper() = default;
    function_wrapper(function_wrapper&& other) :
        impl(std::move(other.impl)) {
        return;
    }

    void operator()()
    {
        impl->call();
        return;
    }
    function_wrapper& operator= (function_wrapper&& other)
    {
        impl = std::move(other.impl);
        return *this;
    }
    function_wrapper(const function_wrapper&) = delete;
    function_wrapper(function_wrapper&) = delete;
    function_wrapper& operator=(const function_wrapper&) = delete;
};

template<typename T>
class thread_safe_queue
{
private:
    struct node {
        std::shared_ptr<T>data;
        std::unique_ptr<node>next;
    };
    std::mutex head_mutex;
    std::unique_ptr<node>head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;
    node* get_tail()
    {
        std::lock_guard<std::mutex>tail_lock(tail_mutex);
        return tail;
    }
    std::unique_ptr<node>pop_head()
    {
        std::unique_ptr<node>old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }
    std::unique_ptr<node>try_pop_head()
    {
        std::lock_guard<std::mutex>head_lock(head_mutex);
        if (head.get() == get_tail())
            return std::unique_ptr<node>();
        return pop_head();
    }
    std::unique_ptr<node>try_pop_head(T& value)
    {
        std::lock_guard<std::mutex>head_lock(head_mutex);
        if (head.get() == get_tail())
            return std::unique_ptr<node>();
        value = std::move(*head->data);
        return pop_head();
    }
public:
    thread_safe_queue() :
        head(new node), tail(head.get()) { return; }
    thread_safe_queue(const thread_safe_queue& other) = delete;
    thread_safe_queue& operator=(const thread_safe_queue& other) = delete;
    void push(T new_value)
    {
        std::shared_ptr<T>new_data(std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node>p(new node);
        {
            std::lock_guard<std::mutex>tail_lock(tail_mutex);
            tail->data = new_data;
            node* const new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        data_cond.notify_one();
        return;
    }
    std::shared_ptr<T>try_pop()
    {
        std::unique_ptr<node>old_head = try_pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();            
    }
    bool try_pop(T& value)
    {
        std::unique_ptr<node>const old_head = try_pop_head(value);
        return old_head ? true : false;
    }
    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get() == get_tail());
    }
};

class work_stealing_queue
{
private:
    typedef function_wrapper data_type;
    std::deque<data_type>the_queue;
    mutable std::mutex the_mutex;
public:
    work_stealing_queue() { return; }
    work_stealing_queue(const work_stealing_queue& other) = delete;
    work_stealing_queue& operator=(const work_stealing_queue& other) = delete;

    void push(data_type data)
    {
        std::lock_guard<std::mutex>lock(the_mutex);
        the_queue.push_front(std::move(data));
    }
    bool empty()const
    {
        std::lock_guard<std::mutex>lock(the_mutex);
        return the_queue.empty();
    }
    bool try_pop(data_type& res)
    {
        std::lock_guard<std::mutex>lock(the_mutex);
        if (the_queue.empty())
            return false;
        res = std::move(the_queue.front());
        the_queue.pop_front();
        return true;
    }
    bool try_steal(data_type& res)
    {
        std::lock_guard<std::mutex>lock(the_mutex);
        if (the_queue.empty())
            return false;
        res = std::move(the_queue.back());
        the_queue.pop_back();
        return true;
    }
};
class thread_pool
{
    typedef function_wrapper task_type;
    std::atomic_bool done;
    thread_safe_queue<task_type>pool_work_queue;
    std::vector<std::unique_ptr<work_stealing_queue>>queues;
    std::vector<std::thread>threads;
    join_threads joiner;

    static thread_local work_stealing_queue* local_work_queue;
    static thread_local unsigned my_index;
   
    void worker_thread(unsigned my_index_)
    {
        my_index = my_index_;
        local_work_queue = queues[my_index].get();
        while (!done)
            run_pending_task();
        return;
    }
    bool pop_task_from_local_queue(task_type& task)
    {
        return local_work_queue && local_work_queue->try_pop(task);
    }
    bool pop_task_from_pool_queue(task_type& task)
    {
        return pool_work_queue.try_pop(task);
    }
    bool pop_task_from_other_thread_queue(task_type& task)
    {
        for (unsigned i = 0; i < queues.size(); i++) {
            // Start from the next queue so every thread does not hammer the
            // first queue in the vector.
            unsigned const index = (my_index + i + 1) % queues.size();
            if (queues[index]->try_steal(task))
                return true;
        }
        return false;
    }
public:
    thread_pool() :
        done(false), joiner(threads)
    {
        unsigned const thread_count = std::thread::hardware_concurrency();
        try {
            for (unsigned i = 0; i < thread_count; i++) {
                queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));
                threads.push_back(std::thread(&thread_pool::worker_thread, this, i));
            }
        }
        catch (...) {
            done = true;
            throw;
        }
    }
    ~thread_pool()
    {
        done = true;
    }

    template<typename FunctionType>
    std::future<typename std::invoke_result<FunctionType>::type> submit(FunctionType f)
    {
        typedef typename std::invoke_result<FunctionType>::type result_type;

        std::packaged_task<result_type()> task(std::move(f));
        std::future<result_type>res(task.get_future());
        if (local_work_queue)
            local_work_queue->push(std::move(task));
        else
            pool_work_queue.push(std::move(task));
        return res;
    }
    void run_pending_task()
    {
        task_type task;
        if (pop_task_from_local_queue(task) ||
            pop_task_from_pool_queue(task) ||
            pop_task_from_other_thread_queue(task))
            task();
        else
            std::this_thread::yield();
        return;
    }
};
// Out-of-class definitions of the thread_local members; without them the
// program fails to link.
thread_local work_stealing_queue* thread_pool::local_work_queue = nullptr;
thread_local unsigned thread_pool::my_index = 0;

void test_func()
{
    std::cout << "this is from:" << std::this_thread::get_id() << std::endl;
    return;
}

int main()
{
    thread_pool pool;
    try {
        for (int i = 0; i < 10; i++)
            pool.submit([&]() { test_func(); });
    }
    catch (const std::exception& e) {
        std::cerr << "An exception occurred: " << e.what() << '\n';
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    return 0;
}

At this point I have to say that this chapter of the book is a disaster from start to finish.

I don't know whether these errors were fixed in the book's second edition.

Interrupting threads

In many situations it is desirable to signal a long-running thread that it should stop.

This might be because the thread is a pool worker and the pool is being destroyed, or because the work the thread is doing has been cancelled by the user.

Whatever the reason, the basic idea is the same: you need to send a signal from one thread telling another thread that it should stop.

You also need the target thread to finish properly rather than simply being yanked away, which would leave the pool in an inconsistent state.

Launching and interrupting another thread

Using a flag, you can define designated interruption points; in the code below, a std::promise/std::future pair hands the flag's address back to the launching thread.

class interrupt_flag
{
public:
    void set();
    bool is_set()const;
};
thread_local interrupt_flag this_thread_interrupt_flag;

class interruptible_thread
{
    std::thread internal_thread;
    interrupt_flag* flag;
public:
    template<typename FunctionType>
    interruptible_thread(FunctionType f)
    {
        std::promise<interrupt_flag*> p;
        internal_thread = std::thread([f, &p] {
            p.set_value(&this_thread_interrupt_flag);
            f();
            }
        );
        flag = p.get_future().get();
    }
    void interrupt()
    {
        if (flag)
            flag->set();
        return;
    }
};

Detecting whether a thread has been interrupted

Here is some pseudocode showing how an interruption point can be designed:

    void interruption_point()
    {
        if (this_thread_interrupt_flag.is_set())
            throw thread_interrupted();
    }
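
For example, a long-running function would call interruption_point() at the places where it is safe to stop (a sketch: done and process_next_item() are placeholder names, not from the book):

    void long_running_task()
    {
        while (!done) {
            interruption_point();   // throws thread_interrupted if the flag is set
            process_next_item();
        }
    }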

Interrupting a wait on a condition variable

You can now explicitly call interruption_point() to check for interruption.

However, to interrupt a blocking wait, you need to design a new function, interruptible_wait().

Here is its implementation for condition variables:

class interrupt_flag;
// Declared up front so that the nested clear_cv_on_destruct below can refer
// to the thread_local flag; the definition follows the class.
extern thread_local interrupt_flag this_thread_interrupt_flag;

class interrupt_flag
{
    std::atomic<bool> flag{ false };
    std::condition_variable* thread_cond = nullptr;
    std::mutex set_clear_mutex;
public:
    void set()
    {
        flag.store(true, std::memory_order_relaxed);
        std::lock_guard<std::mutex>lk(set_clear_mutex);
        if (thread_cond)
            thread_cond->notify_all();
        return;
    }
    bool is_set()const
    {
        return flag.load(std::memory_order_relaxed);
    }
    void set_condition_variable(std::condition_variable& cv)
    {
        std::lock_guard<std::mutex>lk(set_clear_mutex);
        thread_cond = &cv;
        return;
    }
    void clear_condition_variable()
    {
        std::lock_guard<std::mutex>lk(set_clear_mutex);
        thread_cond = 0;
        return;
    }
    struct clear_cv_on_destruct
    {
        ~clear_cv_on_destruct() 
        {
            this_thread_interrupt_flag.clear_condition_variable();
            return;
        }
    };
};
thread_local interrupt_flag this_thread_interrupt_flag;

struct thread_interrupted {};   // the exception type thrown at an interruption point

void interruption_point()
{
    if (this_thread_interrupt_flag.is_set())
        throw thread_interrupted();
}
void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);
    interrupt_flag::clear_cv_on_destruct guard;
    interruption_point();
    cv.wait_for(lk, std::chrono::milliseconds(1));
    interruption_point();
    return;
}
template<typename Predicate>
void interruptible_wait(std::condition_variable& cv, std::unique_lock<std::mutex>& lk, Predicate pred)
{

    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);
    interrupt_flag::clear_cv_on_destruct guard;
    while (!this_thread_interrupt_flag.is_set() && !pred())
        cv.wait_for(lk, std::chrono::milliseconds(1));
    interruption_point();
    return;
}

Interrupting a wait on std::condition_variable_any

std::condition_variable_any can work with any lock type rather than just std::unique_lock<std::mutex>, which makes it much more flexible.

Here is the version that uses std::condition_variable_any:

class interrupt_flag;
extern thread_local interrupt_flag this_thread_interrupt_flag;

class interrupt_flag
{
    std::atomic<bool> flag;
    std::condition_variable* thread_cond;
    std::condition_variable_any* thread_cond_any;
    std::mutex set_clear_mutex;
public:
    interrupt_flag() :
        flag(false), thread_cond(0), thread_cond_any(0) {
        return;
    }
    void set()
    {
        flag.store(true, std::memory_order_relaxed);
        std::lock_guard<std::mutex>lk(set_clear_mutex);
        if (thread_cond)
            thread_cond->notify_all();
        else if (thread_cond_any)
            thread_cond_any->notify_all();
        return;
    }
    bool is_set()const
    {
        return flag.load(std::memory_order_relaxed);
    }
    void set_condition_variable(std::condition_variable& cv)
    {
        std::lock_guard<std::mutex>lk(set_clear_mutex);
        thread_cond = &cv;
        return;
    }
    void clear_condition_variable()
    {
        std::lock_guard<std::mutex>lk(set_clear_mutex);
        thread_cond = 0;
        return;
    }
    struct clear_cv_on_destruct
    {
        ~clear_cv_on_destruct()
        {
            this_thread_interrupt_flag.clear_condition_variable();
            return;
        }
    };
    template<typename Lockable>
    void wait(std::condition_variable_any& cv, Lockable& lk)
    {
        struct custom_lock
        {
            interrupt_flag* self;
            Lockable& lk;
            custom_lock(interrupt_flag* self_, std::condition_variable_any& cond, Lockable& lk_) :
                self(self_), lk(lk_)
            {
                self->set_clear_mutex.lock();
                self->thread_cond_any = &cond;
                return;
            }
            ~custom_lock()
            {
                self->thread_cond_any = 0;
                self->set_clear_mutex.unlock();
                return;
            }
            void unlock()
            {
                lk.unlock();
                self->set_clear_mutex.unlock();
                return;
            }
            void lock()
            {
                std::lock(self->set_clear_mutex, lk);
                return;
            }
        };
        custom_lock cl(this, cv, lk);
        interruption_point();
        cv.wait(cl);
        interruption_point();
    }
};
thread_local interrupt_flag this_thread_interrupt_flag;

template<typename Lockable>
void interruptible_wait(std::condition_variable_any& cv, Lockable& lk)
{
    this_thread_interrupt_flag.wait(cv, lk);
    return;
}

Handling interruptions

From the point of view of the interrupted thread, an interruption is just a thread_interrupted exception.

It can therefore be handled like any other exception; typically you catch it with an ordinary catch block.

To avoid having to remember to put a catch block in every function you pass to an interruptible_thread, you can instead put the catch block in the wrapper that initializes the interrupt_flag.

Something like this:

internal_thread=std::thread([f,&p]{
        p.set_value(&this_thread_interrupt_flag);
        try{
            f();
        }
        catch(thread_interrupted const&){
        }
        return;
    }
);

And with that, this nightmare of a Chapter 9 is finally over, and so is the core content of the book. The C++ multithreading series is complete.

Tags: std, function, return, thread, C++, queue, threads, multithreading
From: https://www.cnblogs.com/mesonoxian/p/18029307
