首页 > 编程语言 >c++线程池

c++线程池

时间:2023-01-30 15:45:02浏览次数:40  
标签:std thread c++ _. state 线程 result id

#pragma once

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

#include "glog/logging.h"

#include "basic/helper/helper.h"

// Usage:

// enqueue and store future
// result type is std::pair<int32_t,std::future<T>>
// first id thread_id, second is function result
// auto result = ThreadPool::Get().Enqueue([i]() {return i;})
// auto result = ThreadPool::Get().Enqueue(thread_id ,[i]() {return
// i;})

// get result from future
// result.second.get()

// get thread state
// key is thread id , value is thread state the thread
// state has kWaiting, kRunning, kFinished
// const std::map<int32_t, ThreadState>& thread_state =
// ThreadPool::Get().thread_state();



enum class ThreadState { kWaiting = 0, kRunning = 1, kFinished = 2 };

class ThreadPool {
 public:
  DECLARE_SINGLETON(ThreadPool);

  template <class F, class... Args>
  std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
  Enqueue(int32_t thread_id, F&& f, Args&&... args);

  template <class F, class... Args>
  std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
  Enqueue(F&& f, Args&&... args);

  const std::map<int32_t, ThreadState>& thread_state() { return thread_state_; }

  inline bool Exist(int32_t id) {
    return thread_state_.find(id) != thread_state_.end();
  }

  inline bool IsFinished(int32_t id) {
    return thread_state_.at(id) == ThreadState::kFinished;
  }

  ~ThreadPool();

 private:
  ThreadPool();

  // need to keep track of threads so we can join them
  std::vector<std::thread> workers_;
  // the task queue
  std::queue<std::pair<int32_t, std::function<void()>>> tasks_;
  std::queue<int32_t> free_thread_id_;
  // synchronization
  std::mutex queue_mutex_;
  std::mutex thread_id_mutex_;
  std::condition_variable condition_;
  std::map<int32_t, ThreadState> thread_state_;

  std::atomic<int32_t> thread_id_;
  static constexpr size_t threads_ = 32;
  bool stop_;
};

inline ThreadPool::ThreadPool() : stop_(false) {
  for (size_t i = 0; i < threads_; ++i)
    workers_.emplace_back([this] {
      for (;;) {
        std::function<void()> task;
        int32_t thread_id = 0;
        {
          std::unique_lock<std::mutex> lock(this->queue_mutex_);
          this->condition_.wait(
              lock, [this] { return this->stop_ || !this->tasks_.empty(); });
          if (this->stop_ && this->tasks_.empty()) return;
          task = std::move(this->tasks_.front().second);
          thread_id = this->tasks_.front().first;
          thread_state_[thread_id] = ThreadState::kRunning;
          this->tasks_.pop();
        }
        task();
        {
          std::unique_lock<std::mutex> lock(this->thread_id_mutex_);
          if (thread_id < 0) {
            free_thread_id_.push(thread_id);
          }
          thread_state_[thread_id] = ThreadState::kFinished;
        }
      }
    });
}
// add new work item to the pool
template <class F, class... Args>
std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
ThreadPool::Enqueue(int32_t thread_id, F&& f, Args&&... args) {
  using return_type = typename std::result_of<F(Args...)>::type;

  auto task = std::make_shared<std::packaged_task<return_type()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  std::pair<int32_t, std::future<return_type>> result;
  result.first = thread_id;
  result.second = task->get_future();
  {
    std::unique_lock<std::mutex> lock(queue_mutex_);

    // TODO(haohu): if ThreadPool stop, not add new task
    thread_state_[thread_id] = ThreadState::kWaiting;
    tasks_.emplace(std::make_pair(thread_id, [task]() { (*task)(); }));
  }
  condition_.notify_one();
  return result;
}

// add new work item to the pool
template <class F, class... Args>
std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
ThreadPool::Enqueue(F&& f, Args&&... args) {
  using return_type = typename std::result_of<F(Args...)>::type;
  int32_t thread_id = 0;
  {
    std::unique_lock<std::mutex> lock(thread_id_mutex_);
    if (!free_thread_id_.empty()) {
      thread_id = free_thread_id_.front();
      free_thread_id_.pop();
    } else {
      thread_id = --this->thread_id_;
    }
  }

  auto task = std::make_shared<std::packaged_task<return_type()>>(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  std::pair<int32_t, std::future<return_type>> result;
  result.first = thread_id;
  result.second = task->get_future();
  {
    std::unique_lock<std::mutex> lock(queue_mutex_);

    // TODO(haohu): if ThreadPool stop, not add new task
    thread_state_[thread_id] = ThreadState::kWaiting;
    tasks_.emplace(std::make_pair(thread_id, [task]() { (*task)(); }));
  }
  condition_.notify_one();
  return result;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
  {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    stop_ = true;
    thread_state_.clear();
  }
  condition_.notify_all();

  LOG(INFO) << "Waiting for all jobs to finish.";

  for (std::thread& worker : workers_) worker.join();
}

标签:std,thread,c++,_.,state,线程,result,id
From: https://www.cnblogs.com/hh13579/p/17076175.html

相关文章

  • C++调用Python的API总结
    最近在做C++调Python的work,简单总结下(一) 初始化和关闭Python解释器#include<Python.h>Py_Initialize();…Py_Finalize();所有的Python程序都要在这之间执行(二)......
  • C++调用python脚本
    随着机器学习/深度学习这几年的的火热,python成了当红炸子鸡,使用python训练机器学习模型则成了开发人员们最喜欢的方法,但是由于过往调度系统一般都是用C++来开发的,因此......
  • C/C++自助攒机系统[2023-01-30]
    C/C++自助攒机系统[2023-01-30]自助攒机系统管理员可以录入如下几种硬件的价格、型号信息:CPU:主频、品牌、价格、图片硬盘:容量、品牌、价格显示器:尺寸、品牌、价格......
  • C/C++晋中理工学院数据结构[2023-01-30]
    C/C++晋中理工学院数据结构[2023-01-30]晋中理工学院数据结构实验周任务书2022-2023学年第1学期学院: 信创与大数据学院专业: 学生姓名: 学号......
  • C++ 树进阶系列之树状数组的树形之路
    1.前言树状数组也称二叉索引树,由PeterM.Fenwick于1994发明,也可称为Fenwick树。树状数组的设计非常精巧,多用于求解数列的前缀和、区间和等问题,为区间类型问题提供了模板......
  • C#调用C++动态链接库dll之P/Invoke方式 — 2.在C#控制台程序中调试C++动态链接库
    很简单1.C#控制台项目右键-属性-生成-允许不安全代码-打勾;2.C#控制台项目右键-属性-调试-启用本地代码调试-打勾;......
  • 多线程基本概念
    什么是进程,线程?线程是进程的最小执行单元,相当于不同的执行路径run和start的区别?run只是单纯的方法调用,在主线程进行.start相当于开启一个线程线程的启动......
  • 嵌入式面经_嵌入式面试题_嵌入式软件开发面经C++面经111道面试题答案解析
     本人2020年本硕毕业于广东工业大学:嵌入式许乔丹,牛客高级专栏作者,牛客大学讲师,在2020届秋招共拿到珠海格力,云从科技,CVTE,小米,美的,华为的嵌入式offer,签约CVTE嵌入式岗位,整......
  • 线程的生命周期,线程有几种状态
    线程的生命周期:创建-->就绪-->运行-->阻塞-->死亡线程的状态:新建状态(New):新创建了一个线程对象就绪状态(Runable):其它线程调用了该线程的start方法。该状态的线程都在......
  • 【博学谷学习记录】超强总结,用心分享 | 进程和线程的使用
    【博学谷IT技术支持】一、介绍进程:进程是资源分配最小单位;进程之间的资源是独立的;进程有自己独立的地址空间,每启动一个进程,系统都会为其分配地址空间;进程是可以并行执......