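To enable asynchronous web requests in SpringMVC, every filter and servlet involved in async processing must be declared in web.xml with <async-supported>true</async-supported>.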
FrameworkServlet.processRequest(HttpServletRequest request, HttpServletResponse response)
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.registerCallableInterceptor(FrameworkServlet.class.getName(), new RequestBindingInterceptor());
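This obtains the WebAsyncManager and registers a CallableInterceptor. RequestBindingInterceptor implements the CallableProcessingInterceptor interface. Its preProcess is called before the Callable's asynchronous result is obtained and sets the LocaleContext and RequestAttributes; its postProcess is called after the result is obtained and resets the LocaleContext and RequestAttributes.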
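The bind-before, reset-after pattern described above can be sketched with the JDK alone. This is a simplified model, not Spring's code: `ContextBindingSketch`, `REQUEST_CONTEXT`, and `callWithContext` are hypothetical names, and a plain `ThreadLocal<String>` stands in for the LocaleContext and RequestAttributes holders.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextBindingSketch {
    // Hypothetical stand-in for the thread-bound LocaleContext/RequestAttributes.
    static final ThreadLocal<String> REQUEST_CONTEXT = new ThreadLocal<>();

    // Runs the task on a worker thread with the caller's context bound around it.
    static String callWithContext(Callable<String> task) {
        final String captured = REQUEST_CONTEXT.get(); // read on the request thread
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(() -> {
                REQUEST_CONTEXT.set(captured);   // plays the role of preProcess
                try {
                    return task.call();
                } finally {
                    REQUEST_CONTEXT.remove();    // plays the role of postProcess
                }
            }).get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            worker.shutdown();
        }
    }

    static String demo() {
        REQUEST_CONTEXT.set("locale=en_US");
        try {
            return callWithContext(() -> "worker saw " + REQUEST_CONTEXT.get());
        } finally {
            REQUEST_CONTEXT.remove();
        }
    }
}
```

Without the bind step the worker thread would read `null`, because thread-local values are not visible across threads.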
RequestMappingHandlerAdapter.invokeHandlerMethod(HttpServletRequest request,HttpServletResponse response, HandlerMethod handlerMethod)
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
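When RequestMappingHandlerAdapter processes a handler method, it calls WebAsyncUtils.createAsyncWebRequest to create the asynchronous web request, whose type is StandardServletAsyncWebRequest, and sets the timeout on it. It then obtains the WebAsyncManager and sets the task executor on it. By default the executor is a SimpleAsyncTaskExecutor whose thread name prefix is MvcAsync. Next, the asynchronous web request is set on the WebAsyncManager, followed by callableInterceptors (new CallableProcessingInterceptor[0]) and deferredResultInterceptors (DeferredResultProcessingInterceptor[0]).
If the WebAsyncManager already holds an asynchronous result, the result is retrieved and wrapped by invocableMethod.wrapConcurrentResult. The overall flow of an asynchronous request in SpringMVC is as follows. An AsyncContext is created and used to start asynchronous processing. A listener is added to the AsyncContext to handle request completion and timeout. AsyncContext.dispatch() dispatches the request back to its original path, so the request is handled twice: the first pass obtains the asynchronous task, and the second pass obtains the result of that task. Once the task started by the first pass produces its result, the ConcurrentResult is set on the WebAsyncManager. After AsyncContext.dispatch(), asyncManager.hasConcurrentResult() is true on the second pass, invocableMethod.wrapConcurrentResult places the ConcurrentResult in a Callable, and invocableMethod.invokeAndHandle reads the result from that Callable.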
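To make the two passes concrete, here is a small JDK-only model of the flow. It is a sketch under simplifying assumptions, not Spring's implementation: `TwoPassDispatchSketch`, `handle`, and `serve` are hypothetical names, and calling `handle` a second time stands in for what `AsyncContext.dispatch()` triggers in a real container.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class TwoPassDispatchSketch {
    private static final Object RESULT_NONE = new Object();
    private volatile Object concurrentResult = RESULT_NONE;
    private Thread worker;
    final List<String> log = new ArrayList<>();

    // One pass through the "handler adapter"; returns null when async handling started.
    String handle(Callable<String> task) {
        if (concurrentResult != RESULT_NONE) {       // hasConcurrentResult()
            Object result = concurrentResult;
            concurrentResult = RESULT_NONE;          // clearConcurrentResult()
            log.add("second pass: result available");
            return String.valueOf(result);           // the handler is not invoked again
        }
        log.add("first pass: async started");
        worker = new Thread(() -> {
            Object result;
            try {
                result = task.call();
            } catch (Exception ex) {
                result = ex;                         // failures are stored as the result too
            }
            concurrentResult = result;
        }, "sketch-worker");
        worker.start();
        return null;
    }

    // Simulates the container: first pass, wait for the worker, then re-dispatch.
    String serve(Callable<String> task) {
        String body = handle(task);
        if (body != null) {
            return body;
        }
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return handle(task);                         // stands in for AsyncContext.dispatch()
    }
}
```

Calling `new TwoPassDispatchSketch().serve(() -> "hello")` goes through `handle` twice and only the second pass produces a response body.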
ServletInvocableHandlerMethod.wrapConcurrentResult(Object result)
ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}
ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType)
public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
super((Callable<Object>) () -> {
if (result instanceof Exception) {
throw (Exception) result;
}
else if (result instanceof Throwable) {
throw new NestedServletException("Async processing failed", (Throwable) result);
}
return result;
}, CALLABLE_METHOD);
if (ServletInvocableHandlerMethod.this.returnValueHandlers != null) {
setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
}
this.returnType = returnType;
}
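This wraps the ConcurrentResult in a Callable. If the result is an Exception it is rethrown as is, and any other Throwable is wrapped in a NestedServletException.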
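The wrapping logic can be tried in isolation. The following is a minimal sketch that mirrors the constructor above using only the JDK; `ResultWrappingSketch` and `describe` are hypothetical names, and `IllegalStateException` is used here in place of `NestedServletException`.

```java
import java.util.concurrent.Callable;

final class ResultWrappingSketch {
    // Wraps an already-computed result so that calling it either returns or rethrows.
    static Callable<Object> wrap(final Object result) {
        return () -> {
            if (result instanceof Exception) {
                throw (Exception) result;
            }
            if (result instanceof Throwable) {
                // Stand-in for NestedServletException in the real code.
                throw new IllegalStateException("Async processing failed", (Throwable) result);
            }
            return result;
        };
    }

    // Helper for experimenting: reports what calling the wrapped result does.
    static String describe(Object result) {
        try {
            return "value: " + wrap(result).call();
        } catch (Exception ex) {
            return "thrown: " + ex.getClass().getSimpleName();
        }
    }
}
```

Because a failure is stored as an ordinary result and rethrown on the second pass, it travels through the same exception handling as a failure in a synchronous handler would.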
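On the first pass of an asynchronous request, ServletInvocableHandlerMethod.invokeAndHandle invokes the handler method to obtain its return value and then calls returnValueHandlers.handleReturnValue to process it. A return value of type Callable is processed by the return value handler CallableMethodReturnValueHandler.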
CallableMethodReturnValueHandler.handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest)
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
Callable<?> callable = (Callable<?>) returnValue;
WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
}
This obtains the WebAsyncManager and calls startCallableProcessing.
WebAsyncManager.startCallableProcessing(Callable<?> callable, Object... processingContext)
public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception {
Assert.notNull(callable, "Callable must not be null");
startCallableProcessing(new WebAsyncTask(callable), processingContext);
}
WebAsyncManager.startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
throws Exception {
Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
Long timeout = webAsyncTask.getTimeout();
if (timeout != null) {
this.asyncWebRequest.setTimeout(timeout);
}
AsyncTaskExecutor executor = webAsyncTask.getExecutor();
if (executor != null) {
this.taskExecutor = executor;
}
else {
logExecutorWarning();
}
List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
interceptors.add(webAsyncTask.getInterceptor());
interceptors.addAll(this.callableInterceptors.values());
interceptors.add(timeoutCallableInterceptor);
final Callable<?> callable = webAsyncTask.getCallable();
final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(() -> {
System.out.println("Handling timeout");
if (logger.isDebugEnabled()) {
logger.debug("Async request timeout for " + formatRequestUri());
}
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
setConcurrentResultAndDispatch(result);
}
});
this.asyncWebRequest.addErrorHandler(ex -> {
if (!this.errorHandlingInProgress) {
if (logger.isDebugEnabled()) {
logger.debug("Async request error for " + formatRequestUri() + ": " + ex);
}
Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
setConcurrentResultAndDispatch(result);
}
});
this.asyncWebRequest.addCompletionHandler(() ->
interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));
interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
startAsyncProcessing(processingContext);
try {
Future<?> future = this.taskExecutor.submit(() -> {
Object result = null;
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
result = callable.call();
}
catch (Throwable ex) {
result = ex;
}
finally {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
}
setConcurrentResultAndDispatch(result);
});
interceptorChain.setTaskFuture(future);
}
catch (RejectedExecutionException ex) {
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
setConcurrentResultAndDispatch(result);
throw ex;
}
}
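1. Set the timeout and the executor, taking them from the WebAsyncTask when it provides them.
2. Build the List<CallableProcessingInterceptor>, which contains:
- the CallableProcessingInterceptor created by WebAsyncTask.getInterceptor()
- this.callableInterceptors.values(), which includes FrameworkServlet's inner class RequestBindingInterceptor
- TimeoutCallableProcessingInterceptor, which handles timeouts
3. Add a timeout handler to asyncWebRequest. The handler calls triggerAfterTimeout on the CallableInterceptorChain and, if that yields a result, calls setConcurrentResultAndDispatch to set the concurrentResult and dispatch the request.
4. Add an error handler to asyncWebRequest. The handler calls triggerAfterError on the CallableInterceptorChain, then calls setConcurrentResultAndDispatch to set the concurrentResult and dispatch the request.
5. Add a completion handler to asyncWebRequest. Once the request completes, triggerAfterCompletion is called on the CallableInterceptorChain.
6. Call applyBeforeConcurrentHandling on the CallableInterceptorChain, the hook that runs before asynchronous handling starts.
7. Call startAsyncProcessing to start asynchronous processing.
8. Run the asynchronous task on taskExecutor, which by default is a SimpleAsyncTaskExecutor. Before callable.call() is invoked, applyPreProcess is called on the CallableInterceptorChain as the pre-processing hook; after callable.call() finishes, applyPostProcess is called as the post-processing hook.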
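The ordering of steps 6 to 8 above can be observed with a small JDK-only model that records each hook as it fires. This is an illustrative sketch, not Spring's code: `CallableLifecycleSketch` and `run` are hypothetical names, the interceptor chain is reduced to log entries, and a single-thread `ExecutorService` stands in for the task executor.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CallableLifecycleSketch {
    // Runs the callable the way the listing above does and returns the recorded events.
    static List<String> run(Callable<?> callable) {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            events.add("applyBeforeConcurrentHandling");   // step 6, on the request thread
            events.add("startAsyncProcessing");            // step 7, on the request thread
            Future<?> future = executor.submit(() -> {     // step 8, on the worker thread
                Object result = null;
                try {
                    events.add("applyPreProcess");
                    result = callable.call();
                } catch (Throwable ex) {
                    result = ex;                           // a failure becomes the result
                } finally {
                    events.add("applyPostProcess");
                }
                events.add("setConcurrentResultAndDispatch(" + result + ")");
            });
            future.get();                                  // only so the sketch can return the log
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
        return events;
    }
}
```

Note that the real request thread does not wait for the task; the `future.get()` call exists only so the sketch can return the complete event list.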
WebAsyncManager.startAsyncProcessing(Object[] processingContext)
private void startAsyncProcessing(Object[] processingContext) {
synchronized (WebAsyncManager.this) {
this.concurrentResult = RESULT_NONE;
this.concurrentResultContext = processingContext;
this.errorHandlingInProgress = false;
}
this.asyncWebRequest.startAsync();
if (logger.isDebugEnabled()) {
logger.debug("Started async request");
}
}
This calls asyncWebRequest.startAsync() to start asynchronous processing.
StandardServletAsyncWebRequest.startAsync()
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
"in async request processing. This is done in Java code using the Servlet API " +
"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
"filter declarations in web.xml.");
Assert.state(!isAsyncComplete(), "Async processing has already completed");
if (isAsyncStarted()) {
return;
}
this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
this.asyncContext.addListener(this);
if (this.timeout != null) {
this.asyncContext.setTimeout(this.timeout);
}
}
This calls startAsync on the request to start asynchronous processing, then adds a listener to the asyncContext and sets its timeout.
StandardServletAsyncWebRequest
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
System.out.println("Async web request started: " + event);
}
@Override
public void onError(AsyncEvent event) throws IOException {
this.exceptionHandlers.forEach(consumer -> consumer.accept(event.getThrowable()));
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
this.timeoutHandlers.forEach(Runnable::run);
}
@Override
public void onComplete(AsyncEvent event) throws IOException {
this.completionHandlers.forEach(Runnable::run);
this.asyncContext = null;
this.asyncCompleted.set(true);
}
The four methods above are the listener callbacks. asyncCompleted indicates whether the request has completed.
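The listener callbacks simply fan out to handlers registered earlier. A minimal JDK-only model of that arrangement might look like the following; `AsyncListenerSketch` is a hypothetical class, and its callbacks take no `AsyncEvent` because the servlet API is deliberately left out of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class AsyncListenerSketch {
    private final List<Runnable> timeoutHandlers = new ArrayList<>();
    private final List<Consumer<Throwable>> exceptionHandlers = new ArrayList<>();
    private final List<Runnable> completionHandlers = new ArrayList<>();
    private final AtomicBoolean asyncCompleted = new AtomicBoolean(false);

    void addTimeoutHandler(Runnable handler) { timeoutHandlers.add(handler); }
    void addErrorHandler(Consumer<Throwable> handler) { exceptionHandlers.add(handler); }
    void addCompletionHandler(Runnable handler) { completionHandlers.add(handler); }

    // Each callback runs every handler registered for that event, in registration order.
    void onTimeout() { timeoutHandlers.forEach(Runnable::run); }
    void onError(Throwable ex) { exceptionHandlers.forEach(handler -> handler.accept(ex)); }

    void onComplete() {
        completionHandlers.forEach(Runnable::run);
        asyncCompleted.set(true);   // later results see a completed request and are dropped
    }

    boolean isAsyncComplete() { return asyncCompleted.get(); }
}
```

The `asyncCompleted` flag is what lets a late result detect that the request has already finished and skip the dispatch.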
WebAsyncManager.setConcurrentResultAndDispatch(Object result)
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
if (this.concurrentResult != RESULT_NONE) {
return;
}
this.concurrentResult = result;
this.errorHandlingInProgress = (result instanceof Throwable);
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatRequestUri());
}
return;
}
if (logger.isDebugEnabled()) {
boolean isError = result instanceof Throwable;
logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
}
this.asyncWebRequest.dispatch();
}
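This sets the concurrentResult and calls asyncWebRequest.dispatch to dispatch the request. Only the first result is accepted; later calls return without dispatching. When the task submitted through taskExecutor.submit finishes normally or fails, this method is called by a thread of the taskExecutor, whose name starts with MvcAsync. On a timeout, it is called by the servlet container thread that runs the timeout listener rather than by an MvcAsync thread.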
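Because both the worker thread and the timeout handler may try to publish a result, the guard at the top of the method matters. The sketch below models just that "first result wins" rule with the JDK; `FirstResultWinsSketch` and its boolean return value are hypothetical and exist only to make the behaviour observable.

```java
final class FirstResultWinsSketch {
    private static final Object RESULT_NONE = new Object();
    private Object concurrentResult = RESULT_NONE;
    private int dispatchCount = 0;

    // Returns true if this call set the result and dispatched, false if it was ignored.
    boolean setConcurrentResultAndDispatch(Object result) {
        synchronized (this) {
            if (concurrentResult != RESULT_NONE) {
                return false;               // a result was already published
            }
            concurrentResult = result;
        }
        dispatch();
        return true;
    }

    // Stand-in for asyncWebRequest.dispatch(); only counts invocations.
    private synchronized void dispatch() {
        dispatchCount++;
    }

    synchronized Object getConcurrentResult() { return concurrentResult; }
    synchronized int getDispatchCount() { return dispatchCount; }
}
```

If a timeout publishes a fallback value first, a task result that arrives later is discarded and the request is dispatched only once.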
StandardServletAsyncWebRequest.dispatch()
public void dispatch() {
Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext");
System.out.println("Dispatching request: " + Thread.currentThread());
this.asyncContext.dispatch();
}
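This calls asyncContext.dispatch, which dispatches the request back to the container so that it is processed again at its original path.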
SimpleAsyncTaskExecutor.submit(Runnable task)
public Future<?> submit(Runnable task) {
FutureTask<Object> future = new FutureTask<>(task, null);
execute(future, TIMEOUT_INDEFINITE);
return future;
}
This runs the task of the asynchronous web request.
SimpleAsyncTaskExecutor.execute(Runnable task, long startTimeout)
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
public Thread createThread(Runnable runnable) {
Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
thread.setPriority(getThreadPriority());
thread.setDaemon(isDaemon());
return thread;
}
A new thread is created for every task, and its name starts with the prefix MvcAsync.
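The thread-per-task behaviour shown in doExecute and createThread can be reproduced with a few lines of plain Java. This is a simplified sketch, not SimpleAsyncTaskExecutor itself: `ThreadPerTaskExecutorSketch` and `nameOfWorker` are hypothetical, and the "prefix plus counter" naming scheme is an assumption made for the example.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadPerTaskExecutorSketch implements Executor {
    private final String threadNamePrefix;
    private final AtomicInteger threadCount = new AtomicInteger();

    ThreadPerTaskExecutorSketch(String threadNamePrefix) {
        this.threadNamePrefix = threadNamePrefix;
    }

    // No pooling: every task gets a brand-new thread that ends when the task ends.
    @Override
    public void execute(Runnable task) {
        Thread thread = new Thread(task, threadNamePrefix + threadCount.incrementAndGet());
        thread.start();
    }

    // Runs one task and reports the name of the thread that executed it.
    static String nameOfWorker(String prefix) {
        ThreadPerTaskExecutorSketch executor = new ThreadPerTaskExecutorSketch(prefix);
        FutureTask<String> task = new FutureTask<String>(() -> Thread.currentThread().getName());
        executor.execute(task);
        try {
            return task.get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

Since threads are never reused, a burst of asynchronous requests creates a matching burst of threads, which is one reason applications often configure a pooled executor instead.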
From: https://www.cnblogs.com/shigongp/p/16885526.html