首页 > 其他分享 >支持线程编排的并行框架AsyncTool

支持线程编排的并行框架AsyncTool

时间:2024-09-16 20:53:50浏览次数:3  
标签:AsyncTool 任务 private 编排 param 线程 WorkerWrapper 执行 public

它是由京东零售开源的项目,作者是天涯泪小武。如果大家想更深入理解可以去作者的博客看一下。

为什么会学习这个框架

最近在学习java并发中的CompletableFuture,它除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

在上网查阅资料的过程中发现一款可支持线程编排的并行框架 asyncTool,在Gitee官网找到后发现该项目不仅开源作者还写了文档去一行一行代码进行教学,这让一个对并发编排任务感兴趣但又没有基础的人来了兴趣,因此我结合源码以及作者的介绍以及其他博客的分析去学习了该框架。

框架的作用

这个模块我特别喜欢作者的介绍,先引出问题然后回答,让我一个并没有并发项目经验的人深刻理解这个框架究竟是要做什么,以下是对作者介绍的借鉴和自己的总结:

为什么需要一个这样的带任务编排的框架

1.当用户请求某个功能时,可能需要去调用各种各样的接口最后汇总结果去展示给用户。

2.在一些需求中可能不同块代码之间有依赖关系,后执行的需要依赖先执行的,以及超时、阻塞、异常等情况的处理

用图片来解释就是(图片来自作者):

img

并发框架的监控

任务编排其实可以通过CompletableFuture的api经过复杂的组装后实现,但对于该类来说,使用者在supplyAsync()后无法知道该异步任务的执行情况,虽然也有一些方法可以获取执行结果以及执行中的异常,但如果该任务没有执行被跳过或者其他情况那么就不知道了,因此并发框架应该能对每一步任务进行监控,无论成功失败都应该有回调。

每个任务之间应该有强弱依赖

例如上面的图片中的一些情况:

  • A执行完后才能执行B
  • A B全执行完后才能执行C
  • A B其中一个执行后执行C
  • A B其中某一个指定执行后执行C

所以在写该框架时每个任务都有自己的依赖任务和下级任务,依赖任务按要求执行后才能执行该任务

任务可以使用依赖任务中的结果

既然任务之间有执行顺序,那么一定想使用上级的结果来处理本级的任务吧。

超时时间设置

虽然每一个任务单独的时间无法设置,但可以设置全组任务的timeout,来控制该任务的执行时间

框架高性能低线程

该框架中全程无锁

AB执行后才能执行C这种情况下,C会运行在AB中后执行完的那个线程上,严格贯彻低线程理念

框架的应用

串行

串行比较好设计,那就来三个任务依次执行即可

在该框架中任务需要实现IWorker接口和ICallback接口,并重写以下方法:

  • begin() 任务开始前执行的方法
  • action() 任务中耗时操作执行的位置
  • result() 任务执行后的结果,可以在此处理action中的结果值
  • defalutValue() 当执行中有异常后的默认返回值

模拟串行场景:A任务对参数+1,之后B任务对参数+2,之后C任务对参数+3。

A任务:

public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {

    @Override
    public void begin() {
        System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
    }

    @Override
    public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
        Integer res = object + 1;
        return res;
    }

    @Override
    public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
        System.out.println("A - param:" + JSON.toJSONString(param));
        System.out.println("A - result:" + JSON.toJSONString(workResult));

        System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
    }

    @Override
    public Integer defaultValue() {
        System.out.println("A - defaultValue");
        return 101;
    }

}

B任务:

public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {

    @Override
    public void begin() {
        System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
    }

    @Override
    public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
        Integer res = object + 2;
        return res;
    }

    @Override
    public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
        System.out.println("B - param:" + JSON.toJSONString(param));
        System.out.println("B - result:" + JSON.toJSONString(workResult));

        System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
    }

    @Override
    public Integer defaultValue() {
        System.out.println("B - defaultValue");
        return 102;
    }
}

任务3:

public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {

    @Override
    public void begin() {
        System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
    }

    @Override
    public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
        Integer res = object + 3;
        return res;
    }

    @Override
    public void result(boolean success, Integer param, WorkResult<Integer> workResult) {
        System.out.println("C - param:" + JSON.toJSONString(param));
        System.out.println("C - result:" + JSON.toJSONString(workResult));

        System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
    }

    @Override
    public Integer defaultValue() {
        System.out.println("C - defaultValue");
        return 103;
    }
}

编写WorkerWrapper包装类,该类就是任务编排的core类

//A没有depend
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerA")
        .worker(workerA)
        .callback(workerA)
        .param(1)
        .build();

//B的depend是A
WorkerWrapper wrapperB = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerB")
        .worker(workerB)
        .callback(workerB)
        .param(2)
        .depend(wrapperA)
        .build();

//C的depend是B
WorkerWrapper wrapperC = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerC")
        .worker(workerC)
        .callback(workerC)
        .param(3)
        .depend(wrapperB)
        .build();
//begin
Async.beginWork(1000, wrapperA);

通过WorkerWrapper中的静态内部类Build去构建一个WorkerWrapper对象,从而实现任务的编排

最后通过Async类提交任务执行

并行

并行只需3个任务一并丢进beginWork中即可

Async.beginWork(1000, wrapperA,wrapperB,wrapperC);

阻塞等待 - 先串行,后并行

public class Test {
   public static void main(String[] args) throws ExecutionException, InterruptedException {
      Worker1 worker1 = new Worker1();
      Worker2 worker2 = new Worker2();
      Worker3 worker3 = new Worker3();

      WorkerWrapper wrapper1 = new WorkerWrapper.Builder<Integer,Integer>()
              .id("worker1")
              .worker(worker1)
              .callback(worker1)
              .param(1)
              .build();

      WorkerWrapper wrapper2 = new WorkerWrapper.Builder<Integer,Integer>()
              .id("worker2")
              .worker(worker2)
              .depend(wrapper1)
              .callback(worker2)
              .param(2)
              .build();

      WorkerWrapper wrapper3 = new WorkerWrapper.Builder<Integer,Integer>()
              .id("worker3")
              .worker(worker3)
              .depend(wrapper1)
              .callback(worker3)
              .param(3)
              .build();

      Async.beginWork(5000,wrapper1);
   }
}

让BC都依赖于A然后把A丢进beginWork即可.

阻塞等待 - 先并行,后串行

public class Test {
   public static void main(String[] args) throws ExecutionException, InterruptedException {
      Worker1 worker1 = new Worker1();
      Worker2 worker2 = new Worker2();
      Worker3 worker3 = new Worker3();

      WorkerWrapper wrapper1 = new WorkerWrapper.Builder<Integer,Integer>()
              .id("worker1")
              .worker(worker1)
              .callback(worker1)
              .param(1)
              .build();

      WorkerWrapper wrapper2 = new WorkerWrapper.Builder<Integer,Integer>()
              .id("worker2")
              .worker(worker2)
              .callback(worker2)
              .param(2)
              .build();

      WorkerWrapper wrapper3 = new WorkerWrapper.Builder<Integer,Integer>()
              .id("worker3")
              .worker(worker3)
              .depend(wrapper1,wrapper2)
              .callback(worker3)
              .param(3)
              .build();

      Async.beginWork(5000,wrapper1,wrapper2);
   }
}

C依赖AB,把AB 一起丢入即可

异常/超时回调

这2种场景,可以基于以上场景微调,即可debug调试。

Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)

1.基于全组设定的timeout,如果超时了,则worker中的返回值使用defaultValue()
2.如果当前Worker任务异常了,则当前任务使用defaultValue(),并且depend当前任务的,也FastFail,返回defaultValue()

框架的实现

包结构

图片来自涛声依旧叭

回调包即为实现作者对并发框架每一步任务监控的想法的实现

包装类即为编排任务核心类

执行器即对编排任务根据依赖关系去执行的实现

简单调用流程

Async执行器触发WorkerWrapper包装器类执行,WorkerWrapper中包装了任务类IWorker和回调类ICallback以及任务之间的依赖关系还有异常返回

回调接口:IWorker、ICallback

IWorker

/**
 * 每个最小执行单元需要实现该接口
 * @author wuweifeng wrote on 2019-11-19.
 */
public interface IWorker<T, V> {
    /**
     * 在这里做耗时操作,如rpc请求、IO等
     *
     * @param object
     *         object
     */
    V action(T object, Map<String, WorkerWrapper> allWrappers);

    /**
     * 超时、异常时,返回的默认值
     * @return 默认值
     */
    V defaultValue();
}

<T, V> T入参泛型 V出参泛型

ICallback

/**
 * 每个执行单元执行完毕后,会回调该接口</p>
 * 需要监听执行结果的,实现该接口即可
 * @author wuweifeng wrote on 2019-11-19.
 */
public interface ICallback<T, V> {

    void begin();

    /**
     * 耗时操作执行完毕后,就给value注入值
     *
     */
    void result(boolean success, T param, WorkResult<V> workResult);
}
/**
 * 默认回调类,如果不设置的话,会默认给这个回调
 * @author wuweifeng wrote on 2019-11-19.
 */
public class DefaultCallback<T, V> implements ICallback<T, V> {
    @Override
    public void begin() {
        
    }

    @Override
    public void result(boolean success, T param, WorkResult<V> workResult) {

    }

}

不传回调类默认给一个空的回调

包装类WorkerWrapper

有WorkerWrapper和DependWrapper两个类,因为依赖任务中还有个必要属性must,即判断是否某个任务执行后才能往后执行,还是只要任一执行即能向后执行。

源码
/**
 * 对每个worker及callback进行包装,一对一
 *
 * @author wuweifeng wrote on 2019-11-19.
 */
public class WorkerWrapper<T, V> {
    /**
     * 该wrapper的唯一标识
     */
    private String id;
    /**
     * worker将来要处理的param
     */
    private T param;
    private IWorker<T, V> worker;
    private ICallback<T, V> callback;
    /**
     * 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程</p>
     * -------2
     * 1
     * -------3
     * 如1后面有2、3
     */
    private List<WorkerWrapper<?, ?>> nextWrappers;
    /**
     * 依赖的wrappers,有2种情况,1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己
     * 通过must字段来控制是否依赖项必须完成
     * 1
     * -------3
     * 2
     * 1、2执行完毕后才能执行3
     */
    private List<DependWrapper> dependWrappers;
    /**
     * 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调
     * 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
     * <p>
     * 1-finish, 2-error, 3-working
     */
    private AtomicInteger state = new AtomicInteger(0);
    /**
     * 该map存放所有wrapper的id和wrapper映射
     */
    private Map<String, WorkerWrapper> forParamUseWrappers;
    /**
     * 也是个钩子变量,用来存临时的结果
     */
    private volatile WorkResult<V> workResult = WorkResult.defaultResult();
    /**
     * 是否在执行自己前,去校验nextWrapper的执行结果<p>
     * 1   4
     * -------3
     * 2
     * 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。
     * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
     */
    private volatile boolean needCheckNextWrapperResult = true;

    private static final int FINISH = 1;
    private static final int ERROR = 2;
    private static final int WORKING = 3;
    private static final int INIT = 0;
}

几个重要属性:

  • worker/callback 任务类和回调类 param action方法中的参数(要处理的参数)
  • nextWrappers 在该任务后执行的任务
  • dependWrappers 该任务依赖的任务
构建

在阅读源码的过程中发现WorkerWrapper类中有一个静态内部类Builder,看了分析文章以及查阅资料后才知道这是建造者模式

**建造者模式:**建造者模式将对象的创建过程和表现分离,并且调用方通过指挥者调用方法对对象进行构建,使得调用方不再关心对象构建过程,构建对象的具体过程可以根据传入类型的不同而改变。

//使用建造者模式构建WorkerWrapper
WorkerWrapper wrapperA = new WorkerWrapper.Builder<Integer, Integer>()
        .id("workerA")
        .worker(workerA)
        .callback(workerA)
        .param(null)
        .depend(wrapperB, wrapperC)
        .build();
public static class Builder<W, C> {
    /**
     * 该wrapper的唯一标识
     */
    private String id = UUID.randomUUID().toString();
    /**
     * worker将来要处理的param
     */
    private W param;
    private IWorker<W, C> worker;
    private ICallback<W, C> callback;
    /**
     * 自己后面的所有
     */
    private List<WorkerWrapper<?, ?>> nextWrappers;
    /**
     * 自己依赖的所有
     */
    private List<DependWrapper> dependWrappers;
    /**
     * 存储强依赖于自己的wrapper集合
     */
    private Set<WorkerWrapper<?, ?>> selfIsMustSet;

    private boolean needCheckNextWrapperResult = true;
}

内部类Builder中的属性和WorkerWrapper中的一样,主要是通过build()方法去构建WorkerWrapper对象。

AsyncTool执行器工作过程

不传入线程池默认

public static final ThreadPoolExecutor COMMON_POOL =
            new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,  
                    1024,
                    15L, 
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    (ThreadFactory) Thread::new);

第一个参数:表示线程池的核心线程数是系统可用处理器数量的两倍。这样做可以确保在大多数情况下,有足够的线程来并行处理任务,充分利用多核CPU的计算能力。

第二个参数:最大线程数

第三/四个参数:存活时间,当线程数大于核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。

第五个参数:LinkedBlockingQueue实例,这是一个基于链表结构的阻塞队列。它的大小没有限制(或者说是Integer.MAX_VALUE)

第六个参数:线程工厂

核心方法

public static boolean beginWork(long timeout, ThreadPoolExecutor pool, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
    	//没有任务直接返回false
        if(workerWrappers == null || workerWrappers.size() == 0) {
            return false;
        }
        //定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result
        Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
        CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
        for (int i = 0; i < workerWrappers.size(); i++) {
            WorkerWrapper wrapper = workerWrappers.get(i);
            //用CompletableFuture异步执行任务
            futures[i] = CompletableFuture.runAsync(() -> wrapper.work(pool, timeout, forParamUseWrappers), pool);
        }
        try {
            //用CompletableFuture异步执行任务全部执行完后获取结果
            CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            Set<WorkerWrapper> set = new HashSet<>();
            totalWorkers(workerWrappers, set);
            for (WorkerWrapper wrapper : set) {
                wrapper.stopNow();
            }
            return false;
        }
    }

WorkerWrapper处理任务

核心work()
/**
     * 开始工作
     * fromWrapper代表这次work是由哪个上游wrapper发起的
     */
    private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
        this.forParamUseWrappers = forParamUseWrappers;
        //将自己放到所有wrapper的集合里去
        forParamUseWrappers.put(id, this);
        long now = SystemClock.now();
        //总的已经超时了,就快速失败,进行下一个
        if (remainTime <= 0) {
            fastFail(INIT, null);
            beginNext(poolExecutor, now, remainTime);
            return;
        }
        //如果自己已经执行过了。
        //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
        if (getState() == FINISH || getState() == ERROR) {
            beginNext(poolExecutor, now, remainTime);
            return;
        }

        //如果在执行前需要校验nextWrapper的状态
        if (needCheckNextWrapperResult) {
            //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了
            if (!checkNextWrapperResult()) {
                fastFail(INIT, new SkippedException());
                beginNext(poolExecutor, now, remainTime);
                return;
            }
        }

        //如果没有任何依赖,说明自己就是第一批要执行的
        if (dependWrappers == null || dependWrappers.size() == 0) {
            fire();
            beginNext(poolExecutor, now, remainTime);
            return;
        }

        /*如果有前方依赖,存在两种情况
         一种是前面只有一个wrapper。即 A  ->  B
        一种是前面有多个wrapper。A C D ->   B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
        所以需要B来做判断,必须A、C、D都完成,自己才能执行 */

        //只有一个依赖
        if (dependWrappers.size() == 1) {
            doDependsOneJob(fromWrapper);
            beginNext(poolExecutor, now, remainTime);
        } else {
            //有多个依赖时
            doDependsJobs(poolExecutor, dependWrappers, fromWrapper, now, remainTime);
        }

    }

作者的注释十分清晰

工作步骤是:

  • 保存所有任务forParamUseWrappers
  • 判断是否超时
  • 检查是否已经执行过,避免重复处理
  • 检查next是否已经开始执行了,避免多余的处理
  • 没有任何依赖的话自己直接fire()执行
  • 有依赖的话doDependsOneJob/doDependsJobs

只有一个依赖的话,执行完就可以执行自己了

多个依赖的话,需要判断must依赖是否已经全执行才可以执行自己

执行fire()
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
private void fire() {
  //阻塞取结果
  workResult = workerDoJob();
}
/**
* 具体的单个worker执行任务
*/
private WorkResult<V> workerDoJob() {

    //1.Check重复执行
    if (!checkIsNullResult()) {
      return workResult;
    }
  
    try {
      //2.设置Wrapper状态为Working
      //如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
      if (!compareAndSetState(INIT, WORKING)) {
        return workResult;
      }

      //3.回调begin
      callback.begin();

      //4.执行耗时操作action
      V resultValue = worker.action(param, forParamUseWrappers);

      //5.设置Wrapper状态为FINISH
      //如果状态不是在working,说明别的地方已经修改了
      if (!compareAndSetState(WORKING, FINISH)) {
        return workResult;
      }

      workResult.setResultState(ResultState.SUCCESS);
      workResult.setResult(resultValue);
      //6.回调result
      callback.result(true, param, workResult);

      return workResult;
    } catch (Exception e) {
      //7.异常处理:设置状态ERROR\EXCEPTION,结果设置为默认值
      fastFail(WORKING, e);
      return workResult;
    }
}
beginNext()
/**
 * 进行下一个任务
 */
private void beginNext(ExecutorService executorService, long now, long remainTime) {
    //花费的时间
    long costTime = SystemClock.now() - now;

    //1.后续没有任务了
    if (nextWrappers == null) {
        return;
    }

    //2.后续只有1个任务,使用当前任务的线程执行next任务
    if (nextWrappers.size() == 1) {
        nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
        return;
    }

    //3.后续有多个任务,使用CompletableFuture[]包装,有几个任务就起几个线程执行
    CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
    for (int i = 0; i < nextWrappers.size(); i++) {
        int finalI = i;
        futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
                .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
    }

    //4.阻塞获取Future结果,注意这里没有超时时间,超时时间由全局统一控制。
    try {
        CompletableFuture.allOf(futures).get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

开启下一个任务,如果没有就结束程序,有一个就启动一个线程,有多个就启动多个。

doDependsOneJob()
private void doDependsOneJob(WorkerWrapper dependWrapper) {
    //1.依赖超时?
    if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
        workResult = defaultResult();
        fastFail(INIT, null);
    } 
    //2.依赖异常?
    else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
        workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
        fastFail(INIT, null);
    } 
    //3.依赖正常
    else {
        //前面任务正常完毕了,该自己了
        fire();
    }
}

如果判断到只有一个依赖的时候就要进入这个方法,看自己依赖的方法执行情况,如果依赖的方法超时那自己也超时

doDependsJobs()

先判断依赖的任务是否是must,如果一个都不是那就执行自己。

如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干,等到必须完成的完成了的时候就会让该任务进行的

如果fromWrapper是必须的,就判断是否所有的必须都完成了,并检查他们的情况,有超时即自己也超时

如何确保多依赖任务不被重复执行
//可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
        if (getState() == FINISH || getState() == ERROR) {
            beginNext(poolExecutor, now, remainTime);
            return;
        }

框架的特点

SystemClock

在项目中获取当前时间没有使用System.currentTimeMillis()而是自己写了一个SystemClock

项目中通过单例模式创建了一个时钟类SystemClock

/**
 * 用于解决高并发下System.currentTimeMillis卡顿
 * @author lry
 */
public class SystemClock {

    private final int period;

    private final AtomicLong now;

    /**
     * 单例模式
     */
    private static class InstanceHolder {
        private static final SystemClock INSTANCE = new SystemClock(1);
    }

    private SystemClock(int period) {
        this.period = period;
        this.now = new AtomicLong(System.currentTimeMillis());
        scheduleClockUpdating();
    }

    private static SystemClock instance() {
        return InstanceHolder.INSTANCE;
    }

    /**
     * 守护线程
     * 每1ms写一次now系统当前时间。
     */
    private void scheduleClockUpdating() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "System Clock");
            //守护线程,必须在线程启动前设置
            thread.setDaemon(true);
            return thread;
        });
        //定时周期执行任务
        scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
    }

    private long currentTimeMillis() {
        return now.get();
    }

    /**
     * 用来替换原来的System.currentTimeMillis()
     */
    public static long now() {
        return instance().currentTimeMillis();
    }
}

创建了一个守护线程,每1ms对 AtomicLong now进行更新 System.currentTimeMillis(),因为守护线程的执行周期是每1ms执行一次,这里是有1ms的延迟。

ScheduledExecutorServiceExecutorService 的子类,它基于 ExecutorService 功能实现周期执行的任务。

参考

手写中间件之——并发框架_天涯泪小武的博客-CSDN博客

并行编排AsyncTool框架源码解析_涛声依旧叭的博客-CSDN博客

标签:AsyncTool,任务,private,编排,param,线程,WorkerWrapper,执行,public
From: https://blog.csdn.net/qq_73210658/article/details/142287081

相关文章

  • JAVA 多线程基础:JAVA中double 和 long非原子读写问题
    在解释这个问题之前,我们先来回顾下Java中基础数据类型所占的位数。类型长度(位)字节byte41boolean41int324short162long648char162float324double648可以看到对于double以及long两种基本数据类型,所占位数为64位。而JVM却有32bit与64bit两种,也就是说在32bitJVM中不能将doub......
  • 线程安全级别
    线程安全的级别用于描述在多线程环境下,某个对象或类在处理并发访问时的安全性程度。它帮助开发者了解不同数据结构或代码在多线程中使用时,需要什么样的处理措施,以确保数据一致性和避免竞态条件(数据竞争)。 线程安全性可以分为不同的级别,取决于对并发访问的控制和保证数据一致性......
  • 代理ip批量检测工具,采用多线程并发编程,支持http,https,socks4,socks5协议!
     工具使用c++编程语言,采用多线程并发检测技术:支持ipv4及ipv6代理ip批量检测。支持httphttpssocks4及socks5代理服务器的批量检测。支持所有windows版本运行!导入方式支持手工选择文件及拖放文件。导入格式支持三种格式:第一种:用|号分割2409:8a50:8019:e470:a8d7:bdf0:fbfe:8b5......
  • JavaSE——多线程
    一、线程的五种基本状态1.新建状态(New)创建一个线程对象后,该线程对象就处于新建状态。此时它不能运行,仅仅是Java虚拟机为其分配了内存。2.就绪状态(Runnable)当线程对象调用的start()方法后,该线程就进入就绪状态。处于就绪状态的线程位于线程队列中,等待系统的调度以获得CPU的......
  • 多线程六-线程通信之Condition使用与设计猜想
    海上生明月,天涯共此时。愿大家在这个团圆的夜晚,收获满满的温馨和喜悦,团圆美满,中秋快乐!使用示例ConditionDemoAwait:开始之后加锁,阻塞并释放锁packagecom.caozz.demo5.concurrent;importjava.util.concurrent.locks.Condition;importjava.util.concurrent.locks.Lock;......
  • Android HandlerThread Post后延迟7秒才执行的原因及解决方案|如何提高Android后台线程
    在Android开发中,HandlerThread是用于处理后台线程任务的常见工具。然而,有时我们会遇到这样的问题:当任务通过HandlerThread的post方法发送后,任务的执行时间会出现明显的延迟,比如7秒的延迟才执行任务。本文将深入分析这种问题的成因,探讨可能的影响因素,并提供多种优化方案,帮助开发者解......
  • 多线程五-线程通信之wait与notify
    wait与notify用于syncronized的线程间通信的一种,wait用来阻塞线程并释放锁,notify用来唤醒线程。他们与condition作用基本一致,但是由于syncronized为jdk实现,阅读源码有难度,所以通过了解其原理,用来帮助我们后续理解condition的源码。可以通过下面一张图来理解:下面通过一个简单的......
  • [Java并发]守护线程
    守护线程和普通线程的最大区别是守护线程会在主线程结束后退出,但是普通线程在主线程结束后不会退出。普通线程的执行importjava.sql.Time;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;publicclassMain{publicstaticvoid......
  • Python 课程8-多线程编程和多进程编程
    前言        在现代编程中,处理并发任务是提高程序性能的关键之一。Python提供了多线程(threading)和多进程(multiprocessing)两种方式来实现并发编程。多线程适用于I/O密集型任务,而多进程则更适合CPU密集型任务。通过这两种技术,你可以高效地处理大规模数据、加速......
  • 浅谈线程的创建方式
    引言在网上查询这个问题,大多回答是线程的创建方式有四种。继承Thread类实现Runnable接口实现Callable接口使用线程池但是这种说法是错误的,或者说是不正确的不严谨的。我的想法实际上,在Java中创建线程的方式只有一种,就是使用newThread()只有这样才能创建一个线......