首页 > 其他分享 >Web服务器项目详解

Web服务器项目详解

时间:2023-08-29 12:34:54浏览次数:44  
标签:std Web const void Callback 详解 fd 服务器 include



文章目录

  • 一、新连接到来的处理流程
  • 二、Channel、TcpConnection、TcpServer、Poller、EventLoop类详解
  • 1、Channel类
  • 2、TcpConnection类
  • 3、TcpServer类
  • 4、Poller类
  • 5、EventLoop类
  • 三、这几个类之间的关系


一、新连接到来的处理流程

一个新的连接到来后,首先被MainReactor接收,然后通过轮询调度的方式(避免了惊群效应)分配给某个subReactor,因为涉及到主线程和IO线程竞争,所以需要加锁。

subReactor这个时候可能正阻塞在epoll_wait上,所以需要异步唤醒subReactor的IO线程去接收新连接,并关注该文件描述符上是否有事件发生。

如果文件描述符上有事件发生,epoll_wait会返回活跃的文件描述符,然后回调之前被注册的事件函数

二、Channel、TcpConnection、TcpServer、Poller、EventLoop类详解

1、Channel类

Channel类表示每一个客户端连接的通道,封装了文件描述符并负责接收从TcpConnection类传过来的事件回调函数,以后当文件描述符上有事件发生时直接回调;每一个套接字对应于一个Channel

#ifndef _CHANNEL_H_
#define _CHANNEL_H_

#include <functional>

class Channel
{
public:
    //回调函数类型
    typedef std::function<void()> Callback;

    Channel();
    ~Channel();

    //设置文件描述符
    void SetFd(int fd) 
    {
        fd_ = fd; 
    }

    //获取文件描述符
    int GetFd() const
    { 
        return fd_; 
    }    

    //设置触发事件
    void SetEvents(uint32_t events)
    { 
        events_ = events; 
    }

    //获取触发事件
    uint32_t GetEvents() const
    { 
        return events_; 
    }

    //事件分发处理
    void HandleEvent();

    //设置读事件回调
    void SetReadHandle(const Callback &cb)
    {
        readhandler_ = cb; //提高效率,可以使用move语义,这里暂时还是存在一次拷贝
    }

    //设置写事件回调
    void SetWriteHandle(const Callback &cb)
    {
        writehandler_ = cb; 
    }    

    //设置错误事件回调
    void SetErrorHandle(const Callback &cb)
    { 
        errorhandler_ = cb;
    }

    //设置close事件回调
    void SetCloseHandle(const Callback &cb)
    {
        closehandler_ = cb;
    }

private:
    //文件描述符
    int fd_;
    //事件,一般情况下为epoll events 
    uint32_t events_;

    //事件触发时执行的函数,在tcpconn中注册
    Callback readhandler_;
    Callback writehandler_;
    Callback errorhandler_;
    Callback closehandler_;
};

#endif

2、TcpConnection类

TcpConnection类是对客户端连接(文件描述符和地址)的抽象,负责向channel类注册事件(可读、可写、错误等),以后如果有事件发生时还会负责数据的收发

#ifndef _TCP_CONNECTION_H_
#define _TCP_CONNECTION_H_

#include <functional>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <thread>
#include <memory>
#include "Channel.h"
#include "EventLoop.h"

class TcpConnection : public std::enable_shared_from_this<TcpConnection>
{
public:
    //TcpConnection智能指针
    typedef std::shared_ptr<TcpConnection> spTcpConnection;

    //回调函数类型
    typedef std::function<void(const spTcpConnection&)> Callback;
    typedef std::function<void(const spTcpConnection&, std::string&)> MessageCallback;
    //typedef std::function<void()> TaskCallback;.

    TcpConnection(EventLoop *loop, int fd, const struct sockaddr_in &clientaddr);
    ~TcpConnection();

    //获取当前连接的fd
    int fd() const
    { return fd_; }

    //获取当前连接所属的loop
    EventLoop* GetLoop() const { return loop_; }

    //添加本连接对应的事件到loop
    void AddChannelToLoop();

    //发送数据的函数
    void Send(const std::string &s); 

    //在当前IO线程发送数据函数
    void SendInLoop(); 

    //主动清理连接
    void Shutdown(); 

    //在当前IO线程清理连接函数
    void ShutdownInLoop();

    //可读事件回调
    void HandleRead(); 
    //可写事件回调
    void HandleWrite(); 
    //错误事件回调
    void HandleError(); 
    //连接关闭事件回调
    void HandleClose(); 

    //设置收到数据回调函数
    void SetMessaeCallback(const MessageCallback &cb)
    {
        messagecallback_ = cb;
    }

    //设置发送完数据的回调函数
    void SetSendCompleteCallback(const Callback &cb)
    {
        sendcompletecallback_ = cb;
    }

    //设置连接关闭的回调函数
    void SetCloseCallback(const Callback &cb)
    {
        closecallback_ = cb;
    }

    //设置连接异常的回调函数
    void SetErrorCallback(const Callback &cb)
    {
        errorcallback_ = cb;
    }

    //设置连接清理函数
    void SetConnectionCleanUp(const Callback &cb)
    {
        connectioncleanup_ = cb;
    }

    //设置异步处理标志,开启工作线程池的时候使用
    void SetAsyncProcessing(const bool asyncprocessing)
    {
        asyncprocessing_ = asyncprocessing;
    }

private:
    //当前连接所在的loop
    EventLoop *loop_;

    //当前连接的事件
    std::unique_ptr<Channel> spchannel_;

    //文件描述符
    int fd_;

    //对端地址
    struct sockaddr_in clientaddr_;

    //半关闭标志位
    bool halfclose_; 

    //连接已关闭标志位
    bool disconnected_; 

    //异步调用标志位,当工作任务交给线程池时,置为true,任务完成回调时置为false
    bool asyncprocessing_; 

    //读写缓冲
    std::string bufferin_;
    std::string bufferout_;

    //各种回调函数
    MessageCallback messagecallback_;
    Callback sendcompletecallback_;
    Callback closecallback_;
    Callback errorcallback_;
    Callback connectioncleanup_;
};

#endif

3、TcpServer类

TcpServer类负责处理新连接,并把新连接通过轮询调度的策略转发给subReactor,同时还要接收从用户传过来的事件处理函数,并创建一个TcpConnection类,把事件处理函数传递给TcpConnection类,最终这些函数都会传递到channel类上

#include "TcpServer.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <memory>

void Setnonblocking(int fd);

TcpServer::TcpServer(EventLoop* loop, const int port, const int threadnum)
    : serversocket_(),
    loop_(loop),
    serverchannel_(),
    conncount_(0),
    eventloopthreadpool(loop, threadnum)
{
    //serversocket_.SetSocketOption(); 
    serversocket_.SetReuseAddr();   
    serversocket_.BindAddress(port);
    serversocket_.Listen();
    serversocket_.Setnonblocking();

    serverchannel_.SetFd(serversocket_.fd());
    serverchannel_.SetReadHandle(std::bind(&TcpServer::OnNewConnection, this));
    serverchannel_.SetErrorHandle(std::bind(&TcpServer::OnConnectionError, this));
    
}

TcpServer::~TcpServer()
{

}

void TcpServer::Start()
{
    eventloopthreadpool.Start();

    serverchannel_.SetEvents(EPOLLIN | EPOLLET);
    loop_->AddChannelToPoller(&serverchannel_);
}

//新TCP连接处理,核心功能,业务功能注册,任务分发
void TcpServer::OnNewConnection()
{
    //循环调用accept,获取所有的建立好连接的客户端fd
    struct sockaddr_in clientaddr;
    int clientfd;
    while( (clientfd = serversocket_.Accept(clientaddr)) > 0) 
    {
        std::cout << "New client from IP:" << inet_ntoa(clientaddr.sin_addr) 
            << ":" << ntohs(clientaddr.sin_port) << std::endl;
        
        if(++conncount_ >= MAXCONNECTION)
        {
            close(clientfd);
            continue;
        }
        Setnonblocking(clientfd);

        //选择IO线程loop
        EventLoop *loop = eventloopthreadpool.GetNextLoop();

        //创建连接,注册业务函数
        std::shared_ptr<TcpConnection> sptcpconnection = std::make_shared<TcpConnection>(loop, clientfd, clientaddr);
        sptcpconnection->SetMessaeCallback(messagecallback_);
        sptcpconnection->SetSendCompleteCallback(sendcompletecallback_);
        sptcpconnection->SetCloseCallback(closecallback_);
        sptcpconnection->SetErrorCallback(errorcallback_);
        sptcpconnection->SetConnectionCleanUp(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tcpconnlist_[clientfd] = sptcpconnection;
        }
        

        newconnectioncallback_(sptcpconnection);
        //Bug,应该把事件添加的操作放到最后,否则bug segement fault,导致HandleMessage中的phttpsession==NULL
        //总之就是做好一切准备工作再添加事件到epoll!!!
        sptcpconnection->AddChannelToLoop();
    }
}

//连接清理,bugfix:这里应该由主loop来执行,投递回主线程删除 OR 多线程加锁删除
void TcpServer::RemoveConnection(std::shared_ptr<TcpConnection> sptcpconnection)
{
    std::lock_guard<std::mutex> lock(mutex_);
    --conncount_;
    //std::cout << "clean up connection, conncount is" << conncount_ << std::endl;   
    tcpconnlist_.erase(sptcpconnection->fd());
}

void TcpServer::OnConnectionError()
{    
    std::cout << "UNKNOWN EVENT" << std::endl;
    serversocket_.Close();
}

void Setnonblocking(int fd)
{
    int opts = fcntl(fd, F_GETFL);
    if (opts < 0)
    {
        perror("fcntl(fd,GETFL)");
        exit(1);
    }
    if (fcntl(fd, F_SETFL, opts | O_NONBLOCK) < 0)
    {
        perror("fcntl(fd,SETFL,opts)");
        exit(1);
    }
}

4、Poller类

Poller类调用epoll_wait返回有事件发生的文件描述符,并更新channel上的事件,然后添加到活跃文件描述符列表上(activechannellist)

#include "Poller.h"
#include <iostream>
#include <stdio.h> //perror
#include <stdlib.h> //exit
#include <unistd.h> //close
#include <errno.h>

#define EVENTNUM 4096 //最大触发事件数量
#define TIMEOUT 1000 //epoll_wait 超时时间设置

Poller::Poller(/* args */)
    : pollfd_(-1),
    eventlist_(EVENTNUM),
    channelmap_(),
    mutex_()
{
    pollfd_ = epoll_create(256);
    if(pollfd_ == -1)
    {
        perror("epoll_create1");
        exit(1);
    }
    std::cout << "epoll_create" << pollfd_ << std::endl;
}

Poller::~Poller()
{
    close(pollfd_);
}

//等待I/O事件
void Poller::poll(ChannelList &activechannellist)
{
    int timeout = TIMEOUT;
    //std::cout << "epoll_wait..." << std::endl;(int)eventlist_.capacity()
    int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)eventlist_.capacity(), timeout);
    //int nfds = epoll_wait(pollfd_, &*eventlist_.begin(), (int)channelmap_.size()*0.7+1, timeout);
    if(nfds == -1)
    {
        //printf("epoll_wait error code is:%d", errno);
        perror("epoll wait error");
        //exit(1);
    }
    //printf("event num:%d\n", nfds);
    //std::cout << "event num:" << nfds << "\n";// << std::endl;
    for(int i = 0; i < nfds; ++i)
    {
        int events = eventlist_[i].events;
        //int fd = eventlist_[i].data.fd;
        Channel *pchannel = (Channel*)eventlist_[i].data.ptr;
        int fd = pchannel->GetFd();

        std::map<int, Channel*>::const_iterator iter;
        {
            std::lock_guard <std::mutex> lock(mutex_);
            iter = channelmap_.find(fd);
        }        
        if(iter != channelmap_.end())
        {
            pchannel->SetEvents(events);
            activechannellist.push_back(pchannel);
        }
        else
        {            
            std::cout << "not find channel!" << std::endl;
        }
    }
    if(nfds == (int)eventlist_.capacity())
    {
        std::cout << "resize:" << nfds << std::endl;
        eventlist_.resize(nfds * 2);
    }
    //eventlist_.clear();

}

//添加事件
void Poller::AddChannel(Channel *pchannel)
{
    int fd = pchannel->GetFd();
    struct epoll_event ev;
    ev.events = pchannel->GetEvents();
    //data是联合体
    //ev.data.fd = fd;
    ev.data.ptr = pchannel;
    {
        std::lock_guard <std::mutex> lock(mutex_);
        channelmap_[fd] = pchannel;
    }      

    if(epoll_ctl(pollfd_, EPOLL_CTL_ADD, fd, &ev) == -1)
    {
        perror("epoll add error");
        exit(1);
    }
    //std::cout << "addchannel!" << std::endl;
}

//删除事件
void Poller::RemoveChannel(Channel *pchannel)
{
    int fd = pchannel->GetFd();
    struct epoll_event ev;
    ev.events = pchannel->GetEvents();
    ///ev.data.fd = fd
    ev.data.ptr = pchannel;
    {
        std::lock_guard <std::mutex> lock(mutex_);
        channelmap_.erase(fd);
    }    

    if(epoll_ctl(pollfd_, EPOLL_CTL_DEL, fd, &ev) == -1)
    {
        perror("epoll del error");
        exit(1);
    }
    //std::cout << "removechannel!" << std::endl;
}

//更新事件
void Poller::UpdateChannel(Channel *pchannel)
{
    int fd = pchannel->GetFd();
    struct epoll_event ev;
    ev.events = pchannel->GetEvents();
    //ev.data.fd = fd;
    ev.data.ptr = pchannel;

    if(epoll_ctl(pollfd_, EPOLL_CTL_MOD, fd, &ev) == -1)
    {
        perror("epoll update error");
        exit(1);
    }
    //std::cout << "updatechannel!" << std::endl;
}

5、EventLoop类

EventLoop类根据Poller类返回的活跃文件描述符列表activechannellist,然后遍历该列表,依次调用channel上的回调函数

#include "EventLoop.h"
#include <iostream>
#include <sys/eventfd.h>
#include <unistd.h>
#include <stdlib.h>

int CreateEventFd()
{
    int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        std::cout << "Failed in eventfd" << std::endl;
        exit(1);
    }
    return evtfd;
}

EventLoop::EventLoop(/* args */)
    : functorlist_(),
    channellist_(),
    activechannellist_(),
    poller_(),
    quit_(true),
    tid_(std::this_thread::get_id()),
    mutex_(),
    wakeupfd_(CreateEventFd()),
    wakeupchannel_()
{
    wakeupchannel_.SetFd(wakeupfd_);
    wakeupchannel_.SetEvents(EPOLLIN | EPOLLET);
    wakeupchannel_.SetReadHandle(std::bind(&EventLoop::HandleRead, this));
    wakeupchannel_.SetErrorHandle(std::bind(&EventLoop::HandleError, this));
    AddChannelToPoller(&wakeupchannel_);
}

EventLoop::~EventLoop()
{
    close(wakeupfd_);
}

void EventLoop::WakeUp()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupfd_, (char*)(&one), sizeof one);
}

void EventLoop::HandleRead()
{
    uint64_t one = 1;
    ssize_t n = read(wakeupfd_, &one, sizeof one);
}

void EventLoop::HandleError()
{
    ;
}    

void EventLoop::loop()
{
    quit_ = false;
    while(!quit_)
    {
        poller_.poll(activechannellist_);
        for(Channel *pchannel : activechannellist_)
        {            
            pchannel->HandleEvent();//处理事件
        }
        activechannellist_.clear();
        ExecuteTask();
    }
}

三、这几个类之间的关系

Web服务器项目详解_#include


标签:std,Web,const,void,Callback,详解,fd,服务器,include
From: https://blog.51cto.com/u_6526235/7274826

相关文章

  • 编译wasm Web应用
    刚学完WebAssembly的入门课,卖弄一点入门知识。首先我们知道wasm是目标语言,是一种新的V-ISA标准,所以编写wasm应用,正常来说不会直接使用WAT可读文本格式,更不会用wasm字节码;而是使用其他高级语言编写源代码,经过编译后得到wasm应用。课程中使用了C++来编写源代码,所以这里我也用C++来......
  • HTTP服务器项目面试题
    ......
  • ByteBuf用法详解文档
    来源:http://www.taodudu.cc/news/show-3638306.html?action=onClick_____________________________________________________________________________________________ ByteBufbytebuf文档点这里基本信息:ByteBuf类java.lang.Objectio.netty.buffer.ByteBuf所有已实......
  • Web服务器项目中常问的开放性问题
    文章目录一、对后端开发的看法?二、为什么要做静态http服务器?三、为什么要使用ET模式?四、多线程编程的注意事项1、为什么要用多线程2、多线程编程的特点3、确保线程安全的几种方式4、与多线程有关的编程方法五、使用Linux系统有什么好处?六、对云计算的看法?七、服务器突然崩溃退出,怎......
  • spring boot WebSocket @ServerEndpoint注解标识的class无法获取spring容器中的bean
    在@ServerEndpoint类中直接使用@Autowired注解注入Spring管理的bean可能不会成功,因为@ServerEndpoint并不受Spring容器的管理。通过创建一个静态的成员遍历属性和一个带有@Autowired注解的setter方法,你可以在类加载时将bean注入到静态属性中。但是,请注意这样做......
  • 怎么搭建web组态
    web组态是指通过可视化拖拽组件的方式,低代码搭建监控大屏、web页面。物联网各行业的数据以及监控场景,每个行业的业务不同,需要展示的页面也不同。组态快速搭建页面的优势,能更好的满足不同定制化监控页面的需求。BY组态软件,专注于能源电力、工业互联网、智能制造、原型设计等领域的......
  • Web组态可视化软件之BY组态可视化平台介绍
    Web组态可视化软件之BY组态可视化平台介绍关于组态软件,首先要从组态的概念开始说起。 什么是组态组态(Configure)的概念来自于20世纪70年代中期出现的第一代集散控制系统(DistributedControlSystem),可以理解为“配置”、“设定”、“设置”等,是指通过人机开发界面,用类似“搭积木”......
  • 软件测试|SQL中的UNION和UNION ALL详解
    简介在SQL(结构化查询语言)中,UNION和UNIONALL是用于合并查询结果集的两个关键字。它们在数据库查询中非常常用,但它们之间有一些重要的区别。在本文中,我们将深入探讨UNION和UNIONALL的含义、用法以及它们之间的区别。UNION操作UNION用于合并两个或多个查询的结果集,并返回一个唯一的......
  • 软件测试|Python中的变量与关键字详解
    简介在Python编程中,变量和关键字是非常重要的概念。它们是构建和控制程序的基本要素。本文将深入介绍Python中的变量和关键字,包括它们的定义、使用方法以及一些常见注意事项。变量变量的定义变量是用于存储数据值的名称。在Python中,变量无需声明,可以直接赋值使用。变量可以存储不同......
  • 【内部】服务器使用
    服务器参数服务器虚拟ip:10.242.0.2前期准备:Zerotier下载链接网络:打开zerotierone环境配置:ssh登陆:ssh用户名@服务器ip输入密码输入condaenvlist查看当前环境输入condacreate-n环境名python=3.X(输入python版本)pycharm远程连接:参考python:解释器位置/hom......