首页 > 编程语言 >KCP源码剖析和应用解析

KCP源码剖析和应用解析

时间:2023-12-16 11:55:54浏览次数:38  
标签:seg IUINT32 int ikcp len 剖析 源码 KCP kcp

 

一,什么是KCP

KCP是一个快速可靠的协议,基于UDP的类似TCP的协议。 随着网络带宽的不断增大,在很多网络应用场景中,TCP的旧有特性对当今一些要求及时响应的网络要求不符合。而TCP又是嵌在操作系统内核中的模块,用户态软件不能够自定义来修改太多TCP的细节。所以推出了KCP以应对延迟越发要求高的网络应用场景

 

二,为什么有KCP

因为TCP在内核中比UDP多做了很多的事情,一个数据报文从发送端发出,通过网络传输到接收端,UDP报文所费时间比TCP报文的期望耗时更短。所以有一些网络应用场景下对延迟较高的应用,会采用自定义的KCP协议,在保障可靠传输的时候尽可能减少报文传递的延迟。

 

三,详解KCP源码

因为可以在用户态自定义该协议的使用,各种各样的KCP版本百花齐放,本文介绍的是原版的kcp协议

首先来看kcp的报文数据结构:

// KCP 包头结构 
// |< -------------- 4 bytes ----------------------->| 
// +----------+------------+-------------+-----------+ 
// |                     conv                        |    // 连接号  表示来自哪个客户端 
// + ---------+------------+--------------+----------+ 
// |  cmd     |    frg     |             wnd         |    // 命令字,   分片,   接收窗口大小 
// +----------+------------+--------------+----------+
// |                       ts                        |    // 时间序列  
// +----------+------------+--------------+----------+
// |                        sn                       |    // 序列号 
// +----------+------------+--------------+----------+ 
// |                      una                        |      // 下一个可接收的序列号,确认号 
// +----------+------------+--------------+----------+ 
// |                      len                        |     // 数据长度  
// +----------+------------+--------------+----------+
// |    --------          DATA         ----          |    // 用户数据 

 

从上面的数据结构来看看kcp segment的定义  

//===================================================================
// KCP data SEGMENT            数据报文的结构,存储需要发送的package的内容 
//===================================================================
struct IKCPSEG
{
    struct IQUEUEHEAD node;             // 双向链表定义的队列 用于发送和接受队列的缓冲 
    IUINT32 conv;                       // conversation ,会话序列号:接收到的数据包与发送的一致才接收此数据包 
    IUINT32 cmd;                        // command,指令类型:代表这个segment的类型 
    IUINT32 frg;                        // fragment 分段序号 
    IUINT32 wnd;                        // window 窗口大小 
    IUINT32 ts;                         // timestamp,发送的时间戳 
    IUINT32 sn;                         // sequence number , segment序号 
    IUINT32 una;                        // unacknowledged,当前未收到的序号,即代表这个序号之前的包都收到 
    IUINT32 len;                        // length,数据长度 
    IUINT32 resendts;                   // 重发的时间戳 
    IUINT32 rto;                        // 超时重传的时间间隔 
    IUINT32 fastack;                    // ack跳过的次数,用于快速重传 
    IUINT32 xmit;                       // 发送的次数(次数为1则是第一次发送,次数>=2则为重传) 
    char data[1];
};

  

代码的根本是kcp控制器,负责kcp算法逻辑,数据报文的收发,各种指标和变量的存放

//---------------------------------------------------------------------
// IKCPCB           KCP控制块,负责整个kCP算法逻辑和数据存取
//---------------------------------------------------------------------
struct IKCPCB
{
    IUINT32 conv;     // conv 会话ID,
    IUINT32 mtu;      // mtu 最大传输单元,
    IUINT32 mss;      // mss 最大分片大小,不大于mtu,     
    IUINT32 state;     // state 连接状态,0xFFFFFFFF(即-1)表示断开链接 
    

    IUINT32 snd_una;      // snd_una   第一个未确认的包
    IUINT32 snd_nxt;      // snd_nxt   待发送包的序号 
    IUINT32 rcv_nxt;       // rcv_nxt   待接收消息的序号为了保证包的顺序,接收方会维护一个接收窗口。
    //           接收窗口有一个起始序号 rcv_nxt(待接收消息序号)以及尾序号 rcv_nxt + rcv_wnd (接收窗口大小)
    
    IUINT32 ts_recent;
    IUINT32 ts_lastack;
    IUINT32 ssthresh;     //  ssthresh  拥塞窗口的阈值,以包为单位(TCP以字节为单位)
 
    
    IINT32 rx_rttval;    // rx_rttval    ack接收rtt浮动值,代表连接的抖动情况,
    IINT32 rx_srtt;      // rx_srtt      ack接收rtt平滑值,(smoothed) 
    IINT32 rx_rto;       // rx_rto       由ACK接收延迟计算出来的重传超时时间 
    IINT32 rx_minrto;     // rx_minrto    最小重传超时时间 

    
    IUINT32 snd_wnd;     // snd_wnd      发送窗口大小 
    IUINT32 rcv_wnd;     // rcv_wnd      接收窗口大小
    IUINT32 rmt_wnd;     // rmt_wnd      远端接收窗口大小 
    IUINT32 cwnd;        // cwnd         拥塞窗口大小 
    IUINT32 probe;        // probe        探查开关,是否开启探查对方的接收窗口大小
    
    IUINT32 current;         // current       当前时间戳 
    IUINT32 interval;        // interval      内部flush刷新间隔 
    IUINT32 ts_flush;        // ts_flush      下次flush刷新时间戳 
    IUINT32 xmit;             // xmit          总共重发次数
     
    
    IUINT32 nrcv_buf;        // 当前的接受缓存的个数,
    IUINT32 nsnd_buf;         // 发送缓存的个数
     
    IUINT32 nrcv_que;        // 当前的接受队列的个数
    IUINT32 nsnd_que;        // 发送队列的个数

    IUINT32 nodelay;    // nodelay  是否启动无延迟模式 
    
    IUINT32 updated;    // updated  是否调用过update函数的标识(KCP需要上层通过不断的ikcp_update和ikcp_check来驱动KCP的收发过程)

    IUINT32 ts_probe;    // ts_probe 下次探查窗口的时间戳 
    
    IUINT32 probe_wait;   // probe_wait 探查窗口需要等待的时间 
    

    IUINT32 dead_link;   // dead_link 最大重传次数,达到则改状态为连接中断 
    
    IUINT32 incr;       // incr   可发送的最大数据量 

    struct IQUEUEHEAD snd_queue;  // 发送消息的队列   作用是可以区分发送包的流模式或者段模式
    
    struct IQUEUEHEAD rcv_queue;  // 接收消息的队列 
    
    struct IQUEUEHEAD snd_buf;   // 发送消息的缓存 
    
    struct IQUEUEHEAD rcv_buf;   // 接收消息的缓存 
    
    
    IUINT32 *acklist;   // 待发送的ack的列表 
    IUINT32 ackcount;   // 本次已收到的ack数量
    IUINT32 ackblock;  // 当前acklist所分配的内存长度
    

    void *user;      // 存储消息字节流的内存 
    char *buffer;    // 触发快速重传的重复ACK个数 
    int fastresend;  // 触发快速重传的最大次数
    int fastlimit;
    
    int nocwnd;  // nocwnd 取消拥塞控制 (丢包退让,慢启动)
    int stream;  // stream 是否采用流传输模式 
    int logmask;

    int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
    void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

  

从main入口,kcp控制器的创建和销毁的函数是

ikcpcb* ikcp_create(IUINT32 conv, void *user);   // 初始化kcp控制块(根据kcp协议号来初始化)

void ikcp_release(ikcpcb *kcp);        // 释放kcp控制块

  

协议控制头的收报文的接口,将kcp的收报队列中的数据拿来给应用层
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
    int ispeek = (len < 0)? 1 : 0;
    ......
    peeksize = ikcp_peeksize(kcp);

    if (kcp->nrcv_que >= kcp->rcv_wnd)
        recover = 1;        /// 快速恢复标记 
    ////  如果待接收数据大于接收窗口值,那此时实际接收窗口为0.
    ////  发送端开始定时探查窗口

    // merge fragment
    for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
        int fragment;
        seg = iqueue_entry(p, IKCPSEG, node);
        p = p->next;

        if (buffer) {
            memcpy(buffer, seg->data, seg->len);
            buffer += seg->len;
        }

        len += seg->len;
        fragment = seg->frg;

        if (ispeek == 0) {
            iqueue_del(&seg->node);
            ikcp_segment_delete(kcp, seg);
            kcp->nrcv_que--;
        }
        if (fragment == 0) 
            break;
    }

    // move available data from rcv_buf -> rcv_queue
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }   else {
            break;
        }
    }

    // fast recover
    if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
        // ready to send back IKCP_CMD_WINS in ikcp_flush
        // tell remote my window size
        kcp->probe |= IKCP_ASK_TELL;
    }

    return len;
}

  

获得当前控制头的所有收到的queue上的数据大小
// peek data size
//---------------------------------------------------------------------
int ikcp_peeksize(const ikcpcb *kcp)
{
    struct IQUEUEHEAD *p;
    IKCPSEG *seg;
    int length = 0;

    if (iqueue_is_empty(&kcp->rcv_queue)) return -1;

    seg = iqueue_entry(kcp->rcv_queue.next, IKCPSEG, node);
    if (seg->frg == 0) return seg->len;

    if (kcp->nrcv_que < seg->frg + 1) return -1;

    for (p = kcp->rcv_queue.next; p != &kcp->rcv_queue; p = p->next) {
        seg = iqueue_entry(p, IKCPSEG, node);
        length += seg->len;
        if (seg->frg == 0) break;
    }
    return length;
}

  

对于发送数据报文,从kcp的可发送接口大小,取决对对方端可以接收的长度,将buff里面的数据放到kcp的待发送队列
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
    IKCPSEG *seg;
    int count, i;

    assert(kcp->mss > 0);
    if (len < 0) return -1;

    // append to previous segment in streaming mode (if possible)       # 针对流模式 
    if (kcp->stream != 0) {
        if (!iqueue_is_empty(&kcp->snd_queue)) {
            IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
            if (old->len < kcp->mss) {
                int capacity = kcp->mss - old->len;
                int extend = (len < capacity)? len : capacity;
                seg = ikcp_segment_new(kcp, old->len + extend);
                assert(seg);
                if (seg == NULL) {
                    return -2;
                }
                iqueue_add_tail(&seg->node, &kcp->snd_queue);
                memcpy(seg->data, old->data, old->len);
                if (buffer) {
                    memcpy(seg->data + old->len, buffer, extend);
                    buffer += extend;
                }
                seg->len = old->len + extend;
                seg->frg = 0;
                len -= extend;
                iqueue_del_init(&old->node);
                ikcp_segment_delete(kcp, old);
            }
        }
        if (len <= 0) {
            return 0;
        }
    }

    if (len <= (int)kcp->mss) count = 1;
    else count = (len + kcp->mss - 1) / kcp->mss;

    if (count >= (int)IKCP_WND_RCV) return -2;

    if (count == 0) count = 1;

    // fragment
    for (i = 0; i < count; i++) {
        int size = len > (int)kcp->mss ? (int)kcp->mss : len;
        seg = ikcp_segment_new(kcp, size);
        assert(seg);
        if (seg == NULL) {
            return -2;
        }
        if (buffer && len > 0) {
            memcpy(seg->data, buffer, size);
        }
        seg->len = size;
        seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
        iqueue_init(&seg->node);
        iqueue_add_tail(&seg->node, &kcp->snd_queue);
        kcp->nsnd_que++;
        if (buffer) {
            buffer += size;
        }
        len -= size;
    }

    return 0;
}

  

根据收到的报文来更新kcp控制块的rtt相关的值
// 根据当前的rtt来更新rtt浮动值和平滑值
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt)
{
    IINT32 rto = 0;
    if (kcp->rx_srtt == 0) {
        kcp->rx_srtt = rtt;
        kcp->rx_rttval = rtt / 2;
    }   else {
        long delta = rtt - kcp->rx_srtt;
        if (delta < 0) delta = -delta;
        kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4;               // 每次rtt变化的浮动大小,绝对值,最近4次的平均,平滑系数为1/4
        kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8;                    // rtt的平均值, 最近八次的平均
        if (kcp->rx_srtt < 1) kcp->rx_srtt = 1;
    }
    rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval);         // rto计算为 rtt + kcp主循环时间 (或者是4倍的rtt浮动值)
    kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX);               // 约束一下新算出的rto 
}

  

根据当前的send buff来更新未确认报文号码
// 根据当前的send buff来更新未确认包号
static void ikcp_shrink_buf(ikcpcb *kcp)
{
    struct IQUEUEHEAD *p = kcp->snd_buf.next;
    if (p != &kcp->snd_buf) {
        IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
        kcp->snd_una = seg->sn;
    }   else {      // snd_buf 空了,等确认包号就是待发送包号
        kcp->snd_una = kcp->snd_nxt;
    }
}

  

两个工具函数

// 根据确认号来确认收到的包,并将snd_buff对应的报文删掉 
void ikcp_parse_ack(ikcpcb *kcp, IUINT32 sn);

// 处理未确认报文号
static void ikcp_parse_una(ikcpcb *kcp, IUINT32 una);

  

在处理确认报文号,在该包之前的未确认的报文,都进行fastack + 1 的操作
static void ikcp_parse_fastack(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)
{
    struct IQUEUEHEAD *p, *next;

    if (_itimediff(sn, kcp->snd_una) < 0 || _itimediff(sn, kcp->snd_nxt) >= 0)  // 包号小于等待确认号,或者大于待发送的包号则不合法
        return;

    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = next) {
        IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
        next = p->next;
        if (_itimediff(sn, seg->sn) < 0) {
            break;
        }
        else if (sn != seg->sn) {
        #ifndef IKCP_FASTACK_CONSERVE
            seg->fastack++;
        #else
            if (_itimediff(ts, seg->ts) >= 0)
                seg->fastack++;
        #endif
        }
    }
}

  

获取ack包中的segment number确认号 和 timstamp 发送时间戳
void ikcp_ack_get(const ikcpcb *kcp, int p, IUINT32 *sn, IUINT32 *ts);

  

atk append操作,将ack编号缓存起来,等待发送
static void ikcp_ack_push(ikcpcb *kcp, IUINT32 sn, IUINT32 ts)

  

从网络socket中获得了数据后,塞入kcp协议中,等待kcp处理
// input data ,  拿到data放入kcp中
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    IUINT32 prev_una = kcp->snd_una;
    IUINT32 maxack = 0, latest_ts = 0;
    int flag = 0;

    if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
        ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
    }

    if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;

    while (1) {
        IUINT32 ts, sn, len, una, conv;
        IUINT16 wnd;
        IUINT8 cmd, frg;
        IKCPSEG *seg;

        if (size < (int)IKCP_OVERHEAD) break;

        data = ikcp_decode32u(data, &conv);
        if (conv != kcp->conv) return -1;

        size -= IKCP_OVERHEAD;

        if ((long)size < (long)len || (int)len < 0) return -2;

        if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
            cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) 
            return -3;

        kcp->rmt_wnd = wnd;
        ikcp_parse_una(kcp, una);
        ikcp_shrink_buf(kcp);

        if (cmd == IKCP_CMD_ACK) {
            if (_itimediff(kcp->current, ts) >= 0) {                // current 减去 对方发送的时间戳就是 rtt 
                ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
            }
            ikcp_parse_ack(kcp, sn);
            ikcp_shrink_buf(kcp);           // 在接受完ack包体后更新snd_una
            if (flag == 0) {
                flag = 1;
                maxack = sn;
                latest_ts = ts;
            }   else {
                if (_itimediff(sn, maxack) > 0) {
                #ifndef IKCP_FASTACK_CONSERVE
                    maxack = sn;
                    latest_ts = ts;
                #else
                    if (_itimediff(ts, latest_ts) > 0) {
                        maxack = sn;
                        latest_ts = ts;
                    }
                #endif
                }
            }
        }
        else if (cmd == IKCP_CMD_PUSH) {
            if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
                ikcp_log(kcp, IKCP_LOG_IN_DATA, 
                    "input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
            }
            if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
                ikcp_ack_push(kcp, sn, ts);
                if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
                    seg = ikcp_segment_new(kcp, len);
                    seg->conv = conv;
                    seg->cmd = cmd;
                    seg->frg = frg;
                    seg->wnd = wnd;
                    seg->ts = ts;
                    seg->sn = sn;
                    seg->una = una;
                    seg->len = len;

                    if (len > 0) {
                        memcpy(seg->data, data, len);
                    }

                    ikcp_parse_data(kcp, seg);
                }
            }
        }
        else if (cmd == IKCP_CMD_WASK) {
            // ready to send back IKCP_CMD_WINS in ikcp_flush
            // tell remote my window size
            kcp->probe |= IKCP_ASK_TELL;
            if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
                ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
            }
        }
        else if (cmd == IKCP_CMD_WINS) {
            // do nothing
            if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
                ikcp_log(kcp, IKCP_LOG_IN_WINS,
                    "input wins: %lu", (unsigned long)(wnd));
            }
        }
        else {
            return -3;
        }

        data += len;
        size -= len;
    }

    if (flag != 0) {
        ikcp_parse_fastack(kcp, maxack, latest_ts);
    }

    if (_itimediff(kcp->snd_una, prev_una) > 0) {   // 有包被确认了则更新
        if (kcp->cwnd < kcp->rmt_wnd) {         // 判断当前发送窗口是否小于对方的可接收窗口
            IUINT32 mss = kcp->mss;
            if (kcp->cwnd < kcp->ssthresh) {
                kcp->cwnd++;
                kcp->incr += mss;
            }   else {
                if (kcp->incr < mss) kcp->incr = mss;
                kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
                if ((kcp->cwnd + 1) * mss <= kcp->incr) {
                #if 1
                    kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
                #else
                    kcp->cwnd++;
                #endif
                }
            }
            if (kcp->cwnd > kcp->rmt_wnd) {
                kcp->cwnd = kcp->rmt_wnd;
                kcp->incr = kcp->rmt_wnd * mss;
            }
        }
    }

    return 0;
}

  

 

kcp算法的tick模块,每次更新的函数

void ikcp_flush(ikcpcb *kcp)
{
    IUINT32 current = kcp->current;
    char *buffer = kcp->buffer;
    char *ptr = buffer;
    int count, size, i;
    IUINT32 resent, cwnd;
    IUINT32 rtomin;
    struct IQUEUEHEAD *p;
    int change = 0;              // 是否触发快速重传
    int lost = 0;                // 是否存在丢包,需要重传
    IKCPSEG seg;

    // 'ikcp_update' haven't been called.  flush只有在update后有东西需要发送才执行
    if (kcp->updated == 0) return;

    seg.conv = kcp->conv;
    seg.cmd = IKCP_CMD_ACK;
    seg.frg = 0;
    seg.wnd = ikcp_wnd_unused(kcp);
    seg.una = kcp->rcv_nxt;
    seg.len = 0;
    seg.sn = 0;
    seg.ts = 0;

    // flush acknowledges
    // 将acklist中的ack报文发送出去,在ikcp_input中会通过ikcp_ack_push将收到的报文的sn和ts字段写入acklist中
    count = kcp->ackcount;
    for (i = 0; i < count; i++) {
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    kcp->ackcount = 0;

    // probe window size (if remote window size equals zero)
    //  检查当前是否需要对远端窗口进行探测 
    if (kcp->rmt_wnd == 0) {
        if (kcp->probe_wait == 0) {
            kcp->probe_wait = IKCP_PROBE_INIT;
            kcp->ts_probe = kcp->current + kcp->probe_wait;         // 初始化探测间隔和下一次探测时间
        }   
        else {
            if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {     // 发生超时
                if (kcp->probe_wait < IKCP_PROBE_INIT) 
                    kcp->probe_wait = IKCP_PROBE_INIT;
                kcp->probe_wait += kcp->probe_wait / 2;             // 等待时间变为原来的 1.5 倍
                if (kcp->probe_wait > IKCP_PROBE_LIMIT)
                    kcp->probe_wait = IKCP_PROBE_LIMIT;
                kcp->ts_probe = kcp->current + kcp->probe_wait;     // 重新设置等待时间
                kcp->probe |= IKCP_ASK_SEND;
            }
        }
    }   else {
        kcp->ts_probe = 0;
        kcp->probe_wait = 0;
    }

    // flush window probing commands
    if (kcp->probe & IKCP_ASK_SEND) {
        seg.cmd = IKCP_CMD_WASK;
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    // flush window probing commands
    if (kcp->probe & IKCP_ASK_TELL) {
        seg.cmd = IKCP_CMD_WINS;
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    kcp->probe = 0;

    // calculate window size
    cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
    if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

    // move data from snd_queue to snd_buf
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
        IKCPSEG *newseg;
        if (iqueue_is_empty(&kcp->snd_queue)) break;

        newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

        iqueue_del(&newseg->node);
        iqueue_add_tail(&newseg->node, &kcp->snd_buf);
        kcp->nsnd_que--;
        kcp->nsnd_buf++;

        newseg->conv = kcp->conv;
        newseg->cmd = IKCP_CMD_PUSH;
        newseg->wnd = seg.wnd;
        newseg->ts = current;
        newseg->sn = kcp->snd_nxt++;
        newseg->una = kcp->rcv_nxt;
        newseg->resendts = current;
        newseg->rto = kcp->rx_rto;
        newseg->fastack = 0;
        newseg->xmit = 0;
    }

    // calculate resent
    resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
    rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

    // flush data segments
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        if (segment->xmit == 0) {
            needsend = 1;
            segment->xmit++;
            segment->rto = kcp->rx_rto;
            segment->resendts = current + segment->rto + rtomin;        // 重传时间 = 当前时间 + rto + 1/8 * rto (如果是nodelay则 只加rto)
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
            needsend = 1;
            segment->xmit++;
            kcp->xmit++;
            if (kcp->nodelay == 0) {
                segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);         // 重传 更新包体的rto, rto翻倍
            }   else {
                IINT32 step = (kcp->nodelay < 2)? 
                    ((IINT32)(segment->rto)) : kcp->rx_rto;
                segment->rto += step / 2;                                           // 重传 更新包体rto,增加为原来的 3/2 
            }
            segment->resendts = current + segment->rto;
            lost = 1;
        }
        else if (segment->fastack >= resent) {                  // 触发了快速重传则立刻发送
            if ((int)segment->xmit <= kcp->fastlimit || 
                kcp->fastlimit <= 0) {
                needsend = 1;
                segment->xmit++;
                segment->fastack = 0;
                segment->resendts = current + segment->rto;                     // 有快速重传模式下则rto保持不变
                change++;
            }
        }

        if (needsend) {
            int need;
            segment->ts = current;
            segment->wnd = seg.wnd;
            segment->una = kcp->rcv_nxt;

            size = (int)(ptr - buffer);
            need = IKCP_OVERHEAD + segment->len;

            if (size + need > (int)kcp->mtu) {
                ikcp_output(kcp, buffer, size);
                ptr = buffer;
            }

            ptr = ikcp_encode_seg(ptr, segment);

            if (segment->len > 0) {
                memcpy(ptr, segment->data, segment->len);
                ptr += segment->len;
            }

            if (segment->xmit >= kcp->dead_link) {
                kcp->state = (IUINT32)-1;
            }
        }
    }

    // flash remain segments
    size = (int)(ptr - buffer);
    if (size > 0) {
        ikcp_output(kcp, buffer, size);
    }

    // update ssthresh    触发了快速重传,更新拥塞窗口的大小
    if (change) {
        IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
        kcp->ssthresh = inflight / 2;                       // 拥塞窗口阈值变为已发送窗口的一半
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;
        kcp->cwnd = kcp->ssthresh + resent;
        kcp->incr = kcp->cwnd * kcp->mss;
    }

    if (lost) {        // 发生丢包
        kcp->ssthresh = cwnd / 2;           // 阈值调整为当前窗口的一半
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }

    if (kcp->cwnd < 1) {
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }
}

  

kcp封装flush的函数是 ikcp_update , 包括更新当前时间戳 还有一些设置 setmtu 设置传输大小,interval 为设置更新频率,nodelay 设置模式为NoDelay, 设置window size,获取kcp的协议号

 

 

四,KCP协议的特点

1, RTO不翻倍 在nodelay模式下,对超时需要重传的数据报文,更新其超时重传时间为原来的 3/2 ,避免超时重传时间不断翻倍,增加时延   2,快速重传 kcp对于收到的确认号之前未确认的报文,尽管未在超时时间也会触发重传   3,选择性重传 相比TCP对于没有收到的包序号之后的包全部需要重传来说,kcp只需要传输丢失的包 TCP只有una,而kcp都有una和ack   4,非退让流控 TCP的发送窗口的大小由4个因素决定,发送缓存大小,接收端剩余接受缓存大小,丢包退让,慢启动 kcp的发送窗口只受到发送缓存大小和接收端剩余接收缓存大小的两个因素的限制

 

 

五,KCP建立连接的方式

方式1,借助TCP的连接 先建立TCP连接,建立成功后相互传递好事先准备好的 (id key),给UDP使用  

方式2,直接UDP发报文模拟TCP三次握手来建立连接

 

 

六,KCP的实际应用

在实际的应用中,kcp所基于的UDP协议数据报文在电信运营商层面是比较不受待见的

 

在电信运营商层面来说,TCP也是一个可控的协议,“运营商友好”,运营商可以依据 TCP 状态机能以任意粒度对 TCP 连接进行任意管控,包括不限于限速,整形,重置,计费,TCP 和运营商是互惠的。 UDP则为不可控,UDP 像脱缰野马变幻莫测,运营商只能一刀切,比如高峰期给所有 UDP 流量仅 10 % 的资源份额。 这是因为TCP的连接导向的特性,让运营商能够在连接建立和关闭的过程中获取到相关信息。TCP报文的序列号,确认号,窗口大小等,运营商可以监控这些来获取连接的状态和数据流动的情况,判断数据的完整性和丢包情况。   而UDP的面向无连接的协议,数据包之间独立性很强,运营商很难追踪和监控UDP数据包的完整流动。 因为难以管理和监控和协议的不可控,会给路线造成较大的网络波动和无序传输,运营商也没办法维护网络的公平性稳定性;在安全性上也有较大挑战,比如UDP洪泛攻击,攻击者可以大量发送UDP包来淹没网络目标,导致网络拥塞和服务不可用。   所以,一些运营商会限制UDP的流量,甚至一些网络会禁止UDP协议的使用。

 

标签:seg,IUINT32,int,ikcp,len,剖析,源码,KCP,kcp
From: https://www.cnblogs.com/zhang-yd/p/17904652.html

相关文章

  • Kubernetes: client-go 源码剖析(一)
    0.前言在看kube-scheduler组件的过程中遇到了kube-scheduler对于client-go的调用,泛泛的理解调用过程总有种隔靴搔痒的感觉,于是调转头先把client-go理清楚在回来看kube-scheduler。为什么要看client-go,并且要深入到原理,源码层面去看。很简单,因为它很重要。重要在两方......
  • java智慧校园物联网平台源码
    智慧校园特征综合运用物联网、大数据、人工智能等新兴技术;构建智能感知环境,构建新式的教务课堂空間,智能识别老师学生群体的学习、工作场景和个体特性;促进教课、学习、管理、生活和文化的流程优化与体统重构;提升教育人才培养质量和教育管理决策水平;建立“可认知、可诊断、可分......
  • cas客户端流程详解(源码解析)--单点登录
    博主之前一直使用了cas客户端进行用户的单点登录操作,决定进行源码分析来看cas的整个流程,以便以后出现了问题还不知道是什么原因导致的cas主要的形式就是通过过滤器的形式来实现的,来,贴上示例配置:<listener><listener-class>org.jasig.cas.client.session.SingleSig......
  • 基于SpringBoot的网上租赁系统-计算机毕业设计源码+LW文档
    摘要本课题是根据用户的需要以及网络的优势建立的一个基于SpringBoot的网上租贸系统,来满足用户网络商品租赁的需求。本网上租贸系统应用Java技术,MYSQL数据库存储数据,基于SpringBoot框架开发。在网站的整个开发过程中,首先对系统进行了需求分析,设计出系统的主要功能模块,其次对网......
  • 基于Web足球青训俱乐部管理后台系统-计算机毕业设计源码+LW文档
    摘要随着社会经济的快速发展,人们对足球俱乐部的需求日益增加,加快了足球健身俱乐部的发展,足球俱乐部管理工作日益繁忙,传统的管理方式已经无法满足足球俱乐部管理需求,因此,为了提高足球俱乐部管理效率,足球俱乐部管理后台系统应运而生。本文重点阐述了足球青训俱乐部管理后台系统的......
  • 社区医院信息平台-计算机毕业设计源码+LW文档
    社区医院信息平台摘要随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了社区医院信息平台的开发全过程。通过分析社区医院信息平台管理的不足,创建了一个计算机管理社区医院信息平台的方案。文章介绍了社区医院信息平台的系统分析部分......
  • 师生健康信息管理系统-计算机毕业设计源码+LW文档
    摘要随着移动应用技术的发展,越来越多的用户借助于移动手机、电脑完成生活中的事务,许多的传统行业也更加重视与互联网的结合。本论文主要介绍基于java的师生健康信息管理系统,运用软件工程原理和开发方法,采用springboot框架构建的一个管理系统。整个开发过程首先对软件系统进行需求......
  • 谁都能看懂的单点登录(SSO)实现方式(附源码)
    SSO的基本概念SSO英文全称SingleSignOn(单点登录)。SSO是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。它包括可以将这次主要的登录映射到其他应用中用于同一个用户的登录的机制。它是目前比较流行的企业业务整合的解决方案之一。(本段内容来自百度百科)今......
  • 打赏源码|视频打赏源码h5附搭建教程
     在互联网时代,视频成为了人们获取信息和娱乐的主要方式之一。而随着内容创作者的崛起,视频平台也逐渐成为了他们展示才华和创作成果的舞台。然而,仅仅创作优质的内容并不足以维持创作者的持续创作热情和生活稳定。因此,视频打赏源码的出现,为内容创作者提供了一个重要的价值变现途径......
  • netty源码:(1)NioEventLoopGroup
    EventLoopGroupbossGroup=newNioEventLoopGroup();不加参数创建NioEventLoopGroup的话,会使用cpu核数*2作为bossGroup的线程数。......