首页 > 编程语言 >Java并发编程——ArrayBlockingQueue

Java并发编程——ArrayBlockingQueue

时间:2023-02-01 16:01:53浏览次数:56  
标签:元素 Java 队列 lock 编程 阻塞 ArrayBlockingQueue public BlockingQueue

一、阻塞队列 BlockingQueue

在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

1.1、BlockingQueue的基本原理

先来解释一下阻塞队列:

如上图:

  • 1、生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 2、消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

查阅BlockingQueue总结了以下阻塞队列的方法:

1、boolean add(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回IllegalStateException异常。

2、boolean offer(E e)

  • 在不违反容量限制的情况下,可立即将指定元素插入此队列,成功返回true,当无可用空间时候,返回false。

3、void put(E e)

  • 直接在队列中插入元素,当无可用空间时候,阻塞等待。

4、boolean offer(E e, long timeout, TimeUnit unit)

  • 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false。

5、E take()

  • 获取并移除队列头部的元素,无元素时候阻塞等待。

6、E poll( long time, timeunit unit)

  • 获取并移除队列头部的元素,无元素时候阻塞等待指定时间。

7、boolean remove()

  • 获取并移除队列头部的元素,无元素时候会抛出NoSuchElementException异常。

8、E element()

  • 不移除的情况下返回列头部的元素,无元素时候会抛出NoSuchElementException异常。

9、E peek()

  • 不移除的情况下返回列头部的元素,队列为空无元素时返回null。

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。 以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

 

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

二、ArrayBlockingQueue

ArrayBlockingQueue使用的数据结构是数组

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 队列中元素保存的地方 */
    final Object[] items;

    /** 取元素的指针 记录下一次操作的位置 */
    int takeIndex;

    /** 放元素的指针 记录下一次操作的位置 */
    int putIndex;

    /** 元素数量 */
    int count;

    /** 保证并发访问的锁 */
    final ReentrantLock lock;

    /** 等待出队的条件 消费者监视器 */
    private final Condition notEmpty;

    /** 等待入队的条件 生产者监视器 */
    private final Condition notFull;
}	

构造函数

  • 容量大小有构造函数的capacity参数决定。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

	//必须传入容量,可以控制重入锁是公平还是非公平
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
	
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
		// 初始化数组	
        this.items = new Object[capacity];
		// 创建重入锁及两个条件
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
		//final修饰的变量不会发生指令重排
        final ReentrantLock lock = this.lock;
        lock.lock(); // 保证可见性 不是为了互斥  防止指令重排 保证item的安全
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }	
}	

2.1、入队

2.1.1、add(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean add(E e) {
        return super.add(e);
    }
}	

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
	
	// AbstractQueue 调用offer(e)如果成功返回true,如果失败抛出异常	
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
}

2.1.2、offer(E e)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean offer(E e) {
		// 元素不可为空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
		// 加锁
        lock.lock();
        try {
            if (count == items.length)
				// 如果数组满了就返回false  
                return false;
            else {
				// 如果数组没满就调用入队方法并返回true
                enqueue(e);
                return true;
            }
        } finally {
			//释放锁
            lock.unlock();
        }
    }
}	

2.1.3、put(E e) 方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
		// 获取ReentrantLock锁
        final ReentrantLock lock = this.lock;
		// 加锁,如果线程中断了抛出异常
        lock.lockInterruptibly();
        try {
			// 如果队列满了,则进入条件队列进行等待
            while (count == items.length)
                notFull.await();
			// 队列不满,或者被取数线程唤醒了,那么会继续执行
			// 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程	
            enqueue(e);
        } finally {
			// 释放ReentrantLock锁
            lock.unlock();
        }
    }
}	

2.1.4、offer(E e, long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
				// 如果数组满了,就阻塞nanos纳秒,如果唤醒这个线程时依然没有空间且时间到了就返回false	
                nanos = notFull.awaitNanos(nanos);
            }
			//入队
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
	
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
		// 把元素直接放在放指针的位置上
        items[putIndex] = x;
		// 如果放指针到数组尽头了,就返回头部
        if (++putIndex == items.length)
            putIndex = 0;
		// 数量加1	
        count++;
		// 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了
        notEmpty.signal();
    }	
}	
  • add(e)时如果队列满了则抛出异常;
  • offer(e)时如果队列满了则返回false;
  • put(e)时如果队列满了则使用notFull等待;
  • offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
  • 利用放指针循环使用数组来存储元素;

2.2、出队

2.2.1、 remove()方法

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
	
    public E remove() 
		// 调用poll()方法出队
        E x = poll();
        if (x != null)
			// 如果有元素出队就返回这个元素
            return x;
        else
			// 如果没有元素出队就抛出异常
            throw new NoSuchElementException();
    }
}

2.2.2、 poll()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
			//如果队列里没有数据就直接返回null
			//否则从队列头部出队
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
}

2.2.3、 poll(long timeout, TimeUnit unit)方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
		
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
		    // 如果队列无元素,则阻塞等待nanos纳秒
			// 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

2.2.4、 take()方法

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
		//加锁,如果线程中断了抛出异常
        lock.lockInterruptibly();
        try {
			//队列中不存元素
            while (count == 0)
				/*
				 * 一直等待条件notEmpty,即被其他线程唤醒
				 * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()
				 * 唤醒其他等待这个条件的线程,同时队列也不空了)
				 */
                notEmpty.await();
			//否则出队	
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
	
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
		// 取取指针位置的元素
        E x = (E) items[takeIndex];
		// 把取指针位置设为null
        items[takeIndex] = null;
		// 取指针前移,如果数组到头了就返回数组前端循环利用
        if (++takeIndex == items.length)
            takeIndex = 0;
		// 元素数量减1	
        count--;
        if (itrs != null)
            itrs.elementDequeued();
		// 唤醒notFull条件	
        notFull.signal();
        return x;
    }	
}	
  • remove()时如果队列为空则抛出异常;
  • poll()时如果队列为空则返回null;
  • take()时如果队列为空则阻塞等待在条件notEmpty上;
  • poll(timeout, unit)时如果队列为空则阻塞等待一段时间后如果还为空就返回null;
  • 利用取指针循环从数组中取元素;

如下图,以put和take方法为例:

这里put和take使用了同一个ReentrantLock,不能并发执行。

2.3、缺点

  • a、队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量;

  • b、如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险;

  • c、只使用了一个锁来控制入队出队,效率较低。

参考: https://www.itzhai.com/articles/graphical-blocking-queue.html

https://zhuanlan.zhihu.com/p/224946304

标签:元素,Java,队列,lock,编程,阻塞,ArrayBlockingQueue,public,BlockingQueue
From: https://blog.51cto.com/u_14014612/6031642

相关文章

  • Java并发编程——LinkedBlockingQueue
    一、阻塞队列BlockingQueue在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速......
  • java/Android获取单个文件的MD5值,解决首位0被省略问题,解决超大文件问题,cmd命令行查看
    code来源:https://betheme.net/news/txtlist_i67135v.html?action=onClickcmd命令行查看文件md5码:certutil-hashfilea.txtmd5,不加后面的md5,查看的默认是sha1码。packag......
  • 【Java】自定义mybatis
    处理sqlin多条件搜索时单引号双引号问题StringBuilderpidNoZeroIds=newStringBuilder();IntegernumTmp=0;for(ShequLsDatingTypepidNoZero:pidNoZeroLis......
  • Java线程池中的execute和submit
    一、概述execute和submit都是线程池中执行任务的方法。execute是Executor接口中的方法publicinterfaceExecutor{voidexecute(Runnablecommand);}submit是......
  • Java多线程:Future和FutureTask
    一、FutureFuture是一个接口,所有方法如下:上源码:packagejava.util.concurrent;publicinterfaceFuture<V>{booleancancel(booleanmayInterruptIfRunning);......
  • 使用java python 实现 QI-页面排序-骑马钉
    链接:http://www.cnprint.org/bbs/thread/77/339531/......
  • Spring开启@Async异步方法(javaconfig配置)
    在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。应用场景:某些耗时较长的......
  • JavaScript闭包的概念
    什么是闭包?闭包有什么作用,缺点是什么?闭包的概念:JavaScript中函数会产生闭包(closure)。闭包是函数本身和该函数声明时所处的环境状态的组合;函数能够“记忆住”其定义......
  • Java 多态
    多态就是指程序中定义的引用变量所指向的具体类型和通过该引用变量发出的方法调用在编译时并不确定,而是在程序运行期间才确定。即一个引用变量倒底会指向哪个类的实例对象......
  • 部署在docker里的java程序获取真实的用户ip地址
    目前我们的服务都是全部docker化,网关zuul和各微服务都部署在docker里,构成了集群。用户请求全部到HaProxy,由HaProxy转发到zuul,再由zuul分发给各微服务。那么我们在做黑名单,或......