设计思路
tasks: 任务队列,每当有新任务时,就addTask到该队列
workers: 工作线程,不断地从tasks中取任务执行
queueMutex: 任务队列互斥锁,防止在addTask时出现冲突
condition_variable: 条件变量,当任务队列为空时阻塞线程,等待任务被添加进队列
function<void()> : 函数对象,tasks队列的成员,当前每一个都可以当成返回值为void、无参数的函数执行,由于后续添加任务时多数是带有返回值和参数的,因此需要使用bind函数绑定所有参数适配成void()类型,使用future获取所添加的任务返回值
实现步骤
整体结构
class ThreadPool
{
using Task = std::function<void()>;
public:
explicit ThreadPool(size_t threads);
template <class F, class... Args>
auto addTask(F&& f, Args&& ...args)->
std::future<std::result_of_t<F(Args...)>>; // 类型后置,获取返回值
void stop();
~ThreadPool();
private:
std::vector<std::thread> _workers;
std::queue<Task> _tasks;
std::mutex _queueMutex;
std::condition_variable _cv;
bool _stop;
private:
void executeTask();
};
初始化
创建指定数量的线程,每个线程负责执行excuteTask函数(不断取任务执行)
ThreadPool::ThreadPool(size_t threads)
: _stop(false)
{
for (int i = 0; i < threads; ++i)
{
_workers.emplace_back(&ThreadPool::executeTask,this);
}
}
执行任务
不断从tasks队列取任务并执行,由于同时会有多个线程读写tasks队列会出现冲突,因为需要加锁,使用条件变量,当tasks队列为空时阻塞线程,如果线程池停止(stop=true)同时队列为空则整个线程终止.
void ThreadPool::executeTask()
{
while (true)
{
std::unique_lock<std::mutex> lock{_queueMutex};
_cv.wait(lock,[this]{
return _stop || !(_tasks.empty());
});
if (_stop && _tasks.empty())
return;
auto task = std::move(_tasks.front());
_tasks.pop();
lock.unlock();
task();
}
}
添加任务
可变参数模板
该语法支持添加任意数目参数的函数,通过执行f(args...)可执行添加的任务函数。可以使用bind函数绑定f函数和它的所有参数,将其转换成无参的task函数对象(由于fun函数形参是&&,右值引用型,故需用到forward函数,详细请搜索右值引用)
template<class F, class... Args>
ret fun(F&& f, Args&& ...args){
std::funtion<void()> task = std::bind(std::forward<F>(f),
std::forward<Args>(args)...);
.....
}
decltype可以根据变量推测出类型。
添加任务到tasks队列,加锁防止冲突
template <class F, class... Args>
auto ThreadPool::addTask(F&& f, Args&& ...args) ->
std::future<std::result_of_t<F(Args...)>>
{
using RetType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<RetType> ret_future = task->get_future();
std::unique_lock<std::mutex> lock{_queueMutex};
if (_stop)
{
throw std::runtime_error("the threadPool is stopped.");
}
// 添加任务到任务队列
_tasks.emplace([task] {
(*task)();
});
lock.unlock();
_cv.notify_one();
return ret_future;
}
停止线程池
将_stop置为true,同时将workers中所有线程执行完毕,即可停止线程池
void ThreadPool::stop()
{
std::unique_lock<std::mutex> lock(_queueMutex);
_stop = true;
lock.unlock();
for (auto &t : _workers)
{
if (t.joinable())
t.join();
}
}
完整代码
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <thread>
#include <iostream>
#include <future>
#include <memory>
class ThreadPool
{
using Task = std::function<void()>;
public:
explicit ThreadPool(size_t threads);
template <class F, class... Args>
auto addTask(F&& f, Args&& ...args)->
std::future<std::result_of_t<F(Args...)>>; // 类型后置,获取返回值
void stop();
~ThreadPool();
private:
std::vector<std::thread> _workers;
std::queue<Task> _tasks;
std::mutex _queueMutex;
std::condition_variable _cv;
bool _stop;
private:
void executeTask();
};
ThreadPool::ThreadPool(size_t threads)
: _stop(false)
{
for (int i = 0; i < threads; ++i)
{
_workers.emplace_back(&ThreadPool::executeTask,this);
}
}
// add task for work_queue
template <class F, class... Args>
auto ThreadPool::addTask(F&& f, Args&& ...args) ->
std::future<std::result_of_t<F(Args...)>>
{
using RetType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<RetType> ret_future = task->get_future();
std::unique_lock<std::mutex> lock{_queueMutex};
if (_stop)
{
throw std::runtime_error("the threadPool is stopped.");
}
// 添加任务到任务队列
_tasks.emplace([task] {
(*task)();
});
lock.unlock();
_cv.notify_one();
return ret_future;
}
// _stop the thread_pool
inline void ThreadPool::stop()
{
std::unique_lock<std::mutex> lock(_queueMutex);
_stop = true;
lock.unlock();
for (auto &t : _workers)
{
if (t.joinable())
t.join();
}
}
// exectuTask
void ThreadPool::executeTask()
{
while (true)
{
std::unique_lock<std::mutex> lock{_queueMutex};
_cv.wait(lock,[this]{
return _stop || !(_tasks.empty());
});
if (_stop && _tasks.empty())
return;
auto task = std::move(_tasks.front());
_tasks.pop();
lock.unlock();
task();
}
}
ThreadPool::~ThreadPool()
{
stop();
}
#endif
标签:11,std,tasks,lock,stop,C++,ThreadPool,task,线程
From: https://www.cnblogs.com/runtimeerror/p/18608240