Java ForkJoin
在大数据的Mapreduce中,其运算方式就是分而治之,把一个复杂的大的运算任务分成若干个小任务,单独计算后进行汇总得到结果,JDK1.7的ForkJoin(Fork拆分,Join合并)就是这种思想,分而治之,是一个多线程并行(不是并发)处理框架
并发和并行
ForkJoin特点
forkjoin和多个thread有啥区别 ?
- ForkJoin是分而治之,把大任务递归分解成多个小任务,能高效利用多个CPU,而多改个thread则没有递归分解,线程独立。
- ForkJoin可以进行任务窃取,即当某个线程完成了自己的任务后,它会去窃取其他线程的任务来执行,自动负载均衡,而多个thread就做不到,需要手动平衡。
- ForkJoin有ForkJoinPool线程池,可以自适应,会根据cpu核心动态调整,而多thread设置线程池的话就要手动设置参数,并且要处理线程生命周期。
总之ForkJoin是基于任务分解的并行框架,在处理任务时有一定的自动平衡,相对多thread不需要很多手动控制管理。
Forkjoin原理
ForkJoin框架是从jdk1.7中引入的新特性,和ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,默认值是计算机可用的CPU数量。
ForkJoinPool能够使用相对较少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概200万+个。问题的关键在于,对于一个父任务而言,只有当它所有的子任务完成之后,父任务才能够被执行。
所以如果使用ThreadPoolExecutor+分治法会存在问题,因为ThreadPoolExecutor中的线程做不到父子间的任务关系,而使用ForkJoinPool就能够解决这个问题,它就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
那么使用ThreadPoolExecutor或者ForkJoinPool,性能上会有什么差异呢?
首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然这是不可行的,也是很不合理的!!
ForkJoin局限性
- 任务只能使用Fork和Join操作来进行同步机制,如果使用了其他同步机制,则在同步操作时,工作线程就不能执行其他任务了。比如,在Fork/Join框架中,使任务进行了睡眠,那么,在睡眠期间内,正在执行这个任务的工作线程将不会执行其他任务了。
- 在Fork/Join框架中,所拆分的任务适合能快速执行并且不会被长时间阻塞的计算操作,不适合执行IO操作,比如:读写数据文件。
因为ForkJoinPool
依赖于工作窃取机制来保持线程繁忙,当线程在执行 I/O 操作时被阻塞,它无法被其它线程窃取任务,从而导致资源利用不充分,降低整体效率。- 任务不能抛出检查异常,必须通过必要的代码来处理这些异常。
Demo
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;//如果任务足够小就计算任务boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkjoinPool = new ForkJoinPool();//生成一个计算任务,计算1+2+3+4ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);//执行一个任务Future<Integer> result = forkjoinPool.submit(task);try {log.info("result:{}", result.get());} catch (Exception e) {log.error("exception", e);}}
}
CompletableFuture+ThreadPoolExecutor VS ForkJoin
首先ForkJoinPool的特点是分而治之、父子任务、工作窃取,适用于大量计算密集型的任务,例如分治算法、图像处理、排序、递归计算等。
我们知道CompletableFuture的api如thenApply
、thenCombine
、thenCompose
可以将多个异步任务进行组合和联动,从而实现任务分解和组合。但是需要手动分解,并且没有显示的父子关系,不能做到自动的任务拆分调度,适用于异步编程和需要串联多个异步任务的场景,例如处理 IO 密集型任务、异步 API 调用、任务链等。
所以说CompletableFuture更适合异步任务编排,没有强烈的递归性质的任务。ForkJoin更适合CPU计算的任务,如果需要大量io读写,那么不建议forkJoin。