首页 > 其他分享 >使用Disruptor完成多线程下并发、等待、先后等操作

使用Disruptor完成多线程下并发、等待、先后等操作

时间:2023-02-01 13:06:07浏览次数:43  
标签:Disruptor handleEventsWith last disruptor 执行 并发 new import 多线程


Java完成多线程间的等待功能:

场景1:一个线程等待其他多个线程都完成后,再进行下一步操作(如裁判员计分功能,需要等待所有运动员都跑完后,才去统计分数。裁判员和每个运动员都是一个线程)。

场景2:多个线程都等待至某个状态后,再同时执行(模拟并发操作,启动100个线程 ,先启动完的需要等待其他未启动的,然后100个全部启动完毕后,再一起做某个操作)。

以上两个场景都较为常见,Java已经为上面的场景1和2分别提供了CountDownLatch和CyclicBarrier两个实现类来完成,参考

而对于更复杂的场景,如

使用Disruptor完成多线程下并发、等待、先后等操作_java

譬如希望1执行完后才执行2,3执行完后才执行4,1和3并行执行,2和4都执行完后才执行last。

还有其他的更奇怪的执行顺序等等。当然这些也可以通过组合多个CountDownLatch或者CyclicBarrier、甚至使用wait、Lock等组合来实现。不可避免的是,都需要使用大量的锁,直接导致性能的急剧下降和多线程死锁等问题发生。那么有没有高性能的无锁的方式来完成这种复杂的需求实现呢?

那就是Disruptor!

Disruptor可以非常简单的完成这种复杂的多线程并发、等待、先后执行等。

至于Disruptor是什么就不说了,直接来看使用:

直接添加依赖包,别的什么都不需要。

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.1</version>
</dependency>


package b;

import a.FirstEventHandler;
import a.LongEvent;
import a.LongEventFactory;
import a.LongEventProducer;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* @author wuweifeng wrote on 2018/3/29.
*/
public class Simple {
public static void main(String[] args) {
ThreadFactory producerFactory = Executors.defaultThreadFactory();
LongEventFactory eventFactory = new LongEventFactory();
int bufferSize = 8;

Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, producerFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());

FirstEventHandler firstEventHandler = new FirstEventHandler();
SecondEventHandler secondHandler = new SecondEventHandler();
ThirdEventHandler thirdEventHandler = new ThirdEventHandler();
FourthEventHandler fourthEventHandler = new FourthEventHandler();
LastEventHandler lastEventHandler = new LastEventHandler();

//1,2,last顺序执行
//disruptor.handleEventsWith(new LongEventHandler()).handleEventsWith(new SecondEventHandler())
// .handleEventsWith(new LastEventHandler());

//也是1,2,last顺序执行
//disruptor.handleEventsWith(firstEventHandler);
//disruptor.after(firstEventHandler).handleEventsWith(secondHandler).then(lastEventHandler);

//1,2并发执行,之后才是last
//disruptor.handleEventsWith(firstEventHandler, secondHandler);
//disruptor.after(firstEventHandler, secondHandler).handleEventsWith(lastEventHandler);

//1后2,3后4,1和3并发,2和4都结束后last
disruptor.handleEventsWith(firstEventHandler, thirdEventHandler);
disruptor.after(firstEventHandler).handleEventsWith(secondHandler);
disruptor.after(thirdEventHandler).handleEventsWith(fourthEventHandler);
disruptor.after(secondHandler, fourthEventHandler).handleEventsWith(lastEventHandler);


disruptor.start();

RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long i = 0; i < 10L; i++) {
bb.putLong(0, i);
longEventProducer.onData(bb);
}

disruptor.shutdown();
}
}

主要就是这个类。我注释掉的部分分别为顺序执行、和12并发然后执行last。

上面那个图对应的代码主要就是after的使用。

运行结果 :

使用Disruptor完成多线程下并发、等待、先后等操作_Disruptor使用_02

可以看到,由于buffer为8,所以在一个周期内,最大value=7.顺序就是1-3,2-4,1、3是并发的。对于同一个Event,2和4执行完后才执行last。多执行几次看看,就能看明白。

这里用的producer只打印了10个,可以调大,结果就会随机性更好。

标签:Disruptor,handleEventsWith,last,disruptor,执行,并发,new,import,多线程
From: https://blog.51cto.com/u_13706148/6031385

相关文章

  • 并发编程-Python
    目录01、理论多道技术null02、进程进程运行的三状态图同步和异步阻塞和非阻塞开启进程的两种方式进程对象的join方法进程之间数据相互隔离(默认情况下)进程对象及其他方法僵......
  • 理解Golang 赋值的并发安全性
     1.什么是并发安全并发安全就是程序在并发情况下执行的结果是正确的。比如对一个变量简单的自增操作count++,在非并发下很好理解,而在并发情况下却容易出现预期之外......
  • 并发编程
    第1章并发编程的三大挑战线程的上下切换死锁资源限制解决方法:解决上下文切换无锁并发编程Cas使用最少的线程协程避免死锁避免一个线程同时获取多把锁避免一个线程在锁内同......
  • 极客时间 Java并发编程实战 笔记
    思考、再思考、总结、再总结01可见性、原子性和有序性举几个例子先。缓存可能导致可见性问题,因为多核CPU上的多个核可能都持有同一数据的不同缓存。两个线程并行地对......
  • netcore之异步并不是多线程!
    1、遇到await,线程的变化遇到await会把当前线程返回且返回值就是await后面的Task,再从线程池随机取一个线程往下执行代码。我们使用封装好的异步方法模拟写入大量字符串的......
  • i++在多线程下的原子性问题
     staticinti=0;@TestvoidiTest()throwsInterruptedException{Threadt1=newThread(()->{for(intj=0;j<50000;......
  • [Python] 爬虫系统与数据处理实战 Part.3 多线程和分布式
    为什么用快反爬虫 多线程复杂性资源、数据的安全性:锁保护原子性:数据操作是天然互斥的同步等待:wait()、notify()、notifyall()死锁:多个线程对资源互锁容灾:任......
  • OpenHarmony stage worker 多线程
    作者:徐金生OpenHarmony存在一个与主线程并行的独立线程--Worker。对于处理耗时操作且不阻塞主线程起到了重要的作用,并且多个线程并发可以提高CPU和内存的利用率。在实际开......
  • 多线程--消费者与生产者实例
    多线程实例1.消费者与生产者实例(管程法)产品、消费者、生产者、缓冲区产品,保证有一个唯一标识即可消费者继承Thread,注册缓冲区,从缓冲区消费生产者继承Thread,注册缓冲......
  • Java并发JUC——Future和Callable
    Runnable的缺陷不能返回一个返回值也不能抛出checkedExceptionCallable接口类似于Runnable,被其他线程执行的任务Callable接口中只有一个call()方法,和Runnable相比,......