首页 > 其他分享 >手写线程池

手写线程池

时间:2024-07-26 11:40:38浏览次数:13  
标签:task struct thread worker 线程 手写 pool

手写线程池

线程池解决的问题是避免线程创建、销毁的代价以及避免线程太多,内存耗尽。GCC编译时,必须通过指令引入线程库。

gcc -o threadpool threadpool.c -pthread

组件开发

线程池最基本需要包含三个组件

  • 任务队列-添加任务
  • 管理组件-管理线程对立以及任务队列
  • 执行队列-线程队列-访问管理组件,从管理任务组件中取出任务

实现思路

  1. 首先定义结构

    任务队列

    // 定义任务队列,实现的功能是需要注册到管理者
    struct task{ // 
        void (*task_func) (struct task*);
        void* user_data; // callback
        struct task* prev;
        struct task* next;
    };
    

    执行队列

    // 执行队列,需要下载任务,注册到管理者
    struct worker{
        pthread_t t_id;
        int thread_number;
        int thread_close;
        struct worker* prev;
        struct worker* next;
        struct manager* m_manger;
    };
    

    管理队列

    // 管理者,管理执行者和任务,其实这里就是线程池
    typedef struct manager{
        struct task* tasks;
        struct worker* workers;
    
        // 同步机制
        pthread_mutex_t m_mutex;  // exclusive lock
        pthread_cond_t m_cond;    // cond to make thread to wait task in hang up
    }thread_pool;
    
  2. 然后定义接口方法-线程池创建、销毁、任务添加、线程回调函数、任务接口函数

    • 线程回调函数
    void* thread_call_back(void* arg){
        struct worker* m_worker = (struct worker* )arg;
        printf("execute call_back: %d!\n", m_worker->thread_number);
        
        // 使用manager的cond和mutex避免死锁
        
        // 线程需要一直等待任务唤醒
        while(1){
            pthread_mutex_lock(&m_worker->m_manger->m_mutex);
            // 队列空就忙等
            while(m_worker->m_manger->tasks==NULL){
                if(m_worker->thread_close  == 1){
                    break;
                }
                pthread_cond_wait(&m_worker->m_manger->m_cond, &m_worker->m_manger->m_mutex); // 队列空,线程就一直挂起等待资源
            }
            if(m_worker->thread_close  == 1){
                pthread_mutex_unlock(&m_worker->m_manger->m_mutex);
                break;
            }
            struct task* m_task = m_worker->m_manger->tasks;
            printf("执行第几个包: %d\n", *(int*)m_task->user_data);
            LIST_DELETE(m_task, m_worker->m_manger->tasks);
            // 否则取任务
            m_task->task_func(m_task); // 执行任务
            m_task = NULL;
            pthread_mutex_unlock(&m_worker->m_manger->m_mutex);
        }
    
        printf("thread destroy %d\n", m_worker->thread_number);
        
        free(m_worker);
        m_worker = NULL;
        
        return 0;
    }
    
    • 线程池创建(通常一个线程占用8M空间)
    int threadpool_create(thread_pool* pool, int thread_count){
        if(pool==NULL) return -1;
        if(thread_count < 1) return -2;
    
        // 初始化同步机制
    
        // struct worker m_worker = {0};
    #if 1
         // 堆内存分配
        for(int i = 0; i < thread_count; ++i){
            struct worker* m_worker = (struct worker*)malloc(sizeof(struct worker));
            if(m_worker == NULL) {
                perror("malloc");
                return -2;
            }
            
            m_worker->m_manger = pool;
            m_worker->thread_close = 0;
            m_worker->thread_number = i+1; // 线程从1开始编号
            LIST_INSERT(m_worker, pool->workers);
            // 创建线程
            int ret = pthread_create(&m_worker->t_id,NULL, thread_call_back, m_worker);
            if(ret != 0){ // 分配错误就释放
                perror("pthread_create");
                free(m_worker);
                m_worker = NULL;
                return -3;
            }
            
            printf("thread create success: %d!\n", m_worker->thread_number);
        }
    #elif 0
        struct worker m_worker[thread_count]; // 动态数组的组织方式
        memset(m_worker, 0 ,sizeof(m_worker)); // 栈内存
        for(int i = 0; i < thread_count; ++i){
            m_worker[i].thread_number = i+1;
            pthread_create(&m_worker[i].t_id, NULL, thread_call_back, &m_worker[i]);
        }
    #endif
        return 0;
    }
    
    • 线程池销毁
    int threadpool_destroy(thread_pool* pool, int workers_count){
        // 销毁怎么做,其实就是广播或者一个一个给线程发送条件满足信号,唤醒所有线程
        struct worker* p = NULL;
        while(pool->workers){
            p = pool->workers;
            p->thread_close = 1;
            pool->workers = pool->workers->next;
        }
        pthread_mutex_lock(&pool->m_mutex);
        pthread_cond_broadcast(&pool->m_cond);
        pthread_mutex_unlock(&pool->m_mutex);
        
        pool->workers =NULL;
        pool->tasks =NULL;
    
        printf("list is clear!\n");
        return 0;
    }
    
    • 添加任务
    int push_tasks(thread_pool* pool, struct task* m_task){
        pthread_mutex_lock(&pool->m_mutex);
        
        LIST_INSERT(m_task, pool->tasks); // 插入任务
        pthread_cond_signal(&pool->m_cond); // 唤醒一个线程去处理
        pthread_mutex_unlock(&pool->m_mutex);
        return 0;
    }
    
    • 任务接口
    void task_entry(struct Tasks* task){
        int idx = *(int*)task->user_data;
        printf("idx: %d\n", idx);
        free(task->user_data);
        free(task);
    }
    
    • 主函数
    int main(){
        thread_pool pool = {0};
        pthread_mutex_init(&pool.m_mutex, NULL);
        pthread_cond_init(&pool.m_cond, NULL);
        threadpool_create(&pool, THREAD_COUNT); // 可能在执行任务期间,它可能已经结束了
        for(int i = 0 ; i < TASK_COUNT; ++i){
            struct task* p_task  = (struct task*)malloc(sizeof(struct task));
            p_task->task_func = task_entry;
            // user_data指向的地方其实是未分配的,需要进行分配
            p_task->user_data  = malloc(sizeof(int));
            *(int*)p_task->user_data  = i+1;
            push_tasks(&pool, p_task);
        }
        threadpool_destroy(&pool, worker_count);
        getchar();
        return 0;
    }
    

    复盘问题

    1. 为什么会出现段错误?

      一般来说都是因为访问空指针从而引发段错误,所以每个结构体都需要进行初始化。

    2. 主线程不等子线程会发生什么?

      也会发生段错误,因为子线程会访问已经释放的空指针,所以主线程 要等待子线程完成再结束。

    3. 为什么不能都分配为栈内存?

      注意一定是分配堆内存才能够释放,而且挂到队列上的东西,一定要堆内存分配,因为栈内存的生命周期短。如果任务分配栈内存,那么它在存在于函数结束的时候,那么会产生一种状况,虽然其加入到队列了,但是其已经被回收了,那么会线程访问会产生未定义行为。

标签:task,struct,thread,worker,线程,手写,pool
From: https://www.cnblogs.com/solicit/p/18324986

相关文章

  • 线程的核心原理
    线程调度模型1分时调度模型:系统平均分配CPU时间片,所有线程轮流占用CPU.2抢占式调度模型:系统按照线程优先级来分配CPU时间片,优先级高的线程获取CPU执行时间相对多一些.线程的优先级Thread类里的这个属性privateintpriority代表线程的优先级.优先级值的范围为1-10.......
  • 多线程的创建方式
    线程的创建方式1:通过继承Thread类创建一个线程类.子类重写Thread的run方法即可.publicclassThreadTwoTestextendsThread{publicstaticvoidmain(String[]args){ThreadTwoTestthreadTwoTest=newThreadTwoTest();threadTwoTest.start();......
  • 多层全连接神经网络(八)---实现MNIST手写数字分类
    简单的三层全连接神经网络    在PyTorch里面可以很简单地定义三层全连接神经网络。classsimpleNet(nn.Module):def__init__(self,in_dim,n_hidden_1,n_hidden_2,out_dim):super(simpleNet,self).__init__()self.layer1=nn.Linear(......
  • 第三章 进程线程模型
    第三章进程线程模型进程1、并发环境与多道程序设计(1)程序的顺序执行程序:指令或者语句序列;体现了某种算法;所以程序是顺序的特点:顺序性;封闭性;程序执行结果的确定性;程序结果的可再现性(2)多道程序设计定义:计算机能够同时处理多个具有独立功能的程序;以增强系统的处理能力和提高机......
  • 多线程补充(上)
    线程安全问题首先,什么是线程安全问题呢?线程安全问题指的是,多个线程同时操作同一个共享资源的时候,可能会出现业务安全问题。场景:小明和小红是一对夫妻,他们有一个共享账户,余额是10万元,小红和小明同时来取钱,并且2人各自都在取钱10万元,可能出现什么问题呢?小明和小红假设都是......
  • 多线程创建方式和常用方法
    线程其实是程序中的一条执行路径。怎样的程序才是多线程程序呢?可以同时有很多人一起进入的网站,而且每一个人互不影响。比如百度网盘,可以同时下载或者上传多个文件。这些程序中其实就有多条执行路径,每一条执行执行路径就是一条线程,所以这样的程序就是多线程程序。Java提供了几......
  • 实验15.多线程调度
    简介实验.多线程调度内核线程1.在时钟中断函数中处理中,减少当前线程pcb的tick,tick为0则启动调度2.调度,把当前线程pcb放入就绪队列队尾,把就绪线程队列队首拿出来执行主要代码引导省略内核list.c//文件:list.c//时间:2024-07-25//来自:ccj//描......
  • 手写CountDownLatch
    手写CountDownLatch思路1. 设置aqs类中的状态为2;2. 调用await方法,让当前线程变为阻塞3. 调用countDown方法的时候状态-1,如果状态=0的情况下,则唤醒刚才阻塞的线程 publicclassMyCountDownLatch{privateSyncsync;privateMyCountDownLatch(intcount){......
  • 手写Semaphore信号量
    publicclassMySemaphore{privateSyncsync;publicMySemaphore(intcount){sync=newSync(count);}publicvoidacquire(){sync.acquireShared(1);}publicvoidrelease(){sync.releaseShared(1);......
  • 【多线程】单例模式
    ......