首先来看下EnableScheduling的javadoc:
@EnableScheduling启用了Spring的任务调度功能,这跟在xml中配置<task:*> 是一样的,它可以加在@Configuration上:
@Configuration@EnableSchedulingpublic class AppConfig {// various @Bean definitions}
下面的代码可以在容器中的MyTask这个bean上查找到@Scheduled注解,比如:
package com.myco.tasks;public class MyTask {@Scheduled(fixedRate=1000)public void work() {// task execution logic}}
下面的配置可以保证MyTask.work()这个方法每一秒调用一次:
@Configuration@EnableSchedulingpublic class AppConfig {@Beanpublic MyTask task() {return new MyTask();}}
此外,如果MyTask上添加了@Component注解,下面的配置也是一样的效果:
@Configuration@EnableScheduling//这里添加了组件扫描,只要能扫描到@Scheduled就行@ComponentScan(basePackages="com.myco.tasks")public class AppConfig {}
被@Scheduled注解修饰的方法也可以直接添加在@Configuration的类内部:
@Configuration@EnableSchedulingpublic class AppConfig {@Scheduled(fixedRate=1000)public void work() {// task execution logic}}
默认查找调度器的逻辑是:要么是唯一的一个类型是org.springframework.scheduling.TaskScheduler的bean,要么是一个名字是taskScheduler类型是TaskScheduler的bean,如果还没找到,继续查找唯一的ScheduledExecutorService,如果没有,继续查找名字是taskScheduler的ScheduledExecutorService。如果还没找到,Spring会创建一个默认的单线程的调度器。
如果想对调度器做更多的定制化,可以注入实现了SchedulingConfigurer接口的bean,这样就可以访问ScheduledTaskRegistrar实例了,下面的例子演示了如何自定义Executor:
@Configuration@EnableSchedulingpublic class AppConfig implements SchedulingConfigurer {@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.setScheduler(taskExecutor());}@Bean(destroyMethod="shutdown")public Executor taskExecutor() {return Executors.newScheduledThreadPool(100);}}
注意上面代码中的@Bean(destroyMethod="shutdown"),这个保证了Spring容器关闭的时候executor也可以正常关闭。
实现了SchedulingConfigurer接口的时候,也可以对ScheduledTaskRegistrar里面的任务注册做更细粒度的控制,比如,下面的代码演示了每当Trigger触发的时候,都要执行特定bean的方法:
@Configuration@EnableSchedulingpublic class AppConfig implements SchedulingConfigurer {@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.setScheduler(taskScheduler());taskRegistrar.addTriggerTask(new Runnable() {public void run() {myTask().work();}},new CustomTrigger());}@Bean(destroyMethod="shutdown")public Executor taskScheduler() {return Executors.newScheduledThreadPool(42);}@Beanpublic MyTask myTask() {return new MyTask();}}
下面的代码展示了如何用xml的方式配置任务:
<beans><task:annotation-driven scheduler="taskScheduler"/><task:scheduler id="taskScheduler" pool-size="42"/><task:scheduled-tasks scheduler="taskScheduler"><task:scheduled ref="myTask" method="work" fixed-rate="1000"/></task:scheduled-tasks><bean id="myTask" class="com.foo.MyTask"/></beans>
上面用xml的方式配置了一个fixed-rate任务,跟用java方式是一样的,但是java方式更强大。
注意:@EnableScheduling只能作用在它自己的context中,因为context是存在父子关系的,如果是在web context或其他context中,需要重新声明@EnableScheduling 。
看下源码的处理:
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)//这种方式就不用过多解释了@Import(SchedulingConfiguration.class)@Documentedpublic @interface EnableScheduling {}
继续看SchedulingConfiguration:
@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}
没什么好说的,只是创建了一个名字叫org.springframework.context.annotation.internalScheduledAnnotationProcessor类型是ScheduledAnnotationBeanPostProcessor的bean,继续看ScheduledAnnotationBeanPostProcessor:
public class ScheduledAnnotationBeanPostProcessorimplements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean
继承结构非常简单,只实现了一些回调接口,从MergedBeanDefinitionPostProcessor可知道,这个也是个BeanPostProcessor,那它里面有没有使用aop呢?继续往下面看。
看下构造函数:
public ScheduledAnnotationBeanPostProcessor() {this.registrar = new ScheduledTaskRegistrar();}
这个是用来实际注册task和执行task的类,首先创建出来,然后容器会回调:ScheduledAnnotationBeanPostProcessor#postProcessAfterInitialization:
@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {...Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);if (!this.nonAnnotatedClasses.contains(targetClass)) {//这里就是在查找标记了@Scheduled和@Schedules的方法Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledMethods.isEmpty() ? scheduledMethods : null);});if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);if (logger.isTraceEnabled()) {logger.trace("No @Scheduled annotations found on bean class: " + targetClass);}}else {//如果找到了@Scheduled,遍历,调用processScheduled()// Non-empty set of methodsannotatedMethods.forEach((method, scheduledMethods) ->scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));if (logger.isTraceEnabled()) {logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +"': " + annotatedMethods);}}}return bean;}
继续看ScheduledAnnotationBeanPostProcessor#processScheduled,这里面会去解析所有的task:
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {try {Runnable runnable = createRunnable(bean, method);...Set<ScheduledTask> tasks = new LinkedHashSet<>(4);...// 这里是解析cron表达式,创建CronTask,然后把task加入到registrar中String cron = scheduled.cron();if (StringUtils.hasText(cron)) {String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}if (StringUtils.hasLength(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;if (!Scheduled.CRON_DISABLED.equals(cron)) {TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}}}// 这里是解析FixedDelay,创建FixedDelayTask,然后把task加入到registrar中// Check fixed delaylong fixedDelay = scheduled.fixedDelay();if (fixedDelay >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}String fixedDelayString = scheduled.fixedDelayString();if (StringUtils.hasText(fixedDelayString)) {if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);}if (StringUtils.hasLength(fixedDelayString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedDelay = parseDelayAsLong(fixedDelayString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));}}// 这里是解析FixedRate,创建FixedRateTask,然后把task加入到registrar中// Check fixed ratelong fixedRate = scheduled.fixedRate();if (fixedRate >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}String fixedRateString = scheduled.fixedRateString();if (StringUtils.hasText(fixedRateString)) {if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);}if (StringUtils.hasLength(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;try {fixedRate = parseDelayAsLong(fixedRateString);}catch (RuntimeException ex) {throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");}tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));}}// 最后把所有的task都存放到自己的scheduledTasks中synchronized (this.scheduledTasks) {Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));regTasks.addAll(tasks);}}catch (IllegalArgumentException ex) {throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());}}
解析完了所有的task之后,准备开始执行:
private void finishRegistration() {//默认调度器是nullif (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);}//取出容器中所有的SchedulingConfigurer,定制registrarif (this.beanFactory instanceof ListableBeanFactory) {Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(configurers);for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}//有任务,但是没有调度器,下面的逻辑就是查找调度器,代码有删减if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {try { //首先按类型查找TaskSchedulerthis.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));}catch (NoUniqueBeanDefinitionException ex) {try {//然后按名称查找TaskScheduler,名称是:taskSchedulerthis.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));}catch (NoSuchBeanDefinitionException ex2) {...}}catch (NoSuchBeanDefinitionException ex) {try {//继续按照类型查找ScheduledExecutorServicethis.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));}catch (NoUniqueBeanDefinitionException ex2) {try {//继续按照名称查找ScheduledExecutorService,名称是:taskSchedulerthis.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));}catch (NoSuchBeanDefinitionException ex3) {}}catch (NoSuchBeanDefinitionException ex2) {}}}//最后执行afterPropertiesSetthis.registrar.afterPropertiesSet();}
上面定制了registrar,查找了调度器,看下真正的执行ScheduledTaskRegistrar#scheduleTasks:
protected void scheduleTasks() {//如果调度器为null,则创建一个ConcurrentTaskScheduler,使用的是单线程的Executor。if (this.taskScheduler == null) {this.localExecutor = Executors.newSingleThreadScheduledExecutor();this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}//下面是分别执行4种类型的任务if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}}
整体逻辑非常简单,具体使用的demo可以参考:https://github.com/xjs1919/enumdemo下面的 schedule-demo。




