首页 > 其他分享 >11-发起主动连接

11-发起主动连接

时间:2022-12-10 20:22:34浏览次数:65  
标签:11 socket 发起 ai ss SOCKET result 连接 struct

新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X


发起主动连接

我们做服务器一般都是接收外部发起的连接。但是有时候也需要主动发起连接请求。这个通过socket.open(addr,port)

我们看代码

function socket.open(addr, port)
	local id = driver.connect(addr,port) --如果写管道前就有问题 这里返回的id是 -1
	return connect(id)
end

实际上我们主动发起连接的过程是这样的:

  1. 把请求连接的命令写入管道,然后挂起当前协程。
  2. 网络线程处理管道命令,发起连接。
    1. 如果连接立即成功,则push一个 SKYNET_SOCKET_TYPE_CONNECT到 主动发起连接所在的服务的队列
    2. 如果不能立即成功,网络线程会不断的检测,即每次通过 epoll_wait来发现连接是否成功。一旦发现连接成功则会push SKYNET_SOCKET_TYPE_CONNECT到 主动发起连接所在的服务的队列
    3. 发起连接请求的服务处理 SKYNET_SOCKET_TYPE_CONNECT 消息,最后唤醒挂起的协程。
    4. socket.open 返回 连接的id

image-20220712095141254

下面具体看代码

static int
lconnect(lua_State *L) {
	size_t sz = 0;
	const char * addr = luaL_checklstring(L,1,&sz);
	char tmp[sz];
	int port = 0;
	const char * host = address_port(L, tmp, addr, 2, &port);
	if (port == 0) {
		return luaL_error(L, "Invalid port");
	}
	struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
	int id = skynet_socket_connect(ctx, host, port);//主动发起连接 返回的id是为新连接分配的id
	lua_pushinteger(L, id);

	return 1;
}

继续看 skynet_socket_connect。看下面的代码,从行14 开始。最终在行7 把请求写入管道。

int 
socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
	struct request_package request;
	int len = open_request(ss, &request, opaque, addr, port);
	if (len < 0)
		return -1;
	send_request(ss, &request, 'O', sizeof(request.u.open) + len);//把请求连接写入管道
	return request.u.open.id;
}

int 
skynet_socket_connect(struct skynet_context *ctx, const char *host, int port) {
	uint32_t source = skynet_context_handle(ctx);
	return socket_server_connect(SOCKET_SERVER, source, host, port);
}

下面我们看网络线程是怎么处理管道请求的。


static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
	// ctrl command only exist in local fd, so don't worry about endian.
	switch (type) {
	case 'O':
		return open_socket(ss, (struct request_open *)buffer, result);
    }
}

接下来主要看看 open_socket

static int
open_socket(struct socket_server *ss, struct request_open * request, struct socket_message *result) {
	int id = request->id;
	result->opaque = request->opaque;
	result->id = id;
	result->ud = 0;
	result->data = NULL;
	struct socket *ns;
	int status;
	struct addrinfo ai_hints;
	struct addrinfo *ai_list = NULL;
	struct addrinfo *ai_ptr = NULL;
	char port[16];
	sprintf(port, "%d", request->port);
	memset(&ai_hints, 0, sizeof( ai_hints ) );
	ai_hints.ai_family = AF_UNSPEC;
	ai_hints.ai_socktype = SOCK_STREAM;
	ai_hints.ai_protocol = IPPROTO_TCP;

	status = getaddrinfo( request->host, port, &ai_hints, &ai_list );//解析后得到的地址都放在ai_list
	int sock= -1;
	for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next ) {//找出一个可以连接的
		sock = socket( ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol );
		if ( sock < 0 ) {continue;}
		socket_keepalive(sock);//设置保活
		sp_nonblocking(sock);//设置非阻塞
		status = connect( sock, ai_ptr->ai_addr, ai_ptr->ai_addrlen);//发起连接
		if ( status != 0 && errno != EINPROGRESS) {close(sock);sock = -1;continue;} break;
	}
	if (sock < 0) { result->data = strerror(errno);goto _failed; }
	ns = new_fd(ss, id, sock, PROTOCOL_TCP, request->opaque, true);//最后一个参数 表示监听可读事件
	if(status == 0) {//已经连接成功
		ATOM_STORE(&ns->type , SOCKET_TYPE_CONNECTED);//设置为成功连接
		struct sockaddr * addr = ai_ptr->ai_addr;
		void * sin_addr = (ai_ptr->ai_family == AF_INET) ? (void*)&((struct sockaddr_in *)addr)->sin_addr : (void*)&((struct sockaddr_in6 *)addr)->sin6_addr;
		if (inet_ntop(ai_ptr->ai_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
			result->data = ss->buffer;//设置对端的地址
		}
		freeaddrinfo( ai_list );
		return SOCKET_OPEN;
	} else {//没能一次性连接成功 后面需要查询连接情况
		ATOM_STORE(&ns->type , SOCKET_TYPE_CONNECTING);//设置为连接进行中
		if (enable_write(ss, ns, true)) {//监听可写事件
			result->data = "enable write failed";
			goto _failed;
		}
	}
	return -1;
	return SOCKET_ERR;
}

行20 获取我们要连接的地址集合。

行22 从上面集合中找出一个可以成功连接的。注意这里都是非阻塞的socket

行 31 把之前分配的槽位id对应的socket结构进行填充。注意这里开启了 可读监听

行32 表示一次性连接成功。注意这里直接返回了 SOCKET_OPEN。后面会push一个消息到主动发起连接的服务。

行41 表示没能一次性连接成功。此时socket的类型为 SOCKET_TYPE_CONNECTING。后面网络线程会不断查询连接的状况。这里开启了 监听写事件。也即是说如果一次性没有连接成功,那么会同时开启监听读和写

假设我们没有一次性连接成功。我们看看最终网络线程是怎么发现连接成功的,以及后续怎么处理的。实际上,当我们调用sp_wait检测的时候,只要 socket上有事件发送,且socket的类型是SOCKET_TYPE_CONNECTING,我们就认为可能连接成功了。至于怎么后续处理,看下面代码行 15.

int 
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
	for (;;) {
		if (ss->event_index == ss->event_n) {//一轮批量处理完成 新的一轮需要开始了 调用一次sp_wait作为新一轮的开始
			ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
			ss->checkctrl = 1;
			ss->event_index = 0;
		}
		struct event *e = &ss->ev[ss->event_index++];
		struct socket *s = e->s;
		struct socket_lock l;
		socket_lock_init(s, &l);
		switch (ATOM_LOAD(&s->type)) {
		case SOCKET_TYPE_CONNECTING://查询主动发起的连接情况
			return report_connect(ss, s, &l, result);
		default:
			if (e->read) {}
			if (e->write) {}
			if (e->error) {}
			if (e->eof) {}
			break;
		}
	}
}

具体看看 report_connect

static int
report_connect(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
	int error;
	socklen_t len = sizeof(error);  
	int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);//注意这里判断连接成功的方法  
	if (code < 0 || error) {  
		error = code < 0 ? errno : error;
		force_close(ss, s, l, result);
		result->data = strerror(error);
		return SOCKET_ERR;
	} else {//连接成功
		ATOM_STORE(&s->type , SOCKET_TYPE_CONNECTED);
		result->opaque = s->opaque;
		result->id = s->id;
		result->ud = 0;
		if (nomore_sending_data(s)) {
			if (enable_write(ss, s, false)) {//因为之前监听了可写事件 且没有数据需要发送 所以现在关闭可写监听 不然该sock会一直报告可写事件
				force_close(ss,s,l, result);
				result->data = "disable write failed";
				return SOCKET_ERR;
			}
		}
		union sockaddr_all u;
		socklen_t slen = sizeof(u);
		if (getpeername(s->fd, &u.s, &slen) == 0) {
			void * sin_addr = (u.s.sa_family == AF_INET) ? (void*)&u.v4.sin_addr : (void *)&u.v6.sin6_addr;
			if (inet_ntop(u.s.sa_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
				result->data = ss->buffer;//获取对端的ip地址
				return SOCKET_OPEN;
			}
		}
		result->data = NULL;
		return SOCKET_OPEN;
	}
}

上面的代码主要是判断连接是否成功。

如果连接成功,且没有数据发送,就要把监听写关闭。因为我们的epoll默认是是水平触发模式,如果不关闭监听写事件的话,会一直有可写通知产生。最后返回值 也是 SOCKET_OPEN。最终会push消息给主动发起连接的服务。具体看看下面代码,也就是行9.

int 
skynet_socket_poll() {
	struct socket_server *ss = SOCKET_SERVER;
	assert(ss);
	struct socket_message result;
	int more = 1;
	int type = socket_server_poll(ss, &result, &more);
	switch (type) {
	case SOCKET_OPEN://新连接创建完成
		forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
		break;
	}
	if (more) {
		return -1;
	}
	return 1;
}

再看 forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);

static void
forward_message(int type, bool padding, struct socket_message * result) {
	struct skynet_socket_message *sm;
	size_t sz = sizeof(*sm);
	if (padding) {//true
		if (result->data) {
			size_t msg_sz = strlen(result->data);//对端地址
			if (msg_sz > 128) {
				msg_sz = 128;
			}
			sz += msg_sz;
		} 
	}
	sm = (struct skynet_socket_message *)skynet_malloc(sz);
	sm->type = type;//SKYNET_SOCKET_TYPE_CONNECT
	sm->id = result->id;//新连接的id
	sm->ud = result->ud;//0
	if (padding) {
		sm->buffer = NULL;
		memcpy(sm+1, result->data, sz - sizeof(*sm));//把对端地址写入末尾
	} 

	struct skynet_message message;
	message.source = 0;
	message.session = 0;
	message.data = sm;
	message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);//PTYPE_SOCKET放置在sz的高八位
	
	if (skynet_context_push((uint32_t)result->opaque, &message)) {//push到主动发起连接的服务
		// todo: report somewhere to close socket
		// don't call skynet_socket_close here (It will block mainloop)
		skynet_free(sm->buffer);
		skynet_free(sm);
	}
}

这里 最终把 SKYNET_SOCKET_TYPE_CONNECT 消息push到服务。接下我们看是怎么唤醒lua层挂起的协程的。

-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, _ , addr)
	local s = socket_pool[id]
	if s == nil then
		return
	end
	-- log remote addr
	if not s.connected then	-- resume may also post connect message
		s.connected = true
		wakeup(s) -- 唤醒之前挂起的协程
	end
end

看上面代码行 10,最终导致 socket.open 返回。

当然我们真正需要跟外部通讯,而且是主动发起时,skynet 推荐使用 socket channel。比如我们可以主动连接数据库服务器。

over

标签:11,socket,发起,ai,ss,SOCKET,result,连接,struct
From: https://www.cnblogs.com/waittingforyou/p/16972236.html

相关文章