首页 > 其他分享 >高性能队列Disruptor的初体验

高性能队列Disruptor的初体验

时间:2025-01-21 18:23:36浏览次数:1  
标签:Disruptor 初体验 消费者 disruptor 队列 CPU RingBuffer public

初探 Disruptor

1. 概述

Disruptor 是一个高性能、低延迟的无锁队列替代方案,最初由 LMAX 公司开发,专为处理高吞吐量和低延迟的消息传递系统而设计。它利用环形缓冲区(RingBuffer)和无锁的生产者-消费者模型,大幅提升并发性能。

相比传统的基于 java.util.concurrent 的队列(如 ArrayBlockingQueueLinkedBlockingQueue),Disruptor 通过避免锁竞争、减少 CPU 缓存行无效(cache invalidation)等方式提高吞吐量。

2. 核心概念

2.1 RingBuffer(环形缓冲区)

Disruptor 的核心数据结构是环形缓冲区(RingBuffer),它类似于一个固定大小的数组,数据结构如下:

+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+

RingBuffer 通过索引递增的方式循环使用元素,避免内存分配和垃圾回收的开销。

2.2 Sequence(序列号)

在 Disruptor 中,所有读写操作都基于 Sequence,用于跟踪当前生产和消费的位置。它主要包括:

  • Cursor:指向 RingBuffer 中最后一个被写入的位置。
  • SequenceBarrier:用于协调生产者和消费者的进度,确保消费者不会读取尚未发布的数据。
  • Sequencer:用于管理 RingBuffer 的序列。

2.3 Producer(生产者)

生产者向 RingBuffer 写入数据,通常采用 ClaimStrategy 申请空间,然后写入数据并发布。

2.4 Consumer(消费者)

消费者从 RingBuffer 读取数据,并可以设置多个消费者进行并行处理,支持 WorkerPool 模式。

2.5 WaitStrategy(等待策略)

Disruptor 通过 WaitStrategy 来决定消费者如何等待新的数据到达。常见策略包括:

  • BusySpinWaitStrategy:自旋等待,适用于低延迟应用,但 CPU 开销较大。
  • SleepingWaitStrategy:适当休眠,减少 CPU 占用。
  • YieldingWaitStrategy:让出 CPU 时间片,适用于高吞吐场景。

3. Disruptor 的优势

3.1 无锁设计

传统队列使用 ReentrantLocksynchronized 来保证线程安全,而 Disruptor 通过 CAS(Compare-And-Swap)机制更新 Sequence,避免锁的开销。

3.2 高效的 CPU 缓存利用

Disruptor 采用 伪共享(False Sharing) 避免 CPU 缓存行竞争,并使用 缓存行填充(Cache Line Padding) 来减少缓存行失效。

3.3 生产者-消费者模型优化

Disruptor 允许多种消费者模式:

  • 单消费者:一个消费者处理所有数据。
  • 多消费者并行消费:多个消费者共同消费数据,提高吞吐量。
  • 菱形依赖消费:一个消费者的输出作为另一个消费者的输入。

4. 使用示例

4.1 引入依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

4.2 创建事件类

public class LongEvent {
    private long value;
    public void set(long value) {
        this.value = value;
    }
    public long getValue() {
        return value;
    }
}

4.3 定义事件工厂

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

4.4 事件处理器

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println("Event: " + event.getValue());
    }
}

4.5 配置 Disruptor

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        LongEventFactory factory = new LongEventFactory();
        int bufferSize = 1024;

        Disruptor<LongEvent> disruptor = new Disruptor<>(
            factory, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        
        for (long i = 0; i < 10; i++) {
            producer.onData(i);
        }
    }
}

5. 适用场景

Disruptor 适用于以下场景:

  • 高吞吐量、低延迟的消息队列
  • 日志系统(如 log4j2 采用 Disruptor 作为日志处理引擎)
  • 交易撮合系统
  • 事件驱动架构

6. 总结

今天先初步了解Disruptor的简单用法,后续会继续介绍Disruptor的特性,为什么性能秒杀JDK提供的队列,以及相关原理分析。

最后

欢迎follow加瓦点灯,每天推送干货知识!

本文由mdnice多平台发布

标签:Disruptor,初体验,消费者,disruptor,队列,CPU,RingBuffer,public
From: https://www.cnblogs.com/javadd/p/18684091

相关文章

  • 【LeetCode 刷题】栈与队列-基础操作
    此博客为《代码随想录》字符串章节的学习笔记,主要内容为栈与队列基础操作相关的题目解析。文章目录232.用栈实现队列225.用队列实现栈232.用栈实现队列题目链接classMyQueue:def__init__(self):self.in_s,self.out_s=[],[]......
  • 232. 用栈实现队列
    题目没想透的一点:对于要实现的队列的pop操作,其实先看输出栈是否为空,如果不为空,直接从输出栈弹出即可,如果为空,再将输入栈的元素依次压入输出栈(注意是全部压入输出栈,毕竟输入栈的最下面的那个元素其实是我们想要弹出的元素),再弹出输出栈的栈顶元素。卡哥思路讲得很清晰,跟着卡哥代码......
  • 单调队列:实用而好写的数据结构
    前言|Preface这几天连续做了好几道单调队列的题,难度从绿到蓝不等,摸索出了一些经验,也总结了一些单调队列的特点和规律。概述|Outline顾名思义,单调队列的重点分为「单调」和「队列」。「单调」指的是元素的「规律」——递增(或递减)。「队列」指的是元素只能从队头和队尾进......
  • StackOrQueueOJ2:用队列实现栈
    目录题目描述思路分析创建由队列实现的栈出栈压栈销毁代码展示题目描述原题:225.用队列实现栈思路分析这题我们需要知道栈和队列的差异,栈是先进后出,但队列是先进先出;出队列和出栈有冲突:创建由队列实现的栈这里我们要注意:如果使用MyStack*st创建,那是局部变......
  • C# PriorityQueue优先队列
    namespacePriorityQueueDemo{publicclassTask{publicstringName{get;set;}}publicclassTaskPriorityComparer:IComparer<(int,int)>{publicintCompare((int,int)x,(int,int)y){......
  • LeetCode栈和队列
    栈和队列LeetCode栈和队列刷题记录基础知识栈线性表,只允许在表的一段进行插入和删除操作,满足先进后出原则栈在python中没有特定的类或库函数,一般通过列表(list)或是collections.deque双端队列来实现liststack=[]stack.append(1)#压栈stack.append(2)print(st......
  • 单调队列优化dp
    一本通题解T2:再也不想碰这道题了。。。写了一下午。我们先设状态\(dp[i][j]\)表示前i个刷匠,考虑了前\(j\)个木板后所获得的最大价值(\(j\)个木板可以有空余)。然后枚举前\(i\)个刷匠,枚举每一条木板。对于一条木板可以此刷匠根本不刷,或不刷当前木板,状转方程:\[dp[i][j]......
  • 系统编程(进程通信--消息队列)
    消息队列概念:消息队列就是一个消息的链表,提供了一种由一个进程向另一个进程发送块数据的方法。另外,每一个数据块被看作有一个类型,而接收进程可以独立接收具有不同类型的数据块,在许多方面看来,消息队列类似于有名管道,但是却没有与打开与关闭管道的复杂关联。优点:1.通过发......
  • Kafka分布式消息队列
    一、概述kafka是一个分布式的基于发布/定义的消息队列(MessageQueue)通信处理同步处理客户端->数据库->发送短信->响应客户端异步处理客户端->数据库->发送短信放入MQ(直接响应客户端)消息队列的优势解耦:允许独立的处理两边处理过程,遵循接口约束即可可恢复性:当某......
  • 栈与队列(代码随想)
    目录1.理论分析1.1栈1.2队列2.用栈实现队列3.用队列实现栈算法公开课队列的基本操作!|LeetCode:225.用队列实现栈 (opensnewwindow)https://www.bilibili.com/video/BV1Fd4y1K7sm225.用队列实现栈-力扣(LeetCode)优化4.有效的括号5.删除字符串中的所有相邻重复......