首页 > 系统相关 >基于linux下c实现的简单版线程池

基于linux下c实现的简单版线程池

时间:2024-05-28 15:28:49浏览次数:24  
标签:基于 lock linux thr queue num 线程 pthread pool

#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <string>
#include<signal.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#define DEFAULT_TIME 10   // seconds the adjuster thread sleeps between load checks
#define DEFAULT_STEP 15   // max threads added (or asked to exit) per adjustment round
using namespace std;
// Sample task arguments: each value is both the task id and its sleep duration.
int dp[20]={1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};

/*线程工作任务*/
/* A unit of work queued on the pool: an entry point plus its argument. */
typedef struct 
{
        void *(*function)(void *);   // task entry point, run by a worker thread
        void *arg;                   // opaque argument forwarded to function
} thread_task;

/*线程池管理*/
/* Thread-pool control block. `lock` guards the queue and thread counters;
 * `busylock` guards only busy_thr_num so the adjuster can sample it cheaply. */
struct pthread_pool 
{
        pthread_mutex_t lock;                // guards the whole pool (queue, counters)
        pthread_mutex_t busylock;            // guards busy_thr_num only
        pthread_cond_t  queue_not_full;      // signaled when the task queue gains free room
        pthread_cond_t  queue_not_empty;     // signaled when the task queue gains a task
 
        pthread_t *th;                       // worker thread ids; max_thr_num slots, 0 = unused slot
        pthread_t adjustth;                  // manager thread: grows/shrinks the pool with load
              
        int min_thr_num;                     // lower bound on live workers
        int max_thr_num;                     // upper bound on live workers
        int live_thr_num;                    // workers currently alive
        int busy_thr_num;                    // workers currently executing a task
        int wait_exit_num;                   // surplus workers the adjuster has asked to retire


        thread_task *queue_task;             // circular task queue (queue_maxsize slots)
        int queue_front;                     // index of the next task to dequeue
        int queue_rear;                      // index of the next free slot
        int queue_size;        
        int queue_maxsize;

        bool status = false;                 // true while the pool is running
};
void *process(void *);

/* Release every resource owned by the pool. The caller must have joined
 * all worker threads and the adjuster first (see destroy_pool). */
void pthread_free(pthread_pool *pool)
{
        if (pool == NULL) 
        {
                return;
        }
        free(pool->queue_task);   // free(NULL) is a harmless no-op
        free(pool->th);
        // BUGFIX: the original locked each mutex immediately before
        // destroying it — destroying a locked mutex is undefined behavior
        // per POSIX. It also skipped destruction entirely when th was NULL.
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_destroy(&(pool->busylock));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
        free(pool);
        // NOTE: assigning NULL to the local parameter (as the original did)
        // has no effect on the caller's pointer, so it was dropped.
}

/* Shut the pool down: stop the adjuster, wake and join every worker,
 * then free all resources. Safe to call with NULL. */
void destroy_pool(pthread_pool *pool)
{
        if (pool == NULL) 
        {
                return ;
        }
        // BUGFIX: write status under the lock — workers read it while
        // holding the same lock, so an unlocked write is a data race.
        pthread_mutex_lock(&(pool->lock));
        pool->status = false;
        pthread_mutex_unlock(&(pool->lock));

        pthread_join(pool->adjustth, NULL);

        // One broadcast wakes every worker blocked in cond_wait (the
        // original looped live_thr_num broadcasts — redundant). Also wake
        // any producer blocked waiting for queue space.
        pthread_cond_broadcast(&(pool->queue_not_empty));
        pthread_cond_broadcast(&(pool->queue_not_full));

        // BUGFIX: the adjuster may have created workers in any slot of
        // th[], so scan all max_thr_num slots rather than the first
        // live_thr_num; a zero slot never held a thread.
        for (int i = 0; i < pool->max_thr_num; i++) 
        {
                if (pool->th[i] != 0) 
                {
                        pthread_join(pool->th[i], NULL);
                }
        }
        pthread_free(pool);
}

/* Enqueue a task (function + arg) onto the pool's circular queue.
 * Blocks while the queue is full; discards the task when the pool is
 * shutting down. */
void task_add(pthread_pool *pool, void *(*function)(void *arg), void *arg)
{
        pthread_mutex_lock(&(pool->lock));
        // Producer waits until a worker frees a queue slot.
        while (pool->queue_size == pool->queue_maxsize && pool->status == true) 
        {
                pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
        }

        // Pool is shutting down: wake the workers and bail out.
        if (pool->status == false) 
        {
                pthread_cond_broadcast(&pool->queue_not_empty);
                // BUGFIX: the original called pthread_mutex_lock here,
                // deadlocking on the already-held mutex; it must unlock.
                pthread_mutex_unlock(&(pool->lock));
                return ;
        }

        // Write the task into the rear slot of the circular queue.
        // (The original first nulled .arg and then immediately overwrote
        // it — that dead store was removed.)
        pool->queue_task[pool->queue_rear].function = function;
        pool->queue_task[pool->queue_rear].arg = arg;
        pool->queue_rear = (pool->queue_rear + 1) % pool->queue_maxsize;
        pool->queue_size++;

        // Wake one waiting worker to take the new task.
        pthread_cond_signal(&pool->queue_not_empty);
        pthread_mutex_unlock(&(pool->lock));
}

/* Probe whether the thread identified by tid still exists.
 * Signal 0 performs error checking only and delivers nothing;
 * ESRCH means no such thread. */
bool is_thread_alive(pthread_t tid)
{
        return pthread_kill(tid, 0) != ESRCH;
}

void *adjust(void *arg)                         //独立线程管理线程池的线程个数
{
        struct pthread_pool *pool = (struct pthread_pool *)arg;
        while (pool->status==true) 
        {
             sleep(DEFAULT_TIME);

            pthread_mutex_lock(&(pool->lock));
            int live_thr_num = pool->live_thr_num;
            int queue_size = pool->queue_size;
            pthread_mutex_unlock(&(pool->lock));
            pthread_mutex_lock(&(pool->busylock));
            int busy_thr_num = pool->busy_thr_num;
            pthread_mutex_unlock(&(pool->busylock));

            if (queue_size > live_thr_num && live_thr_num < pool->max_thr_num) 
            {
                  cout<<"too many task"<<endl;
                  int add = 0;
            
            for (int i = 0; i < pool->max_thr_num && add < DEFAULT_STEP && pool->live_thr_num < pool->max_thr_num; i++) 
              {
                 if (pool->th[i] == 0 && !is_thread_alive(pool->th[i])) 
                 {
                    pthread_create(&(pool->th[i]), NULL, process, (void *)pool);
                    add++;
                    pool->live_thr_num++;
                 }

              }
            }

            if (busy_thr_num * 2 < live_thr_num && live_thr_num < pool->max_thr_num) 
                {
                        pthread_mutex_lock(&(pool->lock));
                        pool->wait_exit_num = DEFAULT_STEP;
                        pthread_mutex_unlock(&(pool->lock));
                        for (int i = 0; i < DEFAULT_STEP; i++) 
                        {
                           pthread_cond_signal(&(pool->queue_not_empty));
                        }
                }
        }
        return NULL;
}

/* Worker thread main loop: repeatedly dequeue a task and run it.
 * Exits when the pool shuts down, or when the adjuster asks surplus
 * idle workers to retire via wait_exit_num. */
void *process(void *arg)
{
        struct pthread_pool *pool = (struct pthread_pool *)arg;
        thread_task task;
        while (true) 
        {
                pthread_mutex_lock(&(pool->lock));
                // Sleep while the queue is empty and the pool is running.
                while (pool->queue_size == 0 && pool->status == true) 
                {
                        cout << pthread_self() << " thread was waiting" << endl;
                        pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

                        // The adjuster signaled idle workers to retire;
                        // honor it only while above the minimum pool size.
                        if (pool->wait_exit_num > 0) 
                        {
                                pool->wait_exit_num--;
                                if (pool->live_thr_num > pool->min_thr_num) 
                                {
                                        pool->live_thr_num--;
                                        pthread_mutex_unlock(&(pool->lock));
                                        pthread_exit(NULL);
                                }
                        }
                }

                // Pool is shutting down: release the lock and terminate.
                if (pool->status == false) 
                {
                        pthread_mutex_unlock(&(pool->lock));
                        pthread_exit(NULL);
                }
        //        cout<<"yes"<<endl;
                // Dequeue the front task from the circular queue.
                task.function = pool->queue_task[pool->queue_front].function;
                task.arg = pool->queue_task[pool->queue_front].arg;
                pool->queue_front = (pool->queue_front + 1) % pool->queue_maxsize;
                pool->queue_size--;

                // A slot was freed: wake any producer blocked in task_add.
                pthread_cond_broadcast(&pool->queue_not_full);
                pthread_mutex_unlock(&(pool->lock));
                //pthread_cond_broadcast(&(pool->queue_not_full));

                // Run the task outside the queue lock; busy_thr_num has its
                // own mutex so the adjuster can sample it without contention.
                pthread_mutex_lock(&(pool->busylock));
                pool->busy_thr_num++;
                pthread_mutex_unlock(&(pool->busylock));

                (*(task.function))(task.arg);

                pthread_mutex_lock(&(pool->busylock));
                pool->busy_thr_num--;
                pthread_mutex_unlock(&(pool->busylock));
        }
        pthread_exit(NULL);
}

/* Create and start a pool with min_thr_num initial workers, at most
 * max_thr_num workers, and a task queue of queue_maxsize slots.
 * Returns NULL on allocation failure; the caller owns the result and
 * must release it with destroy_pool(). */
pthread_pool *pthreadpool_create(int min_thr_num, int max_thr_num, int queue_maxsize)
{
        pthread_pool *pool = NULL;
        do {
                // BUGFIX: allocate with malloc so pthread_free()'s free()
                // matches — the original mixed new/new[] with free(),
                // which is undefined behavior. (new also never returns
                // NULL, so the original's NULL checks were dead code.)
                pool = (pthread_pool *)malloc(sizeof(pthread_pool));
                if (pool == NULL) 
                {
                        break;
                }
                pool->min_thr_num = min_thr_num;
                pool->max_thr_num = max_thr_num;
                pool->queue_maxsize = queue_maxsize;
                pool->wait_exit_num = 0;
                pool->live_thr_num = min_thr_num;
                pool->busy_thr_num = 0;
                pool->queue_front = 0;
                pool->queue_rear = 0;
                pool->queue_size = 0;
                pool->status = true;
                pool->th = NULL;           // so the cleanup below is always safe
                pool->queue_task = NULL;

                pool->th = (pthread_t *)malloc(sizeof(pthread_t) * max_thr_num);
                if (pool->th == NULL) 
                {
                        break;
                }
                // Slot value 0 marks "no thread created here yet".
                memset(pool->th, 0, sizeof(pthread_t)*max_thr_num);

                pool->queue_task = (thread_task *)malloc(sizeof(thread_task) * queue_maxsize);
                if (pool->queue_task == NULL) 
                {
                        break;
                }
                memset(pool->queue_task, 0, sizeof(thread_task)*pool->queue_maxsize);

                pthread_mutex_init(&(pool->lock), NULL);
                pthread_mutex_init(&(pool->busylock), NULL);
                pthread_cond_init(&(pool->queue_not_empty), NULL);
                pthread_cond_init(&(pool->queue_not_full), NULL);

                for (int i = 0; i < min_thr_num; i++) 
                {
                        pthread_create(&pool->th[i], NULL, process, (void *)pool);
                        cout << pool->th[i] << " thread was created" << endl;
                }
                pthread_create(&pool->adjustth, NULL, adjust, (void *)pool);
                return pool;
        } while (0);

        // BUGFIX: the original leaked pool (and pool->th) on every failure
        // path; release whatever was allocated before returning NULL.
        if (pool != NULL) 
        {
                free(pool->queue_task);
                free(pool->th);
                free(pool);
        }
        return NULL;
}

/* Simulated task: arg points to an int that serves both as the task id
 * and as the number of seconds to sleep / lines to print. */
void *dowork(void *arg)
{
        int *count = (int *)arg;
        sleep(*count);
        std::cout << "**************************" << "第" << *count
                  << "次执行任务" << std::endl;
        for (int n = 1; n <= *count; n++) 
        {
                std::cout << "hello world" << std::endl;
        }
        std::cout << "*****************************任务结束" << *count
                  << std::endl << std::endl;
        return NULL;
}

int main() 
{
        pthread_pool *pool = pthreadpool_create(10, 100, 100);
        for(int i=0;i<=19;i++)
        {
              task_add(pool, dowork, (void *)&dp[i]);
        }
        sleep(21);
        destroy_pool(pool);   
        system ("pause");     
        return 0;
}

标签:基于,lock,linux,thr,queue,num,线程,pthread,pool
From: https://blog.csdn.net/weixin_72492465/article/details/139267652

相关文章

  • MySQL社区版本没有自带审计功能,所以基于MySQL8.0.33版本容器运行的MySQL自行安装插件
     因为MySQL社区版本没有自带审计功能,所以基于MySQL8.0.33版本容器运行的MySQL自行安装插件1.查看Mysql是否安装过audit_log插件SELECT*FROMinformation_schema.PLUGINSWHEREPLUGIN_NAMELIKE'%audit%'; 2.下载PerconaServerforMySQL,地址为 Installwithbinar......
  • SpringBoot系列---【线程池优雅停机,避免消费数据丢数的问题】
    1.问题项目中通过kafka来对接上游,在项目中写一个listener监听topTopic队列,for循环消费records,在for循环中处理成存储到es的对象,一次拉50条,使用自定义线程池esThreadPool异步推送到es中,但是每次停机就会丢数据,例:kafka消费了1000条,但是往es中存储比较慢,优雅停机的时候,esThreadPool......
  • Docker安装(Linux)
    简单方法:docker.io        Debian团队维护和打包aptinstalldocker.ioaptinstalldocker-compose但是版本会较低复杂方法:docker.ce    Docker官方团队维护和打包安装一些必要的系统工具sudoaptupdatesudoapt-yinstallapt-transport-https......
  • 基于 RNNs 对 IMDB 电影评论进行情感分类
    前言系列专栏:【深度学习:算法项目实战】✨︎涉及医疗健康、财经金融、商业零售、食品饮料、运动健身、交通运输、环境科学、社交媒体以及文本和图像处理等诸多领域,讨论了各种复杂的深度神经网络思想,如卷积神经网络、循环神经网络、生成对抗网络、门控循环单元、长短期记忆......
  • 基于java中的springboot框架实现医药管理系统项目演示【内附项目源码+论文说明】
    基于java中的springboot框架实现医药管理系统项目演示【内附项目源码+LW说明】摘要计算机网络发展到现在已经好几十年了,在理论上面已经有了很丰富的基础,并且在现实生活中也到处都在使用,可以说,经过几十年的发展,互联网技术已经把地域信息的隔阂给消除了,让整个世界都可以即......
  • 基于java中的springboot框架实现秒杀系统项目演示【内附项目源码+论文说明】
    基于java中的springboot框架实现秒杀系统项目演示【内附项目源码+LW说明】摘要社会发展日新月异,用计算机应用实现数据管理功能已经算是很完善的了,但是随着移动互联网的到来,处理信息不再受制于地理位置的限制,处理信息及时高效,备受人们的喜爱。本次开发一套基于SpringBoo......
  • Linux使用脚本一键安装Oracle11g
    最近一直在搞服务器,记录下使用脚本安装Oracle数据库,仅供学习使用链接:https://pan.baidu.com/s/1Rrx5SeA-t8hKZW2ZqlqfZg 提取码:lss11.安装CentOS7虚拟机Linux2.修改IP(自动分配IP或者配置静态IP)cd/etc/sysconfig/network-scripts/ls查看文件(后续要使用ens33)修改ifvf......
  • 2024-Linux
    单选题 一.单选题(共64题,100分)1. (单选题)如果umask设置为022,新创建的文件的缺省权限是什么?A.\----w--w-B.\-w--w----C.\r-xr-x---D.\rw-r--r--我的答案: D:\rw-r--r--;正确答案: D:\rw-r--r--; 1.5分2. (单选题)如果要列出一个目录下的所有......
  • Linux网站访问控制
    环境:操作系统:centos7(linux)试验系统:win7(client)目的:实现在客户机win7访问网站www.jd.com访问到的是centos的虚拟网站GW开启路由转发以及网络地址转换模式GW开启路由转发grep-v"#"/etc/sysctl.confGW开启网络地址转换模式touchstrat.sh创建strat.sh文件......
  • java+Angular+Nginx微服务架构+VUE 基于SaaS云部署、云计算的区域医院云HIS系统源码
    java+Angular+Nginx微服务架构+VUE基于SaaS云部署、云计算的区域医院云HIS系统源码HIS系统:可以根据医院规模、个性流程定制个性化程序;以临床工作为核心,方便医生的临床医疗行为,提高医疗服务质量,能提供临床专科数据分析系统,可用于医疗评估、生物医学研究、教育和医疗保健管理......