知识点复习14.1 ForkJoinPool
1. 什么是ForkJoinPool
ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。
可以设置最大并发线程数,大的任务拆分成多的小任务
1.1 work-stealing(工作窃取算法)
work-stealing(工作窃取),ForkJoinPool提供了一个更有效的利用线程的机制,当ThreadPoolExecutor还在用单个队列存放任务时,ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工作,当有线程提前完成时,会从队列的末端“窃取”其他线程未执行完的任务,当任务量特别大时,CPU多的计算机会表现出更好的性能。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
线程池监控
在线程池使用监控方面,主要通过如下方法:
isTerminated—判断线程池对应的workQueue中是否有待执行任务未执行完;
awaitTermination—判断线程池是否在约定时间内完成,并返回完成状态;
getQueuedSubmissionCount—获取所有待执行的任务数;
getRunningThreadCount—获取正在运行的任务数。
4. java8 ParallelStreams
Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。
使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法来执行指定任务了。
其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
案列一:通过多线程分多个小任务进行打印数据 无返回值的
案列二:通过多线程分多个小任务进行数据累加 返回结果集
总结
1,invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(同步的)
2,fork方法,让一个task执行(异步的)
3,join方法,让一个task执行(同步的,它和fork不同点是同步或者异步的区别)
4,可以使用join来取得ForkJoinTask的返回值。由于RecursiveTask类实现了Future接口,所以也可以使用get()取得返回值。
get()和join()有两个主要的区别:
join()方法不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。
如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。
5,ForkJoinTask在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。
使用fork/invoke方法执行时,其实原理也是在ForkJoinPool里执行,只不过使用的是一个“在ForkJoinPool内部生成的静态的”ForkJoinPool。
6,ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。他们之间的区别是,RecursiveAction没有返回值,RecursiveTask有返回值。
7,看看ForkjoinTask的Complete方法的使用场景
这个方法好要是用来使一个任务结束。这个方法被用在结束异步任务上,或者为那些能不正常结束的任务,提供一个选择。
8,Task的completeExceptionally方法是怎么回事。
这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务
这个方法是在Task想要“自己结束自己”时,可以被使用。而cancel方法,被设计成被其它TASK调用。
当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。
9,可以使用ForkJoinPool.execute(异步,不返回结果)、invoke(同步,返回结果)、submit(异步,返回结果)方法,来执行ForkJoinTask。
10,ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。 在jdk1.8里面才有。文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”(Runtime.getRuntime().availableProcessors() - 1)。
ForkJoinTask自己启动时,使用的就是这个静态实例。
ForkJoinPool核心是work-stealing算法,翻译过来叫"工作窃取"算法,有点别扭,还是叫work-stealing吧。
ForkJoinPool里有三个重要的角色:
- ForkJoinWorkerThread(下文简称worker):包装Thread;
- WorkQueue:任务队列,双向;
- ForkJoinTask:worker执行的对象,实现了Future。两种类型,一种叫submission,另一种就叫task。
ForkJoinPool使用数组保存所有WorkQueue(下文经常出现的WorkQueue[]),每个worker有属于自己的WorkQueue,但不是每个WorkQueue都有对应的worker。
- 没有worker的WorkQueue:保存的是submission,来自外部提交,在WorkQueue[]的下标是偶数;
- 属于worker的WorkQueue:保存的是task,在WorkQueue[]的下标是奇数。
WorkQueue是一个双端队列,同时支持LIFO(last-in-first-out)的push和pop操作,和FIFO(first-in-first-out)的poll操作,分别操作top端和base端。worker操作自己的WorkQueue是LIFO操作(可选FIFO),除此之外,worker会尝试steal其他WorkQueue里的任务,这个时候执行的是FIFO操作。
分开两端取任务的好处:
- LIFO操作只有对应的worker才能执行,push和pop不需要考虑并发;
- 拆分时,越大的任务越在WorkQueue的base端,尽早分解,能够尽快进入计算。
光看概念一知半解,我们进入ForkJoinPool的代码。本文首先从构造函数和类开始了解ForkJoinPool的基本参数,下篇再详细过一遍流程。
任务ForkJoinTask
ForkJoinPool执行任务的对象是ForkJoinTask,它是一个抽象类,有两个具体实现类RecursiveAction和RecursiveTask。
ForkJoinTask的抽象方法exec由RecursiveAction和RecursiveTask实现,它被定义为final,具体的执行步骤compute延迟到子类实现。很容易看出RecursiveAction和RecursiveTask的区别,前者没有result,getRawResult返回空,它们对应不需要返回结果和需要返回结果两种场景。
ForkJoinTask里很重要的字段是它的状态status,默认是0,当得出结果时变更为负数,有三种结果:
- NORMAL
- CANCELLED
- EXCEPTIONAL
除此之外,在得出结果之前,任务状态能够被设置为SIGNAL,表示有线程等待这个任务的结果,执行完成后需要notify通知,具体看后文的join。
ForkJoinTask在触发执行后,并不支持其他什么特别操作,只能等待任务执行完成。CountedCompleter是ForkJoinTask的子类,它在子任务协作方面扩展了更多操作。我们聚焦ForkJoinPool主线流程,CountedCompleter相关内容另文再介绍。
WorkQueue
WorkQueue是一个双端队列,它定义在ForkJoinPool类里。
scanState描述WorkQueue当前状态:
- 偶数表示RUNNING
- 奇数表示SCANNING
- 负数表示inactive
stackPred是WorkQueue组成TreiberStack时,保存前者的字段。
base和top分别指向WorkQueue的两端,小小区别是base带上了volatile,回答了对top端push和pop不需要考虑并发这个优点。
操作WorkQueue前需要锁定,记录在字段qlock:
- 1:锁定;
- 0:未锁定;
- 负数:对应的worker已经撤销注册,WorkQueue也就终止使用。
WorkQueue也有config,不要和ForkJoinPool的config混淆了。WorkQueue的config记录了在WorkQueue[]的下标和当前mode。
ForkJoinPool状态修改
- STARTED
- STOP
- TERMINATED
- SHUTDOWN
- RSLOCK
- RSIGNAL
runState记录了ForkJoinPool的运行状态,除了SHUTDOWN是负数,其他都是正数。前面四种不用说了,线程池标准状态流转。在多线程环境修改runState,不能简单想改就改,需要先获取锁,RSLOCK和RSIGNAL就用在这里。
修改前调用lockRunState锁定,检查当前状态,尝试一次使用CAS修改runState为RSLOCK。需要状态变化的机会很少,大多数时间一次就能成功,但不能排除少几率的竞争,这时候进入awaitRunStateLock。
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Java7 ForkJoinPool使用及源码解析
Java7中引入了一种新的并发框架-Fork/Join,Fork/Join采用分治+work-stealing的思想,Fork/Join相教于其他并发框架有其适合的使用场景:如果一个任务能够被分为多个子任务,通过组合这些子任务的结果就能获得最终结果,那么这项任务就适合用Fork/Join模式解决。
一、ForkJoinPool使用示例
如上说述,递归问题比较适合用ForkJoin框架解决,如求Fibonacci数列的第N项值: F(n) = F(n-1) + F(n-2),实现如下:
二、ForkJoinPool解析
- 2.1 ForkJoinPool构造函数
ForkJoinPool一共有3个共有构造函数,如下
execute方法和submit方法大致逻辑相同,都是先对任务做非空校验再讲任务压入ForkJoinPool中的执行队列,唯一不同的是submit会把任务对象本身返回,返回后可以通过get()方法获取执行结果。和execute方法、submit方法不同,invoke方法将任务压入队列后会执行join()方法,阻塞当前线程。
- 2.3 ForkJoinWorkerThreadFactory与ForkJoinWorkerThread
ForkJoinPool提供了一个线程创建工厂接口并提供了默认实现,通过该工厂可以创建ForkJoinWorkerThread线程
ForkJoinWorkerThread主要有两个成员变量,一个是线程所归属的ForkJoinPool线程池,另一个是存放ForkJoinTask的任务队列,可见每个ForkJoinWorkerThread线程都维护一个任务队列,该队列提供LIFO和FIFO两种出队方式
- 2.4 ForkJoinTask.fork()方法
fork()方法判断当前线程是否是ForkJoinWorkerThread的实例,如果是则将任务压入当前线程的队列中;如果不是则将任务压入ForkJoinPool的公用线程池的执行队列中