ZooKeeper实战之Curator客户端实现master群首选举
声明一:此博客不再细说ZooKeeper相关基础知识,直奔主题。
声明二:此博客为学习笔记,学习自极客学院ZooKeeper相关视频;本文内容是本人照着视频里的前辈所讲知识敲了
一遍的记录,个别地方按照本人理解稍作修改。非常感谢众多大牛们的知识分享。
软硬件环境:Windows10、IntelliJ IDEA、SpringBoot、Curator客户端
准备工作:引入curator依赖
<!--
zookeeper相关依赖(注:该依赖本身又依赖有zookeeper包,所以这里引入这个依赖就够了)
具体信息为:
curator-recipes 依赖于 curator-framework
curator-framework 依赖于 curator-client
curator-client 依赖于 zookeeper
-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
群首选举的实现示例
相关类说明:
LeaderSelectorTest:群首选举测试类。
WorkServer:集群中的每个服务器(含相关抢占群首的逻辑)。
给出个各类的细节:
WorkServer:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import java.io.Closeable;
/**
* 集群中的服务器
*
* 注:继承LeaderSelectorListenerAdapter类的目的是:
* 重写takeLeadership方法,需要时还可重写stateChanged方法
* 注:实现Closeable的目的是:
* 更优雅地关闭释放资源
* @author JustryDeng
* @date 2018/11/28 9:33
*/
public class WorkServer extends LeaderSelectorListenerAdapter implements Closeable {
/** 服务器的基本属性,主要用来区分是不同的服务器 */
private final String serverName;
/** curator提供的监听器 */
private final LeaderSelector leaderSelector;
/** takeLeadership方法中设置线程阻塞多长时间,单位ms */
private final int SLEEP_MILLISECOND = 100000;
public WorkServer(CuratorFramework client, String path, String serverName) {
this.serverName = serverName;
// 传入客户端、监听路径、监听器(这里采用本实例this作为监听器)
leaderSelector = new LeaderSelector(client, path, this);
/*
* By default, when {@link LeaderSelectorListener#takeLeadership(CuratorFramework)} returns,
* this instance is not requeued. Calling this method puts the leader selector into a mode
* where it will always requeue itself.
* 即:当WorkServer从takeLeadership()退出时它就会放弃了Leader身份,
* 这时Curator会利用Zookeeper再从剩余的WorkServer中选出一个新的Leader。
* autoRequeue()方法使放弃Leader身份的WorkServer有机会重新获得Leader身份,
* 如果不设置的话放弃了的WorkServer是不会再变成Leader的。
*/
leaderSelector.autoRequeue();
}
/**
* LeaderSelector提供好了的 竞争群首的方法,直接调用即可
*
* @author JustryDeng
* @date 2018/11/28 12:27
*/
public void start() {
leaderSelector.start();
System.out.println("此时" + getServerName() + "开始运行了!");
// TODO 可在开始后do something
}
/**
* Shutdown this selector and remove yourself from the leadership group
* 关闭此Server,并从竞争群首组里面移除此Server成员
*
* @author JustryDeng
* @date 2018/11/28 11:37
*/
@Override
public void close() {
leaderSelector.close();
System.out.println("此时" + getServerName() + "释放资源了!");
// TODO 可在关闭后do something
}
/**
* 当 当前服务器被选中作为群首Leader时,会调用执行此方法!
*
* 注:由于此方法结束后,对应的workerServer就会放弃leader,所以我们不能让此方法马上结束
*
* @param client
* curator客户端
* @date 2018/11/28 11:36
*/
@Override
public void takeLeadership(CuratorFramework client) {
try {
System.out.println("此时" + getServerName() + "是群首!执行到takeLeadership()方法了!");
Thread.sleep(SLEEP_MILLISECOND);
// TODO 可do something
} catch ( InterruptedException e ) {
// 当此方法未运行完就调用了close()方法时,就会触发此异常
// 记录一下InterruptedException的发生
System.err.println(getServerName() + " was interrupted!");
Thread.currentThread().interrupt();
} finally {
// TODO 可do something
}
}
public String getServerName() {
return serverName;
}
public LeaderSelector getLeaderSelector() {
return leaderSelector;
}
}
LeaderSelectorTest:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.CloseableUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class LeaderSelectorTest {
private static final int SIZE = 10;
private static final String PATH = "/master";
private static final String IP_PORT = "10.8.109.60:2181";
private static final int SESSION_TIMEOUT = 10000;
private static final int CONNECT_TIMEOUT = 10000;
/** 重试策略:重试间隔时间为1000ms; 最多重试3次; */
private static RetryPolicy retryPolicy = new RetryNTimes(3, 1000);
public static void main(String[] args) throws InterruptedException {
List<CuratorFramework> clients = new ArrayList<>(16);
List<WorkServer> workServers = new ArrayList<>(16);
String name = "worker-server-";
try {
for (int i = 0; i < SIZE; i++) {
CuratorFramework client = CuratorFrameworkFactory.newClient(IP_PORT, SESSION_TIMEOUT, CONNECT_TIMEOUT, retryPolicy);
clients.add(client);
WorkServer workServer = new WorkServer(client, PATH, name + i);
workServers.add(workServer);
// 启动客户端
client.start();
// 开始WorkServer
workServer.start();
}
} finally {
System.out.println("开始关闭!");
WorkServer workServer;
for (int i = 0; i < workServers.size(); i++) {
workServer = workServers.get(i);
CloseableUtils.closeQuietly(workServer);
// 为方便观察,这里阻塞几秒
TimeUnit.SECONDS.sleep(3);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
}
}
}
启动对应的ZooKeeper服务器,开放端口(或关闭防火墙),然后运行上述主函数,控制台输出(只截取了部分):
注:相比起ZkClient而言,Curator为我们封装提供了一些现成可用的类,如:LeaderSelector等,这使我们不需要做太
多的工作。
再次声明:此博客为学习笔记,学习自极客学院ZooKeeper相关视频;本文内容是本人照着视频里的前辈所讲知识敲了
一遍的记录,个别地方按照本人理解稍作修改。非常感谢众多大牛们的知识分享。