首页 > 其他分享 >封装线程池相关函数

封装线程池相关函数

时间:2024-06-11 09:24:15浏览次数:27  
标签:None task 封装 函数 threads pthread 线程 pool

image

thread_pool.c

线程要执行的任务

/*****************************************************************************************
 *
 *  @name      :  routine
 *	@function  :  线程要执行的任务
 *  @paramsv   : None
 *  @retval    :  None
 *  @author    :  Dazz
 *  @date      :  2024/6/2
 *  @version   :  None
 *  @note      :  该函数会在routine线程被取消的时候自动执行
 *
 *
 * *****************************************************************************************/
void *routine(void *arg)
{
// 调试
#ifdef DEBUG
	printf("[%u] is started.\n",
		   (unsigned)pthread_self());
#endif

	// 把需要传递给线程任务的参数进行备份
	thread_pool *pool = (thread_pool *)arg;

	// 新建一个任务结点用来备份
	struct task *p;

	while (1)
	{
		/**********************************************************************
		 **
		 **   pthread_cleanup_push是一个宏,需要配合pthread_cleanup_pop一起使用
		 **   如在执行pthread_cleanup_pop之前,该线程被取消了,则会调用handler这个函数,
		 **   并将pool->lock作为参数传递给handler
		 **
		 ****************************************************************************/
		pthread_cleanup_push(handler, (void *)&pool->lock);

		// 对互斥锁上锁
		pthread_mutex_lock(&pool->lock);

		/******************************任务量为0的情况********************************************************/

		// 当处于等待状态的线程数量为0且销毁线程池的标志为0时
		while (pool->waiting_tasks == 0 && !pool->shutdown)
		{
			// 对该线程进行挂起
			pthread_cond_wait(&pool->cond, &pool->lock);
		}

		// 当处于等待状态的线程数量为0且销毁线程池的标志为1时
		if (pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			// 解锁互斥锁
			pthread_mutex_unlock(&pool->lock);

			// 结束该线程
			pthread_exit(NULL); // CANNOT use 'break';
		}

		/******************************任务量不为0的情况****************************************************/

		// 备份任务结点
		p = pool->task_list->next;

		// 让线程池里的当前任务指向下一个任务
		pool->task_list->next = p->next;

		// 令处于等待状态的线程数量减一
		pool->waiting_tasks--;

		//================================================//
		// 互斥锁解锁
		pthread_mutex_unlock(&pool->lock);

		// 不执行清理处理程序,参数为1则执行清理处理程序
		pthread_cleanup_pop(0);
		//================================================//

		// 设置线程为不可取消
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

		// 执行p,即当前任务结点里需要执行的任务
		(p->do_task)(p->arg);

		// 设置线程为可取消
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

		// 释放堆内存
		free(p);
	}

	// 结束线程
	pthread_exit(NULL);
}

初始化线池

/******************************************************
 *
 *  @name      :  init_pool
 *	@function  :  初始化线池
 *  @params
 *                @pool             : 线程池的地址
 *                @threads_number   : 线程池中活跃的线程的数量
 *
 *  @retval    : 成功返回1,否则返回0
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
	// 以默认属性初始化互斥锁
	pthread_mutex_init(&pool->lock, NULL);

	// 以默认属性初始化条件量
	pthread_cond_init(&pool->cond, NULL);

	// 销毁标志设置为不销毁
	pool->shutdown = false;

	// 给链表的节点申请堆内存
	pool->task_list = (struct task *)malloc(sizeof(struct task));

	// 申请堆内存,用于存储创建出来的线程的ID
	pool->tids = (pthread_t *)malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

	// 错误处理,对malloc进行错误处理
	if (pool->task_list == NULL || pool->tids == NULL)
	{
		fprintf(stderr, "malloc for task_list or tids fail!,error: %d, %s\n", errno, strerror(errno));
		return false;
	}

	// 对任务链表中的节点的指针域进行初始化
	pool->task_list->next = NULL;

	// 设置线程池中线程数量的最大值
	pool->max_waiting_tasks = MAX_WAITING_TASKS;

	// 设置等待线程处理的任务的数量为0,说明现在没有任务
	pool->waiting_tasks = 0;

	// 设置线程池中活跃的线程的数量
	pool->active_threads = threads_number;

	// 循环创建活跃线程
	for (int i = 0; i < pool->active_threads; i++)
	{
		// 创建线程  把线程的ID存储在申请的堆内存
		if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0)
		{
			fprintf(stderr, " create threads fail!,error: %d, %s\n", errno, strerror(errno));
			return false;
		}

// 用于调试
#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}

	return true;
}

向线程池的任务链表中添加任务

/******************************************************
 *
 *  @name      :  add_task
 *	@function  :  向线程池的任务链表中添加任务
 *  @params
 *                @pool         : 线程池的地址
 *                @do_task      : 需要执行的任务的函数
 *                @arg          :需要执行的任务的函数的参数
 *
 *  @retval    : 如成功返回1,否则返回0
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{
	// 给任务链表节点申请内存
	struct task *new_task = malloc(sizeof(struct task));
	if (new_task == NULL)
	{
		fprintf(stderr, " malloc for new task fail!,error: %d, %s\n", errno, strerror(errno));
		return false;
	}

	new_task->do_task = do_task; // 任务函数指针指向传进来的do_task
	new_task->arg = arg;		 // 设置函数参数
	new_task->next = NULL;		 // 指针域设置为NULL

	//============ LOCK =============//
	// 互斥锁上锁
	pthread_mutex_lock(&pool->lock);
	//===============================//

	// 说明要处理的任务的数量大于能处理的任务数量
	if (pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		pthread_mutex_unlock(&pool->lock);

		fprintf(stderr, "too many tasks.\n");
		free(new_task);

		return false;
	}

	// 备份任务链表
	struct task *tmp = pool->task_list;

	// 遍历链表,找到单向链表的尾节点
	while (tmp->next != NULL)
		tmp = tmp->next;

	// 把新的要处理的任务插入到链表的尾部  尾插
	tmp->next = new_task;

	// 要处理的任务的数量+1
	pool->waiting_tasks++;

	//=========== UNLOCK ============//
	pthread_mutex_unlock(&pool->lock);
	//===============================//

// 调试
#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		   (unsigned)pthread_self(), __FUNCTION__);
#endif

	// 唤醒第一个处于阻塞队列中的线程
	pthread_cond_signal(&pool->cond);
	return true;
}

向线程池加入新线程

/******************************************************
 *
 *  @name      :  add_thread
 *	@function  :  向线程池加入新线程
 *  @params
 *                @pool                  : 线程池的地址
 *                @removing_threads      : 需要增加的线程数量
 *
 *  @retval    : 返回正在活跃的线程数量
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	// 判断需要添加的新线程的数量是否为0
	if (additional_threads == 0)
		return 0;

	// 计算线程池中总线程的数量
	unsigned total_threads =
		pool->active_threads + additional_threads;

	int i, actual_increment = 0;

	// 循环创建新线程
	for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
	{
		// 创建新线程
		if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0)
		{
			perror("add threads error");

			// 如需要加入的新线程池数量为0,则直接退出
			if (actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++;
// 用于调试
#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}

	// 记录此时线程池中活跃线程的总数
	pool->active_threads += actual_increment;
	return actual_increment;
}

从线程池中删除线程

/******************************************************
 *
 *  @name      :  remove_thread
 *	@function  :  从线程池中删除线程
 *  @params
 *                @pool                  : 线程池的地址
 *                @removing_threads      : 需要删除的线程数量
 *
 *  @retval    : 返回正在活跃的线程数量
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	// 如果需要删除的线程数量为0,则返回正在活跃的线程数量
	if (removing_threads == 0)
		return pool->active_threads;

	// 定义一个变量叫remaining_threads,remaining_threads的值为正在活跃的线程数量减去需要删除线程的数量
	int remaining_threads = pool->active_threads - removing_threads;

	// 如remaining_threads大于零,说明正在活跃的线程数量大于需要删除线程的数量
	// 如remaining_threads小于零,说明正在活跃的线程数量小于或等于需要删除线程的数量,并将remaining_threads赋值为1
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1;

	int i;

	// 循环取消线程,直到正在活跃的线程数量为0
	for (i = pool->active_threads - 1; i > remaining_threads - 1; i--)
	{
		// 如调用pthread_cancel函数失败,则退出循环
		errno = pthread_cancel(pool->tids[i]);

		if (errno != 0)
			break;

#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}

	if (i == pool->active_threads - 1)
		return -1;

	// 返回现在的线程数量
	else
	{
		pool->active_threads = i + 1;
		return i + 1;
	}
}

释放线程池

/******************************************************
 *
 *  @name      :  destroy_pool
 *	@function  :  释放线程池
 *  @params
 *                @pool                  : 线程池的地址
 *
 *  @retval    : 成功返回1,否则返回0
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
bool destroy_pool(thread_pool *pool)
{
	// 设置销毁标志位1
	pool->shutdown = true;

	// 唤醒所有线程
	pthread_cond_broadcast(&pool->cond);

	// 循环等待线程结束
	for (int i = 0; i < pool->active_threads; i++)
	{
		// 等待线程结束并释放相关资源
		errno = pthread_join(pool->tids[i], NULL);
		if (errno != 0)
		{
			printf("join tids[%d] error: %s\n", i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
	}

	// 释放相关内存
	free(pool->task_list);
	free(pool->tids);
	free(pool);

	return true;
}

thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>

#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS 1000 // 处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20  // 活跃的线程数量

// 任务结点  单向链表的节点,类型
struct task
{
	void *(*do_task)(void *arg); // 任务函数指针  指向线程要执行的任务  格式是固定的
	void *arg;					 // 需要传递给任务的参数,如果不需要,则NULL

	struct task *next; // 指向下一个任务结点的指针
};

// 线程池的管理结构体
typedef struct thread_pool
{
	pthread_mutex_t lock; // 互斥锁
	pthread_cond_t cond;  // 条件量

	bool shutdown; // 是否需要销毁线程池

	struct task *task_list; // 用于存储任务的链表

	pthread_t *tids; // 用于记录线程池中线程的ID

	unsigned max_waiting_tasks; // 线程池中线程的数量最大值
	unsigned waiting_tasks;		// 处于等待状态的线程数量
	unsigned active_threads;	// 正在活跃的线程数量
} thread_pool;

/******************************************************
 *
 *  @name      :  init_pool
 *	@function  :  初始化线池
 *  @params
 *                @pool             : 线程池的地址
 *                @threads_number   : 线程池中活跃的线程的数量
 *
 *  @retval    : 成功返回1,否则返回0
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number);

/******************************************************
 *
 *  @name      :  add_task
 *	@function  :  向线程池的任务链表中添加任务
 *  @params
 *                @pool         : 线程池的地址
 *                @do_task      : 需要执行的任务的函数
 *                @arg          :需要执行的任务的函数的参数
 *
 *  @retval    : 如成功返回1,否则返回0
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

/******************************************************
 *
 *  @name      :  add_thread
 *	@function  :  向线程池加入新线程
 *  @params
 *                @pool                  : 线程池的地址
 *                @removing_threads      : 需要增加的线程数量
 *
 *  @retval    : 返回正在活跃的线程数量
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
int add_thread(thread_pool *pool, unsigned int additional_threads_number);

/******************************************************
 *
 *  @name      :  remove_thread
 *	@function  :  从线程池中删除线程
 *  @params
 *                @pool                  : 线程池的地址
 *                @removing_threads      : 需要删除的线程数量
 *
 *  @retval    : 返回正在活跃的线程数量
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);

/******************************************************
 *
 *  @name      :  destroy_pool
 *	@function  :  释放线程池
 *  @params
 *                @pool                  : 线程池的地址
 *
 *  @retval    : 成功返回1,否则返回0
 *  @author    : Dazz
 *  @date      : 2024/6/2
 *  @version   : None
 *  @note      : None
 *
 *
 * *******************************************************/
bool destroy_pool(thread_pool *pool);

/*****************************************************************************************
 *
 *  @name      :  routine
 *	@function  :  线程要执行的任务
 *  @paramsv   : None
 *  @retval    :  None
 *  @author    :  Dazz
 *  @date      :  2024/6/2
 *  @version   :  None
 *  @note      :  该函数会在routine线程被取消的时候自动执行
 *
 *
 * *****************************************************************************************/
void *routine(void *arg);

#endif

标签:None,task,封装,函数,threads,pthread,线程,pool
From: https://www.cnblogs.com/Dazz24/p/18241483

相关文章

  • 线程池(待补齐)
    #include<stdio.h>#include<stdbool.h>#include<unistd.h>#include<stdlib.h>#include<string.h>#include<strings.h>#include<errno.h>#include<pthread.h>#defineMAX_WAITING_TASKS1000//处于等待状态的线程数量......
  • 【JS封装-兼容IE(较旧版本如IE8及以下)】强化编程实践:精选JavaScript函数封装集锦-添加E
    目录添加Event监听获取非行间样式JSON.parse与JSON.stringifyquerySelector与querySelectorAll的兼容支持跨浏览器的classList操作兼容性处理console.log兼容性处理forEach方法Promise的兼容性处理FetchAPI的兼容性处理添加Event监听IE8及以下版本不支持addEvent......
  • 服务器IO多路复用的select和poll的区别以及监听套接字select函数的四个宏操作
    目录知识补给站对文件描述符集合操作的四个宏操作服务器IO多路复用中的select和poll的区别知识补给站对文件描述符集合操作的四个宏操作对文件描述符集合操作的四个宏操作在select函数中起着关键的作用,它们用于初始化、添加、删除和检查文件描述符集合中的元素。这四个宏为:FD_......
  • 线程池
    //定义任务函数,用于模拟需要执行的任务void*mytask(void*arg){intn=(int)arg;//输出任务信息和需要花费的时间printf("[%u][%s]==>jobwillbedonein%dsec...\n",(unsigned)pthread_self(),__FUNCTION__,n);sleep(n);//模拟任务执行时间//输......
  • 高级函数
    reduce#倒序lists=[2,5,2,4,7]print(sorted(lists,reverse=True))filter#过滤序列,过滤掉不符合条件的元系defget_data(x):returnx%2==0#查询1-100的偶数print(list(filter(get_data,range(1,101))))reduce对序列中的元素进行案计计算fromfunctoolsimportreduce......
  • risc-v中的函数调用
    先来看一个普通main函数的完整执行过程(以a=bproblem为例)intmain(){inta=2;intb=3;intc=a+b;}其risc-v(rv32)的汇编如下main:addisp,sp,-32#将栈指针sp向下移动32个字节,预留栈空间swra,28(sp)#将返回地址ra存......
  • IO进程线程(十一)进程间通信 消息队列
    文章目录一、IPC(Inter-ProcessCommunication)进程间通信相关命令:(一)ipcs---查看IPC对象(二)获取IPC键值(三)删除IPC对象的命令(四)获取IPC键值的函数1.函数定义2.使用示例二、消息队列(一)特点(二)相关API1.创建或获取一个消息队列2.向消息队列中写消息3.在消息队列中......
  • IO进程线程(十二)进程间通信 共享内存 信号灯集
    文章目录一、共享内存sharedmemory(shm)(一)特点(二)相关API1.创建共享内存2.映射共享内存到当前的进程空间3.取消地址映射4.共享内存控制(三)使用示例(四)属性二、信号灯集---控制进程间同步(一)特点(二)相关API1.创建一个信号灯集2.信号灯集控制函数3.信号灯集的操作函......
  • C语言指针(函数指针的深入)
    在函数指针进阶中初步接触了函数指针现在来深度学习一下我们来分析两个题目加深一下对函数指针的理解例1:(*(void(*)())0)();分析这个函数的作用是什么?看到这段语句这么长是不是觉得很难,在我分析了之后就会变得非常简单分析之前我们先讲解一下C声明的组成任何C变量的声......
  • 三角函数和反三角函数导数的推导
    三角函数的导数 反三角函数的导数的推导  ......