首页 > 其他分享 >SpringMVC-异步web返回Callable

SpringMVC-异步web返回Callable

时间:2022-11-13 14:45:22浏览次数:54  
标签:异步 web SpringMVC Object Callable WebAsyncManager result asyncWebRequest null

SpringMVC开启异步web要在web.xml文件中每个filter和servlet配置true

FrameworkServlet.processRequest(HttpServletRequest request, HttpServletResponse response)

WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.registerCallableInterceptor(FrameworkServlet.class.getName(), new RequestBindingInterceptor());

获取WebAsyncManager并设置CallableInterceptor。RequestBindingInterceptor实现了CallableProcessingInterceptor接口,在获取Callable异步结果之前调用preProcess设置LocaleContext和RequestAttributes。在获取Callable异步结果之后调用postProcess重置LocaleContext和RequestAttributes。

RequestMappingHandlerAdapter.invokeHandlerMethod(HttpServletRequest request,HttpServletResponse response, HandlerMethod handlerMethod)

protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
		HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

	ServletWebRequest webRequest = new ServletWebRequest(request, response);
	try {
		WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
		ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);

		ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
		if (this.argumentResolvers != null) {
			invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
		}
		if (this.returnValueHandlers != null) {
			invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
		}
		invocableMethod.setDataBinderFactory(binderFactory);
		invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);

		ModelAndViewContainer mavContainer = new ModelAndViewContainer();
		mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
		modelFactory.initModel(webRequest, mavContainer, invocableMethod);
		mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);

		AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
		asyncWebRequest.setTimeout(this.asyncRequestTimeout);

		WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
		asyncManager.setTaskExecutor(this.taskExecutor);
		asyncManager.setAsyncWebRequest(asyncWebRequest);
		asyncManager.registerCallableInterceptors(this.callableInterceptors);
		asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

		if (asyncManager.hasConcurrentResult()) {
			Object result = asyncManager.getConcurrentResult();
			mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
			asyncManager.clearConcurrentResult();
			LogFormatUtils.traceDebug(logger, traceOn -> {
				String formatted = LogFormatUtils.formatValue(result, !traceOn);
				return "Resume with async result [" + formatted + "]";
			});
			invocableMethod = invocableMethod.wrapConcurrentResult(result);
		}

		invocableMethod.invokeAndHandle(webRequest, mavContainer);
		if (asyncManager.isConcurrentHandlingStarted()) {
			return null;
		}

		return getModelAndView(mavContainer, modelFactory, webRequest);
	}
	finally {
		webRequest.requestCompleted();
	}
}

RequestMappingHandlerAdapter处理handle方法时,调用 WebAsyncUtils.createAsyncWebRequest创建异步web请求,类型是StandardServletAsyncWebRequest。异步web请求设置Timeout。获取WebAsyncManager。WebAsyncManager设置任务执行器,任务执行器类型是SimpleAsyncTaskExecutor。线程名前缀是MvcAsync。将异步web请求设置到WebAsyncManager。设置callableInterceptors(new CallableProcessingInterceptor[0])到WebAsyncManager。deferredResultInterceptors(DeferredResultProcessingInterceptor[0])设置到WebAsyncManager。

如果WebAsyncManager有异步结果,则调用获取异步结果后调用invocableMethod.wrapConcurrentResult包装异步结果。SpringMVC执行异步web的流程是创建AsyncContext。由AsyncContext开启异步web。AsyncContext添加监听器处理请求执行完成和超时。AsyncContext.dispatch()分发请求原来的请求,第一次请求获取异步任务,第二次请求获取异步任务里的结果。当调用第一次请求后将ConcurrentResult设置到WebAsyncManager。AsyncContext.dispatch()分发请求后,第二次请求asyncManager.hasConcurrentResult()为true,invocableMethod.wrapConcurrentResult将ConcurrentResult放到Callable中,invocableMethod.invokeAndHandle去Callable中获取结果。

ServletInvocableHandlerMethod.wrapConcurrentResult(Object result)

ServletInvocableHandlerMethod wrapConcurrentResult(Object result) {
	return new ConcurrentResultHandlerMethod(result, new ConcurrentResultMethodParameter(result));
}

ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType)

	public ConcurrentResultHandlerMethod(final Object result, ConcurrentResultMethodParameter returnType) {
		super((Callable<Object>) () -> {
			if (result instanceof Exception) {
				throw (Exception) result;
			}
			else if (result instanceof Throwable) {
				throw new NestedServletException("Async processing failed", (Throwable) result);
			}
			return result;
		}, CALLABLE_METHOD);

		if (ServletInvocableHandlerMethod.this.returnValueHandlers != null) {
			setHandlerMethodReturnValueHandlers(ServletInvocableHandlerMethod.this.returnValueHandlers);
		}
		this.returnType = returnType;
	}

将ConcurrentResult封装到Callable中。

异步web第一次请求时,ServletInvocableHandlerMethod.invokeAndHandle执行handle方法获取返回值,调用returnValueHandlers.handleReturnValue处理返回值。Callable类型的返回值CallableMethodReturnValueHandler返回值解析器处理。

CallableMethodReturnValueHandler.handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest)

public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
		ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

	if (returnValue == null) {
		mavContainer.setRequestHandled(true);
		return;
	}

	Callable<?> callable = (Callable<?>) returnValue;
	WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
}

获取WebAsyncManager并调用startCallableProcessing。

WebAsyncManager.startCallableProcessing(Callable<?> callable, Object... processingContext)

public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception {
	Assert.notNull(callable, "Callable must not be null");
	startCallableProcessing(new WebAsyncTask(callable), processingContext);
}

WebAsyncManager.startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)

public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
		throws Exception {

	Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
	Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");

	Long timeout = webAsyncTask.getTimeout();
	if (timeout != null) {
		this.asyncWebRequest.setTimeout(timeout);
	}

	AsyncTaskExecutor executor = webAsyncTask.getExecutor();
	if (executor != null) {
		this.taskExecutor = executor;
	}
	else {
		logExecutorWarning();
	}

	List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
	interceptors.add(webAsyncTask.getInterceptor());
	interceptors.addAll(this.callableInterceptors.values());
	interceptors.add(timeoutCallableInterceptor);

	final Callable<?> callable = webAsyncTask.getCallable();
	final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);

	this.asyncWebRequest.addTimeoutHandler(() -> {
		System.out.println("处理超时");
		if (logger.isDebugEnabled()) {
			logger.debug("Async request timeout for " + formatRequestUri());
		}
		Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
		if (result != CallableProcessingInterceptor.RESULT_NONE) {
			setConcurrentResultAndDispatch(result);
		}
	});

	this.asyncWebRequest.addErrorHandler(ex -> {
		if (!this.errorHandlingInProgress) {
			if (logger.isDebugEnabled()) {
				logger.debug("Async request error for " + formatRequestUri() + ": " + ex);
			}
			Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
			result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
			setConcurrentResultAndDispatch(result);
		}
	});

	this.asyncWebRequest.addCompletionHandler(() ->
			interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));

	interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
	startAsyncProcessing(processingContext);
	try {
		Future<?> future = this.taskExecutor.submit(() -> {
			Object result = null;
			try {
				interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
				result = callable.call();
			}
			catch (Throwable ex) {
				result = ex;
			}
			finally {
				result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
			}
			setConcurrentResultAndDispatch(result);
		});
		interceptorChain.setTaskFuture(future);
	}
	catch (RejectedExecutionException ex) {
		Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
		setConcurrentResultAndDispatch(result);
		throw ex;
	}
}

1、设置timeout和executor。
2、设置List和创建CallableInterceptorChain。List值有:

  • WebAsyncTask.getInterceptor()创建的CallableProcessingInterceptor
  • this.callableInterceptors.values(),即FrameworkServlet内部类RequestBindingInterceptor
  • TimeoutCallableProcessingInterceptor处理超时

3、asyncWebRequest添加超时处理。超时处理调用CallableInterceptorChain的triggerAfterTimeout。调用setConcurrentResultAndDispatch设置concurrentResult结果和分发请求。
4、asyncWebRequest添加错误处理。错误处理调用CallableInterceptorChain的triggerAfterError,调用setConcurrentResultAndDispatch设置concurrentResult结果和分发请求。
5、asyncWebRequest添加请求完成处理。请求完成后调用CallableInterceptorChain的triggerAfterCompletion。
6、调用CallableInterceptorChain的applyBeforeConcurrentHandling方法,调用异步处理的前置方法。
7、startAsyncProcessing开启异步web。
8、taskExecutor执行异步任务。taskExecutor是SimpleAsyncTaskExecutor。在调用callable.call()方法之前调用CallableInterceptorChain的applyPreProcess,执行获取异步结果的前置方法。执行完callable.call()后调用CallableInterceptorChain的applyPostProcess后置方法。

WebAsyncManager.startAsyncProcessing(Object[] processingContext)

	private void startAsyncProcessing(Object[] processingContext) {
	synchronized (WebAsyncManager.this) {
		this.concurrentResult = RESULT_NONE;
		this.concurrentResultContext = processingContext;
		this.errorHandlingInProgress = false;
	}
	this.asyncWebRequest.startAsync();

	if (logger.isDebugEnabled()) {
		logger.debug("Started async request");
	}
}

调用asyncWebRequest.startAsync()开启异步web。

StandardServletAsyncWebRequest.startAsync()

public void startAsync() {
	Assert.state(getRequest().isAsyncSupported(),
			"Async support must be enabled on a servlet and for all filters involved " +
			"in async request processing. This is done in Java code using the Servlet API " +
			"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
			"filter declarations in web.xml.");
	Assert.state(!isAsyncComplete(), "Async processing has already completed");

	if (isAsyncStarted()) {
		return;
	}
	this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
	this.asyncContext.addListener(this);
	if (this.timeout != null) {
		this.asyncContext.setTimeout(this.timeout);
	}
}

调用startAsync开启异步web。asyncContext添加监听器和设置超时时间。

StandardServletAsyncWebRequest

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
	System.out.println("异步web开始:" + event);
}

@Override
public void one rror(AsyncEvent event) throws IOException {
	this.exceptionHandlers.forEach(consumer -> consumer.accept(event.getThrowable()));
}

@Override
public void onTimeout(AsyncEvent event) throws IOException {
	this.timeoutHandlers.forEach(Runnable::run);
}

@Override
public void onComplete(AsyncEvent event) throws IOException {
	this.completionHandlers.forEach(Runnable::run);
	this.asyncContext = null;
	this.asyncCompleted.set(true);
}

以上四个方法是监听器处理方法。asyncCompleted表示request是否完成。

WebAsyncManager.setConcurrentResultAndDispatch(Object result)

private void setConcurrentResultAndDispatch(Object result) {
	synchronized (WebAsyncManager.this) {
		if (this.concurrentResult != RESULT_NONE) {
			return;
		}
		this.concurrentResult = result;
		this.errorHandlingInProgress = (result instanceof Throwable);
	}

	if (this.asyncWebRequest.isAsyncComplete()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Async result set but request already complete: " + formatRequestUri());
		}
		return;
	}

	if (logger.isDebugEnabled()) {
		boolean isError = result instanceof Throwable;
		logger.debug("Async " + (isError ? "error" : "result set") + ", dispatch to " + formatRequestUri());
	}
	this.asyncWebRequest.dispatch();
}

设置concurrentResult和调用asyncWebRequest.dispatch分发请求。如果是taskExecutor.submit正常执行完或出现错误则由taskExecutor里的线程调用此方法。线程名前缀是MvcAsync。如果是超时则由主线程调用此方法。

StandardServletAsyncWebRequest.dispatch()

public void dispatch() {
	Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext");
	System.out.println("分发请求:"+Thread.currentThread());
	this.asyncContext.dispatch();
}

调用asyncContext.dispatch分发新的请求到原来的请求。

SimpleAsyncTaskExecutor.submit(Runnable task)

public Future<?> submit(Runnable task) {
	FutureTask<Object> future = new FutureTask<>(task, null);
	execute(future, TIMEOUT_INDEFINITE);
	return future;
}

执行异步web的任务。

SimpleAsyncTaskExecutor.execute(Runnable task, long startTimeout)

public void execute(Runnable task, long startTimeout) {
	Assert.notNull(task, "Runnable must not be null");
	Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
	if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
		this.concurrencyThrottle.beforeAccess();
		doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
	}
	else {
		doExecute(taskToUse);
	}
}

protected void doExecute(Runnable task) {
	Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
	thread.start();
}


public Thread createThread(Runnable runnable) {
	Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName());
	thread.setPriority(getThreadPriority());
	thread.setDaemon(isDaemon());
	return thread;
}

线程前缀是MvcAsync。

标签:异步,web,SpringMVC,Object,Callable,WebAsyncManager,result,asyncWebRequest,null
From: https://www.cnblogs.com/shigongp/p/16885526.html

相关文章