首页 > 系统相关 >高性能内存队列Disruptor

高性能内存队列Disruptor

时间:2023-02-03 14:07:03浏览次数:49  
标签:Disruptor 加锁 disruptor 队列 Sequence 线程 内存

1 背景

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。


内存队列 使用场景一般在系统内部,提高在高并发的情况下系统的性能,一般作用于线程间的消息传递

分布式消息队列 使用场景一般在系统和系统间的消息传递,吞吐量高,也适用于消息流数据处理的中间件

2 JAVA内存队列

介绍Disruptor之前,先介绍一下常用线程安全的内置队列。Java的内置队列下表所示:

队列

有界性

数据结构

ArrayBlockingQueue

bounded

加锁

arraylist

LinkedBlockingQueue

optionally-bounded

加锁

linkedlist

ConcurrentLinkedQueue

unbounded

无锁

linkedlist

LinkedTransferQueue

unbounded

无锁

linkedlist

PriorityBlockingQueue

unbounded

加锁

heap

DelayQueue

unbounded

加锁

heap

队列的底层一般分成三种:数组、链表和堆

堆一般情况下是为了实现带有优先级特性的队列,暂不考虑

  • 基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全
  • 基于链表的线程安全队列分成
  • LinkedBlockingQueue 通过锁的方式来实现线程安全
  • ConcurrentLinkedQueue 上面表格中的LinkedTransferQueue都是通过原子变量compare and swap这种不加锁的方式来实现

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内) 而加锁的方式,可以实现有界队列,在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue

在实际使用过程中,ArrayBlockingQueue会因为加锁和伪共享等出现严重的性能问题

3 Disruptor原理

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

Ring Buffer 如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

高性能内存队列Disruptor_disruptor

Sequence  Disruptor 通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。 (注:这是 Disruptor 实现高性能的关键点之一,网上关于伪共享问题的介绍已经汗牛充栋,在此不再赘述)。

Sequencer  Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

Sequence Barrier 用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

高性能内存队列Disruptor_事件处理_02

Wait Strategy 定义 Consumer 如何进行等待下一个事件的策略。 (注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)

Event 在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

EventProcessor EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

EventHandler Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

Producer 即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

高性能内存队列Disruptor_disruptor_03

4 代码样例

代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。

以下代码基于3.3.4版本的Disruptor包

/**
* @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;

public class DisruptorMain
{
public static void main(String[] args) throws Exception
{
// 队列中的元素
class Element {

private int value;

public int get(){
return value;
}

public void set(int value){
this.value= value;
}

}

// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};

// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};

// 处理Event的handler
EventHandler<Element> handler = new EventHandler<Element>(){
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch)
{
System.out.println("Element: " + element.get());
}
};

// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();

// 指定RingBuffer的大小
int bufferSize = 16;

// 创建disruptor,采用单生产者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

// 设置EventHandler
disruptor.handleEventsWith(handler);

// 启动disruptor的线程
disruptor.start();

RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

for (int l = 0; true; l++)
{
// 获取下一个可用位置的下标
long sequence = ringBuffer.next();
try
{
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
}
finally
{
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}

5 应用场景

5.1 Log4j2异步日志打印

log4j2支持日志的异步打印,日志异步输出的好处在于,使用单独的进程来执行日志打印的功能,可以提高日志执行效率,减少日志功能对正常业务的影响。


异步日志在程序的classpath需要加载disruptor-3.0.0.jar或者更高的版本。


5.2 海量job处理

现在有8个库1024张表,大量的job需要处理,每时每刻任务都在海量增加


启动8台机器,每台机器扫描一个库的待执行job,共128个表需要扫描,这里可以启动128个线程去并发扫描,每查出来一次,立马通过disruptor发布出去,另外一端监听到发布的任务之后调用任务处理接口进行处理,就算有任务执行异常,也不会阻塞其它的任务,可以边发布边处理,最大程度提升任务处理能力。


一直积压的任务有旁路报警机制,每次执行失败的job执行次数+1,当大于指定阈值则报警。


一旦无法放入disruptor就会报警,表明队列已满,处理不过来了,得扩容下游处理任务的机器


disruptor的消费末端通过线程池严格控制消费能力,不会出现任务生产过快消费不过来的情况


如果有多种不同类型的任务要处理,可以初始化多个不同size的ringbuffer去处理,定义不同的evenHandler


局限性应该是它是个内存队列,处理不了分布式场景的

标签:Disruptor,加锁,disruptor,队列,Sequence,线程,内存
From: https://blog.51cto.com/u_13222507/6035779

相关文章

  • 双端队列
    题目描述:达达现在碰到了一个棘手的问题,有N个整数需要排序。达达手头能用的工具就是若干个双端队列。她从1到N需要依次处理这N个数,对于每个数,达达能做以下两件事:1.新建一个双......
  • c分配内存底层函数 realloc
    realloc(void*__ptr,size_t__size):更改已经配置的内存空间,即更改由malloc()函数分配的内存空间的大小。如果将分配的内存减少,realloc仅仅是改变索引的信息。如果是将......
  • springcloud:安装rabbitmq并配置延迟队列插件
    0.引言本期主要讲解如何利用docker快速安装rabbitmq并且配置延迟队列插件1.docker安装1.1安装rabbitmq1、下载镜像dockerpullrabbitmq2、安装镜像dockerrun-d--host......
  • 栈和队列
    栈和队列都是通过动态集合来存储数据,在栈和队列中添加和删除数据都是预先设定的。在栈(Stack)中,被删除的元素是最近添加的元素,所以栈的实现方式是后进先出(Last-in,First-out......
  • 设置和修改Linux的swap分区大小解决内存不足问题
    一、创建1.查看当前分区情况free-m2.增加swap大小,2G左右ddif=/dev/zeroof=/var/swapbs=1024count=20480003.设置交换文件mkswap/var/swap4.立......
  • 《程序是怎样跑起来的》·第四章 熟练使用有棱有角的内存
    重点:计算机是进行数据处理的设备,而程序表示的就是处理顺序和数据结构。由于处理对象数据是存储在内存和磁盘上的,因此程序必须能自由地使用内存和磁盘。因此,大家有必要对内......
  • 决战圣地玛丽乔亚Day06-- MQ消息队列
    MQ消息队列。目前市面上流行的MQ有:RocketMQ、kafka、RabbitMQ、ActiveMQ比较一下这几个消息队列一般消息队列的作用和使用场景是:1.解耦。(本来A要调很多接口,现在A直接把......
  • 【小记】如果 golang 内存不够了怎么办
    在看redis1.0源码时,总会看到需要申请内存的地方,如果申请不到需要大的内存就会返回NULL,然后在调用层抛出oom。比如listDup中在复制特殊value或者加入新节点时都有......
  • 内存分析 - 初始
    内存分析-初始内存内存栈堆栈、堆//1.声明数组//在栈中创建一个array的名字,这个时候array是空的,没有实际意义//这个时候的array就相当......
  • 【C语言】memset() 内存填充块
    ......