CS149 ass2 线程池+ 计算图
任务描述
- 实现线程池,要求同时实现一个spin和sleep的线程池。
- 要求实现一个计算图,任务按照顺序完成
我的收获
这个assign不算难,主要目的就是学习并发编程。
我从这个Project学习的到:
- condition variable可以通过
wait(lock, stop_check())
的方式使用,但是这里的stop_check()
是有讲究的。例如wait(locka, [&]()->bool{return a || b;})
。condition 通过不同check a和b判断需不要终止。这里有一个原则:对stop_check
中涉及到的所有用于判断的变量,在代码任意处对它们操作,都应该加相同的锁。这里的例子中涉及到的用于判断的变量是a
和b
,也就是说,如果需要再别处对这两个变量进行修改,那么必须加上相同的锁也就是locka
。这里简单解释一下,如果代码中某处修改了a
// part A
con_.wait(locka, [&]()->bool{return a|| b;});
// part B
// somewhere
a = true
con_.notify_one()
那么假设线程1执行到partB,但是还没来得及修改a,而线程2执行到part A 刚好在stop_check
执行过程中,也就是刚好在执行[&]()->bool{return a|| b;}
,由于此时a为false,因此stop_check
返回false,线程2还未wait在con_上。此时线程1,修改a的值,它是唤醒wait在con_上的所有线程,但是由于线程2还没来得及wait在con_上,因此notify_one
直接pass。此时线程2因为stop_check
返回false,它会wait在con_上,等待其他线程唤醒。但是线程1已经在它wait之前notify
了,所有线程2会无限wait。造成一个死锁的假象。
原则:对stop_check
中任何涉及到的非局部变量操作时,都要加相同的锁!
- 如果要同时加两把锁,那么应该遵循相同的加锁顺序,和释放锁的顺序。例如如果所有线程申请两把锁,那么所有线程都应该遵循的方式都是:
locka.lock()
lockb.lock()
lockb.unlock()
locka.unlock()
- 多使用atomic。
我的实现
为这个线程池的类,构造一个工作队列work_q_
,线程池中所有线程都从工作队列中获取任务
struct taskTypeA
{
IRunnable* runnable;
int num_total_tasks;
int assigned_id;
TaskID task_id;
taskTypeA(IRunnable* runnable, int num_total_tasks, int assigned_id, TaskID task_id):runnable(runnable), num_total_tasks(num_total_tasks), assigned_id(assigned_id), task_id(task_id){}
};
class TaskSystemParallelThreadPoolSleeping: public ITaskSystem {
public:
TaskSystemParallelThreadPoolSleeping(int num_threads);
~TaskSystemParallelThreadPoolSleeping();
const char* name();
void run(IRunnable* runnable, int num_total_tasks);
TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps);
void sync();
private:
std::queue<taskTypeA> work_q_;
std::unordered_map<TaskID, std::pair<int, int> > completed_m_; // total_nums and completed_nums
std::mutex work_lock_;
std::mutex completed_lock_;
bool kill_signal_;
std::vector<std::thread> workers;
std::condition_variable work_con_;
std::condition_variable completed_con_;
std::atomic<TaskID> task_id_gen_;
int num_threads_;
};
在TaskSystemParallelThreadPoolSleeping
类的构造函数中,同时构造一个线程池,这个线程池中所有的线程,都是sleep状态,因此也不会占据资源。
tasksystemparallelthreadpoolsleeping::tasksystemparallelthreadpoolsleeping(int num_threads): itasksystem(num_threads) {
kill_signal_ = false;
task_id_gen_.store(0);
num_threads_ = num_threads;
auto runtask =[&]{
while (true) {
std::unique_lock<std::mutex> work_lock(work_lock_);
work_con_.wait(work_lock, [this]() {
return this->kill_signal_ || !this->work_q_.empty();
});
if (kill_signal_) {
break;
}
// get task info
tasktypea task = work_q_.front();
work_q_.pop();
taskid task_id = task.task_id;
int num_total_tasks = task.num_total_tasks;
int assigned_id = task.assigned_id;
irunnable* runnable = task.runnable;
int my_assigned_id = assigned_id + 1;
if(my_assigned_id < (num_total_tasks -1)){
work_q_.push(tasktypea(runnable, num_total_tasks, my_assigned_id, task_id));
}
work_lock.unlock();
runnable->runtask(my_assigned_id, num_total_tasks);
completed_lock_.lock();
auto cp = completed_m_[task_id];
int completed_size = cp.second + 1;
completed_m_[task_id] = std::make_pair(cp.first, completed_size);
if(completed_size == num_total_tasks){
completed_con_.notify_all();
}
completed_lock_.unlock();
}
};
workers.resize(num_threads);
for (int i = 0; i < num_threads; i++) {
workers[i] = std::thread(runtask);
}
}
这个线程池中的所有线程都从一个工作队列中获取相应一个Task,然后按照要求执行这个Task。如果这个的Task已经执行被分配完成了,那么就不放回工作队列work_q_
中,反之,则继续放回。
线程完成相应的Task的一部分任务后,把完成程度写到map中相应的element。例如Task0需要别切分为32份,那么每个线程都各自领一份,执行完成自己的部分后,对completed_size
加一后写回。如果线程发现这个Task0的completed_size
已经达到32,那么就把所有等待task0完成的进程唤醒。
sync方式执行Task
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
work_lock_.lock();
completed_lock_.lock();
// 注意这里一个要加上两把锁,因为只有在complete_m_也注册过后才能发布任务
TaskID task_id = task_id_gen_.fetch_add(1);
work_q_.push(taskTypeA(runnable, num_total_tasks, -1, task_id));
completed_m_[task_id] = std::make_pair(num_total_tasks, 0);
completed_lock_.unlock();
work_lock_.unlock();
work_con_.notify_all();
std::unique_lock<std::mutex> complete_lock(completed_lock_);
completed_con_.wait(complete_lock, [this, task_id]()->bool{
auto p = completed_m_[task_id];
return p.second == p.first;
});
complete_lock.unlock();
// work done
}
async方式执行
async方式也很简单,但是需要等待某些任务的完成。例如TaskC依赖TaskA和TaskB,需要同时等待TaskA和TaskB完成后再执行。async的意思就是发布任务后,不再等待完成,而是直接返回相应的taskID。
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps) {
// wait completed
std::unique_lock<std::mutex> complete_lock(completed_lock_);
completed_con_.wait(complete_lock, [this, &deps]()->bool{
for(auto task_id: deps){
auto p = this->completed_m_[task_id];
if(p.second != p.first){
return false;
}
}
return true;
});
complete_lock.unlock();
// 发布任务后直接返回
work_lock_.lock();
completed_lock_.lock();
TaskID task_id = task_id_gen_.fetch_add(1);
work_q_.push(taskTypeA(runnable, num_total_tasks, -1, task_id));
completed_m_[task_id] = std::make_pair(num_total_tasks, 0);
completed_lock_.unlock();
work_lock_.unlock();
work_con_.notify_all();
return task_id;
}
sync
等待之前所有发布任务都完成
void TaskSystemParallelThreadPoolSleeping::sync() {
//
// TODO: CS149 students will modify the implementation of this method in Part B.
//
std::vector<TaskID> previous_tasks;
completed_lock_.lock();
for(auto& p :completed_m_){
auto task_id = p.first;
int num_total_tasks = p.second.first;
int completed_nums = p.second.second;
if(num_total_tasks != completed_nums){
previous_tasks.push_back(task_id);
}
}
completed_lock_.unlock();
// wait
std::unique_lock<std::mutex> completed_lock(completed_lock_);
completed_con_.wait(completed_lock, [this, &previous_tasks]()->bool{
for(TaskID task_id: previous_tasks){
auto p = this->completed_m_[task_id];
if(p.second != p.first){
return false;
}
}
return true;
});
completed_lock.unlock();
return;
}
析构函数销毁所有线程
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
work_lock_.lock();
kill_signal_ = true;
work_lock_.unlock();
work_con_.notify_all();
for(int i=0; i<num_threads_; i++){
workers[i].join();
}
}
标签:task,lock,completed,_.,线程,计算,CS149ass2,id
From: https://www.cnblogs.com/kalicener/p/16893387.html