1-分布式服務(wù)框架Zookeeper簡介

分布式服務(wù)框架 Zookeeper

Zookeeper名字的由來是比較有趣的,下面的片段摘抄自《從PAXOS到ZOOKEEPER分布式一致性原理與實(shí)踐》一書:

Zookeeper最早起源于雅虎的研究院的一個(gè)研究小組。在當(dāng)時(shí),研究人員發(fā)現(xiàn),在雅虎內(nèi)部很多大型的系統(tǒng)需要依賴一個(gè)類似的系統(tǒng)進(jìn)行分布式協(xié)調(diào),但是這些系統(tǒng)往往存在分布式單點(diǎn)問題。所以雅虎的開發(fā)人員就試圖開發(fā)一個(gè)通用的無單點(diǎn)問題的分布式協(xié)調(diào)框架。在立項(xiàng)初期,考慮到很多項(xiàng)目都是用動(dòng)物的名字來命名的(例如著名的Pig項(xiàng)目),雅虎的工程師希望給這個(gè)項(xiàng)目也取一個(gè)動(dòng)物的名字。時(shí)任研究院的首席科學(xué)家Raghu Ramakrishnan開玩笑說:再這樣下去,我們這兒就變成動(dòng)物園了。此話一出,大家紛紛表示就叫動(dòng)物園管理員吧——因?yàn)楦鱾€(gè)以動(dòng)物命名的分布式組件放在一起,雅虎的整個(gè)分布式系統(tǒng)看上去就像一個(gè)大型的動(dòng)物園了,而Zookeeper正好用來進(jìn)行分布式環(huán)境的協(xié)調(diào)——于是,Zookeeper的名字由此誕生了。

Zookeeper 分布式服務(wù)框架是 Apache Hadoop 的一個(gè)子項(xiàng)目,它主要是用來解決分布式應(yīng)用中經(jīng)常遇到的一些數(shù)據(jù)管理問題,如:統(tǒng)一命名服務(wù)、狀態(tài)同步服務(wù)、集群管理、分布式應(yīng)用配置項(xiàng)的管理等。本文將 從使用者角度詳細(xì)介紹 Zookeeper 的安裝和配置文件中各個(gè)配置項(xiàng)的意義,以及分析 Zookeeper 的典型的應(yīng)用場景(配置文件的管理、集群管理、同步鎖、Leader 選舉、隊(duì)列管理等),用 Java 實(shí)現(xiàn)它們并給出示例代碼。

安裝和配置詳解

本文介紹的 Zookeeper 是以 3.4.5 這個(gè)穩(wěn)定版本為基礎(chǔ),最新的版本可以通過官網(wǎng) http://hadoop.apache.org/zookeeper/來獲取,Zookeeper 的安裝非常簡單,下面將從單機(jī)模式和集群模式兩個(gè)方面介紹 Zookeeper 的安裝和配置。

單機(jī)模式

單機(jī)安裝非常簡單,只要獲取到 Zookeeper 的壓縮包并解壓到某個(gè)目錄如:/home/zookeeper-3.4.5 下,Zookeeper 的啟動(dòng)腳本在 bin 目錄下,Linux 下的啟動(dòng)腳本是 zkServer.sh。

在你執(zhí)行啟動(dòng)腳本之前,還有幾個(gè)基本的配置項(xiàng)需要配置一下,Zookeeper 的配置文件在 conf 目錄下,這個(gè)目錄下有 zoo_sample.cfglog4j.properties,你需要做的就是將 zoo_sample.cfg 改名為 zoo.cfg,因?yàn)?Zookeeper 在啟動(dòng)時(shí)會(huì)找這個(gè)文件作為默認(rèn)配置文件。下面詳細(xì)介紹一下,這個(gè)配置文件中各個(gè)配置項(xiàng)的意義。

  • tickTime=2000

tickTime:這個(gè)時(shí)間是作為 Zookeeper 服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔,也就是每個(gè) tickTime 時(shí)間就會(huì)發(fā)送一個(gè)心跳。

  • dataDir=D:/software/zookeeper-3.4.5/data

dataDir:顧名思義就是 Zookeeper 保存數(shù)據(jù)的目錄,默認(rèn)情況下,Zookeeper 將寫數(shù)據(jù)的日志文件也保存在這個(gè)目錄里。

  • clientPort=2181

clientPort:這個(gè)端口就是客戶端連接 Zookeeper 服務(wù)器的端口,Zookeeper 會(huì)監(jiān)聽這個(gè)端口,接受客戶端的訪問請(qǐng)求。

當(dāng)這些配置項(xiàng)配置好后,你現(xiàn)在就可以啟動(dòng) Zookeeper 了,啟動(dòng)后要檢查 Zookeeper 是否已經(jīng)在服務(wù),可以通過 lsof -i:2181 命令查看是否有你配置的 clientPort 端口號(hào)在監(jiān)聽服務(wù)。

集群模式

Zookeeper 不僅可以單機(jī)提供服務(wù),同時(shí)也支持多機(jī)組成集群來提供服務(wù)。實(shí)際上 Zookeeper 還支持另外一種偽集群的方式,也就是可以在一臺(tái)物理機(jī)上運(yùn)行多個(gè) Zookeeper 實(shí)例,下面將介紹集群模式的安裝和配置。

Zookeeper 的集群模式的安裝和配置也不是很復(fù)雜,所要做的就是增加幾個(gè)配置項(xiàng)。集群模式除了上面的三個(gè)配置項(xiàng)還要增加下面幾個(gè)配置項(xiàng):

  • initLimit=5

initLimit:這個(gè)配置項(xiàng)是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務(wù)器的客戶端,而是 Zookeeper 服務(wù)器集群中連接到 Leader 的 Follower 服務(wù)器)初始化連接時(shí)最長能忍受多少個(gè)心跳時(shí)間間隔數(shù)。當(dāng)已經(jīng)超過 5 個(gè)心跳的時(shí)間(也就是 tickTime)長度后 Zookeeper 服務(wù)器還沒有收到客戶端的返回信息,那么表明這個(gè)客戶端連接失敗。總的時(shí)間長度就是 5*2000=10 秒。

  • syncLimit=2

syncLimit:這個(gè)配置項(xiàng)標(biāo)識(shí) Leader 與 Follower 之間發(fā)送消息,請(qǐng)求和應(yīng)答時(shí)間長度,最長不能超過多少個(gè) tickTime 的時(shí)間長度,總的時(shí)間長度就是 2*2000=4 秒。

  • server.1=192.168.211.1:2888:3888

    server.2=192.168.211.2:2888:3888

    server.3=192.168.211.3:2888:3888

server.A=B:C:D,其中 A 是一個(gè)數(shù)字,表示這個(gè)是第幾號(hào)服務(wù)器;B 是這個(gè)服務(wù)器的 ip 地址;C 表示的是這個(gè)服務(wù)器與集群中的 Leader 服務(wù)器交換信息的端口;D 表示的是萬一集群中的 Leader 服務(wù)器掛了,需要一個(gè)端口來重新進(jìn)行選舉,選出一個(gè)新的 Leader,而這個(gè)端口就是用來執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。如果是偽集群的配置方式,由于 B 都是一樣,所以不同的 Zookeeper 實(shí)例通信端口號(hào)不能一樣,所以要給它們分配不同的端口號(hào)。

除了修改 zoo.cfg 配置文件,集群模式下還要配置一個(gè)文件 myid,這個(gè)文件在 dataDir 目錄下,這個(gè)文件里面只有一個(gè)數(shù)據(jù)就是 A 的值,Zookeeper 啟動(dòng)時(shí)會(huì)讀取這個(gè)文件,拿到里面的數(shù)據(jù)與 zoo.cfg 里面的配置信息比較從而判斷到底是哪個(gè) server。

數(shù)據(jù)模型

Zookeeper 會(huì)維護(hù)一個(gè)具有層次關(guān)系的數(shù)據(jù)結(jié)構(gòu),它非常類似于一個(gè)標(biāo)準(zhǔn)的文件系統(tǒng),如圖 1 所示:

圖 1 Zookeeper 數(shù)據(jù)結(jié)構(gòu)

image.png

Zookeeper 這種數(shù)據(jù)結(jié)構(gòu)有如下這些特點(diǎn):

  1. 每個(gè)子目錄項(xiàng)如 NameService 都被稱作為 znode,這個(gè) znode 是被它所在的路徑唯一標(biāo)識(shí),如 Server1 這個(gè) znode 的標(biāo)識(shí)為 /NameService/Server1
  2. znode 可以有子節(jié)點(diǎn)目錄,并且每個(gè) znode 可以存儲(chǔ)數(shù)據(jù),注意 EPHEMERAL 類型的目錄節(jié)點(diǎn)不能有子節(jié)點(diǎn)目錄
  3. znode 是有版本的,每個(gè) znode 中存儲(chǔ)的數(shù)據(jù)可以有多個(gè)版本,也就是一個(gè)訪問路徑中可以存儲(chǔ)多份數(shù)據(jù)
  4. znode 可以是臨時(shí)節(jié)點(diǎn),一旦創(chuàng)建這個(gè) znode 的客戶端與服務(wù)器失去聯(lián)系,這個(gè) znode 也將自動(dòng)刪除,Zookeeper 的客戶端和服務(wù)器通信采用長連接方式,每個(gè)客戶端和服務(wù)器通過心跳來保持連接,這個(gè)連接狀態(tài)稱為 session,如果 znode 是臨時(shí)節(jié)點(diǎn),這個(gè) session 失效,znode 也就刪除了
  5. znode 的目錄名可以自動(dòng)編號(hào),如 App1 已經(jīng)存在,再創(chuàng)建的話,將會(huì)自動(dòng)命名為 App2
  6. znode 可以被監(jiān)控,包括這個(gè)目錄節(jié)點(diǎn)中存儲(chǔ)的數(shù)據(jù)的修改,子節(jié)點(diǎn)目錄的變化等,一旦變化可以通知設(shè)置監(jiān)控的客戶端,這個(gè)是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于這個(gè)特性實(shí)現(xiàn)的,后面在典型的應(yīng)用場景中會(huì)有實(shí)例介紹

原生API

Zookeeper 作為一個(gè)分布式的服務(wù)框架,主要用來解決分布式集群中應(yīng)用系統(tǒng)的一致性問題,它能提供基于類似于文件系統(tǒng)的目錄節(jié)點(diǎn)樹方式的數(shù)據(jù)存儲(chǔ),但是 Zookeeper 并不是用來專門存儲(chǔ)數(shù)據(jù)的,它的作用主要是用來維護(hù)和監(jiān)控你存儲(chǔ)的數(shù)據(jù)的狀態(tài)變化。通過監(jiān)控這些數(shù)據(jù)狀態(tài)的變化,從而可以達(dá)到基于數(shù)據(jù)的集群管理,后面將會(huì)詳細(xì)介紹 Zookeeper 能夠解決的一些典型問題,這里先介紹一下,Zookeeper 的操作接口和簡單使用示例。

常用接口列表

客戶端要連接 Zookeeper 服務(wù)器可以通過創(chuàng)建 org.apache.zookeeper. ZooKeeper 的一個(gè)實(shí)例對(duì)象,然后調(diào)用這個(gè)類提供的接口來和服務(wù)器交互。

前面說了 ZooKeeper 主要是用來維護(hù)和監(jiān)控一個(gè)目錄節(jié)點(diǎn)樹中存儲(chǔ)的數(shù)據(jù)的狀態(tài),所有我們能夠操作 ZooKeeper 的也和操作目錄節(jié)點(diǎn)樹大體一樣,如創(chuàng)建一個(gè)目錄節(jié)點(diǎn),給某個(gè)目錄節(jié)點(diǎn)設(shè)置數(shù)據(jù),獲取某個(gè)目錄節(jié)點(diǎn)的所有子目錄節(jié)點(diǎn),給某個(gè)目錄節(jié)點(diǎn)設(shè)置權(quán)限和監(jiān)控這個(gè)目錄節(jié)點(diǎn)的狀態(tài)變化。

這些接口如下表所示:

方法名 方法功能描述
String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 創(chuàng)建一個(gè)給定的目錄節(jié)點(diǎn) path, 并給它設(shè)置數(shù)據(jù),CreateMode 標(biāo)識(shí)有四種形式的目錄節(jié)點(diǎn),分別是 PERSISTENT:持久化目錄節(jié)點(diǎn),這個(gè)目錄節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù)不會(huì)丟失;PERSISTENT_SEQUENTIAL:順序自動(dòng)編號(hào)的目錄節(jié)點(diǎn),這種目錄節(jié)點(diǎn)會(huì)根據(jù)當(dāng)前已近存在的節(jié)點(diǎn)數(shù)自動(dòng)加 1,然后返回給客戶端已經(jīng)成功創(chuàng)建的目錄節(jié)點(diǎn)名;EPHEMERAL:臨時(shí)目錄節(jié)點(diǎn),一旦創(chuàng)建這個(gè)節(jié)點(diǎn)的客戶端與服務(wù)器端也就是 session 超時(shí),這種節(jié)點(diǎn)會(huì)被自動(dòng)刪除;EPHEMERAL_SEQUENTIAL:臨時(shí)自動(dòng)編號(hào)節(jié)點(diǎn)
Stat exists(String path, boolean watch) 判斷某個(gè) path 是否存在,并設(shè)置是否監(jiān)控這個(gè)目錄節(jié)點(diǎn),這里的 watcher 是在創(chuàng)建 ZooKeeper 實(shí)例時(shí)指定的 watcher,exists方法還有一個(gè)重載方法,可以指定特定的 watcher
Stat exists(String path, Watcher watcher) 重載方法,這里給某個(gè)目錄節(jié)點(diǎn)設(shè)置特定的 watcher,Watcher 在 ZooKeeper 是一個(gè)核心功能,Watcher 可以監(jiān)控目錄節(jié)點(diǎn)的數(shù)據(jù)變化以及子目錄的變化,一旦這些狀態(tài)發(fā)生變化,服務(wù)器就會(huì)通知所有設(shè)置在這個(gè)目錄節(jié)點(diǎn)上的 Watcher,從而每個(gè)客戶端都很快知道它所關(guān)注的目錄節(jié)點(diǎn)的狀態(tài)發(fā)生變化,而做出相應(yīng)的反應(yīng)
void delete(String path, int version) 刪除 path 對(duì)應(yīng)的目錄節(jié)點(diǎn),version 為 -1 可以匹配任何版本,也就刪除了這個(gè)目錄節(jié)點(diǎn)所有數(shù)據(jù)
List<String> getChildren(String path, boolean watch) 獲取指定 path 下的所有子目錄節(jié)點(diǎn),同樣 getChildren方法也有一個(gè)重載方法可以設(shè)置特定的 watcher 監(jiān)控子節(jié)點(diǎn)的狀態(tài)
Stat setData(String path, byte[] data, int version) 給 path 設(shè)置數(shù)據(jù),可以指定這個(gè)數(shù)據(jù)的版本號(hào),如果 version 為 -1 可以匹配任何版本
byte[] getData(String path, boolean watch, Stat stat) 獲取這個(gè) path 對(duì)應(yīng)的目錄節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù),數(shù)據(jù)的版本等信息可以通過 stat 來指定,同時(shí)還可以設(shè)置是否監(jiān)控這個(gè)目錄節(jié)點(diǎn)數(shù)據(jù)的狀態(tài)
void addAuthInfo(String scheme, byte[] auth) 客戶端將自己的授權(quán)信息提交給服務(wù)器,服務(wù)器將根據(jù)這個(gè)授權(quán)信息驗(yàn)證客戶端的訪問權(quán)限
List<ACL> getACL(String path, Stat stat) 獲取某個(gè)目錄節(jié)點(diǎn)的訪問權(quán)限列表

除了以上這些列出的方法之外還有一些重載方法,如都提供了一個(gè)回調(diào)類的重載方法以及可以設(shè)置特定 Watcher 的重載方法,具體的方法可以參考 org.apache.zookeeper.ZooKeeper 類的 API 說明。

基本操作

下面給出操作 ZooKeeper 的示例代碼,這樣你就能對(duì) ZooKeeper 有直觀的認(rèn)識(shí)了。下面的清單包括了創(chuàng)建與 ZooKeeper 服務(wù)器的連接以及最基本的數(shù)據(jù)操作:

public class ZookeeperWatcher implements Watcher {
    private ZooKeeper zk;
    private CountDownLatch latch = new CountDownLatch(1);
    private static final String CONNECTION_STR = "127.0.0.1:2181";
    // 會(huì)話超時(shí)時(shí)間,參見下文對(duì)會(huì)話超時(shí)的描述
    private static final int SESSION_TIMEOUT = 10000;
    
    public ZookeeperWatcher() {
        try {
            zk = new ZooKeeper(CONNECTION_STR, SESSION_TIMEOUT, this);
            System.out.println("開始建立zk連接...");
            // 阻塞直到連接建立完畢
            latch.await();
        } catch (Exception e) {
            if (zk != null) {
                try {
                    zk.close();
                    zk = null;
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            e.printStackTrace();
        }
    }
    
    @Override
    public void process(WatchedEvent event) {
        String path = event.getPath();
        System.out.println("受影響路徑:" + path);
        
        KeeperState keeperState = event.getState();
        EventType eventType = event.getType();
        
        if (KeeperState.SyncConnected == keeperState) {
            if (EventType.None == eventType) {
                // 成功建立連接
                System.out.println("event:成功建立與zk服務(wù)器的連接");
                // 連接建立完成,通知程序往下執(zhí)行
                latch.countDown();
            }
            
            if (EventType.NodeChildrenChanged == eventType) {
                // 子節(jié)點(diǎn)發(fā)生變化
                System.out.println("event:子節(jié)點(diǎn)發(fā)生變化");
            }
            
            if (EventType.NodeCreated == eventType) {
                // 節(jié)點(diǎn)被創(chuàng)建
                System.out.println("event:節(jié)點(diǎn)被創(chuàng)建");
            }
            
            if (EventType.NodeDataChanged == eventType) {
                // 節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化
                System.out.println("event:節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化");
            }
            
            if (EventType.NodeDeleted == eventType) {
                // 節(jié)點(diǎn)被刪除
                System.out.println("event:節(jié)點(diǎn)被刪除");
            }
        }
        
        if (KeeperState.Disconnected == keeperState) {
            System.out.println("斷開與zk的連接");
        }
        
        if (KeeperState.AuthFailed == keeperState) {
            System.out.println("認(rèn)證失敗");
        }
        
        if (KeeperState.Expired == keeperState) {
            System.out.println("會(huì)話超時(shí)");
        }
    }
    
    /**
     * 創(chuàng)建節(jié)點(diǎn)
     * @param path
     * @param data
     * @param needWatch
     */
    public void createNode(String path, String data, boolean needWatch) {
        try {
            Stat stat = this.zk.exists(path, needWatch);
            if (stat == null) {
                String res = this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("節(jié)點(diǎn)創(chuàng)建完成" + res);
            } else {
                System.out.println(path + " already exists..[" + stat + "]");
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 創(chuàng)建臨時(shí)節(jié)點(diǎn),會(huì)話結(jié)束后自動(dòng)刪除
     * @param path
     * @param data
     */
    public void createTempNode(String path, String data, boolean needWatch) {
        try {
            this.zk.exists(path, needWatch);
            this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 刪除指定節(jié)點(diǎn)
     * @param path
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 讀取指定節(jié)點(diǎn)的數(shù)據(jù)
     * @param path
     * @param needWatch
     * @return
     */
    public String getData(String path, boolean needWatch) {
         try {
            String data = new String(this.zk.getData(path, needWatch, null));
            return data;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    
    /**
     * 更新指定節(jié)點(diǎn)的數(shù)據(jù)
     * @param path
     * @param data
     */
    public void writeData(String path, String data) {
        try {
            this.zk.setData(path, data.getBytes(), -1);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 判斷指定節(jié)點(diǎn)是否存在
     * @param path 節(jié)點(diǎn)路徑
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 獲取子節(jié)點(diǎn)
     * @param path 節(jié)點(diǎn)路徑
     */
    public List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    
    /**
     * 關(guān)閉連接
     */
    public void closeConnection() {
        if (zk != null) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        ZookeeperWatcher zw = new ZookeeperWatcher();
        
        // 觸發(fā)NodeCreated事件
        zw.createNode("/super", "super", true);
        
        // 觸發(fā)NodeChildrenChanged事件
        List<String> childList = zw.getChildren("/super", true);
        System.out.println(childList);
        zw.createNode("/super/sub", "sub", true);
        
        // 因?yàn)閣atch事件是一次性的,所以需要再次注冊(cè)
        zw.getData("/super", true);
        // 觸發(fā)NodeDataChanged事件
        zw.writeData("/super", "newsuper");
        
        // 再次注冊(cè)wacth事件
        zw.getData("/super/sub", true);
        // 觸發(fā)NodeDeleted事件
        zw.deleteNode("/super/sub");
        
        Thread.sleep(10000);
        
        
        
        /*
        // 關(guān)閉連接,臨時(shí)節(jié)點(diǎn)自動(dòng)被刪除
        ZookeeperWatcher zw = new ZookeeperWatcher();
        zw.createTempNode("/temp", "temp", true);
        zw.exists("/temp", true);
        zw.closeConnection();
        */
        
    }
}

當(dāng)對(duì)目錄節(jié)點(diǎn)監(jiān)控狀態(tài)打開時(shí),一旦目錄節(jié)點(diǎn)的狀態(tài)發(fā)生變化,Watcher 對(duì)象的 process 方法就會(huì)被調(diào)用。

Watch觸發(fā)器

watch概述

ZooKeeper可以為所有的讀操作設(shè)置watch,這些讀操作包括:exists()、getChildren()及getData()。watch事件是一次性的觸發(fā)器,當(dāng)watch的對(duì)象狀態(tài)發(fā)生改變時(shí),將會(huì)觸發(fā)此對(duì)象上watch所對(duì)應(yīng)的事件。watch事件將被異步地發(fā)送給客戶端,并且ZooKeeper為watch機(jī)制提供了有序的一致性保證。理論上,客戶端接收watch事件的時(shí)間要快于其看到watch對(duì)象狀態(tài)變化的時(shí)間。

watch注冊(cè)與觸發(fā)

image.png

① exists操作上的watch,在被監(jiān)視的Znode創(chuàng)建、刪除或數(shù)據(jù)更新時(shí)被觸發(fā)。

② getData操作上的watch,在被監(jiān)視的Znode刪除或數(shù)據(jù)更新時(shí)被觸發(fā)。在被創(chuàng)建時(shí)不能被觸發(fā),因?yàn)橹挥衂node一定存在,getData操作才會(huì)成功。

③ getChildren操作上的watch,在被監(jiān)視的Znode的子節(jié)點(diǎn)創(chuàng)建或刪除,或是這個(gè)Znode自身被刪除時(shí)被觸發(fā)。可以通過查看watch事件類型來區(qū)分是Znode,還是他的子節(jié)點(diǎn)被刪除:NodeDelete表示Znode被刪除,NodeDeletedChanged表示子節(jié)點(diǎn)被刪除。

Watch由客戶端所連接的ZooKeeper服務(wù)器在本地維護(hù),因此watch可以非常容易地設(shè)置、管理和分派。當(dāng)客戶端連接到一個(gè)新的服務(wù)器時(shí),任何的會(huì)話事件都將可能觸發(fā)watch。另外,當(dāng)從服務(wù)器斷開連接的時(shí)候,watch將不會(huì)被接收。但是,當(dāng)一個(gè)客戶端重新建立連接的時(shí)候,任何先前注冊(cè)過的watch都會(huì)被重新注冊(cè)。

需要注意的幾點(diǎn)

Zookeeper的watch實(shí)際上要處理兩類事件:

① 連接狀態(tài)事件(type=None, path=null)

這類事件不需要注冊(cè),也不需要我們連續(xù)觸發(fā),我們只要處理就行了。

② 節(jié)點(diǎn)事件

節(jié)點(diǎn)的建立,刪除,數(shù)據(jù)的修改。它是one time trigger,我們需要不停的注冊(cè)觸發(fā),還可能發(fā)生事件丟失的情況。

上面2類事件都在Watch中處理,也就是重載的process(Event event)方法。

節(jié)點(diǎn)事件的觸發(fā),通過函數(shù)exists,getData或getChildren來處理這類函數(shù),有雙重作用:

① 注冊(cè)觸發(fā)事件

② 函數(shù)本身的功能

函數(shù)的本身的功能又可以用異步的回調(diào)函數(shù)來實(shí)現(xiàn),重載processResult()過程中處理函數(shù)本身的功能。

ZooKeeper 會(huì)話超時(shí)

1、會(huì)話概述

在ZooKeeper中,客戶端和服務(wù)端建立連接后,會(huì)話隨之建立,生成一個(gè)全局唯一的會(huì)話ID(Session ID)。服務(wù)器和客戶端之間維持的是一個(gè)長連接,在SESSION_TIMEOUT時(shí)間內(nèi),服務(wù)器會(huì)確定客戶端是否正常連接 (客戶端會(huì)定時(shí)向服務(wù)器發(fā)送heart_beat,服務(wù)器重置下次SESSION_TIMEOUT時(shí)間)。因此,在正常情況下,Session一直有效,并且ZK集群所有機(jī)器上都保存這個(gè)Session信息。在出現(xiàn)網(wǎng)絡(luò)或其它問題情況下(例如客戶端所連接的那臺(tái)ZK機(jī)器掛了,或是其它原因的網(wǎng)絡(luò)閃斷),客戶端與當(dāng)前連接的那臺(tái)服務(wù)器之間連接斷了,這個(gè)時(shí)候客戶端會(huì)主動(dòng)在地址列表(實(shí)例化ZK對(duì)象的時(shí)候傳入構(gòu)造方法的那個(gè)參數(shù)connectString)中選擇新的地址進(jìn)行連接。

2、連接斷開

好了,上面基本就是服務(wù)器與客戶端之間維持會(huì)話的過程了。在這個(gè)過程中,用戶可能會(huì)看到兩類異常CONNECTIONLOSS (連接斷開) 和 SESSIONEXPIRED (Session過期)。連接斷開(CONNECTIONLOSS)一般發(fā)生在網(wǎng)絡(luò)的閃斷或是客戶端所連接的服務(wù)器掛機(jī)的時(shí)候,這種情況下,ZooKeeper客戶端自己會(huì)首先感知到這個(gè)異常,具體邏輯是在如下方法中觸發(fā)的:一種場景是Server服務(wù)器掛了,這個(gè)時(shí)候,ZK客戶端首先會(huì)捕獲異常。

所以,現(xiàn)在對(duì)于“連接斷開”這個(gè)過程就一目了然了,核心流程如下:ZK客戶端捕獲“連接斷開”異常 ——> 獲取一個(gè)新的ZK地址 ——> 嘗試連接,
在這個(gè)流程中,我們可以發(fā)現(xiàn),整個(gè)過程不需要開發(fā)者額外的程序介入,都是ZK客戶端自己會(huì)進(jìn)行的,并且,使用的會(huì)話ID都是同一個(gè),所以結(jié)論就是:發(fā)生CONNECTIONLOSS的情況,應(yīng)用不需要做什么事情,等待ZK客戶端建立新的連接即可。

3、會(huì)話超時(shí)

SESSIONEXPIRED發(fā)生在上面藍(lán)色文字部分,這個(gè)通常是ZK客戶端與服務(wù)器的連接斷了,試圖連接上新的ZK機(jī)器,但是這個(gè)過程如果耗時(shí)過長,超過了SESSION_TIMEOUT 后還沒有成功連接上服務(wù)器,那么服務(wù)器認(rèn)為這個(gè)Session已經(jīng)結(jié)束了(服務(wù)器無法確認(rèn)是因?yàn)槠渌惓T蜻€是客戶端主動(dòng)結(jié)束會(huì)話),由于在ZK中,很多數(shù)據(jù)和狀態(tài)都是和會(huì)話綁定的,一旦會(huì)話失效,那么ZK就開始清除和這個(gè)會(huì)話有關(guān)的信息,包括這個(gè)會(huì)話創(chuàng)建的臨時(shí)節(jié)點(diǎn)和注冊(cè)的所有Watcher。在這之后,由于網(wǎng)絡(luò)恢復(fù)后,客戶端可能會(huì)重新連接上服務(wù)器,但是很不幸,服務(wù)器會(huì)告訴客戶端一個(gè)異常:SESSIONEXPIRED(會(huì)話過期)。此時(shí)客戶端的狀態(tài)變成 CLOSED狀態(tài),應(yīng)用要做的事情就是的看自己應(yīng)用的復(fù)雜程序了,要重新實(shí)例zookeeper對(duì)象,然后重新操作所有臨時(shí)數(shù)據(jù)(包括臨時(shí)節(jié)點(diǎn)和注冊(cè)Watcher),總之,會(huì)話超時(shí)在ZK使用過程中是真實(shí)存在的。

所以這里也簡單總結(jié)下,一旦發(fā)生會(huì)話超時(shí),那么存儲(chǔ)在ZK上的所有臨時(shí)數(shù)據(jù)與注冊(cè)的訂閱者都會(huì)被移除,此時(shí)需要重新創(chuàng)建一個(gè)ZooKeeper客戶端實(shí)例,需要自己編碼做一些額外的處理。

4、會(huì)話時(shí)間(Session Time)

在《ZooKeeper API 使用》一文中已經(jīng)提到,在實(shí)例化一個(gè)ZK客戶端的時(shí)候,需要設(shè)置一個(gè)會(huì)話的超時(shí)時(shí)間。這里需要注意的一點(diǎn)是,客戶端并不是可以隨意設(shè)置這個(gè)會(huì)話超時(shí)時(shí)間,在ZK服務(wù)器端對(duì)會(huì)話超時(shí)時(shí)間是有限制的,主要是minSessionTimeout和maxSessionTimeout這兩個(gè)參數(shù)設(shè)置的。(詳細(xì)查看這個(gè)文章《ZooKeeper管理員指南》)Session超時(shí)時(shí)間限制,如果客戶端設(shè)置的超時(shí)時(shí)間不在這個(gè)范圍,那么會(huì)被強(qiáng)制設(shè)置為最大或最小時(shí)間。 默認(rèn)的Session超時(shí)時(shí)間是在2 * tickTime ~ 20 * tickTime。所以,如果應(yīng)用對(duì)于這個(gè)會(huì)話超時(shí)時(shí)間有特殊的需求的話,一定要和ZK管理員溝通好,確認(rèn)好服務(wù)端是否設(shè)置了對(duì)會(huì)話時(shí)間的限制。

ZooKeeper 典型的應(yīng)用場景

Zookeeper 從設(shè)計(jì)模式角度來看,是一個(gè)基于觀察者設(shè)計(jì)模式的分布式服務(wù)管理框架,它負(fù)責(zé)存儲(chǔ)和管理大家都關(guān)心的數(shù)據(jù),然后接受觀察者的注冊(cè),一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper 就將負(fù)責(zé)通知已經(jīng)在 Zookeeper 上注冊(cè)的那些觀察者做出相應(yīng)的反應(yīng),從而實(shí)現(xiàn)集群中類似 Master/Slave 管理模式,關(guān)于 Zookeeper 的詳細(xì)架構(gòu)等內(nèi)部細(xì)節(jié)可以閱讀 Zookeeper 的源碼。

下面詳細(xì)介紹這些典型的應(yīng)用場景,也就是 Zookeeper 到底能幫我們解決那些問題?下面將給出答案。

統(tǒng)一命名服務(wù)(Name Service)

分布式應(yīng)用中,通常需要有一套完整的命名規(guī)則,既能夠產(chǎn)生唯一的名稱又便于人識(shí)別和記住,通常情況下用樹形的名稱結(jié)構(gòu)是一個(gè)理想的選擇,樹形的名稱結(jié)構(gòu)是一 個(gè)有層次的目錄結(jié)構(gòu),既對(duì)人友好又不會(huì)重復(fù)。說到這里你可能想到了 JNDI,沒錯(cuò) Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結(jié)構(gòu)關(guān)聯(lián)到一定資源上,但是 Zookeeper 的 Name Service 更加是廣泛意義上的關(guān)聯(lián),也許你并不需要將名稱關(guān)聯(lián)到特定資源上,你可能只需要一個(gè)不會(huì)重復(fù)名稱,就像數(shù)據(jù)庫中產(chǎn)生一個(gè)唯一的數(shù)字主鍵一樣。

Name Service 已經(jīng)是 Zookeeper 內(nèi)置的功能,你只要調(diào)用 Zookeeper 的 API 就能實(shí)現(xiàn)。如調(diào)用 create 接口就可以很容易創(chuàng)建一個(gè)目錄節(jié)點(diǎn)。

配置管理(Configuration Management)

配置的管理在分布式應(yīng)用環(huán)境中很常見,例如同一個(gè)應(yīng)用系統(tǒng)需要多臺(tái) PC Server 運(yùn)行,但是它們運(yùn)行的應(yīng)用系統(tǒng)的某些配置項(xiàng)是相同的,如果要修改這些相同的配置項(xiàng),那么就必須同時(shí)修改每臺(tái)運(yùn)行這個(gè)應(yīng)用系統(tǒng)的 PC Server,這樣非常麻煩而且容易出錯(cuò)。

像 這樣的配置信息完全可以交給 Zookeeper 來管理,將配置信息保存在 Zookeeper 的某個(gè)目錄節(jié)點(diǎn)中,然后將所有需要修改的應(yīng)用機(jī)器監(jiān)控配置信息的狀態(tài),一旦配置信息發(fā)生變化,每臺(tái)應(yīng)用機(jī)器就會(huì)收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應(yīng)用到系統(tǒng)中。

image.png

集群管理(Group Membership)

Zookeeper 能夠很容易的實(shí)現(xiàn)集群管理的功能,如有多臺(tái) Server 組成一個(gè)服務(wù)集群,那么必須要一個(gè)“總管”知道當(dāng)前集群中每臺(tái)機(jī)器的服務(wù)狀態(tài),一旦有機(jī)器不能提供服務(wù),集群中其它集群必須知道,從而做出調(diào)整重新分配服務(wù)策略。同樣當(dāng)增加集群的服務(wù)能力時(shí),就會(huì)增加一臺(tái)或多臺(tái) Server,同樣也必須讓“總管”知道。

Zookeeper 不僅能夠幫你維護(hù)當(dāng)前的集群中機(jī)器的服務(wù)狀態(tài),而且能夠幫你選出一個(gè)“總管”,讓這個(gè)總管來管理集群,這就是 Zookeeper 的另一個(gè)功能 Leader Election。

它們的實(shí)現(xiàn)方式都是在 Zookeeper 上創(chuàng)建一個(gè) EPHEMERAL 類型的目錄節(jié)點(diǎn),然后每個(gè) Server 在它們創(chuàng)建目錄節(jié)點(diǎn)的父目錄節(jié)點(diǎn)上調(diào)用 getChildren(String path, boolean watch) 方法并設(shè)置 watch 為 true,由于是 EPHEMERAL 目錄節(jié)點(diǎn),當(dāng)創(chuàng)建它的 Server 死去,這個(gè)目錄節(jié)點(diǎn)也隨之被刪除,所以 Children 將會(huì)變化,這時(shí) getChildren上的 Watch 將會(huì)被調(diào)用,所以其它 Server 就知道已經(jīng)有某臺(tái) Server 死去了。新增 Server 也是同樣的原理。

Zookeeper 如何實(shí)現(xiàn) Leader Election,也就是選出一個(gè) Master Server。和前面的一樣,每臺(tái) Server 創(chuàng)建一個(gè) EPHEMERAL 目錄節(jié)點(diǎn),不同的是它還是一個(gè) SEQUENTIAL 目錄節(jié)點(diǎn),所以它是個(gè) EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn)。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn),是因?yàn)槲覀兛梢越o每臺(tái) Server 編號(hào),我們可以選擇當(dāng)前是最小編號(hào)的 Server 為 Master,假如這個(gè)最小編號(hào)的 Server 死去,由于是 EPHEMERAL 節(jié)點(diǎn),死去的 Server 對(duì)應(yīng)的節(jié)點(diǎn)也被刪除,所以當(dāng)前的節(jié)點(diǎn)列表中又出現(xiàn)一個(gè)最小編號(hào)的節(jié)點(diǎn),我們就選擇這個(gè)節(jié)點(diǎn)為當(dāng)前 Master。這樣就實(shí)現(xiàn)了動(dòng)態(tài)選擇 Master,避免了傳統(tǒng)意義上單 Master 容易出現(xiàn)單點(diǎn)故障的問題。

image.png

共享鎖(Locks)

共享鎖在同一個(gè)進(jìn)程中很容易實(shí)現(xiàn),但是在跨進(jìn)程或者在不同 Server 之間就不好實(shí)現(xiàn)了。Zookeeper 卻很容易實(shí)現(xiàn)這個(gè)功能,實(shí)現(xiàn)方式也是需要獲得鎖的 Server 創(chuàng)建一個(gè) EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn),然后調(diào)用 getChildren方法獲取當(dāng)前的目錄節(jié)點(diǎn)列表中最小的目錄節(jié)點(diǎn)是不是就是自己創(chuàng)建的目錄節(jié)點(diǎn),如果正是自己創(chuàng)建的,那么它就獲得了這個(gè)鎖,如果不是那么它就調(diào)用 exists(String path, boolean watch) 方法并監(jiān)控 Zookeeper 上目錄節(jié)點(diǎn)列表的變化,一直到自己創(chuàng)建的節(jié)點(diǎn)是列表中最小編號(hào)的目錄節(jié)點(diǎn),從而獲得鎖,釋放鎖很簡單,只要?jiǎng)h除前面它自己所創(chuàng)建的目錄節(jié)點(diǎn)就行了。

image.png

同步鎖的關(guān)鍵代碼:

void getLock() throws KeeperException, InterruptedException{
    List<String> list = zk.getChildren(root, false); 
    String[] nodes = list.toArray(new String[list.size()]); 
    Arrays.sort(nodes); 
    if(myZnode.equals(root+"/"+nodes[0])){ 
        doAction(); 
    } 
    else{ 
        waitForLock(nodes[0]); 
    } 
} 
void waitForLock(String lower) throws InterruptedException, KeeperException {
    Stat stat = zk.exists(root + "/" + lower,true); 
    if(stat != null){ 
        mutex.wait(); 
    } 
    else{ 
        getLock(); 
    } 
}

隊(duì)列管理

Zookeeper 可以處理兩種類型的隊(duì)列:

  1. 當(dāng)一個(gè)隊(duì)列的成員都聚齊時(shí),這個(gè)隊(duì)列才可用,否則一直等待所有成員到達(dá),這種是同步隊(duì)列。

  2. 隊(duì)列按照 FIFO 方式進(jìn)行入隊(duì)和出隊(duì)操作,例如實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型。

同步隊(duì)列用 Zookeeper 實(shí)現(xiàn)的實(shí)現(xiàn)思路如下:

創(chuàng)建一個(gè)父目錄 /synchronizing,每個(gè)成員都監(jiān)控標(biāo)志位目錄 /synchronizing/start 是否存在,然后每個(gè)成員都加入這個(gè)隊(duì)列,加入隊(duì)列的方式就是創(chuàng)建 /synchronizing/member_i 的臨時(shí)目錄節(jié)點(diǎn),然后每個(gè)成員獲取 /synchronizing 目錄的所有目錄節(jié)點(diǎn),也就是 member_i。判斷 i 的值是否已經(jīng)是成員的個(gè)數(shù),如果小于成員個(gè)數(shù)等待 /synchronizing/start 的出現(xiàn),如果已經(jīng)相等就創(chuàng)建 /synchronizing/start。

用下面的流程圖更容易理解:

image.png

同步隊(duì)列的關(guān)鍵代碼如下:

void addQueue() throws KeeperException, InterruptedException{ 
    zk.exists(root + "/start",true); 
    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, 
    CreateMode.EPHEMERAL_SEQUENTIAL); 
    synchronized (mutex) { 
        List<String> list = zk.getChildren(root, false); 
        if (list.size() < size) { 
            mutex.wait(); 
        } else { 
            zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT); 
        } 
     } 
 }

當(dāng)隊(duì)列沒滿是進(jìn)入 wait(),然后會(huì)一直等待 Watch 的通知,Watch 的代碼如下:

public void process(WatchedEvent event) { 
    if(event.getPath().equals(root + "/start") &&
     event.getType() == Event.EventType.NodeCreated){ 
        System.out.println("得到通知"); 
        super.process(event); 
        doAction(); 
    } 
}

FIFO 隊(duì)列用 Zookeeper 實(shí)現(xiàn)思路如下:

實(shí)現(xiàn)的思路也非常簡單,就是在特定的目錄下創(chuàng)建 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證所有成員加入隊(duì)列時(shí)都是有編號(hào)的,出隊(duì)列時(shí)通過 getChildren( ) 方法可以返回當(dāng)前所有的隊(duì)列中的元素,然后消費(fèi)其中最小的一個(gè),這樣就能保證 FIFO。

下面是生產(chǎn)者和消費(fèi)者這種隊(duì)列形式的示例代碼:

生產(chǎn)者代碼:

 boolean produce(int i) throws KeeperException, InterruptedException{ 
    ByteBuffer b = ByteBuffer.allocate(4); 
    byte[] value; 
    b.putInt(i); 
    value = b.array(); 
    zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                CreateMode.PERSISTENT_SEQUENTIAL); 
    return true; 
}

消費(fèi)者代碼:

int consume() throws KeeperException, InterruptedException{ 
    int retvalue = -1; 
    Stat stat = null; 
    while (true) { 
        synchronized (mutex) { 
            List<String> list = zk.getChildren(root, true); 
            if (list.size() == 0) { 
                mutex.wait(); 
            } else { 
                Integer min = new Integer(list.get(0).substring(7)); 
                for(String s : list){ 
                    Integer tempValue = new Integer(s.substring(7)); 
                    if(tempValue < min) min = tempValue; 
                } 
                byte[] b = zk.getData(root + "/element" + min,false, stat); 
                zk.delete(root + "/element" + min, 0); 
                ByteBuffer buffer = ByteBuffer.wrap(b); 
                retvalue = buffer.getInt(); 
                return retvalue; 
            } 
        } 
    } 
 }

總結(jié)

Zookeeper 作為 Hadoop 項(xiàng)目中的一個(gè)子項(xiàng)目,是 Hadoop 集群管理的一個(gè)必不可少的模塊,它主要用來控制集群中的數(shù)據(jù),如它管理 Hadoop 集群中的 NameNode,還有 Hbase 中 Master Election、Server 之間狀態(tài)同步等。

本文介紹的 Zookeeper 的基本知識(shí),以及介紹了幾個(gè)典型的應(yīng)用場景。這些都是 Zookeeper 的基本功能,最重要的是 Zoopkeeper 提供了一套很好的分布式集群管理的機(jī)制,就是它這種基于層次型的目錄樹的數(shù)據(jù)結(jié)構(gòu),并對(duì)樹中的節(jié)點(diǎn)進(jìn)行有效管理,從而可以設(shè)計(jì)出多種多樣的分布式的數(shù)據(jù)管理模型,而不僅僅局限于上面提到的幾個(gè)常用應(yīng)用場景。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容