Thread_Pool 项目解析
简介
ThreadPool 是一个轻量级的 C++ 线程池实现,旨在简化多线程编程。
项目分析
我们首先上github的项目地址:https://github.com/progschj/ThreadPool,然后克隆项目到本地。
点开项目的ThrealPool.h文件,查看源码:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif
类成员分析
接下来,我们一步一步分析源代码。
在整个文件中只定义一个类ThreadPool,它的类成员有:
std::vector< std::thread > workers;//存储处理任务的线程
std::queue< std::function<void()> > tasks;//存储任务的队列
std::mutex queue_mutex; // 互斥锁
std::condition_variable condition; // 条件变量,和上面的互斥锁保证多线程的同步和互斥
bool stop; // 线程池的是否停止的标志
ThreadPool初始化
先上代码:
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
ThreadPool 的初始化需传入一个参数threads,且将stop赋值为0.
接着往workers里加入threads个线程,每个线程都执行死循环:
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
在循环中,先定义锁,再调用condition.wait()方法,当线程池运行且任务队列为空时,线程堵塞,否则线程继续运行,然后当线程池停止且任务队列为空时,跳出循环,结束线程。否则从取出任务队列的第一个任务,执行任务。
ThreadPool enqueue 加入队列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
enqueue 方法是模板函数,传入可调用对象F和任意数量的的参数args,,返回一个future对象,返回线程异步操作的结果。
using return_type = typename std::result_of<F(Args...)>::type;
首先,定义返回类型return_type,表示传入的可调用对象的返回值的类型。
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
程序创建智能指针task,其指向了一个使用bind绑定的可调用对象(该对象调用f,并传入参数args),再使用packaged_task包装成可调用对象。创建智能指针的目的是为了其他线程的使用。
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
使用res保存任务线程的异步结果,并作为返回值。
然后在代码块中使用互斥锁加锁,然后将任务加入任务队列中。
最后通知线程池中的一个线程处理任务并返回res。
ThreadPool 析构函数
看注释就可以了:
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
//表示线程池停止。
}
condition.notify_all(); // 通知所有线程
for(std::thread &worker: workers)
worker.join(); // 等待所有线程结束
}
总结:
ThreadPool 的运行步骤可以分为以下几步:
- 创建ThreadPool对象,传入线程池工作线程数量。在线程池中填加工作线程,并堵塞等待任务线程的通知。
- 调用enqueue方法,传入可调用对象和参数。在该方法中,enqueue先通过一系列操作调整传入的参数,再将其加入任务队列。
- 以上操作完成后,通知线程池中的一个线程处理任务。在线程池中取出任务队列的当前最先进来的任务处理。
- 处理完任务将结果保存到enqueue里的异步返回结果的future对象中,并通过enqueue返回。
- ThreadPool对象被销毁时,将标志stop设置为true,并会通知所有堵塞线程,等待线程池中的所有线程结束。
ThreadPool 实现简单的线程池,使用简单的先进先出策略调度任务,如果可以使用更加复杂的策略,我们可以自己修改代码。