1.介绍
KCP是一种在应用层的旨在优化网络传输性能的快速的可靠的协议,KCP本身并不会直接处理底层网络通信,而是作为一个中间层协议,其通常基于UDP,这意味着用户要自己定义底层的发送方式,并且通过回调传递给KCP。
2.KCP原理
2.1网络传输如何做到可靠
目前业界用于实现网络可靠传输方式有以下方式:
- ACK机制
- 重传机制
- 序号机制
- 重派机制
- 窗口机制
以上机制并不是单独存在,一般会有多中机制共存,如TCP,以此来保证数据的可靠传输,所谓可靠, 指的是数据能够正常收到,且能够顺序收到。
2.1.1ARQ协议
ARQ协议(Automatic Repeat-reQuest),即自动重传请求,是传输层的错误纠正协议之一,它通过使用确认和超时两个机制,在不可靠的网络上实现可靠的信息传输。ARQ协议主要有3种模式:
- 即停等式(stop-and-wait)
- 回退N帧(go-back-n)
- 选择重传(selectiverepeat)
TCP协议能够做到可靠传输,正是基于此。
2.1.1.1即停等式
即停等式工作原理如下:
-
发送方对接收方发送数据包,然后等待接收方回复ACK并且开始计时。
-
在等待过程中,发送方停止发送新的数据包。
-
当数据包没有成功被接收方接收,接收方不会发送ACK.这样发送方在等待一定时间后,重新发送数据包
-
反复以上步骤直到收到从接收方发送的ACK
整体流程图为下:
即停等式的缺点也很明显,发送一帧数据必须要等待ACK确认,才会发送后面的数据,这也导致有较长的等待时间,会拉低数据的传输速度。
2.1.1.2回退N帧 (GBN)
为了克服停等协议长时间等待ACK的缺陷,连续ARQ协议会连续发送一组数据包,然后再等待这些数据包的ACK。发送方和接收方都会维护一个数据帧的序列,这个序列被称作窗口,也就是滑动窗口。发送方的窗口大小由接收方确定,目的在于控制发送速度,以免接收方的缓存不够大,而导致溢出,同时控制流量也可以避免网络拥塞。协议中规定,对于窗口内未经确认的分组需要重传。
回退N步协议允许发送方在等待超时的间歇,可以继续发送分组。所有发送的分组,都带有序号。在GBN协议中,发送方需响应以下三种事件:
- 上层的调用。上层调用相应send()时,发送方首先要检查发送窗口是否已满
-
接收ACK。在该协议中,对序号为n的分组的确认采取累积确认的方式,表明接收方已正确接收到序号n以前(包括n)的所有分组
-
超时,若出现超时,发送方将重传所有已发出但还未被确认的分组
对于接收方来说,若一个序号为n的分组被正确接收,并且按序,则接收方会为该分组返回一个ACK给发送方,并将该分组中的数据交付给上层。在其他情况下,接收方都会丢弃分组。若分组n已接收并交付,那么所有序号比n小的分组也已完成了交付。因此GBN采用累积确认是一个很自然的选择。发送方在发完一个窗口里的所有分组后,会检查最大的有效确认,然后从最大有效确认的后一个分组开始重传。
流程图如下:
如上图所示,序号为2的分组丢失,因此分组2及之后的分组都将被重传。
GBN采用了序号机制,累积确认以及计时重传等机制来保证网络的可靠传输。
2.1.1.3选择重传 (SR)
虽然GBN改善了停等协议中时间等待较长的缺陷,但它依旧存在着性能问题。特别是当窗口长度很大的时候,会使效率大大降低。而SR协议通过让发送方仅重传在接收方丢失或损坏了的分组,从而避免了不必要的重传,提高了效率。
在SR协议下,发送方需响应以下三种事件:
-
从上层收到数据。当从上层收到数据后,发送方需检查下一个可用于该分组的序号。若序号在窗口中则将数据发送。
-
接收ACK。若收到ACK,且该分组在窗口内,则发送方将那个被确认的分组标记为已接收。若该分组序号等于基序号,则窗口序号向前移动到具有最小序号的未确认分组处。若窗口移动后并且有序号落在窗口内的未发送分组,则发送这些分组。
-
超时。若出现超时,发送方将重传已发出但还未确认的分组。与GBN不同的是,SR协议中的每个分组都有独立的计时器。
在SR协议下,接收方需响应以下三种事件:
-
序号在[4,7]内的分组被正确接收。该情况下,收到的分组落在接收方的窗口内,一个ACK 将发送给发送方。若该分组是以前没收到的分组,则被缓存。若该分组的序号等于基序号4,则该分组以及以前缓存的序号连续的分组都交付给上层,然后,接收窗口将向前移动。(假设接收窗口的基序号为4,分组长度也为4)
-
序号在[0,3]内的分组被正确接收。在该情况下,必须产生一个ACK,尽管该分组是接收方以前已确认过的分组。若接收方不确认该分组,发送方窗口将不能向前移动。
-
其他情况。忽略该分组对于接收方来说,若一个分组正确接收而不管其是否按序,则接收方会为该分组返回一个ACK 给发送方。失序的分组将被缓存,直到所有丢失的分组都被收到,这时才可以将一批分组按序交付给上层。
流程如图:
2.1.2RTT和RTO
-
RTO(Retransmission TimeOut)即重传超时时间
-
RTT(Round-Trip Time): 往返时延。表示从发送端发送数据开始,到发送端收到来自接收端的确认(接收端收到数据后便立即发送确认),总共经历的时延由三部分组成:链路的传播时间(propagation delay)、末端系统的处理时间、路由器缓存中的排队和处理时间(queuing delay)其中,前两个部分的值对于一个TCP连接相对固定,路由器缓存中的排队和处理时间会随着整个网络拥塞程度的变化而变化。 所以RTT的变化在一定程度上反应网络的拥塞程度
2.1.3流量控制
双方在通信的时候,发送方的速率与接收方的速率是不一定相等,如果发送方的发送速率太快,会导致接收方处理不过来,这时候接收方只能把处理不过来的数据存在缓存区里(失序的数据包也会被存放在缓存区里)接收缓存。
-
如果缓存区满了发送方还在疯狂着发送数据,接收方只能把收到的数据包丢掉,大量的丢包会极大着浪费网络资源,因此,我们需要控制发送方的发送速率,让接收方与发送方处于一种动态平衡才好。
-
对发送方发送速率的控制,称之为流量控制。
-
公平使用带宽 100M 10个 10M左右
-
接收方每次收到数据包,可以在发送确定报文的时候,同时告诉发送方自己的缓存区还剩余多少是空闲的,我们也把缓存区的剩余大小称之为接收窗口大小,用变量win来表示接收窗口的大小。
-
发送方收到之后,便会调整自己的发送速率,也就是调整自己发送窗口的大小,当发送方收到接收窗口的大小为0时,发送方就会停止发送数据,防止出现大量丢包情况的发生。
如下图:
当发送方停止发送数据后,该怎样才能知道自己可以继续发送数据?
-
当接收方处理好数据,接受窗口 win > 0 时,接收方发个通知报文去通知发送方,告诉他可以继续发送数据了。当发送方收到窗口大于0的报文时,就继续发送数据。
-
当发送方收到接受窗口 win = 0 时,这时发送方停止发送报文,并且同时开启一个定时器,每隔一段时间就发个测试报文去询问接收方,打听是否可以继续发送数据了,如果可以,接收方就告诉他此时接受窗口的大小;如果接受窗口大小还是为0,则发送方再次刷新启动定时器。
如下图:
2.1.4拥塞控制
拥塞控制和流量控制虽然采取的动作很相似,但拥塞控制与网络的拥堵情况相关联,而流量控制与接收方的缓存状态相关联。
2.2KCP是如何做的?
KCP以10%-20%带宽浪费的代价换取了比 TCP快30%-40%的传输速度,对于RTO,TCP超时计算是RTOx2,这样连续丢三次包就变成RTOx8了,十分恐怖,而KCP启动快速模式后不x2,只是x1.5(实验证明1.5这个值相对比较好),提高了传输速度,下图以RTO=100ms为例:
对于重传,TCP丢包时会全部重传从丢的那个包开始以后的数据,KCP是选择性重传,只重传真正丢失的数据包,发送端发送了1,2,3,4,5几个包,然后收到远端的ACK: 1, 3, 4, 5,当收到ACK3时,KCP知道2被跳过1次,收到ACK4时,知道2被跳过了2次,此时可以认为2号丢失,不用等超时,直接重传2号包,大大改善了丢包时的传输速度。
对于ACK,TCP为了充分利用带宽,延迟发送ACK(NODELAY-针对发送的都没用),这样超时计算会算出较大 RTT时间,延长了丢包时的判断过程。KCP的ACK是否延迟发送可以调节
对于UNA,ARQ模型响应有两种,UNA(此编号前所有包已收到,如TCP)和ACK(该编号包已收到),光用UNA将导致全部重传,光用ACK则丢失成本太高,以往协议都是二选其一,而KCP协议中,除去单独的 ACK包外,所有包都有UNA信息
最后,KCP还会不讲武德,KCP正常模式同TCP一样使用公平退让法则,即发送窗口大小由:发送缓存大小、接收端剩余接收缓存大小、丢包退让及慢启动这四要素决定。但传送及时性要求很高的小数据时,可选择通过配置跳过后两步,仅用前两项来控制发送频率。以牺牲部分公平性及带宽利用率之代价,换取了开着BT都能流畅传输的效果
3.代码示例
3.1简单demo
我们先来做一个KCP的简单demo:
server端:
#include "ikcp.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/time.h>
#include <pthread.h>
#include <errno.h>
typedef struct conn {
int fd;
struct sockaddr_in saddr;
socklen_t slen;
struct sockaddr_in caddr;
socklen_t clen;
}conn;
static inline void itimeofday(long *sec, long *usec)
{
struct timeval time;
gettimeofday(&time, NULL);
if (sec) *sec = time.tv_sec;
if (usec) *usec = time.tv_usec;
}
static inline int64_t iclock64()
{
long s, u;
int64_t value;
itimeofday(&s, &u);
value = ((int64_t)s) * 1000 + (u / 1000);
return value;
}
int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
conn* c = (conn*)user;
int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->caddr, c->clen);
printf("sendto client ip %s, port %d, send %s, len %d send sucees %d\n", inet_ntoa(c->caddr.sin_addr), ntohs(c->caddr.sin_port), buf, len, ret);
return ret;
}
int is_close = 0;
void* update(void* arg) {
ikcpcb *kcp = (ikcpcb*)arg;
while(!is_close) {
ikcp_update(kcp, (int32_t)iclock64());
usleep(10);
}
}
int check(int ret) {
if (ret == -1) {
if (errno != EAGAIN)
perror("recvform error!");
return 0;
}
if(ret < 24) {
return 0;
}
return 1;
}
int main() {
conn user;
int sfd = socket(PF_INET, SOCK_DGRAM, 0);
if (sfd < 0) {
return -1;
}
struct sockaddr_in s_addr;
s_addr.sin_family = PF_INET;
s_addr.sin_port = htons(9999);
s_addr.sin_addr.s_addr = INADDR_ANY;
socklen_t slen = sizeof(s_addr);
struct sockaddr_in c_addr;
c_addr.sin_family = PF_INET;
c_addr.sin_port = htons(10000);
c_addr.sin_addr.s_addr = inet_addr("192.168.126.129");
socklen_t clen = sizeof(c_addr);
int opt = 1;
if(setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
exit(1);
}
int reuse = 1;
if(setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse))){
exit(1);
}
int rs = bind(sfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr));
if(rs < 0) {
close(sfd);
perror("chid bind");
return -1;
}
user.saddr = s_addr;
user.fd = sfd;
user.slen = slen;
user.caddr = c_addr;
user.clen = clen;
ikcpcb *kcp = ikcp_create(0x1234, &user);
kcp->output = udp_output; //设置发送函数
// 配置窗口大小:平均延迟200ms,每20ms发送一个包,
// 而考虑到丢包重发,设置最大收发窗口为128
ikcp_wndsize(kcp, 128, 128);
// 第二个参数 nodelay-启用以后若干常规加速将启动
// 第三个参数 interval为内部处理时钟,默认设置为 10ms
// 第四个参数 resend为快速重传指标,
// 第五个参数 为是否禁用常规流控,
ikcp_nodelay(kcp, 0, 10, 0, 0);
pthread_t p = 0;
pthread_create(&p, NULL, update, (void*)kcp);
char buf1[4096] = {0};
char buf2[4096] = {0};
while(1) {
memset(buf1, 0, 4096);
struct sockaddr_in tmp;
socklen_t ll = sizeof(tmp);
int len = recvfrom(sfd, buf1, 4096, 0, (struct sockaddr*)&tmp, &ll);
if(len < 0) {
perror("recvfrom");
continue;
}
if(!check(len)) {
printf("check fail\n");
continue;
}
printf("recvfrom %d\n", len);
//这里是会有问题,多个client会出问题
int ret = ikcp_input(kcp, buf1, len);
if(0 != ret) {
//出错,不是kcp数据包
printf("ikcp_input error\n");
break;
}
//memset(buf2, 0, 2048);
len = ikcp_recv(kcp, buf2, 4096);
if(len <= 0) {
continue;
}
buf2[len] = '\0';
printf("recv: %s, len = %d, buf1 %s\n", buf2, len, buf1);
len = ikcp_send(kcp, buf2, len);
}
is_close = 1;
pthread_join(p, NULL);
close(sfd);
return 0;
}
client端:
#include "ikcp.h"
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
typedef struct conn {
int fd;
struct sockaddr_in saddr;
socklen_t slen;
struct sockaddr_in caddr;
socklen_t clen;
}conn;
static inline void itimeofday(long *sec, long *usec)
{
struct timeval time;
gettimeofday(&time, NULL);
if (sec) *sec = time.tv_sec;
if (usec) *usec = time.tv_usec;
}
static inline int64_t iclock64()
{
long s, u;
int64_t value;
itimeofday(&s, &u);
value = ((int64_t)s) * 1000 + (u / 1000);
return value;
}
int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
conn* c = (conn*)user;
int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->saddr, c->slen);
printf("send server ip %s, port %d, send %s, len %d, send sucees %d\n", inet_ntoa(c->saddr.sin_addr), ntohs(c->saddr.sin_port), buf, len, ret);
return ret;
}
int num = 0;
void* update(void* arg) {
ikcpcb *kcp = (ikcpcb*)arg;
while(!num) {
ikcp_update(kcp, (int32_t)iclock64());
usleep(10);
}
}
int main() {
conn user;
int socketFd;
struct sockaddr_in peer_Addr;
peer_Addr.sin_family = PF_INET;
peer_Addr.sin_port = htons(9999);
peer_Addr.sin_addr.s_addr = inet_addr("192.168.126.128");
struct sockaddr_in self_Addr;
self_Addr.sin_family = PF_INET;
self_Addr.sin_port = htons(10000);
self_Addr.sin_addr.s_addr = inet_addr("0.0.0.0");
if ((socketFd = socket(PF_INET, SOCK_DGRAM| SOCK_CLOEXEC, IPPROTO_UDP)) == -1) {
perror("child socket");
exit(1);
}
int opt = 1;
if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
exit(1);
}
if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))){
exit(1);
}
if (bind(socketFd, (struct sockaddr *) &self_Addr, sizeof(struct sockaddr))){
perror("chid bind");
exit(1);
}
user.fd = socketFd;
//user.saddr = peer_Addr;
printf("server ip %s, port %d\n", inet_ntoa(peer_Addr.sin_addr), ntohs(peer_Addr.sin_port));
memcpy(&user.saddr, &peer_Addr, sizeof(struct sockaddr_in));
user.slen = sizeof(peer_Addr);
//user.caddr = self_Addr;
memcpy(&user.caddr, &self_Addr, sizeof(struct sockaddr_in));
user.clen = sizeof(self_Addr);
ikcpcb *kcp = ikcp_create(0x1234, &user);
kcp->output = udp_output; //设置发送函数
// 配置窗口大小:平均延迟200ms,每20ms发送一个包,
// 而考虑到丢包重发,设置最大收发窗口为128
ikcp_wndsize(kcp, 128, 128);
// 第二个参数 nodelay-启用以后若干常规加速将启动
// 第三个参数 interval为内部处理时钟,默认设置为 10ms
// 第四个参数 resend为快速重传指标,
// 第五个参数 为是否禁用常规流控,
ikcp_nodelay(kcp, 0, 10, 0, 0);
pthread_t p = 0;
pthread_create(&p, NULL, update, (void*)kcp);
char buffer[2050] = {0};
char rbuffer[2048] = {0};
while(1) {
getchar();
//一定要发1400以下,不然分包server处理不了
memset(buffer, 'a', 1000);
ikcp_send(kcp, buffer, 1000);
user.slen = sizeof(user.saddr);
// 接收服务器返回的数据
int nread = recvfrom(socketFd, rbuffer, 2048, 0, (struct sockaddr*)&user.saddr, &user.slen);
if (nread == -1) {
perror("recvfrom");
continue;
}
int ret = ikcp_input(kcp, rbuffer, nread);
if(0 != ret) {
//出错,不是kcp数据包
break;
}
memset(buffer, 0, 2048);
ikcp_recv(kcp, buffer, 2048);
printf("server : %s, len %ld\n", buffer, strlen(buffer));
}
num = 1;
pthread_join(p, NULL);
close(socketFd);
return 0;
}
接下来看看运行结果:
3.2优化UDP服务器
我们的代码基于UDP并发编程 的基础上做优化,以KCP来保证UDP server数据传输的可靠性,接下来先来说说简单思路。
首先,我们对我们UDP server用于监听的fd的conv(ikcp_create函数的第一个参数,他必须是唯一的)做定义,我们这里取1;然后UDP客户端连接先用1来连接,我们先约定好。
之后,对每条连接从新创建一个kcp块,它的conv由server端分配。
OK,思路理清楚后我们上代码。
server端代码如下:
#include "ikcp.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>
#define HELLO "hello"
typedef int (*rcallback)(int sd);
typedef int (*wcallback)(int sd);
typedef int (*acallback)(int sd);
typedef struct conn {
int fd;
union {
rcallback _rcall;
acallback _acall;
} rcall;
wcallback wcall;
char rbuffer[1024];
char wbuffer[1024];
struct sockaddr_in saddr;
struct sockaddr_in caddr;
socklen_t slen;
socklen_t clen;
int rlen;
int wlen;
ikcpcb* kcp;
} conn;
conn conn_list[2048] = {0};
int epfd = 0;
int connect_port = 10000;
int conv = 0x1234;
int is_close = 0;
int index_pid = 0;
ikcpcb* arr[10] = {0};
static inline void itimeofday(long *sec, long *usec)
{
struct timeval time;
gettimeofday(&time, NULL);
if (sec) *sec = time.tv_sec;
if (usec) *usec = time.tv_usec;
}
static inline int64_t iclock64()
{
long s, u;
int64_t value;
itimeofday(&s, &u);
value = ((int64_t)s) * 1000 + (u / 1000);
return value;
}
void* update(void* arg) {
while(!is_close) {
for(int i = 0; i < 10; ++i) {
ikcpcb* tmp = arr[i];
if(tmp) {
ikcp_update(tmp, (int32_t)iclock64());
}
}
usleep(10);
}
}
void mod_event(int fd, int event) {
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
int handle_kcp_data(ikcpcb* kcp, char* data, int dlen, char* buf, int blen) {
int ret = ikcp_input(kcp, data, dlen);
if(ret != 0) {
return -1;
}
ret = ikcp_recv(kcp, buf, blen);
return ret;
}
int read_callback(int sd) {
conn* c = &conn_list[sd];
c->rlen = recvfrom(sd, c->rbuffer, 1024, 0, NULL, NULL);
//将读到的数据直接放进写缓冲区
if((c->wlen = handle_kcp_data(c->kcp, c->rbuffer, c->rlen, c->wbuffer, 1024)) < 0) {
return -1;
}
mod_event(sd, EPOLLOUT);
return c->rlen;
}
int write_callback(int sd) {
conn* c = &conn_list[sd];
ikcp_send(c->kcp, c->wbuffer, c->wlen);
mod_event(sd, EPOLLIN);
return c->wlen;
}
int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
conn* c = (conn*)user;
int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->caddr, c->clen);
printf("sendto client ip %s, port %d, send %s, len %d send sucees %d, fd %d\n", inet_ntoa(c->caddr.sin_addr), ntohs(c->caddr.sin_port), buf, len, ret, c->fd);
return ret;
}
int udp_accept(int sd)
{
int new_fd = -1;
int ret = 0;
int reuse = 1;
char buf[128] = {0};
struct sockaddr_in peer_addr;
socklen_t cli_len = sizeof(peer_addr);
//1.拿到本地信息,然后改一个端口,于client建立另外一条连接
struct sockaddr_in my_addr = conn_list[sd].saddr;
my_addr.sin_port = htons(connect_port++);
//2.接受client的握手数据
ret = recvfrom(sd, buf, 128, 0, (struct sockaddr *)&peer_addr, &cli_len);
if (ret < 0) {
return -1;
}
// memcmp(&conn_list[sd].caddr, &peer_addr, cli_len);
conn_list[sd].caddr.sin_addr.s_addr = peer_addr.sin_addr.s_addr;
conn_list[sd].caddr.sin_port = peer_addr.sin_port;
conn_list[sd].clen = cli_len;
char data[128] = {0};
if(handle_kcp_data(conn_list[sd].kcp, buf, ret, data, 128) < 0) {
return -1;
}
//3.创建一个新的fd
if ((new_fd = socket(PF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0)) == -1) {
return -1;
}
int opt = 1;
if(setsockopt(new_fd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
exit(1);
}
if(setsockopt(new_fd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse))){
exit(1);
}
//4.与新端口绑定
ret = bind(new_fd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr));
if (ret){
close(new_fd);
return -1;
}
//5.连接,设置默认目标地址
peer_addr.sin_family = PF_INET;
if (connect(new_fd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {
close(new_fd);
return -1;
}
//我们是单线程,暂时先这么写
memset(conn_list[sd].wbuffer, 0, 1024);
sprintf(conn_list[sd].wbuffer, "%d", conv);
//6.回应client,让对端知道我们的新端口
//char buffer[16] = "hello client";
//sendto(new_fd, buffer, strlen(buffer), 0, (struct sockaddr*)&peer_addr, sizeof(struct sockaddr_in));
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = new_fd;
//7.加入epoll
if(epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &ev) < 0) {
close(new_fd);
return -1;
}
conn_list[new_fd].fd = new_fd;
conn_list[new_fd].rcall._acall = read_callback;
conn_list[new_fd].wcall = write_callback;
conn_list[new_fd].saddr = my_addr;
conn_list[new_fd].slen = sizeof(my_addr);
conn_list[new_fd].caddr = peer_addr;
conn_list[new_fd].clen = sizeof(peer_addr);
conn_list[new_fd].kcp = ikcp_create(conv, &conn_list[new_fd]);
conn_list[new_fd].kcp->output = udp_output;
++conv;
arr[index_pid++] = conn_list[new_fd].kcp;
memcmp(&conn_list[new_fd].caddr, &peer_addr, cli_len);
conn_list[new_fd].clen = cli_len;
ikcp_send(conn_list[sd].kcp, conn_list[sd].wbuffer, sizeof(conv));
ikcp_send(conn_list[new_fd].kcp, HELLO, strlen(HELLO));
return new_fd;
}
int main() {
int sfd = socket(PF_INET, SOCK_DGRAM, 0);
if (sfd < 0) {
return -1;
}
struct sockaddr_in s_addr;
s_addr.sin_family = PF_INET;
s_addr.sin_port = htons(9999);
s_addr.sin_addr.s_addr = INADDR_ANY;
socklen_t slen = sizeof(s_addr);
if(bind(sfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr)) < 0) {
close(sfd);
return -1;
}
epfd = epoll_create(1);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sfd;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev) < 0) {
close(sfd);
close(epfd);
return -1;
}
conn_list[sfd].fd = sfd;
conn_list[sfd].rcall._acall = udp_accept;
conn_list[sfd].wcall = NULL;
conn_list[sfd].saddr = s_addr;
conn_list[sfd].slen = sizeof(s_addr);
conn_list[sfd].kcp = ikcp_create(0x1, &conn_list[sfd]);
conn_list[sfd].kcp->output = udp_output;
arr[index_pid++] = conn_list[sfd].kcp;
pthread_t pid = 0;
pthread_create(&pid, NULL, update, NULL);
printf("udp_server start...\n");
while(1) {
struct epoll_event evs[128] = {0};
int nready = epoll_wait(epfd, evs, 128, -1);
if(nready > 0) {
for(int i = 0; i < nready; ++i) {
int sd = evs[i].data.fd;
if(evs[i].events & EPOLLIN) {
if(conn_list[sd].rcall._rcall) {
conn_list[sd].rcall._rcall(sd);
}
}
if(evs[i].events & EPOLLOUT) {
if(conn_list[sd].wcall) {
conn_list[sd].wcall(sd);
}
}
}
}
}
is_close = 1;
pthread_join(pid, NULL);
close(sfd);
close(epfd);
return 0;
}
client端代码如下:
#include "ikcp.h"
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <strings.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define SO_REUSEPORT 15
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
typedef struct conn {
struct sockaddr_in s_addr;
int fd;
}conn;
static inline void itimeofday(long *sec, long *usec)
{
struct timeval time;
gettimeofday(&time, NULL);
if (sec) *sec = time.tv_sec;
if (usec) *usec = time.tv_usec;
}
static inline int64_t iclock64()
{
long s, u;
int64_t value;
itimeofday(&s, &u);
value = ((int64_t)s) * 1000 + (u / 1000);
return value;
}
int num = 0;
void* update(void* arg) {
ikcpcb *kcp = (ikcpcb*)arg;
while(!num) {
if(kcp) {
ikcp_update(kcp, (int32_t)iclock64());
}
usleep(10);
}
}
int handle_kcp_data(ikcpcb* kcp, char* data, int dlen, char* buf, int blen) {
int ret = ikcp_input(kcp, data, dlen);
if(ret != 0) {
return -1;
}
ret = ikcp_recv(kcp, buf, blen);
return ret;
}
int udp_output(const char *buf, int len, ikcpcb *kcp, void *user) {
conn* c = (conn*)user;
int ret = sendto(c->fd, (const void *)buf, len, 0, (struct sockaddr*)&c->s_addr, sizeof(struct sockaddr_in));
printf("send server ip %s, port %d, send %s, len %d, send sucees %d\n", inet_ntoa(c->s_addr.sin_addr), ntohs(c->s_addr.sin_port), buf, len, ret);
return ret;
}
void createClient(int id,int myPort,int peerPort){
int socketFd;
struct sockaddr_in peer_Addr;
peer_Addr.sin_family = PF_INET;
peer_Addr.sin_port = htons(9999);
peer_Addr.sin_addr.s_addr = inet_addr("192.168.126.128");
struct sockaddr_in self_Addr;
self_Addr.sin_family = PF_INET;
self_Addr.sin_port = htons(10000);
self_Addr.sin_addr.s_addr = inet_addr("0.0.0.0");
if ((socketFd = socket(PF_INET, SOCK_DGRAM| SOCK_CLOEXEC, 0)) == -1) {
perror("child socket");
exit(1);
}
int opt = 1;
if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &opt,sizeof(opt))){
exit(1);
}
if(setsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt))){
exit(1);
}
if (bind(socketFd, (struct sockaddr *) &self_Addr, sizeof(struct sockaddr))){
perror("chid bind");
exit(1);
}
conn c1;
c1.s_addr = peer_Addr;
c1.fd = socketFd;
ikcpcb* kcp1 = ikcp_create(0x1, &c1);
pthread_t p1 = 0;
kcp1->output = udp_output;
pthread_create(&p1, NULL, update, (void*)kcp1);
char buffer[1024] = {0};
memset(buffer, 0, 1024);
sprintf(buffer, "hello server %d", 0);
ikcp_send(kcp1, buffer, strlen(buffer));
//sendto(socketFd, buffer, strlen(buffer), 0, (struct sockaddr *) &peer_Addr, sizeof(struct sockaddr_in));
bzero(&peer_Addr, sizeof(peer_Addr));
char rbuffer[1024] = {0};
socklen_t slen = sizeof(peer_Addr);
char data[1024] = {0};
// 接收服务器返回的数据
int nread = recvfrom(socketFd, rbuffer, 1024, 0, (struct sockaddr*)&peer_Addr, &slen);
if (nread == -1) {
close(socketFd);
}
if(handle_kcp_data(kcp1, rbuffer, nread, data, 1024) < 0) {
exit(1);
}
conn c2;
c2.s_addr = peer_Addr;
c2.fd = socketFd;
int conv = atoi(data);
ikcpcb* kcp2 = ikcp_create(conv, &c2);
pthread_t p2 = 0;
kcp2->output = udp_output;
pthread_create(&p2, NULL, update, (void*)kcp2);
memset(rbuffer, 0, 1024);
nread = recvfrom(socketFd, rbuffer, 1024, 0, (struct sockaddr*)&peer_Addr, &slen);
if(handle_kcp_data(kcp2, rbuffer, nread, data, 1024) < 0) {
exit(1);
}
c2.s_addr.sin_addr.s_addr = peer_Addr.sin_addr.s_addr;
c2.s_addr.sin_port = peer_Addr.sin_port;
usleep(1000);
peer_Addr.sin_family = PF_INET;
if(connect(socketFd, (struct sockaddr *) &peer_Addr, sizeof(struct sockaddr)) == -1) {
perror("chid connect");
exit(1);
}
memset(buffer, 0, 1024);
memset(rbuffer, 0, 1024);
memset(data, 0, 1024);
strcpy(buffer, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
//sendto(socketFd, buffer, strlen(buffer), 0, (struct sockaddr *) &peer_Addr, sizeof(struct sockaddr_in));
ikcp_send(kcp2, buffer, strlen(buffer));
nread = recvfrom(socketFd, rbuffer, 1024, 0, (struct sockaddr*)&peer_Addr, &slen);
if(handle_kcp_data(kcp2, rbuffer, nread, data, 1024) < 0) {
exit(1);
}
printf("server echo: %s\n", data);
ikcp_release(kcp1);
ikcp_release(kcp2);
num = 1;
pthread_join(p1, NULL);
pthread_join(p2, NULL);
}
void serial(int clinetNum){
for(int i=1;i<=clinetNum;i++){
createClient(i,30000+i,9999);
}
}
int main(int argc, char * argv[])
{
createClient(0 ,30000,9999);
return 0;
}
编译运行client代码:
成功发回我们的数据,并且可以看到第二条send server就是ACK包。
学习参考:
标签:struct,int,kcp,详解,fd,KCP,conn,addr From: https://blog.csdn.net/weixin_55951019/article/details/143450326