package org.jeecg.modules.doc.threadpool;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.scheduling.annotation.AsyncConfigurer;
|
import org.springframework.scheduling.annotation.EnableAsync;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Objects;
|
import java.util.concurrent.Executor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
/**
|
* 异步线程池配置 AsyncConfigurer在applicationContext早期初始化,如果需要依赖于其它的bean,尽可能的将它们声明为lazy
|
*/
|
@Slf4j
|
@EnableAsync
|
@Component
|
@EnableConfigurationProperties(AsyncThreadPoolProperties.class)
|
public class AsyncThreadPoolAutoConfiguration implements AsyncConfigurer {
|
|
@Autowired
|
private AsyncThreadPoolProperties asyncThreadPoolProperties;
|
|
/**
|
* 定义线程池
|
* 使用{@link java.util.concurrent.LinkedBlockingQueue}(FIFO)队列,是一个用于并发环境下的阻塞队列集合类
|
* ThreadPoolTaskExecutor不是完全被IOC容器管理的bean,可以在方法上加上@Bean注解交给容器管理,这样可以将taskExecutor.initialize()方法调用去掉,容器会自动调用
|
*
|
* @return
|
*/
|
@Bean("asyncTaskExecutor")
|
@Override
|
public Executor getAsyncExecutor() {
|
//Java虚拟机可用的处理器数
|
int processors = Runtime.getRuntime().availableProcessors();
|
//定义线程池
|
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
|
//核心线程数
|
taskExecutor.setCorePoolSize(Objects.nonNull(asyncThreadPoolProperties.getCorePoolSize()) ? asyncThreadPoolProperties.getCorePoolSize() : processors);
|
//线程池最大线程数,默认:40000
|
taskExecutor.setMaxPoolSize(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 40000);
|
//线程队列最大线程数,默认:80000
|
taskExecutor.setQueueCapacity(Objects.nonNull(asyncThreadPoolProperties.getMaxPoolSize()) ? asyncThreadPoolProperties.getMaxPoolSize() : 80000);
|
//线程名称前缀
|
taskExecutor.setThreadNamePrefix(StringUtils.isNotEmpty(asyncThreadPoolProperties.getThreadNamePrefix()) ? asyncThreadPoolProperties.getThreadNamePrefix() : "Async-ThreadPool-");
|
//线程池中线程最大空闲时间,默认:60,单位:秒
|
taskExecutor.setKeepAliveSeconds(asyncThreadPoolProperties.getKeepAliveSeconds());
|
//核心线程是否允许超时,默认:false
|
taskExecutor.setAllowCoreThreadTimeOut(asyncThreadPoolProperties.isAllowCoreThreadTimeOut());
|
//IOC容器关闭时是否阻塞等待剩余的任务执行完成,默认:false(必须设置setAwaitTerminationSeconds)
|
taskExecutor.setWaitForTasksToCompleteOnShutdown(asyncThreadPoolProperties.isWaitForTasksToCompleteOnShutdown());
|
//阻塞IOC容器关闭的时间,默认:10秒(必须设置setWaitForTasksToCompleteOnShutdown)
|
taskExecutor.setAwaitTerminationSeconds(asyncThreadPoolProperties.getAwaitTerminationSeconds());
|
/**
|
* 拒绝策略,默认是AbortPolicy
|
* AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
|
* DiscardPolicy:丢弃任务但不抛出异常
|
* DiscardOldestPolicy:丢弃最旧的处理程序,然后重试,如果执行器关闭,这时丢弃任务
|
* CallerRunsPolicy:执行器执行任务失败,则在策略回调方法中执行任务,如果执行器关闭,这时丢弃任务
|
*/
|
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
|
//初始化
|
//taskExecutor.initialize();
|
|
return taskExecutor;
|
}
|
|
/**
|
* 异步方法执行的过程中抛出的异常捕获
|
*
|
* @return AsyncUncaughtExceptionHandler
|
*/
|
@Override
|
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
|
return new BaseAsyncUncaughtExceptionHandler();
|
}
|
}
|