首页 > 其他分享 >从read 系统调用到 C10M 问题

从read 系统调用到 C10M 问题

时间:2023-08-06 10:22:04浏览次数:50  
标签:C10M 调用 err read dg bufs aio io sockfd

一.前言

从上个世纪到现在,工程师们在优化服务器性能的过程中,提出了各种不同的io模型,比如非阻塞io,io复用,信号驱动式io,异步io。具体io模型在不同平台上的实现也不一样,比如io复用在bsd上可以由kqueue实现,在solaris系统上可以由/dev/poll实现。为了实现系统的可移植性,POSIX 确保 select和poll在 unix-like系统上得到广泛的支持。

在上个世纪,Dan Kegel 提出了C10K的设想,现在C10K 已经不是什么问题,比如nginx就可以做到百万级别的qps。于是又有人提出来了C10M的设想,Robert David Graham 从unix的最初设计初衷给出了自己的解决方案。

二.常见io模型

1.阻塞io

常见的read系统调用,是最常见的阻塞io:

2.非阻塞式io

非阻塞io的典型使用方式如下,设置非阻塞标志,并且常与io复用一起使用,使用起来比较复杂。

val = Fcntl(sockfd, F_GETFL, 0);
Fcntl(sockfd, F_SETFL, val | O_NONBLOCK); /* O_NONBLOCK 标志非阻塞 */

 

3.io 复用 (select/poll)

io复用在处理数量庞大的fd时非常有效,我们以select为例,select的核心api是select函数:

int select(int nfds, fd_set *_Nullable restrict readfds,
                  fd_set *_Nullable restrict writefds,
                  fd_set *_Nullable restrict exceptfds,
                  struct timeval *_Nullable restrict timeout);

看一个例子:

#include	"unp.h"

void
str_cli(FILE *fp, int sockfd)
{
	int			maxfdp1;
	fd_set		rset;
	char		sendline[MAXLINE], recvline[MAXLINE];

	FD_ZERO(&rset);
	for ( ; ; ) {
		FD_SET(fileno(fp), &rset); /* 设置要监听的socket fd */
		FD_SET(sockfd, &rset); /* 设置要监听的file fd */
		maxfdp1 = max(fileno(fp), sockfd) + 1;
		Select(maxfdp1, &rset, NULL, NULL, NULL); /* select 调用 */

		if (FD_ISSET(sockfd, &rset)) {	/* socket 可读 */
			if (Readline(sockfd, recvline, MAXLINE) == 0)
				err_quit("str_cli: server terminated prematurely");
			Fputs(recvline, stdout);
		}

		if (FD_ISSET(fileno(fp), &rset)) {  /* input 可读 */
			if (Fgets(sendline, MAXLINE, fp) == NULL)
				return;		/* all done */
			Writen(sockfd, sendline, strlen(sendline));
		}
	}
}

 

4.信号驱动式io

但凡涉及到信号的程序都比较复杂。要使用信号驱动式io,先开启socket的信号驱动式io功能,并通过sigaction 系统调用安装一个信号处理函数:

void
dg_echo(int sockfd_arg, SA *pcliaddr, socklen_t clilen_arg)
{
	int			i;
	const int	on = 1;
	sigset_t	zeromask, newmask, oldmask;

	sockfd = sockfd_arg;
	clilen = clilen_arg;

	for (i = 0; i < QSIZE; i++) {	/* init queue of buffers */
		dg[i].dg_data = Malloc(MAXDG);
		dg[i].dg_sa = Malloc(clilen);
		dg[i].dg_salen = clilen;
	}
	iget = iput = nqueue = 0;

	Signal(SIGHUP, sig_hup); /* 安装信号处理函数 */
	Signal(SIGIO, sig_io);
	Fcntl(sockfd, F_SETOWN, getpid()); /* 设置属主 */
	Ioctl(sockfd, FIOASYNC, &on); /* 开启信号驱动式io */
	Ioctl(sockfd, FIONBIO, &on); /* non-bloking */

	Sigemptyset(&zeromask);		/* init three signal sets */
	Sigemptyset(&oldmask);
	Sigemptyset(&newmask);
	Sigaddset(&newmask, SIGIO);	/* signal we want to block */

	Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
	for ( ; ; ) {
		while (nqueue == 0)
			sigsuspend(&zeromask);	/* wait for datagram to process */

			/* 4unblock SIGIO */
		Sigprocmask(SIG_SETMASK, &oldmask, NULL);

		Sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0,
			   dg[iget].dg_sa, dg[iget].dg_salen);

		if (++iget >= QSIZE)
			iget = 0;

			/* 4block SIGIO */
		Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
		nqueue--;
	}
}

 

5.异步io

我们来看一个aio的例子(由于aio的例子过于复杂,我们这里只截取部分关键代码):

for (i = 0; i < NBUF; i++) {
    switch (bufs[i].op) {
        case UNUSED:
            /*
				 * Read from the input file if more data
				 * remains unread.
				 */
            if (off < sbuf.st_size) {
                bufs[i].op = READ_PENDING;
                bufs[i].aiocb.aio_fildes = ifd;
                bufs[i].aiocb.aio_offset = off;
                off += BSZ;
                if (off >= sbuf.st_size)
                    bufs[i].last = 1;
                bufs[i].aiocb.aio_nbytes = BSZ;
                if (aio_read(&bufs[i].aiocb) < 0) /* aio_read */
                    err_sys("aio_read failed");
                aiolist[i] = &bufs[i].aiocb;
                numop++;
            }
            break;

        case READ_PENDING:
            if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */
                continue;
            if (err != 0) {
                if (err == -1)
                    err_sys("aio_error failed");
                else
                    err_exit(err, "read failed");
            }

            /*
				 * A read is complete; translate the buffer
				 * and write it.
				 */
            if ((n = aio_return(&bufs[i].aiocb)) < 0) /* 调用aio_return成功则 说明数据已经返回 */
                err_sys("aio_return failed");
            if (n != BSZ && !bufs[i].last)
                err_quit("short read (%d/%d)", n, BSZ);
            for (j = 0; j < n; j++)
                bufs[i].data[j] = translate(bufs[i].data[j]);
            bufs[i].op = WRITE_PENDING;
            bufs[i].aiocb.aio_fildes = ofd;
            bufs[i].aiocb.aio_nbytes = n;
            if (aio_write(&bufs[i].aiocb) < 0) /* aio_write */
                err_sys("aio_write failed");
            /* retain our spot in aiolist */
            break;

        case WRITE_PENDING:
            if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */
                continue;
            if (err != 0) {
                if (err == -1)
                    err_sys("aio_error failed");
                else
                    err_exit(err, "write failed");
            }

            /*
				 * A write is complete; mark the buffer as unused.
				 */
            if ((n = aio_return(&bufs[i].aiocb)) < 0)
                err_sys("aio_return failed");
            if (n != bufs[i].aiocb.aio_nbytes)
                err_quit("short write (%d/%d)", n, BSZ);
            aiolist[i] = NULL;
            bufs[i].op = UNUSED;
            numop--;
            break;
    }
}

 

6.同步和异步的分类

网络上对io同步和异步的争论很多,这里给出Stevens的分类标准: 

同步 阻塞io,非阻塞io,io复用,信号驱动式io
异步 异步io

 

三.C10K io策略

在上个世纪,Dan Kegel 提出了C10K的设想,即单机实现10k的并发量,主要提出了以下四种类型的解决方法:

服务器范式 例子 备注 软件实现
Serve many clients with each thread, and use nonblocking I/O(level-triggered) select, poll(posix), /dev/poll(solaris), kqueue(bsd) 轮询  
Serve many clients with each thread, and use nonblocking I/O (readiness change) kqueue(bsd), epoll(linux), Realtime Signals(linux) 事件通知 nginx, redis
Serve many clients with each server thread, and use asynchronous I/O aio 异步,没有得到广泛支持  
Serve one client with each server thread

LinuxThreads, Java threading support in JDK 1.3.x and earlier 

早期的java使用绿色线程

 
  • 在实现的过程中有诸多限制,比如打开fd的限制,创建thread数量的限制,根据不同内核而异。
  • 32 位系统,用户态的虚拟空间只有3G,如果创建线程时分配的栈空间是10M,那么一个进程最多只能创建300 个左右的线程。 64 位系统,用户态的虚拟空间大到有128T,理论上不会受虚拟内存大小的限制(10M个线程),而会受系统的参数或性能限制(线程上下文切换)。

四.C10M

Robert David Graham认为如果要解决C10M的问题,必须对unix内核进行改造。当下的unix系统的设计目标是为了满足非常广泛的需求,于是加上了许多通用的功能,比如进程管理,内存管理等等。C10M的问题不是通用的问题,需要自己处理数据控制,而不是依赖unix内核,而且需要做到packet scalability, multi-core scalability, memory scalability。

专项问题,需要特殊的解决方案。

五.总结

本文从常见io模型出发,梳理了高并发服务器可能涉及到的io模型,这些经典io模型在过去十年基本没有发生变化。了解这些底层技术对我们了解深入理解服务器是非常有必要的。

六.参考

《unix网络编程》

http://www.kegel.com/c10k.html#threads.java

http://highscalability.com/blog/2013/5/13/the-secret-to-10-million-concurrent-connections-the-kernel-i.html 

https://man7.org/linux/man-pages/man2/select.2.html                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          

标签:C10M,调用,err,read,dg,bufs,aio,io,sockfd
From: https://www.cnblogs.com/darcy-yuan/p/17603735.html

相关文章

  • 多个C文件混合编译,涉及函数相互调用,地址传递要注意的!
    tc.h#pragmaoncechar*fun();//main函数调用到这个函数,但是在其他.c中定义,在头文件申明下先tacc.c#include<stdio.h>char*fun(){printf("saDHAKJHFJ\n");inti=100;printf("i=%d\n",i);char*pr=(char*)malloc(100);*pr......
  • Exercise: rot13Reader
    rot13是英文字母加密里面CaesarCipher(其实就是移位加密)的一种特殊形式,简单来说就是把字母前后部分对调,其中a和n对调,以此类推。针对性的简单来做就是直接判断字母所在范围,然后视情况+/-13即可。但是rot13也可以用CaesarCipher的通用形式来做,移位的公式其实很简单,见下面代码中rot1......
  • Exercise: Readers
    这个练习说明是实现接口Read,返回一个无限的字母‘A’字符流。接口Read的调用,通过error为io.EOF来判定数据流结尾。那么往slice里面一直写‘A’,error里面一直写nil不就是没有io.EOF了,所以就是无限字母‘A’的字符流了。主要代码如下:1typeMyReaderstruct{}23func(MyReader......
  • python中如何实现链式调用
    Python中实现链式调用通常使用方法链(MethodChaining)技术。方法链可以通过在每个方法末尾返回实例本身(即self)来实现。如:classPerson:def__init__(self,name,age):self.name=nameself.age=agedefset_name(self,name):self.name......
  • 系统调用
    系统调用系统调用是什么:  系统调用是用户在编程时调用的操作系统功能。系统调用的作用:  系统调用是操作系统提供给编程人员的唯一接口;使CPU状态从用户态陷入内核态的唯一途径。典型系统调用举例:每个操作系统都提供几百种系统调用(进程控制、进程通信、文件使用、目录操作......
  • C# 如何调用C++ dll string类型返回
    这篇文章主要介绍了C# 如何调用C++ dll string类型返回问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教 −目录C#调用C++dllstring类型返回C++端:(定义返回数据为结构体Vector4)C#端:(接收返回的结构体Vector4)C#调用C++dll类型......
  • ITK在C++文件里面,可以这样调用开旁路的函数
    问题:如果直接在c++文件引入开旁路函数POM_AM__set_application_bypass,是编译不通过的(PS:好像是因为开旁路函数是用C写的,和C++不兼容,具体也不是很懂的,有懂的大佬,可以帮忙评论解答下) 解决方法:在c++文件前面加上这行extern"C"intPOM_AM__set_application_bypass(logicalbypa......
  • 函数(void *) 被谁调用了——图像采集卡经验总结
    一块图像采集卡上有两个CameraLink接口,程序里“采集卡”理解为:一个接口就是一个采集卡。即工控机上插一块,就是两个采集卡对象。【问题】函数(void*)被谁哪个采集卡调用了?下面通过IKap、Matrox、Silicon三个采集卡的案例来理解1、2、3、Windows的创建线程函数,LPVOID其实......
  • echo命令、read命令不换行用户输入
    一、echo输出提示#!/bin/bashecho"请输入要选择的数字(1-9):"readnum效果:我们的想法是,光标停在:号后,等待用户输入,结果光标跑下一行了。解决方法:在:后加入\c去掉回车#!/bin/bashecho"请输入要选择的数字(1-9):\c"readnum效果:二、直接用read的参数设置......
  • Go语言Http调用之Get、Post请求详解
    HTTP 调用需要通过 http 包里的 Client 结构体里的 Do 方法去实现,因此需要先声明一个 Client 结构体变量,该结构体可以设置超时时间等配置。对于一个请求里的 URL,查询参数,请求 method 等参数,需要 http 包里的 Request 结构体去封装。我们可以通过 NewRequestWith......