Thread pool, who am I? Where am I?
2022-07-19 20:21:00 【yanhom】
Hello everyone. This article looks at the common ways thread pools are used in day-to-day development, focusing on how to use them correctly in a Spring environment.
Ways of using thread pools
First, a question: how do you use thread pools in your day-to-day development?
I think usage falls roughly into the following four categories:
1. Method level: create the pool where it is needed and shut it down after use
2. Class-level sharing: define a static final ThreadPoolExecutor that is shared by all objects and methods of the class and, depending on the access modifier, its subclasses
3. Business-level sharing: define multiple ThreadPoolExecutor instances by business type, with each business type sharing one thread pool object
4. Global sharing: every part of the service shares a single global thread pool
Generally speaking, approach 3 is preferred, followed by approach 2; avoid approaches 1 and 4. Here is why:
1. The purpose of a thread pool is to manage thread resources centrally, reduce the overhead of frequently creating and destroying threads, and reuse pooled threads to execute tasks, which improves system performance. In high-concurrency, asynchronous scenarios, method-level usage cannot achieve this at all and actually degrades performance.
2. With one globally shared thread pool, tasks of different kinds interfere with each other: long-running tasks can occupy all of the pool's threads, leaving short tasks no chance to run. If tasks have a parent-child relationship, this can also lead to deadlock, and a growing backlog of queued tasks can eventually cause an OOM.
3. Isolating thread pools by business type keeps tasks of different businesses from affecting each other. The granularity is also coarser than class-level sharing, so the service does not create a large number of thread pools, which reduces scheduling pressure on the system. Hystrix's thread pool isolation can be understood as this model.
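The parent-child deadlock mentioned in point 2 is easy to reproduce with the plain JDK. The following is a minimal sketch, not code from any framework: the class and method names (SharedPoolDeadlockDemo, parentCompletes, and so on) are made up for illustration, and the pool size of 1 and the 500 ms timeout are arbitrary assumptions chosen to make the effect visible quickly.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SharedPoolDeadlockDemo {

    /**
     * Runs a parent task on parentPool that submits a child task to childPool
     * and blocks until the child finishes. Returns true if the parent completed
     * within the timeout, false if it was still stuck.
     */
    static boolean parentCompletes(ExecutorService parentPool, ExecutorService childPool) {
        Future<String> parent = parentPool.submit(() -> {
            Future<String> child = childPool.submit(() -> "child done");
            return child.get(); // the parent blocks here until the child has run
        });
        try {
            parent.get(500, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the parent is waiting for a child that cannot start
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Parent and child share one single-thread pool (assumed size, for illustration). */
    static boolean sharedPoolCompletes() {
        ExecutorService shared = Executors.newFixedThreadPool(1);
        try {
            return parentCompletes(shared, shared);
        } finally {
            shared.shutdownNow(); // interrupts the stuck parent so the JVM can exit
        }
    }

    /** Parent and child run in separate pools, as in business-level isolation. */
    static boolean isolatedPoolsComplete() {
        ExecutorService parentPool = Executors.newFixedThreadPool(1);
        ExecutorService childPool = Executors.newFixedThreadPool(1);
        try {
            return parentCompletes(parentPool, childPool);
        } finally {
            parentPool.shutdownNow();
            childPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool completes:   " + sharedPoolCompletes());
        System.out.println("isolated pools complete: " + isolatedPoolsComplete());
    }
}
```

With a single worker thread, the shared case should report false, because the parent holds the only thread while its child waits in the queue, and the isolated case should report true. Real pools have more threads, so the problem tends to appear only under load, which is what makes it hard to spot.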
In summary, approach 3 is recommended: define thread pools by business function.
Using ThreadPoolExecutor in a Spring project
Spring is a Bean container, so we usually register a ThreadPoolExecutor as a Bean. When the container is refreshed, Spring injects the corresponding ThreadPoolExecutor object into our business Beans, where it can be used directly. For example, the pools can be defined as follows (ThreadPoolBuilder is a wrapper that implements the builder pattern):
@Configuration
public class ThreadPoolConfiguration {

    @Bean
    public ThreadPoolExecutor jobExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(15000)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(LINKED_BLOCKING_QUEUE.getName(), 3000)
                .build();
    }

    @Bean
    public ThreadPoolExecutor remotingExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(15000)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(SYNCHRONOUS_QUEUE.getName(), null)
                .build();
    }

    @Bean
    public ThreadPoolExecutor consumeExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(15000)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(LINKED_BLOCKING_QUEUE.getName(), 5000)
                .build();
    }
}
The code above defines three thread pool instances by usage scenario: one for time-consuming scheduled jobs, one for remote RPC calls, and one for MQ consumption.
Using ThreadPoolExecutor this way has a problem: when the Spring container shuts down, tasks still in the queue may not have been processed, so there is a risk of losing them.
Spring Beans have a lifecycle. If a Bean implements the corresponding lifecycle interfaces (InitializingBean, DisposableBean), Spring calls the corresponding methods when the Bean is initialized and when the container is closed.
It is therefore recommended not to use ThreadPoolExecutor directly in a Spring environment. Use the ThreadPoolTaskExecutor provided by Spring, or the DtpExecutor thread pool implementation provided by the DynamicTp framework.
Some Spring background
Here is a tip for reading source code. When an open-source project integrates with Spring, many people do not know where to start reading.
Spring provides many extension points, and most third-party frameworks integrate with Spring through these extension interfaces. So we can start from these interfaces, set breakpoints, and step into the framework's core.
These extension points include, but are not limited to, the following interfaces:
BeanFactoryPostProcessor: modifies BeanDefinitions before Beans are instantiated
BeanPostProcessor: modifies or wraps a Bean before and after its initialization, for example by returning a proxy object
Aware: a marker interface. Classes that implement one of its sub-interfaces receive callbacks from Spring that give them access to framework facilities, for example ApplicationContextAware and EnvironmentAware
ApplicationContextInitializer: performs initialization during context preparation, before the container is refreshed. Commonly used configuration center clients, for example, generally implement this initializer: they pull the remote configuration to the local side before the refresh, wrap it as a PropertySource, and put it into the Environment for later use
ApplicationListener: Spring's event mechanism. It listens for specific application events (ApplicationEvent) and is an implementation of the observer pattern
FactoryBean: customizes the creation logic of a Bean (used by MyBatis, Feign, and others)
ImportBeanDefinitionRegistrar: typically used with a custom @EnableXXX annotation that @Imports an ImportBeanDefinitionRegistrar, which registers BeanDefinitions with the container
ApplicationRunner/CommandLineRunner: callbacks invoked after the container has started, used to perform initialization
These are only some of the commonly used interfaces. Spring has many more extension points, which you can explore on your own.
How DynamicTp generates thread pool objects
The DynamicTp framework defines the DtpExecutor thread pool class. The inheritance hierarchy is as follows:
EagerDtpExecutor: modeled on Tomcat's thread pool design, it adjusts the thread pool's execution flow so that new threads are created to run tasks before tasks are placed in the queue. It is mainly intended for I/O-intensive scenarios. Extends DtpExecutor
DtpExecutor: overrides the execute, beforeExecute, and afterExecute methods of ThreadPoolExecutor, mainly to wrap tasks and to record execution timeouts and queue-wait timeouts. Extends DtpLifecycleSupport
DtpLifecycleSupport: implements Spring's InitializingBean and DisposableBean interfaces and runs the corresponding logic when the Bean is initialized and when the Spring container is destroyed. The logic of the destroy method is as follows:
@Override
public void destroy() {
    internalShutdown();
}

public void internalShutdown() {
    if (log.isInfoEnabled()) {
        log.info("Shutting down ExecutorService, poolName: {}", threadPoolName);
    }
    if (this.waitForTasksToCompleteOnShutdown) {
        // Wait for tasks to complete: shutdown() lets previously submitted tasks run,
        // rejects new submissions, and moves the pool to the SHUTDOWN state.
        this.shutdown();
    } else {
        // Do not wait: shutdownNow() tries to interrupt running tasks, returns all
        // tasks that never started, and moves the pool to the STOP state.
        // Each returned task is then cancelled through Future.cancel.
        for (Runnable remainingTask : this.shutdownNow()) {
            cancelRemainingTask(remainingTask);
        }
    }
    awaitTerminationIfNecessary();
}

protected void cancelRemainingTask(Runnable task) {
    if (task instanceof Future) {
        ((Future<?>) task).cancel(true);
    }
}

private void awaitTerminationIfNecessary() {
    if (this.awaitTerminationSeconds <= 0) {
        return;
    }
    try {
        // Used together with shutdown(): blocks the current thread until the
        // submitted tasks have finished or the timeout elapses.
        if (!awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS) && log.isWarnEnabled()) {
            log.warn("Timed out while waiting for executor {} to terminate", threadPoolName);
        }
    } catch (InterruptedException ex) {
        if (log.isWarnEnabled()) {
            log.warn("Interrupted while waiting for executor {} to terminate", threadPoolName);
        }
        Thread.currentThread().interrupt();
    }
}
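The difference between the two branches can be seen with a plain JDK executor. This is a minimal sketch under assumptions: ShutdownModesDemo and completedAfterShutdown are hypothetical names, a single-thread pool and 50 ms tasks stand in for real work, and the 5-second wait plays the role of awaitTerminationSeconds.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownModesDemo {

    /**
     * Submits `tasks` short tasks to a single-thread pool, shuts the pool down in
     * the requested mode, waits for termination, and returns how many tasks ran
     * to completion.
     */
    static int completedAfterShutdown(int tasks, boolean waitForTasks) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50); // simulated work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    // Interrupted by shutdownNow(): give up without counting the task
                    Thread.currentThread().interrupt();
                }
            });
        }
        if (waitForTasks) {
            pool.shutdown();    // queued tasks still run, new submissions are rejected
        } else {
            pool.shutdownNow(); // running task is interrupted, queued tasks are dropped
        }
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("Timed out while waiting for the pool to terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("graceful:  " + completedAfterShutdown(10, true) + " of 10 tasks completed");
        System.out.println("immediate: " + completedAfterShutdown(10, false) + " of 10 tasks completed");
    }
}
```

In the graceful mode all submitted tasks should complete as long as they fit within the wait time; in the immediate mode most or all of them are lost, which is the risk described earlier for a pool that is not tied to the container lifecycle.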
DynamicTp also uses the extension interfaces described above when integrating with Spring.
Extension 1
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DtpBeanDefinitionRegistrar.class)
public @interface EnableDynamicTp {
}
Anyone who has used DynamicTp knows that the @EnableDynamicTp annotation is required. The annotation relies on the ImportBeanDefinitionRegistrar extension; the main code is as follows:
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    DtpProperties dtpProperties = new DtpProperties();
    // Bind the thread pool configuration held in the Environment to the DtpProperties object
    PropertiesBinder.bindDtpProperties(environment, dtpProperties);
    val executors = dtpProperties.getExecutors();
    if (CollUtil.isEmpty(executors)) {
        log.warn("DynamicTp registrar, no executors are configured.");
        return;
    }
    executors.forEach(x -> {
        // Determine the thread pool type (common or eager)
        Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType());
        String beanName = x.getThreadPoolName();
        // Property values of the thread pool object
        Map<String, Object> properties = buildProperties(x);
        // Constructor arguments
        Object[] args = buildArgs(executorTypeClass, x);
        BeanUtil.registerIfAbsent(registry, beanName, executorTypeClass, properties, args);
    });
}
Code walkthrough:
1. The ImportBeanDefinitionRegistrar implementation runs while the Spring container is being refreshed. By then, the thread pool configuration has already been pulled from the configuration center during context preparation and placed in the Environment, so the first step is to bind that configuration to the DtpProperties object.
2. Next, a BeanDefinitionBuilder is constructed, the constructor arguments and property values are set, and the BeanDefinition is registered with the Spring container:
public static void doRegister(BeanDefinitionRegistry registry,
                              String beanName,
                              Class<?> clazz,
                              Map<String, Object> properties,
                              Object... constructorArgs) {
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
    // Set the constructor arguments (standard boilerplate)
    for (Object constructorArg : constructorArgs) {
        builder.addConstructorArgValue(constructorArg);
    }
    // Set property key-value pairs; later, populateBean assigns them through the setters via reflection
    if (CollUtil.isNotEmpty(properties)) {
        properties.forEach(builder::addPropertyValue);
    }
    registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
}
3. When the Spring container refreshes, it creates the configured thread pool objects from the registered BeanDefinitions, initializes them, and injects them into the Beans that reference them. So there is no need to declare thread pool objects manually with @Bean; configuring them in the configuration center is enough.
Extension 2
DtpPostProcessor implements BeanPostProcessor and hooks into Bean initialization to process ThreadPoolExecutor and its subclasses. Its main job is to register the thread pool objects in the container that the DynamicTp framework maintains internally (which is just a Map).
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (!(bean instanceof ThreadPoolExecutor)) {
        return bean;
    }
    if (bean instanceof DtpExecutor) {
        DtpExecutor dtpExecutor = (DtpExecutor) bean;
        if (bean instanceof EagerDtpExecutor) {
            ((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);
        }
        registerDtp(dtpExecutor);
        return dtpExecutor;
    }
    ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
    DynamicTp dynamicTp;
    try {
        dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);
        if (dynamicTp == null) {
            return bean;
        }
    } catch (NoSuchBeanDefinitionException e) {
        log.error("There is no bean with the given name {}", beanName, e);
        return bean;
    }
    String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName;
    registerCommon(poolName, (ThreadPoolExecutor) bean);
    return bean;
}
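Conceptually, the internal container mentioned above is a map from pool name to executor, and keeping that map is what makes it possible to change pool parameters at runtime. The sketch below illustrates the idea with the JDK only. It is not DynamicTp's DtpRegistry: PoolRegistrySketch, register, and resize are hypothetical names, and the ordering inside resize is an assumption based on the validation that ThreadPoolExecutor's own setters perform.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolRegistrySketch {

    // Hypothetical registry: pool name -> executor
    private static final Map<String, ThreadPoolExecutor> REGISTRY = new ConcurrentHashMap<>();

    /** Registers an executor under a name; an existing registration is kept. */
    static void register(String poolName, ThreadPoolExecutor executor) {
        REGISTRY.putIfAbsent(poolName, executor);
    }

    /** Changes core and maximum size of a registered pool while it is running. */
    static void resize(String poolName, int corePoolSize, int maximumPoolSize) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || corePoolSize > maximumPoolSize) {
            throw new IllegalArgumentException(
                    "invalid sizes: core=" + corePoolSize + ", max=" + maximumPoolSize);
        }
        ThreadPoolExecutor executor = REGISTRY.get(poolName);
        if (executor == null) {
            throw new IllegalArgumentException("unknown pool: " + poolName);
        }
        // Grow the maximum first, or shrink the core first, so that corePoolSize
        // never exceeds maximumPoolSize in between; the setters reject that state.
        if (maximumPoolSize >= executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(maximumPoolSize);
            executor.setCorePoolSize(corePoolSize);
        } else {
            executor.setCorePoolSize(corePoolSize);
            executor.setMaximumPoolSize(maximumPoolSize);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
        register("jobExecutor", pool);
        resize("jobExecutor", 8, 16); // e.g. triggered by a configuration change
        System.out.println("core=" + pool.getCorePoolSize() + ", max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

A real framework would call something like resize from a configuration-change listener and would also cover other parameters (keep-alive time, queue capacity, rejection policy), which this sketch leaves out.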
Extension 3
ApplicationListener is mainly used to decouple logic by publishing and listening for events. The core module and the adapter modules communicate mainly through this extension, and the framework also listens for events from each stage of Spring container startup and handles them accordingly.
public abstract class AbstractDtpHandleListener implements GenericApplicationListener {

    @Override
    public boolean supportsEventType(ResolvableType resolvableType) {
        Class<?> type = resolvableType.getRawClass();
        if (type != null) {
            return RefreshEvent.class.isAssignableFrom(type) ||
                    CollectEvent.class.isAssignableFrom(type) ||
                    AlarmCheckEvent.class.isAssignableFrom(type);
        }
        return false;
    }

    @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        try {
            if (event instanceof RefreshEvent) {
                doRefresh(((RefreshEvent) event).getDtpProperties());
            } else if (event instanceof CollectEvent) {
                doCollect(((CollectEvent) event).getDtpProperties());
            } else if (event instanceof AlarmCheckEvent) {
                doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());
            }
        } catch (Exception e) {
            log.error("DynamicTp adapter, event handle failed.", e);
        }
    }
}
Extension 4
ApplicationRunner: after the Spring container has started, this hook is called to perform initialization. DtpMonitor, DtpRegistry, and others use it.
So the correct way to use DynamicTp is to declare thread pools only in the configuration center. The framework then uses these Spring extension points to create the thread pool objects automatically and inject them into the Beans that need them, with no explicit declaration in code.
A brief introduction to the DynamicTp framework
DynamicTp is a lightweight dynamic thread pool management tool based on a configuration center. Its main features are dynamic parameter tuning, notification and alarms, runtime monitoring, and management of third-party component thread pools.
After several iterations, the latest version, v1.0.7, has the following features.
Features
- Zero code intrusion: all configuration lives in the configuration center, with zero intrusion into business code
- Lightweight and simple: built on Spring Boot; add the starter and integration takes just 4 steps, about 3 minutes if all goes smoothly
- Highly extensible: the framework's core functions expose SPI interfaces for custom implementations (configuration center, configuration file parsing, notification and alarms, monitoring data collection, task wrapping, and so on)
- Proven at scale in production: based on Meituan's thread pool practice; Meituan already has mature experience applying this approach
- Multi-platform notification and alarms: provides several alarm dimensions (configuration change notification, liveness alarm, capacity threshold alarm, rejection alarm, task execution or wait timeout alarm); WeCom, DingTalk, and Feishu are supported, and an SPI interface allows custom extensions
- Monitoring: periodically collects thread pool metrics; supports three output modes (Micrometer, JsonLog log output, and Endpoint) and can be extended through an SPI interface
- Task enhancement: provides task wrapping; just implement the TaskWrapper interface. For example, TtlTaskWrapper supports passing context information across thread pools, and tasks can be tagged with an identifying id to make troubleshooting easier
- Compatibility: ordinary JUC thread pools can also be monitored by the framework; just add the @DynamicTp annotation to the @Bean definition
- Reliability: the thread pools provided by the framework implement Spring's lifecycle methods, so as many queued tasks as possible can be processed before the Spring container shuts down
- Multiple modes: modeled on Tomcat's thread pool, provides the EagerDtpExecutor thread pool for I/O-intensive scenarios
- Multiple configuration centers: adjusts thread pool parameters dynamically through mainstream configuration centers, taking effect in real time; Nacos, Apollo, Zookeeper, and Consul are supported, and an SPI interface allows custom extensions
- Middleware thread pool management: manages the thread pools of common third-party components; Tomcat, Jetty, Undertow, Dubbo, RocketMq, Hystrix, and others are integrated (parameter tuning, monitoring, and alarms)
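The "create threads before queueing" behavior of an eager pool such as EagerDtpExecutor can be approximated with the JDK by giving ThreadPoolExecutor a queue that refuses tasks while the pool can still grow. The following is a simplified sketch of that general technique, not the actual EagerDtpExecutor or Tomcat code: EagerPoolSketch, EagerQueue, and poolSizeAfterBurst are hypothetical names, and the sizes (core 1, max 4, capacity 100) are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class EagerPoolSketch {

    /** Queue that reports "full" while the pool is below its maximum size. */
    static class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private transient volatile ThreadPoolExecutor executor;

        EagerQueue(int capacity) {
            super(capacity);
        }

        void setExecutor(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        @Override
        public boolean offer(Runnable task) {
            ThreadPoolExecutor e = executor;
            if (e != null && e.getPoolSize() < e.getMaximumPoolSize()) {
                // Refusing the task makes execute() try to start a new thread instead
                return false;
            }
            return super.offer(task);
        }
    }

    /** Submits four blocking tasks and returns the pool size right afterwards. */
    static int poolSizeAfterBurst(boolean eager) {
        EagerQueue eagerQueue = new EagerQueue(100);
        BlockingQueue<Runnable> queue = eager ? eagerQueue : new LinkedBlockingQueue<>(100);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 30, TimeUnit.SECONDS, queue);
        eagerQueue.setExecutor(pool); // has no effect when the standard queue is used
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy, like a slow I/O call
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("standard pool size: " + poolSizeAfterBurst(false));
        System.out.println("eager pool size:    " + poolSizeAfterBurst(true));
    }
}
```

In this sketch a burst of four blocking tasks should grow the eager pool to four threads, while a standard pool with the same settings stays at one thread and queues the rest. A production implementation would also need to handle details omitted here, such as reusing idle threads before creating new ones and re-queueing a task that is rejected because the pool reached its maximum concurrently.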
Project address
The project currently has 1.5k stars. Thank you for the stars, and PRs are welcome. Let's contribute to open source together outside of our day jobs.
Official website: https://dynamictp.cn
Gitee: https://gitee.com/dromara/dynamic-tp
GitHub: https://github.com/dromara/dynamic-tp