首页 > 系统相关 >SpringBoot + Disruptor实现高并发内存消息队列

SpringBoot + Disruptor实现高并发内存消息队列

时间:2023-02-10 17:33:59浏览次数:67  
标签:Disruptor SpringBoot disruptor com Cache 内存 import message Consumer

1. 简介

  Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。
  Disruptor区别于Kafka、RabbitMQ等消息队列,它是一个高性能的线程间异步通信的框架,即在同一个JVM进程中的多线程间消息传递。和ArrayBlockingQueue比较类似,但是它是一个有界无锁的高并发队列,如果项目中使用ArrayBlockingQueue在多线程之间传递消息,可以考虑是用Disruptor来代替。
  Github:https://github.com/LMAX-Exchange/disruptor
  文档:https://lmax-exchange.github.io/disruptor/
  资料及测试结论:https://lmax-exchange.github.io/disruptor/disruptor.html#_overview

2. 官方测试

  • 吞吐量性能测试

  参考ArrayBlockingQueue(简称ABQ)在不同机器、不同消费模式(下文详细介绍)环境下的性能对比,详情浏览官方测试报告:https://lmax-exchange.github.io/disruptor/disruptor.html#_throughput_performance_testing

Nehalem 2.8Ghz – Windows 7 SP1 64-bit Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit
ABQ

Disruptor

ABQ

Disruptor

Unicast: 1P – 1C 5,339,256 25,998,336 4,057,453 22,381,378
Pipeline: 1P – 3C 2,128,918 16,806,157 2,006,903 15,857,913
Sequencer: 3P – 1C 5,539,531 13,403,268 2,056,118 14,540,519
Multicast: 1P – 3C 1,077,384 9,377,871 260,733 10,860,121
Diamond: 1P – 3C 2,113,941 16,143,613 2,082,725 15,295,197
  • 延迟性能测试

  参考ArrayBlockingQueue(简称ABQ)在相同机器、相同同消费模式环境下的对比,详情浏览官方测试报告:https://lmax-exchange.github.io/disruptor/disruptor.html#_latency_performance_testing

Array Blocking Queue (ns) Disruptor (ns)
Min Latency 145 29
Mean Latency 32,757 52
99% observations less than 2,097,152 128
99.99% observations less than 4,194,304 8,192
Max Latency 5,069,086 175,567

3. 相关博客

  【面试专栏】Java 阻塞队列
  【面试专栏】Java常用并发工具类
  【面试专栏】JAVA锁机制
  【面试专栏】JAVA CAS(Conmpare And Swap)原理
  【面试专栏】Java并发编程:volatile关键字

4. Java阻塞队列存在的弊端

  常用的Java阻塞队列:

名称 是否有界 是否加锁 数据结构 队列类型
ArrayBlockingQueue 有界 加锁 数组 阻塞
LinkedBlockingQueue 可选有界 加锁 链表 阻塞
PriorityBlockingQueue 无界 加锁 数组 阻塞
DelayQueue 无界 加锁 数组 阻塞
LinkedTransferQueue 无界 无锁 链表 阻塞
LinkedBlockingDeque 可选有界 有锁 链表 阻塞

  在使用中,为了防止生产过快而消费不及时导致的内存溢出,或垃圾回收频繁导致的性能问题,一般会使用有界且数据结构为数组的阻塞队列,即:ArrayBlockingQueue。
        但是,ArrayBlockingQueue使用加锁的方式保证线程安全,在低延迟的场景中表现悲观,且存在伪共享的问题,因此结果不尽人意。

5. 伪共享

5.1 CPU内部存储结构

  现在的CPU都是多个CPU核心,如下图。为了提高访问效率,都有缓存行。每个核中都有L1 Cache和L2 Cache,L3 Cache则在多核之间共享。CPU在执行运算时,首先从L1 Cache查找数据,找不到则以一次从L2 Cache、L3 Cache查找,如果还是没有,则会去内存中查找,路径越长,耗时越长,性能越低。当数据被修改后,通过主线通知其他CPU将读取的数据标记为失效状态,下次访问时从内存重新读取数据到Cache。

  对于计算机的存储设备而言,除了CPU之外,外部还有内存和磁盘。如下图,存储容量越来越大,成本越来越大,但访问速度却越来越慢。

5.2 缓存行

  CPU从内存中加载数据时,并不是一个字节一个字节的加载,而是一块一块的的加载数据,这样的一块称为:缓存行(Cache Line),即缓存行是CPU读取数据的最小单位。
  CPU的缓存行一般为32 ~ 128字节,常见的CPU缓存行为64字节。

# 查询CPU的Cache Line大小
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size

  对于数组而言,CPU每次会加载数据中多个数据到Cache中。所以,如果按照物理内存地址分布的顺序去访问数据,Cache命中率最高,从而减少从内存加载数据的次数,提高性能。
  但对于单个变量而言,会存在Cache伪共享的问题。假设定义衣蛾Long类型的变量A,占用8个字节,则CPU每次从内存中读取数剧是,会连同连续地址内的其余7个数据一并加载到Cache中。

5.3 伪共享

  多个CPU缓存遵循MESI协议
  假设,定义两个变量A、B,线程1绑定Core1,读取变量A,线程2绑定Core2,读取变量B。

  Core1和Core2分别读取变量A、B到Cache中,但变量A、B在同一Cache Line,因此,Core1和Core2都会把数据A、B加载到Cache中。

  线程1通过Core1修改变量A。首先通过主线发送消息给Core2将存放变量A的Cache Line标记为失效状态,然后将Core1中的Cache Line的状态标为已修改,并修改数据。

  线程2修改变量B时,发现Cache Line的状态为失效,并且Core1中的Cache Line为已修改状态,则先把Core1中的变量A、B写回内存,然后从内存中重新读取变量A、B。然后通过主线发送消息给Core1将存放变量B的Cache Line标记为失效状态,然后将Core2中的Cache Line的状态标为已修改,并修改数据。

  如果线程1、2频繁交替修改变量A、B,则会重复以上步骤,导致Cache没有意义。虽然变量A、B之间没有任何关系,但属于同一Cache Line。
  这种多个线程同时读写同一个Cache Line的不同变量而导致CPU Cache失效的现象称为伪共享(False Sharing)。

5.4 ArrayBlockingQueue中的伪共享

  查看ArrayBlockingQueue源码,有三个核心变量:

# 出队下标
/** items index for next take, poll, peek or remove */
int takeIndex;

# 入队下标
/** items index for next put, offer, or add */
int putIndex;

# 队列中元素数量
/** Number of elements in the queue */
int count;

  这三个变量很容易放到同一Cache Line,这也是影响ArrayBlockingQueue性能的重要原因之一。

6. Disruptor如何避免伪共享

  ArrayBlockingQueue因为加锁和伪共享的问题影响性能,那Disruptor是如何避免这两个问题来提供性能的呢?主要表现在以下几个方面:

  • 采用环形数组结构
  • 采用CAS无锁方式
  • 添加额外的信息避免伪共享
6.1 环形数组结构

  环形数组(RingBuffer)结构是Disruptor的核心。官网对环形数据结构的剖析:Dissecting the Disruptor: What’s so special about a ring buffer?

优势:

  • 数组结构:当CPU Cache加载数据时,相邻的数据也会被加载到Cache中,避免CPU频繁从内存中获取数据。
  • 避免GC:实质还是一个普通的数组,当数据填满队列时(2^n - 1)时,再次添加数据回覆盖之前的数据。
  • 位运算:数组大小必须为2的n次方,通过位运算提高效率。
6.2 CAS无锁方式

        Disruptor与传统的队列不同,分为队首指针和队尾指针,而是只有一个游标器Sequencer,它可以保证生产的消息不回覆盖还未消费的消息。Sequencer分为两个实现类:SingleProducerSequencerMultiProducerSequencer,即单生产者和多生产者。当单个生产者时,生产者每次从RingBuffer中获取下一个可以生产的位置,然后存放数据;消费者先获取最大的可消费的位置,再读取数据进行消费。当多个生产者时,每个生产者下通过CAS竞争获取可以生产的位置,然后存放数据;每个消费者都需要先获取最大可消费的下标,然后读取数据进行消费。

6.3 添加额外信息

        RingBuffer的下标是一个volatile变量,即不用加锁就能保证多线程安全,同时每个long类型的下标还会附带7个long的额外变量,避免伪共享的问题。

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

7. 示例代码

  • 创建项目
  • 修改pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.c3stones</groupId>
    <artifactId>springboot-disruptor-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
  • 创建消息模型类
      用于在Disruptor之间传递消息。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 消息
 *
 * @author CL
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {

    private Object data;

    @Override
    public String toString() {
        return data.toString();
    }

}
  • 创建生产者
import com.c3stones.model.Message;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.RequiredArgsConstructor;

/**
 * 生产者
 *
 * @author CL
 */
@RequiredArgsConstructor
public class Producer {

    private final Disruptor disruptor;

    /**
     * 发送数据
     *
     * @param data 数据
     */
    public void send(Object data) {
        RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();
        // 获取可以生成的位置
        long next = ringBuffer.next();
        try {
            Message msg = ringBuffer.get(next);
            msg.setData(data);
        } finally {
            ringBuffer.publish(next);
        }
    }

}
  • 创建分组模式消费者
import com.c3stones.model.Message;
import com.lmax.disruptor.WorkHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * 分组消费
 *
 * @author CL
 */
@Slf4j
@RequiredArgsConstructor
public class GroupConsumer implements WorkHandler<Message> {

    /**
     * 消费着编号
     */
    private final Integer number;

    /**
     * 分组消费:每个生产者生产的数据只能被一个消费者消费
     *
     * @param message 消息
     */
    @Override
    public void onEvent(Message message) {
        log.info("Group Consumer number: {}, message: {}", number, message);
    }

}
  • 创建重复模式消费者
import com.c3stones.model.Message;
import com.lmax.disruptor.EventHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * 重复消费
 *
 * @author CL
 */
@Slf4j
@RequiredArgsConstructor
public class RepeatConsumer implements EventHandler<Message> {

    /**
     * 消费着编号
     */
    private final Integer number;

    /**
     * 重复消费:每个消费者重复消费生产者生产的数据
     *
     * @param message    消息
     * @param sequence   当前序列号
     * @param endOfBatch 批次结束标识(常用于将多个消费着的数据依次组合到最后一个消费者统一处理)
     */
    @Override
    public void onEvent(Message message, long sequence, boolean endOfBatch) {
        log.info("Repeat Consumer number: {}, message: {}, curr sequence: {}, is end: {}",
                number, message, sequence, endOfBatch);
    }

}
  • 创建通用模式消费者
      一般采用这种更加通用的模式。
import com.c3stones.model.Message;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * 消费者
 *
 * @author CL
 */
@Slf4j
@RequiredArgsConstructor
public class Consumer implements WorkHandler<Message>, EventHandler<Message> {

    private final String desc;

    /**
     * 重复消费:每个消费者重复消费生产者生产的数据
     *
     * @param message    消息
     * @param sequence   当前序列号
     * @param endOfBatch 批次结束标识(常用于将多个消费着的数据依次组合到最后一个消费者统一处理)
     */
    @Override
    public void onEvent(Message message, long sequence, boolean endOfBatch) {
        this.onEvent(message);
    }

    /**
     * 分组消费:每个生产者生产的数据只能被一个消费者消费
     *
     * @param message 消息
     */
    @Override
    public void onEvent(Message message) {
        log.info("Group Consumer describe: {}, message: {}", desc, message);
    }

}

8. 单元测试

  • 测试分组消费:每个生产者生产的数据只能被一个消费者消费
import com.c3stones.consumer.GroupConsumer;
import com.c3stones.model.Message;
import com.c3stones.producer.Producer;
import com.lmax.disruptor.dsl.Disruptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.Executors;

/**
 * 分组消费 单元测试
 *
 * @author CL
 */
public class GroupConsumerTest {

    private Disruptor<Message> disruptor;

    @Before
    public void init() {
        GroupConsumer a = new GroupConsumer(1);
        GroupConsumer b = new GroupConsumer(2);

        disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());

        disruptor.handleEventsWithWorkerPool(a, b);

        disruptor.start();
    }

    @After
    public void close() {
        disruptor.shutdown();
    }

    @Test
    public void test() {
        Producer producer = new Producer(disruptor);

        Arrays.asList("aaa", "bbb").forEach(data -> producer.send(data));
    }

}

  单元测试结果:

[pool-1-thread-1] INFO com.c3stones.consumer.GroupConsumer - Group Consumer number: 1, message: bbb
[pool-1-thread-2] INFO com.c3stones.consumer.GroupConsumer - Group Consumer number: 2, message: aaa
  • 测试重复消费:每个消费者重复消费生产者生产的数据
import com.c3stones.consumer.RepeatConsumer;
import com.c3stones.model.Message;
import com.c3stones.producer.Producer;
import com.lmax.disruptor.dsl.Disruptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.Executors;

/**
 * 重复消费 单元测试
 *
 * @author CL
 */
public class RepeatConsumerTest {

    private Disruptor<Message> disruptor;

    @Before
    public void init() {
        RepeatConsumer a = new RepeatConsumer(1);
        RepeatConsumer b = new RepeatConsumer(2);

        disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());

        disruptor.handleEventsWith(a, b);

        disruptor.start();
    }

    @After
    public void close() {
        disruptor.shutdown();
    }

    @Test
    public void test() {
        Producer producer = new Producer(disruptor);

        Arrays.asList("aaa", "bbb").forEach(data -> producer.send(data));
    }

}

  单元测试结果:

[pool-1-thread-1] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 1, message: aaa, curr sequence: 0, is end: false
[pool-1-thread-2] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 2, message: aaa, curr sequence: 0, is end: false
[pool-1-thread-1] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 1, message: bbb, curr sequence: 1, is end: true
[pool-1-thread-2] INFO com.c3stones.consumer.RepeatConsumer - Repeat Consumer number: 2, message: bbb, curr sequence: 1, is end: true
  • 测试链路模式
/**
 * 测试链路模式
 * <p>
 * p => a -> b -> c
 * </p>
 */
@Test
public void testChain() throws InterruptedException {
    Consumer a = new Consumer("a");
    Consumer b = new Consumer("b");
    Consumer c = new Consumer("c");

    Disruptor<Message> disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());

    disruptor.handleEventsWith(a).then(b).then(c);

    disruptor.start();

    Producer producer = new Producer(disruptor);

    producer.send("Chain");

    Thread.sleep(1000);

    disruptor.shutdown();
}

  单元测试结果:

[pool-1-thread-1] INFO com.c3stones.consumer.Consumer - Group Consumer describe: a, message: Chain
[pool-1-thread-2] INFO com.c3stones.consumer.Consumer - Group Consumer describe: b, message: Chain
[pool-1-thread-3] INFO com.c3stones.consumer.Consumer - Group Consumer describe: c, message: Chain
  • 测试钻石模式
/**
     * 测试钻石模式
     * <p>
     *           a
     *        ↗     ↘
     *  p =>            c
     *        ↘     ↗
     *           b
     * </p>
     */
    @Test
    public void testDiamond() throws InterruptedException {
        Consumer a = new Consumer("a");
        Consumer b = new Consumer("b");
        Consumer c = new Consumer("c");

        Disruptor<Message> disruptor = new Disruptor(Message::new, 1024, Executors.defaultThreadFactory());

        disruptor.handleEventsWithWorkerPool(a, b).then(c);

        disruptor.start();

        Producer producer = new Producer(disruptor);

        producer.send("Diamond1");
        producer.send("Diamond2");

        Thread.sleep(1000);

        disruptor.shutdown();
    }

  单元测试结果:

[pool-1-thread-1] INFO com.c3stones.consumer.Consumer - Group Consumer describe: a, message: Diamond2
[pool-1-thread-2] INFO com.c3stones.consumer.Consumer - Group Consumer describe: b, message: Diamond1
[pool-1-thread-3] INFO com.c3stones.consumer.Consumer - Group Consumer describe: c, message: Diamond1
[pool-1-thread-3] INFO com.c3stones.consumer.Consumer - Group Consumer describe: c, message: Diamond2

9. 项目地址

  spring-boot-disuptor-demo

标签:Disruptor,SpringBoot,disruptor,com,Cache,内存,import,message,Consumer
From: https://www.cnblogs.com/cao-lei/p/17099729.html

相关文章

  • springboot开发日记(10)——web开发
    通过SpringInitializer进行项目创建文件->新建项目->选中SpringInitializer->勾选所需的插件和项目类型(本次勾选插件为:lombok、dev-tools、SpringConfigurationProcessor,......
  • SpringBoot启动流程
    SpringBoot项目都需要一个启动类。在启动类上标注@SpringBootApplication,在main方法中调用SpringApplication.run()方法,就可以启动项目:@SpringBootApplicationpublic......
  • springboot SpEL关于@ConditionalOnExpression注解,在使用spel表达式引用配置属性bean
    springboot关于@ConditionalOnExpression注解,在使用spel表达式引用配置属性bean导致提前初始化,无绑定数据的问题及相应的解决方法。SpringBoot版本<parent>......
  • 4.1内存的物理机制很简单
       内存实际上是一种名为内存IC的电子元件。虽然内存IC包括DRAM、SRAM、ROM等多种形式,但从外部来看,基本机制都是一样的。内存IC中有电源、地址信号、数据信号、控制......
  • Disruptor入门
    Disruptor介绍主页:http://lmax-exchange.github.io/disruptor/源码:https://github.com/LMAX-Exchange/disruptorGettingStarted:https://github.com/LMAX-Exchange/di......
  • 动态内存的开辟
    c程序的内存分配:执行程序会将程序加载到内存,内存大体上被分为三个区:栈段、堆段、数据段(全局变量和static变量)  栈:局部变量和形式参数会保存在栈区,函数调用完之后,释放......
  • 内存泄露 内存释放 和垃圾回收机制
    答:当使用一个参数,会进行内存的分配,内存的使用,内存的释放。什么会导致内存泄露答:1.意外的全局变量=》必须确保在使用过后将它设置为null2定时器=》清除定......
  • SpringBoot解决跨域问题
    遇到前端跨域访问问题,类似于这样的:在Springboot项目里加上这个配置文件CorsConfig.java,重启之后即可实现跨域访问,前端无需再配置跨域。importorg.springframework.......
  • arthas内存也可以分析 -线程死锁 -cpu高
    https://blog.csdn.net/qq_43692950/article/details/122688520......
  • Linux系统查看内存型号、主板、硬盘等信息
    1.Linux查看内存的插槽数,已经使用多少插槽.每条内存多大:[root@mxh~]#dmidecode|grep-A5"MemoryDevice"|grepSize|grep-vRangeSize:4096MBSize:4096MBSiz......