首页 > 其他分享 >Disruptor入门

Disruptor入门

时间:2023-02-10 15:57:39浏览次数:59  
标签:Disruptor 入门 disruptor 10000L sequence ringBuffer event

Disruptor

介绍

主页:http://lmax-exchange.github.io/disruptor/

源码:https://github.com/LMAX-Exchange/disruptor

GettingStarted: https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

api: http://lmax-exchange.github.io/disruptor/docs/index.html

maven: https://mvnrepository.com/artifact/com.lmax/disruptor

Disruptor的特点

对比ConcurrentLinkedQueue : 链表实现

JDK中没有ConcurrentArrayQueue

Disruptor是数组实现的

无锁,高并发,使用环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率

实现了基于事件的生产者消费者模式(观察者模式)

RingBuffer

环形队列

RingBuffer的序号,指向下一个可用的元素

采用数组实现,没有首尾指针

对比ConcurrentLinkedQueue,用数组实现的速度更快

假如长度为8,当添加到第12个元素的时候在哪个序号上呢?用12%8决定

当Buffer被填满的时候到底是覆盖还是等待,由Producer决定

长度设为2的n次幂,利于二进制计算,例如:12%8 = 12 & (8 - 1) pos = num & (size -1)

Disruptor开发步骤

  1. 定义Event - 队列中需要处理的元素

  2. 定义Event工厂,用于填充队列

    这里牵扯到效率问题:disruptor初始化的时候,会调用Event工厂,对ringBuffer进行内存的提前分配

    GC产频率会降低

  3. 定义EventHandler(消费者),处理容器中的元素

事件发布模板

long sequence = ringBuffer.next();  // Grab the next sequence
try {
    LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
    // for the sequence
    event.set(8888L);  // Fill with data
} finally {
    ringBuffer.publish(sequence);
}

使用EventTranslator发布事件

//===============================================================
        EventTranslator<LongEvent> translator1 = new EventTranslator<LongEvent>() {
            @Override
            public void translateTo(LongEvent event, long sequence) {
                event.set(8888L);
            }
        };

        ringBuffer.publishEvent(translator1);

        //===============================================================
        EventTranslatorOneArg<LongEvent, Long> translator2 = new EventTranslatorOneArg<LongEvent, Long>() {
            @Override
            public void translateTo(LongEvent event, long sequence, Long l) {
                event.set(l);
            }
        };

        ringBuffer.publishEvent(translator2, 7777L);

        //===============================================================
        EventTranslatorTwoArg<LongEvent, Long, Long> translator3 = new EventTranslatorTwoArg<LongEvent, Long, Long>() {
            @Override
            public void translateTo(LongEvent event, long sequence, Long l1, Long l2) {
                event.set(l1 + l2);
            }
        };

        ringBuffer.publishEvent(translator3, 10000L, 10000L);

        //===============================================================
        EventTranslatorThreeArg<LongEvent, Long, Long, Long> translator4 = new EventTranslatorThreeArg<LongEvent, Long, Long, Long>() {
            @Override
            public void translateTo(LongEvent event, long sequence, Long l1, Long l2, Long l3) {
                event.set(l1 + l2 + l3);
            }
        };

        ringBuffer.publishEvent(translator4, 10000L, 10000L, 1000L);

        //===============================================================
        EventTranslatorVararg<LongEvent> translator5 = new EventTranslatorVararg<LongEvent>() {

            @Override
            public void translateTo(LongEvent event, long sequence, Object... objects) {
                long result = 0;
                for(Object o : objects) {
                    long l = (Long)o;
                    result += l;
                }
                event.set(result);
            }
        };

        ringBuffer.publishEvent(translator5, 10000L, 10000L, 10000L, 10000L);

使用Lamda表达式

package com.mashibing.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class Main03
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();


        ringBuffer.publishEvent((event, sequence) -> event.set(10000L));

        System.in.read();
    }
}

ProducerType生产者线程模式

ProducerType有两种模式 Producer.MULTI和Producer.SINGLE

默认是MULTI,表示在多线程模式下产生sequence

如果确认是单线程生产者,那么可以指定SINGLE,效率会提升

如果是多个生产者(多线程),但模式指定为SINGLE,会出什么问题呢?

等待策略

1,(常用)BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。

2,BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu

3,LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.

4,LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。

5,PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用哪种等待策略

6,TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常

7,(常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

  1. (常用)SleepingWaitStrategy : sleep

消费者异常处理

默认:disruptor.setDefaultExceptionHandler()

覆盖:disruptor.handleExceptionFor().with()

依赖处理

标签:Disruptor,入门,disruptor,10000L,sequence,ringBuffer,event
From: https://www.cnblogs.com/Acaak/p/17109204.html

相关文章

  • PLC入门笔记4
    逻辑指令及其应用基本逻辑指令续电器PLC公式        扩展逻辑指令1.置位和复位线圈      置位ON/复位OFF(SET/RST)不推荐置位复位......
  • webgl学习笔记2-入门画点
    实现目标画一个点案例流程具体代码<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"/><metahttp-equiv="X-UA-Compatible"content="IE=......
  • JSP_案例_改造Cookie案例与会话技术_Session_快速入门
     JSP_案例_改造Cookie案例<%@pageimport="java.util.Date"%><%@pageimport="java.text.SimpleDateFormat"%><%@pageimport="java.net.URLEncoder"%><......
  • jquery概念以及jquery快速入门
    JQuery概念概念:一个JAVAScript框架jQuery是一个快速、简洁的JavaScript框架,是继Prototype之后又一个优秀的JavaScript代码库(框架)。jQuery设计的宗旨是“writeLes......
  • JSP_脚本与JSP_入门学习
    JSP_脚本JSP的脚本:定义java代码的方式1.<%代码 %>:定义的java代码,在service方法中。service方法中可以定义什么,该脚本就可以定义什么。......
  • 会话技术Cookie快速入门、原理分析
    会话技术Cookie快速入门1.概念:客户端会话技术,将数据保存到客户端2.快速入门:使用步骤:1.创建Cookie对象,绑定数据newCook......
  • 简单入门echart方法
    链接:https://www.jianshu.com/p/1f2c37c5c02f 官网:https://echarts.apache.org/examples/zh/index.html#chart-type-pie 1.引入echart库import*asechartsfrom......
  • Web安全入门与靶场实战(32)- 利用find提权
    在上篇文章中,我们找到了靶机中的find命令被设置了SUID权限,那么如何利用find来提权呢?这里需要用到find命令的exec处理动作。之前说过,find属于Linux中比较复杂的一个命令,主要......
  • 《Terraform 101 从入门到实践》 第三章 Modules模块化
    《Terraform101从入门到实践》这本小册在南瓜慢说官方网站和GitHub两个地方同步更新,书中的示例代码也是放在GitHub上,方便大家参考查看。模块的概念模块化是Terrafor......
  • PLC入门笔记3
    熟悉开发环境工具下载官网失效软件安装官网失效第一次PLC之旅走廊灯两地控制案例PLC型号确定梯形图(LAD)和指令表(STL)两种编程方式程序编辑符号变量类型数据类型......