在共同点同步任务
在共同点同步任务
CyclicBarrier类,是Java并发API提供的一种允许在确定点的多个线程同步化的同步功能。此类与本章中“等待多并发任务”小节中学习的CountDownLatch类相似,但是表现出一些不同令其功能更强。
初始化CyclicBarrier类,包含一个整型数字,表示在确定点同步的线程数量。当其中一个线程到达确定点时,调用await()方法等待其它线程。当线程调用此方法时,CyclicBarrier类通过休眠阻断此线程直到其它线程到达。当最后一个线程调用CyclicBarrier对象的await()方法时,它唤醒所有等待的线程,继续执行任务。
CyclicBarrier类的一个有意思的优点是可以将其它Runnable对象作为初始化参数传递,当所有线程到达确定点时,CyclicBarrier类会此对象当作线程来执行。这种特性让此类能够充分的使用分治编程技术实现任务并行化。
在本节中,学习使用CyclicBarrier类在确定点同步一组线程,以及一个Runnable对象在所有线程到达确定点之后执行。本范例中,将在数字矩阵中检索一个数字,矩阵将被分成多个子集(使用分治技术),这样每个线程将会在一个子集中检索数字。一旦所有线程完成各自的任务,最终任务将统一它们的结果。
准备工作
本范例通过Eclipse开发工具实现。如果使用诸如NetBeans的开发工具,打开并创建一个新的Java项目。
实现过程
通过如下步骤完成范例:
-
实现本范例中的两个辅助类。首先创建名为MatrixMock的类,将会生成一个1到10的随机数字矩阵,线程将在此矩阵中检索数字:
public class MatrixMock {
-
定义名为data私有整型矩阵:
private final int data[][];
-
实现类构造函数,接收参数分别是矩阵的行数、列数,以及将要检索的数字。三个参数均为整型:
public MatrixMock(int size, int length, int number){
-
初始化构造函数中的变量和对象:
int counter = 0; data = new int[size][length]; Random random = new Random();
-
使用随机数填充矩阵。每次生成一个数字,和将要检索的数字比对,如果相同,增加计数器值:
for(int i = 0 ; i < size ; i ++){ for (int j = 0 ; j < length ; j ++){ data[i][j] = random.nextInt(10); if(data[i][j] == number){ counter ++ ; } } }
-
最后,输出在生成的矩阵中检索数字出现的次数到控制台中,这条信息用来检查线程是否得到正确的结果:
System.out.printf("Mock : There are %d ocurrences of number in generated data.\n", counter, number);
-
实现getRow()方法,此方法接收矩阵的行数为整型参数,如果参数存在则返回行数,否则返回null:
public int[] getRow(int row){ if((row >= 0) && (row < data.length)) { return data[row]; } return null; }
-
现在实现名为Results的类,将在数组中存储矩阵的每一行里,检索数字出现的次数:
public class Results {
-
定义名为data的整型数组:
private final int data[];
-
实现类构造函数,接收数组元素的数量为整型参数:
public Results(int size){ data = new int[size]; }
-
实现setData()方法,接收在数组中的位置和一个值为参数,将值赋给数组中对应的位置元素:
public void setData(int position , int value){ data[position] = value ; }
-
实现getData()方法,返回结果数组:
public int[] getData(){ return data; }
-
辅助类已经完成,开始实现线程。首先,首先Searcher类,用来在指定行数的随机数字矩阵中检索一个数字。实现名为Searcher的类,并实现其Runnable接口:
public class Searcher implements Runnable{
-
定义两个名为firstRow和lastRow的整型属性。这两个属性确定Search对象检索数字的行子集:
private final int firstRow; private final int lastRow;
-
定义名为mock的私有MatrixMock属性:
private final MatrixMock mock;
-
定义名为results的私有Results属性:
private final Results results;
-
定义名为number的私有整型属性,用来存储检索的数字:
private final int number;
-
定义名为barrier的CyclicBarrier对象:
private final CyclicBarrier barrier;
-
实现类构造函数,初始化前面定义的所有属性:
public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) { this.firstRow = firstRow; this.lastRow = lastRow; this.mock = mock; this.results = results; this.number = number; this.barrier = barrier; }
-
实现run()方法用来检索数字,使用一个名为counter的内置计数器记录检索数字在每行中出现的次数:
@Override public void run() { int counter;
-
输出此任务分配的行数到控制台:
System.out.printf("%s : Processing lines from %d to %d.\n", Thread.currentThread().getName(), firstRow, lastRow);
-
处理所有指定的行到线程里,在每行中,记录检索数据出现的次数并存储到Results对象中对应的位置元素中:
for ( int i = firstRow; i < lastRow ; i ++) { int row[] = mock.getRow(i); counter = 0; for( int j = 0 ; j < row.length ; j ++) { if (row[j] == number) { counter ++; } } results.setData(i, counter); }
-
输出表明对象已经完成检索的信息到控制台:
System.out.printf("%s : Lines processed.\n", Thread.currentThread().getName());
-
调用CyclicBarrier对象的await()方法,并且增加必要的代码来处理此方法抛出的InterruptedException和BrokenBarrierException异常:
try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); }
-
现在创建名为Grouper的类并实现Runnable接口,用来计算检索数字在矩阵中出现的总次数,此类使用存储矩阵的每一行检索数字出现的次数的Results对象来进行计算:
public class Grouper implements Runnable{
-
定义名为results的私有Results属性:
private final Results results;
-
实现类构造函数,初始化Results属性:
public Grouper(Results results){ this.results = results; }
-
实现run()方法,计算数组中检索数据出现的总次数:
@Override public void run() {
-
定义整型变量,输出表明运行结果开始的信息到控制台:
int finalResult = 0; System.out.printf("Grouper : Processing results...\n");
-
使用results对象的getData()方法得到每行检索数字的出现次数。然后,处理队列的所有元素,相加赋值到finalResult变量:
int data[] =results.getData(); for(int number:data){ finalResult += number; }
-
输出结果到控制台:
System.out.printf("Grouper : Total result : %d.\n", finalResult);
-
最后,实现范例的主类,创建一个包含main()方法的Main类:
public class Main { public static void main(String[] args) {
-
定义和初始化五个常量用来存储范例的参数:
final int ROWS = 10000; final int NUMBERS = 1000; final int SEARCH = 5; final int PARTICIPANTS = 5; final int LINES_PARTICIPANT = 2000;
-
创建名为mock的MatrixMock对象,10000行、1000列,检索数字是5:
MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
-
创建名为results的Results对象,包含10000个元素:
Results results = new Results(ROWS);
-
创建名为grouper的Grouper对象:
Grouper grouper = new Grouper(results);
-
创建名为barrier的CyclicBarrier对象,将等待五个线程。当这五个线程运行结束时,将运行刚创建的Grouper对象:
CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
-
创建五个Searcher对象和执行对象的线程,然后启动线程:
Searcher searchers[] = new Searcher[PARTICIPANTS]; for ( int i = 0 ; i < PARTICIPANTS ; i ++){ searchers[i] = new Searcher(i * LINES_PARTICIPANT, (i * LINES_PARTICIPANT) + LINES_PARTICIPANT, mock, results, SEARCH, barrier); Thread thread = new Thread(searchers[i]); thread.start(); } System.out.printf("Main : The main thread has finished.\n");
工作原理
下图显示本范例在控制台输出的执行信息:
范例中解决的问题很简单,有一个随机整数的矩阵并且想要直到矩阵中一个数字的出现总次数。通过使用分治技术获得更好的性能,将矩阵分成五个子集,在每个子集中使用一个线程进行检索,这些线程是Searcher类的对象。
使用CyclicBarrier对象同步五个线程完成,执行Grouper任务来处理部分结果并计算最终结果。
如前所述,CyclicBarrier类包含一个内置计数器用来控制需要到达同步点的线程数。每次一个线程到达同步点时,此类调用await()方法通知CyclicBarrier对象一个线程已经到达同步点。CyclicBarrier让此线程休眠,直到所有线程到达同步点。
当所有线程到达时,CyclicBarrier对象唤醒所有一直在await()方法中等待的线程。同时也可以创建新的线程,通过执行在CyclicBarrier的构造函数(本范例中,是Grouper对象)中传参的Runnable对象,来进行额外任务。
扩展学习
CyclicBarrier类中await()方法有另一种形式:
- await(long time, TimeUnit unit):在这个方法中,线程将持续休眠直到它被中断, 也就是说,或者CountDownLatch的内置计数器值变成0,或者已过指定的时间。TimeUnit是一个枚举类型的类,包含如下常量:DAYS、HOURS、MICROSECONDS、MILLISECONDS、MINUTES、NANOSECONDS、和SECONDS。
CyclicBarrier类还提供getNumberWaiting()方法返回在await()方法中被阻塞的线程数。以及getParties()方法返回即将通过CyclicBarrier进行同步的任务数。
重置CyclicBarrier对象
CyclicBarrier类与CountDownLatch类有些共同点,但也有不同指出。其中最重要的一个不同点是CyclicBarrier对象能够被重置到初始状态,内置计数器值重新设定为初始值。
通过使用CyclicBarrier类中的reset()方法实现重置操作。当重置发生时,所有在await()方法中等待的线程接收到BrokenBarrierException异常。在本节的范例中此异常处理成输出堆栈异常,然而在复杂的应用中能够完成其它操作,例如在中断点处重启执行或者恢复操作。
损坏的CyclicBarrier对象
CyclicBarrier对象有一种特殊的状态,称为损坏状态。在await()方法中等待着的各种线程,当其中一个被中断时,这个被中断的线程接收到InterruptedException异常,而其它线程会接收到BrokenBarrierException异常,此时CyclicBarrier对象处于损坏状态。
CyclicBarrier类提供isBroken()方法,如果对象处于损坏状态返回true,否则返回false。
更多关注
- 本章“等待多并发事件”小节。