Curator 框架实现在多台机器中选取leader

Curator对ZK的一些应用场景提供了非常好的实现,而且有很多扩充,这些都符合ZK使用规范

 它的主要组件为:

  • Recipes, ZooKeeper的系列recipe实现, 基于 Curator Framework.
  • Framework, 封装了大量ZooKeeper常用API操作,降低了使用难度, 基于Zookeeper增加了一些新特性,对ZooKeeper链接的管理,对链接丢失自动重新链接。
  • Utilities,一些ZooKeeper操作的工具类包括ZK的集群测试工具路径生成等非常有用,在Curator-Client包下org.apache.curator.utils。
  • Client,ZooKeeper的客户端API封装,替代官方 ZooKeeper class,解决了一些繁琐低级的处理,提供一些工具类。
  • Errors,异常处理, 连接异常等
  • Extensions,对curator-recipes的扩展实现,拆分为 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基于RESTful的Recipes WEB服务.

    下载jar包

// https://mvnrepository.com/artifact/org.apache.curator/curator-framework

compile group: 'org.apache.curator', name: 'curator-framework', version: '2.11.1'

compile group: 'org.apache.curator', name: 'curator-client', version: '2.11.1'

compile group: 'org.apache.curator', name: 'curator-recipes', version: '2.11.1'

compile group: 'org.apache.curator', name: 'curator-test', version: '2.11.1'

 

代码:

 

package org.gjp;

 

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.TimeUnit;

 

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.leader.LeaderLatch;

import org.apache.curator.framework.recipes.leader.LeaderLatch.CloseMode;

import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.curator.test.TestingServer;

import org.apache.curator.utils.CloseableUtils;

 

public class LeaderDemo3 {

    public static void main(String[]args) throws Exception{

       final List<LeaderLatch> leaders=new ArrayList<LeaderLatch>();

       final List<CuratorFramework> clients=new ArrayList<CuratorFramework>();

        

       final TestingServer server=new TestingServer();

               

        try{

            for(int i=0;i<10;i++){

            //建立模拟客户端

CuratorFramework client=CuratorFrameworkFactory.newClient("127.0.0.1:2181",

             new ExponentialBackoffRetry(20000,3));

             clients.add(client);

           //Leader选举   注意每个客户端的路径都要相同才会在同一个组中  

           final  LeaderLatch leader=new LeaderLatch(client,"/zkpath2","clent"+i);

             leader.addListener(new LeaderLatchListener(){

           //获取leader权限时执行  

               public void isLeader() {

                   System.out.println(leader.getId()+" ;I am Leader");

                   

                   //拥有leader权限的长度

                   try {

                   final int  waitSeconds = (int)(5 * Math.random()) + 1;

Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));

} catch (InterruptedException e1) {

e1.printStackTrace();

}

                   

                   

                   //模拟让出leader 权限

                   if(leader !=null){

                   try {

                   //关闭是通知所有客户端

leader.close(CloseMode.NOTIFY_LEADER);

} catch (IOException e) {

e.printStackTrace();

}

                   }

               }

 

             //是否leader权限时执行  

               public void notLeader() {

                   System.out.println(leader.getId()+"   I am not Leader");

               }});

             

             

             

             

             

             leaders.add(leader);

             //客户端启动

             client.start();

             try {

           //必须启动LeaderLatch: leaderLatch.start(); 一旦启动, LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader

leader.start();

} catch (Exception e) {

e.printStackTrace();

}

 

            }

            

            Thread.sleep(Integer.MAX_VALUE);

        }finally{

            for(CuratorFramework client:clients){

              CloseableUtils.closeQuietly(client);    

            }

            

            for(LeaderLatch leader:leaders){

                CloseableUtils.closeQuietly(leader);

            }

            

            CloseableUtils.closeQuietly(server);

        }

        

        

        Thread.sleep(Integer.MAX_VALUE);

    }

 

}

 

运行结果:


Curator 框架实现在多台机器中选取leader