李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
49.Fork&Join框架介绍
Leefs
2022-11-24 PM
1260℃
0条
[TOC] ### 一、概述 Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的**CPU密集型运算**。 + 所谓的**任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解**。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。 + Fork/Join 在**分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成**,进一步提升了运算效率。 + Fork/Join 默认会创建与CPU核心数大小相同的线程池。 ### 二、Fork/Join框架介绍 **Fork/Join框架主要包含三个模块**: + 线程池:`ForkJoinPool` + 任务对象: `ForkJoinTask` (包括`RecursiveTask`、`RecursiveAction` 和 `CountedCompleter`) + 执行Fork/Join任务的线程: `ForkJoinWorkerThread` ![49.Fork&Join框架介绍01.png](https://lilinchao.com/usr/uploads/2022/11/210904555.png) **三者的关系** `ForkJoinPool`可以通过池中的`ForkJoinWorkerThread`来处理`ForkJoinTask`任务。 `ForkJoinPool` 只接收 `ForkJoinTask` 任务(在实际使用中,也可以接收 Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 `ForkJoinTask` 类型的任务),`RecursiveTask` 是 `ForkJoinTask` 的子类,是一个可以递归执行的 `ForkJoinTask`,`RecursiveAction` 是一个无返回值的 `RecursiveTask`,`CountedCompleter` 在任务完成执行后会触发执行一个自定义的钩子函数。 在实际运用中,我们一般都会继承 `RecursiveTask` 、`RecursiveAction` 或 `CountedCompleter` 来实现我们的业务需求,而不会直接继承 `ForkJoinTask` 类。 ### 三、Fork/Join框架核心思想 #### 3.1 分治思想(Divide-and-Conquer) 分治算法(Divide-and-Conquer)把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。 首先看一下 Fork/Join 框架的任务运行机制如下图所示: ![49.Fork&Join框架介绍02.png](https://lilinchao.com/usr/uploads/2022/11/2765276694.png) - **fork()**:开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。 - **join()**:等待该任务的处理线程处理完毕,获得返回值。 当然并不会每个 fork 都会创建新线程, 也不是每个 join 都会造成线程被阻塞, 而是采取work-stealing 原理。 #### 3.2 work-stealing(工作窃取)算法 线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。 这种特性使得 `ForkJoinPool` 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 `ForkJoinPool` 时,对不需要合并(join)的事件类型任务也非常适用。 在 `ForkJoinPool` 中,线程池中每个工作线程(`ForkJoinWorkerThread`)都对应一个任务队列(`WorkQueue`),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。 **具体思路如下**: - 每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。 - 队列支持三个功能push、pop、poll - push/pop只能被队列的所有者线程调用,而poll可以被其他线程调用。 - 划分的子任务调用fork时,都会被push到自己的队列中。 - 默认情况下,工作线程从自己的双端队列获取任务并执行。 - 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务 ![49.Fork&Join框架介绍03.png](https://lilinchao.com/usr/uploads/2022/11/4009308786.png) ### 四、基本使用 > 案例:计算对 `1~n` 之间的整数求和的任务 **注意**:提交给 Fork/Join 线程池的任务需要继承 `RecursiveTask`(有返回值)或 `RecursiveAction`(没有返回值) #### 通过递归来实现 ![49.Fork&Join框架介绍04.png](https://lilinchao.com/usr/uploads/2022/11/4189210341.png) **代码实现** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * Created by lilinchao * Date 2022/11/24 * Description 计算对 1~n 之间的整数求和的任务 */ public class AddTaskDemo01 { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); System.out.println(pool.invoke(new AddTask1(5))); } } @Slf4j(topic = "c.AddTask") class AddTask1 extends RecursiveTask
{ int n; public AddTask1(int n) { this.n = n; } @Override public String toString() { return "{" + n + "}"; } @Override protected Integer compute() { // 如果 n 已经为1,可以求得结果 if ( n == 1) { log.debug(" join() {}",n); return n; } // 将任务进行拆分(fork) AddTask1 t1 = new AddTask1(n - 1); // 安排在当前任务正在运行的池中异步执行此任务(如果适用) t1.fork(); log.debug("fork() {} + {}",n,t1); // 合并(join)结果 int result = n + t1.join(); log.debug("join() {} + {} = {}", n, t1, result); return result; } } ``` **运行结果** ``` 21:50:06.238 [ForkJoinPool-1-worker-2] DEBUG c.AddTask - fork() 4 + {3} 21:50:06.238 [ForkJoinPool-1-worker-1] DEBUG c.AddTask - fork() 5 + {4} 21:50:06.238 [ForkJoinPool-1-worker-3] DEBUG c.AddTask - fork() 3 + {2} 21:50:06.242 [ForkJoinPool-1-worker-3] DEBUG c.AddTask - join() 1 21:50:06.238 [ForkJoinPool-1-worker-0] DEBUG c.AddTask - fork() 2 + {1} 21:50:06.242 [ForkJoinPool-1-worker-0] DEBUG c.AddTask - join() 2 + {1} = 3 21:50:06.243 [ForkJoinPool-1-worker-3] DEBUG c.AddTask - join() 3 + {2} = 6 21:50:06.243 [ForkJoinPool-1-worker-2] DEBUG c.AddTask - join() 4 + {3} = 10 21:50:06.243 [ForkJoinPool-1-worker-1] DEBUG c.AddTask - join() 5 + {4} = 15 15 ``` #### 优化ForkJoin 可以在计算的时候中间再次拆分任务 ![49.Fork&Join框架介绍05.png](https://lilinchao.com/usr/uploads/2022/11/3048897368.png) **代码实现** ```java import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * Created by lilinchao * Date 2022/11/24 * Description 1.0 */ public class AddTaskDemo02 { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); System.out.println(pool.invoke(new AddTask2(1, 5))); } } @Slf4j(topic = "c.AddTask2") class AddTask2 extends RecursiveTask
{ int begin; int end; public AddTask2(int begin, int end) { this.begin = begin; this.end = end; } @Override public String toString() { return "{" + begin + "," + end + "}"; } @Override protected Integer compute() { // 当 begin == end 时, 无法再进行拆分 // 5,5 if (begin == end) { log.debug("join() {}", begin); return begin; } // 4, 5 if (end - begin == 1) { log.debug("join() {} + {} = {}", begin, end, end + begin); return end + begin; } // 1 5 int mid = (end + begin) / 2; // 3 AddTask2 t1 = new AddTask2(begin, mid); // 1,3 t1.fork(); AddTask2 t2 = new AddTask2(mid + 1, end); // 4,5 t2.fork(); log.debug("fork() {} + {} = ?", t1, t2); int result = t1.join() + t2.join(); log.debug("join() {} + {} = {}", t1, t2, result); return result; } } ``` **运行结果** ``` 21:58:03.222 [ForkJoinPool-1-worker-0] DEBUG c.AddTask2 - join() 1 + 2 = 3 21:58:03.222 [ForkJoinPool-1-worker-2] DEBUG c.AddTask2 - fork() {1,2} + {3,3} = ? 21:58:03.222 [ForkJoinPool-1-worker-1] DEBUG c.AddTask2 - fork() {1,3} + {4,5} = ? 21:58:03.222 [ForkJoinPool-1-worker-3] DEBUG c.AddTask2 - join() 4 + 5 = 9 21:58:03.226 [ForkJoinPool-1-worker-1] DEBUG c.AddTask2 - join() 3 21:58:03.226 [ForkJoinPool-1-worker-2] DEBUG c.AddTask2 - join() {1,2} + {3,3} = 6 21:58:03.226 [ForkJoinPool-1-worker-1] DEBUG c.AddTask2 - join() {1,3} + {4,5} = 15 15 ``` *附参考文章链接* *https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ForkJoinPool.html*
标签:
并发编程
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2602.html
上一篇
48.Tomcat线程池简单介绍
下一篇
50.AQS实现原理介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Stream流
正则表达式
Scala
字符串
二叉树
JavaScript
Quartz
Nacos
MySQL
国产数据库改造
Hadoop
Jquery
Http
Yarn
Spark RDD
Eclipse
线程池
JVM
HDFS
Golang
GET和POST
VUE
DataX
查找
Java
随笔
MyBatis
稀疏数组
机器学习
算法
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭