首页 > 编程语言 >Java集合之Disruptor 介绍

Java集合之Disruptor 介绍

时间:2023-07-04 11:57:55浏览次数:46  
标签:Disruptor Java 1.2 事件 RingBuffer 集合 new public

目录

1 Disruptor

1.1 简介

1.1.1 定义

Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。

Disruptor 提供的功能类似于 KafkaRocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存),Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题,Disruptor 有个最大的优点就是快

Disruptor被设计用于在生产者消费者producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟
Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到 Disruptor,它可以带来显著的性能提升。其实 Disruptor 与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在并发、缓冲区、生产者—消费者模型、事务处理这些元素的程序来说,Disruptor 提出了一种大幅提升性能(TPS)的方案。

github 地址

Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html

1.1.2 Java中线程安全队列

JDK 中常见的线程安全的队列如下:

队列名字 是否有界
ArrayBlockingQueue 加锁(ReentrantLock) 有界
LinkedBlockingQueue 加锁(ReentrantLock) 有界
LinkedTransferQueue 无锁(CAS) 无界
ConcurrentLinkedQueue 无锁(CAS) 无界

从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。
因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。
Disruptor 就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。

1.1.3 Disruptor 核心概念

Disruptor 核心概念:

  • Event:可以把 Event 理解为存放在队列中等待消费的消息对象。
    Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
  • EventFactory:事件工厂用于生产事件,我们在初始化 Disruptor 类的时候需要用到。
  • EventHandlerEvent 在对应的 Handler 中被处理,你可以将其理解为生产消费者模型中的消费者。
    Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现
  • EventProcessorEventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)
  • Disruptor:事件的生产和消费需要用到 Disruptor 对象。
  • RingBufferRingBuffer(环形数组)用于保存事件
    如其名,环形的缓冲区。曾经 RingBufferDisruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
  • WaitStrategy:等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
  • Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型
  • ProducerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似)。
  • SequencerSequencerDisruptor 的真正核心。此接口有两个实现类 - SingleProducerSequencerMultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
  • Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。
    虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的 CPU 缓存伪共享(Flase Sharing)问题。(注:这是 Disruptor 实现高性能的关键点之一)
  • Sequence Barrier:用于保持对 RingBuffermain published SequenceConsumer 依赖的其它 ConsumerSequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。

在这里插入图片描述

1.2 操作

1.2.1 坐标依赖

pom.xml

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

Gradle:

implementation 'com.lmax:disruptor:3.4.4'

1.2.2 创建事件

我们先来定义一个代表日志事件的类:LogEvent 。

事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent 对象中就有一个用来表示日志消息内容的属性:message。

@Data
public class LogEvent {
    private String message;
}

我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message

1.2.3 创建事件工厂

创建一个工厂类 LogEventFactory 用来创建 LogEvent 对象。
LogEventFactory 继承 EventFactory 接口并实现了 newInstance() 方法 。

public class LogEventFactory implements EventFactory<LogEvent> {
    @Override
    public LogEvent newInstance() {
        return new LogEvent();
    }
}

1.2.4 创建处理事件Handler--消费者

创建一个用于处理后续发布的事件的类:LogEventHandler 。
LogEventHandler 继承 EventHandler 接口并实现了 onEvent() 方法 。

public class LogEventHandler implements EventHandler<LogEvent> {
    @Override
    public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(logEvent.getMessage());
    }
}

EventHandler 接口的 onEvent() 方法共有 3 个参数:

  • event:待消费/处理的事件
  • sequence:正在处理的事件在环形数组(RingBuffer)中的位置
  • endOfBatch:表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)

1.2.5 初始化 Disruptor

1.2.5.1 静态类

我们这里定义一个方法用于获取 Disruptor 对象

private static Disruptor<LogEvent> getLogEventDisruptor() {
    // 创建 LogEvent 的工厂
    LogEventFactory logEventFactory = new LogEventFactory();
    // Disruptor 的 RingBuffer 缓存大小
    int bufferSize = 1024 * 1024;
    // 生产者的线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };
    //实例化 Disruptor
    return new Disruptor<>(
            logEventFactory,
            bufferSize,
            threadFactory,
            // 单生产者
            ProducerType.SINGLE,
            // 阻塞等待策略
            new BlockingWaitStrategy());
}

1.2.5.2 配置类

使用配置类的方式

@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<LogEvent> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        // 生产者的线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicInteger threadNum = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
        }
    };

        //指定事件工厂
        LogEventFactory factory = new LogEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
        	 bufferSize, 
        	 threadFactory,
        	 ProducerType.SINGLE, 
        	 new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        //Disruptor 的 handleEventsWith 方法来绑定处理事件的 Handler 对象。
       
        disruptor.handleEventsWith(new LogEventHandler ());
      // Disruptor 可以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。
       //就比如下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 。
       //disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
       
        // 启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

1.2.5.3 Disruptor 构造函数讲解

Disruptor 的推荐使用的构造函数如下:

public class Disruptor<T> {
  public Disruptor(
          final EventFactory<T> eventFactory,
          final int ringBufferSize,
          final ThreadFactory threadFactory,
          final ProducerType producerType,
          final WaitStrategy waitStrategy)
  {
      this(
          RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
          new BasicExecutor(threadFactory));
  }

......
}

我们需要传递 5 个参数:

  • eventFactory:我们自定义的事件工厂。
  • ringBufferSize:指定 RingBuffer 的容量大小。
  • threadFactory:自定义的线程工厂。Disruptor 的默认线程池是自定义的,我们只需要传入线程工厂即可。
  • producerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。
  • waitStrategy:等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。

ProducerType 的源码如下,它是一个包含两个变量的枚举类型

  • SINGLE:单个事件发布者模式,不需要保证线程安全。
  • MULTI:多个事件发布者模式,基于 CAS 来保证线程安全。

WaitStrategy (等待策略)接口的实现类中只有两个方法:

  • waitFor() :等待新事件的到来。
  • signalAllWhenBlocking():唤醒所有等待的消费者。
public interface WaitStrategy
{
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    void signalAllWhenBlocking();
}

WaitStrategy 的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。
在这里插入图片描述

除了上面介绍的这个构造函数之外,Disruptor 还有一个只有 3 个参数构造函数。

使用这个构造函数创建的 Disruptor 对象会默认使用 ProducerType.MULTI(多个事件发布者模式)和 BlockingWaitStrategy(阻塞等待策略) 。

public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

1.2.6 发布事件

1.2.6.1 main方法测试

//获取 Disruptor 对象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//绑定处理事件的Handler对象
disruptor.handleEventsWith(new LogEventHandler());
//启动 Disruptor
disruptor.start();
//获取保存事件的环形数组(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//发布 10w 个事件
for (int i = 1; i <= 100000; i++) {
    // 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号
    long sequence = ringBuffer.next();
    try {
        LogEvent logEvent = ringBuffer.get(sequence);
        // 初始化 Event,对其赋值
        logEvent.setMessage("这是第%d条日志消息".formatted(i));
    } finally {
        // 发布事件
        ringBuffer.publish(sequence);
    }
}
// 关闭 Disruptor
disruptor.shutdown();

1.2.6.2 使用配置方式

public interface DisruptorMqService {

    /**
     * 消息
     * @param message
     */
    void sayHelloMq(String message);
}

@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {

    @Autowired
    private RingBuffer<LogEvent> messageModelRingBuffer;

    @Override
    public void sayHelloMq(String message) {
        log.info("record the message: {}",message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

标签:Disruptor,Java,1.2,事件,RingBuffer,集合,new,public
From: https://www.cnblogs.com/jingzh/p/17525369.html

相关文章

  • MQ常用命令集合
    MQ常用命令集合1、分配MQ./mqadminallocateMQ-nlocalhost:9876-ttst-topic-iipListipList以逗号分隔2、删除topic./mqadmindeleteTopic-nlocalhost:9876-tzto-example-cDefultCluster3、获取topic的cluster./mqadmintopicClusterList-n192.168.1.x:987......
  • 初学者:8个JavaScript技巧
    有很多方法可以优化我们的JavaScript代码,本文总结了我在工作中经常使用的8个JavaScript技巧,希望它也能帮助你。减少使用if-else在编写两个以上的if...else时,是否有更好的优化方法?如下代码,我们需要根据一个汉堡包的名字来计算它的价格。constgetPriceByName=(na......
  • JavaScript中的if与switch的区别是什么?
    很多同学问我if和swicth的区别,今天我就从多个维度来和大家分析一下if与switch的区别到底是什么?1.语法结构:if语句:使用if关键字后接条件表达式,如果条件为真,则执行if块中的代码。switch语句:使用switch关键字后接一个表达式,根据表达式的值匹配相应的case标签,并执行对应的代码块。2.可读......
  • JavaScript中的if与switch的区别是什么?
    很多同学问我if和swicth的区别,今天我就从多个维度来和大家分析一下if与switch的区别到底是什么?1.语法结构:if语句:使用if关键字后接条件表达式,如果条件为真,则执行if块中的代码。switch语句:使用switch关键字后接一个表达式,根据表达式的值匹配相应的case标签,并执行对应的代码块。2.可读......
  • vscode不支持 java 1.8 问题
    vscode不支持jdk1.8问题,实际上是 vscode的部分java插件不支持java1.8有些插件要求jdk11以上,可降级避开,不过没必要有些插件要求jdk17以上可以同时安装两个版本的jdk,例如jdk1.8和jdk17在vscode的settings.json文件中,添加jdk配置,将java1.8设为默认"java.jdt.ls......
  • Java流程控制
    Java流程控制用户交互Scanner1.java.util.Scanner(java5新特性)1.Scannerscanner=newScanner(System.in);2.scanner.next()/scanner.nextline()3.IO类使用完之后记得关闭顺序结构1.按照顺序执行依次执行基本数据结构选择结构1.if单选择双选择多......
  • 如何通过Java读取到Windows系统日志evtx文件
    近日公司有个需求,需要调研如何使用Java来读取Windows日志文件(类型:应用程序,安全,Setup,系统)一番调研以后,在仅使用java的基础上系统日志文件似乎不太可能(就个人调研结果来看),再通过多渠道查询(百度、chargpt),找到2个可能的实现的方案:1、使用Java来调用C++方法JNA(JavaNativeAccess)......
  • Java并发工具包详解
    针对并发编程,Java提供了很多并发工具类供我们使用,下面我们详细介绍一下。SemaphoreSemaphore,现在普遍翻译为“信号量”,以前也曾被翻译成“信号灯”,因为类似现实生活里的红绿灯,车辆能不能通行,要看是不是绿灯。同样,在编程世界里,线程能不能执行,也要看信号量是不是允许。信号量模型......
  • Java 中使用 OkHttpClent 请求接口 get方式 post方式
    学习记录,不喜勿喷什么是OkHttp一般在Java平台上,我们会使用ApacheHttpClient作为Http客户端,用于发送HTTP请求,并对响应进行处理。比如可以使用http客户端与第三方服务(如SSO服务)进行集成,当然还可以爬取网上的数据等。OKHttp与HttpClient类似,也是一个Http客户端,提供了对HTTP......
  • Java异常处理机制及Result最佳实践
    从jvm层看待异常处理机制1.当方法抛出异常时,首先会在当前方法的异常表中查找符合的异常处理程序2.如果找到匹配的异常处理程序,则继续在该异常处理程序中继续执行逻辑3.如果找不到匹配的,则弹出当前栈帧即结束当前方法的执行,让上一层调用者在其异常表中寻找匹配的异常处......