JAVA中的Fork/Join框架
什么是fork/join框架
fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。
fork/join框架与其它ExecutorService
的实现类相似,会给线程池中的线程分发任务,不同之处在于它使用了工作窃取算法,所谓工作窃取,指的是对那些处理完自身任务的线程,会从其它线程窃取任务执行。
fork/join框架的核心是ForkJoinPool
类,该类继承了AbstractExecutorService类。ForkJoinPool
实现了工作窃取算法并且能够执行 ForkJoinTask
任务。
基本使用方法
在使用fork/join框架之前,我们需要先对任务进行分割,任务分割代码应该跟下面的伪代码类似:
if (任务足够小){ 直接执行该任务;
}else{ 将任务一分为二; 执行这两个任务并等待结果;
}
首先,我们会在ForkJoinTask的子类中封装以上代码,不过一般我们会使用更加具体的ForkJoinTask类型,如 RecursiveTask
(可以返回一个结果)或RecursiveAction
。
当写好ForkJoinTask的子类后,创建该对象,该对象代表了所有需要完成的任务;然后将这个任务对象传给ForkJoinPool实例的invoke()去执行即可。
Fork-Join框架的实现原理为:
首先,分割任务,只要任务的粒度超过阀值,就不停地将任务分拆为小任务;然后,将分割的任务任务添加到双端队列中,启动线程从双端队列获取任务执行,将执行结果统一放到一个队列中;
最后,再启动一个线程合并结果队列的值。
Fork-Join框架涉及的主要类如下:
RecursiveAction:用于没有返回值的任务。
RecursiveTask:用于需要返回值的任务,通过泛型参数设置计算的返回值类型。
ForkJoinPool:提供了一系列的submit方法,计算ForkJoinTask(需要实现computer方法)。
- public class CountTaskTmp extends RecursiveTask<Integer>{
- private static final int THRESHOLD = 2;
- private int start;
- private int end;
- public CountTaskTmp(int start, int end) {
- this.start = start;
- this.end = end;
- }
- @Override
- protected Integer compute() {
- int sum = 0;
- boolean canCompute = (end - start) <= THRESHOLD;
- if (canCompute) {
- for (int i = start; i <= end; i++)
- sum += i;
- } else {
- //如果任务大于阀值,就分裂成两个子任务计算
- int mid = (start + end) / 2;
- CountTask leftTask = new CountTask(start, mid);
- CountTask rightTask = new CountTask(mid+1, end);
- //执行子任务
- leftTask.fork();
- rightTask.fork();
- //等待子任务执行完,并得到结果
- int leftResult = (int)leftTask.join();
- int rightResult = (int)rightTask.join();
- sum = leftResult + rightResult;
- }
- return sum;
- }
- public static void main(String[] args) {
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- //生成一个计算资格,负责计算1+2+3+4
- CountTask task = new CountTask(1, 4);
- Future<Integer> result = forkJoinPool.submit(task);
- try {
- System.out.println(result.get());
- } catch (Exception e) {
- }
- }
- }