李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
47.任务调度线程池介绍
Leefs
2022-11-22 PM
1159℃
0条
[TOC] ### 前言 JDK 1.5开始提供`ScheduledThreadPoolExecutor`类,`ScheduledThreadPoolExecutor`类继承`ThreadPoolExecutor`类重用线程池实现了任务的周期性调度功能。 在JDK 1.5之前,实现任务的周期性调度主要使用的是Timer类和`TimerTask`类。 本文将简单介绍`ScheduledThreadPoolExecutor`类与Timer类的区别,`ScheduledThreadPoolExecutor`类相比于Timer类来说,究竟有哪些优势,以及二者分别实现任务调度的简单示例。 ### 一、Timer与TimerTask概述 + 使用 Timer 实现任务调度的核心类是 Timer 和 `TimerTask`。 + 其中 Timer 负责设定 `TimerTask` 的起始与间隔执行时间。 + 使用者只需要创建一个 `TimerTask` 的继承类,实现自己的 run 方法,然后将其丢给 Timer 去执行即可。 + Timer 的设计核心是一个 `TaskList` 和一个 `TaskThread`。 + Timer 将接收到的任务丢到自己的 `TaskList` 中,`TaskList` 按照 Task 的最初执行时间进行排序。 + `TimerThread` 在创建 Timer 时会启动成为一个守护线程。这个线程会轮询所有任务,找到一个最近要执行的任务,然后休眠,当到达最近要执行任务的开始时间点,`TimerThread` 被唤醒并执行该任务。 + 之后 `TimerThread` 更新最近一个要执行的任务,继续休眠。 **优缺点** Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,**同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务**。 ### 二、Timer与TimerTask方法介绍 + #### Timer类 | Timer方法 | 用途 | | ------------------------------------------------------------ | ------------------------------------------------------------ | | Timer() | 创建一个计时器并启动该计时器 | | cancel() | 停止该计时器,并放弃所有已安排的任务,对当前正在执行的任务没有影响 | | purge() | 将所有已经取消的任务移除,用于释放内存空间 | | schedule(TimerTask task,Date time) | 安排一个任务在指定的时间执行,如果已经超过该时间,则立即执行 | | schedule(TimerTask task,Date firstTime,long period) | 安排一个任务在指定的时间执行,然后以固定的频率重复执行 | | scheduleAtFixedRate(TimerTask task,Date firstTime,long period) | 安排一个任务在指定的时间执行,然后以近似固定的频率重复执行 | | schedule(TimerTask task,long delay) | 安排一个任务在一段时间后执行 | | schedule(TimerTask task,long delay,long period) | 安排一个任务在一段时间后执行,然后以固定的频率重复执行 | | scheduleAtFixedRate(TimerTask task,long delay,long period) | 安排一个任务在一段时间后执行,然后以近似固定的频率重复执行 | + #### TimerTask抽象类 | imerTask方法 | 用途 | | --------------- | ------------------------------------------------------------ | | cancel() | 用于终止此任务。如果该任务只执行一次且还没有执行,则永远不会再执行;如果为重复执行的任务,则之后不会再执行;如果该任务正在执行,则执行完后不会再执行 | | run() | 该任务所要执行的具体操作 | | ExecutionTime() | 返回最近一次要执行该任务的时间,如果正在执行,则返回此任务的执行安排时间,一般再run()方法中调用,用于判断当前是否有足够的时间来执行 | **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.Timer; import java.util.TimerTask; import static com.lilinchao.concurrent.utils.Sleeper.sleep; /** * Created by lilinchao * Date 2022/11/22 * Description 1.0 */ @Slf4j(topic = "c.ThreadPoolDemo05") public class ThreadPoolDemo05 { public static void main(String[] args) { Timer timer = new Timer(); TimerTask task1 = new TimerTask() { @Override public void run() { log.debug("task 1"); // int i = 1 / 0; sleep(2); } }; TimerTask task2 = new TimerTask() { @Override public void run() { log.debug("task 2"); } }; timer.schedule(task1, 1000); timer.schedule(task2, 1000); } } ``` **运行结果** ``` 20:52:25.363 [Timer-0] DEBUG c.ThreadPoolDemo05 - task 1 20:52:27.371 [Timer-0] DEBUG c.ThreadPoolDemo05 - task 2 ``` **结果分析** + 使用 timer 添加两个任务,希望它们都在 1s 后执行; + 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行。 修改上述代码,去掉`int i = 1 / 0;`中的注释,使task1任务在运行使报错,在执行代码查看运行结果。 ``` 20:58:17.594 [Timer-0] DEBUG c.ThreadPoolDemo05 - task 1 Exception in thread "Timer-0" java.lang.ArithmeticException: / by zero at com.lilinchao.concurrent.demo_05.ThreadPoolDemo05$1.run(ThreadPoolDemo05.java:23) at java.util.TimerThread.mainLoop(Timer.java:555) at java.util.TimerThread.run(Timer.java:505) ``` 从结果可以看出,当`task1`运行报错时,将不在接着向下执行`task2`任务,而是直接退出程序。 ### 三、ScheduledThreadPoolExecutor #### 3.1 调度方法ScheduleAtFixedRate和ScheduleWithFixedDelay概述 + `ScheduleAtFixedRate` 每次执行时间为上一次任务开始起向后推一个时间间隔,即每次执行时间为 `initialDelay`, `initialDelay+period`, `initialDelay+2*period`, …; + `ScheduleWithFixedDelay` 每次执行时间为上一次任务结束起向后推一个时间间隔,即每次执行时间为:`initialDelay`, `initialDelay+executeTime+delay`, `initialDelay+2*executeTime+2*delay`。 `ScheduleAtFixedRate` 是基于固定时间间隔进行任务调度,`ScheduleWithFixedDelay` 取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度。 #### 3.2 延时执行 使用 `Executors` 提供的带任务调度线程池对象来创建延时线程 **代码示例** ```java import java.io.IOException; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by lilinchao * Date 2022/11/22 * Description 1.0 */ @Slf4j(topic = "c.ThreadPoolDemo06") public class ThreadPoolDemo06 { public static void main(String[] args) throws IOException { ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 添加两个任务,希望它们都在 1s 后执行 executor.schedule(() -> { log.debug("task1 execute time {}" , new Date()); try { Thread.sleep(2000); } catch (InterruptedException e) { } }, 1, TimeUnit.SECONDS); executor.schedule(() -> { log.debug("task2 execute time {}" , new Date()); }, 1, TimeUnit.SECONDS); // 阻塞当前线程 System.in.read(); } } ``` **运行结果** ``` 21:15:42.462 [pool-1-thread-1] DEBUG c.ThreadPoolDemo06 - task1 execute time Tue Nov 22 21:15:42 CST 2022 21:15:42.462 [pool-1-thread-2] DEBUG c.ThreadPoolDemo06 - task2 execute time Tue Nov 22 21:15:42 CST 2022 ``` 可以看到两个任务是同一时间执行的, 并不会因为某个任务执行时间慢影响其他的任务。 #### 3.3 定时执行 + ##### scheduleAtFixedRate > 任务执行时间小于周期(周期不变) **示例** 在1s后执行任务,之后每2s执行一次任务 ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by lilinchao * Date 2022/11/22 * Description 任务执行时间小于周期(周期不变) */ @Slf4j(topic = "c.ThreadPoolDemo07") public class ThreadPoolDemo07 { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); log.debug("start..."); // 在1s后执行任务,之后每2s执行一次任务 pool.scheduleAtFixedRate(() -> { log.debug("running..."); }, 1, 2, TimeUnit.SECONDS); } } ``` **运行结果** ``` 21:27:37.856 [main] DEBUG c.ThreadPoolDemo07 - start... 21:27:38.928 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:27:40.925 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:27:42.927 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:27:44.925 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:27:46.926 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... ......... ``` > 任务执行时间大于周期(此时周期为任务的执行时间) **示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by lilinchao * Date 2022/11/22 * Description 任务执行时间大于周期(此时周期为任务的执行时间) */ @Slf4j(topic = "c.ThreadPoolDemo07") public class ThreadPoolDemo07 { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); log.debug("start..."); // 在1s后执行任务 之后每2s执行一次任务 pool.scheduleAtFixedRate(() -> { log.debug("running..."); try { // 任务休眠时间(3s)大于执行周期(2s) Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }, 1, 2, TimeUnit.SECONDS); } } ``` **运行结果** ``` 21:34:12.647 [main] DEBUG c.ThreadPoolDemo07 - start... 21:34:13.710 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:34:16.710 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:34:19.711 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:34:22.711 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... 21:34:25.711 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... ........... ``` **结果分析** 一开始,延时 1s,接下来,由于`任务执行时间 > 间隔时间`,间隔被『撑』到了 3s。 + ##### scheduleWithFixedDelay `scheduleWithFixedDelay`中任务真正的执行周期为: **任务的执行时间 + 执行周期** **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by lilinchao * Date 2022/11/22 * Description 1.0 */ @Slf4j(topic = "c.ThreadPoolDemo08") public class ThreadPoolDemo08 { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); log.debug("start..."); pool.scheduleWithFixedDelay(() -> { log.debug("running..."); try { // 任务睡眠5s Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } // 任务在1s后执行,执行周期为2s }, 1, 2, TimeUnit.SECONDS); } } ``` **运行结果** ``` 21:44:22.093 [main] DEBUG c.ThreadPoolDemo08 - start... 21:44:23.150 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running... 21:44:30.152 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running... 21:44:37.152 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running... 21:44:44.153 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running... 21:44:51.156 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running... 21:44:58.157 [pool-1-thread-1] DEBUG c.ThreadPoolDemo08 - running... ....... ``` 从结果可以看出,除了开始运行延迟1s以外,剩下的每次执行时间间隔为7s,即:任务睡眠时间5s+执行周期2s > 评价 > > 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务 #### 3.4 正确处理线程中的异常 ##### 现象 ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by lilinchao * Date 2022/11/22 * Description 不进行异常处理 */ @Slf4j(topic = "c.ThreadPoolDemo07") public class ThreadPoolDemo07 { public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); log.debug("start..."); // 在1s后执行任务,之后每2s执行一次任务 pool.scheduleAtFixedRate(() -> { log.debug("running..."); //此处程序会出现异常 int number = 1 / 0; }, 1, 2, TimeUnit.SECONDS); } } ``` **运行结果** ``` 21:50:53.877 [main] DEBUG c.ThreadPoolDemo07 - start... 21:50:54.936 [pool-1-thread-1] DEBUG c.ThreadPoolDemo07 - running... ``` **结果分析** 当任务执行遇到异常时,程序会一直停留在异常这里不在向下运行。那么我们该怎么正确的处理异常呢? ##### 主动捕捉异常 ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by lilinchao * Date 2022/11/22 * Description 主动捕捉异常 */ @Slf4j(topic = "c.ThreadPoolDemo09") public class ThreadPoolDemo09 { public static void main(String[] args) throws IOException { // 主动捕捉异常 ExecutorService pool = Executors.newFixedThreadPool(2); pool.execute(() -> { log.debug("task1 ..."); try { int i = 1 / 0; } catch (Exception e) { log.error("error : {}" , e); } }); System.in.read(); } } ``` **运行结果** ``` 21:55:20.752 [pool-1-thread-1] DEBUG c.ThreadPoolDemo09 - task1 ... 21:55:20.759 [pool-1-thread-1] ERROR c.ThreadPoolDemo09 - error : {} java.lang.ArithmeticException: / by zero at com.lilinchao.concurrent.demo_05.ThreadPoolDemo09.lambda$main$0(ThreadPoolDemo09.java:23) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ##### 使用Future获取异常 ```java import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Created by lilinchao * Date 2022/11/22 * Description 使用Future获取异常 */ @Slf4j(topic = "c.ThreadPoolDemo09") public class ThreadPoolDemo09 { public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2); Future
future = pool.submit(() -> { log.debug("task1 ..."); int i = 1 / 0; return true; }); // 通过 future.get() 可以获取异常的结果 log.debug("result : {}" , future.get()); System.in.read(); } } ``` **运行结果** ``` 21:57:48.509 [pool-1-thread-1] DEBUG c.ThreadPoolDemo09 - task1 ... Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at com.lilinchao.concurrent.demo_05.ThreadPoolDemo09.main(ThreadPoolDemo09.java:42) Caused by: java.lang.ArithmeticException: / by zero at com.lilinchao.concurrent.demo_05.ThreadPoolDemo09.lambda$main$0(ThreadPoolDemo09.java:38) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ### 四、线程池定时任务 > 需求:如何让每周四 18:00:00 定时执行任务? **基本思路** + 计算 `initialDelay` : 计算当前时间和周四的时间差 : `now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0)`; + 如果当前时间已经超过本周四 `18:00:00.000`, 那么找下周四 `18:00:00.000`; + 计算时间 : 1000 * 3600 * 24 * 7 ; ```java import lombok.extern.slf4j.Slf4j; import java.time.DayOfWeek; import java.time.Duration; import java.time.LocalDateTime; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @SuppressWarnings("all") @Slf4j(topic = "c.ThreadPoolScheduleTest") public class ThreadPoolScheduleTest { public static void main(String[] args) { // 获取当前的时间 LocalDateTime now = LocalDateTime.now(); // 获取本周四 18:00:00.00 LocalDateTime thursday = now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0); // 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000 if (now.compareTo(thursday) > 0) { thursday = thursday.plusWeeks(1); } // 计算延迟执行的时间差 long initialDelay = Duration.between(now, thursday).toMillis(); // 计算时间间隔 : 1周的毫秒值 long oneWeek = 1000 * 3600 * 24 * 7 ; ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); log.debug("开始时间 : {}" , new Date()); executorService.scheduleAtFixedRate(() ->{ log.debug("开始执行时间 : {} " , new Date()); }, initialDelay, oneWeek, TimeUnit.MILLISECONDS); } } ``` ### 总结 **线程角度** - Timer是单线程模式,如果某个TimerTask任务的执行时间比较久,会影响到其他任务的调度执行。 - ScheduledThreadPoolExecutor是多线程模式,并且重用线程池,某个ScheduledFutureTask任务执行的时间比较久,不会影响到其他任务的调度执行。 **系统时间敏感度** - Timer调度是基于操作系统的绝对时间的,对操作系统的时间敏感,一旦操作系统的时间改变,则Timer的调度不再精确。 - ScheduledThreadPoolExecutor调度是基于相对时间的,不受操作系统时间改变的影响。 **是否捕获异常** - Timer不会捕获TimerTask抛出的异常,加上Timer又是单线程的。一旦某个调度任务出现异常,则整个线程就会终止,其他需要调度的任务也不再执行。 - ScheduledThreadPoolExecutor基于线程池来实现调度功能,某个任务抛出异常后,其他任务仍能正常执行。 **是否支持对任务排序** - Timer不支持对任务的排序。 - ScheduledThreadPoolExecutor类中定义了一个静态内部类DelayedWorkQueue,DelayedWorkQueue类本质上是一个有序队列,为需要调度的每个任务按照距离下次执行时间间隔的大小来排序 **能否获取返回的结果** - Timer中执行的TimerTask类只是实现了java.lang.Runnable接口,无法从TimerTask中获取返回的结果。 - `ScheduledThreadPoolExecutor`中执行的`ScheduledFutureTask`类继承了`FutureTask`类,能够通过Future来获取返回的结果。 *附参考文章链接* *https://bbs.huaweicloud.com/blogs/379935* *https://blog.csdn.net/I_r_o_n_M_a_n/article/details/120588894*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2593.html
上一篇
46.异步模式之工作线程
下一篇
48.Tomcat线程池简单介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
MySQL
FastDFS
Sentinel
设计模式
NIO
并发编程
容器深入研究
Kibana
Shiro
二叉树
BurpSuite
Netty
Filter
Scala
高并发
Stream流
MyBatis
JVM
持有对象
线程池
Spark
Redis
ClickHouse
锁
Java
Hbase
Elastisearch
工具
字符串
SpringCloudAlibaba
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭