具体如何流转,何时进入队列,状态何时变化,何时被唤醒等流程待更新中。。。
AQS重要属性
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 内部类node
static final class Node {
// 等待状态
volatile int waitStatus;
// 前结点
volatile Node prev;
// 后结点
volatile Node next;
// 当前线程
volatile Thread thread;
}
// 头结点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 状态
private volatile int state;
}
1、创建
// 构造方法创建对象,空或false创建非公平锁,true创建公平锁
ReentrantLock lock = new ReentrantLock();
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2、lock();方法
// 1.加锁
lock.lock();
// 2.内部调用的是sync.lock();方法
public void lock() {
sync.lock();
}
// 3.sync为ReentrantLock内部类,其继承AQS
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {}
// 4.再往下看此处lock方法为NonfairSync非公平锁的实现
final void lock() {
// 4.1 compareAndSetState比较并设置状态(AQS中重要的属性之一 private volatile int state; 代表锁的状态)期望若为0,则更新成1,底层调用的是unsafe.compareAndSwapInt()内部方法,加锁成功返回true
if (compareAndSetState(0, 1))
// 如果加锁成功,则将当前线程设置为其拥有线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 否则调用acquire方法,看下面 5. 分析
acquire(1);
}
// 5.acquire方法(3个重要方法)--arg==1
public final void acquire(int arg) {
// 5.1 tryAcquire(arg)
// 5.2 addWaiter(Node.EXCLUSIVE)
// 5.3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 5.1 tryAcquire(arg) --arg==1
// 是AQS的模板方法,强制子类去实现,否则直接报错
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 这个是非公平锁的实现,如果当前没有线程或与之前线程是同一线程,则true,否则false(说明已存在线程,需要等待)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// AQS中重要的状态
int c = getState();
// 若状态为0则表示无线程抢占
if (c == 0) {
// 则设置当前线程并更改状态,和 4.1一致,返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 否则代表有线程了,判断和之前的线程是否是同一个,此处说明ReentrantLock为可重入锁
else if (current == getExclusiveOwnerThread()) {
// 同一线程则锁加1,返回true
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 5.2 addWaiter(Node.EXCLUSIVE) -- EXCLUSIVE = null
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// node头=尾
Node pred = tail;
// 若头不为空,则当前node的前结点设为头结点
if (pred != null) {
node.prev = pred;
// 5.2.1 compareAndSetTail 设置尾结点,头结点的后指针指向当前结点(即双向结点设置完毕),底层调用native的unsafe方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若头结点为空,则 5.2.2 enq(node);
enq(node);
return node;
}
// 5.2.2 enq(node); --若头结点为空
private Node enq(final Node node) {
// 自旋
for (;;) {
Node t = tail;
// 若头结点为空,则必须先初始化头结点
if (t == null) { // Must initialize
// unsafe方法设置头结点--new Node()初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 头结点不为空,则当前结点的前指针指向头结点
node.prev = t;
// 设置尾结点,头结点的后指针指向当前结点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 5.3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) --arg==1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获取当前结点的前结点
final Node p = node.predecessor();
// 若前结点为头结点,继续尝试获取锁
if (p == head && tryAcquire(arg)) {
// 抢占锁成功后则队列中不需要了
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 否则尝试入队 5.3.1 shouldParkAfterFailedAcquire(p, node)
// 5.3.2 parkAndCheckInterrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 否则尝试入队 5.3.1 shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头结点的等待状态 waitStatus重要属性
int ws = pred.waitStatus;
// -1,处于锁状态,等待唤醒
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 5.3.2 parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
// 将当前线程锁住
LockSupport.park(this);
return Thread.interrupted();
}
3、lock.unlock()
// 1.unlock()
lock.unlock();
// 2.调用的是 sync.release(1);
public void unlock() {
sync.release(1);
}
// 3.release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// unparkSuccessor(h)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
标签:node,Node,结点,return,AQS,int,ReentrantLock,----,arg
From: https://www.cnblogs.com/alpari-wang/p/17445855.html