文章目录
1. 概要
ScheduledThreadPoolExecutor 的上一篇文章:定时/延时任务-ScheduledThreadPoolExecutor的使用
上一篇文章中,我们介绍了 ScheduledThreadPoolExecutor 的用法,那么从这篇文章开始就要介绍 ScheduledThreadPoolExecutor 的原理了
2. 构造函数
首先要看的是 ScheduledThreadPoolExecutor
的构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
最终这些方法都会调用到线程池的构造函数 ThreadPoolExecutor
:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其实不难发现 ScheduledThreadPoolExecutor
给的这几个构造函数最大线程数都是 Integer.MAX_VALUE
3. 延时队列 - DelayedWorkQueue
ScheduledThreadPoolExecutor 的构造函数是 DelayedWorkQueue
,这个优先队列是用队列实现的,下面来逐步解析
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
3.1 参数
首先是延时队列的初始长度,默认是 16
/**
* 延时队列初始容量为 16
*/
private static final int INITIAL_CAPACITY = 16;
然后就是存储任务的数组,初始容量 16
/**
* 存储任务的数组,初始容量大小就是 16
*/
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
下面就是并发锁 lock
和 Condition
,用来进行阻塞唤醒的
/**
* 并发锁
*/
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
接着是数组里面的任务个数 size
/**
* 数组中的任务个数
*/
private int size = 0;
3.2 leader-follower模式
最后一个比较关键的是 leader
,这个 leader 里面其实代表了 Leader-Follower 模式
的线程调度,下面就来详细介绍下这个模式。
Leader-Follower 模式
是一种并发编程模式主要用于处理并发任务的调度。在 Leader-Follower 模式中,一组线程(也可以称为工作线程或处理单元)在等待任务时处于“等待”状态,一旦有任务到达,其中一个线程会被选为 “Leader”
来获取该任务并处理,而其他线程则继续保持“Follower”状态。任务处理完成后,原 Leader 线程可能会重新进入 Follower 状态,等待下一个任务的到来。
Leader-Follower 模式
下两种线程的职责如下:
- Leader 线程: 当前正在等待队列头部任务的线程, 线程会等待下一个任务的延迟时间(即 delay)结束后获取到该任务返回并交出 leader 的身份
- Follower 线程: 除了 leader 线程之外的其他线程被称为 follower 线程,follower 线程会无限期地等待,直到它们被选中成为新的 leader 线程
这两种线程的流程在下面会进行源码解析
3.3 数组调整
这部分和前几篇文章中 Timer
的逻辑是一模一样的,例子也是这篇文章复制过来的(注意下面图中节点头上那些是执行时间):定时/延时任务-细说Timer源码
3.3.1 向上调整
假设原来的数组节点是 1、2、3、4、5、6
原来的小根堆这时候添加了一个执行时间为2
的节点到队列尾部,这时候就触发了堆的向上修复
首先获取 7
的父结点 7 >> 1 = 3
,然后判断 7
和 3
两个节点的执行时间,发现·2 < 10
,于是交换两个节点
这时候 k(修复的下标)
来到了下标 3
的位置,然后再次获取父结点 3 >> 1 = 1
,这时候发现下标 1
的时间是 5
,比 2
小,于是继续交换下标 1
和 下标 3
的值
到这里 k = 1,就退出循环,修复完成了,整个过程的执行时间是 O(logn)
3.3.2 向下调整
下面就来看一下向下调整的流程图:
假设这时候原来的头结点任务被执行了,就会把最后一个执行时间为 10
的任务加到头节点
接着获取 k=1(要调整的节点)
的子节点并比较两个子节点的执行时间,选出最先执行的那一个 j(子节点)
交换 k
和 j
的节点
获取 k
的左右节点,这时候只有一个节点 6
,然后判断,但是子节点执行时间都比节点 k
靠后执行,符合优先级队列,修复结束
修复的时间复杂度也是 O(logn)
3.4 添加任务
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
最终方法都调用到了 offer
/**
* 添加一个任务
* @param x
* @return
*/
public boolean offer(Runnable x) {
// 添加的任务不能为空
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 目前队列里面的任务数量
int i = size;
// 如果队列满了,需要扩容
if (i >= queue.length)
grow();
// 容量 +1
size = i + 1;
// 如果容量为 0,就是初始添加任务
if (i == 0) {
// 设置第一个任务
queue[0] = e;
// 设置该任务的索引到任务属性中
setIndex(e, 0);
} else {
// 需要添加到堆队列尾部,顺便进行上浮
siftUp(i, e);
}
// 如果当前添加的元素是队首元素
if (queue[0] == e) {
leader = null;
// 唤醒等待元素的线程
available.signal();
}
} finally {
// 解锁
lock.unlock();
}
return true;
}
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
// 设置堆索引值
((ScheduledFutureTask)f).heapIndex = idx;
}
下面就来一句一句解析,首先判断如果添加的任务为空,抛出异常
if (x == null)
throw new NullPointerException();
接下来把任务转换成 RunnableScheduledFuture
,然后加锁
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
接着获取队列里面的任务数量,如果队列满了,就需要扩容
// 目前队列里面的任务数量
int i = size;
// 如果队列满了,需要扩容
if (i >= queue.length)
grow();
下面是扩容的流程,需要扩容成队列长度的 1.5 倍,当然还是需要判断下如果 newCapacity < 0
,就代表 int 溢出了
,这时候把队列长度设置成 Integer.MAX_VALUE
/**
* 扩容数组,必须要加锁才能调用
*/
private void grow() {
// 队列长度
int oldCapacity = queue.length;
// 扩容到原来的 1.5 倍
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
// 如果 < 0,就代表容量移除了
if (newCapacity < 0) // overflow
// 直接赋值最大值
newCapacity = Integer.MAX_VALUE;
// 分配数组,直接拷贝
queue = Arrays.copyOf(queue, newCapacity);
}
扩容之后,设置容量 + 1
// 容量 +1
size = i + 1;
下面判断如果容量为 0,就是第一个任务,直接设置任务到 index = 0 的位置,如果不是,就添加到队列尾部,然后向上调整,在 3.3.1 我们给出了向上调整的流程,下面就从源码角度看下是怎么调整的:
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 父结点,当前下标 i 的父结点就是(i - 1)/ 2
// 比如 2 的父结点是 0,5 的父结点是 2
int parent = (k - 1) >>> 1;
// 获取父结点
RunnableScheduledFuture<?> e = queue[parent];
// 如果当前要加入的任务执行时间在父结点之后,不用处理了
if (key.compareTo(e) >= 0)
break;
// 需要处理,处理的逻辑就是把加入的节点和父结点交换
// 1.父结点设置到 k 的位置
queue[k] = e;
// 2.设置任务的下标
setIndex(e, k);
// 3.继续向上遍历
k = parent;
}
// 到这里就是加入的任务应该要加入的下标
queue[k] = key;
// 设置任务下标
setIndex(key, k);
}
k 在这里就是数组尾部
- 获取当前下标的父结点,因为下标是从 0 开始的,所以
需要 - 1 再 / 2
,比如 2 的父结点是 0,5 的父结点是 2 - 下面使用
compareTo
进行比较,compareTo 里面会先根据过期时间对比,如果过期时间相同,再根据sequenceNumber
,也就是序号大小对比,序号小的排前面 - 如果当前要加入的任务过期时间或者序号在父结点后面,就不需要调整了
- 否则把父结点赋值到 k 的位置,然后 k 继续向上遍历
- 最终退出的时候下标 k 就是要设置的任务下标
- setIndex 其实就是设置堆索引
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
// 设置堆索引值
((ScheduledFutureTask)f).heapIndex = idx;
}
/**
* 任务之间的比较
* @param other
* @return
*/
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
// 比较两个任务的执行时间,执行时间段的放前面
// 如果执行时间都一样,就判断下序号,序号小的先执行
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
最后如果加入的元素是队首元素,就唤醒等待元素
// 如果当前添加的元素是队首元素
if (queue[0] == e) {
leader = null;
// 唤醒等待元素的线程
available.signal();
}
available.signal()
会唤醒等待的线程,去处理,唤醒之前会把 leader 设置为 null
,为什么要这么做呢? 我们得去获取任务的源码里面找原因,这个下面会解析。
最后解除锁,返回 true 表示添加任务成功
3.5 从队首获取任务
/**
* 从队首获取元素
* @return
*/
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
// 如果第一个任务为空或者第一个任务还没有过期
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 否则获取第一个任务
return finishPoll(first);
} finally {
lock.unlock();
}
}
从队首获取任务,如果任务为空或者任务没有过期,就返回 null,否则调用 finishPoll
获取第一个任务
/**
* Performs common bookkeeping for poll and take: Replaces
* first element with last and sifts it down. Call only when
* holding lock.
* 获取队首元素,并且从队首往下进行修复
* @param f the task to remove and return
*/
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 任务个数 - 1
int s = --size;
// 获取最后一个任务
RunnableScheduledFuture<?> x = queue[s];
// 设置为 null
queue[s] = null;
// 如果堆中还有任务
if (s != 0)
// 从 0 往下修复
siftDown(0, x);
// 设置获取出来的这个任务索引为 -1
setIndex(f, -1);
return f;
}
首先把任务数 -1,然后获取最后一个任务,把最后一个任务下标设置为空,接着如果堆中还有任务,就向下修复,最后把获取到的任务下标设置为 -1,向下修复的流程在 3.3.2 已经图解了,下面是源码
/**
* 从 k 往下修复延时队列,必须在加锁之后调用这个方法
* @param k
* @param key
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// 向下修复,父结点的范围
int half = size >>> 1;
while (k < half) {
// 左子节点
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
// 右子节点
int right = child + 1;
// 选出左子节点和右子节点中最先执行的
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 如果要加入的任务比最小的任务都要先执行,直接跳出,当前下标 k 就是要加入的位置
if (key.compareTo(c) <= 0)
break;
// 否则交换父结点和子节点
queue[k] = c;
setIndex(c, k);
// 从子节点继续向子节点的子节点遍历
k = child;
}
// 到这里就是找到了加入的任务应该插入的下标
queue[k] = key;
setIndex(key, k);
}
k 是要修复的下标
- 首先获取这个下标的左子节点,然后再获取右子节点
- 判断这两个节点的优先级,也就是获取最先执行的或者序号最小的
- 如果 k 节点比这两个子节点都要先执行,直接退出循环,不用调整了
- 把更小的子节点赋值到父结点
- 继续向下调整
流程很简单,时间复杂度也是 Olog(n)
3.6 获取等待执行的任务
/**
* 获取待执行的任务
* @return
* @throws InterruptedException
*/
public RunnableScheduledFuture<?> take() throws InterruptedException {
// 加锁,可中断锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队首元素
RunnableScheduledFuture<?> first = queue[0];
// 如果队首元素为空,就阻塞等待任务入队
if (first == null)
available.await();
else {
// 获取第一个任务的延时时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于 0,就代表队首任务到期了,取出来
if (delay <= 0)
return finishPoll(first);
// 到这里,说明队首任务还没有过期,这时候把引用设置为空
first = null;
// 如果当前线程不是 leader 线程,那么当前线程阻塞等待
if (leader != null)
available.await();
else {
// 设置 leader 线程为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 设置等待的时间就是队首任务的剩余到期时间
// 如果等待时间到了,leader 线程就会从被阻塞的状态中唤醒并返回队首的任务
available.awaitNanos(delay);
} finally {
// 唤醒之后把 leader 重新置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 到这里就是 leader 线程等待时间到了被唤醒,同时队列里面有任务了
if (leader == null && queue[0] != null)
// 唤醒其他的等待获取任务的线程
available.signal();
// 解锁
lock.unlock();
}
}
还是一句一句分析,首先获取队首元素,如果队首元素为空,就代表任务队列没有任务,阻塞等待
// 获取队首元素
RunnableScheduledFuture<?> first = queue[0];
// 如果队首元素为空,就阻塞等待任务入队
if (first == null)
available.await();
else {
...
}
如果队首元素不为空,就判断下是不是过期了,如果过期就从队首取出元素,这个 finishPoll
在上面 3.5 解析过了
// 获取第一个任务的延时时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于 0,就代表队首任务到期了,取出来
if (delay <= 0)
return finishPoll(first);
如果队首元素没有过期,那么判断下当前线程是不是 leader 线程
,因为这里是 leader-follower
模式,所以如果不是 leader
线程,就无限阻塞:
// 如果当前线程不是 leader 线程,那么当前线程阻塞等待
if (leader != null)
available.await();
else {
....
}
如果是 leader 线程
,就回去阻塞 delay
时间,delay
就是第一个任务距离现在还有多久过期
// 设置 leader 线程为当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 设置等待的时间就是队首任务的剩余到期时间
// 如果等待时间到了,leader 线程就会从被阻塞的状态中唤醒并返回队首的任务
available.awaitNanos(delay);
} finally {
// 唤醒之后把 leader 重新置空
if (leader == thisThread)
leader = null;
}
唤醒之后把 leader
置空,置空之后重新 for 循环
,这时候 leader 线程
就能拿到任务去执行了。
注意:take()
其实就是前几篇文章中解析的线程池工作原理中的 Worker 线程(核心线程)
的阻塞获取任务的方法,建议先去看下前置的几篇线程池原理文章:带你从源码级别看懂 JDK 线程池原理(1)
到这里我们就可以回答3.4
提出的问题:
available.signal()
会唤醒等待的线程,去处理,唤醒之前会把leader 设置为 null
,**为什么要这么做呢?
因为添加了一个更早的任务之后唤醒的线程不一定是 leader
线程,所以如果不是 leader
线程获取到任务再去执行,这样就不符合 leader-follower
的模式,所以唤醒前要把 leader
置空
3.7 获取等待执行的任务(带超时时间)
上面这个方法是不带超时时间的,是核心线程的获取方法,那这个就是非核心线程的获取方法,获取任务,如果在一定时间内获取不到,就返回 null,回收非核心线程
/**
* 从任务队列中获取任务
* @param timeout 超时时间
* @param unit 时间单位
* @return
* @throws InterruptedException
*/
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
// 阻塞等待的时间
long nanos = unit.toNanos(timeout);
// 加锁,可中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队首任务
RunnableScheduledFuture<?> first = queue[0];
// 如果队首任务为空,就表示队列当前没有任务
if (first == null) {
// 如果不需要阻塞等待,直接返回 null
if (nanos <= 0)
return null;
else
// 否则等待 nanos 时间
nanos = available.awaitNanos(nanos);
} else {
// 如果队首元素不为空,首先获取下队首任务的过期时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 如果过期时间 <= 0,证明当前任务到期了,可以返回任务给线程执行了
return finishPoll(first);
// 这里就是任务没到期,但是队列也不等待
if (nanos <= 0)
// 直接返回 null
return null;
// 到这里就是任务没到期执行不了,同时线程也需要等待
// 把 first 引用设置为空
first = null;
// nanos:线程需要等待的时间
// delay:第一个任务还有多久才能执行
if (nanos < delay || leader != null)
// nanos < delay 表示线程等待了 nanos 也到不了任务的执行时间,直接阻塞
nanos = available.awaitNanos(nanos);
else {
// 这里就表示线程等待 nanos 能等到第一个任务的执行时间点
Thread thisThread = Thread.currentThread();
// 把 leader 线程设置为当前线程
leader = thisThread;
try {
// 然后等待 delay 时间,timeLeft 是该线程实际等待的时间
// 如果等待过程中调用了 available.signal() 这些方法也会被唤醒
long timeLeft = available.awaitNanos(delay);
// 计算 nanos 还剩下多少等待时间
nanos -= delay - timeLeft;
} finally {
// 把 leader 线程置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 把 leader 线程置空,leader 线程执行完任务之后需要让出这个属性给其他 flower 线程来竞争成为 leader 线程
if (leader == null && queue[0] != null)
available.signal();
// 解锁
lock.unlock();
}
}
还是来慢慢解析,首先加锁
// 阻塞等待的时间
long nanos = unit.toNanos(timeout);
// 加锁,可中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
接着获取队首任务,如果任务为空,就代表没有任务,这时候看看要不要阻塞,如果不需要就直接返回空,需要就等待 nacos 时间
// 获取队首任务
RunnableScheduledFuture<?> first = queue[0];
// 如果队首任务为空,就表示队列当前没有任务
if (first == null) {
// 如果不需要阻塞等待,直接返回 null
if (nanos <= 0)
return null;
else
// 否则等待 nanos 时间
nanos = available.awaitNanos(nanos);
如果队首任务不为空,就代表有任务,下面就要去看下有没有过期,如果过期了,直接返回队首任务,如果没过期并且不需要等待,就返回空
// 如果队首元素不为空,首先获取下队首任务的过期时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 如果过期时间 <= 0,证明当前任务到期了,可以返回任务给线程执行了
return finishPoll(first);
// 这里就是任务没到期,但是队列也不等待
if (nanos <= 0)
// 直接返回 null
return null;
下面就是任务没有到期执行不了,同时也需要等待 delay
时间,首先还是把 first 设置为空,然后判断下如果当前线程是 leader
线程,并且阻塞的时间 nacos
比下一个任务执行的时间要短,就阻塞等待 nanos
// nanos:线程需要等待的时间
// delay:第一个任务还有多久才能执行
if (nanos < delay || leader != null)
// nanos < delay 表示线程等待了 nanos 也到不了任务的执行时间,直接阻塞
nanos = available.awaitNanos(nanos);
否则就阻塞 delay
时间,并且阻塞完之后还有剩余 nacos - delay
时间可以继续获取任务,当然能进入下面的流程肯定是 leader
线程为空的情况下,也就是没有 leader 线程
,这时候当前线程会成为 leader
线程来阻塞获取任务执行
// 这里就表示线程等待 nanos 能等到第一个任务的执行时间点
Thread thisThread = Thread.currentThread();
// 把 leader 线程设置为当前线程
leader = thisThread;
try {
// 然后等待 delay 时间,timeLeft 是该线程实际等待的时间
// 如果等待过程中调用了 available.signal() 这些方法也会被唤醒
long timeLeft = available.awaitNanos(delay);
// 计算 nanos 还剩下多少等待时间
nanos -= delay - timeLeft;
} finally {
// 把 leader 线程置空
if (leader == thisThread)
leader = null;
}
从上面也可以看到,在 finally 里面会去把 leader 置空,让出 leader 给其他线程,在新一轮循环里面 leader 就会去获取到任务返回,返回之后在最外层的 finally 里面会唤醒一个 follower 线程去成为 leader 继续阻塞获取
// 把 leader 线程置空,leader 线程执行完任务之后需要让出这个属性给其他 flower 线程来竞争成为 leader 线程
if (leader == null && queue[0] != null)
available.signal();
// 解锁
lock.unlock();
这时候因为已经设置 leader 为空了,所以其他线程可以成为 leader,而当前线程拿着任务返回去执行任务,所以这里的核心理念就是:执行任务的一定会是前 leader 线程
3.8 清空任务队列
/**
* 清空任务队列
*/
public void clear() {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 遍历清除任务队列
for (int i = 0; i < size; i++) {
RunnableScheduledFuture<?> t = queue[i];
if (t != null) {
// 清除队列和任务的下标
queue[i] = null;
setIndex(t, -1);
}
}
size = 0;
} finally {
lock.unlock();
}
}
这个方法逻辑就比较简单了,就是遍历所有的任务,清空下标为 null,同时设置对应任务的 heapIndex = -1
,表示不在队列里面了
3.9 获取队列的第一个过期任务
/**
* 检查队列中的第一个任务有没有过期
* @return
*/
private RunnableScheduledFuture<?> peekExpired() {
// 下面其实就是判断当前线程是不是持有锁
// assert lock.isHeldByCurrentThread();
RunnableScheduledFuture<?> first = queue[0];
// 如果没有任务或者第一个任务还没到期,就返回空
// 否则返回第一个任务
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
这里可以用来检查队首任务有没有过期,如果没有过期会返回 null,如果过期了会返回具体的任务
3.10 获取过期任务
/**
* 将所有已经过期的任务从当前队列里面移除
* @param c 移除调的任务会存到这个集合里面
* @return
*/
public int drainTo(Collection<? super Runnable> c) {
// 参数校验
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first;
int n = 0;
while ((first = peekExpired()) != null) {
// 把过期的任务添加到集合里面
c.add(first);
// 从队列里面获取完任务之后需要向下修复
finishPoll(first);
++n;
}
return n;
} finally {
lock.unlock();
}
}
上面的方法是获取所有过期的任务,同时可以看到是通过 finishPoll
来获取的,获取完之后还得向下修复,当然这里是获取所有的过期任务,还有一个方法是获取最多 maxElements
个任务
/**
* 从任务队列中获取最多 maxElements 个过期的任务,放到集合 c 中
* @param c
* @param maxElements
* @return
*/
public int drainTo(Collection<? super Runnable> c, int maxElements) {
// 参数校验
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first;
int n = 0;
// 最多加入 maxElements 个任务
while (n < maxElements && (first = peekExpired()) != null) {
// 把过期的任务添加到集合里面
c.add(first);
// 从队列里面获取完任务之后需要向下修复
finishPoll(first);
++n;
}
return n;
} finally {
lock.unlock();
}
}
4. 小结
ScheduledThreadPoolExecutor 的队列终于是解析完了,下一篇文章解析最后的一部分,也就是任务类和其他的方法,毕竟大头都在这篇文章中了,包括里面的一些思想啥的
如有错误,欢迎指出!!!
标签:lock,ScheduledThreadPoolExecutor,获取,任务,线程,延时,定时,null,leader From: https://blog.csdn.net/laohuangaa/article/details/144162607