首页 > 其他分享 >ThreadPool解析

ThreadPool解析

时间:2025-01-17 13:43:39浏览次数:1  
标签:std task lock stop ThreadPool 线程 解析

Thread_Pool 项目解析

简介

ThreadPool 是一个轻量级的 C++ 线程池实现,旨在简化多线程编程。

项目分析

我们首先上github的项目地址:https://github.com/progschj/ThreadPool,然后克隆项目到本地。

点开项目的ThrealPool.h文件,查看源码:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    
    // synchronization

    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
 
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

类成员分析

接下来,我们一步一步分析源代码。

在整个文件中只定义一个类ThreadPool,它的类成员有:

    std::vector< std::thread > workers;//存储处理任务的线程
    std::queue< std::function<void()> > tasks;//存储任务的队列
    std::mutex queue_mutex; // 互斥锁
    std::condition_variable condition; // 条件变量,和上面的互斥锁保证多线程的同步和互斥
    bool stop; // 线程池的是否停止的标志

ThreadPool初始化

先上代码:

inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

ThreadPool 的初始化需传入一个参数threads,且将stop赋值为0.

接着往workers里加入threads个线程,每个线程都执行死循环:

            for(;;)
            {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex);
                    this->condition.wait(lock,
                        [this]{ return this->stop || !this->tasks.empty(); });
                    if(this->stop && this->tasks.empty())
                        return;
                    task = std::move(this->tasks.front());
                    this->tasks.pop();
                }

                task();
            }

在循环中,先定义锁,再调用condition.wait()方法,当线程池运行且任务队列为空时,线程堵塞,否则线程继续运行,然后当线程池停止且任务队列为空时,跳出循环,结束线程。否则从取出任务队列的第一个任务,执行任务。

ThreadPool enqueue 加入队列

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

enqueue 方法是模板函数,传入可调用对象F和任意数量的的参数args,,返回一个future对象,返回线程异步操作的结果。

using return_type = typename std::result_of<F(Args...)>::type;

首先,定义返回类型return_type,表示传入的可调用对象的返回值的类型。

auto task = std::make_shared< std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

程序创建智能指针task,其指向了一个使用bind绑定的可调用对象(该对象调用f,并传入参数args),再使用packaged_task包装成可调用对象。创建智能指针的目的是为了其他线程的使用。

std::future<return_type> res = task->get_future();
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");
    tasks.emplace([task](){ (*task)(); });
}

使用res保存任务线程的异步结果,并作为返回值。
然后在代码块中使用互斥锁加锁,然后将任务加入任务队列中。
最后通知线程池中的一个线程处理任务并返回res。

ThreadPool 析构函数

看注释就可以了:

inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);        
        stop = true;
        //表示线程池停止。
    }
    condition.notify_all();                 // 通知所有线程
    for(std::thread &worker: workers)
        worker.join();                      // 等待所有线程结束
}

总结:

ThreadPool 的运行步骤可以分为以下几步:

  1. 创建ThreadPool对象,传入线程池工作线程数量。在线程池中填加工作线程,并堵塞等待任务线程的通知。
  2. 调用enqueue方法,传入可调用对象和参数。在该方法中,enqueue先通过一系列操作调整传入的参数,再将其加入任务队列。
  3. 以上操作完成后,通知线程池中的一个线程处理任务。在线程池中取出任务队列的当前最先进来的任务处理。
  4. 处理完任务将结果保存到enqueue里的异步返回结果的future对象中,并通过enqueue返回。
  5. ThreadPool对象被销毁时,将标志stop设置为true,并会通知所有堵塞线程,等待线程池中的所有线程结束。
    ThreadPool 实现简单的线程池,使用简单的先进先出策略调度任务,如果可以使用更加复杂的策略,我们可以自己修改代码。

标签:std,task,lock,stop,ThreadPool,线程,解析
From: https://www.cnblogs.com/haruu/p/18676776

相关文章

  • Python魔法方法深度解析:解密 __call__、__new__ 和 __del__
    《PythonOpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门!解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界在Python中,魔法方法(MagicMethods)是一些特殊的方法,它们允许开发者定制对象的行为。这些方法前后由双下划线包围,如__init__、__str__、__call_......
  • 消息队列实战指南:三大MQ 与 Kafka 适用场景全解析
    前言:在当今数字化时代,分布式系统和大数据处理变得愈发普遍,消息队列作为其中的关键组件,承担着系统解耦、异步通信、流量削峰等重要职责。ActiveMQ、RabbitMQ、RocketMQ和Kafka作为市场上极具代表性的消息队列产品,各自拥有独特的功能特性与适用场景。本博客旨在深入剖析这四种消......
  • 矩阵碰一碰发视频源码技术开发全解析,支持OEM
    在当今数字化内容传播迅速发展的时代,矩阵碰一碰发视频功能以其便捷、高效的特点,为用户分享视频提供了全新的体验。本文将深入探讨矩阵碰一碰发视频源码的技术开发过程,从原理到实现,为开发者提供全面的技术指引。一、技术原理矩阵碰一碰发视频功能主要基于近场通信技术,如NFC(N......
  • GaussDB云原生数据库SQL引擎继承原来openGauss的词法解析,语法解析,查询重写,查询优化和
    云原生数据库SQL引擎继承原来openGauss的词法解析,语法解析,查询重写,查询优化和执行引擎的能力。由于云原生数据库是shareddisk架构,一个事务在一个节点上执行,所以不需要原来分布式根据分布式key进行数据分布,分布式执行和分布式2PC提交的能力。为了支持数据库粒度的异地多活,云原生......
  • 颜色代码解析
    颜色代码的格式通常是#AARRGGBB或#RRGGBB:AA:Alpha通道(透明度),00表示完全透明,FF表示完全不透明。RR:红色通道,00表示无红色,FF表示红色最大值。GG:绿色通道,00表示无绿色,FF表示绿色最大值。BB:蓝色通道,00表示无蓝色,FF表示蓝色最大值。例如:00FFFFFF:AA=00:完全透明。RR......
  • ZooKeeper 常见问题与核心机制解析
    Zookeeper集群本身不直接支持动态添加机器。在Zookeeper中,集群的配置是在启动时静态定义的,并且集群中的每个成员都需要知道其他所有成员。当你想要增加一个新的Zookeeper服务器到现有的集群中时,你需要更新所有现有服务器的配置文件(通常是zoo.cfg文件),以包含新的服务器信息。......
  • SpringBoot源码解析(七):应用上下文结构体系
    SpringBoot源码系列文章SpringBoot源码解析(一):SpringApplication构造方法SpringBoot源码解析(二):引导上下文DefaultBootstrapContextSpringBoot源码解析(三):启动开始阶段SpringBoot源码解析(四):解析应用参数argsSpringBoot源码解析(五):准备应用环境SpringBoot源码解......
  • SpelExpressionParser 是 Spring Expression Language(SpEL)中的一个重要组件,用于解析
    SpelExpressionParser 是SpringExpressionLanguage(SpEL)中的一个重要组件,主要用于解析和评估Spring表达式。以下是关于它的详细解释:主要功能表达式解析:将一个以字符串形式表示的Spring表达式转换为可执行的表达式对象。Spring表达式可以包含变量引用、方法调用、属性......
  • 市面上唯一一本全面解析Transformer的书《Transformer、BERT、GPT 大语言模型原理深度
    Transformer,BERT,andGPT:IncludingChatGPTandPromptEngineering,出版于2023年11月,作者是奥斯瓦尔德·坎佩萨托(OswaldCampesato)奥斯瓦尔德·坎佩萨托(OswaldCampesato):专门研究深度学习、Java、Android和TensorFlow。他是25本书的作者/合著者,其中包括TensorF......
  • STM32F103使用flash_algo解析FLM相关
    1、全局区(.bss段和.data段)根据实际情况修改2、栈顶地址根据实际情况修改/*FlashOSRoutines(AutomagicallyGenerated)*Copyright(c)2009-2015ARMLimited*/#include"flash_blob.h"//代码区flash_code[]使用JLINK/STLINK等放到RAM,一般是0x20000000staticconst......