首页 > 其他分享 >线程池的实现代码分析

线程池的实现代码分析

时间:2024-06-09 17:56:18浏览次数:19  
标签:分析 task thread 代码 线程 pthread threads pool

[toc]

线程池

线程池代码分析

image

thread_pool.c

#include "thread_pool.h"

void handler(void *arg)
{
	printf("[%u] is ended.\n",
		(unsigned)pthread_self());										//打印自己的进程号

	pthread_mutex_unlock((pthread_mutex_t *)arg);						//解锁
}

//线程要执行的任务
void *routine(void *arg)
{
	//调试
	#ifdef DEBUG
	printf("[%u] is started.\n",
		(unsigned)pthread_self());
	#endif

	//把需要传递给线程任务的参数进行备份
	thread_pool *pool = (thread_pool *)arg;								
	struct task *p;

	while(1)
	{
		/*
		** push a cleanup functon handler(), make sure that
		** the calling thread will release the mutex properly
		** even if it is cancelled during holding the mutex.
		**
		** NOTE:
		** pthread_cleanup_push() is a macro which includes a
		** loop in it, so if the specified field of codes that 
		** paired within pthread_cleanup_push() and pthread_
		** cleanup_pop() use 'break' may NOT break out of the
		** truely loop but break out of these two macros.
		** see line 61 below.
		*/
		//================================================//
		pthread_cleanup_push(handler, (void *)&pool->lock);   //注册清理线程函数
		pthread_mutex_lock(&pool->lock);					  //上锁
		//================================================//

		// 1, no task, and is NOT shutting down, then wait
		while(pool->waiting_tasks == 0 && !pool->shutdown){   //判断处于等待状态的线程是否等于0,以及判断是否需要销毁线程池,这里是如果false,则进入循环
			pthread_cond_wait(&pool->cond, &pool->lock);     //会将调用线程放入条件变量的等待队列,并释放互斥锁。线程在被唤醒之前会一直阻塞。当线程被唤醒后,它会重新获取互斥锁,然后继续执行
		}

		// 2, no task, and is shutting down, then exit
		if(pool->waiting_tasks == 0 && pool->shutdown == true) //如果需要销毁,则解锁然后线程退出
		{
			pthread_mutex_unlock(&pool->lock);
			pthread_exit(NULL); // CANNOT use 'break';
		}

		// 3, have some task, then consume it   //链表的头删,表示当有任务链表中有任务时,处于等待队列的线程就会去,执行任务链表中的表头的任务
		p = pool->task_list->next;				//将链表的首结点地址,给到局部变量的P
		pool->task_list->next = p->next;		//让链表的头结点,指向P->next ,也就是首结点的下一个结点
		pool->waiting_tasks--;					//让等待被线程执行的任务数量减1

		//================================================//
		pthread_mutex_unlock(&pool->lock);						//解锁
		pthread_cleanup_pop(0);									//不取消清理函数,也不执行
		//================================================//

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);   
		(p->do_task)(p->arg);									//执行链表中的函数
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);	

		free(p);												//执行完任务结点后释放局部变量下保存的堆内存地址
	}

	pthread_exit(NULL);
}

//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
	//初始化互斥锁
	pthread_mutex_init(&pool->lock, NULL);

	//初始化条件量
	pthread_cond_init(&pool->cond, NULL);

	//销毁标志 
	pool->shutdown = false; //不销毁

	//给链表的节点申请堆内存
	pool->task_list = malloc(sizeof(struct task)); 

	//申请堆内存,用于存储创建出来的线程的ID
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);      //线程池中最大线程数量= 活跃线程+能够创建的线程数量(既能够存放线程ID的个数)  

	//错误处理,对malloc进行错误处理
	if(pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	//对任务链表中的节点的指针域进行初始化
	pool->task_list->next = NULL;

	//设置线程池中线程数量的最大值
	pool->max_waiting_tasks = MAX_WAITING_TASKS;			//方便更改
	
	//设置等待线程处理的任务的数量为0,说明现在没有任务
	pool->waiting_tasks = 0;

	//设置线程池中活跃的线程的数量
	pool->active_threads = threads_number;

	int i;

	//循环创建活跃线程
	for(i=0; i<pool->active_threads; i++)                   
	{
		//创建线程  把线程的ID存储在申请的堆内存
		if(pthread_create(&((pool->tids)[i]), NULL,					
					routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}

		//用于调试
		#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	return true;
}

//先线程池的任务链表中添加任务
bool add_task(thread_pool *pool,
	      void *(*do_task)(void *arg), void *arg)           //将自定义的任务,添加到
{
	//给任务链表节点申请内存
	struct task *new_task = malloc(sizeof(struct task));
	if(new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	new_task->do_task = do_task;	//设置需要在链表中添加的任务
	new_task->arg = arg;			//任务函数的参数
	new_task->next = NULL;			//指针域设置为NULL

	//============ LOCK =============//
	pthread_mutex_lock(&pool->lock);			   //上锁防止主线程与任务执行线程,资源竞争
	//===============================//

	//说明要处理的任务的数量大于能处理的任务数量
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)   
	{
		pthread_mutex_unlock(&pool->lock);		  

		fprintf(stderr, "too many tasks.\n");
		free(new_task);							 

		return false;
	}
	
	struct task *tmp = pool->task_list;		 //通过局部变量,对任务链表进行访问,可以保留头结点的位置,减少持有锁的时间,随着函数结束自动释放资源

	//遍历链表,找到单向链表的尾节点
	while(tmp->next != NULL)
		tmp = tmp->next;

	//把新的要处理的任务插入到链表的尾部  尾插
	tmp->next = new_task;

	//要处理的任务的数量+1
	pool->waiting_tasks++;

	//=========== UNLOCK ============//
	pthread_mutex_unlock(&pool->lock);
	//===============================//

	//调试
	#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		(unsigned)pthread_self(), __FUNCTION__);
	#endif

	//唤醒第一个处于阻塞队列中的线程
	pthread_cond_signal(&pool->cond);
	return true;
}

//向线程池加入新线程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	//判断需要添加的新线程的数量是否为0
	if(additional_threads == 0)
		return 0;

	//计算线程池中总线程的数量
	unsigned total_threads =
			pool->active_threads + additional_threads;   //活跃线程的数量+需要添加新线程的数量=总的活跃线程的数量

	int i, actual_increment = 0;       					 

	
	for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++) //判断条件为 活跃线程的数量小于线程池中总线程的数量 且 小于创建线程的上限
	{																				//线程总的活跃线程的数量,是用来判断循环多少次的
		//创建新线程
		if(pthread_create(&((pool->tids)[i]),										//而超过能够创建的线程数量也一样不会在添加新线程了
					NULL, routine, (void *)pool) != 0)								//为什么不这样写for(i=0 , i<additional_threads, && i < MAX_ACTIVE_THREADS;i++) 
		{																			//因为创建出来的线程ID,会根据创建时的顺序,存放到对应的数组下标的位置,
			perror("add threads error");											//所以需要定义两个局部变量,用于保存数组最后的一个元素的后一个位置,因为是i++  ,int i, actual_increment = 0;
																					//actual_increment这个变量用于保存实际创建线程的数量
			// no threads has been created, return fail
			if(actual_increment == 0)												
				return -1;

			break;
		}
		actual_increment++; 

		#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	//记录此时线程池中活跃线程的总数
	pool->active_threads += actual_increment;
	return actual_increment;
}

int remove_thread(thread_pool *pool, unsigned int removing_threads)				//删除线程池中的线程。 参数是需要删除的数量
{
	if(removing_threads == 0)
		return pool->active_threads;											

	int remaining_threads = pool->active_threads - removing_threads;			//计算线程池中剩余的线程数量			
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1;          //防呆,如果想要删除的线程数量大于线程池中线程的数量则 
																				//保留初始化时数组下标为0的线程,如果不保留,就是销毁线程池了
	int i;																		//在删除线程池中的线程时,保留至少一个线程的原因是为了确保线程池仍然能够处理剩余的任务,避免完全停止服务。
	for(i=pool->active_threads-1; i>remaining_threads-1; i--)					//pool->active_threads-1即数组中最后一个元素下标
	{
		errno = pthread_cancel(pool->tids[i]);									// 取消线程 同时 执行线程清理函数,主线程调用,终止其他线程

		if(errno != 0)
			break;

		#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	if(i == pool->active_threads-1)												
		return -1;
	else
	{
		pool->active_threads = i+1;
		return i+1;
	}
}

bool destroy_pool(thread_pool *pool)								//销毁线程池
{
	// 1, activate all threads
	pool->shutdown = true;
	pthread_cond_broadcast(&pool->cond);							//唤醒线程池中所有线程,然后终止线程

	// 2, wait for their exiting
	int i;
	for(i=0; i<pool->active_threads; i++)							
	{
		errno = pthread_join(pool->tids[i], NULL);                //主线程调用,循环等待回收线程资源
		if(errno != 0)
		{
			printf("join tids[%d] error: %s\n",
					i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
		
	}

	// 3, free memories
	free(pool->task_list);											//释放线程池中任务链表的堆内存
	free(pool->tids);												//释放线程句柄数组的堆内存
	free(pool);														//释放管理线程池的堆内存

	return true;
}

thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS	1000   //处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS	20     //活跃的线程数量 

//任务结点  单向链表的节点,类型
struct task
{
	void *(*do_task)(void *arg); //任务函数指针  指向线程要执行的任务  格式是固定的
	void *arg;					 //需要传递给任务的参数,如果不需要,则NULL

	struct task *next;			 //指向下一个任务结点的指针
};

//线程池的管理结构体
typedef struct thread_pool
{
	pthread_mutex_t lock;		// 互斥锁
	pthread_cond_t  cond;		// 条件量

	bool shutdown;				//是否需要销毁线程池

	struct task *task_list;		//用于存储任务的链表

	pthread_t *tids;			//用于记录线程池中线程的ID,其实是一个数组,数组中元素的类型是pthread_t

	unsigned max_waiting_tasks;	//线程池中线程的数量最大值
	unsigned waiting_tasks;		//设置等待线程处理的任务的数量
	unsigned active_threads;	//正在活跃的线程数量
}thread_pool;

//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);

//向线程池中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

//先线程池中添加线程
int  add_thread(thread_pool *pool, unsigned int additional_threads_number);

//从线程池中删除线程
int  remove_thread(thread_pool *pool, unsigned int removing_threads_number);

//销毁线程池
bool destroy_pool(thread_pool *pool);

//任务函数
void *routine(void *arg);

#endif

main.c

#include "thread_pool.h"

// 任务函数:模拟一个需要耗时的任务
void *mytask(void *arg)
{
    int n = (int)arg;

    printf("[%u][%s] ==> job will be done in %d sec...\n",
           (unsigned)pthread_self(), __FUNCTION__, n);

    sleep(n); // 模拟任务耗时

    printf("[%u][%s] ==> job done!\n",
           (unsigned)pthread_self(), __FUNCTION__);

    return NULL;
}

// 计时函数:每秒打印一次经过的秒数
void *count_time(void *arg)
{
    int i = 0;
    while(1)
    {
        sleep(1);
        printf("sec: %d\n", ++i);
    }
}

int main(void)
{
    pthread_t a;
    // 创建计时线程
    pthread_create(&a, NULL, count_time, NULL);

    // 1. 初始化线程池
    thread_pool *pool = malloc(sizeof(thread_pool));
    init_pool(pool, 2); // 初始化线程池,初始线程数为2

    // 2. 添加任务到线程池
    printf("throwing 3 tasks...\n");
    add_task(pool, mytask, (void *)(rand()%10));
    add_task(pool, mytask, (void *)(rand()%10));
    add_task(pool, mytask, (void *)(rand()%10));

    // 3. 检查当前活动线程数
    printf("current thread number: %d\n",
           remove_thread(pool, 0)); // 传入0表示不移除线程,只返回当前线程数
    sleep(9); // 等待9秒

    // 4. 添加更多任务到线程池
    printf("throwing another 2 tasks...\n");
    add_task(pool, mytask, (void *)(rand()%10));
    add_task(pool, mytask, (void *)(rand()%10));

    // 5. 添加更多线程到线程池
    add_thread(pool, 2); // 增加2个线程

    sleep(5); // 等待5秒

    // 6. 从线程池中移除线程
    printf("remove 3 threads from the pool, "
           "current thread number: %d\n",
           remove_thread(pool, 3)); // 移除3个线程,并打印当前线程数

    // 7. 销毁线程池
    destroy_pool(pool); // 销毁线程池
    return 0; // 主函数返回0,程序结束
}

image
image
image
image

线程取消的基本概念

  • 取消点(Cancellation Point):线程在执行过程中,会在一些特定的函数调用时检查是否有取消请求,这些函数称为取消点。例如,pthread_testcancelpthread_joinpthread_cond_wait 等函数都是取消点。

  • 取消类型(Cancellation Type):决定线程在取消点如何响应取消请求。主要有两种类型:

    • 异步取消(Asynchronous Cancellation):线程可以在任何时刻被取消。
    • 延迟取消(Deferred Cancellation):线程只有在到达取消点时才会被取消。POSIX 线程库默认使用这种类型。
  • 取消状态(Cancellation State):决定线程是否响应取消请求。可以是以下两种状态:

    • 启用(Enable):线程会响应取消请求。
    • 禁用(Disable):线程不会响应取消请求。

    pthread_cleanup_push 是 POSIX 线程库(pthread)中的一个函数,用于在线程取消时执行清理操作。它与 pthread_cleanup_pop 配对使用,确保在线程退出或被取消时执行特定的清理代码,例如释放资源或解锁互斥锁。

    • pthread_cleanup_push(cleanup, "Resource A"):注册清理函数 cleanup,当线程被取消或退出时,会执行 cleanup("Resource A")
    • pthread_cleanup_pop(1):取消清理函数并执行它(参数 1 表示执行清理函数)。
    • pthread_testcancel():这是一个取消点,线程在这里检查是否有取消请求。
      pthread_cleanup_pushpthread_cleanup_pop 用于确保线程在被取消或正常退出时执行特定的清理操作。这对于管理资源(如内存、文件描述符、互斥锁等)非常重要,确保不会因为线程的意外终止而导致资源泄漏。

    线程取消(Thread Cancellation)是指在多线程编程中,允许一个线程请求终止另一个线程的执行。POSIX 线程库提供了这种机制,使得线程可以被其他线程取消。这在某些情况下非常有用,例如当一个线程因为某种原因需要提前终止另一个线程的执行时。

相关函数

  • pthread_cancel(pthread_t thread): 请求取消指定的线程。
  • pthread_setcancelstate(int state, int *oldstate): 设置线程的取消状态。
  • pthread_setcanceltype(int type, int *oldtype): 设置线程的取消类型。
  • pthread_testcancel(): 创建一个取消点,线程在执行到这里时会检查是否有取消请求。

通过线程池去管理线程:重点在于用条件变量,以及线性表(数组)保存线程的ID(句柄)
通过链表去管理任务: 尾插,头删的方式

阅读代码的时候需要结合上下文
布尔类型一般常用来判断是和否(即二进制可作为标志)
函数指针:用来指向函数的指针 void *(*do_task)(void *arg);

释放堆内存时,包括结构体里的指针的堆内存分配

整体的架构应该是:

堵塞队列:线程1 , 线程2 ,线程3 .......
任务链表: 任务1 , 任务2 ,任务3 .........

pthread_线程1{
上锁
while{
条件量
}
if{判断标志位
销毁线程池
}
pool->taks_list->next = p->next(头删)
(p->task_任务1)(p->arg)
解锁
free(p)
}

标签:分析,task,thread,代码,线程,pthread,threads,pool
From: https://www.cnblogs.com/kencszqh/p/18239817

相关文章

  • python-数据分析-Pandas-1、Series对象
    Pandas是WesMcKinney在2008年开发的一个强大的分析结构化数据的工具集。Pandas以NumPy为基础(实现数据存储和运算)提供了专门用于数据分析的类型、方法和函数,对数据分析和数据挖掘提供了很好的支持;同时pandas还可以跟数据可视化工具matplotlib很好的整合在一起,非常轻松......
  • 第二次opp的总结和分析
    23201927-杨民星第二次blog1.前言:  继上次第一次blog之后,又开展了3次的Java的opp的训练集。在这三次的opp的题目中,第一次是对于之前三次题目的最后一次的迭代(上次3个题集的blog23201927-杨民星-第一次博客)这个题目也是答题系统的最后一个题目;而对于后面两个题目,就是基于电路......
  • 加油站AI智能视频监控分析系统 YOLOv8
    加油站AI智能视频监控分析系统可以根据视频总流量分析技术,使优化算法实体模型替代人的眼睛,加油站AI智能视频监控分析系统即时鉴别加油站内部的工作过程中的安全规范、员工行为准则等问题。加油站AI智能视频监控分析系统优化算法实体模型可以精确捕获违规操作,全年度24个小时无间......
  • 11 深入理解Linux文件系统与日志分析
    目录11.1深入理解Linux文件系统    11.1.1inode与block详解        1.inode和block概述        2.inode的内容        3.inode的号码        4.inode的大小    11.1.2硬链接与......
  • VS Code 中怎么运行js代码
    在VSCode中运行JavaScript代码可以通过以下步骤实现:1.安装Node.js:首先确保你的电脑安装了Node.js。你可以在官方网站(https://nodejs.org/)上下载适用于你操作系统的安装包,然后按照指示进行安装。2.打开VSCode:打开VSCode编辑器,确保你已经安装了VSCode的JavaScript插件。......
  • springboot+vue在线考试系统附带文章和源代码部署讲解等
    文章目录前言项目运行效果截图技术栈后端springboot框架:后端mybatis框架:前端框架vue:数据库mysql:开发环境代码参考数据库参考源码质量保障源码获取前言......
  • springboot+vue养老院管理系统附带文章和源代码部署讲解等
    文章目录前言项目运行效果截图技术栈后端springboot框架:后端mybatis框架:前端框架vue:数据库mysql:开发环境代码参考数据库参考源码质量保障源码获取前言......
  • springboot+vue医院管理系统附带文章和源代码部署讲解等
    文章目录前言项目运行效果截图技术栈后端springboot框架:后端mybatis框架:前端框架vue:数据库mysql:开发环境代码参考数据库参考源码质量保障源码获取前言......
  • 思科配置基础代码内容针对网络构建中型局域网构建中型局域网需全面考虑企业网或校园网
    构建中型局域网需全面考虑企业网或校园网的需求。对于企业网,需确保高效数据传输、可靠设备连接及严格的安全防护;而校园网则需关注教学管理的便捷性、无线网络的覆盖以及未来扩展的灵活性。整体而言,构建与扩展需围绕用户需求、网络性能及安全性展开。一、配置三层交换机的端口......
  • 数据结构严蔚敏版精简版-线性表以及c语言代码实现
    线性表、栈、队列、串和数组都属于线性结构。线性结构的基本特点是除第一个元素无直接前驱,最后一个元素无直接后继之外,其他每个数据元素都有一个前驱和后继。1 线性表的定义和特点如此类由n(n大于等于0)个数据特性相同的元素构成的有限序列称为线性表。线性表中元素的个数n定......