首页 > 其他分享 >开源版本Disruptor代码解读记录

开源版本Disruptor代码解读记录

时间:2023-10-19 20:33:19浏览次数:43  
标签:Disruptor buffer 解读 索引 开源 shared ring mem id

01 disruptor实现原理

disruptor是一种基于共享内存的进程间通信方式;接下来我们对该开源代码进行解读

环形队列设计原理

使用环形队列,实际上就是在堆上申请的一个大小为cap的数组,要求队列大小为2的N次方,为了满足位运算,快速计算出索引index(比取模的速度快)。对该数组的访问将由2个索引进行(cursor:消费者目前所在的索引位置,next:生产者目前所在的索引位置)前者表示的是读取者索引,后者表示的是写者索引。不管是cursor还是next,我们在获取索引时都需要让它们和cap进行取模操作,disruptor为了加速,强制要求 cap为2的N次方,然后使用以下的算法来替代取模操作,以下是算法语法:

//快速判断cap是否为2的N次方
bool is_power2 = cap && !( (cap-1) & cap);

在环形队列中有2种循环方式:

  1. 当索引小于cap的时候累加,当索引等于cap的时候索引置0.这种情况可以直接使用索引对队列进行访问。
  2. 索引一直累加,当需要访问队列的时候,将索引对cap进行与运算;
//将index和cap进行与操作,(类似取模操作 5%8=0 ...余5; 9%8=1 ...余1;)前提是cap必须是2的N次方
int64_t translated_index = (index & (cap -1) );

disruptor选择第二种,该方法的好处是,写索引永远>=读索引。这样生产者写入数据时,只需要判断写入索引减去读取索引是否>=cap; 如果是,则生产者需要等待消费者消费,disruptor中获取生产者下标索引的实现就是使用了这一特性。

int64_t SharedMemRingBuffer::ClaimIndex(int caller_id )
{
    // 1. 获取生产者的下标
    int64_t nNextSeqForClaim = GetNextSequenceForClaim() ;
   // 2. 得到生产者实际下标
    int64_t wrapPoint = nNextSeqForClaim - buffer_size_;
    do {
        //3. 获取消费者下标
        int64_t gatingSequence = GetMinIndexOfConsumers();
        //4. 如果实际生产者下标比最小消费者下标还要大
        if (wrapPoint >=  gatingSequence  ) {
           //等待
            std::this_thread::yield();
            continue;
        } else {
            break;
        }
    }
    while (true);
    return nNextSeqForClaim;
}

使用原子变量记录生产者和消费者的个数,减少锁的使用

使用cache line进行隔离

使用cache line进行隔离,避免多线程情况下,由于两个变量处于同一个cache line的伪共享问题;

读写的无锁设计

通过原子变量,每个生产者和消费者都需要申请数组中可以操作的元素索引,申请到了才可以进行读写操作; 引入状态记录数组来记录数组元素的状态,判断该索引位置是否可以进行读取或者写入操作;

disruptor 消费者辅助数组

disruptor引入了一个大小为MAX_CONSUMER的消费者辅助数组,存放消费者索引;以满足多消费者使用的场景。假设辅助数组名为 array_of_consumer_indexes,使用逻辑如下:

  • array_of_consumer_indexes 所有元素初始化都为-1,此时表示该数组所有的id都没有消费者注册;
void SharedMemRingBuffer::ResetRingBufferState() 
{
    if(ring_buffer_status_on_shared_mem_ == NULL ) {
        DEBUG_LOG("call InitRingBuffer first !"); 
        return;
    }
    DEBUG_LOG("---"); 
    ring_buffer_status_on_shared_mem_->cursor.store(-1);
    ring_buffer_status_on_shared_mem_->next.store(-1);
    ring_buffer_status_on_shared_mem_->registered_producer_count.store(0);
    ring_buffer_status_on_shared_mem_->registered_consumer_count.store(0);
    total_mem_size_ = 0;
    for(int i = 0; i < MAX_CONSUMER; i++) {
        ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[i] = -1;
    }
    //for blocking wait strategy : shared mutex, shared cond var
    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);
    pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init( & ring_buffer_status_on_shared_mem_->mtx_lock, &mutexAttr);

    pthread_condattr_t condAttr;
    pthread_condattr_init(&condAttr);
    pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
    pthread_cond_init( & ring_buffer_status_on_shared_mem_->cond_var, &condAttr);
}
  • 通过id注册消费者,当该id已被注册时,直接返回array_of_consumer_indexes[id]+1;当该id未被注册时,返回当前最大的消费者读取索引;
bool SharedMemRingBuffer::RegisterConsumer (int id, int64_t* index_for_customer)
{
    if(ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] == -1 ) {
        ring_buffer_status_on_shared_mem_->registered_consumer_count++;
        if( ring_buffer_status_on_shared_mem_->registered_consumer_count >= MAX_CONSUMER) {
            DEBUG_ELOG("Error: Exceeds MAX_CONSUMER : " << MAX_CONSUMER); 
            return false;
        }
        if(ring_buffer_status_on_shared_mem_->cursor >= 0 ) {
            DEBUG_LOG("cursor >= 0"); 
            ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] = 
                        ring_buffer_status_on_shared_mem_->cursor.load() ; 
        } else {
            DEBUG_LOG("set 0 "); 
            ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] = 0; 
        }
        *index_for_customer = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id];
    } else {
        //last read message index 
        *index_for_customer = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] + 1;
    }
    DEBUG_LOG("USAGE_CONSUMER ID : " << id<< " / index : "<< *index_for_customer); 
    return true;
}
  • 当生产者写入数据时,先获取array_of_consumer_indexes中最小的消费者索引,然后判断写入的索引位置与最小消费者索引插值是否超过cap; 如果超过则需要进入等待,否则可以直接写入;
  • 当身份为id的消费者消费的索引达到了index时,需要将array_of_consumer_indexes[id]更新为index;
bool  SharedMemRingBuffer::CommitRead(int user_id, int64_t index)
{
    ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[user_id] = index ; //update
#if DEBUG_PRINTF
    char msg_buffer[1024];
    snprintf(msg_buffer, sizeof(msg_buffer), 
        "[id:%d]     \t\t\t\t\t\t\t\t\t\t\t\t[%s-%d] index[%" PRId64 "] ", 
        user_id, __func__, __LINE__, index );
    {AtomicPrint atomicPrint(msg_buffer);}
#endif
    return true;
}

使用互斥锁和条件变量

可以设置支持进程间通信,支持进程间同步;

使用共享内存

支持进程间的数据交互,实现==zero cory== 见文章 linux/qnx共享内存API介绍

在共享内存上进行数据收发

disruptor在创建共享内存时,除了用户所需的内存空间;还会将一些管理信息以及用于同步用的互斥锁和条件变量也创建在共享内存上。 disruptor整体的数据同步机制如下:

  • 互斥锁和条件变量:通过设置PTHREAD_PROCESS_SHARED属性达到进程间同步的目的;
  • 通过将原子变量设置在共享内存上,达到进程间的原子同步;
  • 通过将共享内存地址映射到环形队列上,配合原子变量以及等待策略,以方便进行进程间同步;

首先==SharedMemRingBuffer==创建一个共享内存,其大小为==sizeof(RingBufferStatusOnSharedMem) + sizeof(OneBufferData)*size==,然后将共享内存==attach==到所需要的进程,==通过内存偏移量计算出用户数据的起始地址,就可以在该空间是进行数据的读写了。== ==disruptor将申请的cap个元素通过内存地址映射到一个包含cap个元素指针的环形队列上,通过该操作,九二一方便的对共享内存进行访问管理。==

等待策略

消费者通过等待策略来获取最新的可读数据。 disruptor目前有3钟等待策略,如下:

  • YieldingWaitStrategy: 无锁,累加100次,每次都进行资源是否可用判断。如果可用,则返回所需要的资源;如果不可用,则一直累加到100次之后,让出线程调度时间片,当下次系统进行调度的时候,再判断资源是否可用,如果不可用则继续yield。该策略的逻辑是spin--》yield;
  • SleepingWaitStrategy:无锁,该策略先快速累加100次,此过程和YieldingWaitStrategy的累加一样;再进行第二次100次的累加,在此过程中,如果条件不满足则会进行yield操作;之后则每次循环都休眠1ns。该策略逻辑为spin--》yield--》sleep。
  • BlockingWaitStrategy:该策略为加锁阻塞策略。

标签:Disruptor,buffer,解读,索引,开源,shared,ring,mem,id
From: https://blog.51cto.com/u_15804342/7942066

相关文章

  • 开源游戏 | 一款采用 Java开发的基于小孔成像原理与图形光栅化的字符 3D 画面框架构建
     去关注、不迷路一、项目概述       这是一款采用JavaSwing开发的基于小孔成像原理与图形光栅化的字符3D画面框架构建的空战游戏,简单说就是作者为了做个3D字符空战游戏,顺手写了个3D引擎,别人的本科毕设。注:dogfight为军事用语,是指战机近距离接战缠斗,可直接......
  • 加拿大生信开源学习资源Bioinformatics.ca
    之前给大家推荐过教育部首批490门“国家精品在线开放课程”,里面很多跟生物或编程相关的免费经典课程。除了国内这些开放的学习资源外,还有许多国外的免费资源,比如英语写作常见错误和视频中是斯坦福大学老师的授课视频,很经典。如果时间紧张,只看前两节也挺好。今天给大家推荐的是加拿......
  • 同城代驾开源版小程序开发
    功能特性描述:定价模式:本系统支持灵活的计价模式,包括白天和夜晚的起步价、起步里程、每公里价以及超时费用,从而满足不同时段的定价需求。实时路径计算:通过集成腾讯地图的软件开发工具包(SDK),系统能够实时计算路线规划、订单里程并预计费用,为司机提供准确的订单信息。多功能一体:用户......
  • 最详细的 T Test 方差分析结果解读
    PValue:P值(Pvalue)是在假设检验中一个非常关键的概念。它提供了一个量化的方法来评估观察到的数据与零假设(nullhypothesis)下期望的数据之间的差异。具体来说,P值是在零假设为真的条件下,观察到当前统计量或更极端统计量的概率。以下是关于P值的更详细解释:定义:P值是给定零假设......
  • 开源项目 | 美团开源监控告警服务,Java 开发的实时应用监控平台,能够帮助开发者快速定位
     一、项目概述        CAT是基于Java开发的实时应用监控平台,为美团点评提供了全面的实时监控告警服务。        CAT作为服务端项目基础组件,提供了Java,C/C++,Node.js,Python,Go等多语言客户端,已经在美团点评的基础架构中间件框架(MVC框架,RPC框架,数据......
  • Dotnet工具箱:开源、免费的纯前端工具网站,带你探索10大工具分类和73个实时在线小工具
    https://www.cnblogs.com/Dotnet9-com/p/17767405.html1.前言大家好,我是沙漠尽头的狼。Dotnet工具箱是一个纯前端的、开源和免费的工具网站,周末我参考了开源项目it-tools,对网站界面文字进行了汉化,并重新部署了网站。该网站共有10大工具分类,提供了73个实时在线小工具。使用Vue3......
  • 开源协议说明
    GPL协议:即通用性公开许可证(GeneralPublicLicense,简称GPL)。GPL同其它的自由软件许可证一样,许可社会公众享有:运行、复制软件的自由,发行传播软件的自由,获得软件源码的自由,改进软件并将自己作出的改进版本向社会发行传播的自由。 GPL还规定:只要这种修改文本在整体上或者其......
  • PHP微信墙制作,开源
    注意:由于微信官网不定时会更新,其中模拟登陆以及爬取数据的方式可能会失效,最近这12个月里,就有两次更新导致此功能需要重写。服务端源码->github地址传送门思路其实实现思路就是通过模拟登陆的方式登录到微信平台,然后通过正则表达式获取指定的内容放到数据库里面,同时这个操作要......
  • 基于开源模型搭建实时人脸识别系统(四):人脸质量
    续人脸识别实战之基于开源模型搭建实时人脸识别系统(三):人脸关键点、对齐模型概览与模型选型_CodingInCV的博客-CSDN博客不论对于静态的人脸识别还是动态的人脸识别,我们都会面临一个问题,就是输入的人脸图像的质量可能会很差,比如人脸角度很大,人脸很模糊,人脸亮度很亮或很暗。这些质量......
  • 开源或免费字体
    文泉驿 http://wenq.org/wqy2/index.cgi霞鹜文楷 https://github.com/lxgw/LxgwWenKai阿里巴巴字体 https://fonts.alibabagroup.com/#/home站酷仓耳渔阳体等https://www.zcool.com.cn/special/zcoolyytfonts/金山云技术体 https://design.ksyun.com/font思源宋体 https://s......