Spring任务调度
创始人
2025-06-27 23:01:16
0

一、任务调度

注解类:@Scheduled

核心处理类:ScheduledAnnotationBeanPostProcessor

使用的线程池:从容器中查询TaskScheduler。

  • 首先在容器中通过类型查找TaskScheduler Bean,如果没有则抛出NoSuchBeanDefinitionException异常。
  • 在这一步中,如果找到多个,那么会在通过beanName=taskScheduler在容器中查找
  • 在上一步中抛出异常后会继续查找java.util.concurrent.ScheduledExecutorService 类型的Bean。
  • 在这一步中,如果找到多个,那么会在通过beanName=taskScheduler在容器中查找
  • 在上一步中还是没有则结束(程序并不会报错)

如果上面流程都没有找到,则会通过如下方式创建一个。

this.localExecutor = Executors.newSingleThreadScheduledExecutor();
 this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);

在Springboot中有个自动配置类会配置一个TaskSchedulingAutoConfiguration。

public class TaskSchedulingAutoConfiguration {
   @Bean
   @ConditionalOnBean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
   @ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })
   public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {
     return builder.build();
   }
   
   @Bean
   @ConditionalOnMissingBean
   public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties,
       ObjectProvider taskSchedulerCustomizers) {
     TaskSchedulerBuilder builder = new TaskSchedulerBuilder();
     builder = builder.poolSize(properties.getPool().getSize());
     Shutdown shutdown = properties.getShutdown();
     builder = builder.awaitTermination(shutdown.isAwaitTermination());
     builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
     builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
     builder = builder.customizers(taskSchedulerCustomizers);
     return builder;
   }
 }

二、异步任务

  • 注解类:Async。
  • 核心处理类:AsyncAnnotationBeanPostProcessor。

通过ProxyAsyncConfiguration配置,该类继承AbstractAsyncConfiguration。

在父类中会初始化,下面两个成员变量:

@Configuration(proxyBeanMethods = false)
 public abstract class AbstractAsyncConfiguration implements ImportAware {
   @Nullable
   protected Supplier executor;
   @Nullable
   protected Supplier exceptionHandler;
   // 在容器中查找AsyncConfigurer Bean 且只能有一个
   @Autowired
   void setConfigurers(ObjectProvider configurers) {
     Supplier configurer = SingletonSupplier.of(() -> {
       List candidates = configurers.stream().collect(Collectors.toList());
       if (CollectionUtils.isEmpty(candidates)) {
         return null;
       }
       if (candidates.size() > 1) {
         throw new IllegalStateException("Only one AsyncConfigurer may exist");
       }
       return candidates.get(0);
     });
     this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
     this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
   }
 
   private  Supplier adapt(Supplier supplier, Function provider) {
     return () -> {
       AsyncConfigurer configurer = supplier.get();
       return (configurer != null ? provider.apply(configurer) : null);
     };
   }
 }

使用的线程池:

  • 首先在容器中通过类型查找AsyncConfigurer Bean。
  • 如果没有则设置默认的AsyncConfigurer::getAsyncExecutor 该方法是接口中默认方法,返回的是null。
  • 在上一步中如果容器中没有AsyncConfigurer,那么设置到AsyncAnnotationBeanPostProcessor中也将就是null。
  • 初始化AsyncAnnotationBeanPostProcessor。
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
   @Nullable
   private Supplier executor;
   @Nullable
   private Supplier exceptionHandler;
   @Override
   public void setBeanFactory(BeanFactory beanFactory) {
     super.setBeanFactory(beanFactory);
     // 构建切面Advisor
     AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
     if (this.asyncAnnotationType != null) {
       advisor.setAsyncAnnotationType(this.asyncAnnotationType);
     }
     advisor.setBeanFactory(beanFactory);
     this.advisor = advisor;
   }
 }
 public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
   public AsyncAnnotationAdvisor(
       @Nullable Supplier executor, @Nullable Supplier exceptionHandler) {
 
     Set> asyncAnnotationTypes = new LinkedHashSet<>(2);
     asyncAnnotationTypes.add(Async.class);
     try {
       asyncAnnotationTypes.add((Class)
           ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
     }
     // 构建通知类
     this.advice = buildAdvice(executor, exceptionHandler);
     this.pointcut = buildPointcut(asyncAnnotationTypes);
   }
   protected Advice buildAdvice(
       @Nullable Supplier executor, @Nullable Supplier exceptionHandler) {
 
     AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
     // 调用父类方法
     interceptor.configure(executor, exceptionHandler);
     return interceptor;
   }
 }
 public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware { 
   public void configure(@Nullable Supplier defaultExecutor,
       @Nullable Supplier exceptionHandler) {
 
     // 如果defaultExecutor则调用getDefaultExecutor方法,该方法在子类重写了
     this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
     this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
   }
   protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
     if (beanFactory != null) {
       try {
         // 容器中查找TaskExecutor类型的Bean
         return beanFactory.getBean(TaskExecutor.class);
       } catch (NoUniqueBeanDefinitionException ex) {
         try {
           return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
         } catch (NoSuchBeanDefinitionException ex2) {
         }
       } catch (NoSuchBeanDefinitionException ex) {
         try {
           return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
         } catch (NoSuchBeanDefinitionException ex2) {
         }
         // Giving up -> either using local default executor or none at all...
       }
     }
     return null;
   }
 }
 public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
   protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
     // 先通过父类获取
     Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
     // 如果父类获取不到,则创建默认的SimpleAsyncTaskExecutor
     return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
   }
 }

总结:

  • 从容器中查找TaskExecutor Bean。
  • 上一步没有找到,则查找beanName=taskExecutor,类型为java.util.concurrent.Executor的Bean。
  • 如果上一步还是没有找到,那么最终创建默认的SimpleAsyncTaskExecutor 这是个没有上限的线程池,来一个任务创建新线程。

如果执行的异步任务很多且线程池,线程有限则多的任务会等待。

三、Web异步接口

RequestMappingHandlerAdapter。

public class RequestMappingHandlerAdapter {
   // 默认是一个没有上限的线程池
   private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("MvcAsync");
   protected ModelAndView invokeHandlerMethod(
     HttpServletRequest request,
     HttpServletResponse response, 
     HandlerMethod handlerMethod) throws Exception {
     // ...
     WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
     asyncManager.setTaskExecutor(this.taskExecutor);
     // ...
   }
 }

创建RequestMappingHandlerAdapter Bean对象。

继承关系。

public static class EnableWebMvcConfiguration extends DelegatingWebMvcConfiguration{}
 public class DelegatingWebMvcConfiguration extends WebMvcConfigurationSupport {
   private final WebMvcConfigurerComposite configurers = new WebMvcConfigurerComposite();
   // WebMvcConfigurer默认实现
   @Autowired(required = false)
   public void setConfigurers(List configurers) {
     if (!CollectionUtils.isEmpty(configurers)) {
       this.configurers.addWebMvcConfigurers(configurers);
     }
   }
   @Override
   protected void configureAsyncSupport(AsyncSupportConfigurer configurer) {
     this.configurers.configureAsyncSupport(configurer);
   }
 }
 public class WebMvcConfigurationSupport implements ApplicationContextAware, ServletContextAware {
   private AsyncSupportConfigurer asyncSupportConfigurer;
   @Bean
   public RequestMappingHandlerAdapter requestMappingHandlerAdapter(...) {
     // ...
     AsyncSupportConfigurer configurer = getAsyncSupportConfigurer();
     if (configurer.getTaskExecutor() != null) {
       adapter.setTaskExecutor(configurer.getTaskExecutor());
     }
     // ...
   }
   protected AsyncSupportConfigurer getAsyncSupportConfigurer() {
     if (this.asyncSupportConfigurer == null) {
       this.asyncSupportConfigurer = new AsyncSupportConfigurer();
       configureAsyncSupport(this.asyncSupportConfigurer);
     }
     return this.asyncSupportConfigurer;
   }
 }
 public class WebMvcAutoConfiguration {
   public static class WebMvcAutoConfigurationAdapter implements WebMvcConfigurer, ServletContextAware {
     public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
       // 判断容器中是否有beanName = applicationTaskExecutor 的Bean
       if (this.beanFactory.containsBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME)) {
         // 通过beanName = applicationTaskExecutor获取bean对象
         Object taskExecutor = this.beanFactory.getBean(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME);
         // 判断是否AsyncTaskExecutor对象
         if (taskExecutor instanceof AsyncTaskExecutor) {
           configurer.setTaskExecutor(((AsyncTaskExecutor) taskExecutor));
         }
       }
       Duration timeout = this.mvcProperties.getAsync().getRequestTimeout();
       if (timeout != null) {
         configurer.setDefaultTimeout(timeout.toMillis());
       }
     }
   }
 }

总结:

  • 默认使用SimpleAsyncTaskExecutor。
  • 如果容器中存在以beanName = applicationTaskExecutor 且 类型是 AsyncTaskExecutor, 则使用该bean。

到这你应该知道了这三者在线程池方面该如何正确配置及使用了。

相关内容

热门资讯

如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
20个非常棒的扁平设计免费资源 Apple设备的平面图标PSD免费平板UI 平板UI套件24平图标Freen平板UI套件PSD径向平...
德国电信门户网站可实时显示全球... 德国电信周三推出一个门户网站,直观地实时提供其安装在全球各地的传感器网络检测到的网络攻击状况。该网站...
为啥国人偏爱 Mybatis,... 关于 SQL 和 ORM 的争论,永远都不会终止,我也一直在思考这个问题。昨天又跟群里的小伙伴进行...
《非诚勿扰》红人闫凤娇被曝厕所... 【51CTO.com 综合消息360安全专家提醒说,“闫凤娇”、“非诚勿扰”已经被黑客盯上成为了“木...
2012年第四季度互联网状况报... [[71653]]  北京时间4月25日消息,据国外媒体报道,全球知名的云平台公司Akamai Te...