目录
在Linux系统中,I/O多路转接是一种重要的I/O模型(也称I/O多路复用),它能够同时等待多个文件描述符的就绪状态,提高程序的效率。本文将重点介绍I/O多路转接中的select和poll。
理解 I/O 多路复用中的“多路”和“复用”:
“多路”指的是存在大量需要处理的连接或任务。在网络通信或文件操作等场景中,可能同时有众多的客户端请求、多个文件的读写操作等。这意味着系统面临着处理众多不同来源和类型的输入/输出需求。
“复用”强调的是通过有效利用有限的资源,如线程,来应对这些众多的连接或任务。它的核心是避免为每个连接或任务单独分配一个资源(如线程),而是让一个资源能够同时处理多个连接或任务,从而降低系统开销,提高资源的利用效率。
实现复用及相关问题的解决
在非阻塞 I/O 中,通过不断轮询众多连接的 socket 上的接收缓冲区来判断是否有数据到达,从而实现一个线程处理多个连接的读写事件。然而,这种方式存在频繁系统调用带来大量上下文切换开销的问题。
为解决这一问题,将轮询操作从用户空间转移到内核空间,让操作系统来实现轮询过程。这样做的好处在于,减少了用户空间与内核空间的频繁切换,降低了系统调用的次数,从而提高了系统的性能。
常见的操作系统实现的 I/O 多路复用模型有 select、poll 和 epoll 等。以 epoll 为例,它通过事件驱动的方式,当有数据就绪时才通知应用程序进行处理,避免了无意义的轮询,进一步提高了性能。
一、select
(一)初识select
系统提供select
函数来实现多路复用输入/输出模型。select
系统调用用于监视多个文件描述符的状态变化,程序会停在select
这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。
select
函数原型如下:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
传入最大文件描述符,以实现内核中根据进程的文件描述符表遍历fd
参数解释:
nfds
:需要监视的最大的文件描述符值 + 1。rdset
、wrset
、exset
:分别对应于需要检测的可读文件描述符的集合、可写文件描述符的集合及异常文件描述符的集合;它们是输入输出型参数,输入时告知内核要检测的fd,输出时告知调用者已经就绪的fd;每个参数都是一个位图,比特位代表了fd的值,0/1代表的1是否检测或是否就绪。timeout
:为结构timeval
,用来设置select()
的等待时间。其取值包括:NULL
:表示select()
没有timeout
,将一直被阻塞,直到某个文件描述符上发生了事件。0
:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。- 特定的时间值:如果在指定的时间段里没有事件发生,
select
将超时返回。
关于fd_set
结构:
typedef struct
{
#ifdef USE_XOPEN
_fd_mask fds_bits[_FD_SETSIZE / NFDBITS];
#define FDS_BS(set) (set)->fds_bits
#else
fd_mask __fds_bits[_FD_SETSIZE / NFDBITS];
#define FDS_BITS(set) ((set)->fds_bits)
#endif
} fd_set;
fd_set
实际上是一个整数数组,更严格地说,是一个“位图”,使用位图中对应的位来表示要监视的文件描述符。同时,提供了一组操作fd_set
的接口,如FD_CLR
、FD_ISSET
、FD_SET
、FD_ZERO
,用于方便地操作位图。
关于timeval
结构:
struct timeval
{
time_t tv_sec; // 秒
suseconds_t tv_usec; // 微秒
};
timeval
结构用于描述一段时间长度,如果在这个时间内,需要监视的描述符没有事件发生则函数返回,返回值为0。
函数返回值:
- 执行成功则返回文件描述词状态已改变的个数。
- 如果返回0代表在描述词状态改变前已超过
timeout
时间,等待下一轮的检测。 - 当有错误发生时则返回 - 1,错误原因存于
errno
,此时参数readfds
、writefds
、exceptfds
和timeout
的值变成不可预测。
常见的程序片段如下:
fd_set readset;
FD_SET(fd, &readset);
select(fd + 1, &readset, NULL, NULL, NULL);
if (FD_ISSET(fd, readset)){……}
(二)理解select执行过程
理解select
模型的关键在于理解fd_set
。为说明方便,取fd_set
长度为1字节,fd_set
中的每一bit
可以对应一个文件描述符fd
。
- 执行
fd_set set; FD_ZERO(&set);
,则set
用位表示是0000,0000
。 - 若
fd = 5
,执行FD_SET(fd, &set);
后,set
变为0001,0000
(第5位置为1)。 - 若再加入
fd = 2
,fd = 1
,则set
变为0001,0011
。 - 执行
select(6, &set, 0, 0, 0)
阻塞等待。 - 若
fd = 1
,fd = 2
上都发生可读事件,则select
返回,此时set
变为0000,0011
。注意:没有事件发生的fd = 5
被清空。
(三)socket就绪条件
- 读就绪:
socket
内核中,接收缓冲区中的字节数,大于等于低水位标记SO_RCVLOWAT
。此时可以无阻塞地读该文件描述符,并且返回值大于0。socket
TCP
通信中,对端关闭连接,此时对该socket
读,则返回0。- 监听的
socket
上有新的连接请求。 socket
上有未处理的错误。
- 写就绪:
socket
内核中,发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小),大于等于低水位标记SO_SNDLOWAT
,此时可以无阻塞地写,并且返回值大于0。socket
的写操作被关闭(close
或者shutdown
)。对一个写操作被关闭的socket
进行写操作,会触发SIGPIPE
信号。socket
使用非阻塞connect
连接成功或失败之后。socket
上有未读取的错误。
- 异常就绪:
socket
上收到带外数据。关于带外数据,和TCP
紧急模式相关(TCP
协议头中,有一个紧急指针的字段)。
(四)select的特点
- 可监控的文件描述符个数取决于
sizeof(fd_set)
的值。服务器上sizeof(fd_set) = 512
,每bit
表示一个文件描述符,则服务器上支持的最大文件描述符是512 * 8 = 4096(不同内核可能情况不同)
- 将
fd
加入select
监控集的同时,还要再使用一个数据结构array
保存放到select
监控集中的fd
,一是用于在select
返回后,array
作为源数据和fd_set
进行FD_ISSET
判断;二是select
返回后会把以前加入的但并无事件发生的fd
清空,则每次开始select
前都要重新从array
取得fd
逐一加入,扫描array
的同时取得fd
最大值maxfd
,用于select
的第一个参数。
(五)select缺点
- 每次调用
select
,都需要手动设置fd
集合,从接口使用角度来说非常不便。 - 每次调用
select
,都需要把fd
集合从用户态拷贝到内核态,这个开销在fd
很多时会很大。 - 同时每次调用
select
都需要在内核遍历传递进来的所有fd
,这个开销在fd
很多时也很大。 select
支持的文件描述符数量太小,所以select本身能检测的fd是有上限的。
(六)select使用示例
- 检测标准输入输出:
#include <stdio.h>
#include <unistd.h>
#include <sys/select.h>
int main() {
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(0, &read_fds);
for (;;) {
printf("> ");
fflush(stdout);
int ret = select(1, &read_fds, NULL, NULL, NULL);
if (ret < 0) {
perror("select");
continue;
}
if (FD_ISSET(0, &read_fds)) {
char buf[1024] = {0};
read(0, buf, sizeof(buf) - 1);
printf("input: %s", buf);
} else {
printf("error! invaild fd\n");
continue;
}
FD_ZERO(&read_fds);
FD_SET(0, &read_fds);
}
return 0;
}
说明:当只检测文件描述符0(标准输入)时,因为输入条件只有在有输入信息的时候才成立,所以如果一直不输入,如果设置了超时时间,就会产生超时信息。
- 使用select实现服务器:
通过单执行流,就可处理多个客户端的连接
#pragma once
#include <iostream>
#include <string>
#include <sys/select.h>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"
using namespace Net_Work;
const static int gdefaultport = 8888;
const static int gbacklog = 8;
const static int num = sizeof(fd_set) * 8;
class SelectServer
{
private:
void HandlerEvent(fd_set &rfds)
{
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
continue;
// 合法的sockfd
// 读事件分两类,一类是新连接到来。 一类是新数据到来
int fd = _rfds_array[i]->GetSockFd();
if (FD_ISSET(fd, &rfds))
{
// 读事件就绪
if (fd == _listensock->GetSockFd())
{
lg.LogMessage(Info, "get a new link\n");
// 获取连接
std::string clientip;
uint16_t clientport;
// 不会阻塞!!,因为select已经检测到了listensock已经就绪了
Socket *sock = _listensock->AcceptConnection(&clientip, &clientport);
if (!sock)
{
lg.LogMessage(Error, "accept error\n");
continue;
}
lg.LogMessage(Info, "get a client, client info is# %s:%d, fd: %d\n", clientip.c_str(), clientport, sock->GetSockFd());
// 这里已经获取连接成功了,接下来怎么办???
// read?write?绝对不能!!!read 底层数据是否就绪时不确定的!谁清楚fd上面是否有读事件呢?select!
// 新链接fd到来的时候,要把新的fd, 想办法交给select托管 -- 只需要添加到数组_rfds_array中即可
int pos = 0;
for (; pos < num; pos++)
{
if (_rfds_array[pos] == nullptr)
{
_rfds_array[pos] = sock;
lg.LogMessage(Info, "get a new link, fd is : %d\n", sock->GetSockFd());
break;
}
}
if (pos == num)
{
sock->CloseSocket();
delete sock;
lg.LogMessage(Warning, "server is full...!\n");
}
}
else
{
// 普通的读事件就绪
// 读数据是有问题的
// 这一次读取不会被卡住吗?
std::string buffer;
bool res = _rfds_array[i]->Recv(&buffer, 1024);
if (res)
{
lg.LogMessage(Info, "client say# %s\n", buffer.c_str());
buffer += ": 你好呀,少年";
_rfds_array[i]->Send(buffer);
buffer.clear();
}
else
{
lg.LogMessage(Warning, "client quit, maybe close or error, close fd : %d\n", _rfds_array[i]->GetSockFd());
_rfds_array[i]->CloseSocket();
delete _rfds_array[i];
_rfds_array[i] = nullptr;
}
}
}
}
}
public:
SelectServer(int port = gdefaultport) : _port(port), _listensock(new TcpSocket()), _isrunning(false)
{
}
void InitServer()
{
_listensock->BuildListenSocketMethod(_port, gbacklog);
for (int i = 0; i < num; i++)
{
_rfds_array[i] = nullptr;
}
_rfds_array[0] = _listensock.get();
}
void Loop()
{
_isrunning = true;
while (_isrunning)
{
// 我们能不能直接accept新连接呢?不能!所有的fd,都要交给select. listensock上面新连接,相当于读事件,有新连接,就等价于有新数据到来
// 首先不能直接accept,而是将listensock交给select。因为只有select有资格知道有没有IO事件就绪
// 故意放在循环内部
// 遍历数组,1. 找最大的fd 2. 合法的fd添加到rfds集合中
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = _listensock->GetSockFd();
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
{
continue;
}
else
{
int fd = _rfds_array[i]->GetSockFd();
FD_SET(fd, &rfds); // 添加所有合法fd到rfds集合中
if (max_fd < fd) // 更新最大fd
{
max_fd = fd;
}
}
}
// 定义时间
struct timeval timeout = {0, 0};
// rfds本质是一个输入输出型参数,rfds是在select调用返回的时候,不断被修改,所以,每次都要重置
PrintDebug();
int n = select(max_fd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
switch (n)
{
case 0:
lg.LogMessage(Info, "select timeout..., last time: %u.%u\n", timeout.tv_sec, timeout.tv_usec);
break;
case -1:
lg.LogMessage(Error, "select error!!!\n");
break;
default:
// 正常的就绪的fd
lg.LogMessage(Info, "select success, begin event handler, last time: %u.%u\n", timeout.tv_sec, timeout.tv_usec);
HandlerEvent(rfds); // _rfds_array: 3,4,5,6,7,8,9,10 -> rfds: 4,5,6
break;
}
}
_isrunning = false;
}
void Stop()
{
_isrunning = false;
}
void PrintDebug()
{
std::cout << "current select rfds list is : ";
for (int i = 0; i < num; i++)
{
if (_rfds_array[i] == nullptr)
continue;
else
std::cout << _rfds_array[i]->GetSockFd() << " ";
}
std::cout << std::endl;
}
~SelectServer()
{
}
private:
std::unique_ptr<Socket> _listensock;
int _port;
int _isrunning;
// select 服务器要被正确设计,需要程序员定义数据结构,来把所有的fd管理起来,往往是数组!
Socket *_rfds_array[num];
};
二、poll
(一)poll函数接口
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
传入pollfd数组首地址和元素个数,以实现内核中遍历fd
pollfd
结构:
struct pollfd {
int fd; // 文件描述符
short events; // 请求的事件集合
short revents; // 返回的事件集合
};
参数说明:
fds
:是一个poll
函数监听的结构列表,每一个元素中包含了文件描述符、监听的事件集合、返回的事件集合。nfds
:表示fds
数组的长度。timeout
:表示poll
函数的超时时间,单位是毫秒(ms)。
events
和revents
的取值:
事件 | 描述 | 是否可作为输入 | 是否可作为输出 |
---|---|---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP 带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP 连接被对方关闭,或者对方关闭了写操作。它由GNU引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP 事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 是 |
返回结果:
- 返回值小于0,表示出错。
- 返回值等于0,表示
poll
函数等待超时。 - 返回值大于0,表示
poll
由于监听的文件描述符就绪而返回。
(二)socket就绪条件
同select,见上文。读就绪、写就绪、异常就绪。
(三)poll的优点
- 不同于
select
使用三个位图来表示三个事件集合,poll
使用一个pollfd
的指针实现。 pollfd
结构包含了要监视的event
和发生的event
,不再使用select
“参数 - 值”传递的方式,分离了输入输出参数,接口使用比select
更方便。poll
并没有最大数量限制(但是数量过大后性能也是会下降)。
(四)poll的缺点
- 和
select
函数一样,poll
返回后,需要轮询pollfd
来获取就绪的描述符。 - 每次调用
poll
都需要把大量的pollfd
结构从用户态拷贝到内核中。 - 同时连接的大量客户端在某一时刻可能只有很少的处于就绪状态,随着监听的描述符数量增多,效率也会线性下降。
(五)poll使用示例
#pragma once
#include <iostream>
#include <string>
#include <poll.h>
#include <memory>
#include "Log.hpp"
#include "Socket.hpp"
using namespace Net_Work;
const static int gdefaultport = 8888;
const static int gbacklog = 8;
const int gnum = 1024;
class PollServer
{
private:
void HandlerEvent()
{
for (int i = 0; i < _num; i++)
{
if (_rfds[i].fd == -1)
continue;
// 合法的sockfd
// 读事件分两类,一类是新连接到来。 一类是新数据到来
int fd = _rfds[i].fd;
short revents = _rfds[i].revents;
if (revents & POLLIN)
{
// 新连接到来了
if (fd == _listensock->GetSockFd())
{
lg.LogMessage(Info, "get a new link\n");
// 获取连接
std::string clientip;
uint16_t clientport;
// 不会阻塞!!,因为select已经检测到了listensock已经就绪了
int sock = _listensock->AcceptConnection(&clientip, &clientport);
if (sock == -1)
{
lg.LogMessage(Error, "accept error\n");
continue;
}
lg.LogMessage(Info, "get a client, client info is# %s:%d, fd: %d\n", clientip.c_str(), clientport, sock);
// 这里已经获取连接成功了,接下来怎么办???
// read?write?绝对不能!!!read 底层数据是否就绪时不确定的!谁清楚fd上面是否有读事件呢?poll!
// 新链接fd到来的时候,要把新的fd, 想办法交给poll托管 -- 只需要添加到数组_rfds中即可
int pos = 0;
for (; pos < _num; pos++)
{
if (_rfds[pos].fd == -1)
{
_rfds[pos].fd = sock;
_rfds[pos].events = POLLIN;
lg.LogMessage(Info, "get a new link, fd is : %d\n", sock);
break;
}
}
if (pos == _num)
{
// 1. 扩容
// 2. 关闭
close(sock);
lg.LogMessage(Warning, "server is full...!\n");
}
}
else
{
// 普通的读事件就绪
// 读数据是有问题的
// 这一次读取不会被卡住吗?
char buffer[1024];
ssize_t n = recv(fd, buffer, sizeof(buffer-1), 0); // 这里读取会阻塞吗?不会!
if (n > 0)
{
buffer[n] = 0;
lg.LogMessage(Info, "client say# %s\n", buffer);
std::string message = "你好呀,少年, ";
message += buffer;
send(fd, message.c_str(), message.size(), 0);
}
else
{
lg.LogMessage(Warning, "client quit, maybe close or error, close fd : %d\n", fd);
close(fd);
// 取消poll的关心
_rfds[i].fd = -1;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
}
}
}
}
public:
PollServer(int port = gdefaultport) : _port(port), _listensock(new TcpSocket()), _isrunning(false), _num(gnum)
{
}
void InitServer()
{
_listensock->BuildListenSocketMethod(_port, gbacklog);
_rfds = new struct pollfd[_num];
for (int i = 0; i < _num; i++)
{
_rfds[i].fd = -1;
_rfds[i].events = 0;
_rfds[i].revents = 0;
}
// 最开始的时候,只有一个文件描述符, Listensock
_rfds[0].fd = _listensock->GetSockFd();
_rfds[0].events |= POLLIN;
}
void Loop()
{
_isrunning = true;
while (_isrunning)
{
// 定义时间
int timeout = -1;
// rfds本质是一个输入输出型参数,rfds是在select调用返回的时候,不断被修改,所以,每次都要重置
PrintDebug();
int n = poll(_rfds, _num, timeout);
switch (n)
{
case 0:
lg.LogMessage(Info, "poll timeout...\n");
break;
case -1:
lg.LogMessage(Error, "poll error!!!\n");
break;
default:
// 正常的就绪的fd
lg.LogMessage(Info, "select success, begin event handler\n");
HandlerEvent(); // _rfds_array: 3,4,5,6,7,8,9,10 -> rfds: 4,5,6
break;
}
}
_isrunning = false;
}
void Stop()
{
_isrunning = false;
}
void PrintDebug()
{
std::cout << "current poll fd list is : ";
for (int i = 0; i < _num; i++)
{
if (_rfds[i].fd == -1)
continue;
else
std::cout << _rfds[i].fd << " ";
}
std::cout << std::endl;
}
~PollServer()
{
delete[] _rfds;
}
private:
std::unique_ptr<Socket> _listensock;
int _port;
int _isrunning;
struct pollfd *_rfds;
int _num;
};
标签:转接,int,rfds,set,fd,poll,select
From: https://blog.csdn.net/weixin_73567058/article/details/141288217