Programming With ZooKeeper
题图来自网络
2 前言
ZooKeeper 翻译成中文是“动物园管理员”,很有趣是不是,物如其名,它就是来协调管理分布式系统的。
ZooKeeper 是 Hadoop 分布式系统家族的一员,它是一个集中式的协调管理服务器,支持一系列的通用服务,比如管理配置信息,命名服务,提供分布式的同步,以及提供分组服务等。
所有这些类型的服务都在分布式应用中以不同形式在使用。
本文只是 Zookeeper 编程的初体验,让大家能从最最基础简单的例子来感受下,如果你想全面了解 ZooKeeper ,可以参考 http://ifeve.com/zookeeper-apache/
3 正文
在本教程中,我们会展示如何利用 ZooKeeper 实现简单的“屏障”和“生产者消费者队列”。我们称这两个类为 Barrier 和 Queue 。
这些例子运行的前提是你至少有一个 ZooKeeper 服务器。
这两个原语都用到了下面这段代码 SyncPrimitivep :
static ZooKeeper zk = null; static final Object mutex = new Object(); String root; SyncPrimitive(String address) throws KeeperException, IOException { if(zk == null){ System.out.println("Starting ZK:"); zk = new ZooKeeper(address, 3000, this); System.out.println("Finished starting ZK: " + zk); } } public void process(WatcherEvent event) { synchronized (mutex) { mutex.notify(); } }
Barrier 和 Queue 都继承自 SyncPrimitive 。这样的话, SyncPrimitive 的构造函数里的步骤就是通用的,所有子类原语都会执行。
为了让例子简单,在初始化 Barrier 或 Queue 时,我们首先就创建了一个 ZooKeeper 对象,并且声明了一个静态变量 zk 指向这个 ZooKeeper 对象。
下面 Barrier 和 Queue 的例子中,会检查 ZooKeeper 对象是否存在。
另外,我们需要一个应用来创建一个 ZooKeeper 对象,并作为参数传递到 Barrier 和 Queue 的构造函数里。
4 屏障(Barriers)
屏障原语是阻塞一组进程节点,直到满足特定的条件,该组进程节点才能正常向前推进(开始一次计算or结束一次计算)。
实现 Barriers 的基本思路是:产生一个 barrier 节点,将其作为所有独立进程的父节点。
然后,如果 barrier 节点名叫 /b1 ,每个进程 p 创建节点 /b1/p 。当足够的进程节点创建出来之后,这组进程节点就可以开始计算了。
在下面的代码实例化一个 Barrier 对象, Barrier 类的构造函数有3个参数:
- address—ZooKeeper 服务器地址
- name —屏障节点在
ZooKeeper上的路径(例如, /b1=); - size —这一组进程的数量
Barrier 构造函数将 ZooKeeper 服务器地址传递给父类 SyncPrimitive 构造函数,父类会创建一个 ZooKeeper 实例(如果 ZooKeeper 为空的话)。
然后 Barrier 构造函数会在 ZooKeeper 对象上创建一个 Barrier 节点,它是所有进程的父节点,我们称之为根节点(注意,并非 ZooKeeper 的根节点)。
/** ******* Barrier constructor ******* ******* @param address ******* @param name ******* @param size ****** */ Barrier(String address, String name, int size) throws KeeperException, InterruptedException, UnknownHostException { super(address); this.root = name; this.size = size; // Create barrier node if (zk != null) { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); } } // My node name name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); }
进程调用 enter() 方法进入屏障。
进程创建一个节点挂在屏障节点下,用此进程的主机名表示节点名,然后它开始等待,直到足够数量的进程进入这一屏障。
进程通过 getChildren() 方法检查根节点下挂的子节点数量,并且在数量不足的情况下等待通知( wait )。
进程必须设置 watch 来接收根节点的变化, getChildren() 就是设置 watch 的方法。这段代码中, getChildren() 有两个参数,
第一个是被监听的根节点,第二个参数是允许进程设置 watch 的标识.
/** ******* Join barrier ******* ******* @return ******* @throws KeeperException ******* @throws InterruptedException ****** */ boolean enter() throws KeeperException, InterruptedException{ zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL); while (true) { synchronized (mutex) { ArrayList<String> list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); } else { return true; } } } }
enter() 方法会跑出两个异常 KeeperException 和 InterruptedException ,应用必须捕捉处理这些异常。
一旦计算完成,进程调用 leave() 方法离开屏障。
首先删除进程节点,取得根节点的所有子节点,如果子节点数量多于1个,进程 wait 直到收到 notification
(注意: getChildren() 方法的第二个参数为 true ,表示 ZooKeeper 要在根节点上设置 watch 。
当收到 notification 后,还会在检查一次根节点是否还有子节点。)
/** ******* Wait until all reach barrier ******* ******* @return ******* @throws KeeperException ******* @throws InterruptedException ****** */ boolean leave() throws KeeperException, InterruptedException{ zk.delete(root + "/" + name, 0); while (true) { synchronized (mutex) { ArrayList<String> list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } }
5 生产者-消费者队列(Producer-Consumer Queues)
生产者-消费者队列是一种分布式的数据结构,被一组进程用来生产物品和消费物品。生产者进程创造新元素,加入队列。消费者进程移除元素,拿去执行。
在下面的实现代码中,元素是简单的整数。一个根节点代表一个队列,生产者进程创造一个新节点,作为子节点,加入队列中。
下面的代码片段是 Queue 对象的构造函数,就像 Barrier 对象一样,它首先调用父类 SyncPrimitive 的构造函数,创造一个 ZooKeeper 对象如果它不存在的话。
然后检查 Queue 的根节点是否存在,如果没有就创建一个。
/** ******* Constructor of producer-consumer queue ******* ******* @param address ******* @param name ****** */ Queue(String address, String name) throws KeeperException, InterruptedException { super(address); this.root = name; // Create ZK node name if (zk != null) { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); } } }
产生一个元素:一个生产者进程调用 produce() 方法加入一个元素到队列里边,同时有一个整形参数。
本方法调用 create() 方法创建一个新节点,并用 SEQUENCE 标识告知 ZooKeeper , ZooKeeper 将新节点对应的顺序计数器的值附加到根节点上。
如此一来,我们在队列的元素上强制维持了一个总顺序,这样就确保了最早加入队列的元素最先被消费(先进先出)。
/** ******* Add element to the queue. ******* ******* @param i ******* @return ****** */ boolean produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(i); value = b.array(); zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE); return true; }
消耗一个元素:一个消耗着进程获取根节点的子节点,读取计数器值最小的节点(也就是最先加入队列的节点),返回这个元素。
注意,如果此处有冲突,只有一个进程能正确执行,另外一个无法删除节点,删除操作会抛出异常。
调用 getChildren() 方法会返回字典序顺序的子节点列表。字典序不一定按照计数器值的数字顺序来,我们就需要决定哪个元素最小。
为了决定出拥有最小计数器值的子节点,我们遍历列表,并删除每个节点的前缀 element 。
/** ******* Remove first element from the queue. ******* ******* @return ******* @throws KeeperException ******* @throws InterruptedException ****** */ int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null; // Get the first element available while (true) { synchronized (mutex) { ArrayList<String> list = zk.getChildren(root, true); if (list.isEmpty()) { System.out.println("Going to wait"); mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); if(tempValue < min) min = tempValue; } System.out.println("Temporary value: " + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } }