介绍
- 异步线程只有一个,由主线程充当,它负责监听所有 socket 上的事件
- 如果监听 socket 上发生读事件(有新的连接请求到来),主线程就接受得到新的连接 socket,然后往 epoll 内核事件表上注册该连接 socket 上的读写事件
- 如果连接 socket 上发生读写事件(客户端和服务端有数据传输),主线程将该连接 socket 插入请求队列
- 所有工作线程都睡眠在请求队列上,当有任务到来时,他们通过竞争获得任务接管权
代码
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"
// 定义成模板类是为了代码复用,T 是任务的类型
template <typename T>
class threadpool {
public:
threadpool(int thread_number = 8, int max_requests = 10000);
~threadpool();
bool append(T *request); // 往请求队列中添加任务
private:
static void * worker(void *arg); // 工作线程运行的函数
void run();
private:
pthread_t *m_threads; // 描述线程池的数组
int m_thread_number; // 线程池中的线程数
std::list<T> m_workqueue; // 请求队列
int m_max_requests; // 请求队列中允许的最大请求数
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理
bool m_stop; // 是否结束线程
};
template <typename T>
threadpool<T>::threadpool(int thread_number, int max_requests) :
m_threads(nullptr),
m_thread_number(thread_number),
m_max_requests(max_requests),
m_stop(false)
{
if (thread_number <= 0 || max_requests <= 0) {
throw std::exception();
}
m_threads = new pthread_t[m_thread_number];
if (m_threads == nullptr) {
throw std::exception();
}
// 创建 m_thread_number 个线程,并将他们设置成脱离线程
for (int i = 0; i < m_thread_number; i++) {
printf("create the %dth thread\n", i);
if (pthread_create(m_threads + i, nullptr, worker, (void *)this) != 0) {
delete [] m_threads;
throw std::exception();
}
if (pthread_detach(m_threads[i]) != 0) {
delete [] m_threads;
throw std::exception();
}
}
}
template <typename T>
threadpool<T>::~threadpool()
{
delete [] m_threads;
m_stop = true;
}
template <typename T>
bool threadpool<T>::append(T *request)
{
m_queuelocker.lock();
if (m_workqueue.size() > m_max_requests) {
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template <typename T>
void * threadpool<T>::worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template <typename T>
void threadpool<T>::run()
{
while (!m_stop) {
m_queuestat.wait();
m_queuelocker.lock();
if (m_workqueue.empty()) {
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (request == nullptr)
continue;
request->process(); // 任务的处理函数
}
}
#endif
注意事项
- 每个工作线程调用 worker 函数,worker 函数再调用 run 函数,为什么不直接 pthread_create(m_threads + i, nullptr, run, nullptr)?
- C++ 类的非静态成员函数第一个参数是隐含的 this 指针,所以 run 函数的参数个数和所需不匹配
- 而静态函数成员不含 this 指针
- 如何在静态函数成员中使用类的动态成员?
- 将类的对象作为参数传递给该静态函数,然后在静态函数中引用这个对象,并调用其动态方法
- 通过类的静态对象来调用。比如单例模式中,静态函数可以通过类的全局唯一实例来访问动态成员函数