Java多线程处理之Fork/Join框架
有一些业务场景,需要计算的数据量比较大,这时候如果单线程去处理,必定占用较多的执行时间,Java并发包中有没有给我们提供一种类似大数据处理中的MapReduce的处理方式呢?
但凡成熟的语言,它的API设计必然符合常识,为解决特定环境下的问题而生。于是,上述问题的答案是,有。
那就是Java的Fork/Join框架:ForkJoinTask!
Fork:英文翻译为叉子,分叉,意思是可以将一个庞然大物分割成很多小的部分去执行。Join:就是把分割计算的内容,再合体求和。正所谓分久必合合久必分。
那么分割是不是可以随意而任性的分割?当然不行,因为单机CPU和内存资源是有限的,如果不加考虑的分割,任由线程满天飞,自然会导致系统崩溃宕机,于是,还需要一个线程池来管理这些分割的线程。在此,JDK也为我们准备了准备用于此类场景的线程池:ForkJoinPool线程池子。
下面是Demo示例:
public class CountTask extends RecursiveTask<Long>{
private static final int THREADHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THREADHOLD;
if (canCompute) {
// 如果执行任务数量小于规定的执行单位数量,则一蹴而就,执行完毕
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果要执行的任务数量很大很多,则分解为100个子任务分别取执行,并把子任务加入到一个list里,以便后续join
long step = (start + end) / 100;
List<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if (lastOne > end) {
lastOne = end;
}
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();
}
// 將子任務的執行結果join
for (CountTask t : subTasks) {
sum += t.join();
}
}
return sum;
}
public static void main(String[] args) {
// 此处定义一个ForkJoinPool线程池!
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 定义要计算的任务,即0到20万的数相加的计算任务
CountTask task = new CountTask(0, 200000L);
// 将该任务提交到线程池执行,并且使用了submit方法
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
// 阻塞等待执行结果
long res = result.get();
System.out.println("sum = " + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
----------------------------------
执行结果:
sum = 20000100000