首页 > 编程语言 >Swoole 源码分析之 epoll 多路复用模块

Swoole 源码分析之 epoll 多路复用模块

时间:2024-03-11 09:33:55浏览次数:38  
标签:reactor Swoole epoll event 源码 fd events socket

首发原文链接:Swoole 源码分析之 Http Server 模块
大家好,我是码农先森。

引言

在传统的IO模型中,每个IO操作都需要创建一个单独的线程或进程来处理,这样的操作会导致系统资源的大量消耗和管理开销。

而IO多路复用技术通过使用少量的线程或进程同时监视多个IO事件,能够更高效地处理大量的IO操作,从而提高系统的性能和资源利用率。

在IO多路复用的技术中尤其突出的是 epoll 技术,它是解决 C10K 问题的利器。

Swoole 中的多路复用

多路复用技术可以说是贯穿了整个 Swoole,同时也是 Swoole 为什么是高性能通信框架的根本原因。

Swoole 最重要的协程模块就是利用的 IO 多路复用事件循环技术,这也是与 Go 语言中协程不同的本质原因。

下面我们来一起看下 Swoole 中是如何实现 epoll 多路复用技术的。

这是创建 eoll 实例的方法,其中的 Reactor 是一个线程对象。

// 创建一个 epoll 实例,为其分配事件数组,并设置相关的 reactor 对象属性
// swoole-src/src/reactor/epoll.cc:71
ReactorEpoll::ReactorEpoll(Reactor *_reactor, int max_events) : ReactorImpl(_reactor) {
    // 创建一个 epoll 实例
    epfd_ = epoll_create(512);
    // 检查 epoll 创建是否成功
    if (!ready()) {
        swoole_sys_warning("epoll_create failed");
        return;
    }

    // epoll_event 结构体数组分配内存
    // 用于存储注册到 epoll 实例上的事件
    events_ = new struct epoll_event[max_events];
    // 设置最大事件数量
    reactor_->max_event_num = max_events;
    // native_handle 设置为 epoll 实例
    reactor_->native_handle = epfd_;
}

这个方法是向 epoll 事件循环中添加一个客户端的连接对象,用于监听。

// 向 epoll 事件循环中添加一个 socket,并为其设置特定的事件监听
// swoole-src/src/reactor/epoll.cc:94
int ReactorEpoll::add(Socket *socket, int events) {
    // 定义 epoll_event 结构体实例 e
    struct epoll_event e;
    // 设置事件类型
    e.events = get_events(events);
    // 设置 socket 指针,在 epoll 触发事件时,可以找到对应的 socket
    e.data.ptr = socket;

    // 添加事件到 epoll
    if (epoll_ctl(epfd_, EPOLL_CTL_ADD, socket->fd, &e) < 0) {
        swoole_sys_warning(
            "failed to add events[fd=%d#%d, type=%d, events=%d]", socket->fd, reactor_->id, socket->fd_type, events);
        return SW_ERR;
    }

    // 在 Reactor 中添加 socket
    // 为了在 Reactor 内部进行管理和跟踪
    reactor_->_add(socket, events);
    swoole_trace_log(
        SW_TRACE_EVENT, "add events[fd=%d#%d, type=%d, events=%d]", socket->fd, reactor_->id, socket->fd_type, events);

    return SW_OK;
}

这个方法是从 epoll 事件循环中移除一个客户端连接对象。

// 从 epoll 事件循环中删除一个 socket
// swoole-src/src/reactor/epoll.cc:113
int ReactorEpoll::del(Socket *_socket) {
    // 检查 socket 是否已被移除
    if (_socket->removed) {
        swoole_error_log(SW_LOG_WARNING,
                         SW_ERROR_EVENT_SOCKET_REMOVED,
                         "failed to delete events[fd=%d, fd_type=%d], it has already been removed",
                         _socket->fd, _socket->fd_type);
        return SW_ERR;
    }
    
    // 使用 epoll_ctl 函数从 epoll 的文件描述符 epfd_ 中删除 socket
    if (epoll_ctl(epfd_, EPOLL_CTL_DEL, _socket->fd, nullptr) < 0) {
        after_removal_failure(_socket);
        if (errno != EBADF && errno != ENOENT) {
            return SW_ERR;
        }
    }

    swoole_trace_log(SW_TRACE_REACTOR, "remove event[reactor_id=%d|fd=%d]", reactor_->id, _socket->fd);
    // 从 Reactor 中删除该 socket
    reactor_->_del(_socket);

    return SW_OK;
}

这个方法是用于修改一个已经在 epoll 事件循环中的客户端连接对象。

// 修改一个已经存在于 epoll 事件循环中的 socket 的事件监听类型
// swoole-src/src/reactor/epoll.cc:134
int ReactorEpoll::set(Socket *socket, int events) {
    // 定义 epoll_event 结构体实例 e
    struct epoll_event e;
    // 设置事件类型
    e.events = get_events(events);
    // 设置 socket 指针,在 epoll 触发事件时,可以找到对应的 socket
    e.data.ptr = socket;

    // 使用 epoll_ctl 函数修改 epoll 文件描述符 epfd_ 中对应 socket 的事件
    int ret = epoll_ctl(epfd_, EPOLL_CTL_MOD, socket->fd, &e);
    if (ret < 0) {
        swoole_sys_warning(
            "failed to set events[fd=%d#%d, type=%d, events=%d]", socket->fd, reactor_->id, socket->fd_type, events);
        return SW_ERR;
    }

    swoole_trace_log(SW_TRACE_EVENT, "set event[reactor_id=%d, fd=%d, events=%d]", reactor_->id, socket->fd, events);
    // 在 Reactor 内部进行相应的设置
    reactor_->_set(socket, events);

    return SW_OK;
}

这个方法是 epoll 事件循环环节中最重要的一点,开始等待 Socket IO事件的触发,并且调用对应的处理函数。

// swoole-src/src/reactor/epoll.cc:153
int ReactorEpoll::wait(struct timeval *timeo) {
    // 声明事件对象 event、Reactor 处理对象 handler
    Event event;
    ReactorHandler handler;
    int i, n, ret;

    // reactor 对象 ID 和 最大事件数量
    int reactor_id = reactor_->id;
    int max_event_num = reactor_->max_event_num;

    // 用于设置超时时间,如果 timeout_msec 为 0,则根据传入的 timeo 参数设置超时时间
    if (reactor_->timeout_msec == 0) {
        if (timeo == nullptr) {
            reactor_->timeout_msec = -1;
        } else {
            reactor_->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;
        }
    }
	
	// 在进入事件循环之前调用 before_wait 方法,表示准备开始等待事件
    reactor_->before_wait();

    while (reactor_->running) {
        // 如果定义了 onBegin 回调函数,则调用它来执行相应的操作
        if (reactor_->onBegin != nullptr) {
            reactor_->onBegin(reactor_);
        }

        // 调用 epoll_wait 函数获取就绪事件的数量
        n = epoll_wait(epfd_, events_, max_event_num, reactor_->get_timeout_msec());
        if (n < 0) {
            // 如果出现错误且不捕获错误,则打印错误信息并返回错误码
            if (!reactor_->catch_error()) {
                swoole_sys_warning("[Reactor#%d] epoll_wait failed", reactor_id);
                return SW_ERR;
            } else {
                goto _continue;
            }
        } else if (n == 0) {
            // 如果返回的就绪事件数为 0,则执行结束回调函数并继续下一轮循环。
            reactor_->execute_end_callbacks(true);
            SW_REACTOR_CONTINUE;
        }
        
        for (i = 0; i < n; i++) {
            // 在处理每个就绪事件时,将事件相关信息保存在event对象中
            event.reactor_id = reactor_id;
            event.socket = (Socket *) events_[i].data.ptr;
            event.type = event.socket->fd_type;
            event.fd = event.socket->fd;

            // 如果事件类型是 EPOLLRDHUP、EPOLLERR 或 EPOLLHUP 之一,则设置 event_hup 标志为 1。
            if (events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
                event.socket->event_hup = 1;
            }

            // 检查是否存在可读事件且套接字未被移除
            // read 如果是可读事件(EPOLLIN),则调用相应的读事件处理器
            if ((events_[i].events & EPOLLIN) && !event.socket->removed) {
                handler = reactor_->get_handler(SW_EVENT_READ, event.type);
                ret = handler(reactor_, &event);
                if (ret < 0) {
                    swoole_sys_warning("EPOLLIN handle failed. fd=%d", event.fd);
                }
            }
            
            // 检查是否存在可写事件且套接字未被移除
            // write 如果是可写事件(EPOLLOUT),则调用相应的写事件处理器。
            if ((events_[i].events & EPOLLOUT) && !event.socket->removed) {
                handler = reactor_->get_handler(SW_EVENT_WRITE, event.type);
                ret = handler(reactor_, &event);
                if (ret < 0) {
                    swoole_sys_warning("EPOLLOUT handle failed. fd=%d", event.fd);
                }
            }

            // error 如果是错误事件(EPOLLRDHUP、EPOLLERR、EPOLLHUP),则调用相应的错误事件处理器。
            if ((events_[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed) {
                // ignore ERR and HUP, because event is already processed at IN and OUT handler.
                if ((events_[i].events & EPOLLIN) || (events_[i].events & EPOLLOUT)) {
                    continue;
                }
                handler = reactor_->get_error_handler(event.type);
                ret = handler(reactor_, &event);
                if (ret < 0) {
                    swoole_sys_warning("EPOLLERR handle failed. fd=%d", event.fd);
                }
            }

            // 在处理完事件后,检查是否需要执行一次性事件的删除操作
            if (!event.socket->removed && (event.socket->events & SW_EVENT_ONCE)) {
                reactor_->_del(event.socket);
            }
        }

    _continue:
        // 在事件循环中执行回调函数并继续下一轮循环
        reactor_->execute_end_callbacks(false);
        SW_REACTOR_CONTINUE;
    }
    return 0;
}

总结

  1. epoll 在内部使用了红黑树的数据结构,红黑树是一个高效的数据结构。
  2. epoll 是解决 C10K 问题的利器,不仅是在 Swoole 中被应用,在很多的高性能服务中也有应用,例如:Nginx 服务等。
  3. Swoole 被称为高性能通信框架的关键原因,就是采用了 epoll 多路复用技术。

标签:reactor,Swoole,epoll,event,源码,fd,events,socket
From: https://www.cnblogs.com/yxhblogs/p/18050311

相关文章

  • Java面试必考题之线程的生命周期,结合源码,透彻讲解!
    写在开头在前面的几篇博客里,我们学习了Java的多线程,包括线程的作用、创建方式、重要性等,那么今天我们就要正式踏入线程,去学习更加深层次的知识点了。第一个需要学的就是线程的生命周期,也可以将之理解为线程的几种状态,以及互相之间的切换,这几乎是Java多线程的面试必考题,每一年都......
  • dolphinscheduler 实现master宕机故障转移能力源码分析
    DS(dolphinscheduler)的master是去中心化的,而故障转移能力是由master完成的,那么是多个master同时干故障转移,还是选举出一个master来干这件事情呢?回归到源码进行分析1.master启动方法@PostConstructpublicvoidrun()throwsSchedulerException{....this.failoverE......
  • 通达信买点100%指标公式源码1
    {通达信买点100%指标公式源码1}Var1:=1;趋势线:((3*SMA((CLOSE-LLV(LOW,27))/(HHV(HIGH,27)-LLV(LOW,27))*100,5,1)-2*SMA(SMA((CLOSE-LLV(LOW,27))/(HHV(HIGH,27)-LLV(LOW,27))*100,5,1),3,1)-50)*1.032+50),COLORRED;Var2:=(2*CLOSE+HIGH+LOW+OPEN)/5;Var3:=LLV(LOW,34)......
  • 通达信《鱼窝打分+鱼游打分》鱼仙指标 尾盘专用打分1支 止跌止盈量化计算 盘中捉涨停
    {通达信《鱼窝打分+鱼游打分》鱼仙指标尾盘专用打分1支止跌止盈量化计算盘中捉涨停捉妖源码文件分享}通达信《鱼窝打分+鱼游打分》鱼仙指标尾盘专用打分1支止跌止盈量化计算源码文件分享本指标每天尾盘打分1只《2022鱼仙指标盘中捉涨停妖栏》鱼窝打分鱼游打分稳......
  • 通达信成本无敌主图指标公式源码
    {通达信成本无敌主图指标公式源码}{指标介绍:出现绿色曲线时,此时我们持股,绿色曲线消失时我们持币。同时对于股价站上60日均线即大红粗线,成本无敌向上发散,此时就是打板时重点关注的对象。}JJJ:=IF(DYNAINFO(8)>0.01,0.01*DYNAINFO(10)/DYNAINFO(8),DYNAINFO(3));DDD:=(DYNAINF......
  • 通达信蓝防守黄进攻主图指标公式源码
    {通达信蓝防守黄进攻主图指标公式源码}DIFF:=EMA(CLOSE,12)-EMA(CLOSE,26);DEA:=EMA(DIFF,9);macd:=2*(DIFF-DEA);H9:=HHV(C,9),LINETHICK1,COLORGRAY;L9:=LLV(C,9),LINETHICK1,COLORGRAY;DRAWBAND(H9,RGB(255,217,40),L9,RGB(255,255,255));STICKLINE(MacD<0OR(DIF......
  • 通达信庄家诱空洗盘出击指标公式源码
    {通达信庄家诱空洗盘出击指标公式源码}{庄家诱空洗盘出击}MA05:=MA(C,5); MA10:=MA(C,10);MA30:=MA(C,30);BB05:=ATAN((MA05/REF(MA05,1)-1)*100)*180/3.1416;BB10:=ATAN((MA10/REF(MA10,1)-1)*100)*180/3.1416;BB30:=ATAN((MA30/REF(MA30,1)-1)*100)*180/3.1416;出......
  • 通达信短线抄底避险指标公式源码
    {通达信短线抄底避险指标公式源码}N:=5;走势:4*SMA((CLOSE-LLV(LOW,N))/(HHV(HIGH,N)-LLV(LOW,N))*100,5,1)-3*SMA(SMA((CLOSE-LLV(LOW,N))/(HHV(HIGH,N)-LLV(LOW,N))*100,5,1),3.2,1),COLOR0099FF,LINETHICK1;底部线:8,COLORGREEN,LINETHICK1;脱离底部:IF(crOSS(走势,底......
  • spring-security源码-FilterChainProxy
    FilterChainProxy内部存储了我们各个HttpSecurty生成的SecurityFilterChain。FilterChainProxy实现了ServletFilter接口。只真正的入口org.springframework.security.web.FilterChainProxy.doFilterpublicvoiddoFilter(ServletRequestrequest,ServletResponseresponse,F......
  • spring-security源码-如何初始化SecurityFilterChain到Servlet
    1.SecurityFilterChain是由HttpSecurty根据各个config配置生成的FilterSecurityFilterChain是接口,默认实现是由DefaultSecurityFilterChainSecurityFilterChain只充当描述的作用,描述哪些url走这批filterpublicfinalclassDefaultSecurityFilterChainimplementsSecurityF......