首页 > 其他分享 >KCP详解

KCP详解

时间:2024-11-09 16:17:17浏览次数:3  
标签:struct int kcp 详解 fd KCP conn addr

1.介绍

        KCP是一种在应用层的旨在优化网络传输性能的快速的可靠的协议,KCP本身并不会直接处理底层网络通信,而是作为一个中间层协议,其通常基于UDP,这意味着用户要自己定义底层的发送方式,并且通过回调传递给KCP。

2.KCP原理

        2.1网络传输如何做到可靠

     目前业界用于实现网络可靠传输方式有以下方式:

  1.  ACK机制
  2. 重传机制
  3. 序号机制
  4. 重派机制
  5. 窗口机制 

        以上机制并不是单独存在,一般会有多中机制共存,如TCP,以此来保证数据的可靠传输,所谓可靠, 指的是数据能够正常收到,且能够顺序收到。

        2.1.1ARQ协议

        ARQ协议(Automatic Repeat-reQuest),即自动重传请求,是传输层的错误纠正协议之一,它通过使用确认和超时两个机制,在不可靠的网络上实现可靠的信息传输。ARQ协议主要有3种模式:

  1. 即停等式(stop-and-wait)
  2. 回退N帧(go-back-n)
  3. 选择重传(selectiverepeat)

        TCP协议能够做到可靠传输,正是基于此。

        2.1.1.1即停等式 

        即停等式工作原理如下:

  1. 发送方对接收方发送数据包,然后等待接收方回复ACK并且开始计时。

  2. 在等待过程中,发送方停止发送新的数据包。

  3. 当数据包没有成功被接收方接收,接收方不会发送ACK.这样发送方在等待一定时间后,重新发送数据包

  4. 反复以上步骤直到收到从接收方发送的ACK

        整体流程图为下:

 

        即停等式的缺点也很明显,发送一帧数据必须要等待ACK确认,才会发送后面的数据,这也导致有较长的等待时间,会拉低数据的传输速度。

        2.1.1.2回退N帧 (GBN)

        为了克服停等协议长时间等待ACK的缺陷,连续ARQ协议会连续发送一组数据包,然后再等待这些数据包的ACK。发送方和接收方都会维护一个数据帧的序列,这个序列被称作窗口,也就是滑动窗口。发送方的窗口大小由接收方确定,目的在于控制发送速度,以免接收方的缓存不够大,而导致溢出,同时控制流量也可以避免网络拥塞。协议中规定,对于窗口内未经确认的分组需要重传。

        回退N步协议允许发送方在等待超时的间歇,可以继续发送分组。所有发送的分组,都带有序号。在GBN协议中,发送方需响应以下三种事件:

  1. 上层的调用。上层调用相应send()时,发送方首先要检查发送窗口是否已满
  2. 接收ACK。在该协议中,对序号为n的分组的确认采取累积确认的方式,表明接收方已正确接收到序号n以前(包括n)的所有分组

  3. 超时,若出现超时,发送方将重传所有已发出但还未被确认的分组

        对于接收方来说,若一个序号为n的分组被正确接收,并且按序,则接收方会为该分组返回一个ACK给发送方,并将该分组中的数据交付给上层。在其他情况下,接收方都会丢弃分组。若分组n已接收并交付,那么所有序号比n小的分组也已完成了交付。因此GBN采用累积确认是一个很自然的选择。发送方在发完一个窗口里的所有分组后,会检查最大的有效确认,然后从最大有效确认的后一个分组开始重传。

        流程图如下:

       如上图所示,序号为2的分组丢失,因此分组2及之后的分组都将被重传。

       GBN采用了序号机制,累积确认以及计时重传等机制来保证网络的可靠传输。

        2.1.1.3选择重传 (SR)

        虽然GBN改善了停等协议中时间等待较长的缺陷,但它依旧存在着性能问题。特别是当窗口长度很大的时候,会使效率大大降低。而SR协议通过让发送方仅重传在接收方丢失或损坏了的分组,从而避免了不必要的重传,提高了效率。 

        在SR协议下,发送方需响应以下三种事件:

  1. 从上层收到数据。当从上层收到数据后,发送方需检查下一个可用于该分组的序号。若序号在窗口中则将数据发送。

  2. 接收ACK。若收到ACK,且该分组在窗口内,则发送方将那个被确认的分组标记为已接收。若该分组序号等于基序号,则窗口序号向前移动到具有最小序号的未确认分组处。若窗口移动后并且有序号落在窗口内的未发送分组,则发送这些分组。

  3. 超时。若出现超时,发送方将重传已发出但还未确认的分组。与GBN不同的是,SR协议中的每个分组都有独立的计时器。

        在SR协议下,接收方需响应以下三种事件: 

  1. 序号在[4,7]内的分组被正确接收。该情况下,收到的分组落在接收方的窗口内,一个ACK 将发送给发送方。若该分组是以前没收到的分组,则被缓存。若该分组的序号等于基序号4,则该分组以及以前缓存的序号连续的分组都交付给上层,然后,接收窗口将向前移动。(假设接收窗口的基序号为4,分组长度也为4)

  2. 序号在[0,3]内的分组被正确接收。在该情况下,必须产生一个ACK,尽管该分组是接收方以前已确认过的分组。若接收方不确认该分组,发送方窗口将不能向前移动。

  3. 其他情况。忽略该分组对于接收方来说,若一个分组正确接收而不管其是否按序,则接收方会为该分组返回一个ACK 给发送方。失序的分组将被缓存,直到所有丢失的分组都被收到,这时才可以将一批分组按序交付给上层。

        流程如图:

 

        2.1.2RTT和RTO 

  • RTO(Retransmission TimeOut)即重传超时时间 

  • RTT(Round-Trip Time): 往返时延。表示从发送端发送数据开始,到发送端收到来自接收端的确认(接收端收到数据后便立即发送确认),总共经历的时延由三部分组成:链路的传播时间(propagation delay)、末端系统的处理时间、路由器缓存中的排队和处理时间(queuing delay)其中,前两个部分的值对于一个TCP连接相对固定,路由器缓存中的排队和处理时间会随着整个网络拥塞程度的变化而变化。 所以RTT的变化在一定程度上反应网络的拥塞程度

        2.1.3流量控制 

        双方在通信的时候,发送方的速率与接收方的速率是不一定相等,如果发送方的发送速率太快,会导致接收方处理不过来,这时候接收方只能把处理不过来的数据存在缓存区里(失序的数据包也会被存放在缓存区里)接收缓存。 

  • 如果缓存区满了发送方还在疯狂着发送数据,接收方只能把收到的数据包丢掉,大量的丢包会极大着浪费网络资源,因此,我们需要控制发送方的发送速率,让接收方与发送方处于一种动态平衡才好。

  • 对发送方发送速率的控制,称之为流量控制。

  • 公平使用带宽 100M 10个 10M左右

  • 接收方每次收到数据包,可以在发送确定报文的时候,同时告诉发送方自己的缓存区还剩余多少是空闲的,我们也把缓存区的剩余大小称之为接收窗口大小,用变量win来表示接收窗口的大小。

  • 发送方收到之后,便会调整自己的发送速率,也就是调整自己发送窗口的大小,当发送方收到接收窗口的大小为0时,发送方就会停止发送数据,防止出现大量丢包情况的发生。

        如下图:

        当发送方停止发送数据后,该怎样才能知道自己可以继续发送数据?

  1. 当接收方处理好数据,接受窗口 win > 0 时,接收方发个通知报文去通知发送方,告诉他可以继续发送数据了。当发送方收到窗口大于0的报文时,就继续发送数据。

  2. 当发送方收到接受窗口 win = 0 时,这时发送方停止发送报文,并且同时开启一个定时器,每隔一段时间就发个测试报文去询问接收方,打听是否可以继续发送数据了,如果可以,接收方就告诉他此时接受窗口的大小;如果接受窗口大小还是为0,则发送方再次刷新启动定时器。

        如下图:

 

         2.1.4拥塞控制

        拥塞控制和流量控制虽然采取的动作很相似,但拥塞控制与网络的拥堵情况相关联,而流量控制与接收方的缓存状态相关联。

        2.2KCP是如何做的? 

         KCP以10%-20%带宽浪费的代价换取了比 TCP快30%-40%的传输速度,对于RTO,TCP超时计算是RTOx2,这样连续丢三次包就变成RTOx8了,十分恐怖,而KCP启动快速模式后不x2,只是x1.5(实验证明1.5这个值相对比较好),提高了传输速度,下图以RTO=100ms为例:

 

        对于重传,TCP丢包时会全部重传从丢的那个包开始以后的数据,KCP是选择性重传,只重传真正丢失的数据包,发送端发送了1,2,3,4,5几个包,然后收到远端的ACK: 1, 3, 4, 5,当收到ACK3时,KCP知道2被跳过1次,收到ACK4时,知道2被跳过了2次,此时可以认为2号丢失,不用等超时,直接重传2号包,大大改善了丢包时的传输速度。

        对于ACK,TCP为了充分利用带宽,延迟发送ACK(NODELAY-针对发送的都没用),这样超时计算会算出较大 RTT时间,延长了丢包时的判断过程。KCP的ACK是否延迟发送可以调节

        对于UNA,ARQ模型响应有两种,UNA(此编号前所有包已收到,如TCP)和ACK(该编号包已收到),光用UNA将导致全部重传,光用ACK则丢失成本太高,以往协议都是二选其一,而KCP协议中,除去单独的 ACK包外,所有包都有UNA信息

        最后,KCP还会不讲武德,KCP正常模式同TCP一样使用公平退让法则,即发送窗口大小由:发送缓存大小、接收端剩余接收缓存大小、丢包退让及慢启动这四要素决定。但传送及时性要求很高的小数据时,可选择通过配置跳过后两步,仅用前两项来控制发送频率。以牺牲部分公平性及带宽利用率之代价,换取了开着BT都能流畅传输的效果

KCP仓库地址 

3.代码示例

        3.1简单demo

                我们先来做一个KCP的简单demo:

         server端:


#include "ikcp.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/time.h>
#include <pthread.h>
#include <errno.h>

typedef struct conn {
    int fd;
    struct sockaddr_in saddr;
    socklen_t slen;

    struct sockaddr_in caddr;
    socklen_t clen;
}conn;

static inline void itimeofday(long *sec, long *usec)
{
	struct timeval time;
	gettimeofday(&time, NULL);
	if (sec) *sec = time.tv_sec;
	if (usec) *usec = time.tv_usec;
}

static inline int64_t iclock64()
{
	long s, u;
	int64_t value;
	itimeofday(&s, &u);
	value = ((int64_t)s) * 1000 + (u / 1000);
	return value;
}

int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
    conn* c = (conn*)user;
    int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->caddr, c->clen);
    printf("sendto client ip %s, port %d, send %s, len %d send sucees %d\n", inet_ntoa(c->caddr.sin_addr), ntohs(c->caddr.sin_port), buf, len, ret);
    return ret;
}

int is_close = 0;
void* update(void* arg) {
    ikcpcb *kcp = (ikcpcb*)arg;
    while(!is_close) {
        ikcp_update(kcp, (int32_t)iclock64());
        usleep(10);
    }
}

int check(int ret) {
    if (ret == -1) {
        if (errno != EAGAIN)
            perror("recvform error!");

        return 0;
    }

    if(ret < 24) {
        return 0;
    }

    return 1;
}

int main() {
    conn user;

    int sfd = socket(PF_INET, SOCK_DGRAM, 0);
    if (sfd < 0) {
        return -1;
    }
    
    struct sockaddr_in s_addr;
    s_addr.sin_family = PF_INET;
    s_addr.sin_port = htons(9999);
    s_addr.sin_addr.s_addr = INADDR_ANY;
    socklen_t slen = sizeof(s_addr);

    struct sockaddr_in c_addr;
    c_addr.sin_family = PF_INET;
    c_addr.sin_port = htons(10000);
    c_addr.sin_addr.s_addr = inet_addr("192.168.126.129");
    socklen_t clen = sizeof(c_addr);

    int opt = 1;
    if(setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
        exit(1);
    }

    int reuse = 1;
    if(setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse))){
        exit(1);
    }

    int rs = bind(sfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr));
    if(rs < 0) {
        close(sfd);
        perror("chid bind");
        return -1;
    }

    user.saddr = s_addr;
    user.fd = sfd;
    user.slen = slen;
    user.caddr = c_addr;
    user.clen = clen;

    ikcpcb *kcp = ikcp_create(0x1234, &user);
    kcp->output = udp_output; //设置发送函数

    // 配置窗口大小:平均延迟200ms,每20ms发送一个包,
	// 而考虑到丢包重发,设置最大收发窗口为128
	ikcp_wndsize(kcp, 128, 128);

    // 第二个参数 nodelay-启用以后若干常规加速将启动
    // 第三个参数 interval为内部处理时钟,默认设置为 10ms
    // 第四个参数 resend为快速重传指标, 
    // 第五个参数 为是否禁用常规流控, 
    ikcp_nodelay(kcp, 0, 10, 0, 0);
    
    pthread_t p = 0;
    pthread_create(&p, NULL, update, (void*)kcp);

    char buf1[4096] = {0};
    char buf2[4096] = {0};
    while(1) {
        memset(buf1, 0, 4096);
        struct sockaddr_in tmp;
        socklen_t ll = sizeof(tmp);
        int len = recvfrom(sfd, buf1, 4096, 0, (struct sockaddr*)&tmp, &ll);
        if(len < 0) {
            perror("recvfrom");
            continue;
        }

        if(!check(len)) {
            printf("check fail\n");
            continue;
        }

        printf("recvfrom %d\n", len);

        //这里是会有问题,多个client会出问题
        int ret = ikcp_input(kcp, buf1, len);
        if(0 != ret) {
            //出错,不是kcp数据包
            printf("ikcp_input error\n");
            break;
        }

        //memset(buf2, 0, 2048);
        len = ikcp_recv(kcp, buf2, 4096);
        if(len <= 0) {
            continue;
        }
        buf2[len] = '\0';

        printf("recv: %s, len = %d, buf1 %s\n", buf2, len, buf1);
        len = ikcp_send(kcp, buf2, len);
    }

    is_close = 1;
    pthread_join(p, NULL);
    close(sfd);
    return 0;
}

        client端:


#include "ikcp.h"
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>

typedef struct conn {
    int fd;
    struct sockaddr_in saddr;
    socklen_t slen;

    struct sockaddr_in caddr;
    socklen_t clen;
}conn;

static inline void itimeofday(long *sec, long *usec)
{
	struct timeval time;
	gettimeofday(&time, NULL);
	if (sec) *sec = time.tv_sec;
	if (usec) *usec = time.tv_usec;
}

static inline int64_t iclock64()
{
	long s, u;
	int64_t value;
	itimeofday(&s, &u);
	value = ((int64_t)s) * 1000 + (u / 1000);
	return value;
}

int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
    conn* c = (conn*)user;
    int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->saddr, c->slen);
    printf("send server ip %s, port %d, send %s, len %d, send sucees %d\n", inet_ntoa(c->saddr.sin_addr), ntohs(c->saddr.sin_port), buf, len, ret);
    return ret;
}

int num = 0;
void* update(void* arg) {
    ikcpcb *kcp = (ikcpcb*)arg;
    while(!num) {
        ikcp_update(kcp, (int32_t)iclock64());
        usleep(10);
    }
}


int main() {
    conn user;

    int socketFd;
    struct sockaddr_in peer_Addr;
    peer_Addr.sin_family = PF_INET;
    peer_Addr.sin_port = htons(9999);
    peer_Addr.sin_addr.s_addr = inet_addr("192.168.126.128");

    struct sockaddr_in self_Addr;
    self_Addr.sin_family = PF_INET;
    self_Addr.sin_port = htons(10000);
    self_Addr.sin_addr.s_addr = inet_addr("0.0.0.0"); 
    
    if ((socketFd = socket(PF_INET, SOCK_DGRAM| SOCK_CLOEXEC, IPPROTO_UDP)) == -1) {
        perror("child socket");
        exit(1);
    }

    int opt = 1;
    if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
        exit(1);
    }

    if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))){
        exit(1);
    }

    if (bind(socketFd, (struct sockaddr *) &self_Addr, sizeof(struct sockaddr))){
        perror("chid bind");
        exit(1);
    } 
    user.fd = socketFd;
    //user.saddr = peer_Addr;
    printf("server ip %s, port %d\n", inet_ntoa(peer_Addr.sin_addr), ntohs(peer_Addr.sin_port));
    memcpy(&user.saddr, &peer_Addr, sizeof(struct sockaddr_in));
    user.slen = sizeof(peer_Addr);
    //user.caddr = self_Addr;
    memcpy(&user.caddr, &self_Addr, sizeof(struct sockaddr_in));
    user.clen = sizeof(self_Addr);

    ikcpcb *kcp = ikcp_create(0x1234, &user);
    kcp->output = udp_output; //设置发送函数

    // 配置窗口大小:平均延迟200ms,每20ms发送一个包,
	// 而考虑到丢包重发,设置最大收发窗口为128
	ikcp_wndsize(kcp, 128, 128);

    // 第二个参数 nodelay-启用以后若干常规加速将启动
    // 第三个参数 interval为内部处理时钟,默认设置为 10ms
    // 第四个参数 resend为快速重传指标, 
    // 第五个参数 为是否禁用常规流控,    
    ikcp_nodelay(kcp, 0, 10, 0, 0);
    pthread_t p = 0;
    pthread_create(&p, NULL, update, (void*)kcp);
    char buffer[2050] = {0};
    char rbuffer[2048] = {0};
    while(1) {
        getchar();
        //一定要发1400以下,不然分包server处理不了
        memset(buffer, 'a', 1000);
        ikcp_send(kcp, buffer, 1000);

        
        
        user.slen = sizeof(user.saddr);
        // 接收服务器返回的数据
        int nread = recvfrom(socketFd, rbuffer, 2048, 0, (struct sockaddr*)&user.saddr, &user.slen);
        if (nread == -1) { 
            perror("recvfrom");
            continue;         
        }

        int ret = ikcp_input(kcp, rbuffer, nread);
        if(0 != ret) {
            //出错,不是kcp数据包
            break;
        }

        memset(buffer, 0, 2048);
        ikcp_recv(kcp, buffer, 2048);

        printf("server : %s, len %ld\n", buffer, strlen(buffer));
    }
    
    num = 1;
    pthread_join(p, NULL);  
    close(socketFd);

    return 0;
}

        接下来看看运行结果:

 

        3.2优化UDP服务器 

          我们的代码基于UDP并发编程 的基础上做优化,以KCP来保证UDP server数据传输的可靠性,接下来先来说说简单思路。

        首先,我们对我们UDP server用于监听的fd的conv(ikcp_create函数的第一个参数,他必须是唯一的)做定义,我们这里取1;然后UDP客户端连接先用1来连接,我们先约定好。

         之后,对每条连接从新创建一个kcp块,它的conv由server端分配。

         OK,思路理清楚后我们上代码。

         server端代码如下:


#include "ikcp.h"

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>

#define HELLO "hello"

typedef int (*rcallback)(int sd);
typedef int (*wcallback)(int sd);
typedef int (*acallback)(int sd);

typedef struct conn {
    int fd;

    union {
        rcallback _rcall;
        acallback _acall;
    } rcall;

    wcallback wcall;

    char rbuffer[1024];
    char wbuffer[1024];

    struct sockaddr_in saddr;
    struct sockaddr_in caddr;

    socklen_t slen;
    socklen_t clen;
    int rlen;
    int wlen;

    ikcpcb* kcp;
} conn;

conn conn_list[2048] = {0};
int epfd = 0;
int connect_port = 10000;
int conv = 0x1234;

int is_close = 0;
int index_pid = 0;
ikcpcb* arr[10] = {0};

static inline void itimeofday(long *sec, long *usec)
{
	struct timeval time;
	gettimeofday(&time, NULL);
	if (sec) *sec = time.tv_sec;
	if (usec) *usec = time.tv_usec;
}

static inline int64_t iclock64()
{
	long s, u;
	int64_t value;
	itimeofday(&s, &u);
	value = ((int64_t)s) * 1000 + (u / 1000);
	return value;
}
void* update(void* arg) {
    while(!is_close) {
        for(int i = 0; i < 10; ++i) {
            ikcpcb* tmp = arr[i];
            if(tmp) {
                ikcp_update(tmp, (int32_t)iclock64());
            }
        }
        
        usleep(10);
    }
}

void mod_event(int fd, int event) {
    struct epoll_event ev;
    ev.events = event;
    ev.data.fd = fd;
    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

int handle_kcp_data(ikcpcb* kcp, char* data, int dlen, char* buf, int blen) {
    int ret = ikcp_input(kcp, data, dlen);
    if(ret != 0) {
        return -1;
    }

    ret = ikcp_recv(kcp, buf, blen);
    return ret;
}

int read_callback(int sd) {
    conn* c = &conn_list[sd];
    c->rlen = recvfrom(sd, c->rbuffer, 1024, 0, NULL, NULL);
    //将读到的数据直接放进写缓冲区
    if((c->wlen = handle_kcp_data(c->kcp, c->rbuffer, c->rlen, c->wbuffer, 1024)) < 0) {
        return -1;
    }

    mod_event(sd, EPOLLOUT);
    return c->rlen;
}

int write_callback(int sd) {
    conn* c = &conn_list[sd];
    ikcp_send(c->kcp, c->wbuffer, c->wlen);
    mod_event(sd, EPOLLIN);
    return c->wlen;
}

int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
    conn* c = (conn*)user;
    int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->caddr, c->clen);
    printf("sendto client ip %s, port %d, send %s, len %d send sucees %d, fd %d\n", inet_ntoa(c->caddr.sin_addr), ntohs(c->caddr.sin_port), buf, len, ret, c->fd);
    return ret;
}

int udp_accept(int sd)
{
    int new_fd = -1;
    int ret = 0;
    int reuse = 1;
    char buf[128] = {0};
    struct sockaddr_in peer_addr;
    socklen_t cli_len = sizeof(peer_addr);
   
    //1.拿到本地信息,然后改一个端口,于client建立另外一条连接
    struct sockaddr_in my_addr = conn_list[sd].saddr;
    my_addr.sin_port = htons(connect_port++);

    //2.接受client的握手数据
    ret = recvfrom(sd, buf, 128, 0, (struct sockaddr *)&peer_addr, &cli_len);
    if (ret < 0) {
		return -1;
    }

   // memcmp(&conn_list[sd].caddr, &peer_addr, cli_len);
    conn_list[sd].caddr.sin_addr.s_addr = peer_addr.sin_addr.s_addr;
    conn_list[sd].caddr.sin_port = peer_addr.sin_port;
    conn_list[sd].clen = cli_len;

    char data[128] = {0};
    if(handle_kcp_data(conn_list[sd].kcp, buf, ret, data, 128) < 0) {
        return -1;
    }

    //3.创建一个新的fd
    if ((new_fd = socket(PF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0)) == -1) {
        return -1;
    }

    int opt = 1;
    if(setsockopt(new_fd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
        exit(1);
    }

    if(setsockopt(new_fd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse))){
        exit(1);
    }

    //4.与新端口绑定
    ret = bind(new_fd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr));
    if (ret){
        close(new_fd);
        return -1;
    }

    //5.连接,设置默认目标地址
    peer_addr.sin_family = PF_INET;
    if (connect(new_fd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {
        close(new_fd);
        return -1;
    } 

    //我们是单线程,暂时先这么写
    memset(conn_list[sd].wbuffer, 0, 1024);
    sprintf(conn_list[sd].wbuffer, "%d", conv);
    
    //6.回应client,让对端知道我们的新端口
    //char buffer[16] = "hello client";
    //sendto(new_fd, buffer, strlen(buffer), 0, (struct sockaddr*)&peer_addr,  sizeof(struct sockaddr_in));

    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = new_fd;

    //7.加入epoll
    if(epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &ev) < 0) {
        close(new_fd);
        return -1;
    }

    conn_list[new_fd].fd = new_fd;
    conn_list[new_fd].rcall._acall = read_callback;
    conn_list[new_fd].wcall = write_callback;
    conn_list[new_fd].saddr = my_addr;
    conn_list[new_fd].slen = sizeof(my_addr);
    conn_list[new_fd].caddr = peer_addr;
    conn_list[new_fd].clen = sizeof(peer_addr);
    conn_list[new_fd].kcp = ikcp_create(conv, &conn_list[new_fd]);
    conn_list[new_fd].kcp->output = udp_output;
    ++conv;
    arr[index_pid++] = conn_list[new_fd].kcp;
    memcmp(&conn_list[new_fd].caddr, &peer_addr, cli_len);
    conn_list[new_fd].clen = cli_len;
    ikcp_send(conn_list[sd].kcp, conn_list[sd].wbuffer, sizeof(conv));
    ikcp_send(conn_list[new_fd].kcp, HELLO, strlen(HELLO));
    return new_fd;
}

int main() {

    int sfd = socket(PF_INET, SOCK_DGRAM, 0);
    if (sfd < 0) {
        return -1;
    }
    
    struct sockaddr_in s_addr;
    s_addr.sin_family = PF_INET;
    s_addr.sin_port = htons(9999);
    s_addr.sin_addr.s_addr = INADDR_ANY;
    socklen_t slen = sizeof(s_addr);

    if(bind(sfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr)) < 0) {
        close(sfd);
        return -1;
    }

    epfd = epoll_create(1);

    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sfd;
    if(epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev) < 0) {
        close(sfd);
        close(epfd);
        return -1;
    }

    conn_list[sfd].fd = sfd;
    conn_list[sfd].rcall._acall = udp_accept;
    conn_list[sfd].wcall = NULL;
    conn_list[sfd].saddr = s_addr;
    conn_list[sfd].slen = sizeof(s_addr);
    conn_list[sfd].kcp = ikcp_create(0x1, &conn_list[sfd]);   
    conn_list[sfd].kcp->output = udp_output;
    arr[index_pid++] = conn_list[sfd].kcp;
    pthread_t pid = 0;
    pthread_create(&pid, NULL, update, NULL);

    printf("udp_server start...\n");
    while(1) {
        struct epoll_event evs[128] = {0};
        int nready = epoll_wait(epfd, evs, 128, -1);
        if(nready > 0) {
            for(int i = 0; i < nready; ++i) {
                int sd = evs[i].data.fd;
                if(evs[i].events & EPOLLIN) {
                    if(conn_list[sd].rcall._rcall) {
                        conn_list[sd].rcall._rcall(sd);
                    }
                }

                if(evs[i].events & EPOLLOUT) {
                    if(conn_list[sd].wcall) {
                        conn_list[sd].wcall(sd);
                    }
                }
            }
        }
    }
    
    is_close = 1;
    pthread_join(pid, NULL);
    close(sfd);
    close(epfd);
    return 0;
}




        client端代码如下:

#include "ikcp.h"

#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <strings.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>


#define SO_REUSEPORT    15
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

typedef struct conn {
    struct sockaddr_in s_addr;
    int fd;
}conn;

static inline void itimeofday(long *sec, long *usec)
{
	struct timeval time;
	gettimeofday(&time, NULL);
	if (sec) *sec = time.tv_sec;
	if (usec) *usec = time.tv_usec;
}

static inline int64_t iclock64()
{
	long s, u;
	int64_t value;
	itimeofday(&s, &u);
	value = ((int64_t)s) * 1000 + (u / 1000);
	return value;
}

int num = 0;
void* update(void* arg) {
    ikcpcb *kcp = (ikcpcb*)arg;
    while(!num) {
        if(kcp) {
            ikcp_update(kcp, (int32_t)iclock64());
        }

        usleep(10);
    }
}

int handle_kcp_data(ikcpcb* kcp, char* data, int dlen, char* buf, int blen) {
    int ret = ikcp_input(kcp, data, dlen);
    if(ret != 0) {
        return -1;
    }

    ret = ikcp_recv(kcp, buf, blen);
    return ret;
}

int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
    conn* c = (conn*)user;
    int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->s_addr, sizeof(struct sockaddr_in));
    printf("send server ip %s, port %d, send %s, len %d, send sucees %d\n", inet_ntoa(c->s_addr.sin_addr), ntohs(c->s_addr.sin_port), buf, len, ret);
    return ret;
}

void createClient(int id,int myPort,int peerPort){
    int socketFd;
    struct sockaddr_in peer_Addr;
    peer_Addr.sin_family = PF_INET;
    peer_Addr.sin_port = htons(9999);
    peer_Addr.sin_addr.s_addr = inet_addr("192.168.126.128");

    struct sockaddr_in self_Addr;
    self_Addr.sin_family = PF_INET;
    self_Addr.sin_port = htons(10000);
    self_Addr.sin_addr.s_addr = inet_addr("0.0.0.0"); 
    
    if ((socketFd = socket(PF_INET, SOCK_DGRAM| SOCK_CLOEXEC, 0)) == -1) {
        perror("child socket");
        exit(1);
    } 

    int opt = 1;
    if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
            exit(1);
    }
    if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))){
        exit(1);
    }

    if (bind(socketFd, (struct sockaddr *) &self_Addr, sizeof(struct sockaddr))){
        perror("chid bind");
        exit(1);
    }

    conn c1;
    c1.s_addr = peer_Addr;
    c1.fd = socketFd;

    ikcpcb* kcp1 = ikcp_create(0x1, &c1);
    pthread_t p1 = 0;
    kcp1->output = udp_output;
    pthread_create(&p1, NULL, update, (void*)kcp1);   
    
    char buffer[1024] = {0};
    memset(buffer, 0, 1024);
    sprintf(buffer, "hello server %d", 0);
    ikcp_send(kcp1, buffer, strlen(buffer));
    //sendto(socketFd, buffer, strlen(buffer), 0, (struct sockaddr *) &peer_Addr, sizeof(struct sockaddr_in));

    bzero(&peer_Addr, sizeof(peer_Addr));
    char rbuffer[1024] = {0};
    socklen_t slen = sizeof(peer_Addr);
    char data[1024] = {0};
    
    // 接收服务器返回的数据
    int nread = recvfrom(socketFd, rbuffer, 1024, 0, (struct sockaddr*)&peer_Addr, &slen);
    if (nread == -1) { 
        close(socketFd);          
    }

    if(handle_kcp_data(kcp1, rbuffer, nread, data, 1024) < 0) {
        exit(1);
    }


    conn c2;
    c2.s_addr = peer_Addr;
    c2.fd = socketFd;

    int conv = atoi(data);
    ikcpcb* kcp2 = ikcp_create(conv, &c2);
    pthread_t p2 = 0;
    kcp2->output = udp_output;
    pthread_create(&p2, NULL, update, (void*)kcp2);   
    memset(rbuffer, 0, 1024);
    nread = recvfrom(socketFd, rbuffer, 1024, 0, (struct sockaddr*)&peer_Addr, &slen);
    if(handle_kcp_data(kcp2, rbuffer, nread, data, 1024) < 0) {
        exit(1);
    }

    c2.s_addr.sin_addr.s_addr = peer_Addr.sin_addr.s_addr;
    c2.s_addr.sin_port = peer_Addr.sin_port;

    usleep(1000);

    peer_Addr.sin_family = PF_INET;
    if(connect(socketFd, (struct sockaddr *) &peer_Addr, sizeof(struct sockaddr)) == -1) {
        perror("chid connect");
        exit(1);
    }

    memset(buffer, 0, 1024);
    memset(rbuffer, 0, 1024);
    memset(data, 0, 1024);
    strcpy(buffer, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
    //sendto(socketFd, buffer, strlen(buffer), 0, (struct sockaddr *) &peer_Addr, sizeof(struct sockaddr_in));

    ikcp_send(kcp2, buffer, strlen(buffer));

    nread = recvfrom(socketFd, rbuffer, 1024, 0, (struct sockaddr*)&peer_Addr, &slen);
    if(handle_kcp_data(kcp2, rbuffer, nread, data, 1024) < 0) {
        exit(1);
    }

    printf("server echo: %s\n", data);

    ikcp_release(kcp1);
    ikcp_release(kcp2);

    num = 1;
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);
}

void serial(int clinetNum){
    for(int i=1;i<=clinetNum;i++){
        createClient(i,30000+i,9999);
    }
}

int main(int argc, char * argv[])
{ 
    createClient(0 ,30000,9999);
    return 0;
}

        编译运行client代码:

 

        成功发回我们的数据,并且可以看到第二条send server就是ACK包。

 学习参考:

https://github.com/0voice

标签:struct,int,kcp,详解,fd,KCP,conn,addr
From: https://blog.csdn.net/weixin_55951019/article/details/143450326

相关文章

  • Nuxt.js 应用中的 listen 事件钩子详解
    title:Nuxt.js应用中的listen事件钩子详解date:2024/11/9updated:2024/11/9author:cmdragonexcerpt:它为开发者提供了一个自由的空间可以在开发服务器启动时插入自定义逻辑。通过合理利用这个钩子,开发者能够提升代码的可维护性和调试能力。注意处理性能、错误和环......
  • YOLO系列基础(一)卷积神经网络原理详解与基础层级结构说明
    系列文章地址YOLO系列基础(一)卷积神经网络原理详解与基础层级结构说明-CSDN博客YOLO系列基础(二)Bottleneck瓶颈层原理详解-CSDN博客目录卷积神经网络的原理及卷积核详解一、卷积神经网络的原理二、卷积层与卷积核详解卷积核的作用卷积核的设计卷积样例与代码说明:卷积核......
  • Python内置函数1详解案例
    1.列表的最值运算描述牛牛给了牛妹一个一串无规则的数字,牛妹将其转换成列表后,使用max和min函数快速的找到了这些数字的最值,你能用Python代码实现一下吗?输入描述:输入一行多个整数,数字之间以空格间隔。输出描述:输出这些数字的zuizhi示例1输入:35691062输出:10......
  • 鸿蒙next5.0版开发:ArkTS组件点击事件详解
    在HarmonyOS5.0中,ArkTS提供了一套完整的组件和事件处理机制,使得开发者能够创建交互性强的应用程序。本文将详细解读如何使用ArkTS组件处理点击事件,包括事件的注册、回调函数的编写以及事件对象的使用。点击事件基础点击事件是用户与应用交互的基本方式之一。在ArkTS中,点击......
  • 微信小程序scroll-view详解及案例
    需求:实现类似美团外卖。1.点击左侧菜单右侧滚动到对应内容。2.滚动右侧内容左侧对应菜单高亮。一、首先介绍下scroll-view可滚动视图区域。案例用到如下属性(如需了解更多请访问官网https://developers.weixin.qq.com/miniprogram/dev/component/scroll-view.html):以下属性1.0.0版......
  • GoLang协程Goroutiney原理与GMP模型详解
    本文原文地址:GoLang协程Goroutiney原理与GMP模型详解什么是goroutineGoroutine是Go语言中的一种轻量级线程,也成为协程,由Go运行时管理。它是Go语言并发编程的核心概念之一。Goroutine的设计使得在Go中实现并发编程变得非常简单和高效。以下是一些关于Goroutine的关键特性:轻量......
  • CSS中 特性查询(@supports)详解及使用
    1.简介CSS中的@supports用于检测浏览器是否支持CSS的某个属性。其实就是条件判断,如果支持某个属性可以写一套样式,如果不支持某个属性,可以提供另外一套样式作为替补。可以放在代码的顶层,也可以嵌套在任何其他条件组规则中。语法@supports规则由一组样式声明和一条支持条件构......
  • RT DETR v2 TensorRT C++ 部署详解
    RT-DETRv2TensorRTC++部署详解概述随着深度学习技术的发展,目标检测算法在各种应用场景下展现出了卓越的表现。RT-DETRv2(Real-TimeDetectionTransformerv2)作为一款高效的实时目标检测模型,其结合了Transformer架构的优势与传统卷积神经网络(CNNs)的速度,为开发者提供了在......
  • HarmonyOs DevEco Studio小技巧28--部分鸿蒙生命周期详解
    目录前言 页面和自定义组件生命周期页面生命周期onPageShow--- 表示页面已经显示 onPageHide--- 表示页面已经隐藏onBackPress--- 表示用户点击了返回键组件生命周期aboutToAppear---表示组件即将出现onDidBuild--- 表示组件已经构建完成aboutToDisappe......
  • 华为市场ASO详解
    今天我们要讲的是安卓市场华为应用市场新包从0-1如何从应用市场获量,首先我们要先知道的一个市场大环境,不管是从流量,还是付费来看,华为在安卓市场中是为首的。华为目前的市场状态是有点混乱的,马甲包满天飞,同名应用权重非常高,但混乱就有机会,这个市场也是可操作性最强的一个安卓市......