Ring Buffer
The principle behind this ring buffer is simple and was covered earlier; to recap briefly:
- cap must be a power of 2
- the index increases monotonically and never wraps
- when accessing an element, the index is taken modulo cap (implemented as a bitwise AND with cap - 1)
The implementation is straightforward and needs little further commentary.
template<typename T>
class RingBuffer
{
public:
RingBuffer() {
capacity_ = DEFAULT_RING_BUFFER_SIZE;
buffer_.resize(DEFAULT_RING_BUFFER_SIZE); //resize (not reserve), so operator[] refers to live elements
}
explicit RingBuffer(const std::vector<T>& buffer) : capacity_(buffer.size()), buffer_(buffer) {} //the caller must pass a power-of-2-sized vector
T& operator[](const int64_t& sequence) {
return buffer_[sequence & (capacity_ - 1)]; //only valid because capacity_ is a power of 2
}
int64_t GetTranslatedIndex(int64_t sequence) {
return sequence & (capacity_ - 1);
}
bool SetCapacity(size_t capacity) {
bool is_power2 = capacity && !( (capacity - 1) & capacity );
if( !is_power2 ) {
DEBUG_ELOG("Buffer capacity error: power of 2 required!");
return false;
}
try {
buffer_.resize(capacity); //resize (not reserve), so the slots actually exist
} catch (const std::length_error& le) {
DEBUG_ELOG("Length error: " << le.what() );
return false;
}
capacity_ = capacity;
return true;
}
private:
size_t capacity_ ;
std::vector<T> buffer_;
//no copy / move allowed
RingBuffer(const RingBuffer&) = delete;
RingBuffer& operator=(const RingBuffer&) = delete;
RingBuffer(RingBuffer&&) = delete;
RingBuffer& operator=(RingBuffer&&) = delete;
};
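As a quick illustration of the index arithmetic, here is a minimal, hypothetical usage sketch (it assumes DEBUG_ELOG and DEFAULT_RING_BUFFER_SIZE from the project's common_def.hpp are available): an ever-growing sequence is simply masked onto a fixed number of slots.
// Hypothetical demo, not part of the project.
void RingBufferIndexDemo() {
    RingBuffer<int> rb;
    rb.SetCapacity(8);                        // must be a power of 2
    for (int64_t seq = 0; seq < 12; ++seq) {
        rb[seq] = static_cast<int>(seq);      // slot = seq & 7, so sequence 8 overwrites slot 0
    }
    // rb.GetTranslatedIndex(8) == 0, rb.GetTranslatedIndex(11) == 3
}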
Shared Memory State Manager
typedef struct _RingBufferStatusOnSharedMem_
{
size_t buffer_size ;// number of buffer slots, i.e. N OneBufferData entries
size_t total_mem_size ;// total size of the shared memory
std::atomic<size_t> registered_producer_count ;// number of registered producers
std::atomic<size_t> registered_consumer_count;// number of registered consumers
alignas(CACHE_LINE_SIZE) std::atomic<int64_t> cursor; // last index published by producers, i.e. the highest position consumers may read; cache-line aligned
alignas(CACHE_LINE_SIZE) std::atomic<int64_t> next; // last index claimed by producers; cache-line aligned
alignas(CACHE_LINE_SIZE) std::atomic<int64_t> array_of_consumer_indexes [MAX_CONSUMER];// the index each consumer has read up to; it only ever increases. Its main use is computing the minimum read index.
pthread_cond_t cond_var; // condition variable for inter-process synchronization
pthread_mutex_t mtx_lock; // mutex for inter-process synchronization
} RingBufferStatusOnSharedMem ;
The CPU cache line is one of the keys to the high performance the disruptor boasts about. For the principles behind CPU cache lines, along with benchmarks, see the article on memory alignment and false sharing.
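The effect is easy to sketch. Below is a minimal illustration (not project code; the 64-byte line size is an assumption) of why cursor and next are each given their own cache line: without the alignment, two counters written by different threads share a line, and every write invalidates the other core's cached copy (false sharing).
#include <atomic>
#include <cstdint>

constexpr std::size_t kCacheLineSize = 64; // assumed typical cache line size

// Unpadded: cursor and next usually share one cache line, so a producer
// bumping 'next' keeps invalidating the line that holds 'cursor'.
struct CountersWithFalseSharing {
    std::atomic<int64_t> cursor{-1};
    std::atomic<int64_t> next{-1};
};

// Padded, as RingBufferStatusOnSharedMem does: each counter owns a full line.
struct CountersPadded {
    alignas(kCacheLineSize) std::atomic<int64_t> cursor{-1};
    alignas(kCacheLineSize) std::atomic<int64_t> next{-1};
};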
In addition, the reason the disruptor can synchronize data across processes is that it creates its mutex and condition variable as follows (ResetRingBufferState below shows the exact initialization):
- the mutex and condition variable are placed in shared memory
- both are given the PTHREAD_PROCESS_SHARED attribute
RingBufferStatusOnSharedMem acts as the state manager used for data synchronization and for coordinating producers and consumers as they read and write. The struct is tightly coupled to SharedMemRingBuffer, which is not a recommended way to implement this.
Ring Buffer on Shared Memory: SharedMemRingBuffer
SharedMemRingBuffer is the core of the disruptor. It is mainly responsible for:
- creating the state manager in shared memory, allocating the memory, and mapping it into the current process
- creating the wait strategy
- registering consumers
- letting producers claim a writable index and commit data (claiming an index may involve waiting)
- letting consumers wait for a readable index and, after reading, update the index recorded for their id
The implementation details are explained below through code comments.
SharedMemRingBuffer data structure
// ring_buffer_on_shmem.hpp
#ifndef DISRUPTORCPP_RING_BUFFER_ON_SHM_HPP
#define DISRUPTORCPP_RING_BUFFER_ON_SHM_HPP
#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <inttypes.h>
#include "common_def.hpp"
#include "ring_buffer.hpp"
#include "shared_mem_manager.hpp"
#include "wait_strategy.hpp"
///////////////////////////////////////////////////////////////////////////////
class SharedMemRingBuffer
{
public:
SharedMemRingBuffer(ENUM_WAIT_STRATEGY wait_strategy);
~SharedMemRingBuffer();
bool InitRingBuffer(int size=DEFAULT_RING_BUFFER_SIZE);
void ResetRingBufferState();
bool TerminateRingBuffer();
bool SetData( int64_t index, OneBufferData* data);
OneBufferData* GetData(int64_t index);
bool RegisterConsumer (int id, int64_t* index_for_customer);
int64_t GetTranslatedIndex( int64_t sequence);
void SignalAll();
//producer
int64_t ClaimIndex(int caller_id);
bool Commit(int user_id, int64_t index);
//consumer
int64_t WaitFor(int user_id, int64_t index);
bool CommitRead(int user_id, int64_t index);
private:
int64_t GetMinIndexOfConsumers();
int64_t GetNextSequenceForClaim();
//no copy allowed
SharedMemRingBuffer(SharedMemRingBuffer&) = delete;
void operator=(SharedMemRingBuffer) = delete;
private:
size_t buffer_size_ ;
size_t total_mem_size_ ;
RingBuffer<OneBufferData*> ring_buffer_ ;
WaitStrategyInterface* wait_strategy_ ;
SharedMemoryManager shared_mem_mgr_;
ENUM_WAIT_STRATEGY wait_strategy_type_;
RingBufferStatusOnSharedMem* ring_buffer_status_on_shared_mem_;
};
#endif //DISRUPTORCPP_RING_BUFFER_ON_SHM_HPP
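Before going through each method, the following sketch shows how a producer and a consumer are expected to drive this API. It is illustrative only: OneBufferData is assumed to be a plain struct from common_def.hpp, SetData is assumed to copy the payload into the claimed slot, and all error handling is omitted.
// Hypothetical driver code, not part of the project.
void ProducerLoop(SharedMemRingBuffer& rb, int producer_id) {
    for (int i = 0; i < 100; ++i) {
        int64_t index = rb.ClaimIndex(producer_id);     // may spin until a slot is free
        OneBufferData data{};
        // ... fill data ...
        rb.SetData(index, &data);                       // assumed to copy into the claimed slot
        rb.Commit(producer_id, index);                  // publish and wake blocked consumers
    }
}

void ConsumerLoop(SharedMemRingBuffer& rb, int consumer_id) {
    int64_t my_index = 0;
    rb.RegisterConsumer(consumer_id, &my_index);        // returns the first index to wait for
    while (true) {
        int64_t readable = rb.WaitFor(consumer_id, my_index); // highest published index
        for (int64_t i = my_index; i <= readable; ++i) {
            OneBufferData* slot = rb.GetData(i);
            // ... consume *slot ...
        }
        rb.CommitRead(consumer_id, readable);           // record how far this consumer has read
        my_index = readable + 1;
    }
}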
Consumer registration (core)
///////////////////////////////////////////////////////////////////////////////
// Register a consumer id.
// index_for_customer is the index this id should start reading from; before using it, the caller must still wait (WaitFor) for it to become readable.
bool SharedMemRingBuffer::RegisterConsumer (int id, int64_t* index_for_customer)
{
// array_of_consumer_indexes[id] holds the index consumer 'id' may read; -1 means this id has never registered
if(ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] == -1 ) { // <---- id is never bounds-checked!!
// first registration
ring_buffer_status_on_shared_mem_->registered_consumer_count++;// total consumer count, incremented once per registered id
if( ring_buffer_status_on_shared_mem_->registered_consumer_count >= MAX_CONSUMER) {
DEBUG_ELOG("Error: Exceeds MAX_CONSUMER : " << MAX_CONSUMER);
return false;
}
if(ring_buffer_status_on_shared_mem_->cursor >= 0 ) {// cursor >= 0 means data has already been published
DEBUG_LOG("cursor >= 0");
// a new consumer joins while data is already being delivered
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] =
ring_buffer_status_on_shared_mem_->cursor.load() ; // start the new consumer at the latest published index
} else {
DEBUG_LOG("set 0 ");
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] = 0; // this id is the first consumer to register; 0 is the index it may read (it must still wait for the producer to write it)
}
*index_for_customer = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id];
} else { // this id has already registered
// return the last index it read plus 1; that is the index the consumer should ask for next
*index_for_customer = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[id] + 1;
}
DEBUG_LOG("USAGE_CONSUMER ID : " << id<< " / index : "<< *index_for_customer);
return true;
}
The update rules for index_for_customer can be summarized as follows:
- id registers for the first time and cursor is -1: the index the consumer wants to read is 0
- id registers for the first time and cursor is not -1: cursor is the largest index the consumer may read, so it starts from there
- id has registered before: return that id's last read index + 1
For example, if a consumer first registers while cursor is already 7, it starts at index 7; if it re-registers after having read up to index 20, it is handed 21.
Getting the minimum consumer index (core)
// Return the smallest read index among the registered consumers
int64_t SharedMemRingBuffer::GetMinIndexOfConsumers()
{
int64_t min_index = INT64_MAX ;
bool is_found = false;
for(size_t i = 0; i < ring_buffer_status_on_shared_mem_->registered_consumer_count; i++) {
int64_t index = ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[i];
if( index < min_index ) {
min_index = index;
is_found = true;
}
}
if(! is_found) {
return 0;
}
return min_index ;
}
This function is not used for reading data. Rather, when writing, it fetches the smallest consumer index so the difference between the producer's sequence and that index can be computed, which decides whether the producer must wait before writing.
note: the following is a personal doubt that still needs verification!!
This minimum-consumer-index computation looks problematic. Judging from this disruptor implementation, array_of_consumer_indexes is not necessarily filled from index 0 upward. Suppose the first registered id is 5: then array_of_consumer_indexes[5] = 0, while elements 0 through 4 are still -1, and GetMinIndexOfConsumers will return -1. So what happens when it returns -1 instead of the expected 0?
By that logic, when registering consumers we apparently have to guarantee that ids start at 0 and increase one by one?? And that an id never exceeds MAX_CONSUMER??
We can write a dedicated test program later to verify this part!! A sketch of such a test is given below.
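As a starting point, here is a standalone sketch of that scenario. It does not use the real class; it just replays the RegisterConsumer / GetMinIndexOfConsumers logic on a plain array (MAX_CONSUMER is assumed to be 8) to show that a sparse id yields -1.
#include <cstdint>
#include <cstdio>

int main() {
    constexpr int MAX_CONSUMER = 8;                 // assumption for this sketch
    int64_t consumer_indexes[MAX_CONSUMER];
    for (int i = 0; i < MAX_CONSUMER; ++i) consumer_indexes[i] = -1;

    // The first (and only) consumer registers with id 5, as RegisterConsumer would record it:
    std::size_t registered_consumer_count = 1;
    consumer_indexes[5] = 0;

    // Same scan as GetMinIndexOfConsumers: only slots [0, registered_consumer_count) are examined.
    int64_t min_index = INT64_MAX;
    for (std::size_t i = 0; i < registered_consumer_count; ++i) {
        if (consumer_indexes[i] < min_index) min_index = consumer_indexes[i];
    }
    std::printf("min index = %lld\n", (long long)min_index);  // prints -1 rather than the expected 0
    return 0;
}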
Getting the producer's next write position (core)
// fetch_add returns the old value and then increments it -- an atomic i++; adding 1 to the returned value yields the newly claimed sequence
int64_t SharedMemRingBuffer::GetNextSequenceForClaim()
{
return ring_buffer_status_on_shared_mem_->next.fetch_add(1) + 1;
}
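Since ResetRingBufferState (shown later) initializes next to -1, the very first claim works out to sequence 0. A quick standalone illustration:
#include <atomic>
#include <cstdint>

int main() {
    std::atomic<int64_t> next{-1};              // same initial value ResetRingBufferState uses
    int64_t first  = next.fetch_add(1) + 1;     // fetch_add returns -1, so the first claimed sequence is 0
    int64_t second = next.fetch_add(1) + 1;     // the second claim gets 1, and so on
    return (first == 0 && second == 1) ? 0 : 1;
}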
Claiming and waiting for the producer's next writable index (core)
- first obtain the producer's next sequence, nNextSeqForClaim
- when nNextSeqForClaim - gatingSequence >= buffer_size_, the claimed slot would overwrite data the slowest consumer (at gatingSequence) has not read yet, so the producer yields the thread and waits; otherwise nNextSeqForClaim is returned, meaning that position is writable
For example, with buffer_size_ = 8 and the slowest consumer still at index 0, claiming sequence 8 gives wrapPoint = 8 - 8 = 0 >= 0, so the producer keeps yielding until that consumer advances.
int64_t SharedMemRingBuffer::ClaimIndex(int caller_id )
{
int64_t nNextSeqForClaim = GetNextSequenceForClaim() ;
int64_t wrapPoint = nNextSeqForClaim - buffer_size_;
do {
int64_t gatingSequence = GetMinIndexOfConsumers();
if (wrapPoint >= gatingSequence ) {// write sequence - minimum read index >= buffer_size_: the producer has gotten ahead and must wait for consumers to read!!
std::this_thread::yield();
continue;
} else {
break;
}
}
while (true);
return nNextSeqForClaim;
}
Producer updating the index (core)
The producer first waits for and claims the next writable position via ClaimIndex, then writes the data with SetData, and finally publishes it with Commit, which waits until all earlier claimed sequences have been published before advancing the cursor and signaling the consumers.
bool SharedMemRingBuffer::Commit(int user_id, int64_t index)// index must be greater than cursor, otherwise this never succeeds!!!
{
//succeeds only when cursor is exactly one behind index
int64_t expected = index - 1 ;
while (true) {
// wait for the producers that claimed earlier sequences to publish first;
// each of them advances cursor by one in its own Commit, so once cursor == expected
// everything before 'index' has been published and this slot may be published too
if ( ring_buffer_status_on_shared_mem_->cursor == expected ) {
ring_buffer_status_on_shared_mem_->cursor = index;
break;
}
std::this_thread::yield();// let the earlier producers advance the cursor
}
wait_strategy_->SignalAllWhenBlocking(); // notify consumers that new data is readable
return true;
}
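To see why the cursor == index - 1 gate matters with several producers, consider this illustrative timeline (sequence numbers are made up):
// cursor == 4.  Producer A claims sequence 5, producer B claims sequence 6 ('next' is atomic).
// B finishes writing first and calls Commit(6): expected == 5 but cursor == 4, so B yields.
// A calls Commit(5): cursor == 4 == expected, so cursor becomes 5 and slot 5 is visible to consumers.
// B's loop now sees cursor == 5 == expected, so cursor becomes 6.
// Publication therefore happens strictly in claim order, and a consumer that sees cursor == N
// knows every slot up to N has been fully written.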
Consumer waiting for a given index to become readable (core)
// Wait until position 'index' becomes readable. In practice the consumer first obtains its current index by id (RegisterConsumer) and then asks for the next index it wants to read.
// On success the current cursor is returned, and the consumer may read every slot in [index, cursor].
int64_t SharedMemRingBuffer::WaitFor(int user_id, int64_t index)
{
int64_t nCurrentCursor = ring_buffer_status_on_shared_mem_->cursor.load() ;
if( index > nCurrentCursor ) {
//wait strategy
return wait_strategy_->Wait(index);
} else {
return nCurrentCursor ;
}
}
Consumer updating its index (core)
// After reading, record the index this consumer has consumed up to
bool SharedMemRingBuffer::CommitRead(int user_id, int64_t index)
{
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[user_id] = index ; //update
return true;
}
Initialization
bool SharedMemRingBuffer::InitRingBuffer(int size /*= DEFAULT_RING_BUFFER_SIZE*/)
{
if(size<= 0) {
DEBUG_ELOG("Error: Invalid size : " << size );
return false;
}
buffer_size_ = size;
if(!ring_buffer_.SetCapacity(size) ) {
DEBUG_ELOG("Error: Invalid size : " << size );
return false;
}
//shared memory consists of : RingBufferStatusOnSharedMem + actual data
total_mem_size_ = sizeof(_RingBufferStatusOnSharedMem_) + (sizeof(OneBufferData) * size) ;
bool bSharedMemFirstCreated = false;
if(! shared_mem_mgr_.CreateShMem(123456, total_mem_size_, &bSharedMemFirstCreated )) {
DEBUG_ELOG("Error: CreateShMem failed :" <<shared_mem_mgr_.GetLastErrMsg());
return false;
}
if(! shared_mem_mgr_.AttachShMem()) {
DEBUG_ELOG("Error: AttachShMem failed :"<<shared_mem_mgr_.GetLastErrMsg());
return false;
}
ring_buffer_status_on_shared_mem_ = (RingBufferStatusOnSharedMem*)shared_mem_mgr_.GetShMemStartAddr();
if(bSharedMemFirstCreated) {
ResetRingBufferState();
}
char* pBufferStart = (char*)shared_mem_mgr_.GetShMemStartAddr() + sizeof(_RingBufferStatusOnSharedMem_) ;
// map the shared-memory data area onto the ring buffer slots
for(int i = 0; i < size; i++) {
ring_buffer_[i] = (OneBufferData*) ( (char*)pBufferStart + (sizeof(OneBufferData)*i) ) ;
}
ring_buffer_status_on_shared_mem_->buffer_size = size;
ring_buffer_status_on_shared_mem_->total_mem_size = total_mem_size_;
//---------------------------------------------
//wait strategy
if(wait_strategy_type_ == BLOCKING_WAIT ) {
DEBUG_LOG("Wait Strategy :BLOCKING_WAIT" );
wait_strategy_ = new BlockingWaitStrategy(ring_buffer_status_on_shared_mem_);
} else if(wait_strategy_type_ == YIELDING_WAIT ) {
DEBUG_LOG("Wait Strategy :YIELDING_WAIT" );
wait_strategy_ = new YieldingWaitStrategy(ring_buffer_status_on_shared_mem_);
} else if(wait_strategy_type_ == SLEEPING_WAIT ) {
DEBUG_LOG("Wait Strategy :SLEEPING_WAIT" );
wait_strategy_ = new SleepingWaitStrategy(ring_buffer_status_on_shared_mem_);
} else {
DEBUG_ELOG("Error: Invalid Wait Strategy :" << wait_strategy_type_);
return false;
}
return true;
}
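For reference, the shared-memory block created above is laid out as the status struct followed by the N data slots (a schematic based on the offsets computed in InitRingBuffer):
// +------------------------------+------------------+------------------+     +--------------------+
// | RingBufferStatusOnSharedMem  | OneBufferData[0] | OneBufferData[1] | ... | OneBufferData[N-1] |
// +------------------------------+------------------+------------------+     +--------------------+
// ^ GetShMemStartAddr()          ^ pBufferStart                         ring_buffer_[i] points into this area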
Resetting the manager state
void SharedMemRingBuffer::ResetRingBufferState()
{
if(ring_buffer_status_on_shared_mem_ == NULL ) {
DEBUG_LOG("call InitRingBuffer first !");
return;
}
DEBUG_LOG("---");
ring_buffer_status_on_shared_mem_->cursor.store(-1);
ring_buffer_status_on_shared_mem_->next.store(-1);
ring_buffer_status_on_shared_mem_->registered_producer_count.store(0);
ring_buffer_status_on_shared_mem_->registered_consumer_count.store(0);
total_mem_size_ = 0;
for(int i = 0; i < MAX_CONSUMER; i++) {
ring_buffer_status_on_shared_mem_->array_of_consumer_indexes[i] = -1;
}
//for blocking wait strategy : shared mutex, shared cond var
pthread_mutexattr_t mutexAttr;
pthread_mutexattr_init(&mutexAttr);
pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init( & ring_buffer_status_on_shared_mem_->mtx_lock, &mutexAttr);
pthread_condattr_t condAttr;
pthread_condattr_init(&condAttr);
pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED);
pthread_cond_init( & ring_buffer_status_on_shared_mem_->cond_var, &condAttr);
}
Disruptor wait strategies
The disruptor has three wait strategies; they were introduced earlier, so their source code is simply listed here.
///////////////////////////////////////////////////////////////////////////////
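// YieldingWaitStrategy::Wait -- spin a bounded number of times, then yield (strategy name inferred from the constructors in InitRingBuffer)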
int64_t Wait(int64_t nIndex) {
int nCounter = 100;
while (true) {
int64_t nCurrentCursor = pRingBufferStatusOnSharedMem_->cursor.load() ;
if( nIndex > nCurrentCursor ) {
//spins --> yield
if(nCounter ==0) {
std::this_thread::yield();
} else {
nCounter--;
}
continue;
} else {
return nCurrentCursor;
}
}//while
}
void SignalAllWhenBlocking() { //blocking strategy only
}
///////////////////////////////////////////////////////////////////////////////
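// SleepingWaitStrategy::Wait -- spin, then yield, then fall back to a short sleep (strategy name inferred from InitRingBuffer)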
int64_t Wait(int64_t nIndex) {
int nCounter = 200;
while (true) {
int64_t nCurrentCursor = pRingBufferStatusOnSharedMem_->cursor.load() ;
if( nIndex > nCurrentCursor ) {
//spins --> yield --> sleep
if(nCounter > 100) {
nCounter--;
} else if(nCounter > 0) {
std::this_thread::yield();
nCounter--;
} else {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
}
continue;
} else {
return nCurrentCursor;
}
}//while
}
void SignalAllWhenBlocking() { //blocking strategy only
}
///////////////////////////////////////////////////////////////////////////////
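// BlockingWaitStrategy::Wait -- block on the process-shared mutex/condition variable with a one-second timeout (strategy name inferred from InitRingBuffer)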
int64_t Wait(int64_t nIndex) {
while (true) {
int64_t nCurrentCursor = pRingBufferStatusOnSharedMem_->cursor.load() ;
if( nIndex > nCurrentCursor ) {
struct timespec timeToWait;
struct timeval now;
gettimeofday(&now,NULL);
timeToWait.tv_sec = now.tv_sec;
timeToWait.tv_nsec = now.tv_usec * 1000;
timeToWait.tv_sec += 1;
//timeToWait.tv_nsec += 100;
pthread_mutex_lock(&(pRingBufferStatusOnSharedMem_->mtx_lock) );
pthread_cond_timedwait(& (pRingBufferStatusOnSharedMem_->cond_var),
&(pRingBufferStatusOnSharedMem_->mtx_lock),
& timeToWait );
pthread_mutex_unlock(&(pRingBufferStatusOnSharedMem_->mtx_lock));
} else {
return nCurrentCursor;
}
}//while
}
void SignalAllWhenBlocking() { //blocking strategy only
//called by the producer when it commits
pthread_mutex_lock(&(pRingBufferStatusOnSharedMem_->mtx_lock));
pthread_cond_broadcast(&(pRingBufferStatusOnSharedMem_->cond_var));
pthread_mutex_unlock(&(pRingBufferStatusOnSharedMem_->mtx_lock));
}