用C/C++构建自己的Redis——第六章、事件循环和计时器
文章目录
前言
这一章我们将一起学习如何在服务器中实现超时机制来处理空闲TCP连接。
一、超时和计时器
到目前为止,我们的Redis缺少一个重要的处理:超时处理。在网络应用程序中,处理超时是非常必要的,因为网络的另一端可能会突然断开连接,导致通信中断。不仅仅是正在进行的输入/输出(IO)操作(如读取和写入)需要设置超时,而且主动断开空闲的TCP连接。因为空闲连接可能会占用服务器资源,而没有实际的通信发生。
为了实现超时功能,需要修改事件循环(event loop)。事件循环是程序中负责处理事件和任务调度的部分。作者提到,必须修改事件循环,因为当前的事件循环是阻塞的,它只处理poll操作,而poll是一种等待IO操作完成的机制,它不会自动处理超时。
当前事件循环代码:
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), 1000);
在网络编程中,由于网络的不稳定性,一个连接可能随时会断开,因此需要有一种机制来处理这种超时情况。这段内容主要涉及以下几个关键点:
-
poll系统调用和超时:poll是一个系统调用,用于等待一组文件描述符(file descriptors)中的一个或多个变得可读或可写。这个调用接受一个超时参数,这个参数限制了poll调用在等待文件描述符状态改变之前可以阻塞的最长时间。在这段代码中,超时值被任意设置为1000毫秒。
-
根据计时器设置超时:如果我们根据计时器来设置超时值,poll应该在计时器到期时或之前唤醒,这样我们就有机会及时触发计时器。
-
处理多个计时器:如果程序中有多个计时器,那么poll的超时值应该是所有计时器中最接近当前时间的那个。为了找到最近的计时器,需要某种数据结构。堆(heap)数据结构是寻找最小/最大值的流行选择,通常用于此类目的。此外,任何用于排序的数据结构也可以使用,例如,可以使用AVL树来排序计时器,并可能扩展树以跟踪最小值。
-
添加计时器以结束空闲的TCP连接:对于每个TCP连接,都有一个计时器,设置为固定的时间超时到未来。每次连接上有IO活动时,计时器就会更新为固定的时间超时。
-
计时器更新和数据结构简化:当我们更新一个计时器时,它变成了最远的一个。因此,可以利用这个事实来简化数据结构;一个简单的链表就足以保持计时器的顺序:新的或更新的计时器简单地放在列表的末尾,列表保持排序顺序。此外,链表上的操作是O(1)的,这比排序数据结构更优。
二、链表
定义链表非常的简单:
struct DList {
DList *prev = NULL;
DList *next = NULL;
};
inline void dlist_init(DList *node) {
node->prev = node->next = node;
}
inline bool dlist_empty(DList *node) {
return node->next == node;
}
inline void dlist_detach(DList *node) {
DList *prev = node->prev;
DList *next = node->next;
prev->next = next;
next->prev = prev;
}
inline void dlist_insert_before(DList *target, DList *rookie) {
DList *prev = target->prev;
prev->next = rookie;
rookie->prev = prev;
rookie->next = target;
target->prev = rookie;
}
三、事件循环
接下来将列表添加到服务器和连接结构中
// global variables
static struct {
HMap db;
// a map of all client connections, keyed by fd
std::vector<Conn *> fd2conn;
// timers for idle connections
DList idle_list;
} g_data;
struct Conn {
// code omitted...
uint64_t idle_start = 0;
// timer
DList idle_list;
};
修改事件循环
int main() {
// some initializations
dlist_init(&g_data.idle_list);
int fd = socket(AF_INET, SOCK_STREAM, 0);
// bind, listen & other miscs
// code omitted...
// the event loop
std::vector<struct pollfd> poll_args;
while (true) {
// prepare the arguments of the poll()
// code omitted...
// poll for active fds
int timeout_ms = (int)next_timer_ms();
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), timeout_ms);
if (rv < 0) {
die("poll");
}
// process active connections
for (size_t i = 1; i < poll_args.size(); ++i) {
if (poll_args[i].revents) {
Conn *conn = g_data.fd2conn[poll_args[i].fd];
connection_io(conn);
if (conn->state == STATE_END) {
// client closed normally, or something bad happened.
// destroy this connection
conn_done(conn);
}
}
}
// handle timers
process_timers();
// try to accept a new connection if the listening fd is active
if (poll_args[0].revents) {
(void)accept_new_conn(fd);
}
}
return 0;
}
-
修改了超时参数的计算方式:在这段代码中,poll 函数的超时参数是通过 next_timer_ms 函数计算得出的。这意味着,poll 函数将等待直到下一个定时器到期或者直到有新的网络事件需要处理,以先到者为准。
-
移动了销毁连接的代码:原先可能在不同地方处理销毁连接的代码被移动到了 conn_done 函数中。这样做的好处是代码更加模块化,易于管理和维护。
-
添加了触发定时器的函数:通过添加 process_timers 函数,可以在事件循环的每次迭代中检查并触发到期的定时器。这个函数会遍历定时器列表,对于已经到期的定时器,执行相应的操作,比如关闭空闲的TCP连接。
-
定时器的更新和初始化:定时器的更新是在 connection_io 函数中进行的,每当有IO活动发生时,连接的定时器会被重置。而定时器的初始化则是在 accept_new_conn 函数中完成的,当一个新的连接被接受时,会为这个连接设置一个初始的定时器。
四、链表排序
4.1 寻找最近的计时器
这个函数从一个列表中取出第一个(最接近当前时间的)定时器,并使用它来计算.next_timer_mspoll的超时值。
const uint64_t k_idle_timeout_ms = 5 * 1000;
static uint32_t next_timer_ms() {
if (dlist_empty(&g_data.idle_list)) {
return 10000; // no timer, the value doesn't matter
}
uint64_t now_us = get_monotonic_usec();
Conn *next = container_of(g_data.idle_list.next, Conn, idle_list);
uint64_t next_us = next->idle_start + k_idle_timeout_ms * 1000;
if (next_us <= now_us) {
// missed?
return 0;
}
return (uint32_t)((next_us - now_us) / 1000);
}
get_monotonic_usec 用于获取时间。这个函数特别强调了时间戳必须是单调递增的,也就是说,时间戳应该是连续的,不会向后跳跃。如果时间戳发生向后跳跃,即时间戳的值突然变小,这可能会导致计算机系统中出现各种问题。这种时间戳的不连续性可能会干扰依赖于时间顺序的系统功能,比如日志记录、事件排序和时间同步等。
static uint64_t get_monotonic_usec() {
timespec tv = {0, 0};
clock_gettime(CLOCK_MONOTONIC, &tv);
return uint64_t(tv.tv_sec) * 1000000 + tv.tv_nsec / 1000;
}
4.2 激活计时器
在事件循环(event loop)的每一次迭代中,都会检查一个列表,以确保在正确的时间触发(fire)计时器(timers)。这里的“事件循环”是编程中的一个概念,它是一个等待和处理事件的循环,例如用户交互、文件读写操作完成、网络请求的响应等。而“计时器”则是指在特定时间间隔后执行某些操作的机制。
static void process_timers() {
uint64_t now_us = get_monotonic_usec();
while (!dlist_empty(&g_data.idle_list)) {
Conn *next = container_of(g_data.idle_list.next, Conn, idle_list);
uint64_t next_us = next->idle_start + k_idle_timeout_ms * 1000;
if (next_us >= now_us + 1000) {
// not ready, the extra 1000us is for the ms resolution of poll()
break;
}
printf("removing idle connection: %d\n", next->fd);
conn_done(next);
}
}
4.3 维护计时器
计时器将会使用connection_io函数更新
static void connection_io(Conn *conn) {
// waked up by poll, update the idle timer
// by moving conn to the end of the list.
conn->idle_start = get_monotonic_usec();
dlist_detach(&conn->idle_list);
dlist_insert_before(&g_data.idle_list, &conn->idle_list);
// do the work
if (conn->state == STATE_REQ) {
state_req(conn);
} else if (conn->state == STATE_RES) {
state_res(conn);
} else {
assert(0); // not expected
}
}
acceot_new_conn初始化计时器
static int32_t accept_new_conn(int fd) {
// code omitted...
// creating the struct Conn
struct Conn *conn = (struct Conn *)malloc(sizeof(struct Conn));
if (!conn) {
close(connfd);
return -1;
}
conn->fd = connfd;
conn->state = STATE_REQ;
conn->rbuf_size = 0;
conn->wbuf_size = 0;
conn->wbuf_sent = 0;
conn->idle_start = get_monotonic_usec();
dlist_insert_before(&g_data.idle_list, &conn->idle_list);
conn_put(g_data.fd2conn, conn);
return 0;
}
不要忘记回收连接
static void conn_done(Conn *conn) {
g_data.fd2conn[conn->fd] = NULL;
(void)close(conn->fd);
dlist_detach(&conn->idle_list);
free(conn);
}
五、测试
我们可以使用命令行工具来测试空闲超时。
$ ./server
removing idle connection: 4
$ socat tcp:127.0.0.1:1234 -
总结
这一章我们学习了如何在服务器中实现超时机制,以便处理空闲的TCP连接。我们首先了解超时在网络编程中的重要性,然后讨论修改事件循环的必要性,并提出了使用链表作为计时器的数据结构。接着,文章提供了链表操作的具体代码,并展示了如何将这些操作集成到事件循环中。此外,还介绍了如何处理和触发计时器,并提供了测试空闲超时机制的方法。
12_server.cpp: https://build-your-own.org/redis/12/12_server.cpp.htm
avl.cpp: https://build-your-own.org/redis/12/avl.cpp.htm
avl.h: https://build-your-own.org/redis/12/avl.h.htm
common.h: https://build-your-own.org/redis/12/common.h.htm
hashtable.cpp: https://build-your-own.org/redis/12/hashtable.cpp.htm
hashtable.h: https://build-your-own.org/redis/12/hashtable.h.htm
list.h: https://build-your-own.org/redis/12/list.h.htm
zset.cpp: https://build-your-own.org/redis/12/zset.cpp.htm
zset.h: https://build-your-own.org/redis/12/zset.h.htm