获得Disruptor
Disruptor是什么,我就不废话了,本文是对官方文档的实现,直接进入主题,要使用Disruptor可以通过Maven或者下载jar来安装Disruptor,只要把对应的jar放在Java classpath就可以了。
1.定义事件
首先声明一个Event来包含需要传递的数据
public class LongEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
2.定义事件工厂
由于需要让Disruptor为我们创建事件,我们同时还需要声明一个EventFactory来实例化Event对象,事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口com.lmax.disruptor.EventFactory<T>。Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
3.定义事件处理的具体实现
我们还需要一个事件消费者,也就是一个事件处理器。这个事件处理器简单地把事件中存储的数据打印到终端,通过实现接口 com.lmax.disruptor.EventHandler<T> 定义事件处理的具体实现。
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
System.out.println(longEvent.getValue());
}
}
4.定义事件源
事件都会有一个生成事件的源,举例来说,假设数据来自某种I / O设备,网络或者ByteBuffer格式的文件,事件源会在读取到一部分数据的时候触发事件(触发事件不是自动的,程序员需要在读取到数据的时候自己触发事件并发布):
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* onData用来发布事件,每调用一次就发布一次事件事件
* 它的参数会通过事件传递给消费者
*
* @param bb
*/
public void onData(ByteBuffer bb) {
long sequence = ringBuffer.next(); // Grab the next sequence
try {
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor for the sequence
event.setValue(bb.getLong(0)); // Fill with data
} finally {
ringBuffer.publish(sequence);
}
}
}
很明显的是:这会比用一个简单队列来发布事件的时候牵涉更多的细节,这是因为事件对象还需要预先分配。发布事件最少需要两步(在最低级别):声明环形缓冲区中的事件槽,然后发布可用数据。发布事件的时候要使用try/finnally保证事件一定会被发布。如果我们使用RingBuffer.next()获取一个事件槽,那么一定要发布对应的事件。如果不能发布事件,那么就会引起Disruptor状态的混乱。尤其是在多个事件生产者的情况下会导致事件消费者失速,从而不得不重启应用才能会恢复。
5.使用3.0版本翻译器
Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件。
public class LongEventProducerWithTranslator {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.setValue(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
这种方法的另一个优点是,翻译器代码可以被拉入单独的类,并且可以轻松地单独测试单元。Disruptor提供了不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去产生一个Translator对象。通过环缓冲区上的调用传递给Translator的原因是允许翻译器被表示为静态类或非捕获lambda(当Java 8滚动时)作为翻译方法的参数。
6.测试代码
/**
* 使用LongEventProducer
* Created by xmr on 2017/6/7.
*/
public class LongEventMain {
public static void main(String[] args) {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// The factory for the event
EventFactory eventFactory = new LongEventFactory();
// RingBuffer 大小,必须是 2 的 N 次方
int ringBufferSize = 1024 * 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, ringBufferSize, executor);
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);//连接handler
disruptor.start();//启动disruptor,启动所有线程
//从disruptor中获得ringBuffer用于发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
producer.onData(bb);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 使用LongEventProducerWithTranslator
* Created by xmr on 2017/6/7.
*/
public class LongEventMain1 {
public static void main(String[] args) {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// The factory for the event
EventFactory eventFactory = new LongEventFactory();
// RingBuffer 大小,必须是 2 的 N 次方
int ringBufferSize = 1024 * 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, ringBufferSize, executor);
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);//连接handler
disruptor.start();//启动disruptor,启动所有线程
//从disruptor中获得ringBuffer用于发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
LongEventProducerWithTranslator translator=new LongEventProducerWithTranslator(ringBuffer);
for (long l = 0; true; l++) {
bb.putLong(0, l);
translator.onData(bb);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
7.使用Java 8
Disruptor在自己的接口里面添加了对于Java 8 Lambda的支持。大部分Disruptor中的接口都符合Functional Interface的要求(也就是在接口中仅仅有一个方法)。所以在Disruptor中,可以广泛使用Lambda来代替自定义类。
/**
* 用lambda表达式来注册EventHandler和EventProductor
* Created by xmr on 2017/6/8.
*/
public class LongEventMain2 {
public static void main(String[] args) throws InterruptedException {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// 可以使用lambda来注册一个EventHandler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue()));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
请注意,上例中不再需要许多类(例如处理程序,翻译器)。还要注意,用于publishEvent()的lambda仅引用传入的参数。如果我们将代码写成:
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
这将创建一个捕获lambda,这意味着在将lambda传递给publishEvent()调用时,需要实例化一个对象来保存ByteBuffer bb变量。这会增加不必要的GC。所以在需要较低GC水平的情况下最好把所有的参数都通过lambda传递。
由于在Java 8中方法引用也是一个lambda,因此还可以把上面的代码改成下面的代码:
/**
* 用lambda表达式来注册EventHandler和EventProductor
* Created by xmr on 2017/6/8.
*/
public class LongEventMain3 {
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println(event);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.setValue(buffer.getLong(0));
}
public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(LongEventMain3::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(LongEventMain3::translate, bb);
Thread.sleep(1000);
}
}
}
8.基本调整选项
上面的代码已经可以处理大多数的情况了,但是,如果您能够对Disruptor运行的硬件和软件环境做出某些假设,那么您可以利用多个调优选项来提高性能。基本的选项有两个:单或者多生产者模式和可选的等待策略。
单或多 事件生产者
在并发系统中提高性能最好的方式之一就是单一写者原则,对Disruptor也是适用的。如果在你的代码中仅仅有一个事件生产者,那么可以设置为单一生产者模式来提高系统的性能。
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
eventFactory, ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());
9.指定等待策略
Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用。Disruptor 提供了多个 WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运行环境的 CPU 的硬件特点选择恰当的策略,并配合特定的 JVM 的配置参数,能够实现不同的性能提升。
● 其中,Disruptor默认的等待策略是BlockingWaitStrategy,这个策略的内部使用一个锁和条件变量来控制线程的执行和等待(Java基本的同步方法),BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现;
● SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,它的方式是循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠,(在Linux系统上面睡眠时间60µs).然而,它的优点在于生产线程只需要计数,而不执行任何指令。并且没有条件变量的消耗。但是,事件对象从生产者到消费者传递的延迟变大了。SleepingWaitStrategy最好用在不需要低延迟,而且事件发布对于生产者的影响比较小的情况下。比如异步日志功能;
● YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。这种策略在减低系统延迟的同时也会增加CPU运算量。YieldingWaitStrategy策略会循环等待sequence增加到合适的值。循环中调用Thread.yield()允许其他准备好的线程执行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用YieldingWaitStrategy策略。例如,CPU开启超线程的时候。
● BusySpinWaitStrategy是性能最高的等待策略,同时也是对部署环境要求最高的策略。这个性能最好用在事件处理线程比物理内核数目还要小的时候。例如:在禁用超线程技术的时候。
10.从RingBuffer中清除对象
当通过Disruptor传递数据时,对象可能比预期寿命更长。所以为了避免这种情况发生,可能需要在处理之后清除事件。如果您有一个事件处理程序,那么一个能够清除同一处理程序中的值的程序就足够了。如果你有一连串的事件处理程序,那么您可能需要一个放置在链末端的特定处理程序来处理清除对象。
class ObjectEvent<T>{
T val;
void clear(){
val = null;
}
}
public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
{
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}
public static void main(String[] args)
{
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, executor);
disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}
上面的就是官方文档的一些介绍,因为时间仓促还有很多不足,希望多多指教
参考代码
参考:
http://ifeve.com/disruptor-getting-started/
https://github.com/LMAX-Exchange/disruptor
http://xsh5324.iteye.com/blog/2058925?utm_source=tool.lu