新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X
上一节讲了
--某个snlua服务中
socket.start(id)
local sz = 996
local str = socket.read(id,sz)
--do something
现在主要讨论读取连接上的数据的过程。实际上依旧是底层网络线程检测到连接上有数据到达,那么就会把数据push到连接所在服务的队列。
我们c层有一个网络线程,网络线程会不断的调用epoll提供的检测函数 epoll_wait
来发现网络事件。 这之前说过。现在我们直接看代码行17
.我们假设有socket连接上有数据到来,通过上一节我们知道此时连接的type是 SOCKET_TYPE_CONNECTED 所以我们的关注点从 行27开始
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->checkctrl) {
//略
}
if (ss->event_index == ss->event_n) {//一轮批量处理完成 新的一轮需要开始了 调用一次sp_wait作为新一轮的开始
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
ss->event_index = 0;
}
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
struct socket_lock l;
socket_lock_init(s, &l);
switch (ATOM_LOAD(&s->type)) {
case SOCKET_TYPE_CONNECTING://查询主动发起的连接情况
return report_connect(ss, s, &l, result);
case SOCKET_TYPE_LISTEN: {
//略
break;
}
case SOCKET_TYPE_INVALID:
skynet_error(NULL, "socket-server: invalid socket");
break;
default:
if (e->read) {//有可读事件发生
int type;
if (s->protocol == PROTOCOL_TCP) {
type = forward_message_tcp(ss, s, &l, result);
if (type == SOCKET_MORE) {
--ss->event_index;
return SOCKET_DATA;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index;
}
if (type == -1)
break;
return type;
}
if (e->write) {//略}
if (e->error) {//略}
if (e->eof) {break;}
}
}
之前我们调用sp_wait检测了已经发生的网络事件,并做了保存。现在我们开始查看保存的数据了。行28 ,表示有可读事件发生。也就是连接上有数据到来。我们具体看看 forward_message_tcp是怎么处理的。
// return -1 (ignore) when error
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
int sz = s->p.size;
char * buffer = MALLOC(sz);//分配内存作为接收缓冲
int n = (int)read(s->fd, buffer, sz);//读取网络数据到我们自己的buffer
if (n<0) {//略}
if (n==0) {//略}
if (halfclose_read(s)) {
// discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
FREE(buffer);
return -1;
}
stat_read(ss,s,n);
result->opaque = s->opaque;//连接绑定的服务地址
result->id = s->id;//连接的id
result->ud = n;//读取的网络数据的大小
result->data = buffer;//网络数据
//下面这段代码 主要是根据每次收到的数据量来调整接收缓冲区的大小。合适就不调整,如果小了就需要扩容,大了就需要及时缩小
if (n == sz) {
s->p.size *= 2;
return SOCKET_MORE;
} else if (sz > MIN_READ_BUFFER && n*2 < sz) {
s->p.size /= 2;
}
return SOCKET_DATA;//注意返回值
}
上面的代码 行6 是读取网络数据到连接的缓冲区。这个缓冲区是有默认大小的。我们注意read函数的返回值主要是 三种情况
- n<0 异常或者是被信号中断
- n==0 连接的对端断开了网络
- n>0 正常
我们暂时只看正常情况。后面了解了 网络数据的写和关闭后,会再做一次分析。
上面的代码最终把数据保存到了 result里面。最后还根据这次读取数据的情况,动态调整了缓冲区大小。调整策略是
- 如果这次读取数据把缓冲区填满了,说明内核缓冲区数据还比较多,那么把我们自己的应用缓冲区buffer调整为原来的两倍,准备继续读取数据。这时候注意 行25 的返回值是 SOCKET_MORE 就说明要继续读数据。
- 如果当前我们读取的网络数据的大小连我们缓冲区的一半都不到,那么说明我们的缓冲区设置太大了,所以缩小为原来的一半。这个时候返回值是 SOCKET_DATA
我们再次回到调用 forward_message_tcp
的地方看看。实际上最终返回 SOCKET_DATA 给 上一层调用,表示有网络数据需要处理。看下面的代码
if (e->read) {//有可读事件发生
int type;
if (s->protocol == PROTOCOL_TCP) {
type = forward_message_tcp(ss, s, &l, result);
if (type == SOCKET_MORE) {
--ss->event_index;
return SOCKET_DATA;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index;
}
if (type == -1)
break;
return type;
}
看行5 ,如果forward_message_tcp 直接返回 SOCKET_MORE 也就是还要继续读数据的情况。我们知道 socket_server_poll 是不断的调用 for循环 处理连接事件的。按照规矩,每个连接给一次机会处理,雨露均沾。而行6 这里 --ss->event_index
其实是想多给当前连接一次处理机会。最终返回 SOCKET_DATA给上一层调用。如果forward_message_tcp 返回 SOCKET_DATA ,那么也是返回 SOCKET_DATA给上一层。现在回到上一层调用。注意看 行10
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
return 1;
}
注意参数是 type 是 SKYNET_SOCKET_TYPE_DATA padding 是 false
static void
forward_message(int type, bool padding, struct socket_message * result) {
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
sm = (struct skynet_socket_message *)skynet_malloc(sz);//分配内存
sm->type = type;//SKYNET_SOCKET_TYPE_DATA
sm->id = result->id;//连接的id
sm->ud = result->ud;//网络数据的大小
sm->buffer = result->data;//网络数据
struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm;
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);//PTYPE_SOCKET放置在sz的高八位
if (skynet_context_push((uint32_t)result->opaque, &message)) {//push到连接所绑定的服务
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}
上面最终push了一个网络消息到连接绑定的的服务队列。现在 内存的分配是下面这个样子
接下来又是lua层开始接力了。我们直接找到处理位置。
-- read skynet_socket.h for these macro
-- SKYNET_SOCKET_TYPE_DATA = 1
socket_message[1] = function(id, size, data)
local s = socket_pool[id]
local sz = driver.push(s.buffer, s.pool, data, size)
local rr = s.read_required
local rrt = type(rr)
if rrt == "number" then
-- read size
if sz >= rr then
s.read_required = nil
if sz > BUFFER_LIMIT then
pause_socket(s, sz)
end
wakeup(s)
end
else
if s.buffer_limit and sz > s.buffer_limit then
skynet.error(string.format("socket buffer overflow: fd=%d size=%d", id , sz))
driver.close(id)
return
end
if rrt == "string" then
-- read line
if driver.readline(s.buffer,nil,rr) then
s.read_required = nil
if sz > BUFFER_LIMIT then
pause_socket(s, sz)
end
wakeup(s)
end
elseif sz > BUFFER_LIMIT and not s.pause then
pause_socket(s, sz)
end
end
end
注意上面行3 的data已经是上图中绿色的网络数据
。因为在处理网络消息类型时,已经通过 unpack
解包了。看 socket.lua
注册的协议代码
skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = driver.unpack,--把struct skynet_socket_message内的数据提取出来 即 type id ud buffer 注意type不同 后面参数的意义也是不同的
dispatch = function (_, _, t, ...)
socket_message[t](...)
end
}
下面是具体讨论 lua层处理收到的网络数据
标签:sz,SOCKET,ss,数据,网络,message,type,socket From: https://www.cnblogs.com/waittingforyou/p/16972227.html