首页 > 其他分享 >Disruptor(3):高级应用

Disruptor(3):高级应用

时间:2022-10-11 18:12:07浏览次数:43  
标签:Disruptor disruptor void 高级 event 应用 handleEventsWith new public

串并行操作

public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)

串行操作:使用链式调用的方式

并行操作:使用单独调用的方式

串行

示例代码:

@Data
@ToString
public class Trade {

    private String id;
    private String name;
    private double price;
    private AtomicInteger count = new AtomicInteger(0);

}

public class Handler1 implements EventHandler<Trade>{
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("handler1");
        event.setName("handler1");
        Thread.sleep(3000);
    }
}

public class Handler2 implements EventHandler<Trade> {
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Handler2");
        event.setId(UUID.randomUUID().toString());
        Thread.sleep(2000);
    }
}
public class Handler3 implements EventHandler<Trade> {
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Handler3 : " + event.toString());
        Thread.sleep(1000);
    }

}

public class TradePublisher implements Runnable {

    CountDownLatch latch;

    Disruptor<Trade> disruptor;

    @Override
    public void run() {

        TradeEventTranslator tradeEventTranslator = new TradeEventTranslator();
        for (int i = 0; i < 1; i++) {
            //新的提交任务的方式
            disruptor.publishEvent(tradeEventTranslator);
        }
        latch.countDown();
    }

    public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
        this.latch = latch;
        this.disruptor = disruptor;
    }
}

class TradeEventTranslator implements EventTranslator<Trade> {

    @Override
    public void translateTo(Trade event, long sequence) {
        event.setPrice(100);
    }
}

disruptor代码:

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Disruptor<Trade> disruptor = new Disruptor<>(Trade::new,
                1024 * 1024,
                DaemonThreadFactory.INSTANCE,
                //单生产者或者多生产者
                ProducerType.SINGLE,
                //阻塞策略
                new BusySpinWaitStrategy());

        //串行操作
        disruptor.handleEventsWith(new Handler1())
                .handleEventsWith(new Handler2())
                .handleEventsWith(new Handler3());

        disruptor.start();
        CountDownLatch latch = new CountDownLatch(1);

        executorService.submit(new TradePublisher(latch, disruptor));

        latch.await();
        disruptor.shutdown();
        executorService.shutdown();
    }

运行结果:

image-20221011155715322

并行

修改disruptor代码:

disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());

或者:

disruptor.handleEventsWith(new Handler1());
disruptor.handleEventsWith(new Handler2());
disruptor.handleEventsWith(new Handler3());

运行结果:

image-20221011155913202

多边形操作

Disruptor可以实现串并形同时编码:

如下图:独立的一个生产者和三个消费者。最棘手的一点是:第三个消费者必须等待前两个消费者处理完成后,才能开始工作。

image-20221011160238330

修改disruptor代码:

disruptor.handleEventsWith(new Handler1(), new Handler2())
    .then(new Handler3());

或者:

disruptor.handleEventsWith(new Handler1(), new Handler2())
    .handleEventsWith(new Handler3());

六边形操作:

image-20221011162219853

Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h4);
disruptor.after(h1).handleEventsWith(h2);
disruptor.after(h4).handleEventsWith(h5);
disruptor.after(h2, h5).handleEventsWith(h3);

多生产者、多消费者

创建消费者:

public class Consumer implements WorkHandler<Trade> {

    String consumerId;

    private AtomicInteger count = new AtomicInteger(0);

    public Consumer(String consumerId) {
        this.consumerId = consumerId;
    }

    @Override
    public void onEvent(Trade event) throws Exception {
        LockSupport.parkNanos(10);
        System.out.println("consumerId :" + consumerId + " trade" + event.toString());
        count.incrementAndGet();
    }

    public int get() {
        return count.get();
    }
}

主方法:

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //1.创建RingBuffer
        RingBuffer<Trade> ringBuffer = RingBuffer.create(ProducerType.MULTI, Trade::new, 1024, new BlockingWaitStrategy());
        //2.通过RingBuffer创建一个SequenceBarrier
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        //3.构建多消费者
        Consumer[] consumers = new Consumer[10];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer("消费者" + i);
        }

        //4.构建多消费者工作池
        WorkerPool<Trade> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, new TradeExceptionHandler(), consumers);

        //5.设置多个消费者的sequence序号用于统计消费进度,并且设置到ringbuffer中
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        //启动WorkerPool
        workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));


        CountDownLatch countDownLatch = new CountDownLatch(1);

        for (int i = 0; i < 100; i++) {

            //生产数据
            new Thread(() -> {
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 0; j < 1000; j++) {
                    long seq = ringBuffer.next();
                    LockSupport.parkNanos(10);
                    try {
                        Trade trade = ringBuffer.get(seq);
                        trade.setId(UUID.randomUUID().toString());
                    } finally {
                        ringBuffer.publish(seq);
                    }
                }
            }).start();

        }

        Thread.sleep(10000);
        countDownLatch.countDown();
    }

}

class TradeExceptionHandler implements ExceptionHandler<Trade> {

    @Override
    public void handleEventException(Throwable ex, long sequence, Trade event) {
        System.out.println("handleEventException");
    }

    @Override
    public void handleOnStartException(Throwable ex) {
        System.out.println("handleOnStartException");
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
        System.out.println("handleOnShutdownException");
    }
}

多消费者的EventProcessor使用的是WorkProcessor

标签:Disruptor,disruptor,void,高级,event,应用,handleEventsWith,new,public
From: https://www.cnblogs.com/wwjj4811/p/16780096.html

相关文章