新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X
上一节讲了 发起网络监听 。现在接着说明 怎么接受新连接的。
这个水龙头里面的水 主要就是业务逻辑需要的网络数据了。注意这是 不同于监听水龙头里面的内容。
我们这里主要是讨论当有外部主动请求连接我们服务器时,skynet是怎么发现和处理的。
我们c层有一个网络线程,网络线程会不断的调用epoll提供的检测函数 epoll_wait
来发现网络事件。 也就是说,如果有可读事件或者可写事件发生了,我们是可以及时知道的。实际上这个检测函数是在 socket_server_poll
的 sp_wait
里面调用的。如下所示
看下面的 行5
static int
sp_wait(int efd, struct event *e, int max) {
struct epoll_event ev[max];
int n = epoll_wait(efd , ev, max, -1);//此处是调用epoll提供的检测事件函数
int i;
for (i=0;i<n;i++) {
e[i].s = ev[i].data.ptr;
unsigned flag = ev[i].events;
e[i].write = (flag & EPOLLOUT) != 0;
e[i].read = (flag & EPOLLIN) != 0;
e[i].error = (flag & EPOLLERR) != 0;
e[i].eof = (flag & EPOLLHUP) != 0;
}
return n;
}
我们来分析下 sp_wait 主要做的事情。实际上他就是调用epoll的epoll_wait函数去检测各个socket上发生的事件,并暂时保存在一个数组中。我们这里是讨论新连接,实际上如果在监听socket
上面有事件发生,那么我们认为可能有一个新连接请求到达了。socket_server_poll 主要就是把数组中的事件一个个的处理。看下面的代码
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->event_index == ss->event_n) {//一轮批量处理完成 新的一轮需要开始了 调用一次sp_wait作为新一轮的开始
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);//检测新事件
ss->checkctrl = 1;
ss->event_index = 0;
}
struct event *e = &ss->ev[ss->event_index++];//取出某个socket身上发生的事件
struct socket *s = e->s;//当前s是监听socket
struct socket_lock l;
socket_lock_init(s, &l);
switch (ATOM_LOAD(&s->type)) {
case SOCKET_TYPE_CONNECTING://查询主动发起的连接情况
return report_connect(ss, s, &l, result);
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);//有新的外部连接请求 s是监听socket
if (ok > 0) {
return SOCKET_ACCEPT;//lua层收到此消息后 需要调用 socketdriver.start才能使 新连接由 SOCKET_TYPE_PACCEPT 变 SOCKET_TYPE_CONNECTED
} if (ok < 0 ) {
return SOCKET_ERR;
}
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID:
skynet_error(NULL, "socket-server: invalid socket");
break;
default:
}
}
上面的代码 行15 说明是根据socket的type来做分支处理的。直接看 行18,我们还记得,上一节讨论发起网络监听时,监听socket的type最终会被设置为 SOCKET_TYPE_LISTEN。也就是说当前是在监听socket上发生的事件。那么说明有新连接请求。此时交给report_accept
处理,看下面的代码。实际上主要做的事情是,
- 向skynet获取一个槽位,代表新的连接
- .填充这个槽位对应的socket结构
- 加入epoll 但是并没立即监听可读事件。这里跟当初发起监听类似,所以当前状态是预接受 即 SOCKET_TYPE_PACCEPT
- 获取请求的ip地址
static int //此时s是监听socket
report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
union sockaddr_all u;
socklen_t len = sizeof(u);
int client_fd = accept(s->fd, &u.s, &len);//利用操作系统api 获取新连接是socket描述符
if (client_fd < 0) {
if (errno == EMFILE || errno == ENFILE) {
result->opaque = s->opaque;
result->id = s->id;
result->ud = 0;
result->data = strerror(errno);
return -1;
} else {
return 0;
}
}
int id = reserve_id(ss);//向skynet申请一个槽位
if (id < 0) {
close(client_fd);
return 0;
}
socket_keepalive(client_fd);//设置网络保活
sp_nonblocking(client_fd);//设置非阻塞
struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);//填充槽位对应的结构。最后一个参数表示当前不监听可读事件
if (ns == NULL) {
close(client_fd);
return 0;
}
// accept new one connection
stat_read(ss,s,1);
ATOM_STORE(&ns->type , SOCKET_TYPE_PACCEPT);//设置此时socket的类型 注意此时是 SOCKET_TYPE_PACCEPT
result->opaque = s->opaque;//监听所在的服务地址
result->id = s->id;//监听id
result->ud = id;//代表新连接的id
result->data = NULL;
if (getname(&u, ss->buffer, sizeof(ss->buffer))) {
result->data = ss->buffer;//设置新连接的请求ip地址
}
return 1;
}
上面的代码最终返回值是 1 ,回到上一层代码看看
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);//有新的外部连接请求
if (ok > 0) {
return SOCKET_ACCEPT;//lua层收到此消息后 需要调用 socketdriver.start才能使 新连接由 SOCKET_TYPE_PACCEPT 变 SOCKET_TYPE_CONNECTED
也就是说这里会返回值 SOCKET_ACCEPT 。也就是 socket_server_poll
的返回值。我们知道 socket_server_poll
被 skynet_socket_poll
调用的。注意下面的代码 行10。
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_ACCEPT://新连接 预创建完成, lua层收到此消息后需调用 socketdriver.start 最终会进入 case SOCKET_OPEN:表示新连接 真正创建完成
forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
break;
return 1;
}
最终会调用 forward_message push一个消息到队列。注意此时的参数 type是SKYNET_SOCKET_TYPE_ACCEPT padding是true
static void
forward_message(int type, bool padding, struct socket_message * result) {
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
if (padding) {
if (result->data) {//此时的data是请求连接的ip地址
size_t msg_sz = strlen(result->data);
if (msg_sz > 128) {
msg_sz = 128;
}
sz += msg_sz;
}
}
sm = (struct skynet_socket_message *)skynet_malloc(sz);//分配内存
sm->type = type;//SKYNET_SOCKET_TYPE_ACCEPT
sm->id = result->id;//监听id
sm->ud = result->ud;//新连接的id
if (padding) {
sm->buffer = NULL;
memcpy(sm+1, result->data, sz - sizeof(*sm));//把ip地址放置到sm的尾部
}
struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm;
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);//PTYPE_SOCKET放置在sz的高八位
if (skynet_context_push((uint32_t)result->opaque, &message)) {//push消息到监听服务对应的队列
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}
以上代码主要是发送了一个网络消息到监听服务所在的队列。注意这个网络消息的具体类型是 SKYNET_SOCKET_TYPE_ACCEPT 。
此后我们的lua层处理该接力了。实际上lua层处理过程是 skynet.dispatch_message - -> raw_dispatch_message
注意行8 获取f函数
-- skynet.lua
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then --这里是处理响应
-- 略
else --这里主要是处理lua text socket 等消息类型
local p = proto[prototype]
local f = p.dispatch
if f then
local co = co_create(f) --获取一个协程对象并设置任务函数f
session_coroutine_id[co] = session --保存session以便找到回去的路;注意这里的session是其他服务独立产生的,所以不同请求者带过来的session可以是相同的
session_coroutine_address[co] = source --保存source以便找到回去的路 即记录请求者是谁
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
--略
end
end
end
而这个所谓的 任务函数 f是在 socket.lua里面注册的。如下所示 行6. 最终会根据 t 调用socket_message里面对应的函数。t 主要是网络消息的具体类型,主要有下面几个
- SKYNET_SOCKET_TYPE_DATA
- SKYNET_SOCKET_TYPE_CONNECT
- SKYNET_SOCKET_TYPE_CLOSE
- SKYNET_SOCKET_TYPE_ACCEPT
- SKYNET_SOCKET_TYPE_ERROR
-- socket.lua
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = driver.unpack,--把struct skynet_socket_message内的数据提取出来 即 type id ud buffer 注意type不同 后面参数的意义也是不同的
dispatch = function (_, _, t, ...) --t是 SKYNET_SOCKET_TYPE_ACCEPT
socket_message[t](...) --监听id 新连接id 新连接ip地址
end
}
我们此时的t是SKYNET_SOCKET_TYPE_ACCEPT 所以调用函数如下
-- SKYNET_SOCKET_TYPE_ACCEPT = 4
socket_message[4] = function(id, newid, addr) --参数的意思: id是监听id newid是新连接id addr是请求连接者的ip地址
local s = socket_pool[id]
if s == nil then
driver.close(newid)
return
end
s.callback(newid, addr) --新连接的回调函数
end
最终会调用 行8. 还记得吗 我们发起网络监听的时候,是设置了一个回调函数的。这个回调函数就是当有新连接消息来的时候触发的。回顾下代码
local listenid = socket.listen("127.0.0.1", 8001)
socket.start(listenid , function(newid, addr)--newid 是新连接的id, addr是新连接的发起者ip
print("new connection id is ".. newid)
print("connect from " .. addr)
end)
这里socket.start(listenid,accept)
注册了一个accept函数。这里我们只是简单的做了打印信息。
另外,实际上每次调用accept这个回调函数,都是在某个协程里面执行的。也就是说 如果有多个新连接消息达到,实际上每个回调函数都是在不同的协程中执行的。
新连接就算是完成了?不是。
还记得,我们的c层当发现有新连接的时候,底层并没有立即开启可读事件监听,而是把底层的socket的type设置为 SOCKET_TYPE_PACCEPT ,也就是预接受状态。也就是还没有变成完全的连接状态。所以我们需要在某个服务
中调用socket.start(newid)
。这样底层才知道,lua层已经完成做好准备。之前网络监听的时候,已经具体讨论过socket.start的基本流程。我们知道socket.start函数主要驱动做了下面这些事情
-
把请求写入管道然后挂起当前协程。
挂起是因为socket.start里面调用了connet函数。connet函数还为即将正式启用的新连接创建了一个对象。这个对象初始化了连接的相关信息。
-
网络线程读取管道请求做处理,然后push一个消息到队列,
-
最后lua层处理消息。
网络监听的时候,我们调用socket.start是告诉底层我们最终的监听服务是在哪里,这里我们调用socket.start是告诉底层,我们最终哪个服务负责处理这个新连接。
我们先看写管道
void
socket_server_start(struct socket_server *ss, uintptr_t opaque, int id) {
struct request_package request;
request.u.resumepause.id = id;//新连接的id
request.u.resumepause.opaque = opaque;//调用socket.start所在的服务
send_request(ss, &request, 'R', sizeof(request.u.resumepause));//写入管道
}
再看网络线程的处理过程
static int
resume_socket(struct socket_server *ss, struct request_resumepause *request, struct socket_message *result) {
int id = request->id;
result->id = id;//新连接的id
result->opaque = request->opaque;//调用socket.start时所在的服务
result->ud = 0;
result->data = NULL;
struct socket *s = &ss->slot[HASH_ID(id)];
if (socket_invalid(s, id)) {
result->data = "invalid socket";
return SOCKET_ERR;
}
if (halfclose_read(s)) {
// The closing socket may be in transit, so raise an error. See https://github.com/cloudwu/skynet/issues/1374
result->data = "socket closed";
return SOCKET_ERR;
}
struct socket_lock l;
socket_lock_init(s, &l);
if (enable_read(ss, s, true)) {//开启可读事件监听
result->data = "enable read failed";
return SOCKET_ERR;
}
uint8_t type = ATOM_LOAD(&s->type);
if (type == SOCKET_TYPE_PACCEPT || type == SOCKET_TYPE_PLISTEN) {//修改状态
ATOM_STORE(&s->type , (type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN);
s->opaque = request->opaque;
result->data = "start";
return SOCKET_OPEN;
} else if (type == SOCKET_TYPE_CONNECTED) {
}
// if s->type == SOCKET_TYPE_HALFCLOSE_WRITE , SOCKET_CLOSE message will send later
return -1;
}
注意 行20,这里开启了可读监听。再看 行25, 此时的 type是 SOCKET_TYPE_PACCEPT。 也就是预接受状态。行26 最后设置成了 SOCKET_TYPE_CONNECTED ,最后返回了 SOCKET_OPEN。我们重新来看网络线程的处理流程。网络线程会一直循环调用skynet_socket_poll
。里面又会调用 socket_server_poll
。看如下代码
int
skynet_socket_poll() {
int type = socket_server_poll(ss, &result, &more);//上面分析了,这次返回值是 SOCKET_OPEN
switch (type) {
case SOCKET_OPEN://新连接正式完成
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
}
}
继续看 forward_message。注意参数 type是 SKYNET_SOCKET_TYPE_CONNECT padding是true
static void
forward_message(int type, bool padding, struct socket_message * result) {//next
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
if (padding) {
if (result->data) {//当前的data是 "start"
size_t msg_sz = strlen(result->data);
if (msg_sz > 128) {
msg_sz = 128;
}
sz += msg_sz;
}
}
sm = (struct skynet_socket_message *)skynet_malloc(sz);//分配内存 这个sz已经包括result->data占用的字节了
sm->type = type;//SKYNET_SOCKET_TYPE_CONNECT
sm->id = result->id;//新连接的id
sm->ud = result->ud;//0
if (padding) {
sm->buffer = NULL;
memcpy(sm+1, result->data, sz - sizeof(*sm));//把result->data的数据放置到sm尾部。
}
struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm;
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);//PTYPE_SOCKET放置在sz的高八位
if (skynet_context_push((uint32_t)result->opaque, &message)) {//push消息到调用socket.start的服务对应的队列
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}
最终会push一个消息到调用socket.start所在服务的队列。此时又到lua层接力处理了。这次我们根据网络消息的具体类型 SKYNET_SOCKET_TYPE_CONNECT 直接看最终的处理函数
-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, _ , addr)
local s = socket_pool[id] -- 通过新连接的id找到对应的对象 这个对象在调用socket.start时已经创建
if s == nil then
return
end
-- log remote addr
if not s.connected then -- resume may also post connect message
s.connected = true
wakeup(s) -- 把调用socket.start时挂起的协程加入唤醒队列
end
end
上面是 addr 参数其实是"start" 根本不是地址;当然在某种情况下也可以是"transfer" ,这个文末会说明
这里是把之前挂起的协程x加入唤醒队列了。之后协程x会在合适的时候被执行,也就是挂起的connet函数会继续执行了。
这里总结下 socket.start做的事情。到目前为止,我们主要用在监听和接受新连接上面。
- 对于监听来说,主要是底层检测到有新连接的时候,底层会把通知push到监听启用的最终位置,即监听所在的服务。
- 对于新连接来说,主要是告诉底层新连接最终是在哪里位置启用的。这样当底层发现这个连接上有数据到来时,知道要把消息发送给哪个服务。主要是我们在lua层写业务代码时,有时候想底层把这个连接上的数据发送到其他服务上去。所以socket.start就提供了这样一个修改启用位置的机会。看看下面的代码
static int
resume_socket(struct socket_server *ss, struct request_resumepause *request, struct socket_message *result) {
//略
uint8_t type = ATOM_LOAD(&s->type);
if (type == SOCKET_TYPE_PACCEPT || type == SOCKET_TYPE_PLISTEN) {
ATOM_STORE(&s->type , (type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN);
s->opaque = request->opaque;
result->data = "start";
return SOCKET_OPEN;
} else if (type == SOCKET_TYPE_CONNECTED) {
// todo: maybe we should send a message SOCKET_TRANSFER to s->opaque
s->opaque = request->opaque;//修改了连接内部绑定的服务地址
result->data = "transfer";
return SOCKET_OPEN;
}
// if s->type == SOCKET_TYPE_HALFCLOSE_WRITE , SOCKET_CLOSE message will send later
return -1;
}
假设一个连接是正常状态 即 SOCKET_TYPE_CONNECTED,此时我们调用 socket.start(),那么最终会执行 行12的代码。也就是会修改连接对应的服务地址。此时负责处理新连接的服务会在分发消息时发现socket_message[2] = function(id, _ , addr)
,而 addr 就是"transfer".