[spring6: Mvc-异步请求]-源码分析

发布于:2025-07-28 ⋅ 阅读:(19) ⋅ 点赞:(0)

源码

Callable

Callable 是一个带返回值且可抛异常的任务接口。

/**
 * A task that produces a result and is allowed to throw an exception.
 *
 * @param <V> the type of the value returned by {@link #call()}
 */
@FunctionalInterface
public interface Callable<V> {

    /**
     * Runs the task and returns its result.
     *
     * @return the computed value
     * @throws Exception if the task cannot produce a result
     */
    V call() throws Exception;
    
}

WebAsyncTask

WebAsyncTask 是 Callable 的增强版本,支持自定义超时时间和执行线程池,实现更灵活的异步请求处理。

public class WebAsyncTask<V> implements BeanFactoryAware {

	private final Callable<V> callable;

	@Nullable
	private final Long timeout;

	@Nullable
	private final AsyncTaskExecutor executor;

	@Nullable
	private final String executorName;

	@Nullable
	private BeanFactory beanFactory;

	@Nullable
	private Callable<V> timeoutCallback;

	@Nullable
	private Callable<V> errorCallback;

	@Nullable
	private Runnable completionCallback;

	public WebAsyncTask(Callable<V> callable) {
		Assert.notNull(callable, "Callable must not be null");
		this.callable = callable;
		this.timeout = null;
		this.executor = null;
		this.executorName = null;
	}

	public WebAsyncTask(long timeout, Callable<V> callable) {
		Assert.notNull(callable, "Callable must not be null");
		this.callable = callable;
		this.timeout = timeout;
		this.executor = null;
		this.executorName = null;
	}

	public WebAsyncTask(@Nullable Long timeout, String executorName, Callable<V> callable) {
		Assert.notNull(callable, "Callable must not be null");
		Assert.notNull(executorName, "Executor name must not be null");
		this.callable = callable;
		this.timeout = timeout;
		this.executor = null;
		this.executorName = executorName;
	}

	public WebAsyncTask(@Nullable Long timeout, AsyncTaskExecutor executor, Callable<V> callable) {
		Assert.notNull(callable, "Callable must not be null");
		Assert.notNull(executor, "Executor must not be null");
		this.callable = callable;
		this.timeout = timeout;
		this.executor = executor;
		this.executorName = null;
	}

	public Callable<?> getCallable() {
		return this.callable;
	}

	@Nullable
	public Long getTimeout() {
		return this.timeout;
	}

	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		this.beanFactory = beanFactory;
	}

	@Nullable
	public AsyncTaskExecutor getExecutor() {
		if (this.executor != null) {
			return this.executor;
		}
		else if (this.executorName != null) {
			Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name");
			return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class);
		}
		else {
			return null;
		}
	}
	
	public void onTimeout(Callable<V> callback) {
		this.timeoutCallback = callback;
	}

	public void onError(Callable<V> callback) {
		this.errorCallback = callback;
	}

	public void onCompletion(Runnable callback) {
		this.completionCallback = callback;
	}

	CallableProcessingInterceptor getInterceptor() {
		return new CallableProcessingInterceptor() {
			@Override
			public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
				return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
			}
			@Override
			public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
				return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
			}
			@Override
			public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
				if (completionCallback != null) {
					completionCallback.run();
				}
			}
		};
	}

}

DeferredResult

DeferredResult 允许异步请求处理中,应用线程自主设置结果并注册超时、错误和完成回调。

/**
 * Holder for a result that application code sets at a later time, with optional
 * timeout, error and completion callbacks.
 *
 * @param <T> the type accepted by {@link #setResult(Object)}
 */
public class DeferredResult<T> {

	// Sentinel meaning "no result has been set yet"; distinct from a null result.
	private static final Object RESULT_NONE = new Object();

	private static final Log logger = LogFactory.getLog(DeferredResult.class);

	// Timeout value handed out via getTimeoutValue(); null when none was given.
	@Nullable
	private final Long timeoutValue;

	// Supplies the value to set on timeout; yields RESULT_NONE when no timeout result applies.
	private final Supplier<?> timeoutResult;

	@Nullable
	private Runnable timeoutCallback;

	@Nullable
	private Consumer<Throwable> errorCallback;

	@Nullable
	private Runnable completionCallback;

	// Handler waiting for a result; only stored while no result is available yet.
	@Nullable
	private DeferredResultHandler resultHandler;

	@Nullable
	private volatile Object result = RESULT_NONE;

	// Set to true by the lifecycle interceptor's afterCompletion callback.
	private volatile boolean expired;

	/** Create an instance with no timeout value and no timeout result. */
	public DeferredResult() {
		this(null);
	}

	/**
	 * Create an instance with a timeout value and no timeout result.
	 * @param timeoutValue the timeout value, or {@code null} for none
	 */
	public DeferredResult(@Nullable Long timeoutValue) {
		this(timeoutValue, () -> RESULT_NONE);
	}

	/**
	 * Create an instance with a timeout value and a fixed result to use on timeout.
	 * @param timeoutValue the timeout value, or {@code null} for none
	 * @param timeoutResult the value to set as the result on timeout
	 */
	public DeferredResult(@Nullable Long timeoutValue, Object timeoutResult) {
		this(timeoutValue, () -> timeoutResult);
	}

	/**
	 * Create an instance with a timeout value and a supplier evaluated on timeout.
	 * @param timeoutValue the timeout value, or {@code null} for none
	 * @param timeoutResult supplier of the value to set as the result on timeout
	 */
	public DeferredResult(@Nullable Long timeoutValue, Supplier<?> timeoutResult) {
		this.timeoutValue = timeoutValue;
		this.timeoutResult = timeoutResult;
	}

	/** Return {@code true} if a result has been set or this instance is marked expired. */
	public final boolean isSetOrExpired() {
		return (this.result != RESULT_NONE || this.expired);
	}

	/** Return {@code true} if a result (possibly {@code null}) has been set. */
	public boolean hasResult() {
		return (this.result != RESULT_NONE);
	}

	/** Return the result, or {@code null} if none has been set (or the result itself is null). */
	@Nullable
	public Object getResult() {
		// Read the volatile field once so the check and the return see the same value.
		Object resultToCheck = this.result;
		return (resultToCheck != RESULT_NONE ? resultToCheck : null);
	}
	
	/** Return the configured timeout value, or {@code null} if none. */
	@Nullable
	final Long getTimeoutValue() {
		return this.timeoutValue;
	}

	/** Register code to run when the lifecycle interceptor is notified of a timeout. */
	public void onTimeout(Runnable callback) {
		this.timeoutCallback = callback;
	}

	/** Register a consumer that receives the throwable when the lifecycle interceptor is notified of an error. */
	public void onError(Consumer<Throwable> callback) {
		this.errorCallback = callback;
	}

	/** Register code to run when the lifecycle interceptor is notified of completion. */
	public void onCompletion(Runnable callback) {
		this.completionCallback = callback;
	}

	/**
	 * Provide the handler that processes the result. If a result is already
	 * available the handler is invoked immediately; otherwise it is stored until
	 * a result is set. Does nothing if this instance is already expired.
	 * @param resultHandler the handler; must not be {@code null}
	 */
	public final void setResultHandler(DeferredResultHandler resultHandler) {
		Assert.notNull(resultHandler, "DeferredResultHandler is required");
		// Cheap expiration check before taking the lock.
		if (this.expired) {
			return;
		}
		
		Object resultToHandle;
		synchronized (this) {
			// Re-check under the lock: expiration may have happened in the meantime.
			if (this.expired) {
				return;
			}
			resultToHandle = this.result;
			if (resultToHandle == RESULT_NONE) {
				// No result yet: keep the handler for when one arrives.
				this.resultHandler = resultHandler;
				return;
			}
		}

		// A result already exists: hand it to the handler outside the lock.
		try {
			resultHandler.handleResult(resultToHandle);
		}
		catch (Throwable ex) {
			logger.debug("Failed to process async result", ex);
		}
	}
	
	/**
	 * Set the result value and pass it to the stored handler, if any.
	 * @param result the value to set (may be {@code null})
	 * @return {@code true} if the result was set; {@code false} if a result was
	 * already set or this instance is expired
	 */
	public boolean setResult(@Nullable T result) {
		return setResultInternal(result);
	}

	/**
	 * Shared implementation for setting a regular, error or timeout result.
	 * The handler is invoked after the synchronized block has been left.
	 */
	private boolean setResultInternal(@Nullable Object result) {
		// Cheap check before taking the lock.
		if (isSetOrExpired()) {
			return false;
		}
		DeferredResultHandler resultHandlerToUse;
		synchronized (this) {
			// Re-check under the lock: another thread may have set a result meanwhile.
			if (isSetOrExpired()) {
				return false;
			}
			this.result = result;
			resultHandlerToUse = this.resultHandler;
			if (resultHandlerToUse == null) {
				// No handler yet: setResultHandler will pick the result up later.
				return true;
			}
			// Clear the stored handler reference now that it is about to be used.
			this.resultHandler = null;
		}
		resultHandlerToUse.handleResult(result);
		return true;
	}

	/**
	 * Set an error value as the result; same semantics as {@link #setResult(Object)}
	 * but accepts any object.
	 * @param result the error result value
	 * @return {@code true} if the result was set; {@code false} otherwise
	 */
	public boolean setErrorResult(Object result) {
		return setResultInternal(result);
	}
	
	/** Create a new interceptor that routes timeout, error and completion events to this instance. */
	final DeferredResultProcessingInterceptor getLifecycleInterceptor() {
		return new LifecycleInterceptor();
	}

	/** Callback that receives the result once it is available. */
	@FunctionalInterface
	public interface DeferredResultHandler {
		void handleResult(@Nullable Object result);
	}

	/** Bridges interceptor notifications to the callbacks and state of the enclosing instance. */
	private class LifecycleInterceptor implements DeferredResultProcessingInterceptor {

		/**
		 * Runs the timeout callback, then (even if the callback throws) applies the
		 * timeout result if one is supplied.
		 * @return {@code false} when a timeout result was supplied, {@code true} otherwise
		 */
		@Override
		public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> result) {
			boolean continueProcessing = true;
			try {
				if (timeoutCallback != null) {
					timeoutCallback.run();
				}
			}
			finally {
				Object value = timeoutResult.get();
				if (value != RESULT_NONE) {
					continueProcessing = false;
					try {
						setResultInternal(value);
					}
					catch (Throwable ex) {
						logger.debug("Failed to handle timeout result", ex);
					}
				}
			}
			return continueProcessing;
		}

		/**
		 * Passes the throwable to the error callback, then (even if the callback
		 * throws) sets the throwable as the result.
		 * @return always {@code false}
		 */
		@Override
		public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> result, Throwable t) {
			try {
				if (errorCallback != null) {
					errorCallback.accept(t);
				}
			}
			finally {
				try {
					setResultInternal(t);
				}
				catch (Throwable ex) {
					logger.debug("Failed to handle error result", ex);
				}
			}
			return false;
		}

		/** Marks the enclosing instance as expired and runs the completion callback, if any. */
		@Override
		public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> result) {
			expired = true;
			if (completionCallback != null) {
				completionCallback.run();
			}
		}

	}
}

AsyncContext

AsyncContext 是 Servlet 3.0 提供的异步处理上下文,允许请求线程释放并异步执行任务,最终通过手动完成响应或分发继续处理。

/**
 * Servlet asynchronous-processing context: allows the request thread to be released
 * while work continues elsewhere, with the response finished through
 * {@link #complete()} or processing resumed through one of the {@code dispatch} methods.
 */
public interface AsyncContext {

    // The six constants below are string keys in the "jakarta.servlet.async." namespace.
    // NOTE(review): presumably the request attribute names under which the original
    // request path elements are exposed after an async dispatch - confirm against the
    // Jakarta Servlet specification.

    String ASYNC_REQUEST_URI = "jakarta.servlet.async.request_uri";
    
    String ASYNC_CONTEXT_PATH = "jakarta.servlet.async.context_path";
    
    String ASYNC_MAPPING = "jakarta.servlet.async.mapping";
    
    String ASYNC_PATH_INFO = "jakarta.servlet.async.path_info";
    
    String ASYNC_SERVLET_PATH = "jakarta.servlet.async.servlet_path";
    
    String ASYNC_QUERY_STRING = "jakarta.servlet.async.query_string";

    /** Returns the request associated with this context. */
    ServletRequest getRequest();

    /** Returns the response associated with this context. */
    ServletResponse getResponse();
    
    /**
     * Reports whether this context holds the original request and response.
     * NOTE(review): presumably "original" means not wrapped by the application - confirm.
     */
    boolean hasOriginalRequestAndResponse();

    /** Dispatches the request and response of this context; no target path is given. */
    void dispatch();

    /** Dispatches the request and response of this context to the given path. */
    void dispatch(String path);
    
    /** Dispatches the request and response of this context to the given path within the given servlet context. */
    void dispatch(ServletContext context, String path);

    /** Completes the asynchronous operation, finishing the response. */
    void complete();

    /**
     * Hands the given {@link Runnable} over for execution.
     * NOTE(review): which thread runs it is container-defined - confirm before relying on it.
     */
    void start(Runnable run);
    
    /** Registers a listener for events of this asynchronous operation. */
    void addListener(AsyncListener listener);
    
    /** Registers a listener together with the request and response objects associated with it. */
    void addListener(AsyncListener listener, ServletRequest request, ServletResponse response);

    /**
     * Instantiates a listener of the given class.
     *
     * @throws ServletException if the listener cannot be created
     */
    <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException;

    /**
     * Sets the timeout for this asynchronous operation.
     * NOTE(review): unit is presumably milliseconds - confirm against the Servlet API docs.
     */
    void setTimeout(long timeout);
    
    /** Returns the timeout configured for this asynchronous operation. */
    long getTimeout();
}

原理

CallableMethodReturnValueHandler

CallableMethodReturnValueHandler 负责处理控制器方法返回的 Callable 类型,借助 WebAsyncManager 启动异步执行并挂起请求,由 Spring MVC 异步处理机制在任务完成后恢复响应。

// RequestMappingHandlerAdapter.handleInternal ->  
// 	 RequestMappingHandlerAdapter.invokeHandlerMethod ->
//	    ServletInvocableHandlerMethod.invokeAndHandle -> 
// 		  HandlerMethodReturnValueHandlerComposite.handleReturnValue ->
//			HandlerMethodReturnValueHandlerComposite.selectHandler ->
/**
 * Return value handler for controller methods whose declared return type is
 * assignable to {@link Callable}. A non-null return value is handed to the
 * request's async manager for concurrent execution.
 */
public class CallableMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

	/** Supports any declared return type that is a {@link Callable}. */
	@Override
	public boolean supportsReturnType(MethodParameter returnType) {
		Class<?> declaredType = returnType.getParameterType();
		return Callable.class.isAssignableFrom(declaredType);
	}

	/**
	 * Starts callable processing for a non-null return value; a {@code null}
	 * return value simply marks the request as handled.
	 */
	@Override
	public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		if (returnValue != null) {
			WebAsyncUtils.getAsyncManager(webRequest)
					.startCallableProcessing((Callable<?>) returnValue, mavContainer);
		}
		else {
			mavContainer.setRequestHandled(true);
		}
	}

}

AsyncTaskMethodReturnValueHandler

AsyncTaskMethodReturnValueHandler 用于处理返回值为 WebAsyncTask 的方法,借助 WebAsyncManager 启动异步调用流程,并可注入 BeanFactory 支持自定义线程池与回调配置。

/**
 * Return value handler for controller methods whose declared return type is
 * assignable to {@code WebAsyncTask}. When a bean factory is available it is
 * passed on to the task before processing starts.
 */
public class AsyncTaskMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

	@Nullable
	private final BeanFactory beanFactory;


	/**
	 * @param beanFactory the factory to hand to returned tasks, or {@code null} for none
	 */
	public AsyncTaskMethodReturnValueHandler(@Nullable BeanFactory beanFactory) {
		this.beanFactory = beanFactory;
	}


	/** Supports any declared return type that is a {@code WebAsyncTask}. */
	@Override
	public boolean supportsReturnType(MethodParameter returnType) {
		Class<?> declaredType = returnType.getParameterType();
		return WebAsyncTask.class.isAssignableFrom(declaredType);
	}

	/**
	 * Starts callable processing for a non-null task; a {@code null} return
	 * value simply marks the request as handled.
	 */
	@Override
	public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		if (returnValue != null) {
			WebAsyncTask<?> task = (WebAsyncTask<?>) returnValue;
			if (this.beanFactory != null) {
				task.setBeanFactory(this.beanFactory);
			}
			WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(task, mavContainer);
		}
		else {
			mavContainer.setRequestHandled(true);
		}
	}

}

DeferredResultMethodReturnValueHandler

DeferredResultMethodReturnValueHandler 处理控制器返回的 DeferredResult、ListenableFuture、CompletionStage,通过 WebAsyncManager 启动异步处理,挂起请求并在结果就绪时恢复响应流程。

// RequestMappingHandlerAdapter.handleInternal ->  
// 	 RequestMappingHandlerAdapter.invokeHandlerMethod ->
//	    ServletInvocableHandlerMethod.invokeAndHandle -> 
// 		  HandlerMethodReturnValueHandlerComposite.handleReturnValue ->
//			HandlerMethodReturnValueHandlerComposite.selectHandler ->
/**
 * Return value handler for {@code DeferredResult}, {@code ListenableFuture} and
 * {@link CompletionStage} return types. Futures and completion stages are adapted
 * to a {@code DeferredResult} before deferred result processing is started.
 */
public class DeferredResultMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

	/** Supports declared return types assignable to any of the three async types. */
	@SuppressWarnings({"deprecation", "removal"})
	@Override
	public boolean supportsReturnType(MethodParameter returnType) {
		Class<?> declaredType = returnType.getParameterType();
		if (DeferredResult.class.isAssignableFrom(declaredType)) {
			return true;
		}
		if (org.springframework.util.concurrent.ListenableFuture.class.isAssignableFrom(declaredType)) {
			return true;
		}
		return CompletionStage.class.isAssignableFrom(declaredType);
	}

	/**
	 * Starts deferred result processing for a non-null return value; a
	 * {@code null} return value simply marks the request as handled.
	 */
	@SuppressWarnings({"deprecation", "removal"})
	@Override
	public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

		if (returnValue == null) {
			mavContainer.setRequestHandled(true);
			return;
		}

		DeferredResult<?> deferred = toDeferredResult(returnValue);
		WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferred, mavContainer);
	}

	/**
	 * Returns the value itself when it already is a {@code DeferredResult},
	 * otherwise an adapter around the future or completion stage.
	 * @throws IllegalStateException if the value is none of the supported types
	 */
	@SuppressWarnings({"deprecation", "removal"})
	private DeferredResult<?> toDeferredResult(Object returnValue) {
		if (returnValue instanceof DeferredResult<?> deferredResult) {
			return deferredResult;
		}
		if (returnValue instanceof org.springframework.util.concurrent.ListenableFuture<?> listenableFuture) {
			return adaptListenableFuture(listenableFuture);
		}
		if (returnValue instanceof CompletionStage<?> completionStage) {
			return adaptCompletionStage(completionStage);
		}
		// Should not happen...
		throw new IllegalStateException("Unexpected return value type: " + returnValue);
	}

	/** Wraps a future so that its success value or failure becomes the deferred result. */
	@SuppressWarnings({"deprecation", "removal"})
	private DeferredResult<Object> adaptListenableFuture(org.springframework.util.concurrent.ListenableFuture<?> future) {
		DeferredResult<Object> adapted = new DeferredResult<>();
		future.addCallback(new org.springframework.util.concurrent.ListenableFutureCallback<Object>() {
			@Override
			public void onSuccess(@Nullable Object value) {
				adapted.setResult(value);
			}
			@Override
			public void onFailure(Throwable ex) {
				adapted.setErrorResult(ex);
			}
		});
		return adapted;
	}

	/**
	 * Wraps a completion stage so that its value or failure becomes the deferred
	 * result. A {@link CompletionException} with a cause is unwrapped to that cause.
	 */
	private DeferredResult<Object> adaptCompletionStage(CompletionStage<?> future) {
		DeferredResult<Object> adapted = new DeferredResult<>();
		future.whenComplete((value, failure) -> {
			if (failure == null) {
				adapted.setResult(value);
				return;
			}
			Throwable error = failure;
			if (failure instanceof CompletionException && failure.getCause() != null) {
				error = failure.getCause();
			}
			adapted.setErrorResult(error);
		});
		return adapted;
	}

}

WebAsyncManager

WebAsyncManager 是 Spring MVC 异步请求的核心管理器,负责协调异步任务执行、超时和错误处理,并在结果就绪后触发异步分派恢复请求流程。

/**
 * Central coordinator for asynchronous request processing: starts concurrent
 * handling for a {@code Callable}/{@code WebAsyncTask} or a {@code DeferredResult},
 * wires timeout, error and completion handlers, and dispatches the request once a
 * concurrent result has been set.
 */
public final class WebAsyncManager {

	// Sentinel meaning "no concurrent result available".
	private static final Object RESULT_NONE = new Object();

	// Executor used unless a WebAsyncTask supplies its own.
	private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = new SimpleAsyncTaskExecutor(WebAsyncManager.class.getSimpleName());

	private static final Log logger = LogFactory.getLog(WebAsyncManager.class);

	// Always appended last to the Callable interceptor list.
	private static final CallableProcessingInterceptor timeoutCallableInterceptor = new TimeoutCallableProcessingInterceptor();

	// Always appended last to the DeferredResult interceptor list.
	private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor();


	@Nullable
	private AsyncWebRequest asyncWebRequest;

	private AsyncTaskExecutor taskExecutor = DEFAULT_TASK_EXECUTOR;

	private boolean isMultipartRequestParsed;

	@Nullable
	private volatile Object concurrentResult = RESULT_NONE;

	// The processingContext varargs passed when async processing was started.
	@Nullable
	private volatile Object[] concurrentResultContext;

	// Processing state; transitions are made with compareAndSet below.
	private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);

	private final Map<Object, CallableProcessingInterceptor> callableInterceptors = new LinkedHashMap<>();

	private final Map<Object, DeferredResultProcessingInterceptor> deferredResultInterceptors = new LinkedHashMap<>();

	WebAsyncManager() {}

	/**
	 * Wrap the given callable in a {@code WebAsyncTask} and start processing it.
	 * @param callable the work to execute; must not be {@code null}
	 * @param processingContext additional context stored alongside the concurrent result
	 * @throws Exception if concurrent processing fails to start
	 */
	@SuppressWarnings({"rawtypes", "unchecked"})
	public void startCallableProcessing(Callable<?> callable, Object... processingContext) throws Exception {
		Assert.notNull(callable, "Callable must not be null");
		startCallableProcessing(new WebAsyncTask(callable), processingContext);
	}

	/**
	 * Start concurrent processing of the given task: apply its timeout and executor,
	 * register timeout/error/completion handlers, start async mode on the request and
	 * submit the callable to the task executor. The outcome (value or exception) is
	 * passed through the interceptor chain and then set as the concurrent result.
	 * @param webAsyncTask the task to execute; must not be {@code null}
	 * @param processingContext additional context stored alongside the concurrent result
	 * @throws IllegalStateException if no async web request is set or processing has
	 * already been started
	 * @throws Exception if concurrent processing fails to start
	 */
	@SuppressWarnings("NullAway")
	public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {

		Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
		Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");

		// Only one async processing run may be started from the NOT_STARTED state.
		if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
			throw new IllegalStateException(
					"Unexpected call to startCallableProcessing: [" + this.state.get() + "]");
		}

		Long timeout = webAsyncTask.getTimeout();
		if (timeout != null) {
			this.asyncWebRequest.setTimeout(timeout);
		}

		// A task-specific executor replaces the current one for this manager.
		AsyncTaskExecutor executor = webAsyncTask.getExecutor();
		if (executor != null) {
			this.taskExecutor = executor;
		}

		// Order: the task's own interceptor, then registered ones, then the timeout interceptor.
		List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
		interceptors.add(webAsyncTask.getInterceptor());
		interceptors.addAll(this.callableInterceptors.values());
		interceptors.add(timeoutCallableInterceptor);

		final Callable<?> callable = webAsyncTask.getCallable();
		final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);

		// On timeout: dispatch only if an interceptor produced a result.
		this.asyncWebRequest.addTimeoutHandler(() -> {
			if (logger.isDebugEnabled()) {
				logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
			}
			Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
			if (result != CallableProcessingInterceptor.RESULT_NONE) {
				setConcurrentResultAndDispatch(result);
			}
		});

		// On error: always dispatch, falling back to the exception itself as the result.
		this.asyncWebRequest.addErrorHandler(ex -> {
			if (logger.isDebugEnabled()) {
				logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest) + ": " + ex);
			}
			if (DisconnectedClientHelper.isClientDisconnectedException(ex)) {
				ex = new AsyncRequestNotUsableException(
						"Servlet container error notification for disconnected client", ex);
			}
			Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
			result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
			setConcurrentResultAndDispatch(result);
		});

		this.asyncWebRequest.addCompletionHandler(() -> interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));

		interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
		startAsyncProcessing(processingContext);
		try {
			Future<?> future = this.taskExecutor.submit(() -> {
				Object result = null;
				try {
					interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
					result = callable.call();
				}
				catch (Throwable ex) {
					// A failure becomes the result so it can be post-processed and dispatched.
					result = ex;
				}
				finally {
					result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
				}
				setConcurrentResultAndDispatch(result);
			});
			interceptorChain.setTaskFuture(future);
		}
		catch (Throwable ex) {
			// Submission itself failed: treat the exception as the result.
			Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
			setConcurrentResultAndDispatch(result);
		}
	}

	/**
	 * Start processing of the given deferred result: apply its timeout, register
	 * timeout/error/completion handlers, start async mode on the request and install
	 * a result handler that post-processes the value and sets it as the concurrent result.
	 * @param deferredResult the deferred result; must not be {@code null}
	 * @param processingContext additional context stored alongside the concurrent result
	 * @throws IllegalStateException if no async web request is set or processing has
	 * already been started
	 * @throws Exception if concurrent processing fails to start
	 */
	@SuppressWarnings("NullAway")
	public void startDeferredResultProcessing(
			final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

		Assert.notNull(deferredResult, "DeferredResult must not be null");
		Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");

		// Only one async processing run may be started from the NOT_STARTED state.
		if (!this.state.compareAndSet(State.NOT_STARTED, State.ASYNC_PROCESSING)) {
			throw new IllegalStateException(
					"Unexpected call to startDeferredResultProcessing: [" + this.state.get() + "]");
		}

		Long timeout = deferredResult.getTimeoutValue();
		if (timeout != null) {
			this.asyncWebRequest.setTimeout(timeout);
		}

		// Order: the result's lifecycle interceptor, then registered ones, then the timeout interceptor.
		List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<>();
		interceptors.add(deferredResult.getLifecycleInterceptor());
		interceptors.addAll(this.deferredResultInterceptors.values());
		interceptors.add(timeoutDeferredResultInterceptor);

		final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);

		this.asyncWebRequest.addTimeoutHandler(() -> {
			if (logger.isDebugEnabled()) {
				logger.debug("Servlet container timeout notification for " + formatUri(this.asyncWebRequest));
			}
			try {
				interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
				synchronized (WebAsyncManager.this) {
					// If application thread set the DeferredResult first in a race,
					// we must still not return until setConcurrentResultAndDispatch is done
					return;
				}
			}
			catch (Throwable ex) {
				setConcurrentResultAndDispatch(ex);
			}
		});

		this.asyncWebRequest.addErrorHandler(ex -> {
			if (logger.isDebugEnabled()) {
				logger.debug("Servlet container error notification for " + formatUri(this.asyncWebRequest));
			}
			if (DisconnectedClientHelper.isClientDisconnectedException(ex)) {
				ex = new AsyncRequestNotUsableException(
						"Servlet container error notification for disconnected client", ex);
			}
			try {
				interceptorChain.triggerAfterError(this.asyncWebRequest, deferredResult, ex);
				synchronized (WebAsyncManager.this) {
					// If application thread set the DeferredResult first in a race,
					// we must still not return until setConcurrentResultAndDispatch is done
					return;
				}
			}
			catch (Throwable interceptorEx) {
				setConcurrentResultAndDispatch(interceptorEx);
			}
		});

		this.asyncWebRequest.addCompletionHandler(() ->
				interceptorChain.triggerAfterCompletion(this.asyncWebRequest, deferredResult));

		interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
		startAsyncProcessing(processingContext);

		try {
			interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
			// Invoked when the deferred result receives its value (or at once if already set).
			deferredResult.setResultHandler(result -> {
				result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
				setConcurrentResultAndDispatch(result);
			});
		}
		catch (Throwable ex) {
			setConcurrentResultAndDispatch(ex);
		}
	}

	/**
	 * Reset the concurrent result, remember the processing context and put the
	 * underlying request into async mode.
	 */
	private void startAsyncProcessing(Object[] processingContext) {
		synchronized (WebAsyncManager.this) {
			this.concurrentResult = RESULT_NONE;
			this.concurrentResultContext = processingContext;
		}

		Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
		if (logger.isDebugEnabled()) {
			logger.debug("Started async request for " + formatUri(this.asyncWebRequest));
		}

		this.asyncWebRequest.startAsync();
	}

	/**
	 * Store the given result and dispatch the request, all while holding this
	 * manager's monitor. Only the first caller that moves the state from
	 * ASYNC_PROCESSING to RESULT_SET proceeds; later results are ignored. No dispatch
	 * is performed if the async request has already completed.
	 */
	private void setConcurrentResultAndDispatch(@Nullable Object result) {
		Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
		synchronized (WebAsyncManager.this) {
			if (!this.state.compareAndSet(State.ASYNC_PROCESSING, State.RESULT_SET)) {
				if (logger.isDebugEnabled()) {
					logger.debug("Async result already set: [" + this.state.get() + "], " +
							"ignored result for " + formatUri(this.asyncWebRequest));
				}
				return;
			}

			this.concurrentResult = result;
			if (logger.isDebugEnabled()) {
				logger.debug("Async result set for " + formatUri(this.asyncWebRequest));
			}

			if (this.asyncWebRequest.isAsyncComplete()) {
				if (logger.isDebugEnabled()) {
					logger.debug("Async request already completed for " + formatUri(this.asyncWebRequest));
				}
				return;
			}

			if (logger.isDebugEnabled()) {
				logger.debug("Performing async dispatch for " + formatUri(this.asyncWebRequest));
			}
			this.asyncWebRequest.dispatch();
		}
	}
}

StandardServletAsyncWebRequest

StandardServletAsyncWebRequest 是 Spring 框架中封装 Servlet 3.0 异步处理机制的类,用于管理异步请求的启动、超时和完成通知。

/**
 * Servlet-based {@code AsyncWebRequest} that starts async mode on the underlying
 * request, registers itself as the {@code AsyncListener}, and dispatches through
 * the resulting {@code AsyncContext}. Only part of the class is shown here.
 */
public class StandardServletAsyncWebRequest extends ServletWebRequest implements AsyncWebRequest, AsyncListener {

	private final List<Runnable> timeoutHandlers = new ArrayList<>();

	private final List<Consumer<Throwable>> exceptionHandlers = new ArrayList<>();

	private final List<Runnable> completionHandlers = new ArrayList<>();

	// Applied to the AsyncContext in startAsync() when non-null.
	@Nullable
	private Long timeout;

	// Assigned in startAsync(); null until then.
	@Nullable
	private AsyncContext asyncContext;

	private State state;

	// Not used in the visible excerpt; presumably guards state transitions in the elided members - confirm.
	private final ReentrantLock stateLock = new ReentrantLock();

	/**
	 * Create an instance for the given request/response pair with no previous request.
	 * @param request the current HTTP request
	 * @param response the current HTTP response
	 */
	public StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response) {
		this(request, response, null);
	}

	/**
	 * Create an instance that wraps the response in a {@code LifecycleHttpServletResponse}
	 * and takes over the state of a previous request, if given.
	 * @param request the current HTTP request
	 * @param response the current HTTP response
	 * @param previousRequest the request whose state is inherited, or {@code null}
	 * to start in state {@code NEW}
	 */
	@SuppressWarnings("NullAway")
	StandardServletAsyncWebRequest(HttpServletRequest request, HttpServletResponse response, 
		@Nullable StandardServletAsyncWebRequest previousRequest) {
		
		super(request, new LifecycleHttpServletResponse(response));
		this.state = (previousRequest != null ? previousRequest.state : State.NEW);
		//noinspection DataFlowIssue
		((LifecycleHttpServletResponse) getResponse()).setAsyncWebRequest(this);
	}

	/**
	 * Put the underlying request into async mode, register this object as listener
	 * and apply the configured timeout. Returns without action if async processing
	 * has already been started.
	 * @throws IllegalStateException if async is not supported for the request, or
	 * the current state is neither {@code NEW} nor {@code ASYNC}
	 */
	@Override
	public void startAsync() {
		Assert.state(getRequest().isAsyncSupported(),
				"Async support must be enabled on a servlet and for all filters involved " +
				"in async request processing. This is done in Java code using the Servlet API " +
				"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
				"filter declarations in web.xml.");

		if (isAsyncStarted()) {
			return;
		}

		// NEW moves to ASYNC; any other state must already be ASYNC.
		if (this.state == State.NEW) {
			this.state = State.ASYNC;
		}
		else {
			Assert.state(this.state == State.ASYNC, "Cannot start async: [" + this.state + "]");
		}

		this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
		this.asyncContext.addListener(this);
		if (this.timeout != null) {
			this.asyncContext.setTimeout(this.timeout);
		}
	}

	/**
	 * Dispatch through the {@code AsyncContext} unless async processing has already completed.
	 * @throws IllegalStateException if {@link #startAsync()} has not been called yet
	 */
	@Override
	public void dispatch() {
		Assert.state(this.asyncContext != null, "AsyncContext not yet initialized");
		if (!this.isAsyncComplete()) {
			this.asyncContext.dispatch();
		}
	}
	
	// ...

}

实战

/**
 * Demo application exposing one endpoint per asynchronous request style:
 * {@code DeferredResult}, {@code WebAsyncTask}, {@code Callable} and the raw
 * Servlet {@code AsyncContext}.
 */
@RestController
@SpringBootApplication
public class Application {

    @Autowired
    @Qualifier(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)
    private AsyncTaskExecutor applicationTaskExecutor;

    /**
     * Returns a deferred result with a 3000 timeout value and {@code "timeout"} as
     * the timeout result. The background task sleeps 5000 ms before setting its
     * value, so the timeout result is expected to win; the late {@code setResult}
     * call is then simply rejected.
     *
     * @return the deferred result completed by the background task or the timeout
     */
    @GetMapping("/deferredResult")
    public DeferredResult<String> deferredResult() {
        DeferredResult<String> deferredResult = new DeferredResult<>(3000L, "timeout");
        applicationTaskExecutor.execute(() -> {
            try {
                Thread.sleep(5000);
                deferredResult.setResult("deferredResult");
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the executor and surface the failure
                // to the request instead of silently dropping it.
                Thread.currentThread().interrupt();
                deferredResult.setErrorResult(e);
            }
        });

        return deferredResult;
    }

    /**
     * Returns a task with a 3000 timeout value whose callable sleeps 1000 ms and
     * then yields {@code "webAsyncTask"}.
     *
     * @return the async task to be executed by Spring MVC
     */
    @GetMapping("/webAsyncTask")
    public WebAsyncTask<String> webAsyncTask() {
        return new WebAsyncTask<String>(3000L, () -> {
            Thread.sleep(1000);
            return "webAsyncTask";
        });
    }

    /**
     * Returns a callable that yields {@code "callable"}.
     *
     * @return the callable to be executed by Spring MVC
     */
    @GetMapping("/callable")
    public Callable<String> callable() {
        return () -> "callable";
    }

    /**
     * Uses the Servlet API directly: starts async mode, writes the body from a
     * task started through the async context and always completes the context.
     *
     * @param request the current request, used to start async mode
     * @param response the current response, written to by the async task
     */
    @GetMapping("/asyncContext")
    public void asyncContext(HttpServletRequest request, HttpServletResponse response) {
        AsyncContext asyncContext = request.startAsync();
        asyncContext.start(() -> {
            try {
                Thread.sleep(2000);
                response.getWriter().write("asyncContext");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the container thread can observe it.
                Thread.currentThread().interrupt();
                request.getServletContext().log("Async task interrupted before writing the response", e);
            } catch (Exception e) {
                request.getServletContext().log("Failed to write the async response", e);
            } finally {
                // Must run on every path, otherwise the request hangs until the container timeout.
                asyncContext.complete();
            }
        });
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

网站公告

今日签到

点亮在社区的每一天
去签到