03-disruptor的使用及缺陷
disruptor的使用
disruptor
的使用代码可以见git
仓库,这里仅仅对使用流程做一下简单介绍。
消费者读数据
消费者读数据的步骤如下:
- 注册消费者,此时每个消费者会返回一个可读的消费者索引
index_for_customer_use
- 使用
index_for_customer_use
在共享内存环形队列上等待,直到该索引位置可读,将返回一个新的索引cursor
,此时[index_for_customer_use,cursor]
的数据都是可读的 - 通过
GetData(index)
获取共享内存中的用户数据 - 使用
CommitRead
更新共享内存状态管理器中id
对应的索引,表示该id
的消费者已经 - 将
index_for_customer_use
累加,重复步骤1~3
生产者产生数据
生产者产生数据步骤如下:
- 使用
ClaimIndex
获取下一可写的buffer
索引 - 写入数据
- 更新索引
disruptor的缺陷
个人认为disruptor
主要有以下几个缺陷:
- 生产者缺少一次获取多个
buffer
的接口。生产者不能一次获取多个buffer
,如果要一次性写入多个数据,则必须多次调用claimindex
,不仅效率低下,而且对上层调用者不太友好 - 消费者注册的索引必须是从
0
开始按1
进行递增的,不能随机,不仅限制了上层应用的使用,而且如果不按此方式进行递增,可能会造成获取生产者索引时的陷入无限等待。如果将id
和index
修改成键值对进行映射的关系,则可以不必循环访问数组。如果再加一个变量用来存放最小的消费者索引,则可以避免在获取生产者索引时遍历整个消费者辅助数组(或者将数组改成set
也行) - 多线程测试程序中,在进程结束时,没有删除共享内存。应该在进程结束前调用
TerminateRingBuffer
- 消费者读取数据后需要手动更新读索引,生产者在写入数据后需要调用另外的接口更新写索引。不管是消费者还是生产者,这两步都应该有一个接口可以一次性的获取或写入数据,并且更新索引。如果每次都要求显示的调用更新接口,那么肯定会存在忘记调用该接口的情况,不太友好。
disruptor
自己的SharedMemRingBuffer
仅仅支持OneBufferData
类型进行,应该设计成模板或者其中的data
由int64_t
改成void *
以接收任何类型的数据
不注册id
为0
的消费者验证disruptor
的陷入死循环
条件:
- 不按照
id
从0
依次按1
递增注册消费者 - 将循环队列的
buffer_size
设置成较小,以满足等待的写入索引 - buffer_size > 消费者最小索引
我们使用线程间通信的方式进行测试,测试代码如下:
#include <iostream>
#include <atomic>
#include <thread>
#include <mutex>
#include <fstream>
#include <random>
#include <sstream>
#include "../../ring_buffer_on_shmem.hpp"
#include "../../shared_mem_manager.hpp"
#include "../../atomic_print.hpp"
#include "../../elapsed_time.hpp"
//SharedMemRingBuffer g_shared_mem_ring_buffer (YIELDING_WAIT);
//SharedMemRingBuffer g_shared_mem_ring_buffer (SLEEPING_WAIT);
SharedMemRingBuffer g_shared_mem_ring_buffer (BLOCKING_WAIT);
void ThreadWorkWrite(std::string tid, int my_id)
{
int64_t my_index = -1;
while (true)
{
OneBufferData my_data;
my_index = g_shared_mem_ring_buffer.ClaimIndex(my_id);
my_data.producer_id = my_id;
my_data.data = std::rand() % 1000 + 99;
g_shared_mem_ring_buffer.SetData( my_index, &my_data );
g_shared_mem_ring_buffer.Commit(my_id, my_index);
{
std::stringstream ss;
ss << "product,id:" << my_id << ",data:" << my_data.data << std::endl;
AtomicPrint ap(ss.str());
}
usleep(my_data.data % 100 + 1);
}
}
void ThreadWorkRead(std::string tid, int my_id, int64_t index_for_customer_use)
{
int64_t index = index_for_customer_use;
while(true)
{
int64_t ret_index = g_shared_mem_ring_buffer.WaitFor(my_id,index);
for (int64_t i = index;i <= ret_index;++i)
{
auto pdata = g_shared_mem_ring_buffer.GetData(i);
std::stringstream ss;
ss << "consumer,id:" << pdata->producer_id << ",data:" << pdata->data << std::endl;
AtomicPrint ap(ss.str());
usleep(pdata->data % 100 + 1);
}
index++;
}
}
void TestFunc(size_t consumer_cnt,size_t producer_cnt)
{
std::vector<std::thread> consumer_threads ;
std::vector<std::thread> producer_threads ;
//Consumer
//1. register
std::vector<int64_t> vec_consumer_indexes ;
for(size_t i = 0; i < consumer_cnt; i++) {
int64_t index_for_customer_use = -1;
// 这里,我们跳过 id 为 0 的消费者注册,会看到
if (i == 0)
{
continue;
}
if(!g_shared_mem_ring_buffer.RegisterConsumer(i, &index_for_customer_use )) {
return; //error
}
DEBUG_LOG("index_for_customer_use = " <<index_for_customer_use );
vec_consumer_indexes.push_back(index_for_customer_use);
}
//2. 运行消费者线程
for(size_t i = 1; i < consumer_cnt; i++) {
consumer_threads.push_back (std::thread (ThreadWorkRead, "consumer", i, vec_consumer_indexes[i] ) );
}
//3. 运行生产者线程
for(size_t i = 0; i < producer_cnt; i++) {
producer_threads.push_back (std::thread (ThreadWorkWrite, "procucer", i ) );
}
for(auto &p:producer_threads) {
p.join();
}
for(auto &c:consumer_threads) {
c.join();
}
g_shared_mem_ring_buffer.TerminateRingBuffer();
}
int main(int argc, char* argv[])
{
int64_t buffer_cap = 8;
if(! g_shared_mem_ring_buffer.InitRingBuffer(buffer_cap) ) {
printf("InitRingBuffer failed,process exiting...");
return 1;
}
TestFunc(10,1);
return 0;
}
编译并运行上诉代码,如果在disruptor
中添加打印信息,将看到生产者永远在执行。这里贴一下disruptor
相关的源码然后再进行解释。
int64_t SharedMemRingBuffer::GetMinIndexOfConsumers()
{
int64_t min_index = INT64_MAX ;
bool is_found = false;
for(size_t i = 0; i < ring_buffer_status_on_shared_mem_->registered_consumer_count; i++) {
int64_t index = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[i];
if( index < min_index ) {
min_index = index;
is_found = true;
}
}
if(! is_found) {
return 0;
}
return min_index ;
}
// fetch_add:先返回值,然后对原来的值进行加一,等价于i++的原子操作
// next.fetch_add(1) + 1,实际就是返回next的下一索引,并将next移动到下一索引
int64_t SharedMemRingBuffer::GetNextSequenceForClaim()
{
return ring_buffer_status_on_shared_mem_->next.fetch_add(1) + 1;
}
int64_t SharedMemRingBuffer::ClaimIndex(int caller_id )
{
int64_t nNextSeqForClaim = GetNextSequenceForClaim() ;
int64_t wrapPoint = nNextSeqForClaim - buffer_size_;
do {
int64_t gatingSequence = GetMinIndexOfConsumers();
if (wrapPoint >= gatingSequence ) {// 写索引 - 最小读索引 >= buffer_size_,说明写者快,生产者需要等待消费者读取数据!!
std::this_thread::yield();
continue;
} else {
break;
}
}
while (true);
{AtomicPrint atomicPrint("ClaimIndex ok");}
return nNextSeqForClaim;
}
由于测试代码没有注册id
为0
的消费者,因此GetMinIndexOfConsumers
将永远返回-1
。当生产者生产的数据超过了buffer_size
的大小后,wrapPoint = nNextSeqForClaim - buffer_size_ >= 0
将永远成立,此时wrapPoint >= gatingSequence
也将永远成立,导致生产者陷入死循环。