首页 > 系统相关 >Linux线程池的创建(超详细解析)

Linux线程池的创建(超详细解析)

时间:2024-03-13 21:58:31浏览次数:33  
标签:task void Linux thrPool 线程 pthread 解析 pool

线程池:

若干个线程组合在一起形成线程池;

为什么需要线程池:

多线程版本服务器一个客户端就需要创建一个线程,如果客户端太多,明显不太合适;

创建思路:我们需要一个线程池结构体,然后这个结构体里面包含任务池,这个线程池结构体是全局变量,需要使用互斥锁,当子线程执行回调函数时,把该线程池作为参数传入,然后子线程可以从线程池结构体里面的任务池取出主线程创建的任务。我们也要使用条件变量,当任务池为空时,子线程阻塞等待,当任务池满时,主线程阻塞等待。

这是我们创建的线程池结构体:

typedef struct _poolTask
{
	int tasknum;//该任务的任务编号
	void *arg;//任务回调函数的参数
	void (*task_func)(void *arg);//任务回调函数
}PoolTask;
typedef struct _ThreadPool
{
	int max_job_num;//最大任务量
	int job_num;//实际任务量
	PoolTask *tasks;//任务池(任务数组)
	int job_push;//任务进入任务池的位置
	int job_pop;//任务出任务池的位置

	int thr_num;//子线程的数量
	pthread_t *threads;//线程池(子线程数组)
	int shutdown;//(是否销毁线程池)
	pthread_mutex_t pool_lock;//互斥锁
	pthread_cond_t empty_task;//条件变量
	pthread_cond_t not_empty_task;
}ThreadPool;

我们可以先创建一个threadpool.h头文件:

#ifndef _THREADPOO_H
#define _THREADPOO_H
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/types.h>
#include<pthread.h>
typedef struct _poolTask
{
	int tasknum;
	void *arg;
	void (*task_func)(void *arg);
}PoolTask;
typedef struct _ThreadPool
{
	int max_job_num;
	int job_num;
	PoolTask *tasks;
	int job_push;
	int job_pop;

	int thr_num;
	pthread_t *threads;
	int shutdown;
	pthread_mutex_t pool_lock;
	pthread_cond_t empty_task;
	pthread_cond_t not_empty_task;
}ThreadPool;

void create_threadpool(int thrnum,int maxtasknum);
void destroy_threadpool(ThreadPool *pool);
void addtask(ThreadPool *pool);
void taskRun(void *arg);

#endif

这四个函数我们要在threadpool.c中实现

void create_threadpool(int thrnum,int maxtasknum);//创建线程池
void destroy_threadpool(ThreadPool *pool);//销毁线程池
void addtask(ThreadPool *pool);//往线程池中的任务池中添加任务
void taskRun(void *arg);//任务回调函数

 我们现在pthread_pool.c定义全局变量:

ThreadPool *thrPool=NULL;
int beginnum=1000;

void create_threadpool(int thrnum,int maxtasknum);//创建线程池的实现:

void create_threadpool(int thrnum,int maxtasknum)
{
	printf("begin create\n");
	thrPool =(ThreadPool*)malloc(sizeof(ThreadPool));//向堆区申请一个ThreadPool大小的内存空间

	thrPool->thr_num=thrnum;//线程数量
	thrPool->max_job_num=maxtasknum;//最大任务量
	thrPool->shutdown=0;//为1是销毁线程池
	thrPool->job_push=0;//入队位置初始为0
	thrPool->job_pop=0;//出队位置初始为0
	thrPool->job_num=0;//实际任务量为0

	thrPool->tasks=(PoolTask*)malloc(sizeof(PoolTask)*maxtasknum);//申请任务池(任务数组)的空间

	pthread_mutex_init(&thrPool->pool_lock,NULL);//初始化互斥锁,条件变量
	pthread_cond_init(&thrPool->empty_task,NULL);
	pthread_cond_init(&thrPool->not_empty_task,NULL);

	int i=0;
	thrPool->threads=(pthread_t*)malloc(sizeof(pthread_t)*thrnum);//申请thrum个线程的空间

	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);//设置子线程为分离属性
	for(i=0;i<thrnum;i++)
	{
		pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//循环创建子线程,子线程执行thrRun回调函数
	}
	printf("end create\n");
}

void addtask(ThreadPool *pool);//往线程池中的任务池中添加任务的实现:

void addtask(ThreadPool *pool)
{
	pthread_mutex_lock(&pool->pool_lock);//加锁
	while(pool->max_job_num<=pool->job_num)//任务池满了
	{
		pthread_cond_wait(&pool->empty_task,&pool->pool_lock);//阻塞等待并解锁
	}

	int taskpos=(pool->job_push++)%pool->max_job_num;//任务进入任务池的位置,循环进入
	pool->tasks[taskpos].tasknum=beginnum++;//任务编号加一
	pool->tasks[taskpos].arg=(void *)&pool->tasks[taskpos];//把自己的地址传给任务自己的参数
	pool->tasks[taskpos].task_func=taskRun;//任务回调函数
	pool->job_num++;//实际任务数加一

	pthread_mutex_unlock(&pool->pool_lock);//解锁
	pthread_cond_signal(&pool->not_empty_task);//通知子线程阻塞的条件变量
}

void destroy_threadpool(ThreadPool *pool);//销毁线程池的实现:

void destroy_threadpool(ThreadPool *pool)
{
	pool->shutdown=1;//改为一,表示摧毁线程池
	pthread_cond_broadcast(&pool->not_empty_task);//通知子线程的条件变量不再阻塞

	pthread_cond_destroy(&pool->not_empty_task);//销毁锁,条件变量
	pthread_cond_destroy(&pool->empty_task);
	pthread_mutex_destroy(&pool->pool_lock);

	free(pool->tasks);//释放
	free(pool->threads);
	free(pool);
}

子线程回调函数thrRun的实现:

void *thrRun(void *arg)
{
	ThreadPool *pool=(ThreadPool*)arg;//强转参数的类型
	int taskpos=0;
	PoolTask *task=(PoolTask*)malloc(sizeof(PoolTask));//申请一个任务的空间
	while(1)//一直死循环,子线程不退化出
	{
		pthread_mutex_lock(&thrPool->pool_lock);//加锁

		while(thrPool->job_num<=0&&!thrPool->shutdown)//用while循环是三个子线程都阻塞,获得信号时只有一个线程获得锁,其他线程尝试加锁,又会阻塞,当获得锁的线程执行完任务后,其他线程继续while循环。
		{
			pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
		}
		if(thrPool->job_num)//实际任务数量
		{
			taskpos=(thrPool->job_pop++)%thrPool->max_job_num;//任务出任务池的位置
			memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));//把任务池的任务的复制一份出来,防止主线程同时对该任务的值改变
			task=task->arg;//改变为之前任务池任务的地址
			thrPool->job_num--;//实际任务数量-1
			pthread_cond_signal(&thrPool->empty_task);//向主线程发送信号
		}
		if(thrPool->shutdown)//销毁线程池
		{
			pthread_mutex_unlock(&thrPool->pool_lock);//解锁
			free(task);//释放task的内存
			pthread_exit(NULL);//退出
		}
		pthread_mutex_unlock(&thrPool->pool_lock);//正常解锁
		task->task_func(task->arg);//执行任务回调函数
	}
}

void taskRun(void *arg);//任务回调函数的实现

 

void taskRun(void *arg)
{
	PoolTask*task=(PoolTask*)arg;
	printf("task:numer==[%d]\n",task->tasknum);//打印任务的编号
}

完整的pthreadpool.c

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/types.h>
#include<pthread.h>
#include"threadpool.h"
ThreadPool *thrPool=NULL;
int beginnum=1000;
void taskRun(void *arg)
{
	PoolTask*task=(PoolTask*)arg;
	printf("task:numer==[%d]\n",task->tasknum);
}
void *thrRun(void *arg)
{
	ThreadPool *pool=(ThreadPool*)arg;
	int taskpos=0;
	PoolTask *task=(PoolTask*)malloc(sizeof(PoolTask));
	while(1)
	{
		pthread_mutex_lock(&thrPool->pool_lock);

		while(thrPool->job_num<=0&&!thrPool->shutdown)
		{
			pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
		}
		if(thrPool->job_num)
		{
			taskpos=(thrPool->job_pop++)%thrPool->max_job_num;
			memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));
			task=task->arg;
			thrPool->job_num--;
			pthread_cond_signal(&thrPool->empty_task);
		}
		if(thrPool->shutdown)
		{
			pthread_mutex_unlock(&thrPool->pool_lock);
			free(task);
			pthread_exit(NULL);
		}
		pthread_mutex_unlock(&thrPool->pool_lock);
		task->task_func(task->arg);
	}
}
void create_threadpool(int thrnum,int maxtasknum)
{
	printf("begin create\n");
	thrPool =(ThreadPool*)malloc(sizeof(ThreadPool));

	thrPool->thr_num=thrnum;
	thrPool->max_job_num=maxtasknum;
	thrPool->shutdown=0;
	thrPool->job_push=0;
	thrPool->job_pop=0;
	thrPool->job_num=0;

	thrPool->tasks=(PoolTask*)malloc(sizeof(PoolTask)*maxtasknum);

	pthread_mutex_init(&thrPool->pool_lock,NULL);
	pthread_cond_init(&thrPool->empty_task,NULL);
	pthread_cond_init(&thrPool->not_empty_task,NULL);

	int i=0;
	thrPool->threads=(pthread_t*)malloc(sizeof(pthread_t)*thrnum);

	pthread_attr_t attr;
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
	for(i=0;i<thrnum;i++)
	{
		pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);
	}
	printf("end create\n");
}
void addtask(ThreadPool *pool)
{
	pthread_mutex_lock(&pool->pool_lock);
	while(pool->max_job_num<=pool->job_num)
	{
		pthread_cond_wait(&pool->empty_task,&pool->pool_lock);
	}

	int taskpos=(pool->job_push++)%pool->max_job_num;
	pool->tasks[taskpos].tasknum=beginnum++;
	pool->tasks[taskpos].arg=(void *)&pool->tasks[taskpos];
	pool->tasks[taskpos].task_func=taskRun;
	pool->job_num++;

	pthread_mutex_unlock(&pool->pool_lock);
	pthread_cond_signal(&pool->not_empty_task);
}
void destroy_threadpool(ThreadPool *pool)
{
	pool->shutdown=1;
	pthread_cond_broadcast(&pool->not_empty_task);

	pthread_cond_destroy(&pool->not_empty_task);
	pthread_cond_destroy(&pool->empty_task);
	pthread_mutex_destroy(&pool->pool_lock);

	free(pool->tasks);
	free(pool->threads);
	free(pool);
}
int main()
{
	create_threadpool(3,20);//创建3个子线程,最大任务量为20
	int i=0;
	for(i=0;i<50;i++)//循环添加50个任务
	{
		addtask(thrPool);
	}
	sleep(20);
	destroy_threadpool(thrPool);//销毁线程池
	return 0;
}

结果:

标签:task,void,Linux,thrPool,线程,pthread,解析,pool
From: https://blog.csdn.net/luosuss/article/details/136654185

相关文章

  • Linux系统架构----Nginx的服务优化
    Linux系统架构----Nginx的服务优化一.隐藏版本号在生产环境中,需要隐藏Nginx的版本号,以免泄露Nginx的版本,使得攻击者不能针对特定版本进行攻击查看Nginx的版本有两种方法使用fiddler工具抓取数据包,查看Nginx版本在Centos7上使用使用命令curl-I查看隐藏Nginx版本......
  • 信息学奥赛一本通题目解析:2086:【22CSPJ普及组】乘方(pow)
    2086:【22CSPJ普及组】乘方(pow)题目描述小文同学刚刚接触了信息学竞赛,有一天她遇到了这样一个题:给定正整数aaa和b......
  • JUC源码讲解:逐步解析 join()
    JUC源码讲解:逐步解析join()问题抛出join()在源码中其实是使用了wait()方法的,因此,wait()的特性join()都能继承过来,我们知道wait()有什么特性呢?wait()的特性:会释放锁对中断异常敏感会释放CPU时间片我也对wait()做了讲解,想了解可移步https://www.cnblogs.......
  • linux:services服务器配置
    1.环境准备。配置selinux和防火墙vim/etc/selinux/configSELINUX=permissiveyum-yremovefirewalldip地址基础[root@server~]#ipaddressshow[root@server~]#ipas临时添加IP地址[root@server~]#ipaddressadd192.168.10.1/24deveth......
  • CASA模型原理深度解析:揭示生态系统净初级生产力的奥秘
    植被,作为陆地生态系统的重要基石,对维护生态环境功能具有不可替代的作用。其中,植被净初级生产力(NPP)是衡量植被生态系统健康与功能的关键指标。它反映了单位面积上绿色植被通过光合作用生产的有机质总量在扣除自养呼吸后的剩余部分,不仅直接体现了生态系统在自然条件下的生产能......
  • 【华为OD机试真题 Python】人气最高的店铺|解题思路、代码解析
    文章目录题目描述输入输出示例1输入输出说明示例2输入输出说明解题思路实现代码题目描述某购物城有m个商铺,现决定举办一场活动选出人气最高店铺。活动共有n位市民参与,每位市民只能投一票,但1号店铺如果给该市民发放q元的购物补贴,该市民会改......
  • Java多线程&并发篇2024
    目录Java全技术栈面试题合集地址Java多线程&并发篇1.volatile能使得一个非原子操作变成原子操作吗?2.volatile修饰符的有过什么实践?3.volatile类型变量提供什么保证?4.Java中怎么获取一份线程dump文件?5.什么是线程局部变量?6.Java中sleep方法和wait方法的区别?7.......
  • 3.1 RK3399项目开发实录-Linux开发,编译 Linux 固件(物联技术666)
    通过百度网盘分享的文件:嵌入式物联网单片…链接:https://pan.baidu.com/s/1Zi9hj41p_dSskPOhIUnu9Q?pwd=8qo1提取码:8qo1复制这段内容打开「百度网盘APP即可获取」1.编译Linux固件为了方便用户的使用与开发,官方提供了Linux开发的整套SDK,本章详细的说明SDK的具......
  • linux(centos7)通过ckman安装clickhouse并设置自启动
    软件所需安装包:链接:https://pan.baidu.com/s/1MvvS-UoZgn-c0H8pPAavEg?pwd=li9f提取码:li9f--来自百度网盘超级会员V5的分享安装ckman1.使用rpm的方式安装:将rpm包放到服务器,执行命令rpm-ivhckman-2.2.3.x86_64.rpm2.启动:systemctlstartckman3.默认来说ckman是配置了......
  • JMeter接口性能压测之阶梯加压线程组(Stepping Thread Group)
    一、前言1、阶梯式场景(负载测试):该场景主要应用在负载测试只里面,通过设定一定的并发线程数,给定加压规则,遵循“缓起步,快结束”的原则,不断地增加并发用户来找到系统的性能瓶颈,进而有针对性的进行各方面的系统优化。2、Stepping Thread Group的作用减少服务器的瞬时压力,......