李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
45.ThreadPoolExecutor线程池提交和关闭方法介绍
Leefs
2022-11-20 PM
1323℃
0条
[TOC] ### 一、线程池提交任务方法 线程的各种提交方式的概念: ```java // 执行任务 void execute(Runnable command); // 提交任务 task,用返回值 Future 获得任务执行结果,会有返回值
Future
submit(Callable
task); // 提交 tasks 中所有任务
List
> invokeAll(Collection extends Callable
> tasks) throws InterruptedException; // 提交 tasks 中所有任务,带超时时间,时间超时后,会放弃执行后面的任务
List
> invokeAll(Collection extends Callable
> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
T invokeAny(Collection extends Callable
> tasks) throws InterruptedException, ExecutionException; // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
T invokeAny(Collection extends Callable
> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; ``` #### submit方法 > submit方法与execute方法的区别在于: > > + **execute方法**:接收的参数是Runnable类型的参数没有返回值 > + **submit方法**:接收的是Callable类型的参数有返回值且返回值用`Future<>`接收 **示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * Created by lilinchao * Date 2022/11/20 * Description submit方法示例 */ @Slf4j(topic = "c.ThreadPoolDemo01") public class ThreadPoolDemo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建线程池 ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); //调用submit提交任务 Future
future = threadpool.submit(new Callable
() { @Override public String call() throws Exception { log.debug("running"); Thread.sleep(1000); return "ok"; } }); //获取线程的返回结果 log.debug("{}",future.get()); } } ``` **运行结果** ``` 17:03:36.791 [pool-1-thread-1] DEBUG c.ThreadPoolDemo01 - running 17:03:37.796 [main] DEBUG c.ThreadPoolDemo01 - ok ``` **说明** 从代码中我们看到submit()提交任务中实现了Callable接口,并在睡眠了1s后返回字符串“ok”,最后在主线程中调用get()方法,打印返回值到控制台。 #### invokeAll方法 > invokeAll方法接收的是一个任务集合且有返回值,线程池中的线程执行这个任务集合。 > > **等待所有的任务执行完成后统一返回。** **示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; /** * Created by lilinchao * Date 2022/11/20 * Description invokeAll方法示例 */ @Slf4j(topic = "c.ThreadPoolDemo02") public class ThreadPoolDemo02 { public static void main(String[] args) throws InterruptedException { //创建线程池 ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); //返回的是一个泛型为Future的集合 List
> futures = threadpool.invokeAll(Arrays.asList( () -> { log.debug("begin1"); Thread.sleep(1000); return "1"; }, () -> { log.debug("begin2"); Thread.sleep(500); return "2"; }, () -> { log.debug("begin3"); Thread.sleep(2000); return "3"; } )); //遍历输出结果 futures.forEach(f -> { try { log.debug("{}",f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } } ``` **运行结果** ``` 17:22:03.437 [pool-1-thread-1] DEBUG c.ThreadPoolDemo02 - begin1 17:22:03.437 [pool-1-thread-2] DEBUG c.ThreadPoolDemo02 - begin2 17:22:03.942 [pool-1-thread-2] DEBUG c.ThreadPoolDemo02 - begin3 17:22:05.946 [main] DEBUG c.ThreadPoolDemo02 - 1 17:22:05.948 [main] DEBUG c.ThreadPoolDemo02 - 2 17:22:05.948 [main] DEBUG c.ThreadPoolDemo02 - 3 ``` **结果分析** 从结果可以看出,任务1与任务2同时被执行,但是因为线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后,线程2执行任务3,中间差了0.5s 是因为任务2执行了0.5s,最终遍历返回结果并打印。 #### invokeAny方法 > invokeAny方法提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消。 **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by lilinchao * Date 2022/11/20 * Description invokeAny方法示例 */ @Slf4j(topic = "c.ThreadPoolDemo03") public class ThreadPoolDemo03 { public static void main(String[] args) throws ExecutionException, InterruptedException { //创建线程池,核心线程数为3 ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); String future = threadpool.invokeAny(Arrays.asList( () -> { log.debug("begin1"); Thread.sleep(1000); log.debug("end1"); return "1"; }, () -> { log.debug("begin2"); Thread.sleep(500); log.debug("end2"); return "2"; }, () -> { log.debug("begin3"); Thread.sleep(2000); log.debug("end3"); return "3"; } )); log.debug("{}",future); } } ``` **运行结果** ``` 17:34:43.143 [pool-1-thread-1] DEBUG c.ThreadPoolDemo03 - begin1 17:34:43.147 [pool-1-thread-3] DEBUG c.ThreadPoolDemo03 - begin3 17:34:43.147 [pool-1-thread-2] DEBUG c.ThreadPoolDemo03 - begin2 17:34:43.648 [pool-1-thread-2] DEBUG c.ThreadPoolDemo03 - end2 17:34:43.648 [main] DEBUG c.ThreadPoolDemo03 - 2 ``` **结果分析** 程序中设置三个核心线程数,表示三个任务可以同时运行,不需要加入到任务队列中,从结果可以看出,任务2使用0.5s时间最先执行完,并返回结果,此时整个任务队列停止等待任务1和任务3继续执行,输出结果任务2。 ### 二、关闭线程池 #### shutdown > 线程池状态变为 SHUTDOWN > > - 不会接收新任务 > - 但已提交任务会执行完 > - 此方法不会阻塞调用线程的执行,比如如果主线程此时调用了这个shutDown方法,此时并不会阻塞主线程,如果主线程 ```java void shutdown(); ``` + **源码** ```java public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(SHUTDOWN); // 仅会打断空闲线程 interruptIdleWorkers(); onShutdown(); // 扩展点 ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); } ``` #### shutdownNow > 线程池状态变为 STOP > - 不会接收新任务 > - 会将队列中的任务返回 > - 并用 interrupt 的方式中断正在执行的任务 ```java List
shutdownNow(); ``` + **源码** ```java public List
shutdownNow() { List
tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 修改线程池状态 advanceRunState(STOP); // 打断所有线程 interruptWorkers(); // 获取队列中剩余任务 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终结 tryTerminate(); return tasks; } ``` #### **其他方法**(了解) ```java // 不在 RUNNING 状态的线程池,此方法就返回 true boolean isShutdown(); // 线程池状态是否是 TERMINATED boolean isTerminated(); // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待 // 一般task是Callable类型的时候不用此方法,因为futureTask.get方法自带等待功能。 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; ``` **代码示例** ```java import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.concurrent.*; /** * Created by lilinchao * Date 2022/11/20 * Description 1.0 */ @Slf4j(topic = "c.ThreadPoolDemo04") public class ThreadPoolDemo04 { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardOldestPolicy()); Future
result1 = threadpool.submit(() -> { log.debug("task 1 running..."); Thread.sleep(1000); log.debug("task 1 finish..."); return 1; }); Future
result2 = threadpool.submit(() -> { log.debug("task 2 running..."); Thread.sleep(1000); log.debug("task 2 finish..."); return 2; }); Future
result3 = threadpool.submit(() -> { log.debug("task 3 running..."); Thread.sleep(1000); log.debug("task 3 finish..."); return 3; }); // shutdown() 部分的代码 log.debug("shutdown"); threadpool.shutdown(); threadpool.submit(()->{ log.debug("task 4 running..."); Thread.sleep(1000); log.debug("task 4 finish"); return "4"; }); threadpool.awaitTermination(3, TimeUnit.SECONDS); log.debug("other..."); // shutdownNow部分代码 // log.debug("shutdownNow"); // List
runnables = threadpool.shutdownNow(); // log.debug("other.... {}" , runnables); } } ``` **shutdown运行结果** ``` 18:03:12.408 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 running... 18:03:12.408 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 running... 18:03:12.408 [main] DEBUG c.ThreadPoolDemo04 - shutdown 18:03:13.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 finish... 18:03:13.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 3 running... 18:03:13.412 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 finish... 18:03:14.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 3 finish... 18:03:14.412 [main] DEBUG c.ThreadPoolDemo04 - other... ``` **结果分析** 可以看出在shutdown后依然可以把 shutdown之前的任务运行完毕,但是shutdown之后的任务就没有再运行了。 另外awaitTermination方法的作用是等待shutdown部分的任务运行完后主线程再运行awaitTermination方法之后的代码。 **shutdownNow运行结果** ``` 18:06:29.592 [main] DEBUG c.ThreadPoolDemo04 - shutdownNow 18:06:29.592 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 running... 18:06:29.592 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 running... 18:06:29.595 [main] DEBUG c.ThreadPoolDemo04 - other.... [java.util.concurrent.FutureTask@6aa8ceb6] ``` **结果分析** 从结果可以看出在shutdownNow之后只有一个任务运行成功了,也就是别的任务都已经被打断了。
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2591.html
上一篇
44.Executors创建线程池方法介绍
下一篇
46.异步模式之工作线程
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Tomcat
Hive
pytorch
JavaSE
数学
Spark Core
链表
Stream流
SpringBoot
Java阻塞队列
人工智能
Jquery
Linux
Zookeeper
Yarn
LeetCode刷题
Netty
RSA加解密
DataWarehouse
Ubuntu
Redis
Golang基础
MyBatis
Git
递归
算法
栈
前端
Scala
查找
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭