后续会移植为C++版
文章目录
一、线程池原理
- 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
- 线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:
- 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
- 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
- 已处理的任务会被从任务队列中删除
- 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
- 工作的线程(任务队列任务的消费者) ,N个
- 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
- 工作的线程相当于是任务队列的消费者角色,
- 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
- 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
- 管理者线程(不处理任务队列中的任务),1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候, 可以适当的创建一些新的工作线程
- 当任务过少的时候, 可以适当的销毁一些工作的线程
二、 一些函数
具体函数可见博客:【C++】多线程(基于Windows以及pthread库)
2.1 pthread_cond_wait()
用于阻塞当前线程,等待别的线程使用pthread_cond_signal()
或pthread_cond_broadcast
来唤醒它pthread_cond_wait()
必须与pthread_mutex
配套使用。pthread_cond_wait()
函数一进入wait状态就会自动release mutex。
当其他线程通过pthread_cond_signal()
或pthread_cond_broadcast
,把该线程唤醒,使pthread_cond_wait()
通过(返回)时,该线程又自动获得该mutex。
2.2 pthread_cond_signal()
pthread_cond_signal
函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal
也会成功返回。
使用pthread_cond_signal
一般不会有“惊群现象”产生,他最多只给一个线程发信号。假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但无论如何一个pthread_cond_signal
调用最多发信一次。
2.3 pthread_create()
int pthread_create(pthread_t* restrict tidp,const pthread_attr_t* restrict_attr,void* (*start_rtn)(void*),void *restrict arg);
参数:
- tidp:事先创建好的pthread_t类型的参数。成功时tidp指向的内存单元被设置为新创建线程的线程ID。
- attr:用于定制各种不同的线程属性。通常直接设为NULL。
- start_rtn:新创建线程从此函数开始运行。无参数是arg设为NULL即可。
- arg:start_rtn函数的参数。无参数时设为NULL即可。有参数时输入参数的地址。当多于一个参数时应当使用结构体传入。
2.4 pthread_exit()
void pthread_exit(void *retval);
- retval 是void*类型的指针,可以指向任何类型的数据,它指向的数据将作为线程退出时的返回值。如果线程不需要返回任何数据,将 retval 参数置为NULL即可。
retval 指针不能指向函数内部的局部数据(比如局部变量)。换句话说,pthread_exit() 函数不能返回一个指向局部数据的指针,否则很可能使程序运行结果出错甚至崩溃。
三、 任务队列定义
// 任务结构体
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
四、 线程池定义
// 线程池结构体
typedef struct ThreadPool {
// 任务队列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据
// 定义线程的操作
pthread_t managerID; // 管理者线程ID
pthread_t* threadIDs; // 工作线程ID,工作线程有多个,定义为数组
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数,任务特别少时,需要销毁线程
// 同步操作
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量
pthread_cond_t notFull; // 信号量,任务队列是不是满了,满了需要阻塞生产者,不能添加任务
pthread_cond_t notEmpty; // 信号量,任务队列是不是空了,空了需要阻塞消费者,不能消费任务
int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
}ThreadPool;
五、 头文件内容 threadpool.h
#pragma once
#include<pthread.h>
#include<windows.h>
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化,传入最大最小线程个数,以及任务容量大小
ThreadPool* threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);
// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
//
//
// 工作的线程(消费者线程)任务函数,需要不断地从线程池的任务队列去读任务
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);
六、 .c文件实现
6.1 threadpool.c文件
#include "threadpool.h"
#include<stdio.h>
#include<malloc.h>
const int NUMBER = 2;
// @file:threadpool
// @author:IdealSanX_T
// @date:2024/6/12 9:21:15
// @brief:手撸线程池C语言版
// 任务结构体
typedef struct Task {
void (*function)(void* arg); //函数指针void* arg为泛型参数
void* arg; //参数地址
}Task;
// 线程池结构体
typedef struct ThreadPool {
// 任务队列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据
// 定义线程的操作
pthread_t managerID; // 管理者线程ID
pthread_t* threadIDs; // 工作线程ID,工作线程有多个,定义为数组
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数,任务特别少时,需要销毁线程
// 同步操作
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量
pthread_cond_t notFull; // 信号量,任务队列是不是满了,满了需要阻塞生产者,不能添加任务
pthread_cond_t notEmpty; // 信号量,任务队列是不是空了,空了需要阻塞消费者,不能消费任务
int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
}ThreadPool;
// 初始化线程池
ThreadPool* threadPoolCreate(int min, int max, int queueSize) {
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool)); // 为线程池结构体分配内存
if (pool == NULL) {
printf("malloc threadpool fail...\n");
return NULL;
}
do {
// 任务队列初始化
pool->queueCapacity = queueSize; // 任务容量
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize); // 为任务队列分配容量内存
if (pool->taskQ == NULL) {
printf("malloc taskQ fail...\n");
break;
}
pool->queueFront = 0;
pool->queueRear = 0; //队头/尾指针初始化为0
// 同步操作初始化,锁的初始化需要调用方法pthread_mutex_init()
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
// 线程的操作初始化
pool->minNum = min; // 线程数量最小值
pool->maxNum = max; // 线程数量最大值
pool->busyNum = 0; // 工作线程初始化
pool->exitNum = 0; // 销毁消除初始化
pool->liveNum = min; // 存活线程初始化,和最小个数相等(存活线程不等于工作线程,存活不一定工作)
// 管理者线程ID初始化
pthread_create(&pool->managerID, 0, manager, pool);
// 消费者线程ID初始化,分配线程最大数容量
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL){
printf("malloc threadIDs fail...\n");
break;
}
// 消费者线程ID数组初始化
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
for (int i = 0; i < max; i++) {
pthread_create(&pool->threadIDs[i], 0, worker, pool);
}
// 销毁线程池标志位初始化
pool->shutdown = 0;
return pool;
} while (false);
// 将初始化放在一次的循环里,是为了再分配内存时如果出错,使用break跳出循环,而不是return
// 这样做是为了每次失败后,可以跳出循环来释放已经分配的内存
// 释放资源
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
// 线程池销毁
int threadPoolDestroy(ThreadPool* pool){
if (pool == NULL){
return -1;
}
// 关闭线程池
pool->shutdown = 1;
// 阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒阻塞的消费者线程,被阻塞的消费者被唤醒后,pool->shutdown == 1时会自杀
for (int i = 0; i < pool->liveNum; ++i){
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if (pool->taskQ){
free(pool->taskQ);
pool->taskQ = NULL;
}
if (pool->threadIDs){
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
// 向任务队列添加任务,参数:线程池指针,函数指针,函数参数
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg){
// 互斥访问线程池
pthread_mutex_lock(&pool->mutexPool);
// 当任务队列满时并且线程池没有关闭时,阻塞此函数
while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
// 线程池关闭时,解锁退出
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
// 移动队尾指针
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
// 任务队列数量+1
pool->queueSize++;
// 释放不为空的信号量
pthread_cond_signal(&pool->notEmpty);
// 解锁
pthread_mutex_unlock(&pool->mutexPool);
}
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool){
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool){
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
// 工作线程实现(消费者)
void* worker(void* arg){
// 接收传入的线程池
ThreadPool* pool = (ThreadPool*)arg;
// 不断从线程池的任务队列中取出任务
while (true) {
// 对线程池上锁,互斥访问线程池
pthread_mutex_lock(&pool->mutexPool);
// 任务队列为空并且线程池没有关闭时
while (pool->queueSize == 0 && !pool->shutdown) {
// 当notEmpty信号量没有时,则为空,就阻塞工作线程,并且释放mutexPool
// 不为空时,会被唤醒,消费notEmpty信号量,并重新对mutexPool加锁
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 判断是不是要销毁线程,exitNUM会在管理者进程中需要销毁线程时赋值
if (pool->exitNum > 0){
pool->exitNum--;
if (pool->liveNum > pool->minNum){
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
// 自定义线程退出函数,而没有直接用pthread_exit(),
// 需要将退出的线程所在线程队列位置进行置空
threadExit(pool);
}
}
}
// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// 从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动任务队列头结点
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--; // 任务队列中任务数量减一
// 解锁
pthread_cond_signal(&pool->notFull); // 消费了任务,释放不满信号量,notFull++
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy); //上锁,互斥访问busyNum
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy); //解锁
task.function(task.arg);
free(task.arg); //释放函数参数的堆内存
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy); //上锁,互斥访问busyNum
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy); //解锁
}
return NULL;
}
// 管理者线程实现
void* manager(void* arg){
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// 每隔3s检测一次,是否需要添加/销毁线程
Sleep(3000);
// 取出线程池中任务的数量和当前线程的数量,互斥访问queueSize,liveNum
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// 取出忙的线程的数量,互斥访问busyNum
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// 添加线程,每次最多添加两个线程,也可以修改添加多个
// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数,这个条件是自定义,可以修改
if (queueSize > liveNum && liveNum < pool->maxNum){
// 加锁,互斥访问线程池
pthread_mutex_lock(&pool->mutexPool);
int counter = 0; // 添加线程的数量
for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i){
if (pool->threadIDs[i].x == 0){ // 将任务放到线程队列中空闲位置
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool); // 解锁
}
// 销毁线程,当空闲的存活线程过多,就需要销毁一部分
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数,这个条件是自定义,可以修改
if (busyNum * 2 < liveNum && liveNum > pool->minNum){
// 互斥访问线程池
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让空闲工作的线程自杀
for (int i = 0; i < NUMBER; ++i){
// 唤醒在任务队列为空时工作的线程,使其向下执行自杀操作
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
void threadExit(ThreadPool* pool){
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i){
// 寻找自杀的线程在线程队列里的位置
if (pthread_equal(pool->threadIDs[i], tid)) {
memset(&pool->threadIDs[i], 0, sizeof(pthread_t));
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
6.2 TestMain 测试文件
#include<stdio.h>
#include<string>
#include"threadpool.h"
// @file:TestMain
// @author:IdealSanX_T
// @date:2024/6/12 19:27:58
// @brief:
void taskFunc(void* arg){
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
Sleep(1000);
}
int main() {
// 创建线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for (int i = 0; i < 10; ++i){
int* num = (int*)malloc(sizeof(int));
*num = i;
printf("%d\n", *num);
threadPoolAdd(pool, taskFunc, num);
}
Sleep(3000);
threadPoolDestroy(pool);
return 0;
}
标签:实现,void,int,mutex,pthread,线程,pool
From: https://blog.csdn.net/qq_50921201/article/details/139591876