首页 > 其他分享 >9-写网络数据

9-写网络数据

时间:2022-12-10 20:23:59浏览次数:60  
标签:struct buffer 网络 send write ss 数据 socket

新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X


skynet对于写网络数据。可以使用 socket.write(id,str)

-- socket.lua
socket.write = assert(driver.send)

实际上写的操作过程如下所示

image-20220705104909083

针对 socket.write(id,str)我们下面具体看代码 从行21 开始

static void
get_buffer(lua_State *L, int index, struct socket_sendbuffer *buf) {
	void *buffer;
	switch(lua_type(L, index)) {
		size_t len;
	case LUA_TUSERDATA:
		break;
	case LUA_TLIGHTUSERDATA: {
		break;
		}
	case LUA_TTABLE:
		break;
	default://我们分析的代码是走这里
		buf->type = SOCKET_BUFFER_RAWPOINTER;//这里没有新分配内存
		buf->buffer = luaL_checklstring(L, index, &buf->sz);//获取lua字符串在虚拟机中对应的c字符串指针
		break;
	}
}

static int
lsend(lua_State *L) {
	struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
	int id = luaL_checkinteger(L, 1);
	struct socket_sendbuffer buf;
	buf.id = id;
	get_buffer(L, 2, &buf);//把我们传入的str提取到buf中
	int err = skynet_socket_sendbuffer(ctx, &buf);
	lua_pushboolean(L, !err);
	return 1;
}

行26主要是构建一个buff表示我们要发送出去的数据。继续看 行27

int
skynet_socket_sendbuffer(struct skynet_context *ctx, struct socket_sendbuffer *buffer) {
	return socket_server_send(SOCKET_SERVER, buffer);
}

继续看 socket_server_send。这里主要做的事情是

  1. 如果数据可以直接发送,就调用网络接口write直接发送。如果一次性没有发送完,那么把剩余的数据保存到所谓的 直接写缓冲区dw_buffer。然后通过管道告诉skynet需要开启可写监听。这样当可写事件到来的时候,网络线程把dw_buffer里面的数据发出去。
  2. 如果不可以直接发送,就把数据写入管道。网络线程会把管道数据读出来,加入到连接对应的缓冲链表中。实际上每一个连接都会有两个缓冲链表,一个是高优先级,一个是低优先级。当我们无法一次性把数据发出去时,就会先保存在这个缓冲链表中。当可写事件到来时,网络线程会把缓冲链表的数据发送出去。
int 
socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) {
	int id = buf->id;
	struct socket * s = &ss->slot[HASH_ID(id)];
	struct socket_lock l;
	socket_lock_init(s, &l);
	if (can_direct_write(s,id) && socket_trylock(&l)) {
		// may be we can send directly, double check
		if (can_direct_write(s,id)) {
			// send directly
			struct send_object so;
			send_object_init_from_sendbuffer(ss, &so, buf);
			ssize_t n;
			if (s->protocol == PROTOCOL_TCP) {
				n = write(s->fd, so.buffer, so.sz);//在当前工作线程直接发送
			} 
			stat_write(ss,s,n);
			if (n == so.sz) {//一次发送就完成了
				// write done
				socket_unlock(&l);
				so.free_func((void *)buf->buffer);
				return 0;
			}
			// write failed, put buffer into s->dw_* , and let socket thread send it. see send_buffer()
			s->dw_buffer = clone_buffer(buf, &s->dw_size);//分配内存 大小是原始发送数据的大小 
			s->dw_offset = n;//记录已经发送完成的数据的位置 这个位置后面的数据就是下次需要发送的

			socket_unlock(&l);
			struct request_package request;
			// let socket thread enable write event 监听可写事件
			send_request(ss, &request, 'W', sizeof(request.u.send));
			return 0;
		}
		socket_unlock(&l);
	}
	inc_sending_ref(s, id);
	struct request_package request;
	request.u.send.id = id;
	request.u.send.buffer = clone_buffer(buf, &request.u.send.sz);//分配内存
	send_request(ss, &request, 'D', sizeof(request.u.send));//把数据写入管道
	return 0;
}

  • 如果一次性可以发送完成,那么就不会额外分配内存。行17 调用write发送数据的时候,我们发送的buffer指针是lua虚拟机c字符串的指针,这个内存是lua虚拟机分配管理的。通过上面代码中的 行31和 行53 可以知道。只要还有数据没有发送出去,就会额外分配内存来保存剩余的数据。

行40 把没法直接发送的数据写入了管道。在这之前 行36做了一次引用计数。表示我们lua层调用write打算发送出去的网络数据是先写入了管道,网络线程还没有读取管道。意思是我们这个连接上要发送的数据开始有点多了。实际上在网络线程读取了这个请求后,引用计数又会减少一次。我们看看代码

static inline void
inc_sending_ref(struct socket *s, int id) {
	if (s->protocol != PROTOCOL_TCP)
		return;
	for (;;) {
		unsigned long sending = ATOM_LOAD(&s->sending);
		if ((sending >> 16) == ID_TAG16(id)) {//sending的高16位跟当前id的高16位比较
			if ((sending & 0xffff) == 0xffff) {
				// s->sending may overflow (rarely), so busy waiting here for socket thread dec it. see issue #794
				continue;
			}
			// inc sending only matching the same socket id
			if (ATOM_CAS(&s->sending, sending, sending + 1))
				return;
			// atom inc failed, retry
		} else {//id是无效的
			// socket id changed, just return
			return;
		}
	}
}

上面这段代码就是给连接 加一次发送引用。这个s->sending是32位的,高16位保存的是对应连接id的高16位,低16位用做引用计数。在调用这个函数前,我们是通过id拿到socket的,即struct socket * s = &ss->slot[HASH_ID(id)]; 我们知道如果一个id连接关闭了,那么他对应的槽位,即socket是可能会分配给其他id的。这样一来,不同的id都可以拿到相同的socket,特别是如果一个已经过期的id来发送数据,肯定不能让他发送。既然他们都可以得到同样的槽位,其实说明新老id的低16位是相同的。但是老id的高16位跟当前id的高16位一定不同。所以通过 行7 就可以判断当前的id是不是无效的。

之后再网络线程处理管道数据后,引用计数就会减一次。

我们看看网络线程是怎么处理管道数据的。注意下面 行6开始。我们这里type是 D,属于发送高优先级数据。

// return type
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
	switch (type) {
	case 'D':
	case 'P': {
		int priority = (type == 'D') ? PRIORITY_HIGH : PRIORITY_LOW;//高优先级链表 还是 低优先级链表
		struct request_send * request = (struct request_send *) buffer;
		int ret = send_socket(ss, request, result, priority, NULL);
		dec_sending_ref(ss, request->id);//引用减一
		return ret;
	}
	return -1;
}

具体看行 9 的 send_socket 。这里主要就是把 数据加入到链表中。

static int
send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority, const uint8_t *udp_address) {
	int id = request->id;
	struct socket * s = &ss->slot[HASH_ID(id)];
	struct send_object so;
	send_object_init(ss, &so, request->buffer, request->sz);
	uint8_t type = ATOM_LOAD(&s->type);

	if (send_buffer_empty(s)) {//两个链表都是空
		if (s->protocol == PROTOCOL_TCP) {
			append_sendbuffer(ss, s, request);	// add to high priority list, even priority == PRIORITY_LOW
		} 
		if (enable_write(ss, s, true)) {//开启写监听
			return report_error(s, result, "enable write failed");
		}
	} else {
		if (s->protocol == PROTOCOL_TCP) {
			if (priority == PRIORITY_LOW) {
				append_sendbuffer_low(ss, s, request);
			} else {
				append_sendbuffer(ss, s, request);
			}
		} 
	}
	return -1;
}

我们主要看 上面的 append_sendbuffer。这个函数其实就是把要发送的数据打包分配一个 write_buffer,然后加入到链表中。

具体从下面行22开始看。

static struct write_buffer *
append_sendbuffer_(struct socket_server *ss, struct wb_list *s, struct request_send * request, int size) {
	struct write_buffer * buf = MALLOC(size);//分配一个内存
	struct send_object so;
	buf->userobject = send_object_init(ss, &so, request->buffer, request->sz);
	buf->ptr = (char*)so.buffer;//剩余需要发送的数据的起始位置
	buf->sz = so.sz;//剩余需要发送的数据
	buf->buffer = request->buffer;//写缓冲区的原始数据
	buf->next = NULL;
	if (s->head == NULL) {
		s->head = s->tail = buf;
	} else {
		assert(s->tail != NULL);
		assert(s->tail->next == NULL);
		s->tail->next = buf;
		s->tail = buf;
	}
	return buf;
}

static inline void
append_sendbuffer(struct socket_server *ss, struct socket *s, struct request_send * request) {
	struct write_buffer *buf = append_sendbuffer_(ss, &s->high, request, sizeof(*buf));
	s->wb_size += buf->sz;//累计写缓冲区的大小 注意不是 s->dw_buff
}

我们构建一个 write_buffer 时,需要注意其中几个成员的意义。当我们把 write_buffer 缓冲区的数据交给网络线程发送时,可能无法一次性发送完。比如我们的 write_buffer原本保存的是 50个字节,我们网络线程可能只发送了20个字节,那么剩余的30个字节需要下次再发送出去。这时候 ptr和sz 就表示剩余需要发送数据的信息。

当把数据加入链表后,网络线程会在可写事件监听发生时,会调用 write 函数 把数据真正发送出去。看下面的行20.

int 
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
	for (;;) {
		if (ss->checkctrl) {	
		}
		if (ss->event_index == ss->event_n) {//一轮批量处理完成 新的一轮需要开始了 调用一次sp_wait作为新一轮的开始
			ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);//检测网络事件
			ss->checkctrl = 1;
			ss->event_index = 0;	
		}
		struct event *e = &ss->ev[ss->event_index++];
		struct socket *s = e->s;

		struct socket_lock l;
		socket_lock_init(s, &l);
		switch (ATOM_LOAD(&s->type)) {
		default:
			if (e->write) {
				int type = send_buffer(ss, s, &l, result);//是时候发送数据出去了
				if (type == -1)
					break;
				return type;
			}
			}
		}
	}
}

进入行20内部看看。实际上主要做两件事

  1. 直接缓存dw_buffer里面的数据包装成 一个write_buffer ,然后加入链表。但是是加入链表是头部。也就是说发送时优先级最高
  2. 调用 send_buffer_ 去发送 链表里面的数据
static int
send_buffer(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
	if (!socket_trylock(l))
		return -1;	// blocked by direct write, send later.
	if (s->dw_buffer) {
		// add direct write buffer before high.head
		struct write_buffer * buf = MALLOC(sizeof(*buf));//分配内存
		struct send_object so;
		buf->userobject = send_object_init(ss, &so, (void *)s->dw_buffer, s->dw_size);
		buf->ptr = (char*)so.buffer+s->dw_offset;//剩余发送的其实位置
		buf->sz = so.sz - s->dw_offset;//剩余没有发送的
		buf->buffer = (void *)s->dw_buffer;//原始数据 包括发送和没发送的
		s->wb_size+=buf->sz;
		if (s->high.head == NULL) {
			s->high.head = s->high.tail = buf;
			buf->next = NULL;
		} else {
			buf->next = s->high.head;
			s->high.head = buf;
		}
		s->dw_buffer = NULL;//表示 直接缓冲区 清空了
	}
	int r = send_buffer_(ss,s,l,result);
	socket_unlock(l);

	return r;
}

我们具体去看看 send_buffer_

static int
send_buffer_(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
	assert(!list_uncomplete(&s->low));//low链表中不能存在某个节点数据只发送了一部分
	// step 1
	int ret = send_list(ss,s,&s->high,l,result);//发送高优先级链表中的数据

	if (s->high.head == NULL) {//如果高优先级链表为空了
		// step 2
		if (s->low.head != NULL) {//发送低优先级链表数据
			int ret = send_list(ss,s,&s->low,l,result);
			// step 3
			if (list_uncomplete(&s->low)) {//如果低优先级链表中有一个节点数据只发送了一部分
				raise_uncomplete(s);//则把这个节点的数据添加到 已经空的高优先级链表中
				return -1;
			}
			if (s->low.head)
				return -1;
		} 
		// step 4
		assert(send_buffer_empty(s) && s->wb_size == 0);

		if (s->closing) {
			// finish writing
			force_close(ss, s, l, result);
			return -1;
		}
		int err = enable_write(ss, s, false);//关闭可写监听
	}

	return -1;
}

上面的代码主要做的是事情

  1. 先把高优先级链表里面的数据发送

  2. 高优先级的数据都发送完了 那么开始发送低优先级链表的数据

    1. 如果低优先级数据不能在这次发完,那么把还有剩余数据的那个节点添加到高优先级链表中去
    2. 如果两个链表的数据都发送完了,就关闭可写监听。

最后我们看看到底 send_list是怎么发送的 看行34

static int
send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
	while (list->head) {
		struct write_buffer * tmp = list->head;
		for (;;) {
			ssize_t sz = write(s->fd, tmp->ptr, tmp->sz);
			if (sz < 0) {//出现异常情况
				switch(errno) {
				case EINTR:
					continue;
				case AGAIN_WOULDBLOCK:
					return -1;
				}
				return close_write(ss, s, l, result);
			}
			stat_write(ss,s,(int)sz);
			s->wb_size -= sz;
			if (sz != tmp->sz) {
				tmp->ptr += sz;//剩余没发送的起始位置
				tmp->sz -= sz;//剩余没发送的还有多少
				return -1;
			}
			break;
		}
		list->head = tmp->next;
		write_buffer_free(ss,tmp);//发送完一个write_buffer ,释放内部的 发送数据 和 write_buffer自己
	}
	list->tail = NULL;
	return -1;
}
static int
send_list(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
	if (s->protocol == PROTOCOL_TCP) {
		return send_list_tcp(ss, s, list, l, result);
	} 
}

上面的代码就是不断的把链表中的节点取出来,然后通过write函数发送出去。最后释放节点的内存。

关于写数据时出现异常,我们会在关闭连接时再次讨论。

在多线程环境下,针对同一个网络套接字socket id每一次调用write,实际上都是原子操作。也就是你在a线程调用write成功写入123,我在b线程调用write写入abc,最终写入的是 123abc或者是abc123.不可能是其他组合状态。比如不可能是12ab3c这种。

标签:struct,buffer,网络,send,write,ss,数据,socket
From: https://www.cnblogs.com/waittingforyou/p/16972228.html

相关文章