zookeeper 之 ZKClient 动态节点上下线教程
简单说一下: zookeeper是分布式协调系统。主要功能是:创建节点,删除节点,修改节点数据, 监听
节点分为4类:(封装在enum CreateMode 中)
PERSISTENT; 持久 (调用Delete 才会删除)
EPHEMERAL; 临时 (断开连接就会自动被删除)
PERSISTENT_SEQUENTIAL; (创建一个带序列的持久的节点,调用Delete 才会删除)
EPHEMERAL_SEQUENTIAL ; (创建一个临时的带序列的节点,断开连接就会自动被删除)
举例:原始API: String createNode = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL ); // createNode 的节点名称为: lock_0000001
zookeeper中主要就是监听: 分三种:(watch: 为监听器里面有一process 方法作为监听回调处理,通常我们写匿名内部类,或者写一个普通类实现Watcher接口 重写process方法)
监听节点的数据变化: keeper.getData(path, watch, stat) // stat 通常设置为null
监听节点被删除 : keeper.exists(minNode, watch)
监听节点下的子节点的创建和删除: keeper.getChildren(parentNode, watch);
zookeeper创建一个节点:常用API: keeper.create(path, data, acl, createMode)
path: 节点的路径(也可以叫节点的名字--最后一个路径名)
data: 节点的数据信息
acl : 访问权限,通常我们设置为 Ids.OPEN_ACL_UNSAFE,为随便访问。
createMode: 节点的类型,上面所述的4类: 持久,临时,持久有序,临时有序。 封装在枚举类:CreateMode中
以上为zookeeper中的原生常用API。
第三方API 本人喜欢:ZkClient maven:
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
这里注意事项: 本人在加载maven依赖的时候,刚开始发现maven加载不了zkclient-0.1.jar包,解决方法:
找到你的本地下载好的maven路径\com\github\sgroschupf\zkclient\0.1 下,删除其他文件,就保留zkclient-0.1.jar 和zkclient-0.1.pom 然后再重新让eclipse 加载maven依赖。
ZkClient 常用的节点API:
创建节点: zkClient.create(path, data, mode)
path: 节点路径名
data: 节点数据
mode: 节点类型,还是那4类
删除节点: zkClient.delete(path)
path: 节点路径名
更新节点: updateDataSerialized(String path, DataUpdater updater)
path: 节点路径名
updater : 为一个接口,我们需要写一个普通类,或者匿名内部类 实现该接口然后 重写update(Object oldData); 参数oldData 为 节点的老数据,在此方法中你只要
返回你自己想更新的数据即可。
ZkClient 常用监听API:
监听节点下的子节点的创建和删除 : zk.subscribeChildChanges(path, listener)
listener:为 IZkDataListener 类型接口,实现一个方法
handleChildChange来 监听处理
监听节点的数据变化和监听节点被删除 : zk.subscribeDataChanges(path, listener)
listener : IZkDataListener 类型接口,里面有两个抽象的方法:
handleDataChange : 监听节点数据改变进行处理
handleDataDeleted :监听节点数据被删除进行处理
使用案例:监听服务器节点的上下线
ServiceNodeUpAndDown 监听服务器上下线
package com.jvm.others.zookeeper.zkclient;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.WatchedEvent;
/**
* 服务器节点动态上下限
* @author pc
*
*/
public class ServiceNodeUpAndDown {
static ZkClient zk = new ZkClient("192.168.17.7:2181");
static String parentNode = "/ElecLock";
static int first = 1 ;
static HashMap<String, String> nodes = new HashMap<>(); //存放注册的节点名称
/**
* 监听服务器的 动态上下线通知,并且通过邮件告知信息
*/
public void listenerForServerNodes(){
List<String> children = zk.getChildren(parentNode); //返回子节点的名称
if(children==null || children.size() ==0){ //如果子parentNode 下面没有 服务器节点注册,则发送邮件通知没有
isDown(children);
System.out.println("邮件已经发送,当前没有任何服务器注册");
//在parentNode的节点写没有接的情况,应该清空节点名称历史。防止新加入的节点
}else{
//判断新加入的服务器几点,并发送邮件通知
for(String node : children){
String nodePath = parentNode+"/"+node; //拿到node在zookeeper中的路径
String nodeName = nodes.get(node);
if(nodeName==null){
//发送邮件 通知新加入的节点为 nodeName
if(first==1){
System.out.println("发送邮件 通知已存在的节点为: "+node);
nodes.put(node, zk.readData(nodePath).toString());
}else{
System.out.println("发送邮件 通知新加入的节点为: "+node);
nodes.put(node, zk.readData(nodePath).toString());
}
}else{//重新刷新 nodes 中的几点,保证nodes 中存放的都是当前正在运行并且稳定的节点
nodes.put(node, zk.readData(nodePath).toString());
}
}
first+=1;
isDown(children);
}
}
/**
* 判断宕机的节点
* @param children
*/
private void isDown(List<String> children) {
//判断哪些节点宕机
if(nodes.size()!=0){
Set<Entry<String,String>> entrySet = nodes.entrySet();
HashMap<String,String> downNodes = new HashMap<String,String>(); //存放宕机的节点
// 历史节点与当前存在的节点相比较,判断出哪个节点宕机了,
for(Entry<String,String> entry : entrySet){
String key = entry.getKey();
boolean is_exists = false; //表示key 是否存在默认不存在,如果存在则为true
for(String nodeName : children){
if(nodeName.equalsIgnoreCase(key)){
is_exists = true;
}
}
if(!is_exists){
downNodes.put(key,key);
}
}
//准备发送邮件通知 宕机的节点
System.out.println(downNodes.size()+"===========================");
if(downNodes.size()!=0){
for(Entry<String, String> node : downNodes.entrySet() ){
System.out.println("发送邮件通知宕机节点为 : " + node.getKey());
nodes.remove(node.getKey()); //移除宕机的节点
}
downNodes.clear();
}
}
}
private void serverNodeListener() {
zk.subscribeChildChanges(parentNode, new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
listenerForServerNodes();
}
});
listenerForServerNodes();
}
public static void main(String[] args) throws Exception {
ServiceNodeUpAndDown sud = new ServiceNodeUpAndDown();
sud.serverNodeListener();
synchronized (sud) {
sud.wait(); //wai 等待 让出竞争锁
}
}
}
ZKclientDemo 用于向zookeeper注册服务器上线,可以多次运行该类,模拟多个服务器上线。
package com.jvm.others.zookeeper.zkclient;
import java.util.Arrays;
import java.util.List;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.poi.util.ArrayUtil;
import org.apache.zookeeper.CreateMode;
import com.sun.tools.javac.code.Attribute.Array;
/**
* 练习 ZkClient API
* zookeeper 共享锁,任然存在ShareLock的问题
* 目前本人想到的解决思路: 当线程等待的时间达到我们指定的等待时间后,重新callBackOption 方法,再次判断create是否为最小的节点
* 但是监听器就不起作用了,所以,zookeeper 做共享锁还是有缺陷的,
* 最好用zookeeper做集群节点管理
* @author pc
*
*/
public class ZKclientDemo {
static String parentNode = "/ElecLock";
static boolean finish = false;
public void tes() throws Exception {
ZkClient zkClient = new ZkClient("192.168.17.7:2181");
String create = zkClient.create(parentNode+"/lock_", "this is lock",CreateMode.EPHEMERAL_SEQUENTIAL);
}
public static void main(String[] args) throws Exception {
ZKclientDemo zKclientDemo = new ZKclientDemo();
zKclientDemo.tes();
synchronized (parentNode) {
parentNode.wait();
}
}
}
在启动后你可以查看控制台,
然后在结束某一个进程模拟服务器下线,然后在看控制台。
zkClient: 源码解剖: zkClient 是封装了zookeeper,并且把注册事件监听变得简单,zookeeper原来的监听需要重复监听,即每一次
执行完事件后,就会从监听列表中移除注册的监听器,如果还想继续监听就需要重复的注册监听器,而zkClient
API则只需要注册一次监听器就行,那是因为 zkclient API 已经做好了重复的注册监听逻辑:源码为:
ZkClient 类实现了 Watcher,也就是说 zkClient 实际是一个 watcher并且实现了 process方法,并且在 zkClient
中存在 属性:
protected IZkConnection _connection; // IZkConnection 封装了Zookeeper,并提供了 create,delete readData writeData
private final Map _childListener; //子节点监听器,注册一次就好。
private final ConcurrentHashMap _dataListener; //数据监听器 注册一次就好
private final Set _stateListener;
private org.apache.zookeeper.Watcher.Event.KeeperState _currentState;
private final ZkLock _zkEventLock;
private boolean _shutdownTriggered;
private ZkEventThread _eventThread;
private Thread _zookeeperEventThread;
private ZkSerializer _zkSerializer;
zkClient 的设计思想是,每一次注册监听器,该监听器就是zkClient 对象本身,当监听器触发时会回调zkClient的process方法
如下:
public void process(WatchedEvent event){
processDataOrChildChange(event); //关键代码
}
private void processDataOrChildChange(WatchedEvent event)
{
String path = event.getPath();
if(event.getType() == org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged || event.getType() == org.apache.zookeeper.Watcher.Event.EventType.NodeCreated || event.getType() == org.apache.zookeeper.Watcher.Event.EventType.NodeDeleted) // 判断事件的类型是否为: NodeChildrenChanged NodeCreated NodeDeleted
{
Set childListeners = (Set)_childListener.get(path); // 从子监听器容器中根据path获取指定的监听
if(childListeners != null && !childListeners.isEmpty())
fireChildChangedEvents(path, childListeners); // 触发监听器的执行
}
if(event.getType() == org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged || event.getType() == org.apache.zookeeper.Watcher.Event.EventType.NodeDeleted || event.getType() == org.apache.zookeeper.Watcher.Event.EventType.NodeCreated) // 判断事件的类型是否为: NodeDataChanged NodeDeleted NodeCreated
{
Set listeners = (Set)_dataListener.get(path); // 从数据监听器容器中根据path获取指定的监听
if(listeners != null && !listeners.isEmpty())
fireDataChangedEvents(event.getPath(), listeners); // 触发监听器的执行
}
}
呵呵 累了 先写到这吧!