文章目录
前言
本文首先介绍了线程池的优点、构成,并手写了一个比较标准的线程池。
一、线程池介绍
线程池是一种维持管理固定数量线程的池式结构。
同时也有很多其他的池式结构,例如连接池、对象池、内存池,这些结构的共同特点都是:复用资源。
1.1 为什么需要线程池?
当某类任务特别耗时,且严重影响到该线程(生产者线程)处理其他任务。
- 耗时等待
- 耗时处理
当出现这种情况时可以将这类任务交给其他线程处理以提高性能。
1.2 线程池的作用
- 性能优化:不过度占用核心线程,异步执行耗时任务(等待或处理)
- 并发执行:充分利用多核,并发执行核心业务
- 复用线程资源
- 异步处理生产者线程的任务
- 减少多个任务的执行时间
1.3 线程池的构成
通常由3部分组成:生产者线程、任务队列、消费者线程(线程池)。
-
生产者线程发布耗时任务,一般是将耗时任务放入队列之中,采用条件变量来通知消费者线程。
-
任务队列中会保存任务上下文,提供相应的任务回调函数。
-
消费者线程会从任务队列之中取出任务,然后执行任务。线程调度顺序由条件变量或信号量等通知消费者线程,任务处理完成会休眠活跃的线程。
二、手写线程池
2.1 接口设计
2.1.1 封装原则
对于线程池封装原则,也就是应该给用户暴露哪些信息。
- 隐藏实现的细节,暴露出使用的接口。也就是在
.h
文件提供接口,而实现细节在.c
文件 - 用户并不需要知道线程池的具体结构
- 用户需要知道任务在线程中是以何种形式执行的,比如需要提供什么回调函数
- 用户不需要知道线程管理的细节
- 用户不需要知道队列
2.1.2 创建线程池的接口
复杂资源创建,通常需要对称式的接口设计,有资源的创建,就有资源的销毁。
// 对称处理
// 创建线程池资源
thrdpool_t *thrdpool_create(int thrd_count);
// 优雅退出
void thrdpool_terminate(thrdpool_t * pool);
// 增加任务
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
// 销毁线程,释放资源
void thrdpool_waitdone(thrdpool_t *pool);
上面四个接口是提供给用户使用的。
2.2 数据结构设计
typedef struct spinlock spinlock_t;
typedef struct task_s {
void *next;
handler_pt func;
void *arg;
} task_t;
typedef struct task_queue_s {
void *head;
void **tail; // 指向任务(task_s)的next指针
int block;
spinlock_t lock; // 自旋锁,队列操作可以单独加锁
pthread_mutex_t mutex; // 互斥锁(队列操作可以复用)
pthread_cond_t cond; // 条件变量
} task_queue_t;
struct thrdpool_s {
task_queue_t *task_queue;
atomic_int quit;
int thrd_count;
pthread_t *threads;
};
2.3 线程池线程数量选择
2.3.1 维持固定数量线程
- 线程数量的增加,由于系统资源的限制,并不能带来性能提升,反而会带来负担
- 避免线程频繁的创建和销毁
2.3.2 线程数量选择
目的:充分利用系统资源
- CPU密集型:采用经验,使用CPU的核心个数
- IO密集型:采用经验,
thread_num = (线程等待时间 + CPU运算时间) * CPU核心个数 / CPU运算时间
- 通常是2倍CPU核心数
2.4 具体编码实现
2.4.1 外部接口实现
void
thrdpool_terminate(thrdpool_t * pool) {
atomic_store(&pool->quit, 1);
__nonblock(pool->task_queue);
}
// 资源创建使用回滚式代码
thrdpool_t *
thrdpool_create(int thrd_count) {
thrdpool_t *pool;
pool = (thrdpool_t*)malloc(sizeof(*pool));
if (pool) {
task_queue_t *queue = __taskqueue_create();
if (queue) {
pool->task_queue = queue;
atomic_init(&pool->quit, 0);
if (__threads_create(pool, thrd_count) == 0)
return pool;
__taskqueue_destroy(queue); // 回滚
}
free(pool); // 回滚
}
return NULL;
}
// 业务逻辑使用防御式代码
// --把不满足条件的进行过滤--
int
thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg) {
if (atomic_load(&pool->quit) == 1)
return -1;
task_t *task = (task_t*) malloc(sizeof(task_t));
if (!task) return -1;
task->func = func;
task->arg = arg;
__add_task(pool->task_queue, task);
return 0;
}
void
thrdpool_waitdone(thrdpool_t *pool) {
int i;
for (i=0; i<pool->thrd_count; i++) {
pthread_join(pool->threads[i], NULL);
}
__taskqueue_destroy(pool->task_queue);
free(pool->threads);
free(pool);
}
2.4.2 内部接口实现
static void
__nonblock(task_queue_t *queue) {
pthread_mutex_lock(&queue->mutex);
queue->block = 0;
pthread_mutex_unlock(&queue->mutex);
pthread_cond_broadcast(&queue->cond);
}
static task_queue_t *
__taskqueue_create() {
int ret;
task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
if (queue) {
ret = pthread_mutex_init(&queue->mutex, NULL);
if (ret == 0) {
ret = pthread_cond_init(&queue->cond, NULL);
if (ret == 0) {
spinlock_init(&queue->lock);
queue->head = NULL;
queue->tail = &queue->head;
queue->block = 1;
return queue;
}
pthread_mutex_destroy(&queue->mutex);
}
free(queue);
}
return NULL;
}
// 流程逻辑,防御式代码
static inline void
__add_task(task_queue_t *queue, void *task) {
// 不限定任务类型,只要该任务的结构起始内存是一个用于链接下一个节点的指针
void **link = (void**)task;
*link = NULL;
spinlock_lock(&queue->lock);
*queue->tail /* 等价于 queue->tail->next */ = link;
queue->tail = link;
spinlock_unlock(&queue->lock);
pthread_cond_signal(&queue->cond);
}
static inline void *
__pop_task(task_queue_t *queue) {
spinlock_lock(&queue->lock);
if (queue->head == NULL) {
spinlock_unlock(&queue->lock);
return NULL;
}
task_t *task;
task = queue->head;
void **link = (void**)task;
queue->head = *link;
if (queue->head == NULL) {
queue->tail = &queue->head;
}
spinlock_unlock(&queue->lock);
return task;
}
static inline void *
__get_task(task_queue_t *queue) {
task_t *task;
// 虚假唤醒
while ((task = __pop_task(queue)) == NULL) {
pthread_mutex_lock(&queue->mutex);
if (queue->block == 0) {
pthread_mutex_unlock(&queue->mutex);
return NULL;
}
// 1. 先 unlock(&mtx)
// 2. 在 cond 休眠
// --- __add_task 时唤醒
// 3. 在 cond 唤醒
// 4. 加上 lock(&mtx);
pthread_cond_wait(&queue->cond, &queue->mutex);
pthread_mutex_unlock(&queue->mutex);
}
return task;
}
static void
__taskqueue_destroy(task_queue_t *queue) {
task_t *task;
while ((task = __pop_task(queue))) {
free(task);
}
spinlock_destroy(&queue->lock);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->mutex);
free(queue);
}
static void *
__thrdpool_worker(void *arg) {
thrdpool_t *pool = (thrdpool_t*) arg;
task_t *task;
void *ctx;
while (atomic_load(&pool->quit) == 0) {
task = (task_t*)__get_task(pool->task_queue);
if (!task) break;
handler_pt func = task->func;
ctx = task->arg;
free(task);
func(ctx);
}
return NULL;
}
static void
__threads_terminate(thrdpool_t * pool) {
atomic_store(&pool->quit, 1);
__nonblock(pool->task_queue);
int i;
for (i=0; i<pool->thrd_count; i++) {
pthread_join(pool->threads[i], NULL);
}
}
static int
__threads_create(thrdpool_t *pool, size_t thrd_count) {
pthread_attr_t attr;
int ret;
ret = pthread_attr_init(&attr);
if (ret == 0) {
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);
if (pool->threads) {
int i = 0;
for (; i < thrd_count; i++) {
if (pthread_create(&pool->threads[i], &attr, __thrdpool_worker, pool) != 0) {
break;
}
}
pool->thrd_count = i;
pthread_attr_destroy(&attr);
if (i == thrd_count)
return 0;
__threads_terminate(pool);
free(pool->threads);
}
ret = -1;
}
return ret;
}
总结
本文介绍了线程池的优点、构成,以及手写了一个比较标准的线程池。作为最重要的基础组件之一,明白原理以及能自己写出来是必要的。
标签:thrdpool,task,queue,线程,pthread,组件,手写,pool From: https://blog.csdn.net/Morso/article/details/144078984参考链接:
https://github.com/0voice