首页 > 编程语言 >Java同步器之ReentrantLock源码分析(一)

Java同步器之ReentrantLock源码分析(一)

时间:2022-12-05 10:33:49浏览次数:46  
标签:node Java lock ReentrantLock final 获取 源码 线程 节点

一、概述

ReentrantLockJava并发包中提供的一个可重入的互斥锁ReentrantLocksynchronized在基本用法,行为语义上都是类似的,同样都具有可重入性。只不过相比原生的SynchronizedReentrantLock增加了一些高级的扩展功能,比如它可以实现公平锁,同时也可以绑定多个Condition

二、特性

2.1 可重入性

所谓的可重入性,就是可以支持一个线程对锁的重复获取,原生的synchronized就具有可重入性,一个用synchronized修饰的递归方法,当线程在执行期间,它是可以反复获取到锁的,而不会出现自己把自己锁死的情况。ReentrantLock也是如此,在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。那么有可重入锁,就有不可重入锁,我们在之前文章中自定义的一个Mutex锁就是个不可重入锁,不过使用场景极少而已。

2.2 公平锁与非公平锁

所谓公平锁,顾名思义是指锁的获取策略相对公平,当多个线程在获取同一个锁时,必须按照锁的申请时间来依次获得锁,不能插队;非公平锁则不同,当锁被释放时,等待中的线程均有机会获得锁。synchronized是非公平锁,ReentrantLock默认也是非公平的,可以通过带boolean参数的构造方法指定使用公平锁,但非公平锁的性能一般要优于公平锁。

synchronizedJava原生的互斥同步锁,使用方便,对于synchronized修饰的方法或同步块,无需再显式释放锁。synchronized底层是通过monitorentermonitorexit两个字节码指令来实现加锁解锁操作的。而ReentrantLock做为API层面的互斥锁,需要显式地去加锁解锁。

class X {
    private final ReentrantLock lock = new ReentrantLock();

    // ...
 
    public void m() {
        lock.lock();  // 加锁
        try {
            // ... 函数主题
        } finally {
            lock.unlock(); //解锁
        }
    }
}

2.3 条件锁

条件锁,是指在获取锁之后发现当前业务场景自己无法处理,而需要等待某个条件的出现才可以继续处理时使用的一种锁。

比如,在阻塞队列中,当队列中没有元素的时候是无法弹出一个元素的,这时候就需要阻塞在条件notEmpty上,等待其它线程往里面放入一个元素后,唤醒这个条件notEmpty,当前线程才可以继续去做“弹出一个元素”的行为。

注意,这里的条件,必须是在获取锁之后去等待,对应到ReentrantLock的条件锁,就是获取锁之后才能调用condition.await()方法。

java中,条件锁的实现都在AQSConditionObject类中,ConditionObject实现了Condition接口。

public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {
        // 声明一个重入锁
        ReentrantLock lock = new ReentrantLock();
        // 声明一个条件锁
        Condition condition = lock.newCondition();

        new Thread(() -> {
            try {
                lock.lock();  // 1
                try {
                    System.out.println("before await");  // 2
                    // 等待条件
                    condition.await();  // 3
                    System.out.println("after await");  // 10
                } finally {
                    lock.unlock();  // 11
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        // 这里睡1000ms是为了让上面的线程先获取到锁
        Thread.sleep(1000);
        lock.lock();  // 4
        try {
            // 这里睡2000ms代表这个线程执行业务需要的时间
            Thread.sleep(2000);  // 5
            System.out.println("before signal");  // 6
            // 通知条件已成立
            condition.signal();  // 7
            System.out.println("after signal");  // 8
        } finally {
            lock.unlock();  // 9
        }
    }
}

三、源码分析 - 公平锁/非公平锁

ReentrantLock是基于AQS的,AQSJava并发包中众多同步组件的构建基础,它通过一个int类型的状态变量state和一个FIFO队列来完成共享资源的获取,线程的排队等待等。AQS是个底层框架,采用模板方法模式,它定义了通用的较为复杂的逻辑骨架,比如线程的排队,阻塞,唤醒等,将这些复杂但实质通用的部分抽取出来,这些都是需要构建同步组件的使用者无需关心的,使用者仅需重写一些简单的指定的方法即可(其实就是对于共享变量state的一些简单的获取释放的操作)。

3.1 内部类

abstract static class Sync extends AbstractQueuedSynchronizer {}

static final class NonfairSync extends Sync {}

static final class FairSync extends Sync {}
  1. 抽象类Sync实现了AQS的部分方法;
  2. NonfairSync实现了Sync,主要用于非公平锁的获取;
  3. FairSync实现了Sync,主要用于公平锁的获取。

3.2 属性

private final Sync sync;

主要属性就一个sync,它在构造方法中初始化,决定使用公平锁还是非公平锁的方式获取锁。

3.3 构造器

3.3.1 无参构造器(默认为非公平锁)

public ReentrantLock() {
    sync = new NonfairSync();//默认是非公平的
}

syncReentrantLock内部实现的一个同步组件,它是ReentrantLock的一个静态内部类,继承于AQS,后面我们再分析。

3.3.2 带布尔值的构造器(是否公平)

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();//fair为true,公平锁;反之,非公平锁
}

看到了吧,此处可以指定是否采用公平锁,FailSyncNonFailSync亦为ReentrantLock的静态内部类,都继承于Sync

3.4 lock()方法

3.4.1 公平锁

这里我们假设ReentrantLock的实例是通过以下方式获得的:

ReentrantLock reentrantLock = new ReentrantLock(true);

加锁的主要逻辑:

// ReentrantLock.lock()
public void lock() {
    // 调用的sync属性的lock()方法
    // 这里的sync是公平锁,所以是FairSync的实例
    sync.lock();
}

// ReentrantLock.FairSync.lock()
final void lock() {
    // 调用AQS的acquire()方法获取锁
    // 注意,这里传的值为1
    acquire(1);
}

// AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    // 尝试获取锁
    // 如果失败了,就排队
    if (!tryAcquire(arg) &&
        // 注意addWaiter()这里传入的节点模式为独占模式
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// ReentrantLock.FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 当前线程
    final Thread current = Thread.currentThread();
    // 查看当前状态变量的值
    int c = getState();
    // 如果状态变量的值为0,说明暂时还没有人占有锁
    if (c == 0) {
        // 如果没有其它线程在排队,那么当前线程尝试更新state的值为1
        // 如果成功了,则说明当前线程获取了锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 当前线程获取了锁,把自己设置到exclusiveOwnerThread变量中
            // exclusiveOwnerThread是AQS的父类AbstractOwnableSynchronizer中提供的变量
            setExclusiveOwnerThread(current);
            // 返回true说明成功获取了锁
            return true;
        }
    }
    // 如果当前线程本身就占有着锁,现在又尝试获取锁
    // 那么,直接让它获取锁并返回true
    else if (current == getExclusiveOwnerThread()) {
        // 状态变量state的值加1
        int nextc = c + acquires;
        // 如果溢出了,则报错
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 设置到state中
        // 这里不需要CAS更新state
        // 因为当前线程占有着锁,其它线程只会CAS把state从0更新成1,是不会成功的
        // 所以不存在竞争,自然不需要使用CAS来更新
        setState(nextc);
        // 当线程获取锁成功
        return true;
    }
    // 当前线程尝试获取锁失败
    return false;
}

// AbstractQueuedSynchronizer.addWaiter()
// 调用这个方法,说明上面尝试获取锁失败了
private Node addWaiter(Node mode) {
    // 新建一个节点
    Node node = new Node(Thread.currentThread(), mode);
    // 这里先尝试把新节点加到尾节点后面
    // 如果成功了就返回新节点
    // 如果没成功再调用enq()方法不断尝试
    Node pred = tail;
    // 如果尾节点不为空
    if (pred != null) {
        // 设置新节点的前置节点为现在的尾节点
        node.prev = pred;
        // CAS更新尾节点为新节点
        if (compareAndSetTail(pred, node)) {
            // 如果成功了,把旧尾节点的下一个节点指向新节点
            pred.next = node;
            // 并返回新节点
            return node;
        }
    }
    // 如果上面尝试入队新节点没成功,调用enq()处理
    enq(node);
    return node;
}

// AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
    // 自旋,不断尝试
    for (;;) {
        Node t = tail;
        // 如果尾节点为空,说明还未初始化
        if (t == null) { // Must initialize
            // 初始化头节点和尾节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 如果尾节点不为空
            // 设置新节点的前一个节点为现在的尾节点
            node.prev = t;
            // CAS更新尾节点为新节点
            if (compareAndSetTail(t, node)) {
                // 成功了,则设置旧尾节点的下一个节点为新节点
                t.next = node;
                // 并返回旧尾节点
                return t;
            }
        }
    }
}
// AbstractQueuedSynchronizer.acquireQueued()
// 调用上面的addWaiter()方法使得新节点已经成功入队了
// 这个方法是尝试让当前节点来获取锁的
final boolean acquireQueued(final Node node, int arg) {
    // 失败标记
    boolean failed = true;
    try {
        // 中断标记
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 当前节点的前一个节点
            final Node p = node.predecessor();
            // 如果当前节点的前一个节点为head节点,则说明轮到自己获取锁了
            // 调用ReentrantLock.FairSync.tryAcquire()方法再次尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 尝试获取锁成功
                // 这里同时只会有一个线程在执行,所以不需要用CAS更新
                // 把当前节点设置为新的头节点
                setHead(node);
                // 并把上一个节点从链表中删除
                p.next = null; // help GC
                // 未失败
                failed = false;
                return interrupted;
            }
            // 是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 真正阻塞的方法
                parkAndCheckInterrupt())
                // 如果中断了
                interrupted = true;
        }
    } finally {
        // 如果失败了
        if (failed)
            // 取消获取锁
            cancelAcquire(node);
    }
}

// AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
// 这个方法是在上面的for()循环里面调用的
// 第一次调用会把前一个节点的等待状态设置为SIGNAL,并返回false
// 第二次调用才会返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 上一个节点的等待状态
    // 注意Node的waitStatus字段我们在上面创建Node的时候并没有指定
    // 也就是说使用的是默认值0
    // 这里把各种等待状态再贴出来
    //static final int CANCELLED =  1;
    //static final int SIGNAL    = -1;
    //static final int CONDITION = -2;
    //static final int PROPAGATE = -3;
    int ws = pred.waitStatus;
    // 如果等待状态为SIGNAL(等待唤醒),直接返回true
    if (ws == Node.SIGNAL)
        return true;
    // 如果前一个节点的状态大于0,也就是已取消状态
    if (ws > 0) {
        // 把前面所有取消状态的节点都从链表中删除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前一个节点的状态小于等于0,则把其状态设置为等待唤醒
        // 这里可以简单地理解为把初始状态0设置为SIGNAL
        // CONDITION是条件锁的时候使用的
        // PROPAGATE是共享锁使用的
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程
    // 底层调用的是Unsafe的park()方法
    LockSupport.park(this);
    // 返回是否已中断
    return Thread.interrupted();
}

主要方法的调用关系:

ReentrantLock#lock()
->ReentrantLock.FairSync#lock() // 公平模式获取锁
  ->AbstractQueuedSynchronizer#acquire() // AQS的获取锁方法
    ->ReentrantLock.FairSync#tryAcquire() // 尝试获取锁
    ->AbstractQueuedSynchronizer#addWaiter()  // 添加到队列
	  ->AbstractQueuedSynchronizer#enq()  // 入队
    ->AbstractQueuedSynchronizer#acquireQueued() // 里面有个for()循环,唤醒后再次尝试获取锁
      ->AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() // 检查是否要阻塞
      ->AbstractQueuedSynchronizer#parkAndCheckInterrupt()  // 真正阻塞的地方

获取锁的主要过程大致如下:

  1. 尝试获取锁,如果获取到了就直接返回了;
  2. 尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队;
  3. 然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回;
  4. 如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL);
  5. 调用parkAndCheckInterrupt()阻塞当前线程;
  6. 如果被唤醒了,会继续在acquireQueued()for()循环再次尝试获取锁,如果成功了就返回;
  7. 如果不成功,再次阻塞,重复(3)(4)(5)直到成功获取到锁。

以上就是整个公平锁获取锁的过程,下面我们看看非公平锁是怎么获取锁的。

3.4.2 非公平锁

// ReentrantLock.lock()
public void lock() {
    sync.lock();
}

// ReentrantLock.NonfairSync.lock()
// 这个方法在公平锁模式下是直接调用的acquire(1);
final void lock() {
    // 直接尝试CAS更新状态变量
    if (compareAndSetState(0, 1))
        // 如果更新成功,说明获取到锁,把当前线程设为独占线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

// ReentrantLock.NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    // 调用父类的方法
    return nonfairTryAcquire(acquires);
}

// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 如果状态变量的值为0,再次尝试CAS更新状态变量的值
        // 相对于公平锁模式少了!hasQueuedPredecessors()条件
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

相对于公平锁,非公平锁加锁的过程主要有两点不同:

  1. 一开始就尝试CAS更新状态变量state的值,如果成功了就获取到锁了;
  2. tryAcquire()的时候没有检查是否前面有排队的线程,直接上去获取锁才不管别人有没有排队呢;

总的来说,相对于公平锁,非公平锁在一开始就多了两次直接尝试获取锁的过程。

3.5 lockInterruptibly()方法

支持线程中断,它与lock()方法的主要区别在于lockInterruptibly()获取锁的时候如果线程中断了,会抛出会抛出InterruptedException异常,而lock()不会管线程是否中断都会一直尝试获取锁,获取锁之后把自己标记为已中断,继续执行自己的逻辑,后面也会正常释放锁。

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);//代理到sync的相应方法上,同lock方法的区别是此方法响应中断
}

题外话:

线程中断,只是在线程上打一个中断标志,并不会对运行中的线程有什么影响,具体需要根据这个中断标志干些什么,用户自己去决定。

比如,如果用户在调用lock()获取锁后,发现线程中断了,就直接返回了,而导致没有释放锁,这也是允许的,但是会导致这个锁一直得不到释放,就出现了死锁。

lock.lock();

if (Thread.currentThread().interrupted()) {
    return ;
}

lock.unlock();

当然,这里只是举个例子,实际使用肯定是要把lock.lock()后面的代码都放在try...finally...里面的以保证锁始终会释放,这里主要是为了说明线程中断只是一个标志,至于要做什么完全由用户自己决定。

3.6 tryLock()方法

尝试获取一次锁,成功了就返回true,没成功就返回false,不会继续尝试。

// ReentrantLock.tryLock()
public boolean tryLock() {
    // 直接调用Sync的nonfairTryAcquire()方法
    return sync.nonfairTryAcquire(1);//代理到sync的相应方法上
}

// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryLock直接以非公平的模式去尝试获取一次锁,获取到了或者锁本来就是当前线程占有着就返回true,否则返回false

3.7 tryLock(long time, TimeUnit unit)方法

尝试获取锁,并等待一段时间,如果在这段时间内都没有获取到锁,就返回false

// ReentrantLock.tryLock()
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    // 调用AQS中的方法
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

// AbstractQueuedSynchronizer.tryAcquireNanos()
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果线程中断了,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 先尝试获取一次锁
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

// AbstractQueuedSynchronizer.doAcquireNanos()
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果时间已经到期了,直接返回false
    if (nanosTimeout <= 0L)
        return false;
    // 到期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            // 如果到期了,就直接返回false
            if (nanosTimeout <= 0L)
                return false;
            // spinForTimeoutThreshold = 1000L;
            // 只有到期时间大于1000纳秒,才阻塞
            // 小于等于1000纳秒,直接自旋解决就得了
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 阻塞一段时间
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在阻塞的时候加上阻塞时间,并且会随时检查是否到期,只要到期了没获取到锁就返回false

3.8 unlock()方法

// ReentrantLock.unlock()
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer.release
public final boolean release(int arg) {
    // 调用AQS实现类的tryRelease()方法释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 如果头节点不为空,且等待状态不是0,就唤醒下一个节点
        // 还记得waitStatus吗?
        // 在每个节点阻塞之前会把其上一个节点的等待状态设为SIGNAL(-1)
        // 所以,SIGNAL的准确理解应该是唤醒下一个等待的线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// ReentrantLock.Sync.tryRelease
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 如果当前线程不是占有着锁的线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果状态变量的值为0了,说明完全释放了锁
    // 这也就是为什么重入锁调用了多少次lock()就要调用多少次unlock()的原因
    // 如果不这样做,会导致锁不会完全释放,别的线程永远无法获取到锁
    if (c == 0) {
        free = true;
        // 清空占有线程
        setExclusiveOwnerThread(null);
    }
    // 设置状态变量的值
    setState(c);
    return free;
}

private void unparkSuccessor(Node node) {
    // 注意,这里的node是头节点
    
    // 如果头节点的等待状态小于0,就把它设置为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 头节点的下一个节点
    Node s = node.next;
    // 如果下一个节点为空,或者其等待状态大于0(实际为已取消)
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾节点向前遍历取到队列最前面的那个状态不是已取消状态的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果下一个节点不为空,则唤醒它
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放锁的过程大致为:

  1. state的值减1
  2. 如果state减到了0,说明已经完全释放锁了,唤醒下一个等待着的节点;

3.9 内部类详情

3.9.1 NonFairSync(非公平锁)

static final class NonfairSync extends Sync {//继承Sync

    private static final long serialVersionUID = 7316153563782823691L;

    // 获取锁
    final void lock() {
        if (compareAndSetState(0, 1))//CAS设置state状态,若原值是0,将其置为1
            setExclusiveOwnerThread(Thread.currentThread());//将当前线程标记为已持有锁
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);//调用Sync中的方法
    }
}

compareAndSetState若设置失败,调用AQSacquire方法,acquire又会调用下面重写的tryAcquire方法。

这里说的调用失败有两种情况:

  1. 当前没有线程获取到资源,state0,但是将state0设置为1的时候,其他线程抢占资源,将state修改了,导致了CAS失败;
  2. state原本就不为0,也就是已经有线程获取到资源了,有可能是别的线程获取到资源,也有可能是当前线程获取的,这时线程又重复去获取,所以去tryAcquire中的nonfairTryAcquire我们应该就能看到可重入的实现逻辑了。

3.9.1.1 nonfairTryAcquire()

final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    // 获取当前state值    
    int c = getState();
    // 若state为0,意味着没有线程获取到资源,CAS将state设置为1,并将当前线程标记我获取到排他锁的线程,返回true    
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 若state不为0,但是持有锁的线程是当前线程
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;//state累加1
        if (nextc < 0) // int类型溢出了
            throw new Error("Maximum lock count exceeded");
        // 设置state,此时state大于1,代表着一个线程多次获锁,state的值即是线程重入的次数
        setState(nextc);
        return true;//返回true,获取锁成功
    }
    return false;//获取锁失败了
}

简单总结下流程:

1. 先获取state值,若为0,意味着此时没有线程获取到资源,CAS将其设置为1,设置成功则代表获取到排他锁了;
2. 若state大于0,肯定有线程已经抢占到资源了,此时再去判断是否就是自己抢占的,是的话,state累加,返回true,重入成功,state的值即是线程重入的次数;
3. 其他情况,则获取锁失败。

3.9.2 FairSync(公平锁)

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);//直接调用AQS的模板方法acquire
    }

    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取state值
        int c = getState();
        // 若state为0,意味着当前没有线程获取到资源,那就可以直接获取资源了吗?
        // NO!这不就跟之前的非公平锁的逻辑一样了嘛。看下面的逻辑
        if (c == 0) {
            // 判断在时间顺序上,是否有申请锁排在自己之前的线程,
            // 若没有,才能去获取,CAS设置state,并标记当前线程为持有排他锁的线程;
            // 反之,不能获取!这即是公平的处理方式。
            if (!hasQueuedPredecessors() 
                && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//重入的处理逻辑,与上文一致,不再赘述
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

可以看到,公平锁的大致逻辑与非公平锁是一致的,不同的地方在于有了!hasQueuedPredecessors()这个判断逻辑,即便state0,也不能贸然直接去获取,要先去看有没有还在排队的线程,若没有,才能尝试去获取,做后面的处理。反之,返回false,获取失败。

看看这个判断是否有排队中线程的逻辑

3.9.2.1 hasQueuedPredecessors()方法

public final boolean hasQueuedPredecessors() {
    Node t = tail; // 尾结点
    Node h = head;//头结点
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());//判断是否有排在自己之前的线程
}

需要注意的是,这个判断是否有排在自己之前的线程的逻辑稍微有些绕,我们来梳理下,由代码得知,有两种情况会返回true,我们将此逻辑分解一下(注意:返回true意味着有其他线程申请锁比自己早,需要放弃抢占)

  1. h !=t && (s = h.next) == null,这个逻辑成立的一种可能是head指向头结点,tail此时还为null
    考虑这种情况:当其他某个线程去获取锁失败,需构造一个结点加入同步队列中(假设此时同步队列为空),在添加的时候,需要先创建一个无意义傀儡头结点(在AQSenq方法中,这是个自旋CAS操作),有可能在将head指向此傀儡结点完毕之后,还未将tail指向此结点。很明显,此线程时间上优于当前线程,所以,返回true,表示有等待中的线程且比自己来的还早。
  2. h != t && (s = h.next) != null && s.thread != Thread.currentThread() 。同步队列中已经有若干排队线程且当前线程不是队列的老二结点,此种情况会返回true
    假如没有s.thread !=Thread.currentThread()这个判断的话,会怎么样呢?
    若当前线程已经在同步队列中是老二结点(头结点此时是个无意义的傀儡结点),此时持有锁的线程释放了资源,唤醒老二结点线程,老二结点线程重新tryAcquire(此逻辑在AQS中的acquireQueued方法中),又会调用到hasQueuedPredecessors,不加s.thread !=Thread.currentThread()这个判断的话,返回值就为true,导致tryAcquire失败。

最后,来看看ReentrantLocktryRelease,定义在Sync

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;//减去1个资源
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //若state值为0,表示当前线程已完全释放干净,返回true,上层的AQS会意识到资源已空出。
    //若不为0,则表示线程还占有资源,只不过将此次重入的资源的释放了而已,返回false。
    if (c == 0) {
        free = true;//
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

四、源码分析 - 条件锁

4.1 内部类

public class ConditionObject implements Condition, java.io.Serializable {
    
    /** First node of condition queue. */
    private transient Node firstWaiter;
    
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

可以看到条件锁中也维护了一个队列,为了和AQS的队列区分,我这里称为条件队列,firstWaiter是队列的头节点,lastWaiter是队列的尾节点。

4.2 lock.newCondition()方法

新建一个条件锁。

// ReentrantLock.newCondition()
public Condition newCondition() {
    return sync.newCondition();
}

// ReentrantLock.Sync.newCondition()
final ConditionObject newCondition() {
    return new ConditionObject();
}

// AbstractQueuedSynchronizer.ConditionObject.ConditionObject()
public ConditionObject() { }

新建一个条件锁最后就是调用的AQS中的ConditionObject类来实例化条件锁。

4.3 condition.await()方法

condition.await()方法,表明现在要等待条件的出现。

// AbstractQueuedSynchronizer.ConditionObject.await()
public final void await() throws InterruptedException {
    // 如果线程中断了,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 添加节点到Condition的队列中,并返回该节点
    Node node = addConditionWaiter();
    // 完全释放当前线程获取的锁
    // 因为锁是可重入的,所以这里要把获取的锁全部释放
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 是否在同步队列中
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程
        LockSupport.park(this);
        
        // 上面部分是调用await()时释放自己占有的锁,并阻塞自己等待条件的出现

        // *************************分界线*************************  //

        // 下面部分是条件已经出现,尝试去获取锁
        
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 尝试获取锁,注意第二个参数,这是上一章分析过的方法
    // 如果没获取到会再次阻塞(这个方法这里就不贴出来了,有兴趣的翻翻上一章的内容)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除取消的节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 线程中断相关
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// AbstractQueuedSynchronizer.ConditionObject.addConditionWaiter
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果条件队列的尾节点已取消,从头节点开始清除所有已取消的节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        // 重新获取尾节点
        t = lastWaiter;
    }
    // 新建一个节点,它的等待状态是CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 如果尾节点为空,则把新节点赋值给头节点(相当于初始化队列)
    // 否则把新节点赋值给尾节点的nextWaiter指针
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 尾节点指向新节点
    lastWaiter = node;
    // 返回新节点
    return node;
}

// AbstractQueuedSynchronizer.fullyRelease
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取状态变量的值,重复获取锁,这个值会一直累加
        // 所以这个值也代表着获取锁的次数
        int savedState = getState();
        // 一次性释放所有获得的锁
        if (release(savedState)) {
            failed = false;
            // 返回获取锁的次数
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

// AbstractQueuedSynchronizer.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
    // 如果等待状态是CONDITION,或者前一个指针为空,返回false
    // 说明还没有移到AQS的队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果next指针有值,说明已经移到AQS的队列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;
    // 从AQS的尾节点开始往前寻找看是否可以找到当前节点,找到了也说明已经在AQS的队列中了
    return findNodeFromTail(node);
}

这里有几个难理解的点:

  1. Condition的队列和AQS的队列不完全一样;

    • AQS的队列头节点是不存在任何值的,是一个虚节点;
    • Condition的队列头节点是存储着实实在在的元素值的,是真实节点。
  2. 各种等待状态(waitStatus)的变化;

    • 首先,在条件队列中,新建节点的初始等待状态是CONDITION(-2)
    • 其次,移到AQS的队列中时等待状态会更改为0(AQS队列节点的初始等待状态为0);
    • 然后,在AQS的队列中如果需要阻塞,会把它上一个节点的等待状态设置为SIGNAL(-1)
    • 最后,不管在Condition队列还是AQS队列中,已取消的节点的等待状态都会设置为CANCELLED(1)
    • 另外,后面我们在共享锁的时候还会讲到另外一种等待状态叫PROPAGATE(-3)
  3. 相似的名称;

    • AQS中下一个节点是next,上一个节点是prev
    • Condition中下一个节点是nextWaiter,没有上一个节点。

总结一下await()方法的大致流程:

  1. 新建一个节点加入到条件队列中去;
  2. 完全释放当前线程占有的锁;
  3. 阻塞当前线程,并等待条件的出现;
  4. 条件已出现(此时节点已经移到AQS的队列中),尝试获取锁;

也就是说await()方法内部其实是先释放锁->等待条件->再次获取锁的过程。

4.4 condition.signal()方法

condition.signal()方法通知条件已经出现。

// AbstractQueuedSynchronizer.ConditionObject.signal
public final void signal() {
    // 如果不是当前线程占有着锁,调用这个方法抛出异常
    // 说明signal()也要在获取锁之后执行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 条件队列的头节点
    Node first = firstWaiter;
    // 如果有等待条件的节点,则通知它条件已成立
    if (first != null)
        doSignal(first);
}

// AbstractQueuedSynchronizer.ConditionObject.doSignal
private void doSignal(Node first) {
    do {
        // 移到条件队列的头节点往后一位
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 相当于把头节点从队列中出队
        first.nextWaiter = null;
        // 转移节点到AQS队列中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

// AbstractQueuedSynchronizer.transferForSignal
final boolean transferForSignal(Node node) {
    // 把节点的状态更改为0,也就是说即将移到AQS队列中
    // 如果失败了,说明节点已经被改成取消状态了
    // 返回false,通过上面的循环可知会寻找下一个可用节点
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 调用AQS的入队方法把节点移到AQS的队列中
    // 注意,这里enq()的返回值是node的上一个节点,也就是旧尾节点
    Node p = enq(node);
    // 上一个节点的等待状态
    int ws = p.waitStatus;
    // 如果上一个节点已取消了,或者更新状态为SIGNAL失败(也是说明上一个节点已经取消了)
    // 则直接唤醒当前节点对应的线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    // 如果更新上一个节点的等待状态为SIGNAL成功了
    // 则返回true,这时上面的循环不成立了,退出循环,也就是只通知了一个节点
    // 此时当前节点还是阻塞状态
    // 也就是说调用signal()的时候并不会真正唤醒一个节点
    // 只是把节点从条件队列移到AQS队列中
    return true;
}

signal()方法的大致流程为:

  1. 从条件队列的头节点开始寻找一个非取消状态的节点;
  2. 把它从条件队列移到AQS队列;
  3. 且只移动一个节点;

注意,这里调用signal()方法后并不会真正唤醒一个节点,那么,唤醒一个节点是在啥时候呢?

还记得开头例子吗?倒回去再好好看看,signal()方法后,最终会执行lock.unlock()方法,此时才会真正唤醒一个节点,唤醒的这个节点如果曾经是条件节点的话又会继续执行await()方法“分界线”下面的代码。

五、总结

ReentrantLock是一种可重入的,可实现公平性的互斥锁(默认是非公平模式,因为非公平模式效率更高),它的设计基于AQS框架,可重入和公平性的实现逻辑都不难理解,每重入一次,state就加1,当然在释放的时候,也得一层一层释放。至于公平性,在尝试获取锁的时候多了一个判断:是否有比自己申请早的线程在同步队列中等待,若有,去等待;若没有,才允许去抢占。

条件锁是指为了等待某个条件出现而使用的一种锁;条件锁比较经典的使用场景就是队列为空时阻塞在条件notEmpty上;ReentrantLock中的条件锁是通过AQSConditionObject内部类实现的;

await()signal()方法都必须在获取锁之后释放锁之前使用;await()方法会新建一个节点放到条件队列中,接着完全释放锁,然后阻塞当前线程并等待条件的出现;signal()方法会寻找条件队列中第一个可用节点移到AQS队列中;在调用signal()方法的线程调用unlock()方法才真正唤醒阻塞在条件上的节点(此时节点已经在AQS队列中);之后该节点会再次尝试获取锁,后面的逻辑与lock()的逻辑基本一致了。

标签:node,Java,lock,ReentrantLock,final,获取,源码,线程,节点
From: https://www.cnblogs.com/ciel717/p/16185038.html

相关文章

  • 两道面试题,带你解析Java类加载机制
    在许多Java面试中,我们经常会看到关于Java类加载机制的考察,例如下面这道题:classGrandpa{static{System.out.println("爷爷在静态代码块");}}classFatherextendsGra......
  • JAVA 解压缩代码写法
    packagecom.chinaunicom.asset.common.utils.compress;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.compress.archivers.ArchiveEntry;importorg.......
  • Java8新特性之方法引用
    1.1基本概念方法引用主要指通过方法的名字来指向一个方法而不需要为方法引用提供方法体,该方法的调用交给函数式接口执行。方法引用是在特定场景下lambda表达式的一种......
  • JavaScript入门⑤-欲罢不能的对象原型与继承-全网一般图文版
    JavaScript入门系列目录JavaScript入门①-基础知识筑基JavaScript入门②-函数(1)基础{浅出}JavaScript入门③-函数(2)原理{深入}执行上下文JavaScript入门④-万物皆......
  • javascript中屏蔽esc键
     今天有客户说网页输入时,不小心按ESC键,结果把结果清除了,想屏蔽,其实是可以的,虽然要求怪怪,JAVASCRIPT可以实现:<scripttype="text/javascr......
  • jaxb中对java.util.Date的处理
    JAXB是个好东西,转换JAVAOBJECT到XML的,最近发现JAXB中对java.util.Date的转换有些要注意的地方,笔记之。比如有一个POJO如下:importjava.util.Date;p......
  • javascrpt 监听元素变化的两个API
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"/><metahttp-equiv="X-UA-Compatible"content="IE=edge"/><metaname="viewport"c......
  • 1.4 Apache Hadoop完全分布式集群搭建-hadoop-最全最完整的保姆级的java大数据学习资
    目录1.4ApacheHadoop完全分布式集群搭建1.4.1虚拟机环境准备1.4.2集群规划1.4.3安装Hadoop1.4.3.1集群配置1.4.3.1.1HDFS集群配置1.4.3.1.2MapReduce集群配置1.4.......
  • Web入门:JavaScript文字动画
    欢迎来的我的小院,恭喜你今天又要涨知识了!案例内容利用JavaScript实现文字逐步展现的动画效果。演示学习<!DOCTYPEhtml><htmllang="en"><head><metach......
  • java 给csv增加一列
    java给csv增加一列 /***读取CSV文件内容*@paramcsvFileName*@throwsIOException*/publicstaticvoidreadCSVAndWrite(StringinputCsvFileN......