EventLoopThread是事件循环线程,包含一个Thread对象,一个EventLoop对象。在构造函数中,把EventLoopThread::threadFunc
注册到Thread对象中(线程启动时会回调)。
EventLoopThreadPool是事件循环线程池,管理所有客户端连接,每个线程都有唯一一个事件循环。可以调用setThreadNum设置线程的数目。
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/EventLoop.h>
using namespace muduo;
using namespace muduo::net;
/********************************************************************
Modify : Eric Lee
Date : 2018-01-21
Description : 这个类专门创建一个线程用于执行Reactor的事件循环,当然
这只是一个辅助类,没有说一定要使用它,可以根据自己的情况进行选择,
你也可以不创建线程去执行事件循环,而在主线程中执行事件循环,
一切根据自己的需要。
EventLoopThread(也叫IO线程)的工作流程为:
1、在主线程(暂且这么称呼)创建EventLoopThread对象。
2、主线程调用EventLoopThread.start(),启动EventLoopThread中的线程
(称为IO线程),这是主线程要等待IO线程创建完成EventLoop对象。
3、IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完成。
4、主线程返回创建的EventLoop对象。
*********************************************************************/
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
const string& name)
: loop_(NULL),
exiting_(false),
thread_(std::bind(&EventLoopThread::threadFunc, this), name),
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
if (loop_ != NULL)
{
loop_->quit();
thread_.join();
}
}
/********************************************************************
Modify : Eric Lee
Date : 2018-01-16
Description : 启动一个EventLoop线程。
*********************************************************************/
EventLoop* EventLoopThread::startLoop()
{
assert(!thread_.started());
// 当前线程启动,调用threadFunc()
thread_.start();
EventLoop* loop = NULL;
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
// 等待创建好当前IO线程
cond_.wait();
}
loop = loop_;
}
return loop;
}
void EventLoopThread::threadFunc()
{
EventLoop loop;
// 如果有初始化函数,就先调用初始化函数
if (callback_)
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
// 通知startLoop线程已经启动完毕
cond_.notify();
}
// 事件循环
loop.loop();
MutexLockGuard lock(mutex_);
loop_ = NULL;
}
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
/********************************************************************
Modify : Eric Lee
Date : 2018-01-21
Description : eventloopthreadpool是基于muduo库中Tcpserver这个类专门
做的一个线程池,它的模式属于半同步半异步,线程池中每一个线程都有一个
自己的eventloop,而每一个eventloop底层都是一个poll或者epoll,它利用了
自身的poll或者epoll在没有事件的时候阻塞住,在有事件发生的时候,epoll
监听到了事件就会去处理事件。
*********************************************************************/
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
: baseLoop_(baseLoop),
name_(nameArg),
started_(false),
numThreads_(0),
next_(0)
{
}
EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}
/********************************************************************
Modify : Eric Lee
Date : 2018-01-21
Description : 启动EventLoop线程池。
*********************************************************************/
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
// 创建一个EventLoop线程
EventLoopThread* t = new EventLoopThread(cb, buf);
// 放入threads_(EventLoop线程池)
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
// 启动每个EventLoopThread线程进入startLoop(),并且把返回的每个EventLoop指针压入到loops_
loops_.push_back(t->startLoop());
}
if (numThreads_ == 0 && cb)
{
// 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb
cb(baseLoop_);
}
}
/********************************************************************
Modify : Eric Lee
Date : 2018-01-21
Description : 使用round-robin(轮询调度)算法,从EventLoop线程池中
选择一个EventLoop。
*********************************************************************/
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
assert(started_);
EventLoop* loop = baseLoop_;
/************************************************************************
轮询调度算法的原理是每一次把来自用户的请求轮流分配给内部中的服务器,从1
开始,直到N(内部服务器个数),然后重新开始循环。轮询调度算法假设所有服务器
的处理性能都相同,不关心每台服务器的当前连接数和响应速度。当请求服务间隔
时间变化比较大时,轮询调度算法容易导致服务器间的负载不平衡。
*************************************************************************/
if (!loops_.empty())
{
// round-robin(轮询调度),next_记录的是下一个空闲的EventLoop。
loop = loops_[next_];
++next_;
if (implicit_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
/********************************************************************
Modify : Eric Lee
Date : 2018-01-21
Description : 通过哈希法从EventLoop线程池中选择一个EventLoop。
*********************************************************************/
EventLoop* EventLoopThreadPool::getLoopForHash(size_t hashCode)
{
baseLoop_->assertInLoopThread();
EventLoop* loop = baseLoop_;
if (!loops_.empty())
{
loop = loops_[hashCode % loops_.size()];
}
return loop;
}
/********************************************************************
Modify : Eric Lee
Date : 2018-01-21
Description : 获得所有的EventLoop。
*********************************************************************/
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
baseLoop_->assertInLoopThread();
assert(started_);
if (loops_.empty())
{
return std::vector<EventLoop*>(1, baseLoop_);
}
else
{
return loops_;
}
}