首页 > 其他分享 >多线程相关

多线程相关

时间:2024-02-22 23:57:01浏览次数:30  
标签:Node 多线程 return int lock 线程 相关 null

一、多线程与锁

0、用户空间和内核空间
1、什么是进程:

进程是资源分配的基本单位(形象理解为程序进入内存运行的内容)

2、什么是线程:

程序执行的基本单位

3、CAS的低层实现是汇编通过 lock cmpxchg 指令实现CAS的原子性
4、对象在内存中的存储布局(刚new出来的时候)/(对象头和类型指针组成对象头):

对象头:markword:8个字节
类型指针(class pointer):4个字节
实例数据(instance data):0
对齐(padding)/补齐使对象长度被8整除
前面对象头是12字节,不能被8整除所以再加4个字节补齐,一共是16字节

5、synchonzed实现过程

1)Java代码:synchonzed
2)monitorenter moniterexit(moniter监视器)
3)执行过程中自动升级:new一个对象--》偏向锁--》轻量级锁--》重量级锁
4)lock comxchg

6、缓存行 cache line(大小多数为64字节):

1)将一块数据按照块的大小读进缓存中,称之为一个缓存行
2)缓存一致性协议(如 MESI Cache,即英特尔的):一颗CPU的缓存发生改变,要跟其他CPU做同步
四种状态:modified(修改了)、Exclusive(独占)、Shared(一起读)、Invalid(失效)

7、对象的创建:

1)申请内存空间(new)
2)调用方法(invokespecial)
3)建立关联(astore)

8、cpu具有乱序执行现象:

可需要内存屏障或锁总线来保障有序性,在Java代码中可用volatile关键字来保持线程可见性,禁止指令冲排序

9、volatile:保持线程可见性,禁止指令重排序

​ 如何解决指令重排序:
​ 1)源码加volatile标志
​ 2)字节码生成ACC_VOLATILE标志
​ 3)jvm层面增加内存屏障
​ 4)hotspot(jvm的内核版本)层级代码实现,即汇编语言调用
​ 5)CPU级别,原语支持总线锁,硬件信号

10、DCL单例(double check lock):

双重检查单例,由于cpu指令重排序,不加volatile会有风险

11、“强软弱虚”引用

​ 1)强引用(NormalReference):普通引用,不再指向的时候才被垃圾回收
​ 2)软引用(SoftReference):用于缓存的实现,当内存不足的时候才会被回收
​ 3)弱引用(WeakReference):每次调用GC的时候即便有指向也会被回收,具有一次性的作用
​ 4)虚引用:用于管理堆外内存

12、Threadlocal:

​ 内部map的entry继承了弱引用:key的指向是弱引用,防止内存泄露

13、Java中的jvm线程与内核线程一一对应
14、什么是线程切换:
15、ABA问题:

即一个值被一个线程拿来使用的过程中,另一个线程或者其他多个线程使用过,但是回填的时候值变回原样,就是说过程中被使用过,如何监控这个问题:解决这个问题可以增加版本号

16、CAS(比较并交换)修改值的时候的原子性问题
17、偏向锁:

打上第一个线程的标志不用竞争,当有第二线程来的时候就竞争升级为轻量级锁

18、重入锁:

多次锁同一个对象(syn(o){syn(o)})
1)synchonzed一定可重入

image

二、并发编程

(一)ThreadPoolExecutor源码分析

1、ThreadPoolExecutor应用方式

  • 构建ThreadPoolExecutor对象,传入指定参数
  • 执行Runnable任务时,可以直接调用execute方法
  • 执行Callable任务时,需要有返回结果,直接调用submit方法

2、ThreadPoolExecutor的核心参数

  • corePoolSize(核心线程数):
  • maximumPoolSize(最大线程数):在核心线程数之外再在线程池中创建新的临时线程处理多余的任务(含核心线程)
  • keeAilveTime(非核心线程的空闲时间即存活时间)
  • TimeUnit:keeAilveTime的单位
  • WorkQueue(阻塞/工作队列):在核心线程没空闲的时候进行排队
  • ThreadFactory(线程工厂):创建线程
  • RejectedExecutionHandler(拒绝策略):在核心线程、非核心线程和阻塞队列都没有空闲的时候,拒绝多余任务的策略

3、ThreadPoolExecutor的执行流程

业务线程(主线程)提交任务到线程池之后,任务的处理流程

image

4、ThreadPoolExecutor的状态

4.1 ThreadPoolExecutor的参数
//ctl表述了两个状态 
//1. 表示线程池当前的状态(高三位)
//2. 表示线程池当前的工作线程个数(低29位)
static final int COUNT_BITS = Integer.SIZE - 3;
//表述作线程的最大数量
private static final int CAPACITY = (1 << COUNT BITS) - 1;
0010000 00000000 00000000 00000000
0011111 11111111 11111111 11111111

//线程池的5种状态
//111
private static final int RUNNING    = -1 << COUNT BITS;
//000
private static final int SHUTDOWN   =  0 << COUNT BITS;
//001
private static final int STOP       =  1 << COUNT BITS;
//010
private static final int TIDYING    =  2 << COUNT BITS;
//011
private static final int TERMINATED =  3 << COUNT BITS;

//计算出当前线程的状态
private static int runStateof(int c) {return c & ~CAPACITY;}
//计算当前线程池的工作线程个数
private static int workerCountof(int c){return c & CAPACITY;}
4.2 线程池的状态变换

image

5、execute方法

第一点核心:通过execute方法,查看到线程池的整体执行流程,以及一些避免并发情况的判断

第二点核心:为什么线程池会添加一个空任务的非核心线程到线程池

public void execute(Runnable command) {
    //非空判断
    if (command == null)
        throw new NullPointerException();
    //获取ctl属性
    int c = ctl.get();
    //工作线程的个数 是否小于 核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //通过add方法,添加一个核心线程去执行command任务
        if (addWorker(command, true))
            //添加核心线程成功,返回true,直接returnjieshu
            return;
        //如果在并发情况下,添加核心线程失败的线程,需要重新获取一次ctl属性
        c = ctl.get();
    }
    //创建核心线程失败 
    //判断当前线程池状态是否是RUNNING
    //如果是RUNNING,执行offer方法将任务添加到工作队列
    if (isRunning(c) && workQueue.offer(command)) {
        //添加任务到工作队列成功
        //再次重新获取ctl
        int recheck = ctl.get();
        //判断线程池是否是RUNNING状态,如果不是RUNNING状态,需要将任务从工作队列移除
        if (! isRunning(recheck) && remove(command))
            //执行拒绝策略(线程池状态不正确,执行拒绝策略)
            reject(command);
        //判断工作线程是否为0
        else if (workerCountOf(recheck) == 0)
            //工作线程数为0,但是工作队列中有任务
            //添加一个空任务非核心线程,为了处理在工作队列中排队的任务,即为了避免上述状态
            addWorker(null, false);
    }
    //任务添加到工作队列失败,添加非核心线程去执行当前任务
    else if (!addWorker(command, false))
        //添加非核心线程失败,执行reject拒绝策略
        reject(command);
}

6、addWorker方法

添加工作线程,并启动工作线程。

private boolean addWorker(Runnable firstTask, boolean core) {
    //对线程池状态的判断,以及对工作线程数量的判断
    //外层for循环的标识
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        //判断线程池状态
        //如果线程池状态不是RUNNING,就再次做后续判断,查看当前任务是否可以不处理
        if (rs >= SHUTDOWN &&
            //线程池状态为SHUTDOWN,并且任务为空,并且工作队列不为空
            //如果同事满足了这三个要求,那就是要处理工作队列当前中任务
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            //只要不是RUNNING状态,不处理新任务
            //如果SHUTDOWN状态,并且满足了之前addWorker(null, false),并且工作队列有任务时,不能走当前位置
            return false;

		//判断线程池数量
        for (;;) {
            //基于ctl获取当前工作线程数量
            int wc = workerCountOf(c);
            //判断工作线程是否大于最大值
            if (wc >= CAPACITY ||
                //如果是核心线程,是否大于设置的corePoolSize;
                //如果是非核心线程,是否大于maximumPoolSize
                wc >= (core ? corePoolSize : maximumPoolSize))
                //当前工作线程以及达到最大值了
                return false;

            //以CAS的方式(避免并发情况),对工作线程数+1,如果成功,
            if (compareAndIncrementWorkerCount(c))
                //直接跳到外层for循环
                break retry;

            //重新获取ctl的值
            c = ctl.get();  // Re-read ctl
            //基于新获取的ctl拿到线程池状态,判断和之前的rs状态是否一致
            if (runStateOf(c) != rs)
                //说明并发操作导致线程池状态变化,需要重新判断
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //添加工作线程,并启动工作线程
    //工作线程是否启动了
    boolean workerStarted = false;
    //工作线程是否添加了
    boolean workerAdded = false;
    //工作线程
    Worker w = null;
    try {
        //new Worker 构建工作线程,将任务扔到Worker中
        w = new Worker(firstTask);
        //拿到了Worker中绑定的Thread线程
        final Thread t = w.thread;
        if (t != null) {
            //加锁...
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                //基于重新获取的ctl,拿到线程池的状态
                int rs = runStateOf(ctl.get());
                //如果满足线程池状态为RUNNING,则添加工作线程
                if (rs < SHUTDOWN ||
                    //如果线程池状态为SHUTDOWN,并且传入的任务为null
                    (rs == SHUTDOWN && firstTask == null)) {
                    //开始添加工作线程
                    //判断当前线程是否出狱run状态(健壮性判断)
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将构建好的Worker对象添加到workers(hashSet)
                    workers.add(w);
                    //获取当前工作线程个数
                    int s = workers.size();
                    //如果当前工作线程数大于历史最大的工作线程数,就重新赋值largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //将工作线程添加成功的标识设置为true
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //添加工作线程成功,启动线程
                t.start();
                //将工作线程启动的标识设置为true
                workerStarted = true;
            }
        }
    } finally {
        //如果启动工作线程失败
        if (! workerStarted)
            //1.移除workers中的工作线程
            //2.将工作线程数-1
            //3.尝试将线程池状态变为TIDYING过渡状态
            addWorkerFailed(w);
    }
    return workerStarted;
}

//启动工作线程失败后,做的补救操作
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //判断之前创建工作线程是否成功
        if (w != null)
            //如果成功,就将workers中的当前工作线程移除
            workers.remove(w);
        //将工作线程数-1
        decrementWorkerCount();
        //尝试将线程池状态变为TIDYING过渡状态
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

7、Worker对象

private final class Worker
    extends AbstractQueuedSynchronizer //线程中断
    implements Runnable                //存储需要执行的任务
{
    //工作线程的Thread对象,初始化时候构建出来的
    final Thread thread;
    //需要执行的任务
    Runnable firstTask;

    volatile long completedTasks;

    Worker(Runnable firstTask) {
        //刚刚初始化的工作线程不允许被中断
        setState(-1); // inhibit interrupts until runWorker
        //第一次new的时候,会将任务赋值给firstTask
        this.firstTask = firstTask;
        //给Worker构建Thread对象
        this.thread = getThreadFactory().newThread(this);
    }

    //调用t.start(),执行当前的run方法
    public void run() {
        runWorker(this);
    }

    //此处中断线程不是立即让线程停止,只是将thread的中断标识设置为true
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    //将state置为0,在runWorke中,为了标识当前线程允许被中断
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

8、runWorker方法

执行任务的流程,并且做了中断线程相关的lock操作

final void runWorker(Worker w) {
    //获取当前工作线程
    Thread wt = Thread.currentThread();
    //拿到Worker对象中封装的任务
    Runnable task = w.firstTask;
    //将Worker的firstTask归位
    w.firstTask = null;
    //将Worker的state归位为0,表示可以被中断
    w.unlock(); // allow interrupts
    //任务执行时,勾子函数中是否出现异常的标识
    boolean completedAbruptly = true;
    try {
        //获取任务的第一个方式:执行execute、submit时,传入的任务直接处理
        //获取任务的第二个方式:从工作队列中获取任务执行
        while (task != null || (task = getTask()) != null) {
            //加锁,在SHUTDOWN状态下,当前线程不允许被中断
            //并且Worker内部实现的锁不是可重入锁,因为在中断时,也需要对worker进行lock,不能获取就代表当前工作线程正在执行任务
            w.lock();
            //如果线程池状态变为了STOP状态,必须将当前线程中断
            //第一个判断:判断当前线程池状态是否是STOP
            //第二个判断:查看中断标记位,并归位,如果位false,说明不是STOP,如果变为true,需要再次查看是否是并发操作导致线程池为STOP状态
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                //查询当前线程中断标记是否为false,如果是false,就执行wt.interrupt()
                !wt.isInterrupted())
                //将中断标记设置为true
                wt.interrupt();

            try {
                //执行任务的勾子函数,前置增强(非动态代理)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务的勾子函数,后置增强(非动态代理)
                    afterExecute(task, thrown);
                }
            } finally {
                //将task置为null
                task = null;
                //执行成功任务个数+1
                w.completedTasks++;
                //将state标志位设置为0
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

9、getTask方法

如何从工作队列workQueue中获取到任务

//从工作队列中获取任务
private Runnable getTask() {
    //标识(非核心线程可以干掉)
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        //=================判断线程池状态=============================
        int c = ctl.get();
        //拿到线程池的状态
        int rs = runStateOf(c);

        // 如果进入if,需要干掉当前工作线程
        //线程池状态SHUTDOWN、STOP
        //如果线程池状态大于等于STOP,需要移除当前工作线程
        //如果线程池状态为SHUTDOWN,并且工作队列为空,需要移除当前工作线程
        if (rs >= SHUTDOWN、 && (rs >= STOP || workQueue.isEmpty())) {
            //移除当前工作线程,工作线程数-1
            decrementWorkerCount();
            //返回null,交给processWorkerExit移除当前工作线程
            return null;
        }

        //=================判断工作线程数量=============================
        //拿到工作线程数
        int wc = workerCountOf(c);

        // allowCoreThreadTimeOut:是否允许核心线程超时(一般都为false)
        // 工作线程是否大于核心线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (
            //工作线程数是否已经大于最大线程数
            //工作线程数大于核心线程数,并且当前线程已经超时
            //尝试干掉当前线程
            (wc > maximumPoolSize || (timed && timedOut))
            //工作线程数大于1,或者工作队列为空
            //如果工作队列为空,干掉当前工作线程
            //如果工作线程数大于1,干掉当前工作线程
            && (wc > 1 || workQueue.isEmpty())) {
            //基于CAS的方式移除当前线程,只有一个线程会CAS成功
            if (compareAndDecrementWorkerCount(c))
                //返回null,交给processWorkerExit移除当前工作线程
                return null;
            continue;
        }

        //=================从工作队列中获取任务=============================

        try {
            Runnable r = timed ?
                //阻塞一定时间从工作队列拿任务(可以理解为非核心线程走这个方法)
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            	//一直阻塞(可以理解为核心线程走这个方法)
            	workQueue.take();
            if (r != null)
                //如果拿到任务直接放回执行
                return r;
            //从队列获取任务时,超时了(达到了当前工作线程的最大生存时间)
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

10、processWorkerExit方法

//移除当前工作线程的操作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果执行当前方法的操作不是getTask中的操作,是直接因为异常引起的。(一半是勾子函数抛出异常)
    if (completedAbruptly) 
        //因为执行方式“不合法”(即因为勾子函数自定义方法异常造成的),手动扣减工作线程数
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //记录当前线程池一共处理了多少个任务
        completedTaskCount += w.completedTasks;
        //移除工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //尝试将线程池关闭(到过渡状态 - 销毁状态)
    tryTerminate();

    int c = ctl.get();
    //当前线程池状态,进入此处,说明是RUNNING、SHUTDOWN
    //(不是STOP状态)
    if (runStateLessThan(c, STOP)) {
        //如果是正常状态移除当前工作线程
        if (!completedAbruptly) {
            //核心线程数最小值,允许多少
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果工作队列中的任务不为空,设置工作线程的最小值为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //还有工作线程在线程池中
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //不正常的方式移除了当前的工作线程,再添加一个工作线程(非核心)
        //线程池工作队列不为空,并且没有工作线程,再添加一个工作线程
        addWorker(null, false);
    }
}

(二)ReentrantLock源码分析

1、ReentrantLock的介绍

Java中提供的锁:synchronized、lock锁

ReentrantLock是一个互斥锁,可以让多线程执行期间,只有一个线程在执行指定的一段代码

使用方式:

//前置知识:CAS、volatile、AQS
ReentrantLock lock = new ReentrantLock();
//加锁
lock.lock();
try {
    //执行业务
}finally {
    //释放锁
    lock.unlock();
}

2、ReentrantLock的lock方法

2.1 简单分析lock方法

再lock中,内部调用了sync.lock()方法,该方法有两个实现

  • FairSync:公平锁

    每一个线程会一个个排队尝试获取锁资源,会先查看是否有现场排队,如果有直接去排队,如果没有才会尝试去竞争一下锁资源。

  • NonfairSync:非公平锁

    每个线程都会再执行lock()方法时,先尝试获取锁资源,获取不到再排队。

如果需要使用公平锁:在new ReentrantLock时,传入参数true。

如果需要使用非公平锁:直接无参构造。

PS:更推荐非公平锁,非公平锁比公平锁效率更高。

公平锁直接调用acquire方法尝试获取锁,

而非公平锁会先基于CAS的方式尝试去竞争锁资源,如果获取不到,才会执行acquire方法尝试获取锁。

2.2 分析AQS

AQS:是AbstractQueuedSynchronizer类,AQS内部维护着一个双向链表的队列

image

2.3 lock方法源码
//非公平锁的lock方法
final void lock() {
    //以CAS的方式尝试将state从0改为1
    if (compareAndSetState(0, 1))
        //证明修改state成功,就代表获取锁资源成功
        //将当前线程设置到AQS中的exclusiveOwnerThread(AOS中,AQS的父类),代表当前线程拿着锁资源(与后面的可重入锁有关系)
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

//公平锁的lock方法
final void lock() {
    acquire(1);
}

3、 ReentrantLock的acquire方法

//公平锁和非公平锁都会调用acquire
public final void acquire(int arg) {
    //tryAcquire方法:分为两种实现,一种是公平锁,一种是非公平锁
    //公平锁操作:如果state为0,再看是否有线程排队,如果有就去排队。如果是锁重入的操作,直接获取锁。
    //非公平锁操作:如果state为0,直接尝试CAS方式修改。如果是锁重入操作,直接获取锁资源。
    if (!tryAcquire(arg) &&
        //addWaiter方法:在线程没有通过tryAcquire拿到锁资源时,需要将当前线程封装为Node对象,去AQS内部排队
        //acquireQueued方法:查看当前线程是否是排在队伍前面的,如果是就尝试获取锁资源。如果长时间没拿到锁,也需要将当前线程挂起
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

4、ReentrantLock的tryAcquire方法

  • tryAcquire方法是AQS提供的,内部没有任何实现,需要继承AQS的类去实现逻辑代码

  • tryAcquire在ReentrantLock中提供了两种实现:公平锁、非公平锁

非公平锁的实现逻辑:

final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取AQS的state值
    int c = getState();
    //如果state为0,代表当前没有线程占用锁资源
    if (c == 0) {
        //直接基于CAS的方式尝试修改state,将0改为1,如果成功代表拿到锁资源
        if (compareAndSetState(0, acquires)) {
            //将exclusiveOwnerThread属性设置为当前线程
            setExclusiveOwnerThread(current);
            //返回true
            return true;
        }
    }
    //说明state肯定不为0,代表当前lock被线程占用
    else if (current == getExclusiveOwnerThread()) {
        //锁重入操作
        //对state + 1 (acquires为传入的1)
        int nextc = c + acquires;
        //判断锁重入已经达到最大值(二进制计算)
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //将AQS的state设置好
        setState(nextc);
        return true;
    }
    return false;
}

公平锁的实现逻辑:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //没有线程占用锁资源
    if (c == 0) {
        //先查看有没有线程排队
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
}

公平锁和非公平锁的tryAcquire方法的区别是:当判断state为0之后

  • 公平锁会先查看是否有线程正在排队,如果有直接返回false,没有则执行CAS尝试获取锁资源。
  • 非公平锁不管有没有线程排队,直接以CAS的方式尝试获取锁资源。

5、ReentrantLock的addWaiter方法

在线程执行tryAcquire方法没有获取到锁资源之后,会返回false,再配置上if中的!操作,会执行&&后面的方法,而在acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的参数中执行了addWaiter,要将当前获取锁失败的线程封装为Node,排队到AQS的队列中。

//获取锁失败,封装Node,排队到AQS的队列中
private Node addWaiter(Node mode) {
    //将线程封装为Node对象
    Node node = new Node(Thread.currentThread(), mode);
    // 获取到tail节点,pred
    Node pred = tail;
    ////如果tail不为null
    if (pred != null) {
        //将当前节点的prev指向tail
        node.prev = pred;
        //为了避免并发问题,以CAS的方式将tail指向当前线程
        if (compareAndSetTail(pred, node)) {
            // 将之前的tail的next指向当前节点
            pred.next = node;
            return node;
        }
    }
    //如果在队列为空,或者CAS操作失败后,会执行enq方法,将当前的node排到队列的末尾
    enq(node);
    return node;
}

//enq方法,传入的node就是当前节点
private Node enq(final Node node) {
    for (;;) {
        //获取tail节点
        Node t = tail;
        if (t == null) { // Must initialize
            //如果队列为空,先初始化head节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //到此队列肯定不为空,才用之前的逻辑将node插入
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

整体逻辑为,先初始化Node节点,将当前线程传入,并且标识为互斥锁。

尝试将当前Node插入到AQS队列的末尾

  • 队列为空:执行enq,先初始化空Node作为头,然后再将当前Node插入,作为tail。

  • 队列不为空:直接将当前Node插入,作为tail

6、ReentrantLock的acquireQueued方法

首先查看当前的node受否排在队列的第一个位置(不算head),直接再次执行tryAcquire方法竞争锁资源

尝试将当前线程挂起,最终排在有效节点后,才会将当前线程挂起。

  • 如果当前Node为head的下一个节点,直接尝试获取锁资源
//队伍前面,竞争锁资源。非队伍前面,挂起线程
final boolean acquireQueued(final Node node, int arg) {
    //竞争锁资源是否失败--默认是失败true
    boolean failed = true;
    try {
        //线程中断标识
        boolean interrupted = false;
        for (;;) {
            //获取当前节点的上一个节点
            final Node p = node.predecessor();
            //如果当前节点的上一个节点是head,如果是head后面的节点,就执行tryAcquire方法竞争锁资源
            if (p == head && tryAcquire方法竞争锁资源(arg)) {
                //竞争锁资源成功,进入当前业务代码...
                //因为当前线程已经拿到锁资源,将当前线程Node设置为head,并且将Node中的prev和thread置为null
                setHead(node);
                //将之前的头节点的next置为null,让GC将之前的head回收
                p.next = null; // help GC
                //将获取锁失败的标志置为false,false表示拿到锁成功
                failed = false;
                //返回线程中断标识,默认情况为false
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                //将线程挂起
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

尝试将当前线程挂起,涉及到了判断以及lockSupport方法

//节点被取消状态,表示节点被取消了却仍然在队列中
static final int CANCELLED =  1;
//正常状态
static final int SIGNAL    = -1;
//可以转变为-1
static final int CONDITION = -2;
//与共享锁有关,可以转变为-1
static final int PROPAGATE = -3;

//0为初始化状态

//判断当前线程是否可以挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //拿到了上一个节点的状态
    int ws = pred.waitStatus;
    //如果ws为-1,直接返回true,当前节点可以挂起线程
    if (ws == Node.SIGNAL)
        return true;
    //如果ws>0,说明肯定是CANCELLED状态,绕过这个节点,找上一个节点的上一个
    if (ws > 0) {
        //循环,直到找到上一个节点小于等于0的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //可能为0,-2,-3,直接以CAS的方式将节点状态改为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//找到上一个节点状态是正常的后,调用当前方法将线程挂起
private final boolean parkAndCheckInterrupt() {
    //直接使用Unsade类的park方法挂起线程
    LockSupport.park(this);
    //中断线程
    return Thread.interrupted();
}

7、ReentrantLock的unlock方法

unlock释放锁不分公平和非公平,都是执行sync的release方法

释放锁的核心,是将state从大于0的数值更改为0(由于可能有锁重入情况,所以要将状态值state数值为0才算是释放锁成功),即为释放锁成功

并且unlock方法应该会涉及到将AQS队列中阻塞的线程进行唤醒,阻塞用的是park方法,唤醒是unpark方法

//每次只施放一次锁
public void unlock() {
    sync.release(1);
}

8、ReentrantLock的unlock和release方法

在释放锁时,只有state被减为0之后,才会去唤醒AQS队列中被挂起的线程

在唤醒挂起线程时,如果head的next状态不正确,会从后往前找到离head最近的节点进行唤醒

从后往前找的原因:addWaiter是先将prev指针赋值,最后才会将上一个节点的next指针赋值,为了避免丢失节点或者跳过节点,必须从后往前找

//释放锁操作
public final boolean release(int arg) {
    //先查看tryRelease方法
    if (tryRelease(arg)) {
        //释放锁资源成功
        Node h = head;
        //如果head不为null,并且当前head的状态不为0
        if (h != null && h.waitStatus != 0)
            //说明AQS队列中,有Node再排队,并且线程已经挂起
            //需要唤醒被挂起的Node
            unparkSuccessor(h);
        return true;
    }
    //说明一次没释放掉锁资源
    return false;
}

protected final boolean tryRelease(int releases) {
    //直接获取state,并且-releases,将state-1
    int c = getState() - releases;
    //如果释放锁的线程,不是占用锁的线程,直接抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    //声明
    boolean free = false;
    //判断state-1后是否为0
    if (c == 0) {
        //如果位0,表示锁资源释放了
        free = true;
        //将占用互斥锁的线程标识置为null,即成功释放锁
        setExclusiveOwnerThread(null);
    }
    //锁重入了,一次没释放掉,将c赋值给state,等待下次执行时,再判断
    setState(c);
    return free;
}

//唤醒AQS中被挂起的线程
private void unparkSuccessor(Node node) {
    //获取head的状态
    int ws = node.waitStatus;
    if (ws < 0)
        //将当前head的状态设置为0
        compareAndSetWaitStatus(node, ws, 0);
    //拿到next节点
    Node s = node.next;
    //如果下一个节点为null,或者状态为CANCEL(取消),需要找到离head(头)结点最近的有效Node
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从后往前找(从后往前找的原因:因为需要查看addWaiter的内容,即Node是先将上一个指针赋值的)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //找到最近的node后,直接唤醒
    if (s != null)
        //唤醒线程
        LockSupport.unpark(s.thread);
}

标签:Node,多线程,return,int,lock,线程,相关,null
From: https://www.cnblogs.com/luzriu/p/18028443

相关文章

  • jvm相关
    1、查看当前垃圾回收器指令:java-XX:+PrintCommandLineFlags-version2、STW:在执行垃圾回收线程的时候,停止所有任务线程3、CMS初始标记和重新标记需要STW4、三色标记算法黑白灰的标记,在并发标记阶段5、G1分区6、jvm调优命令jps:列出Java的进程jinfo[进程号]:列出该进......
  • 多线程系列(六) -等待和通知模型详解
    一、简介在之前的线程系列文章中,我们介绍了synchronized和volatile关键字,使用它能解决线程同步的问题,但是它们无法解决线程之间协调和通信的问题。举个简单的例子,比如线程A负责将int型变量i值累加操作到10000,然后通知线程B负责把结果打印出来。这个怎么实现呢?其中一个......
  • 推荐两个网络复用相关的 Go pkg: cmux smux
    推荐两个网络复用相关的Gopkg:cmux/smux只写一下如何使用,不对实现进行大量描述,两个库的代码都比较精炼,花一会看一下就行。cmux对端口进行复用,单端口可以建立不同协议的连接(本质都是TCP),如TCP/TLS/HTTP/gRPC或自定义协议smux对TCP连接复用,单TCP连接承载多条smuxstream......
  • 【数据结构】C语言实现二叉树的相关操作
    树定义树(Tree)是n(n>=0)个结点的有限集若n==0,称为空树若n>0,则它满足如下两个条件:有且仅有一个特定的称为根(Root)的结点其余结点可分为m(m>=0)个互不相交的有限集T1,T2,T3,...Tm,其中每一个集合本身又是一棵树,称为根的子树(SubTree)术语结点:数据元素结点的度:结点......
  • 最小生成树相关理解
    最小生成树边权的多重集合是唯一最小的!而且顺着排序之后字典序也最小。证明是容易的,利用克鲁斯卡尔的过程归纳即可。还有一种我独创的证法:考虑配对。如果有两种生成树,把两棵树拍到一起,然后B树的边(x,y)可以和A树上的路径(x,y)上的点匹配,根据霍尔婚姻,显然具有完美匹配,又......
  • 多线程系列(四) -volatile关键字使用详解
    一、简介在上篇文章中,我们介绍到在多线程环境下,如果编程不当,可能会出现程序运行结果混乱的问题。出现这个原因主要是,JMM中主内存和线程工作内存的数据不一致,以及多个线程执行时无序,共同导致的结果。同时也提到引入synchronized同步锁,可以保证线程同步,让多个线程依次排队执行......
  • static相关
    1.代码块publicclassStaticDemo{{System.out.println("匿名代码块");}static{System.out.println("静态代码块");}publicStaticDemo(){System.out.println("构造函数");}publicstaticv......
  • C++多线程 第八章 设计并发代码
    第八章设计并发代码数据划分工作在处理开始前在线程间划分数据方面,C++与MPI或OpenMP的方式较为相似.一个任务被分成一个并行任务集,工作的线程独立运行这些任务.并且在最后的化简步骤中合并这些结果.尽管这种方法是很有效的,但是只有在数据可以实现划分时,才可如此.考虑这......
  • 多线程系列(三) -synchronized 关键字使用详解
    一、简介在之前的线程系列文章中,我们介绍了线程创建的几种方式以及常用的方法介绍。今天我们接着聊聊多线程线程安全的问题,以及解决办法。实际上,在多线程环境中,难免会出现多个线程对一个对象的实例变量进行同时访问和操作,如果编程处理不当,会产生脏读现象。二、线程安全问题介......
  • css样式相关代码记录
    element样式穿透:::v-deepposition属性值有static、relative、absolute、fixed、sticky。static:该关键字指定元素使用正常的布局行为,即元素在文档常规流中当前的布局位置。relative:该关键字下,元素先放置在未添加定位时的位置,再在不改变页面布局的前提下调整元素位置(因此会......