新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X
关闭网络连接。
我们进行网络通讯的时候,两端都有一个socket。每个socket又有读端和写端。
当我们在一端打算关闭一个socket的时候,其实就是想关闭socket对应的读端和写端。shutdown(SHUT_WR)
表示关闭写,shutdown(SHUT_RD)
表示关闭读。close()
则表示同时关闭读和写。
1.收到底层的关闭消息
2.主动发送关闭
收到底层关闭消息
当我们收到底层发送的关闭消息时,实际上是表示对方已经关闭写。我方socket已经关闭读。
-- SKYNET_SOCKET_TYPE_CLOSE = 3
socket_message[3] = function(id)
local s = socket_pool[id]
if s then
s.connected = false
wakeup(s) --唤醒挂起的协程
else
driver.close(id) --当前服务之前已经放弃控制权 所以socket_pool[id] == nil
end
local cb = socket_onclose[id]
if cb then
cb(id)
socket_onclose[id] = nil
end
end
当然 收到底层关闭的消息,lua层还是可以写数据的。其实目前上层只知道关闭了读。
主动关闭连接
如果lua层主动发起关闭。会通知底层,然后挂起等待底层push一个 SKYNET_SOCKET_TYPE_CLOSE消息。最后会清理lua层的socket对象。我们主动关闭时,如果还有另外一个协程y在读数据,那么我们挂起自己x,先让那个y协程把数据读出来,然后再唤醒我们x协程。看看代码
function socket.close(id)
local s = socket_pool[id]
if s == nil then
return
end
driver.close(id)
if s.connected then
s.pause = false -- Do not resume this fd if it paused.
if s.co then
-- reading this socket on another coroutine, so don't shutdown (clear the buffer) immediately
-- wait reading coroutine read the buffer.
assert(not s.closing)
s.closing = coroutine.running()
skynet.wait(s.closing)
else
suspend(s) -- 等待 底层发来的 SKYNET_SOCKET_TYPE_CLOSE 事件
end
s.connected = false
end
socket_pool[id] = nil
end
当我们在lua层调用关闭连接函数时,最终会执行底层的 close_socket
函数。
static int
close_socket(struct socket_server *ss, struct request_close *request, struct socket_message *result) {
int id = request->id;
struct socket * s = &ss->slot[HASH_ID(id)];
if (socket_invalid(s, id)) {
// The socket is closed, ignore
return -1;
}
struct socket_lock l;
socket_lock_init(s, &l);
int shutdown_read = halfclose_read(s);
//equest->shutdown是1的话 表示强制关闭
if (request->shutdown || nomore_sending_data(s)) {
// If socket is SOCKET_TYPE_HALFCLOSE_READ, Do not raise SOCKET_CLOSE again.
int r = shutdown_read ? -1 : SOCKET_CLOSE;
force_close(ss,s,&l,result);
return r;
}
s->closing = true;//说明还有数据缓存了没发送出去
if (!shutdown_read) {
// don't read socket after socket.close()
close_read(ss, s, result);
return SOCKET_CLOSE;
}
// recv 0 before (socket is SOCKET_TYPE_HALFCLOSE_READ) and waiting for sending data out.
return -1;
}
我们知道要发送出去的数据,如果不能一次性发送完,会先缓存到连接对应的链表中。此时如果链表中还有数据没发送出去,就不能立即关闭写,只能关闭读。当然如果lua层要求强制关闭的话,就一次性把读写都关闭。只要我们关闭了一个socket的读,那么一定会push一个SOCKET_CLOSE 网络消息到队列。相当于告诉lua层,读关闭了,没有数据可读了,也不要再读了。
我们lua层主动调用关闭之前,底层可能已经关闭读了。这个时机就是在底层读事件通知时,即调用 forward_message_tcp
。其实就是下面的 行35.即read返回值为0时。
另外当我们关闭了读时,底层就算时收到了数据,也会把数据丢弃。行40 就是这个意思。
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
int sz = s->p.size;
char * buffer = MALLOC(sz);//分配内存作为接收缓冲
int n = (int)read(s->fd, buffer, sz);
if (n<0) {
FREE(buffer);
switch(errno) {
case EINTR:
case AGAIN_WOULDBLOCK:
break;
default:
return report_error(s, result, strerror(errno));
}
return -1;
}
if (n==0) {
FREE(buffer);
if (s->closing) {
// Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
if (nomore_sending_data(s)) {
force_close(ss,s,l,result);
}
return -1;
}
int t = ATOM_LOAD(&s->type);
if (t == SOCKET_TYPE_HALFCLOSE_READ) {
// Rare case : Already shutdown read.
return -1;
}
if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {//之前写时发现对方关闭读,现在对方又关闭写,所以对方完全关闭,那么我方也可以完全关闭了
// Remote shutdown read (write error) before.
force_close(ss,s,l,result);
} else {
close_read(ss, s, result);
}
return SOCKET_CLOSE;
}
if (halfclose_read(s)) {
// discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
FREE(buffer);
return -1;
}
return SOCKET_DATA;
}
问题来了,既然已经关闭了读,底层怎么会还有读事件通知?
实际上,我们网络线程每次都是优先处理lua层通过管道传递的命令,即调用ctrl_cmd
处理管道里面的命令.然后再去执行epoll_wait
检测socket上发生的事件。如果关闭连接的指令,刚好在ctrl_cmd
和 epoll_wait
之间写入管道,那么就会出现epoll_wait
检查事件时发现管道里面有一个关闭某连接的命令等待执行,同时又可以检测到某个连接上有数据可读。之后又会马上重新去处理管道命令,这里是关闭读
,然后再去把刚刚检测出来并保存的可读事件进行处理。这样就出现了先关闭读,然后又收到读事件通知的错觉。网络线程处理过程如下图所示
我们需要继续分析 forward_message_tcp 中的下面这段代码。这段代码表示对端关闭了写。
if (n==0) {
FREE(buffer);
if (s->closing) {
// Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
if (nomore_sending_data(s)) {
force_close(ss,s,l,result);
}
return -1;
}
int t = ATOM_LOAD(&s->type);
if (t == SOCKET_TYPE_HALFCLOSE_READ) {
// Rare case : Already shutdown read.
return -1;
}
if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {//之前写时发现对方关闭读,现在对方又关闭写,所以对方完全关闭,那么我方也可以完全关闭了
// Remote shutdown read (write error) before.
force_close(ss,s,l,result);
} else {
close_read(ss, s, result);
}
return SOCKET_CLOSE;
}
行3 表示关闭正在进行中,说明lua层打算关闭连接,而且关闭了读。行5 表示我们数据都发生出去了,这个时候可以完全关闭了。虽然其实只需要关闭写,因为读早已关闭了。
行11 的情况以后待分析
行15 的情况是: 之前写时发现对方关闭读
,现在对方又关闭写,所以对方完全关闭,那么我方也可以完全关闭了.所以完全关闭了。
这里具体看看是怎么发现对方关闭读的,即设置SOCKET_TYPE_HALFCLOSE_WRITE
的具体过程。
static int
send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
while (list->head) {
struct write_buffer * tmp = list->head;
for (;;) {//如果对方已经关闭读 那么write是可以成功发送 但对方会把接收的数据丢弃 并返回rst ;当下次调用write时 write返回-1 错误码是 EPIPE
ssize_t sz = write(s->fd, tmp->ptr, tmp->sz);
if (sz < 0) {
switch(errno) {
case EINTR:
continue;
case AGAIN_WOULDBLOCK:
return -1;
}
return close_write(ss, s, l, result);
}
stat_write(ss,s,(int)sz);
s->wb_size -= sz;
if (sz != tmp->sz) {
tmp->ptr += sz;
tmp->sz -= sz;
return -1;
}
break;
}
list->head = tmp->next;
write_buffer_free(ss,tmp);
}
list->tail = NULL;
return -1;
}
send_list_tcp是当我们可写事件发生
时用来发送数据出去的函数。
上面的行14 就是表示对方已经关闭了读。既然对方已经关闭了读,那么也就写不进去了,所以我们调用 close_write
关闭了写。
关于write函数需要注意的是,当对端已经关闭了读,我们依旧可以成功调用write函数把数据发送出去。正常发送出去的意思是,我们的write会像往常一样,copy数据到内核,然后返回copy的值。只是对端收到数据后,会把数据丢弃。同时回复一个rst给我们的操作系统(rst是tcp包头中的一个标志位)。当我们再次调用write写数据的时候,write会出现异常。当然还有一个地方需要注意的是,我们第二次write时,系统发现rst就会发送一个信号SIGPIPE给我们进程,对于这个信号我们默认处理是退出进程。所以要保证程序继续运行,必须处理这个信号,我们skynet中就是忽略这个信号。看下面的启动代码
int sigign() {
struct sigaction sa;
sa.sa_handler = SIG_IGN;//忽略 SIGPIPE 这个信号
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sigaction(SIGPIPE, &sa, 0);
return 0;
}
int
main(int argc, char *argv[]) {
skynet_globalinit();
skynet_env_init();
sigign();//处理信号问题
//略
}
我们在具体看看 close_write
static int
close_write(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
if (s->closing) {//关闭进行中
force_close(ss,s,l,result);//写的过程中发现对方关闭了读,导致我方无法写.之前我方已经关闭了读,所以可以完全关闭连接了
return SOCKET_RST;
} else {
int t = ATOM_LOAD(&s->type);
if (t == SOCKET_TYPE_HALFCLOSE_READ) {//之前发现recv 0,即对方关闭写。现在对方关闭读,导致我方无法写,之前我方已经关闭了读,所以可以完全关闭连接了
// recv 0 before, ignore the error and close fd
force_close(ss,s,l,result);
return SOCKET_RST;
}
if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {//puzzled
// already raise SOCKET_ERR
return SOCKET_RST;
}
ATOM_STORE(&s->type, SOCKET_TYPE_HALFCLOSE_WRITE);
shutdown(s->fd, SHUT_WR);
enable_write(ss, s, false);
return report_error(s, result, strerror(errno));
}
}
我们主要看行17. 从这里开始我们关闭了写。
这里我们顺带分析下
行3.也就是写的过程中发现对方关闭了读,导致我方无法写.之前我方已经主动关闭了读,所以可以完全关闭连接了
行8 之前对方已经关闭了写,现在对方又关闭了读,即对方已经完全关闭,所以我方也可以完全关闭了
我们看看最后一种关闭的情况。当我们关闭了写,之后调用epoll_wait
做检测时,发现收到了对端的fin。此时会执行下面的代码。
if (e->eof) {
// For epoll (at least), FIN packets are exchanged both ways.
// See: https://stackoverflow.com/questions/52976152/tcp-when-is-epollhup-generated
int halfclose = halfclose_read(s);
force_close(ss, s, &l, result);
if (!halfclose) {
return SOCKET_CLOSE;
}
}
关于
shutdown
:
调用
shutdown(SHUT_WR)
会发送FIN
并用SEND_SHUTDOWN
标记自己的socket .收到FIN
的一方则用RCV_SHUTDOWN
标记自己的socket调用
shutdown(SHUT_RD)
不发送任何内容,但会用RCV_SHUTDOWN
标记自己的socket.关于
epoll
:如果 socket 标有
SEND_SHUTDOWN
和RCV_SHUTDOWN
,epoll
将返回EPOLLHUP
.
对方发送了fin 过来,此时我们socket有一个 RCV_SHUTDOWN
,此时如果我们关闭写,则自己贡献一个SEND_SHUTDOWN
。已经凑齐,此时如果检测则会出现 EPOLLHUP
事件。