高性能的有界安全内存队列-Disruptor
为什么(WHY)
Java内置队列现状
队列 | 有界性 | 锁 | 数据结构 |
---|---|---|---|
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
队列的底层一般分成三种:数组、链表和堆。
数组
典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;
链表
分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者通过CAS来实现的。
和ConcurrentLinkedQueue一样,LinkedTransferQueue是通过CAS实现的
对volatile类型的变量进行CAS 操作,存在伪共享问题
堆
堆一般情况下是为了实现带有优先级特性的队列,暂时不做介绍
解决问题
优化高吞吐量和低延迟系统的性能
Disruptor 使用无锁的、非阻塞的算法和环形缓冲区来避免锁竞争、内存开销和 GC 压力等问题,提供了一种高效、可扩展的并发编程模型。
简化并发编程的复杂性
Disruptor 提供了一个简单而强大的 API,采用了生产者-消费者模式和 barrier 概念来控制多个线程对共享数据结构的访问,帮助开发者更容易地编写高性能、低延迟的并发代码。
满足特定应用场景的需求
Disruptor 适用于需要处理大量数据和快速响应的应用程序,如金融交易、日志处理、实时数据分析等。它支持多种消费者策略,可以根据具体的业务需求进行灵活配置。
总结:Disruptor是为了满足高性能、低延迟系统中对并发编程的需求而设计的,并且提供了一种优雅的解决方案来处理传统并发编程模型中的问题。
是什么(WHAT)
Disruptor 是一个开源的并发编程框架,专门设计用来在高性能、低延迟系统中处理大量数据和事件。
Disruptor 的核心思想是使用无锁的算法和环形缓冲区来避免锁竞争和内存开销,从而提高系统的性能和响应速度。它提供了一种生产者-消费者模式的实现,通过 barrier 概念控制多个线程对共享数据结构的访问,保证了线程安全。
特点
高吞吐量和低延迟
由于无锁设计和环形缓冲区的使用,Disruptor 可以在不牺牲性能的情况下处理大量数据和事件。
可扩展性
Disruptor 支持多个生产者和多个消费者,并且可以根据需要灵活地配置缓冲区大小和消费者策略。
灵活的消费者策略
Disruptor 提供了多种消费者策略,例如单个消费者、多个消费者、批量消费者等,适用于不同类型的应用场景。
易于使用
Disruptor 提供了一个简单而强大的 API,帮助开发者更容易地编写高性能、低延迟的并发代码。
内存效率
Disruptor 使用预分配的环形缓冲区,减少了内存分配和回收的开销,降低了 GC 压力。
事件驱动
Disruptor 是一个事件驱动的框架,非常适合处理实时数据流和异步事件。
使用场景(WHO/WHEN/WHERE)
生产者消费者场景
单生产者多消费者场景
多生产者单消费者场景
单生产者多消费者场景
多个消费者串行消费场景
菱形方式执行场景
链式并行执行场景
多组消费者相互隔离场景
多组消费者航道执行模式
发布订阅场景
观察者模式的一种实现,实现发布订阅模式
怎么做(HOW)
前置知识
三级缓存
L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小
L1 Cache
每个核上都有一个L1 Cache(准确地说是两个,一个存数据 L1d Cache,一个存指令 L1i Cache);
L2 Cache
二级缓存就是一级缓存的存储器:
一级缓存制造成本很高因此它的容量有限,二级缓存的作用就是存储那些CPU处理时需要用到、一级缓存又无法存储的数据。
L3 Cache
三级缓存和内存可以看作是二级缓存的存储器,它们的容量递增,但单位制造成本却递减。
L3 Cache和L1,L2 Cache有着本质的区别。L1和L2 Cache都是每个CPU core独立拥有一个,而L3 Cache是几个Cores共享的,可以认为是一个更小但是更快的内存。
缓存行
为了提高IO效率,CPU每次从内存读取数据,是一批一批去读取的,这一批数据,也叫Cache Line(缓存行)。
空间的局部性原理
一般一行缓存行有64字节。intel处理器的缓存行是64字节。目前主流的CPU Cache的Cache Line大小都是64Bytes。假设我们有一个512 Bytes 的一级缓存,那么按照64 Bytes 的缓存单位大小来算,这个一级缓存所能存放的缓存个数就是512/64 = 8个。
Cache Line可以简单的理解为CPU Cache中的最小缓存单位。这些CPU Cache的写回和加载,都不是以一个变量作为单位。这些都是以整个Cache Line作为单位。
如果一个常量和变量放在一行,那么变量的更新,也会影响常量的使用:
伪/错共享(False Sharing)问题
在多线程程序的执行过程中,存在着一种情况,多个需要频繁修改的变量存在同一个缓存行当中。如果需要修改“共享同一个缓存行的其中一个变量”,该行中其他变量的状态 就会失效,甚至进行一致性保护。
本质
对缓存行中的单个变量进行修改了,导致整个缓存行其他不相关的数据也就失效了,需要从主存重新加载
如果其中有volatile修饰的变量,需要保证线程可见性的变量,还需要进入缓存与数据一致性的保障流程, 如mesi协议的数据一致性保障 用了其他变量的 Core的缓存一致性。
一个CPU核心在加载一个缓存行时要执行上百条指令。如果一个核心要等待另外一个核心来重新加载缓存行,那么他就必须等在那里,称之为stall(停止运转)。
解决方案
通过填充(Padding)数据的形式,来保证本应有可能位于同一个缓存行的两个变量,在被多线程访问时必定位于不同的缓存行。简单的说,就是以空间换时间,使用占位字节,将变量的所在的缓冲行塞满。
java8中解决方案
JAVA 8中添加了一个@Contended的注解,添加这个的注解,将会在自动进行缓存行填充。
执行时,必须加上虚拟机参数-XX:-RestrictContended,@Contended注释才会生效。
LongAdder以及Striped64使用了@Contended注解解决伪共享问题
实践
maven依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
Event和工厂
@Data
public class LogEvent {
private long value;
}
public class LogEventFactory implements EventFactory {
@Override
public Object newInstance() {
return new LogEvent();
}
}
消费者
public class LogEventHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent logEvent, long l, boolean b) {
System.out.println(logEvent.getValue());
}
}
生产者
public class LogEventProducer {
private final RingBuffer<LogEvent> ringBuffer;
public LogEventProducer(RingBuffer<LogEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* onData用来发布事件,每调用一次就发布一次事件事件
* 它的参数会通过事件传递给消费者
*/
public void onData(long data) {
// step1:通过从 环形队列中 获取 序号
//可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
long sequence = ringBuffer.next();
try {
//step2: 通过序号获取 对应的 事件对象, 将数据填充到 事件对象,
//用上面的索引,取出一个空的事件用于填充
LogEvent event = ringBuffer.get(sequence);// for the sequence
event.setValue(data);
} finally {
//step3: 再通过 序号将 事件对象 发布出去。
//发布事件
ringBuffer.publish(sequence);
}
}
}
step1: 使用RingBuffer.next()获取下一个事件槽
step2: 通过序号获取 对应的 事件对象, 将数据填充到 事件对象
step3: 再通过 序号将 事件对象 发布出去
发布事件时要使用try/finnally保证事件一定会被发布,否则会引起Disruptor状态的混乱,尤其多个事件生产者的情况下会导致事件消费者失速,从而不得不重启恢复。
事件转换器
public class LogEventProducerWithTranslator {
//一个translator可以看做一个事件初始化器,publicEvent方法会调用它
//填充Event
private static final EventTranslatorOneArg<LogEvent, Long> TRANSLATOR =
new EventTranslatorOneArg<LogEvent, Long>() {
public void translateTo(LogEvent event, long sequence, Long data) {
event.setValue(data);
}
};
private final RingBuffer<LogEvent> ringBuffer;
public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(Long data) {
ringBuffer.publishEvent(TRANSLATOR, data);
}
}
使用事件转换器,省了从环形队列获取序号,然后拿到事件填充数据,再发布序号中的第②步骤,Disruptor提供了不同的接口去产生一个Translator对象:
① EventTranslator
② EventTranslatorOneArg
③ EventTranslatorTwoArg
测试
public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
// 消费者线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200,
500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(200));
// 事件工厂
LogEventFactory eventFactory = new LogEventFactory();
// 环形队列大小,2的指数
int bufferSize = 1024;
// 构造 分裂者(事件分发者)
Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(eventFactory, bufferSize, executor);
// 连接 消费者 处理器
disruptor.handleEventsWith(new LogEventHandler());
// 开启 分裂者(事件分发)
disruptor.start();
// 获取环形队列,用于生产 事件
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
LogEventProducer producer = new LogEventProducer(ringBuffer);
for (long i = 0; true; i++) {
//发布事件
producer.onData(i);
Thread.sleep(1000);
}
}
}
通过Java8 Lambda使用Disruptor
public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
// 消费者线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200,
500L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(200));
// 环形队列大小,2的指数
int bufferSize = 1024;
// 构造 分裂者 (事件分发者)
Disruptor<LogEvent> disruptor = new Disruptor<>(LogEvent::new, bufferSize, executor);
// 连接 消费者 处理器
// 可以使用lambda来注册一个EventHandler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
// 开启 分裂者(事件分发)
disruptor.start();
// 获取环形队列,用于生产 事件
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
LogEventProducerWithTranslator producer = new LogEventProducerWithTranslator(ringBuffer);
for (long i = 0; true; i++) {
//发布事件
producer.onData(i);
Thread.sleep(1000);
}
}
}
经验总结
① 事件工厂(Event Factory)定义了如何实例化事件(Event),Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。
② ringBuffer这个数组的大小,一般根据业务指定成2的指数倍。
③ 消费者线程池,事件的处理是在构造的线程池里来进行处理的。
④ 指定等待策略,Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待Event事件。
原理
Disruptor如何实现高性能
环形缓冲区
使用一个固定大小的环形数组作为消息队列,并限制只有一个线程可以写入。这样做可以消除动态内存分配、锁竞争等开销,提高写入和读取的速度
无锁编程
利用 CAS(Compare-And-Swap)和其他原子操作来实现无锁的并发控制。这种方式避免了传统锁机制带来的上下文切换和等待时间,从而大幅提升了并发性能。
批量处理
将消息处理逻辑封装在事件处理器中,并支持批量处理多个消息。这样可以充分利用多核 CPU 的并行处理能力,提高整体的处理效率。同时,批量处理也减少了方法调用和上下文切换的次数,进一步优化了性能。
其他
零拷贝
在数据传递过程中不需要进行内存拷贝,直接在内存中传递指针,减少了 CPU 和内存的使用。
事件序列化
Disruptor 支持事件序列化,允许在不同线程或进程之间安全地共享数据,提高了系统的可靠性和扩展性。
属性填充
通过添加额外的无用信息,避免伪共享问题
伪共享Disruptor框架解决方案
缓存行填充和读写指针分离
避免多个线程在同一缓存行中读写不同变量
在 Disruptor 中,每个元素(比如 RingBuffer 中的 Entry)都被设计为占用一个完整的缓存行(通常是 64 字节)
在 RingBuffer 中,读取操作由 Sequence 实现,而写入操作则由 RingBuffer 本身管理。这意味着读写操作不会同时访问同一缓存行
使用 volatile 和 CAS 操作
保证了在多个线程并发访问和修改关键字段时,操作的原子性和可见性
避免共享状态和预分配 RingBuffer
减少了线程之间的缓存共享和内存移动,进一步提高了并发性能
RingBuffer 是预先分配的,不会在运行时进行动态扩展或缩小。这样可以确保 RingBuffer 的元素不会在内存中移动,减少了缓存失效的风险
RingBuffer环形队列
RingBuffer是一个环(首尾相连的环),用做在不同上下文(线程)间传递数据的buffer。RingBuffer拥有一个序号,这个序号指向数组中下一个可用元素。无论是生产者向缓冲区里提交任务,还是消费者从缓冲区里获取任务执行,都使用CAS操作。
优点
简化了多线程同步的复杂度
链表使用两个指针head和tail来管理这个队列,会导致指针冲突问题和"伪共享"问题。
① 环形队列使用CAS操作来更新指针,避免了指针冲突问题。
② 环形队列只有一个指针,只通过一个指针来实现出列和入列操作,不会出现"伪共享"问题。
减少了系统对内存空间管理的压力
数组不像链表,Java会定期回收链表中一些不再引用的对象,而数组不会出现空间的新分配和回收问题。
比较BlockingQueue
① Disruptor队列中同一个事件可以有多个消费者,消费者之间既可以并行处理,也可以形成依赖图相互依赖,按照先后次序进行处理
② Disruptor可以预分配用于存储事件内容的内存空间
③ Disruptor使用极度优化和无锁的设计实现极高性能的目标
注:
disruptor.shutdown(): 关闭Disruptor方法会阻塞,直至所有的事件都得到处理
executor.shutdown():关闭Disruptor使用的线程池,如果线程池需要关闭,必须进行手动关闭, Disruptor在shutdown时不会自动关闭使用的线程池
源码分析
概念
Ring Buffer
从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
Sequence
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。
Sequence采用缓存行填充的方式对long类型的一层包装,避免了伪共享问题。另外,Sequence通过cas避免了锁的开销。
Sequencer
Disruptor的核心, 生产者与缓存RingBuffer之间的桥梁
此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier
消费者 与 消费者 直接的隔离屏障。
消费者 之间,并不是通过 RingBuffer 进行加锁互斥 隔离,而是 通过 Sequence Barrier 来管理依赖次序关系,从而能减少RingBuffer上的并发冲突;
在一定程度上,Sequence Barrier类似aqs同步队列
① 保持对RingBuffer的Sequence和Consumer依赖的其它Consumer的Sequence的引用。
② 定义了Consumer是否还有可处理的事件的逻辑。
Wait Strategy
定义 Consumer 如何进行等待下一个事件的策略。
① BlockingWaitStrategy(默认)
最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
使用一个锁和条件变量来控制线程的执行和等待(Java基本的同步方法)
② SleepingWaitStrategy
性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景;
循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠,(在Linux系统上面睡眠时间60µs).
优点在于生产线程只需要计数,而不执行任何指令。并且没有条件变量的消耗。但是,事件对象从生产者到消费者传递的延迟变大了。
③ YieldingWaitStrategy
性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;。
循环等待sequence增加到合适的值, 循环中调用Thread.yield()允许其他准备好的线程执行。
如果需要高性能而且事件消费者线程比逻辑内核少的时候,推荐使用YieldingWaitStrategy策略。
例如:在开启超线程的时候。
④ BusySpinW4aitStrategy
BusySpinWaitStrategy是性能最高的等待策略,同时也是对部署环境要求最高的策略。
这个性能最好用在事件处理线程比物理内核数目还要小的时候。例如:在禁用超线程技术的时候。
Event
生产者和消费者之间进行交换的数据被称为事件(Event)。
EventProcessor
事件处理器,是消费者线程池Executor的调度单元,
EventProcessor是对事件业务处理EventHandler与异常处理ExceptionHandler等的一层封装;
EventProcessor持有特定消费者(Consumer)的 Sequence,并提供事件循环(Event Loop),用于调用业务事件处理实现EventHandler
EventHandler
Disruptor定义的事件处理接口,由用户实现,用于处理事件,是Consumer的真正实现。
Producer
泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型
RingBuffer
基于数组的缓存实现,也是创建sequencer与定义WaitStrategy的入口;
Disruptor
Disruptor的使用入口。
持有RingBuffer、消费者线程池Executor、消费者仓库ConsumerRepository等引用。
无锁架构
通过序号屏障对依赖关系的管理,RingBuffer实现了事件缓存的无锁架构。
Disruptor使用环形缓冲区RingBuffer作为共享数据的媒介,生产者通过Sequencer控制RingBuffer,以及唤醒等待事件的消费者,消费者通过SequenceBarrier监听RingBuffer的可消费事件。
Disruptor中,生产者与Sequencer有关系,由生产者通过Sequencer控制RingBuffer的写入。
RingBuffer是Disruptor高性能的一个亮点。RingBuffer就是一个大数组,事件以循环覆盖的方式写入。
与常规RingBuffer拥有2个首尾指针的方式不同,Disruptor的RingBuffer只有一个指针(或称序号),指向数组下一个可写入的位置,该序号在Disruptor源码中就是Sequencer中的cursor,
如何管理消费者和生产者之间的依赖关系呢?
通过SequenceBarrier 进行依赖管理,
消费者的 processer,通过 SequenceBarrier 获取生产者的 生产 序号
如何管理消费者与消费者之间的依赖关系呢?
每个消费者拥有各自独立的事件序号Sequence,消费者之间不通过Sequence在共享竞态,或者说依赖管理。
消费者与消费者之间的依赖关系是,通过SequenceBarrier 进行依赖管理。
如何避免未消费事件的写入覆盖呢?
生产者的 Sequencer需要监听所有消费者的消息处理进度,也就是 gatingSequences。
核心类Sequence和Sequencer
Sequence
Sequence的内部实现主要是 volatile long, 支持以下特性
① CAS 更新
② order writes (Store/Store barrier,改动不保证立即可见) vs volatile writes (Store/Load barrier,改动保证立即可见)
③ 在 volatile 字段 附近添加 padding 解决伪共享问题
Sequencer
Sequencer 负责在生产者和消费者之间快速、正确地传递数据的序号。
生产者发布 event 的时候首先需要预定一个 sequence,Sequencer 就是计算和发布 sequence 的。
SingleProducerSequencer
发布事件的步骤:
① 通过 Sequencer.next(n) 来预定下面 n 个可以写入的位置序号
② 根据序号获取事件,然后修改事件数据,然后发布 event。
gatingSequences
为解决数据覆盖的问题,Sequencer在内部维护了一个gatingSequences数组
volatile Sequence[] gatingSequences = new Sequence[0];
gatingSequences数据里边,记录的是消费者的Sequence,每个消费者会维护一个自己的 Sequence对象,来记录自己已经消费到的序例位置。
通过访问gatingSequences,Sequencer可以得知消费的最慢的消费者消费到了哪个位置。如果之前的event还有消费者没有消费,这时SingleProducerSequencer会等待并自旋。等到 SingleProducerSequencer自旋到下一个位置所有人都消费过的时候,它就可以从next方法中返回,生产者拿着sequence就可以继续去发布。
消费者只能落后生产者一圈,不然就已经存在数据覆盖了
MultiProducerSequencer
多个生产者的场合使用
区别
1、数据结构上多出来availableBuffer,用来记录RingBuffer上哪些位置有数据可以读。
2、有多个publisher同时访问Sequencer.next(n)方法,所以在确定最终位置的时候用了一个CAS操作,如果失败了就自旋再来一次。
2、publish(final long sequence)方法,setAvailable去设置availableBuffer的状态位。
根据calculateAvailabilityFlag(sequence)方法计算出来availabilityFlag,是该 sequence环绕RingBuffer的圈数。
在单个生产者的场景下,publishEvent 的时候才会推进cursor,所以只要 sequence<=cursor,就说明数据是可消费的。
多个生产者的场景下,在next(n)方法中,就已经通过 cursor.compareAndSet(current, next) 移动cursor了,此时event还没有publish,所以cursor所在的位置不能保证event一定可用。
消费者
ConsumerRepository
通过ConsumerRepository来管理所有消费者,主要维护以下结构
① EventHandler 到 消费者处理器 信息的映射,用于信息查询
② Sequence 到消费者信息的映射
ConsumerInfo 和 Sequence 是 一对多 关系
ConsumerInfo
ConsumerRepository用于维护Disruptor的所有消费者的信息,管理的集合类里主要有ConsumerInfo接口。
EventProcessorInfo单事件处理器消费者信息
一个单线程的消费者(只有一个EventProcessor), 代理EventHandler,管理处理事件以外的其他事情(如:拉取事件,等待事件...)
WorkerPoolInfo线程池消费者信息对象/工作者池信息
WorkPool整体是一个消费者,是一个多线程的消费者,每个生产者publish的事件只会被WorkerPool里的某一个WorkProcessor消费。
WorkerPoolInfo包含了一个WorkerPool类型的成员,WorkerPool和处理器没有任何继承关系,是一个独立的类
消费者处理器
handler和processer都可以翻译为“处理器”,但是process侧重于 处理执行,实际执行,
processer与cpu有关系,一个processer事件处理器关联一个执行线程,而handle侧重于 业务处理器,表示用户逻辑的处理, process表示 handler 的执行过程。handle和process 的关系,类似于 程序 与进程的关系
主要的消费者处理器类型
BatchEventProcessor
单线程批处理消费者,同一批次添加的消费者,会消费每一个event。
在使用BatchEventProcessor时,通过Disruptor#handleEventsWith方法可以获取一个EventHandlerGroup,再通过EventHandlerGroup的and和then方法可以构建一个复杂的消费者链。
BatchEventProcessor可以处理超时,可以处理中断,可以通过用户实现的异常处理类处理异常,同时,发生异常之后再次启动,不会漏消费,也不会重复消费。
IdentityHashMap和HashMap最大的不同,就是使用==而不是equals比较key。
createEventProcessors
使用BatchEventProcessor构建消费者链时的逻辑都在createEventProcessors方法中
createEventProcessors方法接收两个参数
EventHandlerGroup<T createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T[] eventHandlers)
① barrierSequences表示当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组。
② eventHandlers代表事件消费逻辑的EventHandler数组。Disruptor为每个EventHandler实现类都创建了一个对应的BatchEventProcessor。
构建BatchEventProcessor时需要以下传入三个构造参数
① dataProvider是数据存储结构如RingBuffer;
② sequenceBarrier用于跟踪生产者游标,协调数据处理;
③ eventHandler是用户实现的事件处理器,也就是实际的消费者。
EventHandlerGroup
EventHandlerGroup表示一组事件消费者,内部持有了Disruptor类实例disruptor,其大部分功能都是通过调用disruptor实现
WorkProcessor
消费者池,同一批次添加的消费者,每个event只会被其中一个processer 消费。
WorkProcessor 通过 WorkerPool 进行管理
SequenceBarrier协调屏障
SequenceBarrier用来跟踪发布者(publisher)的游标(cursor)和事件处理者(EventProcessor)的序列号(sequence)。
两种依赖关系
① 生产者与消费者之间的依赖关系: sequenceBarrier的seqquencer
② 消费者与消费者之间的依赖关系: sequenceBarrier的dependentSequence
生产者对最慢+末端消费者直接的依赖关系,使用门禁序号gatingSequence来管理
消除锁和CAS操作
Disruptor中,通过联合使用SequenceBarrier和Sequence, 协调和管理消费者和生产者之间的处理关系,避免了锁和CAS操作,Disruptor中的各个消费者和生产者持有自己的序号Sequence。
序号Sequence需要满足以下条件:
① 消费者的序号Sequence的数值必须小于生产者的序号Sequence的数值
② 消费者的序号Sequence的数值必须小于依赖关系中前置的消费者的序号Sequence的数值
③ 生产者的序号Sequence的数值不能大于消费者正在消费的序号Sequence的数值,防止生产者速度过快,将还没有来得及消费的事件消息覆盖
条件一和条件二在SequenceBarrier中的waitFor() 方法中实现: 条件三是针对生产者建立的SequenceBarrier,逻辑判定发生在生产者从RingBuffer获取下一个可用的entry时,RingBuffer会将获取下一个可用的entry委托给Sequencer处理:
ProcessingSequenceBarrier
SequenceBarrier只有一个重要的实现类,就是ProcessingSequenceBarrier。
ProcessingSequenceBarrier有以下几个重要的属性:
① 生产者Sequencer,
② 消费定位cursorSequence,
③ 等待策略waitStrategy ,
④ 还有一组依赖sequence:dependentSequence
其他
RingBuffer预分配内存
RingBuffer使用数组Object[] entries来存储元素:
1、初始化RingBuffer时,会将所有数组元素entries的指定为特定的事件Event参数,此时Event中的detail属性为null
2、生产者向RingBuffer写入消息时 ,RingBuffer不是直接将数组元素entries指向Event对象,而是先获取Event对象,更改Event对象中的detail属性
3、消费者在消费时,也是从RingBuffer中读取Event, 读取Event对象中的detail属性
4、由此可见,在生产和消费过程中 ,RingBuffer中的数组元素entries没有发生任何变化,没有产生临时对象,数组中的元素一直存活,直到RingBuffer消亡
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
// 使用工厂方法初始化数组中的entries元素
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
通过以上方式,可以最小化JVM中的垃圾回收GC的频率,提升性能
缓存行填充
Disruptor RingBuffer(环形缓冲区)定义了RingBufferFields类,里面有indexMask和其他几个变量存放RingBuffer的内部状态信息。
Disruptor利用了缓存行填充,在 RingBufferFields里面定义的变量的前后,分别定义了7个long类型的变量:
前面7个来自继承的 RingBufferPad 类
后面7个直接定义在 RingBuffer 类
这14个变量无任何实际用途。我们既不读他们,也不写他们。而RingBufferFields里面定义的这些变量都是final,第一次写入后就不会再修改。
所以,一旦它被加载到CPU Cache后,只要被频繁读取访问,就不会再被换出Cache。这意味着,对于该值的读取速度,会一直是CPU Cache的访问速度,而非内存的访问速度。
对于大小为64个字节的缓存行来说,如果缓存行大小大于64个字节,那么还是会出现伪共享问题,但是毕竟非64个字节的Cache Line并不是当前的主流