首页 > 其他分享 >【C】线程池实现

【C】线程池实现

时间:2024-06-12 20:29:17浏览次数:31  
标签:实现 void int mutex pthread 线程 pool

后续会移植为C++版

文章目录


一、线程池原理

  • 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
  • 线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:

  • 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
    • 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
    • 已处理的任务会被从任务队列中删除
    • 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
  • 工作的线程(任务队列任务的消费者) ,N个
    • 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
    • 工作的线程相当于是任务队列的消费者角色,
    • 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
    • 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
  • 管理者线程(不处理任务队列中的任务),1个
    • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
    • 当任务过多的时候, 可以适当的创建一些新的工作线程
    • 当任务过少的时候, 可以适当的销毁一些工作的线程
      线程池原理

二、 一些函数

具体函数可见博客:【C++】多线程(基于Windows以及pthread库)

2.1 pthread_cond_wait()

用于阻塞当前线程,等待别的线程使用pthread_cond_signal()pthread_cond_broadcast来唤醒它pthread_cond_wait()必须与pthread_mutex配套使用。pthread_cond_wait()函数一进入wait状态就会自动release mutex。
当其他线程通过pthread_cond_signal()或pthread_cond_broadcast,把该线程唤醒,使pthread_cond_wait()通过(返回)时,该线程又自动获得该mutex。

2.2 pthread_cond_signal()

pthread_cond_signal函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。
使用pthread_cond_signal一般不会有“惊群现象”产生,他最多只给一个线程发信号。假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但无论如何一个pthread_cond_signal调用最多发信一次。

2.3 pthread_create()

int pthread_create(pthread_t* restrict tidp,const pthread_attr_t* restrict_attr,void* (*start_rtn)(void*),void *restrict arg);

参数:

  • tidp:事先创建好的pthread_t类型的参数。成功时tidp指向的内存单元被设置为新创建线程的线程ID。
  • attr:用于定制各种不同的线程属性。通常直接设为NULL。
  • start_rtn:新创建线程从此函数开始运行。无参数是arg设为NULL即可。
  • arg:start_rtn函数的参数。无参数时设为NULL即可。有参数时输入参数的地址。当多于一个参数时应当使用结构体传入。

2.4 pthread_exit()

void pthread_exit(void *retval);
  • retval 是void*类型的指针,可以指向任何类型的数据,它指向的数据将作为线程退出时的返回值。如果线程不需要返回任何数据,将 retval 参数置为NULL即可。

retval 指针不能指向函数内部的局部数据(比如局部变量)。换句话说,pthread_exit() 函数不能返回一个指向局部数据的指针,否则很可能使程序运行结果出错甚至崩溃。

三、 任务队列定义

// 任务结构体
typedef struct Task
{
    void (*function)(void* arg);
    void* arg;
}Task;

四、 线程池定义

// 线程池结构体
typedef struct ThreadPool {
	// 任务队列
	Task* taskQ;
	int queueCapacity;		// 容量
	int queueSize;			// 当前任务个数
	int queueFront;			// 队头 -> 取数据
	int queueRear;			// 队尾 -> 放数据

	// 定义线程的操作
	pthread_t managerID;    // 管理者线程ID
	pthread_t* threadIDs;   // 工作线程ID,工作线程有多个,定义为数组
	int minNum;             // 最小线程数量
	int maxNum;             // 最大线程数量
	int busyNum;            // 忙的线程的个数
	int liveNum;            // 存活的线程的个数
	int exitNum;            // 要销毁的线程个数,任务特别少时,需要销毁线程

	// 同步操作
	pthread_mutex_t mutexPool;  // 锁整个的线程池
	pthread_mutex_t mutexBusy;  // 锁busyNum变量
	pthread_cond_t notFull;     // 信号量,任务队列是不是满了,满了需要阻塞生产者,不能添加任务
	pthread_cond_t notEmpty;    // 信号量,任务队列是不是空了,空了需要阻塞消费者,不能消费任务

	int shutdown;				// 是不是要销毁线程池, 销毁为1, 不销毁为0
}ThreadPool;

五、 头文件内容 threadpool.h

#pragma once
#include<pthread.h>
#include<windows.h>
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化,传入最大最小线程个数,以及任务容量大小
ThreadPool* threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);



//
// 
// 工作的线程(消费者线程)任务函数,需要不断地从线程池的任务队列去读任务
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);

六、 .c文件实现

6.1 threadpool.c文件

#include "threadpool.h"
#include<stdio.h>
#include<malloc.h>
const int NUMBER = 2;

// @file:threadpool
// @author:IdealSanX_T
// @date:2024/6/12 9:21:15
// @brief:手撸线程池C语言版

// 任务结构体
typedef struct Task {
	void (*function)(void* arg); //函数指针void* arg为泛型参数
	void* arg; //参数地址
}Task;

// 线程池结构体
typedef struct ThreadPool {
	// 任务队列
	Task* taskQ;
	int queueCapacity;		// 容量
	int queueSize;			// 当前任务个数
	int queueFront;			// 队头 -> 取数据
	int queueRear;			// 队尾 -> 放数据

	// 定义线程的操作
	pthread_t managerID;    // 管理者线程ID
	pthread_t* threadIDs;   // 工作线程ID,工作线程有多个,定义为数组
	int minNum;             // 最小线程数量
	int maxNum;             // 最大线程数量
	int busyNum;            // 忙的线程的个数
	int liveNum;            // 存活的线程的个数
	int exitNum;            // 要销毁的线程个数,任务特别少时,需要销毁线程

	// 同步操作
	pthread_mutex_t mutexPool;  // 锁整个的线程池
	pthread_mutex_t mutexBusy;  // 锁busyNum变量
	pthread_cond_t notFull;     // 信号量,任务队列是不是满了,满了需要阻塞生产者,不能添加任务
	pthread_cond_t notEmpty;    // 信号量,任务队列是不是空了,空了需要阻塞消费者,不能消费任务

	int shutdown;				// 是不是要销毁线程池, 销毁为1, 不销毁为0
}ThreadPool;

// 初始化线程池
ThreadPool* threadPoolCreate(int min, int max, int queueSize) {
	ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool)); // 为线程池结构体分配内存
	if (pool == NULL) {
		printf("malloc threadpool fail...\n");
		return NULL;
	}
	do {
		// 任务队列初始化
		pool->queueCapacity = queueSize; // 任务容量
		pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);  // 为任务队列分配容量内存
		if (pool->taskQ == NULL) {
			printf("malloc taskQ fail...\n");
			break;
		}
		pool->queueFront = 0;
		pool->queueRear = 0;  //队头/尾指针初始化为0

		// 同步操作初始化,锁的初始化需要调用方法pthread_mutex_init()
		if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
			pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
			pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
			pthread_cond_init(&pool->notFull, NULL) != 0)
		{
			printf("mutex or condition init fail...\n");
			break;
		}

		// 线程的操作初始化
		pool->minNum = min;  // 线程数量最小值
		pool->maxNum = max;	 // 线程数量最大值
		pool->busyNum = 0;   // 工作线程初始化
		pool->exitNum = 0;   // 销毁消除初始化
		pool->liveNum = min; // 存活线程初始化,和最小个数相等(存活线程不等于工作线程,存活不一定工作)
		// 管理者线程ID初始化
		pthread_create(&pool->managerID, 0, manager, pool);
		// 消费者线程ID初始化,分配线程最大数容量
		pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
		if (pool->threadIDs == NULL){
			printf("malloc threadIDs fail...\n");
			break;
		}

		// 消费者线程ID数组初始化
		memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
		for (int i = 0; i < max; i++) {
			pthread_create(&pool->threadIDs[i], 0, worker, pool);
		}

		// 销毁线程池标志位初始化
		pool->shutdown = 0;

		return pool;
	} while (false);

	// 将初始化放在一次的循环里,是为了再分配内存时如果出错,使用break跳出循环,而不是return
	// 这样做是为了每次失败后,可以跳出循环来释放已经分配的内存


	// 释放资源
	if (pool && pool->threadIDs) free(pool->threadIDs);
	if (pool && pool->taskQ) free(pool->taskQ);
	if (pool) free(pool);

	return NULL;
}

// 线程池销毁
int threadPoolDestroy(ThreadPool* pool){
	if (pool == NULL){
		return -1;
	}

	// 关闭线程池
	pool->shutdown = 1;
	// 阻塞回收管理者线程
	pthread_join(pool->managerID, NULL);
	// 唤醒阻塞的消费者线程,被阻塞的消费者被唤醒后,pool->shutdown == 1时会自杀
	for (int i = 0; i < pool->liveNum; ++i){
		pthread_cond_signal(&pool->notEmpty);
	}

	// 释放堆内存
	if (pool->taskQ){
		free(pool->taskQ);
		pool->taskQ = NULL;
	}
	if (pool->threadIDs){
		free(pool->threadIDs);
	}

	pthread_mutex_destroy(&pool->mutexPool);
	pthread_mutex_destroy(&pool->mutexBusy);
	pthread_cond_destroy(&pool->notEmpty);
	pthread_cond_destroy(&pool->notFull);

	free(pool);
	pool = NULL;

	return 0;
}

// 向任务队列添加任务,参数:线程池指针,函数指针,函数参数
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg){
	// 互斥访问线程池
	pthread_mutex_lock(&pool->mutexPool);
	// 当任务队列满时并且线程池没有关闭时,阻塞此函数
	while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
		pthread_cond_wait(&pool->notFull, &pool->mutexPool);
	}

	// 线程池关闭时,解锁退出
	if (pool->shutdown) {
		pthread_mutex_unlock(&pool->mutexPool);
		return;
	}

	// 添加任务
	pool->taskQ[pool->queueRear].function = func;
	pool->taskQ[pool->queueRear].arg = arg;

	// 移动队尾指针
	pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;

	// 任务队列数量+1
	pool->queueSize++;

	// 释放不为空的信号量
	pthread_cond_signal(&pool->notEmpty);
	// 解锁
	pthread_mutex_unlock(&pool->mutexPool);
}

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool){

	pthread_mutex_lock(&pool->mutexBusy);
	int busyNum = pool->busyNum;
	pthread_mutex_unlock(&pool->mutexBusy);
	return busyNum;
}

int threadPoolAliveNum(ThreadPool* pool){
	pthread_mutex_lock(&pool->mutexPool);
	int liveNum = pool->liveNum;
	pthread_mutex_unlock(&pool->mutexPool);
	return liveNum;
}


// 工作线程实现(消费者)
void* worker(void* arg){
	// 接收传入的线程池
	ThreadPool* pool = (ThreadPool*)arg;

	// 不断从线程池的任务队列中取出任务
	while (true) {
		// 对线程池上锁,互斥访问线程池
		pthread_mutex_lock(&pool->mutexPool);
		
		// 任务队列为空并且线程池没有关闭时
		while (pool->queueSize == 0 && !pool->shutdown) {
			// 当notEmpty信号量没有时,则为空,就阻塞工作线程,并且释放mutexPool
			// 不为空时,会被唤醒,消费notEmpty信号量,并重新对mutexPool加锁
			pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);


			// 判断是不是要销毁线程,exitNUM会在管理者进程中需要销毁线程时赋值
			if (pool->exitNum > 0){
				pool->exitNum--;
				if (pool->liveNum > pool->minNum){
					pool->liveNum--;
					pthread_mutex_unlock(&pool->mutexPool);
					// 自定义线程退出函数,而没有直接用pthread_exit(),
					// 需要将退出的线程所在线程队列位置进行置空
					threadExit(pool); 
				}
			}
		}

		// 判断线程池是否被关闭了
		if (pool->shutdown)
		{
			pthread_mutex_unlock(&pool->mutexPool);
			threadExit(pool);
		}

		// 从任务队列中取出一个任务
		Task task;
		task.function = pool->taskQ[pool->queueFront].function;
		task.arg = pool->taskQ[pool->queueFront].arg;
		// 移动任务队列头结点
		pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
		pool->queueSize--; // 任务队列中任务数量减一
		// 解锁
		pthread_cond_signal(&pool->notFull); // 消费了任务,释放不满信号量,notFull++
		pthread_mutex_unlock(&pool->mutexPool);

		printf("thread %ld start working...\n", pthread_self());
		pthread_mutex_lock(&pool->mutexBusy);  //上锁,互斥访问busyNum
		pool->busyNum++;
		pthread_mutex_unlock(&pool->mutexBusy); //解锁
		task.function(task.arg);
		free(task.arg); //释放函数参数的堆内存
		task.arg = NULL;

		printf("thread %ld end working...\n", pthread_self());
		pthread_mutex_lock(&pool->mutexBusy); //上锁,互斥访问busyNum
		pool->busyNum--;
		pthread_mutex_unlock(&pool->mutexBusy); //解锁
	}
	return NULL;
}

// 管理者线程实现
void* manager(void* arg){
	ThreadPool* pool = (ThreadPool*)arg;
	while (!pool->shutdown)
	{
		// 每隔3s检测一次,是否需要添加/销毁线程
		Sleep(3000);

		// 取出线程池中任务的数量和当前线程的数量,互斥访问queueSize,liveNum
		pthread_mutex_lock(&pool->mutexPool);
		int queueSize = pool->queueSize;
		int liveNum = pool->liveNum;
		pthread_mutex_unlock(&pool->mutexPool);

		// 取出忙的线程的数量,互斥访问busyNum
		pthread_mutex_lock(&pool->mutexBusy);
		int busyNum = pool->busyNum;
		pthread_mutex_unlock(&pool->mutexBusy);
		

		// 添加线程,每次最多添加两个线程,也可以修改添加多个
		// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数,这个条件是自定义,可以修改
		if (queueSize > liveNum && liveNum < pool->maxNum){
			// 加锁,互斥访问线程池
			pthread_mutex_lock(&pool->mutexPool);

			int counter = 0; // 添加线程的数量
			for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i){
				if (pool->threadIDs[i].x == 0){ // 将任务放到线程队列中空闲位置
					pthread_create(&pool->threadIDs[i], NULL, worker, pool);
					counter++;
					pool->liveNum++;
				}
			}
			pthread_mutex_unlock(&pool->mutexPool); // 解锁
		}

		// 销毁线程,当空闲的存活线程过多,就需要销毁一部分
		// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数,这个条件是自定义,可以修改
		if (busyNum * 2 < liveNum && liveNum > pool->minNum){
			// 互斥访问线程池
			pthread_mutex_lock(&pool->mutexPool);
			pool->exitNum = NUMBER;
			pthread_mutex_unlock(&pool->mutexPool);

			// 让空闲工作的线程自杀
			for (int i = 0; i < NUMBER; ++i){
				// 唤醒在任务队列为空时工作的线程,使其向下执行自杀操作
				pthread_cond_signal(&pool->notEmpty);
			}
		}
	}
	return NULL;
}

void threadExit(ThreadPool* pool){
	pthread_t tid = pthread_self();
	for (int i = 0; i < pool->maxNum; ++i){
		// 寻找自杀的线程在线程队列里的位置
		if (pthread_equal(pool->threadIDs[i], tid)) {
			memset(&pool->threadIDs[i], 0, sizeof(pthread_t));
			printf("threadExit() called, %ld exiting...\n", tid);
			break;
		}
	}
	pthread_exit(NULL);
}

6.2 TestMain 测试文件

#include<stdio.h>
#include<string>
#include"threadpool.h"

// @file:TestMain
// @author:IdealSanX_T
// @date:2024/6/12 19:27:58
// @brief:

void taskFunc(void* arg){
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    Sleep(1000);
}

int main() {
    // 创建线程池
    ThreadPool* pool = threadPoolCreate(3, 10, 100);
    
    for (int i = 0; i < 10; ++i){
        int* num = (int*)malloc(sizeof(int));
        *num = i;
        printf("%d\n", *num);
        threadPoolAdd(pool, taskFunc, num);
    }

    Sleep(3000);

    threadPoolDestroy(pool);
    return 0;
}

标签:实现,void,int,mutex,pthread,线程,pool
From: https://blog.csdn.net/qq_50921201/article/details/139591876

相关文章

  • 【C++】多线程(基于Windows以及pthread库)
    文章目录一、前言1.1进程和线程二、创建线程2.1线程函数pthread_self(void)2.2创建线程三、线程退出3.1线程函数pthread_exit()四、线程回收4.1线程函数pthread_join()4.2线程数据回收五、线程分离5.1线程函数pthread_detach()六、C++线程类七、线程同......
  • PasteSpider的集群组件PasteCluster(让你的项目快速支持集群模式)的思路及实现(含源码
    PasteSpider是什么?一款使用.net编写的开源的Linux容器部署助手,支持一键发布,平滑升级,自动伸缩,Key-Value配置,项目网关,环境隔离,运行报表,差量升级,私有仓库,集群部署,版本管理等!30分钟上手,让开发也可以很容易的学会在linux上部署你得项目![从需求角度介绍PasteSpider(K8S平替部署......
  • DP经典问题----背包问题的代码实现(入门级)(C++/PYTHON)
    背包的状态转换方程i:表示物品序号j:表示背包大小W[i]:表示第i件物品的重量f[i,j]:表示在前i件物品中选择若干件放在承重为j的背包中,可以取得的最大价值f[i-1,j-Wi]:表示在前i-1件物品中选择若干件放在承重为j-Wi的背包中,可以取得的最大价值Pi(j>=Wi):表示第i件物品的价值,要......
  • 线程安全问题【snychornized 、死锁、线程通信】
    目录一、线程安全1.1线程安全问题?1.2如何解决线程安全问题方法具体如何实现?1.3同步方法1.4同步代码块1.5总结1.6售票例子1.8补充二、线程安全的集合三、死锁【了解】四、线程通信4.1同步方法4.2同步代码块4.3wait和sleep本篇的思维导图最后一、线程......
  • 栈溢出漏洞利用二,ret2syscall,构造rop链条实现攻击(pwn入门)
    原理原理就直接参考别的大佬写的文章讲下了 参考文章:https://blog.csdn.net/qq_33948522/article/details/93880812ret2syscall,即控制程序执行系统调用,获取shellret2syscall通常采用execve(重点函数,32位调用号为0x0b,64位调用号为0x3b)ROPReturnOrientedProgramming,其......
  • 记录--前端实现文件预览(word、excel、pdf、ppt、xmind、 音视频、图片、文本) 国际化
    ......
  • python指南之多线程与多进程编程大全
    Python作为一种高级编程语言,提供了多种并发编程的方式,其中多线程与多进程是最常见的两种方式之一。在本文中,我们将探讨Python中多线程与多进程的概念、区别以及如何使用线程池与进程池来提高并发执行效率。多线程与多进程的概念多线程多线程是指在同一进程内,多个线程并发执......
  • 从零手写实现 nginx-19-HTTP CORS(Cross-Origin Resource Sharing,跨源资源共享)介绍+解
    前言大家好,我是老马。很高兴遇到你。我们为java开发者实现了java版本的nginxhttps://github.com/houbb/nginx4j如果你想知道servlet如何处理的,可以参考我的另一个项目:手写从零实现简易版tomcatminicat手写nginx系列如果你对nginx原理感兴趣,可以阅读:从零......
  • Flutter-使用MethodChannel 实现与iOS交互
    前言使用MethodChannel在Flutter与原生Android和iOS之间进行通信,可以让你在Flutter应用中调用设备的原生功能。基础概念MethodChannel:Flutter提供的通信机制,允许消息以方法调用的形式在Flutter与原生代码之间传递。方法调用:从Flutter向原生或从原生向Flu......
  • Java线程池以及Future和CompletableFuture的用法
    参考:https://blog.csdn.net/weixin_50330544/article/details/1316871501.线程池为什么使用线程池?频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。系统无法合理管理内部的资源分布,会降低系统的稳定性。......