首页 > 其他分享 >@Async异步注解的使用

@Async异步注解的使用

时间:2023-01-04 11:11:54浏览次数:47  
标签:异步 config 任务 线程 注解 Async threadPoolTaskExecutor public ThreadPoolExecutor

@Async

简介

使用spring快速开启异步执行服务的注解

应用场景

同步:同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。

异步: 异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕; 如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的业务子线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。

使用

必要条件

​ spring环境

简单使用

1.在启动类或配置类加上@EnableAsync
@SpringBootApplication
@EnableAsync
public class AsyncDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApplication.class, args);
    }

}
2.在服务类加上@Async注解
public interface TestService {

    void test();

    String mainTest();
}
@Service
public class TestServiceImpl implements TestService {


    @Override
    @Async
    public void test() {
        try {
            System.out.println("线程开始");
            TimeUnit.SECONDS.sleep(10);
            int a=1/0;
            System.out.println("service test:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    @Override
    public String mainTest() {
        System.out.println("service msinTest:"+Thread.currentThread().getName());
        return "Hello World";
    }
}
3.测试

注意:测试时普通方法和异步方法并行,如果串行(在一个方法中调用另一个方法)则不会有效果

@SpringBootTest
@RunWith(SpringRunner.class)
class AsyncDemoApplicationTests {

    @Resource
    private TestService testService;
    @Test
    void contextLoads() {
        String s = testService.mainTest();
        if (s.equals("Hello World")){
            testService.test();
        }
        System.out.println(s);
        Scanner scanner = new Scanner(System.in);
        int i = scanner.nextInt();
    }
}
4.结果
service msinTest:main
Hello World
线程开始
2023-01-04 10:34:45.165 ERROR 22804 --- [         task-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.wangfan.service.impl.TestServiceImpl.test()
5.问题分析
  • 默认线程池的弊端

    在线程池应用中,参考阿里巴巴java开发规范:线程池不允许使用Executors去创建,不允许使用系统默认的线程池,推荐通过ThreadPoolExecutor的方式,这样的处理方式让开发的工程师更加明确线程池的运行规则,规避资源耗尽的风险。Executors各个方法的弊端:

  • newFixedThreadPool和newSingleThreadExecutor:主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。

  • newCachedThreadPool和newScheduledThreadPool:要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

    @Async默认异步配置使用的是SimpleAsyncTaskExecutor,该线程池默认来一个任务创建一个线程,若系统中不断的创建线程,最终会导致系统占用内存过高,引发OutOfMemoryError错误。针对线程创建问题,SimpleAsyncTaskExecutor提供了限流机制,通过concurrencyLimit属性来控制开关,当concurrencyLimit>=0时开启限流机制,默认关闭限流机制即concurrencyLimit=-1,当关闭情况下,会不断创建新的线程来处理任务。基于默认配置,SimpleAsyncTaskExecutor并不是严格意义的线程池,达不到线程复用的功能。

自定义线程池使用

  • 重新实现接口AsyncConfigurer
  • 继承AsyncConfigurerSupport
  • 配置由自定义的TaskExecutor替代内置的任务执行器

开启和测试代码参考简单使用

1.自定义线程池
@Configuration
public class AsyncConfig  {

    public static final String EXECUTOR_NAME = "testExecutor";
    @Value("${thread-pool.config.corePoolSize:1}")
    private int corePoolSize;
    @Value("${thread-pool.config.maxPoolSize:2}")
    private int maxPoolSize;
    @Value("${thread-pool.config.queueCapacity:2}")
    private int queueCapacity;
    @Value("${thread-pool.config.threadNamePrefix:vchr-}")
    private String threadNamePrefix;

    @Bean(name = EXECUTOR_NAME)
    public Executor executor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        // 最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        // 阻塞队列容量
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        // 待任务在关机时完成--表明等待所有线程执行完
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // 线程名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
        // 设置拒绝策略,目前是直接拒绝
        /**
         * 到线程池的任务缓存队列已满并且线程池中的线程数到达了maxPoolSize,如果还有任务到来就会采取任务拒绝策略,
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但不抛出异常
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前边的任务,然后重新尝试执行任务(重复此过程)
         * TreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用execute()方法,直到成功
         */
        //初始化
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

2.指定服务类@Async的线程池

@Service
public class TestServiceImpl implements TestService {


    @Override
    @Async(value="AsyncConfig.EXECUTOR_NAME")
    public void test() {
        try {
            System.out.println("线程开始");
            TimeUnit.SECONDS.sleep(10);
            int a=1/0;
            System.out.println("service test:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }


    @Override
    public String mainTest() {
        System.out.println("service msinTest:"+Thread.currentThread().getName());
        return "Hello World";
    }
}

3.测试结果

service msinTest:main
Hello World
线程开始
2023-01-04 10:45:51.770 ERROR 25304 --- [         vchr-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.wangfan.service.impl.TestServiceImpl.test()

线程名称已改变

2.重新实现接口AsyncConfigurer
@Configuration
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

    public static final String EXECUTOR_NAME = "testExecutor";
    @Value("${thread-pool.config.corePoolSize:1}")
    private int corePoolSize;
    @Value("${thread-pool.config.maxPoolSize:2}")
    private int maxPoolSize;
    @Value("${thread-pool.config.queueCapacity:2}")
    private int queueCapacity;
    @Value("${thread-pool.config.threadNamePrefix:vchr-}")
    private String threadNamePrefix;

    @Bean(name = EXECUTOR_NAME)
    public Executor executor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        // 最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        // 阻塞队列容量
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        // 待任务在关机时完成--表明等待所有线程执行完
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // 线程名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
        // 设置拒绝策略,目前是直接拒绝
        /**
         * 到线程池的任务缓存队列已满并且线程池中的线程数到达了maxPoolSize,如果还有任务到来就会采取任务拒绝策略,
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但不抛出异常
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前边的任务,然后重新尝试执行任务(重复此过程)
         * TreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用execute()方法,直到成功
         */
        //初始化
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }


    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (throwable, method, objects) -> log.error(throwable.getMessage());
    }
}

2.指定服务类@Async的线程池可写可不写默认为自定义的(多个线城池需指定)

@Async
public void test() {
    ...
}

3.测试

service msinTest:main
Hello World
线程开始
2023-01-04 10:54:12.682 ERROR 27312 --- [         vchr-1] com.wangfan.config.AsyncConfig           : / by zero

线程名称已改变

3.继承AsyncConfigurerSupport

@Configuration
@Slf4j
//@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {

    public static final String EXECUTOR_NAME = "testExecutor";
    @Value("${thread-pool.config.corePoolSize:1}")
    private int corePoolSize;
    @Value("${thread-pool.config.maxPoolSize:2}")
    private int maxPoolSize;
    @Value("${thread-pool.config.queueCapacity:2}")
    private int queueCapacity;
    @Value("${thread-pool.config.threadNamePrefix:vchr-}")
    private String threadNamePrefix;

    @Bean(name = EXECUTOR_NAME)
    public Executor executor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        // 核心线程数
        threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
        // 最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
        // 阻塞队列容量
        threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
        // 待任务在关机时完成--表明等待所有线程执行完
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // 线程名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix);
        // 设置拒绝策略,目前是直接拒绝
        /**
         * 到线程池的任务缓存队列已满并且线程池中的线程数到达了maxPoolSize,如果还有任务到来就会采取任务拒绝策略,
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但不抛出异常
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前边的任务,然后重新尝试执行任务(重复此过程)
         * TreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用execute()方法,直到成功
         */
        //初始化
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (throwable, method, objects) -> log.error(throwable.getMessage());
    }
}

2.指定服务类@Async的线程池可写可不写默认为自定义的(多个线城池需指定)

@Async
public void test() {
    ...
}

3.测试

service msinTest:main
Hello World
线程开始
2023-01-04 10:54:12.682 ERROR 27312 --- [         vchr-1] com.wangfan.config.AsyncConfig           : / by zero

线程名称已改变

标签:异步,config,任务,线程,注解,Async,threadPoolTaskExecutor,public,ThreadPoolExecutor
From: https://www.cnblogs.com/WangJingjun/p/17024293.html

相关文章