1 背景
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
内存队列 使用场景一般在系统内部,提高在高并发的情况下系统的性能,一般作用于线程间的消息传递
分布式消息队列 使用场景一般在系统和系统间的消息传递,吞吐量高,也适用于消息流数据处理的中间件
2 JAVA内存队列
介绍Disruptor之前,先介绍一下常用线程安全的内置队列。Java的内置队列下表所示:
队列 | 有界性 | 锁 | 数据结构 |
ArrayBlockingQueue | bounded | 加锁 | arraylist |
LinkedBlockingQueue | optionally-bounded | 加锁 | linkedlist |
ConcurrentLinkedQueue | unbounded | 无锁 | linkedlist |
LinkedTransferQueue | unbounded | 无锁 | linkedlist |
PriorityBlockingQueue | unbounded | 加锁 | heap |
DelayQueue | unbounded | 加锁 | heap |
队列的底层一般分成三种:数组、链表和堆
堆一般情况下是为了实现带有优先级特性的队列,暂不考虑
- 基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全
- 基于链表的线程安全队列分成
- LinkedBlockingQueue 通过锁的方式来实现线程安全
- ConcurrentLinkedQueue 上面表格中的LinkedTransferQueue都是通过原子变量compare and swap这种不加锁的方式来实现
通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内) 而加锁的方式,可以实现有界队列,在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue
在实际使用过程中,ArrayBlockingQueue会因为加锁和伪共享等出现严重的性能问题
3 Disruptor原理
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
Ring Buffer 如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
Sequence Disruptor 通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。 (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。
Sequencer Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier 用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
Wait Strategy 定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
Event 在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
EventProcessor EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
EventHandler Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
Producer 即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。
4 代码样例
代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。
以下代码基于3.3.4版本的Disruptor包
/**
* @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
public class DisruptorMain
{
public static void main(String[] args) throws Exception
{
// 队列中的元素
class Element {
private int value;
public int get(){
return value;
}
public void set(int value){
this.value= value;
}
}
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
// 处理Event的handler
EventHandler<Element> handler = new EventHandler<Element>(){
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch)
{
System.out.println("Element: " + element.get());
}
};
// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
// 指定RingBuffer的大小
int bufferSize = 16;
// 创建disruptor,采用单生产者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
// 设置EventHandler
disruptor.handleEventsWith(handler);
// 启动disruptor的线程
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int l = 0; true; l++)
{
// 获取下一个可用位置的下标
long sequence = ringBuffer.next();
try
{
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
}
finally
{
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}
5 应用场景
5.1 Log4j2异步日志打印
log4j2支持日志的异步打印,日志异步输出的好处在于,使用单独的进程来执行日志打印的功能,可以提高日志执行效率,减少日志功能对正常业务的影响。
异步日志在程序的classpath需要加载disruptor-3.0.0.jar或者更高的版本。
5.2 海量job处理
现在有8个库1024张表,大量的job需要处理,每时每刻任务都在海量增加
启动8台机器,每台机器扫描一个库的待执行job,共128个表需要扫描,这里可以启动128个线程去并发扫描,每查出来一次,立马通过disruptor发布出去,另外一端监听到发布的任务之后调用任务处理接口进行处理,就算有任务执行异常,也不会阻塞其它的任务,可以边发布边处理,最大程度提升任务处理能力。
一直积压的任务有旁路报警机制,每次执行失败的job执行次数+1,当大于指定阈值则报警。
一旦无法放入disruptor就会报警,表明队列已满,处理不过来了,得扩容下游处理任务的机器
disruptor的消费末端通过线程池严格控制消费能力,不会出现任务生产过快消费不过来的情况
如果有多种不同类型的任务要处理,可以初始化多个不同size的ringbuffer去处理,定义不同的evenHandler
局限性应该是它是个内存队列,处理不了分布式场景的
标签:Disruptor,加锁,disruptor,队列,Sequence,线程,内存 From: https://blog.51cto.com/u_13222507/6035779