首页 > 系统相关 >Linux网络通信(线程池和线程池版本的服务器代码)

Linux网络通信(线程池和线程池版本的服务器代码)

时间:2022-11-11 19:35:19浏览次数:53  
标签:网络通信 task Linux 任务 线程 pthread cond pool

线程池

介绍

线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量

线程池的价值:

  1. 需要大量的线程来完成任务,且完成任务的时间比较短。可同时处理多任务,多请求。
  2. 有任务可以立即从线程池中调取线程取处理,节省了线程创建的时间
  3. 有效防止服务端线程过多而导致系统过载的问题

实现

线程池中首先需要有很多个线程,用户可以自己选择创建多少个线程。为了实现线程间的同步与互斥,还需要增加两个变量——互斥量和条件变量。我们还需要一个任务队列,主线程不断往里面塞任务,线程池的线程不断去处理。需要注意的是:这里的任务队列可以为空,但不能满,所以任务队列的容量不限定(实际场景中,任务队列容量不够就需要考虑换一台更大的服务器)

线程池的四个成员变量:

  • 一个队列: 存放任务
  • 线程池中线程数: 记录线程池中创建的线程数
  • 互斥量: 一个互斥锁
  • 条件变量: 两个条件变量

线程池:首先需要创建几个线程,还有一个任务队列,当任务队列有任务的时候就唤醒一个正在等待的线程,让线程去执行任务,线程池中的线程执行完任务不会销毁,大大减少的cpu的消耗。

需要两个条件变量和一个互斥锁,这个互斥锁用来锁住任务队列,因为任务队列是公共资源,其次还需要两个条件变量,一个条件变量用来阻塞取任务的线程,当队列中有任务的时候,直接取任务,然后解锁,当任务队列中没有任务的时候,解锁等待条件,条件满足抢锁,取任务,解锁。另一个条件变量用来阻塞添加者进程,当任务队列满了,会让添加者进程等待,当有线程取走一个任务的时候,会唤醒添加者进程。

版本一

任务函数

这里的任务函数采用的时回调函数的方式,提高了代码的通用性,可以根据自己的需求改写任务函数

//任务回调函数
void taskRun(void *arg)
{
      PoolTask *task = (PoolTask*)arg;
      int num = task->tasknum;
      printf("task %d is runing %lu\n",num,pthread_self());
  
      sleep(1);
      printf("task %d is done %lu\n",num,pthread_self());
}

线程池的主要代码框架

class ThreadPool
{
public:
    //构造函数,初始化线程池
    ThreadPool(int thrnum, int maxtasknum)
    {
    }
   static void* thrRun(void* arg)
    {     
    }
   //析构函数,摧毁线程池
   ~ThreadPool()
   {
   }
public:
   //添加任务到线程池
   void addtask(){};
private:
   //任务队列相关的参数
   int max_job_num;//最大任务个数
   int job_num;//实际任务个数
   PoolTask *tasks;//任务队列数组
   int job_push;//入队位置
   int job_pop;// 出队位置
 
   //线程相关参数
   int thr_num;//线程池内线程个数
   pthread_t *threads;//线程池内线程数组
   int shutdown;//是否关闭线程池
   pthread_mutex_t pool_lock;//线程池的锁
   pthread_cond_t empty_task;//任务队列为空的条件
   pthread_cond_t not_empty_task;//任务队列不为空的条件
};

放任务: 主线程无脑往任务队列中塞任务,塞任务之前进行加锁,塞完任务解锁,如果任务队列已经满了,等待线程取任务,然后唤醒在条件变量下等待的队列;放入了任务就给线程发送信号,唤醒线程来取
取任务: 线程池中的线程从任务队列中取任务,需要对任务队列上锁,因为对公共资源的操作都需要上锁,如果没有任务就阻塞,等待放任务唤醒;如果取完了一个任务,就唤醒添加任务

这就是两个条件变量和一个互斥锁的用法

//添加任务到线程池
   void addtask()
   {
     pthread_mutex_lock(&(this->pool_lock));
     //实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
     while(this->max_job_num <= this->job_num)
     {
          pthread_cond_wait(&(this->empty_task),&(this->pool_lock));
     }
 
     int taskpos = (this->job_push++)%this->max_job_num;

     this->tasks[taskpos].tasknum = beginnum++;
     this->tasks[taskpos].arg = (void*)&this->tasks[taskpos];
     this->tasks[taskpos].task_func = taskRun;
     this->job_num++;

     pthread_mutex_unlock(&(this->pool_lock));
 
     pthread_cond_signal(&(this->not_empty_task));//通知包身工
   }
  //取任务
  static void* thrRun(void* arg)
    {
      ThreadPool *pool = (ThreadPool*)arg;
      int taskpos = 0;//任务位置
      PoolTask *task = new PoolTask();
      while(1)
      {
          //获取任务,先要尝试加锁
          pthread_mutex_lock(&pool->pool_lock);
 
          //无任务并且线程池不是要摧毁
          while(pool->job_num <= 0 && !pool->shutdown )
          {
              //如果没有任务,线程会阻塞
              pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);
          }
          if(pool->job_num)
          {
              //有任务需要处理
              taskpos = (pool->job_pop++)%pool->max_job_num;
              //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
              //为什么要拷贝?避免任务被修改,生产者会添加任务
              memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));
              task->arg = task;
              pool->job_num--;
              //task = &thrPool->tasks[taskpos];
              pthread_cond_signal(&pool->empty_task);//通知生产者
          }
  
          if(pool->shutdown)
          {
              //代表要摧毁线程池,此时线程退出即可
              //pthread_detach(pthread_self());//临死前分家
              pthread_mutex_unlock(&pool->pool_lock);
              delete(task);
              pthread_exit(NULL);
          }
       //释放锁
       pthread_mutex_unlock(&pool->pool_lock);
       task->task_func(task->arg);//执行回调函数
       } 
     }

整体代码:

#include<iostream>
#include<string.h>
#include<pthread.h>
#include<sys/types.h>
#include<stdio.h>
#include<unistd.h>
using namespace std;
int beginnum = 1;

class PoolTask
{
public:
    int tasknum;//模拟任务编号
    void *arg;//回调函数参数
    void (*task_func)(void *arg);//任务的回调函数
};
//任务回调函数
void taskRun(void *arg)
{
      PoolTask *task = (PoolTask*)arg;
      int num = task->tasknum;
      printf("task %d is runing %lu\n",num,pthread_self());
  
      sleep(1);
      printf("task %d is done %lu\n",num,pthread_self());
}
class ThreadPool
{
public:
    //构造函数,初始化线程池
    ThreadPool(int thrnum, int maxtasknum)
    {
      this->thr_num = thrnum;
      this->max_job_num = maxtasknum;
      this->shutdown = 0;//是否摧毁线程池,1代表摧毁
      this->job_push = 0;//任务队列添加的位置
      this->job_pop = 0;//任务队列出队的位置
      this->job_num = 0;//初始化的任务个数为0
  
      //申请最大的任务队列
      this->tasks = new PoolTask[thrnum];
 
      //初始化锁和条件变量
      pthread_mutex_init(&(this->pool_lock),NULL);
      pthread_cond_init(&(this->empty_task),NULL);
      pthread_cond_init(&(this->not_empty_task),NULL);
  
      int i = 0;
      this->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间
  
      pthread_attr_t attr;
      pthread_attr_init(&attr);
      pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
      for(i = 0;i < thrnum;i++)
      {
          pthread_create(&(this->threads[i]),&attr,thrRun,this);//创建多个线程
      }
    }
   static void* thrRun(void* arg)
    {
      ThreadPool *pool = (ThreadPool*)arg;
      int taskpos = 0;//任务位置
      PoolTask *task = new PoolTask();
      while(1)
      {
          //获取任务,先要尝试加锁
          pthread_mutex_lock(&pool->pool_lock);
 
          //无任务并且线程池不是要摧毁
          while(pool->job_num <= 0 && !pool->shutdown )
          {
              //如果没有任务,线程会阻塞
              pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);
          }
          if(pool->job_num)
          {
              //有任务需要处理
              taskpos = (pool->job_pop++)%pool->max_job_num;
              //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
              //为什么要拷贝?避免任务被修改,生产者会添加任务
              memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));
              task->arg = task;
              pool->job_num--;
              //task = &thrPool->tasks[taskpos];
              pthread_cond_signal(&pool->empty_task);//通知生产者
          }
  
          if(pool->shutdown)
          {
              //代表要摧毁线程池,此时线程退出即可
              //pthread_detach(pthread_self());//临死前分家
              pthread_mutex_unlock(&pool->pool_lock);
              delete(task);
              pthread_exit(NULL);
          }
       //释放锁
       pthread_mutex_unlock(&pool->pool_lock);
       task->task_func(task->arg);//执行回调函数
       }
     }
   //析构函数,摧毁线程池
   ~ThreadPool()
   {
     this->shutdown = 1;//关闭线程池
     pthread_cond_broadcast(&(this->not_empty_task));//诱杀
 
     int i = 0;
     for(i = 0; i<this->thr_num ; i++)
     {
         pthread_join(this->threads[i],NULL);
     }
 
     pthread_cond_destroy(&(this->not_empty_task));
     pthread_cond_destroy(&(this->empty_task));
     pthread_mutex_destroy(&(this->pool_lock));
     delete []tasks;
     tasks = NULL;
     free(this->threads);
   }
 
public:
   //添加任务到线程池
   void addtask()
   {
     pthread_mutex_lock(&(this->pool_lock));
     cout << "当前任务队列中任务的个数是: " <<this-> job_num <<endl; 
     //实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
     while(this->max_job_num <= this->job_num)
     {
          pthread_cond_wait(&(this->empty_task),&(this->pool_lock));
     }
 
     int taskpos = (this->job_push++)%this->max_job_num;

     this->tasks[taskpos].tasknum = beginnum++;
     this->tasks[taskpos].arg = (void*)&this->tasks[taskpos];
     this->tasks[taskpos].task_func = taskRun;
     this->job_num++;

     pthread_mutex_unlock(&(this->pool_lock));
 
     pthread_cond_signal(&(this->not_empty_task));//通知包身工
   }
private:
   //任务队列相关的参数
   int max_job_num;//最大任务个数
   int job_num;//实际任务个数
   PoolTask *tasks;//任务队列数组
   int job_push;//入队位置
   int job_pop;// 出队位置
 
   //线程相关参数
   int thr_num;//线程池内线程个数
   pthread_t *threads;//线程池内线程数组
   int shutdown;//是否关闭线程池
   pthread_mutex_t pool_lock;//线程池的锁
   pthread_cond_t empty_task;//任务队列为空的条件
   pthread_cond_t not_empty_task;//任务队列不为空的条件
};
int main()
{
   ThreadPool *m = new ThreadPool(3,20);
   int j = 0;
   for(j=0;j<20;j++)
   {
     m->addtask();
   }
   sleep(20);
   delete m;
   m = NULL;
   system("pause");
   return EXIT_SUCCESS;
}

运行结果如下:

可以看到线程最多处理三个任务,而任务队列中最多可以存在20个任务,当线程取走了任务之后,唤醒生产者继续添加任务。

版本二

首先封装一个任务:

class Task
{
public:
  Task(int a = 0, int b = 0)
    :_a(a)
    ,_b(b)
  {}
  void Run()
  {
    //执行的任务可以自己编写
  }
private:
  int _a;
  int _b;
};

线程池的主要代码框架(唤醒和等待操作都已经封装好):

#define DEFAULT_MAX_PTHREAD 5

class ThreadPool
{
public:
  ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD)
    :_max_thread(max_pthread)
  {}
  ~ThreadPool()
  {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond);
  }
public:
  void LockQueue()
  {
    pthread_mutex_lock(&_mutex);
  }
  void UnlockQueue()
  {
    pthread_mutex_unlock(&_mutex);
  }
  void ThreadWait()
  {
    pthread_cond_wait(&_cond, &_mutex);
  }
  void WakeUpThread()
  {
    pthread_cond_signal(&_cond);
    //pthread_cond_broadcast(&_cond);
  }
  bool IsEmpty()
  {
    return _q.empty();
  } 
private:
  queue<Task*>  _q;
  int  _max_thread;
  pthread_mutex_t _mutex;
  pthread_cond_t  _cond;
};

创建多个线程
创建多个线程可以用一个循环进行创建。需要注意的是,创建一个线程还需要提供一个线程启动后要执行的函数,这个启动函数只能有一个参数。如果把这个函数设置为成员函数,那么这个函数的第一个参数默认是this指针,这样显然是不可行的,所以这里我们考虑把这个启动函数设置为静态的。但是设置为静态的成员函数又会面临一个问题:如何调用其他成员函数和成员变量? 所以这里我们考虑创建线程的时候,把this指针传过去,让启动函数的arg 参数去接收即可

static void* Runtine(void* arg)
{
  pthread_detach(pthread_self());
  ThreadPool* this_p = (ThreadPool*)arg;

  while (1){
    this_p->LockQueue();
    while (this_p->IsEmpty()){
      this_p->ThreadWait();
    }
    Task* t;
    this_p->Get(t);
    this_p->UnlockQueue();
    // 解锁后处理任务
    t->Run();
    delete t;
  }
}
void ThreadPoolInit()
{
  pthread_mutex_init(&_mutex, nullptr);
  pthread_cond_init(&_cond, nullptr);
  pthread_t t[_max_thread];
  for(int i = 0; i < _max_thread; ++i)
  {
    pthread_create(t + i, nullptr, Runtine, this);
  }
}

注意: 线程创建后,执行启动函数,在这个函数中,线程会去任务队列中取任务并处理,取任务前需要进行加锁的操作(如果队列为空需要挂起等待),取完任务然后进行解锁,然后处理任务,让其它线程去任务队列中取任务

放任务: 主线程无脑往任务队列中塞任务,塞任务之前进行加锁,塞完任务解锁,然后唤醒在条件变量下等待的队列
取任务: 线程池中的线程从任务队列中取任务,这里不需要加锁,因为这个动作在启动函数中加锁的那一段区间中被调用的,其实已经上锁了

// 放任务
void Put(Task* data)
{
  LockQueue();
  _q.push(data);
  UnlockQueue();
  WakeUpThread();
}
// 取任务
void Get(Task*& data)
{
  data = _q.front();
  _q.pop();
}

这两个版本都可以实现简易的线程池,下面线程池版本的服务器主要是用版本二来实现,因为版本一要修改的内容有点多,小伙伴们可以自己修改一下

线程池版本服务器

多线程版本效果看起来还不错,但是来一个连接就创建一个线程,断开一个连接就释放一个线程,这样频繁地创建和释放线程资源,对OS来说是一种负担,同时也带来资源的浪费,如果我们使用线程池,把每一个客户端连接封装成一个任务,让线程池去处理,这样就不需要频繁地创建和销毁消除,效率也能提升很多。
线程池采用版本二,代码如下:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "Task.hpp"

#define DEFAULT_MAX_PTHREAD 5

class ThreadPool
{
public:
  ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD)
    :_max_thread(max_pthread)
  {}
  static void* Runtine(void* arg)
  {
    pthread_detach(pthread_self());
    ThreadPool* this_p = (ThreadPool*)arg;

    while (1){
      this_p->LockQueue();
      while (this_p->IsEmpty()){
        this_p->ThreadWait();
      }
      Task* t;
      this_p->Get(t);
      this_p->UnlockQueue();
      // 解锁后处理任务
      t->Run();
      delete t;
    }
  }
  void ThreadPoolInit()
  {
    pthread_mutex_init(&_mutex, nullptr);
    pthread_cond_init(&_cond, nullptr);
    pthread_t t[_max_thread];
    for(int i = 0; i < _max_thread; ++i)
    {
      pthread_create(t + i, nullptr, Runtine, this);
    }
  }
  void Put(Task* data)
  {
    LockQueue();
    _q.push(data);
    UnlockQueue();
    WakeUpThread();
  }
  void Get(Task*& data)
  {
    data = _q.front();
    _q.pop();
  }
  ~ThreadPool()
  {
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_cond);
  }
public:
  void LockQueue()
  {
    pthread_mutex_lock(&_mutex);
  }
  void UnlockQueue()
  {
    pthread_mutex_unlock(&_mutex);
  }
  void ThreadWait()
  {
    pthread_cond_wait(&_cond, &_mutex);
  }
  void WakeUpThread()
  {
    pthread_cond_signal(&_cond);
    //pthread_cond_broadcast(&_cond);
  }
  bool IsEmpty()
  {
    return _q.empty();
  } 
private:
  std::queue<Task*>  _q;
  int             _max_thread;
  pthread_mutex_t _mutex;
  pthread_cond_t  _cond;
};

这里我们单独写一个头文件——Task.hpp,其中有任务类,任务类里面有三个成员变量,也就是端口号,IP和套接字,其中有一个成员方法——Run,里面封装了一个Service函数,也就是前面写的,把它放在Task.hpp这个头文件下,线程池里面的线程执行run函数即可,头文件内容如下:

#pragma once
#include <iostream>
#include <unistd.h>

static void Service(std::string ip, int port, int sock)
{
  while (1){
    char buf[256];
    ssize_t size = read(sock, buf, sizeof(buf)-1);
    if (size > 0){
      // 正常读取size字节的数据
      buf[size] = 0;
      std::cout << "[" << ip << "]:[" << port  << "]# "<< buf << std::endl;
      std::string msg = "server get!-> ";
      msg += buf;
      write(sock, msg.c_str(), msg.size());
    }
    else if (size == 0){
      // 对端关闭
      std::cout << "[" << ip << "]:[" << port  << "]# close" << std::endl;
      break;
    }
    else{
      // 出错
      std::cerr << sock << "read error" << std::endl; 
      break;
    }
  }

  close(sock);
  std::cout << "service done" << std::endl;
}

struct Task
{
  int _port;
  std::string _ip;
  int _sock;

  Task(int port, std::string ip, int sock)
    :_port(port)
    ,_ip(ip)
     ,_sock(sock)
  {}
  void Run()
  {
      Service(_ip, _port, _sock);
  }
};

服务器类的核心代码如下:

void loop()
{
  struct sockaddr_in peer;// 获取远端端口号和ip信息
  socklen_t len = sizeof(peer);
  _tp = new ThreadPool(THREAD_NUM); 
  _tp->ThreadPoolInit();
  while (1){
    // 获取链接
    // sock 是进行通信的一个套接字  _listen_sock 是进行监听获取链接的一个套接字
    int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len);
    if (sock < 0){
      std::cout << "accept fail, continue accept" << std::endl;
      continue;
    }
    int peerPort = ntohs(peer.sin_port);
    std::string peerIp = inet_ntoa(peer.sin_addr);
    std::cout << "get a new link, [" << peerIp << "]:[" << peerPort  << "]"<< std::endl;
    Task* task = new Task(peerPort, peerIp, sock);
    _tp->Put(task);

  }
}

注意几点变化:

  1. 服务器类增加一个线程池成员变量,初始化函数里面增加线程池创建(在堆上申请)
  2. 析构函数增加释放线程池资源一步
  3. loop函数中只需要封装任务,并把任务丢进线程池中即可

标签:网络通信,task,Linux,任务,线程,pthread,cond,pool
From: https://www.cnblogs.com/yzsn12138/p/16881522.html

相关文章

  • Linux Centos7 部署步骤 mysql
    0.首先查看cpu架构uname-a输出内容中有关键词ARM或aarch64就是ARM架构,有关键词x86_64就是X86架构1.下载mysql8(https://dev.mysql.com/)选择 RedHat......
  • 新手必备的Linux命令
    新手必备的Linux命令,虽然平时大部分工作都是和Java相关的开发,但是每天都会接触Linux系统,尤其是使用了Mac之后,每天都是工作在黑色背景的命令行环境中.自己记忆力不好,......
  • 如何在Linux中查找进程
    大多数Linux用户使用预装的默认系统监控工具来检查内存、CPU使用率等。在Linux中,许多应用程序作为守护进程在系统后台运行,这会消耗更多的系统资源。在Linux中,您可以......
  • 使用final shell 连接使用 ubuntu server linux
    书接上回,VM安装ubuntuserver:https://www.cnblogs.com/runliuv/p/16880599.html1.从https://www.hostbuf.com/下载FinalShellSSH工具,并安装。2.点击左上角文件夹图标,......
  • Linux下(CentOS)下Docker的安装
    Linux下(CentOS)下Docker的安装以及阿里云加速器的配置什么是Docker?Docker是一个开源的应用容器引擎。让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中,然后发布到......
  • Using debugStub to debug a guest linux kernel
    UsingdebugStubtodebugaguestlinuxkernelIamrunningVMwareFusionVersion6.0.2(1398658)IhaveconfiguredthefollowinginmyvmxdebugStub.listen.gue......
  • 将windows目录共享,并linux访问
    设置windows下的共享目录:在windows下,打开网络和共享中心,点击更改高级共享设置选择公用,在密码保护的共享选项中点击关闭密码保护共享选择需要共享的文件夹或磁......
  • 电影推荐系统项目实战:环境配置与安装:-----Linux环境下 ElasticSearch(单节点)环境配置
    1通过WGET下载压缩包:wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.2.tar.gz  2.修改Linux配置参数:在文件末尾添加:sudo......
  • 步入Linux的现代方法
    目录0:Linux的初步认识0.0系统的认识0.1Linux操作系统认识,以及开源的提出:Linux的千奇百怪的版本0.2开源的含义0.3Linux的用途,各类发行版本详见:​​0:Linux的初步认识-......
  • LINUX CENTOS7 部署步骤 Ftp
    0.linux中FTP服务叫做vsftpd1.查看ftp是否安装rpm-qa|grepftp2.查找vsftpyumlist|grepvsftp3.安装vsftpdyum-yinstallvsftpd4.验证是否安装......