文章目录
ReentrantLock 基本使用
在Java中,synchronized 和 ReentrantLock 都是用于确保线程同步的锁,都属于可重入锁。
ReentrantLock 相对于 synchronized 具有以下特点:等待可中断,可以设置超时时间,可以设置为公平锁,支持多个条件变量。
可重入锁
可重入锁是指如果一个线程已经获取了某个对象的锁,那么它可以重新获取这个锁而不会被阻塞。
public class ReentrantLockTest {
private final ReentrantLock lock = new ReentrantLock();
public void method1() {
lock.lock(); // 第一次获取锁
try {
System.out.println("进入method1");
method2(); // 调用method2,再次获取锁
} finally {
lock.unlock(); // 释放锁
}
}
public void method2() {
lock.lock(); // 第二次获取锁
try {
System.out.println("进入method2");
} finally {
lock.unlock(); // 释放锁
}
}
public static void main(String[] args) {
ReentrantLockTest lock = new ReentrantLockTest();
lock.method1();
}
}
等待可中断
等待可中断是指在使用lockInterruptibly()方法获取锁时,如果线程在等待获取锁的过程中被中断,那么该线程会抛出InterruptedException异常,从而中断等待的状态,这种方式可以避免线程一直等待下去,从而避免死锁的现象。
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
System.out.println(("t1线程启动..."));
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(("t1线程等锁的过程中被打断"));
return;
}
try {
System.out.println(("t1线程获得了锁"));
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
System.out.println(("main线程获得了锁"));
t1.start();
try {
Thread.sleep(1000);
t1.interrupt();
System.out.println(("执行打断"));
} finally {
lock.unlock();
}
}
设置超时时间
ReentrantLock的tryLock()方法支持等待一段时间后就放弃锁的争抢,该方法有两个重载版本:
-
boolean tryLock(): 尝试获取锁,如果成功则返回true,如果锁被其他线程持有则立即返回false。
-
boolean tryLock(long timeout, TimeUnit unit):尝试在指定的时间内获取锁,如果在指定时间内成功获取锁则返回true,如果在指定时间内未能获取锁则返回false。
无参的 tryLock()
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
System.out.println(("t1线程启动..."));
if (!lock.tryLock()) {
System.out.println(("t1线程获取失败,返回"));
return;
}
try {
System.out.println(("t1线程获得了锁"));
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
System.out.println(("main线程获得了锁"));
t1.start();
try {
Thread.sleep(2000);
} finally {
lock.unlock();
}
}
带时间参数的tryLock()
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
System.out.println(("t1线程启动..."));
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
System.out.println(("t1线程等待 1s 后失败,返回"));
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(("获得了锁"));
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
System.out.println(("main线程获得了锁"));
t1.start();
try {
Thread.sleep(2000);
} finally {
lock.unlock();
}
}
公平锁
默认情况下,ReentrantLock 是非公平锁,在非公平锁模式下,线程获取锁的顺序是不确定的,新来的线程有可能插入到等待队列的前面,从而插队获取锁;而在公平锁模式下,ReentrantLock 会严格按照线程请求锁的顺序来分配锁。
非公平锁的优点是性能比公平锁好,因为公平锁需要维护一个有序的等待队列,并且在分配锁时需要进行更多的同步操作,下面的源码会分析
非公平锁
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
lock.lock();
for (int i = 0; i < 300; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之后去争抢锁
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start...");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "强行插入").start();
lock.unlock();
}
公平锁
ReentrantLock lock = new ReentrantLock(true);
设置为公平锁后,强行插入的running会在最后才输出。
条件变量
ReentrantLock 可以支持多个条件变量,它提供了一个newCondition()方法,该方法返回一个与该锁关联的条件变量,可以通过调用这个方法来创建多个条件变量,条件变量通常用于线程间的通信,允许线程等待某个条件的发生,或者通知其他线程某个条件已经满足。
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition1 = lock.newCondition();
private final Condition condition2 = lock.newCondition();
private int state = 0;
public void waitForCondition1() throws InterruptedException {
lock.lock();
try {
while (state != 1) {
condition1.await();
}
System.out.println("Condition 1 is met");
} finally {
lock.unlock();
}
}
public void waitForCondition2() throws InterruptedException {
lock.lock();
try {
while (state != 2) {
condition2.await();
}
System.out.println("Condition 2 is met");
} finally {
lock.unlock();
}
}
public void setState(int state) {
lock.lock();
try {
this.state = state;
if (state == 1) {
condition1.signalAll();
}
if (state == 2) {
condition2.signalAll();
}
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockTest lockTest = new ReentrantLockTest();
Thread thread1 = new Thread(() -> {
try {
lockTest.waitForCondition1();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread thread2 = new Thread(() -> {
try {
lockTest.waitForCondition2();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
thread2.start();
try {
//1 s后唤醒 thread1
Thread.sleep(1000);
lockTest.setState(1);
//2 s后唤醒thread2
Thread.sleep(1000);
lockTest.setState(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ReentrantLock 原理
从类图中可以看到 ReentrantrantLock 实现了 Lock 接口,其内部维护了一个同步器类 Sync,这个同步器类也是继承自 AQS,Sync是抽象的,有两个实现类,分别是非公平的与公平的
加锁流程
首先是构造方法,默认情况下是非公平锁实现。
public ReentrantLock() {
sync = new NonfairSync();
}
非公平锁的加锁
public void lock() {
sync.lock();
}
final void lock() {
// 尝试CAS方式将state从0改为1修改锁,如果修改成功了,就会将owner线程设置为当前线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
最开始的时候没有线程竞争锁,cas成功,将锁的拥有者线程设置为Thread-0。
当第二个线程过来竞争锁时,走else分支
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
第二个线程CAS失败,然后构造 Node双向链表组成的等待队列,Node节点的创建是懒惰的,Node 的 waitStatus 状态默认值为0,head指针指向的第一个 Node 称为 Dummy 哑元节点,不关联线程,主要作用是唤醒后继节点关联的线程去竞争锁。
然后Thread-1线程进入 acquireQueued 方法不停的自旋去尝试获取锁,经过两次自旋后如果还是获取不到锁就会park阻塞。
acquireQueued 方法首先判断当前节点是否头节点的后继节点,若是那么再次 tryAcquire 尝试获取锁,这时 state 仍为 1,获取失败进入 shouldParkAfterFailedAcquire 方法将前驱头结点的waitStatus 改为 -1(-1表明该节点有责任唤醒后继节点去竞争锁),返回false,然后再次自旋尝试获取锁,如果还是失败,进入parkAndCheckInterrupt() 调用LockSupport.park方法阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//若当前节点是头结点的后继节点,那么会尝试去获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//第一次执行到这会将将头结点的waitStatus改为-1
//第二次执行到这会park阻塞住
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
调用LockSupport.park方法将thread-1阻塞(灰色表示被阻塞住)。
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
return Thread.interrupted();
}
后续再有多个线程过来竞争锁,最终会变成这样子,除了最后一个节点,其他节点的waitStatus都为-1,-1表明当前节点有责任唤醒后继节点去竞争锁
解锁流程
使用完锁后,Thread-0调用unlock释放锁,如果释放成功,设置state = 0,exclusiveOwnerThread 为 null。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//若释放锁成功
if (tryRelease(arg)) {
Node h = head;
//判断队列不为空,且waitStatus=-1则唤醒后继节点去竞争锁
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//若为0,则释放锁,将拥有者线程置为null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
此时等待队列不为 null,并且 head 的 waitStatus = -1,那么找到头结点的后继节点,进入 unparkSuccessor 方法,unpark 恢复其节点关联的线程运行,也就是Thread-1线程,此时Thread-1 重新进入acquireQueued方法尝试去获取锁,由于state=0,Thread-1获取锁成功,通过cas方式将state改为1,并将锁的拥有者设置为当前线程Thread-1,然后head 指向Thread-1 所在的 Node,将 Node 设置为null, 原本的头节点从链表断开,从而被垃圾回收。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//status为-1,尝试cas修改0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒阻塞的线程
LockSupport.unpark(s.thread);
}
由于默认情况下是非公平锁实现,那么在释放锁的过程中可能被其他的线程抢占,比如下面这种情况
Thread-4比Thread-1抢先判断state为0,然后立马通过cas将0改为1,再将exclusiveOwnerThread设为Thread-4,那么Thread-1 会再次进入 acquireQueued 方法重新调用park方法阻塞。
可重入锁原理
默认情况下为非公平锁实现,当尝试获取锁时,会调用nonfairTryAcquire方法,首先,它会检查state的状态,如果state为0,表示还没上锁,因此可以直接进行加锁,若state不为0,则表明已被占用,此时,方法会进一步判断当前线程是否与exclusiveOwnerThread中的线程相同,若相同,则表明发生了锁的重入,将state++。在释放锁的过程中,会调用tryRelease方法,该方法执行后,state的值会减1,当state减少至0时,锁会被释放,同时将exclusiveOwnerThread设置为null。
static final class NonfairSync extends Sync {
// ...
// 尝试加锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 已经获得了锁, 还是当前线程去获取锁, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// 尝试释放锁
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state 减为 0, 锁释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可打断原理
不可打断模式
在不可打断模式下,即使线程被打断,它仍会停留在AQS等待队列中,直到成功获取锁后才会知道被其他线程中断了,也就是打断标记被设置为true。
在默认情况下,线程获取锁的过程是不可打断的,当线程无法立即获得锁时,它会不断地自旋尝试获取,如果3次自旋后仍然失败,则会调用park进入阻塞状态,在阻塞状态下,线程可以被其他线程打断唤醒,如果线程被中断,中断标记会被设置为true,然后再次进入循环调用park进入阻塞状态。
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// ...
private final boolean parkAndCheckInterrupt() {
// 若打断标记是 true, 则park会失效
LockSupport.park(this);
// interrupted判断是否被打断,然后会清除打断标记,清除了打断标记后线程还是能park住
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//若node是头结点的后继节点
if (p == head && tryAcquire(arg)) {
//head指针指向node
setHead(node);
p.next = null;
failed = false;
// 获取到锁后才返回打断状态
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// park过程中如果被interrupt唤醒, 打断状态置为true
// 只是设置为 true,会再次进入循环,如果获得不了锁,再次调用park阻塞
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打断状态为 true
selfInterrupt();
}
}
static void selfInterrupt() {
// 再产生一次中断
Thread.currentThread().interrupt();
}
}
可打断模式
在可打断模式下,当线程通过调用 acquireInterruptibly() 方法尝试获取锁的过程中可以被打断,与不可打断模式区别在于,当线程被唤醒,会直接抛出InterruptedException异常,该异常会中断线程循环等待锁。
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 可打断的尝试获取锁
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt唤醒会在这抛出异常, 结束循环等待
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
公平锁原理
非公平锁通过nonfairTryAcquire去获取锁,首先判断state若为0说明没有线程拿锁,当前线程会直接去抢锁,不会做任何判断;而公平锁会多个判断条件,当 state为0时即还没有线程占这个锁时,先调用hasQueuedPredecessors() 方法判断队列中是否有前驱节点,没有才会去竞争锁。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// 尝试获取锁
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 检查等待队列是否有前驱节点,没有才尝试加锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 判断队列是否有前驱节点
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node节点
return h != t &&
(
// (s = h.next) == null 表示队列中没有第二个节点
(s = h.next) == null ||
// 第二个节点的线程不是当前线程
s.thread != Thread.currentThread()
);
}
}
条件变量原理
await 流程
执行流程:创建与当前线程关联的Node节点放入condition的等待队列中,Node的waitStatus为-2,表明该节点处于等待状态,再调用fullyRelease清空锁,将state设置为0,拥有者线程设置为null,唤醒头结点的后继节点线程去竞争锁,最后调用park进行阻塞。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将线程加入到条件变量的等待队列中,并且将新的node状态设置为 -2
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
进入fullyRelease 方法,释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// release方法默认一次 -1
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
然后 unpark 等待队列中的下一个节点去竞争锁,此时Thread-1 竞争成功
最后Thread-0 调用 park 进入阻塞状态。
signal 流程
假如 Thread-1 要来唤醒 Thread-0
public final void signal() {
// 首先检查当前变量是否持有独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 唤醒第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
进入doSignal 流程,得到等待队列中第一个 Node节点。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//将第一个节点转移至阻塞队列的尾部
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
进入 transferForSignal 方法,将第一个Node节点加入阻塞队列尾部,再将 Thread-0 的 waitStatus 置为 0,Thread-3 的waitStatus 置为 -1。
final boolean transferForSignal(Node node) {
// cas将状态改为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将节点加入队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
等待Thread-1线程释放锁后,就会找到头节点的后继节点,调用unlock唤醒该节点所关联的线程去竞争锁。