基础引入
- 应用层read&&write的时候,把数据从用户层写到操作系统,本质是拷贝函数。
- read时候如果缓冲区没有数据,那么就要等待数据才能读取,因此IO=等待+拷贝,要进行拷贝,必须等待读写事件就绪。
- 高效IO指单位时间内,IO过程中,等待的比重小,IO效率高。
五种IO模型
- 同步阻塞IO(Blocking IO):这是一种传统的IO模型。在这种模型中,用户空间线程是主动发起IO请求的一方,而内核空间是被动接受方。用户空间程序需要等待内核IO操作彻底完成后,才返回到用户空间执行用户的操作。
- 同步非阻塞IO(Non-blocking IO):在这种模型中,用户程序不需要等待内核IO操作完成后,内核会立即返回给用户一个状态值。这样,用户空间无需等到内核IO操作彻底完成,可以立即返回用户空间执行用户的操作,处于非阻塞的状态。
- IO多路复用(IO Multiplexing):这种模型有时也称为异步阻塞IO。它是经典的Reactor设计模式,允许一个线程同时处理多个IO请求。这种模型可以提高系统的吞吐量,因为它减少了线程的上下文切换次数。
- 异步IO(Asynchronous IO):异步IO是一种更高级的IO模型,它将IO操作与用户请求分离开来。当用户发出IO请求时,系统会立即返回一个结果(通常是一个表示操作正在进行的标识符),而不会等待IO操作真正完成。当IO操作完成后,系统会通过某种方式(如回调函数)通知用户程序。
- 信号驱动IO(Signal-Driven I/O):这种模型允许用户进程收到一个信号(通常是SIGIO)来通知其某个IO操作已经完成。在信号驱动IO模型中,当用户线程发起一个IO请求时,它不需要等待这个请求完成,而是可以继续执行其他任务。当内核完成IO请求时,它会发送一个信号给用户线程,通知它IO操作已经完成。
select
函数解析
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
- nfds:需要监视的文件描述符中,最大的文件描述符值+1。
- readfds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的读事件是否就绪,返回时内核告知用户哪些文件描述符的读事件已经就绪。
- writefds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的写事件是否就绪,返回时内核告知用户哪些文件描述符的写事件已经就绪。
- exceptfds:输入输出型参数,调用时用户告知内核需要监视哪些文件描述符的异常事件是否就绪,返回时内核告知用户哪些文件描述符的异常事件已经就绪。
- timeout:输入输出型参数,调用时由用户设置select的等待时间,返回时表示timeout的剩余时间。
select函数运行时将rfds(bitmap)拷贝到内核态来判断事件是否就绪,如果没数据就阻塞等待,当事件就绪,会将rfds置位(可能有多个位),变为输出型参数,再进行后续逻辑
实现一个select_server
#pragma once
#include<iostream>
#include<sys/select.h>
#include<sys/time.h>
#include<unistd.h>
#include"Socket.hpp"
static const uint16_t defaultport=8080;
static const int fd_num_max=(sizeof(fd_set)*8);
int defaultfd=-1;
class SelectServer
{
public:
SelectServer(uint16_t port=defaultport)
:_port(port)
{
for(int i=0;i<fd_num_max;i++)
{
fd_array[i]=defaultfd;
}
}
bool Init()
{
_listensock.Sock();
_listensock.Bind(_port);
_listensock.Listen();
return true;
}
// void HandlerEvent(fd_set& fds)
// {
// for (int i = 0; i < fd_num_max; i++)
// {
// int fd=fd_array[i];
// if(fd==defaultfd)
// continue;
// if (FD_ISSET(fd, &fds))//判断当前fd是否在fd集合里,如果在表明事件就绪
// {
// // 如果listensock在fds位图中,表明链接就绪,下面的逻辑用于获取新链接加入fd_array中
// if (_listensock.Fd() == fd)
// {
// std::string clientip;
// uint16_t clientport = 0;
// int sock = _listensock.Accept(&clientip, &clientport); // 不会阻塞在这里
// if (sock < 0)
// continue;
// lg(Info, "accept success,%s,%d,sockfd:%d", clientip.c_str(), clientport, _listensock.Fd());
// // sock->fd_array[]
// int pos = 1;
// for (; pos < fd_num_max; pos++)
// {
// if (fd_array[pos] != defaultfd)
// continue;
// else
// break;
// }
// if (pos == fd_num_max)
// {
// lg(Warning, "server is full,close %d now", sock);
// close(sock);
// }
// else
// {
// fd_array[pos] = sock;
// //TODO
// }
// }
// else//如果不是listensock,那么就是读事件就绪的文件描述符
// {
// char buffer[1024];
// ssize_t n=read(fd,buffer,sizeof buffer-1);//?bug
// if(n>0)
// {
// buffer[n]=0;
// std::cout<<"get a message: "<<buffer<<std::endl;
// }
// else if(n==0)
// {
// lg(Info,"clinet quit,close fd is: ",fd);
// close(fd);
// fd_array[i]=defaultfd;
// }
// }
// }
// }
// }
void Accepter()
{
// 我们的连接事件就绪了
std::string clientip;
uint16_t clientport = 0;
int sock = _listensock.Accept(&clientip, &clientport); // 会不会阻塞在这里?不会
if (sock < 0) return;
lg(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);
// sock -> fd_array[]
int pos = 1;
for (; pos < fd_num_max; pos++) // 第二个循环
{
if (fd_array[pos] != defaultfd)
continue;
else
break;
}
if (pos == fd_num_max)
{
lg(Warning, "server is full, close %d now!", sock);
close(sock);
}
else
{
fd_array[pos] = sock;
//PrintFd();
// TODO
}
}
void Recver(int fd, int pos)
{
// demo
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
if (n > 0)
{
buffer[n] = 0;
std::cout << "get a messge: " << buffer << std::endl;
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is : %d", fd);
close(fd);
fd_array[pos] = defaultfd; // 这里本质是从select中移除
}
else
{
lg(Warning, "recv error: fd is : %d", fd);
close(fd);
fd_array[pos] = defaultfd; // 这里本质是从select中移除
}
}
void Dispatcher(fd_set &rfds)
{
for (int i = 0; i < fd_num_max; i++) // 这是第三个循环
{
int fd = fd_array[i];
if (fd == defaultfd)
continue;
if (FD_ISSET(fd, &rfds))
{
if (fd == _listensock.Fd())
{
Accepter(); // 连接管理器
}
else // non listenfd
{
Recver(fd, i);
}
}
}
}
void Start()
{
int listensock=_listensock.Fd();
fd_array[0]=listensock;
while(true)
{
fd_set rfds;
FD_ZERO(&rfds);
int maxfd=fd_array[0];
for(int i=0;i<fd_num_max;i++)
{
if(fd_array[i]==defaultfd)
continue;
FD_SET(fd_array[i],&rfds);
if(maxfd<fd_array[i])
{
maxfd=fd_array[i];
lg(Info,"max fd update,maxfd is:%d",maxfd);
}
}
//sleep(1);
struct timeval timeout={0,0};
//如果事件就绪,上层不处理,select会一直通知
//select告诉你就绪,接下来的一次读取,不会被阻塞
int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);
switch(n)
{
case 0:
//std::cout<<"time out,timeout: "<<timeout.tv_sec<<std::endl;
break;
case -1:
std::cerr<<"select error"<<std::endl;
break;
default:
//有事件就绪了
std::cout<<"get a new link"<<std::endl;
Dispatcher(rfds);
break;
}
}
}
~SelectServer()
{
_listensock.Close();
}
private:
Socket _listensock;
uint16_t _port;
int fd_array[fd_num_max];
};
缺点
fd有上限,输入输出型参数比较多,数据拷贝频率比较高,每次都要重置fd_set(不能重用),管理第三方数组的fd需要用户层多次遍历较繁杂,用户态到内核态数据拷贝的开销
poll
函数
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- fds:一个poll函数监视的结构列表,每一个元素包含三部分内容:文件描述符、监视的事件集合、就绪的事件集合。
- nfds:表示fds数组的长度。
- timeout:表示poll函数的超时时间,单位是毫秒(ms)。
struct pollfd
struct pollfd{
int fd;
short events;
short revents;
};
- fd:特定的文件描述符,若设置为负值则忽略events字段并且revents字段返回0。
- events:需要监视该文件描述符上的哪些事件。(&连接)
- revents:poll函数返回时告知用户该文件描述符上的哪些事件已经就绪。
简易实现
pollfd pollfds[5];
Socket lissock;
lissock.Sock();
lissock.Bind(8080);
lissock.Listen();
for(int i=0;i<5;i++)
{
std::string clientip;
uint16_t clientport;
pollfds[i].fd=lissock.Accept(&clientip,&clientport);
pollfds[i].events=POLLIN;
}
sleep(1);
while(true)
{
poll(pollfds,5,3000);
for(int i=0;i<5;i++)
{
if(pollfds[i].revents&POLLIN)
{
char buffer[1024];
read(pollfds[i].fd,buffer,sizeof buffer-1);
pollfds[i].revents=0;
}
}
}
epoll
底层原理
- epoll_create先在文件系统中创建epoll的struct file,并分配epfd给用户,同时在内核中创建RBTree用来存储以后epoll_ctr传来的socket,监听并维护这些fd,此外还要建立一个list用于存储就绪事件。
- 当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。
- list链表的维护:当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象epoll_event维护的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到就绪链表里了。
- epoll相比于select并不是在所有情况下都要高效,例如在如果有少于1024个文件描述符监听,且大多数socket都是出于活跃繁忙的状态,这种情况下,select要比epoll更为高效,因为epoll会有更多次的系统调用,用户态和内核态会有更加频繁的切换。
优势
1.检测就绪O(1),获取就绪O(n)
2.fd,event没有上限
3.wait返回值n,表示有几个fd就绪了,且就绪事件是连续的,拷贝到events数组头部n个位置。
LT和ET
Level Trigger
- 只要底层数据就绪就会一直通知用户
- 由于在LT工作模式下,只要底层有事件就绪就会一直通知用户,因此当epoll检测到底层读事件就绪时,可以不立即进行处理,或者只处理一部分,因为只要底层数据没有处理完,下一次epoll还会通知用户事件就绪。
- select和poll其实就是工作是LT模式下的。
- 支持阻塞读写和非阻塞读写。
Edge Trigger
- 只有底层就绪事件数量由无到有或由有到多发生变化的时候,epoll才会通知用户.
- 由于在ET工作模式下,只有底层就绪事件无到有或由有到多发生变化的时候才会通知用户,因此当epoll检测到底层读事件就绪时,必须立即进行处理,而且必须全部处理完毕,因为有可能此后底层再也没有事件就绪,那么epoll就再也不会通知用户进行事件处理,此时没有处理完的数据就相当于丢失了。
- ET工作模式下epoll通知用户的次数一般比LT少,因此ET的性能一般比LT性能更高,Nginx就是默认采用ET模式使用epoll的。
- 只支持非阻塞的读写,当底层读事件就绪时,循环调用recv函数进行读取,直到某次调用recv读取时,实际读取到的字节数小于期望读取的字节数,则说明本次底层数据已经读取完毕了.
- 但有可能最后一次调用recv读取时,刚好实际读取的字节数和期望读取的字节数相等,但此时底层数据也恰好读取完毕了,如果我们再调用recv函数进行读取,那么recv就会因为底层没有数据而被阻塞住。
- 而这里的阻塞是非常严重的,就比如我们这里写的服务器都是单进程的服务器,如果recv被阻塞住,并且此后该数据再也不就绪,那么就相当于我们的服务器挂掉了,因此在ET工作模式下循环调用recv函数进行读取时,必须将对应的文件描述符设置为非阻塞状态。
设置文件描述符为非阻塞
#include<unistd.h>
#include<fcntl.h>
#include<cstdlib>
void SetNonBlock(int sock)
{
int flag=fcntl(sock,F_GETFL);
if(flag<0)
exit(1);
fcntl(sock,F_SETFL,flag|O_NONBLOCK);
}
对比
ET的通知效率更高,ET的IO效率更高。
比如listenfd,接受缓冲区 可能存放多个客户端连接请求的信息,这时候要使用水平触发(LT),因为accept每次只能处理一个,需要多次触发。如果用边沿触发(ET)可能会漏掉一些连接。