首页 > 其他分享 >JUC-阻塞队列

JUC-阻塞队列

时间:2024-11-19 16:44:02浏览次数:3  
标签:JUC 队列 lock 阻塞 线程 items ArrayBlockingQueue

JUC-阻塞队列

如有侵权,请联系~
如有错误,也欢迎批评指正~

1、阻塞队列概述

阻塞队列在业务代码中可能较少使用,但是只要喜欢看源码的同学就会发现,阻塞队列使用的很频繁,到处都有它的影子。例如:线程池ThreadPoolExecutor中的工作队列。
阻塞队列(BlockingQueue)是 Java 并发包(java.util.concurrent)中的一个重要接口,允许多个线程同时安全地操作队列。它不仅提供了队列的基本操作(如插入、删除、检查元素),还可以在队列满时阻塞插入线程,在队列空时阻塞删除线程。这种机制非常适合用于生产者-消费者问题等多线程场景。

特点:

  • 线程安全:阻塞队列是线程安全的,多个线程可以安全地访问和操作它。
  • 阻塞操作:
    • 在队列已满时,插入元素的线程会阻塞,直到有空间可用。
    • 在队列为空时,取出元素的线程会阻塞,直到有元素可用。
      多种实现:

Java 提供了多种阻塞队列的实现,例如:

  • ArrayBlockingQueue: 使用数组实现的有界阻塞队列。
  • LinkedBlockingQueue: 使用链表实现的可选界限的阻塞队列。
  • PriorityBlockingQueue: 支持优先级的无界阻塞队列。
  • DelayQueue: 延迟队列,元素在到达特定时间后才能被获取。
  • SynchronousQueue: 不持有元素的阻塞队列,插入和获取操作必须同步进行。

2、ArrayBlockingQueue阻塞队列

2.1 ArrayBlockingQueue架构图

因为有很多种阻塞队列,我们以ArrayBlockingQueue阻塞队列进行讲解、学习。首先先看下整体的架构图,先看到整体的流程图再学习源码:
在这里插入图片描述
上面这个图其实就是管程模型,上述图转换为下面这张图结合之前的管程模型进行对比。
在这里插入图片描述

2.2 ArrayBlockingQueue源码

结合上面的图一起看源码效果更佳哦。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
	// 阻塞队列相关的属性
	/** The queued items */
	final Object[] items;  // 队列,将数据都是往这个队列中存储数据
	/** items index for next take, poll, peek or remove */
	int takeIndex;  // 队列的起始位置
	/** items index for next put, offer, or add */
	int putIndex; // 队列的最终位置
	/** Number of elements in the queue */
	int count;   // 队列中数据的个数
	
	// 控制逻辑【控制数据的put和take】:锁和条件
	/** Main lock guarding all access */
	final ReentrantLock lock;
	/** Condition for waiting takes */
	private final Condition notEmpty;
	/** Condition for waiting puts */
	private final Condition notFull;

	// 从阻塞队列拿值
	public E poll(long timeout, TimeUnit unit) throws InterruptedException {
	    long nanos = unit.toNanos(timeout);
	    final ReentrantLock lock = this.lock;
	    lock.lockInterruptibly();
	    try {
	        while (count == 0) {
	            if (nanos <= 0)
	                return null;
	            nanos = notEmpty.awaitNanos(nanos);
	        }
	        return dequeue();
	    } finally {
	        lock.unlock();
	    }
	}
	
	// 往阻塞队列中塞数据
	public boolean offer(E e, long timeout, TimeUnit unit)
	    throws InterruptedException {
	
	    checkNotNull(e);
	    long nanos = unit.toNanos(timeout);
	    final ReentrantLock lock = this.lock;
	    // 这里获取锁
	    lock.lockInterruptibly();
	    try {
	        while (count == items.length) {
	            if (nanos <= 0)
	                return false;
	            nanos = notFull.awaitNanos(nanos);
	        }
	        enqueue(e);
	        return true;
	    } finally {
	        // 必须要等到这里才会释放锁么,唯一释放锁的地方么?
	        // 很明显不是。例如两个线程,第一个线程先获取数据,等待10ms【此时阻塞队列没有数据】,第二个线程存储数据。如果真的是这样,那岂不是第一个线程获取不到数据了
	        lock.unlock();
	    }
	}
	
	// 阻塞nanosTimeout毫秒
	public final long awaitNanos(long nanosTimeout) throws InterruptedException {
       if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node); //仔细看这里会释放锁,在超时等待之前释放锁,允许其他线程对阻塞队列进行操作
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout); // 超时等待
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  // 这里会重新获取锁。这样就实现了对队列操作的时候进行锁,阻塞的时候释放锁
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
	}
	
	 private E dequeue() {
	    // assert lock.getHoldCount() == 1;
	    // assert items[takeIndex] != null;
	    final Object[] items = this.items;
	    @SuppressWarnings("unchecked")
	    E x = (E) items[takeIndex];
	    items[takeIndex] = null;
	    if (++takeIndex == items.length)
	        takeIndex = 0;
	    count--;
	    if (itrs != null)
	        itrs.elementDequeued();
	    notFull.signal();
	    return x;
	}
}

标签:JUC,队列,lock,阻塞,线程,items,ArrayBlockingQueue
From: https://blog.csdn.net/m0_50149847/article/details/143775980

相关文章

  • 【高贵的数据结构】学了python你一定要知道的知识之deque双端队列
    deque是Python的collections模块提供的一种双端队列数据结构,支持从队列的两端快速添加和删除元素,时间复杂度为(O(1))。与列表相比,它在高效的双端操作中有明显优势。1.导入dequefromcollectionsimportdeque2.初始化deque创建空队列dq=deque()print(......
  • 逆波兰表达式求值——栈与队列
    首先是第一次的代码classSolution{public:intevalRPN(vector<string>&tokens){stack<longlong>str;for(inti=0;i<tokens.size();i++){if(tokens[i]!="+"&&tokens[i]!=......
  • 消息队列Kafka与RabbitMq异同分析
    消息模型:Kafka消息模型Topic和Partition:Topic:是消息的分类,所有相关的消息都被发送到同一个Topic。Partition:每个Topic可以有多个Partition,Partition是Topic的基本存储单元。Partition允许数据的并行处理,提高了吞吐量。消费者组:消费者可以组成一个消费者组(Co......
  • 消息队列的作用?
    先告诉你答案:异步、削峰、解耦。异步异步对应着同步,了解异步先了解什么是同步。同步:请求发送后,在收到结果之前一直等待。异步:请求发送后,可以去做其他事情。下面来看一个同步案例    用户发送请求之后,会一直等待,整个链路调用时间150ms+200ms+200ms=550ms。但是......
  • 删除字符串中的所有相邻重复项--栈与队列
    第一版的代码如下下:点击查看代码classSolution{public:stringremoveDuplicates(strings){stack<char>str;for(inti=0;i<s.size();i++){//要先判断才能进行压栈,再次记住栈一定要先判断是否为空i......
  • JUC的常见类
    1.callable接口callable是一个interface,相当于把线程封装了一个“返回值”,方便程序员借助多线程的方式计算结果。callable接口和Runnable接口是并列关系,但是Runnable返回值是void重写的是run方法更注重执行的过程而不是结果,而callable重写的是call方法,call是有返回值的返回值......
  • JUC---ThreadLocal原理详解
    什么是ThreadLocal?通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢?JDK中自带的ThreadLocal类正是为了解决这样的问题。ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻......
  • JUC---多线程下的数据共享(基于ThreadLocal的思考)
    多线程下的数据共享(基于ThreadLocal的思考)起初实在写项目过程中,在完成超时订单自动取消的任务时,使用xxl-job,整个逻辑是需要从订单表中找出过期的订单,然后将其存入订单取消表。存入订单取消表时需要存储用户的信息。我最开始没想那么多,就直接从ThreadLocal中取出用户信息,但......
  • DAY65||Bellman_ford 队列优化算法(又名SPFA)|bellman_ford之判断负权回路|bellman_ford
    Bellman_ford队列优化算法(又名SPFA)94.城市间货物运输I思路大家可以发现Bellman_ford算法每次松弛都是对所有边进行松弛。但真正有效的松弛,是基于已经计算过的节点在做的松弛。给大家举一个例子:本图中,对所有边进行松弛,真正有效的松弛,只有松弛边(节点1->节点2)和......
  • 数据结构——栈和队列的模拟实现
    文章目录前言一、栈1.1概念与结构1.2栈的实现二、队列2.1概念与结构2.2队列的实现总结前言继上篇博客,已经把链表的有关习题完成,链表也已经收尾啦。今天来学习新的数据结构——栈和队列,fellowme一、栈1.1概念与结构栈:⼀种特殊的线性表,其只允许在固定......