1 前言
看过了LinkedBlockingQueue、ArrayBlockingQueue、DelayQueue等阻塞队列,这节我们又要看一个不一样的队列,SynchronousQueue同步阻塞队列。
2 SynchronousQueue是什么
SynchronousQueue的同步队列,使用的场景比较少,主要是用来做线程之间的数据同步传输的。
线程之间的同步数据传输是什么意思?我看LinkedBlockingQueue、ArrayBlockingQueue不也是使用在线程之间传输数据吗,生产者线程往队列里面存入数据,消费者线程从队列里面取出数据,进行线程之间的数据传输。
确实是这样的,LinkedBlockingQueue、ArrayBlockingQueue、DelayQueue之类的阻塞队列,有一个容器的概念,生产者只需要往容器里面存入数据就完事了,如下图所示:
这里的图,生产者线程和消费者线程没有之间的交互;生产者只负责把数据放入容器中,消费者只负责从容器中取出数据进行消费。这种模型类似于生产者通过容器将数据间接的传递给了消费者。
然而SynchronousQueue的模型是不一样的,生产者和消费者的数据传输不是通过容器来的,而SynchronousQueue内部没有存储数据的容器。
(1)生产者是直接将数据传递给消费者的,通过手递手的方式来进行传递,同时消费者线程直接从生产者线程手中取得数据。
(2)如果生产者线程A传输过程中,发现没有消费者线程获取自己的数据,自己则阻塞等待;直到有消费者线程B取数据的时候唤醒生产者线程A,然后生产者线程A亲手数据交给消费者线程B。
(3)同理,如果消费者线程B取数据的时候,发现没有生产者线程传递数据过来,自己则阻塞等待;直到有生产者线程A传递数据的时候唤醒消费者线程B,然后生产者线程A亲手数据交给消费者线程B。
就类似如下的模型:
(1)如上图所示:生产者线程A、线程B在传递数据给消费者的时候;发现此时没有消费者线程取数据,此时自己进入阻塞队列阻塞等待。
(2)同理消费者线程C、线程D在取数据的时候,发现没有生产者线程传递数据过来,此时消费者线程进入阻塞队列阻塞等待。
我们继续,假设首先生产者线程A、线程B在传递数据的时候发现没有消费者线程取,自己阻塞等待。然后过一段时间之后,当消费者线程B、线程C来取数据就会得到如下图形:
(1)线程A、线程B在传递数据的时候发现没有消费者线程取数据,这个时候,线程A、线程B进入阻塞队列阻塞等待
(2)等一段时间之后,线程C来取数据了,这个时候发现已经有生产者在等着把数据给我了,这个时候我唤醒生产者线程A,然后线程A把数据传给线程C。
同样的道理,假设最开始是线程C、线程D来取数据,但是发现没有生产者传递数据过来,自己就会进入阻塞等待;但是一段时间之后生产者线程A、线程B来传递数据过来,就会唤醒线程C、线程D,然后亲手把数据传给线程C、线程D,就会得到如下图形:
(1)消费者线程C、线程D取数据发现,没有生产者给传数据给它们;然后就进入阻塞队列阻塞等待
(2)生产者线程A要进行数据传递,检查发现等待队列已经有人在等待取数据了,此时唤醒线程C,然后将数据交给线程C
那么通过上述我们大概可以总结下:
(1)首先SynchronousQueue队列是没有存储数据的容器的,要进行数据传输,必须是要进行手递手传递,也就是生产者必须亲手传递给消费者
(2)其次是生产者和消费者必须一一匹配,如果只有生产者传递数据,此时需要进入等待队列阻塞等待,然后由消费者将其唤醒;同样只有消费者取数据也需要在阻塞队列等待,然后由生产者将其唤醒。
我们接下来就来看看具体是如何实现的。
3 SynchronousQueue内部源码
我们来看下SynchronousQueue的各个方法的源码:
3.1 put方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 调用底层的transfer表示要进行数据传输,e 是要进行传输的元素 // false表示不可设置阻塞时间,也就是如果没消费者取数据,则一直阻塞等待 // 0 表示一直阻塞等待 if (transferer.transfer(e, false, 0) == null) { // 如果返回null,说明线程被中断或者超时了 Thread.interrupted(); throw new InterruptedException(); } }
3.2 take方法
public E take() throws InterruptedException { // 和put方法一样,取数据的时候也是调用transfer方法 // null表示自己是进行取数据操作,如果传递不是null则是数据传输操作 // false表示如果没有生产者传输数据,则一直阻塞等待 // 0 表示一直阻塞等待 E e = transferer.transfer(null, false, 0); if (e != null) // 如果拉取到的数据非null,说明取数据成功,返回数据 return e; // 取数据失败,线程被中断了 Thread.interrupted(); throw new InterruptedException(); }
上面的源码我们得出,生产者调用transfer来进行数据传输的,传递的数据为e;消费者通过调用transfer来获取数据,参数null表示取数据。生产者和消费者调用同样的方法,操作不同通过参数是否为null来区分。
3.3 offer(E e)方法
public boolean offer(E e) { // 检查传递的元素e非null,如果是null则抛出异常 if (e == null) throw new NullPointerException(); // e表示要传递的数据 // true && 0 表示如果没有消费者取数据下,自己要阻塞等待的时间为0纳秒 // 也就是说如果没有消费者取数据,马上返回传输失败 return transferer.transfer(e, true, 0) != null; }
offer(E e, long timeout, TimeUnit unit)方法:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 检查要传递的数据e非null,如果是null抛出异常 if (e == null) throw new NullPointerException(); // e表示要进行传输的数据 // true && timeout表示如果没有消费者取数据的情况下 // 最多等待timeout的时间,如果超时还是没有消费者取数据,就返回失败 if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
3.4 poll方法
public E poll() { // 直接调用transfer方法取数据 // 传递参数null表示自己的操作是消费者取数据,区分生产者的传递操作 // true && 0 表示如果此时没有生产者传输数据过来,则立马返回取数据失败 return transferer.transfer(null, true, 0); }
poll(long timeout, TimeUnit unit)方法:
public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 调用transfer方法进行取数据 // null表示自己的操作是消费者取数据,区分生产者传输数据操作 // true && timeout 表示如果取数据的时候没有生产者发送数据过来 // 最多阻塞等待timeout的时间,超过了这个时间还是没有生产者发送数据过来,抛出中断异常 E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); }
SynchronousQueue同步等待队列的源码精髓就在于transfer方法里面了,生产者传输数据、消费者取数据都是通过transfer方法来实现的。
4 小结
到这里,SynchronousQueue延迟队列就看的差不多了,这个阻塞队列的使用场景其实并不多,大多数是在线程之间的同步数据传递场景,有理解不对的地方欢迎指正哈。
标签:JUC,Java,消费者,SynchronousQueue,阻塞,生产者,线程,null,数据 From: https://www.cnblogs.com/kukuxjx/p/17300910.html