简单研究下spring 长轮训 DeferredResult 的用法以及简单的原理。 如果让自己设计,可能就是会用一些异步+spring的扩展机制来实现。
1. DeferredResult简单用法
1. 新建测试类
package cn.qz.template.controller;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.UUID;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class AsycnController {
// 维护队列
private ConcurrentLinkedDeque<DeferredResult<String>> deferredResults =
new ConcurrentLinkedDeque<DeferredResult<String>>();
// 处理线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(10,
new ThreadFactoryBuilder().setNameFormat("custom-pool-%d").build());
@RequestMapping("/getResult")
@ResponseBody
public DeferredResult<String> getDeferredResultController() {
final String requestId = UUID.fastUUID().toString(true);
printStr("收到请求\t" + requestId);
final String message = "defaultValue" + requestId;
// 设置 5秒就会超时
final DeferredResult<String> stringDeferredResult = new DeferredResult<String>(3000L);
// 也可以直接设置默认值
// final DeferredResult<String> stringDeferredResult1 = new DeferredResult<String>(3000L, message);
//将请求加入到队列中
deferredResults.add(stringDeferredResult);
// 正常处理
executorService.submit(new Runnable() {
@Override
public void run() {
try {
int time = new Random().nextInt(10) * 1000;
printStr("休眠: " + time + "\t" + requestId);
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
//业务处理
printStr("业务处理完成\t" + requestId);
stringDeferredResult.setResult(message);
}
});
// setResult完毕之后,调用该方法
stringDeferredResult.onCompletion(new Runnable() {
@Override
public void run() {
printStr("异步调用完成\t" + requestId);
//响应完毕之后,将请求从队列中去除掉
deferredResults.remove(stringDeferredResult);
}
});
stringDeferredResult.onTimeout(new Runnable() {
@Override
public void run() {
printStr("业务处理超时\t" + requestId);
stringDeferredResult.setResult("error:timeOut\t" + requestId);
}
});
return stringDeferredResult;
}
private void printStr(String str) {
System.out.println(DateUtil.format(new Date(), "HH:mm:ss") + "\tThreadName :" + Thread.currentThread().getName() + "\t" + str);
}
}
2. 测试后查看日志:
第一次:
2023-01-08 15:17:28.429 | [http-nio-8090-exec-7] INFO cn.qz.template.config.ControllerAspect - 请求方法:DeferredResult cn.qz.template.controller.AsycnController.getDeferredResultController()
15:17:28 ThreadName :http-nio-8090-exec-7 收到请求 5b3753150abf4a16b16c1e5ac9d2f932
2023-01-08 15:17:28.431 | [http-nio-8090-exec-7] INFO cn.qz.template.config.ControllerAspect - 请求完毕:DeferredResult cn.qz.template.controller.AsycnController.getDeferredResultController()=>org.springframework.web.context.request.async.DeferredResult@69317f18
15:17:28 ThreadName :custom-pool-2 休眠: 5000 5b3753150abf4a16b16c1e5ac9d2f932
15:17:32 ThreadName :http-nio-8090-exec-8 业务处理超时 5b3753150abf4a16b16c1e5ac9d2f932
15:17:32 ThreadName :http-nio-8090-exec-8 异步调用完成 5b3753150abf4a16b16c1e5ac9d2f932
15:17:33 ThreadName :custom-pool-2 业务处理完成 5b3753150abf4a16b16c1e5ac9d2f932
第二次
2023-01-08 15:18:39.470 | [http-nio-8090-exec-1] INFO cn.qz.template.config.ControllerAspect - 请求方法:DeferredResult cn.qz.template.controller.AsycnController.getDeferredResultController()
15:18:39 ThreadName :http-nio-8090-exec-1 收到请求 0bf4f5cad145401f9aa8e4a9e37fede0
2023-01-08 15:18:39.474 | [http-nio-8090-exec-1] INFO cn.qz.template.config.ControllerAspect - 请求完毕:DeferredResult cn.qz.template.controller.AsycnController.getDeferredResultController()=>org.springframework.web.context.request.async.DeferredResult@42ff9f36
15:18:39 ThreadName :custom-pool-3 休眠: 2000 0bf4f5cad145401f9aa8e4a9e37fede0
15:18:41 ThreadName :custom-pool-3 业务处理完成 0bf4f5cad145401f9aa8e4a9e37fede0
15:18:41 ThreadName :http-nio-8090-exec-2 异步调用完成 0bf4f5cad145401f9aa8e4a9e37fede0
第三次:
2023-01-08 15:21:20.260 | [http-nio-8090-exec-7] INFO cn.qz.template.config.ControllerAspect - 请求方法:DeferredResult cn.qz.template.controller.AsycnController.getDeferredResultController()
15:21:20 ThreadName :http-nio-8090-exec-7 收到请求 3bad228d702b40a29f1188010aaa15c6
2023-01-08 15:21:20.261 | [http-nio-8090-exec-7] INFO cn.qz.template.config.ControllerAspect - 请求完毕:DeferredResult cn.qz.template.controller.AsycnController.getDeferredResultController()=>org.springframework.web.context.request.async.DeferredResult@1dd50481
15:21:20 ThreadName :custom-pool-5 休眠: 1000 3bad228d702b40a29f1188010aaa15c6
15:21:21 ThreadName :custom-pool-5 业务处理完成 3bad228d702b40a29f1188010aaa15c6
15:21:21 ThreadName :http-nio-8090-exec-8 异步调用完成 3bad228d702b40a29f1188010aaa15c6
3. 总结:
1》可以看到,对于spring 框架调度来说,是先反射调用方法,然后返回结果。但是对于前端来说请求还是阻塞的,没有拿到结果
2》从线程的角度来说,业务处理是用的自己的线程池,onCompletion 完成和onTimeout 超时是用的tomcat 的线程池。
2. 原理简单理解
1. 关于spring 对返回结果是 DeferredResult 的拦截以及处理
断点下在org.springframework.web.context.request.async.DeferredResult#setResultHandler 方法,查看调用链:
可以看到核心也是走SpringMVC的任务分发。 然后对搜集到的结果进行处理。几个关键逻辑如下:
1》org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter#invokeHandlerMethod
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
Object result;
try {
WebDataBinderFactory binderFactory = this.getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = this.getModelFactory(handlerMethod, binderFactory);
ServletInvocableHandlerMethod invocableMethod = this.createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
if (asyncManager.hasConcurrentResult()) {
result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer)asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(this.logger, (traceOn) -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
invocableMethod.invokeAndHandle(webRequest, mavContainer, new Object[0]);
if (!asyncManager.isConcurrentHandlingStarted()) {
ModelAndView var15 = this.getModelAndView(mavContainer, modelFactory, webRequest);
return var15;
}
result = null;
} finally {
webRequest.requestCompleted();
}
return (ModelAndView)result;
}
可以看到这里有就是获取ModelAndView 对象。
2》 invocableMethod.invokeAndHandle(webRequest, mavContainer, new Object[0]); 会调用到 org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod#invokeAndHandle
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer, Object... providedArgs) throws Exception {
Object returnValue = this.invokeForRequest(webRequest, mavContainer, providedArgs);
this.setResponseStatus(webRequest);
if (returnValue == null) {
if (this.isRequestNotModified(webRequest) || this.getResponseStatus() != null || mavContainer.isRequestHandled()) {
this.disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
} else if (StringUtils.hasText(this.getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}
mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {
this.returnValueHandlers.handleReturnValue(returnValue, this.getReturnValueType(returnValue), mavContainer, webRequest);
} catch (Exception var6) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.formatErrorForReturnValue(returnValue), var6);
}
throw var6;
}
}
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod#returnValueHandlers 对于响应结果的处理有如下处理器:
3》继续调用到:org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite#handleReturnValue
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
HandlerMethodReturnValueHandler handler = this.selectHandler(returnValue, returnType);
if (handler == null) {
throw new IllegalArgumentException("Unknown return value type: " + returnType.getParameterType().getName());
} else {
handler.handleReturnValue(returnValue, returnType, mavContainer, webRequest);
}
}
实际就是根据返回类型选择合适的处理器,然后调用 handleReturnValue 方法进行后置处理。这里调用到:org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.springframework.web.servlet.mvc.method.annotation;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
public class DeferredResultMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
public DeferredResultMethodReturnValueHandler() {
}
public boolean supportsReturnType(MethodParameter returnType) {
Class<?> type = returnType.getParameterType();
return DeferredResult.class.isAssignableFrom(type) || ListenableFuture.class.isAssignableFrom(type) || CompletionStage.class.isAssignableFrom(type);
}
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
} else {
DeferredResult result;
if (returnValue instanceof DeferredResult) {
result = (DeferredResult)returnValue;
} else if (returnValue instanceof ListenableFuture) {
result = this.adaptListenableFuture((ListenableFuture)returnValue);
} else {
if (!(returnValue instanceof CompletionStage)) {
throw new IllegalStateException("Unexpected return value type: " + returnValue);
}
result = this.adaptCompletionStage((CompletionStage)returnValue);
}
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, new Object[]{mavContainer});
}
}
private DeferredResult<Object> adaptListenableFuture(ListenableFuture<?> future) {
final DeferredResult<Object> result = new DeferredResult();
future.addCallback(new ListenableFutureCallback<Object>() {
public void onSuccess(@Nullable Object value) {
result.setResult(value);
}
public void onFailure(Throwable ex) {
result.setErrorResult(ex);
}
});
return result;
}
private DeferredResult<Object> adaptCompletionStage(CompletionStage<?> future) {
DeferredResult<Object> result = new DeferredResult();
future.handle((value, ex) -> {
if (ex != null) {
if (ex instanceof CompletionException && ex.getCause() != null) {
ex = ex.getCause();
}
result.setErrorResult(ex);
} else {
result.setResult(value);
}
return null;
});
return result;
}
}
View Code
4》实际最终都会交给org.springframework.web.context.request.async.WebAsyncManager 类进行处理:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.springframework.web.context.request.async;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
public final class WebAsyncManager {
private static final Object RESULT_NONE = new Object();
private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName());
private static final Log logger = LogFactory.getLog(WebAsyncManager.class);
private static final CallableProcessingInterceptor timeoutCallableInterceptor = new TimeoutCallableProcessingInterceptor();
private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor();
private static Boolean taskExecutorWarning = true;
private AsyncWebRequest asyncWebRequest;
private AsyncTaskExecutor taskExecutor;
private volatile Object concurrentResult;
private volatile Object[] concurrentResultContext;
private final Map<Object, CallableProcessingInterceptor> callableInterceptors;
private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors;
WebAsyncManager() {
this.taskExecutor = DEFAULT_TASK_EXECUTOR;
this.concurrentResult = RESULT_NONE;
this.callableInterceptors = new LinkedHashMap();
this.deferredResultInterceptors = new LinkedHashMap();
}
public void setAsyncWebRequest(AsyncWebRequest asyncWebRequest) {
Assert.notNull(asyncWebRequest, "AsyncWebRequest must not be null");
this.asyncWebRequest = asyncWebRequest;
this.asyncWebRequest.addCompletionHandler(() -> {
asyncWebRequest.removeAttribute(WebAsyncUtils.WEB_ASYNC_MANAGER_ATTRIBUTE, 0);
});
}
public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
public boolean isConcurrentHandlingStarted() {
return this.asyncWebRequest != null && this.asyncWebRequest.isAsyncStarted();
}
public boolean hasConcurrentResult() {
return this.concurrentResult != RESULT_NONE;
}
public Object getConcurrentResult() {
return this.concurrentResult;
}
public Object[] getConcurrentResultContext() {
return this.concurrentResultContext;
}
@Nullable
public CallableProcessingInterceptor getCallableInterceptor(Object key) {
return (CallableProcessingInterceptor)this.callableInterceptors.get(key);
}
@Nullable
public DeferredResultProcessingInterceptor getDeferredResultInterceptor(Object key) {
return (DeferredResultProcessingInterceptor)this.deferredResultInterceptors.get(key);
}
public void registerCallableInterceptor(Object key, CallableProcessingInterceptor interceptor) {
Assert.notNull(key, "Key is required");
Assert.notNull(interceptor, "CallableProcessingInterceptor is required");
this.callableInterceptors.put(key, interceptor);
}
public void registerCallableInterceptors(CallableProcessingInterceptor... interceptors) {
Assert.notNull(interceptors, "A CallableProcessingInterceptor is required");
CallableProcessingInterceptor[] var2 = interceptors;
int var3 = interceptors.length;
for(int var4 = 0; var4 < var3; ++var4) {
CallableProcessingInterceptor interceptor = var2[var4];
String key = interceptor.getClass().getName() + ":" + interceptor.hashCode();
this.callableInterceptors.put(key, interceptor);
}
}
public void registerDeferredResultInterceptor(Object key, DeferredResultProcessingInterceptor interceptor) {
Assert.notNull(key, "Key is required");
Assert.notNull(interceptor, "DeferredResultProcessingInterceptor is required");
this.deferredResultInterceptors.put(key, interceptor);
}
public void registerDeferredResultInterceptors(DeferredResultProcessingInterceptor... interceptors) {
Assert.notNull(interceptors, "A DeferredResultProcessingInterceptor is required");
DeferredResultProcessingInterceptor[] var2 = interceptors;
int var3 = interceptors.length;
for(int var4 = 0; var4 < var3; ++var4) {
DeferredResultProcessingInterceptor interceptor = var2[var4];
String key = interceptor.getClass().getName() + ":" + interceptor.hashCode();
this.deferredResultInterceptors.put(key, interceptor);
}
}
public void clearConcurrentResult() {
synchronized(this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = null;
}
}
public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception {
Assert.notNull(callable, "Callable must not be null");
this.startCallableProcessing(new WebAsyncTask(callable), processingContext);
}
public void startCallableProcessing(WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
Long timeout = webAsyncTask.getTimeout();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
AsyncTaskExecutor executor = webAsyncTask.getExecutor();
if (executor != null) {
this.taskExecutor = executor;
} else {
this.logExecutorWarning();
}
List<CallableProcessingInterceptor> interceptors = new ArrayList();
interceptors.add(webAsyncTask.getInterceptor());
interceptors.addAll(this.callableInterceptors.values());
interceptors.add(timeoutCallableInterceptor);
Callable<?> callable = webAsyncTask.getCallable();
CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Async request timeout for " + this.formatRequestUri());
}
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
this.setConcurrentResultAndDispatch(result);
}
});
this.asyncWebRequest.addErrorHandler((ex) -> {
if (logger.isDebugEnabled()) {
logger.debug("Async request error for " + this.formatRequestUri() + ": " + ex);
}
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
result = result != CallableProcessingInterceptor.RESULT_NONE ? result : ex;
this.setConcurrentResultAndDispatch(result);
});
this.asyncWebRequest.addCompletionHandler(() -> {
interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable);
});
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
this.startAsyncProcessing(processingContext);
try {
Future<?> future = this.taskExecutor.submit(() -> {
Object result = null;
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
result = callable.call();
} catch (Throwable var8) {
result = var8;
} finally {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
}
this.setConcurrentResultAndDispatch(result);
});
interceptorChain.setTaskFuture(future);
} catch (RejectedExecutionException var10) {
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, var10);
this.setConcurrentResultAndDispatch(result);
throw var10;
}
}
private void logExecutorWarning() {
if (taskExecutorWarning && logger.isWarnEnabled()) {
synchronized(DEFAULT_TASK_EXECUTOR) {
AsyncTaskExecutor executor = this.taskExecutor;
if (taskExecutorWarning && (executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor)) {
String executorTypeName = executor.getClass().getSimpleName();
logger.warn("\n!!!\nAn Executor is required to handle java.util.concurrent.Callable return values.\nPlease, configure a TaskExecutor in the MVC config under \"async support\".\nThe " + executorTypeName + " currently in use is not suitable under load.\n-------------------------------\nRequest URI: '" + this.formatRequestUri() + "'\n!!!");
taskExecutorWarning = false;
}
}
}
}
private String formatRequestUri() {
HttpServletRequest request = (HttpServletRequest)this.asyncWebRequest.getNativeRequest(HttpServletRequest.class);
return request != null ? request.getRequestURI() : "servlet container";
}
private void setConcurrentResultAndDispatch(Object result) {
synchronized(this) {
if (this.concurrentResult != RESULT_NONE) {
return;
}
this.concurrentResult = result;
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + this.formatRequestUri());
}
} else {
if (logger.isDebugEnabled()) {
boolean isError = result instanceof Throwable;
logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + this.formatRequestUri());
}
this.asyncWebRequest.dispatch();
}
}
public void startDeferredResultProcessing(DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
Assert.notNull(deferredResult, "DeferredResult must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
Long timeout = deferredResult.getTimeoutValue();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
List<DeferredResultProcessingInterceptor> interceptors = new ArrayList();
interceptors.add(deferredResult.getInterceptor());
interceptors.addAll(this.deferredResultInterceptors.values());
interceptors.add(timeoutDeferredResultInterceptor);
DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(() -> {
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
} catch (Throwable var4) {
this.setConcurrentResultAndDispatch(var4);
}
});
this.asyncWebRequest.addErrorHandler((ex) -> {
try {
if (!interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex)) {
return;
}
deferredResult.setErrorResult(ex);
} catch (Throwable var5) {
this.setConcurrentResultAndDispatch(var5);
}
});
this.asyncWebRequest.addCompletionHandler(() -> {
interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult);
});
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
this.startAsyncProcessing(processingContext);
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
deferredResult.setResultHandler((result) -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
this.setConcurrentResultAndDispatch(result);
});
} catch (Throwable var7) {
this.setConcurrentResultAndDispatch(var7);
}
}
private void startAsyncProcessing(Object[] processingContext) {
synchronized(this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = processingContext;
}
this.asyncWebRequest.startAsync();
if (logger.isDebugEnabled()) {
logger.debug("Started async request");
}
}
}
View Code
5》再往下追代码就是使用一些多线程机制加Spring 的事务机制进行封装,调度,然后进行处理。
2. 休眠时间长点,多次调用查看其内部线程状态
可以看到其实 getDeferredResultController 方法处理的比较快,用tomcat 的最小10个空余工作线程是完全够用的。因为该方法内部没有任何的逻辑。也就是说该种异步模式不会增大tomcat 线程数量。
3. SpringMVC 对于ResponseBody 响应的处理
简单研究下对于ResponseBody 的处理。
1. SpringMVC核心逻辑解读:
(1). org.springframework.web.servlet.DispatcherServlet#doDispatch SpringMVC 核心逻辑
1》一系列判断,然后获取handler
2》反射调用方法,然后获取modelAndView 对象。 如果获取的ModelAndView 对象不为空,就进行界面渲染之后返回前端
(2). org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter#handleInternal 到这里反射执行方法,然后返回ModelAndView 对象。 核心也就是在这里,如果是自己已经处理完结果,那么返回的modelAndView 为null 即可。
protected ModelAndView handleInternal(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
this.checkRequest(request);
ModelAndView mav;
if (this.synchronizeOnSession) {
HttpSession session = request.getSession(false);
if (session != null) {
Object mutex = WebUtils.getSessionMutex(session);
synchronized(mutex) {
mav = this.invokeHandlerMethod(request, response, handlerMethod);
}
} else {
mav = this.invokeHandlerMethod(request, response, handlerMethod);
}
} else {
mav = this.invokeHandlerMethod(request, response, handlerMethod);
}
if (!response.containsHeader("Cache-Control")) {
if (this.getSessionAttributesHandler(handlerMethod).hasSessionAttributes()) {
this.applyCacheSeconds(response, this.cacheSecondsForSessionAttributeHandlers);
} else {
this.prepareResponse(response);
}
}
return mav;
}
(3). 继续调用到:org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter#invokeHandlerMethod
protected ModelAndView invokeHandlerMethod(HttpServletRequest request, HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
ModelAndView var15;
try {
WebDataBinderFactory binderFactory = this.getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = this.getModelFactory(handlerMethod, binderFactory);
ServletInvocableHandlerMethod invocableMethod = this.createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
Object result;
if (asyncManager.hasConcurrentResult()) {
result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer)asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(this.logger, (traceOn) -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
invocableMethod.invokeAndHandle(webRequest, mavContainer, new Object[0]);
if (asyncManager.isConcurrentHandlingStarted()) {
result = null;
return (ModelAndView)result;
}
var15 = this.getModelAndView(mavContainer, modelFactory, webRequest);
} finally {
webRequest.requestCompleted();
}
return var15;
}
View Code
1》org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod#invokeAndHandle 返回调用方法,执行对返回结果的处理
public void invokeAndHandle(ServletWebRequest webRequest, ModelAndViewContainer mavContainer, Object... providedArgs) throws Exception {
Object returnValue = this.invokeForRequest(webRequest, mavContainer, providedArgs);
this.setResponseStatus(webRequest);
if (returnValue == null) {
if (this.isRequestNotModified(webRequest) || this.getResponseStatus() != null || mavContainer.isRequestHandled()) {
this.disableContentCachingIfNecessary(webRequest);
mavContainer.setRequestHandled(true);
return;
}
} else if (StringUtils.hasText(this.getResponseStatusReason())) {
mavContainer.setRequestHandled(true);
return;
}
mavContainer.setRequestHandled(false);
Assert.state(this.returnValueHandlers != null, "No return value handlers");
try {
this.returnValueHandlers.handleReturnValue(returnValue, this.getReturnValueType(returnValue), mavContainer, webRequest);
} catch (Exception var6) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.formatErrorForReturnValue(returnValue), var6);
}
throw var6;
}
}
可以看到:反射调用获取到returnValue;然后org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite#handleReturnValue 从内置的一些返回结果处理器中获取handler 对结果进行处理
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
HandlerMethodReturnValueHandler handler = selectHandler(returnValue, returnType);
if (handler == null) {
throw new IllegalArgumentException("Unknown return value type: " + returnType.getParameterType().getName());
}
handler.handleReturnValue(returnValue, returnType, mavContainer, webRequest);
}
2》org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter#getModelAndView 获取MAV对象
private ModelAndView getModelAndView(ModelAndViewContainer mavContainer, ModelFactory modelFactory, NativeWebRequest webRequest) throws Exception {
modelFactory.updateModel(webRequest, mavContainer);
if (mavContainer.isRequestHandled()) {
return null;
} else {
ModelMap model = mavContainer.getModel();
ModelAndView mav = new ModelAndView(mavContainer.getViewName(), model, mavContainer.getStatus());
if (!mavContainer.isViewReference()) {
mav.setView((View)mavContainer.getView());
}
if (model instanceof RedirectAttributes) {
Map<String, ?> flashAttributes = ((RedirectAttributes)model).getFlashAttributes();
HttpServletRequest request = (HttpServletRequest)webRequest.getNativeRequest(HttpServletRequest.class);
if (request != null) {
RequestContextUtils.getOutputFlashMap(request).putAll(flashAttributes);
}
}
return mav;
}
}
可以看到这里有个核心的逻辑,如果 mavContainer.isRequestHandled() 请求处理完,那么返回为空。 也就是如果我们自己处理完,将这个参数设为True 即可。
2. org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor#handleReturnValue 对结果处理
1》org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite#handleReturnValue 获取到RequestResponseBodyMethodProcessor
2》调用 RequestResponseBodyMethodProcessor.handleReturnValue方法处理结果
RequestResponseBodyMethodProcessor 相关方法如下:
public boolean supportsReturnType(MethodParameter returnType) {
return AnnotatedElementUtils.hasAnnotation(returnType.getContainingClass(), ResponseBody.class) || returnType.hasMethodAnnotation(ResponseBody.class);
}
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws IOException, HttpMediaTypeNotAcceptableException, HttpMessageNotWritableException {
mavContainer.setRequestHandled(true);
ServletServerHttpRequest inputMessage = this.createInputMessage(webRequest);
ServletServerHttpResponse outputMessage = this.createOutputMessage(webRequest);
this.writeWithMessageConverters(returnValue, returnType, inputMessage, outputMessage);
}
可以看到处理过程就是调用 setRequestHandled 设为true, 然后获取到request、response 然后写回结果。
4. 扩展自己的返回结果处理器
参考接口以及实现类:
org.springframework.web.method.support.HandlerMethodReturnValueHandler
org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor
1. 编写处理类
package cn.qz.template.config;
import cn.qz.common.utils.TypedResult;
import org.springframework.core.MethodParameter;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.filter.ShallowEtagHeaderFilter;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletResponse;
public class CustomReturnHandler implements HandlerMethodReturnValueHandler {
@Override
public boolean supportsReturnType(MethodParameter methodParameter) {
return methodParameter.getMethod().getReturnType().equals(TypedResult.class);
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter methodParameter, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
mavContainer.setRequestHandled(true);
HttpServletResponse response = (HttpServletResponse) webRequest.getNativeResponse(HttpServletResponse.class);
Assert.state(response != null, "No HttpServletResponse");
ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
ServletRequest request = (ServletRequest) webRequest.getNativeRequest(ServletRequest.class);
Assert.state(request != null, "No ServletRequest");
ShallowEtagHeaderFilter.disableContentCaching(request);
// 简单处理,只拿出其Msg 属性,然后返回到前端
response.getWriter().write(((TypedResult) returnValue).getMsg());
response.getWriter().flush();
}
}
2. 注册到Spring
package cn.qz.template.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import java.util.List;
@Configuration
public class MyWebConfiguration implements WebMvcConfigurer {
@Override
public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> handlers) {
handlers.add(new CustomReturnHandler());
}
}
3. 添加测试类
package cn.qz.template.controller;
import cn.qz.common.utils.TypedResult;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
@RequestMapping("/test2")
@Controller
public class TestController2 {
@GetMapping
public TypedResult test() {
TypedResult<Object> objectTypedResult = new TypedResult<>();
objectTypedResult.setMsg("success");
return objectTypedResult;
}
}
4. 测试
qiao-zhi@qiao-zhideMBP ~ % curl http://localhost:8090/test2
success%
qiao-zhi@qiao-zhideMBP ~ % curl http://localhost:8090/test2
success%
qiao-zhi@qiao-zhideMBP ~ %
【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】