首页 > 其他分享 >线程池ThreadPoolTaskExecutor的同步及异步使用

线程池ThreadPoolTaskExecutor的同步及异步使用

时间:2022-12-07 20:56:37浏览次数:75  
标签:异步 task org 线程 executor import ThreadPoolTaskExecutor public

参考信息

本人参考的是这一篇,描述方面比本人好得多: springboot线程池的使用和扩展 VisiableThreadPoolTaskExecutor

背景:

简略记一下,笔记:

  • 目标是想在 springboot服务下,自定义一个线程池,然后使用异步,原目的是为了批量导入用。

项目架构

  • 普通的springboot服务

步骤

1、先定义一个 ExecutorConfig 类
为了方便,Executor 实现类也可以放在这个config 类。用 @Bean 注解一下也是可以的
并且要加上 @Configuration @EnableAsync 这两个注解~~


import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import lombok.extern.slf4j.Slf4j;

/**
 * 线程池config
 *
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    // 核心线程数
    @Value("${async.executor.thread.core_pool_size:5}")
    private int corePoolSize;

    // 最大线程数
    @Value("${async.executor.thread.max_pool_size:10}")
    private int maxPoolSize;

    // 线程池队列数
    @Value("${async.executor.thread.queue_capacity:2000}")
    private int queueCapacity;

    // 线程池前缀
    @Value("${async.executor.thread.name.prefix:async-task-}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.warn("start asyncServiceExecutor");
        //在这里修改
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);
        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //配置队列大小
        executor.setQueueCapacity(queueCapacity);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

2、这里用到了一个 VisiableThreadPoolTaskExecutor 类, 这个就是为了对线程池运行过程中的细节进行记录的一个类。
看下代码就清楚了:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * ThreadPoolTaskExecutor的子类
 */
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if(null==threadPoolExecutor){
            return;
        }

        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

3、其实到此,准备工作就做完了。。。接下来就是调用了。
controller 层, service 层。。。emmm,这里暂不详述了,参考的博文写得更好。。。
大概示意一下service层的同步异步方法的调用。。:

  • 线程池同步调用的方式:
    意思就是, 一个请求过来,会等线程池里面执行完成后,再返回。
    异步,就不等待了,直接响应回去了~
    // 同步用法,要把 Executor 注入进来,并指定 name
    @Resource(name = "asyncServiceExecutor")
    private Executor executor;

    @Override
    public void showme() {
        log.info("start>>>>>>");
        executor.execute(()->{
            // 模拟请求。。
            // 从日志上看, start >>>>> 的打印时间和  end >>>>>> 的打印时间,中间是会隔着3秒的
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        log.info("end>>>>>>");
    }

  • 异步用法
    // 异步用法,就不用这两行了。。。
    // @Resource(name = "asyncServiceExecutor")
    // private Executor executor;

    // 但是要在方法上面,加上这一行了,重要,重要~
    @Async("asyncServiceExecutor")
    @Override
    public void showme() {
        log.info("start>>>>>>");
        // 模拟请求。。
        // 这里用睡眠3秒表示请求,但实际上这个方法,并不会阻塞,它会马上返回。。。
        // 用日志上看到, start >>>>> 和 end >>>> 是在同一时间戳打印的。。。
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("end>>>>>>");
    }



本人比较懒,也只是随手笔记,如有错漏请指正!谢谢。

标签:异步,task,org,线程,executor,import,ThreadPoolTaskExecutor,public
From: https://www.cnblogs.com/aaacarrot/p/16964508.html

相关文章

  • 线程池中CompletionService的应用
    当使用ExecutorService启动了多个Callable后,每个Callable会产生一个Future,我们需要将多个Future存入一个线性表,用于之后处理数据。当然,还有更复杂的情况,有5个生产者线程,每......
  • java面试(多线程)
    1. Callable、Future、FutureTash详解Callable与Future是在JAVA的后续版本中引入进来的,Callable类似于Runnable接口,实现Callable接口的类与实现Runnable的类都是可以被线程......
  • JUC6 中断机制与线程通信三种让线程等待和唤醒的方法:
    1.线程中断1.1什么是线程中断①.一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止,所以,Thread.stop、Thread.suspend、Thread.resume都已经被废......
  • JUC5 多线程锁(下)
    1.​​synchronize​​锁升级:无锁,偏向锁,轻量锁,重量锁(看病:社区医院->三甲医院)1.1 概述按照获得锁和释放锁的性能消耗,锁的分类:1.无锁状态2.偏向锁:不进行​​CAS​​,测......
  • JUC4 多线程锁(上)
    1.乐观锁和悲观锁①.悲观锁什么是悲观锁?认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改适合写操作多的场......
  • logback异步输出日志详解
    前言logback应该是目前最流行的日志打印框架了,毕竟SpringBoot中默认的集成的日志框架也是logback。在实际项目开发过程中,常常会遇到由于打印大量日志而导致程序并发降低,QPS......
  • 将jquery validate校验框架的remote异步验证设置为同步校验
        最近公司的项目中都是使用的jqueryvalidate在做表单,感觉确实非常好用,很灵活,用起来很顺手。但也遇到了不少问题。在此记录一下。    问题:当提交表单触......
  • 使用ajaxFileUpload实现文件异步上传
     最近在项目中遇到要使用ajax提交包含file输入框的表单的情况,网上查了下,发现ajaxFileUpload.js插件的比较多。就研究了下,发现真的不错。传统的包含file输入框的表单提交遇......
  • 线程饥饿死锁
    线程饥饿死锁:   在一个线程池中,如果一个任务依赖于其他任务的执行,就可能产生死锁。对应一个单线程话的Executor,一个任务将另一个任务提交到相同的Executor中,并等待......
  • 线程的取消和中断
    下面介绍取消线程常用的4中方式:一、通过设置“cancelled requested”标志来中断线程java中的任务取消实现:    是通过一个协作机制完成的,使用一个线程能够要求另......