首页 > 其他分享 >多线程篇(阻塞队列- BlockingQueue)(持续更新迭代)

多线程篇(阻塞队列- BlockingQueue)(持续更新迭代)

时间:2024-09-08 09:51:42浏览次数:11  
标签:迭代 队列 阻塞 生产者 线程 多线程 数据 BlockQueue BlockingQueue

目录

一、了解什么是阻塞队列之前,需要先知道队列

1. Queue(接口)

二、阻塞队列

1. 前言

2. 什么是阻塞队列

3. Java里面常见的阻塞队列

三、BlockingQueue(接口)

1. 前言

2. 简介

3. 特性

3.1. 队列类型

3.2. 队列数据结构

2. 简介

4. 核心功能

入队(放入数据)

出队(取出数据)

总结

四、常见BlockQueue

1. LinkdBlockQueue

生产者

消费者

测试代码

2. DelayQueue

3. PriorityBlockingQueue

4. SynchronousQueue

5. 等等等

五、知识小结


一、了解什么是阻塞队列之前,需要先知道队列

1. Queue(接口)

定义了队列的基本功能,添加、删除、查询。

满足FIFO(先进先出原则)。

public interface Queue<E> extends Collection<E> {
    //添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
    boolean add(E e);
    //添加一个元素,添加成功返回true, 如果队列满了,返回false
    boolean offer(E e);
    //返回并删除队首元素,队列为空则抛出异常
    E remove();
    //返回并删除队首元素,队列为空则返回null
    E poll();
    //返回队首元素,但不移除,队列为空则抛出异常
    E element();
    //获取队首元素,但不移除,队列为空则返回null
    E peek();
}

二、阻塞队列

1. 前言

JDK中提供了一系列场景的并发安全队列。

总的来说,按照实现方式的不同可分为阻塞队列和非阻塞队列。

阻塞队列使用锁实现,而非阻塞队列则使用 CAS 非阻塞算法实现。

2. 什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入

和移除方法。

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  2. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是 从队列里

取元素的线程。

阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如表6-1所示。

  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移 除方法,则是从队列里取出一个元素,如果没有则返回null。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程 一段时间,如果超过了指定的时间,生产者线程就会退出。

这两个附加操作的4种处理方式不方便记忆,所以我找了一下这几个方法的规律。

put和take分别尾首含有字母t,offer和poll都含有字母o。

注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻

塞,而且使用offer方法时,该方法永远返回true。

3. Java里面常见的阻塞队列

JDK 7 提供了 7 个阻塞队列,如下。

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向无界阻塞队列。

注意 - 是针对AQS这一块

三、BlockingQueue(接口)

1. 前言

在上面,我们已经了解到什么事阻塞队列。

阻塞队列,顾名思义,首先他是一个队列常用的队列主要有以下两种方式

(当然通过不同的方式可以衍生出不同类别的队列方式,比如DelayQueue

  • 先进先出(FIFO):先插入队列的元素也先出队列,类似排队的功能,从某种程度上来说,这种队列体现了公平性
  • 先进后出(LIFO):后插入队列的元素,最先出队多线程环境,通过队列很容易实现数据共享,比如经典的生产者与消费者模型中,通过队列可以很方便的实现数据共享。

假设我们有若干个生产者线程,又有若干个消费者线程。假设生产者线程把准备好的数据要共享给

消费者线程,利用队列来共享数据,就可以很方便的解决他们之间数据共享问题。但如果生产者跟

消费者在某个时间段内,万一发生处理数据速度不匹配的情况呢?

理想情况下,假如生产者产出数据的速度大于消费者消费数据的速度,并且生产数据累积到一定程

度的时候,那生产者必须暂停等待一下(阻塞生产者线程),以便消费者线程把累积的数据处理完

毕。反之亦然。然而在Concurent 包发布之前,在多线程环境下,我们每个程序员都要自己去控制

这些细节,尤其还要兼顾效率和安全。这些会给我们的程序带来不少的复杂度。好在这是强大的

Concurent 包横空出世了,而他也给我们带来了强大的 BlockQueue,(在多线程领域,所谓阻塞,

会挂起线程(即阻塞),一旦满足条件,被挂起的线程,又会自动被唤醒)

  • 当队列中没有数据的情况下,消费端的所有线程都会被阻塞,直到有数据放入队列
  • 当队列中装满了数据的情况下,生产者端的所有线程都会被阻塞(挂起),直到队列中有空余的位置,线程被自动唤醒

这也是我们在多线程环境下,为什么使用BlockQueue 的原因,作为BlockQueue的使用者,我们

再也不需要关心什么时候阻塞线程,什么时候唤醒线程,因为一起 BlockQueue 都给你包办了,既

然BlockQueue 如此申通广大,让我们一起见识下他的方法。

2. 简介

在新增的 Concurent 包中,BlockQueue 很好解决了多线程中,如何高效传输数据的问题。

通过这些高效且线程安全的队列类,为我们搭建高质量的多线程程序带来极大的遍历。

在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的

机制,在许BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的

最有用的类,它的特性是多生产场景里都可以看到这个enq工具的身影。可以看到他是继承于JDK

的Queue。

接下来将会介绍BlockQueue家庭中的所有成员,包括他们的功能,一些经常使用的场景。

3. 特性

3.1. 队列类型

无限队列 (unbounded queue ) - 几乎可以无限增长

有限队列 ( bounded queue ) - 定义了最大容量

3.2. 队列数据结构

队列实质就是一种存储数据的结构,通常用链表或者数组实现。

一般而言队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列

主要操作:入队(EnQueue)与出队(Dequeue)

2. 简介

在上面,我们已经了解到什么事阻塞队列。

阻塞队列,顾名思义,首先他是一个队列常用的队列主要有以下两种方式

(当然通过不同的方式可以衍生出不同类别的队列方式,比如DelayQueue

  • 先进先出FIFO

先插入队列的元素也先出队列,类似排队的功能,从某种程度上来说,这种队列体现了公平性

  • 先进后出(LIFO)后插入队列的元素,最先出队多线程环境,通过队列很容易实现数据共享,比如经典的生产者与消费者模型中,通过队列可以很方便的实现数据共享。

假设我们有若干个生产者线程,又有若干个消费者线程。假设生产者线程把准备好的数据要共享给消费者线程,利用队列来共享数据,就可以很方便的解决他们之间数据共享问题。但如果生产者跟消费者在某个时间段内,万一发生处理数据速度不匹配的情况呢?

理想情况下,假如生产者产出数据的速度大于消费者消费数据的速度,并且生产数据累积到一定程度的时候,那生产者必须暂停等待一下(阻塞生产者线程),以便消费者线程把累积的数据处理完毕。反之亦然。然而在Concurent 包发布之前,在多线程环境下,我们每个程序员都要自己去控制这些细节,尤其还要兼顾效率和安全。这些会给我们的程序带来不少的复杂度。好在这是强大的 Concurent 包横空出世了,而他也给我们带来了强大的 BlockQueue,(在多线程领域,所谓阻塞,会挂起线程(即阻塞),一旦满足条件,被挂起的线程,又会自动被唤醒)

  • 如上图所示,当队列中没有数据的情况下,消费端的所有线程都会被阻塞,直到有数据放入队列
  • 如上图所示,当队列中装满了数据的情况下,生产者端的所有线程都会被阻塞(挂起),直到队列中有空余的位置,线程被自动唤醒

这也是我们在多线程环境下,为什么使用BlockQueue 的原因,作为BlockQueue的使用者,我们再也不需要关心什么时候阻塞线程,什么时候唤醒线程,因为一起 BlockQueue 都给你包办了,既然BlockQueue 如此申通广大,让我们一起见识下他的方法。

4. 核心功能

入队(放入数据)

  • offer(anObject):表示有可能的话,将Object 放入BlockQueue 中,如果BlockQueue 可以容纳,则返回true,否则返回false,(本方法不阻塞当前线程)。
  • offer( E e,Long timeOut,TimeUnit unit):可以设置指定的时间,如果指定时间内还没有往队列中加入,则返回false.
  • put(anObject):把anObject 加入到队列中,如果BlockQueue 中没有空间,则调用此方法的线程被阻塞直到 BlockQueue有空间在继续。

出队(取出数据)

  • poll(time):取走BlockQueue 中排在首位的对象,若不能及时取出,则等待time 参数规定的时间,取不出时返回null.
  • poll(long time ,TimeUnit unit):取出BlockQueue 中排在首位的对象,在指定时间内,一旦有数据就返回,超时还没有数据,则返回失败
  • take():取出BlockQueue 中排在首位的数据,如果BlockQueue 中没有数据,则当前线程一直阻塞,直到有新的数据加入。
  • drainTo():一次性的从BlockQueue中取出所有可用数据对象(还可以指定取出数据个数),通过该方法可以提升获取数据效率,不需要多次加锁和释放锁,
  • peek():获取队首元素,但不移除,队列为空则返回null

总结

当队列满了无法添加元素,或者是队列空了无法移除元素时:

抛出异常:add、remove、element

返回结果但不抛出异常:offer、poll、peek

阻塞:put、take

四、常见BlockQueue

在了解BlockQueue 后,让我们来了解下他的大家庭都有哪些成员

  • ArrayBlockQueue,基于数组阻塞队列实现,在ArrayBlockQueue 内,维护了一个定长数组,以便缓存数据对象,这是个常用的阻塞队列,除了定长的数组外,ArralyBlockQueue 还保存着两个定长的整形变量,分别标志着队列的头部跟尾部在数组中的位置。
  • ArrayBlockQueue生产者放入数据消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正的并行运行,这点尤其不同意LinkBlockQueue,按照实现原理来分析,ArrayBlockQueue 完全可用分离锁,从而实现生产者与消费者操作的完全并行执行,之所以没有这么做,也许是因为,ArrayBlockQueue 数据写入与获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性,其性能上完全占不到便宜。ArrayBlockQueue和LinkedBlockQueue还有一个明显的不同之处在于,前着在插入和删除对象元素时,不会产生和销毁任何的对象实列,而后者则会产生一个额外的 node 对象,在长时间内需要高效并发的处理大批量系统中,其对GC的影响还是存在一定的区别,而在创建ArrayBlockQueue时,我们还可以控制对象的内部锁是否使用公平锁,默认采用非公平锁。

1. LinkdBlockQueue

  • 基于链表的阻塞队列,同ArrayBlockQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返】回,只有当队列缓存区达到最大缓存容量时(LinkedBlockQueue可以通过构造函数指定该值),才会阻塞生产队列,直到消费者消费掉一份数据后,生产者线程才会被唤醒,反之对消费者这端的处理也基于同样的原理。LinkedBlockQueue 之所以能够高效的处理并发数据,还因为生产者和消费者端分别采用了独立的锁来控制数据同步,这也意味着,在高并发的情况下,生产者和消费者可以并行的操作队列中的数据,以此来提供提供的队列的并发性。
  • 作为开发者,我们需要注意的是, 如果构造一个LinkedBlockQueue 队像,而没有指定其容量大小,LinkedBlockQueue 会默认一个类似无线大小的容量(Integer.MAX_VALUE),这样的话,一旦生产者生产者的速度大于消费者的速度,也许还没等到队列满,阻塞产生,系统的内存就有可能被消耗殆尽了,
  • ArrayBlockQueue和 linkedBlockQueue 是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程的生产者和消费者问题,这两个类足以。
  • 下面演示了如何使用BlockQueue,

生产者

package com.cj.demo.thread.queue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    private volatile boolean isRunning = true;//是否在运行标志
    private BlockingQueue queue;//阻塞队列
    private static AtomicInteger count = new AtomicInteger();//自动更新的值
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
    private String threadName="";

    //构造函数
    public Producer(BlockingQueue queue,String threadName) {

        this.queue = queue;
        this.threadName = threadName;
    }

    @Override
    public void run() {
        String data = null;
        Random r = new Random();

        System.out.println("启动生产者线程!");
        try {
            while (isRunning) {
                System.out.println("正在生产数据...");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一个随机数

                data = "data:" + count.incrementAndGet();//以原子方式将count当前值加1
                System.out.println(threadName+" 将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//设定的等待时间为2s,如果超过2s还没加进去返回true
                    System.out.println("放入数据失败:" + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生产者线程!");
        }
    }

    public void stop() {
        isRunning = false;
    }
}

消费者

package com.cj.demo.thread.queue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable {
    private BlockingQueue<String> queue;
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    //构造函数
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("启动消费者线程!");
        Random r = new Random();
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2, TimeUnit.SECONDS);//有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
                if (null != data) {
                    System.out.println("拿到数据:" + data);
                    System.out.println("正在消费数据:" + data);
                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                } else {
                    // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
                    isRunning = false;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消费者线程!");
        }
    }
}

测试代码

package com.cj.demo.thread.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        // 声明一个容量为10的缓存队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        //new了三个生产者和一个消费者
        Producer producer1 = new Producer(queue,"thread1");
        Producer producer2 = new Producer(queue,"thread2");
        Producer producer3 = new Producer(queue,"thread3");
        Consumer consumer = new Consumer(queue);

        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动线程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);

        // 执行10s
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();

        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}

2. DelayQueue

DelayQueue 中的元素,只有当其指定的延迟时间到了,才能够从队列中获取元素。

DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会阻塞, 而只有获取

数据消费者才会阻塞。

3. PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是

PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的

时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有

的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

4. SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者

拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,

如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue

来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,

而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对

于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方

面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及

时响应性能可能会降低。

声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。

公平模式和非公平模式的区别:

如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者

和消费者,从而体系整体的公平策略;

但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合

一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有

差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

5. 等等等

五、知识小结

BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理

了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。

标签:迭代,队列,阻塞,生产者,线程,多线程,数据,BlockQueue,BlockingQueue
From: https://blog.csdn.net/qq_51226710/article/details/142000346

相关文章

  • [Redis]Redis到底是单线程还是多线程程序?
    概述这里我们先给出问题的全面回答:Redis到底是多线程还是单线程程序要看是针对哪个功能而言,对于核心业务功能部分(命令操作处理数据),Redis是单线程的,主要是指Redis的网络IO和键值对读写是由一个线程来完成的,这也是Redis对外提供键值存储服务的主要流程,所以一般我们认为Red......
  • Java多线程中常见死锁问题及解决方案
    在编写Java多线程代码的时候,很难避免会出现线程安全问题,在线程安全问题中也有一个很常见的现象就是死锁现象。今天我们就来聊一聊Java中的死锁问题,以及如何避免死锁问题。本次知识点讲解建立在大家已经知道“锁”......
  • 【C++】简单易懂的vector迭代器
    一、迭代器的本质vector的迭代器本质上就是一个被封装的指针。迭代器的用法和指针的用法十分相似,就是一个像指针的假指针,我们不妨把迭代器看作一个伪指针。二、迭代器的使用句式(可以看到迭代器和指针很像):迭代器有四种:1、正向迭代器:容器名<类型>::iterator迭代器名2、常......
  • 多线程:线程安全
    线程安全多个线程,同时操作同一个共享资源的时候,可能会出现安全问题。例如:两个人来取钱的案例 publicclasstest{publicstaticvoidmain(String[]args){//1.创建一个账户对象。代表账户人的共享账户。Accountacc=newAccount("ICBC-110",1000......
  • python语言基础(七)--多进程多线程
    多进程,多线程1、多任务概述多个任务同时执行目的节约资源,充分利用CPU资源,提高效率表现形式并发:针对于单核CPU来讲的,如果有多个任务同时请求执行,但是同一瞬间CPU只能执行1个(任务),于是就安排它们交替执行.因为时间间隔非常短(CPU执行速度太快......
  • ros 多线程模式
    ros::spin()的作用在ROS中,ros::spin()的主要作用是:让ROS节点持续运行,并处理所有注册的回调函数。在内部,它不断地检查ROS网络中的消息、服务请求,并调用相应的回调函数来处理它们。内部原理事件循环(EventLoop):ros::spin()进入一个事件循环。这个循环不断地......
  • 多线程
    线程(Thread)概述线程是一个程序内部的一条执行流程。多线程指的是从软硬件上实现的多条执行流程的技术(多条线程由CPU负责调度执行)。多线程的创建方式1.继承Thread。重写run方法,在run中执行每次执行结果都会不一样publicclassThreadTest1{//main方法是一条或多条线程负责执......
  • Java高级编程—多线程(完整详解线程的三种实现方式、以及守护线程、出让线程、插入线程
    二十八.多线程文章目录二十八.多线程28.1线程的三种实现方式28.1.1第一种28.1.2第二种28.1.3第三种28.2常见的成员方法28.3守护线程28.4出让线程28.5插入线程28.6线程生命周期28.7同步代码块28.8同步方法28.1线程的三种实现方式继承Thread类的方式进行......
  • Java多线程
    Java多线程什么是多线程和线程安全?多线程多线程的特点:线程安全线程安全的实现方式:举例说明:Java实现线程安全的几种方式1.**使用同步块(`synchronized`)**2.**使用显式锁(`Lock`接口)**3.**使用并发容器**4.**原子类(`Atomic`包)**5.**线程本地变量(`ThreadLocal`)**6.**......
  • 多线程
    多线程进程和线程区别并行和并发区别创建线程的方式(高频)线程包含哪些状态,状态如何的变化(高频)搜生命周期现成顺序执行java中的wait和sleep方法的不同wait必须要和syn..锁一块使用,不然报错如何停止一个正在运行的线程线程安全问题synchronized关键字底层......