首页 > 其他分享 >Thread-Pool

Thread-Pool

时间:2024-06-11 08:59:54浏览次数:27  
标签:task Thread void threads Pool pthread 线程 pool

线程池

线程池简介

线程池是一种多线程设计模式,用于优化并发任务的执行。通过预创建一组线程,可以减少线程创建和销毁的开销,提高系统性能。线程池主要由两个部分组成:任务队列和工作线程。工作线程从任务队列中获取任务并执行它们。

代码结构

线程池的代码结构如下:

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS	1000   //处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS	20     //活跃的线程数量 

//任务结点  单向链表的节点,类型
struct task
{
	void *(*do_task)(void *arg);  //任务函数指针  指向线程要执行的任务  格式是固定的
	void *arg;					  //需要传递给任务的参数,如果不需要,则NULL

	struct task *next;			  //指向下一个任务结点的指针
};


//线程池的管理结构体
typedef struct thread_pool
{
	pthread_mutex_t lock;		  // 互斥锁
	pthread_cond_t  cond;		  // 条件量

	bool shutdown;				  //是否需要销毁线程池

	struct task *task_list;		  //用于存储任务的链表

	pthread_t *tids;			  //用于记录线程池中线程的ID

	unsigned max_waiting_tasks;	  //线程池中线程的数量最大值
	unsigned waiting_tasks;		  //处于等待状态的线程数量
	unsigned active_threads;	  //正在活跃的线程数量
}thread_pool;


/**
 * init_pool - 初始化线程
 * 	
 * @pool					 线程池的管理结构体
 * @threads_number	     在线程池中创建的活跃线程数量
 * 
 * return:	   
 * 成功返回true,失败返回false
 * 
 * NOTE:	   
 * 创建的活跃线程数量不要超过 MAX_ACTIVE_THREADS (20)
 */
bool init_pool(thread_pool *pool, unsigned int threads_number);


/**
 * add_task - 添加线程池中的任务
 * 	
 * @pool				         线程池的管理结构体
 * @(*do_task)(void *arg)	     线程任务
 * @arg                          县城任务参数
 * 
 * return:	   
 * 成功返回true,失败返回false
 * 
 */
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg);


/**
 * add_thread - 添加线程池中的线程
 * 	
 * @pool				         线程池的管理结构体
 * @additional_threads	         添加的线程数量
 * 
 * return:	   
 * 返回实际添加的线程数量
 * 
 */
int add_thread(thread_pool *pool, unsigned int additional_threads);

/**
 * remove_thread - 从线程池中删除线程
 * 	
 * @pool				         线程池的管理结构体
 * @additional_threads	         添加的线程数量
 * 
 * return:	   
 * 返回实际删除的线程数量
 * 
 */
int remove_thread(thread_pool *pool, unsigned int removing_threads);

/**
 * destroy_pool - 销毁线程池
 * 	
 * @pool				         线程池的管理结构体
 * 
 * return:	   
 * 成功返回true,失败返回false
 * 
 */
bool destroy_pool(thread_pool *pool);

/**
 * routine 
 * 线程例程函数,用于执行线程池中的任务。
 * 其主要功能包括等待任务、处理任务、正确处理互斥锁以及确保线程安全退出。
 * 	
 * @pool	                     线程池的管理结构体
 * 
 */
void *routine(void *arg);


#endif


主要函数实现与解释

初始化线程池

// 初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
	// 初始化互斥锁
	pthread_mutex_init(&pool->lock, NULL);

	// 初始化条件量
	pthread_cond_init(&pool->cond, NULL);

	// 设置线程状态为 未关闭
	pool->shutdown = false; 

	// 给链表的节点申请堆内存
	pool->task_list = malloc(sizeof(struct task)); 

	// 申请堆内存,用于存储创建出来的线程的ID
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

	// 错误处理,对malloc进行错误处理
	if(pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}
 
	// 对任务链表中的节点的指针域进行初始化
	pool->task_list->next = NULL;
 
	// 设置线程池中线程数量的最大值
	pool->max_waiting_tasks = MAX_WAITING_TASKS;
	
	// 设置等待线程处理的任务的数量为0,说明现在没有任务
	pool->waiting_tasks = 0;

	// 设置线程池中活跃的线程的数量
	pool->active_threads = threads_number;

	int i;

	// 循环创建活跃线程
	for(i=0; i<pool->active_threads; i++)
	{
		// 创建线程  把线程的ID存储在申请的堆内存
		if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}

		// 调试
		#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	return true;
}

添加任务到线程池

// 先线程池的任务链表中添加任务
bool add_task(thread_pool *pool,void *(*do_task)(void *arg), void *arg)
{
	// 给任务链表节点申请内存
	struct task *new_task = malloc(sizeof(struct task));
	if(new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	// 设置线程任务以及线程参数
	new_task->do_task = do_task;
	new_task->arg = arg;			
	new_task->next = NULL;			// 指针域设置为NULL

	//============ LOCK =============//
	pthread_mutex_lock(&pool->lock);
	//===============================//

	// 若要处理的任务的数量大于能处理的任务数量
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		// 解锁并释放申请的内存
		pthread_mutex_unlock(&pool->lock); 

		fprintf(stderr, "too many tasks.\n");
		free(new_task); 

		return false;
	}
	
	// 若要处理的任务的数量未大于能处理的任务数量,将新任务添加到线程池的任务链表中
	struct task *tmp = pool->task_list;

	// 尾插法将添加新任务添加到线程池的任务链表中
	while(tmp->next != NULL)
		tmp = tmp->next;
	tmp->next = new_task;

	// 要处理的任务的数量+1
	pool->waiting_tasks++;

	//=========== UNLOCK ============//
	pthread_mutex_unlock(&pool->lock);
	//===============================//

	//调试
	#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		(unsigned)pthread_self(), __FUNCTION__);
	#endif

	//唤醒第一个处于阻塞队列中的线程
	pthread_cond_signal(&pool->cond);
	return true;
}

添加线程到线程池

// 向线程池加入新线程,返回实际加入的数量
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	//判断需要添加的新线程的数量是否为0
	if(additional_threads == 0)
		return 0;

	// 计算线程池中总线程的数量
	unsigned total_threads = pool->active_threads + additional_threads;

	int i, actual_increment = 0;

	// 循环添加线程,不能大于最大线程活跃数量
	for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
	{
		// 创建新线程
		if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0)
		{
			perror("add threads error");

			// no threads has been created, return fail
			if(actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++; 

		#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	// 记录此时线程池中活跃线程的总数
	pool->active_threads += actual_increment;
	return actual_increment;
}

从线程池中移除线程

// 移除线程池中的线程,返回线程池中剩余的线程数量
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	if(removing_threads == 0)
		return pool->active_threads;

	// 计算剩余线程数量,并判断移除线程数量的可行性
	int remaining_threads = pool->active_threads - removing_threads;
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1;

	int i; // 定义变量用于记录剩余线程数量
	// 循环移除线程
	for(i=pool->active_threads-1; i>remaining_threads-1; i--)
	{
		errno = pthread_cancel(pool->tids[i]);

		if(errno != 0)
			break;

		#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			(unsigned)pthread_self(), __FUNCTION__,
			i, (unsigned)pool->tids[i]);
		#endif
	}

	// 执行循环删除线程后,剩余线程数量并未改变,说明删除线程数量参数异常
	if(i == pool->active_threads-1)
		return -1;
	else // 否则更新线程数量
	{
		pool->active_threads = i+1;
		return i+1;
	}
}

销毁线程池

// 销毁线程池
bool destroy_pool(thread_pool *pool)
{
	pool->shutdown = true; // 设置线程状态为 关闭
	pthread_cond_broadcast(&pool->cond); // 唤醒所有阻塞线程

	// 等待所有线程结束
	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		errno = pthread_join(pool->tids[i], NULL); // 等待线程结束,并回收资源
		if(errno != 0)
		{
			printf("join tids[%d] error: %s\n",
					i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
		
	}

	// 释放内存空间
	free(pool->task_list);
	free(pool->tids);
	free(pool);

	return true;
}

线程例程函数

void *routine(void *arg)
{
	//调试
	#ifdef DEBUG
	printf("[%u] is started.\n",
		(unsigned)pthread_self());
	#endif

	//把需要传递给线程任务的参数进行备份
	thread_pool *pool = (thread_pool *)arg;
	struct task *p;

	while(1)
	{
		/*
		** push a cleanup functon handler(), make sure that
		** the calling thread will release the mutex properly
		** even if it is cancelled during holding the mutex.
		**
		** NOTE:
		** pthread_cleanup_push() is a macro which includes a
		** loop in it, so if the specified field of codes that 
		** paired within pthread_cleanup_push() and pthread_
		** cleanup_pop() use 'break' may NOT break out of the
		** truely loop but break out of these two macros.
		** see line 82 below.
		*/

		//================================================//

		/**
		* void pthread_cleanup_push(void (*routine)(void *), void *arg);
		*
		* @routine: 指向清理函数的指针
		* @arg: 传递给清理函数的参数
		*
		* 用于在线程执行过程中注册一个清理函数hander
		* 以便在线程被取消或退出时自动调用该清理函数进行资源释放或清理操作
		*
		*/
		
		// 此处注册了一个解锁函数,无论发生何种情况(如线程被取消),都能正确地释放互斥锁,避免死锁的发生
		pthread_cleanup_push(handler, (void *)&pool->lock);  
		pthread_mutex_lock(&pool->lock);
		//================================================//

		// 若线程池中没有任务,并且线程池未处于关闭状态
		while(pool->waiting_tasks == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->cond, &pool->lock); // 阻塞等待
		}

		// 若线程池中没有任务,且线程池处于关闭状态
		if(pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			// 结束线程
			pthread_mutex_unlock(&pool->lock);
			pthread_exit(NULL); // CANNOT use 'break';
		}

		// 线程池中有任务,则继续执行,并更新线程池信息
		p = pool->task_list->next;
		pool->task_list->next = p->next;
		pool->waiting_tasks--; 

		//================================================//
		pthread_mutex_unlock(&pool->lock);
		// 解除注册的清理函数。如果传递 1,则立即执行清理函数;如果传递 0,则不执行清理函数但会解除注册
		pthread_cleanup_pop(0);
		//================================================//

		// 控制线程的取消状态,以确保在执行任务时,线程不会被取消
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
		(p->do_task)(p->arg);
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

		free(p);
	}

	pthread_exit(NULL);
}

使用示例

#include "thread_pool.h"

void *mytask(void *arg)
{
	int n = (int)arg;

	printf("[%u][%s] ==> job will be done in %d sec...\n",
		(unsigned)pthread_self(), __FUNCTION__, n);

	sleep(n);

	printf("[%u][%s] ==> job done!\n",
		(unsigned)pthread_self(), __FUNCTION__);

	return NULL;
}

void *count_time(void *arg)
{
	int i = 0;
	while(1)
	{
		sleep(1);
		printf("sec: %d\n", ++i);
	}
}


int main(void)
{
	pthread_t a;
	pthread_create(&a, NULL, count_time, NULL);

	// 1, initialize the pool
	thread_pool *pool = malloc(sizeof(thread_pool));
	init_pool(pool, 2);

	// 2, throw tasks
	printf("throwing 3 tasks...\n");
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));

	// 3, check active threads number
	printf("current thread number: %d\n",
			remove_thread(pool, 0));
	sleep(9);

	// 4, throw tasks
	printf("throwing another 2 tasks...\n");
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));

	// 5, add threads
	add_thread(pool, 2);

	sleep(5);

	// 6, remove threads
	printf("remove 3 threads from the pool, "
	       "current thread number: %d\n",
			remove_thread(pool, 3));

	// 7, destroy the pool
	destroy_pool(pool);
	return 0;
}

标签:task,Thread,void,threads,Pool,pthread,线程,pool
From: https://www.cnblogs.com/LeanderPeng/p/18241467

相关文章

  • 翻译《The Old New Thing》- Why isn’t there a SendThreadMessage function?
    Whyisn'tthereaSendThreadMessagefunction?-TheOldNewThing(microsoft.com)https://devblogs.microsoft.com/oldnewthing/20081223-00/?p=19743RaymondChen 2008年12月23日为什么没有SendThreadMessage函数?简要文章讨论了Windows中不存在`SendThread......
  • 快速使用 ThreadPoolExecutor 并行加速
    总览一般的Python脚本只会用上单线程。对于IO密集型任务,用多线程加速会快得多。本文会给出一个模板,使用ThreadPoolExecutor进行并行加速。注意,由于GIL的存在,对于CPU密集型任务ProcessPoolExecutor是更好的选择。快速使用ThreadPoolExecutor请看以下模板。fro......
  • threadX 消息队列
    1、使用消息列的目的在ThreadX操作系统下使用消息队列的目的主要有以下几点:提高CPU利用率:消息队列是RTOS(实时操作系统)中常用的一种数据通信方式,常用于任务与任务之间或是中断与任务之间的数据传递。相比裸机系统中使用全局变量进行数据传递需要不断地轮询标志状态,使用RT......
  • RT-Thread Studio使用教程
    介绍RT-ThreadStudio是官方出品的一款专门针对RT-Thread嵌入式开发、部署、调试、测试的集成开发环境,它基于Eclipse开源项目开发,极大的提高了嵌入式开发者的开发效率,目前最新版本是2.26下载使用浏览器打开RT-Thread官网,选择左上角资源点击RT-ThreadStudio,打开RT-ThreadStudi......
  • RT-Thread和Infineon主持的嵌入式网络应用开发沙龙
    主题会议由RT-Thread&&Infineon共同主持,PSoc62开发板现场演示从0到1搭建智能数据网关RT-Thread介绍rt-thread社区负责人郭占鑫郭工介绍RT-Thread英飞凌合作伙伴介绍英飞凌产品负责人介绍英飞凌的产品动态、分享未来的一些嵌入式技术发展方向以及应用案例技术分享(钩子函......
  • go pool
     来自:sync.Pool原理sync.Pool核心对象有三个New:函数,负责对象初始化Get:获取Pool中的对象,如果Pool中对象不存在则会调用NewPut:将对象放入Pool中NewfuncPool的结构很简单,就5个字段 typePoolstruct{ ... Newfunc()interface{} }......
  • GD32F4xx+RT-Thread,SPI驱动、文件系统挂载应该怎么写
    在GD32F470芯片上使用RT-Thread操作系统,配合NORFlash(GD25Q256)编写SPI驱动、文件系统驱动以及挂载关联,可以按照以下步骤进行:1.SPI驱动编写首先需要编写SPI驱动来控制GD32F470与NORFlash之间的通信。这包括初始化SPI接口,配置SPI时钟、极性、相位等参数,并实现SPI传输函数。S......
  • Python并发 :ThreadPoolExecutor
    concurrent.futures是Python中执行异步编程的重要工具,它提供了以下两个类: 1.ThreadPoolExecutorfromconcurrent.futuresimportThreadPoolExecutordeftest(num):print("Threads"num)#新建ThreadPoolExecutor对象并指定最大的线程数量withThreadPoolExecutor(......
  • mysql中InnoDB存储引擎的Buffer Pool
    大家好。众所周知,对于使用InnoDB作为存储引擎的表来说,不管是用于存储用户数据的索引(包括聚簇索引和二级索引),还是各种系统数据,都是存储在磁盘上的。在处理客户端的请求时,当需要访问某个页的数据时,就会把完整的页的数据全部加载到内存中。将整个页加载到内存中后就可以进行读......
  • 从上下文切换谈thread_local工作原理
    从上下文切换谈thread_local工作原理thread_local是什么熟悉多线程编程的小伙伴一定对thread_local不陌生,thread_local是C++11引入的一种存储类说明符,用于定义每个线程都有其独立实例的变量。每个线程对这些变量有自己的副本,而不共享其他线程的副本。这在多线程编程中非常有......