网络I/O模型
同步I/O
阻塞I/O
一个基本的C/S模型如下图所图:其中 listen()、connect()、write()、read() 都是阻塞I/O,用户态的进程在执行这些操作时会陷入内核态并被挂起。直到 socket() 上有数据可读/写时,进程才会从内核态切换到用户态,并完成数据在内核态和用户态之间的复制。
int fd = open("file.txt", O_RDONLY);
char buffer[100];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer)); // 阻塞
close(fd);
非阻塞I/O
I/O多路复用
在获取事件时,先把我们要关心的连接传给内核,再由内核检测:
- 如果没有事件发生,线程只需阻塞在这个系统调用,而无需像前面的线程池方案那样轮训调用 read 操作来判断是否有数据。
- 如果有事件发生,内核会返回产生了事件的连接,线程就会从阻塞状态返回,然后在用户态中再处理这些连接对应的业务即可。
select
select() 函数是一个用于多路复用I/O的系统调用,允许程序同时监控多个文件描述符(如套接字、管道、终端等)的可读、可写或异常状态。
函数接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
其中:
- nfds: 整型变量,表示在所有集合中最大的文件描述符加1。这是为了告诉select()需要检查的文件描述符范围。实际上,你只需要提供所有被监控的文件描述符中的最大值加1即可。
- readfds: 指向fd_set结构的指针,用于存放待检查可读性的文件描述符集合。如果某个文件描述符在此集合中并且变为可读,select()返回后,该文件描述符在集合中的状态仍会被保持为已设置。
- writefds: 用于存放待检查可写性的文件描述符集合。
- exceptfds: 用于存放待检查异常条件的文件描述符集合。
- timeout: 指向timeval结构的指针,用来指定select()调用的最大等待时间。如果为NULL,select()将一直阻塞直到有文件描述符满足条件;如果不为NULL且值为(0, 0),则select()会立即返回,不进行等待;如果指定了具体的时间,则select()最多等待指定的时间。
返回值
-
正值: 返回的是已就绪的文件描述符的数量(包括可读、可写或异常的)。注意,这个值并不直接告诉你哪些描述符就绪,你需要通过FD_ISSET宏检查集合来确定具体哪些描述符准备好了。
-
0: 表示在指定的超时时间内没有文件描述符变为就绪状态。
-
-1: 表示调用出错,此时可以查看errno来获取具体的错误信息。
示例
void updateReadSet(std::unordered_set<int> &clientFds, int &maxFd, int sockFd, fd_set &readSet) {
maxFd = sockFd;
FD_ZERO(&readSet);
FD_SET(sockFd, &readSet);
for (const auto &clientFd : clientFds) {
if (clientFd > maxFd) {
maxFd = clientFd;
}
FD_SET(clientFd, &readSet);
}
}
void handlerClient(int clientFd) {
std::string msg;
if (not EchoServer::RecvMsg(clientFd, msg)) {
return;
}
EchoServer::SendMsg(clientFd, msg);
}
int main(int argc, char *argv[]) {
if (argc != 3) {
std::cout << "invalid input" << std::endl;
std::cout << "example: ./Select 0.0.0.0 1688" << std::endl;
return -1;
}
int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), false);
if (sockFd < 0) {
return -1;
}
int maxFd;
fd_set readSet;
EchoServer::SetNotBlock(sockFd);
std::unordered_set<int> clientFds;
while (true) {
updateReadSet(clientFds, maxFd, sockFd, readSet);
int ret = select(maxFd + 1, &readSet, NULL, NULL, NULL);
if (ret <= 0) {
if (ret < 0) perror("select failed");
continue;
}
for (int i = 0; i <= maxFd; i++) {
if (not FD_ISSET(i, &readSet)) {
continue;
}
if (i == sockFd) { // 监听的sockFd可读,则表示有新的链接
EchoServer::LoopAccept(sockFd, 1024, [&clientFds](int clientFd) {
clientFds.insert(clientFd); // 新增到要监听的fd集合中
});
continue;
}
handlerClient(i);
clientFds.erase(i);
close(i);
}
}
return 0;
}
poll
在 select() 中文件描述符集合是由一个大小为1024的位图实现的,为了支持监听更多的文件描述符,poll 结构体数组存放描述符集合。
struct pollfd {
int fd; // 文件描述符
short events; // 监听的事件
short revents; // 返回的事件
}
函数接口
#include <poll.h>
// nfds 指明fds数组的大小
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
示例
void updateFds(std::unordered_set<int> &clientFds, pollfd **fds, int &nfds) {
if (*fds != nullptr) {
delete[](*fds);
}
nfds = clientFds.size();
*fds = new pollfd[nfds];
int index = 0;
for (const auto &clientFd : clientFds) {
(*fds)[index].fd = clientFd;
(*fds)[index].events = POLLIN;
(*fds)[index].revents = 0;
index++;
}
}
void handlerClient(int clientFd) {
std::string msg;
if (not EchoServer::RecvMsg(clientFd, msg)) {
return;
}
EchoServer::SendMsg(clientFd, msg);
}
int main(int argc, char *argv[]) {
if (argc != 3) {
std::cout << "invalid input" << std::endl;
std::cout << "example: ./Poll 0.0.0.0 1688" << std::endl;
return -1;
}
int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), false);
if (sockFd < 0) {
return -1;
}
int nfds = 0;
pollfd *fds = nullptr;
std::unordered_set<int> clientFds;
clientFds.insert(sockFd);
EchoServer::SetNotBlock(sockFd);
while (true) {
updateFds(clientFds, &fds, nfds);
int ret = poll(fds, nfds, -1);
if (ret <= 0) {
if (ret < 0) perror("poll failed");
continue;
}
for (int i = 0; i < nfds; i++) {
if (not(fds[i].revents & POLLIN)) {
continue;
}
int curFd = fds[i].fd;
if (curFd == sockFd) {
EchoServer::LoopAccept(sockFd, 1024, [&clientFds](int clientFd) {
clientFds.insert(clientFd); // 新增到要监听的fd集合中
});
continue;
}
handlerClient(curFd);
clientFds.erase(curFd);
close(curFd);
}
}
return 0;
}
poll 和 select 的区别
- 数据结构和参数
- select: 使用固定大小的位图(fd_set)来表示文件描述符集合,上限为1024.
- poll: 使用动态数组(pollfd 结构体数组)来表示文件描述符集合。
- 性能
- select: 每次调用时,用户态的文件描述符集合需要复制到内核态。
- poll: 通过传递指针的方式避免了每次调用时复制整个文件描述符集合到内核。
epoll
原理:
-
事件注册:使用epoll_create创建一个epoll文件描述符,然后通过epoll_ctl向这个文件描述符注册事件(如读、写、错误等)和对应的socket描述符。
-
事件等待:通过epoll_wait函数等待一个或多个事件的发生。这个调用是阻塞的,直到至少有一个已注册的事件发生,或者超时,或者被中断。相比于select和poll,epoll_wait的优势在于它不会随着监听的文件描述符数量增加而导致效率下降,因为它内部维护了一个高效的红黑树结构来管理这些描述符。
-
事件处理:当epoll_wait返回时,会给出一个就绪事件的列表,应用可以直接对这些事件进行处理,而无需遍历所有监控的文件描述符。
水平触发和边缘触发:
- LT模式下,只要事件未被处理,每次调用epoll_wait都会返回该事件。
- ET模式下,事件仅在状态发生变化的那一刻返回一次,要求应用程序一次性处理完所有就绪的数据,否则可能会丢失事件。
示例
void handlerClient(int clientFd) {
std::string msg;
if (not EchoServer::RecvMsg(clientFd, msg)) {
return;
}
EchoServer::SendMsg(clientFd, msg);
}
int main(int argc, char *argv[]) {
if (argc != 3) {
std::cout << "invalid input" << std::endl;
std::cout << "example: ./Epoll 0.0.0.0 1688" << std::endl;
return -1;
}
int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), false);
if (sockFd < 0) {
return -1;
}
epoll_event events[2048];
int epollFd = epoll_create(1024);
if (epollFd < 0) {
perror("epoll_create failed");
return -1;
}
EchoServer::Conn conn(sockFd, epollFd, false);
EchoServer::SetNotBlock(sockFd);
EchoServer::AddReadEvent(&conn);
while (true) {
int num = epoll_wait(epollFd, events, 2048, -1);
if (num < 0) {
perror("epoll_wait failed");
continue;
}
for (int i = 0; i < num; i++) {
EchoServer::Conn *conn = (EchoServer::Conn *)events[i].data.ptr;
if (conn->Fd() == sockFd) {
EchoServer::LoopAccept(sockFd, 2048, [epollFd](int clientFd) {
EchoServer::Conn *conn = new EchoServer::Conn(clientFd, epollFd, false);
EchoServer::AddReadEvent(conn); // 监听可读事件,保持fd为阻塞IO
EchoServer::SetTimeOut(conn->Fd(), 0, 500000); // 设置读写超时时间为500ms
});
continue;
}
handlerClient(conn->Fd());
EchoServer::ClearEvent(conn);
delete conn;
}
}
return 0;
}
Reactor
Reactor 的意思是「反应器」,这里的反应指的是「对事件反应」,也就是来了一个事件,Reactor 就有相对应的反应/响应。
事实上,Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复用监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。
下图中,select 应为 epoll_wait!
Reactor 模式主要由 Reactor 和处理资源池这两个核心部分组成:
- Reactor 负责监听和分发事件,事件类型包含连接事件、读写事件;
- 处理资源池负责处理事件,如 read -> 业务逻辑 -> send;
Reactor 模式是灵活多变的,可以应对不同的业务场景,灵活在于:Reactor 的数量可以只有一个,也可以有多个;处理资源池可以是单个进程 / 线程,也可以是多个进程 /线程;
单 Reactor 单进程 / 线程;
进程里有 Reactor、Acceptor、Handler 这三个对象:
- Reactor 对象的作用是监听和分发事件;
- Acceptor 对象的作用是获取连接;
- Handler 对象的作用是处理业务;
处理流程:
- Reactor 对象通过 epoll_wait 监听事件,根据事件类型通过 dispatch 进行分发
- 连接建立的事件交由 Acceptor 对象进行处理,Acceptor 对象会通过 accept 方法 获取连接,并创建一个 Handler 对象来处理后续的响应事件;
- 如果不是连接建立事件, 则交由当前连接对应的 Handler 对象来进行响应;
- Handler 对象通过 read -> 业务处理 -> send 的流程来完成完整的业务流程。
缺点:
- 只有一个进程,无法充分利用 多核 CPU 的性能;
- Handler 对象在业务处理时,整个进程是无法处理其他连接的事件的,如果业务处理耗时比较长,那么就造成响应的延迟;
int main(int argc, char *argv[]) {
if (argc != 4) {
std::cout << "invalid input" << std::endl;
std::cout << "example: ./EpollReactorSingleProcess 0.0.0.0 1688 1" << std::endl;
return -1;
}
int sockFd = EchoServer::CreateListenSocket(argv[1], atoi(argv[2]), false);
if (sockFd < 0) {
return -1;
}
epoll_event events[2048];
int epollFd = epoll_create(1024);
if (epollFd < 0) {
perror("epoll_create failed");
return -1;
}
bool isMultiIo = (std::string(argv[3]) == "1");
EchoServer::Conn conn(sockFd, epollFd, isMultiIo);
EchoServer::SetNotBlock(sockFd);
EchoServer::AddReadEvent(&conn);
while (true) {
int num = epoll_wait(epollFd, events, 2048, -1);
if (num < 0) {
perror("epoll_wait failed");
continue;
}
for (int i = 0; i < num; i++) {
EchoServer::Conn *conn = (EchoServer::Conn *)events[i].data.ptr;
if (conn->Fd() == sockFd) {
EchoServer::LoopAccept(sockFd, 2048, [epollFd, isMultiIo](int clientFd) {
EchoServer::Conn *conn = new EchoServer::Conn(clientFd, epollFd, isMultiIo);
EchoServer::SetNotBlock(clientFd);
EchoServer::AddReadEvent(conn); // 监听可读事件
});
continue;
}
auto releaseConn = [&conn]() {
EchoServer::ClearEvent(conn);
delete conn;
};
if (events[i].events & EPOLLIN) { // 可读
if (not conn->Read()) { // 执行读失败
releaseConn();
continue;
}
if (conn->OneMessage()) { // 判断是否要触发写事件
EchoServer::ModToWriteEvent(conn); // 修改成只监控可写事件
}
}
if (events[i].events & EPOLLOUT) { // 可写
if (not conn->Write()) { // 执行写失败
releaseConn();
continue;
}
if (conn->FinishWrite()) { // 完成了请求的应答写,则可以释放连接
releaseConn();
}
}
}
}
return 0;
}
单 Reactor 多线程 / 进程;
- Reactor 对象通过 epoll_wait 监听事件,根据事件类型通过 dispatch 进行分发
- 如果是连接建立的事件,则交由 Acceptor 对象进行处理,Acceptor 对象会通过 accept 方法 获取连接,并创建一个 Handler 对象来处理后续的响应事件;
- 如果不是连接建立事件, 则交由当前连接对应的 Handler 对象来进行响应;
上面的三个步骤和单 Reactor 单线程方案是一样的,接下来的步骤就开始不一样了:
- Handler 对象不再负责业务处理,只负责数据的接收和发送,Handler 对象通过 read 读取到数据后,会将数据发给子线程里的 Processor 对象进行业务处理;
- 子线程里的 Processor 对象就进行业务处理,处理完后,将结果发给主线程中的 Handler 对象,接着由 Handler 通过 send 方法将响应结果发送给 client;
缺点:
因为一个 Reactor 对象承担所有事件的监听和响应,而且只在主线程中运行,在面对瞬间高并发的场景时,容易成为性能的瓶颈的地方
多 Reactor 多进程 / 线程;
- 线程中的 MainReactor 对象通过 epoll_wait 监控连接建立事件,收到事件后通过 Acceptor 对象中的 accept 获取连接,将新的连接分配给某个子线程;
- 子线程中的 SubReactor 对象将 MainReactor 对象分配的连接加入 select 继续进行监听,并创建一个 Handler 用于处理连接的响应事件。
- 如果有新的事件发生时,SubReactor 对象会调用当前连接对应的 Handler 对象来进行响应。
- Handler 对象通过 read -> 业务处理 -> send 的流程来完成完整的业务流程。
异步I/O
异步 I/O 「内核数据准备好」和「数据从内核态拷贝到用户态」这两个过程都不用等待。