#pragma once
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
#include "glog/logging.h"
#include "basic/helper/helper.h"
// Usage:
// enqueue and store future
// result type is std::pair<int32_t,std::future<T>>
// first id thread_id, second is function result
// auto result = ThreadPool::Get().Enqueue([i]() {return i;})
// auto result = ThreadPool::Get().Enqueue(thread_id ,[i]() {return
// i;})
// get result from future
// result.second.get()
// get thread state
// key is thread id , value is thread state the thread
// state has kWaiting, kRunning, kFinished
// const std::map<int32_t, ThreadState>& thread_state =
// ThreadPool::Get().thread_state();
enum class ThreadState { kWaiting = 0, kRunning = 1, kFinished = 2 };
class ThreadPool {
public:
DECLARE_SINGLETON(ThreadPool);
template <class F, class... Args>
std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
Enqueue(int32_t thread_id, F&& f, Args&&... args);
template <class F, class... Args>
std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
Enqueue(F&& f, Args&&... args);
const std::map<int32_t, ThreadState>& thread_state() { return thread_state_; }
inline bool Exist(int32_t id) {
return thread_state_.find(id) != thread_state_.end();
}
inline bool IsFinished(int32_t id) {
return thread_state_.at(id) == ThreadState::kFinished;
}
~ThreadPool();
private:
ThreadPool();
// need to keep track of threads so we can join them
std::vector<std::thread> workers_;
// the task queue
std::queue<std::pair<int32_t, std::function<void()>>> tasks_;
std::queue<int32_t> free_thread_id_;
// synchronization
std::mutex queue_mutex_;
std::mutex thread_id_mutex_;
std::condition_variable condition_;
std::map<int32_t, ThreadState> thread_state_;
std::atomic<int32_t> thread_id_;
static constexpr size_t threads_ = 32;
bool stop_;
};
inline ThreadPool::ThreadPool() : stop_(false) {
for (size_t i = 0; i < threads_; ++i)
workers_.emplace_back([this] {
for (;;) {
std::function<void()> task;
int32_t thread_id = 0;
{
std::unique_lock<std::mutex> lock(this->queue_mutex_);
this->condition_.wait(
lock, [this] { return this->stop_ || !this->tasks_.empty(); });
if (this->stop_ && this->tasks_.empty()) return;
task = std::move(this->tasks_.front().second);
thread_id = this->tasks_.front().first;
thread_state_[thread_id] = ThreadState::kRunning;
this->tasks_.pop();
}
task();
{
std::unique_lock<std::mutex> lock(this->thread_id_mutex_);
if (thread_id < 0) {
free_thread_id_.push(thread_id);
}
thread_state_[thread_id] = ThreadState::kFinished;
}
}
});
}
// add new work item to the pool
template <class F, class... Args>
std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
ThreadPool::Enqueue(int32_t thread_id, F&& f, Args&&... args) {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::pair<int32_t, std::future<return_type>> result;
result.first = thread_id;
result.second = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// TODO(haohu): if ThreadPool stop, not add new task
thread_state_[thread_id] = ThreadState::kWaiting;
tasks_.emplace(std::make_pair(thread_id, [task]() { (*task)(); }));
}
condition_.notify_one();
return result;
}
// add new work item to the pool
template <class F, class... Args>
std::pair<int32_t, std::future<typename std::result_of<F(Args...)>::type>>
ThreadPool::Enqueue(F&& f, Args&&... args) {
using return_type = typename std::result_of<F(Args...)>::type;
int32_t thread_id = 0;
{
std::unique_lock<std::mutex> lock(thread_id_mutex_);
if (!free_thread_id_.empty()) {
thread_id = free_thread_id_.front();
free_thread_id_.pop();
} else {
thread_id = --this->thread_id_;
}
}
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::pair<int32_t, std::future<return_type>> result;
result.first = thread_id;
result.second = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// TODO(haohu): if ThreadPool stop, not add new task
thread_state_[thread_id] = ThreadState::kWaiting;
tasks_.emplace(std::make_pair(thread_id, [task]() { (*task)(); }));
}
condition_.notify_one();
return result;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
thread_state_.clear();
}
condition_.notify_all();
LOG(INFO) << "Waiting for all jobs to finish.";
for (std::thread& worker : workers_) worker.join();
}
标签:std,thread,c++,_.,state,线程,result,id
From: https://www.cnblogs.com/hh13579/p/17076175.html