基于zookeeper的分布式队列之FIFO队列
2、FIFO队列
fifo就是数据结构中的,first in first out的意思,就是先进先出,这是队列的特点,那么结合队列的特点如何通过zookeeper来实现这个特点了,看图:
1、入队列,我们可以在/queue节点下,不同的客户端创建临时有序节点,通过zookeeper的znode的类型我们知道,有序节点创建时会带编号的,且这个编号的有序递增的,就是为队列的先进创建的条件
2、出队列,如何先出队列呢,要获取到/queue节点下的所有子节点,然后获取子节点上的数字,对数字进行排序,然后取得到数字最小的节点,我们把这个节点pop出队列就可以了。
OK,代码搞起~~~~~
核心代码:
public class FIFOQueue<T> {
protected final ZkClient zkClient;
protected final String root;
// /queue/n_00000000010
protected static final String Node_name = "n_";
public FIFOQueue(ZkClient zkClient, String root) {
this.zkClient = zkClient;
this.root = root;
}
public int size() {
return zkClient.getChildren(root).size();
}
public boolean isEmpty() {
return zkClient.getChildren(root).size() == 0;
}
public boolean push(T element) throws Exception {
String path = root.concat("/").concat(Node_name);
try {
zkClient.createEphemeralSequential(path, element);
}catch (ZkNoNodeException e) {
zkClient.createPersistent(root);
push(element);
}catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
return true;
}
//拿到编号最小的,把最小数据删除
public T poll() throws Exception {
try {
List<String> childrens =
zkClient.getChildren(root);
if(childrens.size() == 0) {
return null;
}
Collections.sort(childrens, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return getNodeNumber(o1,Node_name).compareTo
(getNodeNumber(o2,Node_name));
}
});
//n_0000000001
String litterNode = childrens.get(0);
String fullPath = root.concat("/").concat(litterNode);
T data = (T)zkClient.readData(fullPath);
zkClient.delete(fullPath);
return data;
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
}
private String getNodeNumber(String str,String nodeName) {
int index = str.lastIndexOf(nodeName);
if(index >= 0) {
index += Node_name.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;
}
}
如是,zookeeper的FIFO队列就完成了~~~