Zookeeper分布式队列的实现

原文地址:http://blog.****.net/Evankaka/article/details/70806752

摘要:本文要通过zookeeper实现一个简单可靠的分布式队列

本文源码请在这里下载:https://github.com/yangchunjian/DistributeLearning

一、队列

Zookeeper可以处理两种类型的队列:
(1)同步队列
当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。例如一个班去旅游,看是否所有人都到齐了,到齐了就发车。例如有个大任务分解为多个子任务,要所有子任务都完成了才能进入到下一流程。
(2)先进先出队列
按照FIFO方式进行入队和出队

例如实现生产者和消费者模型

二、实现思路
(1)同步队列
在zookeeper中先创建一个根目录 queue_sync,做为队列队列的消费者监视/queue/start节点,刚开始还没有这个节点,所以什么都不会做。入队操作就是在queue_sync下创建子节点,然后计算子节点的总数,看是否和队列的目标数量相同。如果相同,创建/queue_sync/start节点,由于/queue_sync/start这个节点有了状态变化,zookeeper就会通知监视者:队员已经到齐了,监视者得到通知后进行自己的后续流程

实现代码

[java] view plain copy
  1. package com.github.distribute.queue;  
  2.   
  3. import java.io.ByteArrayInputStream;  
  4. import java.io.ByteArrayOutputStream;  
  5. import java.io.ObjectInputStream;  
  6. import java.io.ObjectOutputStream;  
  7. import java.util.Collections;  
  8. import java.util.Comparator;  
  9. import java.util.List;  
  10. import java.util.concurrent.CountDownLatch;  
  11.   
  12. import org.I0Itec.zkclient.ExceptionUtil;  
  13. import org.I0Itec.zkclient.exception.ZkNoNodeException;  
  14. import org.apache.zookeeper.CreateMode;  
  15. import org.apache.zookeeper.WatchedEvent;  
  16. import org.apache.zookeeper.Watcher;  
  17. import org.apache.zookeeper.Watcher.Event.EventType;  
  18. import org.apache.zookeeper.ZooDefs;  
  19. import org.apache.zookeeper.ZooKeeper;  
  20. import org.apache.zookeeper.data.Stat;  
  21. import org.slf4j.Logger;  
  22. import org.slf4j.LoggerFactory;  
  23.   
  24. /** 
  25.  * 分布式队列,同步队列的实现 
  26.  *  
  27.  * @author linbingwen 
  28.  * 
  29.  * @param <T> 
  30.  */  
  31. public class DistributedQueue<T> {  
  32.     private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);  
  33.   
  34.     protected final ZooKeeper zooKeeper;// 用于操作zookeeper集群  
  35.     protected final String root;// 代表根节点  
  36.     private int queueSize;  
  37.     private String startPath = "/queue/start";  
  38.   
  39.     protected static final String Node_NAME = "n_";// 顺序节点的名称  
  40.   
  41.     public DistributedQueue(ZooKeeper zooKeeper, String root, int queueSize) {  
  42.         this.zooKeeper = zooKeeper;  
  43.         this.root = root;  
  44.         this.queueSize = queueSize;  
  45.         init();  
  46.     }  
  47.   
  48.     /** 
  49.      * 初始化根目录 
  50.      */  
  51.     private void init() {  
  52.         try {  
  53.             Stat stat = zooKeeper.exists(root, false);// 判断一下根目录是否存在  
  54.             if (stat == null) {  
  55.                 zooKeeper.create(root, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  56.             }  
  57.             zooKeeper.delete(startPath, -1); // 删除队列满的标志  
  58.         } catch (Exception e) {  
  59.             logger.error("create rootPath error", e);  
  60.         }  
  61.     }  
  62.   
  63.     /** 
  64.      * 获取队列的大小 
  65.      *  
  66.      * @return 
  67.      * @throws Exception 
  68.      */  
  69.     public int size() throws Exception {  
  70.         return zooKeeper.getChildren(root, false).size();  
  71.     }  
  72.   
  73.     /** 
  74.      * 判断队列是否为空 
  75.      *  
  76.      * @return 
  77.      * @throws Exception 
  78.      */  
  79.     public boolean isEmpty() throws Exception {  
  80.         return zooKeeper.getChildren(root, false).size() == 0;  
  81.     }  
  82.   
  83.     /** 
  84.      * bytes 转object 
  85.      *  
  86.      * @param bytes 
  87.      * @return 
  88.      */  
  89.     private Object ByteToObject(byte[] bytes) {  
  90.         Object obj = null;  
  91.         try {  
  92.             // bytearray to object  
  93.             ByteArrayInputStream bi = new ByteArrayInputStream(bytes);  
  94.             ObjectInputStream oi = new ObjectInputStream(bi);  
  95.   
  96.             obj = oi.readObject();  
  97.             bi.close();  
  98.             oi.close();  
  99.         } catch (Exception e) {  
  100.             logger.error("translation" + e.getMessage());  
  101.             e.printStackTrace();  
  102.         }  
  103.         return obj;  
  104.     }  
  105.   
  106.     /** 
  107.      * Object 转byte 
  108.      *  
  109.      * @param obj 
  110.      * @return 
  111.      */  
  112.     private byte[] ObjectToByte(java.lang.Object obj) {  
  113.         byte[] bytes = null;  
  114.         try {  
  115.             // object to bytearray  
  116.             ByteArrayOutputStream bo = new ByteArrayOutputStream();  
  117.             ObjectOutputStream oo = new ObjectOutputStream(bo);  
  118.             oo.writeObject(obj);  
  119.   
  120.             bytes = bo.toByteArray();  
  121.   
  122.             bo.close();  
  123.             oo.close();  
  124.         } catch (Exception e) {  
  125.             logger.error("translation" + e.getMessage());  
  126.             e.printStackTrace();  
  127.         }  
  128.         return bytes;  
  129.     }  
  130.   
  131.     /** 
  132.      * 向队列提供数据,队列满的话会阻塞等待直到start标志位清除 
  133.      *  
  134.      * @param element 
  135.      * @return 
  136.      * @throws Exception 
  137.      */  
  138.     public boolean offer(T element) throws Exception {  
  139.         // 构建数据节点的完整路径  
  140.         String nodeFullPath = root.concat("/").concat(Node_NAME);  
  141.         try {  
  142.             if (queueSize > size()) {  
  143.                 // 创建持久的节点,写入数据  
  144.                 zooKeeper.create(nodeFullPath, ObjectToByte(element), ZooDefs.Ids.OPEN_ACL_UNSAFE,  
  145.                         CreateMode.PERSISTENT);  
  146.                 // 再判断一下队列是否满  
  147.                 if (queueSize > size()) {  
  148.                     zooKeeper.delete(startPath, -1); // 确保不存在  
  149.                 } else {  
  150.                     zooKeeper.create(startPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  151.                 }  
  152.             } else {  
  153.                 // 创建队列满的标记  
  154.                 if (zooKeeper.exists(startPath, false) != null) {  
  155.                     zooKeeper.create(startPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  156.                 }  
  157.   
  158.                 final CountDownLatch latch = new CountDownLatch(1);  
  159.                 final Watcher previousListener = new Watcher() {  
  160.                     public void process(WatchedEvent event) {  
  161.                         if (event.getType() == EventType.NodeDeleted) {  
  162.                             latch.countDown();  
  163.                         }  
  164.                     }  
  165.                 };  
  166.   
  167.                 // 如果节点不存在会出现异常  
  168.                 zooKeeper.exists(startPath, previousListener);  
  169.                 latch.await();  
  170.                 offer(element);  
  171.   
  172.             }  
  173.         } catch (ZkNoNodeException e) {  
  174.             logger.error("", e);  
  175.         } catch (Exception e) {  
  176.             throw ExceptionUtil.convertToRuntimeException(e);  
  177.         }  
  178.         return true;  
  179.     }  
  180.   
  181.     /** 
  182.      * 从队列取数据,当有start标志位时,开始取数据,全部取完数据后才删除start标志 
  183.      *  
  184.      * @return 
  185.      * @throws Exception 
  186.      */  
  187.     @SuppressWarnings("unchecked")  
  188.     public T poll() throws Exception {  
  189.   
  190.         try {  
  191.             // 队列还没满  
  192.             if (zooKeeper.exists(startPath, false) == null) {  
  193.                 final CountDownLatch latch = new CountDownLatch(1);  
  194.                 final Watcher previousListener = new Watcher() {  
  195.                     public void process(WatchedEvent event) {  
  196.                         if (event.getType() == EventType.NodeCreated) {  
  197.                             latch.countDown();  
  198.                         }  
  199.                     }  
  200.                 };  
  201.   
  202.                 // 如果节点不存在会出现异常  
  203.                 zooKeeper.exists(startPath, previousListener);  
  204.   
  205.                 // 如果节点不存在会出现异常  
  206.                 latch.await();  
  207.             }  
  208.   
  209.             List<String> list = zooKeeper.getChildren(root, false);  
  210.             if (list.size() == 0) {  
  211.                 return null;  
  212.             }  
  213.             // 将队列按照由小到大的顺序排序  
  214.             Collections.sort(list, new Comparator<String>() {  
  215.                 public int compare(String lhs, String rhs) {  
  216.                     return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));  
  217.                 }  
  218.             });  
  219.   
  220.             /** 
  221.              * 将队列中的元素做循环,然后构建完整的路径,在通过这个路径去读取数据 
  222.              */  
  223.             for (String nodeName : list) {  
  224.                 String nodeFullPath = root.concat("/").concat(nodeName);  
  225.                 try {  
  226.                     T node = (T) ByteToObject(zooKeeper.getData(nodeFullPath, falsenull));  
  227.                     zooKeeper.delete(nodeFullPath, -1);  
  228.                     return node;  
  229.                 } catch (ZkNoNodeException e) {  
  230.                     logger.error("", e);  
  231.                 }  
  232.             }  
  233.             return null;  
  234.         } catch (Exception e) {  
  235.             throw ExceptionUtil.convertToRuntimeException(e);  
  236.         }  
  237.   
  238.     }  
  239.   
  240.     /** 
  241.      * 截取节点的数字的方法 
  242.      *  
  243.      * @param str 
  244.      * @param nodeName 
  245.      * @return 
  246.      */  
  247.     private String getNodeNumber(String str, String nodeName) {  
  248.         int index = str.lastIndexOf(nodeName);  
  249.         if (index >= 0) {  
  250.             index += Node_NAME.length();  
  251.             return index <= str.length() ? str.substring(index) : "";  
  252.         }  
  253.         return str;  
  254.   
  255.     }  
  256.   
  257. }  

代码还没验证,可能会有问题!

(2)先进先出队列

在zookeeper中先创建一个根目录 queue_fifo,做为队列。入队操作就是在queue_fifo下创建自增序的子节点,并把数据放入节点内。出队操作就是先找到queue_fifo下序号最下的那个节点,取出数据,然后删除此节点。

实现代码:

[java] view plain copy
  1. package com.github.distribute.queue;  
  2.   
  3. import java.util.Collections;  
  4. import java.util.Comparator;  
  5. import java.util.List;  
  6. import org.I0Itec.zkclient.ExceptionUtil;  
  7. import org.I0Itec.zkclient.ZkClient;  
  8. import org.I0Itec.zkclient.exception.ZkNoNodeException;  
  9. import org.slf4j.Logger;  
  10. import org.slf4j.LoggerFactory;  
  11.   
  12. import com.github.distribute.lock.zookeeper.BaseDistributedLock;  
  13.   
  14. /** 
  15.  * 分布式队列,生产者,消费者的实现 
  16.  * @author linbingwen 
  17.  * 
  18.  * @param <T> 
  19.  */  
  20. public class DistributedSimpleQueue<T> {  
  21.   
  22.     private static Logger logger = LoggerFactory.getLogger(BaseDistributedLock.class);  
  23.   
  24.     protected final ZkClient zkClient;//用于操作zookeeper集群  
  25.     protected final String root;//代表根节点  
  26.   
  27.     protected static final String Node_NAME = "n_";//顺序节点的名称  
  28.       
  29.   
  30.   
  31.     public DistributedSimpleQueue(ZkClient zkClient, String root) {  
  32.         this.zkClient = zkClient;  
  33.         this.root = root;  
  34.     }  
  35.       
  36.     //获取队列的大小  
  37.     public int size() {  
  38.         /** 
  39.          * 通过获取根节点下的子节点列表 
  40.          */  
  41.         return zkClient.getChildren(root).size();  
  42.     }  
  43.       
  44.     //判断队列是否为空  
  45.     public boolean isEmpty() {  
  46.         return zkClient.getChildren(root).size() == 0;  
  47.     }  
  48.       
  49.     /** 
  50.      * 向队列提供数据 
  51.      * @param element 
  52.      * @return 
  53.      * @throws Exception 
  54.      */  
  55.     public boolean offer(T element) throws Exception{  
  56.           
  57.         //构建数据节点的完整路径  
  58.         String nodeFullPath = root .concat( "/" ).concat( Node_NAME );  
  59.         try {  
  60.             //创建持久的节点,写入数据  
  61.             zkClient.createPersistentSequential(nodeFullPath , element);  
  62.         }catch (ZkNoNodeException e) {  
  63.             zkClient.createPersistent(root);  
  64.             offer(element);  
  65.         } catch (Exception e) {  
  66.             throw ExceptionUtil.convertToRuntimeException(e);  
  67.         }  
  68.         return true;  
  69.     }  
  70.   
  71.   
  72.     //从队列取数据  
  73.     @SuppressWarnings("unchecked")  
  74.     public T poll() throws Exception {  
  75.           
  76.         try {  
  77.   
  78.             List<String> list = zkClient.getChildren(root);  
  79.             if (list.size() == 0) {  
  80.                 return null;  
  81.             }  
  82.             //将队列按照由小到大的顺序排序  
  83.             Collections.sort(list, new Comparator<String>() {  
  84.                 public int compare(String lhs, String rhs) {  
  85.                     return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));  
  86.                 }  
  87.             });  
  88.               
  89.             /** 
  90.              * 将队列中的元素做循环,然后构建完整的路径,在通过这个路径去读取数据 
  91.              */  
  92.             for ( String nodeName : list ){  
  93.                   
  94.                 String nodeFullPath = root.concat("/").concat(nodeName);      
  95.                 try {  
  96.                     T node = (T) zkClient.readData(nodeFullPath);  
  97.                     zkClient.delete(nodeFullPath);  
  98.                     return node;  
  99.                 } catch (ZkNoNodeException e) {  
  100.                     logger.error("",e);  
  101.                 }  
  102.             }  
  103.               
  104.             return null;  
  105.               
  106.         } catch (Exception e) {  
  107.             throw ExceptionUtil.convertToRuntimeException(e);  
  108.         }  
  109.   
  110.     }  
  111.   
  112.       
  113.     private String getNodeNumber(String str, String nodeName) {  
  114.         int index = str.lastIndexOf(nodeName);  
  115.         if (index >= 0) {  
  116.             index += Node_NAME.length();  
  117.             return index <= str.length() ? str.substring(index) : "";  
  118.         }  
  119.         return str;  
  120.   
  121.     }  
  122.   
  123. }  

测试一下:

[java] view plain copy
  1. package com.github.distribute.queue;  
  2.   
  3. import java.io.Serializable;  
  4.   
  5. import org.I0Itec.zkclient.ZkClient;  
  6. import org.I0Itec.zkclient.serialize.SerializableSerializer;  
  7.   
  8. public class DistributedQueueTest {  
  9.   
  10.     public static void main(String[] args) {  
  11.         ZkClient zkClient = new ZkClient("127.0.0.1:2181"50005000new SerializableSerializer());  
  12.         DistributedSimpleQueue<SendObject> queue = new DistributedSimpleQueue<SendObject>(zkClient, "/Queue");  
  13.         new Thread(new ConsumerThread(queue)).start();  
  14.         new Thread(new ProducerThread(queue)).start();  
  15.   
  16.     }  
  17.   
  18. }  
  19.   
  20. class ConsumerThread implements Runnable {  
  21.     private DistributedSimpleQueue<SendObject> queue;  
  22.   
  23.     public ConsumerThread(DistributedSimpleQueue<SendObject> queue) {  
  24.         this.queue = queue;  
  25.     }  
  26.   
  27.     public void run() {  
  28.         for (int i = 0; i < 10000; i++) {  
  29.             try {  
  30.                 Thread.sleep((int) (Math.random() * 5000));// 随机睡眠一下  
  31.                 SendObject sendObject = (SendObject) queue.poll();  
  32.                 System.out.println("消费一条消息成功:" + sendObject);  
  33.             } catch (Exception e) {  
  34.             }  
  35.         }  
  36.     }  
  37. }  
  38.   
  39. class ProducerThread implements Runnable {  
  40.   
  41.     private DistributedSimpleQueue<SendObject> queue;  
  42.   
  43.     public ProducerThread(DistributedSimpleQueue<SendObject> queue) {  
  44.         this.queue = queue;  
  45.     }  
  46.   
  47.     public void run() {  
  48.         for (int i = 0; i < 10000; i++) {  
  49.             try {  
  50.                 Thread.sleep((int) (Math.random() * 5000));// 随机睡眠一下  
  51.                 SendObject sendObject = new SendObject(String.valueOf(i), "content" + i);  
  52.                 queue.offer(sendObject);  
  53.                 System.out.println("发送一条消息成功:" + sendObject);  
  54.             } catch (Exception e) {  
  55.             }  
  56.         }  
  57.     }  
  58.   
  59. }  
  60.   
  61. class SendObject implements Serializable {  
  62.   
  63.     private static final long serialVersionUID = 1L;  
  64.   
  65.     public SendObject(String id, String content) {  
  66.         this.id = id;  
  67.         this.content = content;  
  68.     }  
  69.   
  70.     private String id;  
  71.   
  72.     private String content;  
  73.   
  74.     public String getId() {  
  75.         return id;  
  76.     }  
  77.   
  78.     public void setId(String id) {  
  79.         this.id = id;  
  80.     }  
  81.   
  82.     public String getContent() {  
  83.         return content;  
  84.     }  
  85.   
  86.     public void setContent(String content) {  
  87.         this.content = content;  
  88.     }  
  89.   
  90.     @Override  
  91.     public String toString() {  
  92.         return "SendObject [id=" + id + ", content=" + content + "]";  
  93.     }  
  94.   
  95. }  

输出结果:

Zookeeper分布式队列的实现

本文源码请在这里下载:https://github.com/yangchunjian/DistributeLearning