知识点复习14.1 ForkJoinPool

 

1. 什么是ForkJoinPool
  ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。

可以设置最大并发线程数,大的任务拆分成多的小任务

知识点复习14.1 ForkJoinPool

1.1 work-stealing(工作窃取算法)
  work-stealing(工作窃取),ForkJoinPool提供了一个更有效的利用线程的机制,当ThreadPoolExecutor还在用单个队列存放任务时,ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工作,当有线程提前完成时,会从队列的末端“窃取”其他线程未执行完的任务,当任务量特别大时,CPU多的计算机会表现出更好的性能。

 ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
  线程池监控
  在线程池使用监控方面,主要通过如下方法:
  isTerminated—判断线程池对应的workQueue中是否有待执行任务未执行完;
  awaitTermination—判断线程池是否在约定时间内完成,并返回完成状态;
  getQueuedSubmissionCount—获取所有待执行的任务数;
  getRunningThreadCount—获取正在运行的任务数。
4. java8 ParallelStreams
  Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。

使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法来执行指定任务了。

其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。


案列一:通过多线程分多个小任务进行打印数据  无返回值的

案列二:通过多线程分多个小任务进行数据累加  返回结果集

 

总结
1,invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(同步的)

2,fork方法,让一个task执行(异步的)

3,join方法,让一个task执行(同步的,它和fork不同点是同步或者异步的区别)

4,可以使用join来取得ForkJoinTask的返回值。由于RecursiveTask类实现了Future接口,所以也可以使用get()取得返回值。 
get()和join()有两个主要的区别: 
join()方法不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。 
如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。

5,ForkJoinTask在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。 
使用fork/invoke方法执行时,其实原理也是在ForkJoinPool里执行,只不过使用的是一个“在ForkJoinPool内部生成的静态的”ForkJoinPool。

6,ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。他们之间的区别是,RecursiveAction没有返回值,RecursiveTask有返回值。

7,看看ForkjoinTask的Complete方法的使用场景 
这个方法好要是用来使一个任务结束。这个方法被用在结束异步任务上,或者为那些能不正常结束的任务,提供一个选择。

8,Task的completeExceptionally方法是怎么回事。 
这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务 
这个方法是在Task想要“自己结束自己”时,可以被使用。而cancel方法,被设计成被其它TASK调用。 
当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。

9,可以使用ForkJoinPool.execute(异步,不返回结果)、invoke(同步,返回结果)、submit(异步,返回结果)方法,来执行ForkJoinTask。

10,ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。 在jdk1.8里面才有。文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”(Runtime.getRuntime().availableProcessors() - 1)。 
ForkJoinTask自己启动时,使用的就是这个静态实例。

知识点复习14.1 ForkJoinPool

知识点复习14.1 ForkJoinPool

知识点复习14.1 ForkJoinPool

ForkJoinPool核心是work-stealing算法,翻译过来叫"工作窃取"算法,有点别扭,还是叫work-stealing吧。

ForkJoinPool里有三个重要的角色:

  • ForkJoinWorkerThread(下文简称worker):包装Thread;
  • WorkQueue:任务队列,双向;
  • ForkJoinTask:worker执行的对象,实现了Future。两种类型,一种叫submission,另一种就叫task。

ForkJoinPool使用数组保存所有WorkQueue(下文经常出现的WorkQueue[]),每个worker有属于自己的WorkQueue,但不是每个WorkQueue都有对应的worker。

  • 没有worker的WorkQueue:保存的是submission,来自外部提交,在WorkQueue[]的下标是偶数;
  • 属于worker的WorkQueue:保存的是task,在WorkQueue[]的下标是奇数。

WorkQueue是一个双端队列,同时支持LIFO(last-in-first-out)的push和pop操作,和FIFO(first-in-first-out)的poll操作,分别操作top端和base端。worker操作自己的WorkQueue是LIFO操作(可选FIFO),除此之外,worker会尝试steal其他WorkQueue里的任务,这个时候执行的是FIFO操作。

分开两端取任务的好处:

  • LIFO操作只有对应的worker才能执行,push和pop不需要考虑并发;
  • 拆分时,越大的任务越在WorkQueue的base端,尽早分解,能够尽快进入计算。

光看概念一知半解,我们进入ForkJoinPool的代码。本文首先从构造函数和类开始了解ForkJoinPool的基本参数,下篇再详细过一遍流程。

任务ForkJoinTask

ForkJoinPool执行任务的对象是ForkJoinTask,它是一个抽象类,有两个具体实现类RecursiveAction和RecursiveTask。

ForkJoinTask的抽象方法exec由RecursiveAction和RecursiveTask实现,它被定义为final,具体的执行步骤compute延迟到子类实现。很容易看出RecursiveAction和RecursiveTask的区别,前者没有result,getRawResult返回空,它们对应不需要返回结果和需要返回结果两种场景。

ForkJoinTask里很重要的字段是它的状态status,默认是0,当得出结果时变更为负数,有三种结果:

  • NORMAL
  • CANCELLED
  • EXCEPTIONAL

除此之外,在得出结果之前,任务状态能够被设置为SIGNAL,表示有线程等待这个任务的结果,执行完成后需要notify通知,具体看后文的join。

ForkJoinTask在触发执行后,并不支持其他什么特别操作,只能等待任务执行完成。CountedCompleter是ForkJoinTask的子类,它在子任务协作方面扩展了更多操作。我们聚焦ForkJoinPool主线流程,CountedCompleter相关内容另文再介绍。

WorkQueue

WorkQueue是一个双端队列,它定义在ForkJoinPool类里。

scanState描述WorkQueue当前状态:

  • 偶数表示RUNNING
  • 奇数表示SCANNING
  • 负数表示inactive

stackPred是WorkQueue组成TreiberStack时,保存前者的字段。

base和top分别指向WorkQueue的两端,小小区别是base带上了volatile,回答了对top端push和pop不需要考虑并发这个优点。

操作WorkQueue前需要锁定,记录在字段qlock:

  • 1:锁定;
  • 0:未锁定;
  • 负数:对应的worker已经撤销注册,WorkQueue也就终止使用。

WorkQueue也有config,不要和ForkJoinPool的config混淆了。WorkQueue的config记录了在WorkQueue[]的下标和当前mode。

ForkJoinPool状态修改

  • STARTED
  • STOP
  • TERMINATED
  • SHUTDOWN
  • RSLOCK
  • RSIGNAL

runState记录了ForkJoinPool的运行状态,除了SHUTDOWN是负数,其他都是正数。前面四种不用说了,线程池标准状态流转。在多线程环境修改runState,不能简单想改就改,需要先获取锁,RSLOCK和RSIGNAL就用在这里。

 

修改前调用lockRunState锁定,检查当前状态,尝试一次使用CAS修改runState为RSLOCK。需要状态变化的机会很少,大多数时间一次就能成功,但不能排除少几率的竞争,这时候进入awaitRunStateLock。

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Java7 ForkJoinPool使用及源码解析

Java7中引入了一种新的并发框架-Fork/Join,Fork/Join采用分治+work-stealing的思想,Fork/Join相教于其他并发框架有其适合的使用场景:如果一个任务能够被分为多个子任务,通过组合这些子任务的结果就能获得最终结果,那么这项任务就适合用Fork/Join模式解决。

一、ForkJoinPool使用示例

如上说述,递归问题比较适合用ForkJoin框架解决,如求Fibonacci数列的第N项值: F(n) = F(n-1) + F(n-2),实现如下:

二、ForkJoinPool解析

  • 2.1 ForkJoinPool构造函数

ForkJoinPool一共有3个共有构造函数,如下

知识点复习14.1 ForkJoinPool

execute方法和submit方法大致逻辑相同,都是先对任务做非空校验再讲任务压入ForkJoinPool中的执行队列,唯一不同的是submit会把任务对象本身返回,返回后可以通过get()方法获取执行结果。和execute方法、submit方法不同,invoke方法将任务压入队列后会执行join()方法,阻塞当前线程。

  • 2.3 ForkJoinWorkerThreadFactory与ForkJoinWorkerThread

ForkJoinPool提供了一个线程创建工厂接口并提供了默认实现,通过该工厂可以创建ForkJoinWorkerThread线程

ForkJoinWorkerThread主要有两个成员变量,一个是线程所归属的ForkJoinPool线程池,另一个是存放ForkJoinTask的任务队列,可见每个ForkJoinWorkerThread线程都维护一个任务队列,该队列提供LIFO和FIFO两种出队方式

  • 2.4 ForkJoinTask.fork()方法

fork()方法判断当前线程是否是ForkJoinWorkerThread的实例,如果是则将任务压入当前线程的队列中;如果不是则将任务压入ForkJoinPool的公用线程池的执行队列中

知识点复习14.1 ForkJoinPool