首页 > 其他分享 >【muduo】net篇---EventLoop

【muduo】net篇---EventLoop

时间:2023-08-29 12:33:08浏览次数:31  
标签:cb EventLoop --- _- 线程 net include channel


EventLoop类调用Poller::poll()进行I/O复用,返回活跃事件列表,然后遍历该列表,依次调用每一个活跃Channel的事件处理函数handleEvent(),最终回调了TcpConnection注册过来的函数

#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>
#include <muduo/net/SocketsOps.h>
#include <muduo/net/TimerQueue.h>

#include <algorithm>

#include <signal.h>
#include <sys/eventfd.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

namespace
{
__thread EventLoop* t_loopInThisThread = 0;

const int kPollTimeMs = 10000;

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-16
Description : eventfd是一种线程间通信机制。简单来说eventfd就是一个
文件描述符,它引用了一个内核维护的eventfd object,是uint64_t类型,
也就是8个字节,可以作为counter。支持read,write,以及有关epoll等操作。
*********************************************************************/
int createEventfd()
{
  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (evtfd < 0)
  {
    LOG_SYSERR << "Failed in eventfd";
    abort();
  }
  return evtfd;
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : SIGPIPE的默认行为是终止进程,在命令行程序中这是合理的,
但是在网络编程中,这以为这如果对方断开连接而本地继续写入的话,会造成
服务进程意外退出。假如服务进程繁忙,没有及时处理对方断开连接的事件,
就有可能出现在连接断开之后继续发送数据的情况。这样的情况是需要避免的。
*********************************************************************/
#pragma GCC diagnostic ignored "-Wold-style-cast"
class IgnoreSigPipe
{
 public:
  IgnoreSigPipe()
  {
    ::signal(SIGPIPE, SIG_IGN);
  }
};
#pragma GCC diagnostic error "-Wold-style-cast"

IgnoreSigPipe initObj;
}

EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
  return t_loopInThisThread;
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : 创建轮询器(Poller),创建用于传递消息的管道,初始化
各个部分,然后进入一个无限循环,每一次循环中调用轮询器的轮询函数
(等待函数),等待事件发生,如果有事件发生,就依次调用套接字
(或其他的文件描述符)的事件处理器处理各个事件,然后调用投递的
回调函数;接着进入下一次循环。
*********************************************************************/
EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_)),
    currentActiveChannel_(NULL)
{ 
  LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
  // 设置唤醒事件处理器的读回调函数为handleRead
  wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
  // 启用读功能
  wakeupChannel_->enableReading();
}

EventLoop::~EventLoop()
{
  LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
            << " destructs in thread " << CurrentThread::tid();
  wakeupChannel_->disableAll();
  wakeupChannel_->remove();
  ::close(wakeupFd_);
  t_loopInThisThread = NULL;
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-16
Description : 事件循环。在某线程中实例化EventLoop对象,这个线程
就是IO线程,必须在IO线程中执行loop()操作,在当前IO线程中进行
updateChannel,在当前线程中进行removeChannel。
*********************************************************************/
void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;
  LOG_TRACE << "EventLoop " << this << " start looping";
  while (!quit_)
  {
    activeChannels_.clear();
    // 调用poll获得活跃的channel(activeChannels_)
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    // 增加Poll次数
    ++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // 事件处理开始
    eventHandling_ = true;
    // 遍历活跃的channel,执行每一个channel上的回调函数
    for (Channel* channel : activeChannels_)
    {
      currentActiveChannel_ = channel;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    // 事件处理结束
    eventHandling_ = false;
    // 处理用户在其他线程注册给IO线程的事件
    doPendingFunctors(); 
  }
  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : 结束EventLoop线程。
*********************************************************************/
void EventLoop::quit()
{
  quit_ = true;
  if (!isInLoopThread())
  {
    wakeup();
  }
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : EventLoop线程除了等待poll、执行poll返回的激活事件,
还可以处理一些其他任务,例如调用某一个回调函数,处理其他EventLoop
对象的,调用EventLoop::runInLoop(cb)即可让EventLoop线程执行cb函数。

假设我们有这样的调用:loop->runInLoop(run),说明想让IO线程执行一定
的计算任务,此时若是在当前的IO线程,就马上执行run();如果是其他线程
调用的,那么就执行queueInLoop(run),将run异步添加到队列,当loop内处理
完事件后,就执行doPendingFunctors(),也就执行到了run();最后想要结束
线程的话,执行quit。
*********************************************************************/
void EventLoop::runInLoop(Functor cb)
{
  // 如果当前线程是EventLoop线程则立即执行,否则放到任务队列中,异步执行
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    queueInLoop(std::move(cb));
  }
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : 任务队列。
*********************************************************************/
void EventLoop::queueInLoop(Functor cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(std::move(cb));
  }
  // 如果当前线程不是EventLoop线程,或者正在执行pendingFunctors_中的任务,
  // 都要唤醒EventLoop线程,让其执行pendingFunctors_中的任务。
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}

size_t EventLoop::queueSize() const
{
  MutexLockGuard lock(mutex_);
  return pendingFunctors_.size();
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-23
Description : 以下三个函数是设置定时器的回调函数。
*********************************************************************/
// 在指定的时间调用TimerCallback
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
  return timerQueue_->addTimer(std::move(cb), time, 0.0);
}

// 等一段时间之后调用TimerCallback
TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
  Timestamp time(addTime(Timestamp::now(), delay));
  return runAt(time, std::move(cb));
}

// 以固定的时间反复调用TimerCallback
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(std::move(cb), time, interval);
}

// 取消Timer
void EventLoop::cancel(TimerId timerId)
{
  return timerQueue_->cancel(timerId);
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : 更新Channel,实际上是调用poller_->updateChannel()。
*********************************************************************/
void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel);
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-20
Description : EventLoop对象中的poller对象也持有Channel对象的指针,
所以需要将channel对象安全的从poller对象中移除。
*********************************************************************/
void EventLoop::removeChannel(Channel* channel)
{
  // 每次间接的调用的作用就是将需要改动的东西与当前调用的类撇清关系
  assert(channel->ownerLoop() == this);
  // 如果没有在loop线程调用直接退出
  assertInLoopThread();
  // 判断是否在事件处理状态。判断当前是否在处理这个将要删除的事件以及活动的事件表中是否有这个事件
  if (eventHandling_)
  {
    assert(currentActiveChannel_ == channel ||
        std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
  }
  poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  return poller_->hasChannel(channel);
}

void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " <<  CurrentThread::tid();
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : 使用eventfd唤醒。
*********************************************************************/
void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
  }
}

/******************************************************************** 
Modify : Eric Lee
Date : 2018-01-21
Description : 执行任务队列中的任务。
*********************************************************************/
void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;
  {
  MutexLockGuard lock(mutex_);
  functors.swap(pendingFunctors_);
  }
  for (const Functor& functor : functors)
  {
    functor();
  }
  callingPendingFunctors_ = false;
}

void EventLoop::printActiveChannels() const
{
  for (const Channel* channel : activeChannels_)
  {
    LOG_TRACE << "{" << channel->reventsToString() << "} ";
  }
}


标签:cb,EventLoop,---,_-,线程,net,include,channel
From: https://blog.51cto.com/u_6526235/7274964

相关文章

  • 【muduo】net篇---Poller
    Poller类负责更新某一个Channel(或者说套接字)上对应的事件,通过epoll_wait()函数返回有事件发生的套接字,设置Channel上的事件(revents_),并添加到激活事件列表中。#include<muduo/net/poller/EPollPoller.h>#include<muduo/base/Logging.h>#include<muduo/net/Channel.h>#include......
  • 论文阅读 《Pingmesh: A Large-Scale System for Data Center Network Latency Measur
    背景在我们内部产品中,一直有关于网络性能数据监控需求,我们之前是直接使用ping命令收集结果,每台服务器去ping(N-1)台,也就是N^2的复杂度,稳定性和性能都存在一些问题,最近打算对这部分进行重写,在重新调研期间看到了Pingmesh这篇论文,Pingmesh是微软用来监控数据中心网络情况......
  • 2023年DAMA-CDGA/CDGP数据治理认证线上到这里学习
    DAMA认证为数据管理专业人士提供职业目标晋升规划,彰显了职业发展里程碑及发展阶梯定义,帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力,促进开展工作实践应用及实际问题解决,形成企业所需的新数字经济下的核心职业竞争能力。DAMA是数据管理方面的认证,帮助数据从业者提升......
  • 2023年9月DAMA-CDGA/CDGP数据治理认证考试,火热报名
    据DAMA中国官方网站消息,2023年度第三期DAMA中国CDGA和CDGP认证考试定于2023年9月23日举行。 报名通道现已开启,相关事宜通知如下: 考试科目: 数据治理工程师(CertifiedDataGovernanceAssociate,CDGA)数据治理专家(CertifiedDataGovernanceProfessional,CDGP) 考试时间: CDGA:2023......
  • iTOP-i.MX8MM开发板添加RIL驱动程序库
    将Quectel提供的相应RIL库文件放入Android系统的以下路径。作者拷贝到了源码的android_build/device/fsl/imx8m/evk_8mm/lib目录下,如下图所示:然后将apns-conf.xml拷贝到android_build/device/fsl/imx8m/evk_8mm/下,如下图所示:......
  • 智能菜谱系统-计算机毕业设计源码+LW文档
    1.1研究背景自古以来,烹饪食品一直是人类的基本需求之一,烹饪技术的不断发展和创新,为人们带来了不同的美食体验。科技进步的同时又在不断地加快人们的生活节奏,越来越忙碌的生活节奏使得人们能够花费在制作美食上的时间越来越少;同时,随着生活水平的提高,人们对健康饮食的需求也日益增长......
  • 幼儿园管理系统-计算机毕业设计源码+LW文档
    摘 要现在人们对学前教育越来越重视,幼儿教育发展迅速,幼儿的数量也在大大增加,导致幼儿园的管理工作变得愈加繁重。以报表的方式管理幼儿园信息资料,不仅不方便园中资料的存储和查看,还加重了园中的管理工作、减低了工作效率。现在,大多数幼儿园都缺少一个向外界展现自身特色的平台,幼......
  • Flutter系列文章-Flutter在实际业务中的应用
    不同场景下的解决方案1.跨平台开发:在移动应用开发中,面对不同的平台(iOS和Android),我们通常需要编写两套不同的代码。而Flutter通过一套代码可以构建适用于多个平台的应用,大大提高了开发效率,降低了维护成本。2.混合开发:在一些已有的原生应用中,引入Flutter可以用于开发某些特定......
  • [代码随想录]Day30-贪心算法part04
    题目:860.柠檬水找零思路:收到钱三种情况:5刀:直接收起来就可以了,不需要找钱10刀:收到10刀,需要找5刀,如果没有5刀,就返回false,否则5刀-120刀:收到20刀(但是没用,找钱也不能找20所以不需要记录数量),优先考虑找105,因为10只能在这里用,其次再考虑找555代码:funclemonadeChange(bil......
  • JTS-IntersectionMatrix 使用说明
    参考:https://blog.csdn.net/weixin_40294332/article/details/124124928参考2:https://vimsky.com/examples/detail/java-method-com.vividsolutions.jts.geom.IntersectionMatrix.set.html......