新入门skynet系列视频b站网址 https://www.bilibili.com/video/BV19d4y1678X
skynet对于写网络数据。可以使用 socket.write(id,str)
。
-- socket.lua
socket.write = assert(driver.send)
实际上写的操作过程如下所示
针对 socket.write(id,str)
我们下面具体看代码 从行21 开始
static void
get_buffer(lua_State *L, int index, struct socket_sendbuffer *buf) {
void *buffer;
switch(lua_type(L, index)) {
size_t len;
case LUA_TUSERDATA:
break;
case LUA_TLIGHTUSERDATA: {
break;
}
case LUA_TTABLE:
break;
default://我们分析的代码是走这里
buf->type = SOCKET_BUFFER_RAWPOINTER;//这里没有新分配内存
buf->buffer = luaL_checklstring(L, index, &buf->sz);//获取lua字符串在虚拟机中对应的c字符串指针
break;
}
}
static int
lsend(lua_State *L) {
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = luaL_checkinteger(L, 1);
struct socket_sendbuffer buf;
buf.id = id;
get_buffer(L, 2, &buf);//把我们传入的str提取到buf中
int err = skynet_socket_sendbuffer(ctx, &buf);
lua_pushboolean(L, !err);
return 1;
}
行26主要是构建一个buff表示我们要发送出去的数据。继续看 行27
int
skynet_socket_sendbuffer(struct skynet_context *ctx, struct socket_sendbuffer *buffer) {
return socket_server_send(SOCKET_SERVER, buffer);
}
继续看 socket_server_send
。这里主要做的事情是
- 如果数据可以
直接发送
,就调用网络接口write直接发送。如果一次性没有发送完,那么把剩余的数据保存到所谓的 直接写缓冲区 即dw_buffer
。然后通过管道告诉skynet需要开启可写监听。这样当可写事件到来
的时候,网络线程把dw_buffer
里面的数据发出去。 - 如果不可以直接发送,就把数据写入管道。网络线程会把管道数据读出来,加入到连接对应的
缓冲链表中
。实际上每一个连接都会有两个缓冲链表,一个是高优先级,一个是低优先级。当我们无法一次性把数据发出去时,就会先保存在这个缓冲链表中。当可写事件到来时,网络线程会把缓冲链表的数据发送出去。
int
socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) {
int id = buf->id;
struct socket * s = &ss->slot[HASH_ID(id)];
struct socket_lock l;
socket_lock_init(s, &l);
if (can_direct_write(s,id) && socket_trylock(&l)) {
// may be we can send directly, double check
if (can_direct_write(s,id)) {
// send directly
struct send_object so;
send_object_init_from_sendbuffer(ss, &so, buf);
ssize_t n;
if (s->protocol == PROTOCOL_TCP) {
n = write(s->fd, so.buffer, so.sz);//在当前工作线程直接发送
}
stat_write(ss,s,n);
if (n == so.sz) {//一次发送就完成了
// write done
socket_unlock(&l);
so.free_func((void *)buf->buffer);
return 0;
}
// write failed, put buffer into s->dw_* , and let socket thread send it. see send_buffer()
s->dw_buffer = clone_buffer(buf, &s->dw_size);//分配内存 大小是原始发送数据的大小
s->dw_offset = n;//记录已经发送完成的数据的位置 这个位置后面的数据就是下次需要发送的
socket_unlock(&l);
struct request_package request;
// let socket thread enable write event 监听可写事件
send_request(ss, &request, 'W', sizeof(request.u.send));
return 0;
}
socket_unlock(&l);
}
inc_sending_ref(s, id);
struct request_package request;
request.u.send.id = id;
request.u.send.buffer = clone_buffer(buf, &request.u.send.sz);//分配内存
send_request(ss, &request, 'D', sizeof(request.u.send));//把数据写入管道
return 0;
}
- 如果一次性可以发送完成,那么就不会额外分配内存。行17 调用write发送数据的时候,我们发送的buffer指针是lua虚拟机c字符串的指针,这个内存是lua虚拟机分配管理的。通过上面代码中的 行31和 行53 可以知道。只要还有数据没有发送出去,就会额外分配内存来保存剩余的数据。
行40 把没法直接发送的数据写入了管道。在这之前 行36做了一次引用计数。表示我们lua层调用write打算发送出去的网络数据是先写入了管道,网络线程还没有读取管道。意思是我们这个连接上要发送的数据开始有点多了。实际上在网络线程读取了这个请求后,引用计数又会减少一次。我们看看代码
static inline void
inc_sending_ref(struct socket *s, int id) {
if (s->protocol != PROTOCOL_TCP)
return;
for (;;) {
unsigned long sending = ATOM_LOAD(&s->sending);
if ((sending >> 16) == ID_TAG16(id)) {//sending的高16位跟当前id的高16位比较
if ((sending & 0xffff) == 0xffff) {
// s->sending may overflow (rarely), so busy waiting here for socket thread dec it. see issue #794
continue;
}
// inc sending only matching the same socket id
if (ATOM_CAS(&s->sending, sending, sending + 1))
return;
// atom inc failed, retry
} else {//id是无效的
// socket id changed, just return
return;
}
}
}
上面这段代码就是给连接 加一次发送引用。这个s->sending是32位的,高16位保存的是对应连接id的高16位,低16位用做引用计数。在调用这个函数前,我们是通过id拿到socket的,即struct socket * s = &ss->slot[HASH_ID(id)];
我们知道如果一个id连接关闭了,那么他对应的槽位,即socket是可能会分配给其他id的。这样一来,不同的id都可以拿到相同的socket,特别是如果一个已经过期的id来发送数据,肯定不能让他发送。既然他们都可以得到同样的槽位,其实说明新老id的低16位是相同的。但是老id的高16位跟当前id的高16位一定不同。所以通过 行7 就可以判断当前的id是不是无效的。
之后再网络线程处理管道数据后,引用计数就会减一次。
我们看看网络线程是怎么处理管道数据的。注意下面 行6开始。我们这里type是 D,属于发送高优先级数据。
// return type
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
switch (type) {
case 'D':
case 'P': {
int priority = (type == 'D') ? PRIORITY_HIGH : PRIORITY_LOW;//高优先级链表 还是 低优先级链表
struct request_send * request = (struct request_send *) buffer;
int ret = send_socket(ss, request, result, priority, NULL);
dec_sending_ref(ss, request->id);//引用减一
return ret;
}
return -1;
}
具体看行 9 的 send_socket
。这里主要就是把 数据加入到链表中。
static int
send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority, const uint8_t *udp_address) {
int id = request->id;
struct socket * s = &ss->slot[HASH_ID(id)];
struct send_object so;
send_object_init(ss, &so, request->buffer, request->sz);
uint8_t type = ATOM_LOAD(&s->type);
if (send_buffer_empty(s)) {//两个链表都是空
if (s->protocol == PROTOCOL_TCP) {
append_sendbuffer(ss, s, request); // add to high priority list, even priority == PRIORITY_LOW
}
if (enable_write(ss, s, true)) {//开启写监听
return report_error(s, result, "enable write failed");
}
} else {
if (s->protocol == PROTOCOL_TCP) {
if (priority == PRIORITY_LOW) {
append_sendbuffer_low(ss, s, request);
} else {
append_sendbuffer(ss, s, request);
}
}
}
return -1;
}
我们主要看 上面的 append_sendbuffer
。这个函数其实就是把要发送的数据打包分配一个 write_buffer
,然后加入到链表中。
具体从下面行22开始看。
static struct write_buffer *
append_sendbuffer_(struct socket_server *ss, struct wb_list *s, struct request_send * request, int size) {
struct write_buffer * buf = MALLOC(size);//分配一个内存
struct send_object so;
buf->userobject = send_object_init(ss, &so, request->buffer, request->sz);
buf->ptr = (char*)so.buffer;//剩余需要发送的数据的起始位置
buf->sz = so.sz;//剩余需要发送的数据
buf->buffer = request->buffer;//写缓冲区的原始数据
buf->next = NULL;
if (s->head == NULL) {
s->head = s->tail = buf;
} else {
assert(s->tail != NULL);
assert(s->tail->next == NULL);
s->tail->next = buf;
s->tail = buf;
}
return buf;
}
static inline void
append_sendbuffer(struct socket_server *ss, struct socket *s, struct request_send * request) {
struct write_buffer *buf = append_sendbuffer_(ss, &s->high, request, sizeof(*buf));
s->wb_size += buf->sz;//累计写缓冲区的大小 注意不是 s->dw_buff
}
我们构建一个
write_buffer
时,需要注意其中几个成员的意义。当我们把 write_buffer 缓冲区的数据交给网络线程发送时,可能无法一次性发送完。比如我们的 write_buffer原本保存的是 50个字节,我们网络线程可能只发送了20个字节,那么剩余的30个字节需要下次再发送出去。这时候 ptr和sz 就表示剩余需要发送数据的信息。
当把数据加入链表后,网络线程会在可写事件监听发生时,会调用 write
函数 把数据真正发送出去。看下面的行20.
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->checkctrl) {
}
if (ss->event_index == ss->event_n) {//一轮批量处理完成 新的一轮需要开始了 调用一次sp_wait作为新一轮的开始
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);//检测网络事件
ss->checkctrl = 1;
ss->event_index = 0;
}
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
struct socket_lock l;
socket_lock_init(s, &l);
switch (ATOM_LOAD(&s->type)) {
default:
if (e->write) {
int type = send_buffer(ss, s, &l, result);//是时候发送数据出去了
if (type == -1)
break;
return type;
}
}
}
}
}
进入行20内部看看。实际上主要做两件事
- 把
直接缓存
dw_buffer里面的数据包装成 一个write_buffer
,然后加入链表。但是是加入链表是头部。也就是说发送时优先级最高 - 调用
send_buffer_
去发送 链表里面的数据
static int
send_buffer(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
if (!socket_trylock(l))
return -1; // blocked by direct write, send later.
if (s->dw_buffer) {
// add direct write buffer before high.head
struct write_buffer * buf = MALLOC(sizeof(*buf));//分配内存
struct send_object so;
buf->userobject = send_object_init(ss, &so, (void *)s->dw_buffer, s->dw_size);
buf->ptr = (char*)so.buffer+s->dw_offset;//剩余发送的其实位置
buf->sz = so.sz - s->dw_offset;//剩余没有发送的
buf->buffer = (void *)s->dw_buffer;//原始数据 包括发送和没发送的
s->wb_size+=buf->sz;
if (s->high.head == NULL) {
s->high.head = s->high.tail = buf;
buf->next = NULL;
} else {
buf->next = s->high.head;
s->high.head = buf;
}
s->dw_buffer = NULL;//表示 直接缓冲区 清空了
}
int r = send_buffer_(ss,s,l,result);
socket_unlock(l);
return r;
}
我们具体去看看 send_buffer_
static int
send_buffer_(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
assert(!list_uncomplete(&s->low));//low链表中不能存在某个节点数据只发送了一部分
// step 1
int ret = send_list(ss,s,&s->high,l,result);//发送高优先级链表中的数据
if (s->high.head == NULL) {//如果高优先级链表为空了
// step 2
if (s->low.head != NULL) {//发送低优先级链表数据
int ret = send_list(ss,s,&s->low,l,result);
// step 3
if (list_uncomplete(&s->low)) {//如果低优先级链表中有一个节点数据只发送了一部分
raise_uncomplete(s);//则把这个节点的数据添加到 已经空的高优先级链表中
return -1;
}
if (s->low.head)
return -1;
}
// step 4
assert(send_buffer_empty(s) && s->wb_size == 0);
if (s->closing) {
// finish writing
force_close(ss, s, l, result);
return -1;
}
int err = enable_write(ss, s, false);//关闭可写监听
}
return -1;
}
上面的代码主要做的是事情
-
先把高优先级链表里面的数据发送
-
高优先级的数据都发送完了 那么开始发送低优先级链表的数据
- 如果低优先级数据不能在这次发完,那么把还有剩余数据的那个节点添加到高优先级链表中去
- 如果两个链表的数据都发送完了,就关闭可写监听。
最后我们看看到底 send_list
是怎么发送的 看行34
static int
send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
while (list->head) {
struct write_buffer * tmp = list->head;
for (;;) {
ssize_t sz = write(s->fd, tmp->ptr, tmp->sz);
if (sz < 0) {//出现异常情况
switch(errno) {
case EINTR:
continue;
case AGAIN_WOULDBLOCK:
return -1;
}
return close_write(ss, s, l, result);
}
stat_write(ss,s,(int)sz);
s->wb_size -= sz;
if (sz != tmp->sz) {
tmp->ptr += sz;//剩余没发送的起始位置
tmp->sz -= sz;//剩余没发送的还有多少
return -1;
}
break;
}
list->head = tmp->next;
write_buffer_free(ss,tmp);//发送完一个write_buffer ,释放内部的 发送数据 和 write_buffer自己
}
list->tail = NULL;
return -1;
}
static int
send_list(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
if (s->protocol == PROTOCOL_TCP) {
return send_list_tcp(ss, s, list, l, result);
}
}
上面的代码就是不断的把链表中的节点取出来,然后通过write函数发送出去。最后释放节点的内存。
关于写数据时出现异常,我们会在关闭连接时再次讨论。
标签:struct,buffer,网络,send,write,ss,数据,socket From: https://www.cnblogs.com/waittingforyou/p/16972228.html在多线程环境下,针对同一个网络套接字socket id每一次调用write,实际上都是原子操作。也就是你在a线程调用write成功写入123,我在b线程调用write写入abc,最终写入的是 123abc或者是abc123.不可能是其他组合状态。比如不可能是12ab3c这种。