首页 > 其他分享 >[disruptor]03-disruptor的使用及缺陷

[disruptor]03-disruptor的使用及缺陷

时间:2023-01-16 14:32:16浏览次数:53  
标签:disruptor include index buffer 03 索引 缺陷 my id

03-disruptor的使用及缺陷

disruptor的使用

disruptor的使用代码可以见git仓库,这里仅仅对使用流程做一下简单介绍。

消费者读数据

消费者读数据的步骤如下:

  1. 注册消费者,此时每个消费者会返回一个可读的消费者索引index_for_customer_use
  2. 使用index_for_customer_use在共享内存环形队列上等待,直到该索引位置可读,将返回一个新的索引cursor,此时[index_for_customer_use,cursor]的数据都是可读的
  3. 通过GetData(index)获取共享内存中的用户数据
  4. 使用CommitRead更新共享内存状态管理器中id对应的索引,表示该id的消费者已经
  5. index_for_customer_use累加,重复步骤1~3

生产者产生数据

生产者产生数据步骤如下:

  1. 使用ClaimIndex获取下一可写的buffer索引
  2. 写入数据
  3. 更新索引

disruptor的缺陷

个人认为disruptor主要有以下几个缺陷:

  1. 生产者缺少一次获取多个buffer的接口。生产者不能一次获取多个buffer,如果要一次性写入多个数据,则必须多次调用claimindex,不仅效率低下,而且对上层调用者不太友好
  2. 消费者注册的索引必须是从0开始按1进行递增的,不能随机,不仅限制了上层应用的使用,而且如果不按此方式进行递增,可能会造成获取生产者索引时的陷入无限等待。如果将idindex修改成键值对进行映射的关系,则可以不必循环访问数组。如果再加一个变量用来存放最小的消费者索引,则可以避免在获取生产者索引时遍历整个消费者辅助数组(或者将数组改成set也行)
  3. 多线程测试程序中,在进程结束时,没有删除共享内存。应该在进程结束前调用TerminateRingBuffer
  4. 消费者读取数据后需要手动更新读索引,生产者在写入数据后需要调用另外的接口更新写索引。不管是消费者还是生产者,这两步都应该有一个接口可以一次性的获取或写入数据,并且更新索引。如果每次都要求显示的调用更新接口,那么肯定会存在忘记调用该接口的情况,不太友好。
  5. disruptor自己的SharedMemRingBuffer仅仅支持OneBufferData类型进行,应该设计成模板或者其中的dataint64_t改成void *以接收任何类型的数据

不注册id0的消费者验证disruptor的陷入死循环

条件:

  1. 不按照id0依次按1递增注册消费者
  2. 将循环队列的buffer_size设置成较小,以满足等待的写入索引 - buffer_size > 消费者最小索引

我们使用线程间通信的方式进行测试,测试代码如下:

#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <fstream>
#include <random>
#include <sstream>

#include "../../ring_buffer_on_shmem.hpp" 
#include "../../shared_mem_manager.hpp" 
#include "../../atomic_print.hpp"
#include "../../elapsed_time.hpp"


//SharedMemRingBuffer g_shared_mem_ring_buffer (YIELDING_WAIT); 
//SharedMemRingBuffer g_shared_mem_ring_buffer (SLEEPING_WAIT); 
SharedMemRingBuffer g_shared_mem_ring_buffer (BLOCKING_WAIT); 

void ThreadWorkWrite(std::string tid, int my_id) 
{
    int64_t my_index = -1;
    while (true)
    {
        OneBufferData my_data;
        my_index = g_shared_mem_ring_buffer.ClaimIndex(my_id);
        my_data.producer_id = my_id;
        my_data.data = std::rand() % 1000 + 99;
        g_shared_mem_ring_buffer.SetData( my_index, &my_data );
        g_shared_mem_ring_buffer.Commit(my_id, my_index); 
        {
            std::stringstream ss;
            ss << "product,id:" << my_id << ",data:" << my_data.data << std::endl;
            AtomicPrint ap(ss.str());
        }
        usleep(my_data.data % 100 + 1);
    }
}

void ThreadWorkRead(std::string tid, int my_id, int64_t index_for_customer_use) 
{
    int64_t index = index_for_customer_use;
    while(true)
    {
        int64_t ret_index = g_shared_mem_ring_buffer.WaitFor(my_id,index);
        for (int64_t i = index;i <= ret_index;++i)
        {
            auto pdata = g_shared_mem_ring_buffer.GetData(i);
            std::stringstream ss;
            ss << "consumer,id:" << pdata->producer_id << ",data:" << pdata->data << std::endl;
            AtomicPrint ap(ss.str());
            usleep(pdata->data % 100 + 1);
        }
        index++;
    }
}

void TestFunc(size_t consumer_cnt,size_t producer_cnt)
{
    std::vector<std::thread> consumer_threads ;
    std::vector<std::thread> producer_threads ;
    //Consumer
    //1. register
    std::vector<int64_t> vec_consumer_indexes ;
    for(size_t i = 0; i < consumer_cnt; i++) {
        int64_t index_for_customer_use = -1;
        // 这里,我们跳过 id 为 0 的消费者注册,会看到
        if (i == 0)
        {
            continue;
        }
        if(!g_shared_mem_ring_buffer.RegisterConsumer(i, &index_for_customer_use )) {
            return; //error
        }
        DEBUG_LOG("index_for_customer_use = " <<index_for_customer_use );
        vec_consumer_indexes.push_back(index_for_customer_use);
    }

    //2. 运行消费者线程
    for(size_t i = 1; i < consumer_cnt; i++) {
        consumer_threads.push_back (std::thread (ThreadWorkRead, "consumer", i, vec_consumer_indexes[i] ) );
    }

    //3. 运行生产者线程
    for(size_t i = 0; i < producer_cnt; i++) {
        producer_threads.push_back (std::thread (ThreadWorkWrite, "procucer", i ) );
    }
    for(auto &p:producer_threads) {
        p.join();
    }
    for(auto &c:consumer_threads) {
        c.join();
    }

    g_shared_mem_ring_buffer.TerminateRingBuffer();
}

int main(int argc, char* argv[])
{
    int64_t buffer_cap = 8;
    if(! g_shared_mem_ring_buffer.InitRingBuffer(buffer_cap) ) { 
        printf("InitRingBuffer failed,process exiting...");
        return 1; 
    }

    TestFunc(10,1);
    return 0;
}

编译并运行上诉代码,如果在disruptor中添加打印信息,将看到生产者永远在执行。这里贴一下disruptor相关的源码然后再进行解释。

int64_t SharedMemRingBuffer::GetMinIndexOfConsumers()
{
    int64_t min_index = INT64_MAX ;
    bool is_found = false;

    for(size_t i = 0; i < ring_buffer_status_on_shared_mem_->registered_consumer_count; i++) {
        int64_t index = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[i];
        if( index < min_index ) {
            min_index = index;
            is_found = true;
        }
    }
    if(! is_found) {
        return 0;
    }
    return min_index ;
}

// fetch_add:先返回值,然后对原来的值进行加一,等价于i++的原子操作
// next.fetch_add(1) + 1,实际就是返回next的下一索引,并将next移动到下一索引
int64_t SharedMemRingBuffer::GetNextSequenceForClaim()
{
    return ring_buffer_status_on_shared_mem_->next.fetch_add(1) + 1;
}

int64_t SharedMemRingBuffer::ClaimIndex(int caller_id )
{
    int64_t nNextSeqForClaim = GetNextSequenceForClaim() ;
    int64_t wrapPoint = nNextSeqForClaim - buffer_size_;
    do {
        int64_t gatingSequence = GetMinIndexOfConsumers();
        if (wrapPoint >=  gatingSequence  ) {// 写索引 - 最小读索引 >= buffer_size_,说明写者快,生产者需要等待消费者读取数据!!
            std::this_thread::yield(); 
            continue;
        } else {
            break;
        }
    }
    while (true);


    {AtomicPrint atomicPrint("ClaimIndex ok");}
    return nNextSeqForClaim;
}

由于测试代码没有注册id0的消费者,因此GetMinIndexOfConsumers将永远返回-1。当生产者生产的数据超过了buffer_size的大小后,wrapPoint = nNextSeqForClaim - buffer_size_ >= 0 将永远成立,此时wrapPoint >= gatingSequence 也将永远成立,导致生产者陷入死循环。

标签:disruptor,include,index,buffer,03,索引,缺陷,my,id
From: https://blog.51cto.com/u_6650004/6010422

相关文章

  • 分布式搜索引擎03
    分布式搜索引擎030.学习目标数据聚合:解决复杂的统计搜索问题自动补全:当用户在搜索框内输入相关的词条拼音首字母实时给与对应的提示数据同步: 当mysql中的数......
  • [1036]Linux启动时间分析
    简述今天有同事咨询:项目上有台服务器操作系统启动时间较长,如何分析?果然,好问题都来自实践。经过查找,对于所有基于systemd的系统,可以使用systemd-analyze来分析系统启动时间。......
  • simulink使用AWGN报错:When the 'Mode' parameter is set to 'Signal to noise ratio',
    原因:当“模式”参数设置为“信噪比”时,输入和输出必须有离散的采样时间。解决:输入端的信号设置sample time,即采样率;输出端增加0阶保持器,不然matlab无法计算 ......
  • Google's B-tree挺快
    #if0#include"set.h"//github.com/Kronuz/cpp-btreeusingnamespacebtree;#else#include<set>//en.cppreference.com/w/cpp/container/multiset#endif#incl......
  • Execution failed for task ':app:checkDebugDuplicateClasses'解决办法
    Afailureoccurredwhileexecutingcom.android.build.gradle.internal.tasks.CheckDuplicatesRunnable >Duplicateclassandroid.support.v4.app.INotificationSi......
  • ORA-06502: PL/SQL: 'Numeric or Value Error' When CLOB Convert to VARCHAR2 on a M
    OnadatabasewithmultibytecharactersetlikeAL32UTF8specifiedforNLS_CHARACTERSETthefollowingerrorisreceivedwhenaCLOBwhichcontainsmorethan819......
  • Go语言学习之 Day03
    网络编程TCP服务器/客户端开发UDP服务器/客户端开发命令行聊天室web开发HTTP协议web应用开发客户端开发Web爬虫HTML结构GoqueryRPC......
  • 每日食词—day103
    lookupn.查找、检查、查询startern. adj.启动器、启动机goaln. v.目标、目的、球门frameworkn.框架、构架、架构、结构enablev.开启、有效、使能够......
  • 洛谷 P1036 选数
    原题链接题解:#include"iostream"#include"algorithm"#definelllonglongusingnamespacestd;llsum=0;boolprime(llx){intn=2;for(;x%n!=0;n++)......
  • Potree 003 基于Potree Desktop创建自定义工程
    1、第三方js库第三方库js库选择dojo,其官网地址为https://dojotoolkit.org/,git地址为https://github.com/dojo/dojo,demo地址为https://demos.dojotoolkit.org/demos/,如果打......