新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X
发起主动连接
我们做服务器一般都是接收外部发起的连接。但是有时候也需要主动发起连接请求。这个通过socket.open(addr,port)
我们看代码
function socket.open(addr, port)
local id = driver.connect(addr,port) --如果写管道前就有问题 这里返回的id是 -1
return connect(id)
end
实际上我们主动发起连接的过程是这样的:
- 把请求连接的命令写入管道,然后挂起当前协程。
- 网络线程处理管道命令,发起连接。
- 如果连接立即成功,则push一个
SKYNET_SOCKET_TYPE_CONNECT
到 主动发起连接所在的服务的队列 - 如果不能立即成功,网络线程会不断的检测,即每次通过
epoll_wait
来发现连接是否成功。一旦发现连接成功则会pushSKYNET_SOCKET_TYPE_CONNECT
到 主动发起连接所在的服务的队列 - 发起连接请求的服务处理
SKYNET_SOCKET_TYPE_CONNECT
消息,最后唤醒挂起的协程。 socket.open
返回 连接的id
- 如果连接立即成功,则push一个
下面具体看代码
static int
lconnect(lua_State *L) {
size_t sz = 0;
const char * addr = luaL_checklstring(L,1,&sz);
char tmp[sz];
int port = 0;
const char * host = address_port(L, tmp, addr, 2, &port);
if (port == 0) {
return luaL_error(L, "Invalid port");
}
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = skynet_socket_connect(ctx, host, port);//主动发起连接 返回的id是为新连接分配的id
lua_pushinteger(L, id);
return 1;
}
继续看 skynet_socket_connect
。看下面的代码,从行14 开始。最终在行7 把请求写入管道。
int
socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
struct request_package request;
int len = open_request(ss, &request, opaque, addr, port);
if (len < 0)
return -1;
send_request(ss, &request, 'O', sizeof(request.u.open) + len);//把请求连接写入管道
return request.u.open.id;
}
int
skynet_socket_connect(struct skynet_context *ctx, const char *host, int port) {
uint32_t source = skynet_context_handle(ctx);
return socket_server_connect(SOCKET_SERVER, source, host, port);
}
下面我们看网络线程是怎么处理管道请求的。
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
// ctrl command only exist in local fd, so don't worry about endian.
switch (type) {
case 'O':
return open_socket(ss, (struct request_open *)buffer, result);
}
}
接下来主要看看 open_socket
static int
open_socket(struct socket_server *ss, struct request_open * request, struct socket_message *result) {
int id = request->id;
result->opaque = request->opaque;
result->id = id;
result->ud = 0;
result->data = NULL;
struct socket *ns;
int status;
struct addrinfo ai_hints;
struct addrinfo *ai_list = NULL;
struct addrinfo *ai_ptr = NULL;
char port[16];
sprintf(port, "%d", request->port);
memset(&ai_hints, 0, sizeof( ai_hints ) );
ai_hints.ai_family = AF_UNSPEC;
ai_hints.ai_socktype = SOCK_STREAM;
ai_hints.ai_protocol = IPPROTO_TCP;
status = getaddrinfo( request->host, port, &ai_hints, &ai_list );//解析后得到的地址都放在ai_list
int sock= -1;
for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next ) {//找出一个可以连接的
sock = socket( ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol );
if ( sock < 0 ) {continue;}
socket_keepalive(sock);//设置保活
sp_nonblocking(sock);//设置非阻塞
status = connect( sock, ai_ptr->ai_addr, ai_ptr->ai_addrlen);//发起连接
if ( status != 0 && errno != EINPROGRESS) {close(sock);sock = -1;continue;} break;
}
if (sock < 0) { result->data = strerror(errno);goto _failed; }
ns = new_fd(ss, id, sock, PROTOCOL_TCP, request->opaque, true);//最后一个参数 表示监听可读事件
if(status == 0) {//已经连接成功
ATOM_STORE(&ns->type , SOCKET_TYPE_CONNECTED);//设置为成功连接
struct sockaddr * addr = ai_ptr->ai_addr;
void * sin_addr = (ai_ptr->ai_family == AF_INET) ? (void*)&((struct sockaddr_in *)addr)->sin_addr : (void*)&((struct sockaddr_in6 *)addr)->sin6_addr;
if (inet_ntop(ai_ptr->ai_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
result->data = ss->buffer;//设置对端的地址
}
freeaddrinfo( ai_list );
return SOCKET_OPEN;
} else {//没能一次性连接成功 后面需要查询连接情况
ATOM_STORE(&ns->type , SOCKET_TYPE_CONNECTING);//设置为连接进行中
if (enable_write(ss, ns, true)) {//监听可写事件
result->data = "enable write failed";
goto _failed;
}
}
return -1;
return SOCKET_ERR;
}
行20 获取我们要连接的地址集合。
行22 从上面集合中找出一个可以成功连接的。注意这里都是非阻塞的socket
行 31 把之前分配的槽位id对应的socket结构进行填充。注意这里开启了 可读监听
。
行32 表示一次性连接成功。注意这里直接返回了 SOCKET_OPEN
。后面会push一个消息到主动发起连接的服务。
行41 表示没能一次性连接成功。此时socket的类型为 SOCKET_TYPE_CONNECTING
。后面网络线程会不断查询连接的状况。这里开启了 监听写事件
。也即是说如果一次性没有连接成功,那么会同时开启监听读和写
。
假设我们没有一次性连接成功。我们看看最终网络线程是怎么发现连接成功的,以及后续怎么处理的。实际上,当我们调用sp_wait检测的时候,只要 socket上有事件发送,且socket的类型是SOCKET_TYPE_CONNECTING
,我们就认为可能
连接成功了。至于怎么后续处理,看下面代码行 15.
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->event_index == ss->event_n) {//一轮批量处理完成 新的一轮需要开始了 调用一次sp_wait作为新一轮的开始
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
ss->event_index = 0;
}
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
struct socket_lock l;
socket_lock_init(s, &l);
switch (ATOM_LOAD(&s->type)) {
case SOCKET_TYPE_CONNECTING://查询主动发起的连接情况
return report_connect(ss, s, &l, result);
default:
if (e->read) {}
if (e->write) {}
if (e->error) {}
if (e->eof) {}
break;
}
}
}
具体看看 report_connect
static int
report_connect(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
int error;
socklen_t len = sizeof(error);
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);//注意这里判断连接成功的方法
if (code < 0 || error) {
error = code < 0 ? errno : error;
force_close(ss, s, l, result);
result->data = strerror(error);
return SOCKET_ERR;
} else {//连接成功
ATOM_STORE(&s->type , SOCKET_TYPE_CONNECTED);
result->opaque = s->opaque;
result->id = s->id;
result->ud = 0;
if (nomore_sending_data(s)) {
if (enable_write(ss, s, false)) {//因为之前监听了可写事件 且没有数据需要发送 所以现在关闭可写监听 不然该sock会一直报告可写事件
force_close(ss,s,l, result);
result->data = "disable write failed";
return SOCKET_ERR;
}
}
union sockaddr_all u;
socklen_t slen = sizeof(u);
if (getpeername(s->fd, &u.s, &slen) == 0) {
void * sin_addr = (u.s.sa_family == AF_INET) ? (void*)&u.v4.sin_addr : (void *)&u.v6.sin6_addr;
if (inet_ntop(u.s.sa_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
result->data = ss->buffer;//获取对端的ip地址
return SOCKET_OPEN;
}
}
result->data = NULL;
return SOCKET_OPEN;
}
}
上面的代码主要是判断连接是否成功。
如果连接成功,且没有数据发送,就要把监听写关闭
。因为我们的epoll默认是是水平触发模式,如果不关闭监听写事件的话,会一直有可写通知产生。最后返回值 也是 SOCKET_OPEN
。最终会push消息给主动发起连接的服务。具体看看下面代码,也就是行9.
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_OPEN://新连接创建完成
forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
break;
}
if (more) {
return -1;
}
return 1;
}
再看 forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
static void
forward_message(int type, bool padding, struct socket_message * result) {
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
if (padding) {//true
if (result->data) {
size_t msg_sz = strlen(result->data);//对端地址
if (msg_sz > 128) {
msg_sz = 128;
}
sz += msg_sz;
}
}
sm = (struct skynet_socket_message *)skynet_malloc(sz);
sm->type = type;//SKYNET_SOCKET_TYPE_CONNECT
sm->id = result->id;//新连接的id
sm->ud = result->ud;//0
if (padding) {
sm->buffer = NULL;
memcpy(sm+1, result->data, sz - sizeof(*sm));//把对端地址写入末尾
}
struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm;
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);//PTYPE_SOCKET放置在sz的高八位
if (skynet_context_push((uint32_t)result->opaque, &message)) {//push到主动发起连接的服务
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}
这里 最终把 SKYNET_SOCKET_TYPE_CONNECT
消息push到服务。接下我们看是怎么唤醒lua层挂起的协程的。
-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, _ , addr)
local s = socket_pool[id]
if s == nil then
return
end
-- log remote addr
if not s.connected then -- resume may also post connect message
s.connected = true
wakeup(s) -- 唤醒之前挂起的协程
end
end
看上面代码行 10,最终导致 socket.open
返回。
当然我们真正需要跟外部通讯,而且是主动发起时,skynet 推荐使用 socket channel。比如我们可以主动连接数据库服务器。
over
标签:11,socket,发起,ai,ss,SOCKET,result,连接,struct From: https://www.cnblogs.com/waittingforyou/p/16972236.html