thread_pool.c
线程要执行的任务
/*****************************************************************************************
*
* @name : routine
* @function : 线程要执行的任务
* @paramsv : None
* @retval : None
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : 该函数会在routine线程被取消的时候自动执行
*
*
* *****************************************************************************************/
void *routine(void *arg)
{
// 调试
#ifdef DEBUG
printf("[%u] is started.\n",
(unsigned)pthread_self());
#endif
// 把需要传递给线程任务的参数进行备份
thread_pool *pool = (thread_pool *)arg;
// 新建一个任务结点用来备份
struct task *p;
while (1)
{
/**********************************************************************
**
** pthread_cleanup_push是一个宏,需要配合pthread_cleanup_pop一起使用
** 如在执行pthread_cleanup_pop之前,该线程被取消了,则会调用handler这个函数,
** 并将pool->lock作为参数传递给handler
**
****************************************************************************/
pthread_cleanup_push(handler, (void *)&pool->lock);
// 对互斥锁上锁
pthread_mutex_lock(&pool->lock);
/******************************任务量为0的情况********************************************************/
// 当处于等待状态的线程数量为0且销毁线程池的标志为0时
while (pool->waiting_tasks == 0 && !pool->shutdown)
{
// 对该线程进行挂起
pthread_cond_wait(&pool->cond, &pool->lock);
}
// 当处于等待状态的线程数量为0且销毁线程池的标志为1时
if (pool->waiting_tasks == 0 && pool->shutdown == true)
{
// 解锁互斥锁
pthread_mutex_unlock(&pool->lock);
// 结束该线程
pthread_exit(NULL); // CANNOT use 'break';
}
/******************************任务量不为0的情况****************************************************/
// 备份任务结点
p = pool->task_list->next;
// 让线程池里的当前任务指向下一个任务
pool->task_list->next = p->next;
// 令处于等待状态的线程数量减一
pool->waiting_tasks--;
//================================================//
// 互斥锁解锁
pthread_mutex_unlock(&pool->lock);
// 不执行清理处理程序,参数为1则执行清理处理程序
pthread_cleanup_pop(0);
//================================================//
// 设置线程为不可取消
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
// 执行p,即当前任务结点里需要执行的任务
(p->do_task)(p->arg);
// 设置线程为可取消
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
// 释放堆内存
free(p);
}
// 结束线程
pthread_exit(NULL);
}
初始化线池
/******************************************************
*
* @name : init_pool
* @function : 初始化线池
* @params
* @pool : 线程池的地址
* @threads_number : 线程池中活跃的线程的数量
*
* @retval : 成功返回1,否则返回0
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
// 以默认属性初始化互斥锁
pthread_mutex_init(&pool->lock, NULL);
// 以默认属性初始化条件量
pthread_cond_init(&pool->cond, NULL);
// 销毁标志设置为不销毁
pool->shutdown = false;
// 给链表的节点申请堆内存
pool->task_list = (struct task *)malloc(sizeof(struct task));
// 申请堆内存,用于存储创建出来的线程的ID
pool->tids = (pthread_t *)malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
// 错误处理,对malloc进行错误处理
if (pool->task_list == NULL || pool->tids == NULL)
{
fprintf(stderr, "malloc for task_list or tids fail!,error: %d, %s\n", errno, strerror(errno));
return false;
}
// 对任务链表中的节点的指针域进行初始化
pool->task_list->next = NULL;
// 设置线程池中线程数量的最大值
pool->max_waiting_tasks = MAX_WAITING_TASKS;
// 设置等待线程处理的任务的数量为0,说明现在没有任务
pool->waiting_tasks = 0;
// 设置线程池中活跃的线程的数量
pool->active_threads = threads_number;
// 循环创建活跃线程
for (int i = 0; i < pool->active_threads; i++)
{
// 创建线程 把线程的ID存储在申请的堆内存
if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0)
{
fprintf(stderr, " create threads fail!,error: %d, %s\n", errno, strerror(errno));
return false;
}
// 用于调试
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
return true;
}
向线程池的任务链表中添加任务
/******************************************************
*
* @name : add_task
* @function : 向线程池的任务链表中添加任务
* @params
* @pool : 线程池的地址
* @do_task : 需要执行的任务的函数
* @arg :需要执行的任务的函数的参数
*
* @retval : 如成功返回1,否则返回0
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{
// 给任务链表节点申请内存
struct task *new_task = malloc(sizeof(struct task));
if (new_task == NULL)
{
fprintf(stderr, " malloc for new task fail!,error: %d, %s\n", errno, strerror(errno));
return false;
}
new_task->do_task = do_task; // 任务函数指针指向传进来的do_task
new_task->arg = arg; // 设置函数参数
new_task->next = NULL; // 指针域设置为NULL
//============ LOCK =============//
// 互斥锁上锁
pthread_mutex_lock(&pool->lock);
//===============================//
// 说明要处理的任务的数量大于能处理的任务数量
if (pool->waiting_tasks >= MAX_WAITING_TASKS)
{
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "too many tasks.\n");
free(new_task);
return false;
}
// 备份任务链表
struct task *tmp = pool->task_list;
// 遍历链表,找到单向链表的尾节点
while (tmp->next != NULL)
tmp = tmp->next;
// 把新的要处理的任务插入到链表的尾部 尾插
tmp->next = new_task;
// 要处理的任务的数量+1
pool->waiting_tasks++;
//=========== UNLOCK ============//
pthread_mutex_unlock(&pool->lock);
//===============================//
// 调试
#ifdef DEBUG
printf("[%u][%s] ==> a new task has been added.\n",
(unsigned)pthread_self(), __FUNCTION__);
#endif
// 唤醒第一个处于阻塞队列中的线程
pthread_cond_signal(&pool->cond);
return true;
}
向线程池加入新线程
/******************************************************
*
* @name : add_thread
* @function : 向线程池加入新线程
* @params
* @pool : 线程池的地址
* @removing_threads : 需要增加的线程数量
*
* @retval : 返回正在活跃的线程数量
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
int add_thread(thread_pool *pool, unsigned additional_threads)
{
// 判断需要添加的新线程的数量是否为0
if (additional_threads == 0)
return 0;
// 计算线程池中总线程的数量
unsigned total_threads =
pool->active_threads + additional_threads;
int i, actual_increment = 0;
// 循环创建新线程
for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
{
// 创建新线程
if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0)
{
perror("add threads error");
// 如需要加入的新线程池数量为0,则直接退出
if (actual_increment == 0)
return -1;
break;
}
actual_increment++;
// 用于调试
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
// 记录此时线程池中活跃线程的总数
pool->active_threads += actual_increment;
return actual_increment;
}
从线程池中删除线程
/******************************************************
*
* @name : remove_thread
* @function : 从线程池中删除线程
* @params
* @pool : 线程池的地址
* @removing_threads : 需要删除的线程数量
*
* @retval : 返回正在活跃的线程数量
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
// 如果需要删除的线程数量为0,则返回正在活跃的线程数量
if (removing_threads == 0)
return pool->active_threads;
// 定义一个变量叫remaining_threads,remaining_threads的值为正在活跃的线程数量减去需要删除线程的数量
int remaining_threads = pool->active_threads - removing_threads;
// 如remaining_threads大于零,说明正在活跃的线程数量大于需要删除线程的数量
// 如remaining_threads小于零,说明正在活跃的线程数量小于或等于需要删除线程的数量,并将remaining_threads赋值为1
remaining_threads = remaining_threads > 0 ? remaining_threads : 1;
int i;
// 循环取消线程,直到正在活跃的线程数量为0
for (i = pool->active_threads - 1; i > remaining_threads - 1; i--)
{
// 如调用pthread_cancel函数失败,则退出循环
errno = pthread_cancel(pool->tids[i]);
if (errno != 0)
break;
#ifdef DEBUG
printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
if (i == pool->active_threads - 1)
return -1;
// 返回现在的线程数量
else
{
pool->active_threads = i + 1;
return i + 1;
}
}
释放线程池
/******************************************************
*
* @name : destroy_pool
* @function : 释放线程池
* @params
* @pool : 线程池的地址
*
* @retval : 成功返回1,否则返回0
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
bool destroy_pool(thread_pool *pool)
{
// 设置销毁标志位1
pool->shutdown = true;
// 唤醒所有线程
pthread_cond_broadcast(&pool->cond);
// 循环等待线程结束
for (int i = 0; i < pool->active_threads; i++)
{
// 等待线程结束并释放相关资源
errno = pthread_join(pool->tids[i], NULL);
if (errno != 0)
{
printf("join tids[%d] error: %s\n", i, strerror(errno));
}
else
printf("[%u] is joined\n", (unsigned)pool->tids[i]);
}
// 释放相关内存
free(pool->task_list);
free(pool->tids);
free(pool);
return true;
}
thread_pool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <errno.h>
#include <pthread.h>
#define MAX_WAITING_TASKS 1000 // 处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20 // 活跃的线程数量
// 任务结点 单向链表的节点,类型
struct task
{
void *(*do_task)(void *arg); // 任务函数指针 指向线程要执行的任务 格式是固定的
void *arg; // 需要传递给任务的参数,如果不需要,则NULL
struct task *next; // 指向下一个任务结点的指针
};
// 线程池的管理结构体
typedef struct thread_pool
{
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件量
bool shutdown; // 是否需要销毁线程池
struct task *task_list; // 用于存储任务的链表
pthread_t *tids; // 用于记录线程池中线程的ID
unsigned max_waiting_tasks; // 线程池中线程的数量最大值
unsigned waiting_tasks; // 处于等待状态的线程数量
unsigned active_threads; // 正在活跃的线程数量
} thread_pool;
/******************************************************
*
* @name : init_pool
* @function : 初始化线池
* @params
* @pool : 线程池的地址
* @threads_number : 线程池中活跃的线程的数量
*
* @retval : 成功返回1,否则返回0
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number);
/******************************************************
*
* @name : add_task
* @function : 向线程池的任务链表中添加任务
* @params
* @pool : 线程池的地址
* @do_task : 需要执行的任务的函数
* @arg :需要执行的任务的函数的参数
*
* @retval : 如成功返回1,否则返回0
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
/******************************************************
*
* @name : add_thread
* @function : 向线程池加入新线程
* @params
* @pool : 线程池的地址
* @removing_threads : 需要增加的线程数量
*
* @retval : 返回正在活跃的线程数量
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
int add_thread(thread_pool *pool, unsigned int additional_threads_number);
/******************************************************
*
* @name : remove_thread
* @function : 从线程池中删除线程
* @params
* @pool : 线程池的地址
* @removing_threads : 需要删除的线程数量
*
* @retval : 返回正在活跃的线程数量
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);
/******************************************************
*
* @name : destroy_pool
* @function : 释放线程池
* @params
* @pool : 线程池的地址
*
* @retval : 成功返回1,否则返回0
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : None
*
*
* *******************************************************/
bool destroy_pool(thread_pool *pool);
/*****************************************************************************************
*
* @name : routine
* @function : 线程要执行的任务
* @paramsv : None
* @retval : None
* @author : Dazz
* @date : 2024/6/2
* @version : None
* @note : 该函数会在routine线程被取消的时候自动执行
*
*
* *****************************************************************************************/
void *routine(void *arg);
#endif
标签:None,task,封装,函数,threads,pthread,线程,pool
From: https://www.cnblogs.com/Dazz24/p/18241483