使用 Async 执行异步任务
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
声明异步任务
@Component
@Slf4j
public class AsyncTask {
/**
* 通知计数
* */
@Async("taskExecutor")
public void notifyCount() {
log.info("执行异步任务-通知计数开始");
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行异步任务-通知计数结束");
}
/**
* 通知邮件-带返回值
* */
@Async
public Future<String> notifyEmail() {
log.info("执行异步任务-通知邮件开始");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("执行异步任务-通知邮件结束");
return new AsyncResult<>("send email success");
}
}
@Async(“参数”): 参数用来定义哪个线程池执行
执行异步任务
@RestController
@RequestMapping("")
@Slf4j
public class AnalysController {
@Autowired
private AsyncTask asyncTask;
@RequestMapping(value = "index", method = RequestMethod.GET)
public XloResponse index() throws Exception {
// 通知计数
asyncTask.notifyCount();
// 通知邮件
Future<String> stringFuture = asyncTask.notifyEmail();
// TODO 业务逻辑处理
// 阻塞获取通知邮件的结果
log.info(stringFuture.get());
}
}
自定义执行异步任务的线程池信息
声明式
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=50
spring.task.execution.pool.keep-alive=60s
spring.task.execution.pool.queue-capacity=2
spring.task.execution.thread-name-prefix=MyTask-
编程式
@Slf4j
// 开启异步任务支持
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
/**
* 处理异步方法调用时要使用的实例 <br /><br />
*
*
* 拒绝策略常用有有这四种
* <ul>
* <li>
* <b>ThreadPoolExecutor.AbortPolicy: </b>丢弃任务并抛出RejectedExecutionException异常(默认)
* </li>
* <li>
* <b>ThreadPoolExecutor.DiscardPolic: </b>丢弃任务,但是不抛出异常
* </li>
* <li>
* <b>ThreadPoolExecutor.DiscardOldestPolicy: </b>丢弃队列最前面的任务,然后重新尝试执行任务
* </li>
* <li>
* <b>ThreadPoolExecutor.CallerRunsPolic: </b>由调用线程处理该任务
* </li>
* </ul>
* */
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(10);
// 最大线程数
executor.setMaxPoolSize(60);
// 线程最大空闲时间
executor.setKeepAliveSeconds(60);
// 队列大小
executor.setQueueCapacity(10);
// 指定用于新创建的线程名称的前缀
executor.setThreadNamePrefix("MyExecutor-");
// 使用预定义拒绝策略
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 使用自定义拒绝策略
executor.setRejectedExecutionHandler((runnable, executor1) -> {
System.out.println("具体拒绝策略");
});
// 等待任务完成时再关闭线程池--表明等待所有线程执行完
// executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间(默认为0,此时立即停止), 并在等待xx秒后强制停止
// executor.setAwaitTerminationSeconds(60 * 5);
// 初始化
executor.initialize();
return executor;
}
/**
* 对void方法抛出的异常处理方法
* */
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// 使用自定义异常处理类 lambda方式
// return (throwable, method, objects) -> log.info("抛异常啦~~~throwable={},method={},params={}", throwable, method, objects);
// 使用自定义异常处理类
// return new MyAsyncExceptionHandler();
// 使用系统自带异常处理类
return new SimpleAsyncUncaughtExceptionHandler();
}
}
编程式 + 声明式
spring.async.executor.pool.core-size=10
spring.async.executor.pool.max-size=60
spring.async.executor.pool.keep-alive-seconds=60
spring.async.executor.pool.queue-capacity=10
spring.async.executor.pool.thread-name-prefix=XloTask-
@Slf4j
// 开启异步任务支持
@EnableAsync
@Configuration
@ConfigurationProperties(prefix="spring.async.executor.pool")
@Data
public class AsyncConfig implements AsyncConfigurer {
private Integer coreSize;
private Integer maxSize;
private Integer keepAliveSeconds;
private Integer queueCapacity;
private String threadNamePrefix;
/**
* 处理异步方法调用时要使用的实例
* */
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(coreSize);
// 最大线程数
executor.setMaxPoolSize(maxSize);
// 线程最大空闲时间
executor.setKeepAliveSeconds(keepAliveSeconds);
// 队列大小
executor.setQueueCapacity(queueCapacity);
// 指定用于新创建的线程名称的前缀
executor.setThreadNamePrefix(threadNamePrefix);
executor.initialize();
return executor;
}
/**
* Async无返回方法的异常处理方法
* */
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, objects) -> log.info("抛异常啦~~~throwable={},method={},params={}", throwable, method, objects);
}
}
自定义异常处理类
/**
* <p>
* 自定义异常处理类
* </p>
*
* @author Answer.AI.L
* @version 1.0
* @date 2019-10-09
*/
@Slf4j
public class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... params) {
log.info("Exception message - " + throwable.getMessage());
log.info("Method name - " + method.getName());
for (Object param: params) {
log.info("Parameter value - " + param);
}
}
}
使用WebAsyncTask执行异步任务
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setKeepAliveSeconds(2);
taskExecutor.setQueueCapacity(5);
taskExecutor.setThreadNamePrefix("asyncTask");
return taskExecutor;
}
@GetMapping("/asyncTask")
public WebAsyncTask<String> asyncTask() throws Exception {
Callable<String> callable = () -> {
log.info("异步工作线程:{}", Thread.currentThread().getName());
// 模拟业务耗时, 任务处理时间5s, 不超时
Thread.sleep(5 * 1000L);
return UUID.randomUUID().toString();
};
// 异步任务, 超时时间为10s
WebAsyncTask<String> asyncTask = new WebAsyncTask<>(
10 * 1000L,
executor,
callable
);
// 任务执行超时时调用该方法
asyncTask.onTimeout(() -> {
log.info("任务执行超时");
return "timeout";
});
// 任务执行完成时调用该方法
asyncTask.onCompletion(() -> {
log.info("任务执行完成");
});
// 任务执行异常时调用该方法
asyncTask.onError(() -> {
log.info("任务执行异常");
return "error";
});
return asyncTask;
}
Spring Boot 提供的 WebAsyncTask 的异步编程 API。相比上文介绍的 @Async 注解,WebAsyncTask 提供更加健全的 超时处理 和 异常处理 支持。 实战Spring Boot 2.0之WebAsyncTask