首页 > 其他分享 >UDT(三):T发送缓冲区管理

UDT(三):T发送缓冲区管理

时间:2024-10-28 11:31:43浏览次数:3  
标签:iMSS UDT 管理 发送缓冲区 pb int 数据 pNext

1. 简介

  • 使用堆空间来保存发送缓冲区
  • 发送缓冲区中的数据按块进行管理,读写数据时,数据块是基本的读写单元
  • 使用一个循环链表来管理发送缓冲区中的数据块
  • 发送缓冲区可以动态扩容,并且受到UDT流量控制机制的限制,避免了发送缓冲区无限扩容的bug

2. 发送缓冲区源码分析

  • 相关文件:buffer.h buffer.cpp
  • 发送缓冲区管理类:class CSndBuffer

2.1 使用堆空间来存储待发送的数据

  • 所有待发送的数据都缓存在堆空间中,使用一个单向链表来维护,数据结构如下
    struct Buffer
    {
        char* m_pcData;			// buffer
        int m_iSize;			// size,链表中有多少个节点
        Buffer* m_pNext;			// next buffer
    } *m_pBuffer;			// physical buffer
    
  • class SndBuffer的构造函数中申请堆空间;注意:每个节点都申请了m_iSize * m_iMSS大小的堆空间,可以理解为堆内存按照数据块进行分割,每个数据块的大小都是m_iMSS,每个链表中有m_iSize个数据块
    m_pBuffer = new Buffer;
    m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];
    m_pBuffer->m_iSize = m_iSize;
    m_pBuffer->m_pNext = NULL;
    

2.2 使用一个循环链表来管理堆空间

  • 既然所有待发送的数据都存储在堆空间中,那么堆空间是如何管理的呢?
    • 前文已经提到:堆内存中按照数据块进行分割,每个数据块的大小都是m_iMSS
    • 由于数据块的大小都是固定大小m_iMSS,这样只要确定了堆内存的首地址,就可以通过偏移,得到每个数据块的地址
    • 既然一次性申请了m_iSize个数据块,当然就要记录有多少个数据块已经被使用了,m_iCount就是用来实现这个功能的变量
  • 数据块定义
    struct Block
    {
        // 指向相应堆空间的指针
        char* m_pcData;
        // 数据块中有效数据的大小,有些数据可能并不会占用一个完整的数据块
        int m_iLength;
        // 消息编号,用来区分是不是一个完整的消息和是否需要按序发送
        int32_t m_iMsgNo;
        // 数据被放入发送缓冲区时的时间戳
        uint64_t m_OriginTime;
        // 数据生存时间,ms
        int m_iTTL;                       // time to live (milliseconds)
        // 指向下一个数据块的指针
        Block* m_pNext;                   // next block
    } *m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock;
    
  • 使用一个循环链表m_pBlock来维护这些数据块,CSndBuffer的构造函数中,循环链表初始化如下
    // 循环链表,链表中的每个节点都是一个数据块
    m_pBlock = new Block;
    Block* pb = m_pBlock;
    for (int i = 1; i < m_iSize; ++ i)
    {
        pb->m_pNext = new Block;
        pb->m_iMsgNo = 0;
        pb = pb->m_pNext;
    }
    pb->m_pNext = m_pBlock;
    
    pb = m_pBlock;
    
    // 循环链表中的指针,指向真实的堆内存
    char* pc = m_pBuffer->m_pcData;
    for (int i = 0; i < m_iSize; ++ i)
    {
        pb->m_pcData = pc;
        pb = pb->m_pNext;
        pc += m_iMSS;     // 指针偏移到下一个数据块
    }
    
    // 初始化数据块指针都指向申请的堆内存起始位置
    m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;
    

2.3 发送缓冲区动态扩容

  • 发送缓冲区满时,需要动态扩容;
  • 扩容的策略就是再申请一段m_iSize * m_iMSS大小的堆空间,将其纳入m_pBlock的管理中
  • 发送缓冲区的扩容策略看起来能够无限扩容,直至达到进程堆空间的上限;实际上,UDT还有一套类似TCP滑动窗口的流量控制机制,该机制限制了发送端的发送速率,避免了发送缓冲区无限扩容的bug;稍后会详细介绍UDT的流量控制机制
  • 动态扩容源码,精简版,删除了异常与错误处理,只关注逻辑
    void CSndBuffer::increase()
    {
        int unitsize = m_pBuffer->m_iSize;
    
        // 申请堆空间,大小 m_iSize * m_iMSS
        Buffer* nbuf = new Buffer;
        nbuf->m_pcData = new char [unitsize * m_iMSS];
        nbuf->m_iSize = unitsize;
        nbuf->m_pNext = NULL;
    
        // 将新申请的堆空间添加到m_pBuffer链表尾
        Buffer* p = m_pBuffer;
        while (NULL != p->m_pNext)
        p = p->m_pNext;
        p->m_pNext = nbuf;
    
        // 将堆空间纳入m_pBlock循环链表的管理中
        Block* nblk = new Block;
        Block* pb = nblk;
        for (int i = 1; i < unitsize; ++ i)
        {
            pb->m_pNext = new Block;
            pb = pb->m_pNext;
        }
        pb->m_pNext = m_pLastBlock->m_pNext;
        m_pLastBlock->m_pNext = nblk;
        pb = nblk;
        char* pc = nbuf->m_pcData;
        for (int i = 0; i < unitsize; ++ i)
        {
            pb->m_pcData = pc;
            pb = pb->m_pNext;
            pc += m_iMSS;
        }
    
        // 更新发送缓冲区大小
        m_iSize += unitsize;
    }
    

2.4 向发送缓冲区中添加数据

  • 当发送缓冲区满时,按前文中的逻辑动态扩容
  • 使用一个int32_t m_iMsgNo来表示消息编号
    • bit[29]表示数据是否需要按序发送
    • bit[31:30]表示一个完整消息的起始和结尾,可以用来进行消息的分片和重组
  • 向发送缓冲区中添加数据的源码
    void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order)
    {
       // 计算要插入的数据需要占用多少数据块
       int size = len / m_iMSS;
       if ((len % m_iMSS) != 0)
          size ++;
    
       // dynamically increase sender buffer
       // 动态增大发送缓冲区
       while (size + m_iCount >= m_iSize)
          increase();
    
       uint64_t time = CTimer::getTime();
       // 是否需要按序发送
       int32_t inorder = order;
       inorder <<= 29;
    
       // 指向最后一个数据块
       Block* s = m_pLastBlock;
       // 将数据插入到数据块中
       for (int i = 0; i < size; ++ i)
       {
          // 待插入的数据长度
          int pktlen = len - i * m_iMSS;
          if (pktlen > m_iMSS)
             pktlen = m_iMSS;
          // 将数据拷贝到数据块中
          memcpy(s->m_pcData, data + i * m_iMSS, pktlen);
          // 数据块中有效数据的大小,有些数据可能并不会占用一个完整的数据块
          s->m_iLength = pktlen;
    
          // m_iMsgNo的bit[29]表示是否需要按序发送, m_iNextMsgNo在构造是被初始化为0
          s->m_iMsgNo = m_iNextMsgNo | inorder;
          
          // 下面这个逻辑用来进行data重组
          // data起始:data占用的首个数据块, m_iMsgNo的bit[31]为1
          if (i == 0)
             s->m_iMsgNo |= 0x80000000;
          // data结束:data占用的最后一个数据块,m_iMsgNo的bit[30]为1
          if (i == size - 1)
             s->m_iMsgNo |= 0x40000000;
    
          // 数据被放入发送缓冲区时的时间戳
          s->m_OriginTime = time;
          s->m_iTTL = ttl;
    
          // 下一个数据块
          s = s->m_pNext;
       }
       // 更新最后一个数据块指针
       m_pLastBlock = s;
    
       // 更新发送缓冲区中已被占用的数据块数量
       CGuard::enterCS(m_BufLock);
       m_iCount += size;
       CGuard::leaveCS(m_BufLock);
    
       // 更新消息编号,超出后回绕至1
       m_iNextMsgNo ++;
       if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo)
          m_iNextMsgNo = 1;
    }
    

2.5 从发送缓冲区中获取数据

  • 当发送缓冲区中无数据时,返回0
  • 按数据块进行读取,也就是说每次不一定会读取一个完整的消息
  • 也可以指定偏移量从发送缓冲区中读数据,在此过程中,如果数据在发送缓冲区中的时间已经超过了TTL,则将其从发送缓冲区中删除
  • 简单地从发送缓冲区中读数据,无论数据是否过期,即超过了TTL
    int CSndBuffer::readData(char** data, int32_t& msgno)
    {
        // 发送缓冲区中无数据,return 0
        if (m_pCurrBlock == m_pLastBlock)
            return 0;
    
        // 读一个数据块
        *data = m_pCurrBlock->m_pcData;
        int readlen = m_pCurrBlock->m_iLength;
        msgno = m_pCurrBlock->m_iMsgNo;
    
        // 更新数据块指针
        m_pCurrBlock = m_pCurrBlock->m_pNext;
        
        return readlen;
    }
    
  • 按偏移量从发送缓冲区中读数据,并且丢弃超时未发送的数据
    int CSndBuffer::readData(char** data, const int offset, int32_t& msgno, int& msglen)
    {
        CGuard bufferguard(m_BufLock);
        
        Block* p = m_pFirstBlock;
        
        // 偏移,按数据块进行偏移
        for (int i = 0; i < offset; ++ i)
            p = p->m_pNext;
    
        // 数据是否过期,如果当前时间-数据产生的时间大于数据的TTL,则认为数据过期;
        // 删除数据并返回-1,表示读取失败
        if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t)p->m_iTTL))
        {
        // 获取消息号
        msgno = p->m_iMsgNo & 0x1FFFFFFF;
    
        msglen = 1;
        p = p->m_pNext;
        bool move = false;
        // 移除所有消息号相同的数据块
        while (msgno == (p->m_iMsgNo & 0x1FFFFFFF))
        {
            if (p == m_pCurrBlock)
                move = true;
            p = p->m_pNext;
            if (move)
                m_pCurrBlock = p;
            
            msglen ++;
        }
    
            return -1;
        }	
    
        // 正常读取数据,读一个数据块
        *data = p->m_pcData;
        int readlen = p->m_iLength;
        msgno = p->m_iMsgNo;
    
        return readlen;
    }
    

2.6 丢弃已被确认的数据

  • 需要丢弃已经被对端确认的数据
    void CSndBuffer::ackData(int offset)
    {
        CGuard bufferguard(m_BufLock);
        
        // 丢弃已经被对端确认的数据
        for (int i = 0; i < offset; ++ i)
            m_pFirstBlock = m_pFirstBlock->m_pNext;
        
        // 更新发送缓冲区中已占用的数据块数量
        m_iCount -= offset;
        
        // 触发事件,通知发送缓冲区已更新;由UDT epoll来处理,后续再详细介绍
        CTimer::triggerEvent();
    }
    

3. 总结

  • UDT实现了一套类似TCP的发送缓冲区机制
  • 发送缓冲区可动态扩容,但是由于收到UDT流量控制机制的影响,发送端的发送速率收到了限制,这样也避免了发送缓冲区无限扩容的bug
  • 调用相关API发送数据时,数据首先转存到发送缓冲区中,等数据达到调度时间后,将数据从发送缓冲区中取出,调用系统API sendto()/sendmsg()发送到对端,后文再详细介绍数据的发送过程
  • 发送缓冲区中的数据按块进行管理,每个数据块的大小固定为m_iMSS,数据块中的数据可以少于m_iMSS,此时仍占用一个完整的数据块
  • 所有的数据块使用一个循环链表来维护
  • 数据块是向发送缓冲区中读写数据的基本单元,无法实现按字节进行读取
  • 一个完整的消息可能会占用多个数据块,使用消息编号int32_t m_iMsgNo来区分不同的消息;既然一个消息可能占用多个数据块,而消息的发送/接收又是按块进行的,那么必然需要一种分片重组的机制;m_iMsgNo的bit[31:30]就用来表示一个消息的头和尾,在循环链表中找到消息的头和尾,就能够还原出一条完整的消息
  • 将数据放入发送缓冲区时,同时指定了数据的TTL,即数据的生存时长,超时未发送的数据将被丢弃

标签:iMSS,UDT,管理,发送缓冲区,pb,int,数据,pNext
From: https://www.cnblogs.com/zhijun1996/p/18510097

相关文章

  • 培育增长新动能,英搏尔数字化管理升级与创新的实践
    新能源汽车产业作为国家战略性新兴产业,是新质生产力的重要阵地,目前我国已成为全球最大的新能源汽车消费市场,随着全球化趋势的加速和国际厂商的布局,国内汽车零部件市场面临着激烈的竞争,亟需通过数字化转型和技术的研发和创新,以提高产业竞争力和市场占有率。针对目前行业加速分化......
  • 基于SSM餐饮管理系统的设计与实现
    前言餐饮管理系统的目的是让使用者可以更方便的将人、设备和场景更立体的连接在一起。能让用户以更科幻的方式使用产品,体验高科技时代带给人们的方便,同时也能让用户体会到与以往常规产品不同的体验风格。此系统设计主要采用的是JAVA语言来进行开发,JSP技术、采用SSM框架技......
  • 智慧税务管理:金融企业报税效率与合规性提升
    前言在数字化浪潮席卷全球的今天,金融行业正面临前所未有的挑战与机遇。如何在复杂的税务环境中保持合规并提高效率,已成为每个金融企业的重中之重。今天小编就为大家介绍一下如何通过借助智能税务平台,实现税务管理的智能化革新,提升企业的竞争力和可持续发展能力。行业背景税制改......
  • springboot琴行培训机构管理系统-计算机毕业设计源码49732
    目 录摘要1绪论1.1研究背景1.2 研究意义1.3论文结构与章节安排2 系统分析2.1可行性分析2.2系统流程分析2.2.1数据新增流程2.2.2 数据删除流程2.3 系统功能分析2.3.1功能性分析2.3.2非功能性分析2.4本章小结3系统总体设计3.1系......
  • project项目管理下载:附安装包+保姆级教程
    如大家所熟悉的,Project是一种用的比较多的项目管理软件,它主要用于帮助项目经理在各个方面高效地规划、监控、并管理项目。这款软件支持任务分配、进度跟踪、资源管理、和财务预算等关键领域的管理工作。其中,资源管理功能尤为突出,可以在确保资源高效分配的同时,优化整个项目的执......
  • 在 HarmonyOS Next 中使用 Core File Kit 管理应用文件
    本文旨在深入探讨华为鸿蒙HarmonyOSNext系统(截止目前API12)的文件管理技术细节,基于实际开发实践进行总结。主要作为技术分享与交流载体,难免错漏,欢迎各位同仁提出宝贵意见和问题,以便共同进步。本文为原创内容,任何形式的转载必须注明出处及原作者。在应用开发中,文件的创建、读......
  • java+SSM+mysql缴税管理系统70555-计算机毕设 原创毕设选题推荐(免费领源码)
    摘 要随着互联网大趋势的到来,社会的方方面面,各行各业都在考虑利用互联网作为媒介将自己的信息更及时有效地推广出去,而其中最好的方式就是建立网络管理系统,并对其进行信息管理。由于现在网络的发达,缴税管理系统的信息通过网络进行信息管理掀起了热潮,所以针对管理的用户需求......
  • API 接口管理 架构 api接口设计
    提供给第三方的业务接口应该如何设计呢?需要从哪些方面考虑?以及如何实现这些方面?1、标准化RESTful 2、安全性1)请求token(防止接口被第三方调用)token作为调用系统的凭证。token可以设置一次有效(安全性最高,完全防止接口被第三方调用),不过推荐设置时效性,减少获......
  • 数据科学项目管理的最佳实践
    文章开头段落:数据科学项目管理的最佳实践包括项目定义与规划、团队构建与合作、数据管理、流程与工具、沟通与报告。其中,项目定义与规划是项目成功的关键因素,它要求明确项目的目标、范围、时间线和预期结果。在这一阶段中,以问题为导向,制定合理的假设条件、识别关键假设,并为接下......
  • 基于Springboot+Vue的候鸟监测数据管理系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能这个系......