首页 > 其他分享 >定时/延时任务-带你看懂ScheduledThreadPoolExecutor的原理(1)

定时/延时任务-带你看懂ScheduledThreadPoolExecutor的原理(1)

时间:2024-12-01 20:28:19浏览次数:6  
标签:lock ScheduledThreadPoolExecutor 获取 任务 线程 延时 定时 null leader

文章目录

1. 概要

ScheduledThreadPoolExecutor 的上一篇文章:定时/延时任务-ScheduledThreadPoolExecutor的使用

上一篇文章中,我们介绍了 ScheduledThreadPoolExecutor 的用法,那么从这篇文章开始就要介绍 ScheduledThreadPoolExecutor 的原理了

2. 构造函数

首先要看的是 ScheduledThreadPoolExecutor 的构造函数

public ScheduledThreadPoolExecutor(int corePoolSize) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
         new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                  ThreadFactory threadFactory) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
         new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                  RejectedExecutionHandler handler) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
         new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
   super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
         new DelayedWorkQueue(), threadFactory, handler);
}

最终这些方法都会调用到线程池的构造函数 ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                    int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
   if (corePoolSize < 0 ||
       maximumPoolSize <= 0 ||
       maximumPoolSize < corePoolSize ||
       keepAliveTime < 0)
       throw new IllegalArgumentException();
   if (workQueue == null || threadFactory == null || handler == null)
       throw new NullPointerException();
   this.acc = System.getSecurityManager() == null ?
           null :
           AccessController.getContext();
   this.corePoolSize = corePoolSize;
   this.maximumPoolSize = maximumPoolSize;
   this.workQueue = workQueue;
   this.keepAliveTime = unit.toNanos(keepAliveTime);
   this.threadFactory = threadFactory;
   this.handler = handler;
}

其实不难发现 ScheduledThreadPoolExecutor 给的这几个构造函数最大线程数都是 Integer.MAX_VALUE

3. 延时队列 - DelayedWorkQueue

ScheduledThreadPoolExecutor 的构造函数是 DelayedWorkQueue ,这个优先队列是用队列实现的,下面来逐步解析

static class DelayedWorkQueue extends AbstractQueue<Runnable>
     implements BlockingQueue<Runnable> {

3.1 参数

首先是延时队列的初始长度,默认是 16

/**
 * 延时队列初始容量为 16
 */
private static final int INITIAL_CAPACITY = 16;

然后就是存储任务的数组,初始容量 16

/**
  * 存储任务的数组,初始容量大小就是 16
  */
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

下面就是并发锁 lock Condition,用来进行阻塞唤醒的

/**
 * 并发锁
 */
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();

接着是数组里面的任务个数 size

/**
 * 数组中的任务个数
 */
private int size = 0;

3.2 leader-follower模式

最后一个比较关键的是 leader,这个 leader 里面其实代表了 Leader-Follower 模式 的线程调度,下面就来详细介绍下这个模式。
Leader-Follower 模式 是一种并发编程模式主要用于处理并发任务的调度。在 Leader-Follower 模式中,一组线程(也可以称为工作线程或处理单元)在等待任务时处于“等待”状态,一旦有任务到达,其中一个线程会被选为 “Leader” 来获取该任务并处理,而其他线程则继续保持“Follower”状态。任务处理完成后,原 Leader 线程可能会重新进入 Follower 状态,等待下一个任务的到来。
Leader-Follower 模式下两种线程的职责如下:

  1. Leader 线程: 当前正在等待队列头部任务的线程, 线程会等待下一个任务的延迟时间(即 delay)结束后获取到该任务返回并交出 leader 的身份
  2. Follower 线程: 除了 leader 线程之外的其他线程被称为 follower 线程,follower 线程会无限期地等待,直到它们被选中成为新的 leader 线程

这两种线程的流程在下面会进行源码解析

3.3 数组调整

这部分和前几篇文章中 Timer 的逻辑是一模一样的,例子也是这篇文章复制过来的(注意下面图中节点头上那些是执行时间):定时/延时任务-细说Timer源码

3.3.1 向上调整

假设原来的数组节点是 1、2、3、4、5、6
在这里插入图片描述
原来的小根堆这时候添加了一个执行时间为2的节点到队列尾部,这时候就触发了堆的向上修复
首先获取 7 的父结点 7 >> 1 = 3,然后判断 73 两个节点的执行时间,发现·2 < 10,于是交换两个节点
在这里插入图片描述
这时候 k(修复的下标) 来到了下标 3 的位置,然后再次获取父结点 3 >> 1 = 1,这时候发现下标 1 的时间是 5,比 2 小,于是继续交换下标 1 和 下标 3 的值
在这里插入图片描述
到这里 k = 1,就退出循环,修复完成了,整个过程的执行时间是 O(logn)

3.3.2 向下调整

下面就来看一下向下调整的流程图:
在这里插入图片描述

假设这时候原来的头结点任务被执行了,就会把最后一个执行时间为 10 的任务加到头节点
在这里插入图片描述
接着获取 k=1(要调整的节点) 的子节点并比较两个子节点的执行时间,选出最先执行的那一个 j(子节点)
在这里插入图片描述
交换 kj 的节点
在这里插入图片描述
获取 k 的左右节点,这时候只有一个节点 6,然后判断,但是子节点执行时间都比节点 k 靠后执行,符合优先级队列,修复结束

修复的时间复杂度也是 O(logn)

3.4 添加任务

public void put(Runnable e) {
    offer(e);
}

public boolean add(Runnable e) {
    return offer(e);
}

public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}

最终方法都调用到了 offer

/**
* 添加一个任务
 * @param x
 * @return
 */
public boolean offer(Runnable x) {
    // 添加的任务不能为空
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 目前队列里面的任务数量
        int i = size;
        // 如果队列满了,需要扩容
        if (i >= queue.length)
            grow();
        // 容量 +1
        size = i + 1;
        // 如果容量为 0,就是初始添加任务
        if (i == 0) {
            // 设置第一个任务
            queue[0] = e;
            // 设置该任务的索引到任务属性中
            setIndex(e, 0);
        } else {
            // 需要添加到堆队列尾部,顺便进行上浮
            siftUp(i, e);
        }
        // 如果当前添加的元素是队首元素
        if (queue[0] == e) {
            leader = null;
            // 唤醒等待元素的线程
            available.signal();
        }
    } finally {
        // 解锁
        lock.unlock();
    }
    return true;
}

private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
        // 设置堆索引值
        ((ScheduledFutureTask)f).heapIndex = idx;
}

下面就来一句一句解析,首先判断如果添加的任务为空,抛出异常

if (x == null)
       throw new NullPointerException();

接下来把任务转换成 RunnableScheduledFuture,然后加锁

// 加锁
final ReentrantLock lock = this.lock;
lock.lock();

接着获取队列里面的任务数量,如果队列满了,就需要扩容

// 目前队列里面的任务数量
int i = size;
// 如果队列满了,需要扩容
if (i >= queue.length)
    grow();

下面是扩容的流程,需要扩容成队列长度的 1.5 倍,当然还是需要判断下如果 newCapacity < 0,就代表 int 溢出了,这时候把队列长度设置成 Integer.MAX_VALUE

/**
 * 扩容数组,必须要加锁才能调用
 */
private void grow() {
    // 队列长度
    int oldCapacity = queue.length;
    // 扩容到原来的 1.5 倍
    int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
    // 如果 < 0,就代表容量移除了
    if (newCapacity < 0) // overflow
        // 直接赋值最大值
        newCapacity = Integer.MAX_VALUE;
    // 分配数组,直接拷贝
    queue = Arrays.copyOf(queue, newCapacity);
}

扩容之后,设置容量 + 1

// 容量 +1
size = i + 1;

下面判断如果容量为 0,就是第一个任务,直接设置任务到 index = 0 的位置,如果不是,就添加到队列尾部,然后向上调整,在 3.3.1 我们给出了向上调整的流程,下面就从源码角度看下是怎么调整的:

private void siftUp(int k, RunnableScheduledFuture<?> key) {
  while (k > 0) {
        // 父结点,当前下标 i 的父结点就是(i - 1)/ 2
        // 比如 2 的父结点是 0,5 的父结点是 2
        int parent = (k - 1) >>> 1;
        // 获取父结点
        RunnableScheduledFuture<?> e = queue[parent];
        // 如果当前要加入的任务执行时间在父结点之后,不用处理了
        if (key.compareTo(e) >= 0)
            break;
        // 需要处理,处理的逻辑就是把加入的节点和父结点交换
        // 1.父结点设置到 k 的位置
        queue[k] = e;
        // 2.设置任务的下标
        setIndex(e, k);
        // 3.继续向上遍历
        k = parent;
    }
    // 到这里就是加入的任务应该要加入的下标
    queue[k] = key;
    // 设置任务下标
    setIndex(key, k);
}

k 在这里就是数组尾部

  1. 获取当前下标的父结点,因为下标是从 0 开始的,所以 需要 - 1 再 / 2,比如 2 的父结点是 0,5 的父结点是 2
  2. 下面使用 compareTo 进行比较,compareTo 里面会先根据过期时间对比,如果过期时间相同,再根据 sequenceNumber,也就是序号大小对比,序号小的排前面
  3. 如果当前要加入的任务过期时间或者序号在父结点后面,就不需要调整了
  4. 否则把父结点赋值到 k 的位置,然后 k 继续向上遍历
  5. 最终退出的时候下标 k 就是要设置的任务下标
  6. setIndex 其实就是设置堆索引
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
      if (f instanceof ScheduledFutureTask)
          // 设置堆索引值
          ((ScheduledFutureTask)f).heapIndex = idx;
  }
/**
* 任务之间的比较
* @param other
* @return
*/
public int compareTo(Delayed other) {
   if (other == this) // compare zero if same object
       return 0;
   if (other instanceof ScheduledFutureTask) {
       // 比较两个任务的执行时间,执行时间段的放前面
       // 如果执行时间都一样,就判断下序号,序号小的先执行
       ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
       long diff = time - x.time;
       if (diff < 0)
           return -1;
       else if (diff > 0)
           return 1;
       else if (sequenceNumber < x.sequenceNumber)
           return -1;
       else
           return 1;
   }
   long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
   return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

最后如果加入的元素是队首元素,就唤醒等待元素

// 如果当前添加的元素是队首元素
if (queue[0] == e) {
    leader = null;
    // 唤醒等待元素的线程
    available.signal();
}

available.signal() 会唤醒等待的线程,去处理,唤醒之前会把 leader 设置为 null为什么要这么做呢? 我们得去获取任务的源码里面找原因,这个下面会解析。
最后解除锁,返回 true 表示添加任务成功

3.5 从队首获取任务

/**
 * 从队首获取元素
 * @return
 */
public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first = queue[0];
        // 如果第一个任务为空或者第一个任务还没有过期
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            // 否则获取第一个任务
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

从队首获取任务,如果任务为空或者任务没有过期,就返回 null,否则调用 finishPoll 获取第一个任务

 /**
* Performs common bookkeeping for poll and take: Replaces
 * first element with last and sifts it down.  Call only when
 * holding lock.
 * 获取队首元素,并且从队首往下进行修复
 * @param f the task to remove and return
 */
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 任务个数 - 1
    int s = --size;
    // 获取最后一个任务
    RunnableScheduledFuture<?> x = queue[s];
    // 设置为 null
    queue[s] = null;
    // 如果堆中还有任务
    if (s != 0)
        // 从 0 往下修复
        siftDown(0, x);
    // 设置获取出来的这个任务索引为 -1
    setIndex(f, -1);
    return f;
}

首先把任务数 -1,然后获取最后一个任务,把最后一个任务下标设置为空,接着如果堆中还有任务,就向下修复,最后把获取到的任务下标设置为 -1,向下修复的流程在 3.3.2 已经图解了,下面是源码

/**
* 从 k 往下修复延时队列,必须在加锁之后调用这个方法
 * @param k
 * @param key
 */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    // 向下修复,父结点的范围
    int half = size >>> 1;
    while (k < half) {
        // 左子节点
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        // 右子节点
        int right = child + 1;
        // 选出左子节点和右子节点中最先执行的
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        // 如果要加入的任务比最小的任务都要先执行,直接跳出,当前下标 k 就是要加入的位置
        if (key.compareTo(c) <= 0)
            break;
        // 否则交换父结点和子节点
        queue[k] = c;
        setIndex(c, k);
        // 从子节点继续向子节点的子节点遍历
        k = child;
    }
    // 到这里就是找到了加入的任务应该插入的下标
    queue[k] = key;
    setIndex(key, k);
}

k 是要修复的下标

  1. 首先获取这个下标的左子节点,然后再获取右子节点
  2. 判断这两个节点的优先级,也就是获取最先执行的或者序号最小的
  3. 如果 k 节点比这两个子节点都要先执行,直接退出循环,不用调整了
  4. 把更小的子节点赋值到父结点
  5. 继续向下调整

流程很简单,时间复杂度也是 Olog(n)

3.6 获取等待执行的任务

/**
* 获取待执行的任务
 * @return
 * @throws InterruptedException
 */
public RunnableScheduledFuture<?> take() throws InterruptedException {
    // 加锁,可中断锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队首元素
            RunnableScheduledFuture<?> first = queue[0];
            // 如果队首元素为空,就阻塞等待任务入队
            if (first == null)
                available.await();
            else {
                // 获取第一个任务的延时时间
                long delay = first.getDelay(NANOSECONDS);
                // 如果小于 0,就代表队首任务到期了,取出来
                if (delay <= 0)
                    return finishPoll(first);
                // 到这里,说明队首任务还没有过期,这时候把引用设置为空
                first = null;
                // 如果当前线程不是 leader 线程,那么当前线程阻塞等待
                if (leader != null)
                    available.await();
                else {
                    // 设置 leader 线程为当前线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 设置等待的时间就是队首任务的剩余到期时间
                        // 如果等待时间到了,leader 线程就会从被阻塞的状态中唤醒并返回队首的任务
                        available.awaitNanos(delay);
                    } finally {
                        // 唤醒之后把 leader 重新置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 到这里就是 leader 线程等待时间到了被唤醒,同时队列里面有任务了
        if (leader == null && queue[0] != null)
            // 唤醒其他的等待获取任务的线程
            available.signal();
        // 解锁
        lock.unlock();
    }
}

还是一句一句分析,首先获取队首元素,如果队首元素为空,就代表任务队列没有任务,阻塞等待

// 获取队首元素
RunnableScheduledFuture<?> first = queue[0];
// 如果队首元素为空,就阻塞等待任务入队
if (first == null)
    available.await();
else {
	...
}

如果队首元素不为空,就判断下是不是过期了,如果过期就从队首取出元素,这个 finishPoll 在上面 3.5 解析过了

// 获取第一个任务的延时时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于 0,就代表队首任务到期了,取出来
if (delay <= 0)
    return finishPoll(first);

如果队首元素没有过期,那么判断下当前线程是不是 leader 线程 ,因为这里是 leader-follower 模式,所以如果不是 leader 线程,就无限阻塞:

// 如果当前线程不是 leader 线程,那么当前线程阻塞等待
if (leader != null)
     available.await();
 else {
 	....
 }

如果是 leader 线程,就回去阻塞 delay 时间,delay 就是第一个任务距离现在还有多久过期

// 设置 leader 线程为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
    // 设置等待的时间就是队首任务的剩余到期时间
    // 如果等待时间到了,leader 线程就会从被阻塞的状态中唤醒并返回队首的任务
    available.awaitNanos(delay);
} finally {
    // 唤醒之后把 leader 重新置空
    if (leader == thisThread)
        leader = null;
}

唤醒之后把 leader 置空,置空之后重新 for 循环,这时候 leader 线程 就能拿到任务去执行了。
注意:take() 其实就是前几篇文章中解析的线程池工作原理中的 Worker 线程(核心线程)的阻塞获取任务的方法,建议先去看下前置的几篇线程池原理文章带你从源码级别看懂 JDK 线程池原理(1)
到这里我们就可以回答3.4 提出的问题:

available.signal() 会唤醒等待的线程,去处理,唤醒之前会把 leader 设置为 null,**为什么要这么做呢?

因为添加了一个更早的任务之后唤醒的线程不一定是 leader 线程,所以如果不是 leader 线程获取到任务再去执行,这样就不符合 leader-follower 的模式,所以唤醒前要把 leader 置空

3.7 获取等待执行的任务(带超时时间)

上面这个方法是不带超时时间的,是核心线程的获取方法,那这个就是非核心线程的获取方法,获取任务,如果在一定时间内获取不到,就返回 null,回收非核心线程

/**
* 从任务队列中获取任务
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return
 * @throws InterruptedException
 */
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    // 阻塞等待的时间
    long nanos = unit.toNanos(timeout);
    // 加锁,可中断
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队首任务
            RunnableScheduledFuture<?> first = queue[0];
            // 如果队首任务为空,就表示队列当前没有任务
            if (first == null) {
                // 如果不需要阻塞等待,直接返回 null
                if (nanos <= 0)
                    return null;
                else
                    // 否则等待 nanos 时间
                    nanos = available.awaitNanos(nanos);
            } else {
                // 如果队首元素不为空,首先获取下队首任务的过期时间
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    // 如果过期时间 <= 0,证明当前任务到期了,可以返回任务给线程执行了
                    return finishPoll(first);
                // 这里就是任务没到期,但是队列也不等待
                if (nanos <= 0)
                    // 直接返回 null
                    return null;
                // 到这里就是任务没到期执行不了,同时线程也需要等待
                // 把 first 引用设置为空
                first = null;
                // nanos:线程需要等待的时间
                // delay:第一个任务还有多久才能执行
                if (nanos < delay || leader != null)
                    // nanos < delay 表示线程等待了 nanos 也到不了任务的执行时间,直接阻塞
                    nanos = available.awaitNanos(nanos);
                else {
                    // 这里就表示线程等待 nanos 能等到第一个任务的执行时间点
                    Thread thisThread = Thread.currentThread();
                    // 把 leader 线程设置为当前线程
                    leader = thisThread;
                    try {
                        // 然后等待 delay 时间,timeLeft 是该线程实际等待的时间
                        // 如果等待过程中调用了 available.signal() 这些方法也会被唤醒
                        long timeLeft = available.awaitNanos(delay);
                        // 计算 nanos 还剩下多少等待时间
                        nanos -= delay - timeLeft;
                    } finally {
                        // 把 leader 线程置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 把 leader 线程置空,leader 线程执行完任务之后需要让出这个属性给其他 flower 线程来竞争成为 leader 线程
        if (leader == null && queue[0] != null)
            available.signal();
        // 解锁
        lock.unlock();
    }
}

还是来慢慢解析,首先加锁

// 阻塞等待的时间
long nanos = unit.toNanos(timeout);
// 加锁,可中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

接着获取队首任务,如果任务为空,就代表没有任务,这时候看看要不要阻塞,如果不需要就直接返回空,需要就等待 nacos 时间

// 获取队首任务
RunnableScheduledFuture<?> first = queue[0];
 // 如果队首任务为空,就表示队列当前没有任务
 if (first == null) {
     // 如果不需要阻塞等待,直接返回 null
     if (nanos <= 0)
         return null;
     else
         // 否则等待 nanos 时间
         nanos = available.awaitNanos(nanos);

如果队首任务不为空,就代表有任务,下面就要去看下有没有过期,如果过期了,直接返回队首任务,如果没过期并且不需要等待,就返回空

// 如果队首元素不为空,首先获取下队首任务的过期时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
    // 如果过期时间 <= 0,证明当前任务到期了,可以返回任务给线程执行了
    return finishPoll(first);
// 这里就是任务没到期,但是队列也不等待
if (nanos <= 0)
    // 直接返回 null
    return null;

下面就是任务没有到期执行不了,同时也需要等待 delay 时间,首先还是把 first 设置为空,然后判断下如果当前线程是 leader 线程,并且阻塞的时间 nacos 比下一个任务执行的时间要短,就阻塞等待 nanos

// nanos:线程需要等待的时间
 // delay:第一个任务还有多久才能执行
 if (nanos < delay || leader != null)
     // nanos < delay 表示线程等待了 nanos 也到不了任务的执行时间,直接阻塞
     nanos = available.awaitNanos(nanos);

否则就阻塞 delay 时间,并且阻塞完之后还有剩余 nacos - delay 时间可以继续获取任务,当然能进入下面的流程肯定是 leader 线程为空的情况下,也就是没有 leader 线程,这时候当前线程会成为 leader 线程来阻塞获取任务执行

// 这里就表示线程等待 nanos 能等到第一个任务的执行时间点
 Thread thisThread = Thread.currentThread();
 // 把 leader 线程设置为当前线程
 leader = thisThread;
 try {
     // 然后等待 delay 时间,timeLeft 是该线程实际等待的时间
     // 如果等待过程中调用了 available.signal() 这些方法也会被唤醒
     long timeLeft = available.awaitNanos(delay);
     // 计算 nanos 还剩下多少等待时间
     nanos -= delay - timeLeft;
 } finally {
     // 把 leader 线程置空
     if (leader == thisThread)
         leader = null;
 }

从上面也可以看到,在 finally 里面会去把 leader 置空,让出 leader 给其他线程,在新一轮循环里面 leader 就会去获取到任务返回,返回之后在最外层的 finally 里面会唤醒一个 follower 线程去成为 leader 继续阻塞获取

// 把 leader 线程置空,leader 线程执行完任务之后需要让出这个属性给其他 flower 线程来竞争成为 leader 线程
if (leader == null && queue[0] != null)
    available.signal();
// 解锁
lock.unlock();

这时候因为已经设置 leader 为空了,所以其他线程可以成为 leader,而当前线程拿着任务返回去执行任务,所以这里的核心理念就是:执行任务的一定会是前 leader 线程

3.8 清空任务队列

/**
 * 清空任务队列
 */
public void clear() {
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 遍历清除任务队列
        for (int i = 0; i < size; i++) {
            RunnableScheduledFuture<?> t = queue[i];
            if (t != null) {
                // 清除队列和任务的下标
                queue[i] = null;
                setIndex(t, -1);
            }
        }
        size = 0;
    } finally {
        lock.unlock();
    }
}

这个方法逻辑就比较简单了,就是遍历所有的任务,清空下标为 null,同时设置对应任务的 heapIndex = -1,表示不在队列里面了

3.9 获取队列的第一个过期任务

/**
 * 检查队列中的第一个任务有没有过期
 * @return
 */
private RunnableScheduledFuture<?> peekExpired() {
    // 下面其实就是判断当前线程是不是持有锁
    // assert lock.isHeldByCurrentThread();
    RunnableScheduledFuture<?> first = queue[0];
    // 如果没有任务或者第一个任务还没到期,就返回空
    // 否则返回第一个任务
    return (first == null || first.getDelay(NANOSECONDS) > 0) ?
        null : first;
}

这里可以用来检查队首任务有没有过期,如果没有过期会返回 null,如果过期了会返回具体的任务

3.10 获取过期任务

 /**
* 将所有已经过期的任务从当前队列里面移除
 * @param c 移除调的任务会存到这个集合里面
 * @return
 */
public int drainTo(Collection<? super Runnable> c) {
    // 参数校验
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    // 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first;
        int n = 0;
        while ((first = peekExpired()) != null) {
            // 把过期的任务添加到集合里面
            c.add(first);
            // 从队列里面获取完任务之后需要向下修复
            finishPoll(first);
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

上面的方法是获取所有过期的任务,同时可以看到是通过 finishPoll 来获取的,获取完之后还得向下修复,当然这里是获取所有的过期任务,还有一个方法是获取最多 maxElements 个任务

/**
* 从任务队列中获取最多 maxElements 个过期的任务,放到集合 c 中
 * @param c
 * @param maxElements
 * @return
 */
public int drainTo(Collection<? super Runnable> c, int maxElements) {
    // 参数校验
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first;
        int n = 0;
        // 最多加入 maxElements 个任务
        while (n < maxElements && (first = peekExpired()) != null) {
            // 把过期的任务添加到集合里面
            c.add(first);
            // 从队列里面获取完任务之后需要向下修复
            finishPoll(first);
            ++n;
        }
        return n;
    } finally {
        lock.unlock();
    }
}

4. 小结

ScheduledThreadPoolExecutor 的队列终于是解析完了,下一篇文章解析最后的一部分,也就是任务类和其他的方法,毕竟大头都在这篇文章中了,包括里面的一些思想啥的




如有错误,欢迎指出!!!

标签:lock,ScheduledThreadPoolExecutor,获取,任务,线程,延时,定时,null,leader
From: https://blog.csdn.net/laohuangaa/article/details/144162607

相关文章

  • cron: 如何使用Cron表达式配置定时任务
    Cron表达式用于设置定时任务,无论是在Linux的Crontab中,还是在各种语言开发的程序中都有应用,它提供了一种强大而灵活的方法来设定定时任务。Cron表达式语法Cron表达式是一种字符串格式,标准的Cron表达式是由五部分组成,分别表示,分钟、小时、日期、月份和星期几。这个时候,就有小伙......
  • 【Go底层】time包Ticker定时器原理
    目录1、背景2、go版本3、源码解释【1】Ticker结构【2】NewTicker函数解释4、代码示例5、总结1、背景说到定时器我们一般想到的库是cron,但是对于一些简单的定时任务场景,标准库time包下提供的定时器就足够我们使用,本篇文章我们就来研究一下time包下的Ticker定时器。2......
  • Qt - QTimer(定时器)
    基本使用方式:多次定时器QTimer*timer=newQTimer(this);//timer->setInterval(1000);//设置间隔时间connect(timer,SIGNAL(timeout()),this,SLOT(update()));timer->start(1000);//start之后,设置间隔时间并启动定时器,每隔一秒触发一次槽函数 单次定时器注意:可......
  • 力控组态实现延时5s延时触发,命令间隔200ms
    最近做一个阀门开度随液位变化的程序,液位设定一个目标值,液位高于目标值,阀门开度减小;液位低于目标值,阀门开度增加。很明显,该程序适合用PID控制,在大循环中计算阀门开度值,并下置给阀门,结果,设备经常离线原因:经分析,大循环中运行,数据下置太快,设备反应不过来,导致通讯超时,或者是撞......
  • 杰理-timer硬件定时器配置
    目录杰理定时器demo: 个人测试验证:杰理的定时器有硬件定时器和软件定时器软件定时器是基于systime线程提供的时基,可以参考下面这位博主的文章:【杰理AC696X】软件定时器介绍-CSDN博客今天主要说的是硬件定时器的配置。杰理定时器demo:timer模式demo//定时器voidtimer......
  • AutoHotkey (AHK) 是一款开源的自动化脚本语言,AutoHotkey(AHK)具备广泛的应用场景,适用于
    AutoHotkey(AHK)是一款开源的自动化脚本语言,主要用于Windows平台上的桌面应用程序自动化、键盘鼠标操作模拟、热键设置、窗口管理等任务。它的简单性和强大的灵活性使得AHK成为许多用户进行日常自动化和重复性任务的首选工具。1. AutoHotkey是什么?AutoHotkey是一种脚本......
  • golang 定时器的不同任务
    应用场景电池船数据上报频次:航行中1次/30秒,不航行1次/1小时电池簇数据上报频次:工作中1次/1秒,不工作不上报   main.gopackagemainimport( "fmt" "os" "os/signal" "syscall" "ticker/util" "time")varticker1*util.DynamicTi......
  • 涂鸦革新WebRTC技术!让IPC监测低延时、高可靠更安全
            随着科技的飞速发展,越来越多人开始关注居家安全、食品安全、校园安全等领域,大家对实时监测的需求也在不断升级。想象一下,无论身处何地,只需轻触屏幕,就能实时查看家中、办公室或任何你关心的地方,这不再是科幻小说中的场景,因为通过WebRTC技术在IPC监测领域的......
  • 定时器-初级程序-极语言教程
    //窗体代码:整数窗体,小时,分钟,标签3,标签4,计时;程序资源24,"清单.xml";程序段加载窗体整数左=(桌面.宽-350)>>1,上=(桌面.高-300)>>1;窗体=创建窗口($200,程序.名称,"定时器",$10CF0064,左,上,350,300,0,0,0,0);小时=创建窗口($200,"Edit","10",$50010000,70,65,45,......
  • 定时音乐模块-初级程序-极语言教程
    //窗体代码:整数窗体,小时,分钟,标签3,标签4,计时;程序资源24,"清单.xml";程序段加载窗体整数左=(桌面.宽-350)>>1,上=(桌面.高-300)>>1;窗体=创建窗口($200,程序.名称,"定时器",$10CF0064,左,上,350,300,0,0,0,0);小时=创建窗口($200,"Edit","10",$50010000,70,65,45,......