重入锁ReentrantLock
重人锁ReentrantLock,顾名思义,就是支持重进人的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。
ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进人,但是在调lock( )方法时,已经获取到锁的线程,能够再次调用1ock()方法获取锁而不被阻塞。
下面将重点分析ReentrantLock是如何实现重进入和公平性获取锁的特性。
1. 实现重进入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的现需要解决以下两个问题。
- 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程、如果是则再次成功获取。
- 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程才能获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
static final class NonfairSync extends Sync {
// 获取锁的逻辑
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 第一次获取锁
if (compareAndSetState(0, acquires)) {
// 设置当前线程为持有锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 不是第一次,判断当前线程是否是持有锁的线程
else if (current == getExclusiveOwnerThread()) {
// 重复进入,计数增加
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 全部释放完了
if (c == 0) {
free = true;
// 不在有线程持有锁
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2. 公平锁与非公平锁的区别
公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
// 公平锁
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// hasQueuedPredecessors()会判断是否有其他线程在等待着,如果有则返回true
// 当前线程不在通过compareAndSetState抢占锁,直接进入同步队列或重入锁的逻辑
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平性锁可能使线程“饥饿”,但是通常会带来更好的性能。
读写锁
读写锁允许在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于后的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。java并发包提供读写锁的示例是 RcentrantReadWriteLock,有如下几个特性:
- 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
- 该锁支持重进人,以读写线程为例:读线程在获取了读锁之后,能够再次获取锁。而写线程
在获取了写锁之后能够再次获取写锁,同时也可以获取读锁 - 遵循获取写锁、获取读镇再释放写锁的次序,写锁能够降级成为读锁
1. 读写锁的接口
public interface ReadWriteLock {
// 获取读锁
Lock readLock();
// 获取写锁
Lock writeLock();
}
2. 代码分析
接下来分析 ReentrantReadWriteLock 的实现,主要包括:读写状态的设计、写锁的获取与释放、读锁的获取与释放以及锁降级
2.1 读写状态的设计
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低 16位表示写。
2.2 写锁的释放与获取
写锁是一个支持重进人的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 写锁的个数
if (c != 0) {
// 存在读锁 或 线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 重入
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
2.3 读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁被其他线程获取,则进人等待状态。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
2.4 锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
Condition接口
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.0bject上),主要包括wait( )、wait(long timeout)、notify( ) 以及notifyA1I( )方法,这些方法与synchronized 同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与lock配合可以实现等待/通知模式。
1. 接口与示例
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时、需要提前获取到Condition对架关联的锁,Condition对象是由 Lock 对象创建出来的,换句话说,Condition是依赖Lock对象的。
Condition的使用方式比较简单,需要注意在调用方法前获取锁,使用方式如代。
/**
* @author strind
* @date 2024/10/20 11:55
* @description Condition使用方式
*/
public class ConditionUserDemo {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void ConditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
}finally {
lock.unlock();
}
}
public void ConditionWake() throws InterruptedException {
lock.lock();
try {
condition.signal();
}finally {
lock.unlock();
}
}
}
如示例所示,一般都会将Condition对象作为成员变量。当调用await( )方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal( )方法,通知当前线程后,当前线程才从 await( )方法返回,并且在返回前已经获取了锁。
public interface Condition {
// 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进人运行状态并从await()方法返回的情况,包括:
// 1. 其他线程调用该Conditon的signal()或signalAll()方法,而当前线程被选中唤醒
// 2. 其他线程(调用interrupt()方法)中断当前线程
// 如果当前等待线程从awai()方法返回,那么表明该线程已经获取了Condition对象所对应的锁
void await() throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition 相关联的锁
void signal();
// 唤醒所有
void signalAll();
}
2. 原理分析
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition 的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。
Condition的实现,主要包括:等待队列、等待和通知,下面提到的Condition 如果不加说明均指的是 ConditionObject。
2.1等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition对象上等待的线程,如果一个线程调用了Condition.await( )方法,那么该线程将会释放锁、构造成节点加入等待队列并进人等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node
一个Condition包含一个等待队列,Condition拥有首节点和尾节点。当前线程调用 Condition.await( )方法,将会以当前线程构造节点,并将节点从尾部加人等待队列。
2.2 等待
调用 Condition 的await( )方法(或者以await开头的方法),会使当前线程进人等待队列并释放锁,同时线程的状态变更为等待状态。当从await( )方法返回时,当前线程一定是获取了Condition相关的锁。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入等待队列
Node node = addConditionWaiter();
// 释放同步状态,即释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用该方法的线程是成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal( )方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException.
2.3 通知
调用 Condition 的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal( )方法进行了isHeldExclusively( )检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSuppont 唤醒节点中的线程。
通过调用同步器的 enq (Node node)方法,等待队列中的头节点线程安全地移动到同步队列,当节点移动到同步队列后,当前线程再使用LockSupport 唤醒该节点的线程。
被唤醒后的线程,将从await( )方法中的 while循环中退出(isOnSyncQueue(Node node)方法返回tnue,节点已经在同步队列中),进而调用同步器的acquireQueued( )方法加人到或取同步状态的竞争中。
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await0方法返回,此时该线程已经成功地获取了锁。
Condition的signalAIl( ) 方法,相当于对等待队列中的每个节点均执行一次 signal( )方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
标签:队列,java,实现,current,获取,线程,原理,rh,Condition From: https://www.cnblogs.com/strind/p/18492736