#include<errno.h>
#include <pthread.h>
#include<signal.h>
#include<stdlib.h>
#include<string.h>
#include <unistd.h>

#include <iostream>
#include <new>
#include <string>
// Seconds the adjust thread sleeps between two resize passes.
#define DEFAULT_TIME 10
// Per-pass step: the most workers the adjust thread adds in one pass, and the
// number of exit requests / wake-up signals it issues when shrinking.
#define DEFAULT_STEP 15
using namespace std;
// Demo task arguments: main passes &dp[i] to each queued task, and dowork uses
// the pointed-to value both as its sleep time in seconds and as its loop count.
int dp[20]={1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20};
/* One unit of work for a worker thread: a callback plus its argument. */
typedef struct
{
void *(*function)(void *); // routine the worker invokes
void *arg; // passed unchanged to function; the queue never frees it
} thread_task;
/* Thread pool state: synchronization objects, worker bookkeeping, and a
 * fixed-capacity circular task queue. */
struct pthread_pool
{
pthread_mutex_t lock; // guards the queue fields, live_thr_num, wait_exit_num and status
pthread_mutex_t busylock; // guards busy_thr_num only
pthread_cond_t queue_not_full; // producers in task_add wait on this while the queue is full
pthread_cond_t queue_not_empty; // workers wait on this while the queue is empty
pthread_t *th; // worker thread ids, max_thr_num slots; a zeroed slot is unused
pthread_t adjustth; // management thread that grows/shrinks the pool depending on load
int min_thr_num; // workers started at creation; idle workers do not exit below this
int max_thr_num; // capacity of th, upper bound for live_thr_num
int live_thr_num; // workers currently counted as alive
int busy_thr_num; // workers currently executing a task
int wait_exit_num; // outstanding exit requests issued by the adjust thread
thread_task *queue_task; // task queue storage (circular buffer of queue_maxsize entries)
int queue_front; // index of the next task to dequeue
int queue_rear; // index of the next free slot to enqueue into
int queue_size; // number of queued tasks
int queue_maxsize; // capacity of queue_task
bool status = false; // true while the pool is running; cleared by destroy_pool
};
// Worker entry point, defined further down; declared here so adjust can start workers.
void *process(void *);
/**
 * Release everything a fully initialised pool owns: both mutexes, both
 * condition variables, the task queue, the worker id table and the pool itself.
 *
 * Precondition: no thread is still using the pool. destroy_pool joins the
 * adjust thread and the workers before calling this.
 *
 * @param pool Pool to release; a null pointer is ignored. The pointer is
 *             dangling afterwards.
 */
void pthread_free(pthread_pool *pool)
{
    if (pool == NULL)
    {
        return;
    }

    // Destroy the synchronization objects while they are unlocked: POSIX
    // leaves destroying a locked mutex undefined.
    pthread_mutex_destroy(&(pool->lock));
    pthread_mutex_destroy(&(pool->busylock));
    pthread_cond_destroy(&(pool->queue_not_empty));
    pthread_cond_destroy(&(pool->queue_not_full));

    // pthreadpool_create obtains this storage with new[] / new, so it has to
    // be returned with delete[] / delete rather than free().
    delete[] pool->queue_task;
    delete[] pool->th;
    delete pool;
}
/**
 * Shut the pool down and release it.
 *
 * Marks the pool as stopped, wakes every thread blocked on either condition
 * variable, joins the adjust thread and every worker, then frees the pool.
 * Tasks still queued when this is called are not executed. The call can block
 * for up to DEFAULT_TIME seconds while the adjust thread finishes its sleep,
 * plus the remaining run time of tasks already in progress.
 *
 * @param pool Pool to destroy; a null pointer is ignored. The pointer is
 *             dangling afterwards.
 */
void destroy_pool(pthread_pool *pool)
{
    if (pool == NULL)
    {
        return;
    }

    // Flip the flag and broadcast while holding the lock. Otherwise a worker
    // that has just tested status but not yet reached pthread_cond_wait would
    // miss the wake-up and the join below would never return.
    pthread_mutex_lock(&(pool->lock));
    pool->status = false;
    pthread_cond_broadcast(&(pool->queue_not_empty)); // idle workers
    pthread_cond_broadcast(&(pool->queue_not_full));  // producers blocked in task_add
    pthread_mutex_unlock(&(pool->lock));

    // Join the adjust thread first so it can no longer start new workers.
    pthread_join(pool->adjustth, NULL);

    // Workers can occupy any slot of th (the adjust thread fills free slots),
    // so walk the whole table instead of the first live_thr_num entries.
    // Every non-zero slot holds a joinable thread that has not been joined yet.
    for (int i = 0; i < pool->max_thr_num; i++)
    {
        if (pool->th[i] != 0)
        {
            pthread_join(pool->th[i], NULL);
        }
    }

    pthread_free(pool);
}
/**
 * Append a task to the pool's queue, blocking while the queue is full.
 *
 * @param pool     Target pool. A null pool is ignored.
 * @param function Routine a worker will call. A null routine is ignored.
 * @param arg      Argument handed to function unchanged; it must stay valid
 *                 until the task has run.
 *
 * If the pool has been stopped, the task is dropped and the call returns
 * without queuing anything.
 */
void task_add(pthread_pool *pool, void *(*function)(void *arg), void *arg)
{
    if (pool == NULL || function == NULL)
    {
        return;
    }

    pthread_mutex_lock(&(pool->lock));

    // Wait for a worker to take a task; shutdown also ends the wait.
    while (pool->queue_size == pool->queue_maxsize && pool->status)
    {
        pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    }

    if (!pool->status)
    {
        pthread_cond_broadcast(&(pool->queue_not_empty));
        // Release the lock (the original locked it a second time here).
        pthread_mutex_unlock(&(pool->lock));
        return;
    }

    thread_task &slot = pool->queue_task[pool->queue_rear];
    slot.function = function;
    slot.arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_maxsize;
    pool->queue_size++;

    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
}
/**
 * Probe whether a thread id still refers to an existing thread.
 *
 * @param tid Thread id to probe.
 * @return false only when pthread_kill reports ESRCH; true for every other
 *         result, including other error codes.
 */
bool is_thread_alive(pthread_t tid)
{
    // Signal number 0 delivers nothing; only the error checking is performed.
    const int probe_result = pthread_kill(tid, 0);
    return probe_result != ESRCH;
}
void *adjust(void *arg) //独立线程管理线程池的线程个数
{
struct pthread_pool *pool = (struct pthread_pool *)arg;
while (pool->status==true)
{
sleep(DEFAULT_TIME);
pthread_mutex_lock(&(pool->lock));
int live_thr_num = pool->live_thr_num;
int queue_size = pool->queue_size;
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->busylock));
int busy_thr_num = pool->busy_thr_num;
pthread_mutex_unlock(&(pool->busylock));
if (queue_size > live_thr_num && live_thr_num < pool->max_thr_num)
{
cout<<"too many task"<<endl;
int add = 0;
for (int i = 0; i < pool->max_thr_num && add < DEFAULT_STEP && pool->live_thr_num < pool->max_thr_num; i++)
{
if (pool->th[i] == 0 && !is_thread_alive(pool->th[i]))
{
pthread_create(&(pool->th[i]), NULL, process, (void *)pool);
add++;
pool->live_thr_num++;
}
}
}
if (busy_thr_num * 2 < live_thr_num && live_thr_num < pool->max_thr_num)
{
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_num = DEFAULT_STEP;
pthread_mutex_unlock(&(pool->lock));
for (int i = 0; i < DEFAULT_STEP; i++)
{
pthread_cond_signal(&(pool->queue_not_empty));
}
}
}
return NULL;
}
// Worker thread entry point. Repeatedly dequeues one task from the pool's
// circular queue and runs it. The thread leaves through pthread_exit either
// when the pool has been stopped (status == false) or when the adjust thread
// has requested idle workers to exit and the pool is above min_thr_num.
// arg: the owning pthread_pool, passed as void*.
void *process(void *arg) // worker thread
{
struct pthread_pool *pool = (struct pthread_pool *)arg;
thread_task task;
while (true)
{
pthread_mutex_lock(&(pool->lock));
// Idle loop: sleep on queue_not_empty while there is nothing to do and the pool is running.
while (pool->queue_size == 0 && pool->status == true)
{
cout << pthread_self() << " thread was waiting" << endl;
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
// Woken up: first honour a pending exit request from the adjust thread.
if (pool->wait_exit_num > 0)
{
pool->wait_exit_num--;
// The request is consumed either way, but the thread only exits while above the minimum.
if (pool->live_thr_num > pool->min_thr_num)
{
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}
// Pool stopped: exit without dequeuing, so tasks still in the queue are not run.
if (pool->status == false)
{
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
// cout<<"yes"<<endl;
// Copy the front task out of the circular buffer and advance the front index.
task.function = pool->queue_task[pool->queue_front].function;
task.arg = pool->queue_task[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % pool->queue_maxsize;
pool->queue_size--;
// A slot was freed: wake producers blocked in task_add.
pthread_cond_broadcast(&pool->queue_not_full);
pthread_mutex_unlock(&(pool->lock));
//pthread_cond_broadcast(&(pool->queue_not_full));
// busy_thr_num has its own mutex so the pool lock is not held while counting.
pthread_mutex_lock(&(pool->busylock));
pool->busy_thr_num++;
pthread_mutex_unlock(&(pool->busylock));
// Run the task with no pool lock held.
(*(task.function))(task.arg);
pthread_mutex_lock(&(pool->busylock));
pool->busy_thr_num--;
pthread_mutex_unlock(&(pool->busylock));
}
// Not reached: the loop above only leaves via pthread_exit.
pthread_exit(NULL);
}
pthread_pool *pthreadpool_create(int min_thr_num, int max_thr_num, int queue_maxsize) //初始化线程池
{
pthread_pool *pool = NULL;
do {
pool = new pthread_pool;
if (pool == NULL)
{
break;
}
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->queue_maxsize = queue_maxsize;
pool->wait_exit_num = 0;
pool->live_thr_num = min_thr_num;
pool->busy_thr_num = 0;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->queue_size = 0;
pool->status = true;
pool->th = new pthread_t[max_thr_num];
if (pool->th == NULL)
{
break;
}
memset(pool->th, 0, sizeof(pthread_t)*max_thr_num);
pool->queue_task = new thread_task[queue_maxsize];
if (pool->queue_task == NULL)
{
break;
}
memset(pool->queue_task, 0, sizeof(thread_task)*pool->queue_maxsize);
pthread_mutex_init(&(pool->lock), NULL);
pthread_mutex_init(&(pool->busylock), NULL);
pthread_cond_init(&(pool->queue_not_empty), NULL);
pthread_cond_init(&(pool->queue_not_full), NULL);
for (int i = 0; i < min_thr_num; i++)
{
pthread_create(&pool->th[i], NULL, process, (void *)pool);
cout << pool->th[i] << " thread was created" << endl;
}
pthread_create(&pool->adjustth, NULL, adjust, (void *)pool);
return pool;
} while (0);
return NULL;
}
/**
 * Demo task: sleeps for the given number of seconds, then prints a banner,
 * that many "hello world" lines, and a closing banner.
 *
 * @param arg Pointer to an int used as both the sleep time and the line count.
 * @return Always NULL.
 */
void *dowork(void *arg)
{
    int *count = static_cast<int *>(arg);

    sleep(*count);

    std::cout << "**************************" << "第" << *count << "次执行任务" << std::endl;

    int line = 1;
    while (line <= *count)
    {
        std::cout << "hello world" << std::endl;
        ++line;
    }

    std::cout << "*****************************任务结束" << *count << std::endl << std::endl;
    return NULL;
}
/**
 * Demo driver: creates a pool (10 to 100 workers, queue of 100), queues one
 * dowork task per entry of dp, lets the pool run for 21 seconds, then destroys it.
 *
 * @return 0 on success, 1 when the pool cannot be created.
 */
int main()
{
    pthread_pool *pool = pthreadpool_create(10, 100, 100);
    if (pool == NULL)
    {
        std::cerr << "failed to create thread pool" << std::endl;
        return 1;
    }

    // One task per table entry; the count follows the table size.
    const int task_count = static_cast<int>(sizeof(dp) / sizeof(dp[0]));
    for (int i = 0; i < task_count; i++)
    {
        task_add(pool, dowork, (void *)&dp[i]);
    }

    // Give the workers time to pick up the tasks: workers exit without
    // dequeuing once the pool is stopped, so anything still queued is dropped.
    sleep(21);
    destroy_pool(pool);

    // The former system("pause") was dropped: "pause" is a Windows shell
    // builtin, and this program targets POSIX (unistd.h / pthreads).
    return 0;
}
// Tags: based on, lock, linux, thr, queue, num, thread, pthread, pool
// From: https://blog.csdn.net/weixin_72492465/article/details/139267652