串并行操作
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
串行操作:使用链式调用的方式
并行操作:使用单独调用的方式
串行
示例代码:
@Data
@ToString
public class Trade {
private String id;
private String name;
private double price;
private AtomicInteger count = new AtomicInteger(0);
}
public class Handler1 implements EventHandler<Trade>{
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler1");
event.setName("handler1");
Thread.sleep(3000);
}
}
public class Handler2 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Handler2");
event.setId(UUID.randomUUID().toString());
Thread.sleep(2000);
}
}
public class Handler3 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Handler3 : " + event.toString());
Thread.sleep(1000);
}
}
public class TradePublisher implements Runnable {
CountDownLatch latch;
Disruptor<Trade> disruptor;
@Override
public void run() {
TradeEventTranslator tradeEventTranslator = new TradeEventTranslator();
for (int i = 0; i < 1; i++) {
//新的提交任务的方式
disruptor.publishEvent(tradeEventTranslator);
}
latch.countDown();
}
public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
this.latch = latch;
this.disruptor = disruptor;
}
}
class TradeEventTranslator implements EventTranslator<Trade> {
@Override
public void translateTo(Trade event, long sequence) {
event.setPrice(100);
}
}
disruptor代码:
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Disruptor<Trade> disruptor = new Disruptor<>(Trade::new,
1024 * 1024,
DaemonThreadFactory.INSTANCE,
//单生产者或者多生产者
ProducerType.SINGLE,
//阻塞策略
new BusySpinWaitStrategy());
//串行操作
disruptor.handleEventsWith(new Handler1())
.handleEventsWith(new Handler2())
.handleEventsWith(new Handler3());
disruptor.start();
CountDownLatch latch = new CountDownLatch(1);
executorService.submit(new TradePublisher(latch, disruptor));
latch.await();
disruptor.shutdown();
executorService.shutdown();
}
运行结果:
并行
修改disruptor代码:
disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
或者:
disruptor.handleEventsWith(new Handler1());
disruptor.handleEventsWith(new Handler2());
disruptor.handleEventsWith(new Handler3());
运行结果:
多边形操作
Disruptor可以实现串并形同时编码:
如下图:独立的一个生产者和三个消费者。最棘手的一点是:第三个消费者必须等待前两个消费者处理完成后,才能开始工作。
修改disruptor代码:
disruptor.handleEventsWith(new Handler1(), new Handler2())
.then(new Handler3());
或者:
disruptor.handleEventsWith(new Handler1(), new Handler2())
.handleEventsWith(new Handler3());
六边形操作:
Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h4);
disruptor.after(h1).handleEventsWith(h2);
disruptor.after(h4).handleEventsWith(h5);
disruptor.after(h2, h5).handleEventsWith(h3);
多生产者、多消费者
创建消费者:
public class Consumer implements WorkHandler<Trade> {
String consumerId;
private AtomicInteger count = new AtomicInteger(0);
public Consumer(String consumerId) {
this.consumerId = consumerId;
}
@Override
public void onEvent(Trade event) throws Exception {
LockSupport.parkNanos(10);
System.out.println("consumerId :" + consumerId + " trade" + event.toString());
count.incrementAndGet();
}
public int get() {
return count.get();
}
}
主方法:
public class Main {
public static void main(String[] args) throws InterruptedException {
//1.创建RingBuffer
RingBuffer<Trade> ringBuffer = RingBuffer.create(ProducerType.MULTI, Trade::new, 1024, new BlockingWaitStrategy());
//2.通过RingBuffer创建一个SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//3.构建多消费者
Consumer[] consumers = new Consumer[10];
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer("消费者" + i);
}
//4.构建多消费者工作池
WorkerPool<Trade> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, new TradeExceptionHandler(), consumers);
//5.设置多个消费者的sequence序号用于统计消费进度,并且设置到ringbuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
//启动WorkerPool
workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 100; i++) {
//生产数据
new Thread(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 1000; j++) {
long seq = ringBuffer.next();
LockSupport.parkNanos(10);
try {
Trade trade = ringBuffer.get(seq);
trade.setId(UUID.randomUUID().toString());
} finally {
ringBuffer.publish(seq);
}
}
}).start();
}
Thread.sleep(10000);
countDownLatch.countDown();
}
}
class TradeExceptionHandler implements ExceptionHandler<Trade> {
@Override
public void handleEventException(Throwable ex, long sequence, Trade event) {
System.out.println("handleEventException");
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("handleOnStartException");
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("handleOnShutdownException");
}
}
多消费者的EventProcessor
使用的是WorkProcessor
。