从上图可以看到,线程被创建出来之后,都处于睡眠态,它们实际上是进入了条件量的等待队列中。而任务都被放入一个链表,被互斥锁保护起来。下面是线程池里面线程们的一生:
\1. 被创建
\2. 写遗书(准备好退出处理函数,防止在持有一把锁的状态中死去)
\3. 试图持有互斥锁(等待任务)
\4. 判断是否有任务,若无则进入条件量等待队列睡眠,若有则进入第5步
\5. 从任务链表中取得一个任务
\6. 释放互斥锁
\7. 销毁遗书(将备用的退出处理函数弹出,避免占用内存)
\8. 执行任务,完毕之后重回第2步
thread_pool.c:
/*************************************************************************************
*
* 该.c文件主要用于管理线程池,包含了清理函数handler()、线程执行任务函数routine()、初始化线程池函数init_pool()、
* 往线程池添加任务函数add_task()、往线程池中添加线程函数add_thread()、移除线程函数remove_thread()、销毁线程池函数destroy_pool()
*
*
*
* Copyright [email protected] All right is reserved
*
*************************************************************************************/
#include "thread_pool.h"
/**************************************************************************************
*
* @name : handler
* @function: 该函数为清理函数handler(),确保调用线程会正确释放互斥锁,即使它在持有互斥锁时被取消。
* @params :
* @arg : 指的是传递给该清理函数的互斥锁变量
* @retval : none
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
* @note : none
*
* ***********************************************************************************/
void handler(void *arg)
{
printf("[%u] is ended.\n",
(unsigned)pthread_self()); //输出当前线程的ID
pthread_mutex_unlock((pthread_mutex_t *)arg); //解锁互斥锁
}
/**************************************************************************************
*
* @name : routine
* @function: 该函数指的是线程要执行的任务
* @params :
* @arg : 该参数一般指的是传递给该函数的线程池管理结构体指针
*
* @retval : none
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
* @note : 在进入 pthread_cleanup_push 和 pthread_cleanup_pop 之间的代码块时,清理函数被注册。
* 如果线程在这段代码中被取消或正常退出,清理函数会被调用。
* pthread_cleanup_pop 用于解除清理函数的注册,并根据 execute 参数决定是否立即执行清理函数。
*
* ***********************************************************************************/
void *routine(void *arg)
{
//调试信息输出(DEBUG模式下)
#ifdef DEBUG
printf("[%u] is started.\n",
(unsigned)pthread_self()); //输出当前线程ID
#endif
//把需要传递给线程任务的参数进行备份,把arg类型强转为thread_pool *
thread_pool *pool = (thread_pool *)arg;
//定义任务结点变量指针,用于备份任务链表的首结点
struct task *p;
while(1)
{
/*
** push a cleanup functon handler(), make sure that
** the calling thread will release the mutex properly
** even if it is cancelled during holding the mutex.
**
** NOTE:
** pthread_cleanup_push() is a macro which includes a
** loop in it, so if the specified field of codes that
** paired within pthread_cleanup_push() and pthread_
** cleanup_pop() use 'break' may NOT break out of the
** truely loop but break out of these two macros.
** see line 61 below.
*/
//================================================//
/********************************************************************************
*
*pthread_cleanup_push()为注册清理函数,如果 routine 中的代码块由于某种原因异常退出(比如抛出了异常或被取消了),
*那么 handler 函数将被自动调用,并传入 pool->lock 的地址作为参数,从而确保互斥锁被正确解锁。
*
* *****************************************************************************/
pthread_cleanup_push(handler, (void *)&pool->lock);
//锁定互斥锁
pthread_mutex_lock(&pool->lock);
//================================================//
// 1, no task, and is NOT shutting down, then wait
// 1、当线程池没有任务时,并且不是销毁状态时,则挂起
while(pool->waiting_tasks == 0 && !pool->shutdown)
{
//线程挂起
pthread_cond_wait(&pool->cond, &pool->lock);
}
// 2, no task, and is shutting down, then exit
// 2、当线程池没有任务时,并且处于销毁状态时,则终止线程
if(pool->waiting_tasks == 0 && pool->shutdown == true)
{
//解锁互斥锁
pthread_mutex_unlock(&pool->lock);
//终止线程
pthread_exit(NULL); // CANNOT use 'break';
}
// 3, have some task, then consume it
// 3、当有任务时,则消耗任务
p = pool->task_list->next; //备份线程池中任务链表的首结点任务
pool->task_list->next = p->next; //重新备份线程池任务链表的首结点的后继结点为首结点
pool->waiting_tasks--; //线程任务减1
//================================================//
pthread_mutex_unlock(&pool->lock); //解锁互斥锁
/***********************************************************************************
*
* pthread_cleanup_pop()用于解除之前通过 pthread_cleanup_push 注册的清理函数。其主要作用是在特定代码块结束时,
* 决定是否执行已注册的清理函数。如果 execute 为非零值,清理函数会被立即执行。
* 如果 execute 为零,仅解除清理函数的注册,但不会执行清理函数。
*
***********************************************************************************/
pthread_cleanup_pop(0); // 此时参数为0,仅解除清理函数的注册,但不会执行清理函数
//================================================//
/***********************************************************************************
*
* 线程的可取消状态被设置为 PTHREAD_CANCEL_DISABLE,这意味着该线程现在不能被其他线程通过
* pthread_cancel 取消。因为 oldstate 参数是 NULL,所以函数不会返回当前的可取消状态。
*
* ********************************************************************************/
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
//执行任务,此时该线程为不可取消状态,保证线程完整地执行任务
(p->do_task)(p->arg);
//此时该线程能被其他线程通过pthread_cancel 取消
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
//释放任务资源
free(p);
}
//终止线程
pthread_exit(NULL);
}
/**************************************************************************************
*
* @name : init_pool
* @function: 该函数的作用为初始化线程池管理结构体
* @params :
* @pool : 指的是需要初始化的线程池管理结构体指针
* @threads_number : 指的是要创建的初始活跃线程的数量
*
* @retval : bool : 返回值用于判断初始化线程池是否初始化成功
成功返回true,失败返回false
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
* @note : none
*
* ***********************************************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
//初始化线程池的管理结构体中的互斥锁成员
pthread_mutex_init(&pool->lock, NULL);
//初始化线程池的管理结构体中的条件变量成员
pthread_cond_init(&pool->cond, NULL);
//初始化销毁标志为false,即不销毁状态
pool->shutdown = false;
//给线程池中的任务链表的节点申请堆内存
pool->task_list = malloc(sizeof(struct task));
//给线程池中的线程ID申请堆内存,用于存储创建出来的线程的ID
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
//错误处理,对malloc进行错误处理
if(pool->task_list == NULL || pool->tids == NULL)
{
perror("allocate memory error");
return false;
}
//对任务链表中的节点的指针域进行初始化
pool->task_list->next = NULL;
//设置线程池中线程数量的最大值
pool->max_waiting_tasks = MAX_WAITING_TASKS;
//设置等待线程处理的任务的数量为0,说明现在没有任务
pool->waiting_tasks = 0;
//设置线程池中活跃的线程的数量
pool->active_threads = threads_number;
int i;
//循环创建活跃线程
for(i=0; i<pool->active_threads; i++)
{
//创建线程 把线程的ID存储在申请的堆内存
if(pthread_create(&((pool->tids)[i]), NULL,
routine, (void *)pool) != 0)
{
perror("create threads error");
return false;
}
//调试信息输出(在DEBUG模式下)
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__, //输出当前线程ID和函数名
i, (unsigned)pool->tids[i]); //输出被创建线程的索引和线程ID
#endif
}
return true; //成功则返回true
}
/**************************************************************************************
*
* @name : add_task
* @function: 该函数可往线程池中的任务链表中添加新任务
* @params :
* @pool : 指的是传递给该函数的线程池管理结构体指针
* @do_task : 指的是投送至线程池的执行例程
* @arg : 指的是要传递给新添加执行例程的参数,若执行例程不需要参数则填NULL
*
* @retval : bool : 返回值用于判断往线程池中的任务链表中添加新任务是否成功
成功返回true,失败返回false
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : 调用一次添加任务的函数时只能往线程池中添加一个任务
*
* ***********************************************************************************/
bool add_task(thread_pool *pool,
void *(*do_task)(void *arg), void *arg)
{
//给任务链表新节点申请内存
struct task *new_task = malloc(sizeof(struct task));
//错误处理,对malloc进行错误处理
if(new_task == NULL)
{
perror("allocate memory error");
return false;
}
new_task->do_task = do_task; //添加任务到任务链表中
new_task->arg = arg; //添加任务参数到任务链表中
new_task->next = NULL; //指针域设置为NULL
//============ LOCK =============//
pthread_mutex_lock(&pool->lock);
//===============================//
//说明要处理的任务的数量大于能处理的任务数量
if(pool->waiting_tasks >= MAX_WAITING_TASKS)
{
//解锁互斥锁
pthread_mutex_unlock(&pool->lock);
//说明太多任务
fprintf(stderr, "too many tasks.\n");
//释放任务链表
free(new_task);
//此时任务数量大于最大值,返回false
return false;
}
//备份线程池中的任务链表头结点
struct task *tmp = pool->task_list;
//遍历链表,找到单向链表的尾节点
while(tmp->next != NULL)
tmp = tmp->next; //备份任务链表的首结点
//把新的要处理的任务插入到任务链表的尾部(尾插)
tmp->next = new_task;
//要处理的任务的数量+1
pool->waiting_tasks++;
//=========== UNLOCK ============//
pthread_mutex_unlock(&pool->lock);
//===============================//
//调试信息输出(在DEBUG模式下)
#ifdef DEBUG
printf("[%u][%s] ==> a new task has been added.\n",
(unsigned)pthread_self(), __FUNCTION__); //输出当前线程的ID和函数名
#endif
//唤醒第一个处于阻塞队列中的线程
pthread_cond_signal(&pool->cond);
//成功则返回true
return true;
}
/**************************************************************************************
*
* @name : add_thread
* @function: 该函数可往线程池中添加活跃新线程的个数
* @params :
* @pool : 指的是需要增加线程的线程池管理结构体指针
* @additional_threads : 指的是要新添加的新线程数量
*
* @retval : int :返回值指的是创建完新线程的数量,添加失败则返回-1
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : none
*
* ***********************************************************************************/
int add_thread(thread_pool *pool, unsigned additional_threads)
{
//判断需要添加的新线程的数量是否为0,如果是则直接退出
if(additional_threads == 0)
return 0;
//计算线程池中总线程的数量,活跃的线程数量 + 新添加的线程数量
unsigned total_threads = pool->active_threads + additional_threads;
//i用于记录活跃线程的数量,actual_increment用于记录创建新线程的数量
int i, actual_increment = 0;
//在活跃线程数量的基础上添加新线程,并且线程的总数不能超过指定的最大线程数量
for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++)
{
//创建新线程,并且做错误处理,如果判断的条件不返回为0,则创建失败
if(pthread_create(&((pool->tids)[i]),
NULL, routine, (void *)pool) != 0)
{
perror("add threads error");
// no threads has been created, return fail
//如果没有创建线程,则直接判为创建失败并且退出
if(actual_increment == 0)
return -1; //返回-1,创建失败,直接退出函数
//创建成功则退出循环
break;
}
//创建成功则新线程的数量+1
actual_increment++;
//调试信息输出(在DEBUG模式下)
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__, //输出当前线程的ID和函数名
i, (unsigned)pool->tids[i]); //输出被创建出来的线程的索引和线程ID
#endif
}
//记录此时线程池中活跃线程的总数
pool->active_threads += actual_increment;
//返回创建新线程的数量
return actual_increment;
}
/**************************************************************************************
*
* @name : remove_thread
* @function: 该函数作用为移除线程池中的线程
* @params :
* @pool : 指的是需要移除线程的线程池结构体指针
* @removing_threads :指的是要移除线程的数量
*
* @retval : int : 返回值为剩余线程的数量,移除失败则返回-1
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : 1.线程池至少存在1条线程
2.如果被移除的线程正在执行任务,则等待其完成任务之后移除
*
* ***********************************************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
//判断要移除的线程数量是否为0,如果是则返回活跃线程的数量
if(removing_threads == 0)
return pool->active_threads;
//剩余的线程数量 = 活跃线程数量 - 要移除的线程数量
int remaining_threads = pool->active_threads - removing_threads;
//至少要保留一个线程
remaining_threads = remaining_threads > 0 ? remaining_threads : 1;
// 遍历线程,从最后一个活跃线程开始向前遍历(类似数组最后一个元素)
int i;
for(i=pool->active_threads-1; i>remaining_threads-1; i--)
{
//尝试取消第i个线程
errno = pthread_cancel(pool->tids[i]);
//检查取消线程是否成功
if(errno != 0)
break; //如果取消失败,则跳出循环
// 调试信息输出(在DEBUG模式下)
#ifdef DEBUG
printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
(unsigned)pthread_self(), __FUNCTION__, //输出当前线程ID和函数名
i, (unsigned)pool->tids[i]); //输出被取消线程的索引和线程ID
#endif
}
//如果i的值仍然是初始值(即没有线程被取消),则返回-1表示取消失败
if(i == pool->active_threads-1)
return -1;
else
{
//如果i的值有变化。则更新活跃线程的数量(i可作为数组下标使用,所以要 +1)
pool->active_threads = i+1;
//返回剩余活跃线程的数量
return i+1;
}
}
/**************************************************************************************
*
* @name : destroy_pool
* @function: 该函数作用为阻塞等待所有任务完成,然后立即销毁整个线程池,释放所有资源和内存
* @params :
* @pool : 指的是要销毁的线程池管理结构体指针
*
* @retval : bool : 返回值用于判断是否成功销毁线程池
成功返回true,失败返回false
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : none
*
* ***********************************************************************************/
bool destroy_pool(thread_pool *pool)
{
// 1, activate all threads
//是否销毁线程池的标志设置为true,表示销毁线程池
pool->shutdown = true;
// 使用条件变量广播,通知所有等待任务的线程检查关闭标志(此时唤醒了所有线程)
pthread_cond_broadcast(&pool->cond);
// 2, wait for their exiting
//等待所有线程退出
int i;
for(i=0; i<pool->active_threads; i++)
{
//等待每个线程结束,回收被终止线程的资源并且返回一个错误码
errno = pthread_join(pool->tids[i], NULL);
//检车终止线程是否成功,返回值等于0则成功终止线程,否则失败
if(errno != 0)
{
//如果等待线程结束失败,则打印错误信息
printf("join tids[%d] error: %s\n",
i, strerror(errno));
}
else
//如果等待线程结束成功,则打印每个被终止线程的ID
printf("[%u] is joined\n", (unsigned)pool->tids[i]);
}
// 3, free memories
//释放线程池中任务链表的内存
free(pool->task_list);
//释放线程池中线程ID的内存
free(pool->tids);
//释放线程池的内存
free(pool);
//销毁成功,返回true
return true;
}
thread_pool.h:
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
/*****************************************头文件********************************************/
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>
/******************************************END*********************************************/
/*********************************************宏*******************************************/
#define MAX_WAITING_TASKS 1000 //处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20 //活跃的线程数量
/*********************************************END*****************************************/
//任务结点 单向链表的节点,类型
struct task
{
void *(*do_task)(void *arg); //任务函数指针 指向线程要执行的任务 格式是固定的
void *arg; //需要传递给任务的参数,如果不需要,则NULL
struct task *next; //指向下一个任务结点的指针
};
//线程池的管理结构体
typedef struct thread_pool
{
pthread_mutex_t lock; // 互斥锁,保护任务队列
pthread_cond_t cond; // 条件量,同步所有线程
bool shutdown; //是否需要销毁线程池
struct task *task_list; //用于存储任务的链表
pthread_t *tids; //用于记录线程池中线程的ID
unsigned max_waiting_tasks; //线程池中线程的数量最大值
unsigned waiting_tasks; //处于等待状态的任务数量
unsigned active_threads; //正在活跃的线程数量
}thread_pool;
/**************************************************************************************
*
* @name : init_pool
* @function: 该函数的作用为初始化线程池管理结构体
* @params :
* @pool : 指的是需要初始化的线程池管理结构体指针
* @threads_number : 指的是要创建的初始活跃线程的数量
*
* @retval : bool : 返回值用于判断初始化线程池是否初始化成功
成功返回true,失败返回false
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
* @note : none
*
* ***********************************************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number);
/**************************************************************************************
*
* @name : add_task
* @function: 该函数可往线程池中的任务链表中添加新任务
* @params :
* @pool : 指的是传递给该函数的线程池管理结构体指针
* @do_task : 指的是投送至线程池的执行例程
* @arg : 指的是要传递给新添加执行例程的参数,若执行例程不需要参数则填NULL
*
* @retval : bool : 返回值用于判断往线程池中的任务链表中添加新任务是否成功
成功返回true,失败返回false
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : 调用一次添加任务的函数时只能往线程池中添加一个任务
*
* ***********************************************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
/**************************************************************************************
*
* @name : add_thread
* @function: 该函数可往线程池中添加活跃新线程的个数
* @params :
* @pool : 指的是需要增加线程的线程池管理结构体指针
* @additional_threads : 指的是要新添加的新线程数量
*
* @retval : int :返回值指的是创建完新线程的数量,添加失败则返回-1
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : none
*
* ***********************************************************************************/
int add_thread(thread_pool *pool, unsigned int additional_threads_number);
/**************************************************************************************
*
* @name : remove_thread
* @function: 该函数作用为移除线程池中的线程
* @params :
* @pool : 指的是需要移除线程的线程池结构体指针
* @removing_threads :指的是要移除线程的数量
*
* @retval : int : 返回值为剩余线程的数量,移除失败则返回-1
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : 1.线程池至少存在1条线程
2.如果被移除的线程正在执行任务,则等待其完成任务之后移除
*
* ***********************************************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);
/**************************************************************************************
*
* @name : destroy_pool
* @function: 该函数作用为阻塞等待所有任务完成,然后立即销毁整个线程池,释放所有资源和内存
* @params :
* @pool : 指的是要销毁的线程池管理结构体指针
*
* @retval : bool : 返回值用于判断是否成功销毁线程池
成功返回true,失败返回false
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
*
* @note : none
*
* ***********************************************************************************/
bool destroy_pool(thread_pool *pool);
/**************************************************************************************
*
* @name : routine
* @function: 该函数指的是线程要执行的任务
* @params :
* @arg : 该参数一般指的是传递给该函数的线程池管理结构体指针
*
* @retval : none
* @date : 2024/06/12
* @author : [email protected]
* @version: V1.0
*
* @note : none
*
* ***********************************************************************************/
void *routine(void *arg);
#endif
标签:task,任务,threads,pthread,线程,pool
From: https://www.cnblogs.com/lwj294/p/18244656