一、多线程与锁
0、用户空间和内核空间
1、什么是进程:
进程是资源分配的基本单位(形象理解为程序进入内存运行的内容)
2、什么是线程:
程序执行的基本单位
3、CAS的低层实现是汇编通过 lock cmpxchg 指令实现CAS的原子性
4、对象在内存中的存储布局(刚new出来的时候)/(对象头和类型指针组成对象头):
对象头:markword:8个字节
类型指针(class pointer):4个字节
实例数据(instance data):0
对齐(padding)/补齐使对象长度被8整除
前面对象头是12字节,不能被8整除所以再加4个字节补齐,一共是16字节
5、synchonzed实现过程
1)Java代码:synchonzed
2)monitorenter moniterexit(moniter监视器)
3)执行过程中自动升级:new一个对象--》偏向锁--》轻量级锁--》重量级锁
4)lock comxchg
6、缓存行 cache line(大小多数为64字节):
1)将一块数据按照块的大小读进缓存中,称之为一个缓存行
2)缓存一致性协议(如 MESI Cache,即英特尔的):一颗CPU的缓存发生改变,要跟其他CPU做同步
四种状态:modified(修改了)、Exclusive(独占)、Shared(一起读)、Invalid(失效)
7、对象的创建:
1)申请内存空间(new)
2)调用方法(invokespecial)
3)建立关联(astore)
8、cpu具有乱序执行现象:
可需要内存屏障或锁总线来保障有序性,在Java代码中可用volatile关键字来保持线程可见性,禁止指令冲排序
9、volatile:保持线程可见性,禁止指令重排序
如何解决指令重排序:
1)源码加volatile标志
2)字节码生成ACC_VOLATILE标志
3)jvm层面增加内存屏障
4)hotspot(jvm的内核版本)层级代码实现,即汇编语言调用
5)CPU级别,原语支持总线锁,硬件信号
10、DCL单例(double check lock):
双重检查单例,由于cpu指令重排序,不加volatile会有风险
11、“强软弱虚”引用
1)强引用(NormalReference):普通引用,不再指向的时候才被垃圾回收
2)软引用(SoftReference):用于缓存的实现,当内存不足的时候才会被回收
3)弱引用(WeakReference):每次调用GC的时候即便有指向也会被回收,具有一次性的作用
4)虚引用:用于管理堆外内存
12、Threadlocal:
内部map的entry继承了弱引用:key的指向是弱引用,防止内存泄露
13、Java中的jvm线程与内核线程一一对应
14、什么是线程切换:
15、ABA问题:
即一个值被一个线程拿来使用的过程中,另一个线程或者其他多个线程使用过,但是回填的时候值变回原样,就是说过程中被使用过,如何监控这个问题:解决这个问题可以增加版本号
16、CAS(比较并交换)修改值的时候的原子性问题
17、偏向锁:
打上第一个线程的标志不用竞争,当有第二线程来的时候就竞争升级为轻量级锁
18、重入锁:
多次锁同一个对象(syn(o){syn(o)})
1)synchonzed一定可重入
二、并发编程
(一)ThreadPoolExecutor源码分析
1、ThreadPoolExecutor应用方式
- 构建ThreadPoolExecutor对象,传入指定参数
- 执行Runnable任务时,可以直接调用execute方法
- 执行Callable任务时,需要有返回结果,直接调用submit方法
2、ThreadPoolExecutor的核心参数
- corePoolSize(核心线程数):
- maximumPoolSize(最大线程数):在核心线程数之外再在线程池中创建新的临时线程处理多余的任务(含核心线程)
- keeAilveTime(非核心线程的空闲时间即存活时间)
- TimeUnit:keeAilveTime的单位
- WorkQueue(阻塞/工作队列):在核心线程没空闲的时候进行排队
- ThreadFactory(线程工厂):创建线程
- RejectedExecutionHandler(拒绝策略):在核心线程、非核心线程和阻塞队列都没有空闲的时候,拒绝多余任务的策略
3、ThreadPoolExecutor的执行流程
业务线程(主线程)提交任务到线程池之后,任务的处理流程
4、ThreadPoolExecutor的状态
4.1 ThreadPoolExecutor的参数
//ctl表述了两个状态
//1. 表示线程池当前的状态(高三位)
//2. 表示线程池当前的工作线程个数(低29位)
static final int COUNT_BITS = Integer.SIZE - 3;
//表述作线程的最大数量
private static final int CAPACITY = (1 << COUNT BITS) - 1;
0010000 00000000 00000000 00000000
0011111 11111111 11111111 11111111
//线程池的5种状态
//111
private static final int RUNNING = -1 << COUNT BITS;
//000
private static final int SHUTDOWN = 0 << COUNT BITS;
//001
private static final int STOP = 1 << COUNT BITS;
//010
private static final int TIDYING = 2 << COUNT BITS;
//011
private static final int TERMINATED = 3 << COUNT BITS;
//计算出当前线程的状态
private static int runStateof(int c) {return c & ~CAPACITY;}
//计算当前线程池的工作线程个数
private static int workerCountof(int c){return c & CAPACITY;}
4.2 线程池的状态变换
5、execute方法
第一点核心:通过execute方法,查看到线程池的整体执行流程,以及一些避免并发情况的判断
第二点核心:为什么线程池会添加一个空任务的非核心线程到线程池
public void execute(Runnable command) {
//非空判断
if (command == null)
throw new NullPointerException();
//获取ctl属性
int c = ctl.get();
//工作线程的个数 是否小于 核心线程数
if (workerCountOf(c) < corePoolSize) {
//通过add方法,添加一个核心线程去执行command任务
if (addWorker(command, true))
//添加核心线程成功,返回true,直接returnjieshu
return;
//如果在并发情况下,添加核心线程失败的线程,需要重新获取一次ctl属性
c = ctl.get();
}
//创建核心线程失败
//判断当前线程池状态是否是RUNNING
//如果是RUNNING,执行offer方法将任务添加到工作队列
if (isRunning(c) && workQueue.offer(command)) {
//添加任务到工作队列成功
//再次重新获取ctl
int recheck = ctl.get();
//判断线程池是否是RUNNING状态,如果不是RUNNING状态,需要将任务从工作队列移除
if (! isRunning(recheck) && remove(command))
//执行拒绝策略(线程池状态不正确,执行拒绝策略)
reject(command);
//判断工作线程是否为0
else if (workerCountOf(recheck) == 0)
//工作线程数为0,但是工作队列中有任务
//添加一个空任务非核心线程,为了处理在工作队列中排队的任务,即为了避免上述状态
addWorker(null, false);
}
//任务添加到工作队列失败,添加非核心线程去执行当前任务
else if (!addWorker(command, false))
//添加非核心线程失败,执行reject拒绝策略
reject(command);
}
6、addWorker方法
添加工作线程,并启动工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
//对线程池状态的判断,以及对工作线程数量的判断
//外层for循环的标识
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//判断线程池状态
//如果线程池状态不是RUNNING,就再次做后续判断,查看当前任务是否可以不处理
if (rs >= SHUTDOWN &&
//线程池状态为SHUTDOWN,并且任务为空,并且工作队列不为空
//如果同事满足了这三个要求,那就是要处理工作队列当前中任务
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
//只要不是RUNNING状态,不处理新任务
//如果SHUTDOWN状态,并且满足了之前addWorker(null, false),并且工作队列有任务时,不能走当前位置
return false;
//判断线程池数量
for (;;) {
//基于ctl获取当前工作线程数量
int wc = workerCountOf(c);
//判断工作线程是否大于最大值
if (wc >= CAPACITY ||
//如果是核心线程,是否大于设置的corePoolSize;
//如果是非核心线程,是否大于maximumPoolSize
wc >= (core ? corePoolSize : maximumPoolSize))
//当前工作线程以及达到最大值了
return false;
//以CAS的方式(避免并发情况),对工作线程数+1,如果成功,
if (compareAndIncrementWorkerCount(c))
//直接跳到外层for循环
break retry;
//重新获取ctl的值
c = ctl.get(); // Re-read ctl
//基于新获取的ctl拿到线程池状态,判断和之前的rs状态是否一致
if (runStateOf(c) != rs)
//说明并发操作导致线程池状态变化,需要重新判断
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//添加工作线程,并启动工作线程
//工作线程是否启动了
boolean workerStarted = false;
//工作线程是否添加了
boolean workerAdded = false;
//工作线程
Worker w = null;
try {
//new Worker 构建工作线程,将任务扔到Worker中
w = new Worker(firstTask);
//拿到了Worker中绑定的Thread线程
final Thread t = w.thread;
if (t != null) {
//加锁...
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//基于重新获取的ctl,拿到线程池的状态
int rs = runStateOf(ctl.get());
//如果满足线程池状态为RUNNING,则添加工作线程
if (rs < SHUTDOWN ||
//如果线程池状态为SHUTDOWN,并且传入的任务为null
(rs == SHUTDOWN && firstTask == null)) {
//开始添加工作线程
//判断当前线程是否出狱run状态(健壮性判断)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将构建好的Worker对象添加到workers(hashSet)
workers.add(w);
//获取当前工作线程个数
int s = workers.size();
//如果当前工作线程数大于历史最大的工作线程数,就重新赋值largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//将工作线程添加成功的标识设置为true
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//添加工作线程成功,启动线程
t.start();
//将工作线程启动的标识设置为true
workerStarted = true;
}
}
} finally {
//如果启动工作线程失败
if (! workerStarted)
//1.移除workers中的工作线程
//2.将工作线程数-1
//3.尝试将线程池状态变为TIDYING过渡状态
addWorkerFailed(w);
}
return workerStarted;
}
//启动工作线程失败后,做的补救操作
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断之前创建工作线程是否成功
if (w != null)
//如果成功,就将workers中的当前工作线程移除
workers.remove(w);
//将工作线程数-1
decrementWorkerCount();
//尝试将线程池状态变为TIDYING过渡状态
tryTerminate();
} finally {
mainLock.unlock();
}
}
7、Worker对象
private final class Worker
extends AbstractQueuedSynchronizer //线程中断
implements Runnable //存储需要执行的任务
{
//工作线程的Thread对象,初始化时候构建出来的
final Thread thread;
//需要执行的任务
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
//刚刚初始化的工作线程不允许被中断
setState(-1); // inhibit interrupts until runWorker
//第一次new的时候,会将任务赋值给firstTask
this.firstTask = firstTask;
//给Worker构建Thread对象
this.thread = getThreadFactory().newThread(this);
}
//调用t.start(),执行当前的run方法
public void run() {
runWorker(this);
}
//此处中断线程不是立即让线程停止,只是将thread的中断标识设置为true
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//将state置为0,在runWorke中,为了标识当前线程允许被中断
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
8、runWorker方法
执行任务的流程,并且做了中断线程相关的lock操作
final void runWorker(Worker w) {
//获取当前工作线程
Thread wt = Thread.currentThread();
//拿到Worker对象中封装的任务
Runnable task = w.firstTask;
//将Worker的firstTask归位
w.firstTask = null;
//将Worker的state归位为0,表示可以被中断
w.unlock(); // allow interrupts
//任务执行时,勾子函数中是否出现异常的标识
boolean completedAbruptly = true;
try {
//获取任务的第一个方式:执行execute、submit时,传入的任务直接处理
//获取任务的第二个方式:从工作队列中获取任务执行
while (task != null || (task = getTask()) != null) {
//加锁,在SHUTDOWN状态下,当前线程不允许被中断
//并且Worker内部实现的锁不是可重入锁,因为在中断时,也需要对worker进行lock,不能获取就代表当前工作线程正在执行任务
w.lock();
//如果线程池状态变为了STOP状态,必须将当前线程中断
//第一个判断:判断当前线程池状态是否是STOP
//第二个判断:查看中断标记位,并归位,如果位false,说明不是STOP,如果变为true,需要再次查看是否是并发操作导致线程池为STOP状态
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
//查询当前线程中断标记是否为false,如果是false,就执行wt.interrupt()
!wt.isInterrupted())
//将中断标记设置为true
wt.interrupt();
try {
//执行任务的勾子函数,前置增强(非动态代理)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务的勾子函数,后置增强(非动态代理)
afterExecute(task, thrown);
}
} finally {
//将task置为null
task = null;
//执行成功任务个数+1
w.completedTasks++;
//将state标志位设置为0
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
9、getTask方法
如何从工作队列workQueue中获取到任务
//从工作队列中获取任务
private Runnable getTask() {
//标识(非核心线程可以干掉)
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//=================判断线程池状态=============================
int c = ctl.get();
//拿到线程池的状态
int rs = runStateOf(c);
// 如果进入if,需要干掉当前工作线程
//线程池状态SHUTDOWN、STOP
//如果线程池状态大于等于STOP,需要移除当前工作线程
//如果线程池状态为SHUTDOWN,并且工作队列为空,需要移除当前工作线程
if (rs >= SHUTDOWN、 && (rs >= STOP || workQueue.isEmpty())) {
//移除当前工作线程,工作线程数-1
decrementWorkerCount();
//返回null,交给processWorkerExit移除当前工作线程
return null;
}
//=================判断工作线程数量=============================
//拿到工作线程数
int wc = workerCountOf(c);
// allowCoreThreadTimeOut:是否允许核心线程超时(一般都为false)
// 工作线程是否大于核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (
//工作线程数是否已经大于最大线程数
//工作线程数大于核心线程数,并且当前线程已经超时
//尝试干掉当前线程
(wc > maximumPoolSize || (timed && timedOut))
//工作线程数大于1,或者工作队列为空
//如果工作队列为空,干掉当前工作线程
//如果工作线程数大于1,干掉当前工作线程
&& (wc > 1 || workQueue.isEmpty())) {
//基于CAS的方式移除当前线程,只有一个线程会CAS成功
if (compareAndDecrementWorkerCount(c))
//返回null,交给processWorkerExit移除当前工作线程
return null;
continue;
}
//=================从工作队列中获取任务=============================
try {
Runnable r = timed ?
//阻塞一定时间从工作队列拿任务(可以理解为非核心线程走这个方法)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//一直阻塞(可以理解为核心线程走这个方法)
workQueue.take();
if (r != null)
//如果拿到任务直接放回执行
return r;
//从队列获取任务时,超时了(达到了当前工作线程的最大生存时间)
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
10、processWorkerExit方法
//移除当前工作线程的操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果执行当前方法的操作不是getTask中的操作,是直接因为异常引起的。(一半是勾子函数抛出异常)
if (completedAbruptly)
//因为执行方式“不合法”(即因为勾子函数自定义方法异常造成的),手动扣减工作线程数
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//记录当前线程池一共处理了多少个任务
completedTaskCount += w.completedTasks;
//移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试将线程池关闭(到过渡状态 - 销毁状态)
tryTerminate();
int c = ctl.get();
//当前线程池状态,进入此处,说明是RUNNING、SHUTDOWN
//(不是STOP状态)
if (runStateLessThan(c, STOP)) {
//如果是正常状态移除当前工作线程
if (!completedAbruptly) {
//核心线程数最小值,允许多少
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果工作队列中的任务不为空,设置工作线程的最小值为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//还有工作线程在线程池中
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//不正常的方式移除了当前的工作线程,再添加一个工作线程(非核心)
//线程池工作队列不为空,并且没有工作线程,再添加一个工作线程
addWorker(null, false);
}
}
(二)ReentrantLock源码分析
1、ReentrantLock的介绍
Java中提供的锁:synchronized、lock锁
ReentrantLock是一个互斥锁,可以让多线程执行期间,只有一个线程在执行指定的一段代码
使用方式:
//前置知识:CAS、volatile、AQS
ReentrantLock lock = new ReentrantLock();
//加锁
lock.lock();
try {
//执行业务
}finally {
//释放锁
lock.unlock();
}
2、ReentrantLock的lock方法
2.1 简单分析lock方法
再lock中,内部调用了sync.lock()方法,该方法有两个实现
-
FairSync:公平锁
每一个线程会一个个排队尝试获取锁资源,会先查看是否有现场排队,如果有直接去排队,如果没有才会尝试去竞争一下锁资源。
-
NonfairSync:非公平锁
每个线程都会再执行lock()方法时,先尝试获取锁资源,获取不到再排队。
如果需要使用公平锁:在new ReentrantLock时,传入参数true。
如果需要使用非公平锁:直接无参构造。
PS:更推荐非公平锁,非公平锁比公平锁效率更高。
公平锁直接调用acquire方法尝试获取锁,
而非公平锁会先基于CAS的方式尝试去竞争锁资源,如果获取不到,才会执行acquire方法尝试获取锁。
2.2 分析AQS
AQS:是AbstractQueuedSynchronizer类,AQS内部维护着一个双向链表的队列
2.3 lock方法源码
//非公平锁的lock方法
final void lock() {
//以CAS的方式尝试将state从0改为1
if (compareAndSetState(0, 1))
//证明修改state成功,就代表获取锁资源成功
//将当前线程设置到AQS中的exclusiveOwnerThread(AOS中,AQS的父类),代表当前线程拿着锁资源(与后面的可重入锁有关系)
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//公平锁的lock方法
final void lock() {
acquire(1);
}
3、 ReentrantLock的acquire方法
//公平锁和非公平锁都会调用acquire
public final void acquire(int arg) {
//tryAcquire方法:分为两种实现,一种是公平锁,一种是非公平锁
//公平锁操作:如果state为0,再看是否有线程排队,如果有就去排队。如果是锁重入的操作,直接获取锁。
//非公平锁操作:如果state为0,直接尝试CAS方式修改。如果是锁重入操作,直接获取锁资源。
if (!tryAcquire(arg) &&
//addWaiter方法:在线程没有通过tryAcquire拿到锁资源时,需要将当前线程封装为Node对象,去AQS内部排队
//acquireQueued方法:查看当前线程是否是排在队伍前面的,如果是就尝试获取锁资源。如果长时间没拿到锁,也需要将当前线程挂起
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
4、ReentrantLock的tryAcquire方法
-
tryAcquire方法是AQS提供的,内部没有任何实现,需要继承AQS的类去实现逻辑代码
-
tryAcquire在ReentrantLock中提供了两种实现:公平锁、非公平锁
非公平锁的实现逻辑:
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取AQS的state值
int c = getState();
//如果state为0,代表当前没有线程占用锁资源
if (c == 0) {
//直接基于CAS的方式尝试修改state,将0改为1,如果成功代表拿到锁资源
if (compareAndSetState(0, acquires)) {
//将exclusiveOwnerThread属性设置为当前线程
setExclusiveOwnerThread(current);
//返回true
return true;
}
}
//说明state肯定不为0,代表当前lock被线程占用
else if (current == getExclusiveOwnerThread()) {
//锁重入操作
//对state + 1 (acquires为传入的1)
int nextc = c + acquires;
//判断锁重入已经达到最大值(二进制计算)
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//将AQS的state设置好
setState(nextc);
return true;
}
return false;
}
公平锁的实现逻辑:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//没有线程占用锁资源
if (c == 0) {
//先查看有没有线程排队
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁和非公平锁的tryAcquire方法的区别是:当判断state为0之后
- 公平锁会先查看是否有线程正在排队,如果有直接返回false,没有则执行CAS尝试获取锁资源。
- 非公平锁不管有没有线程排队,直接以CAS的方式尝试获取锁资源。
5、ReentrantLock的addWaiter方法
在线程执行tryAcquire方法没有获取到锁资源之后,会返回false,再配置上if中的!操作,会执行&&后面的方法,而在acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的参数中执行了addWaiter,要将当前获取锁失败的线程封装为Node,排队到AQS的队列中。
//获取锁失败,封装Node,排队到AQS的队列中
private Node addWaiter(Node mode) {
//将线程封装为Node对象
Node node = new Node(Thread.currentThread(), mode);
// 获取到tail节点,pred
Node pred = tail;
////如果tail不为null
if (pred != null) {
//将当前节点的prev指向tail
node.prev = pred;
//为了避免并发问题,以CAS的方式将tail指向当前线程
if (compareAndSetTail(pred, node)) {
// 将之前的tail的next指向当前节点
pred.next = node;
return node;
}
}
//如果在队列为空,或者CAS操作失败后,会执行enq方法,将当前的node排到队列的末尾
enq(node);
return node;
}
//enq方法,传入的node就是当前节点
private Node enq(final Node node) {
for (;;) {
//获取tail节点
Node t = tail;
if (t == null) { // Must initialize
//如果队列为空,先初始化head节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//到此队列肯定不为空,才用之前的逻辑将node插入
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
整体逻辑为,先初始化Node节点,将当前线程传入,并且标识为互斥锁。
尝试将当前Node插入到AQS队列的末尾
-
队列为空:执行enq,先初始化空Node作为头,然后再将当前Node插入,作为tail。
-
队列不为空:直接将当前Node插入,作为tail
6、ReentrantLock的acquireQueued方法
首先查看当前的node受否排在队列的第一个位置(不算head),直接再次执行tryAcquire方法竞争锁资源
尝试将当前线程挂起,最终排在有效节点后,才会将当前线程挂起。
- 如果当前Node为head的下一个节点,直接尝试获取锁资源
//队伍前面,竞争锁资源。非队伍前面,挂起线程
final boolean acquireQueued(final Node node, int arg) {
//竞争锁资源是否失败--默认是失败true
boolean failed = true;
try {
//线程中断标识
boolean interrupted = false;
for (;;) {
//获取当前节点的上一个节点
final Node p = node.predecessor();
//如果当前节点的上一个节点是head,如果是head后面的节点,就执行tryAcquire方法竞争锁资源
if (p == head && tryAcquire方法竞争锁资源(arg)) {
//竞争锁资源成功,进入当前业务代码...
//因为当前线程已经拿到锁资源,将当前线程Node设置为head,并且将Node中的prev和thread置为null
setHead(node);
//将之前的头节点的next置为null,让GC将之前的head回收
p.next = null; // help GC
//将获取锁失败的标志置为false,false表示拿到锁成功
failed = false;
//返回线程中断标识,默认情况为false
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
//将线程挂起
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
尝试将当前线程挂起,涉及到了判断以及lockSupport方法
//节点被取消状态,表示节点被取消了却仍然在队列中
static final int CANCELLED = 1;
//正常状态
static final int SIGNAL = -1;
//可以转变为-1
static final int CONDITION = -2;
//与共享锁有关,可以转变为-1
static final int PROPAGATE = -3;
//0为初始化状态
//判断当前线程是否可以挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//拿到了上一个节点的状态
int ws = pred.waitStatus;
//如果ws为-1,直接返回true,当前节点可以挂起线程
if (ws == Node.SIGNAL)
return true;
//如果ws>0,说明肯定是CANCELLED状态,绕过这个节点,找上一个节点的上一个
if (ws > 0) {
//循环,直到找到上一个节点小于等于0的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//可能为0,-2,-3,直接以CAS的方式将节点状态改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//找到上一个节点状态是正常的后,调用当前方法将线程挂起
private final boolean parkAndCheckInterrupt() {
//直接使用Unsade类的park方法挂起线程
LockSupport.park(this);
//中断线程
return Thread.interrupted();
}
7、ReentrantLock的unlock方法
unlock释放锁不分公平和非公平,都是执行sync的release方法
释放锁的核心,是将state从大于0的数值更改为0(由于可能有锁重入情况,所以要将状态值state数值为0才算是释放锁成功),即为释放锁成功
并且unlock方法应该会涉及到将AQS队列中阻塞的线程进行唤醒,阻塞用的是park方法,唤醒是unpark方法
//每次只施放一次锁
public void unlock() {
sync.release(1);
}
8、ReentrantLock的unlock和release方法
在释放锁时,只有state被减为0之后,才会去唤醒AQS队列中被挂起的线程
在唤醒挂起线程时,如果head的next状态不正确,会从后往前找到离head最近的节点进行唤醒
从后往前找的原因:addWaiter是先将prev指针赋值,最后才会将上一个节点的next指针赋值,为了避免丢失节点或者跳过节点,必须从后往前找
//释放锁操作
public final boolean release(int arg) {
//先查看tryRelease方法
if (tryRelease(arg)) {
//释放锁资源成功
Node h = head;
//如果head不为null,并且当前head的状态不为0
if (h != null && h.waitStatus != 0)
//说明AQS队列中,有Node再排队,并且线程已经挂起
//需要唤醒被挂起的Node
unparkSuccessor(h);
return true;
}
//说明一次没释放掉锁资源
return false;
}
protected final boolean tryRelease(int releases) {
//直接获取state,并且-releases,将state-1
int c = getState() - releases;
//如果释放锁的线程,不是占用锁的线程,直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//声明
boolean free = false;
//判断state-1后是否为0
if (c == 0) {
//如果位0,表示锁资源释放了
free = true;
//将占用互斥锁的线程标识置为null,即成功释放锁
setExclusiveOwnerThread(null);
}
//锁重入了,一次没释放掉,将c赋值给state,等待下次执行时,再判断
setState(c);
return free;
}
//唤醒AQS中被挂起的线程
private void unparkSuccessor(Node node) {
//获取head的状态
int ws = node.waitStatus;
if (ws < 0)
//将当前head的状态设置为0
compareAndSetWaitStatus(node, ws, 0);
//拿到next节点
Node s = node.next;
//如果下一个节点为null,或者状态为CANCEL(取消),需要找到离head(头)结点最近的有效Node
if (s == null || s.waitStatus > 0) {
s = null;
//从后往前找(从后往前找的原因:因为需要查看addWaiter的内容,即Node是先将上一个指针赋值的)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到最近的node后,直接唤醒
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}
标签:Node,多线程,return,int,lock,线程,相关,null
From: https://www.cnblogs.com/luzriu/p/18028443