分布式服務(wù)框架 Zookeeper
Zookeeper名字的由來是比較有趣的,下面的片段摘抄自《從PAXOS到ZOOKEEPER分布式一致性原理與實(shí)踐》一書:
Zookeeper最早起源于雅虎的研究院的一個(gè)研究小組。在當(dāng)時(shí),研究人員發(fā)現(xiàn),在雅虎內(nèi)部很多大型的系統(tǒng)需要依賴一個(gè)類似的系統(tǒng)進(jìn)行分布式協(xié)調(diào),但是這些系統(tǒng)往往存在分布式單點(diǎn)問題。所以雅虎的開發(fā)人員就試圖開發(fā)一個(gè)通用的無單點(diǎn)問題的分布式協(xié)調(diào)框架。在立項(xiàng)初期,考慮到很多項(xiàng)目都是用動(dòng)物的名字來命名的(例如著名的Pig項(xiàng)目),雅虎的工程師希望給這個(gè)項(xiàng)目也取一個(gè)動(dòng)物的名字。時(shí)任研究院的首席科學(xué)家Raghu Ramakrishnan開玩笑說:再這樣下去,我們這兒就變成動(dòng)物園了。此話一出,大家紛紛表示就叫動(dòng)物園管理員吧——因?yàn)楦鱾€(gè)以動(dòng)物命名的分布式組件放在一起,雅虎的整個(gè)分布式系統(tǒng)看上去就像一個(gè)大型的動(dòng)物園了,而Zookeeper正好用來進(jìn)行分布式環(huán)境的協(xié)調(diào)——于是,Zookeeper的名字由此誕生了。
Zookeeper 分布式服務(wù)框架是 Apache Hadoop 的一個(gè)子項(xiàng)目,它主要是用來解決分布式應(yīng)用中經(jīng)常遇到的一些數(shù)據(jù)管理問題,如:統(tǒng)一命名服務(wù)、狀態(tài)同步服務(wù)、集群管理、分布式應(yīng)用配置項(xiàng)的管理等。本文將 從使用者角度詳細(xì)介紹 Zookeeper 的安裝和配置文件中各個(gè)配置項(xiàng)的意義,以及分析 Zookeeper 的典型的應(yīng)用場景(配置文件的管理、集群管理、同步鎖、Leader 選舉、隊(duì)列管理等),用 Java 實(shí)現(xiàn)它們并給出示例代碼。
安裝和配置詳解
本文介紹的 Zookeeper 是以 3.4.5 這個(gè)穩(wěn)定版本為基礎(chǔ),最新的版本可以通過官網(wǎng) http://hadoop.apache.org/zookeeper/來獲取,Zookeeper 的安裝非常簡單,下面將從單機(jī)模式和集群模式兩個(gè)方面介紹 Zookeeper 的安裝和配置。
單機(jī)模式
單機(jī)安裝非常簡單,只要獲取到 Zookeeper 的壓縮包并解壓到某個(gè)目錄如:/home/zookeeper-3.4.5 下,Zookeeper 的啟動(dòng)腳本在 bin 目錄下,Linux 下的啟動(dòng)腳本是 zkServer.sh。
在你執(zhí)行啟動(dòng)腳本之前,還有幾個(gè)基本的配置項(xiàng)需要配置一下,Zookeeper 的配置文件在 conf 目錄下,這個(gè)目錄下有 zoo_sample.cfg 和 log4j.properties,你需要做的就是將 zoo_sample.cfg 改名為 zoo.cfg,因?yàn)?Zookeeper 在啟動(dòng)時(shí)會(huì)找這個(gè)文件作為默認(rèn)配置文件。下面詳細(xì)介紹一下,這個(gè)配置文件中各個(gè)配置項(xiàng)的意義。
- tickTime=2000
tickTime:這個(gè)時(shí)間是作為 Zookeeper 服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔,也就是每個(gè) tickTime 時(shí)間就會(huì)發(fā)送一個(gè)心跳。
- dataDir=D:/software/zookeeper-3.4.5/data
dataDir:顧名思義就是 Zookeeper 保存數(shù)據(jù)的目錄,默認(rèn)情況下,Zookeeper 將寫數(shù)據(jù)的日志文件也保存在這個(gè)目錄里。
- clientPort=2181
clientPort:這個(gè)端口就是客戶端連接 Zookeeper 服務(wù)器的端口,Zookeeper 會(huì)監(jiān)聽這個(gè)端口,接受客戶端的訪問請(qǐng)求。
當(dāng)這些配置項(xiàng)配置好后,你現(xiàn)在就可以啟動(dòng) Zookeeper 了,啟動(dòng)后要檢查 Zookeeper 是否已經(jīng)在服務(wù),可以通過 lsof -i:2181 命令查看是否有你配置的 clientPort 端口號(hào)在監(jiān)聽服務(wù)。
集群模式
Zookeeper 不僅可以單機(jī)提供服務(wù),同時(shí)也支持多機(jī)組成集群來提供服務(wù)。實(shí)際上 Zookeeper 還支持另外一種偽集群的方式,也就是可以在一臺(tái)物理機(jī)上運(yùn)行多個(gè) Zookeeper 實(shí)例,下面將介紹集群模式的安裝和配置。
Zookeeper 的集群模式的安裝和配置也不是很復(fù)雜,所要做的就是增加幾個(gè)配置項(xiàng)。集群模式除了上面的三個(gè)配置項(xiàng)還要增加下面幾個(gè)配置項(xiàng):
- initLimit=5
initLimit:這個(gè)配置項(xiàng)是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務(wù)器的客戶端,而是 Zookeeper 服務(wù)器集群中連接到 Leader 的 Follower 服務(wù)器)初始化連接時(shí)最長能忍受多少個(gè)心跳時(shí)間間隔數(shù)。當(dāng)已經(jīng)超過 5 個(gè)心跳的時(shí)間(也就是 tickTime)長度后 Zookeeper 服務(wù)器還沒有收到客戶端的返回信息,那么表明這個(gè)客戶端連接失敗。總的時(shí)間長度就是 5*2000=10 秒。
- syncLimit=2
syncLimit:這個(gè)配置項(xiàng)標(biāo)識(shí) Leader 與 Follower 之間發(fā)送消息,請(qǐng)求和應(yīng)答時(shí)間長度,最長不能超過多少個(gè) tickTime 的時(shí)間長度,總的時(shí)間長度就是 2*2000=4 秒。
- server.1=192.168.211.1:2888:3888
server.2=192.168.211.2:2888:3888
server.3=192.168.211.3:2888:3888
server.A=B:C:D,其中 A 是一個(gè)數(shù)字,表示這個(gè)是第幾號(hào)服務(wù)器;B 是這個(gè)服務(wù)器的 ip 地址;C 表示的是這個(gè)服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口;D 表示的是萬一集群中的 Leader 服務(wù)器掛了,需要一個(gè)端口來重新進(jìn)行選舉,選出一個(gè)新的 Leader,而這個(gè)端口就是用來執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。如果是偽集群的配置方式,由于 B 都是一樣,所以不同的 Zookeeper 實(shí)例通信端口號(hào)不能一樣,所以要給它們分配不同的端口號(hào)。
除了修改 zoo.cfg 配置文件,集群模式下還要配置一個(gè)文件 myid,這個(gè)文件在 dataDir 目錄下,這個(gè)文件里面只有一個(gè)數(shù)據(jù)就是 A 的值,Zookeeper 啟動(dòng)時(shí)會(huì)讀取這個(gè)文件,拿到里面的數(shù)據(jù)與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個(gè) server。
數(shù)據(jù)模型
Zookeeper 會(huì)維護(hù)一個(gè)具有層次關(guān)系的數(shù)據(jù)結(jié)構(gòu),它非常類似于一個(gè)標(biāo)準(zhǔn)的文件系統(tǒng),如圖 1 所示:
圖 1 Zookeeper 數(shù)據(jù)結(jié)構(gòu)

Zookeeper 這種數(shù)據(jù)結(jié)構(gòu)有如下這些特點(diǎn):
- 每個(gè)子目錄項(xiàng)如 NameService 都被稱作為 znode,這個(gè) znode 是被它所在的路徑唯一標(biāo)識(shí),如 Server1 這個(gè) znode 的標(biāo)識(shí)為 /NameService/Server1
- znode 可以有子節(jié)點(diǎn)目錄,并且每個(gè) znode 可以存儲(chǔ)數(shù)據(jù),注意 EPHEMERAL 類型的目錄節(jié)點(diǎn)不能有子節(jié)點(diǎn)目錄
- znode 是有版本的,每個(gè) znode 中存儲(chǔ)的數(shù)據(jù)可以有多個(gè)版本,也就是一個(gè)訪問路徑中可以存儲(chǔ)多份數(shù)據(jù)
- znode 可以是臨時(shí)節(jié)點(diǎn),一旦創(chuàng)建這個(gè) znode 的客戶端與服務(wù)器失去聯(lián)系,這個(gè) znode 也將自動(dòng)刪除,Zookeeper 的客戶端和服務(wù)器通信采用長連接方式,每個(gè)客戶端和服務(wù)器通過心跳來保持連接,這個(gè)連接狀態(tài)稱為 session,如果 znode 是臨時(shí)節(jié)點(diǎn),這個(gè) session 失效,znode 也就刪除了
- znode 的目錄名可以自動(dòng)編號(hào),如 App1 已經(jīng)存在,再創(chuàng)建的話,將會(huì)自動(dòng)命名為 App2
- znode 可以被監(jiān)控,包括這個(gè)目錄節(jié)點(diǎn)中存儲(chǔ)的數(shù)據(jù)的修改,子節(jié)點(diǎn)目錄的變化等,一旦變化可以通知設(shè)置監(jiān)控的客戶端,這個(gè)是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于這個(gè)特性實(shí)現(xiàn)的,后面在典型的應(yīng)用場景中會(huì)有實(shí)例介紹
原生API
Zookeeper 作為一個(gè)分布式的服務(wù)框架,主要用來解決分布式集群中應(yīng)用系統(tǒng)的一致性問題,它能提供基于類似于文件系統(tǒng)的目錄節(jié)點(diǎn)樹方式的數(shù)據(jù)存儲(chǔ),但是 Zookeeper 并不是用來專門存儲(chǔ)數(shù)據(jù)的,它的作用主要是用來維護(hù)和監(jiān)控你存儲(chǔ)的數(shù)據(jù)的狀態(tài)變化。通過監(jiān)控這些數(shù)據(jù)狀態(tài)的變化,從而可以達(dá)到基于數(shù)據(jù)的集群管理,后面將會(huì)詳細(xì)介紹 Zookeeper 能夠解決的一些典型問題,這里先介紹一下,Zookeeper 的操作接口和簡單使用示例。
常用接口列表
客戶端要連接 Zookeeper 服務(wù)器可以通過創(chuàng)建 org.apache.zookeeper. ZooKeeper 的一個(gè)實(shí)例對(duì)象,然后調(diào)用這個(gè)類提供的接口來和服務(wù)器交互。
前面說了 ZooKeeper 主要是用來維護(hù)和監(jiān)控一個(gè)目錄節(jié)點(diǎn)樹中存儲(chǔ)的數(shù)據(jù)的狀態(tài),所有我們能夠操作 ZooKeeper 的也和操作目錄節(jié)點(diǎn)樹大體一樣,如創(chuàng)建一個(gè)目錄節(jié)點(diǎn),給某個(gè)目錄節(jié)點(diǎn)設(shè)置數(shù)據(jù),獲取某個(gè)目錄節(jié)點(diǎn)的所有子目錄節(jié)點(diǎn),給某個(gè)目錄節(jié)點(diǎn)設(shè)置權(quán)限和監(jiān)控這個(gè)目錄節(jié)點(diǎn)的狀態(tài)變化。
這些接口如下表所示:
| 方法名 | 方法功能描述 |
|---|---|
| String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) | 創(chuàng)建一個(gè)給定的目錄節(jié)點(diǎn) path, 并給它設(shè)置數(shù)據(jù),CreateMode 標(biāo)識(shí)有四種形式的目錄節(jié)點(diǎn),分別是 PERSISTENT:持久化目錄節(jié)點(diǎn),這個(gè)目錄節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù)不會(huì)丟失;PERSISTENT_SEQUENTIAL:順序自動(dòng)編號(hào)的目錄節(jié)點(diǎn),這種目錄節(jié)點(diǎn)會(huì)根據(jù)當(dāng)前已近存在的節(jié)點(diǎn)數(shù)自動(dòng)加 1,然后返回給客戶端已經(jīng)成功創(chuàng)建的目錄節(jié)點(diǎn)名;EPHEMERAL:臨時(shí)目錄節(jié)點(diǎn),一旦創(chuàng)建這個(gè)節(jié)點(diǎn)的客戶端與服務(wù)器端也就是 session 超時(shí),這種節(jié)點(diǎn)會(huì)被自動(dòng)刪除;EPHEMERAL_SEQUENTIAL:臨時(shí)自動(dòng)編號(hào)節(jié)點(diǎn) |
| Stat exists(String path, boolean watch) | 判斷某個(gè) path 是否存在,并設(shè)置是否監(jiān)控這個(gè)目錄節(jié)點(diǎn),這里的 watcher 是在創(chuàng)建 ZooKeeper 實(shí)例時(shí)指定的 watcher,exists方法還有一個(gè)重載方法,可以指定特定的 watcher |
| Stat exists(String path, Watcher watcher) | 重載方法,這里給某個(gè)目錄節(jié)點(diǎn)設(shè)置特定的 watcher,Watcher 在 ZooKeeper 是一個(gè)核心功能,Watcher 可以監(jiān)控目錄節(jié)點(diǎn)的數(shù)據(jù)變化以及子目錄的變化,一旦這些狀態(tài)發(fā)生變化,服務(wù)器就會(huì)通知所有設(shè)置在這個(gè)目錄節(jié)點(diǎn)上的 Watcher,從而每個(gè)客戶端都很快知道它所關(guān)注的目錄節(jié)點(diǎn)的狀態(tài)發(fā)生變化,而做出相應(yīng)的反應(yīng) |
| void delete(String path, int version) | 刪除 path 對(duì)應(yīng)的目錄節(jié)點(diǎn),version 為 -1 可以匹配任何版本,也就刪除了這個(gè)目錄節(jié)點(diǎn)所有數(shù)據(jù) |
| List<String> getChildren(String path, boolean watch) | 獲取指定 path 下的所有子目錄節(jié)點(diǎn),同樣 getChildren方法也有一個(gè)重載方法可以設(shè)置特定的 watcher 監(jiān)控子節(jié)點(diǎn)的狀態(tài) |
| Stat setData(String path, byte[] data, int version) | 給 path 設(shè)置數(shù)據(jù),可以指定這個(gè)數(shù)據(jù)的版本號(hào),如果 version 為 -1 可以匹配任何版本 |
| byte[] getData(String path, boolean watch, Stat stat) | 獲取這個(gè) path 對(duì)應(yīng)的目錄節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù),數(shù)據(jù)的版本等信息可以通過 stat 來指定,同時(shí)還可以設(shè)置是否監(jiān)控這個(gè)目錄節(jié)點(diǎn)數(shù)據(jù)的狀態(tài) |
| void addAuthInfo(String scheme, byte[] auth) | 客戶端將自己的授權(quán)信息提交給服務(wù)器,服務(wù)器將根據(jù)這個(gè)授權(quán)信息驗(yàn)證客戶端的訪問權(quán)限 |
| List<ACL> getACL(String path, Stat stat) | 獲取某個(gè)目錄節(jié)點(diǎn)的訪問權(quán)限列表 |
除了以上這些列出的方法之外還有一些重載方法,如都提供了一個(gè)回調(diào)類的重載方法以及可以設(shè)置特定 Watcher 的重載方法,具體的方法可以參考 org.apache.zookeeper.ZooKeeper 類的 API 說明。
基本操作
下面給出操作 ZooKeeper 的示例代碼,這樣你就能對(duì) ZooKeeper 有直觀的認(rèn)識(shí)了。下面的清單包括了創(chuàng)建與 ZooKeeper 服務(wù)器的連接以及最基本的數(shù)據(jù)操作:
public class ZookeeperWatcher implements Watcher {
private ZooKeeper zk;
private CountDownLatch latch = new CountDownLatch(1);
private static final String CONNECTION_STR = "127.0.0.1:2181";
// 會(huì)話超時(shí)時(shí)間,參見下文對(duì)會(huì)話超時(shí)的描述
private static final int SESSION_TIMEOUT = 10000;
public ZookeeperWatcher() {
try {
zk = new ZooKeeper(CONNECTION_STR, SESSION_TIMEOUT, this);
System.out.println("開始建立zk連接...");
// 阻塞直到連接建立完畢
latch.await();
} catch (Exception e) {
if (zk != null) {
try {
zk.close();
zk = null;
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
String path = event.getPath();
System.out.println("受影響路徑:" + path);
KeeperState keeperState = event.getState();
EventType eventType = event.getType();
if (KeeperState.SyncConnected == keeperState) {
if (EventType.None == eventType) {
// 成功建立連接
System.out.println("event:成功建立與zk服務(wù)器的連接");
// 連接建立完成,通知程序往下執(zhí)行
latch.countDown();
}
if (EventType.NodeChildrenChanged == eventType) {
// 子節(jié)點(diǎn)發(fā)生變化
System.out.println("event:子節(jié)點(diǎn)發(fā)生變化");
}
if (EventType.NodeCreated == eventType) {
// 節(jié)點(diǎn)被創(chuàng)建
System.out.println("event:節(jié)點(diǎn)被創(chuàng)建");
}
if (EventType.NodeDataChanged == eventType) {
// 節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化
System.out.println("event:節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化");
}
if (EventType.NodeDeleted == eventType) {
// 節(jié)點(diǎn)被刪除
System.out.println("event:節(jié)點(diǎn)被刪除");
}
}
if (KeeperState.Disconnected == keeperState) {
System.out.println("斷開與zk的連接");
}
if (KeeperState.AuthFailed == keeperState) {
System.out.println("認(rèn)證失敗");
}
if (KeeperState.Expired == keeperState) {
System.out.println("會(huì)話超時(shí)");
}
}
/**
* 創(chuàng)建節(jié)點(diǎn)
* @param path
* @param data
* @param needWatch
*/
public void createNode(String path, String data, boolean needWatch) {
try {
Stat stat = this.zk.exists(path, needWatch);
if (stat == null) {
String res = this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("節(jié)點(diǎn)創(chuàng)建完成" + res);
} else {
System.out.println(path + " already exists..[" + stat + "]");
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 創(chuàng)建臨時(shí)節(jié)點(diǎn),會(huì)話結(jié)束后自動(dòng)刪除
* @param path
* @param data
*/
public void createTempNode(String path, String data, boolean needWatch) {
try {
this.zk.exists(path, needWatch);
this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 刪除指定節(jié)點(diǎn)
* @param path
*/
public void deleteNode(String path) {
try {
this.zk.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* 讀取指定節(jié)點(diǎn)的數(shù)據(jù)
* @param path
* @param needWatch
* @return
*/
public String getData(String path, boolean needWatch) {
try {
String data = new String(this.zk.getData(path, needWatch, null));
return data;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
/**
* 更新指定節(jié)點(diǎn)的數(shù)據(jù)
* @param path
* @param data
*/
public void writeData(String path, String data) {
try {
this.zk.setData(path, data.getBytes(), -1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 判斷指定節(jié)點(diǎn)是否存在
* @param path 節(jié)點(diǎn)路徑
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 獲取子節(jié)點(diǎn)
* @param path 節(jié)點(diǎn)路徑
*/
public List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 關(guān)閉連接
*/
public void closeConnection() {
if (zk != null) {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
ZookeeperWatcher zw = new ZookeeperWatcher();
// 觸發(fā)NodeCreated事件
zw.createNode("/super", "super", true);
// 觸發(fā)NodeChildrenChanged事件
List<String> childList = zw.getChildren("/super", true);
System.out.println(childList);
zw.createNode("/super/sub", "sub", true);
// 因?yàn)閣atch事件是一次性的,所以需要再次注冊(cè)
zw.getData("/super", true);
// 觸發(fā)NodeDataChanged事件
zw.writeData("/super", "newsuper");
// 再次注冊(cè)wacth事件
zw.getData("/super/sub", true);
// 觸發(fā)NodeDeleted事件
zw.deleteNode("/super/sub");
Thread.sleep(10000);
/*
// 關(guān)閉連接,臨時(shí)節(jié)點(diǎn)自動(dòng)被刪除
ZookeeperWatcher zw = new ZookeeperWatcher();
zw.createTempNode("/temp", "temp", true);
zw.exists("/temp", true);
zw.closeConnection();
*/
}
}
當(dāng)對(duì)目錄節(jié)點(diǎn)監(jiān)控狀態(tài)打開時(shí),一旦目錄節(jié)點(diǎn)的狀態(tài)發(fā)生變化,Watcher 對(duì)象的 process 方法就會(huì)被調(diào)用。
Watch觸發(fā)器
watch概述
ZooKeeper可以為所有的讀操作設(shè)置watch,這些讀操作包括:exists()、getChildren()及getData()。watch事件是一次性的觸發(fā)器,當(dāng)watch的對(duì)象狀態(tài)發(fā)生改變時(shí),將會(huì)觸發(fā)此對(duì)象上watch所對(duì)應(yīng)的事件。watch事件將被異步地發(fā)送給客戶端,并且ZooKeeper為watch機(jī)制提供了有序的一致性保證。理論上,客戶端接收watch事件的時(shí)間要快于其看到watch對(duì)象狀態(tài)變化的時(shí)間。
watch注冊(cè)與觸發(fā)

① exists操作上的watch,在被監(jiān)視的Znode創(chuàng)建、刪除或數(shù)據(jù)更新時(shí)被觸發(fā)。
② getData操作上的watch,在被監(jiān)視的Znode刪除或數(shù)據(jù)更新時(shí)被觸發(fā)。在被創(chuàng)建時(shí)不能被觸發(fā),因?yàn)橹挥衂node一定存在,getData操作才會(huì)成功。
③ getChildren操作上的watch,在被監(jiān)視的Znode的子節(jié)點(diǎn)創(chuàng)建或刪除,或是這個(gè)Znode自身被刪除時(shí)被觸發(fā)。可以通過查看watch事件類型來區(qū)分是Znode,還是他的子節(jié)點(diǎn)被刪除:NodeDelete表示Znode被刪除,NodeDeletedChanged表示子節(jié)點(diǎn)被刪除。
Watch由客戶端所連接的ZooKeeper服務(wù)器在本地維護(hù),因此watch可以非常容易地設(shè)置、管理和分派。當(dāng)客戶端連接到一個(gè)新的服務(wù)器時(shí),任何的會(huì)話事件都將可能觸發(fā)watch。另外,當(dāng)從服務(wù)器斷開連接的時(shí)候,watch將不會(huì)被接收。但是,當(dāng)一個(gè)客戶端重新建立連接的時(shí)候,任何先前注冊(cè)過的watch都會(huì)被重新注冊(cè)。
需要注意的幾點(diǎn)
Zookeeper的watch實(shí)際上要處理兩類事件:
① 連接狀態(tài)事件(type=None, path=null)
這類事件不需要注冊(cè),也不需要我們連續(xù)觸發(fā),我們只要處理就行了。
② 節(jié)點(diǎn)事件
節(jié)點(diǎn)的建立,刪除,數(shù)據(jù)的修改。它是one time trigger,我們需要不停的注冊(cè)觸發(fā),還可能發(fā)生事件丟失的情況。
上面2類事件都在Watch中處理,也就是重載的process(Event event)方法。
節(jié)點(diǎn)事件的觸發(fā),通過函數(shù)exists,getData或getChildren來處理這類函數(shù),有雙重作用:
① 注冊(cè)觸發(fā)事件
② 函數(shù)本身的功能
函數(shù)的本身的功能又可以用異步的回調(diào)函數(shù)來實(shí)現(xiàn),重載processResult()過程中處理函數(shù)本身的功能。
ZooKeeper 會(huì)話超時(shí)
1、會(huì)話概述
在ZooKeeper中,客戶端和服務(wù)端建立連接后,會(huì)話隨之建立,生成一個(gè)全局唯一的會(huì)話ID(Session ID)。服務(wù)器和客戶端之間維持的是一個(gè)長連接,在SESSION_TIMEOUT時(shí)間內(nèi),服務(wù)器會(huì)確定客戶端是否正常連接 (客戶端會(huì)定時(shí)向服務(wù)器發(fā)送heart_beat,服務(wù)器重置下次SESSION_TIMEOUT時(shí)間)。因此,在正常情況下,Session一直有效,并且ZK集群所有機(jī)器上都保存這個(gè)Session信息。在出現(xiàn)網(wǎng)絡(luò)或其它問題情況下(例如客戶端所連接的那臺(tái)ZK機(jī)器掛了,或是其它原因的網(wǎng)絡(luò)閃斷),客戶端與當(dāng)前連接的那臺(tái)服務(wù)器之間連接斷了,這個(gè)時(shí)候客戶端會(huì)主動(dòng)在地址列表(實(shí)例化ZK對(duì)象的時(shí)候傳入構(gòu)造方法的那個(gè)參數(shù)connectString)中選擇新的地址進(jìn)行連接。
2、連接斷開
好了,上面基本就是服務(wù)器與客戶端之間維持會(huì)話的過程了。在這個(gè)過程中,用戶可能會(huì)看到兩類異常CONNECTIONLOSS (連接斷開) 和 SESSIONEXPIRED (Session過期)。連接斷開(CONNECTIONLOSS)一般發(fā)生在網(wǎng)絡(luò)的閃斷或是客戶端所連接的服務(wù)器掛機(jī)的時(shí)候,這種情況下,ZooKeeper客戶端自己會(huì)首先感知到這個(gè)異常,具體邏輯是在如下方法中觸發(fā)的:一種場景是Server服務(wù)器掛了,這個(gè)時(shí)候,ZK客戶端首先會(huì)捕獲異常。
所以,現(xiàn)在對(duì)于“連接斷開”這個(gè)過程就一目了然了,核心流程如下:ZK客戶端捕獲“連接斷開”異常 ——> 獲取一個(gè)新的ZK地址 ——> 嘗試連接,
在這個(gè)流程中,我們可以發(fā)現(xiàn),整個(gè)過程不需要開發(fā)者額外的程序介入,都是ZK客戶端自己會(huì)進(jìn)行的,并且,使用的會(huì)話ID都是同一個(gè),所以結(jié)論就是:發(fā)生CONNECTIONLOSS的情況,應(yīng)用不需要做什么事情,等待ZK客戶端建立新的連接即可。
3、會(huì)話超時(shí)
SESSIONEXPIRED發(fā)生在上面藍(lán)色文字部分,這個(gè)通常是ZK客戶端與服務(wù)器的連接斷了,試圖連接上新的ZK機(jī)器,但是這個(gè)過程如果耗時(shí)過長,超過了SESSION_TIMEOUT 后還沒有成功連接上服務(wù)器,那么服務(wù)器認(rèn)為這個(gè)Session已經(jīng)結(jié)束了(服務(wù)器無法確認(rèn)是因?yàn)槠渌惓T蜻€是客戶端主動(dòng)結(jié)束會(huì)話),由于在ZK中,很多數(shù)據(jù)和狀態(tài)都是和會(huì)話綁定的,一旦會(huì)話失效,那么ZK就開始清除和這個(gè)會(huì)話有關(guān)的信息,包括這個(gè)會(huì)話創(chuàng)建的臨時(shí)節(jié)點(diǎn)和注冊(cè)的所有Watcher。在這之后,由于網(wǎng)絡(luò)恢復(fù)后,客戶端可能會(huì)重新連接上服務(wù)器,但是很不幸,服務(wù)器會(huì)告訴客戶端一個(gè)異常:SESSIONEXPIRED(會(huì)話過期)。此時(shí)客戶端的狀態(tài)變成 CLOSED狀態(tài),應(yīng)用要做的事情就是的看自己應(yīng)用的復(fù)雜程序了,要重新實(shí)例zookeeper對(duì)象,然后重新操作所有臨時(shí)數(shù)據(jù)(包括臨時(shí)節(jié)點(diǎn)和注冊(cè)Watcher),總之,會(huì)話超時(shí)在ZK使用過程中是真實(shí)存在的。
所以這里也簡單總結(jié)下,一旦發(fā)生會(huì)話超時(shí),那么存儲(chǔ)在ZK上的所有臨時(shí)數(shù)據(jù)與注冊(cè)的訂閱者都會(huì)被移除,此時(shí)需要重新創(chuàng)建一個(gè)ZooKeeper客戶端實(shí)例,需要自己編碼做一些額外的處理。
4、會(huì)話時(shí)間(Session Time)
在《ZooKeeper API 使用》一文中已經(jīng)提到,在實(shí)例化一個(gè)ZK客戶端的時(shí)候,需要設(shè)置一個(gè)會(huì)話的超時(shí)時(shí)間。這里需要注意的一點(diǎn)是,客戶端并不是可以隨意設(shè)置這個(gè)會(huì)話超時(shí)時(shí)間,在ZK服務(wù)器端對(duì)會(huì)話超時(shí)時(shí)間是有限制的,主要是minSessionTimeout和maxSessionTimeout這兩個(gè)參數(shù)設(shè)置的。(詳細(xì)查看這個(gè)文章《ZooKeeper管理員指南》)Session超時(shí)時(shí)間限制,如果客戶端設(shè)置的超時(shí)時(shí)間不在這個(gè)范圍,那么會(huì)被強(qiáng)制設(shè)置為最大或最小時(shí)間。 默認(rèn)的Session超時(shí)時(shí)間是在2 * tickTime ~ 20 * tickTime。所以,如果應(yīng)用對(duì)于這個(gè)會(huì)話超時(shí)時(shí)間有特殊的需求的話,一定要和ZK管理員溝通好,確認(rèn)好服務(wù)端是否設(shè)置了對(duì)會(huì)話時(shí)間的限制。
ZooKeeper 典型的應(yīng)用場景
Zookeeper 從設(shè)計(jì)模式角度來看,是一個(gè)基于觀察者設(shè)計(jì)模式的分布式服務(wù)管理框架,它負(fù)責(zé)存儲(chǔ)和管理大家都關(guān)心的數(shù)據(jù),然后接受觀察者的注冊(cè),一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper 就將負(fù)責(zé)通知已經(jīng)在 Zookeeper 上注冊(cè)的那些觀察者做出相應(yīng)的反應(yīng),從而實(shí)現(xiàn)集群中類似 Master/Slave 管理模式,關(guān)于 Zookeeper 的詳細(xì)架構(gòu)等內(nèi)部細(xì)節(jié)可以閱讀 Zookeeper 的源碼。
下面詳細(xì)介紹這些典型的應(yīng)用場景,也就是 Zookeeper 到底能幫我們解決那些問題?下面將給出答案。
統(tǒng)一命名服務(wù)(Name Service)
分布式應(yīng)用中,通常需要有一套完整的命名規(guī)則,既能夠產(chǎn)生唯一的名稱又便于人識(shí)別和記住,通常情況下用樹形的名稱結(jié)構(gòu)是一個(gè)理想的選擇,樹形的名稱結(jié)構(gòu)是一 個(gè)有層次的目錄結(jié)構(gòu),既對(duì)人友好又不會(huì)重復(fù)。說到這里你可能想到了 JNDI,沒錯(cuò) Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結(jié)構(gòu)關(guān)聯(lián)到一定資源上,但是 Zookeeper 的 Name Service 更加是廣泛意義上的關(guān)聯(lián),也許你并不需要將名稱關(guān)聯(lián)到特定資源上,你可能只需要一個(gè)不會(huì)重復(fù)名稱,就像數(shù)據(jù)庫中產(chǎn)生一個(gè)唯一的數(shù)字主鍵一樣。
Name Service 已經(jīng)是 Zookeeper 內(nèi)置的功能,你只要調(diào)用 Zookeeper 的 API 就能實(shí)現(xiàn)。如調(diào)用 create 接口就可以很容易創(chuàng)建一個(gè)目錄節(jié)點(diǎn)。
配置管理(Configuration Management)
配置的管理在分布式應(yīng)用環(huán)境中很常見,例如同一個(gè)應(yīng)用系統(tǒng)需要多臺(tái) PC Server 運(yùn)行,但是它們運(yùn)行的應(yīng)用系統(tǒng)的某些配置項(xiàng)是相同的,如果要修改這些相同的配置項(xiàng),那么就必須同時(shí)修改每臺(tái)運(yùn)行這個(gè)應(yīng)用系統(tǒng)的 PC Server,這樣非常麻煩而且容易出錯(cuò)。
像 這樣的配置信息完全可以交給 Zookeeper 來管理,將配置信息保存在 Zookeeper 的某個(gè)目錄節(jié)點(diǎn)中,然后將所有需要修改的應(yīng)用機(jī)器監(jiān)控配置信息的狀態(tài),一旦配置信息發(fā)生變化,每臺(tái)應(yīng)用機(jī)器就會(huì)收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應(yīng)用到系統(tǒng)中。

集群管理(Group Membership)
Zookeeper 能夠很容易的實(shí)現(xiàn)集群管理的功能,如有多臺(tái) Server 組成一個(gè)服務(wù)集群,那么必須要一個(gè)“總管”知道當(dāng)前集群中每臺(tái)機(jī)器的服務(wù)狀態(tài),一旦有機(jī)器不能提供服務(wù),集群中其它集群必須知道,從而做出調(diào)整重新分配服務(wù)策略。同樣當(dāng)增加集群的服務(wù)能力時(shí),就會(huì)增加一臺(tái)或多臺(tái) Server,同樣也必須讓“總管”知道。
Zookeeper 不僅能夠幫你維護(hù)當(dāng)前的集群中機(jī)器的服務(wù)狀態(tài),而且能夠幫你選出一個(gè)“總管”,讓這個(gè)總管來管理集群,這就是 Zookeeper 的另一個(gè)功能 Leader Election。
它們的實(shí)現(xiàn)方式都是在 Zookeeper 上創(chuàng)建一個(gè) EPHEMERAL 類型的目錄節(jié)點(diǎn),然后每個(gè) Server 在它們創(chuàng)建目錄節(jié)點(diǎn)的父目錄節(jié)點(diǎn)上調(diào)用 getChildren(String path, boolean watch) 方法并設(shè)置 watch 為 true,由于是 EPHEMERAL 目錄節(jié)點(diǎn),當(dāng)創(chuàng)建它的 Server 死去,這個(gè)目錄節(jié)點(diǎn)也隨之被刪除,所以 Children 將會(huì)變化,這時(shí) getChildren上的 Watch 將會(huì)被調(diào)用,所以其它 Server 就知道已經(jīng)有某臺(tái) Server 死去了。新增 Server 也是同樣的原理。
Zookeeper 如何實(shí)現(xiàn) Leader Election,也就是選出一個(gè) Master Server。和前面的一樣,每臺(tái) Server 創(chuàng)建一個(gè) EPHEMERAL 目錄節(jié)點(diǎn),不同的是它還是一個(gè) SEQUENTIAL 目錄節(jié)點(diǎn),所以它是個(gè) EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn)。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn),是因?yàn)槲覀兛梢越o每臺(tái) Server 編號(hào),我們可以選擇當(dāng)前是最小編號(hào)的 Server 為 Master,假如這個(gè)最小編號(hào)的 Server 死去,由于是 EPHEMERAL 節(jié)點(diǎn),死去的 Server 對(duì)應(yīng)的節(jié)點(diǎn)也被刪除,所以當(dāng)前的節(jié)點(diǎn)列表中又出現(xiàn)一個(gè)最小編號(hào)的節(jié)點(diǎn),我們就選擇這個(gè)節(jié)點(diǎn)為當(dāng)前 Master。這樣就實(shí)現(xiàn)了動(dòng)態(tài)選擇 Master,避免了傳統(tǒng)意義上單 Master 容易出現(xiàn)單點(diǎn)故障的問題。

共享鎖(Locks)
共享鎖在同一個(gè)進(jìn)程中很容易實(shí)現(xiàn),但是在跨進(jìn)程或者在不同 Server 之間就不好實(shí)現(xiàn)了。Zookeeper 卻很容易實(shí)現(xiàn)這個(gè)功能,實(shí)現(xiàn)方式也是需要獲得鎖的 Server 創(chuàng)建一個(gè) EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn),然后調(diào)用 getChildren方法獲取當(dāng)前的目錄節(jié)點(diǎn)列表中最小的目錄節(jié)點(diǎn)是不是就是自己創(chuàng)建的目錄節(jié)點(diǎn),如果正是自己創(chuàng)建的,那么它就獲得了這個(gè)鎖,如果不是那么它就調(diào)用 exists(String path, boolean watch) 方法并監(jiān)控 Zookeeper 上目錄節(jié)點(diǎn)列表的變化,一直到自己創(chuàng)建的節(jié)點(diǎn)是列表中最小編號(hào)的目錄節(jié)點(diǎn),從而獲得鎖,釋放鎖很簡單,只要?jiǎng)h除前面它自己所創(chuàng)建的目錄節(jié)點(diǎn)就行了。

同步鎖的關(guān)鍵代碼:
void getLock() throws KeeperException, InterruptedException{
List<String> list = zk.getChildren(root, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if(myZnode.equals(root+"/"+nodes[0])){
doAction();
}
else{
waitForLock(nodes[0]);
}
}
void waitForLock(String lower) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
if(stat != null){
mutex.wait();
}
else{
getLock();
}
}
隊(duì)列管理
Zookeeper 可以處理兩種類型的隊(duì)列:
當(dāng)一個(gè)隊(duì)列的成員都聚齊時(shí),這個(gè)隊(duì)列才可用,否則一直等待所有成員到達(dá),這種是同步隊(duì)列。
隊(duì)列按照 FIFO 方式進(jìn)行入隊(duì)和出隊(duì)操作,例如實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型。
同步隊(duì)列用 Zookeeper 實(shí)現(xiàn)的實(shí)現(xiàn)思路如下:
創(chuàng)建一個(gè)父目錄 /synchronizing,每個(gè)成員都監(jiān)控標(biāo)志位目錄 /synchronizing/start 是否存在,然后每個(gè)成員都加入這個(gè)隊(duì)列,加入隊(duì)列的方式就是創(chuàng)建 /synchronizing/member_i 的臨時(shí)目錄節(jié)點(diǎn),然后每個(gè)成員獲取 /synchronizing 目錄的所有目錄節(jié)點(diǎn),也就是 member_i。判斷 i 的值是否已經(jīng)是成員的個(gè)數(shù),如果小于成員個(gè)數(shù)等待 /synchronizing/start 的出現(xiàn),如果已經(jīng)相等就創(chuàng)建 /synchronizing/start。
用下面的流程圖更容易理解:

同步隊(duì)列的關(guān)鍵代碼如下:
void addQueue() throws KeeperException, InterruptedException{
zk.exists(root + "/start",true);
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
synchronized (mutex) {
List<String> list = zk.getChildren(root, false);
if (list.size() < size) {
mutex.wait();
} else {
zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
}
當(dāng)隊(duì)列沒滿是進(jìn)入 wait(),然后會(huì)一直等待 Watch 的通知,Watch 的代碼如下:
public void process(WatchedEvent event) {
if(event.getPath().equals(root + "/start") &&
event.getType() == Event.EventType.NodeCreated){
System.out.println("得到通知");
super.process(event);
doAction();
}
}
FIFO 隊(duì)列用 Zookeeper 實(shí)現(xiàn)思路如下:
實(shí)現(xiàn)的思路也非常簡單,就是在特定的目錄下創(chuàng)建 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證所有成員加入隊(duì)列時(shí)都是有編號(hào)的,出隊(duì)列時(shí)通過 getChildren( ) 方法可以返回當(dāng)前所有的隊(duì)列中的元素,然后消費(fèi)其中最小的一個(gè),這樣就能保證 FIFO。
下面是生產(chǎn)者和消費(fèi)者這種隊(duì)列形式的示例代碼:
生產(chǎn)者代碼:
boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
消費(fèi)者代碼:
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
if(tempValue < min) min = tempValue;
}
byte[] b = zk.getData(root + "/element" + min,false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
總結(jié)
Zookeeper 作為 Hadoop 項(xiàng)目中的一個(gè)子項(xiàng)目,是 Hadoop 集群管理的一個(gè)必不可少的模塊,它主要用來控制集群中的數(shù)據(jù),如它管理 Hadoop 集群中的 NameNode,還有 Hbase 中 Master Election、Server 之間狀態(tài)同步等。
本文介紹的 Zookeeper 的基本知識(shí),以及介紹了幾個(gè)典型的應(yīng)用場景。這些都是 Zookeeper 的基本功能,最重要的是 Zoopkeeper 提供了一套很好的分布式集群管理的機(jī)制,就是它這種基于層次型的目錄樹的數(shù)據(jù)結(jié)構(gòu),并對(duì)樹中的節(jié)點(diǎn)進(jìn)行有效管理,從而可以設(shè)計(jì)出多種多樣的分布式的數(shù)據(jù)管理模型,而不僅僅局限于上面提到的幾個(gè)常用應(yīng)用場景。