单任务队列线程池
用现代的C++标准库(线程+锁+条件变量)实现一个单任务队列的线程池非常简单。基本的实现思路是:在线程池构造时初始化线程数,在析构时停止线程池。对外只需要提供提交任务的接口即可。
接口设计
返回类型
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()); // 构造函数 template<typename F, typename... Args> auto enqueue(F &&f, Args &&...args); // 入队接口
入队接口enqueue()
这个模板函数返回值使用了auto
关键字进行推导,实际上的返回值其实是一个future。
输入参数
输入参数是一个可调用对象和它的参数,这里利用了C++11的可变参数模板来实现传递任意数量的可调用对象的参数。
基本实现
class ThreadPool { public: explicit ThreadPool(size_t threads = std::thread::hardware_concurrency()); template<typename F, typename... Args> auto enqueue(F &&f, Args &&...args); ~ThreadPool(); private: std::vector<std::thread> workers; std::queue<std::function<void()>> tasks; std::mutex queue_mutex; std::condition_variable condition; bool stop; };
这个简单任务队列线程池的成员只有一个线程组,一个任务队列。为了保证任务队列的线程安全,提供一个锁。同时提供了一个条件变量,利用锁和条件变量,可以实现线程通知机制。线程通知机制指,刚开始时线程池中是没有任务的,所有的线程都等待任务的到来,当一个任务进入到线程池中,就会通知一个线程去处理到来的任务。
同时又提供一个stop变量,用来在析构的时候停止和清理任务和线程。因为懒(高情商:RAII风格线程池,生命周期基本上与应用的生命周期一致),没有提供stop接口。
下面是具体实现:
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <vector> #include <queue> #include <memory> #include <thread> #include <mutex> #include <condition_variable> #include <future> #include <functional> #include <stdexcept> class ThreadPool { public: ThreadPool(size_t); template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function<void()> > tasks; // synchronization std::mutex queue_mutex; std::condition_variable condition; bool stop; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for(size_t i = 0;i<threads;++i) workers.emplace_back( [this] { for(;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } ); } // add new work item to the pool template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); { std::unique_lock<std::mutex> lock(queue_mutex); // don't allow enqueueing after stopping the pool if(stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); }); } condition.notify_one(); return res; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; } condition.notify_all(); for(std::thread &worker: workers) worker.join(); } #endif
参考:
https://github.com/lzpong/threadpool
https://github.com/progschj/ThreadPool
标签:11,std,task,stop,C++,ThreadPool,线程,include From: https://www.cnblogs.com/carsonzhu/p/16799198.html