首页 > 其他分享 >二、Quartz原理及流程

二、Quartz原理及流程

时间:2023-09-03 20:45:07浏览次数:139  
标签:Quartz thread 流程 private WorkerThread wt 线程 原理 runnable

参考

https://www.zhihu.com/question/41918492/answer/490367825

线程模型

Quartz 的线程模型如上图所示,其中 RegularSchedulerThread 为常规调度线程、MisfireSchedulerThread 为错失触发调度线程、JobThreadPool 为任务执行线程池。

常规调度线程轮询存储的所有 trigger,如果有需要触发的 trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务。Misfire 线程是扫描所有的 trigger,查看是否有 misfired trigger,如果有的话根据 misfire 的策略分别处理。

其中,常规调度线程的执行流程如下图所示:

  1. 常规调度线程会不断循环判断,直到任务线程池中有一个空闲线程,接着它会从 Trigger 集合中取出接下来 N 秒内即将触发的任务,然后等待着任务触发执行;最后触发器触发任务会被分派到线程池中的一个线程异步执行。
// 常规调度线程池
public class SimpleThreadPool implements ThreadPool {
    // 线程池总线程数
    private int count = -1;
    private int prio = Thread.NORM_PRIORITY;
    // 线程池当前状态
    private boolean isShutdown = false;
    private boolean handoffPending = false;
    private final Object nextRunnableLock = new Object();


    // 存放所有工作线程引用
    private List<WorkerThread> workers;
    // 存放所有空闲线程
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    // 存放所有工作线程
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

    // 在下一个空闲线程中执行当前任务,如果线程池被要求关闭,会立刻给当前任务分配一个额外线程去执行
    public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }
        // 如果多线程同时调用,需要加锁
        synchronized (nextRunnableLock) {

            handoffPending = true;

            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }

            if (!isShutdown) {
                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the pool).
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
            nextRunnableLock.notifyAll();
            handoffPending = false;
        }
        return true;
    }

    // 关闭线程池中所有正在执行任务的线程
    public void shutdown(boolean waitForJobsToComplete) {

        synchronized (nextRunnableLock) {
            getLog().debug("Shutting down threadpool...");

            isShutdown = true;

            if(workers == null) // case where the pool wasn't even initialize()ed
                return;

            // signal each worker thread to shut down
            Iterator<WorkerThread> workerThreads = workers.iterator();
            while(workerThreads.hasNext()) {
                WorkerThread wt = workerThreads.next();
                wt.shutdown();
                availWorkers.remove(wt);
            }

            // Give waiting (wait(1000)) worker threads a chance to shut down.
            // Active worker threads will shut down after finishing their
            // current job.
            nextRunnableLock.notifyAll();

            if (waitForJobsToComplete == true) {

                boolean interrupted = false;
                try {
                    // wait for hand-off in runInThread to complete...
                    while(handoffPending) {
                        try {
                            nextRunnableLock.wait(100);
                        } catch(InterruptedException _) {
                            interrupted = true;
                        }
                    }

                    // Wait until all worker threads are shut down
                    while (busyWorkers.size() > 0) {
                        WorkerThread wt = (WorkerThread) busyWorkers.getFirst();
                        try {
                            getLog().debug(
                                    "Waiting for thread " + wt.getName()
                                            + " to shut down");

                            // note: with waiting infinite time the
                            // application may appear to 'hang'.
                            nextRunnableLock.wait(2000);
                        } catch (InterruptedException _) {
                            interrupted = true;
                        }
                    }

                    workerThreads = workers.iterator();
                    while(workerThreads.hasNext()) {
                        WorkerThread wt = (WorkerThread) workerThreads.next();
                        try {
                            wt.join();
                            workerThreads.remove();
                        } catch (InterruptedException _) {
                            interrupted = true;
                        }
                    }
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }

                getLog().debug("No executing jobs remaining, all threads stopped.");
            }
            getLog().debug("Shutdown of threadpool complete.");
        }
    }


    // 工作线程定义为内部类
    class WorkerThread extends Thread {
        // 工作线程执行时的互斥锁
        private final Object lock = new Object();

        // 工作线程暂停的标记
        private AtomicBoolean run = new AtomicBoolean(true);

        private SimpleThreadPool tp;

        // 具体执行的任务,构造函数传入
        private Runnable runnable = null;
        
        private boolean runOnce = false;

        /**
            * <p>
            * Create a worker thread and start it. Waiting for the next Runnable,
            * executing it, and waiting for the next Runnable, until the shutdown
            * flag is set.
            * </p>
            */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                        int prio, boolean isDaemon) {

            this(tp, threadGroup, name, prio, isDaemon, null);
        }

        /**
            * <p>
            * Create a worker thread, start it, execute the runnable and terminate
            * the thread (one time execution).
            * </p>
            */
        WorkerThread(SimpleThreadPool tp, ThreadGroup threadGroup, String name,
                        int prio, boolean isDaemon, Runnable runnable) {

            super(threadGroup, name);
            this.tp = tp;
            this.runnable = runnable;
            if(runnable != null)
                runOnce = true;
            setPriority(prio);
            setDaemon(isDaemon);
        }

        // 标记执行终止或结束
        void shutdown() {
            run.set(false);
        }

        public void run(Runnable newRunnable) {
            synchronized(lock) {
                if(runnable != null) {
                    throw new IllegalStateException("Already running a Runnable!");
                }

                runnable = newRunnable;
                lock.notifyAll();
            }
        }


        @Override
        public void run() {
            boolean ran = false;
            
            while (run.get()) {
                try {
                    synchronized(lock) {
                        while (runnable == null && run.get()) {
                            lock.wait(500);
                        }

                        if (runnable != null) {
                            ran = true;
                            runnable.run();         // 任务执行
                        }
                    }
                } catch (InterruptedException unblock) {
                    // do nothing (loop will terminate if shutdown() was called
                    try {
                        getLog().error("Worker thread was interrupt()'ed.", unblock);
                    } catch(Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } catch (Throwable exceptionInRunnable) {
                    try {
                        getLog().error("Error while executing the Runnable: ",
                            exceptionInRunnable);
                    } catch(Exception e) {
                        // ignore to help with a tomcat glitch
                    }
                } finally {
                    synchronized(lock) {
                        runnable = null;
                    }
                    // repair the thread in case the runnable mucked it up...
                    if(getPriority() != tp.getThreadPriority()) {
                        setPriority(tp.getThreadPriority());
                    }

                    if (runOnce) {
                            run.set(false);
                        clearFromBusyWorkersList(this);
                    } else if(ran) {
                        ran = false;
                        makeAvailable(this);
                    }

                }
            }

            //if (log.isDebugEnabled())
            try {
                getLog().debug("WorkerThread is shut down.");
            } catch(Exception e) {
                // ignore to help with a tomcat glitch
            }
        }
    }
}

错失调度线程的执行流程如下图:

  1. 错失调度线程首先会扫描所有的 trigger 集合判断是否有错失未触发的 misfired triggers
  2. 如果存在错失未触发的集合,循环遍历其中的每个触发器,对于每一个触发器根据所配置的错失触发原则 misfirePolicy 选择对应的处理方法处理;

Quartz 启动流程

标签:Quartz,thread,流程,private,WorkerThread,wt,线程,原理,runnable
From: https://www.cnblogs.com/istitches/p/17675546.html

相关文章

  • springboot自动配置的原理和如何自定义starter
    一、springboot自动配置的原理使用springboot时的一大优点就是当需要引入一些第三方的框架时只需要引入一个对应的starter后springboot就会自动的完成配置,例如在springboot中使用mybatis只需要引入mybatis提供的starter.那么这种便捷的配置方式是如何实现的呢,要了解其中的原理......
  • MD5算法原理(未完成)
    MD5简介MD5不是一种加密算法,而是一种哈希算法,用于生成固定长度的哈希值。哈希算法通常不涉及加密或解密,它们是单向操作,将输入数据转换为固定长度的哈希值,而无法从哈希值还原原始数据。MD5算法核心步骤:填充数据:首先,将输入数据填充到长度为512位的多重数(multipleof512bits),......
  • 图解Spark Graphx基于connectedComponents函数实现连通图底层原理
    原创/朱季谦第一次写这么长的graphx源码解读,还是比较晦涩,有较多不足之处,争取改进。一、连通图说明连通图是指图中的任意两个顶点之间都存在路径相连而组成的一个子图。用一个图来说明,例如,下面这个叫graph的大图里,存在两个连通图。左边是一个连接图,该子图里每个顶点都存在路......
  • DPDK基本原理
    内核处理网络数据包弊端中断处理处理大量网络数据包时,出现频繁的硬件中断,产生较高的性能开销。内存拷贝网络数据包从网卡到应用程序流程是,数据从网卡通过DMA传到内核缓冲区,从内核态拷贝到用户态。上下文切换硬件中断、多线程、锁竞争产生上下文切换开销。CPU缓存失效数据包处......
  • JDK源码阅读:ArrayList原理
    ArrayList原理ArrayList集合底层数据结构ArrayList集合介绍List接口的可调整大小的数组实现。数组:一旦初始化长度就不可以发生改变数组结构特性增删慢:每次删除元素,都需要更改数组长度、拷贝以及移动元素位置。查询快:由于数组在内存中是一块连续空间,因此可以根据地址+索引的......
  • 微信小程序开发部署发布流程
    微信小程序开发部署发布流程最近因为有比赛,所以在进行敏捷小程序开发,由于我比较菜,不会JS原生,所以选择了符合技术栈的技术路线。MPFlutter框架+dart语言的开发。这样就可以符合“同时产出小程序与APP”的需求。1.微信小程序申请微信公众平台(qq.com)进行申请,注册,认证。按照......
  • AQS公平锁的流程
    reentrantd的lock执行的是aqs的acquire方法1tryAcquirecas设置state的状态从0设置为1,成功则获取到锁,不成功则进行下一步 2、addWaiter 没有或取到锁,构建一个node,因为第一次队列没有元素,头部和尾部节点为空,走enq方法会将head和tail都指向一个newNode(),然后将tail指向等待......
  • dubbo原理
    一、dubbo简单原理主要包括五个节点:Provider、Consumer、Container、Register、MonitorProvider:服务提供者Consumer:服务订阅者Container:服务运行的容器Register:注册中心Monitor:监控中心,统计服务调用次数和调动时间dubbo工作过程:服务容器负责启动,加载,运行服务提供者。服务提供者在......
  • flowable对已经部署的流程定义进行更新(实操)
    首先通过createProcessDefinitionQuery()方法获取特定流程定义(根据流程定义键和最新版本)。然后,使用deleteDeployment()方法删除旧的部署及其相关数据。接下来,我们创建新的部署对象,并使用addClasspathResource()方法添加新的流程资源文件。最后,通过deploy()方法执行部署操作。在重......
  • 扩容Linux文件系统:从基本原理到实践
    一、引言在Linux系统中,文件系统是存储和组织数据的核心组件。随着应用程序和数据的不断增加,有时候需要扩大文件系统的容量。本文将介绍扩容Linux文件系统的方法和步骤,帮助您轻松应对存储需求。二、准备步骤在进行文件系统扩容之前,需要确保以下事项:了解现有磁盘空间:使用df-h命令查......