首页 > 编程语言 >Java线程相关知识及线程池学习二

Java线程相关知识及线程池学习二

时间:2025-01-19 17:28:15浏览次数:3  
标签:元素 Java 队列 lock 知识 阻塞 线程 final

阻塞队列

定义

在Java中,阻塞队列(Blocking Queue)是一种线程安全的队列。阻塞队列是Java并发包(java.util.concurrent)中的一个重要组件,常用于生产者-消费者模式中,一个线程产生数据放入队列,另外一个从队列取出数据进行消费。
主要有两种情况

  • 在尝试添加元素到队列中时,如果队列已满,则线程会被阻塞;
  • 在尝试从队列中移除元素时,如果队列为空,则线程也会被阻塞。

BlockingQueue

BlockingQueue是一个接口,方法主要有如下四种形式。

方法用途\行为方式抛出异常返回特殊值阻塞超时阻塞
插入add(E e)offer(E e)put(E e)offer(E e, long timeout, TimeUnit unit)
移除remove(Object o)poll()take()poll(long timeout, TimeUnit unit)
检查element()peek()

行为说明

  • 抛出异常
    如果试图的操作无法立即执行,抛一个异常。比如:
    1、add(e):给队列添加元素,当队列的容量已满就会抛出异常 IllegalStateException。
    2、remove(): 当队列为空时会抛出异常NoSuchElementException。
    3、队列为空时会抛出NoSuchElementException。
  • 返回特殊值
    如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
    1、offer(e) : 给添加元素,如果成功返回true,队列满了就返回false。
    2、poll:从头部取元素,如果成功返回true,队列为空就返回false。
  • 阻塞
    如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
    1、put(e):如果队列满了就会阻塞当前put的线程。
    2、如果队列为空也会阻塞当前线程。
  • 超时阻塞
    如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

几种常用的队列

ArrayBlockingQueue
定义和使用

一个由数组结构组成的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
        //生成数据的线程
        Thread putThread = new Thread(()->{
            for (int i = 0; i < 20; i++) {
                try {
                    System.out.println("生成:"+i);
                    blockingQueue.put(i);//队列满了会阻塞
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //消费数据的线程
        Thread takeThread = new Thread(()->{
            for (int i = 0; i < 20; i++) {
                try {
                    int item = blockingQueue.take();//队列空了会阻塞
                    System.out.println("消费:"+item);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        putThread.start();
        takeThread.start();
源码分析(JDK17)
  • 初始化
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    //存储数据的数组,构造的时候会根据传入的大小进行初始化
    final Object[] items;
    //取元素的下标
    int takeIndex;
    //存入元素的下标
    int putIndex;
    //队列元素大小
    int count;
    //锁
    final ReentrantLock lock;
    //取元素时阻塞线程用的条件变量
    private final Condition notEmpty;
    //存元素时阻塞线程用的条件变量
    private final Condition notFull;

    /*
    * 传入队列容量大小
    * @param capacity 队列容量
    */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    /*
    * 传入队列容量大小
    * @param capacity 队列容量
    * @param fair true表示创建的是公平锁,在锁中等待的线程被唤醒后会按照阻塞时间的长短来竞争锁
    *             表示非公平锁,在锁中等待的线程被唤醒后自由竞争锁
    */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
}
  • put添加元素
    /*
    * 向队列添加元素
    * @param e 要添加的元素
    */
    public void put(E e) throws InterruptedException {
        //检查元素是否为空
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //先加锁,因为只有一把锁所以这个队列存和取不能同时进行
        lock.lockInterruptibly();
        try {
            //当队列中已经满了时阻塞住存元素的线程
            while (count == items.length)
                notFull.await();
            //走到这里表示队列未满了,执行添加元素的逻辑
            enqueue(e);
        } finally {
            //放在finally保证一定会解锁
            lock.unlock();
        }
    }

    /*
    * 入队
    * @param e 要添加的元素
    */
    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        //把要添加的元素e放在数组的putIndex下标处
        items[putIndex] = e;
        //存元素的下标自增1,下次put就可以往下一个位置放元素,
        //所以放元素是在数组的尾部添加元素
        //如果+1后超出了数组的容量限制就重置成0,表示已按顺序放到了队列的末尾,
        //按先入先出的原则肯定是0号位的元素会先被取走,所以下次就在0号位put
        if (++putIndex == items.length) putIndex = 0;
        //队列大小+1
        count++;
        //唤醒正在阻塞的获取元素的线程
        notEmpty.signal();
    }
  • take取出元素
    /*
    * 从队列获取元素
    */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //如果队列大小为空,则取元素的线程阻塞
            while (count == 0)
                notEmpty.await();
            //从队列取出元素
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    /*
    * 出队
    */
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //从数组中取出元素
        E e = (E) items[takeIndex];
        //将取出元素后的位置置为null
        items[takeIndex] = null;
        //取完后让takeIndex自增指向下次要取的下标,
        //如果超过了数组的容量就重新指向0,因为按先入先出原则取完最后一个就要回到起点准备下一轮获取
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //唤醒正在阻塞的添加元素的线程
        notFull.signal();
        return e;
    }
LinkedBlockingQueue
定义

一个由链表结构组成的可选有界阻塞队列,创建时可以不指定容量,但本质上看还是一个有界队列,当你不指定容量是默认的容量是Integer.MAX_VALUE。因为基于链表实现,所以存和取操作的是不同的节点,所以内部有两把锁,存和取是可以同时进行的。

源码分析(JDK17)
  • 初始化
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    /**
    * 链表节点
    */
    static class Node<E> {
        //存储的值
        E item;

        //下个节点
        Node<E> next;
        Node(E x) { item = x; }
    }
    //容量
    private final int capacity;
    
    //当前队列大小
    private final AtomicInteger count = new AtomicInteger();
    
    //指向链表的头节点,链表的头节点不放元素,head.item == null,
    //这是为了处理在链表中只有一个元素时存线程和取线程同时操作这个节点的next属性引发线程安全问题,
    //这样设计后当链表中只有一个头节点时就认为当前队列是空的,让取线程阻塞,就可以避免上边的问题。
    transient Node<E> head;
    
    //尾节点
    private transient Node<E> last;

    //取元素的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //取元素时等待的条件变量
    private final Condition notEmpty = takeLock.newCondition();

    //添加元素的锁
    private final ReentrantLock putLock = new ReentrantLock();
    //添加元素时等待的条件变量
    private final Condition notFull = putLock.newCondition();
    
    /*
    * 无参构造函数,容量按Integer.MAX_VALUE
    */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    /*
    * 有参构造函数,传入指定容量大小
    * @param capacity 容量
    */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        //头节点不存元素
        last = head = new Node<E>(null);
    }
}
  • put添加元素
    /*
    * 添加元素
    * @param e 存入的元素
    */
    public void put(E e) throws InterruptedException {
        //元素不能为空
        if (e == null) throw new NullPointerException();
        final int c;
        //创建新节点
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //加锁
        putLock.lockInterruptibly();
        try {
            //当前队列已满,存的线程阻塞
            while (count.get() == capacity) {
                notFull.await();
            }
            //入队
            enqueue(node);
            //队列大小+1
            c = count.getAndIncrement();
            //如果当前容量+1后没有超过容量大小,唤醒存的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //释放锁
            putLock.unlock();
        }
        //只有当c=0时才会去叫醒取元素的线程,因为c=0说明添加之前队列是空的,才有可能会有阻塞的取元素线程
        if (c == 0)
            //唤醒取线程
            signalNotEmpty();
    }

    /*
    * 入队
    * @param node 
    */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        //在链表尾部添加节点
        last = last.next = node;
    }
  • take取出元素
    /*
    * 取出元素
    */
    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //加锁
        takeLock.lockInterruptibly();
        try {
            //当前队列为空,取线程阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            //元素出队
            x = dequeue();
            //队列大小-1
            c = count.getAndDecrement();
            //如果还有元素唤醒取线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            //解锁
            takeLock.unlock();
        }
        //如果取这个元素之前队列满了,那就可能会有阻塞的存元素线程,所以调用唤醒
        //存元素线程的方法
        if (c == capacity)
            signalNotFull();
        return x;
    }
    /*
    * 出队,从对头取出
    */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        //获取头节点的下一个节点
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        //取头节点下一个节点的元素
        E x = first.item;
        //把头节点的下一个节点的内容清空,这样它就可以作为新的头节点
        first.item = null;
        return x;
    }
DelayQueue
  • 一个无界阻塞队列,其中的元素只能在其延迟期满时才能从队列中提取。元素必须实现Delayed接口。
  • DelayQueue 中的元素必须是 Delayed 的子类,Delayed 是表达延迟能力的关键接口,其继承了 Comparable 接口,并定义了还剩多久过期的方法,需要实现getDelay()和compareTo()。
  • compareTo(Delayed o):用于比较延时,这是队列里元素的排序依据。当生产者线程调用 put 之类的方法加入元素时,会触发 Delayed 接口中的 compareTo 方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。
  • getDelay(TimeUnit unit):这个接口返回元素是否到期,小于等于 0 表示元素已到期,大于 0 表示元素未到期。消费者线程查看队列头部的元素,然后调用元素的 getDelay 方法,如果此方法返回的值小于0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果 getDelay 方法返回的值大于 0,则消费者线程 wait 返回的时间值后,再从队列头部取出元素,此时元素已经到期。
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
源码分析
  • 初始化
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    //锁
    private final transient ReentrantLock lock = new ReentrantLock();
    
    //PriorityQueue是一个基于优先级堆的无界队列,它实现了Queue接口。
    //PriorityQueue中的元素并不   是按照它们被添加到队列中的顺序来排序的,
    //而是根据元素的自然顺序(如果元素实现了Comparable接口)或者根据在构造PriorityQueue时提供的Comparator来排序的
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    //第一个等待某个延时对象的线程,在延时对象还没有到期时其他线程看到这个 leader 不为 null,那么就直接 wait,主要是为了避免大量线程在同一时间点唤醒,导致大量的竞争,反而影响性能。
    private Thread leader;
}
  • 添加元素
public void put(E e) {
        offer(e);
    }
    /*
    * 入队
    * @param e 入队元素
    */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        //获取锁
        lock.lock();
        try {
            //插入元素
            q.offer(e);
            //检查插入的元素是否是队首,是的话将leader = null,然后唤醒一个消费线程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            //释放锁
            lock.unlock();
        }
    }
  • take取出元素
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //获取锁
        lock.lockInterruptibly();
        try {
            for (;;) {
                //获取头部元素,但不会删除
                E first = q.peek();
                //如果头部元素为空,阻塞线程
                if (first == null)
                    available.await();
                else {
                    //获取延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    //延迟时间小于0,表示到期了,直接取出
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    //leader != null 表明有其他线程在操作,线程阻塞
                    if (leader != null)
                        available.await();
                    else {
                        //获取当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //超时阻塞
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader 为 null 并且队列不为空,说明没有其他线程在等待,那就唤醒
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

标签:元素,Java,队列,lock,知识,阻塞,线程,final
From: https://blog.csdn.net/linwq8/article/details/145229528

相关文章

  • java类型转换
    由于java是强类型语言,所以在运算的时候需要用到类型转换低(byte、short、char——>int——>long——>float——>double)高运算中不同类型的数据先转化为同一类型,然后在进行运算强制转换:(类型)变量名(高——>低)/自动转换(低——>高)注意点不能对布尔值进行转换不能对象类型转换......
  • 深入理解 Java 双列集合:Map 家族的探索与实践
    在Java编程的世界里,集合框架是组织和操作数据的强大工具。其中,双列集合以独特的键值对存储方式,为我们处理数据提供了别样的思路。本文将深入探讨Java双列集合的核心概念、常见实现类及其应用场景。双列集合的基本特性双列集合,区别于单列集合,它一次存储一对数据,即键(Key)和......
  • 偷偷的学Java
    序章:为何要偷偷学Java?•Java,不仅仅是一种编程语言• 偷偷学Java,快速提升你的竞争力•Java学习秘籍第一章:Java的神秘面纱•Java的起源与发展历程•Java的生态系统与应用场景•Java与其他编程语言的比较第二章:搭建你的Java秘密基地•安装Java开发工具包(JDK):不被发现......
  • java变量及八大基本数据类型的定义
    变量变量是什么:就是可以变化的量!java是一种强类型语言,每个变量都必须声明其类型java变量是程序中最基本的存储单元,其中要素包括变量名,变量类型,作用域注意事项每个变量都有类型,类型可以是基本类型,也可以是引用类型变量名必须是合法的标识符变量声明是一条完整的语句,因此......
  • 【华为OD-E卷 - 第k个排列 100分(python、java、c++、js、c)】
    【华为OD-E卷-第k个排列100分(python、java、c++、js、c)】题目给定参数n,从1到n会有n个整数:1,2,3,…,n,这n个数字共有n!种排列。按大小顺序升序列出所有排列的情况,并一一标记,当n=3时,所有排列如下:“123”“132”“213”“231”“312”“321”给......
  • 咱们继续学Java——高级篇 第一百八十三篇:之Java高级Swing编程之JEditorPane组件与进
    咱们继续学Java——高级篇第一百八十三篇:之Java高级Swing编程之JEditorPane组件与进度指示器在Java编程的学习旅程中,我们始终保持着积极探索、共同成长的态度。今天,我们将深入学习Java高级Swing编程中关于JEditorPane组件与进度指示器的部分,包括JEditorPane组件的功能特性......
  • Java学习,删除集合指定元素
    Java删除集合中指定元素,通常依赖于集合具体类型。不同的集合类型(如ArrayList,HashSet,LinkedList等)提供了不同的方法来执行此操作。使用ArrayList:importjava.util.ArrayList;importjava.util.List; publicclassMain{  publicstaticvoidmain(String[]ar......
  • 小志的Java学习计划
    小志的Java学习计划自身情况分析及目标​普通二本计算机软件工程专业,大学期间未参加比赛,绩点和个人技术水平也不高只能说可以保证毕业。一战考研数学发挥失利。受到网络上学历贬值的信息的影响,考虑到本身报考院校也不是出色的双非院校三年以后就业也许也不容易,于是并不打......
  • [2025.1.19 JavaSE学习]网络编程-2(netstat指令 && TCP补充)
    netstatnetstat-an:可以查看当前主机网络情况,包括端口监听情况和网络连接情况netstat-an|more:可以分页显示在dos控制台执行Listening表示某个端口在监听如果有一个外部程序(客户端)连接到该端口,就会显示一条连接信息PS:netstat-anb,可以发现,8888端口号在上一节程序运行......
  • 【开源】一款基于JAVA的国产化自主可控的人工智能开源平台
    一、项目简介人工智能开源平台是由联合国内顶尖科研力量共同打造的国产化自主可控的人工智能开源平台。平台面向人工智能研究中的数据处理、算法开发、模型训练、算力管理和推理应用等各个流程的技术难点,研发了包括一站式算法开发平台、高性能分布式深度学习框架、先进算法模型库......