ZooKeeper构建配置服务

ZooKeeper构建配置服务

 * 配置服务是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信息中那些公共的部分。

 * 简单的说,ZooKeeper可以作为一个具有高可用性的配置存储器,允许分布式应用的参与者检索和更新配置文件。

 * 使用ZooKeeper中的观察机制,可以建立一个活跃的配置服务,使那些感兴趣的客户端能够获得配置信息修改的通知。

在每个znode上存储一个键值对,ActiveKeyValueStore 提供了从zookeeper服务上写和读取键值方法。

public class ActiveKeyValueStore extends ConnectionWatcher{

	private static final Charset CHARSET =Charset.forName("GBk");
	private static final int MAX_RETRIES = 5;
	private static final long RETRY_PERIOD_SECONDS = 60;
	
	public void write(String path, String value) throws Exception{
		int retries = 0;
		while(true){
			try {
				Stat stat = zk.exists(path, false);
				if(stat == null){
					zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}else{
					zk.setData(path, value.getBytes(CHARSET), -1);
				}
			} catch (KeeperException.SessionExpiredException e) {
				throw e;
			}catch(KeeperException e){
				if(retries++ == MAX_RETRIES){
					throw e;
				}
				TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS);
			}
		}
	}
	
	public String read(String path, Watcher watcher) throws Exception{
		byte[] data  = zk.getData(path, watcher, null);
		return new String(data, CHARSET);
	}
}

与zookeeper服务创建连接

public class ConnectionWatcher implements Watcher{

	private static final int SESSION_TIMEOUT = 5000;
	protected ZooKeeper zk;
	private CountDownLatch connectedSignal = new CountDownLatch(1);
	
	public void connect(String hosts) throws Exception{
		//创建zookeeper实例的时候会启动一个线程连接到zookeeper服务。
		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
		connectedSignal.await();
	}

	//当客户端已经与zookeeper建立连接后,Watcher的process方法会被调用。
	public void process(WatchedEvent event) {
		if(event.getState() == KeeperState.SyncConnected){
			connectedSignal.countDown();
		}
	}

	public void close() throws Exception{
		zk.close();
	}
}

ResilientConfigUpdater类提供了管理更新配置信息方法。

public class ResilientConfigUpdater {
	
	public static final String PATH = "/config";
	
	private ActiveKeyValueStore store;
	private Random random = new Random();
	
	public ResilientConfigUpdater(String hosts) throws Exception{
		store = new ActiveKeyValueStore();
		store.connect(hosts);
	}
	
	public void run() throws Exception{
		while(true){
			String value = random.nextInt(100)+"";
			store.write(PATH, value);
			
			System.out.printf("Set %s to %s\n", PATH, value);
			TimeUnit.SECONDS.sleep(random.nextInt(10));
		}
	}
	public static void main(String[] args) throws Exception {
		while(true){
			try{
				ResilientConfigUpdater updater = new ResilientConfigUpdater("192.168.44.231");
				updater.run();
			}catch(KeeperException.SessionExpiredException e){
				//start a new session
			}catch(KeeperException e){
				e.printStackTrace();
				break;
			}
		}
	}
}

ConfigWatcher类提供了配置信息变更观察器,在信息修改后会触发显示方法被调用。

public class ConfigWatcher implements Watcher{

	private ActiveKeyValueStore store;
	
	public ConfigWatcher(String hosts) throws Exception{
		store = new ActiveKeyValueStore();
		store.connect(hosts);
	}
	
	public void displayConfig() throws Exception{
		String value = store.read(ConfigUpdater.PATH, this);
		System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
	}
	
	public void process(WatchedEvent event) {
		// TODO Auto-generated method stub
		if(event.getType() == EventType.NodeDataChanged){
			try {
				displayConfig();
			} catch (Exception e) {
				System.out.println("Interrupted. Exiting.");
				Thread.currentThread().interrupt();
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		ConfigWatcher watcher = new ConfigWatcher("192.168.44.231");
		watcher.displayConfig();
		
		Thread.sleep(Long.MAX_VALUE);
	}
}