多路复用技术
在电信和计算机网络中,多路复用(有时简称为多路复用)是一种通过共享介质将多个模拟或数字信号组合成一个信号的方法。其目的是共享一种稀缺资源——一种物理传输介质。例如,在电信中,可以使用一根电线进行多个电话呼叫。多路复用起源于 1870 年代的电报,现在广泛应用于通信中。在电话领域,George Owen Squier 在 1910 年开发了电话载波复用技术。
复用信号通过诸如电缆的通信信道传输。多路复用将通信信道的容量划分为若干个逻辑信道,一个用于要传输的每个消息信号或数据流。一个称为解复用的反向过程在接收端提取原始通道。
执行多路复用的设备称为多路复用器(MUX),执行相反过程的设备称为解复用器(DEMUX 或 DMX)。
反向多路复用(IMUX)的目的与多路复用相反,即将一个数据流分解为多个流,在多个通信通道上同时传输它们,并重新创建原始数据流。
在计算中,I/O 多路复用也可用于指代处理来自单个事件循环的多个输入/输出 事件的概念,系统调用如 poll 和 select (Unix)。
网络多路复用
http 2.0 tcp/ip 连接的多路复用 -> 对成本比较大的 tcp/ip 连接进行复用
文件 io 的多路复用
I/O 多路复用是一种告诉内核我们希望在一个或多个 I/O 条件准备好时收到通知的能力,例如输入准备好被读取,或者描述符能够获取更多输出。
什么场景下使用 I/O 多路复用:
- 当客户端处理多个描述符(如标准输入和网络套接字)时
- 当客户端同时处理多个套接字时
- 当 TCP 服务器同时处理监听及其已连接的套接字时
- 当服务器同时处理 TCP 和 UDP 时
- 当服务器处理多个服务或多个协议时
I/O 模型
- 阻塞 I/O
- 非阻塞 I/O
- I/O 多路复用(select, poll, epoll)
- 信号驱动 I/O(SIGIO)
- 异步 I/O(POSIX 的 aio_系列函数)
任何输入操作都有两个阶段
- 等待数据准备好
- 从内核向进程复制数据
使用 read 系统调用得到 fd 中的相关状态
fdlist.add(connfd);
while(1) {
for(fd <-- fdlist) {
// 系统调用成本很大
if(read(fd) != -1) {
doSomeThing();
}
}
}
select
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/select.h>
const static int MAXLINE = 1024;
const static int SERV_PORT = 10001;
int main()
{
int i , maxi , maxfd, listenfd , connfd , sockfd ;
/*nready 描述字的数量*/
int nready ,client[FD_SETSIZE];
int n ;
/*创建描述字集合,由于select函数会把未有事件发生的描述字清零,所以我们设置两个集合*/
fd_set rset , allset;
char buf[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr , servaddr;
/*创建socket*/
listenfd = socket(AF_INET , SOCK_STREAM , 0);
/*定义sockaddr_in*/
memset(&servaddr , 0 ,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(listenfd, (struct sockaddr *) & servaddr , sizeof(servaddr));
listen(listenfd , 100);
/*listenfd 是第一个描述字*/
/*最大的描述字,用于select函数的第一个参数*/
maxfd = listenfd;
/*client的数量,用于轮询*/
maxi = -1;
/*init*/
for(i=0 ;i<FD_SETSIZE ; i++)
client[i] = -1;
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
for (;;)
{
rset = allset;
/*只select出用于读的描述字,阻塞无timeout*/
nready = select(maxfd+1 , &rset , NULL , NULL , NULL);
if(FD_ISSET(listenfd,&rset))
{
clilen = sizeof(cliaddr);
connfd = accept(listenfd , (struct sockaddr *) & cliaddr , &clilen);
/*寻找第一个能放置新的描述字的位置*/
for (i=0;i<FD_SETSIZE;i++)
{
if(client[i]<0)
{
client[i] = connfd;
break;
}
}
/*找不到,说明client已经满了*/
if(i==FD_SETSIZE)
{
printf("Too many clients , over stack .\n");
return -1;
}
FD_SET(connfd,&allset);//设置fd
/*更新相关参数*/
if(connfd > maxfd) maxfd = connfd;
if(i>maxi) maxi = i;
if(nready<=1) continue;
else nready --;
}
for(i=0 ; i<=maxi ; i++)
{
if (client[i]<0) continue;
sockfd = client[i];
if(FD_ISSET(sockfd,&rset))
{
n = read(sockfd , buf , MAXLINE);
if (n==0)
{
/*当对方关闭的时候,server关闭描述字,并将set的sockfd清空*/
close(sockfd);
FD_CLR(sockfd,&allset);
client[i] = -1;
}
else
{
buf[n]='\0';
printf("Socket %d said : %s\n",sockfd,buf);
write(sockfd,buf,n); //Write back to client
}
nready --;
if(nready<=0) break;
}
}
}
return 0;
}
poll
和 select 的主要区别就是,去掉了 select 只能监听 1024 个文件描述符的限制
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <poll.h>
/*环境为ubuntu10.04自带c环境,无法自动引入下列宏,所以自己写在前面了*/
#define INFTIM -1
#define POLLRDNORM 0x040 /* Normal data may be read. */
#define POLLRDBAND 0x080 /* Priority data may be read. */
#define POLLWRNORM 0x100 /* Writing now will not block. */
#define POLLWRBAND 0x200 /* Priority data may be written. */
#define MAXLINE 1024
#define OPEN_MAX 16 //一些系统会定义这些宏
#define SERV_PORT 10001
int main()
{
int i , maxi ,listenfd , connfd , sockfd ;
int nready;
int n;
char buf[MAXLINE];
socklen_t clilen;
struct pollfd client[OPEN_MAX];
struct sockaddr_in cliaddr , servaddr;
listenfd = socket(AF_INET , SOCK_STREAM , 0);
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(listenfd , (struct sockaddr *) & servaddr, sizeof(servaddr));
listen(listenfd,10);
client[0].fd = listenfd;
client[0].events = POLLRDNORM;
for(i=1;i<OPEN_MAX;i++)
{
client[i].fd = -1;
}
maxi = 0;
for(;;)
{
nready = poll(client,maxi+1,INFTIM);
if (client[0].revents & POLLRDNORM)
{
clilen = sizeof(cliaddr);
connfd = accept(listenfd , (struct sockaddr *)&cliaddr, &clilen);
for(i=1;i<OPEN_MAX;i++)
{
if(client[i].fd<0)
{
client[i].fd = connfd;
client[i].events = POLLRDNORM;
break;
}
}
if(i==OPEN_MAX)
{
printf("too many clients! \n");
}
if(i>maxi) maxi = i;
nready--;
if(nready<=0) continue;
}
for(i=1;i<=maxi;i++)
{
if(client[i].fd<0) continue;
sockfd = client[i].fd;
if(client[i].revents & (POLLRDNORM|POLLERR))
{
n = read(client[i].fd,buf,MAXLINE);
if(n<=0)
{
close(client[i].fd);
client[i].fd = -1;
}
else
{
buf[n]='\0';
printf("Socket %d said : %s\n",sockfd,buf);
write(sockfd,buf,n); //Write back to client
}
nready--;
if(nready<=0) break; //no more readable descriptors
}
}
}
return 0;
}
epoll
-
select 调用需要传入 fd 数组,需要拷贝一份到内核,高并发场景下这样的拷贝消耗的资源是惊人的。(可优化为不复制)
-
select 在内核层仍然是通过遍历的方式检查文件描述符的就绪状态,是个同步过程,只不过无系统调用切换上下文的开销。(内核层可优化为异步事件通知)
-
select 仅仅返回可读文件描述符的个数,具体哪个可读还是要用户自己遍历。(可优化为只返回给用户就绪的文件描述符,无需用户做无效的遍历)
-
内核中保存一份文件描述符集合,无需用户每次都重新传入,只需告诉内核修改的部分即可。
-
内核不再通过轮询的方式找到就绪的文件描述符,而是通过异步 IO 事件唤醒。
-
内核仅会将有 IO 事件的文件描述符返回给用户,用户也无需遍历整个文件描述符集合。
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <poll.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/resource.h>
#define MAXLINE 1024
#define OPEN_MAX 16 //一些系统会定义这些宏
#define SERV_PORT 10001
int main()
{
int i , maxi ,listenfd , connfd , sockfd ,epfd, nfds;
int n;
char buf[MAXLINE];
struct epoll_event ev, events[20];
socklen_t clilen;
struct pollfd client[OPEN_MAX];
struct sockaddr_in cliaddr , servaddr;
listenfd = socket(AF_INET , SOCK_STREAM , 0);
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(listenfd , (struct sockaddr *) & servaddr, sizeof(servaddr));
listen(listenfd,10);
epfd = epoll_create(256);
ev.data.fd=listenfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
for(;;)
{
nfds=epoll_wait(epfd,events,20,500);
for(i=0; i<nfds; i++)
{
if (listenfd == events[i].data.fd)
{
clilen = sizeof(cliaddr);
connfd = accept(listenfd , (struct sockaddr *)&cliaddr, &clilen);
if(connfd < 0)
{
perror("connfd < 0");
exit(1);
}
ev.data.fd=connfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
}
else if (events[i].events & EPOLLIN)
{
if ( (sockfd = events[i].data.fd) < 0)
continue;
n = recv(sockfd,buf,MAXLINE,0);
if (n <= 0)
{
close(sockfd);
events[i].data.fd = -1;
}
else
{
buf[n]='\0';
printf("Socket %d said : %s\n",sockfd,buf);
ev.data.fd=sockfd;
ev.events=EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,connfd,&ev);
}
}
else if(events[i].events & EPOLLOUT)
{
sockfd = events[i].data.fd;
send(sockfd, "Hello!", 7, 0);
ev.data.fd=sockfd;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
}
else
{
printf("This is not avaible!");
}
}
}
close(epfd);
return 0;
}
下次分享主题: epoll 的实现原理
参考
https://zhuanlan.zhihu.com/p/470778284
https://zhuanlan.zhihu.com/p/530567976
https://en.wikipedia.org/wiki/Multiplexing
https://www.cnblogs.com/bugutian/p/4816764.html
https://www.ithome.com/0/644/835.htm
unix 网络编程