你应该知道的几种Spring boot异步调用方式
在工作中,我们可能会遇到一些需要异步调用的场景。如果你接手的是一个维护的项目,大概率这部分内容都是已经存在的。但是如果你需要搭建新的项目,异步的功能就不可或缺了。
Springboot项目的异步调用大概分为一下几种:
一、使用XXL-JOB等类似的框架
1.1 简介
XXL-JOB 是一个分布式任务调度平台,旨在解决大规模分布式系统中的任务调度和管理问题。它支持定时任务的调度和执行,支持多种执行方式(如 cron 表达式、简单定时、一次性任务等),并且具有高可用、容错、动态配置、任务分片等特性,广泛应用于微服务、分布式系统等场景。
1.2 主要特性
- Web 控制台:提供图形化界面来管理任务,包括定时任务的创建、修改、调度、监控等。
- 分布式调度:支持集群部署,通过任务分片和负载均衡实现任务的分布式执行。
- 高可用和容错:支持任务的失败重试,节点故障自动转移。
- 任务监控:提供实时任务执行情况、日志查看、报警等功能,帮助开发者监控任务执行。
- 多语言支持:支持 Java、Shell、Python 等多种语言的任务执行。
1.3 典型架构
- Admin:调度中心,负责调度管理、任务注册、执行日志记录等。
- Executor:任务执行器,负责实际的任务执行。
- Client:用于任务的调用,通常为应用服务。
1.4 简单示例
- 配置调度中心:启动 XXL-JOB Admin。
- 任务执行器:在 Spring Boot 项目中集成 XXL-JOB,使用注解 @XxlJob 来定义任务。
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.biz.model.ReturnT;
import org.springframework.stereotype.Component;
@Component
public class DemoJobHandler {
// 1. 使用 Cron 表达式配置任务
@XxlJob(value = "cronJobHandler", cron = "0 0 12 * * ?") // 每天12点执行
public ReturnT<String> cronJobHandler(String param) throws Exception {
System.out.println("Cron job executed at 12:00.");
return ReturnT.SUCCESS;
}
// 2. 使用简单定时配置任务
@XxlJob(value = "simpleJobHandler", cron = "0/10 * * * * ?") // 每10秒执行一次
public ReturnT<String> simpleJobHandler(String param) throws Exception {
System.out.println("Simple job executed every 10 seconds.");
return ReturnT.SUCCESS;
}
// 3. 一次性任务(无需Cron设置,手动触发)
@XxlJob("onceJobHandler") // 一次性执行
public ReturnT<String> onceJobHandler(String param) throws Exception {
System.out.println("Once job executed only once.");
return ReturnT.SUCCESS;
}
}
1.5 总结
XXL-JOB 是一个功能强大且灵活的任务调度平台,适合需要高可用、高并发任务调度的场景,尤其在分布式和微服务架构中表现优秀。
二、ThreadPoolTaskExecutor 结合@Async
2.1 简介
ThreadPoolTaskExecutor 是 Spring 提供的一个常用线程池实现,它实现了 TaskExecutor 接口,允许你在 Spring 应用中灵活地配置和管理线程池。它适合用于需要控制线程池大小和线程行为的场景,可以与 @Async 配合使用来管理异步任务的执行。
2.2 主要特性与配置项
- 线程池管理
ThreadPoolTaskExecutor 基于线程池进行任务执行,可以管理线程的创建、销毁、调度等操作,避免了频繁创建新线程的开销,提高了应用性能。 - 核心线程数(Core Pool Size)
核心线程数是线程池中始终存在的线程数,即使这些线程处于空闲状态也会保留在池中。ThreadPoolTaskExecutor 允许配置核心线程数,从而控制池中最小的线程数。 - 配置项:setCorePoolSize(int corePoolSize)
默认值:1 - 最大线程数(Max Pool Size)
最大线程数是线程池可以容纳的最大线程数。当线程池中所有的核心线程都在执行任务时,如果队列已满,则可以创建新线程来执行任务,直到达到最大线程数。超过最大线程数的任务将被拒绝。 - 配置项:setMaxPoolSize(int maxPoolSize)
默认值:Integer.MAX_VALUE - 线程空闲时间(Keep-Alive Time)
线程池中空闲线程的最大空闲时间。如果某个线程在指定的空闲时间内没有执行任务,它将被销毁,直到线程数达到核心线程数为止。 - 配置项:setKeepAliveSeconds(int keepAliveSeconds)
默认值:60(秒) - 任务队列(Queue Capacity)
线程池中的任务队列用于保存等待执行的任务。当线程池中的线程数达到核心线程数时,新的任务将被放入队列等待执行。任务队列的容量决定了能够同时排队等待的任务数量。 - 配置项:setQueueCapacity(int queueCapacity)
默认值:Integer.MAX_VALUE - 拒绝策略(RejectedExecutionHandler)
当线程池中的线程数达到最大线程数,且队列已满时,线程池将根据设置的拒绝策略来处理新提交的任务。ThreadPoolTaskExecutor 提供了多种拒绝策略:- CallerRunsPolicy:调用者线程执行该任务。
- AbortPolicy:直接抛出 RejectedExecutionException 异常,默认策略。
- DiscardPolicy:丢弃任务,不抛出异常。
- DiscardOldestPolicy:丢弃最旧的任务,执行新的任务。
- 配置项:setRejectedExecutionHandler(RejectedExecutionHandler handler)
默认值:AbortPolicy - 线程名称前缀(Thread Name Prefix)
ThreadPoolTaskExecutor 允许为池中的线程指定一个名称前缀,以便在日志或监控中轻松识别。 - 配置项:setThreadNamePrefix(String threadNamePrefix)
默认值:无 - 是否允许核心线程超时(Allow Core Thread TimeOut)
默认情况下,核心线程是不会被销毁的。设置为 true 后,核心线程也可以在空闲超时后被销毁。 - 配置项:setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut)
默认值:false - 初始化/销毁钩子(Lifecycle Hooks)
ThreadPoolTaskExecutor 实现了 InitializingBean 和 DisposableBean 接口,可以在初始化和销毁时执行特定操作。afterPropertiesSet() 会在线程池初始化时被调用,destroy() 在销毁时被调用。 - 异步任务执行(TaskExecutor 接口)
ThreadPoolTaskExecutor 实现了 Spring 的 TaskExecutor 接口,这使得它可以在 Spring 的异步任务执行环境中使用,例如,@Async 注解的异步方法。
2.3 简单示例
- 配置
- setCorePoolSize(int): 设置核心线程数。
- setMaxPoolSize(int): 设置最大线程数。
- setQueueCapacity(int): 设置队列容量,用于存储等待执行的任务。
- setThreadNamePrefix(String): 设置线程名称的前缀。
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(10); // 最大线程数
executor.setQueueCapacity(25); // 队列容量
executor.setThreadNamePrefix("MyExecutor-"); // 线程名称前缀
executor.setWaitForTasksToCompleteOnShutdown(true); // 等待任务完成
executor.setKeepAliveSeconds(60); // 线程保持活动的时间
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
return executor;
}
- 用法
@Service
public class AsyncService {
@Async("taskExecutor") // 使用自定义线程池
public void performTask() {
System.out.println("Executing task in thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
三、ListenableFuture
3.1 简介
ListenableFuture 是 Guava 提供的一个接口,扩展了 java.util.concurrent.Future,主要用于异步编程。与 Future 不同,ListenableFuture 支持注册回调方法,在任务执行完毕时自动执行,而无需显式地调用 get() 方法来获取结果。
3.2 主要特性
- 回调机制:可以通过 addListener() 方法为任务添加回调函数,这些回调会在任务完成后被执行。
- 支持取消:与 Future 一样,支持取消任务(cancel())。
- 支持异步结果:可以通过 get() 或 get(long timeout, TimeUnit unit) 获取任务结果,但通常建议通过回调来处理结果,避免阻塞。
- 异步处理:可以与 ListeningExecutorService 配合使用,使得任务执行和结果处理完全异步。
3.3 核心方法
- addListener(Runnable listener, Executor executor):为任务添加回调方法,任务执行完成时会调用回调。
- get():阻塞方法,等待任务执行完成并返回结果。
- get(long timeout, TimeUnit unit):带有超时限制的阻塞方法。
3.4 简单示例
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ListenableFutureExample {
public static void main(String[] args) throws Exception {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
ListenableFuture<Integer> future = executorService.submit(() -> {
TimeUnit.SECONDS.sleep(2);
return 42; // 任务返回的结果
});
// 注册回调函数
future.addListener(() -> {
try {
System.out.println("Result: " + future.get()); // 获取结果
} catch (Exception e) {
e.printStackTrace();
}
}, executorService);
// 其他操作
System.out.println("Task submitted.");
}
}
3.5 总结
ListenableFuture 提供了更灵活的异步编程接口,尤其适合需要异步回调处理的场景。通过与 ListeningExecutorService 配合使用,能够实现任务的非阻塞执行和结果处理。
四、使用异步 Web 请求处理
4.1 简介
DeferredResult 是 Spring MVC 提供的一种异步请求处理机制,可以帮助我们在 Web 请求处理中异步地返回结果,适用于需要处理时间较长或耗时操作的场景。通过 DeferredResult,Spring MVC 可以将请求处理的线程释放给容器进行其他操作,直到业务逻辑处理完毕后再将结果返回给客户端。这个特性特别适合那些需要等待外部系统调用(如远程服务、数据库等)返回结果的场景。
4.2 DeferredResult 的工作原理
- 异步处理: 当请求到达时,Spring MVC 将请求交给一个异步处理线程。在处理的过程中,Web 请求线程并不会被阻塞,可以处理其他请求。
- 调用 setResult: 当业务逻辑处理完毕后,我们通过调用 DeferredResult 的 setResult 或 setErrorResult 方法将处理结果返回给客户端,通知客户端请求已完成。
- 超时控制: 可以设置超时时间,如果请求在规定时间内未完成,可以设置超时处理。
4.3 典型的使用场景
- 调用远程服务(例如微服务间通信)。
- 执行长时间的数据库操作。
- 等待外部事件或消息。
4.4 基本步骤
- 使用 @RequestMapping 或 @GetMapping 等注解标注一个控制器方法,返回类型为 DeferredResult。
- 在方法内部处理异步业务逻辑,并在业务完成后调用 DeferredResult.setResult() 或 DeferredResult.setErrorResult()。
4.5 示例代码
假设我们有一个异步服务,它会模拟一个延迟操作,例如模拟调用一个远程服务,然后返回数据。
- 控制器实现异步处理
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@RestController
@EnableAsync // 启用异步支持
public class AsyncController {
@Autowired
private AsyncService asyncService;
// 返回 DeferredResult,表示异步请求
@GetMapping("/async")
public DeferredResult<String> handleAsyncRequest() {
// 创建 DeferredResult 对象,设置超时时间
DeferredResult<String> deferredResult = new DeferredResult<>(5000L, "Timeout occurred");
// 调用异步方法处理
asyncService.processAsyncTask(deferredResult);
// 返回 DeferredResult
return deferredResult;
}
}
- 异步服务处理
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.TimeUnit;
@Service
public class AsyncService {
@Async
public void processAsyncTask(DeferredResult<String> deferredResult) {
try {
// 模拟耗时操作,如外部服务调用
TimeUnit.SECONDS.sleep(3); // 延迟3秒
// 操作完成后,调用 setResult 返回结果
deferredResult.setResult("Task Completed Successfully!");
} catch (InterruptedException e) {
// 错误处理,调用 setErrorResult
deferredResult.setErrorResult("Error Occurred");
}
}
}
- 配置异步请求支持
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
@EnableWebMvc
public class WebConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
// 配置异步超时时间
configurer.setDefaultTimeout(5000L);
}
}
4.6 重要方法解析
- setResult(T result):当异步操作完成并准备好响应时调用该方法,传递结果数据给客户端。
- setErrorResult(Object errorResult):如果异步操作出现错误,可以调用此方法传递错误信息。
- DeferredResult(long timeout, Object timeoutResult):可以设置超时处理。如果请求在指定时间内未完成,将返回一个默认的超时结果。
例如,DeferredResultdeferredResult = new DeferredResult<>(5000L, "Timeout occurred"); 表示请求的超时时间为 5 秒,超时后将返回 "Timeout occurred"。 - getResult():可以在后续处理中获取处理结果,但一般推荐在异步回调方法中处理结果。
4.7 异常处理
可以在异步操作中捕获异常,并使用 setErrorResult 返回错误信息。
try {
// 模拟耗时操作
TimeUnit.SECONDS.sleep(3);
deferredResult.setResult("Success");
} catch (Exception e) {
deferredResult.setErrorResult("Error: " + e.getMessage());
}
4.8 配置超时
DeferredResult<String> deferredResult = new DeferredResult<>(5000L, "Timeout Occurred");
// 这会设置5秒的超时时间,超时后返回 "Timeout Occurred"。
4.9 结合 @Async 实现更复杂的异步逻辑
Spring 支持使用 @Async 注解来简化异步操作。通过 @Async 可以让方法在独立的线程中执行,从而不会阻塞主线程。
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
@Service
public class AsyncService {
@Async
public void processAsyncTask(DeferredResult<String> deferredResult) {
try {
// 模拟长时间处理
Thread.sleep(5000);
deferredResult.setResult("Processing Complete");
} catch (InterruptedException e) {
deferredResult.setErrorResult("Error occurred during processing");
}
}
}
4.10 总结
- DeferredResult 是 Spring MVC 提供的用于处理异步请求的工具。它能够让请求不再阻塞主线程,处理完成后再通过 setResult 或 setErrorResult 返回结果或错误。
- @Async 和 DeferredResult 配合使用,可以更容易地处理需要长时间等待的异步请求,避免阻塞请求线程,提高应用的响应能力。
- 配置超时和错误处理,确保在异步操作中出现问题时能够及时响应。
这种机制对于需要高性能和响应快速的 Web 应用非常有用,特别是当涉及到远程服务调用、复杂的计算任务或耗时操作时。