首页 > 编程语言 >SpringBoot线程池和Java线程池的实现原理

SpringBoot线程池和Java线程池的实现原理

时间:2023-04-11 10:36:52浏览次数:38  
标签:Java SpringBoot void 线程 taskExecutor new public ThreadPoolExecutor

使用默认的线程池

方式一:通过@Async注解调用

public class AsyncTest {
    @Async
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }
}

启动类上需要添加@EnableAsync注解,否则不会生效。

@SpringBootApplication
//@EnableAsync
public class Test1Application {
   public static void main(String[] args) throws InterruptedException {
      ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
      AsyncTest bean = run.getBean(AsyncTest.class);
      for(int index = 0; index <= 10; ++index){
         bean.async(String.valueOf(index));
      }
   }
}

方式二:直接注入 ThreadPoolTaskExecutor

需要加上 @EnableAsync注解

@SpringBootTest
class Test1ApplicationTests {

   @Resource
   ThreadPoolTaskExecutor threadPoolTaskExecutor;

   @Test
   void contextLoads() {
      Runnable runnable = () -> {
         System.out.println(Thread.currentThread().getName());
      };

      for(int index = 0; index <= 10; ++index){
         threadPoolTaskExecutor.submit(runnable);
      }
   }

}

线程池默认配置信息

SpringBoot线程池的常见配置:

spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 16                          # 默认是 Integer.MAX_VALUE
        keep-alive: 60s                       # 当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过keepAliveTime,线程将被终止
        allow-core-thread-timeout: true       # 是否允许核心线程超时,默认true
        queue-capacity: 100                   # 线程队列的大小,默认Integer.MAX_VALUE
      shutdown:
        await-termination: false              # 线程关闭等待
      thread-name-prefix: task-               # 线程名称的前缀

SpringBoot 线程池的实现原理

TaskExecutionAutoConfiguration 类中定义了 ThreadPoolTaskExecutor,该类的内部实现也是基于java原生的 ThreadPoolExecutor类。initializeExecutor()方法在其父类中被调用,但是在父类中 RejectedExecutionHandler 被定义为了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通过initialize()方法将AbortPolicy传入initializeExecutor()中。

注意在TaskExecutionAutoConfiguration 类中,ThreadPoolTaskExecutor类的bean的名称为: applicationTaskExecutortaskExecutor

// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
      AsyncAnnotationBeanPostProcessor.DEFAUL
          T_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
   return builder.build();
}
// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

   BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

   ThreadPoolExecutor executor;
   if (this.taskDecorator != null) {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler) {
         @Override
         public void execute(Runnable command) {
            Runnable decorated = taskDecorator.decorate(command);
            if (decorated != command) {
               decoratedTaskMap.put(decorated, command);
            }
            super.execute(decorated);
         }
      };
   }
   else {
      executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler);

   }

   if (this.allowCoreThreadTimeOut) {
      executor.allowCoreThreadTimeOut(true);
   }

   this.threadPoolExecutor = executor;
   return executor;
}
// ExecutorConfigurationSupport#initialize()
public void initialize() {
   if (logger.isInfoEnabled()) {
      logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
   }
   if (!this.threadNamePrefixSet && this.beanName != null) {
      setThreadNamePrefix(this.beanName + "-");
   }
   this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}

覆盖默认的线程池

覆盖默认的 taskExecutor对象,bean的返回类型可以是ThreadPoolTaskExecutor也可以是Executor

@Configuration
public class ThreadPoolConfiguration {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}

管理多个线程池

如果出现了多个线程池,例如再定义一个线程池 taskExecutor2,则直接执行会报错。此时需要指定bean的名称即可。

@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    //设置线程池参数信息
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(50);
    taskExecutor.setQueueCapacity(200);
    taskExecutor.setKeepAliveSeconds(60);
    taskExecutor.setThreadNamePrefix("myExecutor2--");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60);
    //修改拒绝策略为使用当前线程执行
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //初始化线程池
    taskExecutor.initialize();
    return taskExecutor;
}

引用线程池时,需要将变量名更改为bean的名称,这样会按照名称查找。

@Resource
ThreadPoolTaskExecutor taskExecutor2;

对于使用@Async注解的多线程则在注解中指定bean的名字即可。

    @Async("taskExecutor2")
    public void async(String name) throws InterruptedException {
        System.out.println("async" + name + " " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

线程池的四种拒绝策略

JAVA常用的四种线程池

ThreadPoolExecutor 类的构造函数如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

newCachedThreadPool

不限制最大线程数(maximumPoolSize=Integer.MAX_VALUE),如果有空闲的线程超过需要,则回收,否则重用已有的线程。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());

newFixedThreadPool

定长线程池,超出线程数的任务会在队列中等待。

return new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue<Runnable>());

newScheduledThreadPool

类似于newCachedThreadPool,线程数无上限,但是可以指定corePoolSize。可实现延迟执行、周期执行。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

周期执行:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(()->{
   System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);

延时执行:

scheduledThreadPool.schedule(()->{
   System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);

newSingleThreadExecutor

单线程线程池,可以实现线程的顺序执行。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Java 线程池中的四种拒绝策略

  • CallerRunsPolicy:线程池让调用者去执行。

  • AbortPolicy:如果线程池拒绝了任务,直接报错。

  • DiscardPolicy:如果线程池拒绝了任务,直接丢弃。

  • DiscardOldestPolicy:如果线程池拒绝了任务,直接将线程池中最旧的,未运行的任务丢弃,将新任务入队。

CallerRunsPolicy

直接在主线程中执行了run方法。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
 
    public CallerRunsPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

效果类似于:

Runnable thread = ()->{
   System.out.println(Thread.currentThread().getName());
   try {
      Thread.sleep(0);
   } catch (InterruptedException e) {
      throw new RuntimeException(e);
   }
};

thread.run();

AbortPolicy

直接抛出RejectedExecutionException异常,并指示任务的信息,线程池的信息。、

public static class AbortPolicy implements RejectedExecutionHandler {
 
    public AbortPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

DiscardPolicy

什么也不做。

public static class DiscardPolicy implements RejectedExecutionHandler {
 
    public DiscardPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

  • e.getQueue().poll() : 取出队列最旧的任务。

  • e.execute(r) : 当前任务入队。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 
    public DiscardOldestPolicy() { }
 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

java 线程复用的原理

java的线程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 对象,该对象在 被维护在private final HashSet<Worker> workers = new HashSet<Worker>();workQueue是保存待执行的任务的队列,线程池中加入新的任务时,会将任务加入到workQueue队列中。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

work对象的执行依赖于 runWorker(),与我们平时写的线程不同,该线程处在一个循环中,并不断地从队列中获取新的任务执行。因此线程池中的线程才可以复用,而不是像我们平常使用的线程一样执行完毕就结束。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

标签:Java,SpringBoot,void,线程,taskExecutor,new,public,ThreadPoolExecutor
From: https://www.cnblogs.com/twilight0402/p/17305328.html

相关文章

  • JavaScript 去除数组中重复的元素 得到新数组
    方法一:思路:准备一个新数组,将原数组中的元素一一放入新数组,放入之前判断该元素是否存在新数组中,不存在的话就直接存入新数组。functionuniqueArr(arr){ varnewArr=[]; for(leti=0;i<arr.length;i++){ if(newArr.indexOf(arr[i])==-1){ newArr.push(arr[i]); } } r......
  • Javascript 原型与原型链
    在BrendanEich设计Javascript时,借鉴了Self和Smalltalk这两门基于原型的语言。之所以选择基于原型的面向对象系统,是因为BrendanEich一开始没有打算在JavaScript中加入类的概念,其设计初衷是为非专业的开发人员提供一个方便的工具,使其使用尽可能简单、易学。随着人们对网页要求的逐......
  • java包装类
    基本数据类型包装类byteBytebooleanBooleanshortShortcharCharacterintIntegerlongLongfloatFloatdoubleDouble/*Integeri=100;intj=newInteger(100); 自动装箱调用的是valueOf()方法,而不是newInteger()方......
  • 多线程事务的提交解决办法
    多线程处理的时候,如果发生了错误,不会因为加了@Transcational注解而生效,这里需要额外使用SqlSessionTemplate{//插入主表electronicTaxBillMapper.insertBatch(masterList);//更新出库单状态outOrderDetailMapper.updateByOrderCodeList(orderCodeList);//切......
  • java jar包运行用外置配置yml文件
    1.准备文件和目录2.启动命令java-jar-Dspring.config.location=config/application.ymldatachangenew.jar ......
  • delphi 11.3 java.ioexception:cleartext http traffic [IP地址] not permitted
    要在AndroidManifest.xml添加如下属性即可:参考:HowtoFixCleartextHTTPTrafficnotPermittedinAndroid-TRENDOCEANS ......
  • 用Java写一段中国身份证的正则表达式,要求验证身份证中的日期,且大于1900年,以及校验码验
    以下是一个Java正则表达式,可用于验证中国身份证中的日期,并要求日期在1900年及之后:Stringregex="(?:(?:19[0-9]\\d)|(?:[2-9]\\d{3}))(?:0[1-9]|1[012])(?:0[1-9]|[12]\\d|3[01])\\d{3}[\\dXx]";这个正则表达式的含义如下:(?:(?:19[0-9]\\d)|(?:[2-9]\\d{3})):匹配1900年......
  • Java虚拟机整体思路
    我们日常编程的Java编程是在Java语言规范代码,通过javac前端编译器编译器,产生字节码规范,此时我们应该对字节码文件结构有一个大致的认识,此时我们了解了Java虚拟机内存面局(专业术语叫运行时数据区),类加载器通过加载器将字节码文件加载到内存中(此时应该对类加载的过程有一个大致的了解......
  • 线程中的终极异常处理处理
    提问线程中的终极异常处理处理回答为了异常阻塞主线程是不值得的使用事件通知方式,这样不会阻塞主线程捕捉AggregateException......
  • java.lang.NoSuchMethodException: com.innovation.web.BuyServlet.$%7Bid%7D(javax.s
    问题描述我在html页面写了get到删除某条记录的url路径里去,然后一直显示这个错误,也到不了相应的后台方法里面去,就很离谱欸家人们!问题解决听从友友的建议,将之前的/deleteCarts/${id}改成了之前用过的那种样式,也就是/deleteCarts?id=${id},然后就成功跳转到那个后台servlet里面啦!......