首页 > 其他分享 >7-接受新连接

7-接受新连接

时间:2022-12-10 20:24:46浏览次数:66  
标签:SOCKET message id result 接受 TYPE 连接 socket

新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X

上一节讲了 发起网络监听 。现在接着说明 怎么接受新连接的。

image-20220909083634934

这个水龙头里面的水 主要就是业务逻辑需要的网络数据了。注意这是 不同于监听水龙头里面的内容。

image-20220629094424249

我们这里主要是讨论当有外部主动请求连接我们服务器时,skynet是怎么发现和处理的。

我们c层有一个网络线程,网络线程会不断的调用epoll提供的检测函数 epoll_wait 来发现网络事件。 也就是说,如果有可读事件或者可写事件发生了,我们是可以及时知道的。实际上这个检测函数是在 socket_server_pollsp_wait 里面调用的。如下所示

image-20220627104947204

看下面的 行5


static int 
sp_wait(int efd, struct event *e, int max) {
	struct epoll_event ev[max];
	int n = epoll_wait(efd , ev, max, -1);//此处是调用epoll提供的检测事件函数
	int i;
	for (i=0;i<n;i++) {
		e[i].s = ev[i].data.ptr;
		unsigned flag = ev[i].events;
		e[i].write = (flag & EPOLLOUT) != 0;
		e[i].read = (flag & EPOLLIN) != 0;
		e[i].error = (flag & EPOLLERR) != 0;
		e[i].eof = (flag & EPOLLHUP) != 0;
	}

	return n;
}

我们来分析下 sp_wait 主要做的事情。实际上他就是调用epoll的epoll_wait函数去检测各个socket上发生的事件,并暂时保存在一个数组中。我们这里是讨论新连接,实际上如果在监听socket上面有事件发生,那么我们认为可能有一个新连接请求到达了。socket_server_poll 主要就是把数组中的事件一个个的处理。看下面的代码

int 
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
	for (;;) {
		if (ss->event_index == ss->event_n) {//一轮批量处理完成 新的一轮需要开始了 调用一次sp_wait作为新一轮的开始
			ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);//检测新事件
			ss->checkctrl = 1;
			
			ss->event_index = 0;
		}
		struct event *e = &ss->ev[ss->event_index++];//取出某个socket身上发生的事件
		struct socket *s = e->s;//当前s是监听socket

		struct socket_lock l;
		socket_lock_init(s, &l);
		switch (ATOM_LOAD(&s->type)) {
		case SOCKET_TYPE_CONNECTING://查询主动发起的连接情况
			return report_connect(ss, s, &l, result);
		case SOCKET_TYPE_LISTEN: {
			int ok = report_accept(ss, s, result);//有新的外部连接请求 s是监听socket
			if (ok > 0) {
				return SOCKET_ACCEPT;//lua层收到此消息后 需要调用 socketdriver.start才能使 新连接由 SOCKET_TYPE_PACCEPT 变 SOCKET_TYPE_CONNECTED
			} if (ok < 0 ) {
				return SOCKET_ERR;
			}
			// when ok == 0, retry
			break;
		}
		case SOCKET_TYPE_INVALID:
			skynet_error(NULL, "socket-server: invalid socket");
			break;
		default:
			
	}
}

上面的代码 行15 说明是根据socket的type来做分支处理的。直接看 行18,我们还记得,上一节讨论发起网络监听时,监听socket的type最终会被设置为 SOCKET_TYPE_LISTEN。也就是说当前是在监听socket上发生的事件。那么说明有新连接请求。此时交给report_accept处理,看下面的代码。实际上主要做的事情是,

  1. 向skynet获取一个槽位,代表新的连接
  2. .填充这个槽位对应的socket结构
  3. 加入epoll 但是并没立即监听可读事件。这里跟当初发起监听类似,所以当前状态是预接受 即 SOCKET_TYPE_PACCEPT
  4. 获取请求的ip地址
static int //此时s是监听socket
report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
	union sockaddr_all u;
	socklen_t len = sizeof(u);
	int client_fd = accept(s->fd, &u.s, &len);//利用操作系统api 获取新连接是socket描述符
	if (client_fd < 0) {
		if (errno == EMFILE || errno == ENFILE) {
			result->opaque = s->opaque;
			result->id = s->id;
			result->ud = 0;
			result->data = strerror(errno);
			return -1;
		} else {
			return 0;
		}
	}
	int id = reserve_id(ss);//向skynet申请一个槽位
	if (id < 0) {
		close(client_fd);
		return 0;
	}
	socket_keepalive(client_fd);//设置网络保活
	sp_nonblocking(client_fd);//设置非阻塞
	struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);//填充槽位对应的结构。最后一个参数表示当前不监听可读事件
	if (ns == NULL) {
		close(client_fd);
		return 0;
	}
	// accept new one connection
	stat_read(ss,s,1);

	ATOM_STORE(&ns->type , SOCKET_TYPE_PACCEPT);//设置此时socket的类型 注意此时是  SOCKET_TYPE_PACCEPT
	result->opaque = s->opaque;//监听所在的服务地址
	result->id = s->id;//监听id
	result->ud = id;//代表新连接的id
	result->data = NULL;

	if (getname(&u, ss->buffer, sizeof(ss->buffer))) {
		result->data = ss->buffer;//设置新连接的请求ip地址
	}

	return 1;
}

上面的代码最终返回值是 1 ,回到上一层代码看看

case SOCKET_TYPE_LISTEN: {
			int ok = report_accept(ss, s, result);//有新的外部连接请求
			if (ok > 0) {
				return SOCKET_ACCEPT;//lua层收到此消息后 需要调用 socketdriver.start才能使 新连接由 SOCKET_TYPE_PACCEPT 变 SOCKET_TYPE_CONNECTED

也就是说这里会返回值 SOCKET_ACCEPT 。也就是 socket_server_poll 的返回值。我们知道 socket_server_pollskynet_socket_poll 调用的。注意下面的代码 行10。

int 
skynet_socket_poll() {
	struct socket_server *ss = SOCKET_SERVER;
	assert(ss);
	struct socket_message result;
	int more = 1;
	int type = socket_server_poll(ss, &result, &more);
	switch (type) {
	
	case SOCKET_ACCEPT://新连接 预创建完成, lua层收到此消息后需调用 socketdriver.start 最终会进入 case SOCKET_OPEN:表示新连接 真正创建完成
		forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
		break;
	
	return 1;
}

最终会调用 forward_message push一个消息到队列。注意此时的参数 type是SKYNET_SOCKET_TYPE_ACCEPT padding是true

static void
forward_message(int type, bool padding, struct socket_message * result) {
	struct skynet_socket_message *sm;
	size_t sz = sizeof(*sm);
	if (padding) {
		if (result->data) {//此时的data是请求连接的ip地址
			size_t msg_sz = strlen(result->data);
			if (msg_sz > 128) {
				msg_sz = 128;
			}
			sz += msg_sz;
		} 
	}
	sm = (struct skynet_socket_message *)skynet_malloc(sz);//分配内存
	sm->type = type;//SKYNET_SOCKET_TYPE_ACCEPT
	sm->id = result->id;//监听id
	sm->ud = result->ud;//新连接的id
	if (padding) {
		sm->buffer = NULL;
		memcpy(sm+1, result->data, sz - sizeof(*sm));//把ip地址放置到sm的尾部
	} 

	struct skynet_message message;
	message.source = 0;
	message.session = 0;
	message.data = sm;
	message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);//PTYPE_SOCKET放置在sz的高八位
	
	if (skynet_context_push((uint32_t)result->opaque, &message)) {//push消息到监听服务对应的队列
		// todo: report somewhere to close socket
		// don't call skynet_socket_close here (It will block mainloop)
		skynet_free(sm->buffer);
		skynet_free(sm);
	}
}

以上代码主要是发送了一个网络消息到监听服务所在的队列。注意这个网络消息的具体类型是 SKYNET_SOCKET_TYPE_ACCEPT

此后我们的lua层处理该接力了。实际上lua层处理过程是 skynet.dispatch_message - -> raw_dispatch_message 注意行8 获取f函数

-- skynet.lua
local function raw_dispatch_message(prototype, msg, sz, session, source)
	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
	if prototype == 1 then --这里是处理响应
		-- 略
	else --这里主要是处理lua text socket 等消息类型
		local p = proto[prototype]
		
		local f = p.dispatch
		if f then
			local co = co_create(f)	--获取一个协程对象并设置任务函数f
			session_coroutine_id[co] = session --保存session以便找到回去的路;注意这里的session是其他服务独立产生的,所以不同请求者带过来的session可以是相同的
			session_coroutine_address[co] = source --保存source以便找到回去的路 即记录请求者是谁
	
			suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
		else
			--略
		end
	end
end

而这个所谓的 任务函数 f是在 socket.lua里面注册的。如下所示 行6. 最终会根据 t 调用socket_message里面对应的函数。t 主要是网络消息的具体类型,主要有下面几个

  • SKYNET_SOCKET_TYPE_DATA
  • SKYNET_SOCKET_TYPE_CONNECT
  • SKYNET_SOCKET_TYPE_CLOSE
  • SKYNET_SOCKET_TYPE_ACCEPT
  • SKYNET_SOCKET_TYPE_ERROR
-- socket.lua
skynet.register_protocol {
	name = "socket",
	id = skynet.PTYPE_SOCKET,	-- PTYPE_SOCKET = 6
	unpack = driver.unpack,--把struct skynet_socket_message内的数据提取出来 即 type id ud buffer 注意type不同 后面参数的意义也是不同的
	dispatch = function (_, _, t, ...) --t是 SKYNET_SOCKET_TYPE_ACCEPT
		socket_message[t](...) --监听id 新连接id 新连接ip地址
	end
}

我们此时的t是SKYNET_SOCKET_TYPE_ACCEPT 所以调用函数如下

-- SKYNET_SOCKET_TYPE_ACCEPT = 4
socket_message[4] = function(id, newid, addr) --参数的意思: id是监听id newid是新连接id addr是请求连接者的ip地址
	local s = socket_pool[id]
	if s == nil then
		driver.close(newid)
		return
	end
	s.callback(newid, addr) --新连接的回调函数
end

最终会调用 行8. 还记得吗 我们发起网络监听的时候,是设置了一个回调函数的。这个回调函数就是当有新连接消息来的时候触发的。回顾下代码

		local listenid = socket.listen("127.0.0.1", 8001)
	

		socket.start(listenid , function(newid, addr)--newid 是新连接的id, addr是新连接的发起者ip
        
        	print("new connection id is ".. newid)
			print("connect from " .. addr)
        
	
		end)

这里socket.start(listenid,accept)注册了一个accept函数。这里我们只是简单的做了打印信息。

另外,实际上每次调用accept这个回调函数,都是在某个协程里面执行的。也就是说 如果有多个新连接消息达到,实际上每个回调函数都是在不同的协程中执行的。

新连接就算是完成了?不是。

还记得,我们的c层当发现有新连接的时候,底层并没有立即开启可读事件监听,而是把底层的socket的type设置为 SOCKET_TYPE_PACCEPT ,也就是预接受状态。也就是还没有变成完全的连接状态。所以我们需要在某个服务中调用socket.start(newid)。这样底层才知道,lua层已经完成做好准备。之前网络监听的时候,已经具体讨论过socket.start的基本流程。我们知道socket.start函数主要驱动做了下面这些事情

  • 把请求写入管道然后挂起当前协程。

    挂起是因为socket.start里面调用了connet函数。connet函数还为即将正式启用的新连接创建了一个对象。这个对象初始化了连接的相关信息。

  • 网络线程读取管道请求做处理,然后push一个消息到队列,

  • 最后lua层处理消息。

网络监听的时候,我们调用socket.start是告诉底层我们最终的监听服务是在哪里,这里我们调用socket.start是告诉底层,我们最终哪个服务负责处理这个新连接。

我们先看写管道

void
socket_server_start(struct socket_server *ss, uintptr_t opaque, int id) {
	struct request_package request;
	request.u.resumepause.id = id;//新连接的id
	request.u.resumepause.opaque = opaque;//调用socket.start所在的服务
	send_request(ss, &request, 'R', sizeof(request.u.resumepause));//写入管道
}

再看网络线程的处理过程

static int
resume_socket(struct socket_server *ss, struct request_resumepause *request, struct socket_message *result) {
	int id = request->id;
	result->id = id;//新连接的id
	result->opaque = request->opaque;//调用socket.start时所在的服务
	result->ud = 0;
	result->data = NULL;
	struct socket *s = &ss->slot[HASH_ID(id)];
	if (socket_invalid(s, id)) {
		result->data = "invalid socket";
		return SOCKET_ERR;
	}
	if (halfclose_read(s)) {
		// The closing socket may be in transit, so raise an error. See https://github.com/cloudwu/skynet/issues/1374
		result->data = "socket closed";
		return SOCKET_ERR;
	}
	struct socket_lock l;
	socket_lock_init(s, &l);
	if (enable_read(ss, s, true)) {//开启可读事件监听
		result->data = "enable read failed";
		return SOCKET_ERR;
	}
	uint8_t type = ATOM_LOAD(&s->type);
	if (type == SOCKET_TYPE_PACCEPT || type == SOCKET_TYPE_PLISTEN) {//修改状态
		ATOM_STORE(&s->type , (type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN);
		s->opaque = request->opaque;
		result->data = "start";
		return SOCKET_OPEN;
	} else if (type == SOCKET_TYPE_CONNECTED) {
	
	}
	// if s->type == SOCKET_TYPE_HALFCLOSE_WRITE , SOCKET_CLOSE message will send later
	return -1;
}

注意 行20,这里开启了可读监听。再看 行25, 此时的 type是 SOCKET_TYPE_PACCEPT。 也就是预接受状态。行26 最后设置成了 SOCKET_TYPE_CONNECTED ,最后返回了 SOCKET_OPEN。我们重新来看网络线程的处理流程。网络线程会一直循环调用skynet_socket_poll。里面又会调用 socket_server_poll。看如下代码

int 
skynet_socket_poll() {

	int type = socket_server_poll(ss, &result, &more);//上面分析了,这次返回值是 SOCKET_OPEN 
	switch (type) {

	case SOCKET_OPEN://新连接正式完成
		forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
		break;

	}
}

继续看 forward_message。注意参数 type是 SKYNET_SOCKET_TYPE_CONNECT padding是true

static void
forward_message(int type, bool padding, struct socket_message * result) {//next
	struct skynet_socket_message *sm;
	size_t sz = sizeof(*sm);
	if (padding) {
		if (result->data) {//当前的data是 "start"
			size_t msg_sz = strlen(result->data);
			if (msg_sz > 128) {
				msg_sz = 128;
			}
			sz += msg_sz;
		}
	}
	sm = (struct skynet_socket_message *)skynet_malloc(sz);//分配内存 这个sz已经包括result->data占用的字节了
	sm->type = type;//SKYNET_SOCKET_TYPE_CONNECT
	sm->id = result->id;//新连接的id
	sm->ud = result->ud;//0
	if (padding) {
		sm->buffer = NULL;
		memcpy(sm+1, result->data, sz - sizeof(*sm));//把result->data的数据放置到sm尾部。
	} 

	struct skynet_message message;
	message.source = 0;
	message.session = 0;
	message.data = sm;
	message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);//PTYPE_SOCKET放置在sz的高八位
	
	if (skynet_context_push((uint32_t)result->opaque, &message)) {//push消息到调用socket.start的服务对应的队列
		// todo: report somewhere to close socket
		// don't call skynet_socket_close here (It will block mainloop)
		skynet_free(sm->buffer);
		skynet_free(sm);
	}
}

最终会push一个消息到调用socket.start所在服务的队列。此时又到lua层接力处理了。这次我们根据网络消息的具体类型 SKYNET_SOCKET_TYPE_CONNECT 直接看最终的处理函数

-- SKYNET_SOCKET_TYPE_CONNECT = 2
socket_message[2] = function(id, _ , addr)  
	local s = socket_pool[id] -- 通过新连接的id找到对应的对象 这个对象在调用socket.start时已经创建
	if s == nil then
		return
	end
	-- log remote addr
	if not s.connected then	-- resume may also post connect message
		s.connected = true
		wakeup(s) -- 把调用socket.start时挂起的协程加入唤醒队列
	end
end

上面是 addr 参数其实是"start" 根本不是地址;当然在某种情况下也可以是"transfer" ,这个文末会说明

这里是把之前挂起的协程x加入唤醒队列了。之后协程x会在合适的时候被执行,也就是挂起的connet函数会继续执行了。

这里总结下 socket.start做的事情。到目前为止,我们主要用在监听和接受新连接上面。

  • 对于监听来说,主要是底层检测到有新连接的时候,底层会把通知push到监听启用的最终位置,即监听所在的服务。
  • 对于新连接来说,主要是告诉底层新连接最终是在哪里位置启用的。这样当底层发现这个连接上有数据到来时,知道要把消息发送给哪个服务。主要是我们在lua层写业务代码时,有时候想底层把这个连接上的数据发送到其他服务上去。所以socket.start就提供了这样一个修改启用位置的机会。看看下面的代码
static int
resume_socket(struct socket_server *ss, struct request_resumepause *request, struct socket_message *result) {
	//略
	uint8_t type = ATOM_LOAD(&s->type);
	if (type == SOCKET_TYPE_PACCEPT || type == SOCKET_TYPE_PLISTEN) {
		ATOM_STORE(&s->type , (type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN);
		s->opaque = request->opaque;
		result->data = "start";
		return SOCKET_OPEN;
	} else if (type == SOCKET_TYPE_CONNECTED) {
		// todo: maybe we should send a message SOCKET_TRANSFER to s->opaque
		s->opaque = request->opaque;//修改了连接内部绑定的服务地址
		result->data = "transfer";
		return SOCKET_OPEN;
	}
	// if s->type == SOCKET_TYPE_HALFCLOSE_WRITE , SOCKET_CLOSE message will send later
	return -1;
}

假设一个连接是正常状态 即 SOCKET_TYPE_CONNECTED,此时我们调用 socket.start(),那么最终会执行 行12的代码。也就是会修改连接对应的服务地址。此时负责处理新连接的服务会在分发消息时发现socket_message[2] = function(id, _ , addr) ,而 addr 就是"transfer".

标签:SOCKET,message,id,result,接受,TYPE,连接,socket
From: https://www.cnblogs.com/waittingforyou/p/16972225.html

相关文章