首页 > 其他分享 >线程池原理及c语言实现线程池

线程池原理及c语言实现线程池

时间:2024-06-10 15:55:56浏览次数:16  
标签:task threads 语言 thread 线程 pthread 原理 pool

线程池

线程池是一种多线程处理机制,其主要目的是提高系统资源利用率、降低系统资源消耗,并通过控制并发线程数量来优化性能。以下是关于线程池的详细解释:

  1. 定义:
    • 线程池是一种线程使用模式,它维护着一组线程,这些线程等待监督管理者分配可并发执行的任务。
    • 通过将任务添加到队列中,并在线程创建后自动启动这些任务,线程池能够避免在处理短时间任务时频繁创建与销毁线程的代价。
  2. 主要特点:
    • 线程复用:通过重用已存在的线程,降低了线程创建和销毁造成的系统资源消耗。
    • 控制最大并发数:线程池能够控制同时运行的线程数量,避免了因线程过多导致的系统资源过度消耗和性能下降。
    • 管理线程:线程池统一分配、管理和调优线程资源,提高了资源使用率。
  3. 优势:
    • 降低系统资源消耗:通过重用线程,减少了线程的创建和销毁开销。
    • 提高系统响应速度:当有任务到达时,无需等待新线程的创建,可以直接执行。
    • 方便线程并发数的管控:有效防止了因线程无限制创建而导致的系统资源阻塞或内存不足等问题。
    • 更强大的功能:线程池提供了定时、定期以及可控线程数等功能的线程池,方便使用。
  4. 工作原理:
    • 线程池首先将任务放入队列中。
    • 当线程池中的线程空闲时,它们会从队列中取出任务并执行。
    • 如果线程数量超过了最大数量,超出数量的线程会排队等候,等其它线程执行完毕后再从队列中取出任务来执行。

完整代码示例

thread_pool.c

#include "thread_pool.h"
/*******************************************************************
*
*	函数名称:	  handler
*	函数功能:   用作一个线程取消或退出的清理处理程序(cleanup handler)。当线程被取消或调用 pthread_exit 时,这个处理程序会被自动调用
* 	函数参数:
*  			@a :void *arg  指向任意类型的指针参数
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
void handler(void *arg)
{
	printf("[%u] is ended.\n",
		(unsigned)pthread_self());

	pthread_mutex_unlock((pthread_mutex_t *)arg);//互斥锁解锁
}

/*******************************************************************
*
*	函数名称:	  routine
*	函数功能:   线程池中工作线程的例程(routine),用于从任务队列中取出并执行任务
* 	函数参数:
*  			@a :void *arg  指向任意类型的指针,用于向要执行的任务函数传递参数
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
//线程要执行的任务
void *routine(void *arg)
{
	//调试
	#ifdef DEBUG
	printf("[%u] is started.\n",
		(unsigned)pthread_self());
	#endif

	//把需要传递给线程任务的参数进行备份
	thread_pool *pool = (thread_pool *)arg;
	//指向 task 结构体的指针 p
	struct task *p;

	while(1)
	{
		/*
		** push a cleanup functon handler(), make sure that
		** the calling thread will release the mutex properly
		** even if it is cancelled during holding the mutex.
		**
		** NOTE:
		** pthread_cleanup_push() is a macro which includes a
		** loop in it, so if the specified field of codes that 
		** paired within pthread_cleanup_push() and pthread_
		** cleanup_pop() use 'break' may NOT break out of the
		** truely loop but break out of these two macros.
		** see line 61 below.
		*/
		//================================================//
		//注册一个清理函数,如果线程在执行过程中被取消,则上面的handler 函数将被调用,确保 pool->lock 互斥锁被正确释放
		pthread_cleanup_push(handler, (void *)&pool->lock);
		//线程首先尝试获取互斥锁
		pthread_mutex_lock(&pool->lock);
		//================================================//

		// 1, no task, and is NOT shutting down, then wait
		//如果线程池没有等待任务并且线程池没有被销毁,在 pool->cond 条件变量上等待
		while(pool->waiting_tasks == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->cond, &pool->lock);
		}

		// 2, no task, and is shutting down, then exit
		//如果线程池没有等待任务并且线程池被销毁了,则解锁线程退出
		if(pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);
			pthread_exit(NULL); // CANNOT use 'break';
		}

		// 3, have some task, then consume it
		//线程将从任务队列的头部取出一个任务
		p = pool->task_list->next;
		pool->task_list->next = p->next;
		//将线程等待的任务减一
		pool->waiting_tasks--;

		//================================================//
		pthread_mutex_unlock(&pool->lock);//互斥锁解锁
		//从线程清理处理程序栈中弹出顶级清理处理程序的函数
		pthread_cleanup_pop(0);
		//================================================//
		//禁用线程的取消状态,确保线程在执行接下来的代码块时不会被取消
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
		//调用了某个函数指针 do_task,并传入了另一个指针 arg 作为参数
		(p->do_task)(p->arg);//执行线程的主要任务或工作
		//重新启用了线程的取消状态,确保线程在之后可以像之前一样被取消
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
		//完成任务释放p所占的内存
		free(p);
	}
	//线程退出,当然程序不会执行到此处
	pthread_exit(NULL);
}

/*******************************************************************
*
*	函数名称:	  init_pool
*	函数功能:   根据提供的参数来初始化一个线程池,并创建相应的线程数量,在初始化完成后,线程池就可以用来执行提交给它的任务了。
* 	函数参数:
*  			@a :thread_pool *pool 指向thread_pool结构体的指针 
*  			@b :unsigned int threads_number  要在线程池中创建的线程数量  
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
	//初始化互斥锁
	pthread_mutex_init(&pool->lock, NULL);

	//初始化条件量
	pthread_cond_init(&pool->cond, NULL);

	//销毁标志 
	pool->shutdown = false; //不销毁

	//给链表的节点申请堆内存
	pool->task_list = malloc(sizeof(struct task)); 

	//申请堆内存,用于存储创建出来的线程的ID
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

	//错误处理,对malloc进行错误处理
	if(pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	//对任务链表中的节点的指针域进行初始化
	pool->task_list->next = NULL;

	//设置线程池中线程数量的最大值
	pool->max_waiting_tasks = MAX_WAITING_TASKS;
	
	//设置等待线程处理的任务的数量为0,说明现在没有任务
	pool->waiting_tasks = 0;

	//设置线程池中活跃的线程的数量
	pool->active_threads = threads_number;

	int i;

	//循环创建活跃线程
	for(i=0; i<pool->active_threads; i++)
	{
		//创建线程  把线程的ID存储在申请的堆内存
		if(pthread_create(&((pool->tids)[i]), NULL,
					routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}

		//用于调试
		#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	return true;
}


/*******************************************************************
*
*	函数名称:	  add_task
*	函数功能:     函数的主要作用是将一个任务添加到线程池的任务队列中,以便线程池中的线程可以执行它
* 	函数参数:
*  			@a :thread_pool *pool 指向thread_pool结构体的指针 
*  			@b :void *(*do_task)(void *arg) 指向一个接受一个void *参数并返回一个void *的函数。这个函数通常代表要在线程池中执行的任务
*			@c :void *arg     指向任意类型的指针,用于向任务函数do_task传递参数
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
//先线程池的任务链表中添加任务
bool add_task(thread_pool *pool,
	      void *(*do_task)(void *arg), void *arg)
{
	//给任务链表节点申请内存
	struct task *new_task = malloc(sizeof(struct task));
	if(new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	new_task->do_task = do_task;    //将新的 do_task 成员赋值给 new_task 结构体中
	new_task->arg = arg;			//将任务参数添加进来
	new_task->next = NULL;			//指针域设置为NULL

	//============ LOCK =============//
	pthread_mutex_lock(&pool->lock); //互斥锁上锁
	//===============================//

	//说明要处理的任务的数量大于能处理的任务数量
	//当要处理的任务大于能处理的最大任务数量时,互斥锁解锁,释放新任务,增加无效
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		pthread_mutex_unlock(&pool->lock);

		fprintf(stderr, "too many tasks.\n");
		free(new_task);

		return false;
	}
	
	struct task *tmp = pool->task_list; //将任务添加到任务队列中

	//遍历链表,找到单向链表的尾节点
	while(tmp->next != NULL)
		tmp = tmp->next;

	//把新的要处理的任务插入到链表的尾部  尾插
	tmp->next = new_task;

	//要处理的任务的数量+1
	pool->waiting_tasks++;

	//=========== UNLOCK ============//
	pthread_mutex_unlock(&pool->lock);//互斥锁解锁
	//===============================//

	//用于调试
	#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		(unsigned)pthread_self(), __FUNCTION__);
	#endif

	//唤醒第一个处于阻塞队列中的线程
	pthread_cond_signal(&pool->cond);
	return true;
}

/*******************************************************************
*
*	函数名称:	   add_thread
*	函数功能:     向线程池(thread_pool)中添加一定数量的新线程
* 	函数参数:
*  			@a :thread_pool *pool 指向thread_pool结构体的指针 
*  			@b :unsigned additional_threads 无符号整数,表示要添加到线程池中的新线程的数量
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
//向线程池加入新线程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	//判断需要添加的新线程的数量是否为0
	if(additional_threads == 0)
		return 0;

	//计算线程池中总线程的数量
	//计算线程池中总线程数,即当前活跃线程数加上要添加的新线程数
	unsigned total_threads =
			pool->active_threads + additional_threads;

	int i, actual_increment = 0;//将实际创建的线程数初始化为0

	//循环从当前的活跃线程数开始,直到达到要创建的总线程数或达到最大活跃线程数
	for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
	{
		//创建新线程
		//使用 pthread_create 函数创建新线程。每个新线程执行 routine 函数,并将 pool 作为参数传递。如果线程创建失败,则打印错误信息
		if(pthread_create(&((pool->tids)[i]),
					NULL, routine, (void *)pool) != 0)
		{
			perror("add threads error");

			// no threads has been created, return fail
			//如果实际创建的线程数为了零,则说明循环创建线程失败返回-1,退出循环
			if(actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++; //创建成功一次,实际线程数加一

		#ifdef DEBUG//用于打印调试
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	//记录此时线程池中活跃线程的总数
	pool->active_threads += actual_increment;
	return actual_increment; //返回线程池中活跃线程的总数
}


/*******************************************************************
*
*	函数名称:	   remove_thread
*	函数功能:     将线程池(thread_pool)中删除一定数量的线程
* 	函数参数:
*  			@a :thread_pool *pool 指向thread_pool结构体的指针 
*  			@b :unsigned int removing_threads 无符号整数,表示要从线程池中删除的线程数量
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	//初步判断,如果需要移除的线程数位0,则直接返回当前活动线程的数量 
	if(removing_threads == 0)
		return pool->active_threads;

	//剩余的线程数等于当前活跃的线程数减去需移除的线程数
	int remaining_threads = pool->active_threads - removing_threads;
	//对当前剩余的进程数进行判断,如果小于或者等于零都将剩余进程数赋值为1
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1;


	//遍历并取消线程,从当前活动线程数量的最后一个索引开始,向前遍历到 remaining_threads-1
	int i;
	for(i=pool->active_threads-1; i>remaining_threads-1; i--)
	{
		//使用 pthread_cancel 尝试取消它
		errno = pthread_cancel(pool->tids[i]);
		//如果取消失败(errno != 0),则跳出循环
		if(errno != 0)
			break;

		//用于打印调试
		#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	//如果循环因为取消失败而中断,则返回 -1 表示出错
	if(i == pool->active_threads-1)
		return -1;
	else
	{
		pool->active_threads = i+1;
		return i+1;
	}
}



/*******************************************************************
*
*	函数名称:	  destroy_pool
*	函数功能:     将线程池(thread_pool)进行销毁操作
* 	函数参数:
*  			@a :thread_pool *pool 指向thread_pool结构体的指针 
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
bool destroy_pool(thread_pool *pool)
{
	// 1, activate all threads
	//ture表示确定销毁线程池
	pool->shutdown = true;
	// 广播条件变量,唤醒所有等待该条件的线程
	pthread_cond_broadcast(&pool->cond);

	// 2, wait for their exiting
	int i;
	//遍历当成活跃线程数次,等待每个线程接合
	for(i=0; i<pool->active_threads; i++)
	{
		errno = pthread_join(pool->tids[i], NULL);
		//如果线程接合成功打印ID,失败则打印错误信息
		if(errno != 0)
		{
			printf("join tids[%d] error: %s\n",
					i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
		
	}

	// 3, free memories
	//释放任务队列,线程id,线程池
	free(pool->task_list);
	free(pool->tids);
	free(pool);

	return true;
}

thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS	1000   //处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS	20     //活跃的线程数量 
//任务结点  单向链表的节点,类型
struct task
{
	void *(*do_task)(void *arg); //任务函数指针  指向线程要执行的任务  格式是固定的
	void *arg;					 //需要传递给任务的参数,如果不需要,则NULL

	struct task *next;			 //指向下一个任务结点的指针
};

//线程池的管理结构体
typedef struct thread_pool
{
	pthread_mutex_t lock;		// 互斥锁
	pthread_cond_t  cond;		// 条件量
	bool shutdown;				//是否需要销毁线程池
	struct task *task_list;		//用于存储任务的链表
	pthread_t *tids;			//用于记录线程池中线程的ID
	unsigned max_waiting_tasks;	//线程池中线程的数量最大值
	unsigned waiting_tasks;		//处于等待状态的线程数量
	unsigned active_threads;	//正在活跃的线程数量
}thread_pool;

//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);

//向线程池中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

//先线程池中添加线程
int  add_thread(thread_pool *pool, unsigned int additional_threads_number);

//从线程池中删除线程
int  remove_thread(thread_pool *pool, unsigned int removing_threads_number);

//销毁线程池
bool destroy_pool(thread_pool *pool);

//任务函数
void *routine(void *arg);


#endif

main.c

#include "thread_pool.h"

/*******************************************************************
*
*	函数名称:	  mytask
*	函数功能:   函数是一个线程的工作函数,间隔n秒打印线程ID和函数名,用于线程池任务测试
* 	函数参数:
*  			@a :void *arg  指向任意类型的指针参数
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
void *mytask(void *arg)
{
	//将传入的参数进行强转
	int n = (int)arg;
	//打印线程ID和函数名
	printf("[%u][%s] ==> job will be done in %d sec...\n",
		(unsigned)pthread_self(), __FUNCTION__, n);

	sleep(n);
	printf("[%u][%s] ==> job done!\n",
		(unsigned)pthread_self(), __FUNCTION__);

	return NULL;
}

/*******************************************************************
*
*	函数名称:	 count_time
*	函数功能:   线程的工作函数,无限循环地每秒打印一个递增的秒数计数器
* 	函数参数:
*  			@a :void *arg  指向任意类型的指针参数
*  			
*   返回结果:   
* 	注意事项:   None
* 	函数作者:  [email protected]
*	创建日期:   2024/06/09
*	修改历史:
*	函数版本:	V1.0
* *****************************************************************/
void *count_time(void *arg)
{
	int i = 0;
	while(1)
	{
		sleep(1);
		printf("sec: %d\n", ++i);
	}
}

int main(void)
{
	pthread_t a;
	pthread_create(&a, NULL, count_time, NULL);

	// 1, initialize the pool
	thread_pool *pool = malloc(sizeof(thread_pool));
	init_pool(pool, 2);

	// 2, throw tasks
	printf("throwing 3 tasks...\n");
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));

	// 3, check active threads number
	printf("current thread number: %d\n",
			remove_thread(pool, 0));
	sleep(9);

	// 4, throw tasks
	printf("throwing another 2 tasks...\n");
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));

	// 5, add threads
	add_thread(pool, 2);

	sleep(5);

	// 6, remove threads
	printf("remove 3 threads from the pool, "
	       "current thread number: %d\n",
			remove_thread(pool, 3));

	// 7, destroy the pool
	destroy_pool(pool);
	return 0;
}

通过以上代码及结论,可知线程池是一种高效的线程管理方式,通过复用线程、控制最大并发数和管理线程等特性,能够显著提升系统的性能和资源利用率。

如果代码用法有什么问题,请将问题发至网易邮箱 [email protected],作者将及时改正,欢迎与各位老爷交流讨论。

麻烦三连加关注!!!!

比心

标签:task,threads,语言,thread,线程,pthread,原理,pool
From: https://www.cnblogs.com/zkbklink/p/18240716

相关文章

  • 推测性解码:加速多模态大型语言模型的推理
    大模型(LLMs)以其卓越的性能在多个应用场景中大放异彩。然而,随着应用的深入,这些模型的推理速度问题逐渐凸显。为了解决这一挑战,推测性解码(SpeculativeDecoding,SPD)技术应运而生。本文深入探讨了SPD在多模态大型语言模型(MLLMs)中的应用,尤其是针对LLaVA7B模型的优化。MLLMs通过融......
  • 【C语言】宏offsetof的模拟实现(计算结构体中某变量相对于首地址的偏移)
    首先我们应该特别留意:offsetof是一个宏,并非是一个函数!宏offsetof的介绍:参数:第一个是结构体类型名称,第二个是结构体成员名返回类型:size_t无符号整形引用的头文件:<stddef.h>offsetof的使用举列:#include<stddef.h>structStu//注释为相对于起始位置的偏移量{......
  • C语言——使用函数创建动态内存
    一、堆和栈的区别1)栈(Stack):栈是一种自动分配和释放内存的数据结构,存储函数的参数值、局部变量的值等。栈的特点是后进先出,即最后进入的数据最先出来,类似于我们堆盘子一样。栈的大小和生命周期是由系统自动管理的,不需要程序员手动释放。2)堆(Heap):堆是由程序员手动分配和释......
  • 深度解读ChatGPT基本原理
    前言ChatGPT是一种基于人工智能的自然语言处理模型,由OpenAI开发。它以GPT(生成预训练变换模型,GenerativePre-trainedTransformer)为核心,旨在通过深度学习技术实现对人类语言的理解和生成。自问世以来,ChatGPT凭借其卓越的语言生成能力,广泛应用于对话系统、文本生成、翻译等......
  • etcd watch 实现原理
    介绍在etcd中,watch是一个非常重要的特性,它可以让客户端监控etcd中的key或者一组key,当key发生变化时,etcd会通知客户端。本文将介绍etcdwatch的实现原理。etcdctlwatch/test#当/test的值发生变化时,会输出如下信息PUT/testaPUT/testbDELETE/testwatc......
  • 初始C语言——结构化算法的结构
    C语言程序是一种程序化程序,也就是说,可以用C语言程序来解决的问题,都可以分解成相互独立的几个部分,每个部分都可以通过简单的语句或结构来实现。一般而言,对于结构化的程序,一个完整的算法可以用“顺序结构”,“分支结构”和“循环结构”的有机组合来表示。(一)----------顺序结构......
  • 纯理论容器实现的原理
    近期在复习容器的原理,希望这篇文章可以帮助到大家。一、什么是容器?    容器本质上就是主机上的一个进程。这个进程拥有自己的用户空间并且和主机共享内核空间。        容器内的进程可以通过系统调用与内核进行交互,使用内核提供的各种功能和资源。   ......
  • 实验6 C语言结构体、枚举应用编程
    4.实验任务41#include<stdio.h>2#defineN1034typedefstruct{5charisbn[20];6charname[80];7charauthor[80];8doublesales_price;9intsales_count;10}Book;1112voidoutput(Bookx[],int);13voidsort(Bo......
  • 【转载】C 语言有什么奇技淫巧
    快速范围判断经常要批量判断某些值在不在范围内,如果int检测是[0,N)的话:if(x>=0&&x<N)...众所周知,现代CPU优化,减分支是重要手段,上述两次判断可以简写为:if(((unsignedint)x)<N)...减少判断次数。如果int检测范围是[minx,maxx]这种更常见的形式的话,......
  • Java 数据类型 -- Java 语言的 8 种基本数据类型、字符串与数组
    大家好,我是栗筝i,这篇文章是我的“栗筝i的Java技术栈”专栏的第004篇文章,在“栗筝i的Java技术栈”这个专栏中我会持续为大家更新Java技术相关全套技术栈内容。专栏的主要目标是已经有一定Java开发经验,并希望进一步完善自己对整个Java技术体系来充实自己的......