当前位置:网站首页>Which thread pool does Async use?
Which thread pool does Async use?
2022-08-02 23:32:00 【blue shirt dyed red dust】
前言
在SpringWe often use asynchronous operations in,注解中使用 @EnableAsync
和 @Async
就可以使用它了.But recently I found that the thread number used in asynchronous is the custom thread pool in our project ThreadPoolTaskExecutor
rather than the familiar SimpleAsyncTaskExecutor
So let's take a look at his execution process..
正文
- First make the asynchrony effective,We have to add in the startup class
@EnableAsync
Then click on it.它会使用@Import
注入一个AsyncConfigurationSelector
类,Start is through the parent class can decide it uses the configuration classProxyAsyncConfiguration
.
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
public AsyncConfigurationSelector() {
}
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch(adviceMode) {
case PROXY:
return new String[]{ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[]{"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"};
default:
return null;
}
}
}
- Click to see the injection of a
AsyncAnnotationBeanPostProcessor
.它实现了BeanPostProcessor
接口,So it is a post-processor,用于将Spring AOP
的Advisor
应用于给定的bean
.从而该bean
The method defined by the asynchronous annotation on the above will be called truly asynchronously when it is called.
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
public ProxyAsyncConfiguration() {
}
@Bean(
name = {"org.springframework.context.annotation.internalAsyncAnnotationProcessor"}
)
@Role(2)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder((Integer)this.enableAsync.getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
的父类实现了BeanFactoryAware
,那么会在AsyncAnnotationBeanPostProcessor
实例化之后回调setBeanFactory()
来实例化切面AsyncAnnotationAdvisor
.
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//定义一个切面
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
AsyncAnnotationAdvisor
Constructing and declaring cut-ins(切点)And the code to enhance(通知).
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//通知
this.advice = buildAdvice(executor, exceptionHandler);
//切入点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
- 通知就是最终要执行的.
buildAdvice
Used to construct the notification,主要是创建一个AnnotationAsyncExecutionInterceptor
类型的拦截器,And configured using the actuator and the exception handler.The real asynchronous execution of the code is inAsyncExecutionAspectSupport
中!
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//配置拦截器
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
- 配置拦截器,Custom executor through parameter configuration and exception handler or use the default actuators and the exception handler.
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//默认执行器
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
getDefaultExecutor()
方法,Used to find the default executor,父类AsyncExecutionAspectSupport
Looking for only one type of firstTaskExecutor
executor and return,If there are more than one, find the default executortaskExecutor
,If it cannot be found, return directlynull.子类AsyncExecutionInterceptor
重写getDefaultExecutor
方法,First call the parent class logic,返回nullconfigure a namedSimpleAsyncTaskExecutor
的执行器
/**
* 父类
* Get or build the default executor for this notification instance
* Here to return to the actuator will be cached for later use
* The default implementation searches for uniqueTaskExecutor的bean
* 在上下文中,用于名为“taskExecutor”的Executor bean.
* If neither can be resolved,This implementation will return null
*/
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
if (beanFactory != null) {
try {
// Search the only oneTaskExecutor类型的bean并返回
return beanFactory.getBean(TaskExecutor.class);
}
catch (NoUniqueBeanDefinitionException ex) {
//Can't find the only onebean异常后,搜索一个TaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find unique TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskExecutor bean found within the context, and none is named " +
"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
//Search for one when no exception is foundTaskExecutor类型的“taskExecutor”的bean并返回
logger.debug("Could not find default TaskExecutor bean", ex);
try {
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
}
catch (NoSuchBeanDefinitionException ex2) {
logger.info("No task executor bean found for async processing: " +
"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
}
// Giving up -> either using local default executor or none at all...
}
}
return null;
}
/**
* 子类
* If the parent class isnullTo instantiate a namedSimpleAsyncTaskExecutor的执行器
*/
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
所以,In this step you can understand why the asynchronous thread name by default SimpleAsyncTaskExecutor-xx
,Why is it possible to use your own thread pool configuration asynchronously with your own thread pool?.
After we have the breakthrough point,Executed every time the interface is requested to execute an asynchronous method AsyncExecutionInterceptor#invoke()
, determineAsyncExecutor
Used to decide which executor to use
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//Select an executor of the corresponding method in the cached executor
AsyncTaskExecutor executor = (AsyncTaskExecutor)this.executors.get(method);
if (executor == null) {
//获取@Async注解中的value(The specified actuator)
String qualifier = this.getExecutorQualifier(method);
Executor targetExecutor;
if (StringUtils.hasLength(qualifier)) {
//获取指定执行器的bean
targetExecutor = this.findQualifiedExecutor(this.beanFactory, qualifier);
} else {
//Choose the default executor
targetExecutor = (Executor)this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor)targetExecutor : new TaskExecutorAdapter(targetExecutor);
//cache the executor
this.executors.put(method, executor);
}
return (AsyncTaskExecutor)executor;
}
When there is an executor call doSubmit
Method to add tasks to the actuator.
异步任务,默认将采用SimpleAsyncTaskExecutor作为执行器!它有如下特点:
不复用线程,That is up a new thread for each task.但是可以通过
concurrencyLimit
property to control the number of concurrent threads,But by default do not limit(concurrencyLimit
取值为-1).因此,If we use async tasks,The configuration of the default executor must not be used,以防OOM异常!The best way is to specify the executor!
总结
This article mainly looks at the source code to understand asynchronous annotations @Async
Is how to choose the thread and using a thread in the project,Try to specify a unique thread pool for asynchronous tasks,This will avoid the impact of not sharing the thread pool with other services.
边栏推荐
猜你喜欢
J9 digital theory: the Internet across chain bridge has what effect?
ShardingSphere-proxy +PostgreSQL implements read-write separation (static strategy)
Lvm逻辑卷
SCANIA SCANIA OTL tag is introduced
9,共模抑制比一-不受输入信号中共模波动的影响。【如何分析共模CM抑制比。】
LM小型可编程控制器软件(基于CoDeSys)笔记二十五:plc的数据存储区(数字量输入通道部分)
如何解决图像分类中的类别不均衡问题?不妨试试分开学习表征和分类器
ssdp协议搜索GB28181设备
PG's SQL execution plan
What is a Field Service Management System (FSM)?what is the benefit?
随机推荐
"A daily practice, happy water problem" 1374. Generate a string with an odd number of each character
GNN教程:图神经网络基础知识!
C# Monitor类
Tencent YunMeng every jie: I experienced by cloud native authors efficiency best practices case
setup语法糖 defineProps defineEmits defineExpose
实战:10 种实现延迟任务的方法,附代码!
广东省数字经济发展指引 1.0之建成数据安全保障体系
Redis集群配置
Bena的生命周期
遇上Mysql亿级优化,怎么办
APP自动化uiautomator2获取toast
LeetCode 622 设计循环队列[数组 队列] HERODING的LeetCode之路
你是几星测试/开发程序员?技术型选手王大拿......
新增指令 v-memo
AI科学家:自动发现物理系统的隐藏状态变量
4 kmiles join YiSheng group, with more strong ability of digital business, accelerate China's cross-border electricity full domain full growth
ECCV 2022 | 通往数据高效的Transformer目标检测器
信息学奥赛一本通(1256:献给阿尔吉侬的花束)
[安洵杯 2019]easy_web
Linphone 被叫方如何解析来电SIP消息中的自定义头消息