首页 > 编程语言 >[原创] KCP 源码分析(上)

[原创] KCP 源码分析(上)

时间:2024-03-15 15:34:22浏览次数:38  
标签:snd 原创 IUINT32 buffer seg 源码 重传 KCP kcp

KCP 协议是一种可靠的传输协议,对比 TCP 取消了累计确认(延迟 ACK)、减小 RTO增长速度、选择性重传而非全部重传。通过用流量换取低时延。 KCP 中最重要的两个数据结构IKCPCB和IKCPSEG,一个IKCPCB对应一个 KCP 连接,通过这个结构体维护发送缓存、接收缓存、超时重传时间、窗口大小等。IKCPSEG 对应一个 KCP 数据包,包含该数据包的命令、数据、时间戳、数据长度等信息。源码地址:https://github.com/skywind3000/kcp

KCP 数据包结构体:

struct IKCPSEG
{
	struct IQUEUEHEAD node;
	IUINT32 conv; 	// 会话 ID
	IUINT32 cmd;	// KCP 命令:
    				// IKCP_CMD_ACK:这是个 ACK 
    				// IKCP_CMD_WASK:发送方探测接收方的窗口
    				// IKCP_CMD_WINS:接收方回应自己的窗口大小
    				
	IUINT32 frg;	// fragment分段号,如果是流模式:默认为 0
	IUINT32 wnd;	// 窗口大小
	IUINT32 ts;		// 发送方:数据包的发送时间戳。 
  					// 接收方(ACK):所接受数据包的发送时间,而不是发送 ACK 的时间,方便发送方收到 ACK 后计算 rtt。
	IUINT32 sn;		// 发送方:发送数据包的序列号
  					// 接收方(ACK):ACK 号
	IUINT32 una;	// 未确认序列号:期待下次收到的数据包
	IUINT32 len;	// 数据包除去头部的字节数
  
/*-----------------以下成员不会实际发送到网络中,主要是超时重传和快速重传计算的辅助数据-----------------*/
	IUINT32 resendts;	// = current + rto, 超时重传的阈值, 当前时间超过resendts, 就要重发这个数据包
	IUINT32 rto;	// Retransmission Timeout, 下次超时重传的间隔时间, 会随着超时次数增加, 增加速率取决于是不是快速模式
	IUINT32 fastack;// 数据包被跳过次数, 快速重传功能需要
	IUINT32 xmit;	// 该数据包发送次数, transmit 的缩写, ,次数太多判断网络断开
/*-----------------以上成员不会实际发送到网络中,主要是超时重传和快速重传计算的辅助数据-----------------*/
  
	char data[1];	// 数据包携带的数据,大小根据ikcp_segment_new的参数决定
};

KCP 连接结构体:

struct IKCPCB
{
	IUINT32 conv; 	// 会话ID
	IUINT32 mtu; 	// 下层协议的最大传输单元, 一次发送若干个kcp包, 这些包的总长度不超过mtu
	IUINT32 mss; 	// 一个KCP数据包的最大数据载荷, mss+head一定不超过mtu
	IUINT32 state; 	// 连接状态

	IUINT32 snd_una; 	// snd_una之前的包对方(接收方)都已经收到了
	IUINT32 snd_nxt;  	// 下一个要从 send_que 发到 send_buf 的包序列号
	IUINT32 rcv_nxt;	// 下一个要从 rcv_buf 发到 rcv_que 的包序列号

	IUINT32 ts_recent; 	// 没用到
	IUINT32 ts_lastack;	// 没用到

	IUINT32 ssthresh;	// 拥塞窗口从慢启动转换到拥塞避免的窗口阈值
	IINT32 rx_rttval;	// 近4次rtt和srtt的平均差值,反应了rtt偏离srtt的程度
	IINT32 rx_srtt;		// 平滑的rtt,近8次rtt平均值
	IINT32 rx_rto;		// 系统的重传超时时间
	IINT32 rx_minrto; 	// 最小重传超时时间
	
	IUINT32 snd_wnd; 	// 发送窗口大小
	IUINT32 rcv_wnd; 	// 接收窗口大小
	IUINT32 rmt_wnd; 	// 对方接收窗口大小
	IUINT32 cwnd; 		// 拥塞窗口大小
	IUINT32 probe;		// 探测窗口大小

	IUINT32 current;	// 当前时间戳
	IUINT32 interval; 	// 内部flush刷新间隔
	IUINT32 ts_flush; 	// 下一次刷新输出的时间戳
	IUINT32 xmit;		// 该KCP连接超时重传次数

	IUINT32 nrcv_buf; 	// rcv_buf的长度
	IUINT32 nsnd_buf;	// snd_buf的长度
	IUINT32 nrcv_que; 	// rcv_que的长度
	IUINT32 nsnd_que; 	// snd_que的长度

	IUINT32 nodelay;	// 是否启用nodelay模式, ==2为快速模式
	IUINT32 updated;	// 是否调用过update函数
	IUINT32 ts_probe; 	// 下次探测窗口大小的时间戳
	IUINT32 probe_wait; // 探测窗口大小的间隔时间,每次探测对面窗口为0(失败), 探测时间*1.5
	IUINT32 dead_link;	// 断开连接的重传次数阈值
	IUINT32 incr; 		// k*mss , 拥塞窗口等于floor(k)
	struct IQUEUEHEAD snd_queue;// 发送队列
	struct IQUEUEHEAD rcv_queue;// 接收队列
	struct IQUEUEHEAD snd_buf; // 发送缓存, 还没收到 ACK 的包都在这里边
	struct IQUEUEHEAD rcv_buf; // 接收缓存, 将收到的数据暂存, 然后将其中连续的数据放到rcv_queue供上层读取
	IUINT32 *acklist; 	// 一个整数数组,存放要回复的ack,
  						// 结构为 [sn0(接收数据包的序号), ts0(接收数据包的发送时间), sn1, ts1, ...]
	IUINT32 ackcount; 	// 本次需要回复的ack个数
	IUINT32 ackblock; 	// acklist的大小,会动态扩容,类似于 vector
	void *user;			// 用户标识
	char *buffer; 		// 数据缓冲区
	int fastresend; 	// 快速重传的失序阈值, 发送方收到 fastresend 个冗余ACK就触发快速重传
	int fastlimit;  	// 快速重传的次数限制
	int nocwnd; 		// 0: 有拥塞控制, 1: 没有拥塞控制
	int stream;			// 流模式
	int logmask;
	int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); // 回调函数,数据发送到下层协议
	void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

ikcp_send

先来看发送方的用户接口:

int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
	IKCPSEG *seg;
	int count, // 需要装多少包
		 i;

	assert(kcp->mss > 0);
	if (len < 0) return -1;

	// 字节流模式,如果之前的包没装满,则先把之前的包装满。(粘包现象)
	if (kcp->stream != 0) { 
		if (!iqueue_is_empty(&kcp->snd_queue)) {
      // old:没有被装满的包
			IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
			if (old->len < kcp->mss) { // 前一个包没塞满, 粘包
				int capacity = kcp->mss - old->len;
				int extend = (len < capacity)? len : capacity;
				seg = ikcp_segment_new(kcp, old->len + extend);
				assert(seg);
				if (seg == NULL) {
					return -2;
				}
				iqueue_add_tail(&seg->node, &kcp->snd_queue);
				memcpy(seg->data, old->data, old->len); // 把old数据转移到seg,然后把old删了
				if (buffer) {
					memcpy(seg->data + old->len, buffer, extend);
					buffer += extend;
				}
				seg->len = old->len + extend;
				seg->frg = 0;
				len -= extend;
				iqueue_del_init(&old->node); 
				ikcp_segment_delete(kcp, old); // 删除 old 节点
			}
		}
		if (len <= 0) {
			return 0;
		}
	}

	// 需要几个包来装len字节的数据, 一个包最多装mss字节
	if (len <= (int)kcp->mss) count = 1;
	else count = (len + kcp->mss - 1) / kcp->mss;

	if (count >= (int)IKCP_WND_RCV) return -2;

	if (count == 0) count = 1;

	// 将buffer数据分段装入snd_queue
	for (i = 0; i < count; i++) {
		int size = len > (int)kcp->mss ? (int)kcp->mss : len;
		seg = ikcp_segment_new(kcp, size);
		assert(seg);
		if (seg == NULL) {
			return -2;
		}
		if (buffer && len > 0) {
			memcpy(seg->data, buffer, size);
		}
		seg->len = size;
    	// 上层数据包被分段后的段号,如果开启流模式,默认段号都为 0
		seg->frg = (kcp->stream == 0)? (count - i - 1) : 0; 
		iqueue_init(&seg->node);
		iqueue_add_tail(&seg->node, &kcp->snd_queue); // 将数据包 push 进发送队列
		kcp->nsnd_que++;
		if (buffer) {
			buffer += size;
		}
		len -= size;
	}

	return 0;
}

ikcp_send 的主要逻辑就是将用户数据分段组装成 IKCPSEG 然后将其添加到发送队列。如果是流模式,则没有段号,每个包都是满的,数据有粘包需要用户自己处理。

ikcp_update

上层定时调用,主要功能是设置当前时间戳、计算下一次update 事件以及调用 ikcp_flush,ikcp_flush 才是将数据从发送队列发送到发送缓存的函数。

ikcp_flush

Step1、回应ACK

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    seg.conv = kcp->conv;
	seg.cmd = IKCP_CMD_ACK; // 命令为 ACK
	seg.frg = 0;
	seg.wnd = ikcp_wnd_unused(kcp); // 设置窗口大小
	seg.una = kcp->rcv_nxt;
	seg.len = 0;
	seg.sn = 0;
	seg.ts = 0;
    ...
	count = kcp->ackcount; // 需要回复的 ack 个数
	for (i = 0; i < count; i++) { 
		size = (int)(ptr - buffer);
        // 如果buffer 放不下 seg 的 head ,那就先把 buffer 中的数据先发到网络中
		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
			ikcp_output(kcp, buffer, size); // 将buffer 中的数据先发到网络中
			ptr = buffer;
		}
        // 从 acklist 中取出要发送 ack 的 sn 和 ts
		ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
        // 只将 seg 的 head 拷贝到数据缓冲区里,注意回应 ack 的报文没有 data。
		ptr = ikcp_encode_seg(ptr, &seg); 
	}

	kcp->ackcount = 0;
  	...
}

这段代码主要功能就是回复 ACK,在接收数据的时候 kcp 会把需要回复的 ACK 放入 acklist,在这里检查kcp->ackcount,发现需要回复 ACK就从 acklist 中取出要发送 ack 的 sn 和 ts存入 seg 然后发送。

Step2、探测窗口

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    ...
	// 对面没有接收缓存,等待probe_wait
	if (kcp->rmt_wnd == 0) {
		if (kcp->probe_wait == 0) { // 初始化探测窗口
			kcp->probe_wait = IKCP_PROBE_INIT;
			kcp->ts_probe = kcp->current + kcp->probe_wait;
		}	
		else {
			if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {  // 已经到了探测时间
				if (kcp->probe_wait < IKCP_PROBE_INIT) 
					kcp->probe_wait = IKCP_PROBE_INIT;
				kcp->probe_wait += kcp->probe_wait / 2; // 每次探测间隔增长 0.5 倍
				if (kcp->probe_wait > IKCP_PROBE_LIMIT)
					kcp->probe_wait = IKCP_PROBE_LIMIT;
				kcp->ts_probe = kcp->current + kcp->probe_wait;
				kcp->probe |= IKCP_ASK_SEND; // 标记需要探测窗口
			}
		}
	}	else { // 一旦对方有,则重置探测时间和探测间隔
		kcp->ts_probe = 0;
		kcp->probe_wait = 0;
	}
	
    // 标记需要发送探测
	if (kcp->probe & IKCP_ASK_SEND) {
		seg.cmd = IKCP_CMD_WASK; // 命令设为 IKCP_CMD_WASK,其他头信息不需要
		size = (int)(ptr - buffer);
		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
			ikcp_output(kcp, buffer, size);
			ptr = buffer;
		}
		ptr = ikcp_encode_seg(ptr, &seg);
	}
  	...
}

当发送方发现对方窗口大小为 0,需要发送探测命令询问对方窗口大小,每次探测间隔都会增长0.5 倍,一旦对方有接收窗口,则重置探测时间和探测间隔。

Step3、回应探测窗口

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    ...
    // 需要回应窗口大小
	if (kcp->probe & IKCP_ASK_TELL) {
		seg.cmd = IKCP_CMD_WINS;  // 命令设为 IKCP_CMD_WINS,其他头信息不需要
		size = (int)(ptr - buffer);
		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
			ikcp_output(kcp, buffer, size);
			ptr = buffer;
		}
		ptr = ikcp_encode_seg(ptr, &seg);
	}
    ...
}

Step4、发送数据

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    ...
    // 计算可以发多少数据
	cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
    // kcp->nocwnd == 1 则关闭流控(拥塞控制)
	if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
    // 如果 snd_nxt(下一个要从 send_que 发到 send_buf 的包序列号) 在发送窗口内
    // 就一直从 snd_que 中取出数据包放到 snd_buf 中, 直到snd_buf满或者snd_queue为空
	while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
		IKCPSEG *newseg;
		if (iqueue_is_empty(&kcp->snd_queue)) break;

		newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

		iqueue_del(&newseg->node);
		iqueue_add_tail(&newseg->node, &kcp->snd_buf);
		kcp->nsnd_que--;
		kcp->nsnd_buf++;

		newseg->conv = kcp->conv;
		newseg->cmd = IKCP_CMD_PUSH;
		newseg->wnd = seg.wnd;
		newseg->ts = current;
		newseg->sn = kcp->snd_nxt++;
		newseg->una = kcp->rcv_nxt;
		newseg->resendts = current;
		newseg->rto = kcp->rx_rto;
		newseg->fastack = 0;
		newseg->xmit = 0;
	}

	// resent:收到 resent 个失序 ACK 就会触发快速重传,TCP 里是冗余 ACK。
    // 如果没开启快速重传,则 resent 为 inf。
	resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
    // 超时重传的最小超时时间
	rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

	// 遍历snd_buf里的数据包是否需要发送
	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
		IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
		int needsend = 0; 
		/* 
			该数据包是否需要发送,三种情况:
			1. 该数据包没发送过
			2. 超时没有收到ack,触发超时重传
			3. 收到resent次冗余ack,触发快速重传
		*/

		// 1. 该数据包第一次发送
		if (segment->xmit == 0) {
			needsend = 1;
			segment->xmit++;
			segment->rto = kcp->rx_rto; // 超时重传时间
			segment->resendts = current + segment->rto + rtomin;
		}

		// 2. 该数据包超时没有收到ACK, 触发超时重传
		else if (_itimediff(current, segment->resendts) >= 0) { 
			needsend = 1;
			segment->xmit++;
			kcp->xmit++;
			if (kcp->nodelay == 0) { // 普通模式,超时时间*2
				segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
			}	else { // 快速模式,超时时间*1.5
				IINT32 step = (kcp->nodelay < 2)? 
					((IINT32)(segment->rto)) : kcp->rx_rto;
				segment->rto += step / 2;
			}
			segment->resendts = current + segment->rto;
			lost = 1;
		}

		// 3. 该数据包被跳过的次数超过了fastresend, 触发快速重传
		else if (segment->fastack >= resent) {  
			if ((int)segment->xmit <= kcp->fastlimit || // 快速重传的限制,不能一直快速重传
				kcp->fastlimit <= 0) {
				needsend = 1;
				segment->xmit++;
				segment->fastack = 0;
				segment->resendts = current + segment->rto;
				change++;
			}
		}

		if (needsend) {
			int need;
			segment->ts = current;
			segment->wnd = seg.wnd;
			segment->una = kcp->rcv_nxt;

			size = (int)(ptr - buffer);
			need = IKCP_OVERHEAD + segment->len; // 该数据包长度, 最大为head+mss

			if (size + need > (int)kcp->mtu) {
				ikcp_output(kcp, buffer, size);
				ptr = buffer;
			}

			ptr = ikcp_encode_seg(ptr, segment);

			if (segment->len > 0) {
				memcpy(ptr, segment->data, segment->len);
				ptr += segment->len;
			}
			// 某个数据包的传输次数超过了dead_link,则判断当前连接断开。
			if (segment->xmit >= kcp->dead_link) {  // 断开连接
				kcp->state = (IUINT32)-1;
			}
		}
	}

	// 把没有数据缓冲区的数据发送出去
	size = (int)(ptr - buffer);
	if (size > 0) {
		ikcp_output(kcp, buffer, size);
	}

	// 如果触发了快速重传,减小拥塞窗口(快速恢复)
	if (change) {
		IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
		kcp->ssthresh = inflight / 2;
		if (kcp->ssthresh < IKCP_THRESH_MIN)
			kcp->ssthresh = IKCP_THRESH_MIN;
		kcp->cwnd = kcp->ssthresh + resent;
		kcp->incr = kcp->cwnd * kcp->mss;
	}

	// 超时重传,丢包了,重置拥塞窗口和ssthresh。
	if (lost) {
		kcp->ssthresh = cwnd / 2;
		if (kcp->ssthresh < IKCP_THRESH_MIN)
			kcp->ssthresh = IKCP_THRESH_MIN;
		kcp->cwnd = 1;
		kcp->incr = kcp->mss;
	}

	if (kcp->cwnd < 1) {
		kcp->cwnd = 1;
		kcp->incr = kcp->mss;
	}
    ...
}

首先确定发送窗口 cwnd = min(kcp->snd_wnd, kcp->rmt_wnd); 接着检查是否开启流控(拥塞控制) if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);通过取消拥塞控制可以进一步降低延迟。

然后从snd_queue中取出数据包放到snd_buf中, 直到snd_buf满或者snd_queue为空。然后依次检查snd_buf 里的数据包需不需要发送,需要发送有三种情况:

  1. 该数据包首次发送。
  2. 触发超时重传:时间超过超时重传的阈值。超时重传普通模式下:每次超时,超时重传的时间就翻倍。在快速模式下:每次超时的重传时间翻 0.5 倍。降低传输时延同时重置拥塞窗口和ssthresh。
  3. 触发快速重传:收到了该数据包的resent次的失序(冗余)ACK。该数据包被跳过的次数超过了fastresend, 触发快速重传。同时减小拥塞窗口为原来的一半,ssthresh也设置为这个值。

标签:snd,原创,IUINT32,buffer,seg,源码,重传,KCP,kcp
From: https://www.cnblogs.com/hellozhangjz/p/18075534

相关文章

  • 全套大型体检中心PEIS源码 医院PEIS管理系统源码
    大型体检中心PEIS源码 医院PEIS管理系统源码医院智慧体检系统,该系统覆盖医院体检科、体检中心的所有业务,完成从预约、登记、收费、检查、检验、出报告、分析、报表等所有工作。系统可以对团检的每个环节设有操作界面,从检前的预约、记录、EXCEL数据批量导入、自动筛选、自......
  • Django admin管理工具的使用、定制及源码解析
    Djangoadmin管理工具的使用、定制及源码解析admin组件使用Django提供了基于web的管理工具。Django自动管理工具是django.contrib的一部分。你可以在项目的settings.py中的INSTALLED_APPS看到它:#ApplicationdefinitionINSTALLED_APPS=['django.contrib.a......
  • 小程序开发平台源码系统:万能建站门店小程序功能 带完整的搭建教程以及代码包
    在移动互联网时代,小程序以其独特的优势,迅速占领了市场的一席之地。然而,对于许多中小企业和个人开发者来说,缺乏专业的开发团队和技术支持,使得小程序开发成为一项难以逾越的技术门槛。小编给大家分享一款万能建站门店小程序源码系统,旨在降低小程序开发的难度,让更多的人能够轻松搭......
  • 团购小程序源码系统:快递代收+社区便利店+推送商品等功能 带完整的安装部署教程
    在移动互联网高速发展的今天,小程序以其轻便、快捷、无需下载的特点,迅速成为商家与用户之间的桥梁。为了满足社区团购市场的需求,小编给大家分享一款功能强大的团购小程序源码系统,该系统集成了快递代收、社区便利店、推送商品等多项功能,为商家提供了一个高效、便捷的运营平台。......
  • 微信小程序 代驾预约评价系统 uniapp毕业设计源码lw
    代驾系统的系统项目的概述设计分析,主要内容有平台的具体分析,进行数据库的是设计,数据采用mysql数据库,并且对于系统的设计采用比较人性化的操作设计,对于系统出现的错误信息可以及时做出处理及反馈。本代驾服务系统主要包括系统用户管理模块、代驾线路信息管理、车辆信息管理、代......
  • 基于微信小程序的公交信息在线查询系统小程序设计与实现(源码+lw+部署文档+讲解等)
    文章目录前言项目运行截图技术框架后端采用SpringBoot框架前端框架Vue可行性分析系统测试系统测试的目的系统功能测试数据库表设计代码参考数据库脚本为什么选择我?获取源码前言......
  • 基于SSM的网上医院预约挂号系统的设计与实现(论文+源码)_kaic
    摘 要如今的信息时代,对信息的共享性,信息的流通性有着较高要求,因此传统管理方式就不适合。为了让医院预约挂号信息的管理模式进行升级,也为了更好的维护医院预约挂号信息,网上医院预约挂号系统的开发运用就显得很有必要。并且通过开发网上医院预约挂号系统,不仅可以让所学的SSM......
  • 探索源码世界,Java毕设设计新选择:源码码头
    作为一名即将踏入职场的Java学子,你是否曾为毕业设计而犯愁?是否曾为找不到合适的项目而苦恼?别担心,源码码头(www.icodedock.com)为你提供了全新的解决方案!1.创新源码,灵感无限源码码头汇聚了大量独一无二的Java毕业设计项目,这些项目不仅仅是传统意义上的框架应用,更是蕴含了创新与......
  • Langchain-ChatGLM源码解读(一)-文档数据上传
    一、简介Langchain-ChatGLM 相信大家都不陌生,近几周计划出一个源码解读,先解锁langchain的一些基础用法。文档问答过程大概分为以下5部分,在Langchain中都有体现。上传解析文档文档向量化、存储文档召回query向量化文档问答今天主要讲langchain在上传解析文档时是怎么实......
  • 基于SpringBoot实现企业技术员工测评考试管理系统演示【附项目源码+论文说明】
    基于SpringBoot实现企业技术员工测评考试管理系统演示摘要社会的发展和科学技术的进步,互联网技术越来越受欢迎。网络计算机的生活方式逐渐受到广大人民群众的喜爱,也逐渐进入了每个用户的使用。互联网具有便利性,速度快,效率高,成本低等优点。因此,构建符合自己要求的操作系......