首页 > 编程语言 >线程池的实现源码及应用举例

线程池的实现源码及应用举例

时间:2024-06-02 19:55:41浏览次数:29  
标签:task thread 源码 threads 举例 pthread 线程 pool

1.线程池本质

​ 多个线程组成的一个集合,目的为了并发执行任务,定义时是一个结构体,成员有互斥锁,条件变量,任务链队列指针,任务链队列中等待的任务个数,当前活跃的线程数量,线程ID,线程销毁标记

2.线程池的关键技术
(1)万能函数指针(通用函数指针): *void *(*p)(void )
(使用技巧:函数的参数个数超过1个时,参数可以打包成结构体,多个参数就变成一个参数(全部包含在结构体里面了))

原理:该函数需要用到互斥锁在完成一个任务后减少任务数量,解锁后继续下一个任务,还要用到条件变量在任务全部完成时通过判断任务数量和结束标志位退出。

(2)封装线程池有关的接口函数
三大基本函数需要我们去封装
第一个:初始化线程池
原理:通过对线程池结构体中的成员初始化让线程池进入工作模式,随后使用循环创建对应数量的线程

第二个:添加任务

原理:通过动态分配内存准备新的内存空间分配给新的任务,在添加任务时利用互斥锁上锁防止任务函数完成任务时减少任务数量带来的冲中突,将任务尾插到任务链表中,并目对线程池结构体中各个成员变量进行更新。

第三个:线程池的销毁(回收线程,想办法让线程的任务函数退出)
原理:通过改变线程池结构体成员变量中的结束标志位,令所有线程能够退出,随后在任务函数中开始退出线程,并在这个线程池销毁函数中回收所有线程。

线程池实例源码列举如下:


头文件(thread_pool.h)

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS 1000 // 处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20  // 活跃的线程数量

// 任务结点  单向链表的节点,类型
struct task
{
	void *(*do_task)(void *arg); // 任务函数指针  指向线程要执行的任务  格式是固定的
	void *arg;					 // 需要传递给任务的参数,如果不需要,则NULL

	struct task *next; // 指向下一个任务结点的指针
};

// 线程池的管理结构体
typedef struct thread_pool
{
	pthread_mutex_t lock; // 互斥锁
	pthread_cond_t cond;  // 条件量

	bool shutdown;
	/*是否需要销毁线程池,用于指示线程池是否处于销毁状态。它的作用是在线程池需要被销毁时,向线程池中的工作线程发出信号,告知它们停止接受新的任务,并逐渐退出。具体来说,当 shutdown 标记被设置为 true 时,线程池将不再接受新的任务提交,但会继续执行已经提交的任务,直到所有任务都执行完毕。一旦线程池中的任务执行完毕,工作线程就会逐个退出,释放相关资源,最终销毁整个线程池。*/

	struct task *task_list; // 用于存储任务的链表

	pthread_t *tids; // 用于记录线程池中线程的ID

	unsigned max_waiting_tasks; // 线程池中线程的数量最大值
	unsigned waiting_tasks;		// 处于等待状态的线程数量
	unsigned active_threads;	// 正在活跃的线程数量
} thread_pool;

// 初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);

// 向线程池中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);

// 先线程池中添加线程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);

// 从线程池中删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);

// 销毁线程池
bool destroy_pool(thread_pool *pool);

// 任务函数
void *routine(void *arg);

接口函数的实现源码(thread_pool.c):

#include "thread_pool.h"

/*
 *	@name   : handler
 *	@brief  : 接到取消请求之后进行释放互斥锁
 *	@params :
 *	          @arg : 传入每个添加的任务随机的10秒内的秒数
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void handler(void *arg)
{
	printf("[%u] is ended.\n",
		   (unsigned)pthread_self()); // 响应取消请求之后自动处理的例程:释放互斥锁,以确保不会因为线程被取消而导致资源泄漏或死锁等问题。
	pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 *	@name   : routine
 *	@brief  : 接到取消请求之后进行释放互斥锁
 *	@params :
 *	          @arg : 传递给线程任务的参数
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void *routine(void *arg)
{
// 调试
#ifdef DEBUG
	printf("[%u] is started.\n",
		   (unsigned)pthread_self());
#endif
	// 把需要传递给线程任务的参数进行备份
	thread_pool *pool = (thread_pool *)arg;
	struct task *p;

	while (1)
	{
		/*
		pthread_cleanup_push()是一个宏,用于向线程的取消处理器栈中注册(也可以简单理解为绑定)一个处理函数。作用是在线程退出时自动执行注册的处理函数,即handler
		*/
		pthread_cleanup_push(handler, (void *)&pool->lock);
		pthread_mutex_lock(&pool->lock); // 解锁,申请资源
		// 1,没有任务,且线程池没有被销毁,就挂起等待
		while (pool->waiting_tasks == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->cond, &pool->lock);
		}
		// 2, 没有任务,且线程池被标记销毁,就释放互斥锁并结束进程
		if (pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);
			pthread_exit(NULL); // CANNOT use 'break';
		}

		// 3, 从任务列表中取出一个任务进行消费,并更新线程池的任务列表和等待任务数量???如果线程数不够怎么办
		p = pool->task_list->next;
		pool->task_list->next = p->next;
		pool->waiting_tasks--;
		// 4, 释放互斥锁,并用pthread_cleanup_pop取消在pthread_cleanup_push中注册的清理处理器
		pthread_mutex_unlock(&pool->lock);
		pthread_cleanup_pop(0);

		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 设置取消状态为禁用,防止在执行任务时被取消
		(p->do_task)(p->arg);								  // 执行任务函数,传入任务参数
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);  // 恢复取消状态为启用

		free(p); // 任务完成,释放任务所占用的内存空间
	}
}

/*
 *	@name   : init_pool
 *	@brief  : 初始化线程池
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @threads_number : 初始确定的线程数
 *	@retval : 成功返回true,失败false
 * 	@version:
 * 	@note   :
 */
bool init_pool(thread_pool *pool, unsigned int threads_number)
{

	pthread_mutex_init(&pool->lock, NULL); // 初始化互斥锁
	pthread_cond_init(&pool->cond, NULL);  // 初始化条件量

	pool->shutdown = false;										 //  设置销毁标志参数,初始化为不销毁
	pool->task_list = malloc(sizeof(struct task));				 // 给链表的节点申请堆内存
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); // 申请堆内存,用于存储创建出来的TID
	// 错误处理,对malloc进行错误处理
	if (pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	pool->task_list->next = NULL;				 // 对任务链表中的节点的指针域进行初始化
	pool->max_waiting_tasks = MAX_WAITING_TASKS; // 设置线程池中线程数量的最大值
	pool->waiting_tasks = 0;					 // 设置等待线程处理的任务的数量为0,说明现在没有任务
	pool->active_threads = threads_number;		 // 设置线程池中活跃的线程的数量

	for (int i = 0; i < pool->active_threads; i++) // 循环创建活跃线程
	{
		// 创建线程  把线程的ID存储在申请的堆内存
		if (pthread_create(&((pool->tids)[i]), NULL,
						   routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}
// 用于调试
#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}
	return true;
}

/*
 *	@name   : add_task
 *	@brief  : 向线程池的任务链表中添加任务,并唤醒
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @threads_number : 初始确定的线程数
 *	@retval : 成功返回true,失败false
 * 	@version:
 * 	@note   :
 */
// 先线程池的任务链表中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{
	struct task *new_task = malloc(sizeof(struct task)); // 给任务链表节点申请内存
	if (new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	new_task->do_task = do_task;
	new_task->arg = arg;
	new_task->next = NULL; // 指针域设置为NULL

	pthread_mutex_lock(&pool->lock); // 进行线程池任务添加

	// 说明要处理的任务的数量大于能处理的任务数量,直接解锁释放资源并返回
	if (pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		pthread_mutex_unlock(&pool->lock);
		fprintf(stderr, "too many tasks.\n");
		free(new_task);
		return false;
	}

	struct task *tmp = pool->task_list; // 获取线程池单链表的头节点地址
	while (tmp->next != NULL)			// 遍历链表,找到单向链表的尾节点
		tmp = tmp->next;

	tmp->next = new_task;  // 把新的要处理的任务插入到链表的尾部  尾插
	pool->waiting_tasks++; // 要处理的任务的数量+1

	pthread_mutex_unlock(&pool->lock); // 解锁
// 调试
#ifdef DEBUG
	printf("[%u][%s] ==> a new task has been added.\n",
		   (unsigned)pthread_self(), __FUNCTION__);
#endif

	pthread_cond_signal(&pool->cond); // 唤醒第一个处于条件变量阻塞队列中的线程
	return true;
}

/*
 *	@name   : add_thread
 *	@brief  : 向线程池加入新线程
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @additional_threads : 需要添加的线程数
 *	@retval : 实际增加的线程数
 * 	@version:
 * 	@note   :
 */
int add_thread(thread_pool *pool, unsigned additional_threads)
{
	if (additional_threads == 0) // 判断需要添加的新线程的数量是否为0,是的话直接返回
		return 0;

	unsigned total_threads = pool->active_threads + additional_threads; // 计算线程池中总线程的数量

	int i, actual_increment = 0;													 // actual_increment 为实际增加的进程数
	for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) // 对实际增加的进程数进行限制,使得总和不能超过最大总数
	{
		// 创建新线程
		if (pthread_create(&((pool->tids)[i]),
						   NULL, routine, (void *)pool) != 0) // 增加进程任务处理
		{
			perror("add threads error");
			// no threads has been created, return fail
			if (actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++;

#ifdef DEBUG
		printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}
	// 记录此时线程池中活跃线程的总数,并作为返回值
	pool->active_threads += actual_increment;
	return actual_increment;
}
/*
 *	@name   : remove_thread
 *	@brief  : 移除多余线程
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	          @*removing_threads : 需要移除的线程数量
 *	@retval : 移除后剩下的活跃线程数
 * 	@version:
 * 	@note   :
 */
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	if (removing_threads == 0) // 判断需要添加的新线程的数量是否为0,是的话直接返回
		return pool->active_threads;

	int remaining_threads = pool->active_threads - removing_threads;
	remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 初步计算经过移除后,剩下的线程数,并判断;最终剩余的线程数不得小于1

	int i;
	for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 依次移除线程,并进行错误判断
	{
		errno = pthread_cancel(pool->tids[i]);

		if (errno != 0)
			break;

#ifdef DEBUG
		printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
			   (unsigned)pthread_self(), __FUNCTION__,
			   i, (unsigned)pool->tids[i]);
#endif
	}

	if (i == pool->active_threads - 1) // 若没有移除成功,则返回-1
		return -1;
	else
	{
		pool->active_threads = i + 1;// 否则返回活跃的线程数
		return i + 1;
	}
}

/*
 *	@name   : destroy_pool
 *	@brief  : 销毁线程池
 *	@params :
 *	          @*pool : 线程池的管理结构体指针
 *	@retval : 成功返回true,失败false
 * 	@version:
 * 	@note   :
 */
bool destroy_pool(thread_pool *pool)
{
	pool->shutdown = true; // 1, 修改线程池结构体的成员,并通知所有线程
	pthread_cond_broadcast(&pool->cond);

	for (int i = 0; i < pool->active_threads; i++) // 2, 等待所有线程退出,并回收资源
	{
		errno = pthread_join(pool->tids[i], NULL);
		if (errno != 0)
		{
			printf("join tids[%d] error: %s\n",
				   i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
	}

	free(pool->task_list); // 释放申请了的堆内存
	free(pool->tids);
	free(pool);

	return true;
}

应用举例(main.c):

#include "thread_pool.h"
/**
 * @file name:	--
 * @brief
 * @author [email protected]
 * @date 2024/04/25
 * @version 1.0 :版本
 * @property :
 * @note
 * CopyRight (c)  2023-2024   [email protected]   All Right Reseverd
 */

/*
 *	@name   : mytask
 *	@brief  : 给每个线程安排的具体任务
 *	@params :
 *	          @arg : 传入每个添加的任务随机的10秒内的秒数
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 * 			  1.__FUNCTION__ 是 C 和 C++ 语言中的预定义宏,用于获取当前所在函数的名称(在 C++ 中,也包括成员函数)。它会在编译时被替换为当前函数的字符串字面值。
 * 			  2.这部分可以换成自己需要安排的进程任务
 */
void *mytask(void *arg)
{
	int n = (int)arg; // 定义整型变量接收参数

	printf("[%u][%s] ==> job will be done in %d sec...\n",
		   (unsigned)pthread_self(), __FUNCTION__, n);
	sleep(n);
	printf("[%u][%s] ==> job done!\n",
		   (unsigned)pthread_self(), __FUNCTION__);

	return NULL;
}

/*
 *	@name   : count_time
 *	@	    : 计时器,每隔一秒输出当前秒数
 *	@params :
 *	          @*arg : NULL
 *	@retval : NULL
 * 	@version:
 * 	@note   :
 */
void *count_time(void *arg)
{
	int i = 0;
	while (1)
	{
		sleep(1);
		printf("sec: %d\n", ++i);
	}
}

int main()
{
	// 创建线程进行实时输出时间
	pthread_t a;
	pthread_create(&a, NULL, count_time, NULL);

	// 1, initialize the pool 初始化带有2条线程的线程池
	thread_pool *pool = malloc(sizeof(thread_pool));
	init_pool(pool, 2);

	// 2, throw tasks  投入3个任务
	printf("throwing 3 tasks...\n");
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));

	// 3, check active threads number 显示当前的线程数量
	printf("current thread number: %d\n",
		   remove_thread(pool, 0));
	sleep(9);

	// 4, throw tasks 投入2个任务
	printf("throwing another 2 tasks...\n");
	add_task(pool, mytask, (void *)(rand() % 10));
	add_task(pool, mytask, (void *)(rand() % 10));
	// 5, add threads 增加2条线程
	add_thread(pool, 2);
	sleep(5);
	// 6, remove threads 移除3条线程
	printf("remove 3 threads from the pool, "
		   "current thread number: %d\n",
		   remove_thread(pool, 3));

	// 7, destroy the pool 销毁线程池
	destroy_pool(pool);
	return 0;
}

标签:task,thread,源码,threads,举例,pthread,线程,pool
From: https://www.cnblogs.com/cino/p/18227519

相关文章

  • Java面试题:解释一下Java中的synchronized关键字,它是如何保证线程安全的?
    在Java中,synchronized关键字是一种同步锁机制,用于确保多个线程在访问共享资源时能够保持线程安全。线程安全是指在多线程环境下,当多个线程尝试同时访问共享资源时,任何时刻最多只有一个线程能够执行特定的代码段。synchronized关键字可以用于以下几个方面:方法同步:当synch......
  • 免费的VMware ?就是它了!【送源码】
    在Docker没有出来之前,很多项目的的部署方案是使用虚拟机,在一台服务器上创建好几个虚机出来,配置一下网络,就可以把一台服务器当做多个服务器用了。而作为开发者来说,我们经常碰到需要使用不同操作系统的需求,比如刚开始打算学习Linux的时候,没有系统怎么办,买一台云服务器或者在......
  • Redis单线程
    Redis是基于Reactor模式开发的网络事件处理器,这个处理器是单线程的,所以redis是单线程的。为什么它是单线程还那么快呢?主要有以下几个原因:一、纯内存操作由于Redis是纯内存操作,相比于磁盘来说,内存就快得多,这个是Redis快的主要原因。二、多路复用I/O机制(NIO)Re......
  • stack源码阅读
    javaStackstack是一个后进先出的数据结构,继承于vector,自身提供了五种方法,pop、push、empty、peek、search本文主要介绍pop将一个元素入栈push将一个元素出栈packagejava.util;/***The{@codeStack}classrepresentsalast-in-first-out*(LIFO)......
  • 信号量(Semaphore),事件Event(了解),队列补充,进程池和线程池(重点),协程理论,Greenlet,Gevent模
    Ⅰ信号量(Semaphore)【一】什么是信号量信号量Semahpore(同线程一样)互斥锁:允许在同一时刻只能有一个线程或进程同资源进行修改信号量:允许指定数量的进程或线程对资源进行修改【二】例子比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去......
  • 【JUC】3-用户线程和守护线程
    一般情况下不做特别说明配置,默认都是用户线程 UserThread是系统的工作线程,它会完成这个程序需要完成的业务操作 DaemonThread是一种特殊的线程,为其它线程服务的,在后台默默的完成一些系统性的服务,比如垃圾回收线程就是最典型的例子守护线程作为一个服务线程,没有服务对象......
  • Java高并发核心编程.卷2,多线程、锁、JMM、JUC、高并发设计模式 (尼恩)电子版百度云
    书获取链接:python33  。c o  m我的阅读笔记:多线程:介绍Java多线程的基础概念,如线程的创建、启动、状态转换、线程间通信等。锁:深入探讨Java中的各种锁机制,包括内置锁(synchronized)、ReentrantLock、ReadWriteLock等,以及它们的使用场景和性能特点。Java内存模型(JMM):解释J......
  • 【数据结构】单链表-->详细讲解,后赋源码
    欢迎来到我的Blog,点击关注哦......
  • Springboot计算机毕业设计一次性环保餐具销售系统小程序【附源码】开题+论文+mysql+程
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景:随着外卖和快餐文化的快速发展,一次性餐具的使用量急剧增加,给环境带来了沉重的负担。传统的一次性餐具多为塑料制品,难以降解,对环境造成了长期污染。因......
  • Springboot计算机毕业设计药品外送小程序【附源码】开题+论文+mysql+程序+部署
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景:在当今快节奏的生活环境中,人们对便捷性的需求日益增长。特别是在医疗健康领域,当患者因疾病需要药品时,能够迅速获得所需药物显得至关重要。随着互联网......