首页 > 其他分享 >JUC之阻塞队列BlockingQueue的实现原理

JUC之阻塞队列BlockingQueue的实现原理

时间:2023-04-15 22:33:57浏览次数:32  
标签:JUC product nodeNodeItem 队列 custom key public BlockingQueue

1. 阻塞队列首先它是一个队列,是队列就会遵循先进先出(FIFO)的原则,又因为它是阻塞的,故与普通的队列有两点区别:

  A. 当一个线程向队列里面添加数据时,如果队列是满的,那么将阻塞该线程,暂停添加数据。

  B. 当一个线程从队列里面取出数据时,如果队列是空的,那么将阻塞该线程,暂停取出数据。

2. JUC中实现一个阻塞队列一般都会实现BlockingQueue接口,主要方法说明:

方法/处理方式

抛出异常

返回特殊值

一直阻塞

超时退出

插入数据方法

add(e)

offer(e)

put(e)

offer(e, timeout, unit)

移除数据方法

remove()

poll()

take()

pull(timeout, unit)

检查方法

element()

peek()

不可用

不可用

 

 

 

JUC之阻塞队列BlockingQueue的实现原理_阻塞队列

注:"返回特殊值"的意思是说当向队列插入(offer)数据时,会返回数据是否插入成功,成功返回true。如果是移除方法(poll),则是从队列里面取出一个数据,如果没有就返回null。

3. jdk里提供的阻塞队列

JUC之阻塞队列BlockingQueue的实现原理_java_02

   1). ArrayBlockingQueue

  基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过ReentrantLock来完成的,数据的插入与取出共用同一个锁,因此ArrayBlockingQueue并不能实现生产、消费同时进行。而且在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

  2). LinkedBlockingQueue

  基于单向链表的阻塞队列实现,在初始化LinkedBlockingQueue的时候可以指定对立的大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外LinkedBlockingQueue中用于阻塞生产者、消费者的锁是两个(锁分离),因此生产与消费是可以同时进行的。

  3). PriorityBlockingQueue  一个支持优先级排序的无界阻塞队列

  4). SynchronousBlockingQueue  一个不存储原色的阻塞队列

  5). DelayQueue  一个使用优先级队列队列实现的无界阻塞队列

4. 实现一个简单的阻塞队列,基于List实现,生产、消费共用同一个锁,而且是一个有界的队列

 1). 实现阻塞队:MyBlockingQueue

1 package com.winter.juc;
 2 
 3 import java.util.*;
 4 import java.util.concurrent.locks.Condition;
 5 import java.util.concurrent.locks.Lock;
 6 import java.util.concurrent.locks.ReentrantLock;
 7 
 8 public class MyBlockingQueue<E> {
 9 
10     private int count;
11     private int capacity;
12     private final List<E> containor;
13 
14     private final Lock lock = new ReentrantLock();
15     private final Condition notFull = lock.newCondition();
16     private final Condition notEmpty = lock.newCondition();
17 
18     public MyBlockingQueue(int capacity) {
19         containor = new ArrayList<E>(capacity);
20         this.count = 0;
21         this.capacity = capacity;
22     }
23 
24     public void put(E data) throws InterruptedException {
25         lock.lockInterruptibly();
26         try {
27             while (count == capacity) {
28                 notFull.await();
29             }
30             containor.add(data);
31             this.count++;
32             notEmpty.signal();
33         } finally {
34             lock.unlock();
35         }
36     }
37 
38     public E take() throws InterruptedException {
39         lock.lockInterruptibly();
40         try {
41             while (count == 0) {
42                 notEmpty.await();
43             }
44             this.count--;
45             notFull.signal();
46             E node = containor.get(0);
47             containor.remove(0);
48             return node;
49         } finally {
50             lock.unlock();
51         }
52     }
53 }

 

2).接下来使用该队列

生产者:

1 package com.winter.juc;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 import org.apache.commons.lang3.RandomUtils;
 5 
 6 public class Productor implements Runnable {
 7    private MyBlockingQueue<NodeItem> queue;
 8 
 9    public Productor(MyBlockingQueue<NodeItem> queue) {
10       this.queue = queue;
11    }
12 
13    public void run() {
14       while (true) {
15          try {
16             TimeUnit.SECONDS.sleep(RandomUtils.nextInt(0, 5));
17             NodeItem node = new NodeItem();
18             node.setKey(GlobalKey.get());
19             System.out.println("produce a node" + node);
20             queue.put(node);
21          } catch (InterruptedException e) {
22             e.printStackTrace();
23          }
24       }
25    }
26 }

 

消费者:

1 package com.winter.juc;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 import org.apache.commons.lang3.RandomUtils;
 5 
 6 public class Customer implements Runnable {
 7     private MyBlockingQueue<NodeItem> queue;
 8     public Customer(MyBlockingQueue<NodeItem> queue) {
 9         this.queue = queue;
10     }
11 
12     public void run() {
13         while (true) {
14             try {
15                 TimeUnit.SECONDS.sleep(RandomUtils.nextInt(0, 5));
16                 NodeItem node = queue.take();
17                 System.out.println("consume a node" + node);
18             } catch (InterruptedException e) {
19                 e.printStackTrace();
20             }
21         }
22 
23     }
24 }

 

元素实体:

1 package com.winter.juc;
 2 public class NodeItem {
 3 
 4     private int key;
 5 
 6     public int getKey() {
 7         return key;
 8     }
 9 
10     public void setKey(int key) {
11         this.key = key;
12     }
13 
14     @Override
15     public String toString() {
16         return "NodeItem{" +
17                 "key=" + key +
18                 '}';
19     }
20 }

 

全局键值:

1 package com.winter.juc;
 2 import java.util.concurrent.atomic.AtomicInteger;
 3 public class GlobalKey {
 4 
 5     private static AtomicInteger key = new AtomicInteger(1);
 6 
 7     public static int get() {
 8         return key.getAndIncrement();
 9     }
10 }

 

main方法:

1 package com.winter.juc;
 2 
 3 import java.util.concurrent.Executors;
 4 import java.util.concurrent.ExecutorService;
 5 
 6 public class ProviderService {
 7     public static void main(String[] args) {
 8 
 9         final MyBlockingQueue blockingQueue = new MyBlockingQueue(3);
10 
11         ExecutorService exec = Executors.newCachedThreadPool();
12 
13         exec.submit(new Productor(blockingQueue));
14         exec.submit(new Customer(blockingQueue));
15     }
16 }

 

执行结果:

product a nodeNodeItem{key=1}
custom a nodeNodeItem{key=1}
product a nodeNodeItem{key=2}
custom a nodeNodeItem{key=2}
product a nodeNodeItem{key=3}
custom a nodeNodeItem{key=3}
product a nodeNodeItem{key=4}
custom a nodeNodeItem{key=4}
product a nodeNodeItem{key=5}
product a nodeNodeItem{key=6}
custom a nodeNodeItem{key=5}
product a nodeNodeItem{key=7}
custom a nodeNodeItem{key=6}
custom a nodeNodeItem{key=7}
product a nodeNodeItem{key=8}
custom a nodeNodeItem{key=8}
product a nodeNodeItem{key=9}
custom a nodeNodeItem{key=9}
product a nodeNodeItem{key=10}
product a nodeNodeItem{key=11}
product a nodeNodeItem{key=12}
custom a nodeNodeItem{key=10}
custom a nodeNodeItem{key=11}
product a nodeNodeItem{key=13}
product a nodeNodeItem{key=14}
custom a nodeNodeItem{key=12}
product a nodeNodeItem{key=15}
custom a nodeNodeItem{key=13}
product a nodeNodeItem{key=16}
product a nodeNodeItem{key=17}
custom a nodeNodeItem{key=14}
product a nodeNodeItem{key=18}
custom a nodeNodeItem{key=15}
product a nodeNodeItem{key=19}
custom a nodeNodeItem{key=16}
custom a nodeNodeItem{key=17}
custom a nodeNodeItem{key=18}
product a nodeNodeItem{key=20}
custom a nodeNodeItem{key=19}
custom a nodeNodeItem{key=20}
product a nodeNodeItem{key=21}
custom a nodeNodeItem{key=21}
product a nodeNodeItem{key=22}
custom a nodeNodeItem{key=22}

可以看到product和custom是对应执行的。

后续会继续实现一个类似LinkedBlockingQueue的阻塞队列,支持锁分离。

标签:JUC,product,nodeNodeItem,队列,custom,key,public,BlockingQueue
From: https://blog.51cto.com/u_14682436/6192473

相关文章

  • 栈和队列的基本操作
     目录一.栈和队列的概念......
  • 华电软工非全研究生室内定位研究-室内定位物联网平台中的时序数据库和mq队列他们的作
    时序数据库(TimeSeriesDatabase,TSDB)是一种专门用于存储和处理时间序列数据的数据库系统。在室内定位物联网平台中,时序数据库通常用于存储传感器采集的数据,如定位节点的位置、传感器数据等。时序数据库具有以下优点:优点:快速插入和查询时序数据,适用于海量数据存储和实时数据分析;......
  • python3多线程-线程池和优先队列
    1、介绍有两种线程池方案。各线程持续存在,从任务池获取任务进行执行按照需求创建线程,每个线程只执行一个任务,结束完毕则该线程结束2、准备(1)任务池task_list任务池是用于准备各任务单元的环境,比如http爆破时的请求参数,读写文件时的路径。任务池的准备可能会占用一定时间,边准......
  • 23-4-14--链表--银行排队问题之单队列多窗口服务
    假设银行有K个窗口提供服务,窗口前设一条黄线,所有顾客按到达时间在黄线后排成一条长龙。当有窗口空闲时,下一位顾客即去该窗口处理事务。当有多个窗口可选择时,假设顾客总是选择编号最小的窗口。本题要求输出前来等待服务的N位顾客的平均等待时间、最长等待时间、最后完成时间,并且统......
  • 设计循环队列
    设计循环队列题目链接思路这道题如果用循环链表会有很多问题,如图下 下面首先说一下用数组实现循环队列的结构 然后用这个结构实现入队,出队,判空,判满操作操作,如图下  下面代码实现typedefstruct{int*dys;intfront;intrear;......
  • 剑指 Offer 09. 用两个栈实现队列 && leetcode225.用队列实现栈
     剑指Offer09.用两个栈实现队列 classCQueue{private:stack<int>inStack,outStack;voidin2out(){//这里必须是while循环,如果是if判断,则输出栈日常只有一个值,没有起到先入后出的作用while(!inStack.empty()){//将输入栈栈顶......
  • [USACO12MAR]Flowerpot S 单调队列
    [USACO12MAR]FlowerpotStag:单调队列很惭愧,今天发现自己连滑动窗口都不会了,遂做了一些题两滴水的高度之差大于等于D的情况下的最小花盆宽度暴力思路:对于任意两点求水滴高度差是否大于等于D,若大于等于\(D\)则计算最下的两点距离\(w\)但这显然是能过但不完全过,手玩一下样例,是......
  • 华为认证HCIE Datacom培训理论技术学习关于中队列技术
    华为认证HCIEDatacom培训理论技术学习关于中队列技术关注WOLFLAB网络技术实验室,讲师:崔志鹏,杨广成。关注我,每周都会更新,华为认证WOLFLAB网络技术实验室!华为认证HCIE(1) FIFO:先进先出队列,是单队列技术,不会引入额外延迟,延迟只与队列长度有关,不提供任何差分服务。(2) RR:轮询调度,采用轮询......
  • 栈实现队列
    用两个栈实现队列题目链接 思路首先,梳理下栈和队列的概念,如下图栈中所有数据遵循后入先出,而队列是先入先出然后,理解用两个栈模拟出的队列结构最后思考如何用模拟出的队列实现入队,出队,取队头数据和判空操作,这里说一下我的思路入队:入pushst栈出队:将pu......
  • 3.【RabbitMQ实战】- 工作队列(Work Queue)
    工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。轮询分发消息......