Java中的锁
1. 顶级接口Lock
Java SE5之后并发包中新增了Lock接口,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。虽然它缺少了隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized 关键字所不具备的同步特性。
看一下Lock接口的定义:
public interface Lock {
// 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回
void lock();
// 可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取
// 中可以中断当前线程
void lockInterruptibly() throws InterruptedException;
// 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返false
boolean tryLock();
// 超时的获取锁,当前线程在以下3种情况下会返回:
// 1. 当前线程在超时时间内获得了锁
// 2. 当前线程在超时时间内被中断
// 3. 超时时间结束,返回 false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的 wait()方法,而调用后,当前线程将释放锁
Condition newCondition();
}
2. AQS 队列同步器
队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的方法来进行操作,因为它们能够保证状态的改变是安全的。
同步器是实现锁(也可以是任意同步组件)的关键。锁是面向使用者的,它定义了使用者与锁交互的接口(比如Lock和UnLock等),隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
2.1 队列同步器的接口
同步器的设计是典型的模板方法,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
// 设置和获取同步状态
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
// 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(intarg)方法
public final void acquire(int arg) {
// .....
}
// 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进人同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// .....
}
// 在 acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// .....
}
// 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
public final boolean release(int arg) {
//.....
}
// 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进人同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
public final void acquireShared(int arg) {
// .....
}
// 与acquireShared(int arg)相同,该方法响应中断
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// .....
}
// 在 acquireSharedlnterruptibly(int arg)基础上增加了超时限制
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
// ......
}
// 共享式的释放同步状态
public final boolean releaseShared(int arg) {
// ....
}
// 获取等待在同步队列上的线程集合
public final Collection<Thread> getQueuedThreads() {
// .....
}
}
同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态以及查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。
2.2 同步队列的源码分析
这里分析的代码包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等。
2.2.1 同步队列
同步器依赖内部的同步队列(一个FIFO双向队列,注意与之后介绍的等待队列不同)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒使其再次尝试获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。
static final class Node {
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 获取同步状态的线程
Node nextWaiter; // 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段
volatile int waitStatus; // 等待状态
// 包含如下状态。
//1. CANCELLED,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进人该状态将不会变化
//2. SIGNAL,值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
//3. CONDITION,值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加人到对同步状态的获取中
//4. PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件地被传播下去
//5. INITIAL,值为 0,初始状态
}
节点是构成 同步队列/等待队列 的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,形成链表的结构。
同步队列遵循FIFO,首节点是获取同步状态成功的节点(但此时正在执行该线程的逻辑,还没有释放资源,资源可以简单理解为锁),首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的,因为只有一个线程能够成功获取到同步状态,因此设置头节点的方法本身就是线程安全的。
2.2.2 独占式同步状态的获取和释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是线程由于获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代码功能:完成了同步状态获取、节点构造、加人同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tyAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加人到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 首先尝试在尾节点插入,如果成功,则不需要进入死循环
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 死循环将节点插入尾部
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
节点进入同步队列之后,就进人了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧在这个自旋过程中(并会阻塞节点的线程)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 前驱节点是首节点,且获取同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时同步器调用release(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
2.2.3共享式的同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。
通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态。
public final void acquireShared(int arg) {
// 调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。
if (tryAcquireShared(arg) < 0)
// 无法获取同步状态,
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 前驱节点是头节点
int r = tryAcquireShared(arg);
if (r >= 0) { // 获得到同步状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 唤醒后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
2.2.4 独占式超时获取同步状态
通过调用同步器的 doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。同步器提供了 acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 先判断一下是否有中断
if (Thread.interrupted())
throw new InterruptedException();
// 先尝试获取,失败则进入doAcquireNanos方法中
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 绝对时间上的超时时间
final long deadline = System.nanoTime() + nanosTimeout;
// 封装节点,并加入同步队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算还需等待的时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 阻塞等待
LockSupport.parkNanos(this, nanosTimeout);
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3. 操作实践
要求:设计一个同步工具,该工具在同一时刻,只允许至多三个线程同时访问,超过三个线程
的访问将被阻塞,我们将这个同步工具命名为ThirdLock。
详见下一章。
标签:同步,Java,开篇,获取,--,int,线程,arg,节点 From: https://www.cnblogs.com/strind/p/18486991