首页 > 其他分享 >线程池实现范例

线程池实现范例

时间:2024-08-07 16:40:54浏览次数:17  
标签:范例 std task 实现 lock _. 线程 include

1 初始化线程池

    确定线程数量,并做好互斥访问

2 启动所有线程

   std::vector<std::thread*> threads_;

unique_lock<mutex> lock(mutex_);
for (int i = 0; i < thread_num_; i++)
{
auto th = new thread(&XThreadPool::Run, this);
threads_.push_back(th);
}

3 准备好任务处理基类和插入任务

///线程分配的任务类
class XTask
{
public:
// 执行具体的任务
virtual int Run() = 0;
};

存储任务的列表

std::list<XTask*> tasks_;

插入任务,通知线程池处理

unique_lock<mutex> lock(mutex_);
tasks_.push_back(task);
condition_.notify_one();

4 获取任务接口
通过条件变量阻塞等待任务

////////////////////////////////////////////////////////////
///获取任务
XTaskType XThreadPool::GetTask()
{
unique_lock<mutex> lock(mutex_);
if (tasks_.empty())
{
condition_.wait(lock);//阻塞等待通知
}
if (is_exit_)
return nullptr;
if (tasks_.empty())
{
return nullptr;
}
auto task = tasks_.front();
tasks_.pop_front();
return task;
}

5 执行任务线程入口函数

void XThreadPool::Run()
{
while(!IsExit())
{
//获取任务
auto task = GetTask();
if (!task)
continue;
try
{
task‐>Run();
}
catch (...)
{
 cerr << "XThreadPool::Run() exception" << endl;
  }
  }
}

完整代码如下:

xthread_pool.h

#pragma once
#include <thread>
#include <mutex>
#include <vector>
#include <list>
#include <functional>
#include <atomic>
class XTask
{
public:
    virtual int Run() = 0;
    std::function<bool()> is_exit = nullptr;
};

class XThreadPool
{
public:
    //////////////////////////////////////////////
    /// 初始化线程池
    /// @para num 线程数量
    void Init(int num);

    //////////////////////////////////////////////
    /// 启动所有线程,必须先调用Init
    void Start();

    //////////////////////////////////////////////
    /// 线程池退出
    void Stop();

    void AddTask(XTask* task);

    XTask* GetTask();

    //线程池是否退出
    bool is_exit() { return is_exit_; }

    int task_run_count() { return task_run_count_; }
private:
    //线程池线程的入口函数
    void Run() ;
    int thread_num_ = 0;//线程数量
    std::mutex mux_;
    std::vector<std::thread*> threads_;
    std::list<XTask*> tasks_;
    std::condition_variable cv_;
    bool is_exit_ = false; //线程池退出

    //正在运行的任务数量,线程安全
    std::atomic<int> task_run_count_ = {0};
};

xthread_pool.cpp

#include "xthread_pool.h"
#include <iostream>
using namespace std;

//////////////////////////////////////////////
/// 初始化线程池
/// @para num 线程数量
void XThreadPool::Init(int num)
{
    unique_lock<mutex> lock(mux_);
    this->thread_num_ = num;
    cout << "Thread pool Init " << num << endl;
}

//////////////////////////////////////////////
/// 启动所有线程,必须先调用Init
void XThreadPool::Start()
{
    unique_lock<mutex> lock(mux_);
    if (thread_num_ <= 0)
    {
        cerr << "Please Init XThreadPool" << endl;
        return;
    }
    if (!threads_.empty())
    {
        cerr << "Thread pool has start!" << endl;
        return;
    }
    //启动线程
    for (int i = 0; i < thread_num_; i++)
    {
        auto th = new thread(&XThreadPool::Run, this);
        threads_.push_back(th);
    }
}
//////////////////////////////////////////////
/// 线程池退出
void XThreadPool::Stop()
{
    is_exit_ = true;
    cv_.notify_all();
    for (auto& th : threads_)
    {
        th->join();
    }
    unique_lock<mutex> lock(mux_);
    threads_.clear();
}
//线程池线程的入口函数
void XThreadPool::Run()
{
    cout << "begin XThreadPool Run " << this_thread::get_id() << endl;
    while (!is_exit())
    {
        auto task = GetTask();
        if (!task)continue;
        ++task_run_count_;
        try
        {
            task->Run();
        }
        catch (...)
        {

        }
        --task_run_count_;
    }

    cout << "end XThreadPool Run " << this_thread::get_id() << endl;
}

void XThreadPool::AddTask(XTask* task)
{
    unique_lock<mutex> lock(mux_);
    tasks_.push_back(task);
    task->is_exit = [this] {return is_exit(); };

    lock.unlock();
    cv_.notify_one();
}

XTask* XThreadPool::GetTask()
{
    unique_lock<mutex> lock(mux_);
    if (tasks_.empty())
    {
        cv_.wait(lock);
    }
    if (is_exit())
        return nullptr;
    if (tasks_.empty())
        return nullptr;
    auto task = tasks_.front();
    tasks_.pop_front();
    return task;
}

main.cpp

#include "xthread_pool.h"
#include <iostream>
using namespace std;
class MyTask :public XTask
{
public:
    int Run()
    {
        cout << "================================================" << endl;
        cout << this_thread::get_id()<<" Mytask " << name << endl;
        cout << "================================================" << endl;
        for (int i = 0; i < 10; i++)
        {
            if (is_exit())break;
            cout << "." << flush;
            this_thread::sleep_for(500ms);
        }
        return 0;
    }
    std::string name = "";
};
int main(int argc, char* argv[])
{
  
    XThreadPool pool;
    pool.Init(16);
    pool.Start();

    MyTask task1;
    task1.name = "test name 001";
    pool.AddTask(&task1);

    MyTask task2;
    task2.name = "test name 002";
    pool.AddTask(&task2);
    this_thread::sleep_for(100ms);
    cout << "task run  count = " << pool.task_run_count() << endl;


    this_thread::sleep_for(1s);
    pool.Stop();
    cout << "task run  count = " << pool.task_run_count() << endl;

    getchar();
    return 0;
}

智能指针版本改进结果

实现如下:

xthread_pool.h

#pragma once
#include <thread>
#include <mutex>
#include <vector>
#include <list>
#include <functional>
#include <atomic>
#include <future>
class XTask
{
public:
    virtual int Run() = 0;
    std::function<bool()> is_exit = nullptr;
    auto GetReturn()
    {
        //阻塞等待 set_value
        return p_.get_future().get();
    }
    void SetValue(int v)
    {
        p_.set_value(v);
    }
private:
    //用来接收返回值
    std::promise<int> p_;
};

class XThreadPool
{
public:
    //////////////////////////////////////////////
    /// 初始化线程池
    /// @para num 线程数量
    void Init(int num);

    //////////////////////////////////////////////
    /// 启动所有线程,必须先调用Init
    void Start();

    //////////////////////////////////////////////
    /// 线程池退出
    void Stop();

    //void AddTask(XTask* task);
    void AddTask(std::shared_ptr<XTask> task);

    std::shared_ptr<XTask> GetTask();

    //线程池是否退出
    bool is_exit() { return is_exit_; }

    int task_run_count() { return task_run_count_; }
    ~XThreadPool() { Stop(); }
private:
    //线程池线程的入口函数
    void Run() ;
    int thread_num_ = 0;//线程数量
    std::mutex mux_;
    //std::vector<std::thread*> threads_;
    //线程列表 指针指针版本
    std::vector< std::shared_ptr<std::thread> > threads_;

    //std::list<XTask*> tasks_;
    std::list<std::shared_ptr<XTask> > tasks_;
    
    std::condition_variable cv_;
    bool is_exit_ = false; //线程池退出

    //正在运行的任务数量,线程安全
    std::atomic<int> task_run_count_ = {0};
};

 xthread_pool.cpp

#include "xthread_pool.h"
#include <iostream>
using namespace std;

//////////////////////////////////////////////
/// 初始化线程池
/// @para num 线程数量
void XThreadPool::Init(int num)
{
    unique_lock<mutex> lock(mux_);
    this->thread_num_ = num;
    cout << "Thread pool Init " << num << endl;
}

//////////////////////////////////////////////
/// 启动所有线程,必须先调用Init
void XThreadPool::Start()
{
    unique_lock<mutex> lock(mux_);
    if (thread_num_ <= 0)
    {
        cerr << "Please Init XThreadPool" << endl;
        return;
    }
    if (!threads_.empty())
    {
        cerr << "Thread pool has start!" << endl;
        return;
    }
    //启动线程
    for (int i = 0; i < thread_num_; i++)
    {
        //auto th = new thread(&XThreadPool::Run, this);
        auto th = make_shared<thread>(&XThreadPool::Run, this);
        threads_.push_back(th);
    }
}
//////////////////////////////////////////////
/// 线程池退出
void XThreadPool::Stop()
{
    is_exit_ = true;
    cv_.notify_all();
    for (auto& th : threads_)
    {
        th->join();
    }
    unique_lock<mutex> lock(mux_);
    threads_.clear();
}
//线程池线程的入口函数
void XThreadPool::Run()
{
    cout << "begin XThreadPool Run " << this_thread::get_id() << endl;
    while (!is_exit())
    {
        auto task = GetTask();
        if (!task)continue;
        ++task_run_count_;
        try
        {
            auto re = task->Run();
            task->SetValue(re);
        }
        catch (...)
        {

        }
        --task_run_count_;
    }

    cout << "end XThreadPool Run " << this_thread::get_id() << endl;
}

void XThreadPool::AddTask(std::shared_ptr<XTask> task)
{
    unique_lock<mutex> lock(mux_);
    tasks_.push_back(task);
    task->is_exit = [this] {return is_exit(); };
    lock.unlock();
    cv_.notify_one();
}

std::shared_ptr<XTask> XThreadPool::GetTask()
{
    unique_lock<mutex> lock(mux_);
    if (tasks_.empty())
    {
        cv_.wait(lock);
    }
    if (is_exit())
        return nullptr;
    if (tasks_.empty())
        return nullptr;
    auto task = tasks_.front();
    tasks_.pop_front();
    return task;
}

xvideo_task.h:

#pragma once
#include "xthread_pool.h"
class XVideoTask :public XTask
{
public:
    std::string in_path;
    std::string out_path;
    int width = 0;
    int height = 0;
private:
    int Run();
};

xvideo_task.cpp

#include "xvideo_task.h"
#include <sstream>
using namespace std;
int XVideoTask::Run()
{
    //ffmpeg -y -i test.mp4 -s 400x300 400.mp4 >log.txt 2>&1
    stringstream ss;
    ss << "ffmpeg.exe -y -i " << in_path<<" ";
    if (width > 0 && height > 0)
        ss << " -s " << width << "x" << height<<" ";
    ss << out_path;
    ss << " >" << this_thread::get_id() << ".txt 2>&1";
    return system(ss.str().c_str());
}

main.cpp

#include "xthread_pool.h"
#include "xvideo_task.h"
#include <iostream>
using namespace std;
/// 命令行视频转码工具
/// ffmpeg工具
/// 用户输入 视频源 输出视频尺寸
/// 在线程池中执行转码任务
/// 转码任务调用ffmpeg
/// ffmpeg -y -i test.mp4 -s 400x300 400.mp4 >log.txt 2>&1

int main(int argc, char* argv[])
{
    XThreadPool pool;
    pool.Init(16);
    pool.Start();
    this_thread::sleep_for(200ms);
    for (;;)
    {
        this_thread::sleep_for(200ms);
        cout << "\n====================================================================\n";
        auto task = make_shared<XVideoTask>();
        cout << "请输入命令(v e l):";
        string cmd;
        cin >> cmd;
        if (cmd == "e")
            break;
        else if (cmd == "l")
        {
            cout << "task run count = " << pool.task_run_count() << endl;
            continue;
        }
        cout << "视频源:";
        cin >> task->in_path;
        cout << "目标:";
        cin >> task->out_path;
        cout << "输出宽:";
        cin >> task->width;
        cout << "输出高:";
        cin >> task->height;
        pool.AddTask(task);
    }
    pool.Stop();
    /*
    auto vtask1 = make_shared<XVideoTask>();
    vtask1->in_path = "test.mp4";
    vtask1->out_path = "640.mp4";
    vtask1->width = 640;
    vtask1->height = 480;
    pool.AddTask(vtask1);
    
    vtask1->GetReturn();
    */
    return 0;
}

 

标签:范例,std,task,实现,lock,_.,线程,include
From: https://www.cnblogs.com/bwbfight/p/18347307

相关文章

  • vue中实现文字向上滚动效果
    <template><divclass="djs-box"><divclass="topBox"><h3>vue实现文字向上滚动效果</h3><div>大剑师兰特,还是大剑师兰特,gis-dajianshi</div></div><divcla......
  • matlab实现车牌识别系统
    在MATLAB中实现一个车牌识别系统通常涉及多个步骤,包括图像预处理、车牌定位、字符分割和字符识别。这里我将给出一个简化的流程和示例代码,帮助你开始这个项目。步骤1:图像预处理图像预处理通常包括灰度化、二值化、滤波等步骤,以去除噪声并增强车牌区域的特征。%读取图像......
  • Linux:线程同步之信号量
    信号量(1)What(什么是信号量)提供一种计数器的方式控制对共享资源的访问;当计数器大于0时,请求资源成功并计数器-1;当计数器小于0时,线程阻塞,等待其它线程执行signal(V操作)唤醒它(2)Why(信号量的作用)实现线程的同步与互斥:通过信号量的设计,可以实现对共享资源的串行访问实现线......
  • 基于springboot+MySQL校园社团信息管理系统的设计与实现-计算机毕设 附源码 02705
    springboot校园社团信息管理系统的设计与实现目 录摘要1绪论1.1研究背景1.2 研究意义1.3论文结构与章节安排2 校园社团信息管理系统系统分析2.1可行性分析2.2系统流程分析2.2.1数据增加流程2.2.2数据修改流程2.2.3数据删除流程2.3 系统......
  • Java实现字符串中字符出现次数统计
    在编程过程中,我们经常需要对字符串进行处理。今天,我将为大家分享一个Java示例,用于统计字符串中每个字符出现的次数。让我们一起来看看吧!在日常生活中,我们经常会遇到需要统计字符串中字符出现次数的场景。例如,统计一篇文章中各个字母的出现次数,以便进行词频分析。本文将带大家......
  • 激光点云去畸变_原理及实现
    激光点云去畸变:原理及实现机械式激光雷达产生畸变的原因Lidar扫描周期内(一般0.1s)自车有一定幅度的旋转(Rotation)和平移(Translation),因此不同时间点打出去的激光点束并不在严格统一的Lidar坐标系内,需要对同一帧Lidar转化在统一时间戳对应的Lidar坐标系上(一般转化到第......
  • freemarker实现动态行单元格合并
    原文链接:https://www.cnblogs.com/10158wsj/p/11211471.htmlhttps://blog.csdn.net/weixin_43667830/article/details/106936546项目需求:项目中有个需求,需要将一些数据库中的数据根据需求导出,生成一个word,研究了一些技术,其中包括POI、freemaker,对比了一下实现过程及技术难度没......
  • 泗博MODBUS TCP转PROFINET网关EPN-330实现焊机与西门子PLC的连接
    随着工业自动化水平的不断提高,各种设备之间的互联互通变得至关重要。然而,由于不同设备可能采用不同的通信协议,如何实现它们之间的无缝连接,成为了许多工程师和企业面临的难题。今天,我们就来分享一个上海泗博成功的案例,看看如何通过泗博自动化的MODBUSTCP转PROFINET网关EPN-330,实现......
  • 【数据结构与算法】删除循环队列中第k个元素的算法 C++实现(循环队列+模运算)
    数组a[MaxSize]用作一个循环队列,front指向循环队列中队头元素的前一个位置,rear指向队尾元素的位置。设计删除队列中第k个元素的算法。思路首先,判断kkk是否在有效范围内......
  • 【数据结构与算法】在循环队列中第k个元素之后插入元素的算法 C++实现(循环队列+模运算
    数组a[MaxSize]用作一个循环队列,front指向循环队列中队头元素的前一个位置,rear指向队尾元素的位置。设计在队列中第k个元素之后插入item的算法。思路首先,检查输入的位置k是否在合理的范围内,即1到queueSize(Q)(包含两端)。如果k在这个范围外,那么返回ERROR。然后,计......