首页 > 编程语言 >unix网络编程2.5——高并发服务器(五)epoll进阶篇——基于epoll实现reactor

unix网络编程2.5——高并发服务器(五)epoll进阶篇——基于epoll实现reactor

时间:2022-12-03 17:22:34浏览次数:58  
标签:item int epoll 进阶篇 unix fd blk ev buf

目录

系列文章

阅读本文需要先阅读下面的文章:

unix网络编程1.1——TCP协议详解(一)

unix网络编程2.1——高并发服务器(一)基础——io与文件描述符、socket编程与单进程服务端客户端实现

unix网络编程2.2——高并发服务器(二)多进程与多线程实现

unix网络编程2.3——高并发服务器(三)多路IO复用之select

unix网络编程2.4——高并发服务器(四)epoll基础篇

前言

服务器reactor模型

网络IO涉及的系统对象

  • 网络IO涉及的系统对象包括:用户空间调用IO的进程或线程内核空间的内核系统
  • 如下图所示,当IO操作read,它会经历两个阶段:
  1. 等待数据准备就绪(如果是阻塞IO则会阻塞在这里)
  2. 将数据从内核缓冲区拷贝到用户进程或线程自定义的buf中
    image

IO多路复用

  • 对于高并发编程,网络连接上的消息处理往往分为两个阶段:等待消息准备好消息处理。当使用默认的阻塞socket时(如上图的read),往往时把这两个阶段合二为一,这样操作socket的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了cpu的使用效率。
  • 高并发编程方法会把两个阶段分开处理(等待消息准备好消息处理)。这要求必须使用非阻塞的IO,否则,处理消息的代码段很容易导致条件不满足时,所在线程又进入了睡眠等待阶段,造成"死锁"。
  • 此时就需要在等待消息准备好阶段,有线程主动查询(这个线程还是需要睡眠进入阻塞态的),或者说让1个线程为所有连接等待:这就是IO多路复用。 它也可能睡眠等待,但是不要紧,因为它"一对多,可以监控所有连接"。这样,当线程被唤醒执行时,就一定是有一些连接准备好或者数据准备好了。(可以实现这个功能的接口代表为select和epoll,在之前的文章已经有所介绍,本文要介绍基于epoll实现reactor模型。)
  • 作为一个高性能服务器程序通常需要考虑处理三类事件:I/O 事件,定时事件及信号。两种 高效 的事件处理模型: Reactor 和 Proactor 。

reactor模型

  • 首先来回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。 Reactor 释义 反应堆,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反, Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的时间发生, Reactor 将主动调用应用程序注册的接口,这些接口又称为 回调函数
  • Reactor 模式是处理并发I/O 比较常见的一种模式,用于同步I/O,中心思想是将所有要处理的I/O 事件注册到一个中心I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O 事件到来或是准备就绪(文件描述符或socket 可读、写),多路复用器返回并将事先注册的相应I/O 事件分发到对应的处理器中。

遇到的问题

1.数组下标越界

  • 解放办法:使用动态数组,更好的数据结构(下面会说)

2.socket: Too many open files fd的个数限制

image

  • 解决办法 ulimit 修改资源限制
    image

ulimit 无法反复修改,只能设置1次,重启后失效,可以修改配置文件

image

3.Cannot assign requested address 客户端端口资源不够

image

image

解决办法

何为连接(针对tcp而言)

  • 五元组: (src_ip, src_port, des_ip, des_port, 协议) 确定1个连接

解决办法1:查看(修改)资源上限

确认port_range范围
  • sudo sysctl -a | grep port_range
    image
  • 或 cat /proc/sys/net/ipv4/ip_local_port_range
32768 60999
修改配置
  • 从本机修改内核参数考虑:参考,上图可知port范围是60999-32768=28231个(第一次测试程序,客户端最多分配28231个,与其保持一致)
  • sudo vi /etc/sysctl.conf打开,再添加 net.ipv4.ip_local_port_range = 15000 60999
  • 再执行:sudo sysctl -p /etc/sysctl.conf,使生效
  • 查看是否生效:cat /proc/sys/net/ipv4/ip_local_port_range
15000 60999
这样能用的port范围:60999-15000 = 45999

解决办法2:改变五元组

  • 从实际可用的并发考虑:当出现"Cannot assign requested address"说明客户端端口资源不够,,如果我们有多张网卡,可以改变src_ip(此时客户端作为源),这样网卡的数量可以决定连接的个数,百万级并发很容易实现;同理,服务端也可以改变其ip或扩大监听的端口范围,总之就是改变连接五元组的其中之一。
    image

event_block 数据结构————reactor实现核心

  • 观察下图,要实现这样一个具有动态扩容能力的动态数组,有以下几个好处:
  1. 不用一次性malloc一个非常大的连续内存,而是用链表next指针的方式,内存不要求有一整片很大的,避免内存碎片冲击;
  2. 按照sock_item[1024]的维度去申请和管理内存,用多少申请多少;
  3. 具体的,假设有100w个客户端连接连接,需要100w个fd,对应100w个sock_item项,是循环去申请出来的,free的时候也是一片片去free的;
  4. sock_item中包括了:fd,rbuf,rlen,wbuf,wlen,events,callback,读写解耦等待消息处理消息(回调函数实现)分成了两个阶段,便于业务实现。
    image

啥时候需要这种数据结构

  • 数组->"链表数组",block块,为什么这里用这个而不是红黑树
  • 这种数据结构什么时候适用:用来做存储,查找的时间复杂度,存储的空间复杂度
  • 存储有序、有规律的数据 key / 取余 1024等

测试并发(客户端不断连接)源码

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>

#define MAX_BUFFER		128
#define MAX_EPOLLSIZE	(384*1024)
#define MAX_PORT		1

#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int isContinue = 0;

static int ntySetNonblock(int fd) {
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0) return -1;
	return 0;
}

static int ntySetReUseAddr(int fd) {
	int reuse = 1;
	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}



int main(int argc, char **argv) {
	if (argc <= 2) {
		printf("Usage: %s ip port\n", argv[0]);
		exit(0);
	}

	const char *ip = argv[1];
	int port = atoi(argv[2]);
	int connections = 0;
	char buffer[128] = {0};
	int i = 0, index = 0;

	struct epoll_event events[MAX_EPOLLSIZE];
	
	int epoll_fd = epoll_create(MAX_EPOLLSIZE);
	
	strcpy(buffer, " Data From MulClient\n");
		
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr(ip);

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);

	while (1) {
		if (++index >= MAX_PORT) index = 0;
		
		struct epoll_event ev;
		int sockfd = 0;

		if (connections < 340000 && !isContinue) {
			sockfd = socket(AF_INET, SOCK_STREAM, 0);
			if (sockfd == -1) {
				perror("socket");
				goto err;
			}

			//ntySetReUseAddr(sockfd);
			addr.sin_port = htons(port+index);

			if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
				perror("connect");
				goto err;
			}
			ntySetNonblock(sockfd);
			ntySetReUseAddr(sockfd);

			sprintf(buffer, "Hello Server: client --> %d\n", connections);
			send(sockfd, buffer, strlen(buffer), 0);

			ev.data.fd = sockfd;
			ev.events = EPOLLIN | EPOLLOUT;
			epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
		
			connections ++;
		}
		//connections ++;
		if (connections % 1000 == 999 || connections >= 340000) {
			struct timeval tv_cur;
			memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
			
			gettimeofday(&tv_begin, NULL);

			int time_used = TIME_SUB_MS(tv_begin, tv_cur);
			printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

			int nfds = epoll_wait(epoll_fd, events, connections, 100);
			for (i = 0;i < nfds;i ++) {
				int clientfd = events[i].data.fd;

				if (events[i].events & EPOLLOUT) {
					sprintf(buffer, "data from %d\n", clientfd);
					send(sockfd, buffer, strlen(buffer), 0);
				} else if (events[i].events & EPOLLIN) {
					char rBuffer[MAX_BUFFER] = {0};				
					ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
					if (length > 0) {
						printf(" RecvBuffer:%s\n", rBuffer);

						if (!strcmp(rBuffer, "quit")) {
							isContinue = 0;
						}
						
					} else if (length == 0) {
						printf(" Disconnect clientfd:%d\n", clientfd);
						connections --;
						close(clientfd);
					} else {
						if (errno == EINTR) continue;

						printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
						close(clientfd);
					}
				} else {
					printf(" clientfd:%d, errno:%d\n", clientfd, errno);
					close(clientfd);
				}
			}
		}

		usleep(500);
	}

	return 0;

err:
	printf("error : %s\n", strerror(errno));
	return 0;
	
}

服务端源码server.c

#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <ctype.h>
#include <unistd.h>
#include <stdio.h>
// struct sockaddr_in对应的头文件 <arpa/inet.h> 
#include <arpa/inet.h>  
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <errno.h> 
#include <fcntl.h>

#define SERVER_PORT 9999
#define PORT_COUNT 100
#define MAX_EVENTS 128
#define BUFF_SIZE 128

#define ITEMS_LEN 1024

extern int errno;

/*
* 待实现:
* 1.listen_fd和conn_fd的sock_item有没有区别?
* 2.假色有100w的sock_item,如何快速插入和查找?
*/

// TODO: listen_fd + conn_fd
typedef struct sock_item {
    int fd;
    
    char *r_buf;
    int r_len;

    char *w_buf;
    int w_len;

    int event; // fd对应的事件
    void (*recv)(int fd, char *buf, int len);
    void (*send)(int fd, char *buf, int len);

    void (*accept)(int fd);
} sock_item;

/*
* 使用场景:查找,有序,有规律
* 1.查找时间复杂度?
* 2.存储空间复杂度?
*/
typedef struct event_block {
    sock_item *items;
    struct event_block *next;
} event_block;

// TODO
typedef struct reactor {
    int epfd;
    int blkcnt;
    event_block *ev_blk;
} reactor;

/* new event_block */
int ReactorResize(reactor *r)
{
    /* 不能做头插法,因为是顺序增加的 */ 
    if (r == NULL) return -1;

    event_block *ev_blk = r->ev_blk;
    while (ev_blk != NULL && ev_blk->next != NULL) {
        ev_blk = ev_blk->next; // 找到最后一个节点,后面会用尾插法,插入新的节点
    }

    /******************** 申请 sock_item ********************/
    sock_item *item = (sock_item *)malloc(ITEMS_LEN * sizeof(sock_item));
    if (item == NULL) return -1;
    memset(item, 0, ITEMS_LEN * sizeof(sock_item));

    /******************** 申请 event_block ********************/
    event_block *new_ev_blk = (event_block *)malloc(sizeof(event_block));
    if (new_ev_blk == NULL) {
        free(item); // 注意异常情况,前面的内存也要回收
        return -1;
    }
    new_ev_blk->items = item; 
    new_ev_blk->next = NULL;

    if (ev_blk == NULL) {
        r->ev_blk = new_ev_blk; // 这种情况一般是第一次申请内存,头节点还是NULL
        r->ev_blk = new_ev_blk; // 这种情况一般是第一次申请内存,头节点还是NULL
    } else {
        ev_blk->next = new_ev_blk; // 尾插法
    }

    r->blkcnt++;
    return 0;
}

sock_item* ReactorLookUp(reactor *r, int socket_fd)
{
    if (r == NULL) return NULL;
    // if (r->ev_blk == NULL) return NULL;
    if (socket_fd <= 0) return NULL;

    int blk_idx = socket_fd / ITEMS_LEN;
    /******************** 调用 ReactorResize 每1024个申请一个ev_blk块 ********************/
    while (blk_idx >= r->blkcnt) {
        ReactorResize(r);
    }

    int i = 0;
    event_block *blk = r->ev_blk;
    while (i++ < blk_idx && blk != NULL) {
        blk = blk->next; // ReactorResize种保证blk非NULL
    }

    return &blk->items[socket_fd % ITEMS_LEN];
}

void SetNonBlocking(int fd)
{
    /* 设置fd为non-blocking*/
    int flag = fcntl(fd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(fd, F_SETFD, flag);
}

int InitServer(short port)
{
    int listen_fd;
    struct sockaddr_in server_addr;
    
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);

    listen_fd = socket(AF_INET, SOCK_STREAM, 0);
#if 1
    SetNonBlocking(listen_fd);
#endif

    bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));
    listen(listen_fd, 10);
    return listen_fd;
}

int IsListenFd(int *listen_fds, int fd)
{
    for (int i = 0; i < PORT_COUNT; i++) {
        if (listen_fds[i] == fd) {
            return 1;
        }
    }
    return 0;
}

int main(void)
{
    int conn_fd, client_fd;
    int i, n, res;
    /******************** epoll init ********************/
    reactor *r = (reactor *)calloc(1, sizeof(reactor));
    if (r == NULL) return -1;
    r->blkcnt = 0;
    r->epfd = epoll_create(1); // 红黑树头节点
    int n_ready;
    struct epoll_event ev, events[MAX_EVENTS];
#if 0
    r->items = (sock_item *)calloc(MAX_EVENTS, sizeof(sock_item));
    if (r->items == NULL) return -1;
#endif

    /******************** 连接五元组 绑定多个port ********************/
    int listen_fds[PORT_COUNT];
    for (i = 0; i < PORT_COUNT; i++) {
        listen_fds[i] = InitServer(SERVER_PORT + i);
        ev.data.fd = listen_fds[i];
        ev.events = EPOLLIN; // 监听listen_fd的读事件
        res = epoll_ctl(r->epfd, EPOLL_CTL_ADD, listen_fds[i], &ev);
        if (res == -1) return -1;
    }

    struct sockaddr_in client_addr; 
    int client_addr_len = sizeof(client_addr);

    /******************** 端口复用 ********************/
#if 0
    int opt = 1;
    setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
#endif

    for (i = 0; i < PORT_COUNT; i++) {
        
    }

    while (1) {
        n_ready = epoll_wait(r->epfd, events, MAX_EVENTS, -1); // 阻塞监听
        
        for (i = 0; i < n_ready; i++) {
            client_fd = events[i].data.fd;
            /**************************************** 建立连接阶段 ****************************************/
            if (IsListenFd(listen_fds, client_fd)) {
                conn_fd = accept(client_fd, (struct sockaddr *)&client_addr, &client_addr_len);
#if 1
                SetNonBlocking(conn_fd);
#endif
                /* 监听conn_fd的读事件 */ 
                ev.data.fd = conn_fd;
                ev.events = EPOLLIN; 
                res = epoll_ctl(r->epfd, EPOLL_CTL_ADD, conn_fd, &ev);
                if (res == -1) return -1;
#if 0
                /* 为conn_fd申请内存 */
                r->items[conn_fd].fd = conn_fd;
                r->items[conn_fd].r_buf = calloc(1, BUFF_SIZE);
                r->items[conn_fd].r_len = 0;
                r->items[conn_fd].w_buf = calloc(1, BUFF_SIZE);
                r->items[conn_fd].w_len = 0;
                r->items[conn_fd].event = EPOLLIN;
#else
                sock_item *item = ReactorLookUp(r, conn_fd);
                if (item == NULL) return -1;
                item->fd = conn_fd;
                item->r_buf = calloc(1, BUFF_SIZE);
                item->r_len = 0;
                item->w_buf = calloc(1, BUFF_SIZE);
                item->w_len = 0;
#endif
            } else {
                /**************************************** 数据传输阶段 ****************************************/
                
                if (events[i].events & EPOLLIN) {
                    sock_item *item = ReactorLookUp(r, client_fd);
                    if (item == NULL) return -1;
                    char *r_buf = item->r_buf;
                    char *w_buf = item->w_buf;
                    memset(r_buf, 0, BUFF_SIZE);
                    memset(w_buf, 0, BUFF_SIZE);
                    n = recv(client_fd, r_buf, BUFF_SIZE, 0);
                    if (n == 0) {
                        // 客户端断开了连接
                        free(r_buf);
                        free(w_buf);
                        // epoll_ctl(r->epfd, EPOLL_CTL_DEL, client_fd, NULL);
                        close(client_fd);
                    } else if (n > 0) {
                        printf("收到了客户端发来的数据, n=%d, buf=%s\n", n, r_buf);
                        memcpy(w_buf, r_buf, BUFF_SIZE);

                        /* 读事件->写事件 */ 
                        ev.data.fd = client_fd;
                        ev.events = EPOLLOUT; 
                        res = epoll_ctl(r->epfd, EPOLL_CTL_MOD, client_fd, &ev);
                    } else {
                        return -1;
                    }
                } else if (events[i].events & EPOLLOUT) {
                    sock_item *item = ReactorLookUp(r, client_fd);
                    if (item == NULL) return -1;
                    char *w_buf = item->w_buf;
                    n = send(client_fd, w_buf, BUFF_SIZE, 0);
                    printf("给客户端回的数据..., n=%d, buf=%s\n", n, w_buf);

                    /* 读事件->写事件 */ 
                    ev.data.fd = client_fd;
                    ev.events = EPOLLIN;
                    res = epoll_ctl(r->epfd, EPOLL_CTL_MOD, client_fd, &ev);
                }
            }
        }

    }
    return 0;
}

标签:item,int,epoll,进阶篇,unix,fd,blk,ev,buf
From: https://www.cnblogs.com/kongweisi/p/16930973.html

相关文章

  • dos2unix 安装和使用
    dos2unix是将Windows格式文件转换为Unix、Linux格式的实用命令。Windows格式文件的换行符为rn,而Unix&Linux文件的换行符为n。dos2unix命令其实就是将文件中的rn转换为n......
  • select、poll、epoll之间的区别
    (1)select==>时间复杂度O(n)它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们......
  • Learn by testing: Unix socket 如何通信?
    本文参考communicatewithunixsockets做了一些简单的测试,了解unixsocket是如何通信的。创建一个tcpsocket创建一个unixsocket,命令不会返回,会一直等待:#nc-U/tmp......
  • 使用epoll实现单进程、单线程非阻塞Tcp服务端
    importreimportsocketimportselectdefrecv_msg(tcp_socket,recv_data):requests=recv_data.splitlines()print(requests)file_name=""ret=re.match(r"[/......
  • unix网络编程2.4——高并发服务器(四)epoll基础篇
    目录前置文章unix网络编程1.1——TCP协议详解(一)unix网络编程2.1——高并发服务器(一)基础——io与文件描述符、socket编程与单进程服务端客户端实现unix网络编程2.2——高并......
  • mysql 进阶篇
    Mysql体系结构分为连接层,服务层,引擎层(索引在这一层),存储层存储引擎就是存储数据、建立索引、更新/查询数据等技术的实现方式。存储引擎是基于表而不是基于库的,所以存储引......
  • LINUX和UNIX主要发行版本
    UNIX版本操作系统公司硬件平台AIXIBMPowerPCHP-UXHPPA-RISCSolarisSunSPARCLINUXRedHatLINUX等等IA等 LINUX内核版本Linux内核官网:www.kernel.org......
  • epoll 通俗理解
    1.首先创建一个ip、port这就是自己的地址  (酒店地址)2.创建一个监听socket,将这个socket与步骤1的地址绑定起来 (可以理解为酒店的门岗?)3.epoll_create......
  • 《Unix/Linuv系统编程》第十四章学习笔记
    第14章MYSQL数据库系统MYSQL简介1.MySQL是一个关系型数据库管理系统,由瑞典MySQLAB公司开发,属于Oracle旗下产品。MySQL是最流行的关系型数据库管理系统之一,在WEB......
  • 《Unix/Linux系统编程》第十二周学习笔记
    《Unix/Linux系统编程》第十二周学习笔记MySQL数据库简介MySQL是一个关系型数据库管理系统,是最流行的关系型数据库管理系统之一。在WEB应用方面,MySQL是最好的RDBMS......