首页 > 其他分享 >Spring使用ThreadPoolTaskExecutor自定义线程池及实现异步调用

Spring使用ThreadPoolTaskExecutor自定义线程池及实现异步调用

时间:2022-08-30 13:44:30浏览次数:55  
标签:自定义 池及 Spring class 任务 线程 executor async public

一、ThreadPoolTaskExecutor

本文采用 Executors 的工厂方法进行配置。

1、将线程池用到的参数定义到配置文件中

在项目的 resources 目录下创建 executor.properties 文件,并添加如下配置:

# 异步线程配置
# 核心线程数
async.executor.thread.core_pool_size=5
# 最大线程数
async.executor.thread.max_pool_size=8
# 任务队列大小
async.executor.thread.queue_capacity=2
# 线程池中线程的名称前缀
async.executor.thread.name.prefix=async-service-
# 缓冲队列中线程的空闲时间
async.executor.thread.keep_alive_seconds=100

2、Executors 的工厂配置

2.1、配置详情
@Configuration
// @PropertySource是找的target目录下classes目录下的文件,resources目录下的文件编译后会生成在classes目录
@PropertySource(value = {"classpath:executor.properties"}, ignoreResourceNotFound=false, encoding="UTF-8")
@Slf4j
public class ExecutorConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;

    @Bean(name = "asyncTaskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        log.info("启动");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        // 任务队列大小
        executor.setQueueCapacity(queueCapacity);
        // 线程前缀名
        executor.setThreadNamePrefix(namePrefix);
        // 线程的空闲时间
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 线程初始化
        executor.initialize();
        return executor;
    }
}
2.2、注解说明
  • @Configuration:Spring 容器在启动时,会加载带有 @Configuration 注解的类,对其中带有 @Bean 注解的方法进行处理。
  • @Bean:是一个方法级别上的注解,主要用在 @Configuration 注解的类里,也可以用在 @Component 注解的类里。添加的 bean 的 id 为方法名。
  • @PropertySource:加载指定的配置文件。value 值为要加载的配置文件,ignoreResourceNotFound 意思是如果加载的文件找不到,程序是否忽略它。默认为 false 。如果为 true ,则代表加载的配置文件不存在,程序不报错。在实际项目开发中,最好设置为 false 。如果 application.properties 文件中的属性与自定义配置文件中的属性重复,则自定义配置文件中的属性值被覆盖,加载的是 application.properties 文件中的配置属性。
  • @Slf4j:lombok 的日志输出工具,加上此注解后,可直接调用 log 输出各个级别的日志。
  • @Value:调用配置文件中的属性并给属性赋予值。
2.3、线程池配置说明
  • 核心线程数:线程池创建时候初始化的线程数。当线程数超过核心线程数,则超过的线程则进入任务队列。
  • 最大线程数:只有在任务队列满了之后才会申请超过核心线程数的线程。不能小于核心线程数。
  • 任务队列:线程数大于核心线程数的部分进入任务队列。如果任务队列足够大,超出核心线程数的线程不会被创建,它会等待核心线程执行完它们自己的任务后再执行任务队列的任务,而不会再额外地创建线程。举例:如果有20个任务要执行,核心线程数:10,最大线程数:20,任务队列大小:2。则系统会创建18个线程。这18个线程有执行完任务的,再执行任务队列中的任务。
  • 线程的空闲时间:当 线程池中的线程数量 大于 核心线程数 时,如果某线程空闲时间超过 keepAliveTime ,线程将被终止。这样,线程池可以动态的调整池中的线程数。
  • 拒绝策略:如果(总任务数 - 核心线程数 - 任务队列数)-(最大线程数 - 核心线程数)> 0 的话,则会出现线程拒绝。举例:( 12 - 5 - 2 ) - ( 8 - 5 ) > 0,会出现线程拒绝。线程拒绝又分为 4 种策略,分别为:

    • CallerRunsPolicy():交由调用方线程运行,比如 main 线程。
    • AbortPolicy():直接抛出异常。
    • DiscardPolicy():直接丢弃。
    • DiscardOldestPolicy():丢弃队列中最老的任务。
2.4、线程池配置个人理解
  • 当一个任务被提交到线程池时,首先查看线程池的核心线程是否都在执行任务。如果没有,则选择一条线程执行任务。
  • 如果都在执行任务,查看任务队列是否已满。如果不满,则将任务存储在任务队列中。核心线程执行完自己的任务后,会再处理任务队列中的任务。
  • 如果任务队列已满,查看线程池(最大线程数控制)是否已满。如果不满,则创建一条线程去执行任务。如果满了,就按照策略处理无法执行的任务。

二、异步调用线程

通常 ThreadPoolTaskExecutor 是和 @Async 一起使用。在一个方法上添加 @Async 注解,表明是异步调用方法函数。@Async 后面加上线程池的方法名或 bean 名称,表明异步线程会加载线程池的配置

@Component
@Slf4j
public class ThreadTest {
    /**
     * 每10秒循环一次,一个线程共循环10次。
     */
    @Async("asyncTaskExecutor")
    public void ceshi3() {
        for (int i = 0; i <= 10; i++) {
            log.info("ceshi3: " + i);
            try {
                Thread.sleep(2000 * 5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

备注:一定要在启动类上添加 @EnableAsync 注解,这样 @Async 注解才会生效。

三、多线程使用场景

1、定时任务 @Scheduled

// 在启动类上添加 @EnableScheduling 注解
@SpringBootApplication
@EnableScheduling
public class SpringBootStudyApplication {
   public static void main(String[] args) {
      SpringApplication.run(SpringBootStudyApplication.class, args);
   }
}
// @Component 注解将定时任务类纳入 spring bean 管理。
@Component
public class listennerTest3 {

    @Autowired
    private ThreadTest t;

    // 每1分钟执行一次ceshi3()方法
    @Scheduled(cron = "0 0/1 * * * ?")
    public void run() {
        t.ceshi3();
    }
}

ceshi3() 方法调用线程池配置,且异步执行。

@Component
@Slf4j
public class ThreadTest {
    /**
     * 每10秒循环一次,一个线程共循环10次。
     */
    @Async("asyncTaskExecutor")
    public void ceshi3() {
        for (int i = 0; i <= 10; i++) {
            log.info("ceshi3: " + i);
            try {
                Thread.sleep(2000 * 5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2、程序一启动就异步执行多线程

通过继承 CommandLineRunner 类实现。

@Component
public class ListennerTest implements CommandLineRunner {

    @Autowired
    private ThreadTest t;

    @Override
    public void run(String... args) {
        for (int i = 1; i <= 10; i++) {
            t.ceshi();
        }
    }
}
@Component
@Slf4j
public class ThreadTest {

    @Async("asyncTaskExecutor")
    public void ceshi() {
        log.info("ceshi");
    }
}

3、定义一个 http 接口

还可以通过接口的形式来异步调用多线程:

@RestController
@RequestMapping("thread")
public class ListennerTest2 {

    @Autowired
    private ThreadTest t;

    @GetMapping("ceshi2")
    public void run() {
        for (int i = 1; i < 10; i++) {
            t.ceshi2();
        }
    }
}
@Component
@Slf4j
public class ThreadTest {

    @Async("asyncTaskExecutor")
    public void ceshi2() {
        for (int i = 0; i <= 3; i++) {
            log.info("ceshi2");
        }
    }
}    

4、测试类

@RunWith(SpringRunner.class)
@SpringBootTest
public class ThreadRunTest {

    @Autowired
    private ThreadTest t;

    @Test
    public void thread1() {
        for (int i = 1; i <= 10; i++) {
            t.ceshi4();
        }
    }
}
@Component
@Slf4j
public class ThreadTest {
    @Async("asyncTaskExecutor")
    public void ceshi4() {
        log.info("ceshi4");
    }
}

四、总结

以上主要介绍了 ThreadPoolTaskExecutor 线程池的配置、使用、相关注解的意义及作用,也简单介绍了使用 @Async 来异步调用线程,最后又列举了多线程的使用场景,并配上了代码示例。

五、使用实例:SpringBoot结合@Async实现

定义线程池

  • 第一步,先在Spring Boot主类中定义一个线程池,比如:
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean("taskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("taskExecutor-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    }
}

上面我们通过使用 ThreadPoolTaskExecutor创建了一个线程池,同时设置了以下这些参数:

  • 核心线程数10:线程池创建时候初始化的线程数
  • 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  • 缓冲队列200:用来缓冲执行任务的队列
  • 允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  • 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
  • 线程池对拒绝任务的处理策略:这里采用了 CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

使用线程池

在定义了线程池之后,我们如何让异步调用的执行任务使用这个线程池中的资源来运行呢?方法非常简单,我们只需要在 @Async注解中指定线程池名即可,比如:

@Slf4j
@Component
public class Task {

    public static Random random = new Random();

    @Async("taskExecutor")
    public void doTaskOne() throws Exception {
        log.info("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务一,耗时:" + (end - start) + "毫秒");
    }

    @Async("taskExecutor")
    public void doTaskTwo() throws Exception {
        log.info("开始做任务二");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务二,耗时:" + (end - start) + "毫秒");
    }

    @Async("taskExecutor")
    public void doTaskThree() throws Exception {
        log.info("开始做任务三");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务三,耗时:" + (end - start) + "毫秒");
    }

}

单元测试

最后,我们来写个单元测试来验证一下

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private Task task;

    @Test
    public void test() throws Exception {

        task.doTaskOne();
        task.doTaskTwo();
        task.doTaskThree();

        Thread.currentThread().join();
    }

}

执行上面的单元测试,我们可以在控制台中看到所有输出的线程名前都是之前我们定义的线程池前缀名开始的,说明我们使用线程池来执行异步任务的试验成功了!

 
2018-03-27 22:01:15.620  INFO 73703 --- [ taskExecutor-1] com.didispace.async.Task                 : 开始做任务一
2018-03-27 22:01:15.620  INFO 73703 --- [ taskExecutor-2] com.didispace.async.Task                 : 开始做任务二
2018-03-27 22:01:15.620  INFO 73703 --- [ taskExecutor-3] com.didispace.async.Task                 : 开始做任务三
2018-03-27 22:01:18.165  INFO 73703 --- [ taskExecutor-2] com.didispace.async.Task                 : 完成任务二,耗时:2545毫秒
2018-03-27 22:01:22.149  INFO 73703 --- [ taskExecutor-3] com.didispace.async.Task                 : 完成任务三,耗时:6529毫秒
2018-03-27 22:01:23.912  INFO 73703 --- [ taskExecutor-1] com.didispace.async.Task

标签:自定义,池及,Spring,class,任务,线程,executor,async,public
From: https://www.cnblogs.com/liftsail/p/16638992.html

相关文章

  • Aspectj与Spring AOP比较
    1、简介今天有多个可用的AOP库,它们需要能够回答许多问题:是否与用户现有的或新的应用程序兼容?在哪里可以实现AOP?与自己的应用程序集成多快?性能开销是多少?在本......
  • SpringAOP
    1.日志处理的问题2.什么是AOP?通过代理模式,可以在指定位置执行对应流程。这样就可以将一些横向的功能抽离出来形成一一个独立的模块,然后在指定位置插入这些功能。这......
  • Java SPI与SpringBoot 自动配置
    JavaSPI设计思想1、使用约定的配置文件2、谁提供jar包,也要负责提供配置文件3、使用ClassLoader的getResource和getResources方法,来读取classpath中的配置文件 Sprin......
  • 面经-框架-Spring Bean生命周期
    SpringBean生命周期1.处理名称,检查缓存一级缓存:放单例成品对象;二级缓存:放单例工厂的产品;三级缓存:放单例工厂。2.检查父工厂如果父子容器名称重复,优先子容器bean。3......
  • 如何在springBoot中进行ReactiveFeignClient超时配置
    最近项目中用到了ReactiveFeign请求第三方的http接口,需要自定义一个请求超时时间,但在网上查了很多资料都没有一个比较准确的配置方法。pom依赖<dependency><groupId>......
  • springboot加载静态资源
      第一步写一个config的类继承,WebMvcConfigurationSupport   重写这个方法/**是指这个后面的路径都可以加载另一个配置类加载web:resources:static-lo......
  • Spring Boot:上传文件大小超限制如何捕获 MaxUploadSizeExceededException 异常
    (7条消息)SpringBoot:上传文件大小超限制如何捕获MaxUploadSizeExceededException异常_ifu25的博客-CSDN博客......
  • go语言的错误处理(自定义错误类型, wrap error)
    go语言的错误处理没有其他语言的try,catch,finally异常捕获机制,需要显式地进行错误处理,如果只是单纯地将错误返回,在深度过大时可能无法清楚地知道调用的链路。这时候可以通......
  • Spring学习笔记(四)——Spring Beans自动装配
    1.自动装配简介你已经学会如何使用<bean>元素来声明bean和通过使用XML配置文件中的<constructor-arg>和<property>元素来注入。Spring容器可以在不使用<constructo......
  • 自定义持久层框架的代码实现二
    代码实现续核心执行引擎Executor的实现/***执行器的实现**@name:SimpleExecutor*@author:terwer*@date:2022-05-0816:53*/classSimpleExecutor......