Concurrent - CountDownLatch
原创转载请注明出处:http://agilestyle.iteye.com/blog/2343903
CountDownLatch
CountDownLatch所提供的功能是判断count计数不为0时,则当前线程处于wait状态。
await()的作用是实现等待,判断计数是否为0,如果不为0则呈等待状态。
countDown()的作用是继续运行,其他线程可以调用此方法将计数减1,当计数减到为0时,呈等待的线程继续运行。
getCount()的作用是获得当前的计数个数。
用CountDownLatch模拟一个田径短跑的例子,10个线程代表10名选手
MyThread.java
package org.fool.java.concurrent.countdownlatch;
import java.util.concurrent.CountDownLatch;
public class MyThread implements Runnable {
private CountDownLatch comingTag;
private CountDownLatch waitStartTag;
private CountDownLatch waitRunTag;
private CountDownLatch beginTag;
private CountDownLatch endTag;
public MyThread(CountDownLatch comingTag,
CountDownLatch waitStartTag,
CountDownLatch waitRunTag,
CountDownLatch beginTag,
CountDownLatch endTag) {
this.comingTag = comingTag;
this.waitStartTag = waitStartTag;
this.waitRunTag = waitRunTag;
this.beginTag = beginTag;
this.endTag = endTag;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 走向起跑点...");
Thread.sleep((int) (Math.random() * 10000));
System.out.println(Thread.currentThread().getName() + " 到达起跑点...");
comingTag.countDown();
waitStartTag.await();
Thread.sleep((int) (Math.random() * 10000));
waitRunTag.countDown();
beginTag.await();
System.out.println(Thread.currentThread().getName() + " 加速起跑...");
Thread.sleep((int) (Math.random() * 10000));
endTag.countDown();
System.out.println(Thread.currentThread().getName() + " 到达终点...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CountDownLatchTest4.java
package org.fool.java.concurrent.countdownlatch;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest4 {
public static void main(String[] args) {
try {
CountDownLatch comingTag = new CountDownLatch(10);
CountDownLatch waitStartTag = new CountDownLatch(1);
CountDownLatch waitRunTag = new CountDownLatch(10);
CountDownLatch beginTag = new CountDownLatch(1);
CountDownLatch endTag = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new MyThread(comingTag, waitStartTag, waitRunTag, beginTag, endTag));
thread.setName("Thread " + (i + 1));
thread.start();
}
System.out.println("裁判在等待选手的到来...");
comingTag.await();
System.out.println("裁判确认所有选手就位....");
Thread.sleep(5000);
waitStartTag.countDown();
System.out.println("各就各位!预备!");
waitRunTag.await();
Thread.sleep(2000);
System.out.println("发令枪响起!");
beginTag.countDown();
endTag.await();
System.out.println("所有选手到达终点,统计名次!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Run
await(long timeout, TimeUnit unit)
await(long timeout, TimeUnit unit)的作用是使线程在指定的最大时间单位内进入WAITING状态,如果超过这个时间则自动唤醒,程序继续向下运行。
CountDownLatchTest5.java
package org.fool.java.concurrent.countdownlatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchTest5 {
public static void main(String[] args) {
Service service = new Service();
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(new MyThread(service));
thread.start();
}
}
public static class Service {
private CountDownLatch latch = new CountDownLatch(1);
public void testMethod() {
try {
System.out.println(Thread.currentThread().getName() + " start...");
latch.await(3, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + " end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static class MyThread implements Runnable {
private Service service;
public MyThread(Service service) {
this.service = service;
}
@Override
public void run() {
service.testMethod();
}
}
}
Run
getCount()
getCount()作用是获取当前计数的值
CountDownLatchTest6.java
package org.fool.java.concurrent.countdownlatch;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest6 {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(3);
System.out.println(latch.getCount());
latch.countDown();
System.out.println(latch.getCount());
latch.countDown();
System.out.println(latch.getCount());
latch.countDown();
System.out.println(latch.getCount());
latch.countDown();
System.out.println(latch.getCount());
latch.countDown();
System.out.println(latch.getCount());
}
}
Run
CountDownLatch使用例子
在这个例子中,模拟一个应用程序启动类,它开始时启动了n个线程类,这些线程将检查外部系统并通知闭锁,并且启动类一直在闭锁上等待着。一旦验证和检查了所有外部服务,那么启动类恢复执行。
BaseHealthChecker.java
package org.fool.test.countdownlatch;
import java.util.concurrent.CountDownLatch;
public abstract class BaseHealthChecker implements Runnable {
private CountDownLatch latch;
private String serviceName;
private boolean serviceUp;
public BaseHealthChecker(String serviceName, CountDownLatch latch) {
this.serviceName = serviceName;
this.latch = latch;
this.serviceUp = false;
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
System.out.println(getServiceName() + " down...");
}
}));
}
@Override
public void run() {
try {
verifyService();
serviceUp = true;
} catch (Exception e) {
e.printStackTrace();
serviceUp = false;
} finally {
if (latch != null) {
latch.countDown();
}
}
}
public String getServiceName() {
return serviceName;
}
public boolean isServiceUp() {
return serviceUp;
}
public abstract void verifyService();
}
NetworkHealthChecker.java
package org.fool.test.countdownlatch;
import java.util.concurrent.CountDownLatch;
public class NetworkHealthChecker extends BaseHealthChecker {
public NetworkHealthChecker(CountDownLatch latch) {
super("Network Service", latch);
}
@Override
public void verifyService() {
System.out.println("Checking " + this.getServiceName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
}
DatabaseHealthChecker.java
package org.fool.test.countdownlatch;
import java.util.concurrent.CountDownLatch;
public class DatabaseHealthChecker extends BaseHealthChecker {
public DatabaseHealthChecker(CountDownLatch latch) {
super("Database Service", latch);
}
@Override
public void verifyService() {
System.out.println("Checking " + this.getServiceName());
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
}
CacheHealthChecker.java
package org.fool.test.countdownlatch;
import java.util.concurrent.CountDownLatch;
public class CacheHealthChecker extends BaseHealthChecker {
public CacheHealthChecker(CountDownLatch latch) {
super("Cache Service", latch);
}
@Override
public void verifyService() {
System.out.println("Checking " + this.getServiceName());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
}
ApplicationStartupUtil.java
package org.fool.test.countdownlatch;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ApplicationStartupUtil {
private static List<BaseHealthChecker> services;
private static CountDownLatch latch;
private static ApplicationStartupUtil instance = new ApplicationStartupUtil();
public static ApplicationStartupUtil getInstance() {
return instance;
}
private ApplicationStartupUtil() {
}
public boolean checkExternalServices() throws Exception {
latch = new CountDownLatch(3);
services = new ArrayList<>();
services.add(new NetworkHealthChecker(latch));
services.add(new CacheHealthChecker(latch));
services.add(new DatabaseHealthChecker(latch));
ExecutorService executor = Executors.newFixedThreadPool(services.size());
for (BaseHealthChecker service : services) {
executor.execute(service);
}
latch.await();
for (BaseHealthChecker service : services) {
if(!service.isServiceUp()) {
return false;
}
}
return true;
}
}
MainTest.java
package org.fool.test.countdownlatch;
public class MainTest {
public static void main(String[] args) throws Exception {
boolean result = ApplicationStartupUtil.getInstance().checkExternalServices();
System.out.println("External services validation completed !! Result was :: "+ result);
}
}
Console Output
Reference
Java并发编程核心方法与框架
http://www.importnew.com/15731.html