手写线程池
线程池解决的问题是避免线程创建、销毁的代价以及避免线程太多,内存耗尽。GCC编译时,必须通过指令引入线程库。
gcc -o threadpool threadpool.c -pthread
组件开发
线程池最基本需要包含三个组件
- 任务队列-添加任务
- 管理组件-管理线程对立以及任务队列
- 执行队列-线程队列-访问管理组件,从管理任务组件中取出任务
实现思路
-
首先定义结构
任务队列
// 定义任务队列,实现的功能是需要注册到管理者 struct task{ // void (*task_func) (struct task*); void* user_data; // callback struct task* prev; struct task* next; };
执行队列
// 执行队列,需要下载任务,注册到管理者 struct worker{ pthread_t t_id; int thread_number; int thread_close; struct worker* prev; struct worker* next; struct manager* m_manger; };
管理队列
// 管理者,管理执行者和任务,其实这里就是线程池 typedef struct manager{ struct task* tasks; struct worker* workers; // 同步机制 pthread_mutex_t m_mutex; // exclusive lock pthread_cond_t m_cond; // cond to make thread to wait task in hang up }thread_pool;
-
然后定义接口方法-线程池创建、销毁、任务添加、线程回调函数、任务接口函数
- 线程回调函数
void* thread_call_back(void* arg){ struct worker* m_worker = (struct worker* )arg; printf("execute call_back: %d!\n", m_worker->thread_number); // 使用manager的cond和mutex避免死锁 // 线程需要一直等待任务唤醒 while(1){ pthread_mutex_lock(&m_worker->m_manger->m_mutex); // 队列空就忙等 while(m_worker->m_manger->tasks==NULL){ if(m_worker->thread_close == 1){ break; } pthread_cond_wait(&m_worker->m_manger->m_cond, &m_worker->m_manger->m_mutex); // 队列空,线程就一直挂起等待资源 } if(m_worker->thread_close == 1){ pthread_mutex_unlock(&m_worker->m_manger->m_mutex); break; } struct task* m_task = m_worker->m_manger->tasks; printf("执行第几个包: %d\n", *(int*)m_task->user_data); LIST_DELETE(m_task, m_worker->m_manger->tasks); // 否则取任务 m_task->task_func(m_task); // 执行任务 m_task = NULL; pthread_mutex_unlock(&m_worker->m_manger->m_mutex); } printf("thread destroy %d\n", m_worker->thread_number); free(m_worker); m_worker = NULL; return 0; }
- 线程池创建(通常一个线程占用8M空间)
int threadpool_create(thread_pool* pool, int thread_count){ if(pool==NULL) return -1; if(thread_count < 1) return -2; // 初始化同步机制 // struct worker m_worker = {0}; #if 1 // 堆内存分配 for(int i = 0; i < thread_count; ++i){ struct worker* m_worker = (struct worker*)malloc(sizeof(struct worker)); if(m_worker == NULL) { perror("malloc"); return -2; } m_worker->m_manger = pool; m_worker->thread_close = 0; m_worker->thread_number = i+1; // 线程从1开始编号 LIST_INSERT(m_worker, pool->workers); // 创建线程 int ret = pthread_create(&m_worker->t_id,NULL, thread_call_back, m_worker); if(ret != 0){ // 分配错误就释放 perror("pthread_create"); free(m_worker); m_worker = NULL; return -3; } printf("thread create success: %d!\n", m_worker->thread_number); } #elif 0 struct worker m_worker[thread_count]; // 动态数组的组织方式 memset(m_worker, 0 ,sizeof(m_worker)); // 栈内存 for(int i = 0; i < thread_count; ++i){ m_worker[i].thread_number = i+1; pthread_create(&m_worker[i].t_id, NULL, thread_call_back, &m_worker[i]); } #endif return 0; }
- 线程池销毁
int threadpool_destroy(thread_pool* pool, int workers_count){ // 销毁怎么做,其实就是广播或者一个一个给线程发送条件满足信号,唤醒所有线程 struct worker* p = NULL; while(pool->workers){ p = pool->workers; p->thread_close = 1; pool->workers = pool->workers->next; } pthread_mutex_lock(&pool->m_mutex); pthread_cond_broadcast(&pool->m_cond); pthread_mutex_unlock(&pool->m_mutex); pool->workers =NULL; pool->tasks =NULL; printf("list is clear!\n"); return 0; }
- 添加任务
int push_tasks(thread_pool* pool, struct task* m_task){ pthread_mutex_lock(&pool->m_mutex); LIST_INSERT(m_task, pool->tasks); // 插入任务 pthread_cond_signal(&pool->m_cond); // 唤醒一个线程去处理 pthread_mutex_unlock(&pool->m_mutex); return 0; }
- 任务接口
void task_entry(struct Tasks* task){ int idx = *(int*)task->user_data; printf("idx: %d\n", idx); free(task->user_data); free(task); }
- 主函数
int main(){ thread_pool pool = {0}; pthread_mutex_init(&pool.m_mutex, NULL); pthread_cond_init(&pool.m_cond, NULL); threadpool_create(&pool, THREAD_COUNT); // 可能在执行任务期间,它可能已经结束了 for(int i = 0 ; i < TASK_COUNT; ++i){ struct task* p_task = (struct task*)malloc(sizeof(struct task)); p_task->task_func = task_entry; // user_data指向的地方其实是未分配的,需要进行分配 p_task->user_data = malloc(sizeof(int)); *(int*)p_task->user_data = i+1; push_tasks(&pool, p_task); } threadpool_destroy(&pool, worker_count); getchar(); return 0; }
复盘问题
-
为什么会出现段错误?
一般来说都是因为访问空指针从而引发段错误,所以每个结构体都需要进行初始化。
-
主线程不等子线程会发生什么?
也会发生段错误,因为子线程会访问已经释放的空指针,所以主线程 要等待子线程完成再结束。
-
为什么不能都分配为栈内存?
注意一定是分配堆内存才能够释放,而且挂到队列上的东西,一定要堆内存分配,因为栈内存的生命周期短。如果任务分配栈内存,那么它在存在于函数结束的时候,那么会产生一种状况,虽然其加入到队列了,但是其已经被回收了,那么会线程访问会产生未定义行为。