文章目录
概要
增加扇入扇出:
优化:子线程维护自己的本地队列
分析:
目前文章《线程池一》介绍了一个简单的线程池,存在多个线程同时访问一个任务队列Task,出现抢锁的情况,这样会存在一定的性能消耗,会导致有些没抢到任务的线程没事做,造成资源浪费。
实现:
为每个线程创建一个任务队列,让每个线程维护自己的任务队列,通过生产者生产的任务,依次分发到各个队列中,避免了多个线程去抢一个锁。
整体架构流程
线程池代码实现
//<ThreadPool.h>
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <functional>
#include <random>
#include <chrono>
#include <windows.h>
//任务类型,这里简单地使用 std::function<void()>
using Task = std::function<void()>;
class ThreadPool {
public:
ThreadPool(int threadNumbers)
:max_threads(threadNumbers),
startTime(std::chrono::high_resolution_clock::now()),
LocalMutex(threadNumbers),
LocalTask(threadNumbers),
stop(threadNumbers),
cond_varVec(threadNumbers)
{
for (int i = 0; i < threadNumbers; i++)
{
workers.emplace_back(WorkerThread, this, i);
}
}
~ThreadPool()
{
{
for (size_t i = 0; i < max_threads; i++)
{
std::unique_lock<std::mutex> lock(LocalMutex[i]);
stop[i] = true;
cond_varVec[i].notify_all(); // 通知所有工作线程停止
}
}
for (std::thread& worker : workers) {
worker.join();
}
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - startTime);
std::cout << "Total time: " << duration.count() << " ms" << std::endl;
}
// 添加新任务到线程池
void AddTask(Task task, int hashIndex);
private:
//线程的工作函数
static void WorkerThread(void* arg,int threadIndex);
private:
std::vector<std::thread> workers; // 工作线程池
std::vector<std::condition_variable> cond_varVec; // 条件变量,用于通知工作线程有新任务
int max_threads; // 线程池的最大线程数
std::vector<bool> stop; // 线程池停止标志
std::vector<std::deque<Task>> LocalTask; // 线程内部维护的任务队列
std::vector<std::mutex> LocalMutex; // 子线程内部的互斥锁
std::chrono::high_resolution_clock::time_point startTime;
};
//-------------<ThreadPool.cpp>------------------
#include "ThreadPool.h"
void ThreadPool::AddTask(Task task,int hashIndex) {
{
std::unique_lock<std::mutex> lock(LocalMutex[hashIndex]);
LocalTask[hashIndex].push_back(task);
//tasks.push(task);
}
cond_varVec[hashIndex].notify_one(); // 通知一个工作线程有新任务
}
void ThreadPool::WorkerThread(void* arg, int threadIndex)
{
ThreadPool* pool = (ThreadPool*)arg;
while (true)
{
std::unique_lock<std::mutex>lock(pool->LocalMutex[threadIndex]);
pool->cond_varVec[threadIndex].wait(lock, [pool, threadIndex] { return pool->stop[threadIndex] || !pool->LocalTask[threadIndex].empty(); });
if (pool->stop[threadIndex] && pool->LocalTask[threadIndex].empty()) {
break;
}
Task task = pool->LocalTask[threadIndex].front();
pool->LocalTask[threadIndex].pop_front();
task(); // 执行任务
}
}
技术细节
在《线程池一》的基础上,我修改了任务队列,将queue改成了类型为deque的数组,每个队列都用自己的互斥锁与condition_variable。
小结
测试场景一:五百个任务,四个线程去执行
int main() {
const int num_tasks = 500;
const int num_threads = 4;
ThreadPool pool(num_threads);
// 添加任务到线程池
for (int i = 0; i < num_tasks; ++i) {
int hashIndex = i % num_threads;
pool.AddTask(std::bind(DoWork, i), hashIndex);
}
return 0;
}
线程池(一)耗时
线程池(二)耗时