首页 > 其他分享 >两个例子带你入门 Disruptor

两个例子带你入门 Disruptor

时间:2023-09-19 13:23:23浏览次数:36  
标签:Disruptor 入门 disruptor private 例子 new dataEventListener public

Disruptor 是英国外汇交易公司 LMAX 开发的一个高性能队列。很多知名开源项目里,比如 canal 、log4j2、 storm 都是用了 Disruptor 以提升系统性能 。

这篇文章,我们通过两个例子一步一个脚印帮助同学们入门 Disruptor 。

1 环形缓冲区

下图展示了 Disruptor 的流程图 。

和线程池机制非常类似, Disruptor 也是非常典型的生产者/消费者模式。线程池存储提交任务的容器是阻塞队列,而 Disruptor 使用的是环形缓冲区 RingBuffer

环形缓冲区的设计相比阻塞队列有如下优点:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

  • 元素位置定位

数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式,不用担心 index 溢出的问题。index 是 long 类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

2 写一个Hello world

我们写一个非常简单的例子:生产者传递一个单一的长整型值给消费者,而消费者将简单地打印出这个值

2.1 添加依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

2.2 定义事件

首先,我们将定义一个事件(Event),它将携带数据,并且在接下来的所有示例中都是通用的。

public class LongEvent {

    private long value;

    public void set(long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent{" + "value=" + value + '}';
    }
}

为了让 Disruptor 为我们预分配这些事件,我们需要一个 EventFactory 来执行构造。这可以是一个方法引用,比如 LongEvent::new ,或者是 EventFactory 接口的显式实现:

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

2.3 定义消费者

定义了事件,我们需要创建一个消费者来处理这些事件。我们会创建一个事件处理器(EventHandler),它会将把值打印到控制台上。

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
          System.out.println("currentThread:" + Thread.currentThread().getName() + " Event: " + longEvent);
    }
}

2.4 发布

public class LongEventMain {
    public static void main(String[] args) throws Exception {
        int bufferSize = 2;
        Disruptor<LongEvent> disruptor =
                new Disruptor<>(
                        new LongEventFactory(),
                        bufferSize,
                        DaemonThreadFactory.INSTANCE,
                        ProducerType.SINGLE,
                        new BlockingWaitStrategy());
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++) {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

整个发布流程分为四个部分:

  1. 指定环形缓冲区的大小,必须是2的幂次方,例子中设置的值是 1024 ;

  2. 构建 Disruptor ,参数分别是事件工厂EventFactory 环形缓冲区的大小ringBufferSize 处理器线程池生产者类型(单生产者/多生产者)、消费者阻塞策略

  3. 定义事件处理器eventHandler,我们这里的逻辑是打印数据打印在控制台;

  4. 启动 Disruptor,从 Disruptor 中获取环形缓冲区ringBuffer,在 for 循环里 ,调用环形队列的publishEvent方法。

    这里使用了 ByteBuffer 做为数据的存储容器 , 方便作为参数传递。

我们来看下执行结果 :

3 日志处理

3.1 应用场景

上面的例子比较简单,但假如要应用到生产环境,就显得非常粗糙。

我们模拟一个日志处理的场景,用户进入视频播放页面,浏览器定时的发送浏览日志到服务端,服务端将日志存储起来。

3.2 核心类设计

我们定义一个 DisruptorManager 管理器 , 管理器包含三个核心参数:消费者监听器 DataEventListener消费者数量环形队列长度

public class DisruptorManager<T> {

    private static final Integer DEFAULT_CONSUMER_SIZE = 4;

    public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;

    private DataEventListener<T> dataEventListener;

    private DisruptorProducer<T> producer;

    private int ringBufferSize;

    private int consumerSize;

    public DisruptorManager(DataEventListener<T> dataEventListener) {
        this(dataEventListener, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
    }

    public DisruptorManager(DataEventListener<T> dataEventListener, final int consumerSize, final int ringBufferSize) {
        this.dataEventListener = dataEventListener;
        this.ringBufferSize = ringBufferSize;
        this.consumerSize = consumerSize;
    }

    public void start() {
        EventFactory<DataEvent<T>> eventFactory = new DisruptorEventFactory<>();
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(
                eventFactory,
                ringBufferSize,
                DisruptorThreadFactory.create("consumer-thread", false),
                ProducerType.MULTI,
                new BlockingWaitStrategy()
        );
        DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize];
        for (int i = 0; i < consumerSize; i++) {
            consumers[i] = new DisruptorConsumer<>(dataEventListener);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        this.producer = new DisruptorProducer<>(ringBuffer, disruptor);
    }

    public DisruptorProducer getProducer() {
        return this.producer;
    }

}

首先和 Hello world 代码中的不同的点,Disruptor 的构造函数中我们自定义了消费者的处理器线程。

DisruptorThreadFactory.create("consumer-thread", false),

然后我们定义消费者的业务逻辑 :

DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize];
for (int i = 0; i < consumerSize; i++) {
    consumers[i] = new DisruptorConsumer<>(dataEventListener);
}
disruptor.handleEventsWithWorkerPool(consumers);

消费者本质上是workHandler的实现类,只不过初始化时将 DataEventListener 作为构造函数的参数。

public class DisruptorConsumer<T> implements WorkHandler<DataEvent<T>> {

    private DataEventListener<T> dataEventListener;

    public DisruptorConsumer(DataEventListener dataEventListener) {
        this.dataEventListener = dataEventListener;
    }

    @Override
    public void onEvent(DataEvent<T> dataEvent) throws Exception {
        if (dataEvent != null) {
            dataEventListener.processDataEvent(dataEvent);
        }
    }

}

因为我们是希望线程池并行的处理这些消息数据,使用的是disruptor.handleEventsWithWorkerPool 可以保证每个事件只会由一个工作处理器处理

在 springboot 项目中,我们需要初始化相关 bean。

@Configuration
@AutoConfigureBefore(RedisConfig.class)
public class DisruptorConfig {

    private final static Logger logger = LoggerFactory.getLogger(DisruptorConfig.class);

    private final static String LIST_KEY = "disruptor:list";

    @Autowired
    private RedissonClient redissonClient;

    @Bean
    public DataEventListener<String> createConsumerListener() {
        DataEventListener<String> dataEventListener = new DataEventListener<String>() {
            @Override
            public void processDataEvent(DataEvent<String> dataEvent) throws InterruptedException {
                logger.info("processDateEvent data:" + dataEvent.getData());
                redissonClient.getList(LIST_KEY).add(dataEvent.getData());
            }
        };
        return dataEventListener;
    }

    @Bean
    public DisruptorProducer<String> createProducer(DataEventListener dataEventListener) {
        DisruptorManager disruptorManage = new DisruptorManager(dataEventListener,
                8,
                1024 * 1024);
        disruptorManage.start();
        return disruptorManage.getProducer();
    }

首先,我们定义好消费者的事件监听器,然后定义 DisruptorProducer, 该类用来将数据提交到环形队列。

public class DisruptorProducer<T> {

    private final Logger logger = LoggerFactory.getLogger(DisruptorProducer.class);

    private final RingBuffer<DataEvent<T>> ringBuffer;

    private final Disruptor<DataEvent<T>> disruptor;

    private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);

    public DisruptorProducer(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor) {
        this.ringBuffer = ringBuffer;
        this.disruptor = disruptor;
    }

    /**
     * Send a data.
     *
     * @param data the data
     */
    public void onData(final T data) {
        try {
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("publish event error:", ex);
        }
    }

    public void shutdown() {
        if (null != disruptor) {
            disruptor.shutdown();
        }
    }

}

最后,在控制器中,接收前端请求:

@Autowired
private DisruptorProducer<String> producer;

@GetMapping("/pushlog")
public ResponseEntity pushlog(String log) {
    producer.onData(log);
    return ResponseEntity.successResult(null);
}

从下图中,我们可以看到从控制器接收到请求后,消费处理器线程不断地将数据打印出来,并且发送到队列中。

4 写到最后

日志处理的例子里,我们试图封装 Disruptor 相关 API ,以便在 springboot 项目中更方便的使用。

笔者在测试过程时,发现若消费逻辑慢的时候,生产者发送数据事件时,可能会阻塞。

为什么生产者会阻塞,Disruptor 的核心原理是什么 ,如何使用 Disruptor 的高级特性顺序依赖执行 ?

正因为有这些疑问,笔者觉得深入理解 Disruptor 原理特别有必要,笔者也会在接下来的文章里一一为大家答疑解惑。


参考资料:

https://lmax-exchange.github.io/disruptor/disruptor.html

https://zhuanlan.zhihu.com/p/45575008

标签:Disruptor,入门,disruptor,private,例子,new,dataEventListener,public
From: https://www.cnblogs.com/makemylife/p/17714364.html

相关文章

  • 【原创】ospf入门知识三
        很高兴抽取一点时间为大家说下ospf中需要注意的一些基础性知识,大神可以飘过。开始如下:    (一)在一个MA网络中DR和BDR的个数有规定么?    在一个MA多路访问网络中,DR和BDR的个数是有规定的,具体的为:1)DR和BDR有且仅有一个;2)BDR可以没有,但DR必须要有一个;3......
  • 【原创】ospf入门知识二
        在上次写了ospf入门知识一,这次我继续写点关于ospf的几点零散知识,希望对大家有点帮助,也是对自己的一次回顾。    (一)ospf和RIP、EIgrp的宣告路由方式有什么不同?ospf是基于接口进行宣告的,它宣告的是接口路由;Rip宣告的是主网,特殊区域的网段;Eigrp宣告的是VLSM子网......
  • 【小沐学NLP】Python使用NLTK库的入门教程
    1、简介NLTK-自然语言工具包-是一套开源Python。支持自然研究和开发的模块、数据集和教程语言处理。NLTK需要Python版本3.7、3.8、3.9、3.10或3.11。NLTK是一个高效的Python构建的平台,用来处理人类自然语言数据。它提供了易于使用的接口,通过这些接口可以访问超过50个......
  • HDFS入门
    HDFS的块大小设计原则HDFS常用shell命令HDFS的读写流程第一章HDFS概述1.1HDFS产生背景和定义1.1.1产生背景大数据时代,需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统,HDFS就是分布式文件管理系统的一种1.1.2HDFS定义HDFS(HaddopDistributedFile......
  • Go比Python快多少倍?请看一个简单的例子
    需求两个0-10000的数组,循环遍历两个数组,获取两个元素,打印出乘积为56213的所有元素Python实现importdatetimes=datetime.datetime.now()foriinrange(10000):forjinrange(10000):ifi*j==56213:print(f"{i}*{......
  • 1.5万字长文:从 C# 入门 Kafka
    目录1,搭建Kafka环境安装docker-compose单节点Kafka的部署Kafka集群的部署2,Kafka概念基本概念关于Kafka脚本工具主题管理使用C#创建分区分区与复制生产者消费者修改配置3,Kafka.NET基础生产者批量生产使用Tasks.WhenAll如何进行性能测试消费4,生产者连接BrokerK......
  • HeadFirst设计模式学习之OO设计模式入门
    【一】引入---鸭子无论在哪门编程语言中,都离不开我们最熟悉的鸭子模型,因此作者在引入部分也是利用鸭子作为案例引入我们进行入门的学习【1】鸭子游戏现在我们需要做一款模拟鸭子游泳的游戏在游戏中,有不同的鸭子,不同的鸭子都会游泳和呱呱叫而这款游戏的实现思路就是一......
  • C++基础入门
    C++基础入门1C++初识1.1第一个C++程序编写一个C++程序总共分为4个步骤创建项目创建文件编写代码运行程序1.1.1创建项目​ VisualStudio是我们用来编写C++程序的主要工具,我们先将它打开1.1.2创建文件右键源文件,选择添加->新建项给C++文件起个名称,然后点击添......
  • BurpSuite入门指南
    ❤️拦截HTTP流量❤️1.开启拦截2.处理拦截 3.查看HTTP历史记录 ❤️修改HTTP请求❤️ ❤️重发请求❤️1.将报文发送到Repeater中继器2.在Repeater中重发请求 ......
  • ALV data_change事件例子2
    programztest_bcalv_edit_03.*&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&a......