首页 > 其他分享 >简易线程池实现

简易线程池实现

时间:2024-04-04 09:13:48浏览次数:24  
标签:实现 cond 简易 线程 pthread queueFront mutex pool

程序中,频繁地调用pthread_create函数创建线程,非常浪费时间,尤其在服务器端的时候,多线程的使用情况下频繁启用线程和释放线程资源,这样也会影响程序的运行效率。

可以先在服务器空闲的时候先创建多个线程,我们先在线程函数里面使用pthread_cond_wait将其休眠,有任务过来的时候使用pthread_cond_signal启用线程。

 

多线程(2)-线程同步条件变量 - lethe1203 - 博客园 (cnblogs.com)一节demo中,使用条件变量和Mutex实现生产者和消费者模型,有一点很关键:

pthread_cond_wait函数里面发生了什么?  --释放了互斥锁,pthread_cond_wait函数里面第二个参数是互斥锁

生产者线程如果没有通知消费者线程,消费者线程就无法把锁unlock,这样也保证了生产者线程对临界区的唯一性访问

 

这也就是下面简易线程池的核心所在:

threadpool.h

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
 
// queue node
typedef struct Task 
{
        void (*function)(void *arg);                                                                 
        void *arg;
        struct Task *next;
} Task;
 
typedef struct ThreadPool
{
        Task *queueFront;
        Task *queueRear;
 
        int num;
 
        pthread_t *threadID;
 
        pthread_mutex_t mutex;
 
        pthread_cond_t cond;
 
        // shutdown pool
        int shutdown;
} ThreadPool;

 

threadpool.c

#include "threadpool.h"
 
void *worker(void *arg)
{
        ThreadPool *pool = (ThreadPool *)arg;
        while (1) 
        {
                pthread_mutex_lock(&pool->mutex);
 
                // 如果任务队列为空,且线程池没有被关闭,线程睡眠
                while (pool->queueFront == pool->queueRear && pool->shutdown == 0)
                {
                        // 阻塞等待任务
                        pthread_cond_wait(&pool->cond, &pool->mutex);
                }
 
                if (pool->shutdown == 1) {
                        printf("--------------------debug8\n");                        
                        pthread_mutex_unlock(&pool->mutex);
                        printf("thread shutdown thread %ld exit ...\n", pthread_self());
                        pthread_exit((void *)0);
                }
 
                Task task;
                Task *t = pool->queueFront->next;
                task.function = t->function;
                task.arg = t->arg;
                pool->queueFront->next = t->next;
                free(t);
 
                if (pool->queueFront->next == NULL) {
                        pool->queueRear = pool->queueFront;
                }
 
                pthread_mutex_unlock(&pool->mutex);
 
                printf("thread %ld start working ...\n", pthread_self());
                task.function(task.arg);        // 函数指针调用函数
                printf("thread %ld end working ...\n", pthread_self());
        }
}
 
ThreadPool *create_thread_pool(int num)
{
        // printf("--------------------debug1");
        ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
        if (pool == NULL) {
                printf("malloc threadpool memory failed\n");
                return NULL;
        }
        // printf("--------------------debug2");
 
        pool->queueFront = (Task *)malloc(sizeof(Task));
        if (pool->queueFront == NULL) {
                fprintf(stderr, "malloc Task failed\n");
                goto pfree;
                return NULL;
        }
 
        // printf("--------------------debug3");
        // pool->queueRear = pool->queueFront;
        pool->queueFront->next =  NULL;
        pool->queueRear = pool->queueFront;
 
        pool->num = num;
        // printf("--------------------debug4");
 
        // init threadid
        pool->threadID = (pthread_t *)malloc(sizeof(pthread_t) * num);
        if (pool->threadID == NULL) {
                fprintf(stderr, "malloc threadID failed\n");
                goto qfree;
                return NULL;
        }
 
        // printf("--------------------debug5");
        for (int i = 0; i < num; i++) {
                if (pthread_create(&pool->threadID[i], NULL, worker, pool) != 0)
                {
                        fprintf(stderr, "pthread_create threadID failed\n");
                        goto tfree;
                        return NULL;
                }
                pthread_detach(pool->threadID[i]);        // pthread detach
        }
 
        // printf("--------------------debug6");
        pthread_mutex_init(&pool->mutex, NULL);
        pthread_cond_init(&pool->cond, NULL);
 
        pool->shutdown = 0;        // 0: running 1: shutsown
 
        return pool;
pfree:
        free(pool);
        return 0;
qfree:
        free(pool->queueFront);
        free(pool);
        return 0;
tfree:
        free(pool->threadID);
        free(pool->queueFront);
        free(pool);
        return 0;
}
 
void taskfunction(void *arg)
{
        int num = *(int *)arg;
        printf("thread %ld is working num = %d ...\n", pthread_self(), num);
 
        sleep(1);
 
        free(arg);
        // return 0;
}
 
void thread_pool_add(ThreadPool *pool, void (*func)(void *), void *arg)
{
        pthread_mutex_lock(&pool->mutex);
 
        Task *t = (Task *)malloc(sizeof(Task));
        if (t == NULL) {
                fprintf(stderr, "malloc Task failure\n");
                return;
        }
 
        // 一个任务包含三个部分,线程函数,参数,指向下一个任务节点的指针
        t->function = func;
        t->arg = arg;
        t->next = NULL;
        
        pool->queueRear->next = t;
        pool->queueRear = t;
        pthread_mutex_unlock(&pool->mutex);
 
        pthread_cond_signal(&pool->cond);
}
 
void thread_pool_destroy(ThreadPool *pool)
{
        // 关闭线程池
        pool->shutdown = 1;
 
        printf("--------------------debug7\n");
        // 唤醒所有线程
        for (int i = 0; i < pool->num; i++) {
                pthread_cond_signal(&pool->cond);
        }
 
        // 释放线程号
        if (pool->threadID) {
                free(pool->threadID);
        }
 
        // 释放任务队列
        while (pool->queueFront->next)
        {
                Task *t = pool->queueFront->next;
                pool->queueFront->next = t->next;
                free(t);
        }
 
        free(pool->queueFront);
 
        // 释放互斥锁和条件变量
        pthread_mutex_destroy(&pool->mutex);
        pthread_cond_destroy(&pool->cond);
 
        // 释放线程池
        free(pool);
}
 
int main()
{
        ThreadPool *pool = create_thread_pool(10);
        if (pool == NULL) {
                printf("create thread pool failed\n");
                return -1;
        }
 
        printf("threadPool create success.\n");
 
        sleep(1);
 
        for (int i = 0; i < 5; i++) {
                int *n = (int *)malloc(sizeof(int));
 
                *n = i;
 
                // 将任务添加到任务队列,taskfunction任务函数
                thread_pool_add(pool, taskfunction, n);
        }
        sleep(6);
 
        thread_pool_destroy(pool);
 
        return 0;
}

队列相当于是一个共享变量,这些子线程会不断地取任务去执行,有任务出队有任务入队,同一个时刻只能有一个任务进行操作。条件变量用来控制线程的睡眠、启动和销毁时机。

 

执行结果:

 

只提供大体思路,上面的程序暂时存在一些小问题,待解决

 

标签:实现,cond,简易,线程,pthread,queueFront,mutex,pool
From: https://www.cnblogs.com/lethe1203/p/18113895

相关文章

  • 量化交易入门(四十一)ASI指标Python实现和回测
    老规矩先上图,看看ASI指标使用苹果数据回测后的结果如何。一、策略运行结果执行的结果:StartingPortfolioValue:100000.00FinalPortfolioValue:92514.82AnnualizedReturn:-1.93%SharpeRatio:-0.27MaxDrawdown:25.34%MaxDrawdownPeriod:441唉,好像亏钱了......
  • 安全访问多线程环境:掌握 Java 并发集合的使用技巧
    哈喽,各位小伙伴们,你们好呀,我是喵手。  今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。  我是一名后端开发爱好者,工作日常接触到最多的就是Java语言啦,所以我都尽量抽业余时间把......
  • 常见面试题--动态规划介绍(附C++源码实现)
    关注我,持续分享逻辑思维&管理思维;可提供大厂面试辅导、及定制化求职/在职/管理/架构辅导;有意找工作的同学,请参考博主的原创:《面试官心得--面试前应该如何准备》,《面试官心得--面试时如何进行自我介绍》, 《做好面试准备,迎接2024金三银四》。【图解《程序员面试常见的十大算法......
  • 30 天精通 RxJS (08):简易拖拉实作 - take, first, takeUntil, concatAll
    我们今天要接着讲take,first,takeUntil,concatAll这四个operators,并且实作一个简易的拖拉功能。Operatorstaketake是一个很简单的operator,顾名思义就是取前几个元素后就结束,范例如下varsource=Rx.Observable.interval(1000)varexample=source.take(3)example.......
  • 图像分类模型AlexNet原理与实现
    图像分类模型AlexNet原理与实现作者:禅与计算机程序设计艺术1.背景介绍图像分类是计算机视觉领域的一个核心任务,其目标是将输入图像归类到预定义的类别中。随着深度学习技术的发展,基于卷积神经网络(ConvolutionalNeuralNetwork,CNN)的图像分类模型取得了突破性的进......
  • 数据结构与算法分析实验3 [进阶]通过链表实现多项式加法和乘法
    文章目录大致内容介绍多项式加法代码一览头文件Poly.h内容如下:实现文件Poly.cpp内容如下:初始化增加元素删除元素功能函数遍历函数清除销毁打印多项式向多项式内插入一个元素源文件main.cpp内容如下:实现效果:多项式乘法实现方法:在Poly.h中添加声明:在Poly.cpp中添加实现:在......
  • 列表嵌套字典实现简单通讯录功能 python
    contacts_list=[]#使用列表来存储字典,每个字典代表一个联系人whileTrue:print('1.增加2.删除break.退出')num=input('请输入选项:')ifnum=='1':name=input('请输入姓名:')phone=input('请输入电话:')......
  • “码中谜“ ConcurrentHashMap线程安全机制的弹指一挥间
    引言:ConcurrentHashMap是Java中解决并发编程问题的重要工具。它提供了线程安全的HashMap实现,并能在多线程环境下保持高性能。本文将深入ConcurrentHashMap的实现,解析其线程安全机制,并提供相关代码示例。详解ConcurrentHashMap的数据结构:ConcurrentHashMap在Java中是通过......
  • 【附源码】java毕业设计书城管理系统的设计与实现
    本系统(程序+源码)带文档lw万字以上 文末可领取本课题的JAVA源码参考系统程序文件列表系统的选题背景和意义选题背景:随着互联网技术的普及和电子商务的发展,传统的书店逐渐向线上书城转型,以满足人们日益增长的在线阅读和购书需求。一个功能全面、操作便捷、系统稳定的书城......
  • JS实现检查给定时间范围是否在每天的某个时间段内
    //解析时间字符串,返回对应的分钟数functionparseTime(timeStr){const[hours,minutes]=timeStr.split(':').map(num=>parseInt(num));returnhours*60+minutes;}//解析时间字符串,返回对应的Date对象functionparseTimeString(timeStr){const......