1. 简介
- 使用堆空间来保存发送缓冲区
- 发送缓冲区中的数据按块进行管理,读写数据时,数据块是基本的读写单元
- 使用一个循环链表来管理发送缓冲区中的数据块
- 发送缓冲区可以动态扩容,并且受到UDT流量控制机制的限制,避免了发送缓冲区无限扩容的bug
2. 发送缓冲区源码分析
- 相关文件:
buffer.h
buffer.cpp
- 发送缓冲区管理类:
class CSndBuffer
2.1 使用堆空间来存储待发送的数据
- 所有待发送的数据都缓存在堆空间中,使用一个单向链表来维护,数据结构如下
struct Buffer { char* m_pcData; // buffer int m_iSize; // size,链表中有多少个节点 Buffer* m_pNext; // next buffer } *m_pBuffer; // physical buffer
- 在
class SndBuffer
的构造函数中申请堆空间;注意:每个节点都申请了m_iSize * m_iMSS
大小的堆空间,可以理解为堆内存按照数据块进行分割,每个数据块的大小都是m_iMSS
,每个链表中有m_iSize
个数据块m_pBuffer = new Buffer; m_pBuffer->m_pcData = new char [m_iSize * m_iMSS]; m_pBuffer->m_iSize = m_iSize; m_pBuffer->m_pNext = NULL;
2.2 使用一个循环链表来管理堆空间
- 既然所有待发送的数据都存储在堆空间中,那么堆空间是如何管理的呢?
- 前文已经提到:堆内存中按照数据块进行分割,每个数据块的大小都是
m_iMSS
- 由于数据块的大小都是固定大小
m_iMSS
,这样只要确定了堆内存的首地址,就可以通过偏移,得到每个数据块的地址 - 既然一次性申请了
m_iSize
个数据块,当然就要记录有多少个数据块已经被使用了,m_iCount
就是用来实现这个功能的变量
- 前文已经提到:堆内存中按照数据块进行分割,每个数据块的大小都是
- 数据块定义
struct Block { // 指向相应堆空间的指针 char* m_pcData; // 数据块中有效数据的大小,有些数据可能并不会占用一个完整的数据块 int m_iLength; // 消息编号,用来区分是不是一个完整的消息和是否需要按序发送 int32_t m_iMsgNo; // 数据被放入发送缓冲区时的时间戳 uint64_t m_OriginTime; // 数据生存时间,ms int m_iTTL; // time to live (milliseconds) // 指向下一个数据块的指针 Block* m_pNext; // next block } *m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock;
- 使用一个循环链表
m_pBlock
来维护这些数据块,CSndBuffer
的构造函数中,循环链表初始化如下// 循环链表,链表中的每个节点都是一个数据块 m_pBlock = new Block; Block* pb = m_pBlock; for (int i = 1; i < m_iSize; ++ i) { pb->m_pNext = new Block; pb->m_iMsgNo = 0; pb = pb->m_pNext; } pb->m_pNext = m_pBlock; pb = m_pBlock; // 循环链表中的指针,指向真实的堆内存 char* pc = m_pBuffer->m_pcData; for (int i = 0; i < m_iSize; ++ i) { pb->m_pcData = pc; pb = pb->m_pNext; pc += m_iMSS; // 指针偏移到下一个数据块 } // 初始化数据块指针都指向申请的堆内存起始位置 m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
2.3 发送缓冲区动态扩容
- 发送缓冲区满时,需要动态扩容;
- 扩容的策略就是再申请一段
m_iSize * m_iMSS
大小的堆空间,将其纳入m_pBlock
的管理中 - 发送缓冲区的扩容策略看起来能够无限扩容,直至达到进程堆空间的上限;实际上,UDT还有一套类似TCP滑动窗口的流量控制机制,该机制限制了发送端的发送速率,避免了发送缓冲区无限扩容的bug;稍后会详细介绍UDT的流量控制机制
- 动态扩容源码,精简版,删除了异常与错误处理,只关注逻辑
void CSndBuffer::increase() { int unitsize = m_pBuffer->m_iSize; // 申请堆空间,大小 m_iSize * m_iMSS Buffer* nbuf = new Buffer; nbuf->m_pcData = new char [unitsize * m_iMSS]; nbuf->m_iSize = unitsize; nbuf->m_pNext = NULL; // 将新申请的堆空间添加到m_pBuffer链表尾 Buffer* p = m_pBuffer; while (NULL != p->m_pNext) p = p->m_pNext; p->m_pNext = nbuf; // 将堆空间纳入m_pBlock循环链表的管理中 Block* nblk = new Block; Block* pb = nblk; for (int i = 1; i < unitsize; ++ i) { pb->m_pNext = new Block; pb = pb->m_pNext; } pb->m_pNext = m_pLastBlock->m_pNext; m_pLastBlock->m_pNext = nblk; pb = nblk; char* pc = nbuf->m_pcData; for (int i = 0; i < unitsize; ++ i) { pb->m_pcData = pc; pb = pb->m_pNext; pc += m_iMSS; } // 更新发送缓冲区大小 m_iSize += unitsize; }
2.4 向发送缓冲区中添加数据
- 当发送缓冲区满时,按前文中的逻辑动态扩容
- 使用一个
int32_t m_iMsgNo
来表示消息编号bit[29]
表示数据是否需要按序发送bit[31:30]
表示一个完整消息的起始和结尾,可以用来进行消息的分片和重组
- 向发送缓冲区中添加数据的源码
void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order) { // 计算要插入的数据需要占用多少数据块 int size = len / m_iMSS; if ((len % m_iMSS) != 0) size ++; // dynamically increase sender buffer // 动态增大发送缓冲区 while (size + m_iCount >= m_iSize) increase(); uint64_t time = CTimer::getTime(); // 是否需要按序发送 int32_t inorder = order; inorder <<= 29; // 指向最后一个数据块 Block* s = m_pLastBlock; // 将数据插入到数据块中 for (int i = 0; i < size; ++ i) { // 待插入的数据长度 int pktlen = len - i * m_iMSS; if (pktlen > m_iMSS) pktlen = m_iMSS; // 将数据拷贝到数据块中 memcpy(s->m_pcData, data + i * m_iMSS, pktlen); // 数据块中有效数据的大小,有些数据可能并不会占用一个完整的数据块 s->m_iLength = pktlen; // m_iMsgNo的bit[29]表示是否需要按序发送, m_iNextMsgNo在构造是被初始化为0 s->m_iMsgNo = m_iNextMsgNo | inorder; // 下面这个逻辑用来进行data重组 // data起始:data占用的首个数据块, m_iMsgNo的bit[31]为1 if (i == 0) s->m_iMsgNo |= 0x80000000; // data结束:data占用的最后一个数据块,m_iMsgNo的bit[30]为1 if (i == size - 1) s->m_iMsgNo |= 0x40000000; // 数据被放入发送缓冲区时的时间戳 s->m_OriginTime = time; s->m_iTTL = ttl; // 下一个数据块 s = s->m_pNext; } // 更新最后一个数据块指针 m_pLastBlock = s; // 更新发送缓冲区中已被占用的数据块数量 CGuard::enterCS(m_BufLock); m_iCount += size; CGuard::leaveCS(m_BufLock); // 更新消息编号,超出后回绕至1 m_iNextMsgNo ++; if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) m_iNextMsgNo = 1; }
2.5 从发送缓冲区中获取数据
- 当发送缓冲区中无数据时,返回0
- 按数据块进行读取,也就是说每次不一定会读取一个完整的消息
- 也可以指定偏移量从发送缓冲区中读数据,在此过程中,如果数据在发送缓冲区中的时间已经超过了
TTL
,则将其从发送缓冲区中删除 - 简单地从发送缓冲区中读数据,无论数据是否过期,即超过了
TTL
int CSndBuffer::readData(char** data, int32_t& msgno) { // 发送缓冲区中无数据,return 0 if (m_pCurrBlock == m_pLastBlock) return 0; // 读一个数据块 *data = m_pCurrBlock->m_pcData; int readlen = m_pCurrBlock->m_iLength; msgno = m_pCurrBlock->m_iMsgNo; // 更新数据块指针 m_pCurrBlock = m_pCurrBlock->m_pNext; return readlen; }
- 按偏移量从发送缓冲区中读数据,并且丢弃超时未发送的数据
int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen) { CGuard bufferguard(m_BufLock); Block* p = m_pFirstBlock; // 偏移,按数据块进行偏移 for (int i = 0; i < offset; ++ i) p = p->m_pNext; // 数据是否过期,如果当前时间-数据产生的时间大于数据的TTL,则认为数据过期; // 删除数据并返回-1,表示读取失败 if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t)p->m_iTTL)) { // 获取消息号 msgno = p->m_iMsgNo & 0x1FFFFFFF; msglen = 1; p = p->m_pNext; bool move = false; // 移除所有消息号相同的数据块 while (msgno == (p->m_iMsgNo & 0x1FFFFFFF)) { if (p == m_pCurrBlock) move = true; p = p->m_pNext; if (move) m_pCurrBlock = p; msglen ++; } return -1; } // 正常读取数据,读一个数据块 *data = p->m_pcData; int readlen = p->m_iLength; msgno = p->m_iMsgNo; return readlen; }
2.6 丢弃已被确认的数据
- 需要丢弃已经被对端确认的数据
void CSndBuffer::ackData(int offset) { CGuard bufferguard(m_BufLock); // 丢弃已经被对端确认的数据 for (int i = 0; i < offset; ++ i) m_pFirstBlock = m_pFirstBlock->m_pNext; // 更新发送缓冲区中已占用的数据块数量 m_iCount -= offset; // 触发事件,通知发送缓冲区已更新;由UDT epoll来处理,后续再详细介绍 CTimer::triggerEvent(); }
3. 总结
- UDT实现了一套类似TCP的发送缓冲区机制
- 发送缓冲区可动态扩容,但是由于收到UDT流量控制机制的影响,发送端的发送速率收到了限制,这样也避免了发送缓冲区无限扩容的bug
- 调用相关API发送数据时,数据首先转存到发送缓冲区中,等数据达到调度时间后,将数据从发送缓冲区中取出,调用系统API
sendto()/sendmsg()
发送到对端,后文再详细介绍数据的发送过程 - 发送缓冲区中的数据按块进行管理,每个数据块的大小固定为
m_iMSS
,数据块中的数据可以少于m_iMSS
,此时仍占用一个完整的数据块 - 所有的数据块使用一个循环链表来维护
- 数据块是向发送缓冲区中读写数据的基本单元,无法实现按字节进行读取
- 一个完整的消息可能会占用多个数据块,使用消息编号
int32_t m_iMsgNo
来区分不同的消息;既然一个消息可能占用多个数据块,而消息的发送/接收又是按块进行的,那么必然需要一种分片重组的机制;m_iMsgNo的bit[31:30]
就用来表示一个消息的头和尾,在循环链表中找到消息的头和尾,就能够还原出一条完整的消息 - 将数据放入发送缓冲区时,同时指定了数据的
TTL
,即数据的生存时长,超时未发送的数据将被丢弃