Curator 框架实现在多台机器中选取leader
Curator对ZK的一些应用场景提供了非常好的实现,而且有很多扩充,这些都符合ZK使用规范
它的主要组件为:
- Recipes, ZooKeeper的系列recipe实现, 基于 Curator Framework.
- Framework, 封装了大量ZooKeeper常用API操作,降低了使用难度, 基于Zookeeper增加了一些新特性,对ZooKeeper链接的管理,对链接丢失自动重新链接。
- Utilities,一些ZooKeeper操作的工具类包括ZK的集群测试工具路径生成等非常有用,在Curator-Client包下org.apache.curator.utils。
- Client,ZooKeeper的客户端API封装,替代官方 ZooKeeper class,解决了一些繁琐低级的处理,提供一些工具类。
- Errors,异常处理, 连接异常等
- Extensions,对curator-recipes的扩展实现,拆分为 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基于RESTful的Recipes WEB服务.
下载jar包
// https://mvnrepository.com/artifact/org.apache.curator/curator-framework
compile group: 'org.apache.curator', name: 'curator-framework', version: '2.11.1'
compile group: 'org.apache.curator', name: 'curator-client', version: '2.11.1'
compile group: 'org.apache.curator', name: 'curator-recipes', version: '2.11.1'
compile group: 'org.apache.curator', name: 'curator-test', version: '2.11.1'
代码:
package org.gjp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatch.CloseMode;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class LeaderDemo3 {
public static void main(String[]args) throws Exception{
final List<LeaderLatch> leaders=new ArrayList<LeaderLatch>();
final List<CuratorFramework> clients=new ArrayList<CuratorFramework>();
final TestingServer server=new TestingServer();
try{
for(int i=0;i<10;i++){
//建立模拟客户端
CuratorFramework client=CuratorFrameworkFactory.newClient("127.0.0.1:2181",
new ExponentialBackoffRetry(20000,3));
clients.add(client);
//Leader选举 注意每个客户端的路径都要相同才会在同一个组中
final LeaderLatch leader=new LeaderLatch(client,"/zkpath2","clent"+i);
leader.addListener(new LeaderLatchListener(){
//获取leader权限时执行
public void isLeader() {
System.out.println(leader.getId()+" ;I am Leader");
//拥有leader权限的长度
try {
final int waitSeconds = (int)(5 * Math.random()) + 1;
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e1) {
e1.printStackTrace();
}
//模拟让出leader 权限
if(leader !=null){
try {
//关闭是通知所有客户端
leader.close(CloseMode.NOTIFY_LEADER);
} catch (IOException e) {
e.printStackTrace();
}
}
}
//是否leader权限时执行
public void notLeader() {
System.out.println(leader.getId()+" I am not Leader");
}});
leaders.add(leader);
//客户端启动
client.start();
try {
//必须启动LeaderLatch: leaderLatch.start(); 一旦启动, LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader
leader.start();
} catch (Exception e) {
e.printStackTrace();
}
}
Thread.sleep(Integer.MAX_VALUE);
}finally{
for(CuratorFramework client:clients){
CloseableUtils.closeQuietly(client);
}
for(LeaderLatch leader:leaders){
CloseableUtils.closeQuietly(leader);
}
CloseableUtils.closeQuietly(server);
}
Thread.sleep(Integer.MAX_VALUE);
}
}
运行结果: