Programming With ZooKeeper

Zookeeper

题图来自网络

1

这是一篇关于快速入门ZooKeeper开发的翻译文章。
原文 参考-1 参考-2

2 前言

ZooKeeper 翻译成中文是“动物园管理员”,很有趣是不是,物如其名,它就是来协调管理分布式系统的。 ZooKeeperHadoop 分布式系统家族的一员,它是一个集中式的协调管理服务器,支持一系列的通用服务,比如管理配置信息,命名服务,提供分布式的同步,以及提供分组服务等。 所有这些类型的服务都在分布式应用中以不同形式在使用。

本文只是 Zookeeper 编程的初体验,让大家能从最最基础简单的例子来感受下,如果你想全面了解 ZooKeeper ,可以参考 http://ifeve.com/zookeeper-apache/

3 正文

在本教程中,我们会展示如何利用 ZooKeeper 实现简单的“屏障”和“生产者消费者队列”。我们称这两个类为 BarrierQueue 。 这些例子运行的前提是你至少有一个 ZooKeeper 服务器。

这两个原语都用到了下面这段代码 SyncPrimitivep

static ZooKeeper zk = null;
static final Object mutex = new Object();
String root;

SyncPrimitive(String address)
    throws KeeperException, IOException {
    if(zk == null){
        System.out.println("Starting ZK:");
        zk = new ZooKeeper(address, 3000, this);
        System.out.println("Finished starting ZK: " + zk);
    }
}

public void process(WatcherEvent event) {
    synchronized (mutex) {
        mutex.notify();
    }
}

BarrierQueue 都继承自 SyncPrimitive 。这样的话, SyncPrimitive 的构造函数里的步骤就是通用的,所有子类原语都会执行。 为了让例子简单,在初始化 BarrierQueue 时,我们首先就创建了一个 ZooKeeper 对象,并且声明了一个静态变量 zk 指向这个 ZooKeeper 对象。 下面 BarrierQueue 的例子中,会检查 ZooKeeper 对象是否存在。 另外,我们需要一个应用来创建一个 ZooKeeper 对象,并作为参数传递到 BarrierQueue 的构造函数里。

4 屏障(Barriers)

屏障原语是阻塞一组进程节点,直到满足特定的条件,该组进程节点才能正常向前推进(开始一次计算or结束一次计算)。 实现 Barriers 的基本思路是:产生一个 barrier 节点,将其作为所有独立进程的父节点。 然后,如果 barrier 节点名叫 /b1 ,每个进程 p 创建节点 /b1/p 。当足够的进程节点创建出来之后,这组进程节点就可以开始计算了。 在下面的代码实例化一个 Barrier 对象, Barrier 类的构造函数有3个参数:

  • address—ZooKeeper 服务器地址
  • name —屏障节点在 ZooKeeper 上的路径(例如, /b1=);
  • size —这一组进程的数量

Barrier 构造函数将 ZooKeeper 服务器地址传递给父类 SyncPrimitive 构造函数,父类会创建一个 ZooKeeper 实例(如果 ZooKeeper 为空的话)。 然后 Barrier 构造函数会在 ZooKeeper 对象上创建一个 Barrier 节点,它是所有进程的父节点,我们称之为根节点(注意,并非 ZooKeeper 的根节点)。

/**
******* Barrier constructor
*******
******* @param address
******* @param name
******* @param size
****** */
Barrier(String address, String name, int size)
    throws KeeperException, InterruptedException, UnknownHostException {
    super(address);
    this.root = name;
    this.size = size;

    // Create barrier node
    if (zk != null) {
        Stat s = zk.exists(root, false);
        if (s == null) {
            zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
        }
    }

    // My node name
    name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
}

进程调用 enter() 方法进入屏障。 进程创建一个节点挂在屏障节点下,用此进程的主机名表示节点名,然后它开始等待,直到足够数量的进程进入这一屏障。 进程通过 getChildren() 方法检查根节点下挂的子节点数量,并且在数量不足的情况下等待通知( wait )。 进程必须设置 watch 来接收根节点的变化, getChildren() 就是设置 watch 的方法。这段代码中, getChildren() 有两个参数, 第一个是被监听的根节点,第二个参数是允许进程设置 watch 的标识.

/**
******* Join barrier
*******
******* @return
******* @throws KeeperException
******* @throws InterruptedException
****** */
boolean enter() throws KeeperException, InterruptedException{
    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateFlags.EPHEMERAL);
    while (true) {
        synchronized (mutex) {
            ArrayList<String> list = zk.getChildren(root, true);

            if (list.size() < size) {
                mutex.wait();
            } else {
                return true;
            }
        }
    }
}

enter() 方法会跑出两个异常 KeeperExceptionInterruptedException ,应用必须捕捉处理这些异常。 一旦计算完成,进程调用 leave() 方法离开屏障。 首先删除进程节点,取得根节点的所有子节点,如果子节点数量多于1个,进程 wait 直到收到 notification (注意: getChildren() 方法的第二个参数为 true ,表示 ZooKeeper 要在根节点上设置 watch 。 当收到 notification 后,还会在检查一次根节点是否还有子节点。)

/**
******* Wait until all reach barrier
*******
******* @return
******* @throws KeeperException
******* @throws InterruptedException
****** */
boolean leave() throws KeeperException, InterruptedException{
    zk.delete(root + "/" + name, 0);
    while (true) {
        synchronized (mutex) {
            ArrayList<String> list = zk.getChildren(root, true);
            if (list.size() > 0) {
                mutex.wait();
            } else {
                return true;
            }
        }
    }
}

5 生产者-消费者队列(Producer-Consumer Queues)

生产者-消费者队列是一种分布式的数据结构,被一组进程用来生产物品和消费物品。生产者进程创造新元素,加入队列。消费者进程移除元素,拿去执行。 在下面的实现代码中,元素是简单的整数。一个根节点代表一个队列,生产者进程创造一个新节点,作为子节点,加入队列中。 下面的代码片段是 Queue 对象的构造函数,就像 Barrier 对象一样,它首先调用父类 SyncPrimitive 的构造函数,创造一个 ZooKeeper 对象如果它不存在的话。 然后检查 Queue 的根节点是否存在,如果没有就创建一个。

/**
******* Constructor of producer-consumer queue
*******
******* @param address
******* @param name
****** */
Queue(String address, String name)
    throws KeeperException, InterruptedException {
    super(address);
    this.root = name;
    // Create ZK node name
    if (zk != null) {
        Stat s = zk.exists(root, false);
        if (s == null) {
            zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
        }
    }
}

产生一个元素:一个生产者进程调用 produce() 方法加入一个元素到队列里边,同时有一个整形参数。 本方法调用 create() 方法创建一个新节点,并用 SEQUENCE 标识告知 ZooKeeperZooKeeper 将新节点对应的顺序计数器的值附加到根节点上。 如此一来,我们在队列的元素上强制维持了一个总顺序,这样就确保了最早加入队列的元素最先被消费(先进先出)。

/**
******* Add element to the queue.
*******
******* @param i
******* @return
****** */
boolean produce(int i) throws KeeperException, InterruptedException{
    ByteBuffer b = ByteBuffer.allocate(4);
    byte[] value;

    // Add child with value i
    b.putInt(i);
    value = b.array();
    zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
              CreateFlags.SEQUENCE);

    return true;
}

消耗一个元素:一个消耗着进程获取根节点的子节点,读取计数器值最小的节点(也就是最先加入队列的节点),返回这个元素。 注意,如果此处有冲突,只有一个进程能正确执行,另外一个无法删除节点,删除操作会抛出异常。 调用 getChildren() 方法会返回字典序顺序的子节点列表。字典序不一定按照计数器值的数字顺序来,我们就需要决定哪个元素最小。 为了决定出拥有最小计数器值的子节点,我们遍历列表,并删除每个节点的前缀 element

/**
******* Remove first element from the queue.
*******
******* @return
******* @throws KeeperException
******* @throws InterruptedException
****** */
int consume() throws KeeperException, InterruptedException{
    int retvalue = -1;
    Stat stat = null;

    // Get the first element available
    while (true) {
        synchronized (mutex) {
            ArrayList<String> list = zk.getChildren(root, true);
            if (list.isEmpty()) {
                System.out.println("Going to wait");
                mutex.wait();
            } else {
                Integer min = new Integer(list.get(0).substring(7));
                for(String s : list){
                    Integer tempValue = new Integer(s.substring(7));
                    if(tempValue < min) min = tempValue;
                }
                System.out.println("Temporary value: " + root + "/element" + min);
                byte[] b = zk.getData(root + "/element" + min, false, stat);
                zk.delete(root + "/element" + min, 0);
                ByteBuffer buffer = ByteBuffer.wrap(b);
                retvalue = buffer.getInt();

                return retvalue;
            }
        }
    }
}

蜂蜜甜甜圈(Yi Ou)

2016-02-05

2016-11-10 Thu 13:03

Emacs 25.1.1 (Org mode 9.0)

2016-11-10 Thu 12:43