首页 > 其他分享 >【高性能组件(1)】手写线程池

【高性能组件(1)】手写线程池

时间:2024-11-27 16:02:01浏览次数:8  
标签:thrdpool task queue 线程 pthread 组件 手写 pool

文章目录


前言

本文首先介绍了线程池的优点、构成,并手写了一个比较标准的线程池。


一、线程池介绍

线程池是一种维持管理固定数量线程的池式结构

同时也有很多其他的池式结构,例如连接池、对象池、内存池,这些结构的共同特点都是:复用资源

1.1 为什么需要线程池?

当某类任务特别耗时,且严重影响到该线程(生产者线程)处理其他任务。

  1. 耗时等待
  2. 耗时处理

当出现这种情况时可以将这类任务交给其他线程处理以提高性能。

1.2 线程池的作用

  1. 性能优化:不过度占用核心线程,异步执行耗时任务(等待或处理)
  2. 并发执行:充分利用多核,并发执行核心业务
  3. 复用线程资源
  4. 异步处理生产者线程的任务
  5. 减少多个任务的执行时间

1.3 线程池的构成

通常由3部分组成:生产者线程、任务队列、消费者线程(线程池)。

  1. 生产者线程发布耗时任务,一般是将耗时任务放入队列之中,采用条件变量来通知消费者线程。

  2. 任务队列中会保存任务上下文,提供相应的任务回调函数。

  3. 消费者线程会从任务队列之中取出任务,然后执行任务。线程调度顺序由条件变量或信号量等通知消费者线程,任务处理完成会休眠活跃的线程。

二、手写线程池

2.1 接口设计

2.1.1 封装原则

对于线程池封装原则,也就是应该给用户暴露哪些信息。

  1. 隐藏实现的细节,暴露出使用的接口。也就是在.h文件提供接口,而实现细节在.c文件
  2. 用户并不需要知道线程池的具体结构
  3. 用户需要知道任务在线程中是以何种形式执行的,比如需要提供什么回调函数
  4. 用户不需要知道线程管理的细节
  5. 用户不需要知道队列

2.1.2 创建线程池的接口

复杂资源创建,通常需要对称式的接口设计,有资源的创建,就有资源的销毁

// 对称处理
// 创建线程池资源
thrdpool_t *thrdpool_create(int thrd_count);
// 优雅退出
void thrdpool_terminate(thrdpool_t * pool);
// 增加任务
int thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg);
// 销毁线程,释放资源
void thrdpool_waitdone(thrdpool_t *pool);

上面四个接口是提供给用户使用的。

2.2 数据结构设计

typedef struct spinlock spinlock_t;

typedef struct task_s {
    void *next;
    handler_pt func;
    void *arg;
} task_t;

typedef struct task_queue_s {
    void *head;
    void **tail; // 指向任务(task_s)的next指针
    int block;
    spinlock_t lock; // 自旋锁,队列操作可以单独加锁
    pthread_mutex_t mutex; // 互斥锁(队列操作可以复用)
    pthread_cond_t cond; // 条件变量
} task_queue_t;

struct thrdpool_s {
    task_queue_t *task_queue;
    atomic_int quit;
    int thrd_count;
    pthread_t *threads;
};

在这里插入图片描述

2.3 线程池线程数量选择

2.3.1 维持固定数量线程

  1. 线程数量的增加,由于系统资源的限制,并不能带来性能提升,反而会带来负担
  2. 避免线程频繁的创建和销毁

2.3.2 线程数量选择

目的:充分利用系统资源

  1. CPU密集型:采用经验,使用CPU的核心个数
  2. IO密集型:采用经验,thread_num = (线程等待时间 + CPU运算时间) * CPU核心个数 / CPU运算时间
    1. 通常是2倍CPU核心数

2.4 具体编码实现

2.4.1 外部接口实现

void
thrdpool_terminate(thrdpool_t * pool) {
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
}
// 资源创建使用回滚式代码
thrdpool_t *
thrdpool_create(int thrd_count) {
    thrdpool_t *pool;

    pool = (thrdpool_t*)malloc(sizeof(*pool));
    if (pool) {
        task_queue_t *queue = __taskqueue_create();
        if (queue) {
            pool->task_queue = queue;
            atomic_init(&pool->quit, 0);
            if (__threads_create(pool, thrd_count) == 0)
                return pool;
            __taskqueue_destroy(queue); // 回滚
        }
        free(pool); // 回滚
    }
    return NULL;
}
// 业务逻辑使用防御式代码
// --把不满足条件的进行过滤--
int
thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg) {
    if (atomic_load(&pool->quit) == 1) 
        return -1;
    task_t *task = (task_t*) malloc(sizeof(task_t));
    if (!task) return -1;
    task->func = func;
    task->arg = arg;
    __add_task(pool->task_queue, task);
    return 0;
}

void
thrdpool_waitdone(thrdpool_t *pool) {
    int i;
    for (i=0; i<pool->thrd_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    __taskqueue_destroy(pool->task_queue);
    free(pool->threads);
    free(pool);
}

2.4.2 内部接口实现

static void
__nonblock(task_queue_t *queue) {
    pthread_mutex_lock(&queue->mutex);
    queue->block = 0;
    pthread_mutex_unlock(&queue->mutex);
    pthread_cond_broadcast(&queue->cond);
}

static task_queue_t *
__taskqueue_create() {
    int ret;
    task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
    if (queue) {
        ret = pthread_mutex_init(&queue->mutex, NULL);
        if (ret == 0) {
            ret = pthread_cond_init(&queue->cond, NULL);
            if (ret == 0) {
                spinlock_init(&queue->lock);
                queue->head = NULL;
                queue->tail = &queue->head;
                queue->block = 1;
                return queue;
            }
            pthread_mutex_destroy(&queue->mutex);
        }
        free(queue);
    }
    return NULL;
}

// 流程逻辑,防御式代码
static inline void 
__add_task(task_queue_t *queue, void *task) {
    // 不限定任务类型,只要该任务的结构起始内存是一个用于链接下一个节点的指针
    void **link = (void**)task;
    *link = NULL;

    spinlock_lock(&queue->lock);
    *queue->tail /* 等价于 queue->tail->next */ = link;
    queue->tail = link;
    spinlock_unlock(&queue->lock);
    pthread_cond_signal(&queue->cond);
}

static inline void * 
__pop_task(task_queue_t *queue) {
    spinlock_lock(&queue->lock);
    if (queue->head == NULL) {
        spinlock_unlock(&queue->lock);
        return NULL;
    }
    task_t *task;
    task = queue->head;

    void **link = (void**)task;
    queue->head = *link;

    if (queue->head == NULL) {
        queue->tail = &queue->head;
    }
    spinlock_unlock(&queue->lock);
    return task;
}

static inline void * 
__get_task(task_queue_t *queue) {
    task_t *task;
    // 虚假唤醒
    while ((task = __pop_task(queue)) == NULL) {
        pthread_mutex_lock(&queue->mutex);
        if (queue->block == 0) {
            pthread_mutex_unlock(&queue->mutex);
            return NULL;
        }
        // 1. 先 unlock(&mtx)
        // 2. 在 cond 休眠
        // --- __add_task 时唤醒
        // 3. 在 cond 唤醒
        // 4. 加上 lock(&mtx);
        pthread_cond_wait(&queue->cond, &queue->mutex);
        pthread_mutex_unlock(&queue->mutex);
    }
    return task;
}

static void
__taskqueue_destroy(task_queue_t *queue) {
    task_t *task;
    while ((task = __pop_task(queue))) {
        free(task);
    }
    spinlock_destroy(&queue->lock);
    pthread_cond_destroy(&queue->cond);
    pthread_mutex_destroy(&queue->mutex);
    free(queue);
}

static void *
__thrdpool_worker(void *arg) {
    thrdpool_t *pool = (thrdpool_t*) arg;
    task_t *task;
    void *ctx;

    while (atomic_load(&pool->quit) == 0) {
        task = (task_t*)__get_task(pool->task_queue);
        if (!task) break;
        handler_pt func = task->func;
        ctx = task->arg;
        free(task);
        func(ctx);
    }
    
    return NULL;
}

static void 
__threads_terminate(thrdpool_t * pool) {
    atomic_store(&pool->quit, 1);
    __nonblock(pool->task_queue);
    int i;
    for (i=0; i<pool->thrd_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }
}

static int 
__threads_create(thrdpool_t *pool, size_t thrd_count) {
    pthread_attr_t attr;
	int ret;

    ret = pthread_attr_init(&attr);

    if (ret == 0) {
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);
        if (pool->threads) {
            int i = 0;
            for (; i < thrd_count; i++) {
                if (pthread_create(&pool->threads[i], &attr, __thrdpool_worker, pool) != 0) {
                    break;
                }
            }
            pool->thrd_count = i;
            pthread_attr_destroy(&attr);
            if (i == thrd_count)
                return 0;
            __threads_terminate(pool);
            free(pool->threads);
        }
        ret = -1;
    }
    return ret; 
}

总结

本文介绍了线程池的优点、构成,以及手写了一个比较标准的线程池。作为最重要的基础组件之一,明白原理以及能自己写出来是必要的。

参考链接:
https://github.com/0voice

标签:thrdpool,task,queue,线程,pthread,组件,手写,pool
From: https://blog.csdn.net/Morso/article/details/144078984

相关文章

  • 将 Python 计算代码转换为渲染的 LaTeX,就像手写一样清晰易懂!
    handcalcs是一个非常实用的开源Python库,它的特别之处在于能够将Python计算结果转换为渲染的LaTeX格式,使得复杂的计算过程像手写公式一样清晰、直观。这个工具对工程师、科学家以及任何从事数值计算的人来说,都可以大幅提高表达计算过程的可读性和透明度。handcalcs......
  • 【Linux】多线程(POSIX信号量、线程池、线程安全)
    ......
  • uniapp 修改引入组件样式 使用/deep/、::v-deep、>>>不生效 解决
    //放置与data同级options:{styleIsolation:'shared'},<template><viewclass='container'></view></template><script>exportdefault{props:{},data:()=>({}),computed:{},methods:{},wa......
  • react父组件调用子组件内部的dom或方法
    在React中,通过 React.forwardRef 和 useImperativeHandle 可以实现将父组件的 ref 转发给子组件,从而引用子组件的DOM或方法。以下是实现的步骤和代码示例:importReact,{forwardRef,useImperativeHandle,useRef}from"react";constChild=forwardRef((props,......
  • AntDesign - Vue Table组件 实现动态表格、列宽计算(方式二)
    朋友们,按照上文(方式一)思路,实现了动态表格和表头分组,这篇按照方式一的需求,扩展出另一种代码写法;一、表格头表格columns还是定义在data(){}中,初始化静态列数组,配置项列由后端接口返回(第二点写动态配置项代码);在方式一基础上加了筛选菜单功能,因此变化代码部分如下......
  • 鸿蒙多线程开发——sendable共享容器
    1、异步锁机制在介绍共享容器之前,先介绍异步锁机制。为了解决多线程并发任务间的数据竞争问题,ArkTS引入了异步锁能力。异步锁可能会被类对象持有,因此为了更方便地在并发实例间获取同一个异步锁对象,AsyncLock对象支持跨线程引用传递。由于ArkTS语言支持异步操作,阻塞锁容易产......
  • 介绍程序、进程与线程和计算机网络体系结构和计算机硬件组成以及计算机网络分类
    程序、进程与线程程序:程序是一组有序的指令,它定义了计算机执行特定任务的步骤。程序通常以源代码的形式存在,需要编译或解释成机器语言,计算机才能执行。程序是静态的,它只是指令和数据的集合,没有执行的概念。进程:进程是程序的执行实例。当程序被加载到内存并开始运行时,它就变......
  • 进程和线程的区别
    一、进程(一)进程的概念进程是指程序的一次执行过程,它具有生命周期,从创建到终止经历了一系列的状态变化。(二)进程的作用进程作为操作系统进行资源分配的基本单位,主要目的是为了实现程序的并发执行,从而提高系统的资源利用效率和处理能力。(三)进程的限制由于进程涉及资源的分配与......
  • 【Linux】线程同步与互斥
    文章目录1.线程互斥1.1进程线程间的互斥相关背景概念1.2互斥量mutex1.3相关操作1.4互斥量实现原理1.5互斥量的封装2.线程同步2.1条件变量2.2生产者消费者模型2.3基于BlockingQueue的生产者消费者模型2.4信号量2.5基于环形队列的生产消费模型3.线程池3.1......
  • Java进阶六-多线程
    一多线程相关概念进程(Process):进程是程序的基本执行实体。进程是操作系统分配资源的基本单位。每个进程都有自己的内存空间、代码段、数据段等。进程之间相互独立,一个进程的崩溃不会影响其他进程。进程是程序的基本执行实体。线程(Thread): 应用软件中相互独立,可以......