ReentrantLock是互斥锁,若存在读多写少同时保证线程安全的场景,ReentrantLock效率比较低,此时需要用到ReentrantReadWriteLock。
一、ReentrantReadWriteLock介绍
ReentrantReadWriteLock是可重入的读写锁,实现了ReadWriteLock接口,ReadWriteLock是读写锁的顶级接口,定义了readLock、writeLock方法。
// 读写锁接口 public interface ReadWriteLock { // 获取读锁 Lock readLock(); // 获取写锁 Lock writeLock(); }
读写锁,读写互斥、写写互斥、读读不互斥。
二、ReentrantReadWriteLock使用
1 import java.util.concurrent.locks.ReentrantReadWriteLock; 2 3 public class TestReentrantReadWriteLock { 4 5 public static void main(String[] args) { 6 // 可重入读写锁 7 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 8 // 获取读锁 9 ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); 10 // 获取写锁 11 ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); 12 13 // 加写锁 14 writeLock.lock(); 15 try { 16 // 写锁处理 17 }finally { 18 // 释放写锁 19 writeLock.unlock(); 20 } 21 22 // 加读锁 23 readLock.lock(); 24 try { 25 // 读锁处理 26 }finally { 27 // 释放读锁 28 readLock.unlock(); 29 } 30 } 31 }
三、ReentrantReadWriteLock原理
ReentrantReadWriteLock中既有读锁又有写锁,读写锁的状态是如何而控制的呢?AQS的state锁状态是int类型,4字节32位,ReentrantReadWriteLock将高16位作为读锁的状态,低16位作为写锁的状态。
四、ReentrantReadWriteLock源码分析
1、ReentrantReadWriteLock构造函数
1 // 读锁 ReentrantReadWriteLock的内部类 2 private final ReentrantReadWriteLock.ReadLock readerLock; 3 // 写锁 ReentrantReadWriteLock的内部类 4 private final ReentrantReadWriteLock.WriteLock writerLock; 5 6 public ReentrantReadWriteLock(boolean fair) { 7 // 根据fair 选择 公平锁 或者 非公平锁 8 sync = fair ? new FairSync() : new NonfairSync(); 9 // 创建读锁 10 readerLock = new ReadLock(this); 11 // 创建写锁 12 writerLock = new WriteLock(this); 13 }
2、ReentrantReadWriteLock源码实现
ReentrantReadWriteLock持有内部类如下:
1、公平锁/非公平锁
ReentrantReadWriteLock的公平锁、非公平锁基于抽象内部类Sync实现的,两者主要区别在于是读写是否等待的实现不同。
1.1、writerShouldBlock
1 // 非公平锁,写,直接尝试获取写锁 2 final boolean writerShouldBlock() { 3 return false; 4 } 5 6 // 公平锁,写, 判断队列中是否阻塞的读/写线程等待唤醒抢锁资源 7 // 若有等待线程,返回true 8 final boolean writerShouldBlock() { 9 return hasQueuedPredecessors(); 10 }
等待队列中是否有其他读/写线程等待获取锁资源:
1 // 判断队列中是否已有等待获取锁资源的其他读/写线程 2 public final boolean hasQueuedPredecessors() { 3 // 等待队列尾节点 4 Node t = tail; 5 // 等待队列头节点 6 Node h = head; 7 Node s; 8 // 等待队列中有其他读/写线程在等待获取锁资源 9 return h != t && 10 ((s = h.next) == null || s.thread != Thread.currentThread()); 11 }
1.2、readerShouldBlock
1 // 非公平锁,读,判断队列头结点是否是写锁,若有写锁,当前读线程应排队 2 final boolean readerShouldBlock() { 3 return apparentlyFirstQueuedIsExclusive(); 4 } 5 6 // 公平锁,读,判断等待队列中是否有其他线程在排队 7 final boolean readerShouldBlock() { 8 return hasQueuedPredecessors(); 9 }
1.3、总结
公平锁与非公平锁的主要区别在于:公平锁无论是获取读锁还是写锁,优先判断等待队列中是否有等待获取锁资源的Node节点,若有,当前线程等待;非公平锁,直接尝试获取写锁,获取读锁时判断队列首个Node节点是否为写锁,若为写锁,当前读线程等待。
2、读锁/写锁
读锁、写锁构造函数如下:
1 // 读锁构造函数 2 protected ReadLock(ReentrantReadWriteLock lock) { 3 sync = lock.sync; 4 } 5 6 // 写锁构造函数 7 protected WriteLock(ReentrantReadWriteLock lock) { 8 sync = lock.sync; 9 }
2.1、写锁
为方便描述,下面用AQS表示抽象队列同步器AbstractQueuedSynchronizer。
2.1.1、写锁加锁
ReentrantReadWriteLock$WriteLock#lock() 核心代码:
public void lock() { sync.acquire(1); }
获取读写锁中的写锁,AbstractQueuedSynchronizer#acquire() 核心代码:
1 public final void acquire(int arg) { 2 // 获取锁资源 3 if (!tryAcquire(arg) && 4 // 获取锁资源失败,将当前线程节点加入等待队列,判断是否阻塞线程 5 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 6 selfInterrupt(); 7 }
AbstractQueuedSynchronizer的addWaiter、acquireQueued方法在ReentrantLock源码分析中已做详细介绍,此处不再赘述。下面来重点看看ReentrantReadWriteLock的tryAcquire方法:
1 protected final boolean tryAcquire(int acquires) { 2 // 获取当前线程 3 Thread current = Thread.currentThread(); 4 // 获取AQS的读写锁状态 state 5 int c = getState(); 6 // 获取 state 低16位 写锁状态 7 int w = exclusiveCount(c); 8 // state != 0, AQS读写锁状态不为0 9 if (c != 0) { 10 // 写锁状态为0 ,持有所资源线程不为当前线程,获取锁资源失败 11 if (w == 0 || current != getExclusiveOwnerThread()) 12 return false; 13 // 重入次数溢出,抛出异常 14 if (w + exclusiveCount(acquires) > MAX_COUNT) 15 throw new Error("Maximum lock count exceeded"); 16 // 设置的 state 状态, 重入次数 + 1 17 setState(c + acquires); 18 // 返回true,获取锁资源成功 19 return true; 20 } 21 // 写锁获取锁资源是否需要等待 22 // (公平锁 - 判断等待队列中是否已经有线程在等待,有线程等待当前线程加入等待队列;非公平锁 - 直接尝试抢锁) 23 if (writerShouldBlock() || 24 // CAS 抢锁资源 25 !compareAndSetState(c, c + acquires)) 26 // 抢锁资源失败 27 return false; 28 // 抢锁资源成功,设置线程属性为当前线程 29 setExclusiveOwnerThread(current); 30 return true; 31 }
读锁或写锁的状态值大于0,持有锁资源的线程不为当前线程,获取锁资源失败;
读锁或写锁的状态值溢出,抛出异常,获取锁资源失败;
获取锁资源成功,更新读锁或写锁的状态值,并设置线程属性为当前线程。
2.1.2、写锁释放锁
ReentrantReadWriteLock$WriteLock#unlock() 核心代码:
public void unlock() { sync.release(1); }
释放读写锁中的写锁,AbstractQueuedSynchronizer#release() 核心代码:
1 public final boolean release(int arg) { 2 // 释放锁资源 3 if (tryRelease(arg)) { 4 // 等待队列头节点不为空,并且等待队列中有挂起的节点待唤醒 5 Node h = head; 6 if (h != null && h.waitStatus != 0) 7 // 唤醒后继挂起线程 LockSupport.unpark 8 unparkSuccessor(h); 9 // 锁资源释放成功,返回true 10 return true; 11 } 12 // 锁资源释放失败,返回false 13 return false; 14 }
释放锁资源,ReentrantReadWriteLock#tryRelease() 核心代码:
1 protected final boolean tryRelease(int releases) { 2 // 当前线程是否持有锁资源判断,不满足,抛异常 3 if (!isHeldExclusively()) 4 throw new IllegalMonitorStateException(); 5 // 释放写锁的目标状态值,因为写锁状态是低16位,可以直接更新用state做操作 6 int nextc = getState() - releases; 7 // state低16位是否为0 8 boolean free = exclusiveCount(nextc) == 0; 9 // state低16位为0,表示当前线程彻底释放锁资源,线程属性设置null,当前线程不再持有锁 10 if (free) 11 setExclusiveOwnerThread(null); 12 // state低16位为0,更新写锁状态state,当前线程仍然持有锁资源 13 setState(nextc); 14 return free; 15 } 16 17 // 持有锁资源的线程是否为当前线程 18 protected final boolean isHeldExclusively() { 19 return getExclusiveOwnerThread() == Thread.currentThread(); 20 }
持有锁的线程不为当前线程,抛出异常;
写锁状态值修改,若修改后的写锁状态 - AQS的state低16位为0,当前线程彻底释放锁资源,不再持有锁,持有锁的线程属性设置为null,更新AQS的state值;若修改后的写锁状态- AQS的state低16位大于0,当前线程扔持有锁,调整当前线程的重入次数。2.2、读锁
2.2.1、读锁 加锁
ReentrantReadWriteLock$ReadLock#lock() 核心代码:
public void lock() { // 获取读锁 sync.acquireShared(1); }
读写锁中的读锁获取锁真正的方法,AbstractQueuedSynchronizer#acquireShared() 核心代码:
1 public final void acquireShared(int arg) { 2 // 尝试获取锁资源, 3 if (tryAcquireShared(arg) < 0) 4 // 未获取到锁资源,排队处理 5 doAcquireShared(arg); 6 }
1、tryAcquireShared()
尝试获取锁资源,ReentrantReadWriteLock#tryAcquireShared() 核心代码:
1 // 最后一个获取锁资源的读线程对象 2 private transient HoldCounter cachedHoldCounter; 3 // 线程持有的重入次数 继承自ThreadLocal 4 private transient ThreadLocalHoldCounter readHolds; 5 // 存储第一个获取读锁的线程对象 6 private transient Thread firstReader = null; 7 // 存储第一个获取读锁的线程的重入次数 8 private transient int firstReaderHoldCount; 9 10 protected final int tryAcquireShared(int unused) { 11 // 获取当前线程 12 Thread current = Thread.currentThread(); 13 // 获取AQS的 state 状态值 14 int c = getState(); 15 // 当前锁资源是否被其他写线程持有 16 if (exclusiveCount(c) != 0 && 17 getExclusiveOwnerThread() != current) 18 // 被其他写线程持有,返回-1,需排队 19 return -1; 20 // 获取当前锁资源是否被读线程持有 21 int r = sharedCount(c); 22 // 当前读线程抢锁资源,优先判断当前读线程是否需要等待(公平锁/非公平锁实现方式不同) 23 if (!readerShouldBlock() && 24 // 读锁是否已超出最大限制 25 r < MAX_COUNT && 26 // CAS 对 state 的高16 位 + 1 27 compareAndSetState(c, c + SHARED_UNIT)) { 28 // 当前读线程获取到了锁资源 29 if (r == 0) { 30 // 首个获取到锁资源的读线程存储在firstReader 线程对象中 31 firstReader = current; 32 // 读线程的重入次数设置为 1 33 firstReaderHoldCount = 1; 34 } else if (firstReader == current) { 35 // 当前线程为首个获取到所资源的读线程, firstReaderHoldCount + 1 36 firstReaderHoldCount++; 37 } else { 38 // 当前读线程第一个拿到锁资源的线程,先获取最后获取到锁资源的读线程对象cachedHoldCounter,持有重入次数、线程id 39 HoldCounter rh = cachedHoldCounter; 40 // rh == null : 第二个获取锁资源的读线程 或者 最后获取锁资源的读线程不是当前线程 41 if (rh == null || rh.tid != getThreadId(current)) 42 // 从ThreadLocal中 获取 HoldCounter 读线程对象 43 cachedHoldCounter = rh = readHolds.get(); 44 // 当前线程的重入次数为0,将当前线程的 HoldCounter 设置到线程变量ThreadLocal中 45 else if (rh.count == 0) 46 readHolds.set(rh); 47 // 重入次数 + 1 48 rh.count++; 49 } 50 // 获取锁资源成功,返回1 51 return 1; 52 } 53 // 未拿到锁,未返回-1 的处理 54 return fullTryAcquireShared(current); 55 }
ReentrantReadWriteLock#fullTryAcquireShared() 核心代码:
1 // cas获取锁失败、重入读未被tryAcquireShared处理的,执行此方法 2 final int fullTryAcquireShared(Thread current) { 3 4 HoldCounter rh = null; 5 for (;;) { 6 // 获取AQS的state 7 int c = getState(); 8 // 当前锁资源是否被其他写线程持有 9 if (exclusiveCount(c) != 0) { 10 if (getExclusiveOwnerThread() != current) 11 return -1; 12 // 当前读线程是否需要等待, 公平锁:等待队列中有排队的;等待队列中的head的next为写锁 13 } else if (readerShouldBlock()) { 14 // 当前线程为第一个获取读锁的线程,什么都不处理 15 if (firstReader == current) { 16 17 // 当前线程不为第一个获取读锁的线程 18 } else { 19 if (rh == null) { 20 // 获取最后一个获取锁资源的读线程 21 rh = cachedHoldCounter; 22 // 当前线程为第二个获取读锁的 或者 当前线程不是最后一个获取读锁的 23 if (rh == null || rh.tid != getThreadId(current)) { 24 // 获取去当前线程的 HoldCounter 对象,方便获取重入次数 25 rh = readHolds.get(); 26 // 当前线程重入次数为0,不为重入操作 27 if (rh.count == 0) 28 // 将我的TL中的值移除掉,不移除会造成内存泄漏。用于处理JDK1.5的内存泄漏问题 29 readHolds.remove(); 30 } 31 } 32 // 当前线程重入次数为0,需排队 33 if (rh.count == 0) 34 return -1; 35 } 36 } 37 // 超过读锁的最大值,抛出异常 38 if (sharedCount(c) == MAX_COUNT) 39 throw new Error("Maximum lock count exceeded"); 40 // CAS 竞争锁资源,逻辑同tryAcquireShared 41 if (compareAndSetState(c, c + SHARED_UNIT)) { 42 if (sharedCount(c) == 0) { 43 firstReader = current; 44 firstReaderHoldCount = 1; 45 } else if (firstReader == current) { 46 firstReaderHoldCount++; 47 } else { 48 if (rh == null) 49 rh = cachedHoldCounter; 50 if (rh == null || rh.tid != getThreadId(current)) 51 rh = readHolds.get(); 52 else if (rh.count == 0) 53 readHolds.set(rh); 54 rh.count++; 55 cachedHoldCounter = rh; 56 } 57 return 1; 58 } 59 } 60 }
1、当前锁资源是否被其他写线程持有,获取锁资源失败,返回 -1;
2、当前锁资源被读线程持有,判断当前读线程是都需要等待,是否超出读锁的最大值线程,若都满足,通过CAS尝试获取锁
->获取锁成功:
是否为首个获取读锁的线程,若为是,firstReader线程对象属性设置成当前线程,firstReaderHoldCount重入次数属性设置为1;
是否为首个获取读锁的线程重入,若为是,firstReaderHoldCount重入次数属性加1;
第二个获取锁资源的读线程 或者 最后获取锁资源的读线程不是当前线程,创建当前线程的线程变量存储 HoldCounter对象,HoldCounter对象包含重入次数与线程id,若当前线程的重入次数为0,将当前线程的HoldCounter 设置到线程变量ThreadLocal中,重入次数 + 1
->获取锁失败
再次尝试获取锁资源,解决内存泄露问题。
ThreadLocalHoldCounter继承自ThreadLocal,通过ThreadLocalHoldCounter#get()方法,调用initialValue()方法,完成HoldCounter对象的初始化,进而完成thread与HoldCounter的绑定,有关ThreadLocal的分析,在ThreadLocal原理 已做分析,此处不再赘述。
1 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { 2 // 初始化 3 public HoldCounter initialValue() { 4 // 创建HoldeCounter对象 5 return new HoldCounter(); 6 } 7 }
2、doAcquireShared()
获取读锁失败,加入等待队列等待,AbstractQueuedSynchronizer#doAcquireShared() 核心方法:
1 private void doAcquireShared(int arg) { 2 // 将读锁加入等待队列中等待 3 final Node node = addWaiter(Node.SHARED); 4 // 当如队里是否成功标识 5 boolean failed = true; 6 try { 7 boolean interrupted = false; 8 for (;;) { 9 // 获取当前读线程节点的前一节点 10 final Node p = node.predecessor(); 11 // 当前节点的前一节点为等待队列的头节点head 12 if (p == head) { 13 // 尝试获取锁资源 14 int r = tryAcquireShared(arg); 15 if (r >= 0) { 16 // 获取锁资源成功,唤醒等待队列中后面需要获取读锁的线程 17 setHeadAndPropagate(node, r); 18 // 为了垃圾回收,当前节点的下一节点设置为null 19 p.next = null; 20 if (interrupted) 21 selfInterrupt(); 22 failed = false; 23 return; 24 } 25 } 26 // 是否阻塞当前线程,需要保证当前节点前面的Node的状态为-1,才能执行后面操作 27 if (shouldParkAfterFailedAcquire(p, node) && 28 // 阻塞当前线程,LockSupport.park 29 parkAndCheckInterrupt()) 30 interrupted = true; 31 } 32 } finally { 33 if (failed) 34 cancelAcquire(node); 35 } 36 }
3、总结
读锁是共享锁,同一时间可以被多个线程持有读锁。每个获取到读锁的线程,都有自己的线程变量ThreadLocal存储锁重入的次数。首个获取读锁的线程,不需要线程变量ThreadLocal记录重入次数,而是用firstReaderHoldCount变量记录重入次数。最后一个拿到读锁的线程,用HolderCounter对象存储,可避免频繁的锁重入,从线程变量ThreadLocal中获取重入次数。
2.2.2、读锁释放锁
释放读锁 ReentrantReadWriteLock#unlock() 核心代码:
public void unlock() { // 释放读锁 sync.releaseShared(1); }
真正释放读锁的方法,AbstractQueuedSynchronizer#releaseShared() 核心代码:
1 public final boolean releaseShared(int arg) { 2 // 尝试释放读锁 3 if (tryReleaseShared(arg)) { 4 // 唤醒等待队列中的等待线程 5 doReleaseShared(); 6 // 无读线程持有读锁标识,返回true 7 return true; 8 } 9 // 无读线程持有读锁标识,返回false 10 return false; 11 }
1、tryReleaseShared()
尝试释放读锁,ReentrantReadWriteLock#tryReleaseShared() 核心代码:
1 protected final boolean tryReleaseShared(int unused) { 2 // 获取当前线程 3 Thread current = Thread.currentThread(); 4 // 当前线程为首个获取读锁的线程 5 if (firstReader == current) { 6 // 若首个获取读锁的线程重入次数为1 7 if (firstReaderHoldCount == 1) 8 // 将首个获取锁资源的读线程对象设置为null,方便GC 9 firstReader = null; 10 else 11 // 首个获取读锁线程的重入次数 -1 12 firstReaderHoldCount--; 13 } else { 14 // 获取最后一个获取读锁线程的重入次数、线程id 15 HoldCounter rh = cachedHoldCounter; 16 // 当前线程不是最后一个线程 17 if (rh == null || rh.tid != getThreadId(current)) 18 // 获取当前线程的线程变量中的HoldCounter 19 rh = readHolds.get(); 20 // 获取当前线程的可重入次数 21 int count = rh.count; 22 // 当前线程的可重入次数小于等于 1 23 if (count <= 1) { 24 // 删除当前线程的线程变量,避免内存泄露 25 readHolds.remove(); 26 // 当前线程的可重入次数小于等于 0,抛出异常 27 if (count <= 0) 28 throw unmatchedUnlockException(); 29 } 30 // 当前线程的可重入次数 -1 31 --rh.count; 32 } 33 for (;;) { 34 // 获取 AQS 的 state 35 int c = getState(); 36 // 获取读锁的状态值 37 int nextc = c - SHARED_UNIT; 38 // CAS 释放读锁 39 if (compareAndSetState(c, nextc)) 40 // 所有的读线程是否都释放读锁 41 return nextc == 0; 42 } 43 }
2、doReleaseShared()
1 // 唤醒等待队列中的 2 private void doReleaseShared() { 3 // 循环处理,防止有新的线程加入等待队列中 4 for (;;) { 5 // 获取等待队列头节点 6 Node h = head; 7 // 头节点不为空,等待队列中有多个线程在等待 8 if (h != null && h != tail) { 9 // 获取头节点的等待状态 10 int ws = h.waitStatus; 11 // 当前Node节点状态为被阻塞 12 if (ws == Node.SIGNAL) { 13 // CAS 操作,WaitStatus 由 -1 设置成 0,并唤醒阻塞的线程锁 14 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 15 // CAS 失败,循环更新 16 continue; 17 // 唤醒等待队列中挂起的线程 18 unparkSuccessor(h); 19 } 20 // 当前节点为已唤醒线程,WaitStatus 由 0 设置成 -3 21 // 等待的读线程的处理 22 else if (ws == 0 && 23 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 24 // CAS 失败,循环更新 25 continue; 26 } 27 // 没有新的线程加入等待队列,结束循环 28 if (h == head) 29 break; 30 } 31 }
3、总结
读锁的释放,根据AQS中state的高16位值判断,若为0,表示无读线程持有读锁,此时需要唤醒等待队列中的阻塞线程抢锁资源;若大于0,
-> 当前线程是首个获取读锁的线程
首个获取读锁的线程重入次数为1,则将firstReader设置为null,首个获取读锁线程彻底释放锁资源,若首个获取读锁的线程重入次数大于1,firstReaderHoldCount减 1
-> 当前线程不是最后一个获取读锁的线程
从当前线程的线程变量ThreadLocal中,获取HoldCounter,若可重入次数等于1,删除当前线程的线程变量,避免内存泄露,同时可重入次数减 1;若可重入次数小于等于0,抛出异常。
-> 当前线程不是最后一个获取读锁的线程
cachedHoldCounter属性作为HoldCounter,若可重入次数等于1,删除当前线程的线程变量,避免内存泄露,同时可重入次数减 1;若可重入次数小于等于0,抛出异常。
标签:分析,ReentrantReadWriteLock,获取,读锁,源码,线程,当前,rh From: https://www.cnblogs.com/RunningSnails/p/17380635.html