首页 > 其他分享 >AQS_应用

AQS_应用

时间:2024-01-14 10:23:33浏览次数:38  
标签:count 加锁 AQS thread int current zero 应用

RocketMQ-CountDownLatch2

public class CountDownLatch2 {
    private final Sync sync;

    /**
     * Constructs a {@code CountDownLatch2} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link
     * #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch2(int count) {
        if (count < 0)
            throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * <p>If the current count is zero then this method returns immediately
     * with the value {@code true}.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of three things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     *
     * <p>If the count reaches zero then the method returns with the
     * value {@code true}.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.  If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count
     * reached zero
     * @throws InterruptedException if the current thread is interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    public void reset() {
        sync.reset();
    }

    /**
     * Returns a string identifying this latch, as well as its state.
     * The state, in brackets, includes the String {@code "Count ="}
     * followed by the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

    /**
     * Synchronization control For CountDownLatch2.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        private final int startCount;

        Sync(int count) {
            this.startCount = count;
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // 加共享锁方法——只有当资源数为0时才加锁成功
        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // 释放锁,就是将资源数减一,注意CAS失败后重试,所以为死循环
        @Override
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (; ; ) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

        protected void reset() {
            setState(startCount);
        }
    }
}

上文分析了 AQS 的源码、加锁释放锁的逻辑,其实 AQS 在许多框架中有自己的实现,比如在 RocketMQ 中封装的 CountDownLatch2 就是利用了 AQS 加锁失败时线程释放CPU资源并等待的原理。

加锁操作只有在内部资源数为0 时才会成功,对于 CountDownLatch2 最后退出时资源数才可能调整为0,所以所有的线程加锁都会失败,那么会进入自旋进而调用 LockSupport.park() 方法进入等待状态,该状态会释放系统资源。

// 对应 await() 方法,阻塞等待,因为调用 tryAcquireShared() 初始返回-1,进入doAcquireSharedInterruptibly() 后会自旋尝试加锁
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


// 尝试加锁操作
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 会在这里不停自旋尝试加锁,直到加锁成功退出,而加锁成功的条件是资源数为0
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

解锁操作就是将资源数减一,减为0 时标志着需要唤醒 CountDownLatch2 等待的线程,此时再执行加锁操作 tryAcquireShared() 会返回1,即加锁成功,那么会退出 doAcquireSharedInterruptibly() 方法,结束自旋加锁操作,await() 方法会理解结束。

标签:count,加锁,AQS,thread,int,current,zero,应用
From: https://www.cnblogs.com/istitches/p/17963406

相关文章

  • pandas典型应用
    #header默认为0,即以第一列为列名,这里设为None,意味不设置第一行为列名;df11=pd.read_table(url,header=None)df11.head()#names=***,可以自定义设置列名user_cols=['id','quantity','name','choice','price']df=pd.read_table(url,header=None......
  • .NET中轻松应用SQLite:零配置数据库引擎的完美指南
     SQLite是一种轻量级的嵌入式数据库引擎,它在.NET中被广泛使用。SQLite是一个零配置的数据库引擎,不需要服务器,可以直接在应用程序中使用。下面是一个简单的示例,演示如何在.NET中使用SQLite,并提供了常见的查询、增加、修改和删除功能。首先,你需要在项目中安装 System.D......
  • Django客户端应用1向服务端应用2发送POST请求并接收解析数据
    一、应用1发送post请求deflogin(url,data):response=requests.post(url,json=data)ifresponse.status_code==200:result=response.json()print(result)returnresultelse:returnNonetry:url="htt......
  • AQS源码解析
    AQS结构特性内部包含Node、ConditionObject静态内部类,Node用来存储没竞争到锁的线程状态、CondidtionObject是对条件变量的封装;volatileintstate变量记录锁的状态,1表示锁被持有、0表示锁被释放,同时对应三个方法来更改/获取锁的状态:getState()、setState(intnewState......
  • Controller(StatefulSet)-部署有状态应用,部署守护进程,一次任务和定时任务
    Controller(StatefulSet)-部署有状态应用在Kubernetes中,StatefulSet是一种用于部署有状态应用的控制器。与无状态应用不同,有状态应用需要保持持久性和可识别的网络标识。在有状态应用中,每个Pod都有一个唯一的标识符,并且Pod的创建和删除顺序是有序的。在StatefulSet中创建的Pod具有以......
  • 如何让Visual Studio Tools for Unity插件用于调试你自己的Mono嵌入应用程序
       最近在测试将mono嵌入到C++应用程序中,苦于没有调试器,有时候还是不怎么方便。网上搜了一下,有VS插件MDebug、VSMonoDebugger,实际试用了一下,有点麻烦,而且似乎对Windows+VisualStudio2022支持不大好。因此想到了,Unity引擎是基于mono的,VisualStudio2022也内置了针对Unity的......
  • Kubernetes Controller(Deployment)-发布应用
    Kubernetes控制器(Deployment)是一个用于发布和管理应用程序的核心组件。它提供了一种声明式的方式来定义应用程序的期望状态,并确保系统自动地将当前状态与期望状态保持一致。通过使用Deployment,您可以定义应用程序的副本数、应用程序部署的容器镜像、应用程序的依赖关系等等。一旦......
  • 如何正确了解应用高防IP
    一、简介随着互联网的快速发展,网络安全问题逐渐凸显。高防IP作为一项重要的网络安全服务,已经成为了守护网络安全的重要一环。高防IP通过提供强大的抗DDoS攻击能力和其他网络安全防护措施,保障用户业务的安全稳定运行。本文将详细介绍高防IP,让读者可以更轻易的了解如何使用和为什么接......
  • 函数计算域名调试web应用
    函数计算域名调试web应用如果没有域名的话,可以利用一个小技巧来绕过阿里云对于函数计算域名使用的限制,从而直接使用阿里云的域名进行访问先进入如下页面复制公网访问地址​​然后安装可以修改响应头的浏览器插件,例如https://github.com/FirefoxBar/HeaderEditor,然后进行类......
  • 防抖节流的应用场景
    1.防抖:表单验证:当用户在输入框中输入内容时,可以使用防抖来延迟验证用户输入的内容,减少验证的频率。按钮点击:当用户频繁点击按钮时,可以使用防抖来确保只有最后一次点击生效,避免重复提交或触发不必要的操作。搜索框自动补全:当用户在搜索框中输入关键字时,可以使用防抖来延迟发送请求,减......