[toc]
线程池
线程池代码分析
thread_pool.c
#include "thread_pool.h"
void handler(void *arg)
{
printf("[%u] is ended.\n",
(unsigned)pthread_self()); //打印自己的进程号
pthread_mutex_unlock((pthread_mutex_t *)arg); //解锁
}
//线程要执行的任务
void *routine(void *arg)
{
//调试
#ifdef DEBUG
printf("[%u] is started.\n",
(unsigned)pthread_self());
#endif
//把需要传递给线程任务的参数进行备份
thread_pool *pool = (thread_pool *)arg;
struct task *p;
while(1)
{
/*
** push a cleanup functon handler(), make sure that
** the calling thread will release the mutex properly
** even if it is cancelled during holding the mutex.
**
** NOTE:
** pthread_cleanup_push() is a macro which includes a
** loop in it, so if the specified field of codes that
** paired within pthread_cleanup_push() and pthread_
** cleanup_pop() use 'break' may NOT break out of the
** truely loop but break out of these two macros.
** see line 61 below.
*/
//================================================//
pthread_cleanup_push(handler, (void *)&pool->lock); //注册清理线程函数
pthread_mutex_lock(&pool->lock); //上锁
//================================================//
// 1, no task, and is NOT shutting down, then wait
while(pool->waiting_tasks == 0 && !pool->shutdown){ //判断处于等待状态的线程是否等于0,以及判断是否需要销毁线程池,这里是如果false,则进入循环
pthread_cond_wait(&pool->cond, &pool->lock); //会将调用线程放入条件变量的等待队列,并释放互斥锁。线程在被唤醒之前会一直阻塞。当线程被唤醒后,它会重新获取互斥锁,然后继续执行
}
// 2, no task, and is shutting down, then exit
if(pool->waiting_tasks == 0 && pool->shutdown == true) //如果需要销毁,则解锁然后线程退出
{
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL); // CANNOT use 'break';
}
// 3, have some task, then consume it //链表的头删,表示当有任务链表中有任务时,处于等待队列的线程就会去,执行任务链表中的表头的任务
p = pool->task_list->next; //将链表的首结点地址,给到局部变量的P
pool->task_list->next = p->next; //让链表的头结点,指向P->next ,也就是首结点的下一个结点
pool->waiting_tasks--; //让等待被线程执行的任务数量减1
//================================================//
pthread_mutex_unlock(&pool->lock); //解锁
pthread_cleanup_pop(0); //不取消清理函数,也不执行
//================================================//
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
(p->do_task)(p->arg); //执行链表中的函数
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(p); //执行完任务结点后释放局部变量下保存的堆内存地址
}
pthread_exit(NULL);
}
//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
//初始化互斥锁
pthread_mutex_init(&pool->lock, NULL);
//初始化条件量
pthread_cond_init(&pool->cond, NULL);
//销毁标志
pool->shutdown = false; //不销毁
//给链表的节点申请堆内存
pool->task_list = malloc(sizeof(struct task));
//申请堆内存,用于存储创建出来的线程的ID
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); //线程池中最大线程数量= 活跃线程+能够创建的线程数量(既能够存放线程ID的个数)
//错误处理,对malloc进行错误处理
if(pool->task_list == NULL || pool->tids == NULL)
{
perror("allocate memory error");
return false;
}
//对任务链表中的节点的指针域进行初始化
pool->task_list->next = NULL;
//设置线程池中线程数量的最大值
pool->max_waiting_tasks = MAX_WAITING_TASKS; //方便更改
//设置等待线程处理的任务的数量为0,说明现在没有任务
pool->waiting_tasks = 0;
//设置线程池中活跃的线程的数量
pool->active_threads = threads_number;
int i;
//循环创建活跃线程
for(i=0; i<pool->active_threads; i++)
{
//创建线程 把线程的ID存储在申请的堆内存
if(pthread_create(&((pool->tids)[i]), NULL,
routine, (void *)pool) != 0)
{
perror("create threads error");
return false;
}
//用于调试
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
return true;
}
//先线程池的任务链表中添加任务
bool add_task(thread_pool *pool,
void *(*do_task)(void *arg), void *arg) //将自定义的任务,添加到
{
//给任务链表节点申请内存
struct task *new_task = malloc(sizeof(struct task));
if(new_task == NULL)
{
perror("allocate memory error");
return false;
}
new_task->do_task = do_task; //设置需要在链表中添加的任务
new_task->arg = arg; //任务函数的参数
new_task->next = NULL; //指针域设置为NULL
//============ LOCK =============//
pthread_mutex_lock(&pool->lock); //上锁防止主线程与任务执行线程,资源竞争
//===============================//
//说明要处理的任务的数量大于能处理的任务数量
if(pool->waiting_tasks >= MAX_WAITING_TASKS)
{
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "too many tasks.\n");
free(new_task);
return false;
}
struct task *tmp = pool->task_list; //通过局部变量,对任务链表进行访问,可以保留头结点的位置,减少持有锁的时间,随着函数结束自动释放资源
//遍历链表,找到单向链表的尾节点
while(tmp->next != NULL)
tmp = tmp->next;
//把新的要处理的任务插入到链表的尾部 尾插
tmp->next = new_task;
//要处理的任务的数量+1
pool->waiting_tasks++;
//=========== UNLOCK ============//
pthread_mutex_unlock(&pool->lock);
//===============================//
//调试
#ifdef DEBUG
printf("[%u][%s] ==> a new task has been added.\n",
(unsigned)pthread_self(), __FUNCTION__);
#endif
//唤醒第一个处于阻塞队列中的线程
pthread_cond_signal(&pool->cond);
return true;
}
//向线程池加入新线程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
//判断需要添加的新线程的数量是否为0
if(additional_threads == 0)
return 0;
//计算线程池中总线程的数量
unsigned total_threads =
pool->active_threads + additional_threads; //活跃线程的数量+需要添加新线程的数量=总的活跃线程的数量
int i, actual_increment = 0;
for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++) //判断条件为 活跃线程的数量小于线程池中总线程的数量 且 小于创建线程的上限
{ //线程总的活跃线程的数量,是用来判断循环多少次的
//创建新线程
if(pthread_create(&((pool->tids)[i]), //而超过能够创建的线程数量也一样不会在添加新线程了
NULL, routine, (void *)pool) != 0) //为什么不这样写for(i=0 , i<additional_threads, && i < MAX_ACTIVE_THREADS;i++)
{ //因为创建出来的线程ID,会根据创建时的顺序,存放到对应的数组下标的位置,
perror("add threads error"); //所以需要定义两个局部变量,用于保存数组最后的一个元素的后一个位置,因为是i++ ,int i, actual_increment = 0;
//actual_increment这个变量用于保存实际创建线程的数量
// no threads has been created, return fail
if(actual_increment == 0)
return -1;
break;
}
actual_increment++;
#ifdef DEBUG
printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
//记录此时线程池中活跃线程的总数
pool->active_threads += actual_increment;
return actual_increment;
}
int remove_thread(thread_pool *pool, unsigned int removing_threads) //删除线程池中的线程。 参数是需要删除的数量
{
if(removing_threads == 0)
return pool->active_threads;
int remaining_threads = pool->active_threads - removing_threads; //计算线程池中剩余的线程数量
remaining_threads = remaining_threads > 0 ? remaining_threads : 1; //防呆,如果想要删除的线程数量大于线程池中线程的数量则
//保留初始化时数组下标为0的线程,如果不保留,就是销毁线程池了
int i; //在删除线程池中的线程时,保留至少一个线程的原因是为了确保线程池仍然能够处理剩余的任务,避免完全停止服务。
for(i=pool->active_threads-1; i>remaining_threads-1; i--) //pool->active_threads-1即数组中最后一个元素下标
{
errno = pthread_cancel(pool->tids[i]); // 取消线程 同时 执行线程清理函数,主线程调用,终止其他线程
if(errno != 0)
break;
#ifdef DEBUG
printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",
(unsigned)pthread_self(), __FUNCTION__,
i, (unsigned)pool->tids[i]);
#endif
}
if(i == pool->active_threads-1)
return -1;
else
{
pool->active_threads = i+1;
return i+1;
}
}
bool destroy_pool(thread_pool *pool) //销毁线程池
{
// 1, activate all threads
pool->shutdown = true;
pthread_cond_broadcast(&pool->cond); //唤醒线程池中所有线程,然后终止线程
// 2, wait for their exiting
int i;
for(i=0; i<pool->active_threads; i++)
{
errno = pthread_join(pool->tids[i], NULL); //主线程调用,循环等待回收线程资源
if(errno != 0)
{
printf("join tids[%d] error: %s\n",
i, strerror(errno));
}
else
printf("[%u] is joined\n", (unsigned)pool->tids[i]);
}
// 3, free memories
free(pool->task_list); //释放线程池中任务链表的堆内存
free(pool->tids); //释放线程句柄数组的堆内存
free(pool); //释放管理线程池的堆内存
return true;
}
thread_pool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>
#define MAX_WAITING_TASKS 1000 //处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20 //活跃的线程数量
//任务结点 单向链表的节点,类型
struct task
{
void *(*do_task)(void *arg); //任务函数指针 指向线程要执行的任务 格式是固定的
void *arg; //需要传递给任务的参数,如果不需要,则NULL
struct task *next; //指向下一个任务结点的指针
};
//线程池的管理结构体
typedef struct thread_pool
{
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件量
bool shutdown; //是否需要销毁线程池
struct task *task_list; //用于存储任务的链表
pthread_t *tids; //用于记录线程池中线程的ID,其实是一个数组,数组中元素的类型是pthread_t
unsigned max_waiting_tasks; //线程池中线程的数量最大值
unsigned waiting_tasks; //设置等待线程处理的任务的数量
unsigned active_threads; //正在活跃的线程数量
}thread_pool;
//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);
//向线程池中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);
//先线程池中添加线程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);
//从线程池中删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);
//销毁线程池
bool destroy_pool(thread_pool *pool);
//任务函数
void *routine(void *arg);
#endif
main.c
#include "thread_pool.h"
// 任务函数:模拟一个需要耗时的任务
void *mytask(void *arg)
{
int n = (int)arg;
printf("[%u][%s] ==> job will be done in %d sec...\n",
(unsigned)pthread_self(), __FUNCTION__, n);
sleep(n); // 模拟任务耗时
printf("[%u][%s] ==> job done!\n",
(unsigned)pthread_self(), __FUNCTION__);
return NULL;
}
// 计时函数:每秒打印一次经过的秒数
void *count_time(void *arg)
{
int i = 0;
while(1)
{
sleep(1);
printf("sec: %d\n", ++i);
}
}
int main(void)
{
pthread_t a;
// 创建计时线程
pthread_create(&a, NULL, count_time, NULL);
// 1. 初始化线程池
thread_pool *pool = malloc(sizeof(thread_pool));
init_pool(pool, 2); // 初始化线程池,初始线程数为2
// 2. 添加任务到线程池
printf("throwing 3 tasks...\n");
add_task(pool, mytask, (void *)(rand()%10));
add_task(pool, mytask, (void *)(rand()%10));
add_task(pool, mytask, (void *)(rand()%10));
// 3. 检查当前活动线程数
printf("current thread number: %d\n",
remove_thread(pool, 0)); // 传入0表示不移除线程,只返回当前线程数
sleep(9); // 等待9秒
// 4. 添加更多任务到线程池
printf("throwing another 2 tasks...\n");
add_task(pool, mytask, (void *)(rand()%10));
add_task(pool, mytask, (void *)(rand()%10));
// 5. 添加更多线程到线程池
add_thread(pool, 2); // 增加2个线程
sleep(5); // 等待5秒
// 6. 从线程池中移除线程
printf("remove 3 threads from the pool, "
"current thread number: %d\n",
remove_thread(pool, 3)); // 移除3个线程,并打印当前线程数
// 7. 销毁线程池
destroy_pool(pool); // 销毁线程池
return 0; // 主函数返回0,程序结束
}
线程取消的基本概念
取消点(Cancellation Point):线程在执行过程中,会在一些特定的函数调用时检查是否有取消请求,这些函数称为取消点。例如,
pthread_testcancel
、pthread_join
、pthread_cond_wait
等函数都是取消点。取消类型(Cancellation Type):决定线程在取消点如何响应取消请求。主要有两种类型:
- 异步取消(Asynchronous Cancellation):线程可以在任何时刻被取消。
- 延迟取消(Deferred Cancellation):线程只有在到达取消点时才会被取消。POSIX 线程库默认使用这种类型。
取消状态(Cancellation State):决定线程是否响应取消请求。可以是以下两种状态:
- 启用(Enable):线程会响应取消请求。
- 禁用(Disable):线程不会响应取消请求。
pthread_cleanup_push
是 POSIX 线程库(pthread)中的一个函数,用于在线程取消时执行清理操作。它与pthread_cleanup_pop
配对使用,确保在线程退出或被取消时执行特定的清理代码,例如释放资源或解锁互斥锁。pthread_cleanup_push(cleanup, "Resource A")
:注册清理函数cleanup
,当线程被取消或退出时,会执行cleanup("Resource A")
。pthread_cleanup_pop(1)
:取消清理函数并执行它(参数1
表示执行清理函数)。pthread_testcancel()
:这是一个取消点,线程在这里检查是否有取消请求。
pthread_cleanup_push
和pthread_cleanup_pop
用于确保线程在被取消或正常退出时执行特定的清理操作。这对于管理资源(如内存、文件描述符、互斥锁等)非常重要,确保不会因为线程的意外终止而导致资源泄漏。
线程取消(Thread Cancellation)是指在多线程编程中,允许一个线程请求终止另一个线程的执行。POSIX 线程库提供了这种机制,使得线程可以被其他线程取消。这在某些情况下非常有用,例如当一个线程因为某种原因需要提前终止另一个线程的执行时。
相关函数
pthread_cancel(pthread_t thread)
: 请求取消指定的线程。pthread_setcancelstate(int state, int *oldstate)
: 设置线程的取消状态。pthread_setcanceltype(int type, int *oldtype)
: 设置线程的取消类型。pthread_testcancel()
: 创建一个取消点,线程在执行到这里时会检查是否有取消请求。
通过线程池去管理线程:重点在于用条件变量,以及线性表(数组)保存线程的ID(句柄)
通过链表去管理任务: 尾插,头删的方式
阅读代码的时候需要结合上下文
布尔类型一般常用来判断是和否(即二进制可作为标志)
函数指针:用来指向函数的指针 void *(*do_task)(void *arg);
释放堆内存时,包括结构体里的指针的堆内存分配
整体的架构应该是:
堵塞队列:线程1 , 线程2 ,线程3 .......
任务链表: 任务1 , 任务2 ,任务3 .........
pthread_线程1{
上锁
while{
条件量
}
if{判断标志位
销毁线程池
}
pool->taks_list->next = p->next(头删)
(p->task_任务1)(p->arg)
解锁
free(p)
}