Zookeeper教程:快速開(kāi)始以及結(jié)合java實(shí)現(xiàn)分布式Barrier和Queue

一、快速開(kāi)始:

(1)概述:Zookeeper是Hadoop的一個(gè)子項(xiàng)目,它是分布式系統(tǒng)中的協(xié)調(diào)系統(tǒng),可提供的服務(wù)主要有:配置服務(wù)、名字服務(wù)、分布式同步、組服務(wù)等。?

(2)使用常見(jiàn):1,統(tǒng)一配置:把配置放在ZooKeeper的節(jié)點(diǎn)中維護(hù),當(dāng)配置變更時(shí),客戶端可以收到變更的通知,并應(yīng)用最新的配置。2,集群管理:集群中的節(jié)點(diǎn),創(chuàng)建ephemeral的節(jié)點(diǎn),一旦斷開(kāi)連接,ephemeral的節(jié)點(diǎn)會(huì)消失,其它的集群機(jī)器可以收到消息。3,分布式鎖:多個(gè)客戶端發(fā)起節(jié)點(diǎn)創(chuàng)建操作,只有一個(gè)客戶端創(chuàng)建成功,從而獲得鎖。?

(3)安裝和配置:通過(guò)官方下載鏈接zookeeper?進(jìn)行下載,解壓后進(jìn)入conf目錄,新建一個(gè)zoo.conf文件,配置內(nèi)容如下:

tickTime=2000? ?

dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data

dataLogDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog

clientPort=4399

initLimit=5

syncLimit=2

tickTime: ZooKeeper基本時(shí)間單位(ms)?

initLimit: 指定了啟動(dòng)zookeeper時(shí),zookeeper實(shí)例中的隨從實(shí)例同步到領(lǐng)導(dǎo)實(shí)例的初始化連接時(shí)間限制,超出時(shí)間限制則連接失敗(以tickTime為時(shí)間單位);?

syncLimit: 指定了zookeeper正常運(yùn)行時(shí),主從節(jié)點(diǎn)之間同步數(shù)據(jù)的時(shí)間限制,若超過(guò)這個(gè)時(shí)間限制,那么隨從實(shí)例將會(huì)被丟棄?

dataDir: zookeeper存放數(shù)據(jù)的目錄;?

clientPort: 用于連接客戶端的端口

接下來(lái)進(jìn)入bin目錄啟動(dòng)ZooKeeper實(shí)例以及客戶端連接:

./zkServer.sh start

./zkCli.sh -server localhost:4399

接下來(lái)看看集群如何配置,其實(shí)跟單機(jī)差不多,這里我們把剛剛下載的Zookeeper復(fù)制多兩份,一共是三個(gè),配置信息如下:

tickTime=2000? ?

dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data

dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog

clientPort=4399

initLimit=5

syncLimit=2

server.1=127.0.0.1:8880:9990

server.2=127.0.0.1:8881:9991

server.3=127.0.0.1:8882:9992

三個(gè)文件夾下面的zoo.conf都是這個(gè)格式,需要修改dataDir,dataDir,clientPort,?

然后在dataDir所指向的目錄下面新建一個(gè)myid文件,對(duì)應(yīng)server.x,比如第一個(gè)文件夾下面的myid就填入一個(gè)1,第二個(gè)就填入一個(gè)2,以此類推。接著依次啟動(dòng)即可??梢圆捎孟旅娴拿?/p>

echo "1" > myid

二、使用java來(lái)操作ZooKeeper實(shí)例?

一門技術(shù)最重要的就算實(shí)戰(zhàn)了,接下來(lái)的內(nèi)容將圍繞這一部分來(lái)講。?

(1)首先是Znode的創(chuàng)建和刪除?

Znode有兩種類型:短暫的和持久的。短暫的znode在創(chuàng)建的客戶端與服務(wù)器端斷開(kāi)(無(wú)論是明確的斷開(kāi)還是故障斷開(kāi))連接時(shí),該znode都會(huì)被刪除;相反,持久的znode則不會(huì)

public class CreateGroup implements Watcher {

? ? //會(huì)話延時(shí)

? ? private static final int SESSION_TIMEOUT = 1000;

? ? //zk對(duì)象

? ? private ZooKeeper zk = null;

? ? //同步計(jì)數(shù)器

? ? private CountDownLatch countDownLatch = new CountDownLatch(1);

? ? //客戶端連接到服務(wù)器時(shí)會(huì)觸發(fā)觀察者進(jìn)行調(diào)用

? ? public void process(WatchedEvent event) {

? ? ? ? if(event.getState() == KeeperState.SyncConnected){

? ? ? ? ? ? countDownLatch.countDown();//計(jì)數(shù)器減一

? ? ? ? }

? ? }

? ? public void connect(String hosts) throws IOException, InterruptedException {

? ? ? ? zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);

? ? ? ? countDownLatch.await();//阻塞程序繼續(xù)執(zhí)行

? ? }

? ? //創(chuàng)建GROUP

? ? public void create(String groupName) throws KeeperException, InterruptedException{

? ? ? ? String path = "/" + groupName;

? ? ? ? //允許任何客戶端對(duì)該znode進(jìn)行讀寫,以及znode進(jìn)行持久化

? ? ? ? String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

? ? ? ? System.out.println("Created "+createPath);

? ? }

? ? //關(guān)閉zk

? ? public void close() throws InterruptedException{

? ? ? ? if(zk != null){

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? zk.close();

? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? throw e;

? ? ? ? ? ? }finally{

? ? ? ? ? ? ? ? zk = null;

? ? ? ? ? ? ? ? System.gc();

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? //測(cè)試主類

? ? public static void main(String args[]){

? ? ? ? String host = "127.0.0.1:4399";

? ? ? ? String groupName = "test";

? ? ? ? CreateGroup createGroup = new CreateGroup();

? ? ? ? try {

? ? ? ? ? ? createGroup.connect(host);

? ? ? ? ? ? createGroup.create(groupName);

? ? ? ? ? ? createGroup.close();

? ? ? ? ? ? createGroup = null;

? ? ? ? ? ? System.gc();

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }?

? ? }

}

接下來(lái)把創(chuàng)建和銷毀分離出來(lái)作為一個(gè)獨(dú)立的類,以后相關(guān)操作可以直接使用

public class ConnetctionWatcher implements Watcher {

? ? private static final int SESSION_TIMEOUT = 5000;

? ? protected ZooKeeper zk = null;

? ? private CountDownLatch countDownLatch = new CountDownLatch(1);

? ? public void process(WatchedEvent event) {

? ? ? ? KeeperState state = event.getState();

? ? ? ? if(state == KeeperState.SyncConnected){

? ? ? ? ? ? countDownLatch.countDown();

? ? ? ? }

? ? }

? ? public void connection(String hosts) throws IOException, InterruptedException {

? ? ? ? zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);

? ? ? ? countDownLatch.await();

? ? }

? ? public void close() throws InterruptedException {

? ? ? ? if (null != zk) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? zk.close();

? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? throw e;

? ? ? ? ? ? }finally{

? ? ? ? ? ? ? ? zk = null;

? ? ? ? ? ? ? ? System.gc();

? ? ? ? ? ? }

? ? ? ? }

? ? }

}

接下來(lái)我們看看節(jié)點(diǎn)如何刪除

public class DeleteGroup extends ConnetctionWatcher {

? ? public void delete(String groupName) {

? ? ? ? String path = "/" + groupName;

? ? ? ? try {

? ? ? ? ? ? List<String> children = zk.getChildren(path, false);

? ? ? ? ? ? for(String child : children){

? ? ? ? ? ? ? ? zk.delete(path + "/" + child, -1);

? ? ? ? ? ? }

? ? ? ? ? ? zk.delete(path, -1);//版本號(hào)為-1,

? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

}

git上面還有官方給的例子的代碼,放在類Executor.java,DataMonitor.java,DataMonitorListener.java。有興趣可以看一下,試著自己實(shí)現(xiàn)一下。

三、利用java實(shí)現(xiàn)分布式Barrier?

Barrier是一種控制和協(xié)調(diào)多個(gè)任務(wù)觸發(fā)次序的機(jī)制。簡(jiǎn)單來(lái)說(shuō)就是用一個(gè)屏障把將要執(zhí)行的任務(wù)攔住,等待所有任務(wù)都處于可運(yùn)行狀態(tài)才放開(kāi)屏障,其實(shí)在單機(jī)上我們可以利用CyclicBarrier來(lái)實(shí)現(xiàn)這個(gè)機(jī)制,但是在分布式環(huán)境下,我們可以利用ZooKeeper可以派上用場(chǎng),我們可以利用一個(gè)Node來(lái)作為Barrier的實(shí)體,然后要Barrier的任務(wù)通過(guò)調(diào)用exists檢測(cè)是否Node存在,當(dāng)需要打開(kāi)Barrier時(shí)候,刪除這個(gè)Node,這樣ZooKeeper的watch機(jī)制會(huì)通知到各個(gè)任務(wù)可以開(kāi)始執(zhí)行。接下來(lái)看代碼:

public class Barrier extends SyncPrimitive {

? ? int size;

? ? String name;

? ? Barrier(String address, String root, int size) {

? ? ? ? super(address);

? ? ? ? this.root = root;

? ? ? ? this.size = size;

? ? ? ? //創(chuàng)建Barrier的Node

? ? ? ? if (zk != null) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? Stat s = zk.exists(root, false);

? ? ? ? ? ? ? ? if (s == null) {

? ? ? ? ? ? ? ? ? ? zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? ? ? System.out.println("Keeper exception when instantiating queue: " + e.toString());

? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? System.out.println("Interrupted exception");

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? try {

? ? ? ? ? ? name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());

? ? ? ? } catch (UnknownHostException e) {

? ? ? ? ? ? System.out.println(e.toString());

? ? ? ? }

? ? }

? ? /**

? ? * 加入Barrier等待

? ? */

? ? boolean enter() throws KeeperException, InterruptedException{

? ? ? ? zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

? ? ? ? while (true) {

? ? ? ? ? ? synchronized (mutex) {

? ? ? ? ? ? ? ? List<String> list = zk.getChildren(root, true);

? ? ? ? ? ? ? ? if (list.size() < size) {

? ? ? ? ? ? ? ? ? ? mutex.wait();

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? /**

? ? * 一直等待知道指定數(shù)量節(jié)點(diǎn)到達(dá)

? ? */

? ? boolean leave() throws KeeperException, InterruptedException{

? ? ? ? zk.delete(root + "/" + name, 0);

? ? ? ? while (true) {

? ? ? ? ? ? synchronized (mutex) {

? ? ? ? ? ? ? ? List<String> list = zk.getChildren(root, true);

? ? ? ? ? ? ? ? ? ? if (list.size() > 0) {

? ? ? ? ? ? ? ? ? ? ? ? mutex.wait();

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? }

}

父類代碼如下:

public class SyncPrimitive implements Watcher {

? ? static ZooKeeper zk = null;

? ? static Integer mutex;

? ? //根節(jié)點(diǎn)

? ? String root;

? ? SyncPrimitive(String address) {

? ? ? ? if(zk == null){

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? System.out.println("Starting ZK:");

? ? ? ? ? ? ? ? zk = new ZooKeeper(address, 3000, this);

? ? ? ? ? ? ? ? mutex = new Integer(-1);

? ? ? ? ? ? ? ? System.out.println("Finished starting ZK: " + zk);

? ? ? ? ? ? } catch (IOException e) {

? ? ? ? ? ? ? ? System.out.println(e.toString());

? ? ? ? ? ? ? ? zk = null;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? //else mutex = new Integer(-1);

? ? }

? ? synchronized public void process(WatchedEvent event) {

? ? ? ? synchronized (mutex) {

? ? ? ? ? ? System.out.println("Process: " + event.getType());

? ? ? ? ? ? mutex.notify();

? ? ? ? }

? ? }

? ? public static void queueTest(String args[]) {

? ? ? ? Queue q = new Queue(args[1], "/app1");

? ? ? ? System.out.println("Input: " + args[1]);

? ? ? ? int i;

? ? ? ? Integer max = new Integer(args[2]);

? ? ? ? if (args[3].equals("p")) {

? ? ? ? ? ? System.out.println("Producer");

? ? ? ? ? ? for (i = 0; i < max; i++)

? ? ? ? ? ? ? ? try{

? ? ? ? ? ? ? ? ? ? q.produce(10 + i);

? ? ? ? ? ? ? ? } catch (KeeperException e){

? ? ? ? ? ? ? ? } catch (InterruptedException e){

? ? ? ? ? ? ? ? }

? ? ? ? } else {

? ? ? ? ? ? System.out.println("Consumer");

? ? ? ? ? ? for (i = 0; i < max; i++) {

? ? ? ? ? ? ? ? try{

? ? ? ? ? ? ? ? ? ? int r = q.consume();

? ? ? ? ? ? ? ? ? ? System.out.println("Item: " + r);

? ? ? ? ? ? ? ? } catch (KeeperException e){

? ? ? ? ? ? ? ? ? ? i--;

? ? ? ? ? ? ? ? } catch (InterruptedException e){

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? public static void barrierTest(String args[]) {

? ? ? ? Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));

? ? ? ? try{

? ? ? ? ? ? boolean flag = b.enter();

? ? ? ? ? ? System.out.println("Entered barrier: " + args[2]);

? ? ? ? ? ? if(!flag) System.out.println("Error when entering the barrier");

? ? ? ? } catch (KeeperException e){

? ? ? ? } catch (InterruptedException e){

? ? ? ? }

? ? ? ? Random rand = new Random();

? ? ? ? int r = rand.nextInt(100);

? ? ? ? for (int i = 0; i < r; i++) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? Thread.sleep(100);

? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? try{

? ? ? ? ? ? b.leave();

? ? ? ? } catch (KeeperException e){

? ? ? ? } catch (InterruptedException e){

? ? ? ? }

? ? ? ? System.out.println("Left barrier");

? ? }

? ? //測(cè)試用的主類

? ? public static void main(String args[]) {

? ? ? ? /*

? ? ? ? args =new String[] {"qTest","localhost:4399","3","c"};

? ? ? ? if (args[0].equals("qTest"))

? ? ? ? ? ? queueTest(args);

? ? ? ? else

? ? ? ? ? ? barrierTest(args);

? ? ? ? */

? ? }

}

(四)分布式隊(duì)列(Queue)?

在分布式環(huán)境下,實(shí)現(xiàn)Queue需要高一致性來(lái)保證,那么我們可以這樣來(lái)設(shè)計(jì)。把一個(gè)Node當(dāng)成一個(gè)隊(duì)列,然后children用來(lái)存儲(chǔ)內(nèi)容,利用ZooKeeper提供的順序遞增的模式(會(huì)自動(dòng)在name后面加入一個(gè)遞增的數(shù)字來(lái)插入新元素)。于是在offer時(shí)候我們可以使用create,take時(shí)候按照順序把children第一個(gè)delete就可以了。ZooKeeper保證了各個(gè)server上數(shù)據(jù)是一致的。廢話不多說(shuō)了,直接看代碼

/**

* 一個(gè)消費(fèi)者-生產(chǎn)者模式的消息隊(duì)列

*/

public class Queue extends SyncPrimitive {

? ? Queue(String address, String name) {

? ? ? ? super(address);

? ? ? ? this.root = name;

? ? ? ? if (zk != null) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? Stat s = zk.exists(root, false);

? ? ? ? ? ? ? ? if (s == null) {

? ? ? ? ? ? ? ? ? ? zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? ? ? System.out.println("Keeper exception when instantiating queue: " + e.toString());

? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? System.out.println("Interrupted exception");

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? /**

? ? * 隊(duì)列中插入數(shù)據(jù)

? ? */

? ? boolean produce(int i) throws KeeperException, InterruptedException{

? ? ? ? ByteBuffer b = ByteBuffer.allocate(4);

? ? ? ? byte[] value;

? ? ? ? b.putInt(i);

? ? ? ? value = b.array();

? ? ? ? zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);

? ? ? ? return true;

? ? }

? ? /**

? ? * 把元素從隊(duì)列中移除

? ? */

? ? int consume() throws KeeperException, InterruptedException{

? ? ? ? int retvalue = -1;

? ? ? ? Stat stat = null;

? ? ? ? //得到現(xiàn)在隊(duì)列中首個(gè)可用的節(jié)點(diǎn)

? ? ? ? while (true) {

? ? ? ? ? ? synchronized (mutex) {

? ? ? ? ? ? ? ? List<String> list = zk.getChildren(root, true);

? ? ? ? ? ? ? ? if (list.size() == 0) {

? ? ? ? ? ? ? ? ? ? System.out.println("Going to wait");

? ? ? ? ? ? ? ? ? ? mutex.wait();

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? Integer min = new Integer(list.get(0).substring(7));

? ? ? ? ? ? ? ? ? ? for(String s : list){

? ? ? ? ? ? ? ? ? ? ? ? Integer tempValue = new Integer(s.substring(7));

? ? ? ? ? ? ? ? ? ? ? ? //System.out.println("Temporary value: " + tempValue);

? ? ? ? ? ? ? ? ? ? ? ? if(tempValue < min) min = tempValue;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? System.out.println("Temporary value: " + root + "/element" + min);

? ? ? ? ? ? ? ? ? ? byte[] b = zk.getData(root + "/element" + min, false, stat);

? ? ? ? ? ? ? ? ? ? zk.delete(root + "/element" + min, 0);

? ? ? ? ? ? ? ? ? ? ByteBuffer buffer = ByteBuffer.wrap(b);

? ? ? ? ? ? ? ? ? ? retvalue = buffer.getInt();

? ? ? ? ? ? ? ? ? ? return retvalue;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容