1. 如何理解 reactor
reactor 是一种设计模式。用于处理事件驱动的系统。
reactor 模式,主要有两个组件:
reactor 反应器:负责监听所有事件,当事件发生时,调用相应的处理程序。reactor 本身时一个事件循环,负责处理 I/O 事件。
handler 处理程序:处理特点类型的事件。当 reactor 接收到事件时,他会调用相应的处理程序来执行具体的业务逻辑。
使用 epoll IO 多路复用实现一个阻塞对象监听多路连接请求。
说明:
- Reactor 对象通过 epoll 监听客户端请求,收到事件后通过 Dispatch 进行分发;
- 如果是建立连接的请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后续业务处理;
- 如果不是连接事件,则 Reactor 回调用连接对应的 Handler 来响应。
- Headler 会完成 Read -> 业务处理 -> Send 的完成业务流程。
优点:
模型简单,没有多线程,不存在资源竞争的问题。
缺点:
性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。
当某个 Headler 阻塞时,会导致其他所有的 client 的 handler 都无法执行。
使用场景:
客户端数量有限,业务处理非常快速。
2. 实现单进程 reactor
#ifndef REACTOR_H
#define REACTOR_H
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#define MAX_EVENTS 10
#define BUFFER_SIZE 1024
typedef struct {
int fd;
void (*callback)(int fd, int events, void *arg);
int events;
void *arg;
} EventHandler;
typedef struct {
int server_fd;
int epoll_fd;
EventHandler *server_handler;
} TCPServer;
TCPServer *tcp_server_create(const char *ip, int port);
void dispathch(TCPServer *server);
void tcp_server_destroy(TCPServer *server);
static void handle_accept(int fd, int events, void *arg);
static void handle_read (int fd, int events, void *arg);
#endif /* REACTOR_H */
#include "reactor.h"
static void handle_accept(int fd, int events, void *arg) {
TCPServer *server = (TCPServer *)arg;
int client_fd;
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
if (client_fd == -1) {
perror("Failed to accept connection");
return;
}
printf("Connection established with %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
EventHandler *client_handler = (EventHandler *)malloc(sizeof(EventHandler));
client_handler->fd = client_fd;
client_handler->events = EPOLLIN;
client_handler->callback = handle_read;
client_handler->arg = NULL;
epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, client_fd, &(struct epoll_event){EPOLLIN, {.ptr = client_handler}});
}
static void handle_read(int fd, int events, void *arg) {
char buffer[BUFFER_SIZE];
int bytes_received = recv(fd, buffer, BUFFER_SIZE, 0);
if (bytes_received <= 0) {
if (bytes_received == 0) {
printf("Connection closed by client\n");
} else {
perror("Failed to receive data from client");
}
close(fd);
free(arg);
return;
}
printf("Received data: %.*s\n", bytes_received, buffer);
// Echo back the received data
send(fd, buffer, bytes_received, 0);
}
TCPServer *tcp_server_create(const char *ip, int port) {
TCPServer *server = (TCPServer *)malloc(sizeof(TCPServer));
if (!server) {
perror("Failed to allocate memory for server");
return NULL;
}
// 创建 epoll 文件描述符
server->epoll_fd = epoll_create1(0);
if (server->epoll_fd == -1) {
perror("Failed to create epoll file descriptor");
free(server);
return NULL;
}
// 创建服务器 socket
server->server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server->server_fd == -1) {
perror("Failed to create server socket");
free(server);
return NULL;
}
// 设置服务器 socket 选项
int opt = 1;
if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) == -1) {
perror("Failed to set server socket options");
close(server->server_fd);
free(server);
return NULL;
}
// 设置服务器地址信息
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr(ip);
server_addr.sin_port = htons(port);
// 绑定服务器 socket
if (bind(server->server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("Failed to bind server socket");
close(server->server_fd);
free(server);
return NULL;
}
// 监听服务器 socket
if (listen(server->server_fd, SOMAXCONN) == -1) {
perror("Failed to listen on server socket");
close(server->server_fd);
free(server);
return NULL;
}
// 注册服务器 socket 处理程序到 epoll
server->server_handler = (EventHandler *)malloc(sizeof(EventHandler));
server->server_handler->fd = server->server_fd;
server->server_handler->events = EPOLLIN;
server->server_handler->callback = handle_accept;
server->server_handler->arg = server;
epoll_ctl(server->epoll_fd, EPOLL_CTL_ADD, server->server_fd, &(struct epoll_event){EPOLLIN, {.ptr = server->server_handler}});
return server;
}
void dispathch(TCPServer *server) {
struct epoll_event events[MAX_EVENTS];
int event_count;
printf("Server started. Waiting for connections...\n");
while (1) {
event_count = epoll_wait(server->epoll_fd, events, MAX_EVENTS, -1);
if (event_count == -1) {
perror("Failed to wait for events");
break;
}
for (int i = 0; i < event_count; i++) {
EventHandler *handler = events[i].data.ptr;
if (handler->callback)
handler->callback(handler->fd, handler->events, handler->arg);
}
}
}
void tcp_server_destroy(TCPServer *server) {
close(server->server_fd);
close(server->epoll_fd);
free(server->server_handler);
free(server);
}
#include "reactor.h"
int main() {
TCPServer *server = tcp_server_create("127.0.0.1", 8888);
if (!server) {
fprintf(stderr, "Failed to create TCP server\n");
return EXIT_FAILURE;
}
dispathch(server);
tcp_server_destroy(server);
return 0;
}
标签:Reactor,epoll,单线程,模型,server,int,handler,fd,client
From: https://blog.csdn.net/A152419/article/details/137613788