首页 > 编程语言 >CountDownLatch源码解析

CountDownLatch源码解析

时间:2023-12-29 19:55:21浏览次数:44  
标签:Node head int 源码 线程 CountDownLatch 解析 节点

CountDownLatch源码解析

  • countdown是倒计时的意思,latch是门闩的意思,也有门锁的意思,合起来字面意思就是一个倒计树计锁器的意思,先来看一个具体的案例分析大致了解
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        int N = 3; // 有3个操作需要等待
        CountDownLatch latch = new CountDownLatch(N);

        for (int i = 0; i < N; i++) {
            new Thread(new Worker(latch), "Worker " + i).start();
        }

        latch.await();  // 主线程在这里等待
        System.out.println("所有操作完成,主线程继续执行。");
    }

    static class Worker implements Runnable {
        private final CountDownLatch latch;

        Worker(CountDownLatch latch) {
            this.latch = latch;
        }

        public void run() {
            try {
                // 执行一些操作
                System.out.println(Thread.currentThread().getName() + " 完成操作");
            } finally {
                latch.countDown(); // 计数器减一
            }
        }
    }
}

打印结果如下:

Worker 0 完成操作
Worker 2 完成操作
Worker 1 完成操作
所有操作完成,主线程继续执行。

在这个例子中,主线程创建了三个工作线程,并等待这三个线程完成工作。每个工作线程完成工作后会减少计数器的值。当所有工作线程都完成后,主线程继续执行。

这么一看他有点类似计数器锁的样式,有一说一他确实有这个功能,因此可以把他看成一个辅助工具


1.CountDownLatch的构造实现

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

CountDownLatch又名锁存器,他的实现得以依赖与内部的Sync这个类,而此类也就是我们AQS的一个子类,也就是说部分功能依旧通过我们这个AQS的模板类去实现,因此这里的count实际上是注入到state这个字段中去,这个字段用来表示锁的一个状态,也可以用来计数.


1.1.Sync同步队列

sync是锁存器中重要的一个内部类,因此了解sync的内部结构是有必要的:他用来辅助我们去处理一些同步状态的问题,其中最核心的两个方法如下:

 Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

如果有阅读过读写锁的相关源码或者了解过共享模式就能明白tryAcquireShared方法和tryReleaseShared方法具体含义,这里共享模式下仅仅只有0和大于0的数,每次释放-1,直到为0说明此时的所有事件皆以完成(参考前面案例).


2.await方法的具体实现

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

可以看出此方法的实现依赖与AQS的源码实现

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
 protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

因此实际上传入的参数1没有发挥任何作用,实际上还是取决于AQS的state字段来判断锁存器中的计数是否存在,如果小于0说明此时还有线程在执行或者某个事件还未完成,因此调用doAcquireSharedInterruptibly方法

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这部分的代码是AQS的底层实现,需要对AQS有一定的基本认识再去阅读比较容易理解,核心思维便是管理共享资源的获取,大致逻辑如下:

  • 添加一个新的节点添加到等待队列中(尾插:具体参考AQS)addwaiter;
  • 下面操作是一个自旋的操作,且每次自旋都会去检查是否需要中断
  • 获取新节点的前驱节点,如果前驱节点为头节点(头节点不保存任何具体元素:具体参考AQS)predecessor;
  • 在共享模式下获取状态state,如果大于等于0(可以回头看Sync内部的getstate方法,这里其实类似于双重锁进行了一个临时的校验),说明获取成功,调用setHeadAndPropagate,设置头节点并传播(这个是翻译得来的,具体查看源码),并设置节点域,更改标志位字段,退出自旋状态
  • 如果在获取失败后,考虑到性能问题是否需要中断对线程进行检查,后续再来看

因此核心是setHeadAndPropagate方法,查看源码:

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
  • 这段代码的核心在于设置AQS等待队列中的头节点,并且传播的含义就是其他节点的一个唤醒问题,根据条件是否唤醒其他线程,比如propagate大于0,表示一个传播条件,以及后续的条件都是在考虑是否需要唤醒其他线程,因此会获取此节点的后继节点,如果他的后续为空,并且为共享模式下,那意味着可以唤醒队列中其他的等待节点,
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
  • 先来说他的一个整体设计思想:在共享模式下释放同步状态时使用,它确保状态的释放能够传播到其他等待的线程,所以在更新节点状态的时候底层是一个CAS的操作,下面来解释一下这段代码的具体含义:
    1. 首先是一个自旋的过程,并且头节点不为null,也不是尾节点,这说明此时等待队列中还存在节点,还有等待的线程
    2. 接下来获取头节点的状态,然后是一段逻辑的处理来处理不同的状态,如果是singnal状态,则需要唤醒后续节点,unparkSuccessor会唤醒后继节点,并且直到更新成功,如果 ws == 0,表示当前没有特定的信号状态,但需要设置为传播状态(PROPAGATE),以确保释放操作能传播给后继节点。这里同样使用 CAS 操作尝试更新状态,如果失败,则继续循环。(这里请参考AQS等待队列中节点的状态设置,0并不表示特定的状态)
    3. if (h == head) { break; }:检查头节点是否在循环期间改变。如果没有改变,则跳出循环。这是一种优化,以避免在头节点已经被处理的情况下进行无用的循环。

3.countDown方法的具体实现

public void countDown() {
        sync.releaseShared(1);
    }

这个函数的意义就是去递减锁存器的计数,如果计数为0,则释放所有线程

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  • 和上述流程对应,在共享模式下会去释放对象

    protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    
    • 如果状态为0,说明没有线程独占,也就是锁存器为0的情况,其余情况他会自己减1,并且通过CAS更换state字段;
    private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    而在这里同样去调用此方法的目的是为了唤醒那些在await方法中等待的线程.从而实现一种多线程之间的协调有效通知/等待过程.


    4.CountDownLatch的设计意义

    现有一场景,需要同时满足时,才可以去调用或者实现:例如,在我们的软件开发设计中需要(数据库服务,网络服务,和相关配置服务都同时启动)才能去启动,这么说是否能够理解其中的含义,这其实意味着某几件事中如果可能存在关联的时候需要一定的协调的时候就可以用到这种同步辅助工具

标签:Node,head,int,源码,线程,CountDownLatch,解析,节点
From: https://www.cnblogs.com/blissful/p/17935595.html

相关文章

  • 获取系统信息源码C#
    Environment.UserDomainName //获取与当前用户关联的网络域名。Environment.ProcessorCount //获取当前计算机上的处理器数。Environment.WorkingSet // 获取映射到进程上下文的物理内存量。Environment.Version // 获取公共语言运行库的版本信息。Environment.O......
  • ReentrantReadWriteLock源码阅读
    ReentrantReadWriteLock源码阅读目录ReentrantReadWriteLock源码阅读简介例子代码分析总览Syncstate定义lockcount和holdcountSync.HoldCounter类Sync类其他成员变量tryAcquiretryReleasetryAcquireSharedfullTryAcquireSharedtryReleaseSharedtryWriteLocktryReadLockSync类分......
  • 29.capability 配置参数解析与 分布式运行
    目录capability概述capability配置SeleniumGrid简介分布式运行capability概述Capabilities是WebDriver支持的标准命令之外的扩展命令(配置信息)配置web驱动的属性,如浏览器名称、浏览器平台等。结合SeleniumGrid完成分布式、兼容性等测试官网地址:https://ww......
  • Golang - sync.Pool底层源码详解
    sync.Pool是sync包下的一个组件,用来提高对象复用几率,减少gc的压力,减少内存分配,它是并发安全的,常用来存储并复用临时对象。任何存放区其中的值可以在任何时候被删除而不通知,在高负载下可以动态的扩容,在不活跃时对象池会收缩。可伸缩的,其大小仅受限于内存的大小,可以被看作是一......
  • Spring Cloud工程项目管理系统源码,支持多端展示
    智慧工地解决方案依托计算机技术、物联网、云计算、大数据、人工智能、VR&AR等技术相结合,为工程项目管理提供先进技术手段,构建工地现场智能监控和控制体系,弥补传统方法在监管中的缺陷,最线实现项目对人、机、料、法、环的全方位实时监控。支持多端展示(大屏、PC端、手机端、平板端)。......
  • CRM公司管理系统能为中小企业做哪些事情?CRM功能解析
    巴菲特曾说:“设计出的工具越多,使用工具的人就得越聪明。“”如果您是中小企业主,想要企业更好地发展,您都可以考虑使用CRM管理系统。它可以帮助中小企业有效地管理客户,提高业务效率,实现快、稳、准的发展。本文将详细的描述公司管理系统CRM对中小型公司意味着什么???一、什么是CRM系......
  • OpenEuler22.03源码编译安装nginx1.24.0
    一、环境说明操作系统版本:OpenEuler22.03SP2LTSNginx版本:1.24.0安装位置:/app/nginxSelinux配置:关闭或设置为permissive二、Nginx安装#安装必要依赖dnf-yinstalltargccmakepcrepcre-develzlibzlib-developensslopenssl-devel#创建nginx安装文件夹mkdir/......
  • Kotlin 协程源码阅读笔记 —— Mutex
    Kotlin协程源码阅读笔记——Mutex我们在Java/Kotlin编程时如果需要某段代码块同一时间只有一个线程能够执行时,通常是使用synchronized,但是协程中可不能使用synchronized,为什么呢?如果你了解过协程的工作方式就不会觉得奇怪(如果不了解协程工作方式的同学,可以看以下代码为......
  • 基于源码去理解Iterator迭代器的Fail-Fast与Fail-Safe机制
    原创/朱季谦在Java编程当中,Iterator迭代器是一种用于遍历如List、Set、Map等集合的工具。这类集合部分存在线程安全的问题,例如ArrayList,若在多线程环境下,迭代遍历过程中存在其他线程对这类集合进行修改的话,就可能导致不一致或者修改异常问题,因此,针对这种情况,迭代器提供了两种处......
  • Unity解析key不确定的Json
    遇到Json的key不固定时,只需要解析value,如下Jsondata下的key(1和2)是变化的:{"status":1,"msg":"success","data":["1:":{"atitle":"test",......