首页 > 其他分享 >Disruptor官方文档实现

Disruptor官方文档实现

时间:2023-09-25 12:33:12浏览次数:26  
标签:Disruptor bb disruptor 官方 文档 ringBuffer new public


获得Disruptor
Disruptor是什么,我就不废话了,本文是对官方文档的实现,直接进入主题,要使用Disruptor可以通过Maven或者下载jar来安装Disruptor,只要把对应的jar放在Java classpath就可以了。
1.定义事件
首先声明一个Event来包含需要传递的数据

public class LongEvent {
private long value;
public long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}
}


2.定义事件工厂


由于需要让Disruptor为我们创建事件,我们同时还需要声明一个EventFactory来实例化Event对象,事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口com.lmax.disruptor.EventFactory<T>。Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。


public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}


3.定义事件处理的具体实现


我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端,通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。


public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println(longEvent.getValue());
}
}


4.定义事件源


事件都会有一个生成事件的源,举例来说,假设数据来自某种I / O设备,网络或者ByteBuffer格式的文件,事件源会在读取到一部分数据的时候触发事件(触发事件不是自动的,程序员需要在读取到数据的时候自己触发事件并发布):


public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

/**
* onData用来发布事件,每调用一次就发布一次事件事件
* 它的参数会通过事件传递给消费者
*
* @param bb
*/
public void onData(ByteBuffer bb) {
long sequence = ringBuffer.next(); // Grab the next sequence
try {
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
event.setValue(bb.getLong(0)); // Fill with data
} finally {
ringBuffer.publish(sequence);
}
}
}

很明显的是:这会比用一个简单队列来发布事件的时候牵涉更多的细节,这是因为事件对象还需要预先分配。发布事件最少需要两步(在最低级别):声明环形缓冲区中的事件槽,然后发布可用数据。发布事件的时候要使用try/finnally保证事件一定会被发布。如果我们使用RingBuffer.next()获取一个事件槽,那么一定要发布对应的事件。如果不能发布事件,那么就会引起Disruptor状态的混乱。尤其是在多个事件生产者的情况下会导致事件消费者失速,从而不得不重启应用才能会恢复。


5.使用3.0版本翻译器


Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件。


public class LongEventProducerWithTranslator {
private final RingBuffer<LongEvent> ringBuffer;

public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}

private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.setValue(bb.getLong(0));
}
};

public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}

这种方法的另一个优点是,翻译器代码可以被拉入单独的类,并且可以轻松地单独测试单元。Disruptor提供了不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去产生一个Translator对象。通过环缓冲区上的调用传递给Translator的原因是允许翻译器被表示为静态类或非捕获lambda(当Java 8滚动时)作为翻译方法的参数。


6.测试代码


/**
* 使用LongEventProducer
* Created by xmr on 2017/6/7.
*/
public class LongEventMain {
public static void main(String[] args) {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// The factory for the event
EventFactory eventFactory = new LongEventFactory();
// RingBuffer 大小,必须是 2 的 N 次方
int ringBufferSize = 1024 * 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, ringBufferSize, executor);
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);//连接handler
disruptor.start();//启动disruptor,启动所有线程
//从disruptor中获得ringBuffer用于发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
producer.onData(bb);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}



/**
* 使用LongEventProducerWithTranslator
* Created by xmr on 2017/6/7.
*/
public class LongEventMain1 {
public static void main(String[] args) {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// The factory for the event
EventFactory eventFactory = new LongEventFactory();
// RingBuffer 大小,必须是 2 的 N 次方
int ringBufferSize = 1024 * 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, ringBufferSize, executor);
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);//连接handler
disruptor.start();//启动disruptor,启动所有线程
//从disruptor中获得ringBuffer用于发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

ByteBuffer bb = ByteBuffer.allocate(8);
LongEventProducerWithTranslator translator=new LongEventProducerWithTranslator(ringBuffer);
for (long l = 0; true; l++) {
bb.putLong(0, l);
translator.onData(bb);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}


7.使用Java 8


Disruptor在自己的接口里面添加了对于Java 8 Lambda的支持。大部分Disruptor中的接口都符合Functional Interface的要求(也就是在接口中仅仅有一个方法)。所以在Disruptor中,可以广泛使用Lambda来代替自定义类。


/**
* 用lambda表达式来注册EventHandler和EventProductor
* Created by xmr on 2017/6/8.
*/
public class LongEventMain2 {

public static void main(String[] args) throws InterruptedException {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// 可以使用lambda来注册一个EventHandler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}

请注意,上例中不再需要许多类(例如处理程序,翻译器)。还要注意,用于publishEvent()的lambda仅引用传入的参数。如果我们将代码写成:


ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}

这将创建一个捕获lambda,这意味着在将lambda传递给publishEvent()调用时,需要实例化一个对象来保存ByteBuffer bb变量。这会增加不必要的GC。所以在需要较低GC水平的情况下最好把所有的参数都通过lambda传递。


由于在Java 8中方法引用也是一个lambda,因此还可以把上面的代码改成下面的代码:


/**
* 用lambda表达式来注册EventHandler和EventProductor
* Created by xmr on 2017/6/8.
*/
public class LongEventMain3 {

public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println(event);
}

public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.setValue(buffer.getLong(0));
}

public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();

// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;

// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

// Connect the handler
disruptor.handleEventsWith(LongEventMain3::handleEvent);

// Start the Disruptor, starts all threads running
disruptor.start();

// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(LongEventMain3::translate, bb);
Thread.sleep(1000);
}
}
}


8.基本调整选项


上面的代码已经可以处理大多数的情况了,但是,如果您能够对Disruptor运行的硬件和软件环境做出某些假设,那么您可以利用多个调优选项来提高性能。基本的选项有两个:单或者多生产者模式和可选的等待策略。


单或多 事件生产者


在并发系统中提高性能最好的方式之一就是单一写者原则,对Disruptor也是适用的。如果在你的代码中仅仅有一个事件生产者,那么可以设置为单一生产者模式来提高系统的性能。


Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
eventFactory, ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());


9.指定等待策略


Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。


● 其中,Disruptor默认的等待策略是BlockingWaitStrategy,这个策略的内部使用一个锁和条件变量来控制线程的执行和等待(Java基本的同步方法),BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;


● SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,它的方式是循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠,(在Linux系统上面睡眠时间60µs).然而,它的优点在于生产线程只需要计数,而不执行任何指令。并且没有条件变量的消耗。但是,事件对象从生产者到消费者传递的延迟变大了。SleepingWaitStrategy最好用在不需要低延迟,而且事件发布对于生产者的影响比较小的情况下。比如异步日志功能;


● YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。这种策略在减低系统延迟的同时也会增加CPU运算量。YieldingWaitStrategy策略会循环等待sequence增加到合适的值。循环中调用Thread.yield()允许其他准备好的线程执行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用YieldingWaitStrategy策略。例如,CPU开启超线程的时候。


● BusySpinWaitStrategy是性能最高的等待策略,同时也是对部署环境要求最高的策略。这个性能最好用在事件处理线程比物理内核数目还要小的时候。例如:在禁用超线程技术的时候。


10.从RingBuffer中清除对象


当通过Disruptor传递数据时,对象可能比预期寿命更长。所以为了避免这种情况发生,可能需要在处理之后清除事件。如果您有一个事件处理程序,那么一个能够清除同一处理程序中的值的程序就足够了。如果你有一连串的事件处理程序,那么您可能需要一个放置在链末端的特定处理程序来处理清除对象。


class ObjectEvent<T>{
  T val;

  void clear(){
    val = null;
  }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
{
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}

public static void main(String[] args)
{
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, executor);

disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}

上面的就是官方文档的一些介绍,因为时间仓促还有很多不足,希望多多指教

参考代码

参考:


http://ifeve.com/disruptor/


http://ifeve.com/disruptor-getting-started/



https://github.com/LMAX-Exchange/disruptor


http://xsh5324.iteye.com/blog/2058925?utm_source=tool.lu



标签:Disruptor,bb,disruptor,官方,文档,ringBuffer,new,public
From: https://blog.51cto.com/u_6947107/7594641

相关文章

  • 文档图像处理:大模型的突破与新探索
    前言随着数字化时代的到来,文档图像处理技术在各行各业扮演着越来越重要的角色。在2023第十二届中国智能产业高峰论坛(CIIS 2023)的专题论坛上,合合信息智能技术平台事业部副总经理、高级工程师丁凯博士分享了当前文档图像处理面临的困难,并讨论大模型在该领域的突破和新探索。虚竹哥把......
  • 文档升级 | iTOP-RK3568开发板ADB工具的安装和使用
    iTOP-RK3568开发板使用手册更新,后续资料会不断更新,不断完善,帮助用户快速入门,大大提升研发速度。《iTOP-3568开发板ADB使用手册》进行了文档升级,对ADB工具的安装和使用进行了更全面的步骤介绍。第1章安装adb工具1.1adb简介1.2Windows下安装ADB工具1.3Ubuntu下安装ADB工具第2章......
  • es 编写查询DSL,查询user_name字段不为空的文档
    内容来自对chatgpt的咨询我们可以使用exists查询来检查user_name字段是否存在且包含非空值:{"query":{"bool":{"must":{"exists":{"field":"user_name"......
  • 基于SpringBoot的书籍阅读管理系统设计与实现-计算机毕业设计源码+LW文档
    摘要:书籍阅读管理系统是针对目前书籍阅读管理的实际需求,从实际工作出发,对过去的书籍阅读管理系统存在的问题进行分析,完善客户的使用体会。采用计算机系统来管理信息,取代人工管理模式,查询便利,信息准确率高,节省了开支,提高了工作的效率。本系统结合计算机系统的结构、概念、模型、原......
  • 基于SpringBoot的校园疫情防控系统-计算机毕业设计源码+LW文档
    一、选题的背景与意义开发背景21世纪,我国早在上世纪就已普及互联网信息,互联网对人们生活中带来了无限的便利。像大部分的企事业单位等机构都有自己的管理系统,由传统的管理模式向互联网发展,如今开发自己的系统是时代发展的必然产物。那么开发校园疫情防控系统意义和用处有哪些呢?......
  • 基于SpringBoot的幼儿园管理系统-计算机毕业设计源码+LW文档
    摘 要幼儿园管理系统是一种以信息技术为基础,为幼儿园提供全面管理和服务的软件系统。本文旨在设计并实现一种针对幼儿园管理的信息化系统,以提高幼儿园的管理效率、服务质量和家长满意度。本文首先介绍了幼儿园管理系统的背景和意义,以及相关的技术和理论基础。随后,对幼儿园管理......
  • 基于Springboot的准妈妈孕期交流平台的设计与实现-计算机毕业设计源码+LW文档
    系统的背景及意义 智能化的高校排课系统是基于WEB的校园教务信息综合管理系统的一项重要功能,能很好的解决上述问题。为响应我国的教育改革和素质教育的号召,学生自主选课及校园内日常标准化选课的信息量日夜增长,合理安排课程的工作量与复杂程度也随之增大,手工管理的传统模式已经......
  • camtasia studio下载-camtasia studio官方版下载 各个版本下载
    CamtasiaStudio中文版是专业的屏幕录像和后期编辑软件,能在任何颜色模式下轻松地记录屏幕动作(屏幕/摄像头),包括影像、音效、鼠标移动轨迹、解说声音等,具有即时播放和编辑压缩的功能,让用户可对视频即时播放和编辑压缩。本站提供CamtasiaStudio最新下载,有需要的朋友们快来体验吧。软......
  • EDIUS下载_EDIUS官方下载 各个版本下载
    EDIUS最新版是一款简单又好用的电脑视频编辑工具软件,通过EDIUS最新版强大的功能,用户可以更加简单轻松的完成各种视频剪辑工作任务,帮助用户快速完成对视频的更多修改以及创作,完美支持各种格式非常强大,感兴趣的快来下载尝试一下吧。软件地址:看置顶贴EDIUSPro8功能特点实时混编任何S......
  • 始终安装Burpsite Pro官方的最新版
    0x01下载安装包1.直接到官网下载BurpPro的最新版jar包,网上的各种倒了几道手的不推荐下载(可以一直使用最新版)官网地址:https://portswigger.net/burp2.下载破解工具文件比较小,这里我直接传到博客园了(h3110w0r1d-y大佬的项目)下载地址:https://files.cnblogs.com/files/......