juc高并发-forkJoin
forkJoin模型与原理
定义与使用场景:在存在大数据量计算的情况之下,单线程效率较大,用到forkjoin模型,以提高计算效率
模型与处理逻辑:
1.先将一个大任务分解为多个小任务
2.各个小任务处理完成后,再将各自结果汇总
3.结果汇总前会等待其它任务完成。
原理分析:工作窃取 底层维护的是一个双端队列;
优点:工作效率高
缺点:容易造成资源竞争
如下图:B队列的线程处理完成后,会去抢A队列中的任务执行
常见两种api使用
RecursiveTask<Long>--带参数处理结果
RecursiveAction--不带参数处理结果
demo01:
/**
* 文件名: com.example.demo.JucDemo.分支合并 - ForkJoinTaskTest
* 文件简介: fork线程
* @author zhouyunjian
* @date 17:51 2020/5/1
* Copyright (c) 2019 ERAYT. All Rights Reserved.
*/
public class ForkJoinTaskTest extends RecursiveTask<Long> {
private Long start;
private Long end;
private static final Long tempLong = 10000L;
public ForkJoinTaskTest(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if(end-start<tempLong){
long sum=0;
for (long i=start;i<end;i++){
System.out.println("=======打印:"+i);
sum+=i;
}
return sum;
}else{
System.out.println("需要fork线程"+Thread.currentThread().getName());
long middle = (end + start) / 2;
ForkJoinTaskTest task1=new ForkJoinTaskTest(start,middle);
task1.fork();
ForkJoinTaskTest task2=new ForkJoinTaskTest(middle+1,end);
task2.fork();
return task1.join()+task2.join();
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTaskTest forkJoinWork = new ForkJoinTaskTest(0L, 15000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinWork);
//此步骤需要forkJoin中的所有子任务处理完成后才执行,会产生等待
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("times:"+(end-start)+" r=>"+sum);
}
}
结果值:times:284 r=>112485000
分析:submit.get()会阻塞等待,等待所有结果值都计算完成后才进行累加
demo02:
/**
* 文件名: com.example.demo.JucDemo.分支合并 - ForkJoinTaskTest
* 文件简介: fork线程
* @author zhouyunjian
* @date 17:51 2020/5/1
* Copyright (c) 2019 ERAYT. All Rights Reserved.
*/
public class ForkJoinActionTest extends RecursiveAction{
private Long start;
private Long end;
private static final Long tempLong = 20L;
public ForkJoinActionTest(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if(end-start<tempLong){
for (long i=start;i<end;i++){
System.out.println("=======打印:"+i);
}
}else{
System.out.println("需要fork线程"+Thread.currentThread().getName());
long middle = (end + start) / 2;
ForkJoinActionTest task1=new ForkJoinActionTest(start,middle);
task1.fork();
ForkJoinActionTest task2=new ForkJoinActionTest(middle+1,end);
task2.fork();
}
}
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinActionTest forkJoinWork = new ForkJoinActionTest(0L, 15000L);
ForkJoinTask submit = forkJoinPool.submit(forkJoinWork);
submit.join();
long end = System.currentTimeMillis();
//阻塞到所有子任务处理完成
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
System.out.println("times:"+(end-start)+" r=>");
}
}
结果值:times:133 r=>
分析:分子任务需要根据forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);方法进行等待,否则不会阻塞任务
应用场景分析
适合计算密集型的任务