1 前言
最近复习 AQS,回顾下其中的知识哈。
2 概述
2.1 AQS——锁的底层支持
AbstractQueuedSynchronizer 抽象同步队列简称 AQS,它是实现同步器的基础组件, 并发包中锁的底层就是使用 AQS 实现的。另外,大多数开发者可能永远不会直接使用 AQS,但是知道其原理对于架构设计还是很有帮助的。下面看下 AQS 的类图结构,如图 6-1 所示。
由该图可以看到,AQS 是一个 FIFO 的双向队列,其内部通过节点 head 和 tail 记录队 首和队尾元素,队列元素的类型为 Node。
其中 Node 中的 thread 变量用来存放进入 AQS 队列里面的线程 ;Node 节点内部的 SHARED 用来标记该线程是获取共享资源时被阻塞 挂起后放入 AQS 队列的,EXCLUSIVE 用来标记线程是获取独占资源时被挂起后放入 AQS 队列的 ;
waitStatus 记录当前线程等待状态,可以为 CANCELLED(线程被取消了)、 SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释 放共享资源时需要通知其他节点);prev 记录当前节点的前驱节点,next 记录当前节点的 后继节点。
在 AQS 中维持了一个单一的状态信息 state, 可以通过 getState、setState、 compareAndSetState 函数修改其值。对于 ReentrantLock 的实现来说,state 可以用来表示 当前线程获取锁的可重入次数 ;对于读写锁 ReentrantReadWriteLock 来说,state 的高 16 位表示读状态,也就是获取该读锁的次数,低 16 位表示获取到写锁的线程的可重入次数; 对于 semaphore 来说,state 用来表示当前可用信号的个数 ;对于 CountDownlatch 来说, state 用来表示计数器当前的值。可以理解 state 就是资源。
AQS 有个内部类 ConditionObject,用来结合锁实现线程同步。ConditionObject 可以 直接访问 AQS 对象内部的变量,比如 state 状态值和 AQS 队列。ConditionObject 是条件 变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的 await 方法后被阻塞的线程,如类图所示,这个条件队列的头、尾元素分别为 firstWaiter 和 lastWaiter。
对于 AQS 来说,线程同步的关键是对状态值 state 进行操作。根据 state 是否属于一个 线程,操作 state 的方式分为独占方式和共享方式。 在独占方式下获取和释放资源使用的 方法为 : void acquire(int arg) void acquireInterruptibly(int arg) boolean release(int arg)。
在共享方式下获取和释放资源的方法为: void acquireShared(int arg) void acquireSharedInterruptibly(int arg) boolean releaseShared(int arg)。
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源, 就会标记是这个线程获取到了,其他线程再尝试操作 state 获取资源时会发现当前该资源 不是自己持有的,就会在获取失败后被阻塞。比如独占锁 ReentrantLock 的实现,当一个 线程获取了 ReentrantLock 的锁后,在 AQS 内部会首先使用 CAS 操作把 state 状态值从 0 变为 1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持 有者,则会把状态值从 1 变为 2,也就是设置可重入次数,而当另外一个线程获取锁时发 现自己并不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。
对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过 CAS 方 式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源 还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。比如 Semaphore 信 号量,当一个线程通过 acquire() 方法获取信号量时,会首先看当前信号量个数是否满足需 要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋 CAS 获取信号量。
在独占方式下,获取与释放资源的流程如下 :
(1)当一个线程调用 acquire(int arg) 方法获取独占资源时,会首先使用 tryAcquire 方 法尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线程 封装为类型为 Node.EXCLUSIVE 的 Node 节点后插入到 AQS 阻塞队列的尾部,并调用 LockSupport.park(this) 方法挂起自己。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(2)当一个线程调用 release(int arg) 方法时会尝试使用 tryRelease 操作释放资源,这里 是设置状态变量 state 的值,然后调用 LockSupport.unpark(thread) 方法激活 AQS 队列里面 被阻塞的一个线程 (thread)。被激活的线程则使用 tryAcquire 尝试,看当前状态变量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放 入 AQS 队列并被挂起。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
需要注意的是,AQS 类并没有提供可用的 tryAcquire 和 tryRelease 方法,正如 AQS 是锁阻塞和同步器的基础框架一样,tryAcquire 和 tryRelease 需要由具体的子类来实现。 子类在实现 tryAcquire 和 tryRelease 时要根据具体场景使用 CAS 算法尝试修改 state 状态值 , 成功则返回 true, 否则返回 false。子类还需要定义,在调用 acquire 和 release 方法时 state 状态值的增减代表什么含义。
比如继承自 AQS 实现的独占锁 ReentrantLock,定义当 status 为 0 时表示锁空闲,为 1 时表示锁已经被占用。在重写 tryAcquire 时,在内部需要使用 CAS 算法查看当前 state 是否为 0,如果为 0 则使用 CAS 设置为 1,并设置当前锁的持有者为当前线程,而后返回true,如果 CAS 失败则返回 false。
比如继承自 AQS 实现的独占锁在实现 tryRelease 时,在内部需要使用 CAS 算法把当 前 state 的值从 1 修改为 0,并设置当前锁的持有者为 null,然后返回 true,如果 CAS 失败 则返回 false。
在共享方式下,获取与释放资源的流程如下:
(1)当线程调用 acquireShared(int arg) 获取共享资源时,会首先使用 tryAcquireShared 尝试获取资源,具体是设置状态变量 state 的值,成功则直接返回,失败则将当前线 程封装为类型为 Node.SHARED 的 Node 节点后插入到 AQS 阻塞队列的尾部,并使用 LockSupport.park(this) 方法挂起自己。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
(2)当一个线程调用 releaseShared(int arg) 时会尝试使用 tryReleaseShared 操作释放资 源,这里是设置状态变量 state 的值,然后使用 LockSupport.unpark(thread)激活 AQS 队 列里面被阻塞的一个线程 (thread)。被激活的线程则使用 tryReleaseShared 查看当前状态变 量 state 的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还 是会被放入 AQS 队列并被挂起。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
同样需要注意的是,AQS 类并没有提供可用的 tryAcquireShared 和 tryReleaseShared 方法,正如 AQS 是锁阻塞和同步器的基础框架一样,tryAcquireShared 和 tryReleaseShared 需要由具体的子类来实现。子类在实现 tryAcquireShared 和 tryReleaseShared 时要根据具体 场景使用 CAS 算法尝试修改 state 状态值,成功则返回 true,否则返回 false。
比如继承自 AQS 实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryAcquireShared 时,首先查看写锁是否被其他线程持有,如果是则直接返回 false,否则使用 CAS 递增 state 的高 16 位 ( 在 ReentrantReadWriteLock 中,state 的高 16 位为获取读 锁的次数 )。
比如继承自 AQS 实现的读写锁 ReentrantReadWriteLock 里面的读锁在重写 tryReleaseShared 时,在内部需要使用 CAS 算法把当前 state 值的高 16 位减 1,然后返回 true,如果 CAS 失败则返回 false。
基于 AQS 实现的锁除了需要重写上面介绍的方法外 ,还需要重写 isHeldExclusively 方法,来判断锁是被当前线程独占还是被共享。
另外,也许你会好奇,独占方式下的 void acquire(int arg) 和 void acquireInterruptibly(int arg),与共享方式下的 void acquireShared(int arg) 和 void acquireSharedInterruptibly(int arg), 这两套函数中都有一个带有 Interruptibly 关键字的函数,那么带这个关键字和不带有什么 区别呢?我们来讲讲。
其实不带 Interruptibly 关键字的方法的意思是不对中断进行响应,也就是线程在调用 不带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了 该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就 是说不对中断进行响应,忽略中断。
而带 Interruptibly 关键字的方法要对中断进行响应,也就是线程在调用带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线 程会抛出 InterruptedException 异常而返回。
最后,我们来看看如何维护 AQS 提供的队列,主要看入队操作。
入队操作 :当一个线程获取锁失败后该线程会被转换为 Node 节点,然后就会使用 enq(final Node node) 方法将该节点插入到 AQS 的阻塞队列。
private Node enq(final Node node) { for (;;) { Node t = tail;//(1) if (t == null) { // Must initialize if (compareAndSetHead(new Node()))//(2) tail = head; } else { node.prev = t;//(3) if (compareAndSetTail(t, node)) {//(4) t.next = node; return t; } } } }
下面结合代码和节点图(见图 6-2)来讲解入队的过程。如上代码在第一次循环中, 当要在 AQS 队列尾部插入元素时,AQS 队列状态如图 6-2 中(default) 所示。也就是队列 头、尾节点都指向 null ;当执行代码(1)后节点 t 指向了尾部节点,这时候队列状态如图 6-2 中(I)所示。
这时候 t 为 null,故执行代码(2),使用 CAS 算法设置一个哨兵节点为头节点,如果 CAS 设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如图 6-2 中(II)所示。
到现在为止只插入了一个哨兵节点,还需要插入 node 节点,所以在第二次循环后执 行到代码(1),这时候队列状态如图 6-2(III)所示 ;然后执行代码(3)设置 node 的 前驱节点为尾部节点,这时候队列状态如图 6-2 中(IV)所示 ;然后通过 CAS 算法设置 node 节点为尾部节点,CAS 成功后队列状态如图 6-2 中(V)所示 ;CAS 成功后再设置原 来的尾部节点的后驱节点为 node, 这时候就完成了双向链表的插入,此时队列状态如图 6-2 中(VI)所示。
2.2 AQS——条件变量的支持
正如在基础篇中讲解的,notify 和 wait,是配合 synchronized 内置锁实现线程间同步 的基础设施一样,条件变量的 signal 和 await 方法也是用来配合锁(使用 AQS 实现的锁) 实现线程间同步的基础设施。
它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步, 而 AQS 的一个锁可以对应多个条件变量。
在基础篇中讲解了,在调用共享变量的 notify 和 wait 方法前必须先获取该共享变量的 内置锁,同理,在调用条件变量的 signal 和 await 方法前也必须先获取条件变量对应的锁。
那么,到底什么是条件变量呢?如何使用呢?不急,下面看一个例子。
ReentrantLock lock = new ReentrantLock();//(1) Condition condition = lock.newCondition();//(2) lock.lock();//(3) try { System.out.println("begin wait"); condition.await();//(4) System.out.println("end wait"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//(5) } lock.lock();//(6) try { System.out.println("begin signal"); condition.signal();//(7) System.out.println("end signal"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//(8) }
代码(1)创建了一个独占锁 ReentrantLock 对象,ReentrantLock 是基于 AQS 实现的锁。
代码(2)使用创建的 Lock 对象的 newCondition()方法创建了一个 ConditionObject 变量,这个变量就是 Lock 锁对应的一个条件变量。需要注意的是,一个 Lock 对象可以创 建多个条件变量。
代码(3)首先获取了独占锁,代码(4) 则调用了条件变量的 await()方法阻塞挂起 了当前线程。当其他线程调用条件变量的 signal 方法时,被阻塞的线程才会从 await 处返回。 需要注意的是,和调用 Object 的 wait 方法一样,如果在没有获取到锁前调用了条件变量 的 await 方法则会抛出 java.lang.IllegalMonitorStateException 异常。
代码(5) 则释放了获取的锁。
其实这里的 Lock 对象等价于 synchronized 加上共享变量,调用 lock.lock()方法就 相当于进入了 synchronized 块(获取了共享变量的内置锁),调用 lock.unLock() 方法就相 当于退出 synchronized 块。 调用条件变量的 await() 方法就相当于调用共享变量的 wait() 方法,调用条件变量的 signal 方法就相当于调用共享变量的 notify() 方法。调用条件变量 的 signalAll()方法就相当于调用共享变量的 notifyAll() 方法。
经过上面解释,相信大家已经知道条件变量是什么,它是用来做什么的了。
在上面代码中,lock.newCondition() 的作用其实是 new 了一个在 AQS 内部声明的 ConditionObject 对象,ConditionObject 是 AQS 的内部类,可以访问 AQS 内部的变量(例 如状态变量 state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条 件变量的 await() 方法时被阻塞的线程。注意这个条件队列和 AQS 队列不是一回事。
在如下代码中,当线程调用条件变量的 await() 方法时(必须先调用锁的 lock() 方法 获取锁),在内部会构造一个类型为 Node.CONDITION 的 node 节点,然后将该节点插入 条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的 state 变量的值), 并被阻塞挂起。这时候如果有其他线程调用 lock.lock() 尝试获取锁,就会有一个线程获取 到锁,如果获取到锁的线程调用了条件变量的 await()方法,则该线程也会被放入条件 变量的阻塞队列,然后释放获取到的锁,在 await() 方法处阻塞。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //创建新的node节点,并插入到条件队列末尾(9) Node node = addConditionWaiter(); //释放当前线程获取的锁(10) int savedState = fullyRelease(node); int interruptMode = 0; //调用park方法阻塞挂起当前线程(11) while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } ... }
在如下代码中,当另外一个线程调用条件变量的 signal 方法时(必须先调用锁的 lock() 方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并 放入 AQS 的阻塞队列里面,然后激活这个线程。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //将条件队列头元素移动到AQS队列 doSignal(first); }
需要注意的是,AQS 只提供了 ConditionObject 的实现,并没有提供 newCondition 函数, 该函数用来 new 一个 ConditionObject 对象。需要由 AQS 的子类来提供 newCondition 函数。
下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列。
private Node addConditionWaiter() { Node t = lastWaiter; ... //(1) Node node = new Node(Thread.currentThread(), Node.CONDITION); //(2) if (t == null) firstWaiter = node; else t.nextWaiter = node;//(3) lastWaiter = node;//(4) return node; }
代码(1)首先根据当前线程创建一个类型为 Node.CONDITION 的节点,然后通过代 码(2)(3)(4)在单向条件队列尾部插入一个元素。
注意 :当多个线程同时调用 lock.lock() 方法获取锁时,只有一个线程获取到了锁,其 他线程会被转换为 Node 节点插入到 lock 锁对应的 AQS 阻塞队列里面,并做自旋 CAS 尝 试获取锁。
如果获取到锁的线程又调用了对应的条件变量的 await() 方法,则该线程会释放获取 到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列里面。
这时候因为调用 lock.lock() 方法被阻塞到 AQS 队列里面的一个线程会获取到被释放 的锁,如果该线程也调用了条件变量的 await()方法则该线程也会被放入条件变量的条 件队列里面。
当另外一个线程调用条件变量的 signal() 或者 signalAll() 方法时,会把条件队列里面 的一个或者全部 Node 节点移动到 AQS 的阻塞队列里面,等待时机获取锁。
最后使用一个图(见图 6-3)总结如下 :一个锁对应一个 AQS 阻塞队列,对应多个条 件变量,每个条件变量有自己的一个条件队列。
2.3 基于 AQS 实现自定义同步器
本节我们基于 AQS 实现一个不可重入的独占锁,正如前文所讲的,自定义 AQS 需要 重写一系列函数,还需要定义原子变量 state 的含义。这里我们定义,state 为 0 表示目前锁没有被线程持有,state 为 1 表示锁已经被某一个线程持有,由于是不可重入锁,所以不 需要记录持有锁的线程获取锁的次数。另外,我们自定义的锁支持条件变量。
代码实现:
final static NonReentrantLock lock = new NonReentrantLock(); final static Condition notFull = lock.newCondition(); final static Condition notEmpty = lock.newCondition(); final static Queue<String> queue = new LinkedBlockingQueue<String>(); final static int queueSize = 10; public static void main(String[] args) { Thread producer = new Thread(new Runnable() { public void run() { //获取独占锁 lock.lock(); try{ //(1)如果队列满了,则等待 while(queue.size() == queueSize){ notEmpty.await(); } //(2)添加元素到队列 queue.add("ele"); //(3)唤醒消费线程 notFull.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally { //释放锁 lock.unlock(); } } }); Thread consumer = new Thread(new Runnable() { public void run() { //获取独占锁 lock.lock(); try{ //队列空,则等待 while(0 == queue.size() ){ notFull.await();; } //消费一个元素 String ele = queue.poll(); //唤醒生产线程 notEmpty.signalAll(); }catch(Exception e){ e.printStackTrace(); }finally { //释放锁 lock.unlock(); } } }); //启动线程 producer.start(); consumer.start(); }
如上代码首先创建了 NonReentrantLock 的一个对象 lock,然后调用 lock.newCondition 创建了两个条件变量,用来进行生产者和消费者线程之间的同步。
在 main 函数里面,首先创建了 producer 生产线程,在线程内部首先调用 lock.lock() 获取独占锁,然后判断当前队列是否已经满了,如果满了则调用 notEmpty.await() 阻塞挂 起当前线程。需要注意的是,这里使用 while 而不是 if 是为了避免虚假唤醒。如果队列不 满则直接向队列里面添加元素,然后调用 notFull.signalAll() 唤醒所有因为消费元素而被阻 塞的消费线程,最后释放获取的锁。
然后在 main 函数里面创建了 consumer 生产线程,在线程内部首先调用 lock.lock() 获 取独占锁,然后判断当前队列里面是不是有元素,如果队列为空则调用 notFull.await() 阻 塞挂起当前线程。需要注意的是,这里使用 while 而不是 if 是为了避免虚假唤醒。如果队 列不为空则直接从队列里面获取并移除元素,然后唤醒因为队列满而被阻塞的生产线程, 最后释放获取的锁。
3 小结
好啦,关于AQS的基础就复习到这里哈。
标签:调用,JDK,回顾,队列,lock,获取,线程,AQS From: https://www.cnblogs.com/kukuxjx/p/18012613