首页 > 其他分享 >CS149ass2线程池+计算图

CS149ass2线程池+计算图

时间:2022-11-15 18:15:16浏览次数:56  
标签:task lock completed _. 线程 计算 CS149ass2 id

CS149 ass2 线程池+ 计算图

任务描述

  • 实现线程池,要求同时实现一个spin和sleep的线程池。
  • 要求实现一个计算图,任务按照顺序完成
image-20221115172225634

我的收获

这个assign不算难,主要目的就是学习并发编程。

我从这个Project学习的到:

  1. condition variable可以通过wait(lock, stop_check())的方式使用,但是这里的stop_check()是有讲究的。例如wait(locka, [&]()->bool{return a || b;})。condition 通过不同check a和b判断需不要终止。这里有一个原则:对stop_check中涉及到的所有用于判断的变量,在代码任意处对它们操作,都应该加相同的锁。这里的例子中涉及到的用于判断的变量是ab,也就是说,如果需要再别处对这两个变量进行修改,那么必须加上相同的锁也就是locka。这里简单解释一下,如果代码中某处修改了a
// part A
con_.wait(locka, [&]()->bool{return a|| b;});

// part B
// somewhere
a = true
con_.notify_one()

那么假设线程1执行到partB,但是还没来得及修改a,而线程2执行到part A 刚好在stop_check执行过程中,也就是刚好在执行[&]()->bool{return a|| b;},由于此时a为false,因此stop_check返回false,线程2还未wait在con_上。此时线程1,修改a的值,它是唤醒wait在con_上的所有线程,但是由于线程2还没来得及wait在con_上,因此notify_one直接pass。此时线程2因为stop_check返回false,它会wait在con_上,等待其他线程唤醒。但是线程1已经在它wait之前notify了,所有线程2会无限wait。造成一个死锁的假象。

原则:对stop_check中任何涉及到的非局部变量操作时,都要加相同的锁!

  1. 如果要同时加两把锁,那么应该遵循相同的加锁顺序,和释放锁的顺序。例如如果所有线程申请两把锁,那么所有线程都应该遵循的方式都是:
locka.lock()
lockb.lock()

lockb.unlock()
locka.unlock()
  1. 多使用atomic。

我的实现

为这个线程池的类,构造一个工作队列work_q_,线程池中所有线程都从工作队列中获取任务

struct taskTypeA
{
  IRunnable* runnable;
  int num_total_tasks;
  int assigned_id;
  TaskID task_id;
  taskTypeA(IRunnable* runnable, int num_total_tasks, int assigned_id, TaskID task_id):runnable(runnable), num_total_tasks(num_total_tasks), assigned_id(assigned_id), task_id(task_id){}
};
class TaskSystemParallelThreadPoolSleeping: public ITaskSystem {
    public:
        TaskSystemParallelThreadPoolSleeping(int num_threads);
        ~TaskSystemParallelThreadPoolSleeping();
        const char* name();
        void run(IRunnable* runnable, int num_total_tasks);
        TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                                const std::vector<TaskID>& deps);
        void sync();
    
    private:
        std::queue<taskTypeA> work_q_;
        std::unordered_map<TaskID, std::pair<int, int> > completed_m_; // total_nums and completed_nums

        std::mutex work_lock_;
        std::mutex completed_lock_;

        bool kill_signal_;
        std::vector<std::thread> workers;

        std::condition_variable work_con_;
        std::condition_variable completed_con_;

        std::atomic<TaskID> task_id_gen_;
        int num_threads_;
};

TaskSystemParallelThreadPoolSleeping类的构造函数中,同时构造一个线程池,这个线程池中所有的线程,都是sleep状态,因此也不会占据资源。

tasksystemparallelthreadpoolsleeping::tasksystemparallelthreadpoolsleeping(int num_threads): itasksystem(num_threads) {

    kill_signal_ = false;
    task_id_gen_.store(0);
    num_threads_ = num_threads;
    auto runtask =[&]{
        while (true) {
            std::unique_lock<std::mutex> work_lock(work_lock_);
            work_con_.wait(work_lock, [this]() {
                return this->kill_signal_ || !this->work_q_.empty();
            });
            if (kill_signal_) {
                break;
            }
            // get task info
            tasktypea task = work_q_.front();
            work_q_.pop();

            taskid task_id = task.task_id;
            int num_total_tasks = task.num_total_tasks;
            int assigned_id = task.assigned_id;
            irunnable* runnable = task.runnable;
            
            int my_assigned_id = assigned_id + 1;
            if(my_assigned_id < (num_total_tasks -1)){
                work_q_.push(tasktypea(runnable, num_total_tasks, my_assigned_id, task_id));
            }
            work_lock.unlock();
            
            runnable->runtask(my_assigned_id, num_total_tasks);
            completed_lock_.lock();
            auto cp = completed_m_[task_id];
            int completed_size = cp.second + 1;
            completed_m_[task_id] = std::make_pair(cp.first, completed_size);
            if(completed_size == num_total_tasks){
                completed_con_.notify_all();
            }
            completed_lock_.unlock();
        }
    };
    workers.resize(num_threads);
    for (int i = 0; i < num_threads; i++) {
        workers[i] = std::thread(runtask);
    }
}

这个线程池中的所有线程都从一个工作队列中获取相应一个Task,然后按照要求执行这个Task。如果这个的Task已经执行被分配完成了,那么就不放回工作队列work_q_中,反之,则继续放回。

线程完成相应的Task的一部分任务后,把完成程度写到map中相应的element。例如Task0需要别切分为32份,那么每个线程都各自领一份,执行完成自己的部分后,对completed_size加一后写回。如果线程发现这个Task0的completed_size已经达到32,那么就把所有等待task0完成的进程唤醒。

sync方式执行Task

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {

    work_lock_.lock();
    completed_lock_.lock();
    // 注意这里一个要加上两把锁,因为只有在complete_m_也注册过后才能发布任务
    TaskID task_id =  task_id_gen_.fetch_add(1);
    work_q_.push(taskTypeA(runnable, num_total_tasks, -1, task_id));
    completed_m_[task_id] = std::make_pair(num_total_tasks, 0);
    completed_lock_.unlock();
    work_lock_.unlock();

    work_con_.notify_all();

    std::unique_lock<std::mutex> complete_lock(completed_lock_);
    completed_con_.wait(complete_lock, [this, task_id]()->bool{
        auto p = completed_m_[task_id];
        return p.second == p.first;
    });
    complete_lock.unlock();

    // work done
}

async方式执行

async方式也很简单,但是需要等待某些任务的完成。例如TaskC依赖TaskA和TaskB,需要同时等待TaskA和TaskB完成后再执行。async的意思就是发布任务后,不再等待完成,而是直接返回相应的taskID。

TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
                                                    const std::vector<TaskID>& deps) {
    
    // wait completed
    std::unique_lock<std::mutex> complete_lock(completed_lock_);
    completed_con_.wait(complete_lock, [this, &deps]()->bool{
        for(auto task_id: deps){
            auto p = this->completed_m_[task_id];
            if(p.second != p.first){
                return false;
            }
        }
        return true;
    });
    complete_lock.unlock();
    

    // 发布任务后直接返回
    work_lock_.lock();
    completed_lock_.lock();
    TaskID task_id =  task_id_gen_.fetch_add(1);
    work_q_.push(taskTypeA(runnable, num_total_tasks, -1, task_id));
    completed_m_[task_id] = std::make_pair(num_total_tasks, 0);
    completed_lock_.unlock();
    work_lock_.unlock();


    work_con_.notify_all();
    
    return task_id;
}

sync

等待之前所有发布任务都完成

void TaskSystemParallelThreadPoolSleeping::sync() {

    //
    // TODO: CS149 students will modify the implementation of this method in Part B.
    //
    std::vector<TaskID> previous_tasks;
    completed_lock_.lock();
    for(auto& p :completed_m_){
        auto task_id = p.first;
        int num_total_tasks = p.second.first;
        int completed_nums = p.second.second;
        if(num_total_tasks != completed_nums){
            previous_tasks.push_back(task_id);
        }
    }
    completed_lock_.unlock();
    // wait
    std::unique_lock<std::mutex> completed_lock(completed_lock_);
    completed_con_.wait(completed_lock, [this, &previous_tasks]()->bool{
        for(TaskID task_id: previous_tasks){
            auto p = this->completed_m_[task_id];
            if(p.second != p.first){
                return false;
            }
        }
        return true;
    });
    completed_lock.unlock();

    return;
}

析构函数销毁所有线程

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    work_lock_.lock();
    kill_signal_ = true;
    work_lock_.unlock();
    work_con_.notify_all();
    for(int i=0; i<num_threads_; i++){
        workers[i].join();
    }    
}

标签:task,lock,completed,_.,线程,计算,CS149ass2,id
From: https://www.cnblogs.com/kalicener/p/16893387.html

相关文章