Programming With ZooKeeper
题图来自网络
2 前言
ZooKeeper
翻译成中文是“动物园管理员”,很有趣是不是,物如其名,它就是来协调管理分布式系统的。
ZooKeeper
是 Hadoop
分布式系统家族的一员,它是一个集中式的协调管理服务器,支持一系列的通用服务,比如管理配置信息,命名服务,提供分布式的同步,以及提供分组服务等。
所有这些类型的服务都在分布式应用中以不同形式在使用。
本文只是 Zookeeper
编程的初体验,让大家能从最最基础简单的例子来感受下,如果你想全面了解 ZooKeeper
,可以参考 http://ifeve.com/zookeeper-apache/
3 正文
在本教程中,我们会展示如何利用 ZooKeeper
实现简单的“屏障”和“生产者消费者队列”。我们称这两个类为 Barrier
和 Queue
。
这些例子运行的前提是你至少有一个 ZooKeeper
服务器。
这两个原语都用到了下面这段代码 SyncPrimitivep
:
static ZooKeeper zk = null; static final Object mutex = new Object(); String root; SyncPrimitive(String address) throws KeeperException, IOException { if(zk == null){ System.out.println("Starting ZK:"); zk = new ZooKeeper(address, 3000, this); System.out.println("Finished starting ZK: " + zk); } } public void process(WatcherEvent event) { synchronized (mutex) { mutex.notify(); } }
Barrier
和 Queue
都继承自 SyncPrimitive
。这样的话, SyncPrimitive
的构造函数里的步骤就是通用的,所有子类原语都会执行。
为了让例子简单,在初始化 Barrier
或 Queue
时,我们首先就创建了一个 ZooKeeper
对象,并且声明了一个静态变量 zk
指向这个 ZooKeeper
对象。
下面 Barrier
和 Queue
的例子中,会检查 ZooKeeper
对象是否存在。
另外,我们需要一个应用来创建一个 ZooKeeper
对象,并作为参数传递到 Barrier
和 Queue
的构造函数里。
4 屏障(Barriers)
屏障原语是阻塞一组进程节点,直到满足特定的条件,该组进程节点才能正常向前推进(开始一次计算or结束一次计算)。
实现 Barriers
的基本思路是:产生一个 barrier
节点,将其作为所有独立进程的父节点。
然后,如果 barrier
节点名叫 /b1
,每个进程 p
创建节点 /b1/p
。当足够的进程节点创建出来之后,这组进程节点就可以开始计算了。
在下面的代码实例化一个 Barrier
对象, Barrier
类的构造函数有3个参数:
- address—ZooKeeper 服务器地址
- name —屏障节点在
ZooKeeper
上的路径(例如, /b1=); - size —这一组进程的数量
Barrier
构造函数将 ZooKeeper
服务器地址传递给父类 SyncPrimitive
构造函数,父类会创建一个 ZooKeeper
实例(如果 ZooKeeper
为空的话)。
然后 Barrier
构造函数会在 ZooKeeper
对象上创建一个 Barrier
节点,它是所有进程的父节点,我们称之为根节点(注意,并非 ZooKeeper
的根节点)。
/** ******* Barrier constructor ******* ******* @param address ******* @param name ******* @param size ****** */ Barrier(String address, String name, int size) throws KeeperException, InterruptedException, UnknownHostException { super(address); this.root = name; this.size = size; // Create barrier node if (zk != null) { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); } } // My node name name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); }
进程调用 enter()
方法进入屏障。
进程创建一个节点挂在屏障节点下,用此进程的主机名表示节点名,然后它开始等待,直到足够数量的进程进入这一屏障。
进程通过 getChildren()
方法检查根节点下挂的子节点数量,并且在数量不足的情况下等待通知( wait
)。
进程必须设置 watch
来接收根节点的变化, getChildren()
就是设置 watch
的方法。这段代码中, getChildren()
有两个参数,
第一个是被监听的根节点,第二个参数是允许进程设置 watch
的标识.
/** ******* Join barrier ******* ******* @return ******* @throws KeeperException ******* @throws InterruptedException ****** */ boolean enter() throws KeeperException, InterruptedException{ zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL); while (true) { synchronized (mutex) { ArrayList<String> list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); } else { return true; } } } }
enter()
方法会跑出两个异常 KeeperException
和 InterruptedException
,应用必须捕捉处理这些异常。
一旦计算完成,进程调用 leave()
方法离开屏障。
首先删除进程节点,取得根节点的所有子节点,如果子节点数量多于1个,进程 wait
直到收到 notification
(注意: getChildren()
方法的第二个参数为 true
,表示 ZooKeeper
要在根节点上设置 watch
。
当收到 notification
后,还会在检查一次根节点是否还有子节点。)
/** ******* Wait until all reach barrier ******* ******* @return ******* @throws KeeperException ******* @throws InterruptedException ****** */ boolean leave() throws KeeperException, InterruptedException{ zk.delete(root + "/" + name, 0); while (true) { synchronized (mutex) { ArrayList<String> list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } }
5 生产者-消费者队列(Producer-Consumer Queues)
生产者-消费者队列是一种分布式的数据结构,被一组进程用来生产物品和消费物品。生产者进程创造新元素,加入队列。消费者进程移除元素,拿去执行。
在下面的实现代码中,元素是简单的整数。一个根节点代表一个队列,生产者进程创造一个新节点,作为子节点,加入队列中。
下面的代码片段是 Queue
对象的构造函数,就像 Barrier
对象一样,它首先调用父类 SyncPrimitive
的构造函数,创造一个 ZooKeeper
对象如果它不存在的话。
然后检查 Queue
的根节点是否存在,如果没有就创建一个。
/** ******* Constructor of producer-consumer queue ******* ******* @param address ******* @param name ****** */ Queue(String address, String name) throws KeeperException, InterruptedException { super(address); this.root = name; // Create ZK node name if (zk != null) { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); } } }
产生一个元素:一个生产者进程调用 produce()
方法加入一个元素到队列里边,同时有一个整形参数。
本方法调用 create()
方法创建一个新节点,并用 SEQUENCE
标识告知 ZooKeeper
, ZooKeeper
将新节点对应的顺序计数器的值附加到根节点上。
如此一来,我们在队列的元素上强制维持了一个总顺序,这样就确保了最早加入队列的元素最先被消费(先进先出)。
/** ******* Add element to the queue. ******* ******* @param i ******* @return ****** */ boolean produce(int i) throws KeeperException, InterruptedException{ ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(i); value = b.array(); zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE); return true; }
消耗一个元素:一个消耗着进程获取根节点的子节点,读取计数器值最小的节点(也就是最先加入队列的节点),返回这个元素。
注意,如果此处有冲突,只有一个进程能正确执行,另外一个无法删除节点,删除操作会抛出异常。
调用 getChildren()
方法会返回字典序顺序的子节点列表。字典序不一定按照计数器值的数字顺序来,我们就需要决定哪个元素最小。
为了决定出拥有最小计数器值的子节点,我们遍历列表,并删除每个节点的前缀 element
。
/** ******* Remove first element from the queue. ******* ******* @return ******* @throws KeeperException ******* @throws InterruptedException ****** */ int consume() throws KeeperException, InterruptedException{ int retvalue = -1; Stat stat = null; // Get the first element available while (true) { synchronized (mutex) { ArrayList<String> list = zk.getChildren(root, true); if (list.isEmpty()) { System.out.println("Going to wait"); mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for(String s : list){ Integer tempValue = new Integer(s.substring(7)); if(tempValue < min) min = tempValue; } System.out.println("Temporary value: " + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } }