首页 > 其他分享 >【工具类】可重用的CountDownLatch

【工具类】可重用的CountDownLatch

时间:2023-05-09 16:13:13浏览次数:38  
标签:count reset return int 重用 sync CountDownLatch 工具 public

欢迎review代码,指出错误

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 可重用的CountDownLatch
 * 增加reset方法:count值减少到0后,可以通过reset方法重置,可重复使用
 * 增加版本号:可以通过自主控制版本号来实现带有固定周期数的等待和唤醒
 */
public class ReusableCountDownLatch {
    private final Sync sync;
    /**
     * 等待线程的版本号
     */
    private ThreadLocal<Long> threadVersion = new ThreadLocal<>();
    /**
     * 当前对象的最新版本号
     */
    private AtomicLong latchVersion = new AtomicLong(0);

    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        /**
         * 记录count值,用于重置时使用
         */
        private int count;
        /**
         * 是否自动重置
         */
        private boolean autoReset;

        Sync(int count) {
            this.count = count;
            this.autoReset = false;
            setState(count);
        }
        Sync(int count,boolean autoReset) {
            this.count = count;
            this.autoReset = autoReset;
            setState(count);
        }

        protected void reset() {
            latchVersion.getAndIncrement();
            setState(count);
        }
        protected void reset(long version) {
            latchVersion.set(version);
            setState(count);
        }

        int getCount() {
            return getState();
        }

        /**
         * 尝试获取共享锁,AQS框架保证了获取锁和释放锁的过程不会出现并发问题
         * @param acquires the acquire argument. This value is always the one
         *        passed to an acquire method, or is the value saved on entry
         *        to a condition wait.  The value is otherwise uninterpreted
         *        and can represent anything you like.
         * @return
         */
        protected int tryAcquireShared(int acquires) {
            Long tVersion = threadVersion.get();
            long lVersion = latchVersion.get();
            if(tVersion != null && lVersion > tVersion) {
                threadVersion.set(null);
                return 1;
            } else if(tVersion != null && lVersion < tVersion) {
                return -1;
            }
            boolean res = getState() == 0;
            if(!res) {
                threadVersion.set(lVersion);
                return -1;
            }
            return 1;
        }

        /**
         * 尝试释放共享锁
         * @param releases the release argument. This value is always the one
         *        passed to a release method, or the current state value upon
         *        entry to a condition wait.  The value is otherwise
         *        uninterpreted and can represent anything you like.
         * @return
         */
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (; ; ) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc)) {
                    boolean res = nextc == 0;
                    if(res && autoReset) {
                        // 自动reset之后才会唤醒等待线程
                        reset();
//                        System.out.println("rest");
                    }
                    return res;
                }

            }
        }
    }

    public ReusableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public ReusableCountDownLatch(int count, boolean autoReset) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count,autoReset);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void await(long version) throws InterruptedException {
        threadVersion.set(version);
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public boolean await(long timeout, TimeUnit unit,long version)
            throws InterruptedException {
        threadVersion.set(version);
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public void reset() {
        sync.reset();
    }

    public void reset(long version) {
        sync.reset(version);
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }

    // chatgpt帮忙写的测试用例
    public static void main(String[] args) throws InterruptedException {
        System.out.println("start");
        //带自动重置
        ReusableCountDownLatch latch = new ReusableCountDownLatch(3, true);

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                latch.countDown();
                System.out.println("Thread finished");
            }).start();
        }
        System.out.println("All threads await");
        latch.await();
        System.out.println("All threads finished");

        // 如果是不自动重置的需要手动重置
        //latch.reset();

        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                latch.countDown();
                System.out.println("Thread finished");
            }).start();
        }

        latch.await();
        System.out.println("All threads finished again");
    }
}

 

标签:count,reset,return,int,重用,sync,CountDownLatch,工具,public
From: https://www.cnblogs.com/wsss/p/17385407.html

相关文章

  • 【工具类】线程安全的滑动时间窗口记录工具类
    闲来无事,分享一个工具类,写的不好,轻喷,欢迎指出问题目标是线程安全无锁高性能的记录滑动时间窗口值importlombok.Getter;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;importjava.util.conc......
  • 【0基础学爬虫】爬虫基础之自动化工具 Pyppeteer 的使用
    大数据时代,各行各业对数据采集的需求日益增多,网络爬虫的运用也更为广泛,越来越多的人开始学习网络爬虫这项技术,K哥爬虫此前已经推出不少爬虫进阶、逆向相关文章,为实现从易到难全方位覆盖,特设【0基础学爬虫】专栏,帮助小白快速入门爬虫,本期为自动化工具Pyppeteer的使用。概述......
  • 一个C#开发的Windows远程桌面工具
    作为一名程序员,日常远程到服务器再正常不过了,在Windows环境,我们一般是通过操作系统自带、或者第三方工具。今天给你推荐一个开源的Windows远程桌面工具。项目简介这是一个基于MSTSC连接Windows远程桌面,并对其进行封装实现管理多个远程桌面配置的小工具,兼容WindowsXP及以......
  • 多连接的数据库管理工具Navicat Premium 16.1.9 Mac版
    NavicatPremium是一款多连接的数据库管理工具,它是一款免费的多通道、多连接程序,它支持企业和组织同时使用多个应用程序,在一个应用程序中运行多个数据库管理程序。使用Premium可以在同一应用程序中执行多个数据库程序。NavicatPremium可根据应用程序或Web服务之间的速度差异调......
  • 目前常用的在线格式化工具
    一、BeJson格式化工具网址:在线JSON校验格式化工具(BeJSON)优点:工具多缺点:广告多,界面设计较旧,拼凑的工具网站,界面风格差异较大不统一。    二、Robots2开发工具箱网址:Robots2开发工具网站优点:工具界面风格统一,界面整洁,有日常开发用到的工具和网站导航......
  • 人大金仓数据库迁移工具web版访问方式
    1.新版使用谷歌浏览器进行访问访问地址: http://localhost:54523/默认用户名及密码:kingbase/kingbase2. 老版使用谷歌浏览器进行访问访问地址:http://localhost:8080/默认用户名及密码:admin/123456&*会话保存策略:会话保存时间为一天,服务重启或登出失......
  • HBuilderX启动微信开发者工具报错[error] Error: Fail to open IDE
    报错提示如下: 解决方法:1.使用自己的账号登录。2.在微信开发者平台上申请appid并更改项目中的appid。3.删除项目中微信小程序的appid,这样就能在HBuilderX中启动游客身份的微信开发者工具。  检查其他步骤是否正确:1.打开微信开发者工具,在安全选项里开启服务端口......
  • API文档工具
    SpringBoot实战电商项目mall(50k+star)地址:https://github.com/macrozheng/mallSwaggerSwagger是一款非常流行的API文档工具,它能帮助你简化API文档的开发,极大提高开发效率,之前在mall项目中就是使用的它。  我们一般将Swagger和SpringBoot结合使用,使用的是Springfox给我们提......
  • 最新版本Camera Raw 15.3增效工具,新增AI功能
    Ps关于CameraRaw滤镜的消息大家都听了很多很多了,今天给大家分享的就是CameraRaw的最新版本,也就是那个传说中增加了AI功能的版本。对比先前两个版本,15.3在功能上也就做了2个值得关注的更新:1.AI降噪;2.AI智能蒙板。而改动最大的就是蒙版的支持,目前来看,多个AI蒙版的选择和使......
  • 时间工具函数
    工具函数:exportdefault{methods:{getDay(day){vartoday=newDate();vartargetday_milliseconds=today.getTime()+1000*60*60*24*day;today.setTime(targetday_milliseconds);//注意,这行是关键代码vartYear=today.ge......