首页 > 编程语言 >java线程池-1

java线程池-1

时间:2024-01-22 17:58:59浏览次数:28  
标签:java 队列 任务 线程 执行 方法 public

1. 概述

  • Java线程的创建非常昂贵,需要JVM和OS(操作系统)配合完成大量的工作:

    • 必须为线程堆栈分配和初始化大量内存块,其中包含至少1MB的栈内存。
    • 需要进行系统调用,以便在OS(操作系统)中创建和注册本地线程。
  • Java高并发应用频繁创建和销毁线程的操作将是非常低效的,而且是不被编程规范所允许的。如何降低Java线程的创建成本?必须使用到线程池。线程池主要解决了以下两个问题:

    • 提升性能: 线程池能独立负责线程的创建、维护和分配。在执行大量异步任务时,可以不需要自己创建线程,而是将任务交给线程池去调度。线程池能尽可能使用空闲的线程去执行异步任务,最大限度地对已经创建的线程进行复用,使得性能提升明显。
    • 线程管理: 每个Java线程池会保持一些基本的线程统计信息,例如完成的任务数量、空闲时间等,以便对线程进行有效管理,使得能对所接收到的异步任务进行高效调度。

2. java的线程池常用类

image

2.1 Executor

  • Executor是Java异步目标任务的执行者接口,其目标是来执行目标任务。执行者Executor提供了execute()接口来执行已提交的Runnable执行目标实例。Executor作为执行者的角色,其目的是任务提交者任务执行者分离开来的机制。它只包含一个函数式方法:
    public interface Executor {
        void execute(Runnable command);
    }
    
    

2.2 ExecutorService

  • 目标任务的执行者服务接口,对外提供异步任务的接收服务,ExecutorService提供了接收异步任务并转交给执行者”的方法,如submit系列方法、invoke系列方法等。具体如下:

    // 向线程池提交单个异步任务
    <T> Future<T> submit(Callable<T>task);
    
    //向线程池提交批量异步任务
    <T>List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    
    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

2.3 AbstractExecutorService

  • AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。AbstractExecutorService存在的目的是为ExecutorService中的接口提供默认实现。

2.4 ThreadPoolExecutor

  • ThreadPoolExecutor就是大名鼎鼎的线程池实现类,它继承于AbstractExecutorService抽象类。
  • ThreadPoolExecutor是JUC线程池的核心实现类。线程的创建和终止需要很大的开销,线程池中预先提供了指定数量的可重用线程,所以使用线程池会节省系统资源,并且每个线程池都维护了一些基础的数据统计,方便线程的管理和监控。

2.5 ScheduledExecutorService

  • ScheduledExecutorService是一个接口,它继承于ExecutorService。它是一个可以完成延时性周期性任务的调度线程池接口,其功能和Timer/TimerTask类似。

    public interface ExecutorService extends Executor {
    
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

2.6 ScheduledThreadPoolExecutor

  • ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,它提供了ScheduledExecutorService线程池接口中延时执行周期执行等抽象调度方法的具体实现。
  • ScheduledThreadPoolExecutor类似于Timer,但是在高并发程序中,ScheduledThreadPoolExecutor的性能要优于Timer。

2.7 Executors

  • Executors是个静态工厂类,它通过静态工厂方法返回ExecutorServiceScheduledExecutorService等线程池实例对象,这些静态工厂方法可以理解为一些快捷的创建线程池的方法。
newSingleThreadExecutor() // 创建只有一个线程的线程池
newFixedThreadPool(int nThreads) // 创建固定大小的线程池
newCachedThreadPool() // 创建一个不限制线程数量的线程池,提交的任务都将立即执行,但是空闲线程会得到及时回收
newScheduledThreadPool() // 创建一个可定期或者延时执行任务的线程池

2.7.1 newSingleThreadExecutor 创建单线程化线程池

  • 该方法用于创建一个单线程化线程池,也就是只有一条线程的线程池,所创建的线程池用唯一的工作线程来执行任务,使用此方法创建的线程池能保证所有任务按照指定顺序(如FIFO)执行。
  • 调用Executors.newSingleThreadExecutor()方法创建一个单线程化线程池
public class CreateThreadPoolDemo {
    public static final int SLEEP_GAP = 500;
    public static final int MAX_TURN = 5;

    //异步的执行目标类
   public static class TargetTask implements Runnable {
        static AtomicInteger taskNo = new AtomicInteger(1);
        protected String taskName;

        public TargetTask() {
            taskName = "task-" + taskNo.get();
            taskNo.incrementAndGet();
        }

        public void run() {           
            // TODO
            // 线程睡眠一会
            sleepMilliSeconds(SLEEP_GAP);
            // TODO
        }

        @Override
        public String toString() {
            return "TargetTask{" + taskName + '}';
        }
    }

    //测试用例:只有一条线程的线程池
    @Test
    public void testSingleThreadExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            pool.execute(new TargetTask());
            pool.submit(new TargetTask());
        }
        sleepSeconds(1000);
        //关闭线程池
        pool.shutdown();
    }
}

image

  • 单线程化的线程池中的任务,是按照提交的次序顺序执行的。

  • 池中的唯一线程的存活时间是无限的

  • 当池中的唯一线程正繁忙时,新提交的任务实例会进入内部的阻塞队列中,并且其阻塞队列是无界的

    总体来说,单线程化的线程池所适用的场景是:

    • 任务按照提交次序,一个任务接一个任务执行的场景。

    调用shutdown()方法用来关闭线程池。执行shutdown()方法后,线程池状态变为SHUTDOWN状态,此时线程池将拒绝新任务,不能再往线程池中添加新任务,否则会抛出RejectedExecutionException异常。

    调用shutdown方法后的线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成才会退出。还

    有一个与shutdown类似的方法,叫作shutdownNow(),执行shutdownNow()方法后,线程池状态会立刻变成STOP,并试图停止所有正在执行的线程不再处理还在阻塞队列中等待的任务会返回那些未执行的任务

2.7.2 newFixedThreadPool 创建固定数量的线程池

  • 该方法用于创建一个固定数量的线程池,其唯一的参数用于设置池中线程的固定数量。调用Executors.newFixedThreadPool (int threads)方法创建固定数量线程的线程池
    public void testNewFixedThreadPool() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            pool.execute(new TargetTask());
            pool.submit(new TargetTask());
        }
        sleepSeconds(1000);
        //关闭线程池
        pool.shutdown();
    }

image

  • 创建一个线程数为3的固定数量线程池,然后向其中提交了10个任务。从输出结果可以看到,该线程池同时只能执行3个任务剩余的任务会排队等待

  • 固定数量的线程池的特点大致如下:

    1. 如果线程数没有达到固定数量,每次提交一个任务,池内就创建一个新线程,直到线程达到线程池固定的数量。
    2. 线程池的大小一旦达到固定数量就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
    3. 在接收异步任务的执行目标实例时,如果池中的所有线程均在繁忙状态,新任务会进入阻塞队列中(无界的阻塞队列)。

    固定数量的线程池的适用场景: 需要任务长期执行的场景。固定数量的线程池的线程数能够比较稳定保证一个数,避免频繁回收线程和创建线程,故适用于处理CPU密集型的任务,在CPU被工作线程长时间使用的情况下,能确保尽可能少地分配线程。
    固定数量的线程池的弊端: 内部使用无界队列来存放排队任务,当大量任务超过线程池最大容量需要处理时,队列无线增大,使服务器资源迅速耗尽。

2.7.3 newCachedThreadPool 创建可缓存线程池

  • 该方法用于创建一个可缓存线程池,如果线程池内的某些线程无事可干成为空闲线程,可缓存线程池可灵活回收这些空闲线程。
  • 使用Executors.newCachedThreadPool()方法创建一个可缓存线程池
   public void testNewCacheThreadPool() {
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            pool.execute(new TargetTask());
            pool.submit(new TargetTask());
        }
        sleepSeconds(1000);
        //关闭线程池
        pool.shutdown();
    }

image

  • 可缓存线程池的特点:

    1. 在接收新的异步任务target执行目标实例时,如果池内所有线程繁忙,此线程池就会添加新线程来处理任务。
    2. 线程池不会对线程池大小进行限制,线程池大小完全依赖于操作系统(或者说JVM) 能够创建的最大线程大小。
    3. 如果部分线程空闲,也就是存量线程的数量超过了处理任务数量,就会回收空闲(60秒不执行任务) 线程。

    可缓存线程池的适用场景: 需要快速处理突发性强、耗时较短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景。

    可缓存线程池的线程数量不固定,只要有空闲线程就会被回收: 接收到的新异步任务执行目标,查看是否有线程处于空闲状态,如果没有就直接创建新的线程。

    可缓存线程池的弊端: 线程池没有最大线程数量限制,如果大量的异步任务执行目标实例同时提交,可能会因线程过多而导致资源耗尽。

2.7.4 newScheduledThreadPool 创建可调度线程池

  • 创建一个可调度线程池,即一个提供延时周期性任务的调度功能的ScheduledExecutorService类型的线程池。

  • Executors提供了多个创建可调度线程池工厂方法部分如下:

    // 方法一: 创建一个可调度线程池,池内仅含有一个线程
    public static ScheduledExecutorService newSingleThreadScheduledExecutor();
    
    // 方法二: 创建一个可调度线程池,池内含有N个线程,N的值为输入参数 corePoolSize
    public static ScheduledExecutorService newscheduledThreadPool(int corePoolsize) :
    
  • newSingleThreadScheduledExecutor工广方法所创建的仅含有一个线程的可调度线程池,适用于调度串行化任务,也就是一个任务接一个任务地串行化调度执行。

  • 使用Executors.newScheduledThreadPool(int corePoolSize)方法创建一个可调度线程池

 public void testNewScheduledThreadPool() {
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
        for (int i = 0; i < 2; i++) {
            scheduled.scheduleAtFixedRate(new TargetTask(),
                    0, 500, TimeUnit.MILLISECONDS);
            //以上的参数中:
            // 0表示首次执行任务的延迟时间,500表示每次执行任务的间隔时间
            //TimeUnit.MILLISECONDS所设置的时间的计时单位为毫秒
        }
        sleepSeconds(1000);
        //关闭线程池
        scheduled.shutdown();
    }
  • newScheduledThreadPool工厂方法可以创建一个执行延时周期性任务。可调度线程池所创建的线程池为ScheduleExecutorService类型的实例。ScheduleExecutorService接口中有多个重要的接收被调目标任务方法,其中scheduleAtFixedRatescheduleWithFixedDelay使用得比较多。

    • ScheduleExecutorService接收被调目标任务方法之一:scheduleAtFixedRate方法的定义如下

       public ScheduledFuture<?> scheduleAtFixedRate(
           Runnable command,  // 异步任务target执行目标
           long initialDelay, // 首次执行延时
           long period,       // 两次开始执行最小间隔时间
           TimeUnit unit 		// 所设置的时间的计时单位,如TimeUnit.SECONDS常量
       );
      
    • ScheduleExecutorService接收被调目标任务方法之二:scheduleWithFixedDelay方法的定义如下

       public ScheduledFuture<?> scheduleWithFixedDelay(
           Runnable command, 	// 异步任务target执行目标
           long initialDelay,	// 首次执行延时
           long delay,		// 前一次执行结束到下一次执行开始的间隔时间 (间隔执行延迟时间)
           TimeUnit unit		//  所设置的时间的计时单位,如TimeUnit.SECONDS常量
       );
      
  • 当被调任务的执行时间大于指定的间隔时间时,ScheduleExecutorService不会在创建一个新的线程去并发执行这个任务,而是等待前一次调度执行完毕

  • 可调度线程池的适用场景:

    • 周期性执行任务的场景。Spring Boot中的任务调度器,底层借助了JUC的ScheduleExecuorService调度线程池实现,并且可以通过@Configuration配置类型的Bean。

    • 可调度线程池实例进行配置,下面是一个例子:

      @Configuration
      public class ScheduledConfig implements SchedulingConfiqurer{
          @Override
          public void configureTasks (ScheduledTaskRegistrar scheduledTaskRegistrar){
              Method[] methods = BatchProperties.Job.class.getMethods();
              int defaultPoolSize = 4; //默认的线程数为4
              int corePoolSize = 0;	
              //扫描配置了@Scheduled调度注解的方法
              //根据需要调度的方法数,配置线程池中的线程数if(methods != null && methods .length > 0)
      		if(methods != null && methods.length > 0){
                  for(Method method : methods ){
                      Scheduled annotation = method.getAnnotation(Scheduled.class);
                      if (annotation != null){
                          corePoolSize++;
                      }
                  }
           
                  if (defaultPoolSize > corePoolSize){
                      corePoolSize = defaultPoolSize;
                  }
          	} 	              scheduledTaskRegistrar.setscheduler(Executors.newScheduledThreadPool(corePoolSize));   
          }
      }
      
  • 为何JUC要提供工厂方法呢?

    • 原因是使用ThreadPoolExecutorScheduledThreadPoolExecutor构造器去创建普通线程池、可调度线程池比较复杂,这些构造器会涉及大量的复杂参数。
    • 尽管Executors的工厂方法使用方便,但是在生产场景中被很多企业(尤其是大厂) 的开发规范所禁用。

3. 线程池的标准创建方式

  • 大部分企业的开发规范都会禁止使用快捷线程池,要求通过标准构造器ThreadPoolExecutor去构造工作线程池。

  • Executors工厂类中创建线程池的快捷工厂方法实际上是调用ThreadPoolExecutor线程池的构造方法完成的

  • 定时任务使用ScheduledThreadPoolExecutor 线程池的构造方法完成的。

    public ThreadPoolExecutor(
        int corePoolSize,				 	// 核心线程数,即使线程空闲,也不会回收
        int maximumPoolSize, 				// 线程数的上限
        long keepAliveTime, 				// 线程最大空闲时长
        TimeUnit unit,						// 所设置的时间的计时单位,如TimeUnit.SECONDS常量
        BlockingQueue<Runnable> workQueue,  // 任务的排队队列
        ThreadFactory threadFactory, 		// 新线程的产生方式
        RejectedExecutionHandler handler	// 拒绝策略
    ) 
    

3.1 核心和最大线程数量

  • 参数corePoolSize用于设置核心(Core) 线程池数量
  • 参数maximumPoolSize用于设置最大线程数量。
  • 线程池执行器将会根据corePoolSizemaximumPolSize自动地维护线程池中的工作线程
  • 维护规则为:
    1. 当在线程池接收到的新任务,并且当前工作线程数少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求,直到线程数达到corePoolSize。
    2. 如果当前工作线程数多于corePoolSize数量,但小于maximumPoolSize数量,那么仅当任务队列已满时才会创建新线程
    3. 通过设置corePoolSizemaximumPoolSize相同,可以创建一个固定大小的线程池。
    4. maximumPoolSize被设置为无界值(如Integer.MAX_VALUE)时,线程池可以接收任意数量的并发任务。
    5. corePoolSizemaximumPoolSize不仅能在线程池构造时设置,也可以调用setCorePoolSizel()setMaximumPoolSize()两个方法进行动态更改。

3.2 BlockingQueue

  • BlockingQueue(阻塞队列)的实例用于暂时接收到的异步任务,如果线程池的核心线程都在忙,那么所接收到的目标任务缓存在阻塞队列中。

3.3 keepAliveTime

  • 线程构造器的keepAliveTime (空闲线程存活时间)参数用于设置池内线程最大Idle (空闲)时长或者说保活时长,如果超过这个时间,默认情况下Idle线程、非Core线程会被回收。

  • 如果池在使用过程中提交任务的频率变高,也可以调用方法setKeepAliveTime(long, TimeUnit)进行线程存活时间的动态调整,可以将时长延长。如果需要防止Idle线程被终止,可以将Idle时间设置为无限大,具体如下:

    setKeepAliveTime (Long.MAX_VALUE,TimeUnit.NANOSECONDS);
    
  • 默认情况下,Idle超时策略仅适用于存在超过corePoolSize线程的情况。

    • 如果调用了allowCoreThreadTimeOut(boolean)方法,并且传入了参数true,则keepAliveTime参数所设置的Idle超时策略也将被应用于核心线程

4 向线程池提交任务

// 方式一: 调用execute()方法,Executor 接口中的方法
void execute(Runnable command);

// 方式二: 调用submit()方法,ExecutorService 接口中的方法
<T> Future<T> submit (Callable<T> task);
<T> Future<T> submit (Runnable task, T result);
Future<?> submit (Runnable task);
  • submit和execute两类方法区别
    1. 二者所接受的参数不一样
    • execute()方法只能接收Runnable类型的参数,而submit()方法可以接收Callable、Runnable两种类型的参数。
    • Callable类型的任务是可以返回执行结果的,而Runnable类型的任务不可以返回执行结果
    • Callable允许抛出异常Runnable不允许抛出异常。
    1. submit()提交任务后会有返回值,而execute()没有
    • execute()方法主要用于启动任务的执行,而任务的执行结果和可能的异常调用者并不关心
    • submit()方法也用于启动任务的执行,但是启动之后会返回Future对象,代表一个异步执行实例,可以通过该异步执行实例去获取结果。
    1. submit()方便Exception处理
    • execute()方法在启动任务的执行后,任务执行过程中可能发生的异常调用者并不关心。
    • submit()方法返回Future对象(异步执行实例),可以进行异步执行过程中的异常捕获

4.1 通过 submit()返回的 Future 对象获取结果

public void testSubmit2() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        Future<Integer> future = pool.schedule(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                //返回200 - 300 之间的随机数
                return RandomUtil.randInRange(200, 300);
            }
        },100,TimeUnit.MILLISECONDS);

        try {
            Integer result = future.get();
            System.out.println("异步执行的结果是:" + result);
        } catch (InterruptedException e) {
            System.out.println("异步调用被中断");
            e.printStackTrace();
        } catch (ExecutionException e) {
            System.out.println("异步调用过程中,发生了异常");
            e.printStackTrace();
        }
        sleepSeconds(10);
        //关闭线程池
        pool.shutdown();
    }

image

4.2 通过 submit()返回的 Future 对象捕获异常

public void testSubmit2() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        Future<Integer> future = pool.schedule(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                //返回200 - 300 之间的随机数
                return RandomUtil.randInRange(200, 300);
            }
        },100,TimeUnit.MILLISECONDS);

        try {
            Integer result = future.get();
            System.out.println("异步执行的结果是:" + result);
        } catch (InterruptedException e) {
            System.out.println("异步调用被中断");
            e.printStackTrace();
        } catch (ExecutionException e) {
            System.out.println("异步调用过程中,发生了异常");
            e.printStackTrace();
        }
        sleepSeconds(10);
        //关闭线程池
        pool.shutdown();
    }

image

ThreadPoolExecutor类的实现中,内部核心的任务提交方法是execute()方法,虽然用户程序通过submit()也可以提交任务,但是实际上submit()方法中最终调用的还是execute()方法。

5.线程池的任务调度流程

  • 线程池的任务调度流程(包含接收新任务和执行下一个任务)大致如下:

    1. 如果当前工作线程数量小于核心线程数量,执行器总是优先创建一个任务线程,而不是线程队列中获取一个空闲线程
    2. 如果线程池中总的任务数量大于核心线程池数量新接收的任务将被加入到阻塞队列中,直到阻塞队列已满。在核心线程池数量已经用完、阻塞队列没有满的场景下,线程池不会为新任务创建一个新线程。
    3. 当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务,并开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
    4. 核心线程池数量已经用完、阻塞队列也已经满了的场景下,如果线程池接收到新的任务,将会为新任务创建一个线程(非核心线程)并且立即开始执行新任务
    5. 在核心线程都用完、阻塞队列已满的情况下,一直会创建新线程去执行新任务,直到池内的线程总数超出maximumPoolSize。如果线程池的线程总数超过maximumPoolSize,线程池就会拒绝接收任务,当新任务过来时,会为新任务执行拒绝策略

image

  1. 核心和最大线程数量、BlockingQueue队列等参数如果配置得不合理,可能会造成异步任务得不到预期的并发执行,造成严重的排队等待现象。
  2. 线程池的调度器创建线程的一条重要的规则是: 在corePoolSize已满之后,还需要等阻塞队列已满,才会为去创建新的线程。

6. ThreadFactory(线程工厂)

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

  • ThreadFactory唯一方法newThread()创建新线程,可以更改新线程的名称线程组优先级守护进程状态等。
  • 如果newThread()返回值为null,表示线程工厂未能成功创建线程,线程池可能无法执行任何任务
  • 使用Executors创建新的线程池时,也可以基于ThreadFactory(线程工厂)创建,在创建新线程池时可以指定将使用ThreadFactory实例。如果没有指定的话,就会使用Executors.defaultThreadFactory默认实例。使用默认的线程工厂实例所创建的线程全部位于同一个ThreadGroup(线程组)中,具有相同的NORM_PRIORITY(优先级为5),而且都是非守护进程状态
  • 基于自定义的ThreadFactory实例创建线程池,首先需要实现一个ThreadFactory类,实现唯一的抽象方法newThread(Runnable)
 public class SimpleThreadFactory implements ThreadFactory {
        static AtomicInteger threadNo = new AtomicInteger(1);
        //实现其唯一的创建线程方法
        @Override
        public Thread newThread(Runnable target) {
            String threadName = "simpleThread-" + threadNo.get();
            System.out.println("创建一条线程,名称为:" + threadName);
            threadNo.incrementAndGet();
            //设置线程名称
            Thread thread = new Thread(target, threadName);
            //设置为守护线程
            thread.setDaemon(true);
            return thread;
        }
    }


    @org.junit.Test
    public void testThreadFactory() {
        //使用自定义线程工厂,快捷创建线程池
        ExecutorService pool =
                Executors.newFixedThreadPool(2, new SimpleThreadFactory());
        for (int i = 0; i < 5; i++) {
            pool.submit(new TargetTask());
        }
        //等待10秒
        sleepSeconds(10);
        System.out.println("关闭线程池");
        pool.shutdown();
    }

image

7. 任务阻塞队列

  • Java中的阻塞队列(BlockingQueue)与普通队列相比有一个重要的特点:
    • 阻塞队列为空时,会阻塞当前线程的元素获取操作
      • 一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素
      • 当队列中有元素后,被阻塞的线程会自动被唤醒(唤醒过程不需要用户程序干预)。
    • Java线程池使用BlockingQueue存放接收到的异步任务,BlockingQueue是JUC包的一个超级接口,比较常用的实现类有:
      1. ArrayBlockingQueue: 是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序。在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小,若该阻塞队列满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize。
      2. LinkedBlockingQueue: 是一个基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.Max_VALUE作为容量(无界队列)LinkedBlockingQueue队列的吞吐量高于ArrayBlockingQueue。如果不设置LinkedBlockingQueue的容量(无界队列),当接收的任务数量超出corePoolSize数量时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工方法Executors.newSingleThreadExecutorExecutors.newFixedThreadPool使用了这个队列,并且都没有设置容量(无界队列)。
      3. PriorityBlockingQueue: 是具有优先级的无界队列。
      4. DelayQueue: 这是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,而队列头部的元素是最先过期的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。
      5. SynchronousQueue(同步队列): 是一个不存储元素阻塞队列,每个插入操作必须等到另一个线程的调用移除操作,否则插入操作一直处于阻塞状态,其吞吐量通常高于LinkedBlockingQueue。快捷工厂方法Executors.newCachedThreadPool所创建的线程池使用此队列。这个队列不会保存提交的任务,而是直接新建一个线程来执行新来的任务。

8. 调度器的钩子方法

  • ThreadPoolExecutor线程池调度器为每个任务执行前后都提供了钩子方法。
  • ThreadPoolExecutor类提供了三个钩子方法(空方法),这三个空方法一般用作被子类重写。
// 任务执行之前的钩子方法(前钩子)
protected void beforeExecute(Thread t,Runnable r){}

// 任务执行之后的钩子方法(后钩子)
protected void afterExecute(Runnable r,Throwable t){}

// 线程池终止时的钩子方法(停止钩子)
protected void terminated(){}
  1. beforeExecute: 异步任务执行之前的钩子方法
    • 线程池工作线程在异步执行完成的目标实例(如Runnable实例)前调用此钩子方法。
    • 此方法仍然由执行任务的工作线程调用默认实现不执行任何操作,但可以在子类中对其进行自定义。
    • 此方法由执行目标实例的工作线程调用,可用于重新初始化ThreadLocal线程本地变量实例。更新日志记录、开始计时统计、更新上下文变量等。
  2. afterExecute: 异步任务执行之后的钩子方法
    • 线程池工作线程在异步执行目标实例后调用此钩子方法。
    • 此方法仍然由执行任务的工作线程调用。
    • 此钩子方法的默认实现不执行任何操作,可以在调度器子类中对其进行自定义。
    • 此方法由执行目标实例的工作线程调用,可用于清除ThreadLocal线程本地变量、更新日志记录、收集统计信息、更新上下文变量等。
  3. terminated: 线程池终止时的钩子方法
    • terminated钩子方法在Executor终止时调用,默认实现不执行任何操作。
public void testHooks() {
        ExecutorService pool = new ThreadPoolExecutor(
            	2, // 核心线程
                4, // 总线程
               60, // 休眠时长
               TimeUnit.SECONDS, // 休眠单位
               new LinkedBlockingQueue<>(2)) {
            @Override
            protected void terminated() {
                System.out.println("调度器已经终止!");
            }

            @Override
            protected void beforeExecute(Thread t, Runnable target) {
                System.out.println(target + "前钩子被执行");
                //记录开始执行时间
                START_TIME.set(System.currentTimeMillis());
                super.beforeExecute(t, target);
            }


            @Override
            protected void afterExecute(Runnable target, Throwable t) {
                super.afterExecute(target, t);
                //计算执行时长
                long time = (System.currentTimeMillis() - START_TIME.get());
                System.out.println(target + " 后钩子被执行, 任务执行时长(ms):" + time);
                //清空本地变量
                START_TIME.remove();
            }
        };
         
    	pool.execute(new TargetTask());
        //等待10秒
        sleepSeconds(10);
        System.out.println("关闭线程池");
        pool.shutdown();

    }

image

beforeExecute(前钩子)方法中通过startTime线程局部变量暂存了异步目标任务(如Runnable实例)的开始执行时间(起始时间);

afterExecute(后钩子)方法中通过startTime线程局部变量获取了之前暂存的起始时间,然后计算与系统当前时间(结束时间)之间的时间差,从而得出异步目标任务的执行时长。

9. 线程池的拒绝策略

  • 在线程池的任务缓存队列为有界队列(有容量限制的队列)的时候,如果队列满了,提交任务到线程池的时候就会被拒绝。
  • 任务被拒绝有两种情况:
    1. 线程池已经被关闭。
    2. 工作队列已满且maximumPoolSize已满。
  • 无论以上哪种情况任务被拒绝,线程池都会调用RejectedExecutionHandler实例的reiectedExecution()方法。
  • RejectedExecutionHandler是拒绝策略的接口,JUC为该接口提供了以下几种实现:
    • AbortPolicy: 拒绝策略。
    • DiscardPolicy: 抛弃策略。
    • DiscardOldestPolicy: 抛弃最老任务策略。
    • CallerRunsPolicy: 调用者执行策略。
    • 自定义策略。

image

  1. AbortPolicy
    使用该策略时,如果线程池队列满了新任务就会被拒绝,并且抛出ReiectedExecutionException异常。该策略是线程池的默认的拒绝策略

     public static class AbortPolicy implements RejectedExecutionHandler {
            public AbortPolicy() { }
         
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    
  2. DiscardPolicy
    使用该策略,如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出

    public static class DiscardPolicy implements RejectedExecutionHandler {
            public DiscardPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
  3. DiscardOldestPolicy
    抛弃最老任务策略,也就是说如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除对头元素后再尝试入队。

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
         
            public DiscardOldestPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    
  4. CallerRunsPolicy
    调用者执行策略。在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务不会使用线程池中的线程去执行新任务。

     public static class CallerRunsPolicy implements RejectedExecutionHandler {       
            public CallerRunsPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
  5. 自定义策略
    如果以上拒绝策略都不符合需求,那么可自定义一个拒绝策略,实现RejectedExecutionHandler接口的rejectedExecution方法即可。

    // 自定义拒绝策略
    public static class CustomIgnorePolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           // TODO
        }
    }
    
    

线程池默认的拒绝策略为AbortPolicy如果提交的任务被拒绝,线程池抛出ReiectedExecutionException异常,该异常是非受检异常(运行时异常),很容易忘记捕获。如果关心任务被拒绝的事件,需要在提交任务时捕获ReiectedExecutionException异常。

10. 线程池的状态

  • 一般情况下,线程池启动后建议手动关闭

  • 线程池状态。线程池总共存在5种状态,定义在ThreadPoolExecutor类中

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
  • 线程池的5种状态具体如下:

    1. RUNNING: 线程池创建之后的初始状态,这种状态下可以执行任务
    2. SHUTDOWN: 该状态下线程池不再接受新任务,但是会将工作队列中的任务执行完毕
    3. STOP: 该状态下线程池不再接受新任务,也不会处理工作队列中的剩余任务,并且将会中断所有工作线程
    4. TIDYING: 该状态下所有任务都已终止或者处理完成,将会执行terminated()钩子方法。
    5. TERMINATED: 执行完terminated()钩子方法之后的状态。
  • 线程池的状态转换规则为:

    1. 线程池创建之后状态为RUNNING
    2. 执行线程池的shutdown()实例方法,会使线程池状态从RUNNING转变为SHUTDOWN
    3. 执行线程池的shutdownNow()实例方法,会使线程池状态从RUNNING转变为STOP
    4. 当线程池处于SHUTDOWN状态,执行其shutdownNow()方法会将其状态转变为STOP
    5. 等待线程池的所有工作线程停止,工作队列清空之后,线程池状态会从STOP转变为
      TIDYING.
    6. 执行完terminated()钩子方法之后,线程池状态从TIDYING转变为TERMINATED

image

11. 优雅关闭线程池

  • shutdown: 是JUC提供一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后才会执行关闭,但是此方法被调用之后线程池的状态转变为SHUTDOWN线程池不会再接收新的任务
  • shutdownNow: 是JUC提供一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务
  • awaitTermination: 等待线程池完成关闭。在调用线程池的shutdown()shutdownNow()方法时,当前线程立即返回,不会一直等待直到线程池完成关闭。如果需要等到线程池关闭完成,可以调用awaitTermination()方法。

11.1 shutdown()方法原理

   public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
             // 检查权限
            checkShutdownAccess();
            // 设置线程池状态
            advanceRunstate(SHUTDOWN);
            // 中断空闲线程
            interruptIdleworkers();
            //钩子函数,主要用于清理一些资源onShutdown();
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
  1. shutdown()方法首先加锁
  2. 其次检查调用者是否具有执行线程池关闭的Java Security权限。
  3. 接着shutdown()方法会将线程池状态变为SHUTDOWN,在这之后线程池不再接受提交的新任务。
  4. 如果继续往线程池提交任务,会使用线程池拒绝策略,默认使用ThreadPoolExecutor.AbortPolicy,接收新任务时会抛出RejectedExecutionException异常。

11.2 shutdownNow()方法原理

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
             // 检查权限
            checkShutdownAccess();
             // 设置线程池状态为 STOP
            advanceRunState(STOP);
          	 // 中断所有线程,包括工作线程以及空闲线程
            interruptWorkers();
			 // 丢弃工作队列中剩余任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
  1. shutdownNow()方法将会把线程池状态设置为STOP
  2. 然后中断所有线程(包括工作线程以及空闲线程)
  3. 最后清空工作队列,取出工作队列中所有未完成的任务返回给调用者。
  4. 与有序的shutdown()方法相比,shutdownNow方法比较粗暴,直接中断工作线程。这里需要注意的是,中断线程并不代表线程立刻结束,只是通过工作线程的interrupt()实例方法设置了中断状态,这里需要用户程序主动配合线程进行中断操作。

11.3 awaitTermination()方法的使用

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

如果线程池完成关闭,awaitTermination()方法将会返回true,否则当等待时间超过指定时间后将会返回false。如果需要调用awaitTermination(),建议不是永久等待,而是设置一定重试次数。

11.4 小结

  • 结合shutdown()、shutdownNow()awaitTermination()三个方法去优雅关闭一个线程池,大致分为以下几步:

    1. 执行shutdown()方法,拒绝新任务的提交,并等待所有任务有序地执行完毕。
    2. 执行awaitTermination(long timeout,TimeUnitunit)方法,指定超时时间,判断是否已经关闭所有任务,线程池关闭完成。
    3. 如果awaitTermination()方法返回false,或者被中断,就调用shutDownNow()方法立即关闭线程池所有任务。
    4. 执行awaitTermination(long timeout,TimeUnit unit)方法,判断线程池是否关闭完成。如果超时,就可以进入循环关闭,循环一定的次数(如1000次),不断关闭线程池,直到其关闭或者循环结束。
    public static void shutdownThreadPoolGracefully(ExecutorService threadPool) {
            if (!(threadPool instanceof ExecutorService) || threadPool.isTerminated()) {
                return;
            }
            try {
                threadPool.shutdown();   //拒绝接受新任务
            } catch (SecurityException e) {
                return;
            } catch (NullPointerException e) {
                return;
            }
            try {
                // 等待 60 s,等待线程池中的任务完成执行
                if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                    // 调用 shutdownNow 取消正在执行的任务
                    threadPool.shutdownNow();
                    // 再次等待 60 s,如果还未结束,可以再次尝试,或则直接放弃
                    if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                        System.err.println("线程池任务未正常执行结束");
                    }
                }
            } catch (InterruptedException ie) {
                // 捕获异常,重新调用 shutdownNow
                threadPool.shutdownNow();
            }
            //任然没有关闭,循环关闭1000次,每次等待10毫秒
            if (!threadPool.isTerminated()) {
                try {
                    for (int i = 0; i < 1000; i++) {
                        if (threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                            break;
                        }
                        threadPool.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    System.err.println(e.getMessage());
                } catch (Throwable e) {
                    System.err.println(e.getMessage());
                }
            }
        }
    

11.3 注册 JVM 钩子函数自动关闭线程池

  • 如果使用了线程池,可以在JVM注册一个钩子函数,在IM进程关闭之前,由钩子函数自动将线程池优雅关闭,以确保资源正常释放。
static class SeqOrScheduledTargetThreadPoolLazyHolder {
        //线程池:用于定时任务、顺序排队执行任务
        static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(
                1,
                new CustomThreadFactory("seq"));
        
        static {
            //JVM关闭时的钩子函数
            Runtime.getRuntime().addShutdownHook(
                    new ShutdownHookThread("定时和顺序任务线程池", new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            //优雅关闭线程池
                            shutdownThreadPoolGracefully(EXECUTOR);
                            return null;
                        }
                    }));
        }
        
    }

标签:java,队列,任务,线程,执行,方法,public
From: https://www.cnblogs.com/ccblblog/p/17980588

相关文章

  • 5个非常流行且常用的Java自动化测试框架!
    目前有无数的Java测试框架可供QA测试人员使用。可以理解的是,您拥有的选择越多,从很多中选出最好的一个就越复杂。在本文中,我们将分享一些常用的Java测试框架,以提升您的职业生涯并提高您的技能组合。1、JUnitJUnit是Java中最常用的单元测试框架之一。它可以用于测试各种Ja......
  • 对java方法增加@Async注解不管用的问题
    如果你在Java方法上使用了@Async注解,但异步执行并没有生效,可能是因为以下几个常见原因:未启用异步配置:需要在SpringBoot的启动类上添加@EnableAsync注解来开启对异步任务的支持。1@SpringBootApplication2@EnableAsync3publicclassApplication{4publicstaticvoidmain(......
  • 复杂JSON数据的扁平化解析_Java实现
    在工作中遇到了解析JSON数据的场景,但是此岗位传统的做法是通过Python脚本来实现的,而且是非常不合理的手动解析——每对应不同的JSON数据结构,都需要手动改动很多脚本文件,工作量与JSON数据结构的复杂程度成正比!(很难想象这是一个做开发的人想出来的方案)因此最开始接触此工作内容的时......
  • Java基础复习之选择结构使用思路
    Java基础复习之选择结构使用思路目录目录Java基础复习之选择结构使用思路目录一、Java提供的三种选择结构二、三种选择结构的使用结构(一)关于if...else的三种使用结构(二)三元运算符(三)关于switch...case的两种使用结构三、选择结构使用思路一、Java提供的三种选择结构if、......
  • Java开发者的Golang进修指南:从0->1带你实现协程池
    在Java编程中,为了降低开销和优化程序的效率,我们常常使用线程池来管理线程的创建和销毁,并尽量复用已创建的对象。这样做不仅可以提高程序的运行效率,还能减少垃圾回收器对对象的回收次数。在Golang中,我们知道协程(goroutine)由于其体积小且效率高,在高并发场景中扮演着重要的角色。然......
  • 使用Javamail接收imaps协议的邮件
    网上的消息不能说大多,只能说基本都过时了,连imap和imaps都不分了本文基于apache-james项目搭建的邮件服务器,其他邮件服务器仅供参考首先是依赖,这里需要引入两个依赖,如下<dependency><groupId>javax.mail</groupId><artifactId>javax.mail-api</artifactId>......
  • 到底什么样的 Java 项目用 Solon 好???
    什么样的Java项目用Solon好就像华为讲的,不要因为爱国而特意买华为手机。Solon也是,有需要就用不需要就跳过(按正常的需求选择):信创需要国产化,应该用Solon或者SolonCloud(有案例)军工项目要国产化,应该用Solon或者SolonCloud(有案例)嵌入式设备,内存有限,算力差,可以用Solo......
  • JavaScript 中的展开运算符是什么?
    展开运算符(SpreadOperator)是JavaScript中的一种语法,用于将可迭代对象(如数组或字符串)展开为独立的元素。它使用三个连续的点号(...)作为操作符。展开运算符可以在多种情况下使用,包括数组、对象和函数调用等。下面是一些展开运算符的用法示例:1:展开数组:使用展开运算符可以将一......
  • Java 如何将Excel转换为TXT文本格式
    TXT文件是一种非常简单、通用且易于处理的文本格式。在处理大规模数据时,将Excel转为TXT纯文本文件可以提高处理效率。此外,许多编程语言和数据处理工具都有内置的函数和库来读取和处理TXT文件,因此将Excel文件转换为TXT还可以简化数据导入过程。本文将介绍如何使用Java将Excel转为TX......
  • python 多线程multiprocessing
    该多线程,简单计算结果可以使用,在django里想并行处理多个实体进行计算不行,请自行验证importmultiprocessing#要在进程池中并行执行的任务函数defprocess_data(data):#执行任务的逻辑result=data*2returnresultif__name__=='__main__':#创......