disruptor原理
- 使用循环队列,且要求队列大小为2的N次方,以满足位运算快速计算索引的要求(比取模速度快)
- 使用原子变量记录生产者和消费者的个数,并且使用
cache line
进行隔离,避免多线程情况下由于两个变量处于同一cache line
的伪共享问题 - 无锁设计。通过原子变量,每个生产者和消费者都需要先申请数组中可以操作的元素索引,申请到后才能进行读取或者写入
- 引入状态记录数组来记录数组元素的状态,判断该索引位置是否可以进行读取或者写入
- 使用互斥锁和条件变量,通过设置支持进程间通信
- 使用共享内存支持进程间的数据交互,实现
zero copy
disruptor 环形队列
所谓的环形队列,实际上就是一个申请在堆上的数组,假设其大小为cap
。对该数组的访问将由2
个索引进行——cursor
(读取者目前所在索引位置)、next
(写入者目前所在索引位置),前者表示的是读取者索引,后者表示写者索引。不管是cursor
还是next
,我们在获取索引时都需要让他们和cap
进行取模操作,disruptor
为了加快操作速度,强制要求cap
为2
的N
次方,然后使用以下算法来代替取模操作,以下为相关算法:
// 将index和cap进行取模操作,前提是cap必须是2的N次方
int64_t translated_index = (index & (cap - 1));
// 快速判断cap是否为2的N次方
bool is_power2 = cap && !( (cap-1) & cap) ;
在环形队列的应用上有2
种循环方式:
- 当索引小于
cap
的时候累加,当索引等于cap
的时候将索引置零。这种情况可以直接使用索引对队列进行访问。 - 索引一直累加,当需要访问队列的时候,需要将索引对
cap
取模
disruptor
选择的是第二种,该方法的好处是写索引永远是大于等于读索引的。这样生产者写入数据时只需要判断写入索引减去读取索引是否大于等于cap
,如果是则生产者需要等待消费者。disruptor
中获取写索引的实现就使用了这一特性。
disruptor 消费者辅助数组
disruptor
引入了一个大小为MAX_CONSUMER
的消费者辅助数组存放消费者索引,用满足多消费者使用场景。假设该辅助数组名为array_of_consumer_indexes
,其使用逻辑如下:
-
array_of_consumer_indexes
所有元素初始化为-1
,此时表示该id
没有消费者注册 - 通过
id
注册消费者,当该id
已注册时,直接返回array_of_consumer_indexes[id]+1
;当该id
未注册时,返回当前最大的消费者读取索引(实际就是上面提到的curor
) - 当生产者写入数据时,先获取
array_of_consumer_indexes
中最小的消费者索引,然后判断要写入的索引位置与最小消费者索引差值是否超过了数组个数,如果超过了则需要进行等待,否则可以直接写入 - 当身份为
id
的消费者消费的索引到达了index
时,需要将array_of_consumer_indexes[id]
更新为index
等待策略
disruptor
目前有3
种等待策略,分别如下:
- YieldingWaitStrategy:无锁,累加100次,每次都进行资源是否可用的判断,如果可用则返回所需资源;如果不可用则一直累加到100次之后,让出线程调度时间片,当下次系统进行调度的时候再判断资源是否可用,如果不可用则继续
yield
。该策略的逻辑就是spin
—>yield
. - SleepingWaitStrategy:无锁,该策略先快速累加
100
次,此过程和YieldingWaitStrategy
的累加一样;再进行第二次100
次累加,在此过程中如果条件不满足则会进行yield
操作;之后则每次循环都休眠1ns
。该策略逻辑为spin
—>yield
—>sleep
- BlockingWaitStrategy:该策略为加锁阻塞策略,没有什么好介绍的。唯一要提一下的是,它获取时间使用的是
gettimeofday
,该函数获取的时间会跟随系统时间改变,因此使用clock_gettime
更加合适,因为clock_gettime
不仅精度更高,而且可以通过指定参数获取系统启动后运行的时间,且该时间不能被更改!!
共享内存管理
disruptor
的共享内存使用的是下列一系列函数:
-
shmget
:创建或者获取共享内存id
-
shmat
:将共享内存id
所代表的内存地址映射到当前进程 -
shmdt
:取消共享内存id
所代表的内存地址在当前进程的映射 -
shmctl
:删除共享内存
以上函数没有太多要介绍的,具体使用可以参考[IPC基础]01-Linux共享内存API简介和[IPC基础]02-共享内存使用示例。
在共享内存上进行数据收发
共享内存上的环形队列管理器
disruptor
在创建共享内存时,会将一些管理信息以及用于同步用的互斥锁和条件变量也创建在共享内存上。关于该部分的内容,将在代码解读部分进行说明。
共享内存上的环形队列
disruptor
通过SharedMemRingBuffer
创建锁、条件变量以及所需要的数据空间,以达到在进程间进行数据同步以及高效的进行数据交互的目的。SharedMemRingBuffer
所管理的数据类型固定为OneBufferData
,我们可以通过修改轻松的达到管理任何类型数据的目的,可以参考[IPC基础]03-通过共享内存和互斥锁、条件变量实现进程同步。
首先SharedMemRingBuffer
创建一个共享内存,其大小为sizeof(_RingBufferStatusOnSharedMem_) + sizeof(OneBufferData)*size
,然后将共享内存attach
到所需要的进程,通过内存偏移量计算出用户数据的其实地址,就可以在该空间上进行数据的读写了。有一点要稍微介绍一下的,disruptor
将申请的N
个元素通过内存地址映射到了一个包含N
个元素指针的环形队列上,通过该操作,可以方便的对共享内存进行访问管理。