01 disruptor实现原理
disruptor是一种基于共享内存的进程间通信方式;接下来我们对该开源代码进行解读
环形队列设计原理
使用环形队列,实际上就是在堆上申请的一个大小为cap的数组,要求队列大小为2的N次方,为了满足位运算,快速计算出索引index(比取模的速度快)。对该数组的访问将由2个索引进行(cursor:消费者目前所在的索引位置,next:生产者目前所在的索引位置)前者表示的是读取者索引,后者表示的是写者索引。不管是cursor还是next,我们在获取索引时都需要让它们和cap进行取模操作,disruptor为了加速,强制要求 cap为2的N次方,然后使用以下的算法来替代取模操作,以下是算法语法:
//快速判断cap是否为2的N次方
bool is_power2 = cap && !( (cap-1) & cap);
在环形队列中有2种循环方式:
- 当索引小于cap的时候累加,当索引等于cap的时候索引置0.这种情况可以直接使用索引对队列进行访问。
- 索引一直累加,当需要访问队列的时候,将索引对cap进行与运算;
//将index和cap进行与操作,(类似取模操作 5%8=0 ...余5; 9%8=1 ...余1;)前提是cap必须是2的N次方
int64_t translated_index = (index & (cap -1) );
disruptor选择第二种,该方法的好处是,写索引永远>=读索引。这样生产者写入数据时,只需要判断写入索引减去读取索引是否>=cap; 如果是,则生产者需要等待消费者消费,disruptor中获取生产者下标索引的实现就是使用了这一特性。
int64_t SharedMemRingBuffer::ClaimIndex(int caller_id )
{
// 1. 获取生产者的下标
int64_t nNextSeqForClaim = GetNextSequenceForClaim() ;
// 2. 得到生产者实际下标
int64_t wrapPoint = nNextSeqForClaim - buffer_size_;
do {
//3. 获取消费者下标
int64_t gatingSequence = GetMinIndexOfConsumers();
//4. 如果实际生产者下标比最小消费者下标还要大
if (wrapPoint >= gatingSequence ) {
//等待
std::this_thread::yield();
continue;
} else {
break;
}
}
while (true);
return nNextSeqForClaim;
}
使用原子变量记录生产者和消费者的个数,减少锁的使用
使用cache line进行隔离
使用cache line进行隔离,避免多线程情况下,由于两个变量处于同一个cache line的伪共享问题;
读写的无锁设计
通过原子变量,每个生产者和消费者都需要申请数组中可以操作的元素索引,申请到了才可以进行读写操作; 引入状态记录数组来记录数组元素的状态,判断该索引位置是否可以进行读取或者写入操作;
disruptor 消费者辅助数组
disruptor引入了一个大小为MAX_CONSUMER的消费者辅助数组,存放消费者索引;以满足多消费者使用的场景。假设辅助数组名为 array_of_consumer_indexes,使用逻辑如下:
- array_of_consumer_indexes 所有元素初始化都为-1,此时表示该数组所有的id都没有消费者注册;
void SharedMemRingBuffer::ResetRingBufferState()
{
if(ring_buffer_status_on_shared_mem_ == NULL ) {
DEBUG_LOG("call InitRingBuffer first !");
return;
}
DEBUG_LOG("---");
ring_buffer_status_on_shared_mem_->cursor.store(-1);
ring_buffer_status_on_shared_mem_->next.store(-1);
ring_buffer_status_on_shared_mem_->registered_producer_count.store(0);
ring_buffer_status_on_shared_mem_->registered_consumer_count.store(0);
total_mem_size_ = 0;
for(int i = 0; i < MAX_CONSUMER; i++) {
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[i] = -1;
}
//for blocking wait strategy : shared mutex, shared cond var
pthread_mutexattr_t mutexAttr;
pthread_mutexattr_init(&mutexAttr);
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init( & ring_buffer_status_on_shared_mem_->mtx_lock, &mutexAttr);
pthread_condattr_t condAttr;
pthread_condattr_init(&condAttr);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
pthread_cond_init( & ring_buffer_status_on_shared_mem_->cond_var, &condAttr);
}
- 通过id注册消费者,当该id已被注册时,直接返回array_of_consumer_indexes[id]+1;当该id未被注册时,返回当前最大的消费者读取索引;
bool SharedMemRingBuffer::RegisterConsumer (int id, int64_t* index_for_customer)
{
if(ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] == -1 ) {
ring_buffer_status_on_shared_mem_->registered_consumer_count++;
if( ring_buffer_status_on_shared_mem_->registered_consumer_count >= MAX_CONSUMER) {
DEBUG_ELOG("Error: Exceeds MAX_CONSUMER : " << MAX_CONSUMER);
return false;
}
if(ring_buffer_status_on_shared_mem_->cursor >= 0 ) {
DEBUG_LOG("cursor >= 0");
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] =
ring_buffer_status_on_shared_mem_->cursor.load() ;
} else {
DEBUG_LOG("set 0 ");
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] = 0;
}
*index_for_customer = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id];
} else {
//last read message index
*index_for_customer = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] + 1;
}
DEBUG_LOG("USAGE_CONSUMER ID : " << id<< " / index : "<< *index_for_customer);
return true;
}
- 当生产者写入数据时,先获取array_of_consumer_indexes中最小的消费者索引,然后判断写入的索引位置与最小消费者索引插值是否超过cap; 如果超过则需要进入等待,否则可以直接写入;
- 当身份为id的消费者消费的索引达到了index时,需要将array_of_consumer_indexes[id]更新为index;
bool SharedMemRingBuffer::CommitRead(int user_id, int64_t index)
{
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[user_id] = index ; //update
#if DEBUG_PRINTF
char msg_buffer[1024];
snprintf(msg_buffer, sizeof(msg_buffer),
"[id:%d] \t\t\t\t\t\t\t\t\t\t\t\t[%s-%d] index[%" PRId64 "] ",
user_id, __func__, __LINE__, index );
{AtomicPrint atomicPrint(msg_buffer);}
#endif
return true;
}
使用互斥锁和条件变量
可以设置支持进程间通信,支持进程间同步;
使用共享内存
支持进程间的数据交互,实现==zero cory== 见文章 linux/qnx共享内存API介绍
在共享内存上进行数据收发
disruptor在创建共享内存时,除了用户所需的内存空间;还会将一些管理信息以及用于同步用的互斥锁和条件变量也创建在共享内存上。 disruptor整体的数据同步机制如下:
- 互斥锁和条件变量:通过设置PTHREAD_PROCESS_SHARED属性达到进程间同步的目的;
- 通过将原子变量设置在共享内存上,达到进程间的原子同步;
- 通过将共享内存地址映射到环形队列上,配合原子变量以及等待策略,以方便进行进程间同步;
首先==SharedMemRingBuffer==创建一个共享内存,其大小为==sizeof(RingBufferStatusOnSharedMem) + sizeof(OneBufferData)*size==,然后将共享内存==attach==到所需要的进程,==通过内存偏移量计算出用户数据的起始地址,就可以在该空间是进行数据的读写了。== ==disruptor将申请的cap个元素通过内存地址映射到一个包含cap个元素指针的环形队列上,通过该操作,九二一方便的对共享内存进行访问管理。==
等待策略
消费者通过等待策略来获取最新的可读数据。 disruptor目前有3钟等待策略,如下:
- YieldingWaitStrategy: 无锁,累加100次,每次都进行资源是否可用判断。如果可用,则返回所需要的资源;如果不可用,则一直累加到100次之后,让出线程调度时间片,当下次系统进行调度的时候,再判断资源是否可用,如果不可用则继续yield。该策略的逻辑是spin--》yield;
- SleepingWaitStrategy:无锁,该策略先快速累加100次,此过程和YieldingWaitStrategy的累加一样;再进行第二次100次的累加,在此过程中,如果条件不满足则会进行yield操作;之后则每次循环都休眠1ns。该策略逻辑为spin--》yield--》sleep。
- BlockingWaitStrategy:该策略为加锁阻塞策略。