参考
线程模型
Quartz 的线程模型如上图所示,其中 RegularSchedulerThread
为常规调度线程、MisfireSchedulerThread
为错失触发调度线程、JobThreadPool
为任务执行线程池。
常规调度线程轮询存储的所有 trigger,如果有需要触发的 trigger
,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务。Misfire
线程是扫描所有的 trigger,查看是否有 misfired trigger
,如果有的话根据 misfire
的策略分别处理。
其中,常规调度线程的执行流程如下图所示:
- 常规调度线程会不断循环判断,直到任务线程池中有一个空闲线程,接着它会从
Trigger
集合中取出接下来 N 秒内即将触发的任务,然后等待着任务触发执行;最后触发器触发任务会被分派到线程池中的一个线程异步执行。
// 常规调度线程池
public class SimpleThreadPool implements ThreadPool {
// 线程池总线程数
private int count = -1;
private int prio = Thread.NORM_PRIORITY;
// 线程池当前状态
private boolean isShutdown = false;
private boolean handoffPending = false;
private final Object nextRunnableLock = new Object();
// 存放所有工作线程引用
private List<WorkerThread> workers;
// 存放所有空闲线程
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
// 存放所有工作线程
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
// 在下一个空闲线程中执行当前任务,如果线程池被要求关闭,会立刻给当前任务分配一个额外线程去执行
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
// 如果多线程同时调用,需要加锁
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
// 关闭线程池中所有正在执行任务的线程
public void shutdown(boolean waitForJobsToComplete) {
synchronized (nextRunnableLock) {
getLog().debug("Shutting down threadpool...");
isShutdown = true;
if(workers == null) // case where the pool wasn't even initialize()ed
return;
// signal each worker thread to shut down
Iterator<WorkerThread> workerThreads = workers.iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = workerThreads.next();
wt.shutdown();
availWorkers.remove(wt);
}
// Give waiting (wait(1000)) worker threads a chance to shut down.
// Active worker threads will shut down after finishing their
// current job.
nextRunnableLock.notifyAll();
if (waitForJobsToComplete == true) {
boolean interrupted = false;
try {
// wait for hand-off in runInThread to complete...
while(handoffPending) {
try {
nextRunnableLock.wait(100);
} catch(InterruptedException _) {
interrupted = true;
}
}
// Wait until all worker threads are shut down
while (busyWorkers.size() > 0) {
WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
try {
getLog().debug(
"Waiting for thread " + wt.getName()
+ " to shut down");
// note: with waiting infinite time the
// application may appear to 'hang'.
nextRunnableLock.wait(2000);
} catch (InterruptedException _) {
interrupted = true;
}
}
workerThreads = workers.iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = (WorkerThread) workerThreads.next();
try {
wt.join();
workerThreads.remove();
} catch (InterruptedException _) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
getLog().debug("No executing jobs remaining, all threads stopped.");
}
getLog().debug("Shutdown of threadpool complete.");
}
}
// 工作线程定义为内部类
class WorkerThread extends Thread {
// 工作线程执行时的互斥锁
private final Object lock = new Object();
// 工作线程暂停的标记
private AtomicBoolean run = new AtomicBoolean(true);
private SimpleThreadPool tp;
// 具体执行的任务,构造函数传入
private Runnable runnable = null;
private boolean runOnce = false;
/**
* <p>
* Create a worker thread and start it. Waiting for the next Runnable,
* executing it, and waiting for the next Runnable, until the shutdown
* flag is set.
* </p>
*/
WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
int prio, boolean isDaemon) {
this(tp, threadGroup, name, prio, isDaemon, null);
}
/**
* <p>
* Create a worker thread, start it, execute the runnable and terminate
* the thread (one time execution).
* </p>
*/
WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
int prio, boolean isDaemon, Runnable runnable) {
super(threadGroup, name);
this.tp = tp;
this.runnable = runnable;
if(runnable != null)
runOnce = true;
setPriority(prio);
setDaemon(isDaemon);
}
// 标记执行终止或结束
void shutdown() {
run.set(false);
}
public void run(Runnable newRunnable) {
synchronized(lock) {
if(runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}
runnable = newRunnable;
lock.notifyAll();
}
}
@Override
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
if (runnable != null) {
ran = true;
runnable.run(); // 任务执行
}
}
} catch (InterruptedException unblock) {
// do nothing (loop will terminate if shutdown() was called
try {
getLog().error("Worker thread was interrupt()'ed.", unblock);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} catch (Throwable exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} finally {
synchronized(lock) {
runnable = null;
}
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
run.set(false);
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;
makeAvailable(this);
}
}
}
//if (log.isDebugEnabled())
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
}
}
}
错失调度线程的执行流程如下图:
- 错失调度线程首先会扫描所有的
trigger
集合判断是否有错失未触发的misfired triggers
; - 如果存在错失未触发的集合,循环遍历其中的每个触发器,对于每一个触发器根据所配置的错失触发原则
misfirePolicy
选择对应的处理方法处理;