Concurrent - Phaser - arrive()
原创转载请注明出处:http://agilestyle.iteye.com/blog/2344634
arrive()
arrive()的作用是使已到达的parties计数(即getArrivedParties()的返回值)加1,并且不在屏障处等待,直接继续运行后面的代码;当已到达的数量等于注册的parties数量时,Phaser进入下一阶段(phase),并把已到达计数重置为0。
PhaserTest8.java
package org.fool.java.concurrent.phaser;
import java.util.concurrent.Phaser;
/**
 * Single-threaded demonstration of {@link Phaser#arrive()}.
 *
 * <p>A phaser with two registered parties is driven through three phases by
 * calling {@code arrive()} six times from the main thread. The phaser state is
 * printed immediately before and after every arrival so that the output shows
 * the arrived-party count going 0 -> 1 -> 0 while the phase number increases
 * after every second arrival.
 */
public class PhaserTest8 {

    /** One label per arrival; two arrivals (e.g. A1, A2) complete one phase. */
    private static final String[] STEP_LABELS = {"A1", "A2", "B1", "B2", "C1", "C2"};

    /**
     * Runs the demonstration and writes the phaser state to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Phaser phaser = new Phaser(2) {
            /**
             * Logs the phase that has just completed, then defers to the
             * default policy for deciding whether the phaser terminates.
             */
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("phase=" + phase + " registeredParties=" + registeredParties);
                return super.onAdvance(phase, registeredParties);
            }
        };

        for (String label : STEP_LABELS) {
            printState(label, phaser);
            // arrive() records an arrival and returns at once; it never blocks,
            // which is why a single thread can supply both parties' arrivals.
            phaser.arrive();
            printState(label, phaser);
        }
    }

    /**
     * Prints the current phase, registered-party count and arrived-party count
     * of {@code phaser} on one line, prefixed with {@code label}.
     *
     * @param label  tag identifying the arrival this snapshot belongs to
     * @param phaser the phaser whose state is reported
     */
    private static void printState(String label, Phaser phaser) {
        System.out.println(label + " phaser.getPhase()=" + phaser.getPhase()
                + "\tphaser.getRegisteredParties()=" + phaser.getRegisteredParties()
                + "\tphaser.getArrivedParties()=" + phaser.getArrivedParties());
    }
}
Run
Note:
arrive()功能是使getArrivedParties()计数加1,不等待其他线程到达屏障。
控制台中多次出现phaser.getArrivedParties()=0的运行结果,可以分析出Phaser在经过屏障点后计数被重置。
PhaserTest9.java
package org.fool.java.concurrent.phaser;
import java.util.concurrent.Phaser;
/**
 * Multi-threaded demonstration contrasting {@link Phaser#arrive()} with
 * {@link Phaser#arriveAndAwaitAdvance()}.
 *
 * <p>Two threads run {@link Service#testMethodA()}, which waits at the barrier
 * after each stage, and one thread runs {@link Service#testMethodB()}, which
 * only records its arrivals and never waits.
 */
public class PhaserTest9 {

    /** Work shared by the demo threads; all of them use the same phaser. */
    public static class Service {

        /** Labels printed for the three stages of each test method. */
        private static final String[] STAGES = {"A1", "A2", "A3"};

        /** Simulated work time of one stage in {@link #testMethodA()}. */
        private static final long STAGE_WORK_MILLIS = 3000;

        private final Phaser phaser;

        /**
         * @param phaser the phaser that both test methods arrive at
         */
        public Service(Phaser phaser) {
            this.phaser = phaser;
        }

        /**
         * Runs three stages; each one sleeps, prints the current arrived-party
         * count, and then blocks in {@code arriveAndAwaitAdvance()} until the
         * phase advances.
         *
         * <p>If the calling thread is interrupted while sleeping, the remaining
         * stages are skipped, the stack trace is printed, and the thread's
         * interrupt status is restored so callers can still observe it.
         */
        public void testMethodA() {
            try {
                for (String stage : STAGES) {
                    System.out.println(Thread.currentThread().getName() + " begin " + stage + " " + System.currentTimeMillis());
                    Thread.sleep(STAGE_WORK_MILLIS);
                    System.out.println(Thread.currentThread().getName() + "\tphaser.getArrivedParties()=" + phaser.getArrivedParties());
                    phaser.arriveAndAwaitAdvance();
                    System.out.println(Thread.currentThread().getName() + " end " + stage + " " + System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Thread.sleep cleared the flag when it threw; set it again so
                // the interruption is not silently lost.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Runs three stages that each call {@code arrive()} and continue
         * immediately, without waiting for any other party.
         */
        public void testMethodB() {
            for (String stage : STAGES) {
                System.out.println(Thread.currentThread().getName() + " begin " + stage + " " + System.currentTimeMillis());
                phaser.arrive();
                System.out.println(Thread.currentThread().getName() + " end " + stage + " " + System.currentTimeMillis());
            }
        }
    }

    /** Task that runs {@link Service#testMethodA()}. */
    public static class ThreadA implements Runnable {
        private final Service service;

        public ThreadA(Service service) {
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethodA();
        }
    }

    /** Second task that runs {@link Service#testMethodA()}. */
    public static class ThreadB implements Runnable {
        private final Service service;

        public ThreadB(Service service) {
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethodA();
        }
    }

    /** Task that runs {@link Service#testMethodB()}. */
    public static class ThreadC implements Runnable {
        private final Service service;

        public ThreadC(Service service) {
            this.service = service;
        }

        @Override
        public void run() {
            service.testMethodB();
        }
    }

    /**
     * Starts two waiting threads and one non-waiting thread on a three-party
     * phaser.
     *
     * <p>NOTE: this demo is expected not to terminate. The non-waiting thread
     * normally performs all three of its arrivals before the other two finish
     * their first sleep, which completes the first phase on its own; the two
     * waiting threads then supply only two of the three arrivals required for
     * the next phase and stay blocked.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3);
        Service service = new Service(phaser);

        Thread t1 = new Thread(new ThreadA(service));
        Thread t2 = new Thread(new ThreadB(service));
        Thread t3 = new Thread(new ThreadC(service));

        t1.start();
        t2.start();
        t3.start();
    }
}
Run
Note:
线程2连续调用3次arrive()且不等待,使已到达的parties计数达到3,Phaser随即进入下一阶段并把该计数重置为0;之后线程0和线程1调用arriveAndAwaitAdvance()时只有2个parties到达,达不到3个,所以一直处于等待状态,程序不会结束。
Reference
Java并发编程核心方法与框架