首页 > 编程语言 >【AQS与ReentrantLock】剖析AQS与ReentrantLock底层源码,实现一个属于你的锁

【AQS与ReentrantLock】剖析AQS与ReentrantLock底层源码,实现一个属于你的锁

时间:2024-05-26 09:00:06浏览次数:27  
标签:return AQS int ReentrantLock 获取 源码 线程 final

AQS

引言

在多线程高并发场景下,我们为了保证操作的原子性,必要的需要对代码块进行线程同步。

我们知道ReentrantLock可以在Java中实现公平锁和非公平锁,它是类层面实现的锁,可以起到像sychronized关键字那样的同步功能,那么它是如何做到的呢?

为了深入了解ReentrantLock的实现,那么就必须要了解ReentrantLock的底层设计—AQS,这篇文章将会结合底层源码来理解AQS是什么。
在这里插入图片描述

AQS

AQS,其实就是AbstractQueuedSynchronizer(抽象队列式同步器)这个抽象类

位于java.util.concurrent.locks这个包下,如果看它的派生类,会发现各种同步场景的设计都使用到了AQS,如LeentrantLockSemaphoreThreadPoolExecutor等。

实际上AQS内部使用的是一种遵循FIFO的双向链表来存储线程对象的,这一点从源码中AQS的内部类Node即可看出。

**设想这样的场景:**如果有多个线程需要获取锁,那么我们对每个线程都要做妥善的处置,那么必然需要一个空间来存放这些线程以便我们管理这些线程,AQS就提供了这样一种数据结构来保存并发线程对象的引用。当我们需要唤醒线程,或是阻塞线程时都将通过这个FIFO的链式队列操作我们的线程对象。

static final class Node {

    //省略....
    
    volatile Node prev;

    volatile Node next;

    volatile Thread thread;
   
     //省略....
}

那么AQS又是如何实现同步的?

如果要实现同步,我们就需要一种状态机制来确定当前同步队列的状态,是处于锁释放状态,还是处于锁占用状态,锁占用状态下我们同步队列的各个线程状态又是如何的,锁释放状态我们的同步队列又该如何运作。

其实在AQS中,是通过一个原子的int值state来表示当前同步队列状态,具体的state值是多少,是交由具体的实现类去定义的。

由于AQS支持独占/共享模式,因此不同实现对state值的不同解释也不同,AQS只是做了抽象层面的定义。

例如

RenntrantLock实现中state值为1时表示有线程正在占用锁,state值代表0时表示锁被释放,这点我们来看源码。

final void lock() {
    acquire(1);//调用父类AQS的acquire方法
}
//父类AQS中的方法,如果尝试获取锁失败就阻塞当前线程,将调用子类的tryAcquire方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我们关心的compareAndSetState这个语句,在底层使用的其实是Unsafe类。

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

没错可以看到AQS用了Unsafe类的方法,正是CAS操作(比较并替换),这是一个原子操作,调用了本地native方法。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);//这个方法
}

Unsafe

这个类提供了一种底层的、不安全的操作内存的方式。在compareAndSetState方法中,Unsafe类的compareAndSwapInt方法被用来执行CAS(比较并交换)操作。

Unsafe这个类比较特殊,可以直接操作内存,一般来说高级语言是没法直接操作内存的,java也存在相关的安全检查和内存管理机制。但是通过Unsafe这个不安全的类就能完成这一操作。

compareAndSwapInt就是它提供的一种硬件级别的原子操作指令。

原子操即在多线程环境中单个线程要么一次性做完,要么全不做。

CAS操作分为下面三个步骤

  • 读取当前变量值
  • 比较这个值是否等于预期值
  • 如果是将这个变量更新为预期值

这样的三个操作就是原子操作

为什么要有这样的操作呢?

就好比现在多个人竞争一个房间的使用权,只有房间上挂上未使用(值=0)的牌子,才允许他人进去,这个人在进去前是不是就能挂上一个在使用的牌子(值=1),这样子其它人来的时候看到了就不会直接冲进去了,只能在外面等着。

只不过CAS操作保证了这个看牌子→判断→挂牌子的不被他人打断。

线程阻塞

回到刚才ReentrantLock对state值的改变,我们知道了ReentrantLock是通过AQS中提供的CAS方法(底层Unsafe操作本地方法)通过操作内存来改变state的值。

那么如果CAS操作失败怎么办,也就是已经有线程通过CAS获取到锁了(state = 1),这时候我们再来看AQS接下来的操作。

其实在刚才CAS操作之后,在AQS中还有一个相与的条件方acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),这个方法中有一个addWaiter方法,看似套娃其实就是把当前的线程构造为AQS同步队列中的一个Node并放入链式队列中了。

那么放进去就好了吗?当然不是,既然是同步阻塞队列,那么线程竞争锁失败肯定是会被阻塞的。我们来看看阻塞的具体方法。

其实就是这个方法 parkAndCheckInterrupt,我们进入这个方法看看。

final boolean acquireQueued(final Node node, int arg) {
    //...
    try {
        //...
        for (;;) {
    		//...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                    //...
        }
    } finally {
            //...
    }
}

可以看到这个方法使用到了LockSupport这个类,我们直接打入内部。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

现在我们打入LockSupport这个类的内部了,结果一看又是老朋友Unsafe提供的方法,那我们就不用往下看了,线程阻塞的方法也是Unsafe类提供的方法实现的。UNSAFE.park(false, 0L);

这里其实就是设置了线程无限期的等待,通过本地方法实现了线程的阻塞逻辑。

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

到此为止我们大概了解了ReentrantLock是如何利用AQS来实现的线程获取锁以及获取锁失败后的阻塞原理。

让我们来梳理一下逻辑:

  • 当线程进入AQS后,具体的锁实现会调用acquire方法来获取锁,本质上就是CAS方法
  • 如果获取锁成功(CAS),其它线程进入后也会尝试CAS获取锁
  • 如果获取锁失败(CAS),就放入FIFO链式队列中阻塞等待

锁释放

我们知道使用锁的时候当线程完成任务时也一定要记得释放锁,否则就会发生死锁问题,那么在释放锁后,AQS又发生了什么呢?

这里我们就快速定位到释放锁方法,主要是以下两个步骤

  • 调用具体的释放锁方法,ReentrantLock中最后是将state的值设置回了0
  • 由AQS负责调用unparkSuccessor方法,将后续节点节点从阻塞状态被唤醒 (底层方法LockSupport.unpark(s.thread);)
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

ReentrantLock中释放锁具体实现

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

那么后续节点所对应的阻塞线程从阻塞状态中恢复后发生了什么?

其实我们就看刚才发生阻塞的方法在调用了parkAndCheckInterrupt()后被阻塞,那么被唤醒后其实就是再次循环调用tryAcquire方法获取锁。这样子我们就分析完了从锁获取→阻塞→再次获取锁的一个分析流程。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //在这行被阻塞
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

用AQS设计一个简单的锁

到这里我们就能自己实现一个自己的锁了,我们仿照ReentrantLock的逻辑实现一个我们的锁,AQS使用了模板设计模式,将需要实现的方法开放给了我们,我们需要重写一些方法来实现具体的上锁和解锁方法。

下面的方法是博主自己写的,仅供学习参考,但是实现了基本的逻辑,大家可以参考ReentrantLock源码实现一个完善的锁

import java.util.concurrent.locks.AbstractQueuedSynchronizer;


public class AQSTest {

    public static void main(String[] args) {
        Lock lock = new Lock();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("t1尝试获取锁");
                lock.lock();
                lock.lock();//重入锁
                System.out.println("t1获取到锁");
                System.out.println("5s后释放锁");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.unlock();//释放锁
                lock.unlock();//释放锁
                System.out.println("t1释放锁");
            }
        });
        t1.start();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("主线程尝试获取锁");
        lock.lock();
        System.out.println("主线程获取到锁");
        lock.unlock();
        System.out.println("主线程释放锁");

    }

}

class Lock {

    private Sync sync = new Sync();

    void lock(){
        sync.lock();
    }

    void unlock(){
        sync.unlock();
    }

    class Sync extends AbstractQueuedSynchronizer{

        void lock(){
            acquire(1);
        }

        /**
         * 返回true代表获取锁成功,false代表获取锁失败
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {
            int state = getState();
            if (state == 0){
                //CAS操作
                compareAndSetState(0,arg);
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }else if (getExclusiveOwnerThread() == Thread.currentThread()){
                //可重入锁逻辑
                int nextS = getState() + arg;
                setState(nextS);//更新状态值,直接更新
                return true;
            }
            return false;

        }

        /**
         * 返回true代表释放锁成功,false代表释放锁失败
         * @param arg
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) {
            //当前线程释放锁
            int c = getState() - arg;
            if (Thread.currentThread() != getExclusiveOwnerThread()){
                throw new IllegalMonitorStateException();
            }
            boolean b = false;
            if (c == 0){
                b = true;
                setExclusiveOwnerThread(null);//清除拥有锁的线程引用
            }
            setState(c);
            return b;
        }

        void unlock(){
            release(1);
        }

    }

}

结果

t1尝试获取锁
t1获取到锁
5s后释放锁
主线程尝试获取锁
t1释放锁
主线程获取到锁
主线程释放锁

下面是AQS的JDK注释,如果要设计锁的话可以仔细结合源码阅读

为实现阻塞锁和依赖于先进先出(FIFO)等待队列的相关同步器(semaphores、事件等)提供了一个框架。该类旨在为大多数依赖于单个原子 int 值来表示状态的同步器提供有用的基础。子类必须定义改变该状态的受保护方法,以及定义该状态在该对象被获取或释放时的含义。有了这些方法,该类中的其他方法就可以执行所有队列和阻塞机制。子类可以维护其他状态字段,但只有使用 getStatesetStatecompareAndSetState 方法原子更新的 int 值才会被同步跟踪。
子类应定义为非公开的内部辅助类,用于实现其外层类的同步属性。AbstractQueuedSynchronizer 类没有实现任何同步接口。相反,该类定义了 acquireInterruptibly 等方法,具体锁和相关同步器可根据需要调用这些方法来实现其公共方法。
该类支持默认独占模式和共享模式。在独占模式下获取时,其他线程尝试获取不会成功。多个线程在共享模式下的获取可能(但不必)成功。该类并不 "理解 "这些差异,只是在机械意义上,当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也能获取。在不同模式下等待的线程共享同一个 FIFO 队列。通常,实现子类只支持其中一种模式,但这两种模式都会发挥作用,例如在读写锁中。只支持独占模式或共享模式的子类无需定义支持未使用模式的方法。
该类定义了一个嵌套的 AbstractQueuedSynchronizer.ConditionObject 类,支持独占模式的子类可将该类用作 Condition 实现,其中的方法 isHeldExclusively 会报告同步是否被当前线程独占,使用当前 getState 值调用的方法 release 会完全释放该对象,而获取(acquire)则会根据保存的状态值,最终将该对象恢复到之前的获取状态。没有任何 AbstractQueuedSynchronizer 方法会在其他情况下创建这样的条件,因此如果无法满足这一限制条件,请不要使用该方法。当然,AbstractQueuedSynchronizer.ConditionObject 的行为取决于其同步器实现的语义。
该类为内部队列提供了检查、检测和监控方法,并为条件对象提供了类似的方法。这些方法可根据需要导出到使用 AbstractQueuedSynchronizer 作为同步机制的类中。
该类的序列化只存储底层的原子整数维护状态,因此反序列化对象的线程队列是空的。需要序列化的典型子类将定义一个 readObject 方法,用于在反序列化时将其恢复到已知的初始状态。

公平锁与非公平锁

最后我们来对比下ReentrantLock中公平锁和非公平锁的区别

非公平锁

final void lock() {
    if (compareAndSetState(0, 1)) //直接尝试CAS,插队了
        setExclusiveOwnerThread(Thread.currentThread());//CAS成功就获取到锁
    else
        acquire(1);//没抢到才执行AQS里获取锁的逻辑,也就是在进行一次CAS,再没抢到锁才去排队
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁

final void lock() {
    acquire(1);//老老实实执行获取锁的操作
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&  //先看有没有其它线程排在该线程前面,如果该线程就是队首才尝试去CAS,不然就去排队
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
}

ReentrantLock对于两种锁的执行逻辑

  1. 线程1进入
  2. 如果锁可用(同步状态允许),直接获取锁
  3. 锁不可用(被占用),封装为Node放入FIFO队列并阻塞
    1. **非公平锁:**线程2进入,尝试获取锁(共2次CAS),如果竞争失败,放入FIFO队尾,阻塞等待
    2. **公平锁:**线程2进入,检查等待队列,确保无线程等待,若存在线程等待,加入队尾等待
  4. 线程1释放锁,更新同步状态,唤醒队列中下一个等待线程
  5. 线程2被唤醒,尝试获取锁

相信经过本文的讲述你对AQS应该也不再陌生,如果感兴趣一定要自己去动手实践一下,前路漫漫,与君共勉!

标签:return,AQS,int,ReentrantLock,获取,源码,线程,final
From: https://blog.csdn.net/m0_63902044/article/details/139206035

相关文章

  • 【初探Java之路 六 】集合1-ArrayList和LinkedList的使用与源码分析
    ......
  • 基于python+django框架旅游景区景点购票系统设计与实现(源码+LW+安装+基础课)
     博主介绍:黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者,CSDN博客专家,在线教育专家,CSDN钻石讲师;专注大学生毕业设计教育和辅导。所有项目都配有从入门到精通的基础知识视频课程,学习后应对毕业设计答辩。项目配有对应开发文档、开题报告、任务书、P......
  • 【转】centos7.9源码安装mysql5.7.44
    原文:https://blog.csdn.net/SeeYouGoodBye/article/details/1352314511、环境介绍centos7.9mysql5.7.44boost1.59.0注意:这里的编译版本mysql5.7.44和boost1.59.0是有依赖的,建议使用相同版本2、安装编译要用的依赖软件yuminstall-ygccgcc-c++cmakelibaio-develncu......
  • 计算云上对mysql源码debug
    前准备vscode(安装remotedev插件)、mysql源码、c/c++、安装cmake、安装gccmysql源码:gitclonehttps://github.com/mysql/mysql-server.gitcmake安装:sudoyuminstallcmake3查看cmake版本:cmake--version安装gcc:sudoyuminstallgcc在计算云上编译mysql1、配置(进入m......
  • 一键启动,无限创作:AI数字人系统源码,轻松制作属于你的数字人模型!
    数字人,这一新兴概念,正逐渐渗透到我们生活的方方面面。它不仅仅是技术的创新,更是对传统服务模式的颠覆。AI数字人系统源码(源码:ai6ai69)的部署,为企业和个人提供了一个全新的创作平台,让每个人都能轻松打造属于自己的数字人模型。一、AI数字人系统源码部署所需配置在部署AI数字人......
  • 【全开源】多场馆场地预定小程序源码(ThinkPHP+FastAdmin+UniApp)
    场馆场地预定小程序源码一款基于ThinkPHP+FastAdmin+UniApp开发的多场馆场地预定小程序,提供运动场馆运营解决方案,适用于体育馆、羽毛球馆、兵乒球馆、篮球馆、网球馆等场馆(高级版)......
  • 【全开源】教育系统源码(支持微信小程序+移动端H5+安卓APP+IOS-APP)
    构建智慧教育的基石在当今信息化快速发展的时代,教育系统正面临着前所未有的变革。西陆教育系统源码,作为这一变革的先锋力量,以其卓越的性能和灵活性,为教育机构提供了全新的解决方案。一、源码的力量:定制化与可扩展性西陆教育系统源码的核心优势在于其高度的定制化和可扩展......
  • Redis 源码学习记录:集合 (set)
    无序集合Redis源码版本:Redis-6.0.9,本篇文章无序集合的代码均在intset.h/intset.c文件中。Redis通常使用字典结构保存用户集合数据,字典键存储集合元素,字典值为空。如果一个集合全是整数,则使用字典国语浪费内存。为此,Redis设计了intset数据结构,专门用来保存整数......
  • 【制作100个unity游戏之27】使用unity复刻经典游戏《植物大战僵尸》,制作属于自己的植
    最终效果系列导航文章目录最终效果系列导航前言素材简单搭建环境豌豆射手向日葵源码结束前言在游戏界,有些作品以其独特的创意和精彩的游戏体验,成为了经典中的经典。而《植物大战僵尸》就是其中的佼佼者,它以其独特的塔防玩法和富有趣味性的设计,吸引了全球无数玩家......
  • 【计算机毕业设计】基于SSM++jsp的实验室耗材管理系统【源码+lw+部署文档】
             目录第1章绪论1.1课题背景1.2课题意义1.3研究内容第2章开发环境与技术2.1MYSQL数据库2.2JSP技术 2.3SSM框架第3章系统分析3.1可行性分析3.1.1技术可行性3.1.2经济可行性3.1.3操作可行性3.2系统流程3.2.1操作流程3.2.2......