Zookeeper系列(三十二)Zookeeper场景应用之分布式队列

1.分布式队列的架构

Zookeeper系列(三十二)Zookeeper场景应用之分布式队列


2.向队列中提交数据流程


Zookeeper系列(三十二)Zookeeper场景应用之分布式队列


3.从队列中取数据流程

Zookeeper系列(三十二)Zookeeper场景应用之分布式队列


队列的组成

[java] view plain copy
 Zookeeper系列(三十二)Zookeeper场景应用之分布式队列Zookeeper系列(三十二)Zookeeper场景应用之分布式队列
  1. package com.jike.queue;  
  2.   
  3.   
  4. import java.util.Collections;  
  5. import java.util.Comparator;  
  6. import java.util.List;  
  7.   
  8.   
  9.   
  10.   
  11. import org.I0Itec.zkclient.ExceptionUtil;  
  12. import org.I0Itec.zkclient.ZkClient;  
  13. import org.I0Itec.zkclient.exception.ZkNoNodeException;  
  14.   
  15. public class DistributedSimpleQueue<T> {  
  16.   
  17.     protected final ZkClient zkClient;//用于操作zookeeper集群  
  18.     protected final String root;//代表根节点  
  19.   
  20.     protected static final String Node_NAME = "n_";//顺序节点的名称  
  21.       
  22.   
  23.   
  24.     public DistributedSimpleQueue(ZkClient zkClient, String root) {  
  25.         this.zkClient = zkClient;  
  26.         this.root = root;  
  27.     }  
  28.       
  29.     //获取队列的大小  
  30.     public int size() {  
  31.         /** 
  32.          * 通过获取根节点下的子节点列表 
  33.          */  
  34.         return zkClient.getChildren(root).size();  
  35.     }  
  36.       
  37.     //判断队列是否为空  
  38.     public boolean isEmpty() {  
  39.         return zkClient.getChildren(root).size() == 0;  
  40.     }  
  41.       
  42.     /** 
  43.      * 向队列提供数据 
  44.      * @param element 
  45.      * @return 
  46.      * @throws Exception 
  47.      */  
  48.     public boolean offer(T element) throws Exception{  
  49.           
  50.         //构建数据节点的完整路径  
  51.         String nodeFullPath = root .concat( "/" ).concat( Node_NAME );  
  52.         try {  
  53.             //创建持久的节点,写入数据  
  54.             zkClient.createPersistentSequential(nodeFullPath , element);  
  55.         }catch (ZkNoNodeException e) {  
  56.             zkClient.createPersistent(root);  
  57.             offer(element);  
  58.         } catch (Exception e) {  
  59.             throw ExceptionUtil.convertToRuntimeException(e);  
  60.         }  
  61.         return true;  
  62.     }  
  63.   
  64.   
  65.     //从队列取数据  
  66.     @SuppressWarnings("unchecked")  
  67.     public T poll() throws Exception {  
  68.           
  69.         try {  
  70.   
  71.             List<String> list = zkClient.getChildren(root);  
  72.             if (list.size() == 0) {  
  73.                 return null;  
  74.             }  
  75.             //将队列安装由小到大的顺序排序  
  76.             Collections.sort(list, new Comparator<String>() {  
  77.                 public int compare(String lhs, String rhs) {  
  78.                     return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));  
  79.                 }  
  80.             });  
  81.               
  82.             /** 
  83.              * 将队列中的元素做循环,然后构建完整的路径,在通过这个路径去读取数据 
  84.              */  
  85.             for ( String nodeName : list ){  
  86.                   
  87.                 String nodeFullPath = root.concat("/").concat(nodeName);      
  88.                 try {  
  89.                     T node = (T) zkClient.readData(nodeFullPath);  
  90.                     zkClient.delete(nodeFullPath);  
  91.                     return node;  
  92.                 } catch (ZkNoNodeException e) {  
  93.                     // ignore  
  94.                 }  
  95.             }  
  96.               
  97.             return null;  
  98.               
  99.         } catch (Exception e) {  
  100.             throw ExceptionUtil.convertToRuntimeException(e);  
  101.         }  
  102.   
  103.     }  
  104.   
  105.       
  106.     private String getNodeNumber(String str, String nodeName) {  
  107.         int index = str.lastIndexOf(nodeName);  
  108.         if (index >= 0) {  
  109.             index += Node_NAME.length();  
  110.             return index <= str.length() ? str.substring(index) : "";  
  111.         }  
  112.         return str;  
  113.   
  114.     }  
  115.   
  116. }  

发送的消息对象封装

[java] view plain copy
 Zookeeper系列(三十二)Zookeeper场景应用之分布式队列Zookeeper系列(三十二)Zookeeper场景应用之分布式队列
  1. package com.jike.queue;  
  2.   
  3. import java.io.Serializable;  
  4.   
  5. public class User implements Serializable {  
  6.       
  7.     String name;  
  8.     String id;  
  9.       
  10.     public String getName() {  
  11.         return name;  
  12.     }  
  13.     public void setName(String name) {  
  14.         this.name = name;  
  15.     }  
  16.     public String getId() {  
  17.         return id;  
  18.     }  
  19.     public void setId(String id) {  
  20.         this.id = id;  
  21.     }  
  22.       
  23.       
  24.   
  25. }  

Zookeeper构建分布式队列测试

[java] view plain copy
 Zookeeper系列(三十二)Zookeeper场景应用之分布式队列Zookeeper系列(三十二)Zookeeper场景应用之分布式队列
  1. package com.jike.queue;  
  2.   
  3. import org.I0Itec.zkclient.ZkClient;  
  4. import org.I0Itec.zkclient.serialize.SerializableSerializer;  
  5.   
  6. public class TestDistributedSimpleQueue {  
  7.   
  8.     public static void main(String[] args) {  
  9.           
  10.           
  11.         ZkClient zkClient = new ZkClient("192.168.1.105:2181"50005000new SerializableSerializer());  
  12.         DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue");  
  13.           
  14.         User user1 = new User();  
  15.         user1.setId("1");  
  16.         user1.setName("xiao wang");  
  17.           
  18.         User user2 = new User();  
  19.         user2.setId("2");  
  20.         user2.setName("xiao wang");       
  21.           
  22.         try {  
  23.             queue.offer(user1);  
  24.             queue.offer(user2);  
  25.             User u1 = (User) queue.poll();  
  26.             User u2 = (User) queue.poll();  
  27.               
  28.             if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){  
  29.                 System.out.println("Success!");  
  30.             }  
  31.               
  32.         } catch (Exception e) {  
  33.             e.printStackTrace();  
  34.         }  
  35.           
  36.     }  
  37.       
  38. }