首页 > 编程语言 >线程池C++11实现

线程池C++11实现

时间:2024-12-15 17:55:22浏览次数:10  
标签:11 std tasks lock stop C++ ThreadPool task 线程

设计思路

tasks: 任务队列,每当有新任务时,就addTask到该队列

workers: 工作线程,不断地从tasks中取任务执行

queueMutex: 任务队列互斥锁,防止在addTask时出现冲突

condition_variable: 条件变量,当任务队列为空时阻塞线程,等待任务被添加进队列

function<void()> : 函数对象,tasks队列的成员,当前每一个都可以当成返回值为void、无参数的函数执行,由于后续添加任务时多数是带有返回值和参数的,因此需要使用bind函数绑定所有参数适配成void()类型,使用future获取所添加的任务返回值

实现步骤

整体结构

class ThreadPool
{
    using Task = std::function<void()>;

public:
    explicit ThreadPool(size_t threads);

    template <class F, class... Args>
    auto addTask(F&& f, Args&& ...args)->
        std::future<std::result_of_t<F(Args...)>>; // 类型后置,获取返回值

    void stop();
    ~ThreadPool();

private:
    std::vector<std::thread> _workers;
    std::queue<Task> _tasks;
    std::mutex _queueMutex;
    std::condition_variable _cv;

    bool _stop;
private:
    void executeTask();
};

初始化

创建指定数量的线程,每个线程负责执行excuteTask函数(不断取任务执行)

ThreadPool::ThreadPool(size_t threads)
    : _stop(false)
{
    for (int i = 0; i < threads; ++i)
    {
        _workers.emplace_back(&ThreadPool::executeTask,this);
    }
}

执行任务

不断从tasks队列取任务并执行,由于同时会有多个线程读写tasks队列会出现冲突,因为需要加锁,使用条件变量,当tasks队列为空时阻塞线程,如果线程池停止(stop=true)同时队列为空则整个线程终止.

void ThreadPool::executeTask()
{
    while (true)
    {
        std::unique_lock<std::mutex> lock{_queueMutex};
        _cv.wait(lock,[this]{
            return _stop || !(_tasks.empty());
        });
        if (_stop && _tasks.empty())
            return;

        auto task = std::move(_tasks.front());
        _tasks.pop();

        lock.unlock();
        task();
    }
}

添加任务

可变参数模板

该语法支持添加任意数目参数的函数,通过执行f(args...)可执行添加的任务函数。可以使用bind函数绑定f函数和它的所有参数,将其转换成无参的task函数对象(由于fun函数形参是&&,右值引用型,故需用到forward函数,详细请搜索右值引用)

template<class F, class... Args>
ret fun(F&& f, Args&& ...args){
    std::funtion<void()> task = std::bind(std::forward<F>(f), 
                                          std::forward<Args>(args)...);
    .....
}

decltype可以根据变量推测出类型。

添加任务到tasks队列,加锁防止冲突

template <class F, class... Args>
auto ThreadPool::addTask(F&& f, Args&& ...args) ->
    std::future<std::result_of_t<F(Args...)>>
{
    using RetType = decltype(f(args...));

    auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<RetType> ret_future = task->get_future();

    std::unique_lock<std::mutex> lock{_queueMutex};    
    if (_stop)
    {
        throw std::runtime_error("the threadPool is stopped.");
    }
    // 添加任务到任务队列
    _tasks.emplace([task] {
        (*task)();
    });
    lock.unlock();
    _cv.notify_one();

    return ret_future;
}

停止线程池

将_stop置为true,同时将workers中所有线程执行完毕,即可停止线程池

void ThreadPool::stop()
{
    std::unique_lock<std::mutex> lock(_queueMutex);
    _stop = true;
    lock.unlock();
    for (auto &t : _workers)
    {
        if (t.joinable())
            t.join();
    }
}

完整代码

#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <thread>
#include <iostream>
#include <future>
#include <memory>
class ThreadPool
{
    using Task = std::function<void()>;

public:
    explicit ThreadPool(size_t threads);

    template <class F, class... Args>
    auto addTask(F&& f, Args&& ...args)->
        std::future<std::result_of_t<F(Args...)>>; // 类型后置,获取返回值

    void stop();
    ~ThreadPool();

private:
    std::vector<std::thread> _workers;
    std::queue<Task> _tasks;
    std::mutex _queueMutex;
    std::condition_variable _cv;

    bool _stop;
private:
    void executeTask();
};


ThreadPool::ThreadPool(size_t threads)
    : _stop(false)
{
    for (int i = 0; i < threads; ++i)
    {
        _workers.emplace_back(&ThreadPool::executeTask,this);
    }
}

// add task for work_queue
template <class F, class... Args>
auto ThreadPool::addTask(F&& f, Args&& ...args) ->
    std::future<std::result_of_t<F(Args...)>>
{
    using RetType = decltype(f(args...));

    auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<RetType> ret_future = task->get_future();

    std::unique_lock<std::mutex> lock{_queueMutex};    
    if (_stop)
    {
        throw std::runtime_error("the threadPool is stopped.");
    }
    // 添加任务到任务队列
    _tasks.emplace([task] {
        (*task)();
    });
    lock.unlock();
    _cv.notify_one();

    return ret_future;
}

// _stop the thread_pool
inline void ThreadPool::stop()
{
    std::unique_lock<std::mutex> lock(_queueMutex);
    _stop = true;
    lock.unlock();
    for (auto &t : _workers)
    {
        if (t.joinable())
            t.join();
    }
}

// exectuTask
void ThreadPool::executeTask()
{
    while (true)
    {
        std::unique_lock<std::mutex> lock{_queueMutex};
        _cv.wait(lock,[this]{
            return _stop || !(_tasks.empty());
        });
        if (_stop && _tasks.empty())
            return;

        auto task = std::move(_tasks.front());
        _tasks.pop();

        lock.unlock();
        task();
    }
}

ThreadPool::~ThreadPool()
{
    stop();
}

#endif

标签:11,std,tasks,lock,stop,C++,ThreadPool,task,线程
From: https://www.cnblogs.com/runtimeerror/p/18608240

相关文章

  • java如何请求接口然后终止某个线程
    Java请求接口并终止线程在Java开发中,处理多线程操作是常见需求。有时我们需要在请求某个接口后,根据接口返回结果或其他条件,终止某个线程的执行。本文将详细介绍如何在Java中请求接口并终止特定线程的方法。一、请求接口1.1使用 HttpURLConnectionJava提供了多种方式进行HTTP......
  • 超大规模数据库集群保稳系列:数据库攻防演练建设实践11
     01背景1.1初识混沌工程首先我们先了解一下什么是混沌工程?简单而言,混沌工程是在系统上进行实验的技术手段,目的是建立对系统抵御生产环境中失控条件的能力以及信心。这主要体现在两个方面,从系统角度来讲,混沌工程可以提升我们架构的容错能力和韧性,降低故障发生率和复发率,提......
  • C++ OCR文字识别api接口
    一.引言文字识别,也称为光学字符识别(OpticalCharacterRecognition,OCR),是一种将不同形式的文档(如扫描的纸质文档、PDF文件或数字相机拍摄的图片)中的文字转换成可编辑和可搜索的数据的技术。随着技术的发展,文字识别技术已经成为信息管理、自动化办公和智能系统的关键组成部分......
  • 前端面经每日一题day11
    前端面经整理-CSDN博客我找到内容题目就是在这里面,详细可以看这个。了解浏览器缓存机制浏览器缓存就是把已经请求过的资源存储起来,当下次需要该资源的时候,浏览器会根据缓存机制决定是直接使用缓存的资源还是向服务器发送请求。作用:降低服务器压力强制缓存>协商缓存强制......
  • 超大规模数据库集群保稳系列:数据库攻防演练建设实践11
     01背景1.1初识混沌工程首先我们先了解一下什么是混沌工程?简单而言,混沌工程是在系统上进行实验的技术手段,目的是建立对系统抵御生产环境中失控条件的能力以及信心。这主要体现在两个方面,从系统角度来讲,混沌工程可以提升我们架构的容错能力和韧性,降低故障发生率和复发率,提......
  • JavaEE 【知识改变命运】05 多线程(4)
    文章目录单例模式什么是单例模式饿汉模式懒汉模式多线程-懒汉模式分析多线程问题第一种添加sychronized的方式第二种添加sychronized的方式改进第二种添加sychronized的方式(DCL检查锁)阻塞队列什么是阻塞队列什么是消费生产者模型标准库中的阻塞队列消息队列应用的场景......
  • c++小结之字符串字面量
    存储区域字符串字面量是形如"Thisisabook.\n"这样的一组明确的字符串。字符串字面量通常存储在内存的静态存储区。静态存储区大小固定,不受操作系统影响,但是一般比较小。多个相同的字符串字面量多个相同的字符串字面量在内存是存储在同一个位置。比如:constchar*a="This......
  • 深入理解 Virtual Threads(虚拟线程)
    Java作为一种流行的编程语言,其生态系统在不断进化,尤其是在最新的版本中引入了许多令人兴奋的功能。本文将为您深入讲解Java的最新技术之一——VirtualThreads(虚拟线程),并探讨其在实际项目中的应用价值。什么是VirtualThreads?VirtualThreads是Java平台为解决高并发问......
  • MySQL 中 int(11) 的 11 表示什么?
    MySQL中int(11)的11表示什么?在MySQL中,int(11)中的11并不表示整数的取值范围,而是用于显示宽度(DisplayWidth)。它的含义和具体影响如下:1.显示宽度的定义显示宽度是指在使用ZEROFILL属性时,MySQL返回查询结果时显示的最小数字宽度。如果存储的整数值不足指定的宽......
  • GESP2024年12月认证C++四级( 第一部分选择题(6-10))
    ......