Spring TaskExecutor Scheduling and Async
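Spring provides the TaskExecutor and TaskScheduler interfaces for asynchronous execution and task scheduling. With them, running scheduled jobs or handling requests asynchronously in a web server is straightforward.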
What is Executor?
The official Spring documentation explains:
Executors are the JDK name for the concept of thread pools. The “executor” naming is due to the fact that there is no guarantee that the underlying implementation is actually a pool; an executor may be single-threaded or even synchronous. Spring’s abstraction hides implementation details between Java SE and Java EE environments.
The TaskExecutor was originally created to give other Spring components an abstraction for thread pooling where needed.
TaskExecutor Types
- SimpleAsyncTaskExecutor
This implementation does not reuse any threads, rather it starts up a new thread for each invocation. However, it does support a concurrency limit which will block any invocations that are over the limit until a slot has been freed up. If you are looking for true pooling, see the discussions of SimpleThreadPoolTaskExecutor and ThreadPoolTaskExecutor below.
Example:
private static void testSimpleAsyncTaskExecutor() {
    SimpleAsyncTaskExecutor sexc = new SimpleAsyncTaskExecutor();
    // allow at most 10 tasks to run concurrently
    sexc.setConcurrencyLimit(10);
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    sexc.setThreadFactory(threadFactory);
    for (int i = 0; i < 30; i++) {
        System.out.println("submitted task" + i);
        sexc.execute(new MessagePrinterTask("I am printing" + i));
    }
}
Output:
submitted task0
submitted task1
submitted task2
submitted task3
14:44:38--Thread : Thread[pool-2-thread-1,5,main] I am printing0
submitted task4
14:44:38--Thread : Thread[pool-2-thread-2,5,main] I am printing1
submitted task5
submitted task6
14:44:38--Thread : Thread[pool-2-thread-3,5,main] I am printing2
submitted task7
submitted task8
14:44:38--Thread : Thread[pool-2-thread-4,5,main] I am printing3
submitted task9
14:44:38--Thread : Thread[pool-2-thread-5,5,main] I am printing4
submitted task10
14:44:38--Thread : Thread[pool-2-thread-6,5,main] I am printing5
14:44:38--Thread : Thread[pool-2-thread-9,5,main] I am printing8
...
submitted task29
14:44:58--Thread : Thread[pool-2-thread-29,5,main] I am printing28
14:44:58--Thread : Thread[pool-2-thread-30,5,main] I am printing29
// SimpleAsyncTaskExecutor: throttle first (if a limit is set), then start the task
public void execute(Runnable task, long startTimeout) {
    Assert.notNull(task, "Runnable must not be null");
    Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
    if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
        this.concurrencyThrottle.beforeAccess();
        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
    }
    else {
        doExecute(taskToUse);
    }
}

// ConcurrencyThrottleSupport: block the submitting thread while the limit is reached
protected void beforeAccess() {
    if (this.concurrencyLimit == NO_CONCURRENCY) {
        throw new IllegalStateException(
                "Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
    }
    if (this.concurrencyLimit > 0) {
        boolean debug = logger.isDebugEnabled();
        synchronized (this.monitor) {
            boolean interrupted = false;
            while (this.concurrencyCount >= this.concurrencyLimit) {
                if (interrupted) {
                    throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
                            "but concurrency limit still does not allow for entering");
                }
                if (debug) {
                    logger.debug("Concurrency count " + this.concurrencyCount +
                            " has reached limit " + this.concurrencyLimit + " - blocking");
                }
                try {
                    this.monitor.wait();
                }
                catch (InterruptedException ex) {
                    // Re-interrupt current thread, to allow other threads to react.
                    Thread.currentThread().interrupt();
                    interrupted = true;
                }
            }
            if (debug) {
                logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
            }
            this.concurrencyCount++;
        }
    }
}

// SimpleAsyncTaskExecutor inner class: release the slot when the task finishes
private class ConcurrencyThrottlingRunnable implements Runnable {
    private final Runnable target;

    public ConcurrencyThrottlingRunnable(Runnable target) {
        this.target = target;
    }

    @Override
    public void run() {
        try {
            this.target.run();
        }
        finally {
            concurrencyThrottle.afterAccess();
        }
    }
}

// ConcurrencyThrottleSupport: decrement the count and wake one waiting submitter
protected void afterAccess() {
    if (this.concurrencyLimit >= 0) {
        synchronized (this.monitor) {
            this.concurrencyCount--;
            if (logger.isDebugEnabled()) {
                logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
            }
            this.monitor.notify();
        }
    }
}
At this point the implementation is clear: a wait/notify model limits the number of threads that run concurrently.
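To make the wait/notify idea concrete, here is a minimal JDK-only sketch of the same throttling pattern. It is a simplified illustration, not Spring's ConcurrencyThrottleSupport: the class names SimpleThrottle and ThrottleDemo, the 20 ms sleep, and the limit of 3 are all assumptions chosen for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified throttle: a counter guarded by a monitor. */
class SimpleThrottle {
    private final Object monitor = new Object();
    private final int limit;
    private int count = 0;

    SimpleThrottle(int limit) {
        this.limit = limit;
    }

    /** Blocks the caller until fewer than 'limit' tasks are running, then takes a slot. */
    void beforeAccess() throws InterruptedException {
        synchronized (monitor) {
            while (count >= limit) {
                monitor.wait();
            }
            count++;
        }
    }

    /** Frees one slot and wakes one waiting submitter. */
    void afterAccess() {
        synchronized (monitor) {
            count--;
            monitor.notify();
        }
    }
}

class ThrottleDemo {
    /** Starts one new thread per task and returns the highest number seen running at once. */
    static int maxConcurrent(int limit, int tasks) {
        SimpleThrottle throttle = new SimpleThrottle(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                throttle.beforeAccess(); // the submitter blocks here once the limit is reached
                new Thread(() -> {
                    try {
                        int now = running.incrementAndGet();
                        max.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // simulate work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        throttle.afterAccess();
                        done.countDown();
                    }
                }).start();
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        // The reported maximum never exceeds the limit passed in.
        System.out.println("max concurrent = " + maxConcurrent(3, 12));
    }
}
```

As in the Spring source, the submitting thread is the one that waits, so a full throttle slows down the caller rather than queueing work.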
- SyncTaskExecutor
This implementation doesn’t execute invocations asynchronously. Instead, each invocation takes place in the calling thread. It is primarily used in situations where multi-threading isn’t necessary such as simple test cases.
Example:
private static void testSyncTaskExecutor() {
    SyncTaskExecutor sexc = new SyncTaskExecutor();
    for (int i = 0; i < 30; i++) {
        System.out.println("submitted task" + i);
        sexc.execute(new MessagePrinterTask("I am printing" + i));
    }
}
Output:
submitted task0
15:28:26--Thread : Thread[main,5,main] I am printing0
submitted task1
15:28:36--Thread : Thread[main,5,main] I am printing1
submitted task2
15:28:46--Thread : Thread[main,5,main] I am printing2
public void execute(Runnable task) {
    Assert.notNull(task, "Runnable must not be null");
    task.run();
}
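The same behaviour can be reproduced with the plain JDK Executor interface. The sketch below is only an illustration of the "run in the calling thread" idea; the class name DirectExecutorDemo is an assumption and not part of Spring.

```java
import java.util.concurrent.Executor;

class DirectExecutorDemo {
    /** Returns true when the task ran on the same thread that called execute(). */
    static boolean runsOnCallerThread() {
        // Same idea as SyncTaskExecutor: execute() simply calls run().
        Executor direct = Runnable::run;
        Thread caller = Thread.currentThread();
        Thread[] ranOn = new Thread[1];
        direct.execute(() -> ranOn[0] = Thread.currentThread());
        return ranOn[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println("ran on caller thread: " + runsOnCallerThread()); // prints true
    }
}
```

Because nothing is handed to another thread, each execute() call returns only after the task has finished, which matches the 10-second gaps between submissions in the output of the SyncTaskExecutor example.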
- ConcurrentTaskExecutor
This implementation is an adapter for a java.util.concurrent.Executor object. There is an alternative, ThreadPoolTaskExecutor, that exposes the Executor configuration parameters as bean properties. It is rare to need to use the ConcurrentTaskExecutor, but if the ThreadPoolTaskExecutor isn’t flexible enough for your needs, the ConcurrentTaskExecutor is an alternative.
Example:
private static void testConcurrentTaskExecutor() {
    ConcurrentTaskExecutor cexc = new ConcurrentTaskExecutor();
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(19);
    ThreadPoolExecutor exc = new ThreadPoolExecutor(3, 30, 30000, TimeUnit.SECONDS, queue);
    cexc.setConcurrentExecutor(exc);
    for (int i = 0; i < 10; i++) {
        System.out.println("submitted task" + i);
        cexc.execute(new MessagePrinterTask("I am printing" + i));
    }
}
Result:
submitted task0
submitted task1
submitted task2
submitted task3
15:37:21--Thread : Thread[pool-2-thread-1,5,main] I am printing0
submitted task4
submitted task5
15:37:21--Thread : Thread[pool-2-thread-3,5,main] I am printing2
15:37:21--Thread : Thread[pool-2-thread-2,5,main] I am printing1
submitted task6
submitted task7
submitted task8
submitted task9
15:37:31--Thread : Thread[pool-2-thread-1,5,main] I am printing3
15:37:31--Thread : Thread[pool-2-thread-3,5,main] I am printing4
15:37:31--Thread : Thread[pool-2-thread-2,5,main] I am printing5
15:37:41--Thread : Thread[pool-2-thread-1,5,main] I am printing6
15:37:41--Thread : Thread[pool-2-thread-2,5,main] I am printing8
15:37:41--Thread : Thread[pool-2-thread-3,5,main] I am printing7
15:37:51--Thread : Thread[pool-2-thread-1,5,main] I am printing9
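Only three threads appear in this result even though the maximum pool size is 30, because a ThreadPoolExecutor adds threads beyond the core size only when its queue is full. The following JDK-only sketch checks this with the same sizes (core 3, max 30, queue capacity 19, 10 tasks). The class name CorePoolDemo and the latch used to keep the tasks busy are assumptions made for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CorePoolDemo {
    /** Submits blocking tasks and returns {pool size, queued tasks} while all of them are pending. */
    static int[] poolAndQueue(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let the tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = poolAndQueue(3, 30, 19, 10);
        // prints "pool size = 3, queued = 7"
        System.out.println("pool size = " + r[0] + ", queued = " + r[1]);
    }
}
```

Three tasks occupy the core threads and the other seven fit in the queue, so no extra thread is ever created.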
- SimpleThreadPoolTaskExecutor
This implementation is actually a subclass of Quartz’s SimpleThreadPool which listens to Spring’s lifecycle callbacks. This is typically used when you have a thread pool that may need to be shared by both Quartz and non-Quartz components.
- ThreadPoolTaskExecutor
This implementation is the most commonly used one. It exposes bean properties for configuring a java.util.concurrent.ThreadPoolExecutor and wraps it in a TaskExecutor. If you need to adapt to a different kind of java.util.concurrent.Executor, it is recommended that you use a ConcurrentTaskExecutor instead.
Example:
private static void testThreadPoolTaskExecutor() {
    RejectedExecutionHandler rejectionHandler = new RejectHandler();
    // Get the ThreadFactory implementation to use
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    ThreadPoolTaskExecutor exc = new ThreadPoolTaskExecutor();
    exc.setThreadFactory(threadFactory);
    exc.setRejectedExecutionHandler(rejectionHandler);
    exc.setQueueCapacity(4);
    exc.setCorePoolSize(5);
    exc.setMaxPoolSize(7);
    exc.initialize();
    for (int i = 0; i < 11; i++) {
        System.out.println("submitted task" + i);
        exc.execute(new MessagePrinterTask("I am printing" + i));
    }
    exc.getThreadPoolExecutor().shutdown();
}
JavaDoc: When a new task is submitted […], and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks.
Output:
submitted task0
submitted task1
submitted task2
submitted task3
submitted task4
submitted task5
15:43:48--Thread : Thread[pool-1-thread-2,5,main] I am printing1
submitted task6
15:43:48--Thread : Thread[pool-1-thread-3,5,main] I am printing2
15:43:48--Thread : Thread[pool-1-thread-1,5,main] I am printing0
submitted task7
submitted task8
submitted task9
15:43:48--Thread : Thread[pool-1-thread-4,5,main] I am printing3
15:43:48--Thread : Thread[pool-1-thread-5,5,main] I am printing4
submitted task10
15:43:48--Thread : Thread[pool-1-thread-6,5,main] I am printing9
15:43:48--Thread : Thread[pool-1-thread-7,5,main] I am printing10
15:43:58--Thread : Thread[pool-1-thread-2,5,main] I am printing6
15:43:58--Thread : Thread[pool-1-thread-3,5,main] I am printing7
15:43:58--Thread : Thread[pool-1-thread-1,5,main] I am printing5
15:43:58--Thread : Thread[pool-1-thread-7,5,main] I am printing8
If more tasks are pending than MaxPoolSize + QueueCapacity can hold, the executor rejects the next task and an error is reported. For example, after changing the queue length to QueueCapacity=3:
submitted task0
submitted task1
submitted task2
submitted task3
submitted task4
16:04:53--Thread : Thread[pool-1-thread-1,5,main] I am printing0
16:04:53--Thread : Thread[pool-1-thread-2,5,main] I am printing1
submitted task5
16:04:53--Thread : Thread[pool-1-thread-3,5,main] I am printing2
submitted task6
submitted task7
submitted task8
16:04:53--Thread : Thread[pool-1-thread-5,5,main] I am printing4
submitted task9
16:04:53--Thread : Thread[pool-1-thread-4,5,main] I am printing3
submitted task10
16:04:53--Thread : Thread[pool-1-thread-6,5,main] I am printing8
16:04:53--Thread : Thread[pool-1-thread-7,5,main] I am printing9
Exception in thread "main" org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@5c1a8622[Running, pool size = 7, active threads = 7, queued tasks = 3, completed tasks = 0]] did not accept task: com.sap.csc.ems.configuration.util.TaskExecutorTest$MessagePrinterTask@5ad851c9
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:296)
    at com.sap.csc.ems.configuration.util.TaskExecutorTest.testThreadPoolTaskExecutor(TaskExecutorTest.java:126)
    at com.sap.csc.ems.configuration.util.TaskExecutorTest.main(TaskExecutorTest.java:49)
Caused by: java.util.concurrent.RejectedExecutionException: I am full, I am printing10 failed
    at com.sap.csc.ems.configuration.util.TaskExecutorTest$RejectHandler.rejectedExecution(TaskExecutorTest.java:163)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:293)
    ... 2 more
16:05:03--Thread : Thread[pool-1-thread-1,5,main] I am printing5
16:05:03--Thread : Thread[pool-1-thread-2,5,main] I am printing6
16:05:03--Thread : Thread[pool-1-thread-3,5,main] I am printing7
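The arithmetic behind the rejection can be checked with the underlying java.util.concurrent.ThreadPoolExecutor alone. This is a sketch under assumptions: the class name RejectionDemo is made up, and the handler here counts rejections instead of throwing as the RejectHandler in the post does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionDemo {
    /** Submits 'tasks' blocking jobs and returns how many of them the pool rejected. */
    static int rejected(int core, int max, int queueCapacity, int tasks) {
        AtomicInteger rejected = new AtomicInteger();
        // Count rejections instead of throwing, so the loop can keep submitting.
        RejectedExecutionHandler countRejections = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity), countRejections);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stay busy so no queue slot frees up during submission
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 7 threads + 4 queue slots hold all 11 tasks: prints 0
        System.out.println(rejected(5, 7, 4, 11));
        // 7 threads + 3 queue slots hold only 10 tasks: prints 1
        System.out.println(rejected(5, 7, 3, 11));
    }
}
```

With a queue capacity of 3, the first five tasks take the core threads, three wait in the queue, two more start threads up to the maximum of 7, and the eleventh has nowhere to go.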
This section comes from the official Spring documentation.
In addition to the TaskExecutor abstraction, Spring 3.0 introduces a TaskScheduler with a variety of methods for scheduling tasks to run at some point in the future.
public interface TaskScheduler {
    ScheduledFuture schedule(Runnable task, Trigger trigger);
    ScheduledFuture schedule(Runnable task, Date startTime);
    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
The simplest method is the one named ‘schedule’ that takes a Runnable and Date only. That will cause the task to run once after the specified time. All of the other methods are capable of scheduling tasks to run repeatedly. The fixed-rate and fixed-delay methods are for simple, periodic execution, but the method that accepts a Trigger is much more flexible.
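The one-shot and fixed-rate styles have close counterparts in the JDK's ScheduledExecutorService, which makes them easy to try without a Spring context. The sketch below uses only JDK calls; the class name FixedRateDemo, the 20 ms period, and the 5-second timeouts are assumptions for the demo, and it does not use Spring's TaskScheduler itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FixedRateDemo {
    /** One-shot scheduling: the task runs once after the delay and its result is returned. */
    static String runOnce(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<String> handle =
                    scheduler.schedule(() -> "ran once", delayMillis, TimeUnit.MILLISECONDS);
            return handle.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException | TimeoutException e) {
            return "failed: " + e;
        } finally {
            scheduler.shutdownNow();
        }
    }

    /** Fixed-rate scheduling: returns true once the task has run 'runs' times. */
    static boolean runsRepeatedly(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                latch::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            handle.cancel(false); // a periodic task keeps running until it is cancelled
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(50));
        System.out.println("repeated: " + runsRepeatedly(3, 20));
    }
}
```

Fixed rate measures the period from start to start, while fixed delay (scheduleWithFixedDelay in both APIs) measures it from the end of one run to the start of the next.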
- Add the annotation to the Spring Boot application class
@SpringBootApplication
@EnableScheduling
public class MiddlewareMain {
    public static void main(String[] args) {
        SpringApplication.run(MiddlewareMain.class, args);
    }
}
- Add the @Scheduled annotation to the method you want to run on a schedule
@Component
public class UploadData {
    // Assumed declaration: the original does not show how dateFormat is defined.
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // first run after 10 s, then every 4 s
    @Scheduled(initialDelay=10000,fixedRate=4000)
    public void sendData() {
        System.out.println("Thread:"+Thread.currentThread()+"start");
        System.out.println("Thread:"+Thread.currentThread()+"---"+dateFormat.format(System.currentTimeMillis())+"--- I am running sendData!");
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread:"+Thread.currentThread()+"end");
    }
}
Use Annotation Config Scheduling
public void doSomething() {
    // something that should execute periodically
}
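Seconds Minutes Hours DayofMonth Month DayofWeek Year
or
Seconds Minutes Hours DayofMonth Month DayofWeek
Note that Spring's @Scheduled accepts the six-field form; the seven-field form with Year is Quartz syntax.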
@Scheduled(cron="*/5 * * * * MON-FRI")
public void doSomething() {
    // something that should execute periodically
}
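0 0 10,14,16 * * ?          every day at 10:00, 14:00 and 16:00
0 0/30 9-17 * * ?           every half hour during 9-to-5 working hours
0 15 10 ? * *               every day at 10:15
0 15 10 * * ?               every day at 10:15
0 15 10 * * ? *             every day at 10:15
0 15 10 * * ? 2005          every day at 10:15 during 2005
0 * 14 * * ?                every minute from 14:00 to 14:59, every day
0 0/5 14 * * ?              every 5 minutes from 14:00 to 14:55, every day
0 0/5 14,18 * * ?           every 5 minutes from 14:00 to 14:55 and from 18:00 to 18:55, every day
0 0-5 14 * * ?              every minute from 14:00 to 14:05, every day
0 10,44 14 ? 3 WED          at 14:10 and 14:44 every Wednesday in March
0 15 10 ? * MON-FRI         at 10:15 Monday through Friday
0 15 10 15 * ?              at 10:15 on the 15th of every month
0 15 10 L * ?               at 10:15 on the last day of every month
0 15 10 ? * 6L              at 10:15 on the last Friday of every month
0 15 10 ? * 6L 2002-2005    at 10:15 on the last Friday of every month from 2002 through 2005
0 15 10 ? * 6#3             at 10:15 on the third Friday of every month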
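When reading expressions like these, it helps to label each field by its position. The sketch below does only that: it splits an expression on whitespace and names the six or seven fields in the order listed earlier. It is a reading aid under assumptions (the class name CronFields and the field names are made up) and it does not validate or evaluate the expression the way Spring or Quartz would.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CronFields {
    // Field names in positional order; the seventh (year) is optional.
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    /** Maps each positional field of a 6- or 7-field cron expression to its name. */
    static Map<String, String> label(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("expected 6 or 7 fields but got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // prints {seconds=0, minutes=15, hours=10, dayOfMonth=?, month=*, dayOfWeek=MON-FRI}
        System.out.println(label("0 15 10 ? * MON-FRI"));
    }
}
```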
public class ScheduleConfig implements SchedulingConfigurer {
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        // Assumed completion: the original omits the lines that initialize and register the scheduler.
        scheduler.initialize();
        taskRegistrar.setTaskScheduler(scheduler);
        // Register a task programmatically: interval 10000 ms, initial delay 0 ms.
        taskRegistrar.addFixedRateTask(new IntervalTask(
                new Runnable() {
                    public void run() {
                        // the task body is not shown in the original
                    }
                },
                10000, 0));
    }
}
- Add the annotation to the Spring Boot application class
@SpringBootApplication
@EnableAsync
public class MiddlewareMain {
    public static void main(String[] args) {
        SpringApplication.run(MiddlewareMain.class, args);
    }
}
- Add the @Async annotation to the method you want to run asynchronously
@Async
public void testAsync(String a){
    System.out.println("Thread:"+Thread.currentThread()+"start");
    System.out.println("Thread:"+Thread.currentThread()+"---"+dateFormat.format(System.currentTimeMillis())+"--- I am running sendData!"+a);
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if("AAAA".equals(a)){
        throw new RuntimeException("666666666666666666666666666");
    }
    System.out.println("Thread:"+Thread.currentThread()+"end");
}
AsyncConfigurer interfaces
- Configure the Executor
The AsyncConfigurer interface lets you change the Executor that runs your asynchronous tasks. Before any configuration, the console shows a SimpleAsyncTaskExecutor thread:
Thread:Thread[SimpleAsyncTaskExecutor-1,5,main]start
Thread:Thread[SimpleAsyncTaskExecutor-1,5,main]---2018-01-17 16:03:43 --- I am running sendData2!AAAA1
Thread:Thread[SimpleAsyncTaskExecutor-1,5,main]end
@Configuration
public class ExcutorConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
        // the bean name also becomes the thread name prefix
        executor.setBeanName("myExecutor");
        executor.setPoolSize(5);
        executor.initialize();
        return executor;
    }
    ...
}
The console shows that the executor has changed:
Thread:Thread[myExecutor-1,5,main]start
Thread:Thread[myExecutor-1,5,main]---2018-01-17 16:09:19 --- I am running sendData!AAAA1
Thread:Thread[myExecutor-1,5,main]end
- Configure the ExceptionHandler
@Configuration
public class ExcutorConfig implements AsyncConfigurer {
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                System.out.println("methodName:"+method.getName()+" Modifier"+method.getModifiers()+" param:"+(String)params[0]);
                System.out.println("Execution failed!!!"+ex);
            }
        };
    }
}
console:
Thread:Thread[myExecutor-1,5,main]start
Thread:Thread[myExecutor-1,5,main]---2018-01-17 16:13:09 --- I am running sendData!AAAA
methodName:testAsync Modifier1 param:AAAA
Execution failed!!!java.lang.RuntimeException: 666666666666666666666666666
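A handler is needed here because a void @Async method gives the caller no Future through which the exception could travel. The plain JDK has an analogous mechanism, Thread.UncaughtExceptionHandler, for tasks started with Executor.execute. The sketch below illustrates that analogy only; it is not how Spring dispatches to AsyncUncaughtExceptionHandler, and the class name UncaughtDemo, the thread name, and the "boom" message are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class UncaughtDemo {
    /** Runs a failing fire-and-forget task and returns the message the handler received. */
    static String handledMessage() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        // Every worker thread gets a handler for exceptions that escape run().
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "demo-1");
            t.setUncaughtExceptionHandler((thread, ex) -> {
                seen.set(ex.getMessage());
                handled.countDown();
            });
            return t;
        };
        ExecutorService pool = Executors.newSingleThreadExecutor(factory);
        try {
            // execute() returns nothing, so the caller cannot observe the failure directly.
            pool.execute(() -> {
                throw new RuntimeException("boom");
            });
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + handledMessage()); // prints "handler saw: boom"
    }
}
```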
- @Async accepts an executor name, so different tasks can run on different Executors
@Bean("secondExecutor")
public Executor MyExecutor() {
    ThreadPoolTaskScheduler beanExecutor = new ThreadPoolTaskScheduler();
    beanExecutor.setBeanName("secondExecutor");
    beanExecutor.setPoolSize(5);
    beanExecutor.initialize();
    return beanExecutor;
}
Use secondExecutor:
@Async("secondExecutor")
public void testAsync(String a){
    System.out.println("Thread:"+Thread.currentThread()+"start");
    System.out.println("Thread:"+Thread.currentThread()+"---"+dateFormat.format(System.currentTimeMillis())+"--- I am running sendData!"+a);
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if("AAAA".equals(a)){
        throw new RuntimeException("666666666666666666666666666");
    }
    System.out.println("Thread:"+Thread.currentThread()+"end");
}
console:
Thread:Thread[secondExecutor-1,5,main]start
Thread:Thread[secondExecutor-1,5,main]---2018-01-17 16:18:01 --- I am running sendData!AAAA1
Thread:Thread[secondExecutor-1,5,main]end
- Use Future
@Async
public Future<String> testAsync2(String a){
    System.out.println("Thread2:"+Thread.currentThread()+"start");
    System.out.println("Thread2:"+Thread.currentThread()+"---"+dateFormat.format(System.currentTimeMillis())+"--- I am running sendData2!"+a);
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if("AAAA".equals(a)){
        throw new RuntimeException("22222222222222");
    }
    System.out.println("Thread2:"+Thread.currentThread()+"end");
    //return CompletableFuture.completedFuture(a);
    return new AsyncResult<String>(a);
}
The main thread waits for the execution to finish:
@RequestMapping("/testAsync2")
public String hello2(@RequestParam("a") String a){
    Future<String> f = uploadData.testAsync2(a);
    try {
        // get() blocks the request thread until the async method has finished
        System.out.println(f.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return "OK2";
}
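With a Future, a failure inside the asynchronous method reaches the caller as an ExecutionException thrown by get(), with the original exception as its cause. The JDK-only sketch below shows both outcomes. The class name FutureDemo and the messages are assumptions, and it uses a plain ExecutorService rather than @Async.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {
    /** Runs the task on a worker thread and blocks until its result or failure is available. */
    static String waitFor(String input) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = pool.submit(() -> {
                if ("AAAA".equals(input)) {
                    throw new RuntimeException("task failed for " + input);
                }
                return input;
            });
            return "result: " + f.get(); // blocks the calling thread
        } catch (ExecutionException e) {
            // the task's own exception is available as the cause
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(waitFor("BBBB")); // prints "result: BBBB"
        System.out.println(waitFor("AAAA")); // prints "failed: task failed for AAAA"
    }
}
```

Blocking on get() inside a request handler gives up most of the benefit of running the work asynchronously, so it suits cases where the caller really needs the result before responding.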