首页 > 编程语言 >大数据必学Java基础(六十六):BlockingQueue常见子类

大数据必学Java基础(六十六):BlockingQueue常见子类

时间:2022-10-03 11:34:15浏览次数:55  
标签:Java 子类 必学 System aq println new public out


大数据必学Java基础(六十六):BlockingQueue常见子类_java

文章目录

​BlockingQueue常见子类​

​一、ArrayBlockingQueue​

​1、添加元素​

​2、获取元素​

​3、源码​

​4、其他的添加或者获取的方法都是依托与这个入队和出队的基础方法​

​5、感受一下put和take的阻塞​

​二、LinkedBlockingQueue​

​1、添加元素​

​2、取出元素​

​3、特点​

​4、源码​

​5、put的阻塞​

​三、SynchronousQueue​

​1、先添加元素​

​2、put方法阻塞​

​3、先取再放​

​4、poll方法​

​四、PriorityBlockingQueue ​

​1、添加null数据​

​2、添加四个数据​

​3、取出数据​


BlockingQueue常见子类

一、ArrayBlockingQueue

源码中的注释的解释说明:

大数据必学Java基础(六十六):BlockingQueue常见子类_jvm_02

1、添加元素

package com.lanson.test05;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author : Lansonli
*/
public class Test01 {
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException {
//创建一个队列,队列可以指定容量指定长度3:
ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
//添加元素:
//【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
//aq.add(null);
//aq.offer(null);
//aq.put(null);
//【2】正常添加元素:
aq.add("aaa");
aq.offer("bbb");
aq.put("ccc");
System.out.println(aq);//[aaa, bbb, ccc]

//【3】在队列满的情况下,再添加元素:
//aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
//System.out.println(aq.offer("ddd"));//没有添加成功,返回false
//设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
//aq.offer("ddd",2, TimeUnit.SECONDS);
//真正阻塞的方法: put ,如果队列满,就永远阻塞
aq.put("ddd");
System.out.println(aq);
}
}

2、获取元素

package com.lanson.test05;

import javax.sound.midi.Soundbank;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author : Lansonli
*/
public class Test02 {
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException {
//创建一个队列,队列可以指定容量指定长度3:
ArrayBlockingQueue aq = new ArrayBlockingQueue(3);
aq.add("aaa");
aq.add("bbb");
aq.add("ccc");
//得到头元素但是不移除
System.out.println(aq.peek());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.poll());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.take());
System.out.println(aq);

//清空元素:
aq.clear();
System.out.println(aq);

System.out.println(aq.peek());//null
System.out.println(aq.poll());//null
//设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
//System.out.println(aq.poll(2, TimeUnit.SECONDS));
//真正阻塞:队列为空,永远阻塞
System.out.println(aq.take());


}
}

3、源码

public class ArrayBlockingQueue<E> {
//底层就是一个数组:
final Object[] items;
//取元素用到的索引,初始结果为0
int takeIndex;
//放元素用到的索引,初始结果为0
int putIndex;
//数组中元素的个数:
int count;

//一把锁:这个锁肯定很多方法中用到了,所以定义为属性,初始化以后可以随时使用
final ReentrantLock lock;

//锁伴随的一个等待吃:notEmpty
private final Condition notEmpty;

//锁伴随的一个等待吃:notFull
private final Condition notFull;

//构造器:
public ArrayBlockingQueue(int capacity) {//传入队列指定的容量
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {//传入队列指定的容量
//健壮性考虑
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化底层数组
this.items = new Object[capacity];
//初始化锁 和 等待队列
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

//两个基本方法:一个是入队,一个是出队 ,是其他方法的基础:
//入队:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;//底层数组赋给items
//在对应的下标位置放入元素
items[putIndex] = x;
if (++putIndex == items.length) //++putIndex putIndex 索引 加1
putIndex = 0;
//每放入一个元素,count加1操作
count++;
notEmpty.signal();
}


//出队:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;//底层数组赋给items
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//在对应的位置取出元素
items[takeIndex] = null;//对应位置元素取出后就置为null
if (++takeIndex == items.length)//++takeIndex 加1操作
takeIndex = 0;
count--;//每取出一个元素,count减1操作
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;//将取出的元素作为方法的返回值
}

}

takeIndex和putIndex置为0的原因:

大数据必学Java基础(六十六):BlockingQueue常见子类_BlockingQueue_03

4、其他的添加或者获取的方法都是依托与这个入队和出队的基础方法

大数据必学Java基础(六十六):BlockingQueue常见子类_开发语言_04

5、感受一下put和take的阻塞

大数据必学Java基础(六十六):BlockingQueue常见子类_main方法_05

上面的while不可以换为if,因为如果notFull中的线程被激活的瞬间,有其他线程放入元素,那么队列就又满了。

那么沿着await后面继续执行就不可以,所以一定要反复确定队列是否满的,才能放入元素。

 

二、LinkedBlockingQueue

一个可选择的有边界的队列:

意思就是队列的长度可以指定,也可以不指定

大数据必学Java基础(六十六):BlockingQueue常见子类_java_06

 

1、添加元素

package com.lanson.test05;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author : Lansonli
*/
public class Test01 {
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException {
//创建一个队列,队列可以指定容量指定长度3:
LinkedBlockingQueue aq = new LinkedBlockingQueue(3);
//添加元素:
//【1】添加null元素:不可以添加null元素,会报空指针异常:NullPointerException
//aq.add(null);
//aq.offer(null);
aq.put(null);
//【2】正常添加元素:
aq.add("aaa");
aq.offer("bbb");
aq.put("ccc");
System.out.println(aq);//[aaa, bbb, ccc]

//【3】在队列满的情况下,再添加元素:
//aq.add("ddd");//在队列满的情况下,添加元素 出现异常:Queue full
//System.out.println(aq.offer("ddd"));//没有添加成功,返回false
//设置最大阻塞时间,如果时间到了,队列还是满的,就不再阻塞了
//aq.offer("ddd",2, TimeUnit.SECONDS);
//真正阻塞的方法: put ,如果队列满,就永远阻塞
aq.put("ddd");
System.out.println(aq);
}
}

 

2、取出元素

package com.lanson.test05;

import javax.sound.midi.Soundbank;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* @author : Lansonli
*/
public class Test02 {
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException {
//创建一个队列,队列可以指定容量指定长度3:
LinkedBlockingQueue aq = new LinkedBlockingQueue();
aq.add("aaa");
aq.add("bbb");
aq.add("ccc");
//得到头元素但是不移除
System.out.println(aq.peek());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.poll());
System.out.println(aq);
//得到头元素并且移除
System.out.println(aq.take());
System.out.println(aq);

//清空元素:
aq.clear();
System.out.println(aq);

System.out.println(aq.peek());//null
System.out.println(aq.poll());//null
//设置阻塞事件,如果队列为空,返回null,时间到了以后就不阻塞了
//System.out.println(aq.poll(2, TimeUnit.SECONDS));
//真正阻塞:队列为空,永远阻塞
System.out.println(aq.take());


}
}

3、特点

ArrayBlockingQueue : 不支持读写同时操作,底层基于数组的。

LinkedBlockingQueue:支持读写同时操作,并发情况下,效率高。底层基于链表。

4、源码

入队操作:

大数据必学Java基础(六十六):BlockingQueue常见子类_jvm_07

 出队操作:

大数据必学Java基础(六十六):BlockingQueue常见子类_main方法_08

public class LinkedBlockingQueue<E>{
//内部类Node就是链表的节点的对象对应的类:
static class Node<E> {
E item;//封装你要装的那个元素

Node<E> next;//下一个Node节点的地址

Node(E x) { item = x; }//构造器
}
//链表的长度
private final int capacity;
//计数器:
private final AtomicInteger count = new AtomicInteger();
//链表的头结点
transient Node<E> head;
//链表的尾结点
private transient Node<E> last;
//取元素用的锁
private final ReentrantLock takeLock = new ReentrantLock();
//等待池
private final Condition notEmpty = takeLock.newCondition();
//放元素用的锁
private final ReentrantLock putLock = new ReentrantLock();
//等待池
private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);//调用类本类的空构造器,传入正21亿
}

public LinkedBlockingQueue(int capacity) {
//健壮性考虑
if (capacity <= 0) throw new IllegalArgumentException();
//给队列指定长度
this.capacity = capacity;
//last,head指向一个新的节点,新的节点中 元素为null
last = head = new Node<E>(null);
}


//入队:
private void enqueue(Node<E> node) {
last = last.next = node;
}

//出队:
private E dequeue() {
Node<E> h = head;//h指向了head
Node<E> first = h.next;//first 指向head的next
h.next = h; // help GC h.next指向自己,更容易被GC发现 被GC
head = first;//head的指向指为first
E x = first.item;//取出链中第一个元素,给了x
first.item = null;
return x;//把x作为方法的返回值
}
}

5、put的阻塞

阻塞的前提是  队列是固定长度的

大数据必学Java基础(六十六):BlockingQueue常见子类_BlockingQueue_09

三、SynchronousQueue

大数据必学Java基础(六十六):BlockingQueue常见子类_main方法_10

这个特殊的队列设计的意义:

大数据必学Java基础(六十六):BlockingQueue常见子类_java_11

 

大数据必学Java基础(六十六):BlockingQueue常见子类_开发语言_12

 

1、先添加元素

public class Test01 {
//这是main方法,程序的入口
public static void main(String[] args) {
SynchronousQueue sq = new SynchronousQueue();
sq.add("aaa");
}
}

直接报错:说队列满了,因为队列没有容量,理解为满也是正常的

大数据必学Java基础(六十六):BlockingQueue常见子类_main方法_13

2、put方法阻塞

队列是空的,可以理解为队列满了,满的话放入元素 put 一定会阻塞 

public class Test01 {
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException {
SynchronousQueue sq = new SynchronousQueue();
sq.put("aaa");
}
}

大数据必学Java基础(六十六):BlockingQueue常见子类_jvm_14

 

3、先取再放

package com.lanson.test06;

import java.util.concurrent.SynchronousQueue;

/**
* @author : Lansonli
*/
public class Test02 {
//这是main方法,程序的入口
public static void main(String[] args) {
SynchronousQueue sq = new SynchronousQueue();

//创建一个线程,取数据:
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
System.out.println(sq.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

//搞一个线程,往里面放数据:
new Thread(new Runnable() {
@Override
public void run() {
try {
sq.put("aaa");
sq.put("bbb");
sq.put("ccc");
sq.put("ddd");
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}).start();
}
}

结果:

大数据必学Java基础(六十六):BlockingQueue常见子类_jvm_15

4、poll方法

package com.lanson.test06;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
* @author : Lansonli
*/
public class Test02 {
//这是main方法,程序的入口
public static void main(String[] args) {
SynchronousQueue sq = new SynchronousQueue();

//创建一个线程,取数据:
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
//设置一个阻塞事件:超出事件就不阻塞了
Object result = sq.poll(5, TimeUnit.SECONDS);
System.out.println(result);
if(result == null){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

//搞一个线程,往里面放数据:
new Thread(new Runnable() {
@Override
public void run() {
try {
sq.put("aaa");
sq.put("bbb");
sq.put("ccc");
sq.put("ddd");
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}).start();
}
}

 

大数据必学Java基础(六十六):BlockingQueue常见子类_main方法_16

注意:取出元素 不能用peek,因为peek不会将元素从队列中拿走,只是查看的效果;

四、PriorityBlockingQueue 

带有优先级的阻塞队列。

优先级队列,意味着队列有先后顺序的,数据有不同的权重。 

无界的队列,没有长度限制,但是在你不指定长度的时候,默认初始长度为11,也可以手动指定,

当然随着数据不断的加入,底层(底层是数组Object[])会自动扩容,直到内存全部消耗殆尽了,导致 OutOfMemoryError内存溢出 程序才会结束。

大数据必学Java基础(六十六):BlockingQueue常见子类_jvm_17

不可以放入null元素的,不允许放入不可比较的对象(导致抛出ClassCastException),对象必须实现内部比较器或者外部比较器。 

1、添加null数据

public class Test {
//这是main方法,程序的入口
public static void main(String[] args) {
PriorityBlockingQueue pq = new PriorityBlockingQueue();
pq.put(null);
}
}

大数据必学Java基础(六十六):BlockingQueue常见子类_开发语言_18

2、添加四个数据

package com.lanson.test07;

/**
* @author : Lansonli
*/
public class Student implements Comparable<Student> {
String name;
int age;

public Student() {
}

public Student(String name, int age) {
this.name = name;
this.age = age;
}

@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}

@Override
public int compareTo(Student o) {
return this.age - o.age;
}
}


package com.lanson.test07;

import java.util.concurrent.PriorityBlockingQueue;

/**
* @author : Lansonli
*/
public class Test02 {
//这是main方法,程序的入口
public static void main(String[] args) {
PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
pq.put(new Student("nana",18));
pq.put(new Student("lulu",11));
pq.put(new Student("feifei",6));
pq.put(new Student("mingming",21));
System.out.println(pq);
}
}

结果:

大数据必学Java基础(六十六):BlockingQueue常见子类_java_19

发现结果并没有按照优先级顺序排列 

3、取出数据

package com.lanson.test07;

import java.util.concurrent.PriorityBlockingQueue;

/**
* @author : Lansonli
*/
public class Test02 {
//这是main方法,程序的入口
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Student> pq = new PriorityBlockingQueue<>();
pq.put(new Student("nana",18));
pq.put(new Student("lulu",11));
pq.put(new Student("feifei",6));
pq.put(new Student("mingming",21));
System.out.println("------------------------------------------");
System.out.println(pq.take());
System.out.println(pq.take());
System.out.println(pq.take());
System.out.println(pq.take());
}
}

大数据必学Java基础(六十六):BlockingQueue常见子类_开发语言_20

 

从结果证明,这个优先级队列,并不是在put数据的时候计算谁在前谁在后

而是取数据的时候,才真正判断谁在前谁在后

优先级:取数据的优先级


相关文章