Zookeeper 学习-应用样例1-更新datanode执行
1.定制应用。更新数据节点后,自动执行测试操作。
客户端启动注册监控特定节点,当数据发生变化时,按照条件运行。
步骤一:客户端启动注册监控特定节点
启动五个客户端
java -cp Demo1-0.0.1-SNAPSHOT.jar com.zc.demo.zookeeper.jtest.TestClient1
在zookeeper中注册监控/TestAction数据节点变化
步骤二:zookeeper手动修改根数据节点
手动修改/TestAction数据节点值是1,
此值通过在zkcli中手动修改完成。
/oss/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server localhost:2181,localhost:2182,localhost:2183
set /TestAction 1
步骤三:客户端运行测试
每个客户端在节点数据更改时启动后续,按照进程ID创建一个新的数据节点/TestAction/pid,缺省值为0。运行十次中CURD操作。
步骤四:zookeeper手动修改进程数据节点
手动修改/TestAction/pid数据节点值是0,继续执行10此CRUD操作
此值通过在zkcli中手动修改完成。
/oss/apache-zookeeper-3.6.1-bin/bin/zkCli.sh -server localhost:2181,localhost:2182,localhost:2183
set /TestAction/pid 0
测试代码:
package com.zc.demo.zookeeper.jtest;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooDefs.Ids;
public class TestClient1 implements Watcher, Runnable {
private static String hostPort = "localhost:2181,localhost:2182,localhost:2183";
private static String zooDataPath = "/TestAction";
byte stateZooData[] = null;
byte processZooData[] = null;
ZooKeeper zk;
public TestClient1() {
try {
zk = new ZooKeeper(hostPort, 2000, this);
if (zk != null) {
try {
// Create the znode if it doesn't exist, with the following code:
if (zk.exists(zooDataPath, this) == null) {
zk.create(zooDataPath, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static byte[] intToByteArray(int a) {
return new byte[] { (byte) ((a >> 24) & 0xFF), (byte) ((a >> 16) & 0xFF), (byte) ((a >> 8) & 0xFF),
(byte) (a & 0xFF) };
}
public void runAction() throws InterruptedException, KeeperException {
stateZooData = zk.getData(zooDataPath, this, null);
String strStateData = new String(stateZooData);
// The following code prints the current content of the znode to the console:
System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, strStateData);
String name = ManagementFactory.getRuntimeMXBean().getName();
int index = name.indexOf('@');
Long pid = Long.parseLong(name.substring(0, index));
String processId = pid.toString();
if (strStateData.contentEquals("1")) {
System.out.printf("\nCurrent Data @ ZK Path %s: %s: %s", zooDataPath, strStateData, "Running");
String processDataPath = zooDataPath + "/" + pid;
try {
if (zk.exists(processDataPath, this) == null) {
zk.create(processDataPath, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
processZooData = zk.getData(processDataPath, this, null);
String strProcessData = new String(processZooData);
if (strProcessData.contentEquals("0")) {
System.out.printf("\nCurrent Data @ ZK Path %s: %s: %s", processDataPath, strProcessData, "Running!");
int x1 = 0;
while (x1 < 10) {
this.dataAction(pid, processId);
x1++;
if (x1 > 9) {
zk.setData(processDataPath, "1".getBytes(), -1);
}
}
} else {
System.out.printf("\nCurrent Data @ ZK Path %s: %s: %s", processDataPath, strProcessData,
"Stop running!");
}
} else {
System.out.printf("\nCurrent Data @ ZK Path %s: %s: %s", zooDataPath, strStateData, "No running");
}
}
public void dataAction(Long pid, String processId) throws InterruptedException, KeeperException {
String testDataPath = zooDataPath + "/" + pid + "/" + pid;
System.out.println("\n1. Create ZooKeeper Node (znode :" + testDataPath + ", Data: " + processId.getBytes()
+ ",Permissions: OPEN_ACL_UNSAFE ,NodeType: Persistent");
zk.create(testDataPath, processId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("\n2. See whether to create success: ");
System.out.println(new String(zk.getData(testDataPath, false, null)));
System.out.println("\n3. Go to modify the node data ");
zk.setData(testDataPath, "Test-Zookeeper".getBytes(), -1);
System.out.println("\n3-1. Modify the node data again ");
zk.setData(testDataPath, "Test-Zookeeper-CHGTWO".getBytes(), -1);
System.out.println("\n4. See if modified successfully: ");
System.out.println(new String(zk.getData(testDataPath, false, null)));
System.out.println("\n5. Remove nodes ");
zk.delete(testDataPath, -1);
System.out.println("\n6. Check to see if the node is removed: ");
System.out.println(" Node State: [" + zk.exists(testDataPath, true) + "]");
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
runAction();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws InterruptedException, KeeperException {
TestClient1 testClient1 = new TestClient1();
testClient1.runAction();
testClient1.run();
}
}
2.strace监控系统调用
在客户端运行中可以执行strace,收集客户端运行的相关系统调用
strace -f -F -o test.out java -cp Demo1-0.0.1-SNAPSHOT.jar com.zc.demo.zookeeper.jtest.TestClient1
收集已启动进程的数据,增加-p参数设置process id
strace -f -F -o test.out -p pid
3.epoll及相关理解
Linux 2.6内核中提出了epoll,epoll包括epoll_create,epoll_ctl,epoll_wait三个函数分别负责创建epoll,注册监听的事件和等待事件产生。
epoll每次注册新的事件到epoll中时,都会把所有fd拷贝进内核,而不是在epoll_wait时重复拷贝,保证每个fd在整个过程中仅拷贝一次。此外,epoll将内核与用户进程空间mmap到同一块内存,将fd消息存于该内存避免了不必要的拷贝。
epoll使用事件的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就通过回调函数把就绪的fd加入一个就绪链表,唤醒epoll_wait进入睡眠的进程,epoll_wait通知消息给应用程序后再次睡眠。因此epoll不随着fd数目增加效率下降,只有活跃fd才会调用回调函数,效率与连接总数无关。
epoll没有最大并发连接的限制,1G内存约能监听10万个端口。
linux执行如下命令,输出epoll帮助
man 7 epoll
之前测试中客户端输出结果中,关于epoll多路复用器调用摘录
26460 socket(AF_NETLINK, SOCK_RAW|SOCK_CLOEXEC, NETLINK_ROUTE) = 24
26460 bind(24, {sa_family=AF_NETLINK, nl_pid=0, nl_groups=00000000}, 12) = 0
26460 getsockname(24, {sa_family=AF_NETLINK, nl_pid=26459, nl_groups=00000000}, [12]) = 0
26460 epoll_create(256)
26460 epoll_ctl(26, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=139801185484824}}) = 0
26471 epoll_ctl(26, EPOLL_CTL_ADD, 27, {EPOLLOUT, {u32=27, u64=139801185484827}}) = 0
26471 epoll_ctl(26, EPOLL_CTL_MOD, 27, {EPOLLIN|EPOLLOUT, {u32=27, u64=139801185484827}}) = 0
26471 epoll_wait(26, [{EPOLLIN, {u32=24, u64=139801185484824}}, {EPOLLOUT, {u32=27, u64=139801185484827}}], 8192, 666) = 2
26471 epoll_wait(26, [{EPOLLOUT, {u32=27, u64=139801185484827}}], 8192, 666) = 1